dataflow_rs/engine/
task_executor.rs1use crate::engine::error::{DataflowError, Result};
8use crate::engine::executor::InternalExecutor;
9use crate::engine::functions::{AsyncFunctionHandler, FunctionConfig};
10use crate::engine::message::{Change, Message};
11use crate::engine::task::Task;
12use datalogic_rs::DataLogic;
13use log::{debug, error};
14use serde_json::Value;
15use std::collections::HashMap;
16use std::sync::Arc;
17
18pub struct TaskExecutor {
26 task_functions: Arc<HashMap<String, Box<dyn AsyncFunctionHandler + Send + Sync>>>,
28 executor: Arc<InternalExecutor>,
30 datalogic: Arc<DataLogic>,
32}
33
34impl TaskExecutor {
35 pub fn new(
37 task_functions: Arc<HashMap<String, Box<dyn AsyncFunctionHandler + Send + Sync>>>,
38 executor: Arc<InternalExecutor>,
39 datalogic: Arc<DataLogic>,
40 ) -> Self {
41 Self {
42 task_functions,
43 executor,
44 datalogic,
45 }
46 }
47
48 pub async fn execute(
62 &self,
63 task: &Task,
64 message: &mut Message,
65 ) -> Result<(usize, Vec<Change>)> {
66 debug!(
67 "Executing task: {} with function: {:?}",
68 task.id,
69 task.function.function_name()
70 );
71
72 match &task.function {
73 FunctionConfig::Map { input, .. } => {
74 self.executor.execute_map(message, input)
76 }
77 FunctionConfig::Validation { input, .. } => {
78 self.executor.execute_validation(message, input)
80 }
81 FunctionConfig::ParseJson { input, .. } => {
82 crate::engine::functions::parse::execute_parse_json(message, input)
84 }
85 FunctionConfig::ParseXml { input, .. } => {
86 crate::engine::functions::parse::execute_parse_xml(message, input)
88 }
89 FunctionConfig::PublishJson { input, .. } => {
90 crate::engine::functions::publish::execute_publish_json(message, input)
92 }
93 FunctionConfig::PublishXml { input, .. } => {
94 crate::engine::functions::publish::execute_publish_xml(message, input)
96 }
97 FunctionConfig::Custom { name, .. } => {
98 self.execute_custom_function(name, message, &task.function)
100 .await
101 }
102 }
103 }
104
105 pub async fn execute_with_trace(
110 &self,
111 task: &Task,
112 message: &mut Message,
113 ) -> Result<(usize, Vec<Change>, Option<Vec<Value>>)> {
114 debug!(
115 "Executing task (trace): {} with function: {:?}",
116 task.id,
117 task.function.function_name()
118 );
119
120 match &task.function {
121 FunctionConfig::Map { input, .. } => {
122 let (status, changes, contexts) =
123 self.executor.execute_map_with_trace(message, input)?;
124 Ok((status, changes, Some(contexts)))
125 }
126 _ => {
127 let (status, changes) = self.execute(task, message).await?;
128 Ok((status, changes, None))
129 }
130 }
131 }
132
133 async fn execute_custom_function(
135 &self,
136 name: &str,
137 message: &mut Message,
138 config: &FunctionConfig,
139 ) -> Result<(usize, Vec<Change>)> {
140 if let Some(handler) = self.task_functions.get(name) {
141 handler
142 .execute(message, config, Arc::clone(&self.datalogic))
143 .await
144 } else {
145 error!("Function handler not found: {}", name);
146 Err(DataflowError::FunctionNotFound(name.to_string()))
147 }
148 }
149
150 pub fn has_function(&self, name: &str) -> bool {
152 match name {
153 "map" | "validation" | "validate" | "parse_json" | "parse_xml" | "publish_json"
154 | "publish_xml" => true,
155 custom_name => self.task_functions.contains_key(custom_name),
156 }
157 }
158
159 pub fn custom_function_count(&self) -> usize {
161 self.task_functions.len()
162 }
163}
164
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::compiler::LogicCompiler;

    /// Map type accepted (behind an `Arc`) by `TaskExecutor::new`.
    type HandlerMap = HashMap<String, Box<dyn AsyncFunctionHandler + Send + Sync>>;

    /// Handler stub that always succeeds with status 200 and no changes.
    struct MockAsyncFunction;

    #[async_trait::async_trait]
    impl AsyncFunctionHandler for MockAsyncFunction {
        async fn execute(
            &self,
            _message: &mut Message,
            _config: &FunctionConfig,
            _datalogic: Arc<DataLogic>,
        ) -> Result<(usize, Vec<Change>)> {
            Ok((200, Vec::new()))
        }
    }

    /// Builds a `TaskExecutor` around a freshly created compiler's parts and
    /// the given custom handlers.
    fn build_task_executor(custom_functions: HandlerMap) -> TaskExecutor {
        let (datalogic, logic_cache) = LogicCompiler::new().into_parts();
        let executor = Arc::new(InternalExecutor::new(Arc::clone(&datalogic), logic_cache));
        TaskExecutor::new(Arc::new(custom_functions), executor, datalogic)
    }

    #[test]
    fn test_has_function() {
        let task_executor = build_task_executor(HashMap::new());

        // Built-in names are recognised without any registered handler.
        for name in ["map", "validation", "validate"] {
            assert!(task_executor.has_function(name));
        }

        // Unknown names are rejected when nothing is registered under them.
        assert!(!task_executor.has_function("nonexistent"));
    }

    #[test]
    fn test_custom_function_count() {
        let mut custom_functions = HandlerMap::new();
        custom_functions.insert("custom_test".to_string(), Box::new(MockAsyncFunction));

        let task_executor = build_task_executor(custom_functions);

        assert_eq!(task_executor.custom_function_count(), 1);
    }
}