rust-job-queue-api-worker-system 0.1.0

A production-shaped Rust job queue: Axum API + async workers + Postgres SKIP LOCKED dequeue, retries with decorrelated jitter, idempotency, cooperative cancellation, OpenAPI, Prometheus metrics.
//! Router builder and shared application state.
//!
//! The state and router are deliberately split from the binary entry
//! point so library consumers can mount the whole HTTP surface (or any
//! subset) into their own Axum server. The `job-queue-api` binary is
//! itself only ~50 lines for this reason — almost all of the HTTP wiring
//! is right here, reusable.
//!
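//! The middleware stack is the most subtle part of this file. Order
//! matters: the layers are composed with `tower::ServiceBuilder`, which
//! applies them in the order they are added — the first layer listed is
//! the outermost and sees the request first. A request therefore flows
//! through them in the order they appear in this file (top to bottom),
//! and the response travels back through them in reverse. The chosen
//! order is: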
//!
//! 1. `SetRequestIdLayer` — sets `x-request-id` to a freshly generated
//!    UUID when the client did not provide one; a client-supplied id is
//!    kept as-is. Runs first so downstream layers can reference the id.
//! 2. `TraceLayer` with a custom span builder — opens a tracing span
//!    that carries `method`, `uri`, and the `request_id` already on the
//!    request. This is what makes log correlation work across the API
//!    and the worker (the worker reads request_id from the payload and
//!    opens its own span with the same id).
//! 3. `PropagateRequestIdLayer` — copies the id from the request onto
//!    the response. Clients with their own tracing can pivot on the id.
//! 4. `TimeoutLayer` — aborts the request with HTTP 504 (Gateway
//!    Timeout) if the handler doesn't complete within 30 seconds.

use std::sync::Arc;
use std::time::Duration;

use axum::routing::{get, post};
use axum::Router;
use metrics_exporter_prometheus::PrometheusHandle;
use sqlx_postgres::PgPool;
use tower::ServiceBuilder;
use tower_http::request_id::{MakeRequestUuid, PropagateRequestIdLayer, SetRequestIdLayer};
use tower_http::timeout::TimeoutLayer;
use tower_http::trace::TraceLayer;
use utoipa::OpenApi;
use utoipa_swagger_ui::SwaggerUi;

use crate::api::openapi::ApiDoc;
use crate::api::routes;

/// Shared state handed to every Axum handler via [`axum::extract::State`].
///
/// Cloning is cheap: `PgPool` is internally an `Arc`, and the
/// `PrometheusHandle` is explicitly wrapped in an `Arc` here, so a clone
/// only bumps reference counts. The state lives in this struct (rather
/// than in a thread-local or a global) so tests can construct
/// independent instances without leaking recorder state across them.
#[derive(Clone)]
pub struct AppState {
    /// Connection pool to the Postgres database hosting the `jobs`
    /// table.
    pub pool: PgPool,
    /// Read-side handle the `/metrics` route uses to render the
    /// Prometheus text format. The recorder itself is installed once at
    /// startup by the binary, not by this struct.
    pub metrics: Arc<PrometheusHandle>,
}

/// Assemble the complete Axum [`Router`] for the API.
///
/// The returned router serves:
///
/// - `POST /jobs` (enqueue) and `GET /jobs` (list)
/// - `GET /jobs/{id}` (fetch one)
/// - `POST /jobs/{id}/cancel` (request cancellation)
/// - `GET /health`
/// - `GET /metrics`
/// - Swagger UI at `/docs`, backed by the OpenAPI JSON at
///   `/api-docs/openapi.json`
///
/// Every route, Swagger included, sits behind the shared middleware
/// stack: request id, tracing span, request-id propagation, and a 30 s
/// timeout that answers with HTTP 504.
///
/// Library consumers can chain further `.route(...)` calls onto the
/// result, or skip this function entirely and wire up the individual
/// handlers in [`routes`] themselves.
pub fn build_router(state: AppState) -> Router {
    // The four layers are named individually and then stacked below.
    // `ServiceBuilder` treats the first layer added as the outermost
    // one, so the order of the `.layer(...)` calls is the order in
    // which a request passes through them.

    // Outermost: make sure the request carries an `x-request-id`
    // header, generating a UUID for it when needed.
    let assign_request_id = SetRequestIdLayer::x_request_id(MakeRequestUuid);

    // Next: open a tracing span per request. The id is read straight
    // off the headers (populated by the layer above) and recorded on
    // the span together with the method and URI. A missing or
    // non-textual header value is recorded as the empty string.
    let trace_requests =
        TraceLayer::new_for_http().make_span_with(|req: &axum::http::Request<_>| {
            let request_id = match req.headers().get("x-request-id").map(|raw| raw.to_str()) {
                Some(Ok(id)) => id,
                _ => "",
            };
            tracing::info_span!(
                "http",
                method = %req.method(),
                uri = %req.uri(),
                request_id = %request_id,
            )
        });

    // Then: mirror the request id onto the response so callers can
    // correlate their own logs with ours.
    let echo_request_id = PropagateRequestIdLayer::x_request_id();

    // Innermost: a hard 30 s deadline. A request that runs longer is
    // answered with 504 Gateway Timeout so the caller can retry.
    let request_deadline = TimeoutLayer::with_status_code(
        axum::http::StatusCode::GATEWAY_TIMEOUT,
        Duration::from_secs(30),
    );

    let middleware = ServiceBuilder::new()
        .layer(assign_request_id)
        .layer(trace_requests)
        .layer(echo_request_id)
        .layer(request_deadline);

    // Interactive Swagger UI under `/docs`; the raw spec generated from
    // `ApiDoc` is exposed at `/api-docs/openapi.json` for machine
    // consumers.
    let swagger_ui = SwaggerUi::new("/docs").url("/api-docs/openapi.json", ApiDoc::openapi());

    Router::new()
        // Collection endpoint: POST enqueues, GET lists.
        .route(
            "/jobs",
            post(routes::jobs::create_job).get(routes::jobs::list_jobs),
        )
        // Single-job lookup.
        .route("/jobs/{id}", get(routes::jobs::get_job))
        // Cancellation request for a single job.
        .route("/jobs/{id}/cancel", post(routes::jobs::cancel_job))
        // Health probe.
        .route("/health", get(routes::health::health))
        // Prometheus scrape endpoint.
        .route("/metrics", get(routes::metrics::metrics))
        // Swagger UI and the OpenAPI document.
        .merge(swagger_ui)
        // `.layer` wraps everything registered so far, which is why it
        // comes after the merge: the Swagger routes pick up the same
        // request-id and tracing behaviour as the API routes.
        .layer(middleware)
        // Finally hand the shared state to the handlers.
        .with_state(state)
}