rust-job-queue-api-worker-system 0.1.0

A production-shaped Rust job queue: Axum API + async workers + Postgres SKIP LOCKED dequeue, retries with decorrelated jitter, idempotency, cooperative cancellation, OpenAPI, Prometheus metrics.
//! Postgres connection pool configuration and migration runner.
//!
//! Centralising the pool defaults here means every binary in the crate
//! gets the same connection-acquisition behaviour. Tunable per-call via
//! the builder methods, but the defaults are chosen to be sensible for
//! the typical web-API / worker shape:
//!
//! - 10 connections is enough for a single API node plus a small worker
//!   pool. The Postgres `max_connections` ceiling (default 100) leaves
//!   headroom for ~10 such pods.
//! - `min_connections = 0` means the pool grows lazily; the first
//!   request after idle pays the connection cost. Acceptable for a
//!   demo; production may want to set this >0 to absorb load bursts.
//! - 5-second acquire timeout surfaces "the DB is gone" quickly as an
//!   error rather than letting handlers hang.
//! - 5-minute idle timeout reclaims connections from a quiet pool so
//!   long-running deployments don't accumulate idle handles indefinitely.

use std::time::Duration;

use sqlx::raw_sql::raw_sql;
use sqlx_postgres::{PgConnectOptions, PgPool, PgPoolOptions};

/// Connection pool configuration. Construct via [`PoolConfig::from_url`]
/// and adjust with the `with_*` builder methods.
///
/// The fields are public so library consumers can override fields
/// directly without going through the builder when their use case
/// doesn't fit the chain pattern.
///
/// Note that the derived `Debug` output includes `url` verbatim, so any
/// credentials embedded in the connection string will be printed. Avoid
/// logging the whole struct where that matters.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PoolConfig {
    /// The Postgres connection string. Standard `postgres://` form.
    pub url: String,
    /// Maximum number of pool connections.
    pub max_connections: u32,
    /// Minimum number of pool connections to keep open even when idle.
    pub min_connections: u32,
    /// How long a `pool.acquire()` will wait for a free connection
    /// before returning an error.
    pub acquire_timeout: Duration,
    /// How long a connection may sit idle before being closed and
    /// reclaimed. `None` disables idle reaping.
    pub idle_timeout: Option<Duration>,
}

impl PoolConfig {
    /// Construct a config from a URL, using the standard defaults
    /// (max 10, min 0, 5 s acquire timeout, 5 min idle timeout).
    ///
    /// The URL is stored as given; it is not parsed or validated here.
    #[must_use]
    pub fn from_url(url: impl Into<String>) -> Self {
        Self {
            url: url.into(),
            max_connections: 10,
            min_connections: 0,
            acquire_timeout: Duration::from_secs(5),
            idle_timeout: Some(Duration::from_secs(300)),
        }
    }

    /// Override the maximum connection count. Used by the worker binary
    /// to size the pool to `concurrency + 2` so each worker task can
    /// always claim a connection.
    #[must_use]
    pub fn with_max_connections(mut self, n: u32) -> Self {
        self.max_connections = n;
        self
    }

    /// Override the minimum number of connections kept open while idle.
    #[must_use]
    pub fn with_min_connections(mut self, n: u32) -> Self {
        self.min_connections = n;
        self
    }

    /// Override how long acquiring a connection may wait before it
    /// returns an error.
    #[must_use]
    pub fn with_acquire_timeout(mut self, timeout: Duration) -> Self {
        self.acquire_timeout = timeout;
        self
    }

    /// Override the idle timeout. Pass `None` to disable idle reaping.
    #[must_use]
    pub fn with_idle_timeout(mut self, timeout: Option<Duration>) -> Self {
        self.idle_timeout = timeout;
        self
    }
}

/// Open a connection pool against the configured URL.
///
/// Establishing the pool actually opens a connection (lazy initialisation
/// is not used here; `connect_with` eagerly connects to surface
/// configuration errors at startup rather than at first query).
pub async fn connect(cfg: &PoolConfig) -> Result<PgPool, sqlx::Error> {
    let opts: PgConnectOptions = cfg.url.parse()?;
    PgPoolOptions::new()
        .max_connections(cfg.max_connections)
        .min_connections(cfg.min_connections)
        .acquire_timeout(cfg.acquire_timeout)
        .idle_timeout(cfg.idle_timeout)
        .connect_with(opts)
        .await
}

/// Apply the embedded initial schema using the given pool.
///
/// The SQL text is compiled into the binary, so published crates and
/// installed binaries need no `migrations/` directory at runtime. The
/// migration script is written to be idempotent (`IF NOT EXISTS` plus
/// duplicate-type guards), so running it against an already-migrated
/// database is safe.
///
/// # Errors
///
/// Returns the error reported while executing the script.
pub async fn migrate(pool: &PgPool) -> Result<(), sqlx::Error> {
    // `include_str!` resolves the path relative to this source file at
    // compile time.
    let schema_sql = include_str!("../migrations/0001_init.sql");

    // Only success or failure matters to callers; drop the query result.
    raw_sql(schema_sql).execute(pool).await.map(|_| ())
}