
Trait AsyncFunctionHandler 

pub trait AsyncFunctionHandler:
    Send
    + Sync
    + 'static {
    type Input: DeserializeOwned + Send + Sync + 'static;

    // Required method
    fn execute<'life0, 'life1, 'life2, 'life3, 'async_trait>(
        &'life0 self,
        ctx: &'life1 mut TaskContext<'life2>,
        input: &'life3 Self::Input,
    ) -> Pin<Box<dyn Future<Output = Result<TaskOutcome>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait,
             'life1: 'async_trait,
             'life2: 'async_trait,
             'life3: 'async_trait;

    // Provided method
    fn parse_input(input: &Value) -> Result<Self::Input> { ... }
}

Async interface for task functions that operate on messages.

Implement this trait for custom processing logic. The trait's associated Input type is deserialized from the task's FunctionConfig, so handlers receive their config already parsed: there is no match FunctionConfig::Custom { input, .. } boilerplate and no per-call serde_json::from_value cost in the hot path. The engine deserializes the Custom.input JSON exactly once, at Engine::new() time, and caches the typed value alongside the task. A config whose shape does not match Input therefore fails at startup rather than on the first message.

Handlers mutate the message through TaskContext. Its set family of methods records changes on the audit trail automatically when message.capture_changes is enabled, so handlers don't have to hand-build crate::engine::message::Change entries.

Example

use async_trait::async_trait;
use dataflow_rs::{
    AsyncFunctionHandler, Result, TaskContext, TaskOutcome,
};
use datavalue::OwnedDataValue;
use serde::Deserialize;

#[derive(Deserialize)]
struct StatsInput {
    data_path: String,
    output_path: String,
}

struct StatisticsFunction;

#[async_trait]
impl AsyncFunctionHandler for StatisticsFunction {
    type Input = StatsInput;

    async fn execute(
        &self,
        ctx: &mut TaskContext<'_>,
        input: &StatsInput,
    ) -> Result<TaskOutcome> {
        let count = ctx.data()
            .get(input.data_path.as_str())
            .and_then(|v| v.as_array())
            .map(|a| a.len())
            .unwrap_or(0);
        ctx.set(
            &format!("data.{}.count", input.output_path),
            OwnedDataValue::from(&serde_json::json!(count)),
        );
        Ok(TaskOutcome::Success)
    }
}

Required Associated Types

type Input: DeserializeOwned + Send + Sync + 'static

Typed configuration shape for this handler. Use serde_json::Value for handlers that take freeform JSON.

Required Methods

fn execute<'life0, 'life1, 'life2, 'life3, 'async_trait>(
    &'life0 self,
    ctx: &'life1 mut TaskContext<'life2>,
    input: &'life3 Self::Input,
) -> Pin<Box<dyn Future<Output = Result<TaskOutcome>> + Send + 'async_trait>>
where
    Self: 'async_trait,
    'life0: 'async_trait,
    'life1: 'async_trait,
    'life2: 'async_trait,
    'life3: 'async_trait,

Execute the handler. The ctx accumulates audit-trail changes pushed via its set family; the workflow executor folds them into the audit trail when this method returns.
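The long signature above is what the #[async_trait] macro expands an `async fn execute(...)` into: a method returning a boxed, pinned, `Send` future that may borrow from every argument. The sketch below writes an equivalent shape by hand using only the standard library. `Handler`, `Ctx`, `Outcome`, `Count` and `block_on` are hypothetical stand-ins for illustration, not dataflow-rs items, and the single lifetime `'a` plays the role of `'async_trait`.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-ins for TaskContext and TaskOutcome (not the real types).
pub struct Ctx<'m> {
    pub log: &'m mut Vec<String>,
}

#[derive(Debug, PartialEq)]
pub enum Outcome {
    Success,
}

pub trait Handler: Send + Sync + 'static {
    type Input: Send + Sync + 'static;

    // Hand-written equivalent of `async fn execute(&self, ctx, input)`:
    // every borrow must stay valid for as long as the returned future ('a).
    fn execute<'a, 'm: 'a>(
        &'a self,
        ctx: &'a mut Ctx<'m>,
        input: &'a Self::Input,
    ) -> Pin<Box<dyn Future<Output = Result<Outcome, String>> + Send + 'a>>;
}

pub struct Count;

impl Handler for Count {
    type Input = Vec<u32>;

    fn execute<'a, 'm: 'a>(
        &'a self,
        ctx: &'a mut Ctx<'m>,
        input: &'a Vec<u32>,
    ) -> Pin<Box<dyn Future<Output = Result<Outcome, String>> + Send + 'a>> {
        // The async block captures `ctx` and `input`; Box::pin erases its type.
        Box::pin(async move {
            ctx.log.push(format!("count={}", input.len()));
            Ok(Outcome::Success)
        })
    }
}

// Waker that does nothing; enough for futures that never actually wait.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Minimal executor for the sketch: polls in a loop until the future is ready.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Drives the handler once and returns its outcome plus what it logged.
pub fn run_count() -> (Result<Outcome, String>, Vec<String>) {
    let handler = Count;
    let input = vec![1, 2, 3];
    let mut log = Vec::new();
    let mut ctx = Ctx { log: &mut log };
    let result = block_on(handler.execute(&mut ctx, &input));
    (result, log)
}
```

In real handler code the macro writes this boilerplate, so implementors only see the `async fn` form used in the example earlier on this page.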

Provided Methods

fn parse_input(input: &Value) -> Result<Self::Input>

Parse the raw FunctionConfig::Custom { input } JSON into Self::Input. The default implementation uses serde_json::from_value. Override it only if you need custom validation beyond what serde provides.

Built-in async function variants (HttpCall, Enrich, PublishKafka) bypass this method — their typed configs are already parsed by serde(untagged) on FunctionConfig and dispatched directly to the registered handler.
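The override pattern described for parse_input can be sketched without serde. The trait below is a hypothetical, std-only analogue: `ConfigHandler`, `Passthrough` and `Batcher` are invented names, the raw config is a `&str` rather than a `&Value`, and `FromStr` stands in for `DeserializeOwned`. It only illustrates the shape of "keep the default, or parse and then validate"; it is not the crate's API.

```rust
use std::str::FromStr;

// Hypothetical analogue of the documented trait (assumption, not dataflow-rs).
pub trait ConfigHandler {
    type Input: FromStr;

    // Provided method: plain parsing with no extra validation.
    fn parse_input(raw: &str) -> Result<Self::Input, String> {
        raw.trim()
            .parse::<Self::Input>()
            .map_err(|_| format!("cannot parse config {raw:?}"))
    }
}

// Handler that is happy with the default parsing.
pub struct Passthrough;

impl ConfigHandler for Passthrough {
    type Input = u32;
}

// Handler that needs a check the parser alone cannot express.
pub struct Batcher;

impl ConfigHandler for Batcher {
    type Input = u32;

    // Override: parse as usual, then reject values that parse but are invalid.
    fn parse_input(raw: &str) -> Result<u32, String> {
        let size = raw
            .trim()
            .parse::<u32>()
            .map_err(|_| format!("cannot parse config {raw:?}"))?;
        if size == 0 {
            return Err("batch size must be at least 1".to_string());
        }
        Ok(size)
    }
}
```

Because the method has no `self` receiver, it is called on the type, for example `Batcher::parse_input("8")`, which is what lets an engine validate a config before any handler call is made.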

Dyn Compatibility

This trait is not dyn compatible.

In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.
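One reason a trait of this shape cannot be used as `dyn Trait` is the receiver-less associated function (here parse_input) declared without a `where Self: Sized` bound. A common way to still store such handlers in one collection is a small dyn-compatible facade that pairs each handler with its already-parsed input. The sketch below shows that general technique with hypothetical names (`Typed`, `Erased`, `Bound`, `bind`, `Repeat`); whether dataflow-rs does this internally is not stated on this page.

```rust
// Hypothetical typed trait, shaped like the one documented here.
// The receiver-less `parse` is what rules out `dyn Typed`.
pub trait Typed: Send + Sync + 'static {
    type Input: Send + Sync + 'static;
    fn parse(raw: &str) -> Result<Self::Input, String>;
    fn run(&self, input: &Self::Input) -> String;
}

// Dyn-compatible facade: `&self` methods only, no associated types.
pub trait Erased: Send + Sync {
    fn run_erased(&self) -> String;
}

// Pairs a handler with its input, parsed ahead of time.
struct Bound<T: Typed> {
    handler: T,
    input: T::Input,
}

impl<T: Typed> Erased for Bound<T> {
    fn run_erased(&self) -> String {
        self.handler.run(&self.input)
    }
}

// Parse once, up front; a bad config is reported here, not on first use.
pub fn bind<T: Typed>(handler: T, raw: &str) -> Result<Box<dyn Erased>, String> {
    let input = T::parse(raw)?;
    Ok(Box::new(Bound { handler, input }))
}

// Example handler: repeats a fixed string `input` times.
pub struct Repeat;

impl Typed for Repeat {
    type Input = usize;

    fn parse(raw: &str) -> Result<usize, String> {
        raw.parse::<usize>()
            .map_err(|_| format!("not a count: {raw}"))
    }

    fn run(&self, input: &usize) -> String {
        "ab".repeat(*input)
    }
}
```

With this arrangement, `bind(Repeat, "3")` yields a `Box<dyn Erased>` that can sit in a `Vec` next to handlers with other input types, while `bind(Repeat, "three")` fails immediately.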

Implementors