cloudiful-scheduler 0.4.7

Single-job async scheduling library for background work with optional Valkey-backed state.
Documentation
use crate::{GroupedIntervalSchedule, SchedulerError, StaggeredIntervalSchedule};
use chrono::{DateTime, Utc};
use std::time::Duration;

pub(crate) fn staggered_initial_next_run_at(
    now: DateTime<Utc>,
    staggered: &StaggeredIntervalSchedule,
    seed: &str,
) -> Result<DateTime<Utc>, SchedulerError> {
    let interval_nanos = interval_nanos(staggered.every)?;
    let phase_nanos =
        u128::from(stable_seed_hash(staggered.seed.as_deref().unwrap_or(seed))) % interval_nanos;

    aligned_initial_next_run_at(now, interval_nanos, phase_nanos)
}

pub(crate) fn grouped_initial_next_run_at(
    now: DateTime<Utc>,
    grouped: &GroupedIntervalSchedule,
) -> Result<DateTime<Utc>, SchedulerError> {
    validate_grouped_interval(grouped)?;
    let interval_nanos = interval_nanos(grouped.every)?;
    let base_phase_nanos = interval_nanos
        .checked_mul(u128::from(grouped.member_index))
        .ok_or_else(SchedulerError::invalid_interval_out_of_range)?
        / u128::from(grouped.group_size);
    let group_offset_nanos = grouped
        .group_seed
        .as_deref()
        .map(|seed| u128::from(stable_seed_hash(seed)) % interval_nanos)
        .unwrap_or(0);
    let phase_nanos = base_phase_nanos
        .checked_add(group_offset_nanos)
        .ok_or_else(SchedulerError::invalid_interval_out_of_range)?
        % interval_nanos;

    aligned_initial_next_run_at(now, interval_nanos, phase_nanos)
}

pub(crate) fn validate_grouped_interval(
    grouped: &GroupedIntervalSchedule,
) -> Result<(), SchedulerError> {
    if grouped.group_size == 0 {
        return Err(SchedulerError::invalid_job_with_kind(
            crate::InvalidJobKind::Other,
            "grouped interval group_size must be greater than zero",
        ));
    }
    if grouped.member_index >= grouped.group_size {
        return Err(SchedulerError::invalid_job_with_kind(
            crate::InvalidJobKind::Other,
            "grouped interval member_index must be less than group_size",
        ));
    }
    Ok(())
}

fn duration_to_nanos(duration: Duration) -> Option<u128> {
    let nanos = u128::from(duration.as_secs())
        .checked_mul(1_000_000_000)?
        .checked_add(u128::from(duration.subsec_nanos()))?;
    Some(nanos)
}

fn utc_to_nanos(value: DateTime<Utc>) -> i128 {
    i128::from(value.timestamp())
        .saturating_mul(1_000_000_000)
        .saturating_add(i128::from(value.timestamp_subsec_nanos()))
}

fn nanos_to_utc(value: i128) -> Option<DateTime<Utc>> {
    let seconds = value.div_euclid(1_000_000_000);
    let nanos = value.rem_euclid(1_000_000_000) as u32;
    let seconds = i64::try_from(seconds).ok()?;
    chrono::TimeZone::timestamp_opt(&Utc, seconds, nanos).single()
}

fn stable_seed_hash(seed: &str) -> u64 {
    let mut hash = 0xcbf29ce484222325u64;
    for byte in seed.as_bytes() {
        hash ^= u64::from(*byte);
        hash = hash.wrapping_mul(0x100000001b3);
    }
    hash
}

fn interval_nanos(every: Duration) -> Result<u128, SchedulerError> {
    let interval_nanos =
        duration_to_nanos(every).ok_or_else(SchedulerError::invalid_interval_out_of_range)?;
    if interval_nanos == 0 {
        return Err(SchedulerError::invalid_zero_interval());
    }
    Ok(interval_nanos)
}

fn aligned_initial_next_run_at(
    now: DateTime<Utc>,
    interval_nanos: u128,
    phase_nanos: u128,
) -> Result<DateTime<Utc>, SchedulerError> {
    let interval_nanos = i128::try_from(interval_nanos)
        .map_err(|_| SchedulerError::invalid_interval_out_of_range())?;
    let phase_nanos =
        i128::try_from(phase_nanos).map_err(|_| SchedulerError::invalid_interval_out_of_range())?;
    let now_nanos = utc_to_nanos(now);
    let cycle_start = now_nanos.div_euclid(interval_nanos) * interval_nanos;
    let mut candidate = cycle_start + phase_nanos;
    if candidate < now_nanos {
        candidate += interval_nanos;
    }

    nanos_to_utc(candidate).ok_or_else(SchedulerError::invalid_interval_out_of_range)
}

#[cfg(test)]
#[path = "interval_phase_tests.rs"]
mod tests;