use chrono::prelude::*;
use graphile_worker_crontab_types::Crontab;
use graphile_worker_database::{DbExecutorArg, Schema};
use tracing::debug;
use crate::{
sql::{
get_known_crontabs, insert_unknown_crontabs, schedule_cron_jobs, CrontabJob, KnownCrontab,
ScheduleCronJobError,
},
utils::{round_date_minute, ONE_MINUTE},
};
/// A crontab that is already known, paired with the earliest timestamp from
/// which it may be backfilled.
pub(crate) struct BackfillItemAndDate<'a, 'b> {
/// The crontab definition that is a candidate for backfilling.
item: &'a Crontab,
/// Lower bound for backfilling: timestamps strictly before this instant are
/// skipped. Taken from the known crontab's last execution, falling back to
/// the instant since which it has been known.
not_before: &'b DateTime<Local>,
}
/// Result of splitting a list of crontabs into those that match a known
/// crontab record and those that do not.
pub(crate) struct BackfillAndUnknownItems<'a, 'b> {
/// Crontabs that matched a known record, each with its backfill lower bound.
backfill_items_and_date: Vec<BackfillItemAndDate<'a, 'b>>,
/// Task identifiers of the crontabs for which no known record was found.
unknown_identifiers: Vec<&'a String>,
}
/// Splits `crontabs` into the ones that have a matching entry in
/// `known_crontabs` and the ones that do not.
///
/// A crontab matches a known record when the record's identifier equals the
/// crontab's task identifier. For every match, the backfill lower bound is the
/// record's last execution when there is one, otherwise the instant since
/// which the record has been known. Crontabs without a match contribute only
/// their task identifier.
///
/// The relative order of `crontabs` is preserved in both output vectors.
pub(crate) fn get_backfill_and_unknown_items<'a, 'b>(
    crontabs: &'a [Crontab],
    known_crontabs: &'b [KnownCrontab],
) -> BackfillAndUnknownItems<'a, 'b> {
    let mut result = BackfillAndUnknownItems {
        backfill_items_and_date: Vec::new(),
        unknown_identifiers: Vec::new(),
    };

    for item in crontabs {
        let identifier = item.task_identifier();
        let matching_record = known_crontabs
            .iter()
            .find(|candidate| candidate.identifier() == identifier);

        match matching_record {
            Some(record) => {
                // Prefer the last execution; fall back to when the crontab
                // was first recorded as known.
                let not_before = match record.last_execution().as_ref() {
                    Some(last_run) => last_run,
                    None => record.known_since(),
                };
                result
                    .backfill_items_and_date
                    .push(BackfillItemAndDate { item, not_before });
            }
            None => result.unknown_identifiers.push(identifier),
        }
    }

    result
}
/// Records crontabs that are not yet known and schedules backfill jobs for
/// the ones that are.
///
/// Steps:
/// 1. Load the known crontabs through `get_known_crontabs`.
/// 2. Pass the identifiers of crontabs without a known record to
///    `insert_unknown_crontabs`, together with `start_time`.
/// 3. For known crontabs that declare a fill period, walk forward from
///    `start_time` minus the largest fill period (aligned by
///    `round_date_minute`) in `ONE_MINUTE` steps up to, but excluding,
///    `start_time`. At each step, schedule every crontab whose fill period
///    still covers that timestamp, whose lower bound is not after it, and
///    whose schedule matches it.
///
/// # Errors
///
/// Propagates any error returned by `get_known_crontabs`,
/// `insert_unknown_crontabs` or `schedule_cron_jobs`.
pub(crate) async fn register_and_backfill_items<Tz: TimeZone>(
    mut executor: impl DbExecutorArg,
    schema: &Schema,
    crontabs: &[Crontab],
    start_time: &DateTime<Tz>,
    use_local_time: bool,
) -> Result<(), ScheduleCronJobError>
where
    Tz::Offset: Send + Sync,
{
    let known = get_known_crontabs(&mut executor, schema).await?;
    let BackfillAndUnknownItems {
        backfill_items_and_date: candidates,
        unknown_identifiers: unknown,
    } = get_backfill_and_unknown_items(crontabs, &known);

    if !unknown.is_empty() {
        insert_unknown_crontabs(&mut executor, schema, &unknown, start_time).await?;
    }

    // The widest fill period among the candidates determines how far back the
    // walk has to start; without any fill period there is nothing to backfill.
    let widest_period = candidates
        .iter()
        .filter_map(|entry| {
            entry
                .item
                .options()
                .fill()
                .as_ref()
                .map(|period| period.to_secs())
        })
        .max();
    let widest_period = match widest_period {
        Some(seconds) => seconds,
        None => return Ok(()),
    };

    let mut cursor = round_date_minute(
        start_time.clone() - chrono::Duration::seconds(widest_period as i64),
        true,
    );

    while &cursor < start_time {
        let seconds_ago = (start_time.clone() - cursor.clone()).num_seconds();

        let jobs: Vec<CrontabJob> = candidates
            .iter()
            .filter(|entry| {
                entry.item.options().fill().as_ref().map_or(false, |period| {
                    // The crontab's own fill period must reach back this far,
                    // the timestamp must not precede its lower bound, and its
                    // schedule must match the timestamp.
                    period.to_secs() as i64 >= seconds_ago
                        && &cursor >= entry.not_before
                        && entry.item.should_run_at(&cursor.naive_local())
                })
            })
            .map(|entry| CrontabJob::for_cron(entry.item, &cursor, true))
            .collect();

        if !jobs.is_empty() {
            debug!(nb_jobs = jobs.len(), at = ?cursor, "cron:backfill");
            schedule_cron_jobs(&mut executor, &jobs, &cursor, schema, use_local_time).await?;
        }

        cursor += *ONE_MINUTE;
    }

    Ok(())
}