1use std::sync::Arc;
2use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
3use std::time::Duration;
4
5use sqlx::ConnectOptions;
6use sqlx::postgres::{PgConnectOptions, PgPool, PgPoolOptions};
7use tokio::task::JoinHandle;
8use tracing::log::LevelFilter;
9
10use forge_core::config::{DatabaseConfig, PoolConfig};
11use forge_core::error::{ForgeError, Result};
12
/// A read-replica connection pool together with its current health flag.
struct ReplicaEntry {
    /// Connection pool for this replica.
    pool: Arc<PgPool>,
    /// Whether the replica may currently be handed out by `Database::read_pool`.
    /// Starts as `true` and is rewritten by the task spawned in
    /// `Database::start_health_monitor` after each probe.
    healthy: Arc<AtomicBool>,
}
17
/// Handle to the PostgreSQL connection pools used by the application.
///
/// Every pool, the replica list and the round-robin counter are held behind
/// an `Arc`, so clones of a `Database` share them; `config` is cloned by value.
#[derive(Clone)]
pub struct Database {
    /// Pool connected to `config.url`; also the fallback for every other accessor.
    primary: Arc<PgPool>,
    /// One entry per URL in `config.replica_urls`.
    replicas: Arc<Vec<ReplicaEntry>>,
    /// Copy of the configuration this instance was built from.
    config: DatabaseConfig,
    /// Monotonic counter used to rotate the starting replica in `read_pool`.
    replica_counter: Arc<AtomicUsize>,
    /// Dedicated pool built from `config.pools.jobs`, if configured.
    jobs_pool: Option<Arc<PgPool>>,
    /// Dedicated pool built from `config.pools.observability`, if configured.
    observability_pool: Option<Arc<PgPool>>,
    /// Dedicated pool built from `config.pools.analytics`, if configured.
    analytics_pool: Option<Arc<PgPool>>,
}
32
33impl Database {
34 pub async fn from_config(config: &DatabaseConfig) -> Result<Self> {
36 Self::from_config_with_service(config, "forge").await
37 }
38
39 pub async fn from_config_with_service(
44 config: &DatabaseConfig,
45 service_name: &str,
46 ) -> Result<Self> {
47 if config.url.is_empty() {
48 return Err(ForgeError::Database(
49 "database.url cannot be empty. Provide a PostgreSQL connection URL.".into(),
50 ));
51 }
52
53 let primary_size = config
55 .pools
56 .default
57 .as_ref()
58 .map(|p| p.size)
59 .unwrap_or(config.pool_size);
60 let primary_timeout = config
61 .pools
62 .default
63 .as_ref()
64 .map(|p| p.timeout_secs)
65 .unwrap_or(config.pool_timeout_secs);
66
67 let primary_min = config
68 .pools
69 .default
70 .as_ref()
71 .map(|p| p.min_size)
72 .unwrap_or(config.min_pool_size);
73 let primary_test = config
74 .pools
75 .default
76 .as_ref()
77 .map(|p| p.test_before_acquire)
78 .unwrap_or(config.test_before_acquire);
79
80 let primary = Self::create_pool_with_opts(
81 &config.url,
82 primary_size,
83 primary_min,
84 primary_timeout,
85 primary_test,
86 service_name,
87 )
88 .await
89 .map_err(|e| ForgeError::Database(format!("Failed to connect to primary: {}", e)))?;
90
91 let mut replicas = Vec::new();
92 for replica_url in &config.replica_urls {
93 let pool = Self::create_pool(
94 replica_url,
95 config.pool_size / 2,
96 config.pool_timeout_secs,
97 service_name,
98 )
99 .await
100 .map_err(|e| ForgeError::Database(format!("Failed to connect to replica: {}", e)))?;
101 replicas.push(ReplicaEntry {
102 pool: Arc::new(pool),
103 healthy: Arc::new(AtomicBool::new(true)),
104 });
105 }
106
107 let jobs_pool =
108 Self::create_isolated_pool(&config.url, config.pools.jobs.as_ref(), service_name)
109 .await?;
110 let observability_pool = Self::create_isolated_pool(
111 &config.url,
112 config.pools.observability.as_ref(),
113 service_name,
114 )
115 .await?;
116 let analytics_pool =
117 Self::create_isolated_pool(&config.url, config.pools.analytics.as_ref(), service_name)
118 .await?;
119
120 Ok(Self {
121 primary: Arc::new(primary),
122 replicas: Arc::new(replicas),
123 config: config.clone(),
124 replica_counter: Arc::new(AtomicUsize::new(0)),
125 jobs_pool,
126 observability_pool,
127 analytics_pool,
128 })
129 }
130
131 fn connect_options(url: &str, service_name: &str) -> sqlx::Result<PgConnectOptions> {
132 let options: PgConnectOptions = url.parse()?;
133 Ok(options
134 .application_name(service_name)
135 .log_statements(LevelFilter::Off)
136 .log_slow_statements(LevelFilter::Warn, Duration::from_millis(500)))
137 }
138
139 async fn create_pool(
140 url: &str,
141 size: u32,
142 timeout_secs: u64,
143 service_name: &str,
144 ) -> sqlx::Result<PgPool> {
145 Self::create_pool_with_opts(url, size, 0, timeout_secs, true, service_name).await
146 }
147
148 async fn create_pool_with_opts(
149 url: &str,
150 size: u32,
151 min_size: u32,
152 timeout_secs: u64,
153 test_before_acquire: bool,
154 service_name: &str,
155 ) -> sqlx::Result<PgPool> {
156 let options = Self::connect_options(url, service_name)?;
157 PgPoolOptions::new()
158 .max_connections(size)
159 .min_connections(min_size)
160 .acquire_timeout(Duration::from_secs(timeout_secs))
161 .test_before_acquire(test_before_acquire)
162 .connect_with(options)
163 .await
164 }
165
166 async fn create_isolated_pool(
167 url: &str,
168 config: Option<&PoolConfig>,
169 service_name: &str,
170 ) -> Result<Option<Arc<PgPool>>> {
171 let Some(cfg) = config else {
172 return Ok(None);
173 };
174 let pool = Self::create_pool_with_opts(
175 url,
176 cfg.size,
177 cfg.min_size,
178 cfg.timeout_secs,
179 cfg.test_before_acquire,
180 service_name,
181 )
182 .await
183 .map_err(|e| ForgeError::Database(format!("Failed to create isolated pool: {}", e)))?;
184 Ok(Some(Arc::new(pool)))
185 }
186
187 pub fn primary(&self) -> &PgPool {
189 &self.primary
190 }
191
192 pub fn read_pool(&self) -> &PgPool {
194 if !self.config.read_from_replica || self.replicas.is_empty() {
195 return &self.primary;
196 }
197
198 let len = self.replicas.len();
199 let start = self.replica_counter.fetch_add(1, Ordering::Relaxed) % len;
200
201 for offset in 0..len {
203 let idx = (start + offset) % len;
204 if let Some(entry) = self.replicas.get(idx)
205 && entry.healthy.load(Ordering::Relaxed)
206 {
207 return &entry.pool;
208 }
209 }
210
211 &self.primary
213 }
214
215 pub fn jobs_pool(&self) -> &PgPool {
218 self.jobs_pool.as_deref().unwrap_or(&self.primary)
219 }
220
221 pub fn observability_pool(&self) -> &PgPool {
224 self.observability_pool.as_deref().unwrap_or(&self.primary)
225 }
226
227 pub fn analytics_pool(&self) -> &PgPool {
230 self.analytics_pool.as_deref().unwrap_or(&self.primary)
231 }
232
233 pub fn start_health_monitor(&self) -> Option<JoinHandle<()>> {
235 if self.replicas.is_empty() {
236 return None;
237 }
238
239 let replicas = Arc::clone(&self.replicas);
240 let handle = tokio::spawn(async move {
241 let mut interval = tokio::time::interval(Duration::from_secs(15));
242 loop {
243 interval.tick().await;
244 for entry in replicas.iter() {
245 let ok = sqlx::query("SELECT 1")
246 .execute(entry.pool.as_ref())
247 .await
248 .is_ok();
249 let was_healthy = entry.healthy.swap(ok, Ordering::Relaxed);
250 if was_healthy && !ok {
251 tracing::warn!("Replica marked unhealthy");
252 } else if !was_healthy && ok {
253 tracing::info!("Replica recovered");
254 }
255 }
256 }
257 });
258
259 Some(handle)
260 }
261
262 #[cfg(test)]
264 pub fn from_pool(pool: PgPool) -> Self {
265 Self {
266 primary: Arc::new(pool),
267 replicas: Arc::new(Vec::new()),
268 config: DatabaseConfig::default(),
269 replica_counter: Arc::new(AtomicUsize::new(0)),
270 jobs_pool: None,
271 observability_pool: None,
272 analytics_pool: None,
273 }
274 }
275
276 pub async fn health_check(&self) -> Result<()> {
278 sqlx::query("SELECT 1")
279 .execute(self.primary.as_ref())
280 .await
281 .map_err(|e| ForgeError::Database(format!("Health check failed: {}", e)))?;
282 Ok(())
283 }
284
285 pub async fn close(&self) {
287 self.primary.close().await;
288 for entry in self.replicas.iter() {
289 entry.pool.close().await;
290 }
291 if let Some(ref p) = self.jobs_pool {
292 p.close().await;
293 }
294 if let Some(ref p) = self.observability_pool {
295 p.close().await;
296 }
297 if let Some(ref p) = self.analytics_pool {
298 p.close().await;
299 }
300 }
301}
302
/// Alias for the PostgreSQL pool type that [`Database`] hands out.
pub type DatabasePool = PgPool;
305
#[cfg(test)]
mod tests {
    use super::*;

    /// A cloned `DatabaseConfig` reports the same URL and pool size as its source.
    #[test]
    fn test_database_config_clone() {
        let original = DatabaseConfig::new("postgres://localhost/test");
        let duplicate = original.clone();

        assert_eq!(duplicate.url(), original.url());
        assert_eq!(duplicate.pool_size, original.pool_size);
    }
}