dag-executor 0.1.0

A production-ready DAG executor with state management and advanced patterns
Documentation
//! Integration tests for complete workflows and task types.

use std::sync::Arc;
use std::time::Duration;

use crate::fixtures::ok_task;
use dag_executor::context::Context;
use dag_executor::prelude::*;

fn executor() -> DagExecutor {
    DagExecutor::builder()
        .persist(false)
        .concurrency(16)
        .build()
}

#[tokio::test]
async fn runs_a_linear_workflow() {
    let mut dag = Dag::new();
    dag.add_task(ok_task("a", &[])).unwrap();
    dag.add_task(ok_task("b", &["a"])).unwrap();
    dag.add_task(ok_task("c", &["b"])).unwrap();

    let report = executor().run(dag).await.unwrap();
    assert!(report.is_success());
    assert_eq!(report.count_in(TaskState::Completed), 3);
}

#[tokio::test]
async fn runs_diamond_with_fan_out_in() {
    let mut dag = Dag::new();
    dag.add_task(ok_task("start", &[])).unwrap();
    for task in patterns::fan_out_in(
        "stage",
        16,
        Some("start"),
        |_ctx, i| async move { Ok(serde_json::json!({ "value": i })) },
        |_ctx, results| async move {
            let sum: u64 = results
                .iter()
                .filter_map(|r| r.get("value").and_then(|v| v.as_u64()))
                .sum();
            Ok(serde_json::json!(sum))
        },
    ) {
        dag.add_task(task).unwrap();
    }

    let report = executor().run(dag).await.unwrap();
    assert!(report.is_success());
    // Sum of 0..16 == 120.
    assert_eq!(
        report.records["stage.aggregate"].output,
        Some(serde_json::json!(120))
    );
}

#[tokio::test]
async fn conditional_branch_drives_downstream() {
    let mut dag = Dag::new();
    dag.add_task(Arc::new(ConditionalTask::new(
        "cond",
        "yes",
        "no",
        |_ctx| async { Ok(true) },
    )))
    .unwrap();
    dag.add_task(Arc::new(
        BasicTask::new("after", |ctx: Arc<Context>| async move {
            Ok(ctx.get("cond.branch").unwrap_or(serde_json::Value::Null))
        })
        .with_deps(["cond"]),
    ))
    .unwrap();

    let report = executor().run(dag).await.unwrap();
    assert!(report.is_success());
    assert_eq!(
        report.records["after"].output,
        Some(serde_json::json!("yes"))
    );
}

#[tokio::test]
async fn loop_task_breaks_early() {
    let mut dag = Dag::new();
    dag.add_task(Arc::new(
        LoopTask::new(
            "loop",
            10,
            |_ctx, i| async move { Ok(serde_json::json!(i)) },
        )
        .with_break(|out| out.as_u64() == Some(2)),
    ))
    .unwrap();

    let report = executor().run(dag).await.unwrap();
    let out = report.records["loop"].output.clone().unwrap();
    assert_eq!(out["iterations"], serde_json::json!(3)); // i = 0,1,2
    assert_eq!(out["broke_early"], serde_json::json!(true));
}

#[tokio::test]
async fn event_driven_task_waits_for_signal() {
    let mut dag = Dag::new();
    dag.add_task(Arc::new(
        EventDrivenTask::new("waiter", "go", |_ctx| async {
            Ok(serde_json::json!("woke up"))
        })
        .with_timeout(Duration::from_secs(5)),
    ))
    .unwrap();
    dag.add_task(Arc::new(BasicTask::new(
        "emitter",
        |ctx: Arc<Context>| async move {
            // Give the waiter time to register before emitting.
            tokio::time::sleep(Duration::from_millis(50)).await;
            ctx.emit("go");
            Ok(serde_json::Value::Null)
        },
    )))
    .unwrap();

    let report = executor().run(dag).await.unwrap();
    assert!(report.is_success());
    assert_eq!(
        report.records["waiter"].output,
        Some(serde_json::json!("woke up"))
    );
}