// forge_runtime/db/pool.rs

1use std::sync::Arc;
2use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
3use std::time::Duration;
4
5use sqlx::ConnectOptions;
6use sqlx::postgres::{PgConnectOptions, PgPool, PgPoolOptions};
7use tokio::task::JoinHandle;
8use tracing::log::LevelFilter;
9
10use forge_core::config::{DatabaseConfig, PoolConfig};
11use forge_core::error::{ForgeError, Result};
12
13struct ReplicaEntry {
14    pool: Arc<PgPool>,
15    healthy: Arc<AtomicBool>,
16}
17
18/// Database connection wrapper with health-aware replica routing and workload isolation.
19#[derive(Clone)]
20pub struct Database {
21    primary: Arc<PgPool>,
22    replicas: Arc<Vec<ReplicaEntry>>,
23    config: DatabaseConfig,
24    replica_counter: Arc<AtomicUsize>,
25    /// Isolated pool for background jobs, cron, daemons, workflows.
26    jobs_pool: Option<Arc<PgPool>>,
27    /// Isolated pool for observability writes.
28    observability_pool: Option<Arc<PgPool>>,
29    /// Isolated pool for long-running analytics queries.
30    analytics_pool: Option<Arc<PgPool>>,
31}
32
33impl Database {
34    /// Create a new database connection from configuration.
35    pub async fn from_config(config: &DatabaseConfig) -> Result<Self> {
36        Self::from_config_with_service(config, "forge").await
37    }
38
39    /// Create a new database connection with a service name for tracing.
40    ///
41    /// The service name is set as PostgreSQL's `application_name`, visible in
42    /// `pg_stat_activity` for correlating queries to the originating service.
43    pub async fn from_config_with_service(
44        config: &DatabaseConfig,
45        service_name: &str,
46    ) -> Result<Self> {
47        if config.url.is_empty() {
48            return Err(ForgeError::Database(
49                "database.url cannot be empty. Provide a PostgreSQL connection URL.".into(),
50            ));
51        }
52
53        // If pools.default overrides the primary pool size, use it
54        let primary_size = config
55            .pools
56            .default
57            .as_ref()
58            .map(|p| p.size)
59            .unwrap_or(config.pool_size);
60        let primary_timeout = config
61            .pools
62            .default
63            .as_ref()
64            .map(|p| p.timeout_secs)
65            .unwrap_or(config.pool_timeout_secs);
66
67        let primary_min = config
68            .pools
69            .default
70            .as_ref()
71            .map(|p| p.min_size)
72            .unwrap_or(config.min_pool_size);
73        let primary_test = config
74            .pools
75            .default
76            .as_ref()
77            .map(|p| p.test_before_acquire)
78            .unwrap_or(config.test_before_acquire);
79
80        let primary = Self::create_pool_with_opts(
81            &config.url,
82            primary_size,
83            primary_min,
84            primary_timeout,
85            primary_test,
86            service_name,
87        )
88        .await
89        .map_err(|e| ForgeError::Database(format!("Failed to connect to primary: {}", e)))?;
90
91        let mut replicas = Vec::new();
92        for replica_url in &config.replica_urls {
93            let pool = Self::create_pool(
94                replica_url,
95                config.pool_size / 2,
96                config.pool_timeout_secs,
97                service_name,
98            )
99            .await
100            .map_err(|e| ForgeError::Database(format!("Failed to connect to replica: {}", e)))?;
101            replicas.push(ReplicaEntry {
102                pool: Arc::new(pool),
103                healthy: Arc::new(AtomicBool::new(true)),
104            });
105        }
106
107        let jobs_pool =
108            Self::create_isolated_pool(&config.url, config.pools.jobs.as_ref(), service_name)
109                .await?;
110        let observability_pool = Self::create_isolated_pool(
111            &config.url,
112            config.pools.observability.as_ref(),
113            service_name,
114        )
115        .await?;
116        let analytics_pool =
117            Self::create_isolated_pool(&config.url, config.pools.analytics.as_ref(), service_name)
118                .await?;
119
120        Ok(Self {
121            primary: Arc::new(primary),
122            replicas: Arc::new(replicas),
123            config: config.clone(),
124            replica_counter: Arc::new(AtomicUsize::new(0)),
125            jobs_pool,
126            observability_pool,
127            analytics_pool,
128        })
129    }
130
131    fn connect_options(url: &str, service_name: &str) -> sqlx::Result<PgConnectOptions> {
132        let options: PgConnectOptions = url.parse()?;
133        Ok(options
134            .application_name(service_name)
135            .log_statements(LevelFilter::Off)
136            .log_slow_statements(LevelFilter::Warn, Duration::from_millis(500)))
137    }
138
139    async fn create_pool(
140        url: &str,
141        size: u32,
142        timeout_secs: u64,
143        service_name: &str,
144    ) -> sqlx::Result<PgPool> {
145        Self::create_pool_with_opts(url, size, 0, timeout_secs, true, service_name).await
146    }
147
148    async fn create_pool_with_opts(
149        url: &str,
150        size: u32,
151        min_size: u32,
152        timeout_secs: u64,
153        test_before_acquire: bool,
154        service_name: &str,
155    ) -> sqlx::Result<PgPool> {
156        let options = Self::connect_options(url, service_name)?;
157        PgPoolOptions::new()
158            .max_connections(size)
159            .min_connections(min_size)
160            .acquire_timeout(Duration::from_secs(timeout_secs))
161            .test_before_acquire(test_before_acquire)
162            .connect_with(options)
163            .await
164    }
165
166    async fn create_isolated_pool(
167        url: &str,
168        config: Option<&PoolConfig>,
169        service_name: &str,
170    ) -> Result<Option<Arc<PgPool>>> {
171        let Some(cfg) = config else {
172            return Ok(None);
173        };
174        let pool = Self::create_pool_with_opts(
175            url,
176            cfg.size,
177            cfg.min_size,
178            cfg.timeout_secs,
179            cfg.test_before_acquire,
180            service_name,
181        )
182        .await
183        .map_err(|e| ForgeError::Database(format!("Failed to create isolated pool: {}", e)))?;
184        Ok(Some(Arc::new(pool)))
185    }
186
187    /// Get the primary pool for writes.
188    pub fn primary(&self) -> &PgPool {
189        &self.primary
190    }
191
192    /// Get a pool for reads. Skips unhealthy replicas, falls back to primary.
193    pub fn read_pool(&self) -> &PgPool {
194        if !self.config.read_from_replica || self.replicas.is_empty() {
195            return &self.primary;
196        }
197
198        let len = self.replicas.len();
199        let start = self.replica_counter.fetch_add(1, Ordering::Relaxed) % len;
200
201        // Try each replica starting from round-robin position
202        for offset in 0..len {
203            let idx = (start + offset) % len;
204            if let Some(entry) = self.replicas.get(idx)
205                && entry.healthy.load(Ordering::Relaxed)
206            {
207                return &entry.pool;
208            }
209        }
210
211        // All replicas unhealthy, fall back to primary
212        &self.primary
213    }
214
215    /// Pool for background jobs, cron, daemons, and workflows.
216    /// Falls back to primary if no isolated pool is configured.
217    pub fn jobs_pool(&self) -> &PgPool {
218        self.jobs_pool.as_deref().unwrap_or(&self.primary)
219    }
220
221    /// Pool for observability writes (metrics, slow query logs).
222    /// Falls back to primary if no isolated pool is configured.
223    pub fn observability_pool(&self) -> &PgPool {
224        self.observability_pool.as_deref().unwrap_or(&self.primary)
225    }
226
227    /// Pool for long-running analytics queries.
228    /// Falls back to primary if no isolated pool is configured.
229    pub fn analytics_pool(&self) -> &PgPool {
230        self.analytics_pool.as_deref().unwrap_or(&self.primary)
231    }
232
233    /// Start background health monitoring for replicas. Returns None if no replicas configured.
234    pub fn start_health_monitor(&self) -> Option<JoinHandle<()>> {
235        if self.replicas.is_empty() {
236            return None;
237        }
238
239        let replicas = Arc::clone(&self.replicas);
240        let handle = tokio::spawn(async move {
241            let mut interval = tokio::time::interval(Duration::from_secs(15));
242            loop {
243                interval.tick().await;
244                for entry in replicas.iter() {
245                    let ok = sqlx::query("SELECT 1")
246                        .execute(entry.pool.as_ref())
247                        .await
248                        .is_ok();
249                    let was_healthy = entry.healthy.swap(ok, Ordering::Relaxed);
250                    if was_healthy && !ok {
251                        tracing::warn!("Replica marked unhealthy");
252                    } else if !was_healthy && ok {
253                        tracing::info!("Replica recovered");
254                    }
255                }
256            }
257        });
258
259        Some(handle)
260    }
261
262    /// Create a Database wrapper from an existing pool (for testing).
263    #[cfg(test)]
264    pub fn from_pool(pool: PgPool) -> Self {
265        Self {
266            primary: Arc::new(pool),
267            replicas: Arc::new(Vec::new()),
268            config: DatabaseConfig::default(),
269            replica_counter: Arc::new(AtomicUsize::new(0)),
270            jobs_pool: None,
271            observability_pool: None,
272            analytics_pool: None,
273        }
274    }
275
276    /// Check database connectivity.
277    pub async fn health_check(&self) -> Result<()> {
278        sqlx::query("SELECT 1")
279            .execute(self.primary.as_ref())
280            .await
281            .map_err(|e| ForgeError::Database(format!("Health check failed: {}", e)))?;
282        Ok(())
283    }
284
285    /// Close all connections gracefully.
286    pub async fn close(&self) {
287        self.primary.close().await;
288        for entry in self.replicas.iter() {
289            entry.pool.close().await;
290        }
291        if let Some(ref p) = self.jobs_pool {
292            p.close().await;
293        }
294        if let Some(ref p) = self.observability_pool {
295            p.close().await;
296        }
297        if let Some(ref p) = self.analytics_pool {
298            p.close().await;
299        }
300    }
301}
302
303/// Type alias for the pool type.
304pub type DatabasePool = PgPool;
305
#[cfg(test)]
mod tests {
    use super::*;

    /// A cloned `DatabaseConfig` reports the same URL and pool size as its source.
    #[test]
    fn test_database_config_clone() {
        let original = DatabaseConfig::new("postgres://localhost/test");
        let copy = original.clone();

        assert_eq!(copy.url(), original.url());
        assert_eq!(copy.pool_size, original.pool_size);
    }
}