// dataflow_rs/engine/trace.rs
use crate::engine::message::Message;
use serde::{Deserialize, Serialize};
9
10#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
12#[serde(rename_all = "lowercase")]
13pub enum StepResult {
14 Executed,
16 Skipped,
18}
19
20#[derive(Debug, Clone, Serialize, Deserialize)]
22pub struct ExecutionStep {
23 pub workflow_id: String,
25 pub task_id: Option<String>,
27 pub result: StepResult,
29 #[serde(skip_serializing_if = "Option::is_none")]
31 pub message: Option<Message>,
32}
33
34impl ExecutionStep {
35 pub fn executed(workflow_id: &str, task_id: &str, message: &Message) -> Self {
37 Self {
38 workflow_id: workflow_id.to_string(),
39 task_id: Some(task_id.to_string()),
40 result: StepResult::Executed,
41 message: Some(message.clone()),
42 }
43 }
44
45 pub fn task_skipped(workflow_id: &str, task_id: &str) -> Self {
47 Self {
48 workflow_id: workflow_id.to_string(),
49 task_id: Some(task_id.to_string()),
50 result: StepResult::Skipped,
51 message: None,
52 }
53 }
54
55 pub fn workflow_skipped(workflow_id: &str) -> Self {
57 Self {
58 workflow_id: workflow_id.to_string(),
59 task_id: None,
60 result: StepResult::Skipped,
61 message: None,
62 }
63 }
64}
65
66#[derive(Debug, Clone, Serialize, Deserialize)]
68pub struct ExecutionTrace {
69 pub steps: Vec<ExecutionStep>,
71}
72
73impl ExecutionTrace {
74 pub fn new() -> Self {
76 Self { steps: Vec::new() }
77 }
78
79 pub fn add_step(&mut self, step: ExecutionStep) {
81 self.steps.push(step);
82 }
83
84 pub fn final_message(&self) -> Option<&Message> {
86 self.steps
87 .iter()
88 .rev()
89 .find(|s| s.result == StepResult::Executed)
90 .and_then(|s| s.message.as_ref())
91 }
92
93 pub fn is_success(&self) -> bool {
95 self.final_message()
96 .map(|m| m.errors.is_empty())
97 .unwrap_or(true)
98 }
99
100 pub fn executed_count(&self) -> usize {
102 self.steps
103 .iter()
104 .filter(|s| s.result == StepResult::Executed)
105 .count()
106 }
107
108 pub fn skipped_count(&self) -> usize {
110 self.steps
111 .iter()
112 .filter(|s| s.result == StepResult::Skipped)
113 .count()
114 }
115}
116
117impl Default for ExecutionTrace {
118 fn default() -> Self {
119 Self::new()
120 }
121}
122
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Each variant serializes to its lowercase name as a JSON string.
    #[test]
    fn test_step_result_serialization() {
        let executed_json = serde_json::to_string(&StepResult::Executed).unwrap();
        let skipped_json = serde_json::to_string(&StepResult::Skipped).unwrap();

        assert_eq!(executed_json, "\"executed\"");
        assert_eq!(skipped_json, "\"skipped\"");
    }

    /// `executed` fills in both ids, the result, and a message.
    #[test]
    fn test_execution_step_executed() {
        let payload = json!({"test": "data"});
        let message = Message::from_value(&payload);
        let step = ExecutionStep::executed("workflow1", "task1", &message);

        assert_eq!(step.workflow_id, "workflow1");
        assert_eq!(step.task_id.as_deref(), Some("task1"));
        assert!(step.result == StepResult::Executed);
        assert!(step.message.is_some());
    }

    /// `task_skipped` records the task id but no message.
    #[test]
    fn test_execution_step_task_skipped() {
        let step = ExecutionStep::task_skipped("workflow1", "task1");

        assert_eq!(step.workflow_id, "workflow1");
        assert_eq!(step.task_id.as_deref(), Some("task1"));
        assert!(step.result == StepResult::Skipped);
        assert!(step.message.is_none());
    }

    /// `workflow_skipped` records neither a task id nor a message.
    #[test]
    fn test_execution_step_workflow_skipped() {
        let step = ExecutionStep::workflow_skipped("workflow1");

        assert_eq!(step.workflow_id, "workflow1");
        assert!(step.task_id.is_none());
        assert!(step.result == StepResult::Skipped);
        assert!(step.message.is_none());
    }

    /// A mixed trace reports the right counts, final message, and success flag.
    #[test]
    fn test_execution_trace() {
        let message = Message::from_value(&json!({"test": "data"}));
        let mut trace = ExecutionTrace::new();

        let steps = [
            ExecutionStep::workflow_skipped("workflow0"),
            ExecutionStep::executed("workflow1", "task1", &message),
            ExecutionStep::task_skipped("workflow1", "task2"),
        ];
        for step in steps {
            trace.add_step(step);
        }

        assert_eq!(trace.steps.len(), 3);
        assert_eq!(trace.executed_count(), 1);
        assert_eq!(trace.skipped_count(), 2);
        assert!(trace.final_message().is_some());
        assert!(trace.is_success());
    }
}