dag-executor 0.1.0

A production-ready DAG executor with state management and advanced patterns
Documentation
//! Benchmark: end-to-end throughput for a fan-out/fan-in workload.

use criterion::{criterion_group, criterion_main, Criterion, Throughput};
use dag_executor::context::Context;
use dag_executor::prelude::*;
use tokio::runtime::Runtime;

/// Builds a fresh `Dag` populated with the tasks produced by
/// `patterns::fan_out_in`.
///
/// `n` is forwarded unchanged as the size argument of `fan_out_in`
/// (presumably the fan-out width — confirm against the `patterns` docs).
/// The first closure yields its index argument as a JSON value; the second
/// yields the length of the results it receives as a JSON value.
///
/// # Panics
///
/// Panics if `Dag::add_task` returns an error for any generated task.
fn build_dag(n: usize) -> Dag {
    let empty = Dag::new();

    // The closures are kept inline so their parameter types are inferred from
    // the signature of `fan_out_in`.
    let tasks = patterns::fan_out_in(
        "fan",
        n,
        None,
        |_, i| async move { Ok(serde_json::json!(i)) },
        |_, results| async move { Ok(serde_json::json!(results.len())) },
    );

    // Thread the graph through the fold, registering one task per step.
    tasks.into_iter().fold(empty, |mut graph, task| {
        graph.add_task(task).unwrap();
        graph
    })
}

/// Criterion benchmark `throughput/fan_out_1000`.
///
/// Each iteration builds a `DagExecutor` (persistence disabled, concurrency
/// 512), builds a DAG via `build_dag(1000)`, creates a test `Context`, and
/// blocks on `run_with_context` using a Tokio runtime created once up front.
///
/// Throughput is reported as `n + 1` elements per iteration.
///
/// NOTE(review): the executor, DAG, and context are all constructed inside the
/// timed closure, so their setup cost is part of the measurement — confirm
/// this is the intended "end-to-end" figure.
///
/// # Panics
///
/// Panics if the Tokio runtime cannot be created or if a DAG run returns an
/// error.
fn bench_throughput(c: &mut Criterion) {
    let runtime = Runtime::new().unwrap();
    let fan_width = 1000usize;
    let elements_per_iter = fan_width as u64 + 1;

    let mut throughput_group = c.benchmark_group("throughput");
    throughput_group.throughput(Throughput::Elements(elements_per_iter));

    throughput_group.bench_function("fan_out_1000", |bencher| {
        bencher.iter(|| {
            let executor = DagExecutor::builder()
                .persist(false)
                .concurrency(512)
                .build();
            // Arguments are evaluated left to right: DAG first, then context.
            let run = executor.run_with_context(build_dag(fan_width), Context::for_tests());
            runtime.block_on(run).unwrap();
        });
    });

    throughput_group.finish();
}

// Register `bench_throughput` under the benchmark group named `benches`.
criterion_group!(benches, bench_throughput);
// Generate the benchmark binary's entry point, which runs the `benches` group.
criterion_main!(benches);