use std::time::Duration;
use rust_job_queue_api_worker_system::{
queue::{self, CancelOutcome, ListFilter},
JobKind, JobStatus, NewJob,
};
use serde_json::json;
use sqlx::query::query;
use testcontainers::{runners::AsyncRunner, ContainerAsync};
use testcontainers_modules::postgres::Postgres;
use tokio::sync::OnceCell;
/// Process-wide handle to the single shared Postgres test container.
/// Tests only read `port`; the container itself is held to keep it alive.
struct Shared {
// Host port mapped to the container's internal 5432.
port: u16,
// Held solely so the container is not dropped (and presumably stopped)
// while tests still need it; never read after construction.
_container: ContainerAsync<Postgres>,
}
// Initialized once on first use; every test in this binary shares the
// same container and creates its own database on it (see `fresh_pool`).
static SHARED: OnceCell<Shared> = OnceCell::const_new();
/// Starts the shared Postgres container on first call and returns the
/// cached handle on every subsequent call.
async fn shared() -> &'static Shared {
    SHARED
        .get_or_init(|| async {
            let pg = Postgres::default()
                .start()
                .await
                .expect("start postgres container");
            let host_port = pg
                .get_host_port_ipv4(5432)
                .await
                .expect("get host port");
            Shared {
                port: host_port,
                _container: pg,
            }
        })
        .await
}
/// Creates a brand-new database on the shared container, runs the project
/// migrations against it, and returns a connected pool. Each test gets its
/// own database so tests never observe each other's rows.
async fn fresh_pool() -> sqlx_postgres::PgPool {
    let port = shared().await.port;
    let admin_url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");
    let admin = sqlx_postgres::PgPool::connect(&admin_url)
        .await
        .expect("admin connect");
    // UUIDv7 keeps names unique across tests; `simple()` strips the dashes
    // so the result is a valid unquoted SQL identifier.
    let db = format!("t{}", uuid::Uuid::now_v7().simple());
    query(&format!("CREATE DATABASE {db}"))
        .execute(&admin)
        .await
        .expect("create db");
    admin.close().await;
    let db_url = format!("postgres://postgres:postgres@127.0.0.1:{port}/{db}");
    let pool = sqlx_postgres::PgPool::connect(&db_url)
        .await
        .expect("pool connect");
    rust_job_queue_api_worker_system::migrate(&pool)
        .await
        .expect("run migrations");
    pool
}
/// Minimal payload accepted by the `SendEmail` job kind.
fn email_payload() -> serde_json::Value {
    json!({
        "to": "a@b.c",
        "subject": "hi",
        "body": "hello"
    })
}
#[tokio::test]
async fn enqueue_creates_queued_row() {
    let pool = fresh_pool().await;
    let spec = NewJob {
        kind: JobKind::SendEmail,
        payload: email_payload(),
        max_attempts: None,
        idempotency_key: None,
    };
    let outcome = queue::enqueue(&pool, spec).await.unwrap();
    // A first-time enqueue inserts a fresh Queued row with defaults applied.
    assert!(outcome.is_new());
    let job = outcome.job();
    assert_eq!(job.status, JobStatus::Queued);
    assert_eq!(job.kind, JobKind::SendEmail);
    assert_eq!(job.attempts, 0);
    assert_eq!(job.max_attempts, 3);
    assert!(job.last_error.is_none());
    assert!(!job.cancel_requested);
}
#[tokio::test]
async fn enqueue_validates_payload() {
    let pool = fresh_pool().await;
    // Payload lacks "subject" and "body", so schema validation must reject it.
    let incomplete = NewJob {
        kind: JobKind::SendEmail,
        payload: json!({ "to": "a@b.c" }),
        max_attempts: None,
        idempotency_key: None,
    };
    let err = queue::enqueue(&pool, incomplete).await.unwrap_err();
    assert_eq!(err.kind(), "payload_invalid");
}
#[tokio::test]
async fn enqueue_idempotent_on_same_key() {
    let pool = fresh_pool().await;
    let key = "user-42-welcome-email";
    // Both submissions carry the same idempotency key.
    let submission = |k: String| NewJob {
        kind: JobKind::SendEmail,
        payload: email_payload(),
        max_attempts: None,
        idempotency_key: Some(k),
    };
    let first = queue::enqueue(&pool, submission(key.to_string()))
        .await
        .unwrap();
    assert!(first.is_new());
    // Re-submitting must return the existing job instead of inserting a row.
    let second = queue::enqueue(&pool, submission(key.to_string()))
        .await
        .unwrap();
    assert!(!second.is_new());
    assert_eq!(first.job().id, second.job().id);
}
#[tokio::test]
async fn fetch_next_claims_queued_job() {
    let pool = fresh_pool().await;
    let spec = NewJob {
        kind: JobKind::SendEmail,
        payload: email_payload(),
        max_attempts: None,
        idempotency_key: None,
    };
    let enqueued = queue::enqueue(&pool, spec).await.unwrap();
    let claimed = queue::fetch_next(&pool, "worker-A")
        .await
        .unwrap()
        .expect("a job is available");
    // Claiming moves the row to Running, bumps attempts, and records the lock.
    assert_eq!(claimed.id, enqueued.job().id);
    assert_eq!(claimed.status, JobStatus::Running);
    assert_eq!(claimed.attempts, 1);
    assert_eq!(claimed.locked_by.as_deref(), Some("worker-A"));
    assert!(claimed.locked_at.is_some());
}
#[tokio::test]
async fn fetch_next_returns_none_when_empty() {
    // A fresh database holds no jobs, so a claim attempt comes back empty.
    let pool = fresh_pool().await;
    let claimed = queue::fetch_next(&pool, "worker-A").await.unwrap();
    assert!(claimed.is_none());
}
#[tokio::test]
async fn fetch_next_skips_future_run_at() {
    let pool = fresh_pool().await;
    let spec = NewJob {
        kind: JobKind::SendEmail,
        payload: email_payload(),
        max_attempts: None,
        idempotency_key: None,
    };
    let job = queue::enqueue(&pool, spec).await.unwrap();
    // Push run_at into the future directly in SQL so the job is not yet due.
    query("UPDATE jobs SET run_at = now() + interval '1 hour' WHERE id = $1")
        .bind(job.job().id.as_uuid())
        .execute(&pool)
        .await
        .unwrap();
    let claimed = queue::fetch_next(&pool, "worker-A").await.unwrap();
    assert!(claimed.is_none());
}
#[tokio::test]
async fn two_concurrent_workers_each_get_a_distinct_job() {
let pool = fresh_pool().await;
for _ in 0..2 {
queue::enqueue(
&pool,
NewJob {
kind: JobKind::SendEmail,
payload: email_payload(),
max_attempts: None,
idempotency_key: None,
},
)
.await
.unwrap();
}
let p1 = pool.clone();
let p2 = pool.clone();
let (a, b) = tokio::join!(
async move { queue::fetch_next(&p1, "w1").await.unwrap().unwrap() },
async move { queue::fetch_next(&p2, "w2").await.unwrap().unwrap() },
);
assert_ne!(
a.id, b.id,
"two workers must claim distinct jobs under SKIP LOCKED"
);
}
#[tokio::test]
async fn mark_succeeded_transitions_running_to_succeeded() {
    let pool = fresh_pool().await;
    let job = queue::enqueue(
        &pool,
        NewJob {
            kind: JobKind::SendEmail,
            payload: email_payload(),
            max_attempts: None,
            idempotency_key: None,
        },
    )
    .await
    .unwrap();
    let id = job.job().id;
    // Assert the claim instead of dropping the Option: a failed claim would
    // otherwise surface later as a confusing invalid_transition error.
    queue::fetch_next(&pool, "w")
        .await
        .unwrap()
        .expect("job should be claimable");
    queue::mark_succeeded(&pool, id).await.unwrap();
    let after = queue::get(&pool, id).await.unwrap().unwrap();
    // Success must clear the lock columns along with the status change.
    assert_eq!(after.status, JobStatus::Succeeded);
    assert!(after.locked_by.is_none());
    assert!(after.locked_at.is_none());
}
#[tokio::test]
async fn mark_succeeded_errors_when_not_running() {
    let pool = fresh_pool().await;
    let spec = NewJob {
        kind: JobKind::SendEmail,
        payload: email_payload(),
        max_attempts: None,
        idempotency_key: None,
    };
    let job = queue::enqueue(&pool, spec).await.unwrap();
    // The job is still Queued (never claimed), so succeeding it is invalid.
    let err = queue::mark_succeeded(&pool, job.job().id)
        .await
        .unwrap_err();
    assert_eq!(err.kind(), "invalid_transition");
}
#[tokio::test]
async fn mark_failed_returns_to_retrying_with_future_run_at() {
    let pool = fresh_pool().await;
    let job = queue::enqueue(
        &pool,
        NewJob {
            kind: JobKind::SendEmail,
            payload: email_payload(),
            max_attempts: Some(3),
            idempotency_key: None,
        },
    )
    .await
    .unwrap();
    let id = job.job().id;
    let before = chrono::Utc::now();
    // Assert the claim instead of dropping the Option: a failed claim would
    // otherwise surface later as a confusing invalid_transition error.
    // (Also un-joins the two statements that shared one line.)
    queue::fetch_next(&pool, "w")
        .await
        .unwrap()
        .expect("job should be claimable");
    let updated = queue::mark_failed_or_retry(&pool, id, "boom")
        .await
        .unwrap();
    // First failure with max_attempts = 3 must schedule a retry in the future.
    assert_eq!(updated.status, JobStatus::Retrying);
    assert_eq!(updated.attempts, 1);
    assert_eq!(updated.last_error.as_deref(), Some("boom"));
    assert!(updated.run_at > before, "run_at should be in the future");
}
#[tokio::test]
async fn mark_failed_lands_in_failed_permanent_at_max_attempts() {
    let pool = fresh_pool().await;
    let job = queue::enqueue(
        &pool,
        NewJob {
            kind: JobKind::SendEmail,
            payload: email_payload(),
            max_attempts: Some(1),
            idempotency_key: None,
        },
    )
    .await
    .unwrap();
    let id = job.job().id;
    // Assert the claim instead of dropping the Option: a failed claim would
    // otherwise surface later as a confusing invalid_transition error.
    // (Also un-joins the two statements that shared one line.)
    queue::fetch_next(&pool, "w")
        .await
        .unwrap()
        .expect("job should be claimable");
    let updated = queue::mark_failed_or_retry(&pool, id, "fatal")
        .await
        .unwrap();
    // With max_attempts = 1 the very first failure is terminal.
    assert_eq!(updated.status, JobStatus::FailedPermanent);
    assert_eq!(updated.last_error.as_deref(), Some("fatal"));
}
#[tokio::test]
async fn request_cancel_on_queued_cancels_immediately() {
    let pool = fresh_pool().await;
    let spec = NewJob {
        kind: JobKind::SendEmail,
        payload: email_payload(),
        max_attempts: None,
        idempotency_key: None,
    };
    let job = queue::enqueue(&pool, spec).await.unwrap();
    let id = job.job().id;
    // No worker holds the job, so cancellation can take effect immediately.
    let outcome = queue::request_cancel(&pool, id).await.unwrap();
    assert_eq!(outcome, CancelOutcome::CancelledNow);
    let after = queue::get(&pool, id).await.unwrap().unwrap();
    assert_eq!(after.status, JobStatus::Cancelled);
}
#[tokio::test]
async fn request_cancel_on_running_sets_pending_flag() {
    let pool = fresh_pool().await;
    let job = queue::enqueue(
        &pool,
        NewJob {
            kind: JobKind::SendEmail,
            payload: email_payload(),
            max_attempts: None,
            idempotency_key: None,
        },
    )
    .await
    .unwrap();
    let id = job.job().id;
    // Assert the claim instead of dropping the Option: a failed claim would
    // otherwise make the PendingOnWorker expectation fail for the wrong reason.
    queue::fetch_next(&pool, "w")
        .await
        .unwrap()
        .expect("job should be claimable");
    // Cancelling a Running job only flags it; the worker finalizes later.
    let outcome = queue::request_cancel(&pool, id).await.unwrap();
    assert_eq!(outcome, CancelOutcome::PendingOnWorker);
    let after = queue::get(&pool, id).await.unwrap().unwrap();
    assert!(after.cancel_requested);
    assert_eq!(after.status, JobStatus::Running);
    // Simulate the worker observing the flag and finalizing the cancellation.
    queue::finalize_cancelled(&pool, id)
        .await
        .unwrap();
    let after = queue::get(&pool, id).await.unwrap().unwrap();
    assert_eq!(after.status, JobStatus::Cancelled);
}
#[tokio::test]
async fn request_cancel_on_terminal_is_noop() {
    let pool = fresh_pool().await;
    let job = queue::enqueue(
        &pool,
        NewJob {
            kind: JobKind::SendEmail,
            payload: email_payload(),
            max_attempts: None,
            idempotency_key: None,
        },
    )
    .await
    .unwrap();
    let id = job.job().id;
    // Assert the claim instead of dropping the Option: a failed claim would
    // otherwise surface later as a confusing invalid_transition error.
    queue::fetch_next(&pool, "w")
        .await
        .unwrap()
        .expect("job should be claimable");
    queue::mark_succeeded(&pool, id).await.unwrap();
    // Cancelling an already-terminal job reports the terminal status, no-op.
    let outcome = queue::request_cancel(&pool, id).await.unwrap();
    assert_eq!(
        outcome,
        CancelOutcome::AlreadyTerminal(JobStatus::Succeeded)
    );
}
#[tokio::test]
async fn request_cancel_on_missing_id_returns_not_found() {
    let pool = fresh_pool().await;
    // A freshly generated id cannot exist in the empty jobs table.
    let missing = rust_job_queue_api_worker_system::JobId::new();
    let err = queue::request_cancel(&pool, missing).await.unwrap_err();
    assert_eq!(err.kind(), "not_found");
}
#[tokio::test]
async fn recover_stale_resets_old_running_rows() {
    let pool = fresh_pool().await;
    let job = queue::enqueue(
        &pool,
        NewJob {
            kind: JobKind::SendEmail,
            payload: email_payload(),
            max_attempts: None,
            idempotency_key: None,
        },
    )
    .await
    .unwrap();
    // Assert the claim instead of dropping the Option: without a Running row
    // the recovery count below would be 0 and the failure would be misleading.
    queue::fetch_next(&pool, "doomed-worker")
        .await
        .unwrap()
        .expect("job should be claimable");
    // Backdate the lock so the row looks abandoned by a crashed worker.
    query("UPDATE jobs SET locked_at = now() - interval '1 hour' WHERE id = $1")
        .bind(job.job().id.as_uuid())
        .execute(&pool)
        .await
        .unwrap();
    let recovered = queue::recover_stale(&pool, 60).await.unwrap();
    assert_eq!(recovered, 1);
    let after = queue::get(&pool, job.job().id).await.unwrap().unwrap();
    // Recovery releases the lock and re-queues the job for retry.
    assert_eq!(after.status, JobStatus::Retrying);
    assert!(after.locked_by.is_none());
}
#[tokio::test]
async fn list_filters_by_status_and_kind() {
    let pool = fresh_pool().await;
    let a = queue::enqueue(
        &pool,
        NewJob {
            kind: JobKind::SendEmail,
            payload: email_payload(),
            max_attempts: None,
            idempotency_key: None,
        },
    )
    .await
    .unwrap();
    let _b = queue::enqueue(
        &pool,
        NewJob {
            kind: JobKind::SummarizeText,
            payload: json!({ "text": "lorem ipsum" }),
            max_attempts: None,
            idempotency_key: None,
        },
    )
    .await
    .unwrap();
    // The original dropped the claim result; the test implicitly relies on the
    // first-enqueued job (`a`) being claimed, because mark_succeeded(a) below
    // requires `a` to be Running. Make that assumption explicit so an ordering
    // change fails here with a clear message instead of as invalid_transition.
    let claimed = queue::fetch_next(&pool, "w")
        .await
        .unwrap()
        .expect("job should be claimable");
    assert_eq!(claimed.id, a.job().id, "expected FIFO claim order");
    queue::mark_succeeded(&pool, a.job().id).await.unwrap();
    // Filtering by status must return exactly the succeeded job.
    let succeeded = queue::list(
        &pool,
        ListFilter {
            status: Some(JobStatus::Succeeded),
            ..Default::default()
        },
    )
    .await
    .unwrap();
    assert_eq!(succeeded.len(), 1);
    assert_eq!(succeeded[0].id, a.job().id);
    // Filtering by kind must return exactly the SummarizeText job.
    let summarize = queue::list(
        &pool,
        ListFilter {
            kind: Some(JobKind::SummarizeText),
            ..Default::default()
        },
    )
    .await
    .unwrap();
    assert_eq!(summarize.len(), 1);
}
#[tokio::test]
async fn get_returns_none_for_unknown_id() {
    let pool = fresh_pool().await;
    // A freshly generated id cannot match any row in the empty table.
    let unknown = rust_job_queue_api_worker_system::JobId::new();
    let res = queue::get(&pool, unknown).await.unwrap();
    assert!(res.is_none());
}
#[tokio::test]
async fn retry_run_at_advances_by_at_least_base_backoff() {
    let pool = fresh_pool().await;
    let job = queue::enqueue(
        &pool,
        NewJob {
            kind: JobKind::SendEmail,
            payload: email_payload(),
            max_attempts: Some(3),
            idempotency_key: None,
        },
    )
    .await
    .unwrap();
    let id = job.job().id;
    let before = chrono::Utc::now();
    // Assert the claim instead of dropping the Option: a failed claim would
    // otherwise surface later as a confusing invalid_transition error.
    queue::fetch_next(&pool, "w")
        .await
        .unwrap()
        .expect("job should be claimable");
    let updated = queue::mark_failed_or_retry(&pool, id, "err").await.unwrap();
    // 900ms rather than a full 1s leaves slack for clock granularity while
    // still catching a missing backoff.
    let delta = (updated.run_at - before).to_std().unwrap_or(Duration::ZERO);
    assert!(
        delta >= Duration::from_millis(900),
        "expected at least ~1s backoff, got {delta:?}"
    );
}