dataflow_rs/engine/functions/mod.rs
1use crate::engine::error::{DataflowError, Result};
2use crate::engine::task_context::TaskContext;
3use crate::engine::task_outcome::TaskOutcome;
4use async_trait::async_trait;
5use serde::de::DeserializeOwned;
6use serde_json::Value;
7use std::any::Any;
8
9pub mod config;
10pub use config::{CompiledCustomInput, FunctionConfig};
11
12pub mod validation;
13pub use validation::{ValidationConfig, ValidationRule};
14
15pub mod map;
16pub use map::{MapConfig, MapMapping};
17
18pub mod parse;
19pub use parse::ParseConfig;
20
21pub mod publish;
22pub use publish::PublishConfig;
23
24pub mod filter;
25pub use filter::{FilterConfig, RejectAction};
26
27pub mod log;
28pub use log::{LogConfig, LogLevel};
29
30pub mod integration;
31pub use integration::{EnrichConfig, HttpCallConfig, PublishKafkaConfig};
32
33/// Async interface for task functions that operate on messages.
34///
35/// Implement this trait for custom processing logic. The trait associates a
36/// typed `Input` deserialized from the task's `FunctionConfig` so that
37/// handlers receive their config already parsed — no `match
38/// FunctionConfig::Custom { input, .. }` boilerplate, no per-call
39/// `serde_json::from_value` cost in the hot path. The engine deserializes the
40/// `Custom.input` JSON exactly once at `Engine::new()` time and caches the
41/// typed value alongside the task; mismatched config shapes therefore fail
42/// at startup rather than on first message.
43///
44/// Handlers mutate the message via [`TaskContext`] — its `set` family records
45/// changes on the audit trail automatically when `message.capture_changes`
46/// is enabled, so handlers don't have to hand-build [`crate::engine::message::Change`]
47/// entries.
48///
49/// ## Example
50///
51/// ```rust,no_run
52/// use async_trait::async_trait;
53/// use dataflow_rs::{
54/// AsyncFunctionHandler, Result, TaskContext, TaskOutcome,
55/// };
56/// use datavalue::OwnedDataValue;
57/// use serde::Deserialize;
58///
59/// #[derive(Deserialize)]
60/// struct StatsInput {
61/// data_path: String,
62/// output_path: String,
63/// }
64///
65/// struct StatisticsFunction;
66///
67/// #[async_trait]
68/// impl AsyncFunctionHandler for StatisticsFunction {
69/// type Input = StatsInput;
70///
71/// async fn execute(
72/// &self,
73/// ctx: &mut TaskContext<'_>,
74/// input: &StatsInput,
75/// ) -> Result<TaskOutcome> {
76/// let count = ctx.data()
77/// .get(input.data_path.as_str())
78/// .and_then(|v| v.as_array())
79/// .map(|a| a.len())
80/// .unwrap_or(0);
81/// ctx.set(
82/// &format!("data.{}.count", input.output_path),
83/// OwnedDataValue::from(&serde_json::json!(count)),
84/// );
85/// Ok(TaskOutcome::Success)
86/// }
87/// }
88/// ```
89#[async_trait]
90pub trait AsyncFunctionHandler: Send + Sync + 'static {
91 /// Typed configuration shape for this handler. Use
92 /// `serde_json::Value` for handlers that take freeform JSON.
93 type Input: DeserializeOwned + Send + Sync + 'static;
94
95 /// Parse the raw `FunctionConfig::Custom { input }` JSON into
96 /// `Self::Input`. Default impl uses `serde_json::from_value`. Override
97 /// only if you need custom validation beyond what serde provides.
98 ///
99 /// Built-in async function variants (`HttpCall`, `Enrich`,
100 /// `PublishKafka`) bypass this method — their typed configs are already
101 /// parsed by `serde(untagged)` on `FunctionConfig` and dispatched
102 /// directly to the registered handler.
103 fn parse_input(input: &Value) -> Result<Self::Input> {
104 serde_json::from_value(input.clone()).map_err(DataflowError::from_serde)
105 }
106
107 /// Execute the handler. The `ctx` accumulates audit-trail changes
108 /// pushed via its `set` family; the workflow executor folds them into
109 /// the audit trail when this method returns.
110 async fn execute(&self, ctx: &mut TaskContext<'_>, input: &Self::Input) -> Result<TaskOutcome>;
111}
112
113/// Object-safe sibling of [`AsyncFunctionHandler`]. Engine-internal — users
114/// should not implement this directly; the blanket impl below derives it
115/// for any `AsyncFunctionHandler`. Exposed (rather than `pub(crate)`) only
116/// because [`BoxedFunctionHandler`] mentions it in its public type alias.
117#[doc(hidden)]
118#[async_trait]
119pub trait DynAsyncFunctionHandler: Send + Sync + 'static {
120 /// Pre-parse the raw JSON input into the handler's typed shape and box
121 /// it as `dyn Any`. Called once per task at `Engine::new()` time.
122 fn parse_input_box(&self, input: &Value) -> Result<Box<dyn Any + Send + Sync>>;
123
124 /// Execute against an already-parsed typed input. The implementation
125 /// downcasts `input` to `<Self as AsyncFunctionHandler>::Input`; the
126 /// downcast is infallible in the engine's call paths because
127 /// `parse_input_box` produced the very same type.
128 async fn dyn_execute(
129 &self,
130 ctx: &mut TaskContext<'_>,
131 input: &(dyn Any + Send + Sync),
132 ) -> Result<TaskOutcome>;
133}
134
135#[async_trait]
136impl<F: AsyncFunctionHandler> DynAsyncFunctionHandler for F {
137 fn parse_input_box(&self, input: &Value) -> Result<Box<dyn Any + Send + Sync>> {
138 let typed = <F as AsyncFunctionHandler>::parse_input(input)?;
139 Ok(Box::new(typed))
140 }
141
142 async fn dyn_execute(
143 &self,
144 ctx: &mut TaskContext<'_>,
145 input: &(dyn Any + Send + Sync),
146 ) -> Result<TaskOutcome> {
147 let typed = input.downcast_ref::<F::Input>().ok_or_else(|| {
148 DataflowError::Validation(format!(
149 "Handler input type mismatch (expected {})",
150 std::any::type_name::<F::Input>()
151 ))
152 })?;
153 AsyncFunctionHandler::execute(self, ctx, typed).await
154 }
155}
156
157/// Boxed handler stored in the engine's function registry. Users construct
158/// these with `Box::new(MyHandler)` — the blanket impl above auto-coerces
159/// any `AsyncFunctionHandler` into `Box<dyn DynAsyncFunctionHandler + Send + Sync>`.
160pub type BoxedFunctionHandler = Box<dyn DynAsyncFunctionHandler + Send + Sync>;