Skip to main content

dataflow_rs/engine/functions/
mod.rs

1use crate::engine::error::{DataflowError, Result};
2use crate::engine::task_context::TaskContext;
3use crate::engine::task_outcome::TaskOutcome;
4use async_trait::async_trait;
5use serde::de::DeserializeOwned;
6use serde_json::Value;
7use std::any::Any;
8
9pub mod config;
10pub use config::{CompiledCustomInput, FunctionConfig};
11
12pub mod validation;
13pub use validation::{ValidationConfig, ValidationRule};
14
15pub mod map;
16pub use map::{MapConfig, MapMapping};
17
18pub mod parse;
19pub use parse::ParseConfig;
20
21pub mod publish;
22pub use publish::PublishConfig;
23
24pub mod filter;
25pub use filter::{FilterConfig, RejectAction};
26
27pub mod log;
28pub use log::{LogConfig, LogLevel};
29
30pub mod integration;
31pub use integration::{EnrichConfig, HttpCallConfig, PublishKafkaConfig};
32
33/// Async interface for task functions that operate on messages.
34///
35/// Implement this trait for custom processing logic. The trait associates a
36/// typed `Input` deserialized from the task's `FunctionConfig` so that
37/// handlers receive their config already parsed — no `match
38/// FunctionConfig::Custom { input, .. }` boilerplate, no per-call
39/// `serde_json::from_value` cost in the hot path. The engine deserializes the
40/// `Custom.input` JSON exactly once at `Engine::new()` time and caches the
41/// typed value alongside the task; mismatched config shapes therefore fail
42/// at startup rather than on first message.
43///
44/// Handlers mutate the message via [`TaskContext`] — its `set` family records
45/// changes on the audit trail automatically when `message.capture_changes`
46/// is enabled, so handlers don't have to hand-build [`crate::engine::message::Change`]
47/// entries.
48///
49/// ## Example
50///
51/// ```rust,no_run
52/// use async_trait::async_trait;
53/// use dataflow_rs::{
54///     AsyncFunctionHandler, Result, TaskContext, TaskOutcome,
55/// };
56/// use datavalue::OwnedDataValue;
57/// use serde::Deserialize;
58///
59/// #[derive(Deserialize)]
60/// struct StatsInput {
61///     data_path: String,
62///     output_path: String,
63/// }
64///
65/// struct StatisticsFunction;
66///
67/// #[async_trait]
68/// impl AsyncFunctionHandler for StatisticsFunction {
69///     type Input = StatsInput;
70///
71///     async fn execute(
72///         &self,
73///         ctx: &mut TaskContext<'_>,
74///         input: &StatsInput,
75///     ) -> Result<TaskOutcome> {
76///         let count = ctx.data()
77///             .get(input.data_path.as_str())
78///             .and_then(|v| v.as_array())
79///             .map(|a| a.len())
80///             .unwrap_or(0);
81///         ctx.set(
82///             &format!("data.{}.count", input.output_path),
83///             OwnedDataValue::from(&serde_json::json!(count)),
84///         );
85///         Ok(TaskOutcome::Success)
86///     }
87/// }
88/// ```
89#[async_trait]
90pub trait AsyncFunctionHandler: Send + Sync + 'static {
91    /// Typed configuration shape for this handler. Use
92    /// `serde_json::Value` for handlers that take freeform JSON.
93    type Input: DeserializeOwned + Send + Sync + 'static;
94
95    /// Parse the raw `FunctionConfig::Custom { input }` JSON into
96    /// `Self::Input`. Default impl uses `serde_json::from_value`. Override
97    /// only if you need custom validation beyond what serde provides.
98    ///
99    /// Built-in async function variants (`HttpCall`, `Enrich`,
100    /// `PublishKafka`) bypass this method — their typed configs are already
101    /// parsed by `serde(untagged)` on `FunctionConfig` and dispatched
102    /// directly to the registered handler.
103    fn parse_input(input: &Value) -> Result<Self::Input> {
104        serde_json::from_value(input.clone()).map_err(DataflowError::from_serde)
105    }
106
107    /// Execute the handler. The `ctx` accumulates audit-trail changes
108    /// pushed via its `set` family; the workflow executor folds them into
109    /// the audit trail when this method returns.
110    async fn execute(&self, ctx: &mut TaskContext<'_>, input: &Self::Input) -> Result<TaskOutcome>;
111}
112
113/// Object-safe sibling of [`AsyncFunctionHandler`]. Engine-internal — users
114/// should not implement this directly; the blanket impl below derives it
115/// for any `AsyncFunctionHandler`. Exposed (rather than `pub(crate)`) only
116/// because [`BoxedFunctionHandler`] mentions it in its public type alias.
117#[doc(hidden)]
118#[async_trait]
119pub trait DynAsyncFunctionHandler: Send + Sync + 'static {
120    /// Pre-parse the raw JSON input into the handler's typed shape and box
121    /// it as `dyn Any`. Called once per task at `Engine::new()` time.
122    fn parse_input_box(&self, input: &Value) -> Result<Box<dyn Any + Send + Sync>>;
123
124    /// Execute against an already-parsed typed input. The implementation
125    /// downcasts `input` to `<Self as AsyncFunctionHandler>::Input`; the
126    /// downcast is infallible in the engine's call paths because
127    /// `parse_input_box` produced the very same type.
128    async fn dyn_execute(
129        &self,
130        ctx: &mut TaskContext<'_>,
131        input: &(dyn Any + Send + Sync),
132    ) -> Result<TaskOutcome>;
133}
134
135#[async_trait]
136impl<F: AsyncFunctionHandler> DynAsyncFunctionHandler for F {
137    fn parse_input_box(&self, input: &Value) -> Result<Box<dyn Any + Send + Sync>> {
138        let typed = <F as AsyncFunctionHandler>::parse_input(input)?;
139        Ok(Box::new(typed))
140    }
141
142    async fn dyn_execute(
143        &self,
144        ctx: &mut TaskContext<'_>,
145        input: &(dyn Any + Send + Sync),
146    ) -> Result<TaskOutcome> {
147        let typed = input.downcast_ref::<F::Input>().ok_or_else(|| {
148            DataflowError::Validation(format!(
149                "Handler input type mismatch (expected {})",
150                std::any::type_name::<F::Input>()
151            ))
152        })?;
153        AsyncFunctionHandler::execute(self, ctx, typed).await
154    }
155}
156
/// Boxed, type-erased handler as stored in the engine's function registry.
///
/// Users construct these with `Box::new(MyHandler)` — thanks to the blanket
/// impl of [`DynAsyncFunctionHandler`] above, a box of any
/// `AsyncFunctionHandler` coerces to
/// `Box<dyn DynAsyncFunctionHandler + Send + Sync>`.
pub type BoxedFunctionHandler = Box<dyn DynAsyncFunctionHandler + Send + Sync>;