Skip to main content

klauthed_data/db/
mod.rs

1//! Database connectors.
2//!
3//! The top-level functions (`connect`, `connect_verified`, `ping`, `close`)
4//! target relational databases via sqlx's driver-agnostic `AnyPool` and are
5//! compiled only when the `sql` (or a driver) feature is active.
6//!
7//! NoSQL and other backends live in sub-modules gated by their own features:
8//!
9//! * `mongo` — MongoDB client connector (`mongodb` feature)
10
11#[cfg(feature = "mongodb")]
12pub mod mongo;
13
14// ── Relational (sqlx AnyPool) ─────────────────────────────────────────────────
15
16#[cfg(feature = "sql")]
17use std::time::Duration;
18
19#[cfg(feature = "sql")]
20use klauthed_core::config::{DatabaseConfig, PoolConfig};
21#[cfg(feature = "sql")]
22use sqlx::AnyPool;
23#[cfg(feature = "sql")]
24use sqlx::any::AnyPoolOptions;
25
26#[cfg(feature = "sql")]
27use crate::error::DataError;
28
29/// Connect to a relational database described by `config`, returning a ready
30/// connection pool.
31///
32/// The concrete driver is chosen from the connection URL scheme, so the matching
33/// feature (`postgres` / `mysql` / `sqlite`) must be enabled or the connection
34/// will fail at runtime with an "unsupported scheme" error from sqlx.
35#[cfg(feature = "sql")]
36pub async fn connect(config: &DatabaseConfig) -> Result<AnyPool, DataError> {
37    if !config.system.is_relational() {
38        return Err(DataError::UnsupportedSystem(config.system));
39    }
40
41    // Registers whichever Any drivers were compiled in (idempotent).
42    sqlx::any::install_default_drivers();
43
44    let url = config.connection_url();
45    tracing::debug!(system = ?config.system, "connecting to relational database");
46
47    let pool = pool_options(&config.pool).connect(&url).await?;
48    Ok(pool)
49}
50
51/// Connect and immediately verify the database answers, so misconfiguration
52/// fails fast at startup rather than on the first query.
53#[cfg(feature = "sql")]
54pub async fn connect_verified(config: &DatabaseConfig) -> Result<AnyPool, DataError> {
55    let pool = connect(config).await?;
56    ping(&pool).await?;
57    Ok(pool)
58}
59
60/// Health-check an existing pool by issuing `SELECT 1`. Works across all
61/// supported relational backends.
62#[cfg(feature = "sql")]
63pub async fn ping(pool: &AnyPool) -> Result<(), DataError> {
64    sqlx::query("SELECT 1").execute(pool).await?;
65    Ok(())
66}
67
68/// Gracefully close a pool, waiting for in-flight connections to be released.
69/// Call this during shutdown so the database sees a clean disconnect.
70#[cfg(feature = "sql")]
71pub async fn close(pool: &AnyPool) {
72    pool.close().await;
73}
74
75/// Translate our [`PoolConfig`] into sqlx [`AnyPoolOptions`].
76#[cfg(feature = "sql")]
77fn pool_options(pool: &PoolConfig) -> AnyPoolOptions {
78    let mut options = AnyPoolOptions::new()
79        .max_connections(pool.max_connections)
80        .min_connections(pool.min_connections)
81        .acquire_timeout(Duration::from_secs(pool.acquire_timeout_secs));
82
83    if let Some(idle) = pool.idle_timeout_secs {
84        options = options.idle_timeout(Duration::from_secs(idle));
85    }
86    if let Some(lifetime) = pool.max_lifetime_secs {
87        options = options.max_lifetime(Duration::from_secs(lifetime));
88    }
89    options
90}
91
92#[cfg(all(test, feature = "sql"))]
93mod tests {
94    use super::*;
95    use klauthed_core::config::DbSystem;
96
97    #[tokio::test]
98    async fn rejects_non_relational_system() {
99        let config = DatabaseConfig { system: DbSystem::MongoDb, ..Default::default() };
100        let err = connect(&config).await.unwrap_err();
101        assert!(matches!(err, DataError::UnsupportedSystem(DbSystem::MongoDb)));
102    }
103
104    #[cfg(feature = "sqlite")]
105    #[tokio::test]
106    async fn connect_verify_ping_close_against_sqlite_memory() {
107        // A real end-to-end exercise of the round-out: connect, SELECT 1, close.
108        let config = DatabaseConfig {
109            system: DbSystem::Sqlite,
110            url: Some("sqlite::memory:".into()),
111            ..Default::default()
112        };
113
114        let pool = connect_verified(&config).await.expect("connect + ping");
115        ping(&pool).await.expect("ping again");
116        close(&pool).await;
117    }
118}