cloudiful-scheduler 0.3.3

Single-job async scheduling library for background work with optional Valkey-backed state.
Documentation
use crate::error::SchedulerError;
use crate::model::{Job, JobState, Schedule, utc_time};
use chrono::{DateTime, TimeDelta, Utc};
use chrono_tz::Tz;
use std::time::Duration;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct DueSummary {
    pub(crate) first_due: DateTime<Utc>,
    pub(crate) last_due: DateTime<Utc>,
    pub(crate) count: u32,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct DueWindow {
    pub(crate) first_due: DateTime<Utc>,
    pub(crate) has_multiple: bool,
}

pub(crate) fn initial_next_run_at<D>(
    job: &Job<D>,
    timezone: Tz,
) -> Result<Option<DateTime<Utc>>, SchedulerError> {
    if matches!(job.max_runs, Some(0)) {
        return Ok(None);
    }

    match &job.schedule {
        Schedule::Interval(every) => duration_to_delta(*every)
            .ok_or_else(SchedulerError::invalid_interval_out_of_range)
            .and_then(|delta| {
                Utc::now()
                    .checked_add_signed(delta)
                    .ok_or_else(SchedulerError::invalid_interval_out_of_range)
            })
            .map(Some),
        Schedule::AtTimes(times) => Ok(times.first().copied().map(utc_time)),
        Schedule::Cron(schedule) => Ok(schedule.next_after(Utc::now(), timezone)),
    }
}

pub(crate) fn next_run_is_in_future(next_run_at: Option<DateTime<Utc>>) -> bool {
    next_run_at.map(|value| value > Utc::now()).unwrap_or(false)
}

pub(crate) fn inspect_due_window<D>(
    job: &Job<D>,
    state: &JobState,
    now: DateTime<Utc>,
    timezone: Tz,
) -> Result<Option<DueWindow>, SchedulerError>
where
    D: Send + Sync + 'static,
{
    let Some(first_due) = state.next_run_at else {
        return Ok(None);
    };

    if first_due > now {
        return Ok(None);
    }

    let next_trigger_count = state.trigger_count + 1;
    let has_multiple = compute_next_after(job, first_due, next_trigger_count, timezone)?
        .map(|next_due| next_due <= now)
        .unwrap_or(false);

    Ok(Some(DueWindow {
        first_due,
        has_multiple,
    }))
}

pub(crate) fn consume_due_summary<D>(
    job: &Job<D>,
    state: &mut JobState,
    now: DateTime<Utc>,
    timezone: Tz,
) -> Result<Option<DueSummary>, SchedulerError>
where
    D: Send + Sync + 'static,
{
    let mut first_due = None;
    let mut last_due = None;
    let mut count = 0u32;

    while let Some(value) = state.next_run_at {
        if value > now {
            break;
        }

        if first_due.is_none() {
            first_due = Some(value);
        }
        last_due = Some(value);
        count += 1;
        advance_state_to(job, state, value, timezone)?;
    }

    Ok(first_due.map(|first_due| DueSummary {
        first_due,
        last_due: last_due.expect("last_due exists when first_due exists"),
        count,
    }))
}

pub(crate) fn advance_state_to<D>(
    job: &Job<D>,
    state: &mut JobState,
    scheduled_at: DateTime<Utc>,
    timezone: Tz,
) -> Result<(), SchedulerError>
where
    D: Send + Sync + 'static,
{
    state.trigger_count += 1;
    state.next_run_at = compute_next_after(job, scheduled_at, state.trigger_count, timezone)?;
    Ok(())
}

pub(crate) fn compute_next_after<D>(
    job: &Job<D>,
    scheduled_at: DateTime<Utc>,
    trigger_count: u32,
    timezone: Tz,
) -> Result<Option<DateTime<Utc>>, SchedulerError>
where
    D: Send + Sync + 'static,
{
    if let Some(max_runs) = job.max_runs
        && trigger_count >= max_runs
    {
        return Ok(None);
    }

    match &job.schedule {
        Schedule::Interval(every) => {
            let delta = duration_to_delta(*every)
                .ok_or_else(SchedulerError::invalid_interval_out_of_range)?;
            Ok(scheduled_at.checked_add_signed(delta))
        }
        Schedule::AtTimes(times) => Ok(times.get(trigger_count as usize).copied().map(utc_time)),
        Schedule::Cron(schedule) => Ok(schedule.next_after(scheduled_at, timezone)),
    }
}

pub(crate) fn is_missed(scheduled_at: DateTime<Utc>, now: DateTime<Utc>) -> bool {
    let tolerance = TimeDelta::milliseconds(25);
    scheduled_at
        .checked_add_signed(tolerance)
        .map(|adjusted| adjusted < now)
        .unwrap_or(false)
}

fn duration_to_delta(duration: Duration) -> Option<TimeDelta> {
    TimeDelta::from_std(duration).ok()
}

#[cfg(test)]
pub(crate) fn collect_due_times<D>(
    job: &Job<D>,
    state: &JobState,
    now: DateTime<Utc>,
    timezone: Tz,
) -> Result<Vec<DateTime<Utc>>, SchedulerError>
where
    D: Send + Sync + 'static,
{
    let mut due_times = Vec::new();
    let mut trigger_count = state.trigger_count;
    let mut next_run_at = state.next_run_at;

    while let Some(value) = next_run_at {
        if value > now {
            break;
        }

        due_times.push(value);
        trigger_count += 1;
        next_run_at = compute_next_after(job, value, trigger_count, timezone)?;
    }

    Ok(due_times)
}

#[cfg(test)]
pub(crate) fn advance_state_for<D>(
    job: &Job<D>,
    state: &mut JobState,
    due_times: &[DateTime<Utc>],
    timezone: Tz,
) -> Result<(), SchedulerError>
where
    D: Send + Sync + 'static,
{
    for scheduled_at in due_times {
        advance_state_to(job, state, *scheduled_at, timezone)?;
    }

    Ok(())
}