1use std::collections::BTreeMap;
2
3use crate::replay::replay_trace;
4use crate::runtime::{WorkflowEventKind, WorkflowRetryEvent, WorkflowRunResult};
5use crate::trace::{TraceEventKind, TraceTerminalStatus, WorkflowTrace};
6
/// One row of a node timeline, describing a single workflow event.
///
/// Rows are produced by [`node_timeline`], one per event of a run.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct NodeTimelineEntry {
    /// Position of the event within the run's event list (zero-based).
    pub index: usize,
    /// Step value copied from the originating event.
    pub step: usize,
    /// Identifier of the node the event belongs to.
    pub node_id: String,
    /// Event label: `node_started`, `node_completed` or `node_failed`.
    pub event: String,
    /// Extra information attached to the event, if it carries any.
    pub details: Option<String>,
}
21
/// Retry information aggregated for one `(node_id, operation)` pair.
///
/// Values of this type are built by [`retry_reason_summary`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct RetryReasonSummary {
    /// Identifier of the node whose operation was retried.
    pub node_id: String,
    /// Name of the retried operation.
    pub operation: String,
    /// Retry count reported for this node/operation pair.
    pub retries: usize,
    /// Reasons recorded for the retries of this pair.
    pub reasons: Vec<String>,
}
34
35#[derive(Debug, Clone, PartialEq, Eq)]
37pub struct ReplayTraceInspection {
38 pub valid: bool,
40 pub total_events: usize,
42 pub terminal_status: Option<TraceTerminalStatus>,
44 pub violations: Vec<String>,
46}
47
48pub fn node_timeline(result: &WorkflowRunResult) -> Vec<NodeTimelineEntry> {
50 result
51 .events
52 .iter()
53 .enumerate()
54 .map(|(index, event)| {
55 let (label, details) = match &event.kind {
56 WorkflowEventKind::NodeStarted => ("node_started", None),
57 WorkflowEventKind::NodeCompleted { data } => {
58 ("node_completed", Some(format!("{:?}", data)))
59 }
60 WorkflowEventKind::NodeFailed { message } => ("node_failed", Some(message.clone())),
61 };
62
63 NodeTimelineEntry {
64 index,
65 step: event.step,
66 node_id: event.node_id.clone(),
67 event: label.to_string(),
68 details,
69 }
70 })
71 .collect()
72}
73
74pub fn retry_reason_summary(retry_events: &[WorkflowRetryEvent]) -> Vec<RetryReasonSummary> {
76 let mut grouped: BTreeMap<(String, String), Vec<String>> = BTreeMap::new();
77 for retry in retry_events {
78 grouped
79 .entry((retry.node_id.clone(), retry.operation.clone()))
80 .or_default()
81 .push(retry.reason.clone());
82 }
83
84 grouped
85 .into_iter()
86 .map(|((node_id, operation), reasons)| {
87 let mut unique = reasons;
88 unique.sort();
89 unique.dedup();
90 RetryReasonSummary {
91 node_id,
92 operation,
93 retries: unique.len(),
94 reasons: unique,
95 }
96 })
97 .collect()
98}
99
100pub fn inspect_replay_trace(trace: &WorkflowTrace) -> ReplayTraceInspection {
102 let terminal_status = trace.events.iter().find_map(|event| match event.kind {
103 TraceEventKind::Terminal { status } => Some(status),
104 _ => None,
105 });
106
107 match replay_trace(trace) {
108 Ok(report) => ReplayTraceInspection {
109 valid: true,
110 total_events: report.total_events,
111 terminal_status: Some(report.terminal_status),
112 violations: Vec::new(),
113 },
114 Err(error) => ReplayTraceInspection {
115 valid: false,
116 total_events: trace.events.len(),
117 terminal_status,
118 violations: error
119 .violations
120 .iter()
121 .map(|violation| violation.message.clone())
122 .collect(),
123 },
124 }
125}
126
#[cfg(test)]
mod tests {
    use serde_json::json;

    use crate::runtime::{NodeExecutionData, WorkflowEvent, WorkflowRetryEvent};
    use crate::trace::{TraceEvent, TraceEventKind, WorkflowTrace, WorkflowTraceMetadata};

    use super::*;

    /// Trace metadata shared by the replay-inspection fixtures below.
    fn fixture_metadata() -> WorkflowTraceMetadata {
        WorkflowTraceMetadata {
            trace_id: String::from("trace"),
            workflow_name: String::from("wf"),
            workflow_version: String::from("v0"),
            started_at_unix_ms: 0,
            finished_at_unix_ms: Some(1),
        }
    }

    /// A started and a completed event map to two rows with matching labels.
    #[test]
    fn builds_timeline_entries() {
        let started = WorkflowEvent {
            step: 0,
            node_id: String::from("start"),
            kind: WorkflowEventKind::NodeStarted,
        };
        let completed = WorkflowEvent {
            step: 0,
            node_id: String::from("start"),
            kind: WorkflowEventKind::NodeCompleted {
                data: NodeExecutionData::Start {
                    next: String::from("end"),
                },
            },
        };
        let result = WorkflowRunResult {
            workflow_name: String::from("wf"),
            terminal_node_id: String::from("end"),
            node_executions: Vec::new(),
            events: vec![started, completed],
            retry_events: Vec::new(),
            node_outputs: Default::default(),
            trace: None,
            replay_report: None,
        };

        let timeline = node_timeline(&result);
        let labels: Vec<&str> = timeline.iter().map(|entry| entry.event.as_str()).collect();
        assert_eq!(labels, ["node_started", "node_completed"]);
    }

    /// Two retries of the same node/operation collapse into a single summary.
    #[test]
    fn aggregates_retry_reasons() {
        let first_failure = WorkflowRetryEvent {
            step: 1,
            node_id: String::from("llm"),
            operation: String::from("llm"),
            failed_attempt: 1,
            reason: String::from("attempt 1 timed out after 10 ms"),
        };
        let second_failure = WorkflowRetryEvent {
            step: 1,
            node_id: String::from("llm"),
            operation: String::from("llm"),
            failed_attempt: 2,
            reason: String::from("upstream overloaded"),
        };

        let summary = retry_reason_summary(&[first_failure, second_failure]);

        assert_eq!(summary.len(), 1);
        let only = &summary[0];
        assert_eq!(only.node_id, "llm");
        assert_eq!(only.retries, 2);
    }

    /// A well-formed enter/exit/terminal trace is reported as valid.
    #[test]
    fn inspects_replay_trace() {
        let events = vec![
            TraceEvent {
                seq: 0,
                timestamp_unix_ms: 0,
                kind: TraceEventKind::NodeEnter {
                    node_id: String::from("start"),
                },
            },
            TraceEvent {
                seq: 1,
                timestamp_unix_ms: 1,
                kind: TraceEventKind::NodeExit {
                    node_id: String::from("start"),
                },
            },
            TraceEvent {
                seq: 2,
                timestamp_unix_ms: 2,
                kind: TraceEventKind::Terminal {
                    status: TraceTerminalStatus::Completed,
                },
            },
        ];
        let trace = WorkflowTrace {
            metadata: fixture_metadata(),
            events,
        };

        let inspection = inspect_replay_trace(&trace);

        assert!(inspection.valid);
        assert_eq!(inspection.total_events, 3);
        assert_eq!(
            inspection.terminal_status,
            Some(TraceTerminalStatus::Completed)
        );
        assert!(inspection.violations.is_empty());
    }

    /// A trace holding only a node-enter event is invalid and keeps its
    /// violations.
    #[test]
    fn keeps_invalid_trace_violations() {
        let lone_enter = TraceEvent {
            seq: 0,
            timestamp_unix_ms: 0,
            kind: TraceEventKind::NodeEnter {
                node_id: String::from("start"),
            },
        };
        let trace = WorkflowTrace {
            metadata: fixture_metadata(),
            events: vec![lone_enter],
        };

        let inspection = inspect_replay_trace(&trace);

        assert!(!inspection.valid);
        assert!(!inspection.violations.is_empty());
        // The violation messages must convert to a JSON value without panicking.
        let _ = json!(inspection.violations);
    }
}