
dataflow_rs/engine/workflow_executor.rs

//! # Workflow Execution Module
//!
//! This module handles the execution of workflows and their associated tasks.
//! It provides a clean separation between workflow orchestration and task execution.
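//!
//! A minimal end-to-end sketch (illustrative only; it assumes workflows were already
//! compiled with `LogicCompiler` and that `functions` is the map of registered async
//! task functions, as set up in the tests at the bottom of this file):
//!
//! ```ignore
//! let (datalogic, logic_cache) = compiler.into_parts();
//! let internal_executor = Arc::new(InternalExecutor::new(datalogic.clone(), logic_cache));
//! let task_executor = Arc::new(TaskExecutor::new(functions, internal_executor.clone(), datalogic));
//! let workflow_executor = WorkflowExecutor::new(task_executor, internal_executor);
//!
//! let executed = workflow_executor.execute(&workflow, &mut message).await?;
//! ```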

use crate::engine::error::{DataflowError, ErrorInfo, Result};
use crate::engine::executor::InternalExecutor;
use crate::engine::message::{AuditTrail, Change, Message};
use crate::engine::task_executor::TaskExecutor;
use crate::engine::trace::{ExecutionStep, ExecutionTrace};
use crate::engine::workflow::Workflow;
use chrono::Utc;
use log::{debug, error, info, warn};
use serde_json::json;
use std::sync::Arc;

/// Handles the execution of workflows and their tasks
///
/// The `WorkflowExecutor` is responsible for:
/// - Evaluating workflow conditions
/// - Orchestrating task execution within workflows
/// - Managing workflow-level error handling
/// - Recording audit trails
pub struct WorkflowExecutor {
    /// Task executor for executing individual tasks
    task_executor: Arc<TaskExecutor>,
    /// Internal executor for condition evaluation
    internal_executor: Arc<InternalExecutor>,
}

impl WorkflowExecutor {
    /// Create a new WorkflowExecutor
    pub fn new(task_executor: Arc<TaskExecutor>, internal_executor: Arc<InternalExecutor>) -> Self {
        Self {
            task_executor,
            internal_executor,
        }
    }

    /// Execute a workflow if its condition is met
    ///
    /// This method:
    /// 1. Evaluates the workflow condition
    /// 2. Executes tasks sequentially if condition is met
    /// 3. Handles error recovery based on workflow configuration
    /// 4. Updates message metadata and audit trail
    ///
    /// # Arguments
    /// * `workflow` - The workflow to execute
    /// * `message` - The message being processed
    ///
    /// # Returns
    /// * `Result<bool>` - Ok(true) if workflow was executed, Ok(false) if skipped, Err on failure
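    ///
    /// # Example
    ///
    /// A minimal sketch (illustrative only; assumes a compiled `Workflow` and a
    /// constructed `WorkflowExecutor`, as in the tests at the bottom of this file):
    ///
    /// ```ignore
    /// let executed = workflow_executor.execute(&workflow, &mut message).await?;
    /// if !executed {
    ///     // The workflow condition evaluated to false; no tasks were run.
    /// }
    /// ```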
    pub async fn execute(&self, workflow: &Workflow, message: &mut Message) -> Result<bool> {
        // Use cached context Arc for condition evaluation
        let context_arc = message.get_context_arc();

        // Evaluate workflow condition
        let should_execute = self
            .internal_executor
            .evaluate_condition(workflow.condition_index, context_arc)?;

        if !should_execute {
            debug!("Skipping workflow {} - condition not met", workflow.id);
            return Ok(false);
        }

        // Execute workflow tasks
        match self.execute_tasks(workflow, message).await {
            Ok(_) => {
                info!("Successfully completed workflow: {}", workflow.id);
                Ok(true)
            }
            Err(e) if workflow.continue_on_error => {
                warn!(
                    "Workflow {} encountered error but continuing: {:?}",
                    workflow.id, e
                );
                message.errors.push(
                    ErrorInfo::builder(
                        "WORKFLOW_ERROR",
                        format!("Workflow {} error: {}", workflow.id, e),
                    )
                    .workflow_id(&workflow.id)
                    .build(),
                );
                Ok(true)
            }
            Err(e) => {
                error!("Workflow {} failed: {:?}", workflow.id, e);
                Err(e)
            }
        }
    }

    /// Execute a workflow with step-by-step tracing
    ///
    /// Similar to `execute` but records execution steps for debugging.
    ///
    /// # Arguments
    /// * `workflow` - The workflow to execute
    /// * `message` - The message being processed
    /// * `trace` - The execution trace to record steps to
    ///
    /// # Returns
    /// * `Result<bool>` - Ok(true) if workflow was executed, Ok(false) if skipped, Err on failure
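    ///
    /// # Example
    ///
    /// A minimal sketch (illustrative only; assumes an empty `ExecutionTrace` can be
    /// created with `ExecutionTrace::default()` - adjust to the actual constructor):
    ///
    /// ```ignore
    /// let mut trace = ExecutionTrace::default();
    /// let executed = workflow_executor
    ///     .execute_with_trace(&workflow, &mut message, &mut trace)
    ///     .await?;
    /// // `trace` now holds one ExecutionStep per skipped or executed workflow and task.
    /// ```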
107    pub async fn execute_with_trace(
108        &self,
109        workflow: &Workflow,
110        message: &mut Message,
111        trace: &mut ExecutionTrace,
112    ) -> Result<bool> {
113        // Use cached context Arc for condition evaluation
114        let context_arc = message.get_context_arc();
115
116        // Evaluate workflow condition
117        let should_execute = self
118            .internal_executor
119            .evaluate_condition(workflow.condition_index, context_arc)?;
120
121        if !should_execute {
122            debug!("Skipping workflow {} - condition not met", workflow.id);
123            // Record skipped workflow step
124            trace.add_step(ExecutionStep::workflow_skipped(&workflow.id));
125            return Ok(false);
126        }
127
128        // Execute workflow tasks with trace collection
129        match self
130            .execute_tasks_with_trace(workflow, message, trace)
131            .await
132        {
133            Ok(_) => {
134                info!("Successfully completed workflow: {}", workflow.id);
135                Ok(true)
136            }
137            Err(e) if workflow.continue_on_error => {
138                warn!(
139                    "Workflow {} encountered error but continuing: {:?}",
140                    workflow.id, e
141                );
142                message.errors.push(
143                    ErrorInfo::builder(
144                        "WORKFLOW_ERROR",
145                        format!("Workflow {} error: {}", workflow.id, e),
146                    )
147                    .workflow_id(&workflow.id)
148                    .build(),
149                );
150                Ok(true)
151            }
152            Err(e) => {
153                error!("Workflow {} failed: {:?}", workflow.id, e);
154                Err(e)
155            }
156        }
157    }
158
159    /// Execute all tasks in a workflow
160    async fn execute_tasks(&self, workflow: &Workflow, message: &mut Message) -> Result<()> {
161        for task in &workflow.tasks {
162            // Use cached context Arc - it will be fresh if previous task modified it
163            let context_arc = message.get_context_arc();
164
165            // Evaluate task condition
166            let should_execute = self
167                .internal_executor
168                .evaluate_condition(task.condition_index, context_arc)?;
169
170            if !should_execute {
171                debug!("Skipping task {} - condition not met", task.id);
172                continue;
173            }
174
175            // Execute the task
176            let result = self.task_executor.execute(task, message).await;
177
178            // Handle task result
179            self.handle_task_result(
180                result,
181                &workflow.id,
182                &task.id,
183                task.continue_on_error,
184                message,
185            )?;
186        }
187
188        Ok(())
189    }
190
191    /// Execute all tasks in a workflow with trace collection
192    async fn execute_tasks_with_trace(
193        &self,
194        workflow: &Workflow,
195        message: &mut Message,
196        trace: &mut ExecutionTrace,
197    ) -> Result<()> {
198        for task in &workflow.tasks {
199            // Use cached context Arc - it will be fresh if previous task modified it
200            let context_arc = message.get_context_arc();
201
202            // Evaluate task condition
203            let should_execute = self
204                .internal_executor
205                .evaluate_condition(task.condition_index, context_arc)?;
206
207            if !should_execute {
208                debug!("Skipping task {} - condition not met", task.id);
209                // Record skipped task step
210                trace.add_step(ExecutionStep::task_skipped(&workflow.id, &task.id));
211                continue;
212            }
213
214            // Execute the task with trace support
215            let result = self.task_executor.execute_with_trace(task, message).await;
216
217            // Extract mapping_contexts before passing result to handle_task_result
218            let mapping_contexts = match &result {
219                Ok((_, _, contexts)) => contexts.clone(),
220                Err(_) => None,
221            };
222
223            // Convert to the standard result format for handle_task_result
224            let standard_result = result.map(|(status, changes, _)| (status, changes));
225
226            // Handle task result
227            self.handle_task_result(
228                standard_result,
229                &workflow.id,
230                &task.id,
231                task.continue_on_error,
232                message,
233            )?;
234
235            // Record executed step with message snapshot and optional mapping contexts
236            let mut step = ExecutionStep::executed(&workflow.id, &task.id, message);
237            if let Some(contexts) = mapping_contexts {
238                step = step.with_mapping_contexts(contexts);
239            }
240            trace.add_step(step);
241        }
242
243        Ok(())
244    }
245
246    /// Handle the result of a task execution
247    fn handle_task_result(
248        &self,
249        result: Result<(usize, Vec<Change>)>,
250        workflow_id: &str,
251        task_id: &str,
252        continue_on_error: bool,
253        message: &mut Message,
254    ) -> Result<()> {
255        match result {
256            Ok((status, changes)) => {
257                // Record audit trail
258                message.audit_trail.push(AuditTrail {
259                    timestamp: Utc::now(),
260                    workflow_id: Arc::from(workflow_id),
261                    task_id: Arc::from(task_id),
262                    status,
263                    changes,
264                });
265
266                // Update progress metadata for workflow chaining
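                // Resulting shape (illustrative, assuming `metadata` is an object):
                //   metadata.progress = {"workflow_id": "...", "task_id": "...", "status_code": <status>}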
                if let Some(metadata) = message.context["metadata"].as_object_mut() {
                    // Update existing progress or create new one
                    if let Some(progress) = metadata.get_mut("progress") {
                        if let Some(progress_obj) = progress.as_object_mut() {
                            progress_obj.insert("workflow_id".to_string(), json!(workflow_id));
                            progress_obj.insert("task_id".to_string(), json!(task_id));
                            progress_obj.insert("status_code".to_string(), json!(status));
                        }
                    } else {
                        metadata.insert(
                            "progress".to_string(),
                            json!({
                                "workflow_id": workflow_id,
                                "task_id": task_id,
                                "status_code": status
                            }),
                        );
                    }
                }
                message.invalidate_context_cache();

                // Check status code: 4xx is logged as a warning; 5xx fails the task
                // unless continue_on_error is set
                if (400..500).contains(&status) {
                    warn!("Task {} returned client error status: {}", task_id, status);
                } else if status >= 500 {
                    error!("Task {} returned server error status: {}", task_id, status);
                    if !continue_on_error {
                        return Err(DataflowError::Task(format!(
                            "Task {} failed with status {}",
                            task_id, status
                        )));
                    }
                }
                Ok(())
            }
            Err(e) => {
                error!("Task {} failed: {:?}", task_id, e);

                // Record error in audit trail
                message.audit_trail.push(AuditTrail {
                    timestamp: Utc::now(),
                    workflow_id: Arc::from(workflow_id),
                    task_id: Arc::from(task_id),
                    status: 500,
                    changes: vec![],
                });

                // Add error to message
                message.errors.push(
                    ErrorInfo::builder("TASK_ERROR", format!("Task {} error: {}", task_id, e))
                        .workflow_id(workflow_id)
                        .task_id(task_id)
                        .build(),
                );

                if !continue_on_error { Err(e) } else { Ok(()) }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::compiler::LogicCompiler;
    use serde_json::json;
    use std::collections::HashMap;

    #[tokio::test]
    async fn test_workflow_executor_skip_condition() {
        // Create a workflow with a false condition
        let workflow_json = r#"{
            "id": "test_workflow",
            "name": "Test Workflow",
            "condition": false,
            "tasks": [{
                "id": "dummy_task",
                "name": "Dummy Task",
                "function": {
                    "name": "map",
                    "input": {"mappings": []}
                }
            }]
        }"#;

        let mut compiler = LogicCompiler::new();
        let mut workflow = Workflow::from_json(workflow_json).unwrap();

        // Compile the workflow condition
        let workflows = compiler.compile_workflows(vec![workflow.clone()]);
        if let Some(compiled_workflow) = workflows.get("test_workflow") {
            workflow = compiled_workflow.clone();
        }

        let (datalogic, logic_cache) = compiler.into_parts();
        let internal_executor = Arc::new(InternalExecutor::new(datalogic.clone(), logic_cache));
        let task_executor = Arc::new(TaskExecutor::new(
            Arc::new(HashMap::new()),
            internal_executor.clone(),
            datalogic,
        ));
        let workflow_executor = WorkflowExecutor::new(task_executor, internal_executor);

        let mut message = Message::from_value(&json!({}));

        // Execute workflow - should be skipped due to false condition
        let executed = workflow_executor
            .execute(&workflow, &mut message)
            .await
            .unwrap();
        assert!(!executed);
        assert_eq!(message.audit_trail.len(), 0);
    }

    #[tokio::test]
    async fn test_workflow_executor_execute_success() {
        // Create a workflow with a true condition
        let workflow_json = r#"{
            "id": "test_workflow",
            "name": "Test Workflow",
            "condition": true,
            "tasks": [{
                "id": "dummy_task",
                "name": "Dummy Task",
                "function": {
                    "name": "map",
                    "input": {"mappings": []}
                }
            }]
        }"#;

        let mut compiler = LogicCompiler::new();
        let mut workflow = Workflow::from_json(workflow_json).unwrap();

        // Compile the workflow
        let workflows = compiler.compile_workflows(vec![workflow.clone()]);
        if let Some(compiled_workflow) = workflows.get("test_workflow") {
            workflow = compiled_workflow.clone();
        }

        let (datalogic, logic_cache) = compiler.into_parts();
        let internal_executor = Arc::new(InternalExecutor::new(datalogic.clone(), logic_cache));
        let task_executor = Arc::new(TaskExecutor::new(
            Arc::new(HashMap::new()),
            internal_executor.clone(),
            datalogic,
        ));
        let workflow_executor = WorkflowExecutor::new(task_executor, internal_executor);

        let mut message = Message::from_value(&json!({}));

        // Execute workflow - should succeed; the single map task has no mappings
        let executed = workflow_executor
            .execute(&workflow, &mut message)
            .await
            .unwrap();
        assert!(executed);
    }
}
425}