Module fanout

FanOutTask – a composite task that runs multiple child tasks in parallel

This task provides simple parallelism within a single graph node. It runs a fixed set of child tasks concurrently, waits for all of them to finish, aggregates their responses into the shared Context, and then returns control to the graph with NextAction::Continue by default.
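The run-wait-aggregate flow described above can be modelled with nothing but the standard library. The following is a minimal sketch, not the crate's implementation: `SharedCtx`, `Child`, and `fan_out` are hypothetical names, OS threads stand in for async tasks, and a mutex-guarded map stands in for Context.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for the shared context: a thread-safe string map.
/// The real `Context` is async; this is only a model of the idea.
type SharedCtx = Arc<Mutex<HashMap<String, String>>>;

/// A child is modelled as an id plus a function that produces a response.
type Child = (&'static str, fn() -> String);

/// Run every child on its own thread, wait for all of them, then store
/// each response under "<prefix>.<child_id>.response".
fn fan_out(prefix: &str, children: Vec<Child>, ctx: &SharedCtx) {
    // Fan out: start all children before waiting on any of them.
    let handles: Vec<_> = children
        .into_iter()
        .map(|(id, work)| (id, thread::spawn(work)))
        .collect();

    // Join: block until each child has finished, then aggregate its response.
    for (id, handle) in handles {
        let response = handle.join().expect("child panicked");
        ctx.lock()
            .unwrap()
            .insert(format!("{prefix}.{id}.response"), response);
    }
}
```

Calling `fan_out("fanout", children, &ctx)` with two children whose ids are `child_a` and `child_b` leaves two entries in the map, keyed `fanout.child_a.response` and `fanout.child_b.response`. Collecting all handles before joining any of them is what lets the children overlap in time.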

Design goals:

  • Keep engine changes minimal (no changes to Graph needed)
  • Keep semantics simple and predictable
  • Make context aggregation explicit and easy to consume by downstream tasks

Important caveats:

  • FanOutTask ignores each child task’s NextAction. Children are treated as units of work that produce outputs and/or write to context, not as control-flow steps. The FanOutTask itself controls the next step of the graph.
  • By default, all children share the same Context, so you must coordinate concurrent writes yourself. To avoid key collisions, set a prefix so that each child’s output is stored under "<prefix>.<child_id>.*".
  • Error policy is conservative: if any child fails, FanOutTask fails.
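The prefix scheme and the fail-if-any-child-fails policy from the caveats above can be illustrated with a small standard-library sketch. The names `namespaced_key` and `aggregate` are hypothetical, and the `String` error type is an assumption made for brevity; the crate's own error type and internals may differ.

```rust
use std::collections::HashMap;

/// Build a namespaced key of the form "<prefix>.<child_id>.<field>".
fn namespaced_key(prefix: &str, child_id: &str, field: &str) -> String {
    format!("{prefix}.{child_id}.{field}")
}

/// Conservative aggregation: return the first error if any child failed,
/// otherwise store every response under its namespaced key.
fn aggregate(
    prefix: &str,
    results: Vec<(&str, Result<String, String>)>,
) -> Result<HashMap<String, String>, String> {
    let mut out = HashMap::new();
    for (child_id, result) in results {
        // `?` returns early on the first failure, dropping the partial map.
        let response = result?;
        out.insert(namespaced_key(prefix, child_id, "response"), response);
    }
    Ok(out)
}
```

In this sketch, two successful children under the prefix `fanout` produce the keys `fanout.child_a.response` and `fanout.child_b.response`, while a single `Err` among the results makes the whole call return that error. Because each key embeds the child id, two children that both report a `response` never overwrite each other.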

Example:

use graph_flow::{Context, Task, TaskResult, NextAction};
use graph_flow::fanout::FanOutTask;
use async_trait::async_trait;
use std::sync::Arc;

struct ChildA;
struct ChildB;

#[async_trait]
impl Task for ChildA {
    fn id(&self) -> &str { "child_a" }
    async fn run(&self, ctx: Context) -> graph_flow::Result<TaskResult> {
        ctx.set("a", 1_i32).await;
        // The NextAction returned by a child is ignored by FanOutTask.
        Ok(TaskResult::new(Some("A done".to_string()), NextAction::End))
    }
}

#[async_trait]
impl Task for ChildB {
    fn id(&self) -> &str { "child_b" }
    async fn run(&self, ctx: Context) -> graph_flow::Result<TaskResult> {
        ctx.set("b", 2_i32).await;
        Ok(TaskResult::new(Some("B done".to_string()), NextAction::End))
    }
}

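// Assumes a Tokio runtime (the `tokio` crate with its `macros` and
// `rt-multi-thread` features) to drive the async entry point.
#[tokio::main]
async fn main() -> graph_flow::Result<()> {
    let fan = FanOutTask::new("fan", vec![Arc::new(ChildA), Arc::new(ChildB)])
        .with_prefix("fanout");
    let ctx = Context::new();
    let _ = fan.run(ctx.clone()).await?;
    // Aggregated entries under the prefix:
    // fanout.child_a.response, fanout.child_b.response
    Ok(())
}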

Structs

FanOutTask
Composite task that executes multiple child tasks concurrently and aggregates results.