use std::sync::Arc;
use std::time::Duration;
use crate::fixtures::ok_task;
use dag_executor::context::Context;
use dag_executor::prelude::*;
fn executor() -> DagExecutor {
DagExecutor::builder()
.persist(false)
.concurrency(16)
.build()
}
#[tokio::test]
async fn runs_a_linear_workflow() {
let mut dag = Dag::new();
dag.add_task(ok_task("a", &[])).unwrap();
dag.add_task(ok_task("b", &["a"])).unwrap();
dag.add_task(ok_task("c", &["b"])).unwrap();
let report = executor().run(dag).await.unwrap();
assert!(report.is_success());
assert_eq!(report.count_in(TaskState::Completed), 3);
}
#[tokio::test]
async fn runs_diamond_with_fan_out_in() {
let mut dag = Dag::new();
dag.add_task(ok_task("start", &[])).unwrap();
for task in patterns::fan_out_in(
"stage",
16,
Some("start"),
|_ctx, i| async move { Ok(serde_json::json!({ "value": i })) },
|_ctx, results| async move {
let sum: u64 = results
.iter()
.filter_map(|r| r.get("value").and_then(|v| v.as_u64()))
.sum();
Ok(serde_json::json!(sum))
},
) {
dag.add_task(task).unwrap();
}
let report = executor().run(dag).await.unwrap();
assert!(report.is_success());
assert_eq!(
report.records["stage.aggregate"].output,
Some(serde_json::json!(120))
);
}
#[tokio::test]
async fn conditional_branch_drives_downstream() {
let mut dag = Dag::new();
dag.add_task(Arc::new(ConditionalTask::new(
"cond",
"yes",
"no",
|_ctx| async { Ok(true) },
)))
.unwrap();
dag.add_task(Arc::new(
BasicTask::new("after", |ctx: Arc<Context>| async move {
Ok(ctx.get("cond.branch").unwrap_or(serde_json::Value::Null))
})
.with_deps(["cond"]),
))
.unwrap();
let report = executor().run(dag).await.unwrap();
assert!(report.is_success());
assert_eq!(
report.records["after"].output,
Some(serde_json::json!("yes"))
);
}
#[tokio::test]
async fn loop_task_breaks_early() {
let mut dag = Dag::new();
dag.add_task(Arc::new(
LoopTask::new(
"loop",
10,
|_ctx, i| async move { Ok(serde_json::json!(i)) },
)
.with_break(|out| out.as_u64() == Some(2)),
))
.unwrap();
let report = executor().run(dag).await.unwrap();
let out = report.records["loop"].output.clone().unwrap();
assert_eq!(out["iterations"], serde_json::json!(3)); assert_eq!(out["broke_early"], serde_json::json!(true));
}
#[tokio::test]
async fn event_driven_task_waits_for_signal() {
let mut dag = Dag::new();
dag.add_task(Arc::new(
EventDrivenTask::new("waiter", "go", |_ctx| async {
Ok(serde_json::json!("woke up"))
})
.with_timeout(Duration::from_secs(5)),
))
.unwrap();
dag.add_task(Arc::new(BasicTask::new(
"emitter",
|ctx: Arc<Context>| async move {
tokio::time::sleep(Duration::from_millis(50)).await;
ctx.emit("go");
Ok(serde_json::Value::Null)
},
)))
.unwrap();
let report = executor().run(dag).await.unwrap();
assert!(report.is_success());
assert_eq!(
report.records["waiter"].output,
Some(serde_json::json!("woke up"))
);
}