Skip to main content

arcly_http/data/
db.rs

1//! Unified database facade — one handle, three ecosystems.
2//!
3//! `ArclyDbPool` is the single type user code injects regardless of which
4//! driver is compiled in (`db-sqlx`, `db-seaorm`, `db-diesel`). It implements
5//! [`DataSource`], so it slots straight into the existing
6//! `DataSourceRegistry` — tenant routing, read/write splitting, and
7//! `ReadAfterWritePin` all apply unchanged, because primary/replica selection
8//! happens here at the facade, before any driver is touched.
9//!
10//! ## Zero-lock guarantees
11//!
12//! - Replica selection: one `AtomicUsize` round-robin.
//! - SQLx / SeaORM: connection acquisition is async and internally lock-free.
14//! - Diesel (sync core) never runs on a worker thread: both `pool.get()` and
15//!   the query closure execute inside `spawn_blocking`
16//!   (see `data::drivers::diesel`).
17//!
18//! ## Feature gating
19//!
20//! This module always compiles. Driver variants exist only when their Cargo
21//! feature is enabled; with no `db-*` feature, the enums are uninhabited and
22//! every operation reports a configuration error at the call site.
23
24use std::sync::atomic::{AtomicUsize, Ordering};
25
26use futures::future::BoxFuture;
27
28use crate::data::{AccessIntent, DataError, DataSource};
29
30// ─── Driver wrapper ───────────────────────────────────────────────────────────
31
/// One concrete connection pool, tagged by the driver that backs it.
///
/// Every variant is feature-gated: it exists only when the matching Cargo
/// feature (`db-sqlx`, `db-seaorm`, `db-diesel`) is enabled. With no `db-*`
/// feature enabled the enum has no variants and therefore cannot be
/// constructed. Constructors live in `data::drivers::*`.
pub enum DbDriver {
    /// SQLx pool using the driver-agnostic `Any` backend (`db-sqlx`).
    #[cfg(feature = "db-sqlx")]
    Sqlx(sqlx::AnyPool),
    /// SeaORM database connection handle (`db-seaorm`).
    #[cfg(feature = "db-seaorm")]
    SeaOrm(sea_orm::DatabaseConnection),
    /// Diesel pool wrapper from `data::drivers::diesel` (`db-diesel`).
    #[cfg(feature = "db-diesel")]
    Diesel(crate::data::drivers::diesel::DieselBlockingPool),
}
42
/// An acquired handle, decoupled from the pool's lifetime so it can be held
/// across `.await` points and moved into transactions.
///
/// The variants mirror [`DbDriver`] and are gated by the same Cargo features.
/// What "acquired" means differs per driver; see the variant docs.
pub enum OwnedDbConn {
    /// A connection checked out of the SQLx `Any` pool (`db-sqlx`).
    #[cfg(feature = "db-sqlx")]
    Sqlx(sqlx::pool::PoolConnection<sqlx::Any>),
    /// An owned SeaORM `DatabaseConnection` handle (`db-seaorm`).
    #[cfg(feature = "db-seaorm")]
    SeaOrm(sea_orm::DatabaseConnection),
    /// The Diesel pool handle itself rather than a single connection
    /// (`db-diesel`). Per the module docs, `pool.get()` runs later inside
    /// `spawn_blocking` (see `data::drivers::diesel`).
    #[cfg(feature = "db-diesel")]
    Diesel(crate::data::drivers::diesel::DieselBlockingPool),
}
53
54// ─── Facade pool ──────────────────────────────────────────────────────────────
55
/// Primary + replicas for one logical database. Build at boot (plugin
/// `on_init`), register into `DataSourceRegistry<ArclyDbPool>`, freeze.
///
/// Writes always target the primary; reads rotate across the replicas and
/// fall back to the primary when no replica has been added.
pub struct ArclyDbPool {
    /// Name of this data source, returned by `DataSource::name`.
    name: &'static str,
    /// Driver that serves writes, and reads when `replicas` is empty.
    primary: DbDriver,
    /// Read replicas, in the order they were added via `with_replica`.
    replicas: Vec<DbDriver>,
    /// Round-robin counter; incremented once per replica-served read.
    rr: AtomicUsize,
}
64
65impl ArclyDbPool {
66    pub fn new(name: &'static str, primary: DbDriver) -> Self {
67        Self {
68            name,
69            primary,
70            replicas: Vec::new(),
71            rr: AtomicUsize::new(0),
72        }
73    }
74
75    /// Add a read replica (boot-time only — consumes `self`).
76    pub fn with_replica(mut self, replica: DbDriver) -> Self {
77        self.replicas.push(replica);
78        self
79    }
80
81    /// Writes → primary; reads → replica round-robin (primary when none).
82    pub(crate) fn pick(&self, intent: AccessIntent) -> &DbDriver {
83        match intent {
84            AccessIntent::Write => &self.primary,
85            AccessIntent::Read if self.replicas.is_empty() => &self.primary,
86            AccessIntent::Read => {
87                let i = self.rr.fetch_add(1, Ordering::Relaxed);
88                &self.replicas[i % self.replicas.len()]
89            }
90        }
91    }
92
93    /// The primary driver — transactions always run here.
94    pub(crate) fn primary(&self) -> &DbDriver {
95        &self.primary
96    }
97}
98
99impl DataSource for ArclyDbPool {
100    type Conn = OwnedDbConn;
101
102    #[allow(unused_variables, unreachable_code)]
103    fn acquire(&self, intent: AccessIntent) -> BoxFuture<'_, Result<Self::Conn, DataError>> {
104        Box::pin(async move {
105            let driver = self.pick(intent);
106            match driver {
107                #[cfg(feature = "db-sqlx")]
108                DbDriver::Sqlx(pool) => Ok(OwnedDbConn::Sqlx(
109                    pool.acquire().await.map_err(|e| DataError(e.to_string()))?,
110                )),
111                #[cfg(feature = "db-seaorm")]
112                DbDriver::SeaOrm(conn) => Ok(OwnedDbConn::SeaOrm(conn.clone())),
113                #[cfg(feature = "db-diesel")]
114                DbDriver::Diesel(pool) => Ok(OwnedDbConn::Diesel(pool.clone())),
115                #[allow(unreachable_patterns)]
116                _ => Err(DataError(
117                    "no database driver feature enabled (db-sqlx / db-seaorm / db-diesel)".into(),
118                )),
119            }
120        })
121    }
122
123    fn name(&self) -> &'static str {
124        self.name
125    }
126}