dataflow_rs/engine/
task_executor.rs1use crate::engine::error::{DataflowError, Result};
8use crate::engine::executor::InternalExecutor;
9use crate::engine::functions::{AsyncFunctionHandler, FunctionConfig};
10use crate::engine::message::{Change, Message};
11use crate::engine::task::Task;
12use datalogic_rs::DataLogic;
13use log::{debug, error};
14use std::collections::HashMap;
15use std::sync::Arc;
16
17pub struct TaskExecutor {
25 task_functions: Arc<HashMap<String, Box<dyn AsyncFunctionHandler + Send + Sync>>>,
27 executor: Arc<InternalExecutor>,
29 datalogic: Arc<DataLogic>,
31}
32
33impl TaskExecutor {
34 pub fn new(
36 task_functions: Arc<HashMap<String, Box<dyn AsyncFunctionHandler + Send + Sync>>>,
37 executor: Arc<InternalExecutor>,
38 datalogic: Arc<DataLogic>,
39 ) -> Self {
40 Self {
41 task_functions,
42 executor,
43 datalogic,
44 }
45 }
46
47 pub async fn execute(
61 &self,
62 task: &Task,
63 message: &mut Message,
64 ) -> Result<(usize, Vec<Change>)> {
65 debug!(
66 "Executing task: {} with function: {:?}",
67 task.id,
68 task.function.function_name()
69 );
70
71 match &task.function {
72 FunctionConfig::Map { input, .. } => {
73 self.executor.execute_map(message, input)
75 }
76 FunctionConfig::Validation { input, .. } => {
77 self.executor.execute_validation(message, input)
79 }
80 FunctionConfig::ParseJson { input, .. } => {
81 crate::engine::functions::parse::execute_parse_json(message, input)
83 }
84 FunctionConfig::ParseXml { input, .. } => {
85 crate::engine::functions::parse::execute_parse_xml(message, input)
87 }
88 FunctionConfig::PublishJson { input, .. } => {
89 crate::engine::functions::publish::execute_publish_json(message, input)
91 }
92 FunctionConfig::PublishXml { input, .. } => {
93 crate::engine::functions::publish::execute_publish_xml(message, input)
95 }
96 FunctionConfig::Custom { name, .. } => {
97 self.execute_custom_function(name, message, &task.function)
99 .await
100 }
101 }
102 }
103
104 async fn execute_custom_function(
106 &self,
107 name: &str,
108 message: &mut Message,
109 config: &FunctionConfig,
110 ) -> Result<(usize, Vec<Change>)> {
111 if let Some(handler) = self.task_functions.get(name) {
112 handler
113 .execute(message, config, Arc::clone(&self.datalogic))
114 .await
115 } else {
116 error!("Function handler not found: {}", name);
117 Err(DataflowError::FunctionNotFound(name.to_string()))
118 }
119 }
120
121 pub fn has_function(&self, name: &str) -> bool {
123 match name {
124 "map" | "validation" | "validate" | "parse_json" | "parse_xml" | "publish_json"
125 | "publish_xml" => true,
126 custom_name => self.task_functions.contains_key(custom_name),
127 }
128 }
129
130 pub fn custom_function_count(&self) -> usize {
132 self.task_functions.len()
133 }
134}
135
136#[cfg(test)]
137mod tests {
138 use super::*;
139 use crate::engine::compiler::LogicCompiler;
140
141 #[test]
142 fn test_has_function() {
143 let compiler = LogicCompiler::new();
144 let (datalogic, logic_cache) = compiler.into_parts();
145 let executor = Arc::new(InternalExecutor::new(datalogic.clone(), logic_cache));
146 let task_executor = TaskExecutor::new(Arc::new(HashMap::new()), executor, datalogic);
147
148 assert!(task_executor.has_function("map"));
150 assert!(task_executor.has_function("validation"));
151 assert!(task_executor.has_function("validate"));
152
153 assert!(!task_executor.has_function("nonexistent"));
155 }
156
157 #[test]
158 fn test_custom_function_count() {
159 let mut custom_functions = HashMap::new();
160 custom_functions.insert(
162 "custom_test".to_string(),
163 Box::new(MockAsyncFunction) as Box<dyn AsyncFunctionHandler + Send + Sync>,
164 );
165
166 let compiler = LogicCompiler::new();
167 let (datalogic, logic_cache) = compiler.into_parts();
168 let executor = Arc::new(InternalExecutor::new(datalogic.clone(), logic_cache));
169 let task_executor = TaskExecutor::new(Arc::new(custom_functions), executor, datalogic);
170
171 assert_eq!(task_executor.custom_function_count(), 1);
172 }
173
174 struct MockAsyncFunction;
176
177 #[async_trait::async_trait]
178 impl AsyncFunctionHandler for MockAsyncFunction {
179 async fn execute(
180 &self,
181 _message: &mut Message,
182 _config: &FunctionConfig,
183 _datalogic: Arc<DataLogic>,
184 ) -> Result<(usize, Vec<Change>)> {
185 Ok((200, vec![]))
186 }
187 }
188}