1use std::sync::Arc;
2use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
3use std::time::Duration;
4
5use sqlx::postgres::{PgPool, PgPoolOptions};
6use tokio::task::JoinHandle;
7
8use forge_core::config::{DatabaseConfig, PoolConfig};
9use forge_core::error::{ForgeError, Result};
10
11struct ReplicaEntry {
12 pool: Arc<PgPool>,
13 healthy: Arc<AtomicBool>,
14}
15
16#[derive(Clone)]
18pub struct Database {
19 primary: Arc<PgPool>,
20 replicas: Arc<Vec<ReplicaEntry>>,
21 config: DatabaseConfig,
22 replica_counter: Arc<AtomicUsize>,
23 jobs_pool: Option<Arc<PgPool>>,
25 observability_pool: Option<Arc<PgPool>>,
27 analytics_pool: Option<Arc<PgPool>>,
29}
30
31impl Database {
32 pub async fn from_config(config: &DatabaseConfig) -> Result<Self> {
34 if config.url.is_empty() {
35 return Err(ForgeError::Database(
36 "database.url cannot be empty. Provide a PostgreSQL connection URL.".into(),
37 ));
38 }
39
40 let primary_size = config
42 .pools
43 .default
44 .as_ref()
45 .map(|p| p.size)
46 .unwrap_or(config.pool_size);
47 let primary_timeout = config
48 .pools
49 .default
50 .as_ref()
51 .map(|p| p.timeout_secs)
52 .unwrap_or(config.pool_timeout_secs);
53
54 let primary = Self::create_pool(&config.url, primary_size, primary_timeout)
55 .await
56 .map_err(|e| ForgeError::Database(format!("Failed to connect to primary: {}", e)))?;
57
58 let mut replicas = Vec::new();
59 for replica_url in &config.replica_urls {
60 let pool =
61 Self::create_pool(replica_url, config.pool_size / 2, config.pool_timeout_secs)
62 .await
63 .map_err(|e| {
64 ForgeError::Database(format!("Failed to connect to replica: {}", e))
65 })?;
66 replicas.push(ReplicaEntry {
67 pool: Arc::new(pool),
68 healthy: Arc::new(AtomicBool::new(true)),
69 });
70 }
71
72 let jobs_pool = Self::create_isolated_pool(&config.url, config.pools.jobs.as_ref()).await?;
73 let observability_pool =
74 Self::create_isolated_pool(&config.url, config.pools.observability.as_ref()).await?;
75 let analytics_pool =
76 Self::create_isolated_pool(&config.url, config.pools.analytics.as_ref()).await?;
77
78 Ok(Self {
79 primary: Arc::new(primary),
80 replicas: Arc::new(replicas),
81 config: config.clone(),
82 replica_counter: Arc::new(AtomicUsize::new(0)),
83 jobs_pool,
84 observability_pool,
85 analytics_pool,
86 })
87 }
88
89 async fn create_pool(url: &str, size: u32, timeout_secs: u64) -> sqlx::Result<PgPool> {
90 PgPoolOptions::new()
91 .max_connections(size)
92 .acquire_timeout(Duration::from_secs(timeout_secs))
93 .connect(url)
94 .await
95 }
96
97 async fn create_isolated_pool(
98 url: &str,
99 config: Option<&PoolConfig>,
100 ) -> Result<Option<Arc<PgPool>>> {
101 let Some(cfg) = config else {
102 return Ok(None);
103 };
104 let pool = Self::create_pool(url, cfg.size, cfg.timeout_secs)
105 .await
106 .map_err(|e| ForgeError::Database(format!("Failed to create isolated pool: {}", e)))?;
107 Ok(Some(Arc::new(pool)))
108 }
109
110 pub fn primary(&self) -> &PgPool {
112 &self.primary
113 }
114
115 pub fn read_pool(&self) -> &PgPool {
117 if !self.config.read_from_replica || self.replicas.is_empty() {
118 return &self.primary;
119 }
120
121 let len = self.replicas.len();
122 let start = self.replica_counter.fetch_add(1, Ordering::Relaxed) % len;
123
124 for offset in 0..len {
126 let idx = (start + offset) % len;
127 if let Some(entry) = self.replicas.get(idx)
128 && entry.healthy.load(Ordering::Relaxed)
129 {
130 return &entry.pool;
131 }
132 }
133
134 &self.primary
136 }
137
138 pub fn jobs_pool(&self) -> &PgPool {
141 self.jobs_pool.as_deref().unwrap_or(&self.primary)
142 }
143
144 pub fn observability_pool(&self) -> &PgPool {
147 self.observability_pool.as_deref().unwrap_or(&self.primary)
148 }
149
150 pub fn analytics_pool(&self) -> &PgPool {
153 self.analytics_pool.as_deref().unwrap_or(&self.primary)
154 }
155
156 pub fn start_health_monitor(&self) -> Option<JoinHandle<()>> {
158 if self.replicas.is_empty() {
159 return None;
160 }
161
162 let replicas = Arc::clone(&self.replicas);
163 let handle = tokio::spawn(async move {
164 let mut interval = tokio::time::interval(Duration::from_secs(15));
165 loop {
166 interval.tick().await;
167 for entry in replicas.iter() {
168 let ok = sqlx::query("SELECT 1")
169 .execute(entry.pool.as_ref())
170 .await
171 .is_ok();
172 let was_healthy = entry.healthy.swap(ok, Ordering::Relaxed);
173 if was_healthy && !ok {
174 tracing::warn!("Replica marked unhealthy");
175 } else if !was_healthy && ok {
176 tracing::info!("Replica recovered");
177 }
178 }
179 }
180 });
181
182 Some(handle)
183 }
184
185 #[cfg(test)]
187 pub fn from_pool(pool: PgPool) -> Self {
188 Self {
189 primary: Arc::new(pool),
190 replicas: Arc::new(Vec::new()),
191 config: DatabaseConfig::default(),
192 replica_counter: Arc::new(AtomicUsize::new(0)),
193 jobs_pool: None,
194 observability_pool: None,
195 analytics_pool: None,
196 }
197 }
198
199 pub async fn health_check(&self) -> Result<()> {
201 sqlx::query("SELECT 1")
202 .execute(self.primary.as_ref())
203 .await
204 .map_err(|e| ForgeError::Database(format!("Health check failed: {}", e)))?;
205 Ok(())
206 }
207
208 pub async fn close(&self) {
210 self.primary.close().await;
211 for entry in self.replicas.iter() {
212 entry.pool.close().await;
213 }
214 if let Some(ref p) = self.jobs_pool {
215 p.close().await;
216 }
217 if let Some(ref p) = self.observability_pool {
218 p.close().await;
219 }
220 if let Some(ref p) = self.analytics_pool {
221 p.close().await;
222 }
223 }
224}
225
226pub type DatabasePool = PgPool;
228
#[cfg(test)]
mod tests {
    use super::*;

    /// Cloning a `DatabaseConfig` must preserve its URL and pool size.
    #[test]
    fn test_database_config_clone() {
        let source = DatabaseConfig::new("postgres://localhost/test");
        let duplicate = source.clone();

        assert_eq!(duplicate.url(), source.url());
        assert_eq!(duplicate.pool_size, source.pool_size);
    }
}