1use std::sync::Arc;
2use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
3use std::time::Duration;
4
5use sqlx::ConnectOptions;
6use sqlx::postgres::{PgConnectOptions, PgPool, PgPoolOptions};
7use tokio::task::JoinHandle;
8use tracing::log::LevelFilter;
9
10use forge_core::config::{DatabaseConfig, PoolConfig};
11use forge_core::error::{ForgeError, Result};
12
13struct ReplicaEntry {
14 pool: Arc<PgPool>,
15 healthy: Arc<AtomicBool>,
16}
17
18#[derive(Clone)]
20pub struct Database {
21 primary: Arc<PgPool>,
22 replicas: Arc<Vec<ReplicaEntry>>,
23 config: DatabaseConfig,
24 replica_counter: Arc<AtomicUsize>,
25 jobs_pool: Option<Arc<PgPool>>,
27 observability_pool: Option<Arc<PgPool>>,
29 analytics_pool: Option<Arc<PgPool>>,
31}
32
33impl Database {
34 pub async fn from_config(config: &DatabaseConfig) -> Result<Self> {
36 Self::from_config_with_service(config, "forge").await
37 }
38
39 pub async fn from_config_with_service(
44 config: &DatabaseConfig,
45 service_name: &str,
46 ) -> Result<Self> {
47 if config.url.is_empty() {
48 return Err(ForgeError::Database(
49 "database.url cannot be empty. Provide a PostgreSQL connection URL.".into(),
50 ));
51 }
52
53 let primary_size = config
55 .pools
56 .default
57 .as_ref()
58 .map(|p| p.size)
59 .unwrap_or(config.pool_size);
60 let primary_timeout = config
61 .pools
62 .default
63 .as_ref()
64 .map(|p| p.timeout_secs)
65 .unwrap_or(config.pool_timeout_secs);
66
67 let primary = Self::create_pool(&config.url, primary_size, primary_timeout, service_name)
68 .await
69 .map_err(|e| ForgeError::Database(format!("Failed to connect to primary: {}", e)))?;
70
71 let mut replicas = Vec::new();
72 for replica_url in &config.replica_urls {
73 let pool = Self::create_pool(
74 replica_url,
75 config.pool_size / 2,
76 config.pool_timeout_secs,
77 service_name,
78 )
79 .await
80 .map_err(|e| ForgeError::Database(format!("Failed to connect to replica: {}", e)))?;
81 replicas.push(ReplicaEntry {
82 pool: Arc::new(pool),
83 healthy: Arc::new(AtomicBool::new(true)),
84 });
85 }
86
87 let jobs_pool =
88 Self::create_isolated_pool(&config.url, config.pools.jobs.as_ref(), service_name)
89 .await?;
90 let observability_pool = Self::create_isolated_pool(
91 &config.url,
92 config.pools.observability.as_ref(),
93 service_name,
94 )
95 .await?;
96 let analytics_pool =
97 Self::create_isolated_pool(&config.url, config.pools.analytics.as_ref(), service_name)
98 .await?;
99
100 Ok(Self {
101 primary: Arc::new(primary),
102 replicas: Arc::new(replicas),
103 config: config.clone(),
104 replica_counter: Arc::new(AtomicUsize::new(0)),
105 jobs_pool,
106 observability_pool,
107 analytics_pool,
108 })
109 }
110
111 fn connect_options(url: &str, service_name: &str) -> sqlx::Result<PgConnectOptions> {
112 let options: PgConnectOptions = url.parse()?;
113 Ok(options
114 .application_name(service_name)
115 .log_statements(LevelFilter::Debug)
116 .log_slow_statements(LevelFilter::Warn, Duration::from_millis(500)))
117 }
118
119 async fn create_pool(
120 url: &str,
121 size: u32,
122 timeout_secs: u64,
123 service_name: &str,
124 ) -> sqlx::Result<PgPool> {
125 let options = Self::connect_options(url, service_name)?;
126 PgPoolOptions::new()
127 .max_connections(size)
128 .acquire_timeout(Duration::from_secs(timeout_secs))
129 .connect_with(options)
130 .await
131 }
132
133 async fn create_isolated_pool(
134 url: &str,
135 config: Option<&PoolConfig>,
136 service_name: &str,
137 ) -> Result<Option<Arc<PgPool>>> {
138 let Some(cfg) = config else {
139 return Ok(None);
140 };
141 let pool = Self::create_pool(url, cfg.size, cfg.timeout_secs, service_name)
142 .await
143 .map_err(|e| ForgeError::Database(format!("Failed to create isolated pool: {}", e)))?;
144 Ok(Some(Arc::new(pool)))
145 }
146
147 pub fn primary(&self) -> &PgPool {
149 &self.primary
150 }
151
152 pub fn read_pool(&self) -> &PgPool {
154 if !self.config.read_from_replica || self.replicas.is_empty() {
155 return &self.primary;
156 }
157
158 let len = self.replicas.len();
159 let start = self.replica_counter.fetch_add(1, Ordering::Relaxed) % len;
160
161 for offset in 0..len {
163 let idx = (start + offset) % len;
164 if let Some(entry) = self.replicas.get(idx)
165 && entry.healthy.load(Ordering::Relaxed)
166 {
167 return &entry.pool;
168 }
169 }
170
171 &self.primary
173 }
174
175 pub fn jobs_pool(&self) -> &PgPool {
178 self.jobs_pool.as_deref().unwrap_or(&self.primary)
179 }
180
181 pub fn observability_pool(&self) -> &PgPool {
184 self.observability_pool.as_deref().unwrap_or(&self.primary)
185 }
186
187 pub fn analytics_pool(&self) -> &PgPool {
190 self.analytics_pool.as_deref().unwrap_or(&self.primary)
191 }
192
193 pub fn start_health_monitor(&self) -> Option<JoinHandle<()>> {
195 if self.replicas.is_empty() {
196 return None;
197 }
198
199 let replicas = Arc::clone(&self.replicas);
200 let handle = tokio::spawn(async move {
201 let mut interval = tokio::time::interval(Duration::from_secs(15));
202 loop {
203 interval.tick().await;
204 for entry in replicas.iter() {
205 let ok = sqlx::query("SELECT 1")
206 .execute(entry.pool.as_ref())
207 .await
208 .is_ok();
209 let was_healthy = entry.healthy.swap(ok, Ordering::Relaxed);
210 if was_healthy && !ok {
211 tracing::warn!("Replica marked unhealthy");
212 } else if !was_healthy && ok {
213 tracing::info!("Replica recovered");
214 }
215 }
216 }
217 });
218
219 Some(handle)
220 }
221
222 #[cfg(test)]
224 pub fn from_pool(pool: PgPool) -> Self {
225 Self {
226 primary: Arc::new(pool),
227 replicas: Arc::new(Vec::new()),
228 config: DatabaseConfig::default(),
229 replica_counter: Arc::new(AtomicUsize::new(0)),
230 jobs_pool: None,
231 observability_pool: None,
232 analytics_pool: None,
233 }
234 }
235
236 pub async fn health_check(&self) -> Result<()> {
238 sqlx::query("SELECT 1")
239 .execute(self.primary.as_ref())
240 .await
241 .map_err(|e| ForgeError::Database(format!("Health check failed: {}", e)))?;
242 Ok(())
243 }
244
245 pub async fn close(&self) {
247 self.primary.close().await;
248 for entry in self.replicas.iter() {
249 entry.pool.close().await;
250 }
251 if let Some(ref p) = self.jobs_pool {
252 p.close().await;
253 }
254 if let Some(ref p) = self.observability_pool {
255 p.close().await;
256 }
257 if let Some(ref p) = self.analytics_pool {
258 p.close().await;
259 }
260 }
261}
262
263pub type DatabasePool = PgPool;
265
#[cfg(test)]
mod tests {
    use super::*;

    /// A cloned `DatabaseConfig` reports the same URL and pool size as the
    /// value it was cloned from.
    #[test]
    fn test_database_config_clone() {
        let original = DatabaseConfig::new("postgres://localhost/test");
        let copy = original.clone();

        assert_eq!(copy.url(), original.url());
        assert_eq!(copy.pool_size, original.pool_size);
    }
}