//! rust-job-queue-api-worker-system 0.1.0
//!
//! A production-shaped Rust job queue: Axum API + async workers + Postgres
//! `SKIP LOCKED` dequeue, retries with decorrelated jitter, idempotency,
//! cooperative cancellation, OpenAPI, Prometheus metrics.
//! Throughput benchmark for `queue::fetch_next` + `mark_succeeded`.
//!
//! Measures the steady-state dequeue+ack rate against a fresh Postgres
//! container. The bench is committed for reproducibility; the README
//! deliberately does NOT publish numbers (a measurement without a
//! reproducible methodology is noise).
//!
//! Run with:
//!
//! ```bash
//! cargo bench --bench dequeue --features worker
//! ```
//!
//! Requirements: Docker on the host (testcontainers spins Postgres).

use std::time::Duration;

use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use rust_job_queue_api_worker_system::{
    connect, migrate,
    queue::{enqueue, fetch_next, mark_succeeded},
    JobKind, NewJob, PoolConfig,
};
use serde_json::json;
use sqlx_postgres::PgPool;
use testcontainers::{runners::AsyncRunner, ContainerAsync};
use testcontainers_modules::postgres::Postgres;
use tokio::runtime::Runtime;

/// Boot a Postgres container, return its pool. The container is kept alive
/// by the returned guard.
/// Spin up a throwaway Postgres container and hand back a migrated pool.
///
/// The caller must hold the returned `ContainerAsync` guard for as long
/// as the pool is in use; dropping it tears the container down.
async fn bootstrap_pool() -> (PgPool, ContainerAsync<Postgres>) {
    let pg = Postgres::default().start().await.expect("start pg");
    let mapped_port = pg.get_host_port_ipv4(5432).await.expect("port");
    let dsn = format!("postgres://postgres:postgres@127.0.0.1:{mapped_port}/postgres");
    let cfg = PoolConfig::from_url(dsn).with_max_connections(8);
    let pool = connect(&cfg).await.expect("connect");
    migrate(&pool).await.expect("migrate");
    (pool, pg)
}

/// Insert `n` identical single-attempt jobs so the bench loop always
/// finds work to dequeue.
async fn seed_jobs(pool: &PgPool, n: usize) {
    for _ in 0..n {
        let job = NewJob {
            kind: JobKind::SummarizeText,
            payload: json!({ "text": "lorem" }),
            max_attempts: Some(1),
            idempotency_key: None,
        };
        enqueue(pool, job).await.expect("enqueue");
    }
}

/// Benchmark steady-state dequeue+ack throughput for a single worker.
///
/// Seeding is excluded from the measured interval via `iter_custom`:
/// each sample seeds `n` jobs first, then times only the
/// `fetch_next` + `mark_succeeded` loop. (Previously `seed_jobs` ran
/// inside the timed closure, so the reported "dequeue_and_ack"
/// throughput silently included enqueue cost.)
fn bench_dequeue(c: &mut Criterion) {
    let rt = Runtime::new().expect("tokio runtime");
    let (pool, _container) = rt.block_on(bootstrap_pool());

    let mut group = c.benchmark_group("dequeue_and_ack");
    group.measurement_time(Duration::from_secs(15));
    group.sample_size(10);

    // Each measurement: seed N jobs (untimed), then dequeue+ack each one
    // serially from a single worker. Throughput is "jobs / sec".
    for &n in &[100usize, 500] {
        group.throughput(Throughput::Elements(n as u64));
        group.bench_with_input(BenchmarkId::from_parameter(n), &n, |b, &n| {
            b.to_async(&rt).iter_custom(|iters| {
                // `PgPool` is a cheaply clonable handle (shared internally),
                // so cloning it into the async block avoids borrowing `pool`
                // across the `'static`-ish closure boundary.
                let pool = pool.clone();
                async move {
                    let mut elapsed = Duration::ZERO;
                    for _ in 0..iters {
                        // Untimed setup: make sure `n` jobs are available.
                        seed_jobs(&pool, n).await;
                        // Timed region: dequeue + ack only.
                        let start = std::time::Instant::now();
                        for _ in 0..n {
                            let job = fetch_next(&pool, "bench-worker")
                                .await
                                .expect("fetch_next")
                                .expect("a job available");
                            mark_succeeded(&pool, job.id).await.expect("mark_succeeded");
                        }
                        elapsed += start.elapsed();
                    }
                    // criterion divides the returned total by `iters`.
                    elapsed
                }
            });
        });
    }
    group.finish();
}

// Register the benchmark group and generate the harness entry point
// (criterion supplies `fn main` via `criterion_main!`).
criterion_group!(benches, bench_dequeue);
criterion_main!(benches);