use std::cmp::Ordering;
use backfill::register_and_backfill_items;
use chrono::prelude::*;
use futures::FutureExt;
use graphile_worker_crontab_types::Crontab;
use graphile_worker_database::DbExecutorArg;
use graphile_worker_lifecycle_hooks::{CronJobScheduledContext, CronTickContext, HookRegistry};
use graphile_worker_shutdown_signal::ShutdownSignal;
use tracing::{debug, warn};
pub use crate::clock::mock::MockClock;
pub use crate::clock::Clock;
pub use crate::sql::KnownCrontab;
pub use crate::sql::ScheduleCronJobError;
use crate::{
clock::SystemClock,
sql::{schedule_cron_jobs, CrontabJob},
utils::{round_date_minute, ONE_MINUTE},
};
mod backfill;
pub mod clock;
mod sql;
mod utils;
pub async fn cron_main(
executor: impl DbExecutorArg,
escaped_schema: &str,
crontabs: &[Crontab],
use_local_time: bool,
shutdown_signal: ShutdownSignal,
hooks: &HookRegistry,
) -> Result<(), ScheduleCronJobError> {
CronRunner::new(executor, escaped_schema, crontabs, hooks)
.use_local_time(use_local_time)
.run(shutdown_signal)
.await
}
pub struct CronRunner<'a, E, C = SystemClock> {
executor: E,
escaped_schema: &'a str,
crontabs: &'a [Crontab],
use_local_time: bool,
hooks: &'a HookRegistry,
clock: C,
}
impl<'a, E: DbExecutorArg> CronRunner<'a, E, SystemClock> {
pub fn new(
executor: E,
escaped_schema: &'a str,
crontabs: &'a [Crontab],
hooks: &'a HookRegistry,
) -> Self {
Self {
executor,
escaped_schema,
crontabs,
use_local_time: false,
hooks,
clock: SystemClock,
}
}
}
impl<'a, E: DbExecutorArg, C: Clock> CronRunner<'a, E, C> {
pub fn use_local_time(mut self, use_local_time: bool) -> Self {
self.use_local_time = use_local_time;
self
}
pub fn with_clock<C2: Clock>(self, clock: C2) -> CronRunner<'a, E, C2> {
CronRunner {
executor: self.executor,
escaped_schema: self.escaped_schema,
crontabs: self.crontabs,
use_local_time: self.use_local_time,
hooks: self.hooks,
clock,
}
}
pub async fn run(
mut self,
mut shutdown_signal: ShutdownSignal,
) -> Result<(), ScheduleCronJobError> {
let start = self.clock.now();
debug!(start = ?start, "cron:starting");
register_and_backfill_items(
&mut self.executor,
self.escaped_schema,
self.crontabs,
&start,
self.use_local_time,
)
.await?;
debug!(start = ?start, "cron:started");
let mut ts = round_date_minute(start, true);
loop {
let sleep = self.clock.sleep_until(ts).fuse();
let shutdown = (&mut shutdown_signal).fuse();
futures::pin_mut!(sleep, shutdown);
let should_shutdown = futures::select_biased! {
_ = shutdown => true,
_ = sleep => false,
};
if should_shutdown {
break Ok(());
}
let current_ts = round_date_minute(self.clock.now(), false);
let ts_delta = current_ts - ts;
match ts_delta.num_minutes().cmp(&0) {
Ordering::Less => {
warn!(
"Cron fired {}s too early (clock skew?); rescheduling",
-ts_delta.num_seconds()
);
continue;
}
Ordering::Greater => {
warn!(
"Cron fired too late; catching up {}m{}s behind",
ts_delta.num_minutes(),
ts_delta.num_seconds() % 60
);
}
_ => {}
}
let tick_ctx = CronTickContext {
timestamp: ts.with_timezone(&Utc),
crontabs: self.crontabs.to_vec(),
};
self.hooks.emit(tick_ctx).await;
let mut jobs: Vec<CrontabJob> = vec![];
for cron in self.crontabs {
if cron.should_run_at(&ts.naive_local()) {
jobs.push(CrontabJob::for_cron(cron, &ts, false));
let scheduled_ctx = CronJobScheduledContext {
crontab: cron.clone(),
scheduled_at: ts.with_timezone(&Utc),
};
self.hooks.emit(scheduled_ctx).await;
}
}
if !jobs.is_empty() {
debug!(nb_jobs = jobs.len(), at = ?ts, "cron:schedule");
schedule_cron_jobs(
&mut self.executor,
&jobs,
&ts,
self.escaped_schema,
self.use_local_time,
)
.await?;
debug!(nb_jobs = jobs.len(), at = ?ts, "cron:scheduled");
}
ts += *ONE_MINUTE;
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::clock::mock::MockClock;
use chrono::Duration;
fn test_time() -> DateTime<Local> {
Local.with_ymd_and_hms(2024, 1, 15, 10, 30, 0).unwrap()
}
#[test]
fn test_round_date_minute_no_round_up() {
let time = Local.with_ymd_and_hms(2024, 1, 15, 10, 30, 45).unwrap();
let rounded = round_date_minute(time, false);
assert_eq!(rounded.minute(), 30);
assert_eq!(rounded.second(), 0);
assert_eq!(rounded.nanosecond(), 0);
}
#[test]
fn test_round_date_minute_with_round_up() {
let time = Local.with_ymd_and_hms(2024, 1, 15, 10, 30, 45).unwrap();
let rounded = round_date_minute(time, true);
assert_eq!(rounded.minute(), 31);
assert_eq!(rounded.second(), 0);
assert_eq!(rounded.nanosecond(), 0);
}
#[test]
fn test_clock_skew_too_early_detection() {
let scheduled = Local.with_ymd_and_hms(2024, 1, 15, 10, 31, 0).unwrap();
let current = Local.with_ymd_and_hms(2024, 1, 15, 10, 30, 0).unwrap();
let current_rounded = round_date_minute(current, false);
let ts_delta = current_rounded - scheduled;
assert!(
ts_delta.num_minutes() < 0,
"When current time is before scheduled, delta should be negative"
);
assert!(
matches!(ts_delta.num_minutes().cmp(&0), Ordering::Less),
"Should match Ordering::Less for too early case"
);
}
#[test]
fn test_clock_skew_too_late_detection() {
let scheduled = Local.with_ymd_and_hms(2024, 1, 15, 10, 31, 0).unwrap();
let current = Local.with_ymd_and_hms(2024, 1, 15, 10, 45, 0).unwrap();
let current_rounded = round_date_minute(current, false);
let ts_delta = current_rounded - scheduled;
assert!(
ts_delta.num_minutes() > 0,
"When current time is after scheduled, delta should be positive"
);
assert!(
matches!(ts_delta.num_minutes().cmp(&0), Ordering::Greater),
"Should match Ordering::Greater for too late case"
);
}
#[test]
fn test_clock_skew_on_time_detection() {
let scheduled = Local.with_ymd_and_hms(2024, 1, 15, 10, 31, 0).unwrap();
let current = Local.with_ymd_and_hms(2024, 1, 15, 10, 31, 30).unwrap();
let current_rounded = round_date_minute(current, false);
let ts_delta = current_rounded - scheduled;
assert_eq!(
ts_delta.num_minutes(),
0,
"When current time equals scheduled (same minute), delta should be zero"
);
assert!(
matches!(ts_delta.num_minutes().cmp(&0), Ordering::Equal),
"Should match Ordering::Equal for on-time case"
);
}
#[test]
fn test_system_sleep_scenario() {
let scheduled = Local.with_ymd_and_hms(2024, 1, 15, 10, 1, 0).unwrap();
let after_sleep = Local.with_ymd_and_hms(2024, 1, 15, 11, 25, 0).unwrap();
let current_rounded = round_date_minute(after_sleep, false);
let ts_delta = current_rounded - scheduled;
assert_eq!(ts_delta.num_minutes(), 84, "Should be 84 minutes behind");
assert!(
matches!(ts_delta.num_minutes().cmp(&0), Ordering::Greater),
"Should match Ordering::Greater for catch-up mode"
);
}
#[tokio::test]
async fn test_mock_clock_now() {
let initial = test_time();
let clock = MockClock::new(initial);
assert_eq!(clock.now(), initial);
}
#[tokio::test]
async fn test_mock_clock_set_time() {
let initial = test_time();
let clock = MockClock::new(initial);
let new_time = initial + Duration::hours(2);
clock.set_time(new_time);
assert_eq!(clock.now(), new_time);
}
#[tokio::test]
async fn test_mock_clock_advance() {
let initial = test_time();
let clock = MockClock::new(initial);
clock.advance(Duration::minutes(30));
assert_eq!(clock.now(), initial + Duration::minutes(30));
}
#[tokio::test]
async fn test_mock_clock_sleep_until_already_passed() {
let initial = test_time();
let clock = MockClock::new(initial);
let past_time = initial - Duration::hours(1);
clock.sleep_until(past_time).await;
}
#[tokio::test]
async fn test_mock_clock_sleep_until_wakes_on_time_advance() {
use std::sync::Arc;
use tokio::sync::Mutex;
let initial = test_time();
let clock = Arc::new(MockClock::new(initial));
let target = initial + Duration::minutes(5);
let clock_clone = Arc::clone(&clock);
let woke_up = Arc::new(Mutex::new(false));
let woke_up_clone = Arc::clone(&woke_up);
let handle = tokio::spawn(async move {
clock_clone.sleep_until(target).await;
*woke_up_clone.lock().await = true;
});
tokio::task::yield_now().await;
assert!(!*woke_up.lock().await, "Should not have woken up yet");
clock.set_time(target);
handle.await.unwrap();
assert!(
*woke_up.lock().await,
"Should have woken up after time advance"
);
}
}