dag-executor 0.1.0

A production-ready DAG executor with state management and advanced patterns
Documentation
//! Integration tests for retries, dead-lettering, skip-cascades and the
//! circuit breaker.

use std::sync::atomic::Ordering;
use std::time::Duration;

use crate::fixtures::{failing_task, flaky_task, ok_task};
use dag_executor::advanced::{Backoff, CircuitBreaker, CircuitState, RetryPolicy};
use dag_executor::prelude::*;

/// A single flaky task, run under a retry policy allowing up to five
/// attempts, must end in a successful report.
///
/// The fixture is built with `flaky_task("flaky", 2)`; the `2` is presumably
/// the number of leading failures before the task succeeds (confirm against
/// the `fixtures` module). The test expects the fixture's counter to read 3
/// afterwards and the report to record at least 2 retries.
#[tokio::test]
async fn flaky_task_succeeds_after_retries() {
    let mut graph = Dag::new();
    let (flaky, attempts) = flaky_task("flaky", 2);
    graph.add_task(flaky).unwrap();

    // Fixed zero-length backoff with jitter disabled: no delay is configured
    // between attempts.
    let policy = RetryPolicy {
        max_attempts: 5,
        backoff: Backoff::Fixed(Duration::ZERO),
        jitter: false,
    };
    let runner = DagExecutor::builder().persist(false).retry(policy).build();

    let outcome = runner.run(graph).await.unwrap();
    assert!(outcome.is_success());
    assert_eq!(attempts.load(Ordering::SeqCst), 3);
    assert!(outcome.metrics.retries >= 2);
}

/// With retries disabled, a failing task is dead-lettered and every task
/// that depends on it, directly or transitively, is skipped.
///
/// Graph under test: `bad` <- `downstream` <- `further`.
#[tokio::test]
async fn failure_dead_letters_and_skips_dependents() {
    let mut graph = Dag::new();
    graph.add_task(failing_task("bad", &[])).unwrap();
    graph.add_task(ok_task("downstream", &["bad"])).unwrap();
    graph.add_task(ok_task("further", &["downstream"])).unwrap();

    let runner = DagExecutor::builder()
        .persist(false)
        .retry(RetryPolicy::none())
        .build();
    let outcome = runner.run(graph).await.unwrap();

    // The run as a whole is reported as unsuccessful.
    assert!(!outcome.is_success());
    assert_eq!(outcome.records["bad"].state, TaskState::DeadLettered);

    // Both the direct dependent and the dependent-of-dependent are skipped.
    for skipped in ["downstream", "further"] {
        assert_eq!(outcome.records[skipped].state, TaskState::Skipped);
    }

    // Exactly one dead-letter entry exists, and it belongs to the failed task.
    let dead_letters = runner.dead_letter().list().await.unwrap();
    assert_eq!(dead_letters.len(), 1);
    assert_eq!(dead_letters[0].task_id, "bad");
}

/// A failing task must not affect tasks that do not depend on it.
///
/// Graph under test: `bad` stands alone; `good` <- `good_child` form a
/// separate chain. The separate chain is expected to complete while `bad`
/// is dead-lettered.
#[tokio::test]
async fn unrelated_branches_still_succeed_after_a_failure() {
    let mut graph = Dag::new();
    graph.add_task(failing_task("bad", &[])).unwrap();
    graph.add_task(ok_task("good", &[])).unwrap();
    graph.add_task(ok_task("good_child", &["good"])).unwrap();

    let runner = DagExecutor::builder()
        .persist(false)
        .retry(RetryPolicy::none())
        .build();
    let outcome = runner.run(graph).await.unwrap();

    // The independent chain runs to completion.
    for completed in ["good", "good_child"] {
        assert_eq!(outcome.records[completed].state, TaskState::Completed);
    }
    // The failing task still ends up dead-lettered.
    assert_eq!(outcome.records["bad"].state, TaskState::DeadLettered);
}

/// Walks a `CircuitBreaker` through closed -> open -> half-open -> closed.
///
/// The breaker is built with `(2, 1, 50 ms)`. The test shows that two
/// recorded failures open it and that 50 ms is the cooldown; the `1` is
/// presumably the number of successful probes needed to close it again
/// (confirm against `CircuitBreaker::new`).
#[test]
fn circuit_breaker_opens_and_recovers() {
    let cooldown = Duration::from_millis(50);
    let cb = CircuitBreaker::new(2, 1, cooldown);
    assert_eq!(cb.state(), CircuitState::Closed);

    // One failure is below the threshold, so requests are still allowed.
    cb.record_failure();
    assert!(cb.allow_request());

    // The second failure reaches the threshold: the breaker opens and
    // rejects requests.
    cb.record_failure();
    assert_eq!(cb.state(), CircuitState::Open);
    assert!(!cb.allow_request());

    // Wait 10 ms past the cooldown; the breaker then half-opens to probe.
    std::thread::sleep(cooldown + Duration::from_millis(10));
    assert_eq!(cb.state(), CircuitState::HalfOpen);
    assert!(cb.allow_request());

    // A successful probe closes the breaker again.
    cb.record_success();
    assert_eq!(cb.state(), CircuitState::Closed);
}