forge_runtime/db/
pool.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use sqlx::postgres::{PgPool, PgPoolOptions};
5
6use forge_core::config::DatabaseConfig;
7use forge_core::error::{ForgeError, Result};
8
9/// Database connection wrapper providing connection pooling.
10#[derive(Clone)]
11pub struct Database {
12    /// Primary connection pool.
13    primary: Arc<PgPool>,
14
15    /// Read replica pools (optional).
16    replicas: Vec<Arc<PgPool>>,
17
18    /// Configuration.
19    config: DatabaseConfig,
20
21    /// Counter for round-robin replica selection.
22    replica_counter: Arc<std::sync::atomic::AtomicUsize>,
23}
24
25impl Database {
26    /// Create a new database connection from configuration.
27    pub async fn from_config(config: &DatabaseConfig) -> Result<Self> {
28        let primary = Self::create_pool(&config.url, config.pool_size, config.pool_timeout_secs)
29            .await
30            .map_err(|e| ForgeError::Database(format!("Failed to connect to primary: {}", e)))?;
31
32        let mut replicas = Vec::new();
33        for replica_url in &config.replica_urls {
34            let pool =
35                Self::create_pool(replica_url, config.pool_size / 2, config.pool_timeout_secs)
36                    .await
37                    .map_err(|e| {
38                        ForgeError::Database(format!("Failed to connect to replica: {}", e))
39                    })?;
40            replicas.push(Arc::new(pool));
41        }
42
43        Ok(Self {
44            primary: Arc::new(primary),
45            replicas,
46            config: config.clone(),
47            replica_counter: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
48        })
49    }
50
51    /// Create a connection pool with the given parameters.
52    async fn create_pool(url: &str, size: u32, timeout_secs: u64) -> sqlx::Result<PgPool> {
53        PgPoolOptions::new()
54            .max_connections(size)
55            .acquire_timeout(Duration::from_secs(timeout_secs))
56            .connect(url)
57            .await
58    }
59
60    /// Get the primary pool for writes.
61    pub fn primary(&self) -> &PgPool {
62        &self.primary
63    }
64
65    /// Get a pool for reads (uses replica if configured, otherwise primary).
66    pub fn read_pool(&self) -> &PgPool {
67        if self.config.read_from_replica && !self.replicas.is_empty() {
68            // Round-robin replica selection
69            let idx = self
70                .replica_counter
71                .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
72                % self.replicas.len();
73            &self.replicas[idx]
74        } else {
75            &self.primary
76        }
77    }
78
79    /// Check database connectivity.
80    pub async fn health_check(&self) -> Result<()> {
81        sqlx::query("SELECT 1")
82            .execute(self.primary.as_ref())
83            .await
84            .map_err(|e| ForgeError::Database(format!("Health check failed: {}", e)))?;
85        Ok(())
86    }
87
88    /// Close all connections gracefully.
89    pub async fn close(&self) {
90        self.primary.close().await;
91        for replica in &self.replicas {
92            replica.close().await;
93        }
94    }
95}
96
97/// Type alias for the pool type.
98pub type DatabasePool = PgPool;
99
100#[cfg(test)]
101mod tests {
102    use super::*;
103
104    // Integration tests require a real PostgreSQL connection
105    // These are placeholder tests that don't require a database
106
107    #[test]
108    fn test_database_config_clone() {
109        let config = DatabaseConfig {
110            url: "postgres://localhost/test".to_string(),
111            pool_size: 10,
112            ..Default::default()
113        };
114
115        let cloned = config.clone();
116        assert_eq!(cloned.url, config.url);
117        assert_eq!(cloned.pool_size, config.pool_size);
118    }
119}