use std::{collections::VecDeque, ops::Add, rc::Rc};
use crate::helpers::StaticCounter;
use chrono::{Duration, Timelike, Utc};
use graphile_worker::{IntoTaskHandlerResult, JobSpec, JobSpecBuilder, TaskHandler, WorkerContext};
use serde::{Deserialize, Serialize};
use serde_json::json;
use tokio::{
sync::{oneshot, Mutex, OnceCell},
task::spawn_local,
time::{sleep, Instant},
};
mod helpers;
#[tokio::test]
async fn it_should_run_jobs() {
    // Per-task invocation counters; static so the handlers (which capture no
    // state) can record how many times they ran.
    static JOB2_CALL_COUNT: StaticCounter = StaticCounter::new();
    static JOB3_CALL_COUNT: StaticCounter = StaticCounter::new();

    #[derive(Serialize, Deserialize)]
    struct Job2 {
        a: u32,
    }

    impl TaskHandler for Job2 {
        const IDENTIFIER: &'static str = "job2";
        async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
            JOB2_CALL_COUNT.increment().await;
        }
    }

    #[derive(Serialize, Deserialize)]
    struct Job3 {
        a: u32,
    }

    impl TaskHandler for Job3 {
        const IDENTIFIER: &'static str = "job3";
        async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
            JOB3_CALL_COUNT.increment().await;
        }
    }

    helpers::with_test_db(|test_db| async move {
        let worker = test_db
            .create_worker_options()
            .define_job::<Job2>()
            .define_job::<Job3>()
            .init()
            .await
            .expect("Failed to create worker");

        let start = Utc::now();

        // Enqueue a single "job3" on a named queue; scope the utils handle so
        // it is dropped before we inspect the database.
        {
            let utils = worker.create_utils();
            utils
                .add_raw_job(
                    "job3",
                    json!({ "a": 1 }),
                    JobSpec {
                        queue_name: Some("myqueue".to_string()),
                        ..Default::default()
                    },
                )
                .await
                .expect("Failed to add job");
        }

        // The job should be scheduled immediately: run_at within
        // [start, now], with a small tolerance for timestamp precision.
        let jobs = test_db.get_jobs().await;
        assert_eq!(jobs.len(), 1);
        let job = &jobs[0];
        let start_diff_ms = (job.run_at.timestamp_millis() - start.timestamp_millis()).abs();
        assert!(
            job.run_at >= start || start_diff_ms <= 5,
            "job.run_at should be >= start or within 5ms tolerance, diff: {}ms",
            start_diff_ms
        );
        assert!(job.run_at <= Utc::now(), "job.run_at should be <= now");

        // The queue row exists, counts the pending job, and is not locked.
        let job_queues = test_db.get_job_queues().await;
        assert_eq!(job_queues.len(), 1);
        let job_queue = &job_queues[0];
        assert_eq!(job_queue.queue_name, "myqueue");
        assert_eq!(job_queue.job_count, 1);
        assert_eq!(job_queue.locked_at, None);
        assert_eq!(job_queue.locked_by, None);

        // One worker pass runs only the enqueued task: job3 exactly once,
        // job2 never; the completed job is removed from the table.
        worker.run_once().await.expect("Failed to run worker");
        assert_eq!(JOB3_CALL_COUNT.get().await, 1);
        assert_eq!(JOB2_CALL_COUNT.get().await, 0);
        let jobs = test_db.get_jobs().await;
        assert_eq!(jobs.len(), 0);
    })
    .await;
}
#[tokio::test]
async fn it_should_schedule_errors_for_retry() {
    static JOB3_CALL_COUNT: StaticCounter = StaticCounter::new();

    #[derive(Serialize, Deserialize)]
    struct Job3 {
        a: u32,
    }

    impl TaskHandler for Job3 {
        const IDENTIFIER: &'static str = "job3";
        // Always fails so the worker must schedule the job for a retry.
        async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
            JOB3_CALL_COUNT.increment().await;
            Err("fail".to_string())
        }
    }

    helpers::with_test_db(|test_db| async move {
        let worker = test_db
            .create_worker_options()
            .define_job::<Job3>()
            .init()
            .await
            .expect("Failed to create worker");

        let start = Utc::now();
        {
            let utils = worker.create_utils();
            utils
                .add_raw_job(
                    "job3",
                    json!({ "a": 1 }),
                    JobSpec {
                        queue_name: Some("myqueue".to_string()),
                        ..Default::default()
                    },
                )
                .await
                .expect("Failed to add job");
        }

        // Before the run: one pending job scheduled "now", queue unlocked.
        {
            let jobs = test_db.get_jobs().await;
            assert_eq!(jobs.len(), 1);
            let job = &jobs[0];
            assert_eq!(job.task_identifier, "job3");
            assert_eq!(job.payload, json!({ "a": 1 }));
            let now = Utc::now();
            let start_diff_ms = (job.run_at.timestamp_millis() - start.timestamp_millis()).abs();
            assert!(
                job.run_at >= start || start_diff_ms <= 5,
                "job.run_at should be >= start or within 5ms tolerance, diff: {}ms",
                start_diff_ms
            );
            assert!(job.run_at <= now, "job.run_at should be <= now");
            let job_queues = test_db.get_job_queues().await;
            assert_eq!(job_queues.len(), 1);
            let job_queue = &job_queues[0];
            assert_eq!(job_queue.queue_name, "myqueue");
            assert_eq!(job_queue.job_count, 1);
            assert_eq!(job_queue.locked_at, None);
            assert_eq!(job_queue.locked_by, None);
        }

        worker.run_once().await.expect("Failed to run worker");
        assert_eq!(JOB3_CALL_COUNT.get().await, 1);

        // After the failing attempt: the job remains with attempts=1, the
        // handler's error captured verbatim (double-escaped by the error
        // wrapper), and run_at pushed into the future. 2719ms is presumably
        // the first retry-backoff delay — NOTE(review): confirm against the
        // worker's retry schedule.
        {
            let jobs = test_db.get_jobs().await;
            assert_eq!(jobs.len(), 1);
            let job = &jobs[0];
            assert_eq!(job.task_identifier, "job3");
            assert_eq!(job.attempts, 1);
            assert_eq!(job.max_attempts, 25);
            assert_eq!(
                job.last_error,
                Some("TaskError(\"\\\"fail\\\"\")".to_string())
            );
            assert!(job.run_at > start + chrono::Duration::milliseconds(2719));
            assert!(job.run_at < Utc::now() + chrono::Duration::milliseconds(2719));
            // The queue is released (unlocked) after the failed attempt.
            let job_queues = test_db.get_job_queues().await;
            assert_eq!(job_queues.len(), 1);
            let q = &job_queues[0];
            assert_eq!(q.queue_name, "myqueue");
            assert_eq!(q.job_count, 1);
            assert_eq!(q.locked_at, None);
            assert_eq!(q.locked_by, None);
        }
    })
    .await;
}
#[tokio::test]
async fn it_should_retry_jobs() {
    static JOB3_CALL_COUNT: StaticCounter = StaticCounter::new();

    #[derive(Serialize, Deserialize)]
    struct Job3 {
        a: u32,
    }

    impl TaskHandler for Job3 {
        const IDENTIFIER: &'static str = "job3";
        // Always fails so every attempt ends in a scheduled retry.
        async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
            JOB3_CALL_COUNT.increment().await;
            Err("fail 2".to_string())
        }
    }

    helpers::with_test_db(|test_db| async move {
        let worker = test_db
            .create_worker_options()
            .define_job::<Job3>()
            .init()
            .await
            .expect("Failed to create worker");

        {
            let utils = worker.create_utils();
            utils
                .add_raw_job(
                    "job3",
                    json!({ "a": 1 }),
                    JobSpec {
                        queue_name: Some("myqueue".to_string()),
                        ..Default::default()
                    },
                )
                .await
                .expect("Failed to add job");
        }

        // First pass runs (and fails) the job once.
        worker.run_once().await.expect("Failed to run worker");
        assert_eq!(JOB3_CALL_COUNT.get().await, 1);

        // A second pass must NOT re-run it: the retry is scheduled in the
        // future because of the backoff delay.
        worker.run_once().await.expect("Failed to run worker");
        assert_eq!(JOB3_CALL_COUNT.get().await, 1);

        // Force the retry to be due now; the next pass runs attempt #2.
        test_db.make_jobs_run_now("job3").await;
        let start = Utc::now();
        worker.run_once().await.expect("Failed to run worker");
        assert_eq!(JOB3_CALL_COUNT.get().await, 2);

        // After the second failure: attempts=2, error recorded, run_at pushed
        // out again. 7389ms is presumably the second retry-backoff delay —
        // NOTE(review): confirm against the worker's retry schedule.
        {
            let jobs = test_db.get_jobs().await;
            assert_eq!(jobs.len(), 1);
            let job = &jobs[0];
            assert_eq!(job.task_identifier, "job3");
            assert_eq!(job.attempts, 2);
            assert_eq!(job.max_attempts, 25);
            assert_eq!(
                job.last_error,
                Some("TaskError(\"\\\"fail 2\\\"\")".to_string())
            );
            assert!(job.run_at > start + chrono::Duration::milliseconds(7389));
            assert!(job.run_at < Utc::now() + chrono::Duration::milliseconds(7389));
            let job_queues = test_db.get_job_queues().await;
            assert_eq!(job_queues.len(), 1);
            let q = &job_queues[0];
            assert_eq!(q.queue_name, "myqueue");
            assert_eq!(q.job_count, 1);
            assert_eq!(q.locked_at, None);
            assert_eq!(q.locked_by, None);
        }
    })
    .await;
}
#[tokio::test]
async fn it_should_supports_future_scheduled_jobs() {
    static JOB3_CALL_COUNT: StaticCounter = StaticCounter::new();

    #[derive(Serialize, Deserialize)]
    struct Job3 {
        a: u32,
    }

    impl TaskHandler for Job3 {
        const IDENTIFIER: &'static str = "job3";
        async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
            JOB3_CALL_COUNT.increment().await;
        }
    }

    helpers::with_test_db(|db| async move {
        let worker = db
            .create_worker_options()
            .define_job::<Job3>()
            .init()
            .await
            .expect("Failed to create worker");

        // Schedule the job three seconds into the future.
        let future_spec = JobSpec {
            run_at: Some(Utc::now() + chrono::Duration::seconds(3)),
            ..Default::default()
        };
        worker
            .create_utils()
            .add_raw_job("job3", json!({ "a": 1 }), future_spec)
            .await
            .expect("Failed to add job");

        // While run_at is still in the future, worker passes must not pick
        // the job up.
        for _ in 0..2 {
            worker.run_once().await.expect("Failed to run worker");
            assert_eq!(JOB3_CALL_COUNT.get().await, 0);
        }

        // Force the schedule into the past; the next pass executes the job.
        db.make_jobs_run_now("job3").await;
        worker.run_once().await.expect("Failed to run worker");
        assert_eq!(JOB3_CALL_COUNT.get().await, 1);

        // Nothing should remain once the job has completed.
        assert_eq!(db.get_jobs().await.len(), 0);
        assert_eq!(db.get_job_queues().await.len(), 0);
    })
    .await;
}
#[tokio::test]
async fn it_shoud_allow_update_of_pending_jobs() {
    static JOB3_CALL_COUNT: StaticCounter = StaticCounter::new();

    #[derive(Serialize, Deserialize)]
    struct Job3 {
        a: String,
    }

    impl TaskHandler for Job3 {
        const IDENTIFIER: &'static str = "job3";
        async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
            JOB3_CALL_COUNT.increment().await;
        }
    }

    helpers::with_test_db(|db| async move {
        let worker = db
            .create_worker_options()
            .define_job::<Job3>()
            .init()
            .await
            .expect("Failed to create worker");
        let utils = worker.create_utils();

        // First version is scheduled a minute out (truncated to whole
        // seconds so it can be compared against the stored timestamp).
        let future_run_at = (Utc::now() + chrono::Duration::seconds(60))
            .with_nanosecond(0)
            .unwrap();
        utils
            .add_raw_job(
                "job3",
                json!({ "a": "wrong" }),
                JobSpec {
                    run_at: Some(future_run_at),
                    job_key: Some("abc".into()),
                    ..Default::default()
                },
            )
            .await
            .expect("Failed to add job");

        let pending = db.get_jobs().await;
        assert_eq!(pending.len(), 1);
        let first = &pending[0];
        assert_eq!(first.run_at, future_run_at);

        // Not due yet, so a worker pass must not run it.
        worker.run_once().await.expect("Failed to run worker");
        assert_eq!(JOB3_CALL_COUNT.get().await, 0);

        // Re-adding with the same job_key updates the pending job in place:
        // new payload, new run_at, same row id.
        let immediate_run_at = Utc::now().with_nanosecond(0).unwrap();
        utils
            .add_raw_job(
                "job3",
                json!({ "a": "right" }),
                JobSpec {
                    run_at: Some(immediate_run_at),
                    job_key: Some("abc".into()),
                    ..Default::default()
                },
            )
            .await
            .expect("Failed to add job");

        let refreshed = db.get_jobs().await;
        assert_eq!(refreshed.len(), 1);
        let second = &refreshed[0];
        assert_eq!(first.id, second.id);
        assert_eq!(second.run_at, immediate_run_at);

        // Now due: it runs exactly once.
        worker.run_once().await.expect("Failed to run worker");
        assert_eq!(JOB3_CALL_COUNT.get().await, 1);
    })
    .await;
}
#[tokio::test]
async fn it_schedules_a_new_job_if_existing_is_completed() {
    static JOB3_CALL_COUNT: StaticCounter = StaticCounter::new();

    #[derive(Serialize, Deserialize)]
    struct Job3 {
        a: String,
    }

    impl TaskHandler for Job3 {
        const IDENTIFIER: &'static str = "job3";
        async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
            JOB3_CALL_COUNT.increment().await;
        }
    }

    helpers::with_test_db(|db| async move {
        let worker = db
            .create_worker_options()
            .define_job::<Job3>()
            .init()
            .await
            .expect("Failed to create worker");
        let utils = worker.create_utils();

        // Both inserts use the same job_key; build a fresh spec per call.
        let keyed_spec = || JobSpec {
            job_key: Some("abc".into()),
            ..Default::default()
        };

        // The first job with key "abc" runs to completion.
        utils
            .add_raw_job("job3", json!({ "a": "first" }), keyed_spec())
            .await
            .expect("Failed to add job");
        worker.run_once().await.expect("Failed to run worker");
        assert_eq!(JOB3_CALL_COUNT.get().await, 1);

        // Because the first job already completed, reusing its key schedules
        // a brand-new job instead of updating the finished one.
        utils
            .add_raw_job("job3", json!({ "a": "second" }), keyed_spec())
            .await
            .expect("Failed to add job");
        worker.run_once().await.expect("Failed to run worker");
        assert_eq!(JOB3_CALL_COUNT.get().await, 2);
    })
    .await;
}
#[tokio::test]
async fn schedules_a_new_job_if_existing_is_being_processed() {
    static JOB3_CALL_COUNT: StaticCounter = StaticCounter::new();
    // One gate (oneshot receiver) per expected invocation; the handler
    // blocks on the matching gate so the test controls when each run ends.
    static RX1: OnceCell<Mutex<Option<oneshot::Receiver<()>>>> = OnceCell::const_new();
    static RX2: OnceCell<Mutex<Option<oneshot::Receiver<()>>>> = OnceCell::const_new();

    #[derive(Deserialize, Serialize)]
    struct Job3 {
        a: String,
    }

    impl TaskHandler for Job3 {
        const IDENTIFIER: &'static str = "job3";
        async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
            // increment() returns the new count, which identifies whether
            // this is the first or second invocation.
            let n = JOB3_CALL_COUNT.increment().await;
            match n {
                1 => {
                    let mut rx_opt = RX1.get().unwrap().lock().await;
                    if let Some(rx) = rx_opt.take() {
                        rx.await.unwrap();
                    }
                }
                2 => {
                    let mut rx_opt = RX2.get().unwrap().lock().await;
                    if let Some(rx) = rx_opt.take() {
                        rx.await.unwrap();
                    }
                }
                _ => unreachable!("Job3 should only be called twice"),
            };
            Ok::<_, ()>(())
        }
    }

    helpers::with_test_db(|test_db| async move {
        let (tx1, rx1) = oneshot::channel::<()>();
        let (tx2, rx2) = oneshot::channel::<()>();
        RX1.set(Mutex::new(Some(rx1))).unwrap();
        RX2.set(Mutex::new(Some(rx2))).unwrap();

        let worker = test_db
            .create_worker_options()
            .define_job::<Job3>()
            .init()
            .await
            .expect("Failed to create worker");

        worker
            .create_utils()
            .add_raw_job(
                "job3",
                json!({ "a": "first" }),
                JobSpec {
                    job_key: Some("abc".into()),
                    ..Default::default()
                },
            )
            .await
            .expect("Failed to add first job");

        // Run the worker on a background local task so the test can interact
        // with the job while it is mid-flight.
        let worker = Rc::new(worker);
        tracing::info!("Starting worker");
        let run_once_1 = spawn_local({
            let worker = worker.clone();
            async move {
                worker.run_once().await.expect("Failed to run worker");
            }
        });

        // Poll (5s timeout) until the first job has started, i.e. it is
        // blocked on its gate.
        tracing::info!("Waiting for first job to be picked up");
        let start_time = Instant::now();
        while JOB3_CALL_COUNT.get().await < 1 {
            if start_time.elapsed().as_secs() > 5 {
                panic!("Job3 should have been executed by now");
            }
            sleep(tokio::time::Duration::from_millis(100)).await;
        }
        assert_eq!(JOB3_CALL_COUNT.get().await, 1);

        // Re-adding the same job_key while the first run is still in
        // progress must schedule a second, separate execution.
        worker
            .create_utils()
            .add_raw_job(
                "job3",
                json!({ "a": "second" }),
                JobSpec {
                    job_key: Some("abc".into()),
                    ..Default::default()
                },
            )
            .await
            .expect("Failed to add second job");

        // Release the first run, then do another pass for the new job.
        tx1.send(()).unwrap();
        run_once_1.await.expect("Failed to run worker");
        let run_once_2 = spawn_local({
            let worker = worker.clone();
            async move {
                worker.run_once().await.expect("Failed to run worker");
            }
        });
        tx2.send(()).unwrap();
        run_once_2.await.expect("Failed to run worker");
        assert_eq!(JOB3_CALL_COUNT.get().await, 2);
    })
    .await;
}
#[tokio::test]
async fn schedules_a_new_job_if_the_existing_is_pending_retry() {
    static JOB5_CALL_COUNT: StaticCounter = StaticCounter::new();

    #[derive(Deserialize, Serialize)]
    struct Job5 {
        // Controls whether this run succeeds or returns an error.
        succeed: bool,
    }

    impl TaskHandler for Job5 {
        const IDENTIFIER: &'static str = "job5";
        async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
            JOB5_CALL_COUNT.increment().await;
            if !self.succeed {
                return Err("fail".to_string());
            }
            Ok(())
        }
    }

    helpers::with_test_db(|test_db| async move {
        let worker = test_db
            .create_worker_options()
            .define_job::<Job5>()
            .init()
            .await
            .expect("Failed to create worker");

        // First version of the job is set up to fail.
        worker
            .create_utils()
            .add_job(
                Job5 { succeed: false },
                JobSpec {
                    job_key: Some("abc".into()),
                    ..Default::default()
                },
            )
            .await
            .expect("Failed to add job");
        worker.run_once().await.expect("Failed to run worker");
        assert_eq!(
            JOB5_CALL_COUNT.get().await,
            1,
            "job5 should have been called once"
        );

        // The failed job stays queued for retry, with the attempt and the
        // error recorded.
        let jobs = test_db.get_jobs().await;
        assert_eq!(jobs.len(), 1, "There should be one job scheduled for retry");
        let job = &jobs[0];
        assert_eq!(job.attempts, 1, "The job should have one failed attempt");
        let last_error = job
            .last_error
            .as_ref()
            .expect("The job should have a last error");
        assert!(
            last_error.contains("fail"),
            "The job's last error should contain 'fail'"
        );

        // The retry is not due yet, so another pass must not re-run it.
        worker.run_once().await.expect("Failed to run worker");
        assert_eq!(
            JOB5_CALL_COUNT.get().await,
            1,
            "job5 should still be called only once"
        );

        // Re-adding with the same job_key while the retry is pending updates
        // the job in place and resets its attempt/error bookkeeping.
        worker
            .create_utils()
            .add_job(
                Job5 { succeed: true },
                JobSpec {
                    job_key: Some("abc".into()),
                    run_at: Some(Utc::now()),
                    ..Default::default()
                },
            )
            .await
            .expect("Failed to update job");
        let updated_jobs = test_db.get_jobs().await;
        assert_eq!(
            updated_jobs.len(),
            1,
            "There should still be only one job in the database"
        );
        let updated_job = &updated_jobs[0];
        assert_eq!(
            updated_job.attempts, 0,
            "The job's attempts should be reset to 0"
        );
        assert!(
            updated_job.last_error.is_none(),
            "The job's last error should be cleared"
        );

        // The updated job is due immediately and now succeeds.
        worker.run_once().await.expect("Failed to run worker");
        assert_eq!(
            JOB5_CALL_COUNT.get().await,
            2,
            "job5 should have been called twice"
        );
    })
    .await;
}
#[tokio::test]
async fn job_details_are_reset_if_not_specified_in_update() {
    static JOB3_CALL_COUNT: StaticCounter = StaticCounter::new();

    #[derive(Serialize, Deserialize)]
    struct Job3 {
        a: u32,
    }

    impl TaskHandler for Job3 {
        const IDENTIFIER: &'static str = "job3";
        async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
            JOB3_CALL_COUNT.increment().await;
        }
    }

    helpers::with_test_db(|db| async move {
        let worker = db
            .create_worker_options()
            .define_job::<Job3>()
            .init()
            .await
            .expect("Failed to create worker");

        // Fully-specified initial job: custom queue, run_at and max_attempts.
        let run_at = Utc::now()
            .add(Duration::seconds(3))
            .with_nanosecond(0)
            .unwrap();
        worker
            .create_utils()
            .add_job(
                Job3 { a: 1 },
                JobSpec {
                    queue_name: Some("queue1".into()),
                    run_at: Some(run_at),
                    max_attempts: Some(10),
                    job_key: Some("abc".into()),
                    ..Default::default()
                },
            )
            .await
            .expect("Failed to add job");

        let jobs = db.get_jobs().await;
        assert_eq!(jobs.len(), 1);
        let job_before = &jobs[0];
        assert_eq!(job_before.attempts, 0);
        assert_eq!(job_before.key, Some("abc".to_string()));
        assert_eq!(job_before.max_attempts, 10);
        assert_eq!(job_before.payload, serde_json::json!({"a": 1}));
        assert_eq!(job_before.queue_name, Some("queue1".to_string()));
        assert_eq!(job_before.run_at, run_at);
        assert_eq!(job_before.task_identifier, "job3");

        // Updating by job_key with a minimal spec resets every detail that
        // was not re-specified back to its default.
        worker
            .create_utils()
            .add_job(
                Job3 { a: 1 },
                JobSpec {
                    job_key: Some("abc".into()),
                    ..Default::default()
                },
            )
            .await
            .expect("Failed to update job");

        let jobs = db.get_jobs().await;
        assert_eq!(jobs.len(), 1);
        let job_after = &jobs[0];
        assert_eq!(job_after.attempts, 0);
        assert_eq!(job_after.key, Some("abc".to_string()));
        assert_eq!(job_after.max_attempts, 25);
        assert_eq!(job_after.payload, serde_json::json!({"a": 1}));
        assert_eq!(job_after.queue_name, None);
        assert_ne!(job_after.run_at, run_at);

        // A second full update replaces the payload and every scheduling
        // detail again.
        let run_at2 = Utc::now()
            .add(Duration::seconds(5))
            .with_nanosecond(0)
            .unwrap();
        worker
            .create_utils()
            .add_job(
                Job3 { a: 2 },
                JobSpec {
                    queue_name: Some("queue2".into()),
                    run_at: Some(run_at2),
                    max_attempts: Some(100),
                    job_key: Some("abc".into()),
                    ..Default::default()
                },
            )
            .await
            .expect("Failed to update job with new details");

        let jobs = db.get_jobs().await;
        assert_eq!(jobs.len(), 1);
        let job_final = &jobs[0];
        assert_eq!(job_final.attempts, 0);
        assert_eq!(job_final.key, Some("abc".to_string()));
        assert_eq!(job_final.max_attempts, 100);
        assert_eq!(job_final.payload, serde_json::json!({"a": 2}));
        assert_eq!(job_final.queue_name, Some("queue2".to_string()));
        assert_eq!(job_final.run_at, run_at2);
        assert_eq!(job_final.task_identifier, "job3");
    })
    .await;
}
#[tokio::test]
async fn pending_jobs_can_be_removed() {
    #[derive(Serialize, Deserialize)]
    struct Job3 {
        a: u32,
    }

    impl TaskHandler for Job3 {
        const IDENTIFIER: &'static str = "job3";
        // Body irrelevant: the job is removed before it ever runs.
        async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {}
    }

    helpers::with_test_db(|db| async move {
        let worker = db
            .create_worker_options()
            .define_job::<Job3>()
            .init()
            .await
            .expect("Failed to create worker");
        let utils = worker.create_utils();

        utils
            .add_job(
                Job3 { a: 1 },
                JobSpec {
                    job_key: Some("abc".into()),
                    ..Default::default()
                },
            )
            .await
            .expect("Failed to add job");
        assert_eq!(
            db.get_jobs().await.len(),
            1,
            "There should be one job scheduled"
        );

        // Removing by key while the job is still pending deletes it.
        utils
            .remove_job("abc")
            .await
            .expect("Failed to remove job");
        assert_eq!(
            db.get_jobs().await.len(),
            0,
            "There should be no jobs scheduled after removal"
        );
    })
    .await;
}
#[tokio::test]
async fn jobs_in_progress_cannot_be_removed() {
    static JOB3_CALL_COUNT: StaticCounter = StaticCounter::new();
    // Gate that keeps the job "in progress" until the test releases it.
    static RX: OnceCell<Mutex<Option<oneshot::Receiver<()>>>> = OnceCell::const_new();

    #[derive(Deserialize, Serialize)]
    struct Job3 {
        a: i32,
    }

    impl TaskHandler for Job3 {
        const IDENTIFIER: &'static str = "job3";
        async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
            JOB3_CALL_COUNT.increment().await;
            // Take the receiver (present only for the first invocation) and
            // block on it until the test sends the completion signal.
            let mut rx_mutex_guard = RX.get().unwrap().lock().await;
            if let Some(rx) = rx_mutex_guard.take() {
                rx.await.unwrap();
            }
        }
    }

    helpers::with_test_db(|test_db| async move {
        let (tx, rx) = tokio::sync::oneshot::channel::<()>();
        RX.set(Mutex::new(Some(rx))).unwrap();

        let worker = test_db
            .create_worker_options()
            .define_job::<Job3>()
            .init()
            .await
            .expect("Failed to create worker");
        let utils = worker.create_utils();
        utils
            .add_job(
                Job3 { a: 123 },
                JobSpec {
                    job_key: Some("abc".into()),
                    ..Default::default()
                },
            )
            .await
            .expect("Failed to add job");
        let jobs_before = test_db.get_jobs().await;
        assert_eq!(jobs_before.len(), 1, "Job should be scheduled");

        // Run the worker in the background; the job starts and then blocks
        // on the gate.
        let worker_handle = spawn_local(async move {
            worker.run_once().await.expect("Failed to run worker");
        });

        // Poll (5s timeout) until the job is actually running.
        let start_time = Instant::now();
        while JOB3_CALL_COUNT.get().await < 1 {
            if start_time.elapsed().as_secs() > 5 {
                panic!("Job3 should have been picked up by now");
            }
            sleep(tokio::time::Duration::from_millis(100)).await;
        }
        assert_eq!(JOB3_CALL_COUNT.get().await, 1, "Job should be in progress");

        // Removal by key while the job is in progress must be a no-op (the
        // call itself still succeeds).
        utils
            .remove_job("abc")
            .await
            .expect("Failed to attempt job removal");
        let jobs_during = test_db.get_jobs().await;
        assert_eq!(
            jobs_during.len(),
            1,
            "Job should not be removed while in progress"
        );

        // Release the gate; the run completes and the job is removed via the
        // normal completion path.
        tx.send(()).expect("Failed to send completion signal");
        worker_handle.await.expect("Worker task failed");
        assert_eq!(JOB3_CALL_COUNT.get().await, 1, "Job should have completed");
        let jobs_after = test_db.get_jobs().await;
        assert_eq!(
            jobs_after.len(),
            0,
            "Job should be removed after completion"
        );
    })
    .await;
}
#[tokio::test]
async fn runs_jobs_asynchronously() {
    static JOB3_CALL_COUNT: StaticCounter = StaticCounter::new();
    // Gate that holds the job open so the test can observe the in-progress
    // state (job row still present, queue row locked).
    static JOB_RX: OnceCell<Mutex<Option<oneshot::Receiver<()>>>> = OnceCell::const_new();

    #[derive(Deserialize, Serialize)]
    struct Job3 {
        a: i32,
    }

    impl TaskHandler for Job3 {
        const IDENTIFIER: &'static str = "job3";
        async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
            JOB3_CALL_COUNT.increment().await;
            // Take the receiver (first invocation only) and block on it.
            let mut rx = JOB_RX.get().unwrap().lock().await;
            if let Some(receiver) = rx.take() {
                receiver.await.unwrap();
            }
        }
    }

    helpers::with_test_db(|test_db| async move {
        let (tx, rx) = oneshot::channel::<()>();
        JOB_RX.set(Mutex::new(Some(rx))).unwrap();

        let worker = test_db
            .create_worker_options()
            .define_job::<Job3>()
            .init()
            .await
            .expect("Failed to create worker");

        let start = Utc::now();
        worker
            .create_utils()
            .add_job(
                Job3 { a: 1 },
                JobSpec {
                    queue_name: Some("myqueue".into()),
                    ..Default::default()
                },
            )
            .await
            .expect("Failed to add job");

        // Capture the id before moving the worker into the background task;
        // it is asserted against the queue's locked_by below.
        let worker_id = worker.worker_id().to_owned();
        let worker_handle = spawn_local(async move {
            worker.run_once().await.expect("Failed to run worker");
        });

        // Poll (5s timeout) until the job has started.
        let start_time = Instant::now();
        while JOB3_CALL_COUNT.get().await < 1 {
            if start_time.elapsed().as_secs() > 5 {
                panic!("Job3 should have been picked up by now");
            }
            sleep(tokio::time::Duration::from_millis(100)).await;
        }
        assert_eq!(
            JOB3_CALL_COUNT.get().await,
            1,
            "Job should be in progress but not completed"
        );

        // While blocked on the gate, the job row is still present with its
        // attempt counted, and the queue row is locked by this worker.
        let jobs = test_db.get_jobs().await;
        assert_eq!(jobs.len(), 1, "There should be one job in progress");
        let job = &jobs[0];
        assert_eq!(job.task_identifier, "job3");
        assert_eq!(job.payload, serde_json::json!({ "a": 1 }));
        let now = Utc::now();
        let start_diff_ms = (job.run_at.timestamp_millis() - start.timestamp_millis()).abs();
        assert!(
            (job.run_at >= start || start_diff_ms <= 5) && job.run_at <= now,
            "Job run_at should be within expected range (>= start or within 5ms tolerance, and <= now). Diff from start: {}ms",
            start_diff_ms
        );
        assert_eq!(job.attempts, 1, "Job attempts should be incremented");
        let job_queues = test_db.get_job_queues().await;
        assert_eq!(
            job_queues.len(),
            1,
            "There should be one queue with a job in progress"
        );
        let q = &job_queues[0];
        assert_eq!(&q.queue_name, job.queue_name.as_ref().unwrap());
        assert_eq!(q.job_count, 1);
        assert!(q.locked_at.is_some(), "The job should be locked");
        assert!(
            q.locked_at.unwrap() >= start && q.locked_at.unwrap() <= Utc::now(),
            "The lock time should be within expected range"
        );
        assert_eq!(q.locked_by, Some(worker_id));

        // Release the gate; the worker pass completes and the job is gone.
        tx.send(()).expect("Failed to send completion signal");
        worker_handle.await.expect("Worker task failed");
        assert_eq!(JOB3_CALL_COUNT.get().await, 1, "Job should have completed");
        let jobs_after = test_db.get_jobs().await;
        assert_eq!(
            jobs_after.len(),
            0,
            "Job should be removed after completion"
        );
    })
    .await;
}
#[tokio::test]
async fn runs_jobs_in_parallel() {
static JOB3_CALL_COUNT: StaticCounter = StaticCounter::new();
static RXS: OnceCell<Mutex<Vec<oneshot::Receiver<()>>>> = OnceCell::const_new();
RXS.set(Mutex::new(vec![])).unwrap();
let mut txs = vec![];
#[derive(Deserialize, Serialize)]
struct Job3 {
a: i32,
}
impl TaskHandler for Job3 {
const IDENTIFIER: &'static str = "job3";
async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
let rx = RXS
.get()
.expect("OnceCell should be set globally at the beginning of the test")
.lock()
.await
.remove(0); JOB3_CALL_COUNT.increment().await;
rx.await
.expect("The receiver should not be dropped before the job completes");
}
}
helpers::with_test_db(|test_db| async move {
let worker = test_db
.create_worker_options()
.concurrency(10)
.define_job::<Job3>()
.init()
.await
.expect("Failed to create worker");
let worker = Rc::new(worker);
for i in 1..=5 {
let (tx, rx) = oneshot::channel::<()>();
txs.push(tx);
RXS.get().unwrap().lock().await.push(rx);
worker
.create_utils()
.add_job(
Job3 { a: i },
JobSpec {
queue_name: Some(format!("queue_{}", i)),
..Default::default()
},
)
.await
.expect("Failed to add job");
}
let start = Utc::now();
let mut handles = vec![];
let worker_clone = worker.clone();
handles.push(spawn_local(async move {
worker_clone.run_once().await.expect("Failed to run worker");
}));
let start_time = Instant::now();
while JOB3_CALL_COUNT.get().await < 5 {
if start_time.elapsed().as_secs() > 20 {
panic!("Job3 should have been picked up by now");
}
sleep(tokio::time::Duration::from_millis(100)).await;
}
assert_eq!(
JOB3_CALL_COUNT.get().await,
5,
"All jobs should be in progress"
);
let jobs = test_db.get_jobs().await;
assert_eq!(jobs.len(), 5, "There should be 5 jobs in progress");
let job_queues = test_db.get_job_queues().await;
assert_eq!(job_queues.len(), 5, "There should be 5 job queues");
for q in job_queues {
assert_eq!(q.job_count, 1, "Each queue should have one job");
let locked_at = q.locked_at.unwrap();
let start_diff_ms = (locked_at.timestamp_millis() - start.timestamp_millis()).abs();
assert!(
locked_at >= start || start_diff_ms <= 5,
"locked_at should be >= start or within 5ms tolerance, diff: {}ms",
start_diff_ms
);
assert!(locked_at <= Utc::now(), "locked_at should be <= now");
}
for tx in txs {
tx.send(()).expect("Failed to send completion signal");
}
for handle in handles {
handle.await.expect("Worker task failed");
}
assert_eq!(
JOB3_CALL_COUNT.get().await,
5,
"All jobs should have completed"
);
let jobs_after = test_db.get_jobs().await;
assert_eq!(
jobs_after.len(),
0,
"All jobs should be removed after completion"
);
})
.await;
}
#[tokio::test]
async fn single_worker_runs_jobs_in_series_purges_all_before_exit() {
    static JOB3_CALL_COUNT: StaticCounter = StaticCounter::new();
    // FIFO queue of gates; the handler consumes one per job.
    static RXS: OnceCell<Mutex<VecDeque<oneshot::Receiver<()>>>> = OnceCell::const_new();
    RXS.set(Mutex::new(VecDeque::new())).unwrap();
    let mut txs = vec![];

    #[derive(Deserialize, Serialize)]
    struct Job3 {
        a: i32,
    }

    impl TaskHandler for Job3 {
        const IDENTIFIER: &'static str = "job3";
        async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
            // Block on the next gate before counting this job as completed.
            let gate = RXS.get().unwrap().lock().await.pop_front().unwrap();
            gate.await.unwrap();
            JOB3_CALL_COUNT.increment().await;
        }
    }

    helpers::with_test_db(|test_db| async move {
        let worker = test_db
            .create_worker_options()
            .define_job::<Job3>()
            .init()
            .await
            .expect("Failed to create worker");

        // Queue five identical jobs, each paired with a gate.
        for _ in 1..=5 {
            let (tx, rx) = oneshot::channel::<()>();
            txs.push(tx);
            RXS.get().unwrap().lock().await.push_back(rx);
            worker
                .create_utils()
                .add_job(Job3 { a: 1 }, JobSpec::default())
                .await
                .expect("Failed to add job");
        }

        // A single run_once pass should keep going until the queue is empty.
        let worker_handle = spawn_local(async move {
            worker.run_once().await.expect("Failed to run worker");
        });

        // Release the gates one at a time; after each release exactly one
        // more job must have completed, proving serial execution.
        let mut i = 0;
        for tx in txs {
            i += 1;
            tx.send(()).expect("Failed to send completion signal");
            sleep(tokio::time::Duration::from_millis(100)).await;
            assert_eq!(
                JOB3_CALL_COUNT.get().await,
                i,
                "Job {} should be completed",
                i,
            );
        }

        worker_handle.await.expect("Worker task failed");
        assert_eq!(
            JOB3_CALL_COUNT.get().await,
            5,
            "All jobs should have completed"
        );
        let jobs_after = test_db.get_jobs().await;
        assert_eq!(
            jobs_after.len(),
            0,
            "All jobs should be removed after completion"
        );
    })
    .await;
}
#[tokio::test]
async fn jobs_added_to_the_same_queue_will_be_ran_serially_even_if_multiple_workers() {
    static JOB3_CALL_COUNT: StaticCounter = StaticCounter::new();
    // FIFO queue of gates; each running job blocks on the next one.
    static RXS: OnceCell<Mutex<VecDeque<oneshot::Receiver<()>>>> = OnceCell::const_new();
    RXS.set(Mutex::new(VecDeque::new())).unwrap();
    let mut txs = vec![];

    #[derive(Deserialize, Serialize)]
    struct Job3 {
        a: i32,
    }

    impl TaskHandler for Job3 {
        const IDENTIFIER: &'static str = "job3";
        async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
            // Block on the next gate before counting this job as completed.
            let gate = RXS.get().unwrap().lock().await.pop_front().unwrap();
            gate.await.unwrap();
            JOB3_CALL_COUNT.increment().await;
        }
    }

    helpers::with_test_db(|test_db| async move {
        let worker = test_db
            .create_worker_options()
            .define_job::<Job3>()
            .init()
            .await
            .expect("Failed to create worker");
        let worker = Rc::new(worker);

        // Five jobs, all on the same "serial" queue, each with its own gate.
        for _ in 1..=5 {
            let (tx, rx) = oneshot::channel::<()>();
            txs.push(tx);
            RXS.get().unwrap().lock().await.push_back(rx);
            worker
                .create_utils()
                .add_job(
                    Job3 { a: 1 },
                    JobSpecBuilder::new().queue_name("serial").build(),
                )
                .await
                .expect("Failed to add job");
        }

        // Three concurrent worker passes compete for the same queue.
        let handles: Vec<_> = (0..3)
            .map(|_| {
                let worker = worker.clone();
                spawn_local(async move {
                    worker.run_once().await.expect("Failed to run worker");
                })
            })
            .collect();

        // Open the gates one by one; the completion counter must advance one
        // job at a time even with three workers running.
        for i in 1..=5 {
            sleep(tokio::time::Duration::from_millis(50)).await;
            txs.remove(0)
                .send(())
                .expect("Failed to send completion signal");
            let waited_since = Instant::now();
            while JOB3_CALL_COUNT.get().await < i {
                if waited_since.elapsed().as_secs() > 5 {
                    panic!("Job3 should have been picked up by now");
                }
                sleep(tokio::time::Duration::from_millis(100)).await;
            }
        }

        for handle in handles {
            handle.await.expect("Worker task failed");
        }
        assert_eq!(
            JOB3_CALL_COUNT.get().await,
            5,
            "All jobs should have completed"
        );
        let jobs_after = test_db.get_jobs().await;
        assert_eq!(
            jobs_after.len(),
            0,
            "All jobs should be removed after completion"
        );
    })
    .await;
}