Skip to main content

dataflow_rs/engine/
task_executor.rs

1//! # Task Execution Module
2//!
3//! Dispatches a single `Task` to its function implementation. Built-in sync
4//! variants of `FunctionConfig` are dispatched in `workflow_executor`'s sync
5//! stretch via [`FunctionConfig::try_execute_in_arena`]; this module owns
6//! the async path — `HttpCall`, `Enrich`, `PublishKafka`, and `Custom` —
7//! routed to the matching registered handler.
8
9use crate::engine::error::{DataflowError, Result};
10use crate::engine::functions::{BoxedFunctionHandler, FunctionConfig};
11use crate::engine::message::{Change, Message};
12use crate::engine::task::Task;
13use crate::engine::task_context::TaskContext;
14use crate::engine::task_outcome::TaskOutcome;
15use datalogic_rs::Engine;
16use log::{debug, error};
17use std::any::Any;
18use std::collections::HashMap;
19use std::sync::Arc;
20
21/// Handles the execution of tasks with their associated functions.
22///
23/// The `TaskExecutor` is responsible for:
24/// - Routing async functions (http_call, enrich, publish_kafka, custom) to
25///   the matching registered handler via [`crate::engine::functions::DynAsyncFunctionHandler`]
26/// - Owning the function registry
27///
28/// Sync built-ins are *not* routed through `execute` — `workflow_executor`
29/// calls [`FunctionConfig::try_execute_in_arena`] inside its sync stretch
30/// for those, sharing one arena across consecutive sync tasks.
31pub struct TaskExecutor {
32    /// Registry of async function handlers
33    task_functions: Arc<HashMap<String, BoxedFunctionHandler>>,
34    /// Shared datalogic Engine (Send + Sync; Arc-shared across tasks)
35    engine: Arc<Engine>,
36}
37
38impl TaskExecutor {
39    /// Create a new TaskExecutor
40    pub fn new(
41        task_functions: Arc<HashMap<String, BoxedFunctionHandler>>,
42        engine: Arc<Engine>,
43    ) -> Self {
44        Self {
45            task_functions,
46            engine,
47        }
48    }
49
50    /// Execute a single task. Sync built-ins reach here only when called from
51    /// outside the workflow executor's sync-stretch path — they fall back to
52    /// their `execute()` methods (which open a fresh thread-local arena).
53    pub async fn execute(
54        &self,
55        task: &Task,
56        message: &mut Message,
57    ) -> Result<(TaskOutcome, Vec<Change>)> {
58        debug!(
59            "Executing task: {} with function: {:?}",
60            task.id,
61            task.function.function_name()
62        );
63
64        match &task.function {
65            // Sync built-ins — only hit here when called outside the workflow
66            // sync stretch (test harness, direct `TaskExecutor::execute`).
67            FunctionConfig::Map { input, .. } => input.execute(message, &self.engine),
68            FunctionConfig::Validation { input, .. } => input.execute(message, &self.engine),
69            FunctionConfig::ParseJson { input, .. } => {
70                crate::engine::functions::parse::execute_parse_json(message, input)
71            }
72            FunctionConfig::ParseXml { input, .. } => {
73                crate::engine::functions::parse::execute_parse_xml(message, input)
74            }
75            FunctionConfig::PublishJson { input, .. } => {
76                crate::engine::functions::publish::execute_publish_json(message, input)
77            }
78            FunctionConfig::PublishXml { input, .. } => {
79                crate::engine::functions::publish::execute_publish_xml(message, input)
80            }
81            FunctionConfig::Filter { input, .. } => input.execute(message, &self.engine),
82            FunctionConfig::Log { input, .. } => input.execute(message, &self.engine),
83            // Async / user-registered handlers
84            FunctionConfig::HttpCall { input, .. } => {
85                self.dispatch_handler("http_call", message, input).await
86            }
87            FunctionConfig::Enrich { input, .. } => {
88                self.dispatch_handler("enrich", message, input).await
89            }
90            FunctionConfig::PublishKafka { input, .. } => {
91                self.dispatch_handler("publish_kafka", message, input).await
92            }
93            FunctionConfig::Custom {
94                name,
95                compiled_input,
96                ..
97            } => {
98                let any_input = compiled_input.as_ref().ok_or_else(|| {
99                    DataflowError::Validation(format!(
100                        "Custom function '{}' has no precompiled input — \
101                         was the workflow built outside Engine::new?",
102                        name
103                    ))
104                })?;
105                self.dispatch_handler_any(name, message, any_input.as_any())
106                    .await
107            }
108        }
109    }
110
111    /// Generic-Input flavour: takes any `T: Any + Send + Sync`, hands it to
112    /// the registered handler as `&dyn Any`. Used by the built-in async
113    /// dispatch (`HttpCallConfig`, `EnrichConfig`, `PublishKafkaConfig`)
114    /// where the typed config is already on the `FunctionConfig` enum.
115    async fn dispatch_handler<T>(
116        &self,
117        name: &str,
118        message: &mut Message,
119        input: &T,
120    ) -> Result<(TaskOutcome, Vec<Change>)>
121    where
122        T: Any + Send + Sync,
123    {
124        let any_input: &(dyn Any + Send + Sync) = input;
125        self.dispatch_handler_any(name, message, any_input).await
126    }
127
128    /// Inner dispatch: build a `TaskContext`, invoke the handler, drain the
129    /// accumulated `Change` buffer.
130    async fn dispatch_handler_any(
131        &self,
132        name: &str,
133        message: &mut Message,
134        any_input: &(dyn Any + Send + Sync),
135    ) -> Result<(TaskOutcome, Vec<Change>)> {
136        let handler = self.task_functions.get(name).ok_or_else(|| {
137            error!("Function handler not found: {}", name);
138            DataflowError::FunctionNotFound(name.to_string())
139        })?;
140        let mut ctx = TaskContext::new(message, &self.engine);
141        let outcome = handler.dyn_execute(&mut ctx, any_input).await?;
142        let changes = ctx.into_changes();
143        Ok((outcome, changes))
144    }
145
146    /// Check if a function handler exists
147    pub fn has_function(&self, name: &str) -> bool {
148        match name {
149            "map" | "validation" | "validate" | "parse_json" | "parse_xml" | "publish_json"
150            | "publish_xml" | "filter" | "log" | "http_call" | "enrich" | "publish_kafka" => true,
151            custom_name => self.task_functions.contains_key(custom_name),
152        }
153    }
154
155    /// Get a clone of the task_functions Arc for reuse in new engines
156    pub fn task_functions(&self) -> Arc<HashMap<String, BoxedFunctionHandler>> {
157        Arc::clone(&self.task_functions)
158    }
159
160    /// Get the count of registered custom functions
161    pub fn custom_function_count(&self) -> usize {
162        self.task_functions.len()
163    }
164}
165
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::AsyncFunctionHandler;
    use crate::engine::compiler::LogicCompiler;

    /// Minimal handler used to populate the registry; always succeeds and
    /// ignores both the context and its input.
    struct MockAsyncFunction;

    #[async_trait::async_trait]
    impl AsyncFunctionHandler for MockAsyncFunction {
        type Input = serde_json::Value;

        async fn execute(
            &self,
            _ctx: &mut TaskContext<'_>,
            _input: &serde_json::Value,
        ) -> Result<TaskOutcome> {
            Ok(TaskOutcome::Success)
        }
    }

    /// Sync built-in names are reported as present even with an empty
    /// registry; an unknown name is not.
    #[test]
    fn test_has_function() {
        let engine = LogicCompiler::new().into_engine();
        let executor = TaskExecutor::new(Arc::new(HashMap::new()), engine);

        // Built-in functions
        for builtin in ["map", "validation", "validate"] {
            assert!(executor.has_function(builtin));
        }

        // Non-existent function
        assert!(!executor.has_function("nonexistent"));
    }

    /// The reported count matches the number of handlers in the registry.
    #[test]
    fn test_custom_function_count() {
        let mut registry = HashMap::<String, BoxedFunctionHandler>::new();
        registry.insert(String::from("custom_test"), Box::new(MockAsyncFunction));

        let engine = LogicCompiler::new().into_engine();
        let executor = TaskExecutor::new(Arc::new(registry), engine);

        assert_eq!(executor.custom_function_count(), 1);
    }
}