cloudiful-scheduler 0.4.0

Single-job async scheduling library for background work with optional Valkey-backed state.
Documentation
mod support;

use chrono::{Datelike, NaiveTime, Timelike, Utc, Weekday};
use chrono_tz::Asia::Shanghai;
use scheduler::{
    InMemoryStateStore, Job, JobTimeWindow, OverlapPolicy, RunSkipReason, Schedule, Scheduler,
    SchedulerConfig, SchedulerEvent, SchedulerObserver, Task, TimeWindowSegment,
};
use std::sync::{
    Arc, Mutex,
    atomic::{AtomicUsize, Ordering},
};
use std::time::Duration;
use support::shanghai_after;

#[derive(Clone, Default)]
struct RecordingObserver {
    events: Arc<Mutex<Vec<SchedulerEvent>>>,
}

impl RecordingObserver {
    fn snapshot(&self) -> Vec<SchedulerEvent> {
        self.events.lock().unwrap().clone()
    }
}

impl SchedulerObserver for RecordingObserver {
    fn on_event(&self, event: &SchedulerEvent) {
        self.events.lock().unwrap().push(event.clone());
    }
}

fn next_weekday(weekday: Weekday) -> Weekday {
    match weekday {
        Weekday::Mon => Weekday::Tue,
        Weekday::Tue => Weekday::Wed,
        Weekday::Wed => Weekday::Thu,
        Weekday::Thu => Weekday::Fri,
        Weekday::Fri => Weekday::Sat,
        Weekday::Sat => Weekday::Sun,
        Weekday::Sun => Weekday::Mon,
    }
}

fn shift_time(time: chrono::NaiveTime, delta_seconds: i64) -> chrono::NaiveTime {
    let seconds = time.num_seconds_from_midnight() as i64 + delta_seconds;
    let seconds = seconds.rem_euclid(24 * 60 * 60) as u32;
    NaiveTime::from_num_seconds_from_midnight_opt(seconds, time.nanosecond()).unwrap()
}

#[tokio::test]
async fn job_without_window_keeps_existing_behavior() {
    let scheduler = Scheduler::new(SchedulerConfig::default(), InMemoryStateStore::new());
    let invocations = Arc::new(AtomicUsize::new(0));
    let seen = invocations.clone();

    let report = scheduler
        .run(
            Job::without_deps(
                "no-window",
                Schedule::AtTimes(vec![shanghai_after(20)]),
                Task::from_async(move |_| {
                    let seen = seen.clone();
                    async move {
                        seen.fetch_add(1, Ordering::SeqCst);
                        Ok(())
                    }
                }),
            )
            .with_max_runs(1),
        )
        .await
        .unwrap();

    assert_eq!(invocations.load(Ordering::SeqCst), 1);
    assert!(report.last_skip_reason.is_none());
}

#[tokio::test]
async fn inside_window_runs_normally_with_scheduler_timezone_fallback() {
    let local_now = Utc::now().with_timezone(&Shanghai);
    let observer = RecordingObserver::default();
    let scheduler = Scheduler::with_observer(
        SchedulerConfig::default(),
        InMemoryStateStore::new(),
        observer.clone(),
    );
    let invocations = Arc::new(AtomicUsize::new(0));
    let seen = invocations.clone();

    let report = scheduler
        .run(
            Job::without_deps(
                "inside-window",
                Schedule::AtTimes(vec![shanghai_after(20)]),
                Task::from_async(move |_| {
                    let seen = seen.clone();
                    async move {
                        seen.fetch_add(1, Ordering::SeqCst);
                        Ok(())
                    }
                }),
            )
            .with_time_window(JobTimeWindow {
                timezone: None,
                weekdays: vec![local_now.weekday()],
                segments: vec![],
            }),
        )
        .await
        .unwrap();

    let events = observer.snapshot();

    assert_eq!(invocations.load(Ordering::SeqCst), 1);
    assert_eq!(report.history.len(), 1);
    assert!(report.last_skip_reason.is_none());
    assert!(
        !events
            .iter()
            .any(|event| matches!(event, SchedulerEvent::RunSkipped { .. }))
    );
}

#[tokio::test]
async fn outside_window_skips_run_and_records_reason() {
    let local_now = Utc::now().with_timezone(&Shanghai);
    let observer = RecordingObserver::default();
    let scheduler = Scheduler::with_observer(
        SchedulerConfig::default(),
        InMemoryStateStore::new(),
        observer.clone(),
    );
    let invocations = Arc::new(AtomicUsize::new(0));
    let seen = invocations.clone();
    let window = JobTimeWindow {
        timezone: None,
        weekdays: vec![next_weekday(local_now.weekday())],
        segments: vec![],
    };

    let report = scheduler
        .run(
            Job::without_deps(
                "outside-window",
                Schedule::AtTimes(vec![shanghai_after(20)]),
                Task::from_async(move |_| {
                    let seen = seen.clone();
                    async move {
                        seen.fetch_add(1, Ordering::SeqCst);
                        Ok(())
                    }
                }),
            )
            .with_time_window(window),
        )
        .await
        .unwrap();

    let events = observer.snapshot();

    assert_eq!(invocations.load(Ordering::SeqCst), 0);
    assert!(report.history.is_empty());
    assert_eq!(
        report.last_skip_reason,
        Some(RunSkipReason::OutsideTimeWindow)
    );
    assert!(events.iter().any(|event| matches!(
        event,
        SchedulerEvent::RunSkipped {
            job_id,
            reason,
            ..
        } if job_id == "outside-window" && *reason == RunSkipReason::OutsideTimeWindow
    )));
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn queued_trigger_is_rechecked_before_starting() {
    let local_now = Utc::now().with_timezone(&Shanghai);
    let observer = RecordingObserver::default();
    let scheduler = Scheduler::with_observer(
        SchedulerConfig::default(),
        InMemoryStateStore::new(),
        observer.clone(),
    );
    let invocations = Arc::new(AtomicUsize::new(0));
    let seen = invocations.clone();
    let window = JobTimeWindow {
        timezone: Some(Shanghai),
        weekdays: vec![local_now.weekday()],
        segments: vec![TimeWindowSegment::new(
            shift_time(local_now.time(), -1),
            shift_time(local_now.time(), 2),
        )],
    };

    let report = tokio::time::timeout(
        Duration::from_secs(6),
        scheduler.run(
            Job::without_deps(
                "queued-window-skip",
                Schedule::AtTimes(vec![shanghai_after(100), shanghai_after(200)]),
                Task::from_async(move |_| {
                    let seen = seen.clone();
                    async move {
                        let invocation = seen.fetch_add(1, Ordering::SeqCst);
                        if invocation == 0 {
                            tokio::time::sleep(Duration::from_millis(2500)).await;
                        }
                        Ok(())
                    }
                }),
            )
            .with_overlap_policy(OverlapPolicy::QueueOne)
            .with_time_window(window)
            .with_max_runs(2),
        ),
    )
    .await
    .expect("scheduler run timed out")
    .unwrap();

    let events = observer.snapshot();

    assert_eq!(invocations.load(Ordering::SeqCst), 1);
    assert_eq!(report.history.len(), 1);
    assert_eq!(
        report.last_skip_reason,
        Some(RunSkipReason::OutsideTimeWindow)
    );
    assert!(events.iter().any(|event| matches!(
        event,
        SchedulerEvent::RunSkipped {
            job_id,
            reason,
            ..
        } if job_id == "queued-window-skip" && *reason == RunSkipReason::OutsideTimeWindow
    )));
}