use std::cmp::Ordering;
use backfill::register_and_backfill_items;
use chrono::prelude::*;
use graphile_worker_crontab_types::Crontab;
use graphile_worker_shutdown_signal::ShutdownSignal;
use sqlx::PgExecutor;
use tracing::{debug, warn};
pub use crate::sql::KnownCrontab;
pub use crate::sql::ScheduleCronJobError;
use crate::{
sql::{schedule_cron_jobs, CrontabJob},
utils::{round_date_minute, sleep_until, ONE_MINUTE},
};
mod backfill;
mod sql;
mod utils;
pub async fn cron_main<'e>(
executor: impl PgExecutor<'e> + Clone,
escaped_schema: &str,
crontabs: &[Crontab],
use_local_time: bool,
mut shutdown_signal: ShutdownSignal,
) -> Result<(), ScheduleCronJobError> {
let start = Local::now();
debug!(start = ?start, "cron:starting");
register_and_backfill_items(
executor.clone(),
escaped_schema,
crontabs,
&start,
use_local_time,
)
.await?;
debug!(start = ?start, "cron:started");
let mut ts = round_date_minute(start, true);
loop {
tokio::select! {
_ = sleep_until(ts) => (),
_ = (&mut shutdown_signal) => break Ok(()),
};
let current_ts = round_date_minute(Local::now(), false);
let ts_delta = current_ts - ts;
match ts_delta.num_minutes().cmp(&0) {
Ordering::Greater => {
warn!(
"Cron fired {}s too early (clock skew?); rescheduling",
ts_delta.num_seconds()
);
continue;
}
Ordering::Less => {
warn!(
"Cron fired too late; catching up {}m{}s behind",
ts_delta.num_minutes(),
ts_delta.num_seconds() % 60
);
}
_ => {}
}
let mut jobs: Vec<CrontabJob> = vec![];
for cron in crontabs {
if cron.should_run_at(&ts.naive_local()) {
jobs.push(CrontabJob::for_cron(cron, &ts, false));
}
}
if !jobs.is_empty() {
debug!(nb_jobs = jobs.len(), at = ?ts, "cron:schedule");
schedule_cron_jobs(executor.clone(), &jobs, &ts, escaped_schema, use_local_time)
.await?;
debug!(nb_jobs = jobs.len(), at = ?ts, "cron:scheduled");
}
ts += *ONE_MINUTE;
}
}