rust-job-queue-api-worker-system 0.1.0

A production-shaped Rust job queue: Axum API + async workers + Postgres SKIP LOCKED dequeue, retries with decorrelated jitter, idempotency, cooperative cancellation, OpenAPI, Prometheus metrics.
//! API entry point.
//!
//! Wires the runtime: tracing subscriber, Prometheus recorder, database
//! pool, idempotent migration apply, and the Axum router from
//! [`rust_job_queue_api_worker_system::api::build_router`]. Then calls
//! `axum::serve` with a graceful-shutdown future hooked to
//! SIGINT/SIGTERM.
//!
//! This file is intentionally short — the goal is for all the
//! interesting behaviour to live in library code (so it's testable,
//! reusable, and well-isolated) and for this binary to be just the
//! "wire the process together" layer.

use std::sync::Arc;

use anyhow::Context;
use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};
use rust_job_queue_api_worker_system::{
    api::{build_router, AppState},
    connect, migrate, PoolConfig,
};
use tokio::net::TcpListener;
use tokio::signal;
use tracing::info;
use tracing_subscriber::EnvFilter;

/// Process entry point: wire the runtime together and serve HTTP until
/// a shutdown signal arrives.
///
/// Configuration is read from the environment:
/// - `DATABASE_URL` (required): connection string handed to `PoolConfig::from_url`.
/// - `API_BIND_ADDR` (optional): listen address, defaults to `0.0.0.0:8080`.
///
/// # Errors
///
/// Returns an error, annotated with the step that failed, if
/// `DATABASE_URL` is unset, the database connection or migration fails,
/// the Prometheus recorder cannot be installed, the listener cannot be
/// bound, or the server itself returns an I/O error.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    init_tracing();

    let database_url = std::env::var("DATABASE_URL").context("DATABASE_URL must be set")?;
    let bind_addr = std::env::var("API_BIND_ADDR").unwrap_or_else(|_| "0.0.0.0:8080".into());

    let pool = connect(&PoolConfig::from_url(database_url))
        .await
        .context("failed to connect to the database")?;
    // In normal deployments this is a no-op: the `migrations` service
    // applies migrations once before any api container starts. Applying
    // migrations is idempotent, so we re-run it here so that a developer
    // running `cargo run --bin job-queue-api` against a fresh database
    // doesn't have to run a separate migrate command.
    migrate(&pool)
        .await
        .context("failed to apply database migrations")?;

    let metrics_handle = init_metrics()?;
    let state = AppState {
        pool,
        metrics: Arc::new(metrics_handle),
    };

    let app = build_router(state);
    // Name the address in the error: a bare "address already in use"
    // is hard to act on when the address came from the environment.
    let listener = TcpListener::bind(&bind_addr)
        .await
        .with_context(|| format!("failed to bind API listener on {bind_addr}"))?;
    let local_addr = listener
        .local_addr()
        .context("failed to read the listener's local address")?;
    info!(%local_addr, "api listening");

    // axum::serve's graceful shutdown stops accepting new connections
    // as soon as the future resolves and drains in-flight requests
    // before returning.
    axum::serve(listener, app)
        .with_graceful_shutdown(wait_shutdown())
        .await
        .context("api server exited with an error")?;
    info!("api exiting");
    Ok(())
}

/// Set up the process-wide tracing subscriber.
///
/// The log filter is taken from the environment (`RUST_LOG`, standard
/// `EnvFilter` directive syntax). When no filter can be built from the
/// environment, `info,sqlx=warn` is used instead, which keeps our own
/// info-level logs while holding sqlx to warnings and above.
///
/// Output is JSON only when `RUST_LOG_FORMAT` is exactly `json`; any
/// other value (or an unset variable) selects the default
/// human-readable formatter.
fn init_tracing() {
    let env_filter = match EnvFilter::try_from_default_env() {
        Ok(from_env) => from_env,
        Err(_) => EnvFilter::new("info,sqlx=warn"),
    };
    let want_json = matches!(std::env::var("RUST_LOG_FORMAT").as_deref(), Ok("json"));

    let subscriber = tracing_subscriber::fmt().with_env_filter(env_filter);
    if !want_json {
        subscriber.init();
        return;
    }
    subscriber.json().init();
}

/// Register a Prometheus recorder as the global `metrics` recorder and
/// hand back the handle the `/metrics` route renders from.
///
/// No separate HTTP listener is started here (the worker binary uses
/// `with_http_listener` for that): the API exposes metrics through an
/// Axum route on its main listener, so only the recorder and its
/// handle are needed.
///
/// # Errors
///
/// Fails if the global recorder cannot be installed.
fn init_metrics() -> anyhow::Result<PrometheusHandle> {
    let prometheus = PrometheusBuilder::new().build_recorder();
    // Take the handle before installation: installing consumes the recorder.
    let render_handle = prometheus.handle();

    match metrics::set_global_recorder(prometheus) {
        Ok(_) => Ok(render_handle),
        Err(e) => Err(anyhow::anyhow!(
            "failed to install prometheus recorder: {e}"
        )),
    }
}

/// Resolves on the first of SIGINT or SIGTERM. SIGTERM is the signal
/// container orchestrators (Docker, Kubernetes) send on shutdown;
/// SIGINT is what Ctrl-C sends during local development. Both should
/// trigger a graceful shutdown of the HTTP server.
async fn wait_shutdown() {
    let ctrl_c = async {
        let _ = signal::ctrl_c().await;
    };
    #[cfg(unix)]
    let term = async {
        let mut sig = signal::unix::signal(signal::unix::SignalKind::terminate())
            .expect("install sigterm handler");
        sig.recv().await;
    };
    #[cfg(not(unix))]
    let term = std::future::pending::<()>();
    tokio::select! {
        () = ctrl_c => {}
        () = term => {}
    }
}