Skip to main content

dataflow_rs/engine/
task_executor.rs

//! # Task Execution Module
//!
//! This module handles the execution of individual tasks within workflows.
//! It provides a clean separation of concerns by isolating task execution logic
//! from the main engine orchestration.
6
7use crate::engine::error::{DataflowError, Result};
8use crate::engine::executor::InternalExecutor;
9use crate::engine::functions::{AsyncFunctionHandler, FunctionConfig};
10use crate::engine::message::{Change, Message};
11use crate::engine::task::Task;
12use datalogic_rs::DataLogic;
13use log::{debug, error};
14use serde_json::Value;
15use std::collections::HashMap;
16use std::sync::Arc;
17
18/// Handles the execution of tasks with their associated functions
19///
20/// The `TaskExecutor` is responsible for:
21/// - Executing built-in functions (map, validation)
22/// - Executing custom async function handlers
23/// - Managing function registry
24/// - Handling task-level error recovery
25pub struct TaskExecutor {
26    /// Registry of async function handlers
27    task_functions: Arc<HashMap<String, Box<dyn AsyncFunctionHandler + Send + Sync>>>,
28    /// Internal executor for built-in functions
29    executor: Arc<InternalExecutor>,
30    /// Shared DataLogic instance
31    datalogic: Arc<DataLogic>,
32}
33
34impl TaskExecutor {
35    /// Create a new TaskExecutor
36    pub fn new(
37        task_functions: Arc<HashMap<String, Box<dyn AsyncFunctionHandler + Send + Sync>>>,
38        executor: Arc<InternalExecutor>,
39        datalogic: Arc<DataLogic>,
40    ) -> Self {
41        Self {
42            task_functions,
43            executor,
44            datalogic,
45        }
46    }
47
48    /// Execute a single task
49    ///
50    /// This method:
51    /// 1. Determines the function type (built-in or custom)
52    /// 2. Executes the appropriate handler
53    /// 3. Returns the status code and changes for audit trail
54    ///
55    /// # Arguments
56    /// * `task` - The task to execute
57    /// * `message` - The message being processed
58    ///
59    /// # Returns
60    /// * `Result<(usize, Vec<Change>)>` - Status code and changes, or error
61    pub async fn execute(
62        &self,
63        task: &Task,
64        message: &mut Message,
65    ) -> Result<(usize, Vec<Change>)> {
66        debug!(
67            "Executing task: {} with function: {:?}",
68            task.id,
69            task.function.function_name()
70        );
71
72        match &task.function {
73            FunctionConfig::Map { input, .. } => {
74                // Execute built-in map function
75                self.executor.execute_map(message, input)
76            }
77            FunctionConfig::Validation { input, .. } => {
78                // Execute built-in validation function
79                self.executor.execute_validation(message, input)
80            }
81            FunctionConfig::ParseJson { input, .. } => {
82                // Execute built-in parse_json function
83                crate::engine::functions::parse::execute_parse_json(message, input)
84            }
85            FunctionConfig::ParseXml { input, .. } => {
86                // Execute built-in parse_xml function
87                crate::engine::functions::parse::execute_parse_xml(message, input)
88            }
89            FunctionConfig::PublishJson { input, .. } => {
90                // Execute built-in publish_json function
91                crate::engine::functions::publish::execute_publish_json(message, input)
92            }
93            FunctionConfig::PublishXml { input, .. } => {
94                // Execute built-in publish_xml function
95                crate::engine::functions::publish::execute_publish_xml(message, input)
96            }
97            FunctionConfig::Custom { name, .. } => {
98                // Execute custom function handler
99                self.execute_custom_function(name, message, &task.function)
100                    .await
101            }
102        }
103    }
104
105    /// Execute a single task with trace support
106    ///
107    /// Same as `execute()` but for map tasks, captures per-mapping context snapshots.
108    /// Returns `Option<Vec<Value>>` which is `Some` only for map tasks.
109    pub async fn execute_with_trace(
110        &self,
111        task: &Task,
112        message: &mut Message,
113    ) -> Result<(usize, Vec<Change>, Option<Vec<Value>>)> {
114        debug!(
115            "Executing task (trace): {} with function: {:?}",
116            task.id,
117            task.function.function_name()
118        );
119
120        match &task.function {
121            FunctionConfig::Map { input, .. } => {
122                let (status, changes, contexts) =
123                    self.executor.execute_map_with_trace(message, input)?;
124                Ok((status, changes, Some(contexts)))
125            }
126            _ => {
127                let (status, changes) = self.execute(task, message).await?;
128                Ok((status, changes, None))
129            }
130        }
131    }
132
133    /// Execute a custom function handler
134    async fn execute_custom_function(
135        &self,
136        name: &str,
137        message: &mut Message,
138        config: &FunctionConfig,
139    ) -> Result<(usize, Vec<Change>)> {
140        if let Some(handler) = self.task_functions.get(name) {
141            handler
142                .execute(message, config, Arc::clone(&self.datalogic))
143                .await
144        } else {
145            error!("Function handler not found: {}", name);
146            Err(DataflowError::FunctionNotFound(name.to_string()))
147        }
148    }
149
150    /// Check if a function handler exists
151    pub fn has_function(&self, name: &str) -> bool {
152        match name {
153            "map" | "validation" | "validate" | "parse_json" | "parse_xml" | "publish_json"
154            | "publish_xml" => true,
155            custom_name => self.task_functions.contains_key(custom_name),
156        }
157    }
158
159    /// Get the count of registered custom functions
160    pub fn custom_function_count(&self) -> usize {
161        self.task_functions.len()
162    }
163}
164
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::compiler::LogicCompiler;

    /// Registry type accepted by `TaskExecutor::new`, before it is wrapped in an `Arc`.
    type HandlerMap = HashMap<String, Box<dyn AsyncFunctionHandler + Send + Sync>>;

    /// Every name `has_function` treats as built-in.
    const BUILT_IN_NAMES: &[&str] = &[
        "map",
        "validation",
        "validate",
        "parse_json",
        "parse_xml",
        "publish_json",
        "publish_xml",
    ];

    /// Mock async function for testing; it ignores its inputs and reports success.
    struct MockAsyncFunction;

    #[async_trait::async_trait]
    impl AsyncFunctionHandler for MockAsyncFunction {
        async fn execute(
            &self,
            _message: &mut Message,
            _config: &FunctionConfig,
            _datalogic: Arc<DataLogic>,
        ) -> Result<(usize, Vec<Change>)> {
            Ok((200, vec![]))
        }
    }

    /// Build a `TaskExecutor` around a fresh compiler and the given custom registry.
    fn build_task_executor(custom_functions: HandlerMap) -> TaskExecutor {
        let (datalogic, logic_cache) = LogicCompiler::new().into_parts();
        let executor = Arc::new(InternalExecutor::new(Arc::clone(&datalogic), logic_cache));
        TaskExecutor::new(Arc::new(custom_functions), executor, datalogic)
    }

    /// Build a registry holding a single mock handler under `name`.
    fn registry_with_mock(name: &str) -> HandlerMap {
        let mut custom_functions = HandlerMap::new();
        custom_functions.insert(
            name.to_string(),
            Box::new(MockAsyncFunction) as Box<dyn AsyncFunctionHandler + Send + Sync>,
        );
        custom_functions
    }

    #[test]
    fn test_has_function() {
        let task_executor = build_task_executor(HandlerMap::new());

        // All built-in names are known even with an empty custom registry.
        for name in BUILT_IN_NAMES.iter() {
            assert!(
                task_executor.has_function(name),
                "built-in function `{}` should be reported as available",
                name
            );
        }

        // Test non-existent function
        assert!(!task_executor.has_function("nonexistent"));
    }

    #[test]
    fn test_has_function_finds_registered_custom_handler() {
        let task_executor = build_task_executor(registry_with_mock("custom_test"));

        // Names outside the built-in list fall through to the registry lookup.
        assert!(task_executor.has_function("custom_test"));
        assert!(!task_executor.has_function("custom_other"));

        // Registering a custom handler must not hide the built-ins.
        assert!(task_executor.has_function("map"));
    }

    #[test]
    fn test_custom_function_count() {
        // Built-ins are not counted: an empty registry reports zero.
        assert_eq!(
            build_task_executor(HandlerMap::new()).custom_function_count(),
            0
        );

        let task_executor = build_task_executor(registry_with_mock("custom_test"));
        assert_eq!(task_executor.custom_function_count(), 1);
    }
}