#![cfg(feature = "worker")]
mod common;
use std::sync::Arc;
use std::time::{Duration, Instant};
use async_trait::async_trait;
use rust_job_queue_api_worker_system::{
queue,
worker::{ExecutionContext, ExecutionOutcome, Executor, WorkerRuntime},
Job, JobKind, NewJob,
};
use serde_json::json;
use sqlx::{query, query_as};
use tokio_util::sync::CancellationToken;
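
/// Executor that records every execution in `processed_log`, giving the test
/// an out-of-band record in which duplicate deliveries would be visible.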
struct RecordingExecutor;
#[async_trait]
impl Executor for RecordingExecutor {
async fn execute(&self, ctx: &ExecutionContext, job: &Job) -> ExecutionOutcome {
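        // Hold the job briefly so executions from different workers overlap
        // in time, giving lease contention a realistic chance to surface.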
tokio::time::sleep(Duration::from_millis(3)).await;
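        // One row per execution: a job claimed twice would produce two rows.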
        match query("INSERT INTO processed_log (job_id, worker_id) VALUES ($1, $2)")
            .bind(job.id.as_uuid())
            .bind(&ctx.worker_id)
            .execute(&ctx.pool)
            .await
        {
            Ok(_) => ExecutionOutcome::Succeeded,
            Err(e) => ExecutionOutcome::Failed(format!("log insert: {e}")),
        }
}
}
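
/// Floods the queue with 200 identical jobs, runs 8 single-slot workers
/// against one pool, and asserts exactly-once processing end to end.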
#[tokio::test]
async fn two_hundred_jobs_eight_workers_no_duplicates() {
let pool = common::fresh_pool().await;
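    // Side table the executor writes to; one row per (job, worker) execution.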
query(
"CREATE TABLE processed_log (
job_id UUID NOT NULL,
worker_id TEXT NOT NULL,
recorded_at TIMESTAMPTZ NOT NULL DEFAULT now()
)",
)
.execute(&pool)
.await
.unwrap();
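
    // Enough jobs and workers that concurrent claiming is exercised heavily.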
const N_JOBS: usize = 200;
const N_WORKERS: usize = 8;
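    // Single-attempt jobs: a retry or re-delivery would show up later as
    // attempts > 1 or as a failed_permanent job, both of which fail the test.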
for _ in 0..N_JOBS {
queue::enqueue(
&pool,
NewJob {
kind: JobKind::SummarizeText,
payload: json!({"text": "lorem ipsum"}),
max_attempts: Some(1),
idempotency_key: None,
},
)
.await
.unwrap();
}
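
    // Start the workers. Each runtime polls the shared pool independently;
    // the short poll intervals (assumed lower/upper bounds) keep the test fast.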
let cancel = CancellationToken::new();
let mut handles = Vec::new();
for i in 0..N_WORKERS {
let runtime = WorkerRuntime::new(pool.clone(), Arc::new(RecordingExecutor))
.with_concurrency(1)
.with_poll_interval(Duration::from_millis(10), Duration::from_millis(50))
.with_worker_id_prefix(format!("w{i}"));
let token = cancel.clone();
handles.push(tokio::spawn(async move { runtime.run(token).await }));
}
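
    // Wait for every job to reach a terminal state, bailing out early on any
    // permanent failure and after 60 seconds regardless.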
let deadline = Instant::now() + Duration::from_secs(60);
loop {
let (succ, fail): (i64, i64) = query_as(
"SELECT (SELECT COUNT(*) FROM jobs WHERE status = 'succeeded'),
(SELECT COUNT(*) FROM jobs WHERE status = 'failed_permanent')",
)
.fetch_one(&pool)
.await
.unwrap();
if succ as usize == N_JOBS {
break;
}
if fail > 0 {
panic!("unexpected failed_permanent jobs: {fail}");
}
if Instant::now() > deadline {
panic!("timeout: only {succ}/{N_JOBS} succeeded");
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
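
    // Graceful shutdown: signal cancellation, then give each worker up to 10s
    // to finish its in-flight job and exit.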
cancel.cancel();
for h in handles {
let _ = tokio::time::timeout(Duration::from_secs(10), h).await;
}
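
    // Exactly-once invariants: one log row per job and no job id repeated.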
let total: (i64,) = query_as("SELECT COUNT(*) FROM processed_log")
.fetch_one(&pool)
.await
.unwrap();
let distinct: (i64,) = query_as("SELECT COUNT(DISTINCT job_id) FROM processed_log")
.fetch_one(&pool)
.await
.unwrap();
    assert_eq!(total.0, N_JOBS as i64, "expected exactly one log row per job");
    assert_eq!(distinct.0, N_JOBS as i64, "expected every job id to be logged");
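
    // attempts = 1 everywhere confirms no job was claimed or retried twice.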
let attempts_one: (i64,) = query_as("SELECT COUNT(*) FROM jobs WHERE attempts = 1")
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(
attempts_one.0, N_JOBS as i64,
"every job processed exactly once (attempts = 1)"
);
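
    // Deliberately lenient distribution check: scheduling is timing-dependent,
    // so only require that more than one worker did any work at all.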
let workers: (i64,) = query_as("SELECT COUNT(DISTINCT worker_id) FROM processed_log")
.fetch_one(&pool)
.await
.unwrap();
assert!(
workers.0 > 1,
"expected multiple workers to have processed jobs; only {} did",
workers.0
);
}