use crate::engine::error::{DataflowError, Result};
use crate::engine::executor::InternalExecutor;
use crate::engine::functions::{AsyncFunctionHandler, FunctionConfig};
use crate::engine::message::{Change, Message};
use crate::engine::task::Task;
use datalogic_rs::DataLogic;
use log::{debug, error};
use serde_json::Value;
use std::collections::HashMap;
use std::sync::Arc;
/// Dispatches dataflow tasks either to built-in engine functions or to
/// registered custom async function handlers, keyed by function name.
pub struct TaskExecutor {
    // Registry of custom async handlers (e.g. "http_call", "enrich"),
    // shared across threads via Arc; looked up by name at dispatch time.
    task_functions: Arc<HashMap<String, Box<dyn AsyncFunctionHandler + Send + Sync>>>,
    // Executor for the built-in functions (map, validation, filter, log, ...).
    executor: Arc<InternalExecutor>,
    // Shared DataLogic instance handed to every custom handler invocation.
    datalogic: Arc<DataLogic>,
}
impl TaskExecutor {
pub fn new(
task_functions: Arc<HashMap<String, Box<dyn AsyncFunctionHandler + Send + Sync>>>,
executor: Arc<InternalExecutor>,
datalogic: Arc<DataLogic>,
) -> Self {
Self {
task_functions,
executor,
datalogic,
}
}
pub async fn execute(
&self,
task: &Task,
message: &mut Message,
) -> Result<(usize, Vec<Change>)> {
debug!(
"Executing task: {} with function: {:?}",
task.id,
task.function.function_name()
);
match &task.function {
FunctionConfig::Map { input, .. } => {
self.executor.execute_map(message, input)
}
FunctionConfig::Validation { input, .. } => {
self.executor.execute_validation(message, input)
}
FunctionConfig::ParseJson { input, .. } => {
crate::engine::functions::parse::execute_parse_json(message, input)
}
FunctionConfig::ParseXml { input, .. } => {
crate::engine::functions::parse::execute_parse_xml(message, input)
}
FunctionConfig::PublishJson { input, .. } => {
crate::engine::functions::publish::execute_publish_json(message, input)
}
FunctionConfig::PublishXml { input, .. } => {
crate::engine::functions::publish::execute_publish_xml(message, input)
}
FunctionConfig::Filter { input, .. } => {
self.executor.execute_filter(message, input)
}
FunctionConfig::Log { input, .. } => {
self.executor.execute_log(message, input)
}
FunctionConfig::HttpCall { .. } => {
self.execute_custom_function("http_call", message, &task.function)
.await
}
FunctionConfig::Enrich { .. } => {
self.execute_custom_function("enrich", message, &task.function)
.await
}
FunctionConfig::PublishKafka { .. } => {
self.execute_custom_function("publish_kafka", message, &task.function)
.await
}
FunctionConfig::Custom { name, .. } => {
self.execute_custom_function(name, message, &task.function)
.await
}
}
}
pub async fn execute_with_trace(
&self,
task: &Task,
message: &mut Message,
) -> Result<(usize, Vec<Change>, Option<Vec<Value>>)> {
debug!(
"Executing task (trace): {} with function: {:?}",
task.id,
task.function.function_name()
);
match &task.function {
FunctionConfig::Map { input, .. } => {
let (status, changes, contexts) =
self.executor.execute_map_with_trace(message, input)?;
Ok((status, changes, Some(contexts)))
}
_ => {
let (status, changes) = self.execute(task, message).await?;
Ok((status, changes, None))
}
}
}
async fn execute_custom_function(
&self,
name: &str,
message: &mut Message,
config: &FunctionConfig,
) -> Result<(usize, Vec<Change>)> {
if let Some(handler) = self.task_functions.get(name) {
handler
.execute(message, config, Arc::clone(&self.datalogic))
.await
} else {
error!("Function handler not found: {}", name);
Err(DataflowError::FunctionNotFound(name.to_string()))
}
}
pub fn has_function(&self, name: &str) -> bool {
match name {
"map" | "validation" | "validate" | "parse_json" | "parse_xml" | "publish_json"
| "publish_xml" | "filter" | "log" | "http_call" | "enrich" | "publish_kafka" => true,
custom_name => self.task_functions.contains_key(custom_name),
}
}
pub fn task_functions(
&self,
) -> Arc<HashMap<String, Box<dyn AsyncFunctionHandler + Send + Sync>>> {
Arc::clone(&self.task_functions)
}
pub fn custom_function_count(&self) -> usize {
self.task_functions.len()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::compiler::LogicCompiler;

    /// Stub handler used only to populate the custom-function registry.
    struct MockAsyncFunction;

    #[async_trait::async_trait]
    impl AsyncFunctionHandler for MockAsyncFunction {
        async fn execute(
            &self,
            _message: &mut Message,
            _config: &FunctionConfig,
            _datalogic: Arc<DataLogic>,
        ) -> Result<(usize, Vec<Change>)> {
            Ok((200, vec![]))
        }
    }

    /// Shared setup: builds a TaskExecutor around the given registry.
    fn build_executor(
        functions: HashMap<String, Box<dyn AsyncFunctionHandler + Send + Sync>>,
    ) -> TaskExecutor {
        let (datalogic, logic_cache) = LogicCompiler::new().into_parts();
        let internal = Arc::new(InternalExecutor::new(datalogic.clone(), logic_cache));
        TaskExecutor::new(Arc::new(functions), internal, datalogic)
    }

    #[test]
    fn test_has_function() {
        let task_executor = build_executor(HashMap::new());
        assert!(task_executor.has_function("map"));
        assert!(task_executor.has_function("validation"));
        assert!(task_executor.has_function("validate"));
        assert!(!task_executor.has_function("nonexistent"));
    }

    #[test]
    fn test_custom_function_count() {
        let mut custom_functions = HashMap::new();
        custom_functions.insert(
            "custom_test".to_string(),
            Box::new(MockAsyncFunction) as Box<dyn AsyncFunctionHandler + Send + Sync>,
        );
        let task_executor = build_executor(custom_functions);
        assert_eq!(task_executor.custom_function_count(), 1);
    }
}