rust-job-queue-api-worker-system 0.1.0

A production-shaped Rust job queue: Axum API + async workers + Postgres SKIP LOCKED dequeue, retries with decorrelated jitter, idempotency, cooperative cancellation, OpenAPI, Prometheus metrics.
//! Shared test helpers.
//!
//! Cargo compiles each `tests/*.rs` file as its own crate, so this module is
//! included via `mod common;` in each test file and compiled once per
//! test binary. Within a single test binary all tests share one Postgres
//! container; each test gets its own ephemeral database for isolation.

#![allow(dead_code, unused_imports)]

use rust_job_queue_api_worker_system::{connect, migrate, PoolConfig};
use sqlx::query::query;
use sqlx_postgres::PgPool;
use testcontainers::{runners::AsyncRunner, ContainerAsync};
use testcontainers_modules::postgres::Postgres;
use tokio::sync::OnceCell;

/// State for the Postgres container that `shared()` starts once and then
/// hands out to every caller in the same test binary.
pub struct Shared {
    /// Host port obtained from `get_host_port_ipv4(5432)` on the container.
    pub port: u16,
    // Never read; stored so the container handle lives as long as `SHARED`.
    // NOTE(review): presumably testcontainers stops the container when this
    // handle is dropped — confirm against the testcontainers docs.
    _container: ContainerAsync<Postgres>,
}

/// Lazily-populated cell backing `shared()`; created empty and filled on the
/// first call.
static SHARED: OnceCell<Shared> = OnceCell::const_new();

/// Returns the container state shared by this test binary, starting the
/// Postgres container the first time it is called.
///
/// # Panics
///
/// Panics if the container cannot be started or its host port for 5432
/// cannot be resolved.
pub async fn shared() -> &'static Shared {
    /// One-time initialiser: start Postgres and record the mapped host port.
    async fn boot() -> Shared {
        let pg = Postgres::default()
            .start()
            .await
            .expect("start postgres container");
        let host_port = pg
            .get_host_port_ipv4(5432)
            .await
            .expect("postgres host port");
        Shared {
            port: host_port,
            _container: pg,
        }
    }

    SHARED.get_or_init(boot).await
}

/// Allocates a fresh database in the shared container, runs migrations, and
/// returns a connected `PgPool`.
pub async fn fresh_pool() -> PgPool {
    let s = shared().await;
    let port = s.port;

    let admin = sqlx_postgres::PgPool::connect(&format!(
        "postgres://postgres:postgres@127.0.0.1:{port}/postgres"
    ))
    .await
    .expect("admin connect");
    let db = format!("t{}", uuid::Uuid::now_v7().simple());
    query(&format!("CREATE DATABASE {db}"))
        .execute(&admin)
        .await
        .expect("create db");
    admin.close().await;

    let pool = connect(
        &PoolConfig::from_url(format!(
            "postgres://postgres:postgres@127.0.0.1:{port}/{db}"
        ))
        .with_max_connections(16),
    )
    .await
    .expect("connect");
    migrate(&pool).await.expect("migrate");
    pool
}

// ----- API test server helper (only available when the api feature is on) -----

#[cfg(feature = "api")]
pub use api_helpers::{spawn_api, ApiServer};

#[cfg(feature = "api")]
mod api_helpers {
    //! Helpers for running the HTTP API in-process on a loopback listener.

    use std::sync::Arc;

    use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};
    use rust_job_queue_api_worker_system::api::{build_router, AppState};
    use sqlx_postgres::PgPool;
    use tokio::net::TcpListener;
    use tokio::sync::OnceCell;

    /// Prometheus handle built once per test binary and cloned for each
    /// server.
    static METRICS: OnceCell<PrometheusHandle> = OnceCell::const_new();

    /// Returns a clone of the per-binary Prometheus handle, building the
    /// recorder and attempting to install it globally on the first call.
    async fn metrics_handle() -> PrometheusHandle {
        METRICS
            .get_or_init(|| async {
                let recorder = PrometheusBuilder::new().build_recorder();
                let handle = recorder.handle();
                // `set_global_recorder` returns Err if already set. We only
                // ever attempt to install it once per binary thanks to the
                // OnceCell; if a previous test in the same binary somehow
                // already set a different recorder we just live with it.
                let _ = metrics::set_global_recorder(recorder);
                handle
            })
            .await
            .clone()
    }

    /// A running API server started by [`spawn_api`].
    pub struct ApiServer {
        /// Base URL of the server, in the form `http://<local address>`.
        pub base_url: String,
        /// Firing this resolves the server's graceful-shutdown future.
        cancel: tokio::sync::oneshot::Sender<()>,
        /// Handle of the task that drives `axum::serve`.
        task: tokio::task::JoinHandle<()>,
    }

    impl ApiServer {
        /// Signals graceful shutdown and waits up to five seconds for the
        /// server task to finish.
        ///
        /// If the task has not finished by the deadline it is aborted.
        /// Awaiting the handle by value would drop it on timeout, which
        /// detaches the task and leaves the server running in the background.
        pub async fn shutdown(mut self) {
            // A send error means the receiver is gone, i.e. the server task
            // has already ended; nothing more to signal.
            let _ = self.cancel.send(());

            // Borrow the handle so it is still available for `abort` if the
            // deadline elapses. A join error (e.g. a panic inside the task)
            // is deliberately ignored, as before.
            let deadline = std::time::Duration::from_secs(5);
            if tokio::time::timeout(deadline, &mut self.task)
                .await
                .is_err()
            {
                self.task.abort();
            }
        }
    }

    /// Starts the API router on `127.0.0.1` with port `0`, backed by `pool`,
    /// and returns a handle for reaching and stopping it.
    ///
    /// The address actually bound is read back from the listener and used to
    /// build [`ApiServer::base_url`].
    ///
    /// # Panics
    ///
    /// Panics if the listener cannot be bound or its local address cannot be
    /// read.
    pub async fn spawn_api(pool: PgPool) -> ApiServer {
        let metrics = metrics_handle().await;
        let state = AppState {
            pool,
            metrics: Arc::new(metrics),
        };
        let app = build_router(state);
        let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind");
        let local_addr = listener.local_addr().expect("local_addr");
        let (tx, rx) = tokio::sync::oneshot::channel::<()>();
        let task = tokio::spawn(async move {
            // Serve until the oneshot fires or its sender is dropped; either
            // outcome completes `rx.await` and triggers graceful shutdown.
            let _ = axum::serve(listener, app)
                .with_graceful_shutdown(async move {
                    let _ = rx.await;
                })
                .await;
        });
        ApiServer {
            base_url: format!("http://{local_addr}"),
            cancel: tx,
            task,
        }
    }
}