Skip to main content

dataflow_rs/engine/
task_executor.rs

1//! # Task Execution Module
2//!
3//! This module handles the execution of individual tasks within workflows.
4//! It provides a clean separation of concerns by isolating task execution logic
5//! from the main engine orchestration.
6
7use crate::engine::error::{DataflowError, Result};
8use crate::engine::executor::InternalExecutor;
9use crate::engine::functions::{AsyncFunctionHandler, FunctionConfig};
10use crate::engine::message::{Change, Message};
11use crate::engine::task::Task;
12use datalogic_rs::DataLogic;
13use log::{debug, error};
14use serde_json::Value;
15use std::collections::HashMap;
16use std::sync::Arc;
17
18/// Handles the execution of tasks with their associated functions
19///
20/// The `TaskExecutor` is responsible for:
21/// - Executing built-in functions (map, validation)
22/// - Executing custom async function handlers
23/// - Managing function registry
24/// - Handling task-level error recovery
25pub struct TaskExecutor {
26    /// Registry of async function handlers
27    task_functions: Arc<HashMap<String, Box<dyn AsyncFunctionHandler + Send + Sync>>>,
28    /// Internal executor for built-in functions
29    executor: Arc<InternalExecutor>,
30    /// Shared DataLogic instance
31    datalogic: Arc<DataLogic>,
32}
33
34impl TaskExecutor {
35    /// Create a new TaskExecutor
36    pub fn new(
37        task_functions: Arc<HashMap<String, Box<dyn AsyncFunctionHandler + Send + Sync>>>,
38        executor: Arc<InternalExecutor>,
39        datalogic: Arc<DataLogic>,
40    ) -> Self {
41        Self {
42            task_functions,
43            executor,
44            datalogic,
45        }
46    }
47
48    /// Execute a single task
49    ///
50    /// This method:
51    /// 1. Determines the function type (built-in or custom)
52    /// 2. Executes the appropriate handler
53    /// 3. Returns the status code and changes for audit trail
54    ///
55    /// # Arguments
56    /// * `task` - The task to execute
57    /// * `message` - The message being processed
58    ///
59    /// # Returns
60    /// * `Result<(usize, Vec<Change>)>` - Status code and changes, or error
61    pub async fn execute(
62        &self,
63        task: &Task,
64        message: &mut Message,
65    ) -> Result<(usize, Vec<Change>)> {
66        debug!(
67            "Executing task: {} with function: {:?}",
68            task.id,
69            task.function.function_name()
70        );
71
72        match &task.function {
73            FunctionConfig::Map { input, .. } => {
74                // Execute built-in map function
75                self.executor.execute_map(message, input)
76            }
77            FunctionConfig::Validation { input, .. } => {
78                // Execute built-in validation function
79                self.executor.execute_validation(message, input)
80            }
81            FunctionConfig::ParseJson { input, .. } => {
82                // Execute built-in parse_json function
83                crate::engine::functions::parse::execute_parse_json(message, input)
84            }
85            FunctionConfig::ParseXml { input, .. } => {
86                // Execute built-in parse_xml function
87                crate::engine::functions::parse::execute_parse_xml(message, input)
88            }
89            FunctionConfig::PublishJson { input, .. } => {
90                // Execute built-in publish_json function
91                crate::engine::functions::publish::execute_publish_json(message, input)
92            }
93            FunctionConfig::PublishXml { input, .. } => {
94                // Execute built-in publish_xml function
95                crate::engine::functions::publish::execute_publish_xml(message, input)
96            }
97            FunctionConfig::Filter { input, .. } => {
98                // Execute built-in filter function
99                self.executor.execute_filter(message, input)
100            }
101            FunctionConfig::Log { input, .. } => {
102                // Execute built-in log function
103                self.executor.execute_log(message, input)
104            }
105            FunctionConfig::HttpCall { .. } => {
106                // Route to async handler registered as "http_call"
107                self.execute_custom_function("http_call", message, &task.function)
108                    .await
109            }
110            FunctionConfig::Enrich { .. } => {
111                // Route to async handler registered as "enrich"
112                self.execute_custom_function("enrich", message, &task.function)
113                    .await
114            }
115            FunctionConfig::PublishKafka { .. } => {
116                // Route to async handler registered as "publish_kafka"
117                self.execute_custom_function("publish_kafka", message, &task.function)
118                    .await
119            }
120            FunctionConfig::Custom { name, .. } => {
121                // Execute custom function handler
122                self.execute_custom_function(name, message, &task.function)
123                    .await
124            }
125        }
126    }
127
128    /// Execute a single task with trace support
129    ///
130    /// Same as `execute()` but for map tasks, captures per-mapping context snapshots.
131    /// Returns `Option<Vec<Value>>` which is `Some` only for map tasks.
132    pub async fn execute_with_trace(
133        &self,
134        task: &Task,
135        message: &mut Message,
136    ) -> Result<(usize, Vec<Change>, Option<Vec<Value>>)> {
137        debug!(
138            "Executing task (trace): {} with function: {:?}",
139            task.id,
140            task.function.function_name()
141        );
142
143        match &task.function {
144            FunctionConfig::Map { input, .. } => {
145                let (status, changes, contexts) =
146                    self.executor.execute_map_with_trace(message, input)?;
147                Ok((status, changes, Some(contexts)))
148            }
149            _ => {
150                let (status, changes) = self.execute(task, message).await?;
151                Ok((status, changes, None))
152            }
153        }
154    }
155
156    /// Execute a custom function handler
157    async fn execute_custom_function(
158        &self,
159        name: &str,
160        message: &mut Message,
161        config: &FunctionConfig,
162    ) -> Result<(usize, Vec<Change>)> {
163        if let Some(handler) = self.task_functions.get(name) {
164            handler
165                .execute(message, config, Arc::clone(&self.datalogic))
166                .await
167        } else {
168            error!("Function handler not found: {}", name);
169            Err(DataflowError::FunctionNotFound(name.to_string()))
170        }
171    }
172
173    /// Check if a function handler exists
174    pub fn has_function(&self, name: &str) -> bool {
175        match name {
176            "map" | "validation" | "validate" | "parse_json" | "parse_xml" | "publish_json"
177            | "publish_xml" | "filter" | "log" | "http_call" | "enrich" | "publish_kafka" => true,
178            custom_name => self.task_functions.contains_key(custom_name),
179        }
180    }
181
182    /// Get a clone of the task_functions Arc for reuse in new engines
183    pub fn task_functions(
184        &self,
185    ) -> Arc<HashMap<String, Box<dyn AsyncFunctionHandler + Send + Sync>>> {
186        Arc::clone(&self.task_functions)
187    }
188
189    /// Get the count of registered custom functions
190    pub fn custom_function_count(&self) -> usize {
191        self.task_functions.len()
192    }
193}
194
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::compiler::LogicCompiler;

    /// Build a `TaskExecutor` around `handlers`, backed by a new `LogicCompiler`.
    fn build_task_executor(
        handlers: HashMap<String, Box<dyn AsyncFunctionHandler + Send + Sync>>,
    ) -> TaskExecutor {
        let (datalogic, logic_cache) = LogicCompiler::new().into_parts();
        let internal = Arc::new(InternalExecutor::new(Arc::clone(&datalogic), logic_cache));
        TaskExecutor::new(Arc::new(handlers), internal, datalogic)
    }

    #[test]
    fn test_has_function() {
        let task_executor = build_task_executor(HashMap::new());

        // Built-in names are recognised without any registration.
        for builtin in ["map", "validation", "validate"] {
            assert!(task_executor.has_function(builtin));
        }

        // Unknown names are rejected.
        assert!(!task_executor.has_function("nonexistent"));
    }

    #[test]
    fn test_custom_function_count() {
        let handler: Box<dyn AsyncFunctionHandler + Send + Sync> = Box::new(NoopHandler);
        let handlers = HashMap::from([("custom_test".to_string(), handler)]);

        let task_executor = build_task_executor(handlers);

        assert_eq!(task_executor.custom_function_count(), 1);
    }

    /// Handler stub: ignores its inputs and always reports status 200 with no changes.
    struct NoopHandler;

    #[async_trait::async_trait]
    impl AsyncFunctionHandler for NoopHandler {
        async fn execute(
            &self,
            _message: &mut Message,
            _config: &FunctionConfig,
            _datalogic: Arc<DataLogic>,
        ) -> Result<(usize, Vec<Change>)> {
            Ok((200, Vec::new()))
        }
    }
}