use std::collections::BTreeMap;
use crate::replay::replay_trace;
use crate::runtime::{WorkflowEventKind, WorkflowRetryEvent, WorkflowRunResult};
use crate::trace::{TraceEventKind, TraceTerminalStatus, WorkflowTrace};
/// One row of a workflow-run timeline, derived from a single runtime event.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct NodeTimelineEntry {
    /// Zero-based position of the source event within the run's event list.
    pub index: usize,
    /// Step number carried by the source event.
    pub step: usize,
    /// Identifier of the node the event refers to.
    pub node_id: String,
    /// Event label, e.g. `"node_started"`, `"node_completed"` or `"node_failed"`.
    pub event: String,
    /// Optional payload: debug-rendered completion data or a failure message.
    pub details: Option<String>,
}
/// Aggregated retry information for one `(node_id, operation)` pair.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct RetryReasonSummary {
    /// Node whose operation was retried.
    pub node_id: String,
    /// Operation that was retried on that node.
    pub operation: String,
    /// Retry count for this pair, as computed by `retry_reason_summary`.
    pub retries: usize,
    /// Distinct retry reasons, sorted ascending.
    pub reasons: Vec<String>,
}
/// Outcome of replaying a recorded workflow trace.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ReplayTraceInspection {
    /// `true` when the trace replayed without error.
    pub valid: bool,
    /// Number of events: taken from the replay report when valid, otherwise the
    /// raw number of events recorded in the trace.
    pub total_events: usize,
    /// Terminal status of the trace, when one could be determined.
    pub terminal_status: Option<TraceTerminalStatus>,
    /// Messages of the replay violations; empty for a valid trace.
    pub violations: Vec<String>,
}
/// Flattens the runtime event log of `result` into a timeline, one entry per event.
///
/// Entries appear in the same order as `result.events`; `index` is each event's
/// position in that list. The event kind is mapped to a snake_case label, and
/// `details` holds the `Debug` rendering of completion data for `NodeCompleted`,
/// the failure message for `NodeFailed`, and `None` for `NodeStarted`.
pub fn node_timeline(result: &WorkflowRunResult) -> Vec<NodeTimelineEntry> {
    let mut timeline = Vec::with_capacity(result.events.len());
    for (position, runtime_event) in result.events.iter().enumerate() {
        let (kind_label, extra): (&'static str, Option<String>) = match &runtime_event.kind {
            WorkflowEventKind::NodeStarted => ("node_started", None),
            WorkflowEventKind::NodeCompleted { data } => {
                ("node_completed", Some(format!("{:?}", data)))
            }
            WorkflowEventKind::NodeFailed { message } => ("node_failed", Some(message.clone())),
        };
        timeline.push(NodeTimelineEntry {
            index: position,
            step: runtime_event.step,
            node_id: runtime_event.node_id.clone(),
            event: kind_label.to_owned(),
            details: extra,
        });
    }
    timeline
}
/// Groups retry events by `(node_id, operation)` and summarizes each group.
///
/// Groups are returned in ascending `(node_id, operation)` order because a
/// [`BTreeMap`] keys the aggregation. For each group, `retries` is the total number
/// of retry events recorded, and `reasons` holds the distinct reason strings,
/// sorted and de-duplicated. An empty input yields an empty summary.
pub fn retry_reason_summary(retry_events: &[WorkflowRetryEvent]) -> Vec<RetryReasonSummary> {
    let mut grouped: BTreeMap<(String, String), Vec<String>> = BTreeMap::new();
    for retry in retry_events {
        grouped
            .entry((retry.node_id.clone(), retry.operation.clone()))
            .or_default()
            .push(retry.reason.clone());
    }
    grouped
        .into_iter()
        .map(|((node_id, operation), mut reasons)| {
            // Count retries before collapsing duplicate reasons: two identical
            // timeouts are still two retries, even though they share one reason.
            let retries = reasons.len();
            reasons.sort_unstable();
            reasons.dedup();
            RetryReasonSummary {
                node_id,
                operation,
                retries,
                reasons,
            }
        })
        .collect()
}
/// Replays `trace` and condenses the outcome into a [`ReplayTraceInspection`].
///
/// * Successful replay: `valid` is `true`, `total_events` and `terminal_status`
///   come from the replay report, and `violations` is empty.
/// * Failed replay: `valid` is `false`, `total_events` is the raw length of
///   `trace.events`, `terminal_status` is the status of the first `Terminal`
///   event in the trace (or `None` if there is none), and `violations` holds the
///   message of every violation carried by the replay error.
pub fn inspect_replay_trace(trace: &WorkflowTrace) -> ReplayTraceInspection {
    match replay_trace(trace) {
        Ok(report) => ReplayTraceInspection {
            valid: true,
            total_events: report.total_events,
            terminal_status: Some(report.terminal_status),
            violations: Vec::new(),
        },
        Err(error) => {
            // No replay report is available on failure, so recover what we can
            // directly from the recorded events.
            let first_terminal = trace.events.iter().find_map(|event| match &event.kind {
                TraceEventKind::Terminal { status } => Some(*status),
                _ => None,
            });
            let messages: Vec<String> = error
                .violations
                .iter()
                .map(|violation| violation.message.clone())
                .collect();
            ReplayTraceInspection {
                valid: false,
                total_events: trace.events.len(),
                terminal_status: first_terminal,
                violations: messages,
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::runtime::{NodeExecutionData, WorkflowEvent, WorkflowRetryEvent};
    use crate::trace::{TraceEvent, TraceEventKind, WorkflowTrace, WorkflowTraceMetadata};
    use serde_json::json;

    /// Trace metadata shared by the replay-inspection fixtures below.
    fn sample_trace_metadata() -> WorkflowTraceMetadata {
        WorkflowTraceMetadata {
            trace_id: String::from("trace"),
            workflow_name: String::from("wf"),
            workflow_version: String::from("v0"),
            started_at_unix_ms: 0,
            finished_at_unix_ms: Some(1),
        }
    }

    /// A started/completed pair for one node yields two labelled timeline rows.
    #[test]
    fn builds_timeline_entries() {
        let started = WorkflowEvent {
            step: 0,
            node_id: String::from("start"),
            kind: WorkflowEventKind::NodeStarted,
        };
        let completed = WorkflowEvent {
            step: 0,
            node_id: String::from("start"),
            kind: WorkflowEventKind::NodeCompleted {
                data: NodeExecutionData::Start {
                    next: String::from("end"),
                },
            },
        };
        let result = WorkflowRunResult {
            workflow_name: String::from("wf"),
            terminal_node_id: String::from("end"),
            node_executions: Vec::new(),
            events: vec![started, completed],
            retry_events: Vec::new(),
            node_outputs: Default::default(),
            trace: None,
            replay_report: None,
        };

        let labels: Vec<String> = node_timeline(&result)
            .into_iter()
            .map(|entry| entry.event)
            .collect();
        assert_eq!(labels, ["node_started", "node_completed"]);
    }

    /// Two retries of the same node/operation collapse into a single summary row.
    #[test]
    fn aggregates_retry_reasons() {
        let retry_log = [
            WorkflowRetryEvent {
                step: 1,
                node_id: String::from("llm"),
                operation: String::from("llm"),
                failed_attempt: 1,
                reason: String::from("attempt 1 timed out after 10 ms"),
            },
            WorkflowRetryEvent {
                step: 1,
                node_id: String::from("llm"),
                operation: String::from("llm"),
                failed_attempt: 2,
                reason: String::from("upstream overloaded"),
            },
        ];

        let summary = retry_reason_summary(&retry_log);
        assert_eq!(summary.len(), 1);
        let llm_row = &summary[0];
        assert_eq!(llm_row.node_id, "llm");
        assert_eq!(llm_row.retries, 2);
    }

    /// Enter/exit of `start` followed by a Completed terminal is reported as valid.
    #[test]
    fn inspects_replay_trace() {
        let trace = WorkflowTrace {
            metadata: sample_trace_metadata(),
            events: vec![
                TraceEvent {
                    seq: 0,
                    timestamp_unix_ms: 0,
                    kind: TraceEventKind::NodeEnter {
                        node_id: String::from("start"),
                    },
                },
                TraceEvent {
                    seq: 1,
                    timestamp_unix_ms: 1,
                    kind: TraceEventKind::NodeExit {
                        node_id: String::from("start"),
                    },
                },
                TraceEvent {
                    seq: 2,
                    timestamp_unix_ms: 2,
                    kind: TraceEventKind::Terminal {
                        status: TraceTerminalStatus::Completed,
                    },
                },
            ],
        };

        assert_eq!(
            inspect_replay_trace(&trace),
            ReplayTraceInspection {
                valid: true,
                total_events: 3,
                terminal_status: Some(TraceTerminalStatus::Completed),
                violations: Vec::new(),
            }
        );
    }

    /// A trace that enters a node and then stops is invalid and keeps its violations.
    #[test]
    fn keeps_invalid_trace_violations() {
        let trace = WorkflowTrace {
            metadata: sample_trace_metadata(),
            events: vec![TraceEvent {
                seq: 0,
                timestamp_unix_ms: 0,
                kind: TraceEventKind::NodeEnter {
                    node_id: String::from("start"),
                },
            }],
        };

        let inspection = inspect_replay_trace(&trace);
        assert!(!inspection.valid);
        assert!(!inspection.violations.is_empty());
        // Presumably a smoke check that the violation list converts to a JSON value
        // (json! on an expression unwraps serde_json::to_value) — confirm intent.
        let _ = json!(inspection.violations);
    }
}