use std::sync::atomic::Ordering;
use std::time::Duration;
use crate::fixtures::{failing_task, flaky_task, ok_task};
use dag_executor::advanced::{Backoff, CircuitBreaker, CircuitState, RetryPolicy};
use dag_executor::prelude::*;
#[tokio::test]
async fn flaky_task_succeeds_after_retries() {
let mut dag = Dag::new();
let (task, counter) = flaky_task("flaky", 2);
dag.add_task(task).unwrap();
let executor = DagExecutor::builder()
.persist(false)
.retry(RetryPolicy {
max_attempts: 5,
backoff: Backoff::Fixed(Duration::ZERO),
jitter: false,
})
.build();
let report = executor.run(dag).await.unwrap();
assert!(report.is_success());
assert_eq!(counter.load(Ordering::SeqCst), 3);
assert!(report.metrics.retries >= 2);
}
#[tokio::test]
async fn failure_dead_letters_and_skips_dependents() {
let mut dag = Dag::new();
dag.add_task(failing_task("bad", &[])).unwrap();
dag.add_task(ok_task("downstream", &["bad"])).unwrap();
dag.add_task(ok_task("further", &["downstream"])).unwrap();
let executor = DagExecutor::builder()
.persist(false)
.retry(RetryPolicy::none())
.build();
let report = executor.run(dag).await.unwrap();
assert!(!report.is_success());
assert_eq!(report.records["bad"].state, TaskState::DeadLettered);
assert_eq!(report.records["downstream"].state, TaskState::Skipped);
assert_eq!(report.records["further"].state, TaskState::Skipped);
let dlq = executor.dead_letter().list().await.unwrap();
assert_eq!(dlq.len(), 1);
assert_eq!(dlq[0].task_id, "bad");
}
#[tokio::test]
async fn unrelated_branches_still_succeed_after_a_failure() {
let mut dag = Dag::new();
dag.add_task(failing_task("bad", &[])).unwrap();
dag.add_task(ok_task("good", &[])).unwrap();
dag.add_task(ok_task("good_child", &["good"])).unwrap();
let executor = DagExecutor::builder()
.persist(false)
.retry(RetryPolicy::none())
.build();
let report = executor.run(dag).await.unwrap();
assert_eq!(report.records["good"].state, TaskState::Completed);
assert_eq!(report.records["good_child"].state, TaskState::Completed);
assert_eq!(report.records["bad"].state, TaskState::DeadLettered);
}
#[test]
fn circuit_breaker_opens_and_recovers() {
let breaker = CircuitBreaker::new(2, 1, Duration::from_millis(50));
assert_eq!(breaker.state(), CircuitState::Closed);
breaker.record_failure();
assert!(breaker.allow_request());
breaker.record_failure();
assert_eq!(breaker.state(), CircuitState::Open);
assert!(!breaker.allow_request());
std::thread::sleep(Duration::from_millis(60));
assert_eq!(breaker.state(), CircuitState::HalfOpen);
assert!(breaker.allow_request());
breaker.record_success();
assert_eq!(breaker.state(), CircuitState::Closed);
}