arcly-http 0.2.2

Enterprise-grade NestJS-inspired web framework on axum: zero-lock DI, declarative controllers, multi-tenant data routing, transactional outbox, ABAC, and a self-documenting OpenAPI surface
Documentation
//! Datasource contracts: tenant-scoped pools + read/write splitting.
//!
//! The framework defines the *shape* of data access; the app implements it
//! with SQLx, SeaORM, or anything else — the same boundary rule that keeps
//! OAuth providers and session stores app-side. `core/` is untouched: the
//! registry is an ordinary DI singleton.
//!
//! ## Hot-path guarantees
//!
//! - [`DataSourceRegistry`] is **frozen at boot**: tenant→pool lookup is an
//!   immutable `HashMap` read. No locks.
//! - Replica selection inside a [`DataSource`] implementation should be a
//!   single `AtomicUsize` round-robin — never a locked list.
//! - [`ReadAfterWritePin`] is one `AtomicBool` per request, used to route
//!   post-write reads to the primary (replica-lag protection).
//!
//! ## Read-after-write rule
//!
//! Within one request, after the first `Write` acquisition every subsequent
//! `Read` must go to the primary — a replica may not have replayed the write
//! yet. Handlers opt in by creating a [`ReadAfterWritePin`] and passing it to
//! [`DataSourceRegistry::acquire`]:
//!
//! ```ignore
//! let pin = ReadAfterWritePin::new();
//! let ds  = registry.for_tenant(ctx.tenant());
//! let w   = registry.acquire(ds, AccessIntent::Write, &pin).await?; // pins
//! let r   = registry.acquire(ds, AccessIntent::Read,  &pin).await?; // → primary
//! ```

pub mod db;
pub mod drivers;
/// Migration runner uses the SQLx query facade; SeaORM/Diesel ledger support
/// arrives with their facade methods.
#[cfg(feature = "db-sqlx")]
pub mod migrate;
pub mod outbox;
pub mod tx;

use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};

use futures::future::BoxFuture;

use crate::web::tenant::TenantConfig;

// ─── Intent & errors ──────────────────────────────────────────────────────────

/// The caller's declared purpose for a connection: reading or mutating.
///
/// A `Write` is always served by the primary. A `Read` is eligible for a
/// replica — except after a [`ReadAfterWritePin`] for the same request has
/// seen a write, in which case it is upgraded to the primary too.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum AccessIntent {
    /// Read-only access; may be served by a replica.
    Read,
    /// Mutating access; always served by the primary.
    Write,
}

/// A datasource failure tagged with its [`DataErrorKind`].
///
/// The kind is what lets retry policies, circuit breakers, and replica
/// failover tell "try again, or try somewhere else" apart from "this will
/// never work" — see [`DataError::is_retryable`].
#[derive(Debug)]
#[non_exhaustive]
pub struct DataError {
    /// Which class of failure this is.
    pub kind: DataErrorKind,
    /// Free-form detail supplied by whoever raised the error.
    pub message: String,
}

/// Classes of datasource failure.
///
/// `#[non_exhaustive]`: code outside this crate must include a `_` arm when
/// matching on it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum DataErrorKind {
    /// Misconfiguration (e.g. a missing driver feature or a malformed URL).
    /// Permanent until someone fixes the configuration.
    Config,
    /// Connection or pool failure. **Retryable**: a later attempt, or a
    /// different replica, may succeed.
    Connection,
    /// The statement itself was rejected (syntax, constraint, type).
    /// Permanent.
    Query,
    /// The operation exceeded its deadline. **Retryable**.
    Timeout,
    /// Duplicate / optimistic-concurrency conflict (unique violation,
    /// version clash). Permanent for this payload, but kept distinct from
    /// `Query` so idempotent upsert paths can branch on it.
    Conflict,
    /// Anything reported by the driver that fits no other class.
    Other,
}

impl DataError {
    /// Build an error of the given `kind` carrying `message`.
    pub fn new(kind: DataErrorKind, message: impl Into<String>) -> Self {
        let message = message.into();
        Self { kind, message }
    }

    /// Shorthand for `DataError::new(DataErrorKind::Config, m)`.
    pub fn config(m: impl Into<String>) -> Self {
        Self::new(DataErrorKind::Config, m)
    }

    /// Shorthand for `DataError::new(DataErrorKind::Connection, m)`.
    pub fn connection(m: impl Into<String>) -> Self {
        Self::new(DataErrorKind::Connection, m)
    }

    /// Shorthand for `DataError::new(DataErrorKind::Query, m)`.
    pub fn query(m: impl Into<String>) -> Self {
        Self::new(DataErrorKind::Query, m)
    }

    /// Shorthand for `DataError::new(DataErrorKind::Timeout, m)`.
    pub fn timeout(m: impl Into<String>) -> Self {
        Self::new(DataErrorKind::Timeout, m)
    }

    /// Shorthand for `DataError::new(DataErrorKind::Conflict, m)`.
    pub fn conflict(m: impl Into<String>) -> Self {
        Self::new(DataErrorKind::Conflict, m)
    }

    /// Shorthand for `DataError::new(DataErrorKind::Other, m)`.
    pub fn other(m: impl Into<String>) -> Self {
        Self::new(DataErrorKind::Other, m)
    }

    /// `true` when retrying — or failing over to another replica — can
    /// plausibly succeed. Only `Connection` and `Timeout` qualify.
    pub fn is_retryable(&self) -> bool {
        // Exhaustive on purpose (the enum is ours, so `non_exhaustive` does
        // not apply here): a new kind must take an explicit retry decision.
        match self.kind {
            DataErrorKind::Connection | DataErrorKind::Timeout => true,
            DataErrorKind::Config
            | DataErrorKind::Query
            | DataErrorKind::Conflict
            | DataErrorKind::Other => false,
        }
    }
}

impl std::fmt::Display for DataError {
    /// Renders as `datasource error (<Kind>): <message>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { kind, message } = self;
        write!(f, "datasource error ({:?}): {}", kind, message)
    }
}

impl std::error::Error for DataError {}

// ─── DataSource trait ─────────────────────────────────────────────────────────

/// A single logical database: one primary, optionally backed by read
/// replicas.
///
/// Implementations own the connection pooling (SQLx `PgPool`s and the like)
/// and the replica-selection policy. The routing contract is:
///
/// - `acquire(AccessIntent::Write)` **must** hand back a primary connection;
/// - `acquire(AccessIntent::Read)` **may** hand back a replica connection.
pub trait DataSource: Send + Sync + 'static {
    /// Logical name — the string that matches `TenantConfig::datasource`.
    /// Used for logs and metrics.
    fn name(&self) -> &'static str;

    /// Connection handle produced by [`DataSource::acquire`].
    type Conn: Send;

    /// Check out a connection suitable for `intent`, honouring the routing
    /// contract described on this trait.
    fn acquire(&self, intent: AccessIntent) -> BoxFuture<'_, Result<Self::Conn, DataError>>;
}

// ─── Read-after-write pin ─────────────────────────────────────────────────────

/// Per-request replica-lag protection: once a `Write` has passed through this
/// pin, every later `Read` is upgraded to the primary.
///
/// It is a single `AtomicBool`, so creating one per request is essentially
/// free. Never share one across requests: a write in one request would then
/// force the other request's reads onto the primary as well.
///
/// `Debug` is derived so the pin can be embedded in request-state structs
/// that themselves derive `Debug`.
#[derive(Debug, Default)]
pub struct ReadAfterWritePin {
    /// Set by the first `Write` seen by [`apply`](Self::apply); never cleared.
    wrote: AtomicBool,
}

impl ReadAfterWritePin {
    /// A fresh, un-tripped pin: reads remain replica-eligible.
    pub fn new() -> Self {
        Self::default()
    }

    /// Whether a `Write` has already been recorded, i.e. whether subsequent
    /// `Read`s will be routed to the primary.
    pub fn is_pinned(&self) -> bool {
        self.wrote.load(Ordering::Relaxed)
    }

    /// Effective intent after applying the pin, recording writes as they pass.
    ///
    /// - `Write` trips the pin and is returned unchanged.
    /// - `Read` becomes `Write` once the pin is tripped; otherwise it stays
    ///   `Read`.
    ///
    /// The flag is set as soon as a `Write` is passed in. `apply` cannot know
    /// whether the following acquisition succeeds, so a failed write still
    /// pins later reads — the conservative outcome.
    pub fn apply(&self, intent: AccessIntent) -> AccessIntent {
        // `Relaxed` suffices: the flag guards no other memory. The only
        // ordering that matters — a write before a later read of the same
        // request — is given by program order, or by whatever synchronisation
        // hands the pin from one task to another.
        match intent {
            AccessIntent::Write => {
                self.wrote.store(true, Ordering::Relaxed);
                AccessIntent::Write
            }
            AccessIntent::Read if self.is_pinned() => AccessIntent::Write,
            AccessIntent::Read => AccessIntent::Read,
        }
    }
}

// ─── Registry ─────────────────────────────────────────────────────────────────

/// Frozen tenant→datasource map. Built once at boot, provided via DI,
/// read lock-free on every request.
pub struct DataSourceRegistry<D: DataSource> {
    /// Fallback for an absent tenant or an unregistered datasource name.
    default: D,
    /// Named datasources, keyed by the name tenants carry in
    /// `TenantConfig::datasource`.
    by_name: HashMap<&'static str, D>,
}

impl<D: DataSource> DataSourceRegistry<D> {
    /// Start a registry whose only datasource is `default`.
    pub fn new(default: D) -> Self {
        Self {
            default,
            by_name: HashMap::new(),
        }
    }

    /// Register a named datasource (boot-time only — consumes `self`).
    ///
    /// Registering the same `name` twice keeps the **last** datasource; the
    /// earlier one is dropped. Avoid the name `""`: [`iter`](Self::iter)
    /// already reports the default under that name.
    pub fn with(mut self, name: &'static str, ds: D) -> Self {
        self.by_name.insert(name, ds);
        self
    }

    /// The datasource for a request's tenant, joined on
    /// `TenantConfig::datasource`. An absent tenant, or a name never
    /// registered via [`with`](Self::with), falls back to the default.
    pub fn for_tenant(&self, tenant: Option<&TenantConfig>) -> &D {
        tenant
            .and_then(|t| self.by_name.get(t.datasource.as_str()))
            .unwrap_or(&self.default)
    }

    /// Iterate all datasources: the default first (named `""`), then the
    /// named ones sorted by name so the order is deterministic. Used by the
    /// migration runner.
    ///
    /// Collects the named entries into a `Vec` on each call, so keep it off
    /// the per-request hot path.
    pub fn iter(&self) -> impl Iterator<Item = (&'static str, &D)> {
        let mut named: Vec<_> = self.by_name.iter().map(|(k, v)| (*k, v)).collect();
        // Map keys are unique, so an unstable sort produces exactly the same
        // order as a stable one, without the stable sort's scratch buffer.
        named.sort_unstable_by_key(|(k, _)| *k);
        std::iter::once(("", &self.default)).chain(named)
    }

    /// Acquire a connection from `ds` with the read-after-write `pin`
    /// applied: a `Write` trips the pin, and a `Read` issued after the pin
    /// has tripped is sent to the primary.
    ///
    /// `ds` is expected to come from [`for_tenant`](Self::for_tenant); the
    /// registry's own maps are not consulted here.
    ///
    /// # Errors
    ///
    /// Propagates whatever [`DataError`] the datasource's `acquire` returns.
    pub async fn acquire(
        &self,
        ds: &D,
        intent: AccessIntent,
        pin: &ReadAfterWritePin,
    ) -> Result<D::Conn, DataError> {
        ds.acquire(pin.apply(intent)).await
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::web::tenant::{TenantConfig, TenantId};

    #[test]
    fn pin_upgrades_reads_after_first_write() {
        let pin = ReadAfterWritePin::new();
        // Before any write, reads stay replica-eligible.
        assert_eq!(pin.apply(AccessIntent::Read), AccessIntent::Read);
        assert_eq!(pin.apply(AccessIntent::Read), AccessIntent::Read);
        assert_eq!(pin.apply(AccessIntent::Write), AccessIntent::Write);
        // Every read after a write goes to the primary; writes stay writes.
        assert_eq!(pin.apply(AccessIntent::Read), AccessIntent::Write);
        assert_eq!(pin.apply(AccessIntent::Write), AccessIntent::Write);
        assert_eq!(pin.apply(AccessIntent::Read), AccessIntent::Write);
    }

    #[test]
    fn pins_are_independent() {
        let a = ReadAfterWritePin::new();
        let b = ReadAfterWritePin::new();
        assert_eq!(a.apply(AccessIntent::Write), AccessIntent::Write);
        // Tripping one pin must not affect another request's pin.
        assert_eq!(b.apply(AccessIntent::Read), AccessIntent::Read);
    }

    struct FakeDs(&'static str);
    impl DataSource for FakeDs {
        type Conn = ();
        fn acquire(&self, _: AccessIntent) -> BoxFuture<'_, Result<(), DataError>> {
            Box::pin(async { Ok(()) })
        }
        fn name(&self) -> &'static str {
            self.0
        }
    }

    /// Datasource whose "connection" is the intent it was asked for, so tests
    /// can observe what the registry actually forwarded.
    struct EchoDs;
    impl DataSource for EchoDs {
        type Conn = AccessIntent;
        fn acquire(&self, i: AccessIntent) -> BoxFuture<'_, Result<AccessIntent, DataError>> {
            Box::pin(async move { Ok(i) })
        }
        fn name(&self) -> &'static str {
            "echo"
        }
    }

    /// Waker that does nothing; enough to poll futures that complete
    /// immediately, without pulling in an executor.
    struct NoopWake;
    impl std::task::Wake for NoopWake {
        fn wake(self: std::sync::Arc<Self>) {}
    }

    /// Poll `fut` once and return its output; panics if it is not ready.
    fn poll_ready<F: std::future::Future>(fut: F) -> F::Output {
        let waker = std::task::Waker::from(std::sync::Arc::new(NoopWake));
        let mut cx = std::task::Context::from_waker(&waker);
        let mut fut = Box::pin(fut);
        match std::future::Future::poll(fut.as_mut(), &mut cx) {
            std::task::Poll::Ready(out) => out,
            std::task::Poll::Pending => panic!("future was expected to complete on first poll"),
        }
    }

    fn tenant(ds: &str) -> TenantConfig {
        TenantConfig {
            id: TenantId::new("t"),
            display_name: "T".into(),
            datasource: ds.into(),
        }
    }

    #[test]
    fn registry_routes_by_tenant_datasource_with_default_fallback() {
        let reg = DataSourceRegistry::new(FakeDs("default"))
            .with("acme", FakeDs("acme"))
            .with("globex", FakeDs("globex"));

        assert_eq!(reg.for_tenant(Some(&tenant("acme"))).name(), "acme");
        assert_eq!(reg.for_tenant(Some(&tenant("globex"))).name(), "globex");
        assert_eq!(reg.for_tenant(Some(&tenant("unknown"))).name(), "default");
        assert_eq!(reg.for_tenant(None).name(), "default");
    }

    #[test]
    fn registry_acquire_applies_read_after_write_pin() {
        let reg = DataSourceRegistry::new(EchoDs);
        let ds = reg.for_tenant(None);
        let pin = ReadAfterWritePin::new();
        let forwarded = |intent| poll_ready(reg.acquire(ds, intent, &pin)).unwrap();

        assert_eq!(forwarded(AccessIntent::Read), AccessIntent::Read);
        assert_eq!(forwarded(AccessIntent::Write), AccessIntent::Write);
        // The pin tripped above, so this read must reach the primary.
        assert_eq!(forwarded(AccessIntent::Read), AccessIntent::Write);
    }

    #[test]
    fn data_error_taxonomy_classifies_retryability() {
        assert!(DataError::connection("pool down").is_retryable());
        assert!(DataError::timeout("slow").is_retryable());
        assert!(!DataError::query("syntax").is_retryable());
        assert!(!DataError::config("bad url").is_retryable());
        assert!(!DataError::conflict("dup key").is_retryable());
        assert!(!DataError::other("unknown").is_retryable());
        let e = DataError::connection("x");
        assert_eq!(e.kind, DataErrorKind::Connection);
        assert_eq!(e.to_string(), "datasource error (Connection): x");
    }

    #[test]
    fn registry_iter_is_deterministic_default_first() {
        let reg = DataSourceRegistry::new(FakeDs("default"))
            .with("b", FakeDs("b"))
            .with("a", FakeDs("a"));
        let order: Vec<(&str, &str)> = reg.iter().map(|(n, ds)| (n, ds.name())).collect();
        assert_eq!(order, vec![("", "default"), ("a", "a"), ("b", "b")]);
    }
}