Skip to main content

forge_runtime/db/
pool.rs

1use std::sync::Arc;
2use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
3use std::time::Duration;
4
5use sqlx::postgres::{PgPool, PgPoolOptions};
6use tokio::task::JoinHandle;
7
8use forge_core::config::{DatabaseConfig, PoolConfig};
9use forge_core::error::{ForgeError, Result};
10
11struct ReplicaEntry {
12    pool: Arc<PgPool>,
13    healthy: Arc<AtomicBool>,
14}
15
16/// Database connection wrapper with health-aware replica routing and workload isolation.
17#[derive(Clone)]
18pub struct Database {
19    primary: Arc<PgPool>,
20    replicas: Arc<Vec<ReplicaEntry>>,
21    config: DatabaseConfig,
22    replica_counter: Arc<AtomicUsize>,
23    /// Isolated pool for background jobs, cron, daemons, workflows.
24    jobs_pool: Option<Arc<PgPool>>,
25    /// Isolated pool for observability writes.
26    observability_pool: Option<Arc<PgPool>>,
27    /// Isolated pool for long-running analytics queries.
28    analytics_pool: Option<Arc<PgPool>>,
29}
30
31impl Database {
32    /// Create a new database connection from configuration.
33    pub async fn from_config(config: &DatabaseConfig) -> Result<Self> {
34        if config.url.is_empty() {
35            return Err(ForgeError::Database(
36                "database.url cannot be empty. Provide a PostgreSQL connection URL.".into(),
37            ));
38        }
39
40        // If pools.default overrides the primary pool size, use it
41        let primary_size = config
42            .pools
43            .default
44            .as_ref()
45            .map(|p| p.size)
46            .unwrap_or(config.pool_size);
47        let primary_timeout = config
48            .pools
49            .default
50            .as_ref()
51            .map(|p| p.timeout_secs)
52            .unwrap_or(config.pool_timeout_secs);
53
54        let primary = Self::create_pool(&config.url, primary_size, primary_timeout)
55            .await
56            .map_err(|e| ForgeError::Database(format!("Failed to connect to primary: {}", e)))?;
57
58        let mut replicas = Vec::new();
59        for replica_url in &config.replica_urls {
60            let pool =
61                Self::create_pool(replica_url, config.pool_size / 2, config.pool_timeout_secs)
62                    .await
63                    .map_err(|e| {
64                        ForgeError::Database(format!("Failed to connect to replica: {}", e))
65                    })?;
66            replicas.push(ReplicaEntry {
67                pool: Arc::new(pool),
68                healthy: Arc::new(AtomicBool::new(true)),
69            });
70        }
71
72        let jobs_pool = Self::create_isolated_pool(&config.url, config.pools.jobs.as_ref()).await?;
73        let observability_pool =
74            Self::create_isolated_pool(&config.url, config.pools.observability.as_ref()).await?;
75        let analytics_pool =
76            Self::create_isolated_pool(&config.url, config.pools.analytics.as_ref()).await?;
77
78        Ok(Self {
79            primary: Arc::new(primary),
80            replicas: Arc::new(replicas),
81            config: config.clone(),
82            replica_counter: Arc::new(AtomicUsize::new(0)),
83            jobs_pool,
84            observability_pool,
85            analytics_pool,
86        })
87    }
88
89    async fn create_pool(url: &str, size: u32, timeout_secs: u64) -> sqlx::Result<PgPool> {
90        PgPoolOptions::new()
91            .max_connections(size)
92            .acquire_timeout(Duration::from_secs(timeout_secs))
93            .connect(url)
94            .await
95    }
96
97    async fn create_isolated_pool(
98        url: &str,
99        config: Option<&PoolConfig>,
100    ) -> Result<Option<Arc<PgPool>>> {
101        let Some(cfg) = config else {
102            return Ok(None);
103        };
104        let pool = Self::create_pool(url, cfg.size, cfg.timeout_secs)
105            .await
106            .map_err(|e| ForgeError::Database(format!("Failed to create isolated pool: {}", e)))?;
107        Ok(Some(Arc::new(pool)))
108    }
109
110    /// Get the primary pool for writes.
111    pub fn primary(&self) -> &PgPool {
112        &self.primary
113    }
114
115    /// Get a pool for reads. Skips unhealthy replicas, falls back to primary.
116    pub fn read_pool(&self) -> &PgPool {
117        if !self.config.read_from_replica || self.replicas.is_empty() {
118            return &self.primary;
119        }
120
121        let len = self.replicas.len();
122        let start = self.replica_counter.fetch_add(1, Ordering::Relaxed) % len;
123
124        // Try each replica starting from round-robin position
125        for offset in 0..len {
126            let idx = (start + offset) % len;
127            if let Some(entry) = self.replicas.get(idx)
128                && entry.healthy.load(Ordering::Relaxed)
129            {
130                return &entry.pool;
131            }
132        }
133
134        // All replicas unhealthy, fall back to primary
135        &self.primary
136    }
137
138    /// Pool for background jobs, cron, daemons, and workflows.
139    /// Falls back to primary if no isolated pool is configured.
140    pub fn jobs_pool(&self) -> &PgPool {
141        self.jobs_pool.as_deref().unwrap_or(&self.primary)
142    }
143
144    /// Pool for observability writes (metrics, slow query logs).
145    /// Falls back to primary if no isolated pool is configured.
146    pub fn observability_pool(&self) -> &PgPool {
147        self.observability_pool.as_deref().unwrap_or(&self.primary)
148    }
149
150    /// Pool for long-running analytics queries.
151    /// Falls back to primary if no isolated pool is configured.
152    pub fn analytics_pool(&self) -> &PgPool {
153        self.analytics_pool.as_deref().unwrap_or(&self.primary)
154    }
155
156    /// Start background health monitoring for replicas. Returns None if no replicas configured.
157    pub fn start_health_monitor(&self) -> Option<JoinHandle<()>> {
158        if self.replicas.is_empty() {
159            return None;
160        }
161
162        let replicas = Arc::clone(&self.replicas);
163        let handle = tokio::spawn(async move {
164            let mut interval = tokio::time::interval(Duration::from_secs(15));
165            loop {
166                interval.tick().await;
167                for entry in replicas.iter() {
168                    let ok = sqlx::query("SELECT 1")
169                        .execute(entry.pool.as_ref())
170                        .await
171                        .is_ok();
172                    let was_healthy = entry.healthy.swap(ok, Ordering::Relaxed);
173                    if was_healthy && !ok {
174                        tracing::warn!("Replica marked unhealthy");
175                    } else if !was_healthy && ok {
176                        tracing::info!("Replica recovered");
177                    }
178                }
179            }
180        });
181
182        Some(handle)
183    }
184
185    /// Create a Database wrapper from an existing pool (for testing).
186    #[cfg(test)]
187    pub fn from_pool(pool: PgPool) -> Self {
188        Self {
189            primary: Arc::new(pool),
190            replicas: Arc::new(Vec::new()),
191            config: DatabaseConfig::default(),
192            replica_counter: Arc::new(AtomicUsize::new(0)),
193            jobs_pool: None,
194            observability_pool: None,
195            analytics_pool: None,
196        }
197    }
198
199    /// Check database connectivity.
200    pub async fn health_check(&self) -> Result<()> {
201        sqlx::query("SELECT 1")
202            .execute(self.primary.as_ref())
203            .await
204            .map_err(|e| ForgeError::Database(format!("Health check failed: {}", e)))?;
205        Ok(())
206    }
207
208    /// Close all connections gracefully.
209    pub async fn close(&self) {
210        self.primary.close().await;
211        for entry in self.replicas.iter() {
212            entry.pool.close().await;
213        }
214        if let Some(ref p) = self.jobs_pool {
215            p.close().await;
216        }
217        if let Some(ref p) = self.observability_pool {
218            p.close().await;
219        }
220        if let Some(ref p) = self.analytics_pool {
221            p.close().await;
222        }
223    }
224}
225
226/// Type alias for the pool type.
227pub type DatabasePool = PgPool;
228
#[cfg(test)]
mod tests {
    use super::*;

    /// A cloned `DatabaseConfig` reports the same URL and pool size as its source.
    #[test]
    fn test_database_config_clone() {
        let original = DatabaseConfig::new("postgres://localhost/test");
        let duplicate = original.clone();

        assert_eq!(duplicate.url(), original.url());
        assert_eq!(duplicate.pool_size, original.pool_size);
    }
}