use crate::engine::error::{DataflowError, Result};
use crate::engine::task_context::TaskContext;
use crate::engine::task_outcome::TaskOutcome;
use async_trait::async_trait;
use serde::de::DeserializeOwned;
use serde_json::Value;
use std::any::Any;
// Submodules of this module. Each `pub mod` is followed by a `pub use` that
// re-exports selected items from it, so callers can import them from this
// module directly instead of naming the submodule.
pub mod config;
pub use config::{CompiledCustomInput, FunctionConfig};
pub mod validation;
pub use validation::{ValidationConfig, ValidationRule};
pub mod map;
pub use map::{MapConfig, MapMapping};
pub mod parse;
pub use parse::ParseConfig;
pub mod publish;
pub use publish::PublishConfig;
pub mod filter;
pub use filter::{FilterConfig, RejectAction};
pub mod log;
pub use log::{LogConfig, LogLevel};
pub mod integration;
pub use integration::{EnrichConfig, HttpCallConfig, PublishKafkaConfig};
#[async_trait]
pub trait AsyncFunctionHandler: Send + Sync + 'static {
type Input: DeserializeOwned + Send + Sync + 'static;
fn parse_input(input: &Value) -> Result<Self::Input> {
serde_json::from_value(input.clone()).map_err(DataflowError::from_serde)
}
async fn execute(&self, ctx: &mut TaskContext<'_>, input: &Self::Input) -> Result<TaskOutcome>;
}
/// Type-erased form of [`AsyncFunctionHandler`].
///
/// The typed input is passed around as `dyn Any + Send + Sync`, so this trait
/// has no associated type. A blanket implementation in this file provides it
/// for every `AsyncFunctionHandler`.
#[doc(hidden)]
#[async_trait]
pub trait DynAsyncFunctionHandler: Send + Sync + 'static {
/// Parses `input` and returns the result boxed as `dyn Any + Send + Sync`.
fn parse_input_box(&self, input: &Value) -> Result<Box<dyn Any + Send + Sync>>;
/// Executes the handler with a type-erased `input`.
///
/// The blanket implementation in this file downcasts `input` to the
/// handler's concrete input type and returns an error if the type differs.
async fn dyn_execute(
&self,
ctx: &mut TaskContext<'_>,
input: &(dyn Any + Send + Sync),
) -> Result<TaskOutcome>;
}
/// Blanket bridge: every typed [`AsyncFunctionHandler`] is also usable through
/// the type-erased [`DynAsyncFunctionHandler`] interface.
#[async_trait]
impl<F: AsyncFunctionHandler> DynAsyncFunctionHandler for F {
    /// Parses `input` with the handler's own `parse_input` and erases the
    /// resulting value behind `Box<dyn Any + Send + Sync>`.
    fn parse_input_box(&self, input: &Value) -> Result<Box<dyn Any + Send + Sync>> {
        <F as AsyncFunctionHandler>::parse_input(input)
            .map(|parsed| Box::new(parsed) as Box<dyn Any + Send + Sync>)
    }

    /// Recovers the concrete `F::Input` from `input` and forwards to the typed
    /// `execute`.
    ///
    /// # Errors
    ///
    /// Returns `DataflowError::Validation` when `input` is not an `F::Input`;
    /// otherwise propagates whatever `execute` returns.
    async fn dyn_execute(
        &self,
        ctx: &mut TaskContext<'_>,
        input: &(dyn Any + Send + Sync),
    ) -> Result<TaskOutcome> {
        // Guard clause: bail out early if the erased value has the wrong type.
        let concrete: &F::Input = match input.downcast_ref::<F::Input>() {
            Some(value) => value,
            None => {
                let expected = std::any::type_name::<F::Input>();
                return Err(DataflowError::Validation(format!(
                    "Handler input type mismatch (expected {})",
                    expected
                )));
            }
        };

        <F as AsyncFunctionHandler>::execute(self, ctx, concrete).await
    }
}
/// Owned, heap-allocated trait object for a type-erased function handler
/// ([`DynAsyncFunctionHandler`]).
pub type BoxedFunctionHandler = Box<dyn DynAsyncFunctionHandler + Send + Sync>;