rust-job-queue-api-worker-system 0.1.0

A production-shaped Rust job queue: Axum API + async workers + Postgres SKIP LOCKED dequeue, retries with decorrelated jitter, idempotency, cooperative cancellation, OpenAPI, Prometheus metrics.
//! Concurrent dequeue throughput harness.
//!
//! This is a custom bench (no Criterion), driven directly so we can
//! control container boot, configuration overrides, warmup, and the
//! contention pattern. The single-worker Criterion bench at
//! `benches/dequeue.rs` measures queue overhead; this one measures
//! *real* concurrent throughput under contention.
//!
//! Methodology in one paragraph:
//! For each (Postgres config, concurrency level) pair, the harness
//! seeds `TOTAL_JOBS` queued rows, spawns `concurrency` Tokio tasks
//! synced on a Barrier (so the wall-clock measurement starts only when
//! every worker is ready to dequeue), and each task runs a tight
//! `fetch_next` + `mark_succeeded` loop against the shared pool until
//! every job has been processed. The measurement is wall-clock time
//! from "all workers go" to "all jobs succeeded". Throughput is
//! `TOTAL_JOBS / elapsed_seconds`. Each (config, concurrency) point is
//! sampled `SAMPLES` times and the mean + stddev are reported.
//!
//! Run with:
//!
//! ```bash
//! cargo bench --bench throughput --features worker
//! ```
//!
//! Run via `bench/run.sh` to also capture hardware info and write the
//! full markdown report to `bench/RESULTS.md`.

#![cfg(feature = "worker")]

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};

use rust_job_queue_api_worker_system::{
    connect, migrate,
    queue::{enqueue, fetch_next, mark_succeeded},
    JobKind, NewJob, PoolConfig,
};
use serde_json::json;
use sqlx::query::query;
use sqlx_postgres::PgPool;
use testcontainers::{runners::AsyncRunner, ContainerAsync, ImageExt};
use testcontainers_modules::postgres::Postgres;
use tokio::runtime::Builder;
use tokio::sync::Barrier;

/// Number of jobs each measurement processes. Larger reduces the
/// signal/noise ratio of one-off setup work; smaller keeps individual
/// measurements quick. 2,000 is a balance: ~5–30 s of wall time per
/// measurement across the concurrency sweep.
const TOTAL_JOBS: usize = 2_000;

/// Concurrency levels swept. Single → maxed out for an 8-core box.
const CONCURRENCIES: &[usize] = &[1, 2, 4, 8, 16];

/// Number of independent runs per (config, concurrency) point. Reported
/// as mean + standard deviation in the markdown table.
const SAMPLES: usize = 3;

#[derive(Clone, Copy)]
enum PgConfig {
    /// `postgres:16-alpine` with default settings. Baseline.
    Default,
    /// `shared_buffers=256MB`, `work_mem=8MB`, `max_connections=200`.
    /// Models the "we tuned for the workload" production posture.
    Tuned,
    /// `synchronous_commit=off`. The canonical "trade some durability
    /// for throughput" knob; commits return as soon as WAL is in the
    /// kernel page cache rather than on disk.
    AsyncCommit,
}

impl PgConfig {
    fn label(&self) -> &'static str {
        match self {
            PgConfig::Default => "default",
            PgConfig::Tuned => "tuned",
            PgConfig::AsyncCommit => "async_commit",
        }
    }
    fn description(&self) -> &'static str {
        match self {
            PgConfig::Default => "postgres:16-alpine, defaults",
            PgConfig::Tuned => "shared_buffers=256MB, work_mem=8MB, max_connections=200",
            PgConfig::AsyncCommit => "synchronous_commit=off, max_connections=200",
        }
    }
    /// Postgres CLI args appended after `postgres` in the container's
    /// command line. Empty for the default case.
    fn cmd(&self) -> Vec<String> {
        match self {
            PgConfig::Default => vec!["postgres".into()],
            PgConfig::Tuned => vec![
                "postgres".into(),
                "-c".into(),
                "shared_buffers=256MB".into(),
                "-c".into(),
                "work_mem=8MB".into(),
                "-c".into(),
                "max_connections=200".into(),
            ],
            PgConfig::AsyncCommit => vec![
                "postgres".into(),
                "-c".into(),
                "synchronous_commit=off".into(),
                "-c".into(),
                "max_connections=200".into(),
            ],
        }
    }
}

/// One row of the results table: the wall-clock samples and the
/// derived throughput per sample.
struct Measurement {
    config: &'static str,
    config_description: &'static str,
    concurrency: usize,
    elapsed_per_sample: Vec<Duration>,
}

impl Measurement {
    fn throughputs_jobs_per_sec(&self) -> Vec<f64> {
        self.elapsed_per_sample
            .iter()
            .map(|d| TOTAL_JOBS as f64 / d.as_secs_f64())
            .collect()
    }
    fn mean(&self) -> f64 {
        let v = self.throughputs_jobs_per_sec();
        v.iter().sum::<f64>() / v.len() as f64
    }
    fn stddev(&self) -> f64 {
        let v = self.throughputs_jobs_per_sec();
        let mean = self.mean();
        let var = v.iter().map(|x| (x - mean).powi(2)).sum::<f64>() / v.len() as f64;
        var.sqrt()
    }
}

async fn boot_pg(config: PgConfig) -> ContainerAsync<Postgres> {
    Postgres::default()
        .with_cmd(config.cmd())
        .start()
        .await
        .expect("start pg container")
}

/// Allocate a fresh database inside the running container and return a
/// pool sized generously enough for the highest concurrency level we'll
/// run against it.
async fn fresh_pool(container: &ContainerAsync<Postgres>) -> PgPool {
    let port = container.get_host_port_ipv4(5432).await.expect("port");
    let admin = sqlx_postgres::PgPool::connect(&format!(
        "postgres://postgres:postgres@127.0.0.1:{port}/postgres"
    ))
    .await
    .expect("admin connect");
    let db = format!("t{}", uuid::Uuid::now_v7().simple());
    query(&format!("CREATE DATABASE {db}"))
        .execute(&admin)
        .await
        .expect("create db");
    admin.close().await;

    // Pool sized to comfortably handle the highest concurrency in
    // `CONCURRENCIES` plus seeding overhead.
    let pool = connect(
        &PoolConfig::from_url(format!(
            "postgres://postgres:postgres@127.0.0.1:{port}/{db}"
        ))
        .with_max_connections(32),
    )
    .await
    .expect("pool connect");
    migrate(&pool).await.expect("migrate");
    pool
}

async fn seed_jobs(pool: &PgPool, n: usize) {
    for _ in 0..n {
        enqueue(
            pool,
            NewJob {
                kind: JobKind::SummarizeText,
                payload: json!({ "text": "lorem" }),
                max_attempts: Some(1),
                idempotency_key: None,
            },
        )
        .await
        .expect("enqueue");
    }
}

/// Run `concurrency` workers in parallel until `target` jobs have been
/// dequeued and acked. Return the wall-clock duration from the
/// post-barrier start to the last ack.
async fn run_workers(pool: &PgPool, concurrency: usize, target: usize) -> Duration {
    let processed = Arc::new(AtomicUsize::new(0));
    // Use `concurrency + 1` so the main task can also wait on the
    // barrier — this means the timer starts after every worker has
    // crossed the line, not after spawn.
    let barrier = Arc::new(Barrier::new(concurrency + 1));

    let mut tasks = Vec::with_capacity(concurrency);
    for i in 0..concurrency {
        let pool = pool.clone();
        let processed = processed.clone();
        let barrier = barrier.clone();
        let worker_id = format!("bench-worker-{i}");
        tasks.push(tokio::spawn(async move {
            barrier.wait().await;
            loop {
                // Early-exit check: if all the seeded work has been
                // accounted for, we're done.
                if processed.load(Ordering::Relaxed) >= target {
                    return;
                }
                match fetch_next(&pool, &worker_id).await {
                    Ok(Some(job)) => {
                        // mark_succeeded is what the worker would call
                        // after `Executor::execute` returns succeeded;
                        // no executor work in the bench so this measures
                        // the queue itself.
                        let _ = mark_succeeded(&pool, job.id).await;
                        processed.fetch_add(1, Ordering::Relaxed);
                    }
                    Ok(None) => {
                        // No rows available right now. If we hit the
                        // target during a fetch, exit. Otherwise yield
                        // and retry — a stray race where another worker
                        // is mid-fetch is the only realistic cause.
                        if processed.load(Ordering::Relaxed) >= target {
                            return;
                        }
                        tokio::task::yield_now().await;
                    }
                    Err(e) => {
                        eprintln!("bench: fetch_next error: {e}");
                        return;
                    }
                }
            }
        }));
    }

    // Wait until every worker has reached the barrier, then start the
    // clock and release them.
    barrier.wait().await;
    let started = Instant::now();
    // The barrier release happens here — workers had been parked on
    // `barrier.wait().await` and the main task's `wait()` above unblocks
    // them.
    for t in tasks {
        let _ = t.await;
    }
    started.elapsed()
}

fn main() {
    // Single-threaded `current_thread` runtime would serialise all
    // tokio tasks onto one thread and defeat the whole point. Use a
    // multi-thread runtime sized to the host so the OS scheduler can
    // dispatch workers across cores.
    let rt = Builder::new_multi_thread()
        .enable_all()
        .build()
        .expect("tokio runtime");

    let configs = [PgConfig::Default, PgConfig::Tuned, PgConfig::AsyncCommit];
    let mut measurements: Vec<Measurement> = Vec::new();

    for cfg in configs {
        eprintln!(
            "\n=== Postgres config: {} ({}) ===",
            cfg.label(),
            cfg.description()
        );

        rt.block_on(async {
            // One container per config; the inner DBs are recreated
            // between samples to wipe the queue state.
            let container = boot_pg(cfg).await;

            for &concurrency in CONCURRENCIES {
                let mut samples = Vec::with_capacity(SAMPLES);

                for sample_i in 0..SAMPLES {
                    // Fresh DB so each sample starts from a clean
                    // `jobs` table (no rows, no autovacuum debt from
                    // the previous sample).
                    let pool = fresh_pool(&container).await;
                    seed_jobs(&pool, TOTAL_JOBS).await;

                    let elapsed = run_workers(&pool, concurrency, TOTAL_JOBS).await;
                    let throughput = TOTAL_JOBS as f64 / elapsed.as_secs_f64();
                    eprintln!(
                        "  concurrency={concurrency:>2}  sample={}  elapsed={:>7.3}s  throughput={:>8.1} jobs/s",
                        sample_i + 1,
                        elapsed.as_secs_f64(),
                        throughput
                    );
                    samples.push(elapsed);

                    pool.close().await;
                }

                measurements.push(Measurement {
                    config: cfg.label(),
                    config_description: cfg.description(),
                    concurrency,
                    elapsed_per_sample: samples,
                });
            }
        });
    }

    // Emit the markdown table to stdout. `run.sh` redirects this into
    // the right section of bench/RESULTS.md.
    print_markdown_results(&measurements);
}

fn print_markdown_results(measurements: &[Measurement]) {
    println!();
    println!("<!-- BEGIN: bench results table -->");
    println!();
    println!("| Postgres config | Concurrency | Throughput (jobs/s) | Stddev (jobs/s) | Samples |");
    println!("|---|---:|---:|---:|---:|");

    let mut last_cfg = "";
    for m in measurements {
        let mean = m.mean();
        let stddev = m.stddev();
        let cfg_label = if m.config == last_cfg {
            "".to_string()
        } else {
            format!("**{}** _({})_", m.config, m.config_description)
        };
        last_cfg = m.config;
        println!(
            "| {} | {} | {:.1} | {:.1} | {} |",
            cfg_label,
            m.concurrency,
            mean,
            stddev,
            m.elapsed_per_sample.len()
        );
    }
    println!();
    println!("<!-- END: bench results table -->");
}