arcly-http 0.4.0

Enterprise-grade NestJS-inspired web framework on axum: zero-lock DI, declarative controllers, multi-tenant data routing, transactional outbox, ABAC, and a self-documenting OpenAPI surface
Documentation
//! Unified database facade — one handle, three ecosystems.
//!
//! `ArclyDbPool` is the single type user code injects regardless of which
//! driver is compiled in (`db-sqlx`, `db-seaorm`, `db-diesel`). It implements
//! [`DataSource`], so it slots straight into the existing
//! `DataSourceRegistry` — tenant routing, read/write splitting, and
//! `ReadAfterWritePin` all apply unchanged, because primary/replica selection
//! happens here at the facade, before any driver is touched.
//!
//! ## Zero-lock guarantees
//!
//! - Replica selection: one `AtomicUsize` round-robin.
//! - SQLx / SeaORM pools are internally lock-free async acquires.
//! - Diesel (sync core) never runs on a worker thread: both `pool.get()` and
//!   the query closure execute inside `spawn_blocking`
//!   (see `data::drivers::diesel`).
//!
//! ## Feature gating
//!
//! This module always compiles. Driver variants exist only when their Cargo
//! feature is enabled; with no `db-*` feature, the enums are uninhabited and
//! every operation reports a configuration error at the call site.

use std::sync::atomic::{AtomicUsize, Ordering};

use futures::future::BoxFuture;

use crate::data::{AccessIntent, DataError, DataSource};

// ─── Driver wrapper ───────────────────────────────────────────────────────────

/// One concrete connection pool. Variants are feature-gated; constructors
/// live in `data::drivers::*`.
pub enum DbDriver {
    #[cfg(feature = "db-sqlx")]
    Sqlx(sqlx::AnyPool),
    #[cfg(feature = "db-seaorm")]
    SeaOrm(sea_orm::DatabaseConnection),
    #[cfg(feature = "db-diesel")]
    Diesel(crate::data::drivers::diesel::DieselBlockingPool),
}

/// An acquired handle, decoupled from the pool's lifetime so it can be held
/// across `.await` points and moved into transactions.
pub enum OwnedDbConn {
    #[cfg(feature = "db-sqlx")]
    Sqlx(sqlx::pool::PoolConnection<sqlx::Any>),
    #[cfg(feature = "db-seaorm")]
    SeaOrm(sea_orm::DatabaseConnection),
    #[cfg(feature = "db-diesel")]
    Diesel(crate::data::drivers::diesel::DieselBlockingPool),
}

// ─── Facade pool ──────────────────────────────────────────────────────────────

/// Primary + replicas for one logical database. Build at boot (plugin
/// `on_init`), register into `DataSourceRegistry<ArclyDbPool>`, freeze.
pub struct ArclyDbPool {
    name: &'static str,
    primary: DbDriver,
    replicas: Vec<DbDriver>,
    rr: AtomicUsize,
}

impl ArclyDbPool {
    pub fn new(name: &'static str, primary: DbDriver) -> Self {
        Self {
            name,
            primary,
            replicas: Vec::new(),
            rr: AtomicUsize::new(0),
        }
    }

    /// Add a read replica (boot-time only — consumes `self`).
    pub fn with_replica(mut self, replica: DbDriver) -> Self {
        self.replicas.push(replica);
        self
    }

    /// Writes → primary; reads → replica round-robin (primary when none).
    /// `true` in the second tuple slot when a replica was chosen — the
    /// acquire path uses it to fall back to the primary on replica failure.
    pub(crate) fn pick(&self, intent: AccessIntent) -> (&DbDriver, bool) {
        match intent {
            AccessIntent::Write => (&self.primary, false),
            AccessIntent::Read if self.replicas.is_empty() => (&self.primary, false),
            AccessIntent::Read => {
                let i = self.rr.fetch_add(1, Ordering::Relaxed);
                (&self.replicas[i % self.replicas.len()], true)
            }
        }
    }

    /// The primary driver — transactions always run here.
    pub(crate) fn primary(&self) -> &DbDriver {
        &self.primary
    }

    /// Acquire from one concrete driver.
    #[allow(unused_variables, unreachable_code)]
    async fn acquire_driver(&self, driver: &DbDriver) -> Result<OwnedDbConn, DataError> {
        match driver {
            #[cfg(feature = "db-sqlx")]
            DbDriver::Sqlx(pool) => Ok(OwnedDbConn::Sqlx(
                pool.acquire()
                    .await
                    .map_err(|e| DataError::connection(e.to_string()))?,
            )),
            #[cfg(feature = "db-seaorm")]
            DbDriver::SeaOrm(conn) => Ok(OwnedDbConn::SeaOrm(conn.clone())),
            #[cfg(feature = "db-diesel")]
            DbDriver::Diesel(pool) => Ok(OwnedDbConn::Diesel(pool.clone())),
            #[allow(unreachable_patterns)]
            _ => Err(DataError::config(
                "no database driver feature enabled (db-sqlx / db-seaorm / db-diesel)",
            )),
        }
    }
}

impl DataSource for ArclyDbPool {
    type Conn = OwnedDbConn;

    /// Reads that land on a replica **fail over to the primary** when the
    /// replica acquire fails — one dead replica must degrade read capacity,
    /// not turn 1/N of reads into errors until a restart. Acquire outcomes
    /// are labelled per pool in Prometheus.
    fn acquire(&self, intent: AccessIntent) -> BoxFuture<'_, Result<Self::Conn, DataError>> {
        Box::pin(async move {
            let started = std::time::Instant::now();
            let (driver, is_replica) = self.pick(intent);

            let result = match self.acquire_driver(driver).await {
                Ok(conn) => Ok(conn),
                Err(replica_err) if is_replica => {
                    metrics::counter!("db_replica_fallback_total", "pool" => self.name)
                        .increment(1);
                    tracing::warn!(
                        pool = self.name,
                        error = %replica_err,
                        "replica acquire failed — falling back to primary"
                    );
                    self.acquire_driver(&self.primary).await
                }
                Err(e) => Err(e),
            };

            metrics::histogram!("db_acquire_seconds", "pool" => self.name)
                .record(started.elapsed().as_secs_f64());
            if result.is_err() {
                metrics::counter!("db_acquire_errors_total", "pool" => self.name).increment(1);
            }
            result
        })
    }

    fn name(&self) -> &'static str {
        self.name
    }
}

// ─── Health integration ───────────────────────────────────────────────────────

impl ArclyDbPool {
    /// Liveness ping against the **primary** driver — `SELECT 1` for SQLx,
    /// the native `ping()` for SeaORM, a pooled checkout for Diesel.
    /// Cheap enough for probe handlers (which additionally bound every
    /// check with their own per-check timeout).
    #[allow(unreachable_code)]
    pub async fn ping(&self) -> Result<(), DataError> {
        match self.primary() {
            #[cfg(feature = "db-sqlx")]
            DbDriver::Sqlx(pool) => {
                sqlx::query_scalar::<_, i64>("SELECT 1")
                    .fetch_one(pool)
                    .await
                    .map_err(|e| DataError::connection(e.to_string()))?;
                Ok(())
            }
            #[cfg(feature = "db-seaorm")]
            DbDriver::SeaOrm(conn) => conn
                .ping()
                .await
                .map_err(|e| DataError::connection(e.to_string())),
            #[cfg(feature = "db-diesel")]
            DbDriver::Diesel(pool) => pool.ping().await,
            #[allow(unreachable_patterns)]
            _ => Err(DataError::config("no database driver feature enabled")),
        }
    }
}

/// Readiness probe over every pool in a [`DataSourceRegistry`](crate::data::DataSourceRegistry): pings each
/// primary and reports the first failure as `Unhealthy` naming the pool.
///
/// Register from `on_start`, where the frozen container hands out `&'static`
/// references:
///
/// ```ignore
/// let registry = container.get::<DataSourceRegistry<ArclyDbPool>>();
/// health::global().register("database", DbHealthCheck::new(registry));
/// ```
pub struct DbHealthCheck {
    registry: &'static crate::data::DataSourceRegistry<ArclyDbPool>,
}

impl DbHealthCheck {
    pub fn new(registry: &'static crate::data::DataSourceRegistry<ArclyDbPool>) -> Self {
        Self { registry }
    }
}

impl crate::observability::health::HealthCheck for DbHealthCheck {
    fn check(&self) -> BoxFuture<'_, crate::observability::health::HealthStatus> {
        use crate::observability::health::HealthStatus;
        Box::pin(async move {
            for (name, pool) in self.registry.iter() {
                if let Err(e) = pool.ping().await {
                    let label = if name.is_empty() { "default" } else { name };
                    return HealthStatus::Unhealthy(format!("pool `{label}`: {e}"));
                }
            }
            HealthStatus::Healthy
        })
    }
}

#[cfg(all(test, feature = "db-sqlx"))]
mod tests {
    use super::*;

    async fn memory_pool() -> DbDriver {
        sqlx::any::install_default_drivers();
        DbDriver::Sqlx(
            sqlx::any::AnyPoolOptions::new()
                .max_connections(2)
                .connect("sqlite::memory:")
                .await
                .expect("in-memory sqlite"),
        )
    }

    /// A pool whose acquires always fail: lazily connected to a path that
    /// cannot exist (read-only mode, missing file).
    fn dead_pool() -> DbDriver {
        sqlx::any::install_default_drivers();
        DbDriver::Sqlx(
            sqlx::any::AnyPoolOptions::new()
                .max_connections(1)
                .acquire_timeout(std::time::Duration::from_millis(200))
                .connect_lazy("sqlite:///nonexistent-dir/arcly-itest.db?mode=ro")
                .expect("lazy pool builds without connecting"),
        )
    }

    #[tokio::test]
    async fn dead_replica_falls_back_to_primary_for_reads() {
        let pool = ArclyDbPool::new("failover-test", memory_pool().await).with_replica(dead_pool());

        // Round-robin sends this read to the (dead) replica; the facade must
        // recover via the primary instead of surfacing an error.
        for _ in 0..3 {
            let conn = pool.acquire(AccessIntent::Read).await;
            assert!(
                conn.is_ok(),
                "read must fall back to primary: {:?}",
                conn.err().map(|e| e.to_string())
            );
        }
        // Writes are untouched by the failover path.
        assert!(pool.acquire(AccessIntent::Write).await.is_ok());
    }

    #[tokio::test]
    async fn ping_succeeds_on_live_primary_and_fails_on_dead() {
        let live = ArclyDbPool::new("live", memory_pool().await);
        assert!(live.ping().await.is_ok());

        let dead = ArclyDbPool::new("dead", dead_pool());
        assert!(dead.ping().await.is_err());
    }
}