dataflow_rs/engine/
task_executor.rs1use crate::engine::error::{DataflowError, Result};
10use crate::engine::functions::{BoxedFunctionHandler, FunctionConfig};
11use crate::engine::message::{Change, Message};
12use crate::engine::task::Task;
13use crate::engine::task_context::TaskContext;
14use crate::engine::task_outcome::TaskOutcome;
15use datalogic_rs::Engine;
16use log::{debug, error};
17use std::any::Any;
18use std::collections::HashMap;
19use std::sync::Arc;
20
21pub struct TaskExecutor {
32 task_functions: Arc<HashMap<String, BoxedFunctionHandler>>,
34 engine: Arc<Engine>,
36}
37
38impl TaskExecutor {
39 pub fn new(
41 task_functions: Arc<HashMap<String, BoxedFunctionHandler>>,
42 engine: Arc<Engine>,
43 ) -> Self {
44 Self {
45 task_functions,
46 engine,
47 }
48 }
49
50 pub async fn execute(
54 &self,
55 task: &Task,
56 message: &mut Message,
57 ) -> Result<(TaskOutcome, Vec<Change>)> {
58 debug!(
59 "Executing task: {} with function: {:?}",
60 task.id,
61 task.function.function_name()
62 );
63
64 match &task.function {
65 FunctionConfig::Map { input, .. } => input.execute(message, &self.engine),
68 FunctionConfig::Validation { input, .. } => input.execute(message, &self.engine),
69 FunctionConfig::ParseJson { input, .. } => {
70 crate::engine::functions::parse::execute_parse_json(message, input)
71 }
72 FunctionConfig::ParseXml { input, .. } => {
73 crate::engine::functions::parse::execute_parse_xml(message, input)
74 }
75 FunctionConfig::PublishJson { input, .. } => {
76 crate::engine::functions::publish::execute_publish_json(message, input)
77 }
78 FunctionConfig::PublishXml { input, .. } => {
79 crate::engine::functions::publish::execute_publish_xml(message, input)
80 }
81 FunctionConfig::Filter { input, .. } => input.execute(message, &self.engine),
82 FunctionConfig::Log { input, .. } => input.execute(message, &self.engine),
83 FunctionConfig::HttpCall { input, .. } => {
85 self.dispatch_handler("http_call", message, input).await
86 }
87 FunctionConfig::Enrich { input, .. } => {
88 self.dispatch_handler("enrich", message, input).await
89 }
90 FunctionConfig::PublishKafka { input, .. } => {
91 self.dispatch_handler("publish_kafka", message, input).await
92 }
93 FunctionConfig::Custom {
94 name,
95 compiled_input,
96 ..
97 } => {
98 let any_input = compiled_input.as_ref().ok_or_else(|| {
99 DataflowError::Validation(format!(
100 "Custom function '{}' has no precompiled input — \
101 was the workflow built outside Engine::new?",
102 name
103 ))
104 })?;
105 self.dispatch_handler_any(name, message, any_input.as_any())
106 .await
107 }
108 }
109 }
110
111 async fn dispatch_handler<T>(
116 &self,
117 name: &str,
118 message: &mut Message,
119 input: &T,
120 ) -> Result<(TaskOutcome, Vec<Change>)>
121 where
122 T: Any + Send + Sync,
123 {
124 let any_input: &(dyn Any + Send + Sync) = input;
125 self.dispatch_handler_any(name, message, any_input).await
126 }
127
128 async fn dispatch_handler_any(
131 &self,
132 name: &str,
133 message: &mut Message,
134 any_input: &(dyn Any + Send + Sync),
135 ) -> Result<(TaskOutcome, Vec<Change>)> {
136 let handler = self.task_functions.get(name).ok_or_else(|| {
137 error!("Function handler not found: {}", name);
138 DataflowError::FunctionNotFound(name.to_string())
139 })?;
140 let mut ctx = TaskContext::new(message, &self.engine);
141 let outcome = handler.dyn_execute(&mut ctx, any_input).await?;
142 let changes = ctx.into_changes();
143 Ok((outcome, changes))
144 }
145
146 pub fn has_function(&self, name: &str) -> bool {
148 match name {
149 "map" | "validation" | "validate" | "parse_json" | "parse_xml" | "publish_json"
150 | "publish_xml" | "filter" | "log" | "http_call" | "enrich" | "publish_kafka" => true,
151 custom_name => self.task_functions.contains_key(custom_name),
152 }
153 }
154
155 pub fn task_functions(&self) -> Arc<HashMap<String, BoxedFunctionHandler>> {
157 Arc::clone(&self.task_functions)
158 }
159
160 pub fn custom_function_count(&self) -> usize {
162 self.task_functions.len()
163 }
164}
165
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::AsyncFunctionHandler;
    use crate::engine::compiler::LogicCompiler;

    /// Minimal handler that ignores its input and always succeeds.
    struct MockAsyncFunction;

    #[async_trait::async_trait]
    impl AsyncFunctionHandler for MockAsyncFunction {
        type Input = serde_json::Value;

        async fn execute(
            &self,
            _ctx: &mut TaskContext<'_>,
            _input: &serde_json::Value,
        ) -> Result<TaskOutcome> {
            Ok(TaskOutcome::Success)
        }
    }

    /// Builds an executor over `handlers` with a freshly compiled engine.
    fn executor_with(handlers: HashMap<String, BoxedFunctionHandler>) -> TaskExecutor {
        let engine = LogicCompiler::new().into_engine();
        TaskExecutor::new(Arc::new(handlers), engine)
    }

    #[test]
    fn test_has_function() {
        let executor = executor_with(HashMap::new());

        // Built-in names are recognised without any registered handler.
        for builtin in ["map", "validation", "validate"] {
            assert!(executor.has_function(builtin));
        }

        // Unknown names are rejected when nothing is registered for them.
        assert!(!executor.has_function("nonexistent"));
    }

    #[test]
    fn test_custom_function_count() {
        let mut handlers: HashMap<String, BoxedFunctionHandler> = HashMap::new();
        handlers.insert("custom_test".to_string(), Box::new(MockAsyncFunction));

        let executor = executor_with(handlers);

        assert_eq!(executor.custom_function_count(), 1);
    }
}