use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use crate::fixtures::flaky_task;
use dag_executor::advanced::RetryPolicy;
use dag_executor::context::Context;
use dag_executor::dag::WorkerPool;
use dag_executor::metrics::MetricsCollector;
use dag_executor::prelude::*;
/// Builds the [`WorkerPool`] shared by the tests in this file.
///
/// `retry` and `timeout` are forwarded to `WorkerPool::new` unchanged, and
/// every call hands the pool its own freshly constructed [`MetricsCollector`].
fn pool(retry: RetryPolicy, timeout: Option<Duration>) -> WorkerPool {
    let metrics = Arc::new(MetricsCollector::new());
    // NOTE(review): the leading `4` is presumably the worker count — confirm
    // against `WorkerPool::new`.
    WorkerPool::new(4, retry, timeout, metrics)
}
/// Spawns the `ok_task` fixture on a pool built with `RetryPolicy::none()` and
/// no timeout, then checks that the result reports exactly one attempt and an
/// `Ok` outcome.
#[tokio::test]
async fn runs_a_task_to_success() {
    let workers = pool(RetryPolicy::none(), None);
    let job = crate::fixtures::ok_task("t", &[]);
    let ctx = Context::for_tests();

    let report = workers.spawn(job, ctx, None).await.unwrap();

    assert_eq!(report.attempts, 1);
    assert!(report.outcome.is_ok());
}
/// Runs `flaky_task("t", 2)` under `RetryPolicy::fixed(5, Duration::ZERO)` and
/// checks that the outcome is `Ok` with three attempts reported.
///
/// The counter returned by the fixture must read the same value as the
/// attempt count reported by the pool.
#[tokio::test]
async fn retries_until_success() {
    let policy = RetryPolicy::fixed(5, Duration::ZERO);
    let workers = pool(policy, None);
    // NOTE(review): the `2` presumably makes the fixture fail twice before it
    // succeeds — confirm against `flaky_task`.
    let (job, calls) = flaky_task("t", 2);
    let ctx = Context::for_tests();

    let report = workers.spawn(job, ctx, None).await.unwrap();

    assert!(report.outcome.is_ok());
    assert_eq!(report.attempts, 3);
    assert_eq!(calls.load(Ordering::SeqCst), 3);
}
/// Runs `flaky_task("t", 10)` under `RetryPolicy::fixed(2, Duration::ZERO)` and
/// checks that the pool reports an `Err` outcome after exactly two attempts.
#[tokio::test]
async fn gives_up_after_max_attempts() {
    let policy = RetryPolicy::fixed(2, Duration::ZERO);
    let workers = pool(policy, None);
    // The counter is not inspected here; it is bound (rather than discarded
    // with `_`) so it stays alive until the end of the test.
    let (job, _calls) = flaky_task("t", 10);
    let ctx = Context::for_tests();

    let report = workers.spawn(job, ctx, None).await.unwrap();

    assert!(report.outcome.is_err());
    assert_eq!(report.attempts, 2);
}
/// Spawns a task that sleeps for 10 s on a pool built with a 20 ms timeout and
/// checks that the outcome is `Err(TaskError::Timeout(_))`.
#[tokio::test]
async fn enforces_timeout() {
    let limit = Duration::from_millis(20);
    let workers = pool(RetryPolicy::none(), Some(limit));

    // The body sleeps far longer than `limit`, so the test only passes
    // quickly if the pool actually applies the timeout.
    let slow = BasicTask::new("slow", |_| async {
        tokio::time::sleep(Duration::from_secs(10)).await;
        Ok(serde_json::Value::Null)
    });
    let job: Arc<dyn Task> = Arc::new(slow);
    let ctx = Context::for_tests();

    let report = workers.spawn(job, ctx, None).await.unwrap();

    assert!(matches!(report.outcome, Err(TaskError::Timeout(_))));
}
/// Cancels the context before handing it to `spawn` and checks that the
/// outcome is `Err(TaskError::Cancelled)`.
#[tokio::test]
async fn respects_cancellation() {
    let workers = pool(RetryPolicy::none(), None);

    // Cancel up front so the context is already cancelled when the pool
    // receives it.
    let ctx = Context::for_tests();
    ctx.cancel();

    let job = crate::fixtures::ok_task("t", &[]);
    let report = workers.spawn(job, ctx, None).await.unwrap();

    assert!(matches!(report.outcome, Err(TaskError::Cancelled)));
}