dataflow_rs/engine/
task_executor.rs

1//! # Task Execution Module
2//!
3//! This module handles the execution of individual tasks within workflows.
4//! It provides a clean separation of concerns by isolating task execution logic
5//! from the main engine orchestration.
6
7use crate::engine::error::{DataflowError, Result};
8use crate::engine::executor::InternalExecutor;
9use crate::engine::functions::{AsyncFunctionHandler, FunctionConfig};
10use crate::engine::message::{Change, Message};
11use crate::engine::task::Task;
12use datalogic_rs::DataLogic;
13use log::{debug, error};
14use std::collections::HashMap;
15use std::sync::Arc;
16
17/// Handles the execution of tasks with their associated functions
18///
19/// The `TaskExecutor` is responsible for:
20/// - Executing built-in functions (map, validation)
21/// - Executing custom async function handlers
22/// - Managing function registry
23/// - Handling task-level error recovery
24pub struct TaskExecutor {
25    /// Registry of async function handlers
26    task_functions: Arc<HashMap<String, Box<dyn AsyncFunctionHandler + Send + Sync>>>,
27    /// Internal executor for built-in functions
28    executor: Arc<InternalExecutor>,
29    /// Shared DataLogic instance
30    datalogic: Arc<DataLogic>,
31}
32
33impl TaskExecutor {
34    /// Create a new TaskExecutor
35    pub fn new(
36        task_functions: Arc<HashMap<String, Box<dyn AsyncFunctionHandler + Send + Sync>>>,
37        executor: Arc<InternalExecutor>,
38        datalogic: Arc<DataLogic>,
39    ) -> Self {
40        Self {
41            task_functions,
42            executor,
43            datalogic,
44        }
45    }
46
47    /// Execute a single task
48    ///
49    /// This method:
50    /// 1. Determines the function type (built-in or custom)
51    /// 2. Executes the appropriate handler
52    /// 3. Returns the status code and changes for audit trail
53    ///
54    /// # Arguments
55    /// * `task` - The task to execute
56    /// * `message` - The message being processed
57    ///
58    /// # Returns
59    /// * `Result<(usize, Vec<Change>)>` - Status code and changes, or error
60    pub async fn execute(
61        &self,
62        task: &Task,
63        message: &mut Message,
64    ) -> Result<(usize, Vec<Change>)> {
65        debug!(
66            "Executing task: {} with function: {:?}",
67            task.id,
68            task.function.function_name()
69        );
70
71        match &task.function {
72            FunctionConfig::Map { input, .. } => {
73                // Execute built-in map function
74                self.executor.execute_map(message, input)
75            }
76            FunctionConfig::Validation { input, .. } => {
77                // Execute built-in validation function
78                self.executor.execute_validation(message, input)
79            }
80            FunctionConfig::Custom { name, .. } => {
81                // Execute custom function handler
82                self.execute_custom_function(name, message, &task.function)
83                    .await
84            }
85        }
86    }
87
88    /// Execute a custom function handler
89    async fn execute_custom_function(
90        &self,
91        name: &str,
92        message: &mut Message,
93        config: &FunctionConfig,
94    ) -> Result<(usize, Vec<Change>)> {
95        if let Some(handler) = self.task_functions.get(name) {
96            handler
97                .execute(message, config, Arc::clone(&self.datalogic))
98                .await
99        } else {
100            error!("Function handler not found: {}", name);
101            Err(DataflowError::FunctionNotFound(name.to_string()))
102        }
103    }
104
105    /// Check if a function handler exists
106    pub fn has_function(&self, name: &str) -> bool {
107        match name {
108            "map" | "validation" | "validate" => true,
109            custom_name => self.task_functions.contains_key(custom_name),
110        }
111    }
112
113    /// Get the count of registered custom functions
114    pub fn custom_function_count(&self) -> usize {
115        self.task_functions.len()
116    }
117}
118
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::compiler::LogicCompiler;

    /// Handler registry type accepted by `TaskExecutor::new`, minus the `Arc`.
    type Registry = HashMap<String, Box<dyn AsyncFunctionHandler + Send + Sync>>;

    /// Minimal handler that always succeeds with status 200 and no changes.
    struct MockAsyncFunction;

    #[async_trait::async_trait]
    impl AsyncFunctionHandler for MockAsyncFunction {
        async fn execute(
            &self,
            _message: &mut Message,
            _config: &FunctionConfig,
            _datalogic: Arc<DataLogic>,
        ) -> Result<(usize, Vec<Change>)> {
            Ok((200, vec![]))
        }
    }

    /// Build a `TaskExecutor` around a fresh compiler and the given registry.
    fn build_task_executor(registry: Registry) -> TaskExecutor {
        let (datalogic, logic_cache) = LogicCompiler::new().into_parts();
        let internal = InternalExecutor::new(Arc::clone(&datalogic), logic_cache);
        TaskExecutor::new(Arc::new(registry), Arc::new(internal), datalogic)
    }

    #[test]
    fn test_has_function() {
        let task_executor = build_task_executor(HashMap::new());

        // Built-in names are always recognised.
        for builtin in ["map", "validation", "validate"] {
            assert!(task_executor.has_function(builtin));
        }

        // An unknown name with an empty registry is not.
        assert!(!task_executor.has_function("nonexistent"));
    }

    #[test]
    fn test_custom_function_count() {
        // Register a single mock handler.
        let mut registry: Registry = HashMap::new();
        registry.insert("custom_test".to_string(), Box::new(MockAsyncFunction));

        let task_executor = build_task_executor(registry);

        assert_eq!(task_executor.custom_function_count(), 1);
    }
}