cloudiful-scheduler 0.3.5

Single-job async scheduling library for background work with optional Valkey-backed state.
Documentation
use crate::error::SchedulerError;
use crate::model::{Job, JobState};
use crate::scheduler::trigger_math::{
    advance_state_to, consume_due_summary, inspect_due_window, is_missed,
};
use chrono::{DateTime, Utc};
use chrono_tz::Tz;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct PendingTrigger {
    pub(crate) scheduled_at: DateTime<Utc>,
    pub(crate) catch_up: bool,
    pub(crate) trigger_count: u32,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum TriggerDecision {
    Idle,
    StateAdvanced,
    Trigger(PendingTrigger),
}

pub(crate) fn next_trigger<D>(
    job: &Job<D>,
    state: &mut JobState,
    now: DateTime<Utc>,
    timezone: Tz,
) -> Result<TriggerDecision, SchedulerError>
where
    D: Send + Sync + 'static,
{
    let Some(next_run_at) = state.next_run_at else {
        return Ok(TriggerDecision::Idle);
    };

    if next_run_at > now {
        return Ok(TriggerDecision::Idle);
    }

    match job.missed_run_policy {
        crate::MissedRunPolicy::Skip => {
            let Some(summary) = consume_due_summary(job, state, now, timezone)? else {
                return Ok(TriggerDecision::Idle);
            };

            if summary.count == 1 && !is_missed(summary.first_due, now) {
                return Ok(TriggerDecision::Trigger(PendingTrigger {
                    scheduled_at: summary.first_due,
                    catch_up: false,
                    trigger_count: state.trigger_count,
                }));
            }

            Ok(TriggerDecision::StateAdvanced)
        }
        crate::MissedRunPolicy::CatchUpOnce => {
            let Some(summary) = consume_due_summary(job, state, now, timezone)? else {
                return Ok(TriggerDecision::Idle);
            };

            let scheduled_at = if summary.count == 1 {
                summary.first_due
            } else {
                summary.last_due
            };

            Ok(TriggerDecision::Trigger(PendingTrigger {
                scheduled_at,
                catch_up: summary.count > 1 || is_missed(scheduled_at, now),
                trigger_count: state.trigger_count,
            }))
        }
        crate::MissedRunPolicy::ReplayAll => {
            let Some(window) = inspect_due_window(job, state, now, timezone)? else {
                return Ok(TriggerDecision::Idle);
            };

            let first_due = window.first_due;
            advance_state_to(job, state, first_due, timezone)?;
            Ok(TriggerDecision::Trigger(PendingTrigger {
                scheduled_at: first_due,
                catch_up: window.has_multiple || is_missed(first_due, now),
                trigger_count: state.trigger_count,
            }))
        }
    }
}

#[cfg(test)]
mod tests {
    use super::{PendingTrigger, TriggerDecision, next_trigger};
    use crate::scheduler::trigger_math::{
        advance_state_for, collect_due_times, compute_next_after, consume_due_summary,
        inspect_due_window,
    };
    use crate::{CronSchedule, Job, JobState, MissedRunPolicy, Schedule, Task};
    use chrono::{TimeDelta, TimeZone, Utc};
    use chrono_tz::Asia::Shanghai;
    use std::time::Duration;

    fn noop_job(schedule: Schedule) -> Job<()> {
        Job::without_deps("job", schedule, Task::from_async(|_| async { Ok(()) }))
    }

    #[test]
    fn compute_next_after_stops_at_max_runs_for_interval() {
        let scheduled_at = Utc::now();
        let job = noop_job(Schedule::Interval(Duration::from_secs(5))).with_max_runs(2);

        let next = compute_next_after(&job, scheduled_at, 2, Shanghai).unwrap();

        assert!(next.is_none());
    }

    #[test]
    fn collect_due_times_replays_all_past_at_times() {
        let now = Utc::now();
        let times = vec![
            (now - TimeDelta::seconds(2)).with_timezone(&Shanghai),
            (now - TimeDelta::seconds(1)).with_timezone(&Shanghai),
            (now + TimeDelta::seconds(3)).with_timezone(&Shanghai),
        ];
        let state = JobState::new("job", Some(times[0].with_timezone(&Utc)));
        let job = noop_job(Schedule::AtTimes(times));

        let due = collect_due_times(&job, &state, now, Shanghai).unwrap();

        assert_eq!(due.len(), 2);
    }

    #[test]
    fn inspect_due_window_detects_multiple_due_runs() {
        let now = Utc::now();
        let state = JobState::new("job", Some(now - TimeDelta::seconds(2)));
        let job = noop_job(Schedule::Interval(Duration::from_secs(1))).with_max_runs(4);

        let window = inspect_due_window(&job, &state, now, Shanghai)
            .unwrap()
            .unwrap();

        assert_eq!(window.first_due, now - TimeDelta::seconds(2));
        assert!(window.has_multiple);
    }

    #[test]
    fn next_trigger_skip_advances_state_without_emitting_run() {
        let now = Utc::now();
        let times = vec![
            (now - TimeDelta::seconds(2)).with_timezone(&Shanghai),
            (now + TimeDelta::seconds(2)).with_timezone(&Shanghai),
        ];
        let mut state = JobState::new("job", Some(times[0].with_timezone(&Utc)));
        let job = noop_job(Schedule::AtTimes(times)).with_missed_run_policy(MissedRunPolicy::Skip);

        let trigger = next_trigger(&job, &mut state, now, Shanghai).unwrap();

        assert_eq!(trigger, TriggerDecision::StateAdvanced);
        assert_eq!(state.trigger_count, 1);
        assert!(state.next_run_at.unwrap() > now);
    }

    #[test]
    fn next_trigger_replay_all_returns_oldest_due_run() {
        let now = Utc::now();
        let first = (now - TimeDelta::seconds(2)).with_timezone(&Shanghai);
        let second = (now - TimeDelta::seconds(1)).with_timezone(&Shanghai);
        let mut state = JobState::new("job", Some(first.with_timezone(&Utc)));
        let job = noop_job(Schedule::AtTimes(vec![first, second]))
            .with_missed_run_policy(MissedRunPolicy::ReplayAll);

        let trigger = next_trigger(&job, &mut state, now, Shanghai).unwrap();

        assert_eq!(
            trigger,
            TriggerDecision::Trigger(PendingTrigger {
                scheduled_at: first.with_timezone(&Utc),
                catch_up: true,
                trigger_count: 1,
            })
        );
        assert_eq!(state.trigger_count, 1);
    }

    #[test]
    fn advance_state_for_consumes_multiple_due_times() {
        let now = Utc::now();
        let first = (now - TimeDelta::seconds(3)).with_timezone(&Shanghai);
        let second = (now - TimeDelta::seconds(2)).with_timezone(&Shanghai);
        let third = (now + TimeDelta::seconds(1)).with_timezone(&Shanghai);
        let mut state = JobState::new("job", Some(first.with_timezone(&Utc)));
        let job = noop_job(Schedule::AtTimes(vec![first, second, third]));

        advance_state_for(
            &job,
            &mut state,
            &[first.with_timezone(&Utc), second.with_timezone(&Utc)],
            Shanghai,
        )
        .unwrap();

        assert_eq!(state.trigger_count, 2);
        assert_eq!(state.next_run_at, Some(third.with_timezone(&Utc)));
    }

    #[test]
    fn consume_due_summary_advances_without_collecting_every_due_time() {
        let now = Utc::now();
        let mut state = JobState::new("job", Some(now - TimeDelta::seconds(3)));
        let job = noop_job(Schedule::Interval(Duration::from_secs(1)));

        let summary = consume_due_summary(&job, &mut state, now, Shanghai)
            .unwrap()
            .unwrap();

        assert_eq!(summary.first_due, now - TimeDelta::seconds(3));
        assert_eq!(summary.last_due, now);
        assert_eq!(summary.count, 4);
        assert_eq!(state.trigger_count, 4);
        assert_eq!(state.next_run_at, Some(now + TimeDelta::seconds(1)));
    }

    #[test]
    fn compute_next_after_advances_cron_from_scheduled_time() {
        let scheduled_at = Utc.with_ymd_and_hms(2026, 4, 3, 1, 2, 0).unwrap();
        let job = noop_job(Schedule::Cron(CronSchedule::parse("* * * * *").unwrap()));

        let next = compute_next_after(&job, scheduled_at, 1, chrono_tz::UTC).unwrap();

        assert_eq!(
            next,
            Some(Utc.with_ymd_and_hms(2026, 4, 3, 1, 3, 0).unwrap())
        );
    }
}