rust-job-queue-api-worker-system 0.1.0

A production-shaped Rust job queue: Axum API + async workers + Postgres SKIP LOCKED dequeue, retries with decorrelated jitter, idempotency, cooperative cancellation, OpenAPI, Prometheus metrics.
//! Integration test: enqueue a job, run a worker, verify it is processed.

#![cfg(feature = "worker")]

mod common;

use std::sync::Arc;
use std::time::{Duration, Instant};

use rust_job_queue_api_worker_system::{
    queue,
    worker::{SimulatedExecutor, WorkerRuntime},
    JobKind, JobStatus, NewJob,
};
use serde_json::json;
use tokio_util::sync::CancellationToken;

/// End-to-end happy path: a queued job is picked up by a running worker and
/// ends in `JobStatus::Succeeded` with its lock fields cleared.
///
/// Steps:
/// 1. Enqueue one `SummarizeText` job (up to 3 attempts, no idempotency key).
/// 2. Spawn a `WorkerRuntime` backed by `SimulatedExecutor` with concurrency 1.
/// 3. Poll the job row every 80 ms until it reaches `Succeeded` or
///    `FailedPermanent`, panicking if that takes longer than 15 s.
/// 4. Cancel the worker and wait up to 5 s for its task to finish.
/// 5. Assert the terminal status and that `locked_by` / `locked_at` are `None`.
#[tokio::test]
async fn worker_processes_queued_job_to_succeeded() {
    let pool = common::fresh_pool().await;

    // --- Arrange: put a single job on the queue. ---
    let request = NewJob {
        kind: JobKind::SummarizeText,
        payload: json!({"text": "lorem ipsum"}),
        max_attempts: Some(3),
        idempotency_key: None,
    };
    let enqueued = queue::enqueue(&pool, request).await.expect("enqueue");
    let job_id = enqueued.job().id;

    // --- Act: run a worker in the background until we tell it to stop. ---
    let shutdown = CancellationToken::new();
    // NOTE(review): the two durations passed to `with_poll_interval` are presumably
    // the lower/upper polling bounds — confirm against `WorkerRuntime`.
    let worker = WorkerRuntime::new(pool.clone(), Arc::new(SimulatedExecutor))
        .with_concurrency(1)
        .with_poll_interval(Duration::from_millis(20), Duration::from_millis(100));
    let worker_task = tokio::spawn({
        let token = shutdown.clone();
        async move { worker.run(token).await }
    });

    // Poll the stored job until it reaches a terminal status or the deadline passes.
    let give_up_at = Instant::now() + Duration::from_secs(15);
    let final_status = loop {
        let snapshot = queue::get(&pool, job_id)
            .await
            .expect("get")
            .expect("present");

        if matches!(
            snapshot.status,
            JobStatus::Succeeded | JobStatus::FailedPermanent
        ) {
            break snapshot.status;
        }

        assert!(
            Instant::now() <= give_up_at,
            "timeout: status={:?} attempts={}",
            snapshot.status,
            snapshot.attempts
        );

        tokio::time::sleep(Duration::from_millis(80)).await;
    };

    // Request shutdown; the outcome of the bounded wait is intentionally ignored.
    shutdown.cancel();
    let _ = tokio::time::timeout(Duration::from_secs(5), worker_task).await;

    // --- Assert: the job succeeded and no worker still holds its lock. ---
    assert_eq!(final_status, JobStatus::Succeeded);
    let settled = queue::get(&pool, job_id).await.unwrap().unwrap();
    assert!(settled.locked_by.is_none());
    assert!(settled.locked_at.is_none());
}