cloudiful-scheduler 0.4.2

Single-job async scheduling library for background work with optional Valkey-backed state.
Documentation
use chrono::{TimeDelta, Timelike, Utc};
use scheduler::{
    CronSchedule, InMemoryStateStore, Job, JobState, MissedRunPolicy, OverlapPolicy, Schedule,
    Scheduler, SchedulerConfig, StateStore, Task,
};
use std::sync::{
    Arc,
    atomic::{AtomicUsize, Ordering},
};
use std::time::Duration;
use tokio::sync::{Mutex, mpsc};

fn minute_floor(value: chrono::DateTime<Utc>) -> chrono::DateTime<Utc> {
    value
        .with_second(0)
        .expect("valid second")
        .with_nanosecond(0)
        .expect("valid nanosecond")
}

#[tokio::test]
async fn cron_replay_all_runs_each_missed_minute_in_order() {
    let store = Arc::new(InMemoryStateStore::new());
    let scheduler = Scheduler::new(SchedulerConfig::default(), store.clone());
    let recorded = Arc::new(Mutex::new(Vec::new()));
    let seen = recorded.clone();
    let now_floor = minute_floor(Utc::now());
    let first_due = now_floor - TimeDelta::minutes(2);
    let second_due = now_floor - TimeDelta::minutes(1);
    let expected = vec![first_due, second_due, now_floor];

    store
        .save(&JobState::new("cron-replay-all", Some(first_due)))
        .await
        .unwrap();

    let job = Job::without_deps(
        "cron-replay-all",
        Schedule::Cron(CronSchedule::parse("* * * * *").unwrap()),
        Task::from_async(move |context| {
            let seen = seen.clone();
            async move {
                seen.lock().await.push(context.run.scheduled_at);
                Ok(())
            }
        }),
    )
    .with_missed_run_policy(MissedRunPolicy::ReplayAll)
    .with_max_runs(3);

    let report = scheduler.run(job).await.unwrap();

    assert_eq!(*recorded.lock().await, expected);
    assert_eq!(report.history.len(), 3);
    assert_eq!(report.state.trigger_count, 3);
    assert!(report.state.next_run_at.is_none());
}

#[tokio::test]
async fn cron_catch_up_once_replays_only_the_latest_due_minute() {
    let store = Arc::new(InMemoryStateStore::new());
    let scheduler = Scheduler::new(SchedulerConfig::default(), store.clone());
    let invocations = Arc::new(AtomicUsize::new(0));
    let seen = invocations.clone();
    let now_floor = minute_floor(Utc::now());
    let first_due = now_floor - TimeDelta::minutes(2);

    store
        .save(&JobState::new("cron-catch-up-once", Some(first_due)))
        .await
        .unwrap();

    let job = Job::without_deps(
        "cron-catch-up-once",
        Schedule::Cron(CronSchedule::parse("* * * * *").unwrap()),
        Task::from_async(move |_| {
            let seen = seen.clone();
            async move {
                seen.fetch_add(1, Ordering::SeqCst);
                Ok(())
            }
        }),
    )
    .with_missed_run_policy(MissedRunPolicy::CatchUpOnce)
    .with_max_runs(3);

    let report = scheduler.run(job).await.unwrap();

    assert_eq!(invocations.load(Ordering::SeqCst), 1);
    assert_eq!(report.history.len(), 1);
    assert_eq!(report.history[0].scheduled_at, now_floor);
    assert!(report.history[0].catch_up);
    assert_eq!(report.state.trigger_count, 3);
    assert!(report.state.next_run_at.is_none());
}

#[tokio::test]
async fn cron_skip_advances_state_then_waits_for_the_next_future_minute() {
    let store = Arc::new(InMemoryStateStore::new());
    let scheduler = Scheduler::new(SchedulerConfig::default(), store.clone());
    let handle = scheduler.handle();
    let now_floor = minute_floor(Utc::now());
    let first_due = now_floor - TimeDelta::minutes(2);
    let expected_next = now_floor + TimeDelta::minutes(1);

    store
        .save(&JobState::new("cron-skip", Some(first_due)))
        .await
        .unwrap();

    let task = tokio::spawn(async move {
        scheduler
            .run(
                Job::without_deps(
                    "cron-skip",
                    Schedule::Cron(CronSchedule::parse("* * * * *").unwrap()),
                    Task::from_async(|_| async { Ok(()) }),
                )
                .with_missed_run_policy(MissedRunPolicy::Skip),
            )
            .await
            .unwrap()
    });

    tokio::time::sleep(Duration::from_millis(40)).await;
    handle.cancel();

    let report = tokio::time::timeout(Duration::from_secs(1), task)
        .await
        .expect("scheduler task timed out")
        .unwrap();

    assert!(report.history.is_empty());
    assert_eq!(report.state.trigger_count, 3);
    assert_eq!(report.state.next_run_at, Some(expected_next));
}

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn cron_overlap_allow_parallel_runs_due_replay_concurrently() {
    let store = Arc::new(InMemoryStateStore::new());
    let scheduler = Scheduler::new(SchedulerConfig::default(), store.clone());
    let current = Arc::new(AtomicUsize::new(0));
    let peak = Arc::new(AtomicUsize::new(0));
    let active = current.clone();
    let max_seen = peak.clone();
    let now_floor = minute_floor(Utc::now());
    let first_due = now_floor - TimeDelta::minutes(3);

    store
        .save(&JobState::new("cron-parallel", Some(first_due)))
        .await
        .unwrap();

    let job = Job::without_deps(
        "cron-parallel",
        Schedule::Cron(CronSchedule::parse("* * * * *").unwrap()),
        Task::from_async(move |_| {
            let active = active.clone();
            let max_seen = max_seen.clone();
            async move {
                let count = active.fetch_add(1, Ordering::SeqCst) + 1;
                max_seen.fetch_max(count, Ordering::SeqCst);
                tokio::time::sleep(Duration::from_millis(40)).await;
                active.fetch_sub(1, Ordering::SeqCst);
                Ok(())
            }
        }),
    )
    .with_missed_run_policy(MissedRunPolicy::ReplayAll)
    .with_overlap_policy(OverlapPolicy::AllowParallel)
    .with_max_runs(4);

    let report = scheduler.run(job).await.unwrap();

    assert_eq!(report.history.len(), 4);
    assert!(peak.load(Ordering::SeqCst) > 1);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn cron_state_is_restored_across_scheduler_instances() {
    let store = Arc::new(InMemoryStateStore::new());
    let scheduler_one = Scheduler::new(SchedulerConfig::default(), store.clone());
    let handle = scheduler_one.handle();
    let (tx, mut rx) = mpsc::channel::<()>(1);
    let invocations = Arc::new(AtomicUsize::new(0));
    let seen = invocations.clone();
    let now_floor = minute_floor(Utc::now());
    let first_due = now_floor - TimeDelta::minutes(1);

    store
        .save(&JobState::new("cron-restore", Some(first_due)))
        .await
        .unwrap();

    let job = Job::without_deps(
        "cron-restore",
        Schedule::Cron(CronSchedule::parse("* * * * *").unwrap()),
        Task::from_async(move |_| {
            let tx = tx.clone();
            let seen = seen.clone();
            async move {
                seen.fetch_add(1, Ordering::SeqCst);
                let _ = tx.send(()).await;
                tokio::time::sleep(Duration::from_millis(20)).await;
                Ok(())
            }
        }),
    )
    .with_missed_run_policy(MissedRunPolicy::ReplayAll)
    .with_max_runs(2);

    let shutdown_handle = handle.clone();
    tokio::spawn(async move {
        let _ = rx.recv().await;
        shutdown_handle.shutdown();
    });

    let first_report = scheduler_one.run(job).await.unwrap();

    assert_eq!(invocations.load(Ordering::SeqCst), 1);
    assert_eq!(first_report.history.len(), 1);

    let saved_state = store.load("cron-restore").await.unwrap().unwrap();
    assert_eq!(saved_state.next_run_at, Some(now_floor));

    let scheduler_two = Scheduler::new(SchedulerConfig::default(), store.clone());
    let seen = invocations.clone();
    let job = Job::without_deps(
        "cron-restore",
        Schedule::Cron(CronSchedule::parse("* * * * *").unwrap()),
        Task::from_async(move |_| {
            let seen = seen.clone();
            async move {
                seen.fetch_add(1, Ordering::SeqCst);
                Ok(())
            }
        }),
    )
    .with_missed_run_policy(MissedRunPolicy::ReplayAll)
    .with_max_runs(2);

    let second_report = scheduler_two.run(job).await.unwrap();

    assert_eq!(invocations.load(Ordering::SeqCst), 2);
    assert_eq!(second_report.history.len(), 1);
    assert_eq!(second_report.state.trigger_count, 2);
    assert!(second_report.state.next_run_at.is_none());
}