dag-executor 0.1.0

A production-ready DAG executor with state management and advanced patterns
Documentation
//! Minimal example: a three-task linear workflow.
//!
//! Run with: `cargo run --example basic_usage`

use std::sync::Arc;

use dag_executor::context::Context;
use dag_executor::prelude::*;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut dag = Dag::new();

    dag.add_task(Arc::new(BasicTask::new(
        "extract",
        |_ctx: Arc<Context>| async {
            println!("extracting...");
            Ok(serde_json::json!({ "rows": 100 }))
        },
    )))?;

    dag.add_task(Arc::new(
        BasicTask::new("transform", |ctx: Arc<Context>| async move {
            let rows = ctx
                .get("extract")
                .and_then(|v| v.get("rows").and_then(|r| r.as_u64()))
                .unwrap_or(0);
            println!("transforming {rows} rows...");
            Ok(serde_json::json!({ "rows": rows * 2 }))
        })
        .with_deps(["extract"]),
    ))?;

    dag.add_task(Arc::new(
        BasicTask::new("load", |ctx: Arc<Context>| async move {
            let rows = ctx
                .get("transform")
                .and_then(|v| v.get("rows").and_then(|r| r.as_u64()))
                .unwrap_or(0);
            println!("loading {rows} rows...");
            Ok(serde_json::Value::Null)
        })
        .with_deps(["transform"]),
    ))?;

    let executor = DagExecutor::builder().persist(false).build();
    let report = executor.run(dag).await?;

    println!(
        "done: success={} completed={}",
        report.is_success(),
        report.metrics.tasks_completed
    );
    Ok(())
}