dag-executor 0.1.0

A production-ready DAG executor with state management and advanced patterns
Documentation
//! Example: conditional branching, loops, and fan-out/fan-in together.
//!
//! Run with: `cargo run --example advanced_patterns`

use std::sync::Arc;

use dag_executor::context::Context;
use dag_executor::prelude::*;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut dag = Dag::new();

    // Conditional: decide which path the workflow takes.
    dag.add_task(Arc::new(ConditionalTask::new(
        "should_process",
        "process",
        "skip",
        |_ctx| async { Ok(true) },
    )))?;

    // Loop: retry-like polling that breaks once a value is "ready".
    dag.add_task(Arc::new(
        LoopTask::new("poll", 5, |_ctx: Arc<Context>, i| async move {
            println!("poll attempt {i}");
            Ok(serde_json::json!({ "ready": i >= 2 }))
        })
        .with_break(|out| out.get("ready").and_then(|v| v.as_bool()).unwrap_or(false))
        .with_deps(["should_process"]),
    ))?;

    // Fan-out / fan-in over the loop's result.
    for task in patterns::fan_out_in(
        "compute",
        6,
        Some("poll"),
        |_ctx, i| async move { Ok(serde_json::json!(i * 10)) },
        |_ctx, results| async move {
            let total: u64 = results.iter().filter_map(|r| r.as_u64()).sum();
            println!("aggregated total = {total}");
            Ok(serde_json::json!(total))
        },
    ) {
        dag.add_task(task)?;
    }

    let executor = DagExecutor::builder().persist(false).build();
    let report = executor.run(dag).await?;
    println!("workflow success: {}", report.is_success());
    Ok(())
}