dag-executor 0.1.0

A production-ready DAG executor with state management and advanced patterns
Documentation
//! Unit tests for the worker pool: success, retries, timeout, cancellation.

use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;

use crate::fixtures::flaky_task;
use dag_executor::advanced::RetryPolicy;
use dag_executor::context::Context;
use dag_executor::dag::WorkerPool;
use dag_executor::metrics::MetricsCollector;
use dag_executor::prelude::*;

fn pool(retry: RetryPolicy, timeout: Option<Duration>) -> WorkerPool {
    WorkerPool::new(4, retry, timeout, Arc::new(MetricsCollector::new()))
}

#[tokio::test]
async fn runs_a_task_to_success() {
    let p = pool(RetryPolicy::none(), None);
    let task = crate::fixtures::ok_task("t", &[]);
    let res = p.spawn(task, Context::for_tests(), None).await.unwrap();
    assert_eq!(res.attempts, 1);
    assert!(res.outcome.is_ok());
}

#[tokio::test]
async fn retries_until_success() {
    let p = pool(RetryPolicy::fixed(5, Duration::ZERO), None);
    let (task, counter) = flaky_task("t", 2); // fail twice, then succeed
    let res = p.spawn(task, Context::for_tests(), None).await.unwrap();
    assert!(res.outcome.is_ok());
    assert_eq!(res.attempts, 3);
    assert_eq!(counter.load(Ordering::SeqCst), 3);
}

#[tokio::test]
async fn gives_up_after_max_attempts() {
    let p = pool(RetryPolicy::fixed(2, Duration::ZERO), None);
    let (task, _counter) = flaky_task("t", 10); // always fails within budget
    let res = p.spawn(task, Context::for_tests(), None).await.unwrap();
    assert!(res.outcome.is_err());
    assert_eq!(res.attempts, 2);
}

#[tokio::test]
async fn enforces_timeout() {
    let p = pool(RetryPolicy::none(), Some(Duration::from_millis(20)));
    let task: Arc<dyn Task> = Arc::new(BasicTask::new("slow", |_| async {
        tokio::time::sleep(Duration::from_secs(10)).await;
        Ok(serde_json::Value::Null)
    }));
    let res = p.spawn(task, Context::for_tests(), None).await.unwrap();
    assert!(matches!(res.outcome, Err(TaskError::Timeout(_))));
}

#[tokio::test]
async fn respects_cancellation() {
    let p = pool(RetryPolicy::none(), None);
    let ctx = Context::for_tests();
    ctx.cancel();
    let task = crate::fixtures::ok_task("t", &[]);
    let res = p.spawn(task, ctx, None).await.unwrap();
    assert!(matches!(res.outcome, Err(TaskError::Cancelled)));
}