1use crate::engine::error::{DataflowError, ErrorInfo, Result};
7use crate::engine::executor::InternalExecutor;
8use crate::engine::message::{AuditTrail, Change, Message};
9use crate::engine::task_executor::TaskExecutor;
10use crate::engine::workflow::Workflow;
11use chrono::Utc;
12use log::{debug, error, info, warn};
13use serde_json::json;
14use std::sync::Arc;
15
/// Executes a single workflow against a message: evaluates the workflow's
/// condition, runs its tasks in order, and records outcomes (audit trail,
/// progress metadata, errors) on the message.
pub struct WorkflowExecutor {
    /// Executes individual workflow tasks against the message.
    task_executor: Arc<TaskExecutor>,
    /// Evaluates compiled workflow/task conditions against the message context.
    internal_executor: Arc<InternalExecutor>,
}
29
30impl WorkflowExecutor {
31 pub fn new(task_executor: Arc<TaskExecutor>, internal_executor: Arc<InternalExecutor>) -> Self {
33 Self {
34 task_executor,
35 internal_executor,
36 }
37 }
38
39 pub async fn execute(&self, workflow: &Workflow, message: &mut Message) -> Result<bool> {
54 let context_arc = message.get_context_arc();
56
57 let should_execute = self
59 .internal_executor
60 .evaluate_condition(workflow.condition_index, context_arc)?;
61
62 if !should_execute {
63 debug!("Skipping workflow {} - condition not met", workflow.id);
64 return Ok(false);
65 }
66
67 match self.execute_tasks(workflow, message).await {
69 Ok(_) => {
70 info!("Successfully completed workflow: {}", workflow.id);
71 Ok(true)
72 }
73 Err(e) if workflow.continue_on_error => {
74 warn!(
75 "Workflow {} encountered error but continuing: {:?}",
76 workflow.id, e
77 );
78 message.errors.push(
79 ErrorInfo::builder(
80 "WORKFLOW_ERROR",
81 format!("Workflow {} error: {}", workflow.id, e),
82 )
83 .workflow_id(&workflow.id)
84 .build(),
85 );
86 Ok(true)
87 }
88 Err(e) => {
89 error!("Workflow {} failed: {:?}", workflow.id, e);
90 Err(e)
91 }
92 }
93 }
94
95 async fn execute_tasks(&self, workflow: &Workflow, message: &mut Message) -> Result<()> {
97 for task in &workflow.tasks {
98 let context_arc = message.get_context_arc();
100
101 let should_execute = self
103 .internal_executor
104 .evaluate_condition(task.condition_index, context_arc)?;
105
106 if !should_execute {
107 debug!("Skipping task {} - condition not met", task.id);
108 continue;
109 }
110
111 let result = self.task_executor.execute(task, message).await;
113
114 self.handle_task_result(
116 result,
117 &workflow.id,
118 &task.id,
119 task.continue_on_error,
120 message,
121 )?;
122 }
123
124 Ok(())
125 }
126
127 fn handle_task_result(
129 &self,
130 result: Result<(usize, Vec<Change>)>,
131 workflow_id: &str,
132 task_id: &str,
133 continue_on_error: bool,
134 message: &mut Message,
135 ) -> Result<()> {
136 match result {
137 Ok((status, changes)) => {
138 message.audit_trail.push(AuditTrail {
140 timestamp: Utc::now(),
141 workflow_id: Arc::from(workflow_id),
142 task_id: Arc::from(task_id),
143 status,
144 changes,
145 });
146
147 if let Some(metadata) = message.context["metadata"].as_object_mut() {
149 if let Some(progress) = metadata.get_mut("progress") {
151 if let Some(progress_obj) = progress.as_object_mut() {
152 progress_obj.insert("workflow_id".to_string(), json!(workflow_id));
153 progress_obj.insert("task_id".to_string(), json!(task_id));
154 progress_obj.insert("status_code".to_string(), json!(status));
155 }
156 } else {
157 metadata.insert(
158 "progress".to_string(),
159 json!({
160 "workflow_id": workflow_id,
161 "task_id": task_id,
162 "status_code": status
163 }),
164 );
165 }
166 }
167 message.invalidate_context_cache();
168
169 if (400..500).contains(&status) {
171 warn!("Task {} returned client error status: {}", task_id, status);
172 } else if status >= 500 {
173 error!("Task {} returned server error status: {}", task_id, status);
174 if !continue_on_error {
175 return Err(DataflowError::Task(format!(
176 "Task {} failed with status {}",
177 task_id, status
178 )));
179 }
180 }
181 Ok(())
182 }
183 Err(e) => {
184 error!("Task {} failed: {:?}", task_id, e);
185
186 message.audit_trail.push(AuditTrail {
188 timestamp: Utc::now(),
189 workflow_id: Arc::from(workflow_id),
190 task_id: Arc::from(task_id),
191 status: 500,
192 changes: vec![],
193 });
194
195 message.errors.push(
197 ErrorInfo::builder("TASK_ERROR", format!("Task {} error: {}", task_id, e))
198 .workflow_id(workflow_id)
199 .task_id(task_id)
200 .build(),
201 );
202
203 if !continue_on_error { Err(e) } else { Ok(()) }
204 }
205 }
206 }
207}
208
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::compiler::LogicCompiler;
    use serde_json::json;
    use std::collections::HashMap;

    /// Compiles a workflow from JSON and wires up a `WorkflowExecutor` with an
    /// empty function registry. Assumes the workflow id is `"test_workflow"`.
    fn build_fixture(workflow_json: &str) -> (Workflow, WorkflowExecutor) {
        let mut compiler = LogicCompiler::new();
        let mut workflow = Workflow::from_json(workflow_json).unwrap();

        // Compilation assigns condition indices; prefer the compiled copy.
        let workflows = compiler.compile_workflows(vec![workflow.clone()]);
        if let Some(compiled_workflow) = workflows.get("test_workflow") {
            workflow = compiled_workflow.clone();
        }

        let (datalogic, logic_cache) = compiler.into_parts();
        let internal_executor = Arc::new(InternalExecutor::new(datalogic.clone(), logic_cache));
        let task_executor = Arc::new(TaskExecutor::new(
            Arc::new(HashMap::new()),
            internal_executor.clone(),
            datalogic,
        ));
        let workflow_executor = WorkflowExecutor::new(task_executor, internal_executor);
        (workflow, workflow_executor)
    }

    #[tokio::test]
    async fn test_workflow_executor_skip_condition() {
        // A literal-false condition must skip the workflow entirely.
        let workflow_json = r#"{
            "id": "test_workflow",
            "name": "Test Workflow",
            "condition": false,
            "tasks": [{
                "id": "dummy_task",
                "name": "Dummy Task",
                "function": {
                    "name": "map",
                    "input": {"mappings": []}
                }
            }]
        }"#;

        let (workflow, workflow_executor) = build_fixture(workflow_json);
        let mut message = Message::from_value(&json!({}));

        let executed = workflow_executor
            .execute(&workflow, &mut message)
            .await
            .unwrap();
        assert!(!executed);
        // No task ran, so nothing was audited.
        assert_eq!(message.audit_trail.len(), 0);
    }

    #[tokio::test]
    async fn test_workflow_executor_execute_success() {
        // A literal-true condition must run the workflow to completion.
        let workflow_json = r#"{
            "id": "test_workflow",
            "name": "Test Workflow",
            "condition": true,
            "tasks": [{
                "id": "dummy_task",
                "name": "Dummy Task",
                "function": {
                    "name": "map",
                    "input": {"mappings": []}
                }
            }]
        }"#;

        let (workflow, workflow_executor) = build_fixture(workflow_json);
        let mut message = Message::from_value(&json!({}));

        let executed = workflow_executor
            .execute(&workflow, &mut message)
            .await
            .unwrap();
        assert!(executed);
    }
}