use std::sync::atomic::{AtomicUsize, Ordering};
use futures::future::BoxFuture;
use crate::data::{AccessIntent, DataError, DataSource};
/// Backend handle that an [`ArclyDbPool`] routes to.
///
/// Every variant is gated behind its own Cargo feature, so the set of
/// variants depends on the build; with none of the `db-*` features enabled
/// the enum has no variants at all.
pub enum DbDriver {
/// A `sqlx` pool over the `Any` driver (feature `db-sqlx`).
#[cfg(feature = "db-sqlx")]
Sqlx(sqlx::AnyPool),
/// A SeaORM `DatabaseConnection` (feature `db-seaorm`).
#[cfg(feature = "db-seaorm")]
SeaOrm(sea_orm::DatabaseConnection),
/// The crate's own Diesel blocking-pool wrapper (feature `db-diesel`).
#[cfg(feature = "db-diesel")]
Diesel(crate::data::drivers::diesel::DieselBlockingPool),
}
/// Owned value handed out when a connection is acquired from an
/// [`ArclyDbPool`]; it is the `DataSource::Conn` type of that pool.
///
/// Variants mirror [`DbDriver`] and are gated behind the same features.
pub enum OwnedDbConn {
/// A connection checked out of the `sqlx` `Any` pool.
#[cfg(feature = "db-sqlx")]
Sqlx(sqlx::pool::PoolConnection<sqlx::Any>),
/// A clone of the SeaORM `DatabaseConnection` held by the driver
/// (the same type the driver stores, not a separate connection type).
#[cfg(feature = "db-seaorm")]
SeaOrm(sea_orm::DatabaseConnection),
/// A clone of the Diesel blocking-pool handle held by the driver
/// (the same type the driver stores, not a separate connection type).
#[cfg(feature = "db-diesel")]
Diesel(crate::data::drivers::diesel::DieselBlockingPool),
}
/// A named database pool made of one primary driver and zero or more
/// replica drivers.
pub struct ArclyDbPool {
// Static label; used as the `pool` label on metrics and in log events.
name: &'static str,
// Serves writes, reads when no replica is registered, and the fallback
// when acquiring from a replica fails.
primary: DbDriver,
// Read targets, in registration order; may be empty.
replicas: Vec<DbDriver>,
// Ever-increasing counter; taken modulo `replicas.len()` to rotate reads
// across replicas.
rr: AtomicUsize,
}
impl ArclyDbPool {
    /// Creates a pool labelled `name` whose only driver is `primary`.
    ///
    /// The replica list starts empty and the round-robin counter starts at zero.
    pub fn new(name: &'static str, primary: DbDriver) -> Self {
        let replicas = Vec::new();
        let rr = AtomicUsize::new(0);
        Self {
            name,
            primary,
            replicas,
            rr,
        }
    }

    /// Registers `replica` as an additional read target and returns the pool
    /// so calls can be chained.
    pub fn with_replica(mut self, replica: DbDriver) -> Self {
        self.replicas.push(replica);
        self
    }

    /// Selects the driver that should serve `intent`.
    ///
    /// Returns the driver together with a flag that is `true` only when the
    /// driver is a replica. Writes, and reads on a pool without replicas, go
    /// to the primary; other reads rotate through the replicas.
    pub(crate) fn pick(&self, intent: AccessIntent) -> (&DbDriver, bool) {
        let replica_read = matches!(intent, AccessIntent::Read) && !self.replicas.is_empty();
        if !replica_read {
            return (&self.primary, false);
        }

        // The counter is only advanced when a replica is actually chosen.
        let ticket = self.rr.fetch_add(1, Ordering::Relaxed);
        let slot = ticket % self.replicas.len();
        (&self.replicas[slot], true)
    }

    /// Borrows the primary driver.
    pub(crate) fn primary(&self) -> &DbDriver {
        &self.primary
    }

    /// Produces an owned connection value from `driver`.
    ///
    /// For `sqlx` a connection is checked out of the pool and a failure is
    /// reported as a connection error; for SeaORM and Diesel the stored
    /// handle is cloned.
    // Depending on which `db-*` features are enabled, `driver` may go unused
    // and some arms may be unreachable, so those lints are silenced here.
    #[allow(unused_variables, unreachable_code)]
    async fn acquire_driver(&self, driver: &DbDriver) -> Result<OwnedDbConn, DataError> {
        match driver {
            #[cfg(feature = "db-sqlx")]
            DbDriver::Sqlx(pool) => pool
                .acquire()
                .await
                .map(OwnedDbConn::Sqlx)
                .map_err(|err| DataError::connection(err.to_string())),
            #[cfg(feature = "db-seaorm")]
            DbDriver::SeaOrm(conn) => Ok(OwnedDbConn::SeaOrm(conn.clone())),
            #[cfg(feature = "db-diesel")]
            DbDriver::Diesel(pool) => Ok(OwnedDbConn::Diesel(pool.clone())),
            // Keeps the match well-formed when every driver feature is
            // disabled; with any feature enabled the arms above already
            // cover all variants.
            #[allow(unreachable_patterns)]
            _ => Err(DataError::config(
                "no database driver feature enabled (db-sqlx / db-seaorm / db-diesel)",
            )),
        }
    }
}
impl DataSource for ArclyDbPool {
    type Conn = OwnedDbConn;

    /// Acquires a connection suited to `intent`.
    ///
    /// If the chosen driver is a replica and acquiring from it fails, the
    /// failure is counted and logged and the primary is tried instead. The
    /// total time (including any fallback) is recorded in the
    /// `db_acquire_seconds` histogram, and a final error increments
    /// `db_acquire_errors_total`.
    fn acquire(&self, intent: AccessIntent) -> BoxFuture<'_, Result<Self::Conn, DataError>> {
        let work = async move {
            let clock = std::time::Instant::now();
            let (chosen, from_replica) = self.pick(intent);

            let first_attempt = self.acquire_driver(chosen).await;
            let outcome = match first_attempt {
                Err(replica_err) if from_replica => {
                    metrics::counter!("db_replica_fallback_total", "pool" => self.name)
                        .increment(1);
                    tracing::warn!(
                        pool = self.name,
                        error = %replica_err,
                        "replica acquire failed — falling back to primary"
                    );
                    self.acquire_driver(&self.primary).await
                }
                // Success, or a failure on the primary itself: pass through as-is.
                settled => settled,
            };

            let latency = metrics::histogram!("db_acquire_seconds", "pool" => self.name);
            latency.record(clock.elapsed().as_secs_f64());

            if outcome.is_err() {
                let failures = metrics::counter!("db_acquire_errors_total", "pool" => self.name);
                failures.increment(1);
            }
            outcome
        };
        Box::pin(work)
    }

    /// Returns the pool's static label.
    fn name(&self) -> &'static str {
        self.name
    }
}
impl ArclyDbPool {
    /// Checks that the primary driver is reachable.
    ///
    /// `sqlx` runs `SELECT 1`; SeaORM and Diesel delegate to the driver's own
    /// `ping`. Replicas are not contacted.
    ///
    /// # Errors
    ///
    /// Returns a connection error when the `sqlx` or SeaORM probe fails,
    /// whatever error the Diesel pool's `ping` reports, or a configuration
    /// error when no driver feature is enabled.
    // With some feature combinations not every arm is reachable.
    #[allow(unreachable_code)]
    pub async fn ping(&self) -> Result<(), DataError> {
        match &self.primary {
            #[cfg(feature = "db-sqlx")]
            DbDriver::Sqlx(pool) => sqlx::query_scalar::<_, i64>("SELECT 1")
                .fetch_one(pool)
                .await
                .map(|_one| ())
                .map_err(|err| DataError::connection(err.to_string())),
            #[cfg(feature = "db-seaorm")]
            DbDriver::SeaOrm(conn) => {
                let probe = conn.ping().await;
                probe.map_err(|err| DataError::connection(err.to_string()))
            }
            #[cfg(feature = "db-diesel")]
            DbDriver::Diesel(pool) => pool.ping().await,
            // Placeholder arm for builds without any driver feature.
            #[allow(unreachable_patterns)]
            _ => Err(DataError::config("no database driver feature enabled")),
        }
    }
}
/// Health check that pings every [`ArclyDbPool`] found in a registry.
pub struct DbHealthCheck {
// Registry whose pools are iterated on each check; borrowed for `'static`.
registry: &'static crate::data::DataSourceRegistry<ArclyDbPool>,
}
impl DbHealthCheck {
pub fn new(registry: &'static crate::data::DataSourceRegistry<ArclyDbPool>) -> Self {
Self { registry }
}
}
impl crate::observability::health::HealthCheck for DbHealthCheck {
    /// Pings each registered pool in iteration order, one at a time.
    ///
    /// The first failing pool makes the result `Unhealthy`, with a message
    /// naming the pool (an empty name is shown as `default`) and the error;
    /// later pools are not pinged. If every ping succeeds the result is
    /// `Healthy`.
    fn check(&self) -> BoxFuture<'_, crate::observability::health::HealthStatus> {
        use crate::observability::health::HealthStatus;

        let probe_all = async move {
            for (name, pool) in self.registry.iter() {
                match pool.ping().await {
                    Ok(()) => {}
                    Err(cause) => {
                        let label = if name.is_empty() { "default" } else { name };
                        let detail = format!("pool `{label}`: {cause}");
                        return HealthStatus::Unhealthy(detail);
                    }
                }
            }
            HealthStatus::Healthy
        };
        Box::pin(probe_all)
    }
}
#[cfg(all(test, feature = "db-sqlx"))]
mod tests {
    use super::*;

    /// Builds a driver over an in-memory SQLite pool (at most two connections).
    async fn memory_pool() -> DbDriver {
        sqlx::any::install_default_drivers();
        let options = sqlx::any::AnyPoolOptions::new().max_connections(2);
        let pool = options
            .connect("sqlite::memory:")
            .await
            .expect("in-memory sqlite");
        DbDriver::Sqlx(pool)
    }

    /// Builds a lazily-connected driver pointing at a read-only database file
    /// in a directory that does not exist, with a 200 ms acquire timeout.
    fn dead_pool() -> DbDriver {
        sqlx::any::install_default_drivers();
        let give_up_after = std::time::Duration::from_millis(200);
        let options = sqlx::any::AnyPoolOptions::new()
            .max_connections(1)
            .acquire_timeout(give_up_after);
        let pool = options
            .connect_lazy("sqlite:///nonexistent-dir/arcly-itest.db?mode=ro")
            .expect("lazy pool builds without connecting");
        DbDriver::Sqlx(pool)
    }

    #[tokio::test]
    async fn dead_replica_falls_back_to_primary_for_reads() {
        let primary = memory_pool().await;
        let replica = dead_pool();
        let pool = ArclyDbPool::new("failover-test", primary).with_replica(replica);

        // Every read is routed to the single (dead) replica first.
        for _ in 0..3 {
            let outcome = pool.acquire(AccessIntent::Read).await;
            assert!(
                outcome.is_ok(),
                "read must fall back to primary: {:?}",
                outcome.err().map(|e| e.to_string())
            );
        }

        let write = pool.acquire(AccessIntent::Write).await;
        assert!(write.is_ok());
    }

    #[tokio::test]
    async fn ping_succeeds_on_live_primary_and_fails_on_dead() {
        let live = ArclyDbPool::new("live", memory_pool().await);
        let live_ping = live.ping().await;
        assert!(live_ping.is_ok());

        let dead = ArclyDbPool::new("dead", dead_pool());
        let dead_ping = dead.ping().await;
        assert!(dead_ping.is_err());
    }
}