rust-job-queue-api-worker-system 0.1.0

A production-shaped Rust job queue: Axum API + async workers + Postgres SKIP LOCKED dequeue, retries with decorrelated jitter, idempotency, cooperative cancellation, OpenAPI, Prometheus metrics.
//! Worker entry point.
//!
//! Responsibilities (kept small so the orchestration in [`runtime`] and the
//! per-job logic in [`executor`] can stay library-grade and reusable):
//!
//! 1. Initialise structured tracing (JSON when `RUST_LOG_FORMAT=json`,
//!    otherwise human-readable).
//! 2. Install a Prometheus recorder + HTTP listener so operators can scrape
//!    `worker_jobs_completed_total{kind,outcome}` and similar metrics
//!    independently of the API binary.
//! 3. Connect to Postgres, run embedded migrations (idempotent), and sweep
//!    any rows left in `running` by a previous worker that crashed before
//!    finishing its current job.
//! 4. Wire SIGINT / SIGTERM to a [`CancellationToken`] so the runtime can
//!    shut down gracefully without losing in-flight work.
//! 5. Hand control to [`WorkerRuntime::run`], which owns the worker pool
//!    for the rest of the process's lifetime.
//!
//! [`runtime`]: rust_job_queue_api_worker_system::worker::runtime
//! [`executor`]: rust_job_queue_api_worker_system::worker::executor

use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Context;
use metrics_exporter_prometheus::PrometheusBuilder;
use rust_job_queue_api_worker_system::{
    connect, migrate,
    worker::{recover_stale_at_startup, SimulatedExecutor, WorkerRuntime},
    PoolConfig,
};
use tokio::signal;
use tokio_util::sync::CancellationToken;
use tracing::info;
use tracing_subscriber::EnvFilter;

/// Boots the worker process and runs it until a shutdown signal arrives.
///
/// Order of operations: tracing, metrics exporter, configuration from the
/// environment, database pool + migrations + stale-job recovery, signal
/// wiring, and finally the worker runtime itself.
///
/// # Errors
///
/// Returns an error when the metrics exporter cannot be installed,
/// `DATABASE_URL` is unset, or connecting / migrating / recovering fails.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    init_tracing();
    init_metrics()?;

    let database_url = std::env::var("DATABASE_URL").context("DATABASE_URL must be set")?;

    // `WORKER_CONCURRENCY` is optional: anything unset, unparsable, or zero
    // falls back to four workers.
    let concurrency = match std::env::var("WORKER_CONCURRENCY") {
        Ok(raw) => match raw.parse::<usize>() {
            Ok(n) if n > 0 => n,
            _ => 4,
        },
        Err(_) => 4,
    };

    // Size the pool at `concurrency + 2`, clamped to `u32::MAX`. The intent
    // is one connection per worker for its dequeue/transition cycle plus two
    // spares for the recovery sweep and the executor's cooperative-cancel
    // status checks between sub-steps.
    let max_conns = u32::try_from(concurrency).map_or(u32::MAX, |n| n.saturating_add(2));

    let pool = {
        let config = PoolConfig::from_url(database_url).with_max_connections(max_conns);
        connect(&config).await?
    };
    migrate(&pool).await?;

    // NOTE(review): the 300 s argument is presumably the staleness threshold
    // for jobs left `running` by a crashed worker — confirm against
    // `recover_stale_at_startup`.
    let recovered = recover_stale_at_startup(&pool, Duration::from_secs(300)).await?;
    info!(recovered, concurrency, "worker startup complete");

    // A detached task turns the first SIGINT/SIGTERM into a cancellation of
    // the token that the runtime observes.
    let cancel = CancellationToken::new();
    tokio::spawn({
        let token = cancel.clone();
        async move {
            wait_shutdown().await;
            info!("shutdown signal received");
            token.cancel();
        }
    });

    let runtime =
        WorkerRuntime::new(pool, Arc::new(SimulatedExecutor)).with_concurrency(concurrency);
    runtime.run(cancel).await;

    info!("worker exiting");
    Ok(())
}

/// Installs the process-wide Prometheus recorder together with its HTTP
/// scrape endpoint.
///
/// The listen address is read from `WORKER_METRICS_BIND_ADDR` and defaults
/// to `0.0.0.0:9091` when the variable is unset or unreadable. Using a port
/// separate from the API's `/metrics` lets Prometheus scrape each process
/// on its own.
///
/// # Errors
///
/// Fails when the address is not a valid `host:port` socket address or when
/// the exporter cannot be installed.
fn init_metrics() -> anyhow::Result<()> {
    let raw = match std::env::var("WORKER_METRICS_BIND_ADDR") {
        Ok(value) => value,
        Err(_) => String::from("0.0.0.0:9091"),
    };
    let addr = raw
        .parse::<SocketAddr>()
        .context("WORKER_METRICS_BIND_ADDR must be a socket address (host:port)")?;

    PrometheusBuilder::new()
        .with_http_listener(addr)
        .install()
        .context("install prometheus exporter")?;

    info!(bind = %addr, "metrics listener installed");
    Ok(())
}

/// Installs the global `tracing` subscriber.
///
/// The filter comes from the default environment configuration when that
/// parses, and otherwise falls back to `info,sqlx=warn`. Output is JSON when
/// `RUST_LOG_FORMAT` is exactly `json`; any other value (or none) selects
/// the default human-readable formatter.
fn init_tracing() {
    let filter = match EnvFilter::try_from_default_env() {
        Ok(from_env) => from_env,
        Err(_) => EnvFilter::new("info,sqlx=warn"),
    };
    let wants_json = matches!(std::env::var("RUST_LOG_FORMAT").as_deref(), Ok("json"));

    let subscriber = tracing_subscriber::fmt().with_env_filter(filter);
    if wants_json {
        subscriber.json().init();
        return;
    }
    subscriber.init();
}

/// Resolves on the first of SIGINT or SIGTERM. SIGTERM is what container
/// orchestrators (Docker, Kubernetes) send on shutdown; SIGINT is what
/// Ctrl-C sends during local development.
async fn wait_shutdown() {
    let ctrl_c = async {
        let _ = signal::ctrl_c().await;
    };
    #[cfg(unix)]
    let term = async {
        let mut sig = signal::unix::signal(signal::unix::SignalKind::terminate())
            .expect("install sigterm handler");
        sig.recv().await;
    };
    #[cfg(not(unix))]
    let term = std::future::pending::<()>();
    tokio::select! {
        () = ctrl_c => {}
        () = term => {}
    }
}