graphile_worker_crontab_runner 0.7.9

Crontab runner package for graphile worker, a high performance Rust/PostgreSQL job queue
Documentation
mod schedule;

use std::cmp::Ordering;

use chrono::prelude::*;
use futures::FutureExt;
use graphile_worker_crontab_types::Crontab;
use graphile_worker_database::{DbExecutorArg, Schema};
use graphile_worker_lifecycle_hooks::HookRegistry;
use graphile_worker_shutdown_signal::ShutdownSignal;
use tracing::{debug, warn};

use crate::backfill::register_and_backfill_items;
use crate::clock::{Clock, SystemClock};
use crate::sql::ScheduleCronJobError;
use crate::utils::{round_date_minute, ONE_MINUTE};

use self::schedule::emit_tick_and_schedule_jobs;

pub async fn cron_main(
    executor: impl DbExecutorArg,
    schema: impl Into<Schema>,
    crontabs: &[Crontab],
    use_local_time: bool,
    shutdown_signal: ShutdownSignal,
    hooks: &HookRegistry,
) -> Result<(), ScheduleCronJobError> {
    CronRunner::new(executor, schema, crontabs, hooks)
        .use_local_time(use_local_time)
        .run(shutdown_signal)
        .await
}

pub struct CronRunner<'a, E, C = SystemClock> {
    executor: E,
    schema: Schema,
    crontabs: &'a [Crontab],
    use_local_time: bool,
    hooks: &'a HookRegistry,
    clock: C,
}

impl<'a, E: DbExecutorArg> CronRunner<'a, E, SystemClock> {
    pub fn new(
        executor: E,
        schema: impl Into<Schema>,
        crontabs: &'a [Crontab],
        hooks: &'a HookRegistry,
    ) -> Self {
        Self {
            executor,
            schema: schema.into(),
            crontabs,
            use_local_time: false,
            hooks,
            clock: SystemClock,
        }
    }
}

impl<'a, E: DbExecutorArg, C: Clock> CronRunner<'a, E, C> {
    pub fn use_local_time(mut self, use_local_time: bool) -> Self {
        self.use_local_time = use_local_time;
        self
    }

    pub fn with_clock<C2: Clock>(self, clock: C2) -> CronRunner<'a, E, C2> {
        CronRunner {
            executor: self.executor,
            schema: self.schema,
            crontabs: self.crontabs,
            use_local_time: self.use_local_time,
            hooks: self.hooks,
            clock,
        }
    }

    pub async fn run(
        mut self,
        mut shutdown_signal: ShutdownSignal,
    ) -> Result<(), ScheduleCronJobError> {
        let start = self.clock.now();
        debug!(start = ?start, "cron:starting");

        register_and_backfill_items(
            &mut self.executor,
            &self.schema,
            self.crontabs,
            &start,
            self.use_local_time,
        )
        .await?;
        debug!(start = ?start, "cron:started");

        let mut ts = round_date_minute(start, true);

        loop {
            let should_shutdown = {
                let sleep = self.clock.sleep_until(ts).fuse();
                let shutdown = (&mut shutdown_signal).fuse();
                futures::pin_mut!(sleep, shutdown);

                futures::select_biased! {
                    _ = shutdown => true,
                    _ = sleep => false,
                }
            };

            if should_shutdown {
                break Ok(());
            }

            let current_ts = round_date_minute(self.clock.now(), false);
            let ts_delta = current_ts - ts;

            match ts_delta.num_minutes().cmp(&0) {
                Ordering::Less => {
                    warn!(
                        "Cron fired {}s too early (clock skew?); rescheduling",
                        -ts_delta.num_seconds()
                    );
                    continue;
                }
                Ordering::Greater => {
                    warn!(
                        "Cron fired too late; catching up {}m{}s behind",
                        ts_delta.num_minutes(),
                        ts_delta.num_seconds() % 60
                    );
                }
                _ => {}
            }

            self.emit_tick_and_schedule_jobs(ts).await?;
            ts += *ONE_MINUTE;
        }
    }

    async fn emit_tick_and_schedule_jobs(
        &mut self,
        ts: DateTime<Local>,
    ) -> Result<(), ScheduleCronJobError> {
        emit_tick_and_schedule_jobs(
            &mut self.executor,
            &self.schema,
            self.crontabs,
            self.use_local_time,
            self.hooks,
            ts,
        )
        .await
    }
}