1use crate::engine::error::{DataflowError, Result};
8use crate::engine::executor::InternalExecutor;
9use crate::engine::functions::{AsyncFunctionHandler, FunctionConfig};
10use crate::engine::message::{Change, Message};
11use crate::engine::task::Task;
12use datalogic_rs::DataLogic;
13use log::{debug, error};
14use serde_json::Value;
15use std::collections::HashMap;
16use std::sync::Arc;
17
18pub struct TaskExecutor {
26 task_functions: Arc<HashMap<String, Box<dyn AsyncFunctionHandler + Send + Sync>>>,
28 executor: Arc<InternalExecutor>,
30 datalogic: Arc<DataLogic>,
32}
33
34impl TaskExecutor {
35 pub fn new(
37 task_functions: Arc<HashMap<String, Box<dyn AsyncFunctionHandler + Send + Sync>>>,
38 executor: Arc<InternalExecutor>,
39 datalogic: Arc<DataLogic>,
40 ) -> Self {
41 Self {
42 task_functions,
43 executor,
44 datalogic,
45 }
46 }
47
48 pub async fn execute(
62 &self,
63 task: &Task,
64 message: &mut Message,
65 ) -> Result<(usize, Vec<Change>)> {
66 debug!(
67 "Executing task: {} with function: {:?}",
68 task.id,
69 task.function.function_name()
70 );
71
72 match &task.function {
73 FunctionConfig::Map { input, .. } => {
74 self.executor.execute_map(message, input)
76 }
77 FunctionConfig::Validation { input, .. } => {
78 self.executor.execute_validation(message, input)
80 }
81 FunctionConfig::ParseJson { input, .. } => {
82 crate::engine::functions::parse::execute_parse_json(message, input)
84 }
85 FunctionConfig::ParseXml { input, .. } => {
86 crate::engine::functions::parse::execute_parse_xml(message, input)
88 }
89 FunctionConfig::PublishJson { input, .. } => {
90 crate::engine::functions::publish::execute_publish_json(message, input)
92 }
93 FunctionConfig::PublishXml { input, .. } => {
94 crate::engine::functions::publish::execute_publish_xml(message, input)
96 }
97 FunctionConfig::Filter { input, .. } => {
98 self.executor.execute_filter(message, input)
100 }
101 FunctionConfig::Log { input, .. } => {
102 self.executor.execute_log(message, input)
104 }
105 FunctionConfig::HttpCall { .. } => {
106 self.execute_custom_function("http_call", message, &task.function)
108 .await
109 }
110 FunctionConfig::Enrich { .. } => {
111 self.execute_custom_function("enrich", message, &task.function)
113 .await
114 }
115 FunctionConfig::PublishKafka { .. } => {
116 self.execute_custom_function("publish_kafka", message, &task.function)
118 .await
119 }
120 FunctionConfig::Custom { name, .. } => {
121 self.execute_custom_function(name, message, &task.function)
123 .await
124 }
125 }
126 }
127
128 pub async fn execute_with_trace(
133 &self,
134 task: &Task,
135 message: &mut Message,
136 ) -> Result<(usize, Vec<Change>, Option<Vec<Value>>)> {
137 debug!(
138 "Executing task (trace): {} with function: {:?}",
139 task.id,
140 task.function.function_name()
141 );
142
143 match &task.function {
144 FunctionConfig::Map { input, .. } => {
145 let (status, changes, contexts) =
146 self.executor.execute_map_with_trace(message, input)?;
147 Ok((status, changes, Some(contexts)))
148 }
149 _ => {
150 let (status, changes) = self.execute(task, message).await?;
151 Ok((status, changes, None))
152 }
153 }
154 }
155
156 async fn execute_custom_function(
158 &self,
159 name: &str,
160 message: &mut Message,
161 config: &FunctionConfig,
162 ) -> Result<(usize, Vec<Change>)> {
163 if let Some(handler) = self.task_functions.get(name) {
164 handler
165 .execute(message, config, Arc::clone(&self.datalogic))
166 .await
167 } else {
168 error!("Function handler not found: {}", name);
169 Err(DataflowError::FunctionNotFound(name.to_string()))
170 }
171 }
172
173 pub fn has_function(&self, name: &str) -> bool {
175 match name {
176 "map" | "validation" | "validate" | "parse_json" | "parse_xml" | "publish_json"
177 | "publish_xml" | "filter" | "log" | "http_call" | "enrich" | "publish_kafka" => true,
178 custom_name => self.task_functions.contains_key(custom_name),
179 }
180 }
181
182 pub fn task_functions(
184 &self,
185 ) -> Arc<HashMap<String, Box<dyn AsyncFunctionHandler + Send + Sync>>> {
186 Arc::clone(&self.task_functions)
187 }
188
189 pub fn custom_function_count(&self) -> usize {
191 self.task_functions.len()
192 }
193}
194
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::compiler::LogicCompiler;

    /// Shorthand for the handler map type accepted by `TaskExecutor::new`.
    type HandlerMap = HashMap<String, Box<dyn AsyncFunctionHandler + Send + Sync>>;

    /// Handler stub that ignores its inputs and returns `Ok((200, vec![]))`.
    struct MockAsyncFunction;

    #[async_trait::async_trait]
    impl AsyncFunctionHandler for MockAsyncFunction {
        async fn execute(
            &self,
            _message: &mut Message,
            _config: &FunctionConfig,
            _datalogic: Arc<DataLogic>,
        ) -> Result<(usize, Vec<Change>)> {
            Ok((200, Vec::new()))
        }
    }

    /// Builds a `TaskExecutor` around a fresh compiler's parts and the given
    /// handler map.
    fn executor_with(handlers: HandlerMap) -> TaskExecutor {
        let (datalogic, logic_cache) = LogicCompiler::new().into_parts();
        let internal = Arc::new(InternalExecutor::new(Arc::clone(&datalogic), logic_cache));
        TaskExecutor::new(Arc::new(handlers), internal, datalogic)
    }

    #[test]
    fn test_has_function() {
        let task_executor = executor_with(HashMap::new());

        // Names the executor handles without any registered handler.
        for builtin in ["map", "validation", "validate"] {
            assert!(task_executor.has_function(builtin));
        }

        // An unknown name with no registered handler is rejected.
        assert!(!task_executor.has_function("nonexistent"));
    }

    #[test]
    fn test_custom_function_count() {
        let mut handlers: HandlerMap = HashMap::new();
        handlers.insert("custom_test".to_string(), Box::new(MockAsyncFunction));

        let task_executor = executor_with(handlers);

        assert_eq!(task_executor.custom_function_count(), 1);
    }
}