dag-executor 0.1.0

A production-ready DAG executor with state management and advanced patterns
Documentation
//! Integration tests exercising the executor at scale.

use std::sync::Arc;
use std::time::Instant;

use dag_executor::context::Context;
use dag_executor::prelude::*;

/// Wide fan-out: one `root` task with 1500 direct dependents.
///
/// Checks that the run reports success, that `N + 1` tasks are counted in
/// `TaskState::Completed`, and that the run took less than 30 seconds.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn executes_over_one_thousand_tasks() {
    const N: usize = 1500;

    let mut graph = Dag::new();

    // The single task that every leaf lists as its dependency.
    let root = BasicTask::new("root", |_| async { Ok(serde_json::Value::Null) });
    graph.add_task(Arc::new(root)).unwrap();

    // Leaves are named `t0` through `t1499`; each depends only on "root".
    for name in (0..N).map(|i| format!("t{i}")) {
        let leaf = BasicTask::new(name, |_| async { Ok(serde_json::json!(1)) })
            .with_deps(["root"]);
        graph.add_task(Arc::new(leaf)).unwrap();
    }

    let runner = DagExecutor::builder().persist(false).concurrency(512).build();

    // The clock starts after the graph and executor are built, so only the run is timed.
    let clock = Instant::now();
    let report = runner.run(graph).await.unwrap();
    let took = clock.elapsed();

    assert!(report.is_success());
    // The root plus every leaf must be counted as completed.
    assert_eq!(report.count_in(TaskState::Completed), N + 1);
    // Generous upper bound: a sanity check rather than a benchmark.
    assert!(took.as_secs() < 30, "took too long: {took:?}");
}

/// Linear chain of 500 tasks (`s0`, `s1`, ... `s499`), each depending on the one before it.
///
/// Only the success flag and the completed-task count are asserted; the
/// execution order itself is not inspected here.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn deep_chain_resolves_in_order() {
    const DEPTH: usize = 500;

    let mut chain = Dag::new();
    // Id of the most recently created link, if any.
    let mut upstream: Option<String> = None;
    for step in 0..DEPTH {
        let name = format!("s{step}");
        let task = BasicTask::new(name.clone(), |_| async { Ok(serde_json::json!(0)) });
        // Record this link as the newest one; every link except the first
        // depends on the link that was newest before it.
        let task = match upstream.replace(name) {
            Some(parent) => task.with_deps([parent]),
            None => task,
        };
        chain.add_task(Arc::new(task)).unwrap();
    }

    let runner = DagExecutor::builder().persist(false).concurrency(64).build();
    let ctx = Context::for_tests();
    let report = runner.run_with_context(chain, ctx).await.unwrap();

    assert!(report.is_success());
    assert_eq!(report.count_in(TaskState::Completed), DEPTH);
}