taskvisor 0.2.1

Event-driven task orchestration with restart, backoff, and user-defined subscribers
Documentation
//! Per-attempt timeout integration tests.

mod common;

use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;

use common::*;
use taskvisor::BackoffSource;
use taskvisor::prelude::*;

#[tokio::test(flavor = "current_thread")]
async fn per_attempt_timeout_emits_timeout_hit_before_task_failed_then_retries() {
    let collector = EventCollector::new();
    let subs: Vec<Arc<dyn Subscribe>> = vec![collector.clone() as Arc<dyn Subscribe>];
    let sup = Supervisor::new(SupervisorConfig::default(), subs);

    let task = TaskFn::arc("slow", |_ctx: CancellationToken| async move {
        tokio::time::sleep(Duration::from_secs(3600)).await;
        Ok(())
    });
    let spec = TaskSpec::restartable(task)
        .with_timeout(Some(Duration::from_millis(50)))
        .with_backoff(fast_backoff())
        .with_max_retries(1);
    with_timeout(10, sup.run(vec![spec]))
        .await
        .expect("run() should return Ok");

    assert!(
        poll_until(Duration::from_secs(2), || async {
            collector.find(EventKind::ActorExhausted).is_some()
        })
        .await
    );

    assert_eq!(collector.count(EventKind::TimeoutHit), 2);
    assert_eq!(collector.count(EventKind::TaskFailed), 2);
    assert_eq!(collector.count(EventKind::BackoffScheduled), 1);
    assert_eq!(collector.count(EventKind::ActorExhausted), 1);

    for attempt in 1..=2u32 {
        let hit = collector
            .find_all(EventKind::TimeoutHit)
            .into_iter()
            .find(|e| e.attempt == Some(attempt))
            .unwrap();
        let failed = collector
            .find_all(EventKind::TaskFailed)
            .into_iter()
            .find(|e| e.attempt == Some(attempt))
            .unwrap();
        assert!(hit.seq < failed.seq, "TimeoutHit must precede TaskFailed");
        assert!(failed.reason.as_deref().unwrap().contains("timed out"));
        assert_eq!(failed.exit_code, None);
    }

    let reason = collector
        .find(EventKind::ActorExhausted)
        .unwrap()
        .reason
        .unwrap();
    assert!(reason.contains("max_retries_exceeded") && reason.contains("(1/1)"));
}

#[tokio::test(flavor = "current_thread")]
async fn timeout_then_success_unlimited_retries_exhausts_on_success() {
    let collector = EventCollector::new();
    let subs: Vec<Arc<dyn Subscribe>> = vec![collector.clone() as Arc<dyn Subscribe>];
    let sup = Supervisor::new(SupervisorConfig::default(), subs);

    let n = Arc::new(AtomicU32::new(0));
    let nc = n.clone();
    let task = TaskFn::arc("slow-then-ok", move |_ctx: CancellationToken| {
        let nc = nc.clone();
        async move {
            let c = nc.fetch_add(1, Ordering::SeqCst);
            if c == 0 {
                tokio::time::sleep(Duration::from_secs(3600)).await;
            }
            Ok(())
        }
    });
    let spec = TaskSpec::restartable(task)
        .with_timeout(Some(Duration::from_millis(50)))
        .with_backoff(fast_backoff());
    with_timeout(10, sup.run(vec![spec]))
        .await
        .expect("run() should return Ok");

    assert!(
        poll_until(Duration::from_secs(2), || async {
            collector.find(EventKind::ActorExhausted).is_some()
        })
        .await
    );

    assert_eq!(collector.count(EventKind::TimeoutHit), 1);
    assert_eq!(collector.count(EventKind::TaskFailed), 1);
    assert_eq!(collector.count(EventKind::BackoffScheduled), 1);
    assert_eq!(collector.count(EventKind::TaskStarting), 2);
    assert_eq!(collector.count(EventKind::TaskStopped), 1);
    assert_eq!(collector.count(EventKind::ActorExhausted), 1);
    assert_eq!(collector.count(EventKind::ActorDead), 0);

    assert_eq!(
        collector
            .find(EventKind::BackoffScheduled)
            .unwrap()
            .backoff_source,
        Some(BackoffSource::Failure)
    );
    assert_eq!(
        collector
            .find(EventKind::ActorExhausted)
            .unwrap()
            .reason
            .as_deref(),
        Some("policy_exhausted_success")
    );
}

#[tokio::test(flavor = "current_thread")]
async fn zero_timeout_means_no_timeout_task_runs_to_completion() {
    let collector = EventCollector::new();
    let subs: Vec<Arc<dyn Subscribe>> = vec![collector.clone() as Arc<dyn Subscribe>];
    let sup = Supervisor::new(SupervisorConfig::default(), subs);

    let task = TaskFn::arc("zero-to", |_ctx: CancellationToken| async move {
        tokio::time::sleep(Duration::from_millis(30)).await;
        Ok(())
    });
    let spec = TaskSpec::once(task).with_timeout(Some(Duration::ZERO));
    with_timeout(5, sup.run(vec![spec]))
        .await
        .expect("run() should return Ok");

    assert!(
        poll_until(Duration::from_secs(2), || async {
            collector.find(EventKind::ActorExhausted).is_some()
        })
        .await
    );

    assert_eq!(collector.count(EventKind::TimeoutHit), 0);
    assert_eq!(collector.count(EventKind::TaskStopped), 1);
    assert_eq!(collector.count(EventKind::TaskFailed), 0);
    assert_eq!(
        collector
            .find(EventKind::ActorExhausted)
            .unwrap()
            .reason
            .as_deref(),
        Some("policy_exhausted_success")
    );
}