//! forge_runtime/db/pool.rs
//!
//! Database connection pooling: primary pool, health-aware read replicas,
//! and workload-isolated pools for jobs, observability, and analytics.

1use std::sync::Arc;
2use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
3use std::time::Duration;
4
5use sqlx::ConnectOptions;
6use sqlx::postgres::{PgConnectOptions, PgPool, PgPoolOptions};
7use tokio::task::JoinHandle;
8use tracing::log::LevelFilter;
9
10use forge_core::config::{DatabaseConfig, PoolConfig};
11use forge_core::error::{ForgeError, Result};
12
/// One read replica: its connection pool plus the health flag that the
/// background monitor writes and `read_pool` consults when routing reads.
struct ReplicaEntry {
    // Arc so the spawned health-monitor task and `Database` clones share the pool.
    pool: Arc<PgPool>,
    // Written via `swap` by the health monitor, read lock-free by `read_pool`.
    healthy: Arc<AtomicBool>,
}
17
/// Database connection wrapper with health-aware replica routing and workload isolation.
///
/// Cheap to clone: every pool and the replica health state sit behind `Arc`s,
/// so clones share the same underlying connections and routing state.
#[derive(Clone)]
pub struct Database {
    /// Read-write pool; also the fallback when no replica or isolated pool applies.
    primary: Arc<PgPool>,
    /// Read replicas with per-replica health flags, selected round-robin.
    replicas: Arc<Vec<ReplicaEntry>>,
    /// Retained configuration (e.g. `read_from_replica`), consulted at routing time.
    config: DatabaseConfig,
    /// Round-robin cursor into `replicas`; shared across clones.
    replica_counter: Arc<AtomicUsize>,
    /// Isolated pool for background jobs, cron, daemons, workflows.
    jobs_pool: Option<Arc<PgPool>>,
    /// Isolated pool for observability writes.
    observability_pool: Option<Arc<PgPool>>,
    /// Isolated pool for long-running analytics queries.
    analytics_pool: Option<Arc<PgPool>>,
}
32
33impl Database {
34    /// Create a new database connection from configuration.
35    pub async fn from_config(config: &DatabaseConfig) -> Result<Self> {
36        Self::from_config_with_service(config, "forge").await
37    }
38
39    /// Create a new database connection with a service name for tracing.
40    ///
41    /// The service name is set as PostgreSQL's `application_name`, visible in
42    /// `pg_stat_activity` for correlating queries to the originating service.
43    pub async fn from_config_with_service(
44        config: &DatabaseConfig,
45        service_name: &str,
46    ) -> Result<Self> {
47        if config.url.is_empty() {
48            return Err(ForgeError::Database(
49                "database.url cannot be empty. Provide a PostgreSQL connection URL.".into(),
50            ));
51        }
52
53        // If pools.default overrides the primary pool size, use it
54        let primary_size = config
55            .pools
56            .default
57            .as_ref()
58            .map(|p| p.size)
59            .unwrap_or(config.pool_size);
60        let primary_timeout = config
61            .pools
62            .default
63            .as_ref()
64            .map(|p| p.timeout_secs)
65            .unwrap_or(config.pool_timeout_secs);
66
67        let primary = Self::create_pool(&config.url, primary_size, primary_timeout, service_name)
68            .await
69            .map_err(|e| ForgeError::Database(format!("Failed to connect to primary: {}", e)))?;
70
71        let mut replicas = Vec::new();
72        for replica_url in &config.replica_urls {
73            let pool = Self::create_pool(
74                replica_url,
75                config.pool_size / 2,
76                config.pool_timeout_secs,
77                service_name,
78            )
79            .await
80            .map_err(|e| ForgeError::Database(format!("Failed to connect to replica: {}", e)))?;
81            replicas.push(ReplicaEntry {
82                pool: Arc::new(pool),
83                healthy: Arc::new(AtomicBool::new(true)),
84            });
85        }
86
87        let jobs_pool =
88            Self::create_isolated_pool(&config.url, config.pools.jobs.as_ref(), service_name)
89                .await?;
90        let observability_pool = Self::create_isolated_pool(
91            &config.url,
92            config.pools.observability.as_ref(),
93            service_name,
94        )
95        .await?;
96        let analytics_pool =
97            Self::create_isolated_pool(&config.url, config.pools.analytics.as_ref(), service_name)
98                .await?;
99
100        Ok(Self {
101            primary: Arc::new(primary),
102            replicas: Arc::new(replicas),
103            config: config.clone(),
104            replica_counter: Arc::new(AtomicUsize::new(0)),
105            jobs_pool,
106            observability_pool,
107            analytics_pool,
108        })
109    }
110
111    fn connect_options(url: &str, service_name: &str) -> sqlx::Result<PgConnectOptions> {
112        let options: PgConnectOptions = url.parse()?;
113        Ok(options
114            .application_name(service_name)
115            .log_statements(LevelFilter::Debug)
116            .log_slow_statements(LevelFilter::Warn, Duration::from_millis(500)))
117    }
118
119    async fn create_pool(
120        url: &str,
121        size: u32,
122        timeout_secs: u64,
123        service_name: &str,
124    ) -> sqlx::Result<PgPool> {
125        let options = Self::connect_options(url, service_name)?;
126        PgPoolOptions::new()
127            .max_connections(size)
128            .acquire_timeout(Duration::from_secs(timeout_secs))
129            .connect_with(options)
130            .await
131    }
132
133    async fn create_isolated_pool(
134        url: &str,
135        config: Option<&PoolConfig>,
136        service_name: &str,
137    ) -> Result<Option<Arc<PgPool>>> {
138        let Some(cfg) = config else {
139            return Ok(None);
140        };
141        let pool = Self::create_pool(url, cfg.size, cfg.timeout_secs, service_name)
142            .await
143            .map_err(|e| ForgeError::Database(format!("Failed to create isolated pool: {}", e)))?;
144        Ok(Some(Arc::new(pool)))
145    }
146
147    /// Get the primary pool for writes.
148    pub fn primary(&self) -> &PgPool {
149        &self.primary
150    }
151
152    /// Get a pool for reads. Skips unhealthy replicas, falls back to primary.
153    pub fn read_pool(&self) -> &PgPool {
154        if !self.config.read_from_replica || self.replicas.is_empty() {
155            return &self.primary;
156        }
157
158        let len = self.replicas.len();
159        let start = self.replica_counter.fetch_add(1, Ordering::Relaxed) % len;
160
161        // Try each replica starting from round-robin position
162        for offset in 0..len {
163            let idx = (start + offset) % len;
164            if let Some(entry) = self.replicas.get(idx)
165                && entry.healthy.load(Ordering::Relaxed)
166            {
167                return &entry.pool;
168            }
169        }
170
171        // All replicas unhealthy, fall back to primary
172        &self.primary
173    }
174
175    /// Pool for background jobs, cron, daemons, and workflows.
176    /// Falls back to primary if no isolated pool is configured.
177    pub fn jobs_pool(&self) -> &PgPool {
178        self.jobs_pool.as_deref().unwrap_or(&self.primary)
179    }
180
181    /// Pool for observability writes (metrics, slow query logs).
182    /// Falls back to primary if no isolated pool is configured.
183    pub fn observability_pool(&self) -> &PgPool {
184        self.observability_pool.as_deref().unwrap_or(&self.primary)
185    }
186
187    /// Pool for long-running analytics queries.
188    /// Falls back to primary if no isolated pool is configured.
189    pub fn analytics_pool(&self) -> &PgPool {
190        self.analytics_pool.as_deref().unwrap_or(&self.primary)
191    }
192
193    /// Start background health monitoring for replicas. Returns None if no replicas configured.
194    pub fn start_health_monitor(&self) -> Option<JoinHandle<()>> {
195        if self.replicas.is_empty() {
196            return None;
197        }
198
199        let replicas = Arc::clone(&self.replicas);
200        let handle = tokio::spawn(async move {
201            let mut interval = tokio::time::interval(Duration::from_secs(15));
202            loop {
203                interval.tick().await;
204                for entry in replicas.iter() {
205                    let ok = sqlx::query("SELECT 1")
206                        .execute(entry.pool.as_ref())
207                        .await
208                        .is_ok();
209                    let was_healthy = entry.healthy.swap(ok, Ordering::Relaxed);
210                    if was_healthy && !ok {
211                        tracing::warn!("Replica marked unhealthy");
212                    } else if !was_healthy && ok {
213                        tracing::info!("Replica recovered");
214                    }
215                }
216            }
217        });
218
219        Some(handle)
220    }
221
222    /// Create a Database wrapper from an existing pool (for testing).
223    #[cfg(test)]
224    pub fn from_pool(pool: PgPool) -> Self {
225        Self {
226            primary: Arc::new(pool),
227            replicas: Arc::new(Vec::new()),
228            config: DatabaseConfig::default(),
229            replica_counter: Arc::new(AtomicUsize::new(0)),
230            jobs_pool: None,
231            observability_pool: None,
232            analytics_pool: None,
233        }
234    }
235
236    /// Check database connectivity.
237    pub async fn health_check(&self) -> Result<()> {
238        sqlx::query("SELECT 1")
239            .execute(self.primary.as_ref())
240            .await
241            .map_err(|e| ForgeError::Database(format!("Health check failed: {}", e)))?;
242        Ok(())
243    }
244
245    /// Close all connections gracefully.
246    pub async fn close(&self) {
247        self.primary.close().await;
248        for entry in self.replicas.iter() {
249            entry.pool.close().await;
250        }
251        if let Some(ref p) = self.jobs_pool {
252            p.close().await;
253        }
254        if let Some(ref p) = self.observability_pool {
255            p.close().await;
256        }
257        if let Some(ref p) = self.analytics_pool {
258            p.close().await;
259        }
260    }
261}
262
/// Type alias for the pool type.
///
/// Lets downstream code name the pool without depending on `sqlx::PgPool` directly.
pub type DatabasePool = PgPool;
265
#[cfg(test)]
mod tests {
    use super::*;

    /// Cloning a `DatabaseConfig` must preserve both the URL and the pool size.
    #[test]
    fn test_database_config_clone() {
        let original = DatabaseConfig::new("postgres://localhost/test");
        let duplicate = original.clone();

        assert_eq!(duplicate.url(), original.url());
        assert_eq!(duplicate.pool_size, original.pool_size);
    }
}