dataflow_rs/engine/
trace.rs1use crate::engine::message::Message;
8use serde::{Deserialize, Serialize};
9use serde_json::Value;
10
11#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
13#[serde(rename_all = "lowercase")]
14pub enum StepResult {
15 Executed,
17 Skipped,
19}
20
21#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct ExecutionStep {
24 pub workflow_id: String,
26 pub task_id: Option<String>,
28 pub result: StepResult,
30 #[serde(skip_serializing_if = "Option::is_none")]
32 pub message: Option<Message>,
33 #[serde(skip_serializing_if = "Option::is_none")]
36 pub mapping_contexts: Option<Vec<Value>>,
37}
38
39impl ExecutionStep {
40 pub fn executed(workflow_id: &str, task_id: &str, message: &Message) -> Self {
42 Self {
43 workflow_id: workflow_id.to_string(),
44 task_id: Some(task_id.to_string()),
45 result: StepResult::Executed,
46 message: Some(message.clone()),
47 mapping_contexts: None,
48 }
49 }
50
51 pub fn task_skipped(workflow_id: &str, task_id: &str) -> Self {
53 Self {
54 workflow_id: workflow_id.to_string(),
55 task_id: Some(task_id.to_string()),
56 result: StepResult::Skipped,
57 message: None,
58 mapping_contexts: None,
59 }
60 }
61
62 pub fn workflow_skipped(workflow_id: &str) -> Self {
64 Self {
65 workflow_id: workflow_id.to_string(),
66 task_id: None,
67 result: StepResult::Skipped,
68 message: None,
69 mapping_contexts: None,
70 }
71 }
72
73 pub fn with_mapping_contexts(mut self, contexts: Vec<Value>) -> Self {
75 self.mapping_contexts = Some(contexts);
76 self
77 }
78}
79
80#[derive(Debug, Clone, Serialize, Deserialize)]
82pub struct ExecutionTrace {
83 pub steps: Vec<ExecutionStep>,
85}
86
87impl ExecutionTrace {
88 pub fn new() -> Self {
90 Self { steps: Vec::new() }
91 }
92
93 pub fn add_step(&mut self, step: ExecutionStep) {
95 self.steps.push(step);
96 }
97
98 pub fn final_message(&self) -> Option<&Message> {
100 self.steps
101 .iter()
102 .rev()
103 .find(|s| s.result == StepResult::Executed)
104 .and_then(|s| s.message.as_ref())
105 }
106
107 pub fn is_success(&self) -> bool {
109 self.final_message()
110 .map(|m| m.errors.is_empty())
111 .unwrap_or(true)
112 }
113
114 pub fn executed_count(&self) -> usize {
116 self.steps
117 .iter()
118 .filter(|s| s.result == StepResult::Executed)
119 .count()
120 }
121
122 pub fn skipped_count(&self) -> usize {
124 self.steps
125 .iter()
126 .filter(|s| s.result == StepResult::Skipped)
127 .count()
128 }
129}
130
131impl Default for ExecutionTrace {
132 fn default() -> Self {
133 Self::new()
134 }
135}
136
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    // Both variants must serialize to their lowercase names.
    #[test]
    fn test_step_result_serialization() {
        let executed = serde_json::to_string(&StepResult::Executed).unwrap();
        let skipped = serde_json::to_string(&StepResult::Skipped).unwrap();

        assert_eq!(executed, "\"executed\"");
        assert_eq!(skipped, "\"skipped\"");
    }

    // `executed` stores both identifiers and a copy of the message.
    #[test]
    fn test_execution_step_executed() {
        let payload = Message::from_value(&json!({"test": "data"}));
        let recorded = ExecutionStep::executed("workflow1", "task1", &payload);

        assert_eq!(recorded.workflow_id, "workflow1");
        assert_eq!(recorded.task_id.as_deref(), Some("task1"));
        assert_eq!(recorded.result, StepResult::Executed);
        assert!(recorded.message.is_some());
    }

    // A skipped task keeps its identifiers but carries no message.
    #[test]
    fn test_execution_step_task_skipped() {
        let recorded = ExecutionStep::task_skipped("workflow1", "task1");

        assert_eq!(recorded.workflow_id, "workflow1");
        assert_eq!(recorded.task_id.as_deref(), Some("task1"));
        assert_eq!(recorded.result, StepResult::Skipped);
        assert!(recorded.message.is_none());
    }

    // A skipped workflow has no task id and no message.
    #[test]
    fn test_execution_step_workflow_skipped() {
        let recorded = ExecutionStep::workflow_skipped("workflow1");

        assert_eq!(recorded.workflow_id, "workflow1");
        assert!(recorded.task_id.is_none());
        assert_eq!(recorded.result, StepResult::Skipped);
        assert!(recorded.message.is_none());
    }

    #[test]
    fn test_execution_step_with_mapping_contexts() {
        let payload = Message::from_value(&json!({"test": "data"}));
        let contexts = vec![json!({"data": {"a": 1}}), json!({"data": {"a": 1, "b": 2}})];

        let recorded = ExecutionStep::executed("workflow1", "task1", &payload)
            .with_mapping_contexts(contexts.clone());

        assert_eq!(recorded.mapping_contexts.as_deref(), Some(contexts.as_slice()));

        // When set, the contexts must appear in the serialized form.
        let serialized = serde_json::to_value(&recorded).unwrap();
        let emitted = serialized
            .get("mapping_contexts")
            .and_then(Value::as_array)
            .expect("mapping_contexts should be serialized as an array");
        assert_eq!(emitted.len(), 2);
    }

    #[test]
    fn test_execution_step_without_mapping_contexts_serialization() {
        let payload = Message::from_value(&json!({"test": "data"}));
        let recorded = ExecutionStep::executed("workflow1", "task1", &payload);

        // When unset, the key must be left out of the serialized form entirely.
        let serialized = serde_json::to_value(&recorded).unwrap();
        assert!(serialized.get("mapping_contexts").is_none());
    }

    // One executed step between two skipped ones.
    #[test]
    fn test_execution_trace() {
        let payload = Message::from_value(&json!({"test": "data"}));
        let mut trace = ExecutionTrace::new();

        trace.add_step(ExecutionStep::workflow_skipped("workflow0"));
        trace.add_step(ExecutionStep::executed("workflow1", "task1", &payload));
        trace.add_step(ExecutionStep::task_skipped("workflow1", "task2"));

        assert_eq!(trace.steps.len(), 3);
        assert_eq!(trace.executed_count(), 1);
        assert_eq!(trace.skipped_count(), 2);
        assert!(trace.final_message().is_some());
        assert!(trace.is_success());
    }
}