dag-executor 0.1.0

A production-ready DAG executor with state management and advanced patterns
Documentation
//! Benchmark: scheduling throughput for a wide, shallow DAG.

use std::sync::Arc;

use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use dag_executor::context::Context;
use dag_executor::prelude::*;
use tokio::runtime::Runtime;

fn build_wide_dag(n: usize) -> Dag {
    let mut dag = Dag::new();
    dag.add_task(Arc::new(BasicTask::new("root", |_| async {
        Ok(serde_json::Value::Null)
    })))
    .unwrap();
    for i in 0..n {
        dag.add_task(Arc::new(
            BasicTask::new(format!("t{i}"), |_| async { Ok(serde_json::json!(1)) })
                .with_deps(["root"]),
        ))
        .unwrap();
    }
    dag
}

fn bench_scheduling(c: &mut Criterion) {
    let rt = Runtime::new().unwrap();
    let mut group = c.benchmark_group("scheduling");
    for n in [100usize, 1000] {
        group.bench_with_input(BenchmarkId::from_parameter(n), &n, |b, &n| {
            b.iter(|| {
                let executor = DagExecutor::builder()
                    .persist(false)
                    .concurrency(256)
                    .build();
                let dag = build_wide_dag(n);
                let ctx = Context::for_tests();
                rt.block_on(executor.run_with_context(dag, ctx)).unwrap();
            });
        });
    }
    group.finish();
}

criterion_group!(benches, bench_scheduling);
criterion_main!(benches);