1use crate::engine::error::{DataflowError, ErrorInfo, Result};
7use crate::engine::executor::InternalExecutor;
8use crate::engine::message::{AuditTrail, Change, Message};
9use crate::engine::task_executor::TaskExecutor;
10use crate::engine::trace::{ExecutionStep, ExecutionTrace};
11use crate::engine::workflow::Workflow;
12use chrono::Utc;
13use log::{debug, error, info, warn};
14use serde_json::json;
15use std::sync::Arc;
16
/// Runs a [`Workflow`] against a [`Message`]: evaluates the workflow- and
/// task-level conditions, executes each eligible task, and records audit
/// trail entries, errors, and (optionally) execution-trace steps.
pub struct WorkflowExecutor {
    // Executes individual tasks against the message.
    task_executor: Arc<TaskExecutor>,
    // Evaluates pre-compiled conditions (looked up by index) against the
    // message's context.
    internal_executor: Arc<InternalExecutor>,
}
30
31impl WorkflowExecutor {
32 pub fn new(task_executor: Arc<TaskExecutor>, internal_executor: Arc<InternalExecutor>) -> Self {
34 Self {
35 task_executor,
36 internal_executor,
37 }
38 }
39
40 pub async fn execute(&self, workflow: &Workflow, message: &mut Message) -> Result<bool> {
55 let context_arc = message.get_context_arc();
57
58 let should_execute = self
60 .internal_executor
61 .evaluate_condition(workflow.condition_index, context_arc)?;
62
63 if !should_execute {
64 debug!("Skipping workflow {} - condition not met", workflow.id);
65 return Ok(false);
66 }
67
68 match self.execute_tasks(workflow, message).await {
70 Ok(_) => {
71 info!("Successfully completed workflow: {}", workflow.id);
72 Ok(true)
73 }
74 Err(e) if workflow.continue_on_error => {
75 warn!(
76 "Workflow {} encountered error but continuing: {:?}",
77 workflow.id, e
78 );
79 message.errors.push(
80 ErrorInfo::builder(
81 "WORKFLOW_ERROR",
82 format!("Workflow {} error: {}", workflow.id, e),
83 )
84 .workflow_id(&workflow.id)
85 .build(),
86 );
87 Ok(true)
88 }
89 Err(e) => {
90 error!("Workflow {} failed: {:?}", workflow.id, e);
91 Err(e)
92 }
93 }
94 }
95
96 pub async fn execute_with_trace(
108 &self,
109 workflow: &Workflow,
110 message: &mut Message,
111 trace: &mut ExecutionTrace,
112 ) -> Result<bool> {
113 let context_arc = message.get_context_arc();
115
116 let should_execute = self
118 .internal_executor
119 .evaluate_condition(workflow.condition_index, context_arc)?;
120
121 if !should_execute {
122 debug!("Skipping workflow {} - condition not met", workflow.id);
123 trace.add_step(ExecutionStep::workflow_skipped(&workflow.id));
125 return Ok(false);
126 }
127
128 match self
130 .execute_tasks_with_trace(workflow, message, trace)
131 .await
132 {
133 Ok(_) => {
134 info!("Successfully completed workflow: {}", workflow.id);
135 Ok(true)
136 }
137 Err(e) if workflow.continue_on_error => {
138 warn!(
139 "Workflow {} encountered error but continuing: {:?}",
140 workflow.id, e
141 );
142 message.errors.push(
143 ErrorInfo::builder(
144 "WORKFLOW_ERROR",
145 format!("Workflow {} error: {}", workflow.id, e),
146 )
147 .workflow_id(&workflow.id)
148 .build(),
149 );
150 Ok(true)
151 }
152 Err(e) => {
153 error!("Workflow {} failed: {:?}", workflow.id, e);
154 Err(e)
155 }
156 }
157 }
158
159 async fn execute_tasks(&self, workflow: &Workflow, message: &mut Message) -> Result<()> {
161 for task in &workflow.tasks {
162 let context_arc = message.get_context_arc();
164
165 let should_execute = self
167 .internal_executor
168 .evaluate_condition(task.condition_index, context_arc)?;
169
170 if !should_execute {
171 debug!("Skipping task {} - condition not met", task.id);
172 continue;
173 }
174
175 let result = self.task_executor.execute(task, message).await;
177
178 self.handle_task_result(
180 result,
181 &workflow.id,
182 &task.id,
183 task.continue_on_error,
184 message,
185 )?;
186 }
187
188 Ok(())
189 }
190
191 async fn execute_tasks_with_trace(
193 &self,
194 workflow: &Workflow,
195 message: &mut Message,
196 trace: &mut ExecutionTrace,
197 ) -> Result<()> {
198 for task in &workflow.tasks {
199 let context_arc = message.get_context_arc();
201
202 let should_execute = self
204 .internal_executor
205 .evaluate_condition(task.condition_index, context_arc)?;
206
207 if !should_execute {
208 debug!("Skipping task {} - condition not met", task.id);
209 trace.add_step(ExecutionStep::task_skipped(&workflow.id, &task.id));
211 continue;
212 }
213
214 let result = self.task_executor.execute_with_trace(task, message).await;
216
217 let mapping_contexts = match &result {
219 Ok((_, _, contexts)) => contexts.clone(),
220 Err(_) => None,
221 };
222
223 let standard_result = result.map(|(status, changes, _)| (status, changes));
225
226 self.handle_task_result(
228 standard_result,
229 &workflow.id,
230 &task.id,
231 task.continue_on_error,
232 message,
233 )?;
234
235 let mut step = ExecutionStep::executed(&workflow.id, &task.id, message);
237 if let Some(contexts) = mapping_contexts {
238 step = step.with_mapping_contexts(contexts);
239 }
240 trace.add_step(step);
241 }
242
243 Ok(())
244 }
245
246 fn handle_task_result(
248 &self,
249 result: Result<(usize, Vec<Change>)>,
250 workflow_id: &str,
251 task_id: &str,
252 continue_on_error: bool,
253 message: &mut Message,
254 ) -> Result<()> {
255 match result {
256 Ok((status, changes)) => {
257 message.audit_trail.push(AuditTrail {
259 timestamp: Utc::now(),
260 workflow_id: Arc::from(workflow_id),
261 task_id: Arc::from(task_id),
262 status,
263 changes,
264 });
265
266 if let Some(metadata) = message.context["metadata"].as_object_mut() {
268 if let Some(progress) = metadata.get_mut("progress") {
270 if let Some(progress_obj) = progress.as_object_mut() {
271 progress_obj.insert("workflow_id".to_string(), json!(workflow_id));
272 progress_obj.insert("task_id".to_string(), json!(task_id));
273 progress_obj.insert("status_code".to_string(), json!(status));
274 }
275 } else {
276 metadata.insert(
277 "progress".to_string(),
278 json!({
279 "workflow_id": workflow_id,
280 "task_id": task_id,
281 "status_code": status
282 }),
283 );
284 }
285 }
286 message.invalidate_context_cache();
287
288 if (400..500).contains(&status) {
290 warn!("Task {} returned client error status: {}", task_id, status);
291 } else if status >= 500 {
292 error!("Task {} returned server error status: {}", task_id, status);
293 if !continue_on_error {
294 return Err(DataflowError::Task(format!(
295 "Task {} failed with status {}",
296 task_id, status
297 )));
298 }
299 }
300 Ok(())
301 }
302 Err(e) => {
303 error!("Task {} failed: {:?}", task_id, e);
304
305 message.audit_trail.push(AuditTrail {
307 timestamp: Utc::now(),
308 workflow_id: Arc::from(workflow_id),
309 task_id: Arc::from(task_id),
310 status: 500,
311 changes: vec![],
312 });
313
314 message.errors.push(
316 ErrorInfo::builder("TASK_ERROR", format!("Task {} error: {}", task_id, e))
317 .workflow_id(workflow_id)
318 .task_id(task_id)
319 .build(),
320 );
321
322 if !continue_on_error { Err(e) } else { Ok(()) }
323 }
324 }
325 }
326}
327
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::compiler::LogicCompiler;
    use serde_json::json;
    use std::collections::HashMap;

    /// Compiles `workflow_json` and wires up a `WorkflowExecutor` backed by an
    /// empty function registry, returning the compiled workflow plus executor.
    fn build_executor(workflow_json: &str) -> (Workflow, WorkflowExecutor) {
        let mut compiler = LogicCompiler::new();
        let mut workflow = Workflow::from_json(workflow_json).unwrap();

        let compiled = compiler.compile_workflows(vec![workflow.clone()]);
        if let Some(compiled_workflow) = compiled.get("test_workflow") {
            workflow = compiled_workflow.clone();
        }

        let (datalogic, logic_cache) = compiler.into_parts();
        let internal_executor = Arc::new(InternalExecutor::new(datalogic.clone(), logic_cache));
        let task_executor = Arc::new(TaskExecutor::new(
            Arc::new(HashMap::new()),
            internal_executor.clone(),
            datalogic,
        ));
        let executor = WorkflowExecutor::new(task_executor, internal_executor);
        (workflow, executor)
    }

    #[tokio::test]
    async fn test_workflow_executor_skip_condition() {
        // A workflow whose condition is `false` must be skipped entirely.
        let (workflow, executor) = build_executor(
            r#"{
            "id": "test_workflow",
            "name": "Test Workflow",
            "condition": false,
            "tasks": [{
                "id": "dummy_task",
                "name": "Dummy Task",
                "function": {
                    "name": "map",
                    "input": {"mappings": []}
                }
            }]
        }"#,
        );

        let mut message = Message::from_value(&json!({}));

        let executed = executor.execute(&workflow, &mut message).await.unwrap();
        assert!(!executed);
        assert_eq!(message.audit_trail.len(), 0);
    }

    #[tokio::test]
    async fn test_workflow_executor_execute_success() {
        // A workflow whose condition is `true` must run and report success.
        let (workflow, executor) = build_executor(
            r#"{
            "id": "test_workflow",
            "name": "Test Workflow",
            "condition": true,
            "tasks": [{
                "id": "dummy_task",
                "name": "Dummy Task",
                "function": {
                    "name": "map",
                    "input": {"mappings": []}
                }
            }]
        }"#,
        );

        let mut message = Message::from_value(&json!({}));

        let executed = executor.execute(&workflow, &mut message).await.unwrap();
        assert!(executed);
    }
}