use std::borrow::Cow;
use std::sync::Arc;
use std::time::Duration;
use chrono::{DateTime, Utc};
use tokio_util::sync::CancellationToken;
use super::routing::Router;
use crate::cron_expr::parse_cron;
use crate::storage::Storage;
use crate::storage::error::Result;
use crate::storage::types::{CronScheduleRecord, EnqueueRequest};
/// Longest the scheduler loop sleeps between wake-ups; also the retry interval for
/// hosts that are not the cron leader or whose lease check failed.
pub const CRON_TICK: Duration = Duration::from_millis(5_000);
/// TTL passed to `try_cron_lease` when a host attempts to act as cron leader.
pub(super) const CRON_LEASE_TTL: Duration = Duration::from_millis(15_000);
/// Floor for the loop's sleep so overdue schedules do not cause a tight spin.
const CRON_MIN_SLEEP: Duration = Duration::from_millis(500);
/// Summary of a single `cron_tick_once` pass over the stored schedules.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct CronTickReport {
    /// Number of jobs successfully enqueued for schedules that were due and claimed.
    pub fired: u64,
    /// Number of schedules with no `next_fire_at` whose first fire time was persisted.
    pub seeded: u64,
    /// Per-schedule failures: invalid expression, failed seed, failed claim, or failed enqueue.
    pub errors: u64,
    /// Earliest instant any evaluated schedule asked to be looked at again, if any.
    pub next_due: Option<DateTime<Utc>>,
}
/// Runs one scheduler pass over every stored cron schedule.
///
/// Disabled schedules are skipped. Each enabled schedule is handed to `process_row`,
/// which may seed it, fire it, or just report when it is next due. The earliest of
/// those reported instants becomes [`CronTickReport::next_due`].
///
/// Per-schedule failures are counted in [`CronTickReport::errors`] and never abort
/// the pass.
///
/// # Errors
///
/// Returns an error only when listing the schedules from storage fails.
#[tracing::instrument(skip(storage, router))]
pub async fn cron_tick_once(
    storage: &Storage,
    router: &dyn Router,
    now: DateTime<Utc>,
) -> Result<CronTickReport> {
    let schedules = storage.cron.list_schedules().await?;
    tracing::debug!(
        total = schedules.len(),
        enabled = schedules.iter().filter(|s| s.enabled).count(),
        "cron: evaluating schedules"
    );

    let mut report = CronTickReport::default();
    for schedule in schedules.into_iter().filter(|s| s.enabled) {
        if let Some(due) = process_row(storage, router, &schedule, now, &mut report).await {
            // Keep the earliest instant seen so far.
            report.next_due = Some(match report.next_due {
                Some(earliest) if earliest <= due => earliest,
                _ => due,
            });
        }
    }
    Ok(report)
}
/// Evaluates one enabled schedule and returns the next instant it wants attention.
///
/// The outcome depends on `row.next_fire_at`:
/// - `None`: the schedule has never been seeded. Persist `next` via `record_fire`
///   (recording `last_fired_at`, or `now` when absent) without enqueueing a job.
/// - `Some(at)` with `at <= now`: the schedule is due. Claim the occurrence with
///   `try_advance_fire`. Only if this caller wins the claim, enqueue one job. Missed
///   occurrences are not replayed, because `next` is computed from `now`, not `at`.
/// - `Some(at)` in the future: nothing to do; `at` is returned unchanged.
///
/// Failures are logged and counted in `report.errors`; they are never propagated.
///
/// Returns `None` when the expression cannot be parsed or has no occurrence after
/// `now`, so such rows do not influence how long the loop sleeps.
///
/// NOTE(review): the claim advances `next_fire_at` before the job is enqueued. If
/// enqueueing then fails, this occurrence is not retried (at-most-once). The
/// failure is persisted through `record_parse_error`, which appears to be the
/// schedule's generic last-error slot. Confirm against the storage layer.
async fn process_row(
    storage: &Storage,
    router: &dyn Router,
    row: &CronScheduleRecord,
    now: DateTime<Utc>,
    report: &mut CronTickReport,
) -> Option<DateTime<Utc>> {
    let sched = match parse_cron(&row.cron_expr) {
        Ok(s) => s,
        Err(e) => {
            tracing::warn!(name = row.name, error = %e, "cron: invalid expression");
            // Best effort: surface the failure on the schedule record; a failure to
            // persist it is only logged.
            if let Err(persist_err) = storage
                .cron
                .record_parse_error(&row.name, &format!("invalid cron expression: {e}"))
                .await
            {
                tracing::warn!(
                    name = row.name,
                    ?persist_err,
                    "cron: failed to persist parse error"
                );
            }
            report.errors += 1;
            return None;
        }
    };
    // A valid expression can still yield no occurrence after `now`. Previously such
    // rows vanished from evaluation silently. Log it so an expired schedule is
    // diagnosable (not counted as an error).
    let next = match next_cron_after(&sched, now) {
        Some(next) => next,
        None => {
            tracing::debug!(name = row.name, "cron: schedule has no upcoming fire time");
            return None;
        }
    };
    match row.next_fire_at {
        None => {
            // First sighting: persist the upcoming fire time without firing now.
            let fired_at = row.last_fired_at.unwrap_or(now);
            if let Err(e) = storage.cron.record_fire(&row.name, fired_at, next).await {
                tracing::warn!(name = row.name, ?e, "cron: seed record_fire failed");
                report.errors += 1;
            } else {
                report.seeded += 1;
            }
            Some(next)
        }
        Some(at) if at <= now => {
            // Claim this occurrence (expected `at` -> `next`). Only the caller that wins
            // the claim enqueues, so concurrent evaluators cannot double-fire.
            match storage
                .cron
                .try_advance_fire(&row.name, at, now, next)
                .await
            {
                Ok(true) => {}
                Ok(false) => {
                    tracing::debug!(name = row.name, "cron: fire already claimed elsewhere");
                    return Some(next);
                }
                Err(e) => {
                    tracing::warn!(name = row.name, ?e, "cron: claim fire failed");
                    report.errors += 1;
                    return Some(next);
                }
            }
            tracing::info!(name = row.name, kind = row.kind, "cron: firing schedule");
            // An explicit per-schedule queue wins; otherwise route by job kind.
            let queue_name = row.queue_name.clone().map_or_else(
                || Cow::Borrowed(router.route(row.kind.as_str())),
                Cow::Owned,
            );
            let req = EnqueueRequest {
                kind: Cow::Owned(row.kind.clone()),
                payload: row.payload.clone(),
                queue_name: Some(queue_name),
                dedupe_key: None,
                max_attempts: row.max_attempts,
                run_at: None,
                priority: 0,
            };
            match storage.jobs.enqueue(req).await {
                Ok(_) => report.fired += 1,
                Err(e) => {
                    tracing::warn!(name = row.name, error = %e, "cron: enqueue failed");
                    report.errors += 1;
                    if let Err(persist_err) = storage
                        .cron
                        .record_parse_error(&row.name, &format!("enqueue failed: {e}"))
                        .await
                    {
                        tracing::warn!(name = row.name, ?persist_err, "cron: persist err failed");
                    }
                }
            }
            Some(next)
        }
        // Not yet due: report the stored fire time unchanged.
        Some(at) => Some(at),
    }
}
/// Long-running scheduler task that runs [`cron_tick_once`] while this host is leader.
///
/// On each wake-up it asks `try_cron_lease` (with `CRON_LEASE_TTL`) whether
/// `host_id` may act as cron leader. Non-leaders, and hosts whose lease check
/// errored, retry after `CRON_TICK`. The leader runs one tick and logs a summary
/// when anything happened. It then sleeps until the earliest reported `next_due`,
/// bounded by `sleep_until_next`.
///
/// The loop returns when `shutdown` is cancelled. The `biased` select polls
/// cancellation before the sleep, but a tick that has already started runs to
/// completion first.
pub(super) async fn cron_loop(
    storage: Storage,
    router: Arc<dyn Router>,
    host_id: String,
    shutdown: CancellationToken,
) {
    tracing::debug!(%host_id, "cron: start");
    let mut next_sleep = Duration::ZERO;
    loop {
        tokio::select! {
            biased;
            () = shutdown.cancelled() => {
                tracing::debug!("cron: shutdown");
                return;
            }
            () = tokio::time::sleep(next_sleep) => {
                match storage.cron.try_cron_lease(&host_id, CRON_LEASE_TTL).await {
                    Ok(false) => {
                        tracing::trace!(%host_id, "cron: not leader this tick");
                        next_sleep = CRON_TICK;
                        continue;
                    }
                    Ok(true) => {}
                    Err(e) => {
                        tracing::warn!(?e, %host_id, "cron: lease check failed; skipping tick");
                        next_sleep = CRON_TICK;
                        continue;
                    }
                }
                let now = Utc::now();
                let report = match cron_tick_once(&storage, router.as_ref(), now).await {
                    Ok(report) => {
                        if report.fired > 0 || report.seeded > 0 || report.errors > 0 {
                            tracing::info!(
                                fired = report.fired,
                                seeded = report.seeded,
                                errors = report.errors,
                                "cron: tick"
                            );
                        }
                        report
                    }
                    Err(e) => {
                        tracing::warn!(?e, "cron: tick failed");
                        CronTickReport::default()
                    }
                };
                // Measure the remaining wait from the post-tick clock. The tick performs
                // storage round-trips per schedule, and measuring from the pre-tick `now`
                // would make the loop wake late by however long the tick took.
                next_sleep = sleep_until_next(report.next_due, Utc::now());
            }
        }
    }
}
/// Chooses how long the scheduler loop should sleep before its next wake-up.
///
/// With nothing due, this is `CRON_TICK`. Otherwise it is the time remaining until
/// `next_due`, kept within `[CRON_MIN_SLEEP, CRON_TICK]`. A due time that is already
/// in the past maps to `CRON_MIN_SLEEP`.
fn sleep_until_next(next_due: Option<DateTime<Utc>>, now: DateTime<Utc>) -> Duration {
    let due = match next_due {
        Some(due) => due,
        None => return CRON_TICK,
    };
    // `to_std` rejects negative spans, i.e. the schedule is already overdue.
    match (due - now).to_std() {
        Ok(remaining) => remaining.clamp(CRON_MIN_SLEEP, CRON_TICK),
        Err(_) => CRON_MIN_SLEEP,
    }
}
/// Returns the first occurrence of `sched` after `now` (as yielded by
/// `cron::Schedule::after`), or `None` when the schedule has no further occurrences.
fn next_cron_after(sched: &cron::Schedule, now: DateTime<Utc>) -> Option<DateTime<Utc>> {
    let mut upcoming = sched.after(&now);
    upcoming.next()
}
/// Passes each entry of `schedules`, in order, to `storage.cron.ensure_schedule`.
///
/// Logs the number of schedules once all of them have been handled.
///
/// # Errors
///
/// Propagates the first error returned by `ensure_schedule`. Schedules after the
/// failing one are not processed and the summary log line is not emitted.
pub async fn ensure_schedules(
    storage: &crate::Storage,
    schedules: &[crate::NewCronSchedule],
) -> crate::storage::error::Result<()> {
    let count = schedules.len();
    for entry in schedules.iter() {
        let owned = entry.clone();
        storage.cron.ensure_schedule(owned).await?;
    }
    tracing::info!(count, "cron: default schedules ensured");
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn sleep_until_next_clamps_to_bounds() {
        let now = Utc::now();
        // Far-future due times are capped at one tick.
        assert_eq!(
            sleep_until_next(Some(now + chrono::Duration::hours(1)), now),
            CRON_TICK
        );
        // Overdue schedules wait the minimum instead of spinning.
        assert_eq!(
            sleep_until_next(Some(now - chrono::Duration::seconds(5)), now),
            CRON_MIN_SLEEP
        );
        // Nothing scheduled: fall back to the regular tick.
        assert_eq!(sleep_until_next(None, now), CRON_TICK);
        // In-range waits pass through unchanged.
        assert_eq!(
            sleep_until_next(Some(now + chrono::Duration::seconds(2)), now),
            Duration::from_secs(2)
        );
    }

    #[test]
    fn sleep_until_next_handles_exact_boundaries() {
        let now = Utc::now();
        // Due exactly now: a zero wait is raised to the minimum.
        assert_eq!(sleep_until_next(Some(now), now), CRON_MIN_SLEEP);
        // Positive but below the minimum is raised as well.
        assert_eq!(
            sleep_until_next(Some(now + chrono::Duration::milliseconds(100)), now),
            CRON_MIN_SLEEP
        );
        // Exactly one tick away is returned as-is, not reduced.
        let one_tick =
            chrono::Duration::from_std(CRON_TICK).expect("CRON_TICK fits in chrono::Duration");
        assert_eq!(sleep_until_next(Some(now + one_tick), now), CRON_TICK);
    }
}