1use std::sync::Arc;
2use std::time::Duration;
3
4use sqlx::postgres::{PgPool, PgPoolOptions};
5
6use forge_core::config::DatabaseConfig;
7use forge_core::error::{ForgeError, Result};
8
9#[derive(Clone)]
11pub struct Database {
12 primary: Arc<PgPool>,
14
15 replicas: Vec<Arc<PgPool>>,
17
18 config: DatabaseConfig,
20
21 replica_counter: Arc<std::sync::atomic::AtomicUsize>,
23}
24
25impl Database {
26 pub async fn from_config(config: &DatabaseConfig) -> Result<Self> {
28 let primary = Self::create_pool(&config.url, config.pool_size, config.pool_timeout_secs)
29 .await
30 .map_err(|e| ForgeError::Database(format!("Failed to connect to primary: {}", e)))?;
31
32 let mut replicas = Vec::new();
33 for replica_url in &config.replica_urls {
34 let pool =
35 Self::create_pool(replica_url, config.pool_size / 2, config.pool_timeout_secs)
36 .await
37 .map_err(|e| {
38 ForgeError::Database(format!("Failed to connect to replica: {}", e))
39 })?;
40 replicas.push(Arc::new(pool));
41 }
42
43 Ok(Self {
44 primary: Arc::new(primary),
45 replicas,
46 config: config.clone(),
47 replica_counter: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
48 })
49 }
50
51 async fn create_pool(url: &str, size: u32, timeout_secs: u64) -> sqlx::Result<PgPool> {
53 PgPoolOptions::new()
54 .max_connections(size)
55 .acquire_timeout(Duration::from_secs(timeout_secs))
56 .connect(url)
57 .await
58 }
59
60 pub fn primary(&self) -> &PgPool {
62 &self.primary
63 }
64
65 pub fn read_pool(&self) -> &PgPool {
67 if self.config.read_from_replica && !self.replicas.is_empty() {
68 let idx = self
70 .replica_counter
71 .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
72 % self.replicas.len();
73 &self.replicas[idx]
74 } else {
75 &self.primary
76 }
77 }
78
79 pub async fn health_check(&self) -> Result<()> {
81 sqlx::query("SELECT 1")
82 .execute(self.primary.as_ref())
83 .await
84 .map_err(|e| ForgeError::Database(format!("Health check failed: {}", e)))?;
85 Ok(())
86 }
87
88 pub async fn close(&self) {
90 self.primary.close().await;
91 for replica in &self.replicas {
92 replica.close().await;
93 }
94 }
95}
96
97pub type DatabasePool = PgPool;
99
100#[cfg(test)]
101mod tests {
102 use super::*;
103
104 #[test]
108 fn test_database_config_clone() {
109 let config = DatabaseConfig {
110 url: "postgres://localhost/test".to_string(),
111 pool_size: 10,
112 ..Default::default()
113 };
114
115 let cloned = config.clone();
116 assert_eq!(cloned.url, config.url);
117 assert_eq!(cloned.pool_size, config.pool_size);
118 }
119}