use crate::engine::error::{DataflowError, Result};
use crate::engine::functions::{BoxedFunctionHandler, FunctionConfig};
use crate::engine::message::{Change, Message};
use crate::engine::task::Task;
use crate::engine::task_context::TaskContext;
use crate::engine::task_outcome::TaskOutcome;
use datalogic_rs::Engine;
use log::{debug, error};
use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;
pub struct TaskExecutor {
task_functions: Arc<HashMap<String, BoxedFunctionHandler>>,
engine: Arc<Engine>,
}
impl TaskExecutor {
pub fn new(
task_functions: Arc<HashMap<String, BoxedFunctionHandler>>,
engine: Arc<Engine>,
) -> Self {
Self {
task_functions,
engine,
}
}
pub async fn execute(
&self,
task: &Task,
message: &mut Message,
) -> Result<(TaskOutcome, Vec<Change>)> {
debug!(
"Executing task: {} with function: {:?}",
task.id,
task.function.function_name()
);
match &task.function {
FunctionConfig::Map { input, .. } => input.execute(message, &self.engine),
FunctionConfig::Validation { input, .. } => input.execute(message, &self.engine),
FunctionConfig::ParseJson { input, .. } => {
crate::engine::functions::parse::execute_parse_json(message, input)
}
FunctionConfig::ParseXml { input, .. } => {
crate::engine::functions::parse::execute_parse_xml(message, input)
}
FunctionConfig::PublishJson { input, .. } => {
crate::engine::functions::publish::execute_publish_json(message, input)
}
FunctionConfig::PublishXml { input, .. } => {
crate::engine::functions::publish::execute_publish_xml(message, input)
}
FunctionConfig::Filter { input, .. } => input.execute(message, &self.engine),
FunctionConfig::Log { input, .. } => input.execute(message, &self.engine),
FunctionConfig::HttpCall { input, .. } => {
self.dispatch_handler("http_call", message, input).await
}
FunctionConfig::Enrich { input, .. } => {
self.dispatch_handler("enrich", message, input).await
}
FunctionConfig::PublishKafka { input, .. } => {
self.dispatch_handler("publish_kafka", message, input).await
}
FunctionConfig::Custom {
name,
compiled_input,
..
} => {
let any_input = compiled_input.as_ref().ok_or_else(|| {
DataflowError::Validation(format!(
"Custom function '{}' has no precompiled input — \
was the workflow built outside Engine::new?",
name
))
})?;
self.dispatch_handler_any(name, message, any_input.as_any())
.await
}
}
}
async fn dispatch_handler<T>(
&self,
name: &str,
message: &mut Message,
input: &T,
) -> Result<(TaskOutcome, Vec<Change>)>
where
T: Any + Send + Sync,
{
let any_input: &(dyn Any + Send + Sync) = input;
self.dispatch_handler_any(name, message, any_input).await
}
async fn dispatch_handler_any(
&self,
name: &str,
message: &mut Message,
any_input: &(dyn Any + Send + Sync),
) -> Result<(TaskOutcome, Vec<Change>)> {
let handler = self.task_functions.get(name).ok_or_else(|| {
error!("Function handler not found: {}", name);
DataflowError::FunctionNotFound(name.to_string())
})?;
let mut ctx = TaskContext::new(message, &self.engine);
let outcome = handler.dyn_execute(&mut ctx, any_input).await?;
let changes = ctx.into_changes();
Ok((outcome, changes))
}
pub fn has_function(&self, name: &str) -> bool {
match name {
"map" | "validation" | "validate" | "parse_json" | "parse_xml" | "publish_json"
| "publish_xml" | "filter" | "log" | "http_call" | "enrich" | "publish_kafka" => true,
custom_name => self.task_functions.contains_key(custom_name),
}
}
pub fn task_functions(&self) -> Arc<HashMap<String, BoxedFunctionHandler>> {
Arc::clone(&self.task_functions)
}
pub fn custom_function_count(&self) -> usize {
self.task_functions.len()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::engine::AsyncFunctionHandler;
use crate::engine::compiler::LogicCompiler;
#[test]
fn test_has_function() {
let engine = LogicCompiler::new().into_engine();
let task_executor = TaskExecutor::new(Arc::new(HashMap::new()), engine);
assert!(task_executor.has_function("map"));
assert!(task_executor.has_function("validation"));
assert!(task_executor.has_function("validate"));
assert!(!task_executor.has_function("nonexistent"));
}
#[test]
fn test_custom_function_count() {
let mut custom_functions: HashMap<String, BoxedFunctionHandler> = HashMap::new();
custom_functions.insert("custom_test".to_string(), Box::new(MockAsyncFunction));
let engine = LogicCompiler::new().into_engine();
let task_executor = TaskExecutor::new(Arc::new(custom_functions), engine);
assert_eq!(task_executor.custom_function_count(), 1);
}
struct MockAsyncFunction;
#[async_trait::async_trait]
impl AsyncFunctionHandler for MockAsyncFunction {
type Input = serde_json::Value;
async fn execute(
&self,
_ctx: &mut TaskContext<'_>,
_input: &serde_json::Value,
) -> Result<TaskOutcome> {
Ok(TaskOutcome::Success)
}
}
}