pub trait AsyncFunctionHandler:
Send
+ Sync
+ 'static {
type Input: DeserializeOwned + Send + Sync + 'static;
// Required method
fn execute<'life0, 'life1, 'life2, 'life3, 'async_trait>(
&'life0 self,
ctx: &'life1 mut TaskContext<'life2>,
input: &'life3 Self::Input,
) -> Pin<Box<dyn Future<Output = Result<TaskOutcome>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
'life3: 'async_trait;
// Provided method
fn parse_input(input: &Value) -> Result<Self::Input> { ... }
}Expand description
Async interface for task functions that operate on messages.
Implement this trait for custom processing logic. The trait associates a
typed Input deserialized from the task’s FunctionConfig so that
handlers receive their config already parsed — no match FunctionConfig::Custom { input, .. } boilerplate, no per-call
serde_json::from_value cost in the hot path. The engine deserializes the
Custom.input JSON exactly once at Engine::new() time and caches the
typed value alongside the task; mismatched config shapes therefore fail
at startup rather than on first message.
Handlers mutate the message via TaskContext — its set family records
changes on the audit trail automatically when message.capture_changes
is enabled, so handlers don’t have to hand-build crate::engine::message::Change
entries.
§Example
use async_trait::async_trait;
use dataflow_rs::{
AsyncFunctionHandler, Result, TaskContext, TaskOutcome,
};
use datavalue::OwnedDataValue;
use serde::Deserialize;
#[derive(Deserialize)]
struct StatsInput {
data_path: String,
output_path: String,
}
struct StatisticsFunction;
#[async_trait]
impl AsyncFunctionHandler for StatisticsFunction {
type Input = StatsInput;
async fn execute(
&self,
ctx: &mut TaskContext<'_>,
input: &StatsInput,
) -> Result<TaskOutcome> {
let count = ctx.data()
.get(input.data_path.as_str())
.and_then(|v| v.as_array())
.map(|a| a.len())
.unwrap_or(0);
ctx.set(
&format!("data.{}.count", input.output_path),
OwnedDataValue::from(&serde_json::json!(count)),
);
Ok(TaskOutcome::Success)
}
}Required Associated Types§
Sourcetype Input: DeserializeOwned + Send + Sync + 'static
type Input: DeserializeOwned + Send + Sync + 'static
Typed configuration shape for this handler. Use
serde_json::Value for handlers that take freeform JSON.
Required Methods§
Sourcefn execute<'life0, 'life1, 'life2, 'life3, 'async_trait>(
&'life0 self,
ctx: &'life1 mut TaskContext<'life2>,
input: &'life3 Self::Input,
) -> Pin<Box<dyn Future<Output = Result<TaskOutcome>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
'life3: 'async_trait,
fn execute<'life0, 'life1, 'life2, 'life3, 'async_trait>(
&'life0 self,
ctx: &'life1 mut TaskContext<'life2>,
input: &'life3 Self::Input,
) -> Pin<Box<dyn Future<Output = Result<TaskOutcome>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
'life3: 'async_trait,
Execute the handler. The ctx accumulates audit-trail changes
pushed via its set family; the workflow executor folds them into
the audit trail when this method returns.
Provided Methods§
Sourcefn parse_input(input: &Value) -> Result<Self::Input>
fn parse_input(input: &Value) -> Result<Self::Input>
Parse the raw FunctionConfig::Custom { input } JSON into
Self::Input. Default impl uses serde_json::from_value. Override
only if you need custom validation beyond what serde provides.
Built-in async function variants (HttpCall, Enrich,
PublishKafka) bypass this method — their typed configs are already
parsed by serde(untagged) on FunctionConfig and dispatched
directly to the registered handler.
Dyn Compatibility§
This trait is not dyn compatible.
In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.