1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
use std::cmp::Ordering;

use backfill::register_and_backfill_items;
use chrono::prelude::*;
use graphile_worker_crontab_types::Crontab;
use graphile_worker_shutdown_signal::ShutdownSignal;
use sqlx::PgExecutor;
use tracing::{debug, warn};

pub use crate::sql::KnownCrontab;
pub use crate::sql::ScheduleCronJobError;
use crate::{
    sql::{schedule_cron_jobs, CrontabJob},
    utils::{round_date_minute, sleep_until, ONE_MINUTE},
};

mod backfill;
mod sql;
mod utils;

/// Runs the cron scheduling loop until shutdown is signalled or a scheduling
/// call fails.
///
/// On startup the crontabs are passed to [`register_and_backfill_items`]
/// together with the start time. The loop then waits for each successive
/// one-minute tick, collects every crontab for which `should_run_at` is true
/// at that tick, and hands the resulting jobs to [`schedule_cron_jobs`].
///
/// # Arguments
///
/// * `executor` - database executor; cloned for every database call.
/// * `escaped_schema` - forwarded unchanged to the SQL helpers (presumably the
///   already-escaped schema name; confirm against the `sql` module).
/// * `crontabs` - the cron definitions to evaluate on every tick.
/// * `use_local_time` - forwarded unchanged to the backfill and scheduling
///   helpers.
/// * `shutdown_signal` - when it resolves while the loop is waiting for the
///   next tick, the function returns `Ok(())`.
///
/// # Errors
///
/// Returns as soon as [`register_and_backfill_items`] or
/// [`schedule_cron_jobs`] reports an error.
pub async fn cron_main<'e>(
    executor: impl PgExecutor<'e> + Clone,
    escaped_schema: &str,
    crontabs: &[Crontab],
    use_local_time: bool,
    mut shutdown_signal: ShutdownSignal,
) -> Result<(), ScheduleCronJobError> {
    let start = Local::now();
    debug!(start = ?start, "cron:starting");

    register_and_backfill_items(
        executor.clone(),
        escaped_schema,
        crontabs,
        &start,
        use_local_time,
    )
    .await?;
    debug!(start = ?start, "cron:started");

    // `ts` is the tick we are currently waiting for; it only ever advances by
    // exactly one minute at the bottom of the loop.
    let mut ts = round_date_minute(start, true);

    loop {
        tokio::select! {
            _ = sleep_until(ts) => (),
            _ = (&mut shutdown_signal) => break Ok(()),
        };

        // Positive delta: the wall clock is already past the expected tick
        // (we woke up late). Negative delta: the wall clock has not reached
        // the tick yet (we woke up early).
        let current_ts = round_date_minute(Local::now(), false);
        let ts_delta = current_ts - ts;

        match ts_delta.num_minutes().cmp(&0) {
            Ordering::Less => {
                // Woke up before the tick: do not schedule anything yet, just
                // wait for the same `ts` again.
                warn!(
                    "Cron fired {}s too early (clock skew?); rescheduling",
                    ts_delta.num_seconds().abs()
                );
                continue;
            }
            Ordering::Greater => {
                // Woke up late: still process this tick. `ts` advances one
                // minute per iteration, so the following iterations catch up
                // on the ticks that were missed.
                warn!(
                    "Cron fired too late; catching up {}m{}s behind",
                    ts_delta.num_minutes(),
                    ts_delta.num_seconds() % 60
                );
            }
            Ordering::Equal => {}
        }

        // Gather relevant jobs
        let tick = ts.naive_local();
        let jobs: Vec<CrontabJob> = crontabs
            .iter()
            .filter(|cron| cron.should_run_at(&tick))
            .map(|cron| CrontabJob::for_cron(cron, &ts, false))
            .collect();

        if !jobs.is_empty() {
            debug!(nb_jobs = jobs.len(), at = ?ts, "cron:schedule");
            schedule_cron_jobs(executor.clone(), &jobs, &ts, escaped_schema, use_local_time)
                .await?;
            debug!(nb_jobs = jobs.len(), at = ?ts, "cron:scheduled");
        }

        ts += *ONE_MINUTE;
    }
}