dataflow_rs/engine/
task_executor.rs1use crate::engine::error::{DataflowError, Result};
8use crate::engine::executor::InternalExecutor;
9use crate::engine::functions::{AsyncFunctionHandler, FunctionConfig};
10use crate::engine::message::{Change, Message};
11use crate::engine::task::Task;
12use datalogic_rs::DataLogic;
13use log::{debug, error};
14use std::collections::HashMap;
15use std::sync::Arc;
16
17pub struct TaskExecutor {
25 task_functions: Arc<HashMap<String, Box<dyn AsyncFunctionHandler + Send + Sync>>>,
27 executor: Arc<InternalExecutor>,
29 datalogic: Arc<DataLogic>,
31}
32
33impl TaskExecutor {
34 pub fn new(
36 task_functions: Arc<HashMap<String, Box<dyn AsyncFunctionHandler + Send + Sync>>>,
37 executor: Arc<InternalExecutor>,
38 datalogic: Arc<DataLogic>,
39 ) -> Self {
40 Self {
41 task_functions,
42 executor,
43 datalogic,
44 }
45 }
46
47 pub async fn execute(
61 &self,
62 task: &Task,
63 message: &mut Message,
64 ) -> Result<(usize, Vec<Change>)> {
65 debug!(
66 "Executing task: {} with function: {:?}",
67 task.id,
68 task.function.function_name()
69 );
70
71 match &task.function {
72 FunctionConfig::Map { input, .. } => {
73 self.executor.execute_map(message, input)
75 }
76 FunctionConfig::Validation { input, .. } => {
77 self.executor.execute_validation(message, input)
79 }
80 FunctionConfig::Custom { name, .. } => {
81 self.execute_custom_function(name, message, &task.function)
83 .await
84 }
85 }
86 }
87
88 async fn execute_custom_function(
90 &self,
91 name: &str,
92 message: &mut Message,
93 config: &FunctionConfig,
94 ) -> Result<(usize, Vec<Change>)> {
95 if let Some(handler) = self.task_functions.get(name) {
96 handler
97 .execute(message, config, Arc::clone(&self.datalogic))
98 .await
99 } else {
100 error!("Function handler not found: {}", name);
101 Err(DataflowError::FunctionNotFound(name.to_string()))
102 }
103 }
104
105 pub fn has_function(&self, name: &str) -> bool {
107 match name {
108 "map" | "validation" | "validate" => true,
109 custom_name => self.task_functions.contains_key(custom_name),
110 }
111 }
112
113 pub fn custom_function_count(&self) -> usize {
115 self.task_functions.len()
116 }
117}
118
119#[cfg(test)]
120mod tests {
121 use super::*;
122 use crate::engine::compiler::LogicCompiler;
123
124 #[test]
125 fn test_has_function() {
126 let compiler = LogicCompiler::new();
127 let (datalogic, logic_cache) = compiler.into_parts();
128 let executor = Arc::new(InternalExecutor::new(datalogic.clone(), logic_cache));
129 let task_executor = TaskExecutor::new(Arc::new(HashMap::new()), executor, datalogic);
130
131 assert!(task_executor.has_function("map"));
133 assert!(task_executor.has_function("validation"));
134 assert!(task_executor.has_function("validate"));
135
136 assert!(!task_executor.has_function("nonexistent"));
138 }
139
140 #[test]
141 fn test_custom_function_count() {
142 let mut custom_functions = HashMap::new();
143 custom_functions.insert(
145 "custom_test".to_string(),
146 Box::new(MockAsyncFunction) as Box<dyn AsyncFunctionHandler + Send + Sync>,
147 );
148
149 let compiler = LogicCompiler::new();
150 let (datalogic, logic_cache) = compiler.into_parts();
151 let executor = Arc::new(InternalExecutor::new(datalogic.clone(), logic_cache));
152 let task_executor = TaskExecutor::new(Arc::new(custom_functions), executor, datalogic);
153
154 assert_eq!(task_executor.custom_function_count(), 1);
155 }
156
157 struct MockAsyncFunction;
159
160 #[async_trait::async_trait]
161 impl AsyncFunctionHandler for MockAsyncFunction {
162 async fn execute(
163 &self,
164 _message: &mut Message,
165 _config: &FunctionConfig,
166 _datalogic: Arc<DataLogic>,
167 ) -> Result<(usize, Vec<Change>)> {
168 Ok((200, vec![]))
169 }
170 }
171}