use std::sync::Arc;
use chrono::{Duration, DurationRound, Local};
use futures::FutureExt;
use graphile_worker::parse_crontab;
use graphile_worker::HookRegistry;
use graphile_worker::IntoTaskHandlerResult;
use graphile_worker::Worker;
use graphile_worker_crontab_runner::{CronRunner, MockClock};
use graphile_worker_shutdown_signal::ShutdownSignal;
use serde::{Deserialize, Serialize};
use sqlx::query;
use tokio::sync::Notify;
use tokio::{task::spawn_local, time::Instant};
use crate::helpers::{with_test_db, StaticCounter};
use graphile_worker::{TaskHandler, WorkerContext};
mod helpers;
fn create_shutdown_signal() -> (ShutdownSignal, Arc<Notify>) {
let notify = Arc::new(Notify::new());
let notify_clone = notify.clone();
let signal: ShutdownSignal = async move {
notify_clone.notified().await;
}
.boxed()
.shared();
(signal, notify)
}
/// Verifies that starting a worker with a crontab records the cron identifier.
///
/// The worker is started with a single crontab entry for `do_it`. The test
/// polls the known-crontabs list until an entry shows up (panicking once the
/// elapsed-seconds check trips), then asserts that the entry is `do_it`, that
/// it has no last execution, and that no job has been queued.
#[tokio::test]
async fn register_identifiers() {
    static JOB3_CALL_COUNT: StaticCounter = StaticCounter::new();

    #[derive(Serialize, Deserialize)]
    struct Job3 {
        a: u32,
    }

    impl TaskHandler for Job3 {
        const IDENTIFIER: &'static str = "job3";

        async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
            JOB3_CALL_COUNT.increment().await;
        }
    }

    with_test_db(|test_db| async move {
        test_db
            .worker_utils()
            .migrate()
            .await
            .expect("Failed to migrate");

        // The worker runs in the background for the duration of the test.
        let test_pool = test_db.test_pool.clone();
        let worker_handle = spawn_local(async move {
            Worker::options()
                .pg_pool(test_pool)
                .concurrency(3)
                .define_job::<Job3>()
                .with_crontab(
                    r#"
0 */4 * * * do_it ?fill=1d
"#,
                )
                .expect("Invalid crontab")
                .init()
                .await
                .expect("Failed to create worker")
                .run()
                .await
                .expect("Failed to run worker");
        });

        let mut known_crontabs = test_db.get_known_crontabs().await;
        let start = Instant::now();
        while known_crontabs.is_empty() {
            if start.elapsed().as_secs() > 5 {
                panic!("Crontab should have been registered by now");
            }
            // Pause *before* re-querying: a successful query then ends the
            // loop immediately instead of paying one more 100 ms sleep.
            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
            known_crontabs = test_db.get_known_crontabs().await;
        }

        let known_crontab = &known_crontabs[0];
        assert_eq!(known_crontab.identifier(), "do_it");
        assert!(known_crontab.known_since() < &chrono::Utc::now());
        assert!(known_crontab.last_execution().is_none());

        // Registration alone must not enqueue anything.
        let jobs = test_db.get_jobs().await;
        assert_eq!(jobs.len(), 0);

        worker_handle.abort();
    })
    .await;
}
/// Checks backfilling when `do_it` was last executed five hours ago.
///
/// A known-crontab row for `do_it` is inserted with `last_execution` set to
/// `NOW() - interval '5 hours'`, then a worker is started with a crontab that
/// fires every four hours. The test polls until `last_execution` equals the
/// current four-hour slot (or the following one) and finally expects one or
/// two `do_it` jobs.
#[tokio::test]
async fn backfills_if_identifier_already_registered_5h_ago() {
    with_test_db(|test_db| async move {
        test_db
            .worker_utils()
            .migrate()
            .await
            .expect("Failed to migrate");

        // Slot boundaries are derived from the local clock before the worker starts.
        let cron_period = Duration::hours(4);
        let current_slot = Local::now()
            .duration_trunc(cron_period)
            .expect("Failed to truncate time");
        let next_slot = current_slot + cron_period;

        query(
            r#"
insert into graphile_worker._private_known_crontabs as known_crontabs (
identifier,
known_since,
last_execution
)
values (
'do_it',
NOW() - interval '14 days',
NOW() - interval '5 hours'
)
"#,
        )
        .execute(&test_db.test_pool)
        .await
        .expect("Failed to insert known crontab");

        let pool_for_worker = test_db.test_pool.clone();
        let worker_task = spawn_local(async move {
            let crontab = r#"
0 */4 * * * do_it ?fill=1d
"#;
            let options = Worker::options()
                .pg_pool(pool_for_worker)
                .concurrency(3)
                .with_crontab(crontab)
                .expect("Invalid crontab");
            options
                .init()
                .await
                .expect("Failed to create worker")
                .run()
                .await
                .expect("Failed to run worker");
        });

        let polling_started = Instant::now();
        loop {
            let known_crontabs = test_db.get_known_crontabs().await;
            assert_eq!(known_crontabs.len(), 1);

            let entry = &known_crontabs[0];
            assert_eq!(entry.identifier(), "do_it");
            assert!(entry.known_since() < &chrono::Utc::now());
            assert!(entry.last_execution().is_some());

            let last_execution = entry.last_execution().unwrap();
            let timed_out = polling_started.elapsed().as_secs() > 5;
            let reached_slot = last_execution == current_slot || last_execution == next_slot;

            // Stop on success, or fail with a detailed message once time is up.
            if timed_out || reached_slot {
                assert!(
                    reached_slot,
                    r#"
There's a small window every 4 hours where the expect might fail
due to the clock advancing, so we account for that by checking
both of the expected times.
Last execution: {:?}
Expected time: {:?}
Expected time + 4h: {:?}
"#,
                    last_execution,
                    current_slot,
                    next_slot,
                );
                break;
            }

            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
        }

        let jobs = test_db.get_jobs().await;
        assert!(
            matches!(jobs.len(), 1 | 2),
            "It's a 5 hour window for a job that runs every 4 hours, there should be 1 or 2 jobs"
        );
        assert!(jobs.iter().all(|job| job.task_identifier == "do_it"));

        worker_task.abort();
    })
    .await;
}
/// Checks backfilling when `do_it` was last executed twenty-five hours ago.
///
/// A known-crontab row for `do_it` is inserted with `last_execution` set to
/// `NOW() - interval '25 hours'`, then a worker is started with a crontab that
/// fires every four hours and carries `?fill=1d`. The test polls until
/// `last_execution` equals the current four-hour slot (or the following one)
/// and finally expects six or seven `do_it` jobs.
#[tokio::test]
async fn backfills_if_identifier_already_registered_25h_ago() {
    with_test_db(|test_db| async move {
        test_db
            .worker_utils()
            .migrate()
            .await
            .expect("Failed to migrate");

        // Slot boundaries are derived from the local clock before the worker starts.
        let cron_period = Duration::hours(4);
        let current_slot = Local::now()
            .duration_trunc(cron_period)
            .expect("Failed to truncate time");
        let next_slot = current_slot + cron_period;

        query(
            r#"
insert into graphile_worker._private_known_crontabs as known_crontabs (
identifier,
known_since,
last_execution
)
values (
'do_it',
NOW() - interval '14 days',
NOW() - interval '25 hours'
)
"#,
        )
        .execute(&test_db.test_pool)
        .await
        .expect("Failed to insert known crontab");

        let pool_for_worker = test_db.test_pool.clone();
        let worker_task = spawn_local(async move {
            let crontab = r#"
0 */4 * * * do_it ?fill=1d
"#;
            let options = Worker::options()
                .pg_pool(pool_for_worker)
                .concurrency(3)
                .with_crontab(crontab)
                .expect("Invalid crontab");
            options
                .init()
                .await
                .expect("Failed to create worker")
                .run()
                .await
                .expect("Failed to run worker");
        });

        let polling_started = Instant::now();
        loop {
            let known_crontabs = test_db.get_known_crontabs().await;
            assert_eq!(known_crontabs.len(), 1);

            let entry = &known_crontabs[0];
            assert_eq!(entry.identifier(), "do_it");
            assert!(entry.known_since() < &chrono::Utc::now());
            assert!(entry.last_execution().is_some());

            let last_execution = entry.last_execution().unwrap();
            let timed_out = polling_started.elapsed().as_secs() > 5;
            let reached_slot = last_execution == current_slot || last_execution == next_slot;

            // Stop on success, or fail with a detailed message once time is up.
            if timed_out || reached_slot {
                assert!(
                    reached_slot,
                    r#"
There's a small window every 4 hours where the expect might fail
due to the clock advancing, so we account for that by checking
both of the expected times.
Last execution: {:?}
Expected time: {:?}
Expected time + 4h: {:?}
"#,
                    last_execution,
                    current_slot,
                    next_slot,
                );
                break;
            }

            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
        }

        let jobs = test_db.get_jobs().await;
        assert!(
            matches!(jobs.len(), 6 | 7),
            "It's a 25 hour window for a job that runs every 4 hours, there should be 6 or 7 jobs"
        );
        assert!(jobs.iter().all(|job| job.task_identifier == "do_it"));

        worker_task.abort();
    })
    .await;
}
/// Checks that a `CronRunner` driven by a mock clock enqueues a job.
///
/// The runner is started with an every-minute crontab for `test_task`, the
/// mock clock is advanced by two minutes, and the test polls the jobs list
/// until a job appears, asserting that the first one is `test_task`. The
/// runner is then stopped through the shutdown signal.
#[tokio::test]
async fn cron_runner_schedules_job_on_tick() {
    with_test_db(|test_db| async move {
        test_db
            .worker_utils()
            .migrate()
            .await
            .expect("Failed to migrate");

        let mock_clock = Arc::new(MockClock::new(Local::now()));
        let (shutdown_signal, shutdown_trigger) = create_shutdown_signal();
        let crontabs = parse_crontab("* * * * * test_task").expect("Failed to parse crontab");
        let hooks = HookRegistry::default();

        let runner_pool = test_db.test_pool.clone();
        let runner_clock = mock_clock.clone();
        let runner_task = spawn_local(async move {
            let runner = CronRunner::new(&runner_pool, "graphile_worker", &crontabs, &hooks)
                .with_clock(runner_clock);
            runner.run(shutdown_signal).await
        });

        // Give the spawned runner a chance to be polled before moving the clock.
        tokio::task::yield_now().await;
        mock_clock.advance(Duration::minutes(2));

        let polling_started = Instant::now();
        loop {
            let jobs = test_db.get_jobs().await;
            if let Some(first_job) = jobs.first() {
                assert_eq!(first_job.task_identifier, "test_task");
                break;
            }

            if polling_started.elapsed().as_secs() > 5 {
                panic!("Job should have been scheduled by now");
            }

            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
        }

        shutdown_trigger.notify_one();
        runner_task.await.ok();
    })
    .await;
}
/// Checks that a `CronRunner` catches up after the mock clock jumps forward.
///
/// The runner is started with an every-minute crontab for `catchup_task` and
/// the mock clock is advanced by five minutes in one step. The test polls the
/// jobs list until at least four jobs exist, asserting that every one of them
/// is `catchup_task`, and panics with the observed count if that does not
/// happen before the elapsed-seconds check trips.
#[tokio::test]
async fn cron_runner_catches_up_after_clock_jump() {
    with_test_db(|test_db| async move {
        test_db
            .worker_utils()
            .migrate()
            .await
            .expect("Failed to migrate");

        let initial_time = Local::now();
        let clock = Arc::new(MockClock::new(initial_time));
        let (shutdown_signal, shutdown_notify) = create_shutdown_signal();
        let crontabs = parse_crontab("* * * * * catchup_task").expect("Failed to parse crontab");
        let hooks = HookRegistry::default();

        let test_pool = test_db.test_pool.clone();
        let clock_for_runner = clock.clone();
        let runner_handle = spawn_local(async move {
            CronRunner::new(&test_pool, "graphile_worker", &crontabs, &hooks)
                .with_clock(clock_for_runner)
                .run(shutdown_signal)
                .await
        });

        // Give the spawned runner a chance to be polled before moving the clock.
        tokio::task::yield_now().await;
        clock.advance(Duration::minutes(5));

        let start = Instant::now();
        loop {
            let jobs = test_db.get_jobs().await;
            if jobs.len() >= 4 {
                assert!(jobs.iter().all(|j| j.task_identifier == "catchup_task"));
                break;
            }

            if start.elapsed().as_secs() > 5 {
                // Report the count that just failed the check; a second query
                // could return a different number than the one tested above.
                panic!(
                    "Expected at least 4 jobs after 5 minute clock jump, got {}",
                    jobs.len()
                );
            }

            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
        }

        shutdown_notify.notify_one();
        runner_handle.await.ok();
    })
    .await;
}
/// Checks that the `CronTick` and `CronJobScheduled` hooks are invoked.
///
/// A plugin registers one handler per hook, each bumping a static counter.
/// The runner is started with an every-minute crontab for `hook_task`, the
/// mock clock is advanced by three minutes, and the test polls until both
/// counters have reached at least two.
#[tokio::test]
async fn cron_runner_calls_hooks() {
    use graphile_worker::{CronJobScheduled, CronTick, Plugin};
    use std::sync::atomic::{AtomicU32, Ordering};

    static TICK_COUNT: AtomicU32 = AtomicU32::new(0);
    static SCHEDULED_COUNT: AtomicU32 = AtomicU32::new(0);

    /// Plugin whose hooks only count how often they were called.
    struct CronHooksPlugin;

    impl Plugin for CronHooksPlugin {
        fn register(self, hooks: &mut HookRegistry) {
            hooks.on(CronTick, |_ctx| async {
                TICK_COUNT.fetch_add(1, Ordering::SeqCst);
            });
            hooks.on(CronJobScheduled, |_ctx| async {
                SCHEDULED_COUNT.fetch_add(1, Ordering::SeqCst);
            });
        }
    }

    with_test_db(|test_db| async move {
        test_db
            .worker_utils()
            .migrate()
            .await
            .expect("Failed to migrate");

        let mock_clock = Arc::new(MockClock::new(Local::now()));
        let (shutdown_signal, shutdown_trigger) = create_shutdown_signal();
        let crontabs = parse_crontab("* * * * * hook_task").expect("Failed to parse crontab");
        let hooks = HookRegistry::default().with_plugin(CronHooksPlugin);

        let runner_pool = test_db.test_pool.clone();
        let runner_clock = mock_clock.clone();
        let runner_task = spawn_local(async move {
            let runner = CronRunner::new(&runner_pool, "graphile_worker", &crontabs, &hooks)
                .with_clock(runner_clock);
            runner.run(shutdown_signal).await
        });

        // Give the spawned runner a chance to be polled before moving the clock.
        tokio::task::yield_now().await;
        mock_clock.advance(Duration::minutes(3));

        let polling_started = Instant::now();
        loop {
            let ticks = TICK_COUNT.load(Ordering::SeqCst);
            let scheduled = SCHEDULED_COUNT.load(Ordering::SeqCst);

            let both_hooks_fired = ticks >= 2 && scheduled >= 2;
            if both_hooks_fired {
                break;
            }

            if polling_started.elapsed().as_secs() > 5 {
                panic!(
                    "Expected hooks to be called. tick_count={}, scheduled_count={}",
                    ticks, scheduled
                );
            }

            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
        }

        shutdown_trigger.notify_one();
        runner_task.await.ok();
    })
    .await;
}
/// Checks that a `CronRunner` stops cleanly when the shutdown signal fires.
///
/// The runner is started with an every-minute crontab for `shutdown_task` and
/// shut down right away, without advancing the mock clock. Both the spawned
/// task's join result and the runner's own result must be `Ok`.
#[tokio::test]
async fn cron_runner_shutdown_cleanly() {
    with_test_db(|test_db| async move {
        test_db
            .worker_utils()
            .migrate()
            .await
            .expect("Failed to migrate");

        let mock_clock = Arc::new(MockClock::new(Local::now()));
        let (shutdown_signal, shutdown_trigger) = create_shutdown_signal();
        let crontabs = parse_crontab("* * * * * shutdown_task").expect("Failed to parse crontab");
        let hooks = HookRegistry::default();

        let runner_pool = test_db.test_pool.clone();
        let runner_clock = mock_clock.clone();
        let runner_task = spawn_local(async move {
            let runner = CronRunner::new(&runner_pool, "graphile_worker", &crontabs, &hooks)
                .with_clock(runner_clock);
            runner.run(shutdown_signal).await
        });

        // Give the spawned runner a chance to be polled before signalling shutdown.
        tokio::task::yield_now().await;
        shutdown_trigger.notify_one();

        // Outer result: did the task finish; inner result: what the runner returned.
        let join_outcome = runner_task.await;
        assert!(join_outcome.is_ok(), "Runner should complete without error");

        let run_outcome = join_outcome.unwrap();
        assert!(run_outcome.is_ok(), "Runner should return Ok on shutdown");
    })
    .await;
}