Skip to main content

arcly_http/data/
db.rs

1//! Unified database facade — one handle, three ecosystems.
2//!
3//! `ArclyDbPool` is the single type user code injects regardless of which
4//! driver is compiled in (`db-sqlx`, `db-seaorm`, `db-diesel`). It implements
5//! [`DataSource`], so it slots straight into the existing
6//! `DataSourceRegistry` — tenant routing, read/write splitting, and
7//! `ReadAfterWritePin` all apply unchanged, because primary/replica selection
8//! happens here at the facade, before any driver is touched.
9//!
10//! ## Zero-lock guarantees
11//!
12//! - Replica selection: one `AtomicUsize` round-robin.
13//! - SQLx / SeaORM pools are internally lock-free async acquires.
14//! - Diesel (sync core) never runs on a worker thread: both `pool.get()` and
15//!   the query closure execute inside `spawn_blocking`
16//!   (see `data::drivers::diesel`).
17//!
18//! ## Feature gating
19//!
20//! This module always compiles. Driver variants exist only when their Cargo
21//! feature is enabled; with no `db-*` feature, the enums are uninhabited and
22//! every operation reports a configuration error at the call site.
23
24use std::sync::atomic::{AtomicUsize, Ordering};
25
26use futures::future::BoxFuture;
27
28use crate::data::{AccessIntent, DataError, DataSource};
29
30// ─── Driver wrapper ───────────────────────────────────────────────────────────
31
32/// One concrete connection pool. Variants are feature-gated; constructors
33/// live in `data::drivers::*`.
34pub enum DbDriver {
35    #[cfg(feature = "db-sqlx")]
36    Sqlx(sqlx::AnyPool),
37    #[cfg(feature = "db-seaorm")]
38    SeaOrm(sea_orm::DatabaseConnection),
39    #[cfg(feature = "db-diesel")]
40    Diesel(crate::data::drivers::diesel::DieselBlockingPool),
41}
42
43/// An acquired handle, decoupled from the pool's lifetime so it can be held
44/// across `.await` points and moved into transactions.
45pub enum OwnedDbConn {
46    #[cfg(feature = "db-sqlx")]
47    Sqlx(sqlx::pool::PoolConnection<sqlx::Any>),
48    #[cfg(feature = "db-seaorm")]
49    SeaOrm(sea_orm::DatabaseConnection),
50    #[cfg(feature = "db-diesel")]
51    Diesel(crate::data::drivers::diesel::DieselBlockingPool),
52}
53
54// ─── Facade pool ──────────────────────────────────────────────────────────────
55
56/// Primary + replicas for one logical database. Build at boot (plugin
57/// `on_init`), register into `DataSourceRegistry<ArclyDbPool>`, freeze.
58pub struct ArclyDbPool {
59    name: &'static str,
60    primary: DbDriver,
61    replicas: Vec<DbDriver>,
62    rr: AtomicUsize,
63}
64
65impl ArclyDbPool {
66    pub fn new(name: &'static str, primary: DbDriver) -> Self {
67        Self {
68            name,
69            primary,
70            replicas: Vec::new(),
71            rr: AtomicUsize::new(0),
72        }
73    }
74
75    /// Add a read replica (boot-time only — consumes `self`).
76    pub fn with_replica(mut self, replica: DbDriver) -> Self {
77        self.replicas.push(replica);
78        self
79    }
80
81    /// Writes → primary; reads → replica round-robin (primary when none).
82    /// `true` in the second tuple slot when a replica was chosen — the
83    /// acquire path uses it to fall back to the primary on replica failure.
84    pub(crate) fn pick(&self, intent: AccessIntent) -> (&DbDriver, bool) {
85        match intent {
86            AccessIntent::Write => (&self.primary, false),
87            AccessIntent::Read if self.replicas.is_empty() => (&self.primary, false),
88            AccessIntent::Read => {
89                let i = self.rr.fetch_add(1, Ordering::Relaxed);
90                (&self.replicas[i % self.replicas.len()], true)
91            }
92        }
93    }
94
95    /// The primary driver — transactions always run here.
96    pub(crate) fn primary(&self) -> &DbDriver {
97        &self.primary
98    }
99
100    /// Acquire from one concrete driver.
101    #[allow(unused_variables, unreachable_code)]
102    async fn acquire_driver(&self, driver: &DbDriver) -> Result<OwnedDbConn, DataError> {
103        match driver {
104            #[cfg(feature = "db-sqlx")]
105            DbDriver::Sqlx(pool) => Ok(OwnedDbConn::Sqlx(
106                pool.acquire()
107                    .await
108                    .map_err(|e| DataError::connection(e.to_string()))?,
109            )),
110            #[cfg(feature = "db-seaorm")]
111            DbDriver::SeaOrm(conn) => Ok(OwnedDbConn::SeaOrm(conn.clone())),
112            #[cfg(feature = "db-diesel")]
113            DbDriver::Diesel(pool) => Ok(OwnedDbConn::Diesel(pool.clone())),
114            #[allow(unreachable_patterns)]
115            _ => Err(DataError::config(
116                "no database driver feature enabled (db-sqlx / db-seaorm / db-diesel)",
117            )),
118        }
119    }
120}
121
122impl DataSource for ArclyDbPool {
123    type Conn = OwnedDbConn;
124
125    /// Reads that land on a replica **fail over to the primary** when the
126    /// replica acquire fails — one dead replica must degrade read capacity,
127    /// not turn 1/N of reads into errors until a restart. Acquire outcomes
128    /// are labelled per pool in Prometheus.
129    fn acquire(&self, intent: AccessIntent) -> BoxFuture<'_, Result<Self::Conn, DataError>> {
130        Box::pin(async move {
131            let started = std::time::Instant::now();
132            let (driver, is_replica) = self.pick(intent);
133
134            let result = match self.acquire_driver(driver).await {
135                Ok(conn) => Ok(conn),
136                Err(replica_err) if is_replica => {
137                    metrics::counter!("db_replica_fallback_total", "pool" => self.name)
138                        .increment(1);
139                    tracing::warn!(
140                        pool = self.name,
141                        error = %replica_err,
142                        "replica acquire failed — falling back to primary"
143                    );
144                    self.acquire_driver(&self.primary).await
145                }
146                Err(e) => Err(e),
147            };
148
149            metrics::histogram!("db_acquire_seconds", "pool" => self.name)
150                .record(started.elapsed().as_secs_f64());
151            if result.is_err() {
152                metrics::counter!("db_acquire_errors_total", "pool" => self.name).increment(1);
153            }
154            result
155        })
156    }
157
158    fn name(&self) -> &'static str {
159        self.name
160    }
161}
162
163// ─── Health integration ───────────────────────────────────────────────────────
164
165impl ArclyDbPool {
166    /// Liveness ping against the **primary** driver — `SELECT 1` for SQLx,
167    /// the native `ping()` for SeaORM, a pooled checkout for Diesel.
168    /// Cheap enough for probe handlers (which additionally bound every
169    /// check with their own per-check timeout).
170    #[allow(unreachable_code)]
171    pub async fn ping(&self) -> Result<(), DataError> {
172        match self.primary() {
173            #[cfg(feature = "db-sqlx")]
174            DbDriver::Sqlx(pool) => {
175                sqlx::query_scalar::<_, i64>("SELECT 1")
176                    .fetch_one(pool)
177                    .await
178                    .map_err(|e| DataError::connection(e.to_string()))?;
179                Ok(())
180            }
181            #[cfg(feature = "db-seaorm")]
182            DbDriver::SeaOrm(conn) => conn
183                .ping()
184                .await
185                .map_err(|e| DataError::connection(e.to_string())),
186            #[cfg(feature = "db-diesel")]
187            DbDriver::Diesel(pool) => pool.ping().await,
188            #[allow(unreachable_patterns)]
189            _ => Err(DataError::config("no database driver feature enabled")),
190        }
191    }
192}
193
194/// Readiness probe over every pool in a [`DataSourceRegistry`](crate::data::DataSourceRegistry): pings each
195/// primary and reports the first failure as `Unhealthy` naming the pool.
196///
197/// Register from `on_start`, where the frozen container hands out `&'static`
198/// references:
199///
200/// ```ignore
201/// let registry = container.get::<DataSourceRegistry<ArclyDbPool>>();
202/// health::global().register("database", DbHealthCheck::new(registry));
203/// ```
204pub struct DbHealthCheck {
205    registry: &'static crate::data::DataSourceRegistry<ArclyDbPool>,
206}
207
208impl DbHealthCheck {
209    pub fn new(registry: &'static crate::data::DataSourceRegistry<ArclyDbPool>) -> Self {
210        Self { registry }
211    }
212}
213
214impl crate::observability::health::HealthCheck for DbHealthCheck {
215    fn check(&self) -> BoxFuture<'_, crate::observability::health::HealthStatus> {
216        use crate::observability::health::HealthStatus;
217        Box::pin(async move {
218            for (name, pool) in self.registry.iter() {
219                if let Err(e) = pool.ping().await {
220                    let label = if name.is_empty() { "default" } else { name };
221                    return HealthStatus::Unhealthy(format!("pool `{label}`: {e}"));
222                }
223            }
224            HealthStatus::Healthy
225        })
226    }
227}
228
229#[cfg(all(test, feature = "db-sqlx"))]
230mod tests {
231    use super::*;
232
233    async fn memory_pool() -> DbDriver {
234        sqlx::any::install_default_drivers();
235        DbDriver::Sqlx(
236            sqlx::any::AnyPoolOptions::new()
237                .max_connections(2)
238                .connect("sqlite::memory:")
239                .await
240                .expect("in-memory sqlite"),
241        )
242    }
243
244    /// A pool whose acquires always fail: lazily connected to a path that
245    /// cannot exist (read-only mode, missing file).
246    fn dead_pool() -> DbDriver {
247        sqlx::any::install_default_drivers();
248        DbDriver::Sqlx(
249            sqlx::any::AnyPoolOptions::new()
250                .max_connections(1)
251                .acquire_timeout(std::time::Duration::from_millis(200))
252                .connect_lazy("sqlite:///nonexistent-dir/arcly-itest.db?mode=ro")
253                .expect("lazy pool builds without connecting"),
254        )
255    }
256
257    #[tokio::test]
258    async fn dead_replica_falls_back_to_primary_for_reads() {
259        let pool = ArclyDbPool::new("failover-test", memory_pool().await).with_replica(dead_pool());
260
261        // Round-robin sends this read to the (dead) replica; the facade must
262        // recover via the primary instead of surfacing an error.
263        for _ in 0..3 {
264            let conn = pool.acquire(AccessIntent::Read).await;
265            assert!(
266                conn.is_ok(),
267                "read must fall back to primary: {:?}",
268                conn.err().map(|e| e.to_string())
269            );
270        }
271        // Writes are untouched by the failover path.
272        assert!(pool.acquire(AccessIntent::Write).await.is_ok());
273    }
274
275    #[tokio::test]
276    async fn ping_succeeds_on_live_primary_and_fails_on_dead() {
277        let live = ArclyDbPool::new("live", memory_pool().await);
278        assert!(live.ping().await.is_ok());
279
280        let dead = ArclyDbPool::new("dead", dead_pool());
281        assert!(dead.ping().await.is_err());
282    }
283}