rust-job-queue-api-worker-system 0.1.0

A production-shaped Rust job queue: Axum API + async workers + Postgres SKIP LOCKED dequeue, retries with decorrelated jitter, idempotency, cooperative cancellation, OpenAPI, Prometheus metrics.
//! Concurrency regression test: 200 jobs, 8 worker tasks against a single
//! Postgres instance. Each job records its execution into a separate `processed_log`
//! table; the test asserts every job appears once and the recorded count is
//! exactly 200. This validates no duplicate processing in this bounded
//! contention scenario.

#![cfg(feature = "worker")]

mod common;

use std::sync::Arc;
use std::time::{Duration, Instant};

use async_trait::async_trait;
use rust_job_queue_api_worker_system::{
    queue,
    worker::{ExecutionContext, ExecutionOutcome, Executor, WorkerRuntime},
    Job, JobKind, NewJob,
};
use serde_json::json;
use sqlx::{query::query, query_as::query_as};
use tokio_util::sync::CancellationToken;

/// Executor that does no real work: it just records which worker handled
/// which job in the `processed_log` side table.
struct RecordingExecutor;

#[async_trait]
impl Executor for RecordingExecutor {
    async fn execute(&self, ctx: &ExecutionContext, job: &Job) -> ExecutionOutcome {
        // Brief pause so several workers are in flight simultaneously and
        // must compete for rows, instead of one worker draining the queue
        // before the others even start polling.
        tokio::time::sleep(Duration::from_millis(3)).await;
        let insert = query("INSERT INTO processed_log (job_id, worker_id) VALUES ($1, $2)")
            .bind(job.id.as_uuid())
            .bind(&ctx.worker_id)
            .execute(&ctx.pool)
            .await;
        if let Err(e) = insert {
            return ExecutionOutcome::Failed(format!("log insert: {e}"));
        }
        ExecutionOutcome::Succeeded
    }
}

/// Enqueues 200 jobs, runs 8 single-concurrency workers against one pool,
/// and asserts exactly-once processing: `processed_log` has exactly 200
/// rows, every `job_id` in it is distinct, and every job finished with
/// `attempts = 1`. Also sanity-checks that more than one worker actually
/// took part, so the SKIP LOCKED contention path was exercised.
#[tokio::test]
async fn two_hundred_jobs_eight_workers_no_duplicates() {
    let pool = common::fresh_pool().await;

    // Side table the executor writes into; one row per execution lets us
    // detect duplicate processing that the `jobs` table alone could hide.
    query(
        "CREATE TABLE processed_log (
            job_id      UUID        NOT NULL,
            worker_id   TEXT        NOT NULL,
            recorded_at TIMESTAMPTZ NOT NULL DEFAULT now()
        )",
    )
    .execute(&pool)
    .await
    .unwrap();

    const N_JOBS: usize = 200;
    const N_WORKERS: usize = 8;

    for _ in 0..N_JOBS {
        queue::enqueue(
            &pool,
            NewJob {
                kind: JobKind::SummarizeText,
                payload: json!({"text": "lorem ipsum"}),
                // Single attempt: any retry would surface as a
                // `failed_permanent` job instead of masking a duplicate run.
                max_attempts: Some(1),
                idempotency_key: None,
            },
        )
        .await
        .unwrap();
    }

    let cancel = CancellationToken::new();
    let mut handles = Vec::new();
    for i in 0..N_WORKERS {
        // Concurrency 1 per runtime so the 8 tasks model 8 independent
        // worker processes; distinct id prefixes let us attribute rows.
        let runtime = WorkerRuntime::new(pool.clone(), Arc::new(RecordingExecutor))
            .with_concurrency(1)
            .with_poll_interval(Duration::from_millis(10), Duration::from_millis(50))
            .with_worker_id_prefix(format!("w{i}"));
        let token = cancel.clone();
        handles.push(tokio::spawn(async move { runtime.run(token).await }));
    }

    // Poll until every job reaches a terminal state, or time out.
    let deadline = Instant::now() + Duration::from_secs(60);
    loop {
        let (succ, fail): (i64, i64) = query_as(
            "SELECT (SELECT COUNT(*) FROM jobs WHERE status = 'succeeded'),
                    (SELECT COUNT(*) FROM jobs WHERE status = 'failed_permanent')",
        )
        .fetch_one(&pool)
        .await
        .unwrap();
        if succ as usize == N_JOBS {
            break;
        }
        if fail > 0 {
            panic!("unexpected failed_permanent jobs: {fail}");
        }
        if Instant::now() > deadline {
            panic!("timeout: only {succ}/{N_JOBS} succeeded");
        }
        tokio::time::sleep(Duration::from_millis(100)).await;
    }

    cancel.cancel();
    for h in handles {
        // Fail loudly on shutdown problems: the outer `expect` catches a
        // worker that never honors cancellation (timeout elapsed), the
        // inner one catches a worker task that panicked (JoinError).
        // Discarding the result here would let both slip through silently.
        let joined = tokio::time::timeout(Duration::from_secs(10), h)
            .await
            .expect("worker did not shut down within 10s");
        let _ = joined.expect("worker task panicked");
    }

    // Exactly N_JOBS rows in processed_log, all distinct.
    let total: (i64,) = query_as("SELECT COUNT(*) FROM processed_log")
        .fetch_one(&pool)
        .await
        .unwrap();
    let distinct: (i64,) = query_as("SELECT COUNT(DISTINCT job_id) FROM processed_log")
        .fetch_one(&pool)
        .await
        .unwrap();
    assert_eq!(total.0, N_JOBS as i64, "total log rows");
    assert_eq!(distinct.0, N_JOBS as i64, "distinct job ids");

    let attempts_one: (i64,) = query_as("SELECT COUNT(*) FROM jobs WHERE attempts = 1")
        .fetch_one(&pool)
        .await
        .unwrap();
    assert_eq!(
        attempts_one.0, N_JOBS as i64,
        "every job processed exactly once (attempts = 1)"
    );

    // Sanity: more than one worker should have touched jobs (otherwise
    // contention wasn't actually exercised).
    let workers: (i64,) = query_as("SELECT COUNT(DISTINCT worker_id) FROM processed_log")
        .fetch_one(&pool)
        .await
        .unwrap();
    assert!(
        workers.0 > 1,
        "expected multiple workers to have processed jobs; only {} did",
        workers.0
    );
}