dataflow_rs/engine/
workflow_executor.rs

1//! # Workflow Execution Module
2//!
3//! This module handles the execution of workflows and their associated tasks.
4//! It provides a clean separation between workflow orchestration and task execution.
5
6use crate::engine::error::{DataflowError, ErrorInfo, Result};
7use crate::engine::executor::InternalExecutor;
8use crate::engine::message::{AuditTrail, Change, Message};
9use crate::engine::task_executor::TaskExecutor;
10use crate::engine::workflow::Workflow;
11use chrono::Utc;
12use log::{debug, error, info, warn};
13use serde_json::json;
14use std::sync::Arc;
15
16/// Handles the execution of workflows and their tasks
17///
18/// The `WorkflowExecutor` is responsible for:
19/// - Evaluating workflow conditions
20/// - Orchestrating task execution within workflows
21/// - Managing workflow-level error handling
22/// - Recording audit trails
pub struct WorkflowExecutor {
    /// Runs each individual task of a workflow (see `execute_tasks`)
    task_executor: Arc<TaskExecutor>,
    /// Evaluates compiled workflow/task conditions against the message context
    internal_executor: Arc<InternalExecutor>,
}
29
30impl WorkflowExecutor {
31    /// Create a new WorkflowExecutor
32    pub fn new(task_executor: Arc<TaskExecutor>, internal_executor: Arc<InternalExecutor>) -> Self {
33        Self {
34            task_executor,
35            internal_executor,
36        }
37    }
38
39    /// Execute a workflow if its condition is met
40    ///
41    /// This method:
42    /// 1. Evaluates the workflow condition
43    /// 2. Executes tasks sequentially if condition is met
44    /// 3. Handles error recovery based on workflow configuration
45    /// 4. Updates message metadata and audit trail
46    ///
47    /// # Arguments
48    /// * `workflow` - The workflow to execute
49    /// * `message` - The message being processed
50    ///
51    /// # Returns
52    /// * `Result<bool>` - Ok(true) if workflow was executed, Ok(false) if skipped, Err on failure
53    pub async fn execute(&self, workflow: &Workflow, message: &mut Message) -> Result<bool> {
54        // Use cached context Arc for condition evaluation
55        let context_arc = message.get_context_arc();
56
57        // Evaluate workflow condition
58        let should_execute = self
59            .internal_executor
60            .evaluate_condition(workflow.condition_index, context_arc)?;
61
62        if !should_execute {
63            debug!("Skipping workflow {} - condition not met", workflow.id);
64            return Ok(false);
65        }
66
67        // Execute workflow tasks
68        match self.execute_tasks(workflow, message).await {
69            Ok(_) => {
70                info!("Successfully completed workflow: {}", workflow.id);
71                Ok(true)
72            }
73            Err(e) if workflow.continue_on_error => {
74                warn!(
75                    "Workflow {} encountered error but continuing: {:?}",
76                    workflow.id, e
77                );
78                message.errors.push(
79                    ErrorInfo::builder(
80                        "WORKFLOW_ERROR",
81                        format!("Workflow {} error: {}", workflow.id, e),
82                    )
83                    .workflow_id(&workflow.id)
84                    .build(),
85                );
86                Ok(true)
87            }
88            Err(e) => {
89                error!("Workflow {} failed: {:?}", workflow.id, e);
90                Err(e)
91            }
92        }
93    }
94
95    /// Execute all tasks in a workflow
96    async fn execute_tasks(&self, workflow: &Workflow, message: &mut Message) -> Result<()> {
97        for task in &workflow.tasks {
98            // Use cached context Arc - it will be fresh if previous task modified it
99            let context_arc = message.get_context_arc();
100
101            // Evaluate task condition
102            let should_execute = self
103                .internal_executor
104                .evaluate_condition(task.condition_index, context_arc)?;
105
106            if !should_execute {
107                debug!("Skipping task {} - condition not met", task.id);
108                continue;
109            }
110
111            // Execute the task
112            let result = self.task_executor.execute(task, message).await;
113
114            // Handle task result
115            self.handle_task_result(
116                result,
117                &workflow.id,
118                &task.id,
119                task.continue_on_error,
120                message,
121            )?;
122        }
123
124        Ok(())
125    }
126
127    /// Handle the result of a task execution
128    fn handle_task_result(
129        &self,
130        result: Result<(usize, Vec<Change>)>,
131        workflow_id: &str,
132        task_id: &str,
133        continue_on_error: bool,
134        message: &mut Message,
135    ) -> Result<()> {
136        match result {
137            Ok((status, changes)) => {
138                // Record audit trail
139                message.audit_trail.push(AuditTrail {
140                    timestamp: Utc::now(),
141                    workflow_id: Arc::from(workflow_id),
142                    task_id: Arc::from(task_id),
143                    status,
144                    changes,
145                });
146
147                // Update progress metadata for workflow chaining
148                if let Some(metadata) = message.context["metadata"].as_object_mut() {
149                    // Update existing progress or create new one
150                    if let Some(progress) = metadata.get_mut("progress") {
151                        if let Some(progress_obj) = progress.as_object_mut() {
152                            progress_obj.insert("workflow_id".to_string(), json!(workflow_id));
153                            progress_obj.insert("task_id".to_string(), json!(task_id));
154                            progress_obj.insert("status_code".to_string(), json!(status));
155                        }
156                    } else {
157                        metadata.insert(
158                            "progress".to_string(),
159                            json!({
160                                "workflow_id": workflow_id,
161                                "task_id": task_id,
162                                "status_code": status
163                            }),
164                        );
165                    }
166                }
167                message.invalidate_context_cache();
168
169                // Check status code
170                if (400..500).contains(&status) {
171                    warn!("Task {} returned client error status: {}", task_id, status);
172                } else if status >= 500 {
173                    error!("Task {} returned server error status: {}", task_id, status);
174                    if !continue_on_error {
175                        return Err(DataflowError::Task(format!(
176                            "Task {} failed with status {}",
177                            task_id, status
178                        )));
179                    }
180                }
181                Ok(())
182            }
183            Err(e) => {
184                error!("Task {} failed: {:?}", task_id, e);
185
186                // Record error in audit trail
187                message.audit_trail.push(AuditTrail {
188                    timestamp: Utc::now(),
189                    workflow_id: Arc::from(workflow_id),
190                    task_id: Arc::from(task_id),
191                    status: 500,
192                    changes: vec![],
193                });
194
195                // Add error to message
196                message.errors.push(
197                    ErrorInfo::builder("TASK_ERROR", format!("Task {} error: {}", task_id, e))
198                        .workflow_id(workflow_id)
199                        .task_id(task_id)
200                        .build(),
201                );
202
203                if !continue_on_error { Err(e) } else { Ok(()) }
204            }
205        }
206    }
207}
208
209#[cfg(test)]
210mod tests {
211    use super::*;
212    use crate::engine::compiler::LogicCompiler;
213    use serde_json::json;
214    use std::collections::HashMap;
215
216    #[tokio::test]
217    async fn test_workflow_executor_skip_condition() {
218        // Create a workflow with a false condition
219        let workflow_json = r#"{
220            "id": "test_workflow",
221            "name": "Test Workflow",
222            "condition": false,
223            "tasks": [{
224                "id": "dummy_task",
225                "name": "Dummy Task",
226                "function": {
227                    "name": "map",
228                    "input": {"mappings": []}
229                }
230            }]
231        }"#;
232
233        let mut compiler = LogicCompiler::new();
234        let mut workflow = Workflow::from_json(workflow_json).unwrap();
235
236        // Compile the workflow condition
237        let workflows = compiler.compile_workflows(vec![workflow.clone()]);
238        if let Some(compiled_workflow) = workflows.get("test_workflow") {
239            workflow = compiled_workflow.clone();
240        }
241
242        let (datalogic, logic_cache) = compiler.into_parts();
243        let internal_executor = Arc::new(InternalExecutor::new(datalogic.clone(), logic_cache));
244        let task_executor = Arc::new(TaskExecutor::new(
245            Arc::new(HashMap::new()),
246            internal_executor.clone(),
247            datalogic,
248        ));
249        let workflow_executor = WorkflowExecutor::new(task_executor, internal_executor);
250
251        let mut message = Message::from_value(&json!({}));
252
253        // Execute workflow - should be skipped due to false condition
254        let executed = workflow_executor
255            .execute(&workflow, &mut message)
256            .await
257            .unwrap();
258        assert!(!executed);
259        assert_eq!(message.audit_trail.len(), 0);
260    }
261
262    #[tokio::test]
263    async fn test_workflow_executor_execute_success() {
264        // Create a workflow with a true condition
265        let workflow_json = r#"{
266            "id": "test_workflow",
267            "name": "Test Workflow",
268            "condition": true,
269            "tasks": [{
270                "id": "dummy_task",
271                "name": "Dummy Task",
272                "function": {
273                    "name": "map",
274                    "input": {"mappings": []}
275                }
276            }]
277        }"#;
278
279        let mut compiler = LogicCompiler::new();
280        let mut workflow = Workflow::from_json(workflow_json).unwrap();
281
282        // Compile the workflow
283        let workflows = compiler.compile_workflows(vec![workflow.clone()]);
284        if let Some(compiled_workflow) = workflows.get("test_workflow") {
285            workflow = compiled_workflow.clone();
286        }
287
288        let (datalogic, logic_cache) = compiler.into_parts();
289        let internal_executor = Arc::new(InternalExecutor::new(datalogic.clone(), logic_cache));
290        let task_executor = Arc::new(TaskExecutor::new(
291            Arc::new(HashMap::new()),
292            internal_executor.clone(),
293            datalogic,
294        ));
295        let workflow_executor = WorkflowExecutor::new(task_executor, internal_executor);
296
297        let mut message = Message::from_value(&json!({}));
298
299        // Execute workflow - should succeed with empty task list
300        let executed = workflow_executor
301            .execute(&workflow, &mut message)
302            .await
303            .unwrap();
304        assert!(executed);
305    }
306}