// dataflow_rs/engine/workflow_executor.rs

//! # Workflow Execution Module
//!
//! This module handles the execution of workflows and their associated tasks.
//! It provides a clean separation between workflow orchestration and task execution.
6use crate::engine::error::{DataflowError, ErrorInfo, Result};
7use crate::engine::executor::InternalExecutor;
8use crate::engine::functions::{AsyncFunctionHandler, FILTER_STATUS_HALT, FILTER_STATUS_SKIP};
9use crate::engine::message::{AuditTrail, Change, Message};
10use crate::engine::task_executor::TaskExecutor;
11use crate::engine::trace::{ExecutionStep, ExecutionTrace};
12use crate::engine::workflow::Workflow;
13use chrono::Utc;
14use log::{debug, error, info, warn};
15use serde_json::json;
16use std::collections::HashMap;
17use std::sync::Arc;
18
/// Result of handling a task, including possible control flow signals.
///
/// Returned by `handle_task_result` so the task loop can decide whether to
/// proceed to the next task or stop the workflow early (filter halt).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TaskControlFlow {
    /// Continue executing the next task
    Continue,
    /// Stop executing further tasks in this workflow (filter halt)
    HaltWorkflow,
}
26
/// Handles the execution of workflows and their tasks
///
/// The `WorkflowExecutor` is responsible for:
/// - Evaluating workflow conditions
/// - Orchestrating task execution within workflows
/// - Managing workflow-level error handling
/// - Recording audit trails
///
/// Both executors are held behind `Arc` so the same instances can be shared
/// with other components (e.g. re-exposed via `task_functions`).
pub struct WorkflowExecutor {
    /// Task executor for executing individual tasks
    task_executor: Arc<TaskExecutor>,
    /// Internal executor for condition evaluation; conditions are referenced
    /// by pre-compiled index (`condition_index`) rather than by raw JSON logic
    internal_executor: Arc<InternalExecutor>,
}
40
41impl WorkflowExecutor {
42    /// Create a new WorkflowExecutor
43    pub fn new(task_executor: Arc<TaskExecutor>, internal_executor: Arc<InternalExecutor>) -> Self {
44        Self {
45            task_executor,
46            internal_executor,
47        }
48    }
49
50    /// Get a clone of the task_functions Arc for reuse in new engines
51    pub fn task_functions(
52        &self,
53    ) -> Arc<HashMap<String, Box<dyn AsyncFunctionHandler + Send + Sync>>> {
54        self.task_executor.task_functions()
55    }
56
57    /// Execute a workflow if its condition is met
58    ///
59    /// This method:
60    /// 1. Evaluates the workflow condition
61    /// 2. Executes tasks sequentially if condition is met
62    /// 3. Handles error recovery based on workflow configuration
63    /// 4. Updates message metadata and audit trail
64    ///
65    /// # Arguments
66    /// * `workflow` - The workflow to execute
67    /// * `message` - The message being processed
68    ///
69    /// # Returns
70    /// * `Result<bool>` - Ok(true) if workflow was executed, Ok(false) if skipped, Err on failure
71    pub async fn execute(&self, workflow: &Workflow, message: &mut Message) -> Result<bool> {
72        // Use cached context Arc for condition evaluation
73        let context_arc = message.get_context_arc();
74
75        // Evaluate workflow condition
76        let should_execute = self
77            .internal_executor
78            .evaluate_condition(workflow.condition_index, context_arc)?;
79
80        if !should_execute {
81            debug!("Skipping workflow {} - condition not met", workflow.id);
82            return Ok(false);
83        }
84
85        // Execute workflow tasks
86        match self.execute_tasks(workflow, message).await {
87            Ok(_) => {
88                info!("Successfully completed workflow: {}", workflow.id);
89                Ok(true)
90            }
91            Err(e) if workflow.continue_on_error => {
92                warn!(
93                    "Workflow {} encountered error but continuing: {:?}",
94                    workflow.id, e
95                );
96                message.errors.push(
97                    ErrorInfo::builder(
98                        "WORKFLOW_ERROR",
99                        format!("Workflow {} error: {}", workflow.id, e),
100                    )
101                    .workflow_id(&workflow.id)
102                    .build(),
103                );
104                Ok(true)
105            }
106            Err(e) => {
107                error!("Workflow {} failed: {:?}", workflow.id, e);
108                Err(e)
109            }
110        }
111    }
112
113    /// Execute a workflow with step-by-step tracing
114    ///
115    /// Similar to `execute` but records execution steps for debugging.
116    ///
117    /// # Arguments
118    /// * `workflow` - The workflow to execute
119    /// * `message` - The message being processed
120    /// * `trace` - The execution trace to record steps to
121    ///
122    /// # Returns
123    /// * `Result<bool>` - Ok(true) if workflow was executed, Ok(false) if skipped, Err on failure
124    pub async fn execute_with_trace(
125        &self,
126        workflow: &Workflow,
127        message: &mut Message,
128        trace: &mut ExecutionTrace,
129    ) -> Result<bool> {
130        // Use cached context Arc for condition evaluation
131        let context_arc = message.get_context_arc();
132
133        // Evaluate workflow condition
134        let should_execute = self
135            .internal_executor
136            .evaluate_condition(workflow.condition_index, context_arc)?;
137
138        if !should_execute {
139            debug!("Skipping workflow {} - condition not met", workflow.id);
140            // Record skipped workflow step
141            trace.add_step(ExecutionStep::workflow_skipped(&workflow.id));
142            return Ok(false);
143        }
144
145        // Execute workflow tasks with trace collection
146        match self
147            .execute_tasks_with_trace(workflow, message, trace)
148            .await
149        {
150            Ok(_) => {
151                info!("Successfully completed workflow: {}", workflow.id);
152                Ok(true)
153            }
154            Err(e) if workflow.continue_on_error => {
155                warn!(
156                    "Workflow {} encountered error but continuing: {:?}",
157                    workflow.id, e
158                );
159                message.errors.push(
160                    ErrorInfo::builder(
161                        "WORKFLOW_ERROR",
162                        format!("Workflow {} error: {}", workflow.id, e),
163                    )
164                    .workflow_id(&workflow.id)
165                    .build(),
166                );
167                Ok(true)
168            }
169            Err(e) => {
170                error!("Workflow {} failed: {:?}", workflow.id, e);
171                Err(e)
172            }
173        }
174    }
175
176    /// Execute all tasks in a workflow
177    async fn execute_tasks(&self, workflow: &Workflow, message: &mut Message) -> Result<()> {
178        for task in &workflow.tasks {
179            // Use cached context Arc - it will be fresh if previous task modified it
180            let context_arc = message.get_context_arc();
181
182            // Evaluate task condition
183            let should_execute = self
184                .internal_executor
185                .evaluate_condition(task.condition_index, context_arc)?;
186
187            if !should_execute {
188                debug!("Skipping task {} - condition not met", task.id);
189                continue;
190            }
191
192            // Execute the task
193            let result = self.task_executor.execute(task, message).await;
194
195            // Handle task result with control flow
196            match self.handle_task_result(
197                result,
198                &workflow.id,
199                &task.id,
200                task.continue_on_error,
201                message,
202            )? {
203                TaskControlFlow::HaltWorkflow => break,
204                TaskControlFlow::Continue => {}
205            }
206        }
207
208        Ok(())
209    }
210
211    /// Execute all tasks in a workflow with trace collection
212    async fn execute_tasks_with_trace(
213        &self,
214        workflow: &Workflow,
215        message: &mut Message,
216        trace: &mut ExecutionTrace,
217    ) -> Result<()> {
218        for task in &workflow.tasks {
219            // Use cached context Arc - it will be fresh if previous task modified it
220            let context_arc = message.get_context_arc();
221
222            // Evaluate task condition
223            let should_execute = self
224                .internal_executor
225                .evaluate_condition(task.condition_index, context_arc)?;
226
227            if !should_execute {
228                debug!("Skipping task {} - condition not met", task.id);
229                // Record skipped task step
230                trace.add_step(ExecutionStep::task_skipped(&workflow.id, &task.id));
231                continue;
232            }
233
234            // Execute the task with trace support
235            let result = self.task_executor.execute_with_trace(task, message).await;
236
237            // Extract mapping_contexts before passing result to handle_task_result
238            let mapping_contexts = match &result {
239                Ok((_, _, contexts)) => contexts.clone(),
240                Err(_) => None,
241            };
242
243            // Convert to the standard result format for handle_task_result
244            let standard_result = result.map(|(status, changes, _)| (status, changes));
245
246            // Handle task result with control flow
247            let control_flow = self.handle_task_result(
248                standard_result,
249                &workflow.id,
250                &task.id,
251                task.continue_on_error,
252                message,
253            )?;
254
255            // Record executed step with message snapshot and optional mapping contexts
256            let mut step = ExecutionStep::executed(&workflow.id, &task.id, message);
257            if let Some(contexts) = mapping_contexts {
258                step = step.with_mapping_contexts(contexts);
259            }
260            trace.add_step(step);
261
262            if let TaskControlFlow::HaltWorkflow = control_flow {
263                break;
264            }
265        }
266
267        Ok(())
268    }
269
270    /// Handle the result of a task execution
271    fn handle_task_result(
272        &self,
273        result: Result<(usize, Vec<Change>)>,
274        workflow_id: &str,
275        task_id: &str,
276        continue_on_error: bool,
277        message: &mut Message,
278    ) -> Result<TaskControlFlow> {
279        match result {
280            Ok((status, changes)) => {
281                // Handle filter skip — no audit trail, just continue
282                if status == FILTER_STATUS_SKIP {
283                    debug!("Task {} signaled skip (filter gate)", task_id);
284                    return Ok(TaskControlFlow::Continue);
285                }
286
287                // Record audit trail
288                message.audit_trail.push(AuditTrail {
289                    timestamp: Utc::now(),
290                    workflow_id: Arc::from(workflow_id),
291                    task_id: Arc::from(task_id),
292                    status,
293                    changes,
294                });
295
296                // Update progress metadata for workflow chaining
297                if let Some(metadata) = message.context["metadata"].as_object_mut() {
298                    // Update existing progress or create new one
299                    if let Some(progress) = metadata.get_mut("progress") {
300                        if let Some(progress_obj) = progress.as_object_mut() {
301                            progress_obj.insert("workflow_id".to_string(), json!(workflow_id));
302                            progress_obj.insert("task_id".to_string(), json!(task_id));
303                            progress_obj.insert("status_code".to_string(), json!(status));
304                        }
305                    } else {
306                        metadata.insert(
307                            "progress".to_string(),
308                            json!({
309                                "workflow_id": workflow_id,
310                                "task_id": task_id,
311                                "status_code": status
312                            }),
313                        );
314                    }
315                }
316                message.invalidate_context_cache();
317
318                // Handle filter halt — audit trail is recorded, halt the workflow
319                if status == FILTER_STATUS_HALT {
320                    info!(
321                        "Task {} halted workflow {} (filter gate)",
322                        task_id, workflow_id
323                    );
324                    return Ok(TaskControlFlow::HaltWorkflow);
325                }
326
327                // Check status code
328                if (400..500).contains(&status) {
329                    warn!("Task {} returned client error status: {}", task_id, status);
330                } else if status >= 500 {
331                    error!("Task {} returned server error status: {}", task_id, status);
332                    if !continue_on_error {
333                        return Err(DataflowError::Task(format!(
334                            "Task {} failed with status {}",
335                            task_id, status
336                        )));
337                    }
338                }
339                Ok(TaskControlFlow::Continue)
340            }
341            Err(e) => {
342                error!("Task {} failed: {:?}", task_id, e);
343
344                // Record error in audit trail
345                message.audit_trail.push(AuditTrail {
346                    timestamp: Utc::now(),
347                    workflow_id: Arc::from(workflow_id),
348                    task_id: Arc::from(task_id),
349                    status: 500,
350                    changes: vec![],
351                });
352
353                // Add error to message
354                message.errors.push(
355                    ErrorInfo::builder("TASK_ERROR", format!("Task {} error: {}", task_id, e))
356                        .workflow_id(workflow_id)
357                        .task_id(task_id)
358                        .build(),
359                );
360
361                if !continue_on_error {
362                    Err(e)
363                } else {
364                    Ok(TaskControlFlow::Continue)
365                }
366            }
367        }
368    }
369}
370
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::compiler::LogicCompiler;
    use serde_json::json;
    use std::collections::HashMap;

    /// Build a compiled workflow and a ready-to-use executor from workflow JSON.
    ///
    /// Shared setup for the tests below (previously duplicated): parses the
    /// workflow, compiles its condition logic, and wires up a task executor
    /// with an empty function registry.
    fn build_fixture(workflow_json: &str, workflow_id: &str) -> (Workflow, WorkflowExecutor) {
        let mut compiler = LogicCompiler::new();
        let mut workflow = Workflow::from_json(workflow_json).unwrap();

        // Compile the workflow condition and swap in the compiled version
        let workflows = compiler.compile_workflows(vec![workflow.clone()]);
        if let Some(compiled_workflow) = workflows.iter().find(|w| w.id == workflow_id) {
            workflow = compiled_workflow.clone();
        }

        let (datalogic, logic_cache) = compiler.into_parts();
        let internal_executor = Arc::new(InternalExecutor::new(datalogic.clone(), logic_cache));
        let task_executor = Arc::new(TaskExecutor::new(
            Arc::new(HashMap::new()),
            internal_executor.clone(),
            datalogic,
        ));
        let workflow_executor = WorkflowExecutor::new(task_executor, internal_executor);
        (workflow, workflow_executor)
    }

    #[tokio::test]
    async fn test_workflow_executor_skip_condition() {
        // Create a workflow with a false condition
        let workflow_json = r#"{
            "id": "test_workflow",
            "name": "Test Workflow",
            "condition": false,
            "tasks": [{
                "id": "dummy_task",
                "name": "Dummy Task",
                "function": {
                    "name": "map",
                    "input": {"mappings": []}
                }
            }]
        }"#;

        let (workflow, workflow_executor) = build_fixture(workflow_json, "test_workflow");
        let mut message = Message::from_value(&json!({}));

        // Execute workflow - should be skipped due to false condition
        let executed = workflow_executor
            .execute(&workflow, &mut message)
            .await
            .unwrap();
        assert!(!executed);
        assert_eq!(message.audit_trail.len(), 0);
    }

    #[tokio::test]
    async fn test_workflow_executor_execute_success() {
        // Create a workflow with a true condition
        let workflow_json = r#"{
            "id": "test_workflow",
            "name": "Test Workflow",
            "condition": true,
            "tasks": [{
                "id": "dummy_task",
                "name": "Dummy Task",
                "function": {
                    "name": "map",
                    "input": {"mappings": []}
                }
            }]
        }"#;

        let (workflow, workflow_executor) = build_fixture(workflow_json, "test_workflow");
        let mut message = Message::from_value(&json!({}));

        // Execute workflow - should succeed (no registered task functions are invoked)
        let executed = workflow_executor
            .execute(&workflow, &mut message)
            .await
            .unwrap();
        assert!(executed);
    }
}