arcly-http 0.1.0

Enterprise-grade NestJS-inspired web framework on axum: zero-lock DI, declarative controllers, multi-tenant data routing, transactional outbox, ABAC, and a self-documenting OpenAPI surface
Documentation
//! Datasource contracts: tenant-scoped pools + read/write splitting.
//!
//! The framework defines the *shape* of data access; the app implements it
//! with SQLx, SeaORM, or anything else — the same boundary rule that keeps
//! OAuth providers and session stores app-side. `core/` is untouched: the
//! registry is an ordinary DI singleton.
//!
//! ## Hot-path guarantees
//!
//! - [`DataSourceRegistry`] is **frozen at boot**: tenant→pool lookup is an
//!   immutable `HashMap` read. No locks.
//! - Replica selection inside a [`DataSource`] implementation should be a
//!   single `AtomicUsize` round-robin — never a locked list.
//! - [`ReadAfterWritePin`] is one `AtomicBool` per request, used to route
//!   post-write reads to the primary (replica-lag protection).
//!
//! ## Read-after-write rule
//!
//! Within one request, after the first `Write` acquisition every subsequent
//! `Read` must go to the primary — a replica may not have replayed the write
//! yet. Handlers opt in by creating a [`ReadAfterWritePin`] and passing it to
//! [`DataSourceRegistry::acquire`]:
//!
//! ```ignore
//! let pin = ReadAfterWritePin::new();
//! let ds  = registry.for_tenant(ctx.tenant());
//! let w   = registry.acquire(ds, AccessIntent::Write, &pin).await?; // pins
//! let r   = registry.acquire(ds, AccessIntent::Read,  &pin).await?; // → primary
//! ```

pub mod db;
pub mod drivers;
/// Migration runner uses the SQLx query facade; SeaORM/Diesel ledger support
/// arrives with their facade methods.
#[cfg(feature = "db-sqlx")]
pub mod migrate;
pub mod outbox;
pub mod tx;

use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};

use futures::future::BoxFuture;

use crate::web::tenant::TenantConfig;

// ─── Intent & errors ──────────────────────────────────────────────────────────

/// Whether the caller intends to read or mutate.
///
/// `Write` always routes to the primary. `Read` may be served by a replica —
/// unless a [`ReadAfterWritePin`] has been tripped for this request.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum AccessIntent {
    Read,
    Write,
}

#[derive(Debug)]
pub struct DataError(pub String);

impl std::fmt::Display for DataError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "datasource error: {}", self.0)
    }
}
impl std::error::Error for DataError {}

// ─── DataSource trait ─────────────────────────────────────────────────────────

/// One logical database: a primary plus optional read replicas.
///
/// Implementations own pooling (SQLx `PgPool`s etc.) and replica selection.
/// `acquire(Write)` MUST return a primary connection; `acquire(Read)` MAY
/// return a replica.
pub trait DataSource: Send + Sync + 'static {
    type Conn: Send;

    fn acquire(&self, intent: AccessIntent) -> BoxFuture<'_, Result<Self::Conn, DataError>>;

    /// Logical name (matches `TenantConfig::datasource`). For logs/metrics.
    fn name(&self) -> &'static str;
}

// ─── Read-after-write pin ─────────────────────────────────────────────────────

/// Per-request replica-lag protection: once any `Write` is acquired through
/// this pin, all subsequent `Read`s are upgraded to the primary.
///
/// One `AtomicBool` — create it per request (cheap, no allocation beyond the
/// stack), never share across requests.
#[derive(Default)]
pub struct ReadAfterWritePin {
    wrote: AtomicBool,
}

impl ReadAfterWritePin {
    pub fn new() -> Self {
        Self::default()
    }

    /// Effective intent after applying the pin, recording writes as they pass.
    pub fn apply(&self, intent: AccessIntent) -> AccessIntent {
        match intent {
            AccessIntent::Write => {
                self.wrote.store(true, Ordering::Relaxed);
                AccessIntent::Write
            }
            AccessIntent::Read if self.wrote.load(Ordering::Relaxed) => AccessIntent::Write,
            AccessIntent::Read => AccessIntent::Read,
        }
    }
}

// ─── Registry ─────────────────────────────────────────────────────────────────

/// Frozen tenant→datasource map. Built once at boot, provided via DI,
/// read lock-free on every request.
pub struct DataSourceRegistry<D: DataSource> {
    default: D,
    by_name: HashMap<&'static str, D>,
}

impl<D: DataSource> DataSourceRegistry<D> {
    pub fn new(default: D) -> Self {
        Self {
            default,
            by_name: HashMap::new(),
        }
    }

    /// Register a named datasource (boot-time only — consumes `self`).
    pub fn with(mut self, name: &'static str, ds: D) -> Self {
        self.by_name.insert(name, ds);
        self
    }

    /// The datasource for this request's tenant (joins the tenant layer by
    /// `TenantConfig::datasource` name). Unknown/absent tenant → default.
    pub fn for_tenant(&self, tenant: Option<&TenantConfig>) -> &D {
        tenant
            .and_then(|t| self.by_name.get(t.datasource.as_str()))
            .unwrap_or(&self.default)
    }

    /// Iterate all datasources (default first as "", then named in
    /// deterministic order) — used by the migration runner.
    pub fn iter(&self) -> impl Iterator<Item = (&'static str, &D)> {
        let mut named: Vec<_> = self.by_name.iter().map(|(k, v)| (*k, v)).collect();
        named.sort_by_key(|(k, _)| *k);
        std::iter::once(("", &self.default)).chain(named)
    }

    /// Acquire a connection with the read-after-write pin applied.
    pub async fn acquire(
        &self,
        ds: &D,
        intent: AccessIntent,
        pin: &ReadAfterWritePin,
    ) -> Result<D::Conn, DataError> {
        ds.acquire(pin.apply(intent)).await
    }
}