Skip to main content

dataflow_rs/engine/
trace.rs

//! # Execution Trace Module
//!
//! This module provides step-by-step execution tracing for debugging workflows.
//! It captures a message snapshot after each executed step and records which
//! workflows/tasks were executed or skipped.
6
7use crate::engine::message::Message;
8use serde::{Deserialize, Serialize};
9use serde_json::Value;
10
11/// Result of executing a step (workflow or task)
12#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
13#[serde(rename_all = "lowercase")]
14pub enum StepResult {
15    /// The step was executed
16    Executed,
17    /// The step was skipped due to condition being false
18    Skipped,
19}
20
21/// A single step in the execution trace
22#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct ExecutionStep {
24    /// ID of the workflow this step belongs to
25    pub workflow_id: String,
26    /// ID of the task (None for workflow-level skips)
27    pub task_id: Option<String>,
28    /// Result of the step execution
29    pub result: StepResult,
30    /// Message snapshot after this step (only for Executed steps)
31    #[serde(skip_serializing_if = "Option::is_none")]
32    pub message: Option<Message>,
33    /// Context snapshots before each mapping (map tasks only, trace mode only).
34    /// mapping_contexts[i] = message.context before mapping[i] executed.
35    #[serde(skip_serializing_if = "Option::is_none")]
36    pub mapping_contexts: Option<Vec<Value>>,
37}
38
39impl ExecutionStep {
40    /// Create a new executed step with a message snapshot
41    pub fn executed(workflow_id: &str, task_id: &str, message: &Message) -> Self {
42        Self {
43            workflow_id: workflow_id.to_string(),
44            task_id: Some(task_id.to_string()),
45            result: StepResult::Executed,
46            message: Some(message.clone()),
47            mapping_contexts: None,
48        }
49    }
50
51    /// Create a skipped task step
52    pub fn task_skipped(workflow_id: &str, task_id: &str) -> Self {
53        Self {
54            workflow_id: workflow_id.to_string(),
55            task_id: Some(task_id.to_string()),
56            result: StepResult::Skipped,
57            message: None,
58            mapping_contexts: None,
59        }
60    }
61
62    /// Create a skipped workflow step
63    pub fn workflow_skipped(workflow_id: &str) -> Self {
64        Self {
65            workflow_id: workflow_id.to_string(),
66            task_id: None,
67            result: StepResult::Skipped,
68            message: None,
69            mapping_contexts: None,
70        }
71    }
72
73    /// Set mapping context snapshots (for map tasks in trace mode)
74    pub fn with_mapping_contexts(mut self, contexts: Vec<Value>) -> Self {
75        self.mapping_contexts = Some(contexts);
76        self
77    }
78}
79
80/// Complete execution trace containing all steps
81#[derive(Debug, Clone, Serialize, Deserialize)]
82pub struct ExecutionTrace {
83    /// All execution steps in order
84    pub steps: Vec<ExecutionStep>,
85}
86
87impl ExecutionTrace {
88    /// Create a new empty execution trace
89    pub fn new() -> Self {
90        Self { steps: Vec::new() }
91    }
92
93    /// Add a step to the trace
94    pub fn add_step(&mut self, step: ExecutionStep) {
95        self.steps.push(step);
96    }
97
98    /// Get the final message (from the last executed step)
99    pub fn final_message(&self) -> Option<&Message> {
100        self.steps
101            .iter()
102            .rev()
103            .find(|s| s.result == StepResult::Executed)
104            .and_then(|s| s.message.as_ref())
105    }
106
107    /// Check if execution was successful (no errors in final message)
108    pub fn is_success(&self) -> bool {
109        self.final_message()
110            .map(|m| m.errors.is_empty())
111            .unwrap_or(true)
112    }
113
114    /// Get number of executed steps
115    pub fn executed_count(&self) -> usize {
116        self.steps
117            .iter()
118            .filter(|s| s.result == StepResult::Executed)
119            .count()
120    }
121
122    /// Get number of skipped steps
123    pub fn skipped_count(&self) -> usize {
124        self.steps
125            .iter()
126            .filter(|s| s.result == StepResult::Skipped)
127            .count()
128    }
129}
130
131impl Default for ExecutionTrace {
132    fn default() -> Self {
133        Self::new()
134    }
135}
136
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// `StepResult` variants serialize to lowercase JSON strings.
    #[test]
    fn test_step_result_serialization() {
        assert_eq!(
            serde_json::to_string(&StepResult::Executed).unwrap(),
            "\"executed\""
        );
        assert_eq!(
            serde_json::to_string(&StepResult::Skipped).unwrap(),
            "\"skipped\""
        );
    }

    /// An executed step records both IDs and a message snapshot.
    #[test]
    fn test_execution_step_executed() {
        let message = Message::from_value(&json!({"test": "data"}));
        let step = ExecutionStep::executed("workflow1", "task1", &message);

        assert_eq!(step.workflow_id, "workflow1");
        assert_eq!(step.task_id, Some("task1".to_string()));
        assert_eq!(step.result, StepResult::Executed);
        assert!(step.message.is_some());
    }

    /// A skipped task keeps its task ID but stores no message.
    #[test]
    fn test_execution_step_task_skipped() {
        let step = ExecutionStep::task_skipped("workflow1", "task1");

        assert_eq!(step.workflow_id, "workflow1");
        assert_eq!(step.task_id, Some("task1".to_string()));
        assert_eq!(step.result, StepResult::Skipped);
        assert!(step.message.is_none());
    }

    /// A skipped workflow has neither a task ID nor a message.
    #[test]
    fn test_execution_step_workflow_skipped() {
        let step = ExecutionStep::workflow_skipped("workflow1");

        assert_eq!(step.workflow_id, "workflow1");
        assert_eq!(step.task_id, None);
        assert_eq!(step.result, StepResult::Skipped);
        assert!(step.message.is_none());
    }

    /// Attached mapping contexts are stored and appear in the serialized step.
    #[test]
    fn test_execution_step_with_mapping_contexts() {
        let message = Message::from_value(&json!({"test": "data"}));
        let contexts = vec![json!({"data": {"a": 1}}), json!({"data": {"a": 1, "b": 2}})];

        let step = ExecutionStep::executed("workflow1", "task1", &message)
            .with_mapping_contexts(contexts.clone());

        assert_eq!(step.mapping_contexts, Some(contexts));

        // Verify serialization includes mapping_contexts
        let serialized = serde_json::to_value(&step).unwrap();
        assert!(serialized.get("mapping_contexts").is_some());
        assert_eq!(serialized["mapping_contexts"].as_array().unwrap().len(), 2);
    }

    /// Without mapping contexts the key is absent from the serialized step.
    #[test]
    fn test_execution_step_without_mapping_contexts_serialization() {
        let message = Message::from_value(&json!({"test": "data"}));
        let step = ExecutionStep::executed("workflow1", "task1", &message);

        // mapping_contexts is None, should be omitted in serialization
        let serialized = serde_json::to_value(&step).unwrap();
        assert!(serialized.get("mapping_contexts").is_none());
    }

    /// Counts, final message and success flag for a mixed trace.
    #[test]
    fn test_execution_trace() {
        let mut trace = ExecutionTrace::new();
        let message = Message::from_value(&json!({"test": "data"}));

        trace.add_step(ExecutionStep::workflow_skipped("workflow0"));
        trace.add_step(ExecutionStep::executed("workflow1", "task1", &message));
        trace.add_step(ExecutionStep::task_skipped("workflow1", "task2"));

        assert_eq!(trace.steps.len(), 3);
        assert_eq!(trace.executed_count(), 1);
        assert_eq!(trace.skipped_count(), 2);
        assert!(trace.final_message().is_some());
        assert!(trace.is_success());
    }
}