use crate::engine::error::{DataflowError, Result};
use crate::engine::executor::InternalExecutor;
use crate::engine::functions::{AsyncFunctionHandler, FunctionConfig};
use crate::engine::message::{Change, Message};
use crate::engine::task::Task;
use datalogic_rs::DataLogic;
use log::{debug, error};
use std::collections::HashMap;
use std::sync::Arc;
pub struct TaskExecutor {
task_functions: Arc<HashMap<String, Box<dyn AsyncFunctionHandler + Send + Sync>>>,
executor: Arc<InternalExecutor>,
datalogic: Arc<DataLogic>,
}
impl TaskExecutor {
pub fn new(
task_functions: Arc<HashMap<String, Box<dyn AsyncFunctionHandler + Send + Sync>>>,
executor: Arc<InternalExecutor>,
datalogic: Arc<DataLogic>,
) -> Self {
Self {
task_functions,
executor,
datalogic,
}
}
pub async fn execute(
&self,
task: &Task,
message: &mut Message,
) -> Result<(usize, Vec<Change>)> {
debug!(
"Executing task: {} with function: {:?}",
task.id,
task.function.function_name()
);
match &task.function {
FunctionConfig::Map { input, .. } => {
self.executor.execute_map(message, input)
}
FunctionConfig::Validation { input, .. } => {
self.executor.execute_validation(message, input)
}
FunctionConfig::Custom { name, .. } => {
self.execute_custom_function(name, message, &task.function)
.await
}
}
}
async fn execute_custom_function(
&self,
name: &str,
message: &mut Message,
config: &FunctionConfig,
) -> Result<(usize, Vec<Change>)> {
if let Some(handler) = self.task_functions.get(name) {
handler
.execute(message, config, Arc::clone(&self.datalogic))
.await
} else {
error!("Function handler not found: {}", name);
Err(DataflowError::FunctionNotFound(name.to_string()))
}
}
pub fn has_function(&self, name: &str) -> bool {
match name {
"map" | "validation" | "validate" => true,
custom_name => self.task_functions.contains_key(custom_name),
}
}
pub fn custom_function_count(&self) -> usize {
self.task_functions.len()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::engine::compiler::LogicCompiler;
#[test]
fn test_has_function() {
let compiler = LogicCompiler::new();
let (datalogic, logic_cache) = compiler.into_parts();
let executor = Arc::new(InternalExecutor::new(datalogic.clone(), logic_cache));
let task_executor = TaskExecutor::new(Arc::new(HashMap::new()), executor, datalogic);
assert!(task_executor.has_function("map"));
assert!(task_executor.has_function("validation"));
assert!(task_executor.has_function("validate"));
assert!(!task_executor.has_function("nonexistent"));
}
#[test]
fn test_custom_function_count() {
let mut custom_functions = HashMap::new();
custom_functions.insert(
"custom_test".to_string(),
Box::new(MockAsyncFunction) as Box<dyn AsyncFunctionHandler + Send + Sync>,
);
let compiler = LogicCompiler::new();
let (datalogic, logic_cache) = compiler.into_parts();
let executor = Arc::new(InternalExecutor::new(datalogic.clone(), logic_cache));
let task_executor = TaskExecutor::new(Arc::new(custom_functions), executor, datalogic);
assert_eq!(task_executor.custom_function_count(), 1);
}
struct MockAsyncFunction;
#[async_trait::async_trait]
impl AsyncFunctionHandler for MockAsyncFunction {
async fn execute(
&self,
_message: &mut Message,
_config: &FunctionConfig,
_datalogic: Arc<DataLogic>,
) -> Result<(usize, Vec<Change>)> {
Ok((200, vec![]))
}
}
}