timer-lib 0.4.0

A feature-rich Rust library for creating and managing timers.
Documentation
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};
use std::time::Duration;

use timer_lib::{RecurringSchedule, Timer, TimerFinishReason, TimerRegistry};
use tokio::task::yield_now;
use tokio::time::{advance, timeout};

async fn settle() {
    for _ in 0..8 {
        yield_now().await;
    }
}

#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn many_timers_complete_under_registry_load() {
    const TIMER_COUNT: usize = 512;

    let registry = TimerRegistry::new();
    let executions = Arc::new(AtomicUsize::new(0));
    let mut timers = Vec::with_capacity(TIMER_COUNT);

    for index in 0..TIMER_COUNT {
        let executions = Arc::clone(&executions);
        let (timer_id, timer) = registry
            .start_once(Duration::from_millis((index as u64) + 1), move || {
                let executions = Arc::clone(&executions);
                async move {
                    executions.fetch_add(1, Ordering::SeqCst);
                    Ok(())
                }
            })
            .await
            .unwrap();

        assert!(registry.get(timer_id).await.is_some());
        timers.push(timer);
    }

    advance(Duration::from_millis(TIMER_COUNT as u64)).await;
    settle().await;

    for timer in timers {
        let outcome = timer.join().await.unwrap();
        assert_eq!(outcome.reason, TimerFinishReason::Completed);
    }

    assert_eq!(executions.load(Ordering::SeqCst), TIMER_COUNT);
    assert!(registry.active_ids().await.is_empty());
}

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn concurrent_joiners_see_the_same_completed_outcome() {
    let timer = Timer::new();
    let run_id = timer
        .start_once(Duration::from_millis(20), || async { Ok(()) })
        .await
        .unwrap();

    let joiner_a = {
        let timer = timer.clone();
        tokio::spawn(async move { timer.join().await.unwrap() })
    };
    let joiner_b = {
        let timer = timer.clone();
        tokio::spawn(async move { timer.join().await.unwrap() })
    };

    let outcome_a = timeout(Duration::from_secs(1), joiner_a)
        .await
        .unwrap()
        .unwrap();
    let outcome_b = timeout(Duration::from_secs(1), joiner_b)
        .await
        .unwrap()
        .unwrap();

    assert_eq!(outcome_a.run_id, run_id);
    assert_eq!(outcome_b.run_id, run_id);
    assert_eq!(outcome_a.reason, TimerFinishReason::Completed);
    assert_eq!(outcome_b.reason, TimerFinishReason::Completed);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn registry_cancel_all_stops_many_recurring_timers_concurrently() {
    const TIMER_COUNT: usize = 512;

    let registry = TimerRegistry::new();
    let mut timers = Vec::with_capacity(TIMER_COUNT);

    for _ in 0..TIMER_COUNT {
        let (timer_id, timer) = registry
            .start_recurring(
                RecurringSchedule::new(Duration::from_millis(50)),
                || async { Ok(()) },
            )
            .await
            .unwrap();
        assert!(registry.get(timer_id).await.is_some());
        timers.push(timer);
    }

    registry.cancel_all().await;

    for timer in timers {
        let outcome = timeout(Duration::from_secs(1), timer.join())
            .await
            .unwrap()
            .unwrap();
        assert_eq!(outcome.reason, TimerFinishReason::Cancelled);
    }

    assert!(registry.active_ids().await.is_empty());
}