Skip to main content

dataflow_rs/engine/
trace.rs

1//! # Execution Trace Module
2//!
3//! This module provides step-by-step execution tracing for debugging workflows.
4//! It captures message snapshots after each step, including which workflows/tasks
5//! were executed or skipped.
6
7use crate::engine::message::Message;
8use serde::{Deserialize, Serialize};
9
10/// Result of executing a step (workflow or task)
11#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
12#[serde(rename_all = "lowercase")]
13pub enum StepResult {
14    /// The step was executed
15    Executed,
16    /// The step was skipped due to condition being false
17    Skipped,
18}
19
20/// A single step in the execution trace
21#[derive(Debug, Clone, Serialize, Deserialize)]
22pub struct ExecutionStep {
23    /// ID of the workflow this step belongs to
24    pub workflow_id: String,
25    /// ID of the task (None for workflow-level skips)
26    pub task_id: Option<String>,
27    /// Result of the step execution
28    pub result: StepResult,
29    /// Message snapshot after this step (only for Executed steps)
30    #[serde(skip_serializing_if = "Option::is_none")]
31    pub message: Option<Message>,
32}
33
34impl ExecutionStep {
35    /// Create a new executed step with a message snapshot
36    pub fn executed(workflow_id: &str, task_id: &str, message: &Message) -> Self {
37        Self {
38            workflow_id: workflow_id.to_string(),
39            task_id: Some(task_id.to_string()),
40            result: StepResult::Executed,
41            message: Some(message.clone()),
42        }
43    }
44
45    /// Create a skipped task step
46    pub fn task_skipped(workflow_id: &str, task_id: &str) -> Self {
47        Self {
48            workflow_id: workflow_id.to_string(),
49            task_id: Some(task_id.to_string()),
50            result: StepResult::Skipped,
51            message: None,
52        }
53    }
54
55    /// Create a skipped workflow step
56    pub fn workflow_skipped(workflow_id: &str) -> Self {
57        Self {
58            workflow_id: workflow_id.to_string(),
59            task_id: None,
60            result: StepResult::Skipped,
61            message: None,
62        }
63    }
64}
65
66/// Complete execution trace containing all steps
67#[derive(Debug, Clone, Serialize, Deserialize)]
68pub struct ExecutionTrace {
69    /// All execution steps in order
70    pub steps: Vec<ExecutionStep>,
71}
72
73impl ExecutionTrace {
74    /// Create a new empty execution trace
75    pub fn new() -> Self {
76        Self { steps: Vec::new() }
77    }
78
79    /// Add a step to the trace
80    pub fn add_step(&mut self, step: ExecutionStep) {
81        self.steps.push(step);
82    }
83
84    /// Get the final message (from the last executed step)
85    pub fn final_message(&self) -> Option<&Message> {
86        self.steps
87            .iter()
88            .rev()
89            .find(|s| s.result == StepResult::Executed)
90            .and_then(|s| s.message.as_ref())
91    }
92
93    /// Check if execution was successful (no errors in final message)
94    pub fn is_success(&self) -> bool {
95        self.final_message()
96            .map(|m| m.errors.is_empty())
97            .unwrap_or(true)
98    }
99
100    /// Get number of executed steps
101    pub fn executed_count(&self) -> usize {
102        self.steps
103            .iter()
104            .filter(|s| s.result == StepResult::Executed)
105            .count()
106    }
107
108    /// Get number of skipped steps
109    pub fn skipped_count(&self) -> usize {
110        self.steps
111            .iter()
112            .filter(|s| s.result == StepResult::Skipped)
113            .count()
114    }
115}
116
117impl Default for ExecutionTrace {
118    fn default() -> Self {
119        Self::new()
120    }
121}
122
123#[cfg(test)]
124mod tests {
125    use super::*;
126    use serde_json::json;
127
128    #[test]
129    fn test_step_result_serialization() {
130        assert_eq!(
131            serde_json::to_string(&StepResult::Executed).unwrap(),
132            "\"executed\""
133        );
134        assert_eq!(
135            serde_json::to_string(&StepResult::Skipped).unwrap(),
136            "\"skipped\""
137        );
138    }
139
140    #[test]
141    fn test_execution_step_executed() {
142        let message = Message::from_value(&json!({"test": "data"}));
143        let step = ExecutionStep::executed("workflow1", "task1", &message);
144
145        assert_eq!(step.workflow_id, "workflow1");
146        assert_eq!(step.task_id, Some("task1".to_string()));
147        assert_eq!(step.result, StepResult::Executed);
148        assert!(step.message.is_some());
149    }
150
151    #[test]
152    fn test_execution_step_task_skipped() {
153        let step = ExecutionStep::task_skipped("workflow1", "task1");
154
155        assert_eq!(step.workflow_id, "workflow1");
156        assert_eq!(step.task_id, Some("task1".to_string()));
157        assert_eq!(step.result, StepResult::Skipped);
158        assert!(step.message.is_none());
159    }
160
161    #[test]
162    fn test_execution_step_workflow_skipped() {
163        let step = ExecutionStep::workflow_skipped("workflow1");
164
165        assert_eq!(step.workflow_id, "workflow1");
166        assert_eq!(step.task_id, None);
167        assert_eq!(step.result, StepResult::Skipped);
168        assert!(step.message.is_none());
169    }
170
171    #[test]
172    fn test_execution_trace() {
173        let mut trace = ExecutionTrace::new();
174        let message = Message::from_value(&json!({"test": "data"}));
175
176        trace.add_step(ExecutionStep::workflow_skipped("workflow0"));
177        trace.add_step(ExecutionStep::executed("workflow1", "task1", &message));
178        trace.add_step(ExecutionStep::task_skipped("workflow1", "task2"));
179
180        assert_eq!(trace.steps.len(), 3);
181        assert_eq!(trace.executed_count(), 1);
182        assert_eq!(trace.skipped_count(), 2);
183        assert!(trace.final_message().is_some());
184        assert!(trace.is_success());
185    }
186}