1use std::sync::atomic::{AtomicUsize, Ordering};
25
26use futures::future::BoxFuture;
27
28use crate::data::{AccessIntent, DataError, DataSource};
29
/// Database backend handle held by a pool.
///
/// Every variant is gated behind its own Cargo feature; when none of
/// `db-sqlx`, `db-seaorm` or `db-diesel` is enabled the enum has no variants.
pub enum DbDriver {
    /// An sqlx `Any` connection pool.
    #[cfg(feature = "db-sqlx")]
    Sqlx(sqlx::AnyPool),
    /// A SeaORM database connection handle.
    #[cfg(feature = "db-seaorm")]
    SeaOrm(sea_orm::DatabaseConnection),
    /// The Diesel pool type from `crate::data::drivers::diesel`.
    #[cfg(feature = "db-diesel")]
    Diesel(crate::data::drivers::diesel::DieselBlockingPool),
}
42
/// Owned connection value handed out by a pool acquire.
///
/// Mirrors the variants of [`DbDriver`], with the same feature gates.
pub enum OwnedDbConn {
    /// A connection checked out of an sqlx `Any` pool.
    #[cfg(feature = "db-sqlx")]
    Sqlx(sqlx::pool::PoolConnection<sqlx::Any>),
    /// A SeaORM connection handle.
    #[cfg(feature = "db-seaorm")]
    SeaOrm(sea_orm::DatabaseConnection),
    /// A Diesel pool handle.
    #[cfg(feature = "db-diesel")]
    Diesel(crate::data::drivers::diesel::DieselBlockingPool),
}
53
54pub struct ArclyDbPool {
59 name: &'static str,
60 primary: DbDriver,
61 replicas: Vec<DbDriver>,
62 rr: AtomicUsize,
63}
64
65impl ArclyDbPool {
66 pub fn new(name: &'static str, primary: DbDriver) -> Self {
67 Self {
68 name,
69 primary,
70 replicas: Vec::new(),
71 rr: AtomicUsize::new(0),
72 }
73 }
74
75 pub fn with_replica(mut self, replica: DbDriver) -> Self {
77 self.replicas.push(replica);
78 self
79 }
80
81 pub(crate) fn pick(&self, intent: AccessIntent) -> (&DbDriver, bool) {
85 match intent {
86 AccessIntent::Write => (&self.primary, false),
87 AccessIntent::Read if self.replicas.is_empty() => (&self.primary, false),
88 AccessIntent::Read => {
89 let i = self.rr.fetch_add(1, Ordering::Relaxed);
90 (&self.replicas[i % self.replicas.len()], true)
91 }
92 }
93 }
94
95 pub(crate) fn primary(&self) -> &DbDriver {
97 &self.primary
98 }
99
100 #[allow(unused_variables, unreachable_code)]
102 async fn acquire_driver(&self, driver: &DbDriver) -> Result<OwnedDbConn, DataError> {
103 match driver {
104 #[cfg(feature = "db-sqlx")]
105 DbDriver::Sqlx(pool) => Ok(OwnedDbConn::Sqlx(
106 pool.acquire()
107 .await
108 .map_err(|e| DataError::connection(e.to_string()))?,
109 )),
110 #[cfg(feature = "db-seaorm")]
111 DbDriver::SeaOrm(conn) => Ok(OwnedDbConn::SeaOrm(conn.clone())),
112 #[cfg(feature = "db-diesel")]
113 DbDriver::Diesel(pool) => Ok(OwnedDbConn::Diesel(pool.clone())),
114 #[allow(unreachable_patterns)]
115 _ => Err(DataError::config(
116 "no database driver feature enabled (db-sqlx / db-seaorm / db-diesel)",
117 )),
118 }
119 }
120}
121
122impl DataSource for ArclyDbPool {
123 type Conn = OwnedDbConn;
124
125 fn acquire(&self, intent: AccessIntent) -> BoxFuture<'_, Result<Self::Conn, DataError>> {
130 Box::pin(async move {
131 let started = std::time::Instant::now();
132 let (driver, is_replica) = self.pick(intent);
133
134 let result = match self.acquire_driver(driver).await {
135 Ok(conn) => Ok(conn),
136 Err(replica_err) if is_replica => {
137 metrics::counter!("db_replica_fallback_total", "pool" => self.name)
138 .increment(1);
139 tracing::warn!(
140 pool = self.name,
141 error = %replica_err,
142 "replica acquire failed — falling back to primary"
143 );
144 self.acquire_driver(&self.primary).await
145 }
146 Err(e) => Err(e),
147 };
148
149 metrics::histogram!("db_acquire_seconds", "pool" => self.name)
150 .record(started.elapsed().as_secs_f64());
151 if result.is_err() {
152 metrics::counter!("db_acquire_errors_total", "pool" => self.name).increment(1);
153 }
154 result
155 })
156 }
157
158 fn name(&self) -> &'static str {
159 self.name
160 }
161}
162
163impl ArclyDbPool {
166 #[allow(unreachable_code)]
171 pub async fn ping(&self) -> Result<(), DataError> {
172 match self.primary() {
173 #[cfg(feature = "db-sqlx")]
174 DbDriver::Sqlx(pool) => {
175 sqlx::query_scalar::<_, i64>("SELECT 1")
176 .fetch_one(pool)
177 .await
178 .map_err(|e| DataError::connection(e.to_string()))?;
179 Ok(())
180 }
181 #[cfg(feature = "db-seaorm")]
182 DbDriver::SeaOrm(conn) => conn
183 .ping()
184 .await
185 .map_err(|e| DataError::connection(e.to_string())),
186 #[cfg(feature = "db-diesel")]
187 DbDriver::Diesel(pool) => pool.ping().await,
188 #[allow(unreachable_patterns)]
189 _ => Err(DataError::config("no database driver feature enabled")),
190 }
191 }
192}
193
194pub struct DbHealthCheck {
205 registry: &'static crate::data::DataSourceRegistry<ArclyDbPool>,
206}
207
208impl DbHealthCheck {
209 pub fn new(registry: &'static crate::data::DataSourceRegistry<ArclyDbPool>) -> Self {
210 Self { registry }
211 }
212}
213
214impl crate::observability::health::HealthCheck for DbHealthCheck {
215 fn check(&self) -> BoxFuture<'_, crate::observability::health::HealthStatus> {
216 use crate::observability::health::HealthStatus;
217 Box::pin(async move {
218 for (name, pool) in self.registry.iter() {
219 if let Err(e) = pool.ping().await {
220 let label = if name.is_empty() { "default" } else { name };
221 return HealthStatus::Unhealthy(format!("pool `{label}`: {e}"));
222 }
223 }
224 HealthStatus::Healthy
225 })
226 }
227}
228
#[cfg(all(test, feature = "db-sqlx"))]
mod tests {
    use super::*;

    /// Builds a driver backed by an in-memory SQLite database (eagerly connected).
    async fn memory_pool() -> DbDriver {
        sqlx::any::install_default_drivers();
        let options = sqlx::any::AnyPoolOptions::new().max_connections(2);
        let pool = options
            .connect("sqlite::memory:")
            .await
            .expect("in-memory sqlite");
        DbDriver::Sqlx(pool)
    }

    /// Builds a lazily-connected driver that the tests below expect to fail on
    /// first use; the short acquire timeout keeps those failures quick.
    fn dead_pool() -> DbDriver {
        sqlx::any::install_default_drivers();
        let options = sqlx::any::AnyPoolOptions::new()
            .max_connections(1)
            .acquire_timeout(std::time::Duration::from_millis(200));
        let pool = options
            .connect_lazy("sqlite:///nonexistent-dir/arcly-itest.db?mode=ro")
            .expect("lazy pool builds without connecting");
        DbDriver::Sqlx(pool)
    }

    #[tokio::test]
    async fn dead_replica_falls_back_to_primary_for_reads() {
        let primary = memory_pool().await;
        let pool = ArclyDbPool::new("failover-test", primary).with_replica(dead_pool());

        // Several reads in a row: each one picks the dead replica and must
        // still succeed via the primary.
        for _ in 0..3 {
            let outcome = pool.acquire(AccessIntent::Read).await;
            assert!(
                outcome.is_ok(),
                "read must fall back to primary: {:?}",
                outcome.err().map(|e| e.to_string())
            );
        }

        let write = pool.acquire(AccessIntent::Write).await;
        assert!(write.is_ok());
    }

    #[tokio::test]
    async fn ping_succeeds_on_live_primary_and_fails_on_dead() {
        let healthy = ArclyDbPool::new("live", memory_pool().await);
        let healthy_ping = healthy.ping().await;
        assert!(healthy_ping.is_ok());

        let broken = ArclyDbPool::new("dead", dead_pool());
        let broken_ping = broken.ping().await;
        assert!(broken_ping.is_err());
    }
}