// dataflow_rs/engine/workflow_executor.rs
1//! # Workflow Execution Module
2//!
3//! This module handles the execution of workflows and their associated tasks.
4//! It provides a clean separation between workflow orchestration and task execution.
5
6use crate::engine::error::{DataflowError, ErrorInfo, Result};
7use crate::engine::executor::InternalExecutor;
8use crate::engine::message::{AuditTrail, Change, Message};
9use crate::engine::task_executor::TaskExecutor;
10use crate::engine::trace::{ExecutionStep, ExecutionTrace};
11use crate::engine::workflow::Workflow;
12use chrono::Utc;
13use log::{debug, error, info, warn};
14use serde_json::json;
15use std::sync::Arc;
16
/// Handles the execution of workflows and their tasks
///
/// The `WorkflowExecutor` is responsible for:
/// - Evaluating workflow conditions
/// - Orchestrating task execution within workflows
/// - Managing workflow-level error handling
/// - Recording audit trails
///
/// Both executors are shared via `Arc` so a `WorkflowExecutor` can be
/// constructed cheaply wherever the engine needs one.
pub struct WorkflowExecutor {
    /// Task executor for executing individual tasks
    task_executor: Arc<TaskExecutor>,
    /// Internal executor for condition evaluation (workflow- and task-level)
    internal_executor: Arc<InternalExecutor>,
}
30
31impl WorkflowExecutor {
32    /// Create a new WorkflowExecutor
33    pub fn new(task_executor: Arc<TaskExecutor>, internal_executor: Arc<InternalExecutor>) -> Self {
34        Self {
35            task_executor,
36            internal_executor,
37        }
38    }
39
40    /// Execute a workflow if its condition is met
41    ///
42    /// This method:
43    /// 1. Evaluates the workflow condition
44    /// 2. Executes tasks sequentially if condition is met
45    /// 3. Handles error recovery based on workflow configuration
46    /// 4. Updates message metadata and audit trail
47    ///
48    /// # Arguments
49    /// * `workflow` - The workflow to execute
50    /// * `message` - The message being processed
51    ///
52    /// # Returns
53    /// * `Result<bool>` - Ok(true) if workflow was executed, Ok(false) if skipped, Err on failure
54    pub async fn execute(&self, workflow: &Workflow, message: &mut Message) -> Result<bool> {
55        // Use cached context Arc for condition evaluation
56        let context_arc = message.get_context_arc();
57
58        // Evaluate workflow condition
59        let should_execute = self
60            .internal_executor
61            .evaluate_condition(workflow.condition_index, context_arc)?;
62
63        if !should_execute {
64            debug!("Skipping workflow {} - condition not met", workflow.id);
65            return Ok(false);
66        }
67
68        // Execute workflow tasks
69        match self.execute_tasks(workflow, message).await {
70            Ok(_) => {
71                info!("Successfully completed workflow: {}", workflow.id);
72                Ok(true)
73            }
74            Err(e) if workflow.continue_on_error => {
75                warn!(
76                    "Workflow {} encountered error but continuing: {:?}",
77                    workflow.id, e
78                );
79                message.errors.push(
80                    ErrorInfo::builder(
81                        "WORKFLOW_ERROR",
82                        format!("Workflow {} error: {}", workflow.id, e),
83                    )
84                    .workflow_id(&workflow.id)
85                    .build(),
86                );
87                Ok(true)
88            }
89            Err(e) => {
90                error!("Workflow {} failed: {:?}", workflow.id, e);
91                Err(e)
92            }
93        }
94    }
95
96    /// Execute a workflow with step-by-step tracing
97    ///
98    /// Similar to `execute` but records execution steps for debugging.
99    ///
100    /// # Arguments
101    /// * `workflow` - The workflow to execute
102    /// * `message` - The message being processed
103    /// * `trace` - The execution trace to record steps to
104    ///
105    /// # Returns
106    /// * `Result<bool>` - Ok(true) if workflow was executed, Ok(false) if skipped, Err on failure
107    pub async fn execute_with_trace(
108        &self,
109        workflow: &Workflow,
110        message: &mut Message,
111        trace: &mut ExecutionTrace,
112    ) -> Result<bool> {
113        // Use cached context Arc for condition evaluation
114        let context_arc = message.get_context_arc();
115
116        // Evaluate workflow condition
117        let should_execute = self
118            .internal_executor
119            .evaluate_condition(workflow.condition_index, context_arc)?;
120
121        if !should_execute {
122            debug!("Skipping workflow {} - condition not met", workflow.id);
123            // Record skipped workflow step
124            trace.add_step(ExecutionStep::workflow_skipped(&workflow.id));
125            return Ok(false);
126        }
127
128        // Execute workflow tasks with trace collection
129        match self
130            .execute_tasks_with_trace(workflow, message, trace)
131            .await
132        {
133            Ok(_) => {
134                info!("Successfully completed workflow: {}", workflow.id);
135                Ok(true)
136            }
137            Err(e) if workflow.continue_on_error => {
138                warn!(
139                    "Workflow {} encountered error but continuing: {:?}",
140                    workflow.id, e
141                );
142                message.errors.push(
143                    ErrorInfo::builder(
144                        "WORKFLOW_ERROR",
145                        format!("Workflow {} error: {}", workflow.id, e),
146                    )
147                    .workflow_id(&workflow.id)
148                    .build(),
149                );
150                Ok(true)
151            }
152            Err(e) => {
153                error!("Workflow {} failed: {:?}", workflow.id, e);
154                Err(e)
155            }
156        }
157    }
158
159    /// Execute all tasks in a workflow
160    async fn execute_tasks(&self, workflow: &Workflow, message: &mut Message) -> Result<()> {
161        for task in &workflow.tasks {
162            // Use cached context Arc - it will be fresh if previous task modified it
163            let context_arc = message.get_context_arc();
164
165            // Evaluate task condition
166            let should_execute = self
167                .internal_executor
168                .evaluate_condition(task.condition_index, context_arc)?;
169
170            if !should_execute {
171                debug!("Skipping task {} - condition not met", task.id);
172                continue;
173            }
174
175            // Execute the task
176            let result = self.task_executor.execute(task, message).await;
177
178            // Handle task result
179            self.handle_task_result(
180                result,
181                &workflow.id,
182                &task.id,
183                task.continue_on_error,
184                message,
185            )?;
186        }
187
188        Ok(())
189    }
190
191    /// Execute all tasks in a workflow with trace collection
192    async fn execute_tasks_with_trace(
193        &self,
194        workflow: &Workflow,
195        message: &mut Message,
196        trace: &mut ExecutionTrace,
197    ) -> Result<()> {
198        for task in &workflow.tasks {
199            // Use cached context Arc - it will be fresh if previous task modified it
200            let context_arc = message.get_context_arc();
201
202            // Evaluate task condition
203            let should_execute = self
204                .internal_executor
205                .evaluate_condition(task.condition_index, context_arc)?;
206
207            if !should_execute {
208                debug!("Skipping task {} - condition not met", task.id);
209                // Record skipped task step
210                trace.add_step(ExecutionStep::task_skipped(&workflow.id, &task.id));
211                continue;
212            }
213
214            // Execute the task
215            let result = self.task_executor.execute(task, message).await;
216
217            // Handle task result
218            self.handle_task_result(
219                result,
220                &workflow.id,
221                &task.id,
222                task.continue_on_error,
223                message,
224            )?;
225
226            // Record executed step with message snapshot
227            trace.add_step(ExecutionStep::executed(&workflow.id, &task.id, message));
228        }
229
230        Ok(())
231    }
232
233    /// Handle the result of a task execution
234    fn handle_task_result(
235        &self,
236        result: Result<(usize, Vec<Change>)>,
237        workflow_id: &str,
238        task_id: &str,
239        continue_on_error: bool,
240        message: &mut Message,
241    ) -> Result<()> {
242        match result {
243            Ok((status, changes)) => {
244                // Record audit trail
245                message.audit_trail.push(AuditTrail {
246                    timestamp: Utc::now(),
247                    workflow_id: Arc::from(workflow_id),
248                    task_id: Arc::from(task_id),
249                    status,
250                    changes,
251                });
252
253                // Update progress metadata for workflow chaining
254                if let Some(metadata) = message.context["metadata"].as_object_mut() {
255                    // Update existing progress or create new one
256                    if let Some(progress) = metadata.get_mut("progress") {
257                        if let Some(progress_obj) = progress.as_object_mut() {
258                            progress_obj.insert("workflow_id".to_string(), json!(workflow_id));
259                            progress_obj.insert("task_id".to_string(), json!(task_id));
260                            progress_obj.insert("status_code".to_string(), json!(status));
261                        }
262                    } else {
263                        metadata.insert(
264                            "progress".to_string(),
265                            json!({
266                                "workflow_id": workflow_id,
267                                "task_id": task_id,
268                                "status_code": status
269                            }),
270                        );
271                    }
272                }
273                message.invalidate_context_cache();
274
275                // Check status code
276                if (400..500).contains(&status) {
277                    warn!("Task {} returned client error status: {}", task_id, status);
278                } else if status >= 500 {
279                    error!("Task {} returned server error status: {}", task_id, status);
280                    if !continue_on_error {
281                        return Err(DataflowError::Task(format!(
282                            "Task {} failed with status {}",
283                            task_id, status
284                        )));
285                    }
286                }
287                Ok(())
288            }
289            Err(e) => {
290                error!("Task {} failed: {:?}", task_id, e);
291
292                // Record error in audit trail
293                message.audit_trail.push(AuditTrail {
294                    timestamp: Utc::now(),
295                    workflow_id: Arc::from(workflow_id),
296                    task_id: Arc::from(task_id),
297                    status: 500,
298                    changes: vec![],
299                });
300
301                // Add error to message
302                message.errors.push(
303                    ErrorInfo::builder("TASK_ERROR", format!("Task {} error: {}", task_id, e))
304                        .workflow_id(workflow_id)
305                        .task_id(task_id)
306                        .build(),
307                );
308
309                if !continue_on_error { Err(e) } else { Ok(()) }
310            }
311        }
312    }
313}
314
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::compiler::LogicCompiler;
    use serde_json::json;
    use std::collections::HashMap;

    /// Compile a workflow from JSON and wire up a `WorkflowExecutor` around it.
    ///
    /// Shared by every test in this module; returns the compiled workflow
    /// (looked up by its id) alongside a ready-to-use executor.
    fn build_executor(workflow_json: &str, workflow_id: &str) -> (WorkflowExecutor, Workflow) {
        let mut compiler = LogicCompiler::new();
        let mut workflow = Workflow::from_json(workflow_json).unwrap();

        // Compile the workflow condition(s)
        let workflows = compiler.compile_workflows(vec![workflow.clone()]);
        if let Some(compiled_workflow) = workflows.get(workflow_id) {
            workflow = compiled_workflow.clone();
        }

        let (datalogic, logic_cache) = compiler.into_parts();
        let internal_executor = Arc::new(InternalExecutor::new(datalogic.clone(), logic_cache));
        let task_executor = Arc::new(TaskExecutor::new(
            Arc::new(HashMap::new()),
            internal_executor.clone(),
            datalogic,
        ));
        let workflow_executor = WorkflowExecutor::new(task_executor, internal_executor);
        (workflow_executor, workflow)
    }

    #[tokio::test]
    async fn test_workflow_executor_skip_condition() {
        // Create a workflow with a false condition
        let workflow_json = r#"{
            "id": "test_workflow",
            "name": "Test Workflow",
            "condition": false,
            "tasks": [{
                "id": "dummy_task",
                "name": "Dummy Task",
                "function": {
                    "name": "map",
                    "input": {"mappings": []}
                }
            }]
        }"#;

        let (workflow_executor, workflow) = build_executor(workflow_json, "test_workflow");
        let mut message = Message::from_value(&json!({}));

        // Execute workflow - should be skipped due to false condition
        let executed = workflow_executor
            .execute(&workflow, &mut message)
            .await
            .unwrap();
        assert!(!executed);
        assert_eq!(message.audit_trail.len(), 0);
    }

    #[tokio::test]
    async fn test_workflow_executor_execute_success() {
        // Create a workflow with a true condition
        let workflow_json = r#"{
            "id": "test_workflow",
            "name": "Test Workflow",
            "condition": true,
            "tasks": [{
                "id": "dummy_task",
                "name": "Dummy Task",
                "function": {
                    "name": "map",
                    "input": {"mappings": []}
                }
            }]
        }"#;

        let (workflow_executor, workflow) = build_executor(workflow_json, "test_workflow");
        let mut message = Message::from_value(&json!({}));

        // Execute workflow - should succeed with empty task list
        let executed = workflow_executor
            .execute(&workflow, &mut message)
            .await
            .unwrap();
        assert!(executed);
    }
}