use std::sync::Arc;
use std::time::Instant;
use dag_executor::context::Context;
use dag_executor::prelude::*;
/// Wide fan-out scenario: a single `root` task plus `N` tasks that each list
/// `root` as their only dependency.
///
/// The run must report success, count `N + 1` tasks in
/// `TaskState::Completed`, and finish in under 30 seconds of wall-clock time.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn executes_over_one_thousand_tasks() {
    const N: usize = 1500;

    let mut dag = Dag::new();

    // The one task every other task depends on.
    let root = BasicTask::new("root", |_| async { Ok(serde_json::Value::Null) });
    dag.add_task(Arc::new(root)).unwrap();

    // `N` leaves named t0..t{N-1}, each hanging directly off `root`.
    for idx in 0..N {
        let name = format!("t{idx}");
        let leaf =
            BasicTask::new(name, |_| async { Ok(serde_json::json!(1)) }).with_deps(["root"]);
        dag.add_task(Arc::new(leaf)).unwrap();
    }

    let executor = DagExecutor::builder()
        .persist(false)
        .concurrency(512)
        .build();

    // Time only the run itself; graph construction above is excluded.
    let clock = Instant::now();
    let outcome = executor.run(dag).await;
    let elapsed = clock.elapsed();
    let report = outcome.unwrap();

    assert!(report.is_success());

    let completed = report.count_in(TaskState::Completed);
    assert_eq!(completed, N + 1);

    let within_budget = elapsed.as_secs() < 30;
    assert!(within_budget, "took too long: {elapsed:?}");
}
/// Deep linear chain scenario: `DEPTH` tasks named `s0..s{DEPTH-1}`, where
/// every task except the first depends on the one created just before it.
///
/// The run must report success and count `DEPTH` tasks in
/// `TaskState::Completed`.
///
/// NOTE(review): despite the name, this test does not inspect execution
/// order; it only checks that every task in the chain completes.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn deep_chain_resolves_in_order() {
    const DEPTH: usize = 500;

    let mut dag = Dag::new();

    // Name of the most recently created task; `None` until the first exists.
    let mut upstream: Option<String> = None;
    for step in 0..DEPTH {
        let name = format!("s{step}");
        let base = BasicTask::new(name.clone(), |_| async { Ok(serde_json::json!(0)) });

        // Record this task as the new chain tail and, if there was a previous
        // tail, make this task depend on it.
        let task = match upstream.replace(name) {
            Some(parent) => base.with_deps([parent]),
            None => base,
        };

        dag.add_task(Arc::new(task)).unwrap();
    }

    let executor = DagExecutor::builder()
        .persist(false)
        .concurrency(64)
        .build();

    let ctx = Context::for_tests();
    let report = executor.run_with_context(dag, ctx).await.unwrap();

    assert!(report.is_success());

    let completed = report.count_in(TaskState::Completed);
    assert_eq!(completed, DEPTH);
}