use std::collections::HashMap;
use std::str::FromStr as _;
use actionqueue_core::ids::TaskId;
use actionqueue_core::run::run_instance::RunInstance;
use actionqueue_core::task::run_policy::CronPolicy;
use chrono::{TimeZone as _, Utc};
use super::{map_run_construction_error, DerivationError, DerivationSuccess};
use crate::time::clock::Clock;
/// Cache of parsed [`cron::Schedule`] values, keyed by [`TaskId`].
///
/// Lets callers parse a task's cron expression once and reuse the parsed
/// schedule on later lookups instead of re-parsing it each time.
#[derive(Default)]
pub struct CronScheduleCache {
/// Parsed schedule stored per task id.
cache: HashMap<TaskId, cron::Schedule>,
}
impl CronScheduleCache {
    /// Creates an empty cache.
    pub fn new() -> Self {
        Default::default()
    }

    /// Makes sure a parsed schedule is cached for `task_id`.
    ///
    /// If the task already has an entry it is left untouched (the policy is
    /// not re-parsed, even if its expression differs from the cached one).
    /// Otherwise `policy.expression()` is parsed and stored.
    ///
    /// # Panics
    ///
    /// Panics if the policy's expression fails to parse as a
    /// [`cron::Schedule`].
    pub fn ensure(&mut self, task_id: TaskId, policy: &CronPolicy) {
        // Only parse when the slot is empty; parsing is the work being cached.
        if let std::collections::hash_map::Entry::Vacant(slot) = self.cache.entry(task_id) {
            let parsed = cron::Schedule::from_str(policy.expression())
                .expect("cron expression pre-validated at CronPolicy::new");
            slot.insert(parsed);
        }
    }

    /// Returns the cached schedule for `task_id`, or `None` if nothing has
    /// been cached for it.
    pub fn get(&self, task_id: TaskId) -> Option<&cron::Schedule> {
        let key = &task_id;
        self.cache.get(key)
    }

    /// Drops the cached schedule for `task_id`, if any.
    pub fn remove(&mut self, task_id: TaskId) {
        let _ = self.cache.remove(&task_id);
    }
}
/// Derives up to `count` scheduled runs for `task_id` from an already-parsed
/// cron `schedule`.
///
/// Occurrences are taken from `schedule.after(..)`, starting from
/// `after_secs` interpreted as Unix seconds in UTC. Each occurrence becomes a
/// [`RunInstance`] built with `RunInstance::new_scheduled(task_id,
/// scheduled_at, created_at)`.
///
/// Returns an empty vector when `count` is zero, and also when `after_secs`
/// cannot be represented as a UTC timestamp: nothing can be scheduled after a
/// point in time that is out of range.
///
/// NOTE(review): `derive_cron` delegates to `CronPolicy::next_occurrences_after`;
/// confirm that method treats an out-of-range `after_secs` the same way.
///
/// # Errors
///
/// Returns the [`DerivationError`] produced by `map_run_construction_error`
/// for the first run that fails to construct.
pub fn derive_cron_cached(
    task_id: TaskId,
    schedule: &cron::Schedule,
    after_secs: u64,
    created_at: u64,
    count: u32,
) -> Result<Vec<RunInstance>, DerivationError> {
    if count == 0 {
        return Ok(Vec::new());
    }

    // Falling back to the epoch for an unrepresentable `after_secs` would
    // yield occurrences that lie *before* the requested lower bound, so an
    // out-of-range bound produces no runs instead.
    let after_dt = match i64::try_from(after_secs)
        .ok()
        .and_then(|secs| Utc.timestamp_opt(secs, 0).single())
    {
        Some(dt) => dt,
        None => return Ok(Vec::new()),
    };

    schedule
        .after(&after_dt)
        .take(count as usize)
        // Occurrences are after a non-negative instant, so the conversion is
        // not expected to fail; any that do not fit in `u64` are dropped.
        .filter_map(|dt| u64::try_from(dt.timestamp()).ok())
        .map(|scheduled_at| {
            RunInstance::new_scheduled(task_id, scheduled_at, created_at)
                .map_err(|source| map_run_construction_error(task_id, source))
        })
        .collect()
}
/// Number of runs `derive_cron_initial` requests for a cron task's first
/// window, unless the policy's `max_occurrences()` is smaller.
pub const CRON_WINDOW_SIZE: u32 = 5;
/// Derives up to `count` scheduled runs for `task_id` from a cron `policy`.
///
/// The occurrence timestamps come from
/// `policy.next_occurrences_after(after_secs, count)`; each one is turned
/// into a [`RunInstance`] via `RunInstance::new_scheduled` using the given
/// `created_at`. A `count` of zero yields an empty vector without consulting
/// the policy.
///
/// # Errors
///
/// Returns the [`DerivationError`] produced by `map_run_construction_error`
/// for the first run that fails to construct.
pub fn derive_cron(
    task_id: TaskId,
    policy: &CronPolicy,
    after_secs: u64,
    created_at: u64,
    count: u32,
) -> Result<Vec<RunInstance>, DerivationError> {
    if count == 0 {
        return Ok(Vec::new());
    }

    policy
        .next_occurrences_after(after_secs, count as usize)
        .into_iter()
        .map(|scheduled_at| {
            RunInstance::new_scheduled(task_id, scheduled_at, created_at)
                .map_err(|source| map_run_construction_error(task_id, source))
        })
        .collect()
}
/// Derives the first window of runs for a cron task.
///
/// When `already_derived` is non-zero this is a no-op: it reports no new runs
/// and echoes `already_derived` back. Otherwise it derives at most
/// [`CRON_WINDOW_SIZE`] runs (fewer if `policy.max_occurrences()` is smaller),
/// stamps them with `clock.now()` as their creation time, and reports the
/// number of runs produced as the new derived total.
///
/// # Errors
///
/// Propagates any [`DerivationError`] returned by [`derive_cron`].
pub(super) fn derive_cron_initial(
    clock: &impl Clock,
    task_id: TaskId,
    policy: &CronPolicy,
    already_derived: u32,
    schedule_origin: u64,
) -> Result<DerivationSuccess, DerivationError> {
    if already_derived != 0 {
        return Ok(DerivationSuccess::new(Vec::new(), already_derived));
    }

    let window =
        policy.max_occurrences().map_or(CRON_WINDOW_SIZE, |max| CRON_WINDOW_SIZE.min(max));

    // Start one second before the origin; presumably so that an occurrence
    // falling exactly on `schedule_origin` is not skipped by the lookup.
    let lookup_start = schedule_origin.saturating_sub(1);

    derive_cron(task_id, policy, lookup_start, clock.now(), window).map(|runs| {
        let derived_total = runs.len() as u32;
        DerivationSuccess::new(runs, derived_total)
    })
}
#[cfg(test)]
mod tests {
    use actionqueue_core::ids::TaskId;
    use actionqueue_core::task::run_policy::CronPolicy;

    use super::*;
    use crate::time::clock::MockClock;

    /// Policy used where occurrences should land once per minute.
    fn every_minute_policy() -> CronPolicy {
        CronPolicy::new("0 * * * * * *").expect("valid cron expression")
    }

    /// Policy used where occurrences should land once per second.
    fn every_second_policy() -> CronPolicy {
        CronPolicy::new("* * * * * * *").expect("valid cron expression")
    }

    // A zero count must short-circuit to an empty result.
    #[test]
    fn derive_cron_returns_empty_for_count_zero() {
        let runs = derive_cron(TaskId::new(), &every_minute_policy(), 0, 1000, 0)
            .expect("should not error");
        assert!(runs.is_empty());
    }

    // The requested number of runs is produced when the schedule is dense.
    #[test]
    fn derive_cron_returns_expected_count() {
        let runs = derive_cron(TaskId::new(), &every_second_policy(), 1000, 999, 3)
            .expect("should not error");
        assert_eq!(runs.len(), 3);
    }

    // Every derived run must be scheduled strictly later than the lower bound.
    #[test]
    fn derive_cron_runs_are_strictly_after_after_secs() {
        let lower_bound = 1000u64;
        let runs = derive_cron(TaskId::new(), &every_second_policy(), lower_bound, 999, 5)
            .expect("should not error");
        for run in runs.iter() {
            let at = run.scheduled_at();
            assert!(at > lower_bound, "scheduled_at {} must be > after_secs {}", at, lower_bound);
        }
    }

    // Consecutive runs must have strictly increasing schedule times.
    #[test]
    fn derive_cron_runs_are_monotonically_increasing() {
        let runs =
            derive_cron(TaskId::new(), &every_second_policy(), 0, 0, 5).expect("should not error");
        let times: Vec<u64> = runs.iter().map(|run| run.scheduled_at()).collect();
        for (idx, pair) in times.windows(2).enumerate() {
            let (earlier, later) = (pair[0], pair[1]);
            assert!(
                later > earlier,
                "occurrence {} ({}) must be after {} ({})",
                idx + 1,
                later,
                idx,
                earlier
            );
        }
    }

    // With nothing derived yet, a full window is produced.
    #[test]
    fn derive_cron_initial_creates_window_on_first_call() {
        let clock = MockClock::new(1_000);
        let outcome = derive_cron_initial(&clock, TaskId::new(), &every_second_policy(), 0, 1_000)
            .expect("should not error");
        assert_eq!(outcome.derived().len(), CRON_WINDOW_SIZE as usize);
        assert_eq!(outcome.already_derived(), CRON_WINDOW_SIZE);
    }

    // A non-zero derived count means the initial window was already produced.
    #[test]
    fn derive_cron_initial_is_noop_when_already_derived() {
        let clock = MockClock::new(1_000);
        let outcome = derive_cron_initial(&clock, TaskId::new(), &every_second_policy(), 5, 1_000)
            .expect("should not error");
        assert!(outcome.derived().is_empty());
        assert_eq!(outcome.already_derived(), 5);
    }

    // The window is capped by the policy's maximum occurrence count.
    #[test]
    fn derive_cron_initial_respects_max_occurrences() {
        let clock = MockClock::new(1_000);
        let id = TaskId::new();
        let capped =
            CronPolicy::new("* * * * * * *").expect("valid").with_max_occurrences(3).expect("ok");
        let outcome =
            derive_cron_initial(&clock, id, &capped, 0, 1_000).expect("should not error");
        assert_eq!(outcome.derived().len(), 3);
        assert_eq!(outcome.already_derived(), 3);
    }

    // Run construction rejects the nil task id, and that error is surfaced.
    #[test]
    fn derive_cron_rejects_nil_task_id() {
        let nil_id = "00000000-0000-0000-0000-000000000000"
            .parse::<TaskId>()
            .expect("nil task id literal must parse");
        let outcome = derive_cron(nil_id, &every_second_policy(), 0, 0, 1);
        assert!(matches!(outcome, Err(DerivationError::InvalidTaskIdForRunConstruction { .. })));
    }
}