forge_runtime/pg/pool.rs
//! PostgreSQL connection pool management.
//!
//! # Single-pool concurrency model
//!
//! Forge runs every workload (gateway requests, job/cron workers, the
//! workflow scheduler, the reactor, periodic cleanup, signals batch inserts)
//! against a single primary pool. There are no per-workload pools, and they
//! are not coming back: the previous multi-pool layout (`pools.jobs`,
//! `pools.observability`, `pools.analytics`) was removed in Phase 2 because
//! split pools made the connection budget impossible to reason about and
//! produced starvation whenever one pool was full while another sat idle.
//!
//! The single pool is fairer in steady state but exposes one real risk: a
//! burst of slow background work can drain the budget that gateway requests
//! need. Doctrine still says one pool, so the framework manages contention
//! by throttling at the workload level instead:
//!
//! - **Job/cron worker**: capped at `worker.max_concurrent` running tasks
//!   ([`crate::jobs::worker::WorkerConfig::max_concurrent`]). Each task
//!   holds at most one connection at a time, so this cap directly bounds the
//!   share of the pool that workers can take.
//! - **Reactor re-execution**: capped by the realtime semaphore
//!   ([`crate::realtime::reactor`] uses a `Semaphore` sized to the configured
//!   maximum), so live re-execution cannot oversubscribe the pool beyond that.
//! - **Cleanup crons** (refresh-token purge, OAuth-code purge, expired-job
//!   deletion, invalidation purge, etc.) run as jobs (cron-as-job), so they go
//!   through the same worker semaphore and share the worker cap.
//! - **Workflow execution**: durable workflows run inside `$workflow_resume`
//!   jobs and inherit the worker semaphore transitively. The workflow
//!   scheduler itself only polls and dispatches.
//! - **Gateway requests**: not throttled at the pool level. They get
//!   whatever the workers and reactor leave free.
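/*!
The claim that a per-workload concurrency cap bounds that workload's share of
the pool can be checked with a small std-only model. This is a sketch, not the
runtime's implementation: `CountingSemaphore` and `peak_connections` are
hypothetical names, and the real worker and reactor caps are implemented in
their own modules. Each simulated task takes a permit, holds one "connection"
while it runs, and then releases the permit.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Hypothetical counting semaphore standing in for a workload cap.
struct CountingSemaphore {
    permits: Mutex<usize>,
    available: Condvar,
}

impl CountingSemaphore {
    fn new(permits: usize) -> Self {
        Self {
            permits: Mutex::new(permits),
            available: Condvar::new(),
        }
    }

    /// Block until a permit is free, then take it.
    fn acquire(&self) {
        let mut permits = self.permits.lock().unwrap();
        while *permits == 0 {
            permits = self.available.wait(permits).unwrap();
        }
        *permits -= 1;
    }

    /// Return a permit and wake one waiting task.
    fn release(&self) {
        *self.permits.lock().unwrap() += 1;
        self.available.notify_one();
    }
}

/// Hypothetical simulation: run `tasks` jobs under a cap of `max_concurrent`
/// and return the peak number of connections held at the same time.
fn peak_connections(max_concurrent: usize, tasks: usize) -> usize {
    let sem = CountingSemaphore::new(max_concurrent);
    let in_use = AtomicUsize::new(0);
    let peak = AtomicUsize::new(0);
    thread::scope(|s| {
        for _ in 0..tasks {
            s.spawn(|| {
                sem.acquire();
                // While it holds a permit, a task holds exactly one connection.
                let now = in_use.fetch_add(1, Ordering::SeqCst) + 1;
                peak.fetch_max(now, Ordering::SeqCst);
                thread::sleep(Duration::from_millis(2));
                in_use.fetch_sub(1, Ordering::SeqCst);
                sem.release();
            });
        }
    });
    peak.load(Ordering::SeqCst)
}
```

However many tasks are queued, the peak stays at or below the cap. The
workload-level throttling described above relies on exactly this property.
*/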
//!
//! ## Persistent connection holders
//!
//! Several runtime components hold a connection for the process lifetime
//! rather than acquiring one per operation. These connections leave the pool
//! budget as soon as the runtime starts and do not return until shutdown:
//!
//! - Change listener (`LISTEN forge_changes`), one connection per node.
//! - Workflow scheduler listener (`LISTEN forge_workflow_wakeup`), one per
//!   node when workflows are registered.
//! - Job worker wakeup listener (`LISTEN forge_jobs_available`), one per worker.
//! - Leader election lock holder, one per leader role this node owns. A node
//!   that wins both the cron and signals leader roles holds two.
//!
//! Plan for two to three persistent connections per node in steady state,
//! plus one per leader role this node may win.
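/*!
The per-node count of pinned connections can be tallied with a small planning
helper. `persistent_connections` is hypothetical and is not part of Forge's
API. It simply adds up the items in the list above.

```rust
/// Hypothetical planning helper: connections one node pins for its lifetime.
fn persistent_connections(workflows_registered: bool, workers: u32, leader_roles: u32) -> u32 {
    let change_listener = 1; // LISTEN forge_changes, one per node
    let workflow_listener = u32::from(workflows_registered); // LISTEN forge_workflow_wakeup
    let worker_listeners = workers; // LISTEN forge_jobs_available, one per worker
    change_listener + workflow_listener + worker_listeners + leader_roles
}
```

With workflows registered, one worker, and no leader roles, this gives three.
A node that also wins both the cron and signals leader roles pins five.
*/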
//!
//! ## Sizing formula
//!
//! The recommended `database.pool_size` is:
//!
//! ```text
//! pool_size >= worker.max_concurrent
//!            + realtime.max_concurrent_reexecution
//!            + expected_concurrent_gateway_requests
//!            + ~6   // persistent listeners, leader holds, migrations, health check
//! ```
//!
//! With the default worker cap (14: 8 default + 4 workflows + 2 cron) and
//! reactor cap (64), the internal baseline alone is 84 connections
//! (14 + 64 + ~6). The 100-connection default therefore covers the internal
//! baseline plus about 16 concurrent gateway requests. Larger deployments
//! should set `pool_size` explicitly.
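/*!
The same arithmetic can be expressed as a pair of planning helpers. Both
function names are hypothetical, and the overhead constant is the rough "~6"
from the formula rather than a value read from configuration.

```rust
/// Hypothetical helper mirroring the sizing formula above.
fn recommended_pool_size(
    worker_max_concurrent: u32,
    reactor_max_concurrent_reexecution: u32,
    expected_concurrent_gateway_requests: u32,
) -> u32 {
    // Persistent listeners, leader holds, migrations, health check.
    const OVERHEAD: u32 = 6;
    worker_max_concurrent
        + reactor_max_concurrent_reexecution
        + expected_concurrent_gateway_requests
        + OVERHEAD
}

/// Hypothetical helper: how many concurrent gateway queries a given
/// `pool_size` leaves room for once the internal baseline is subtracted.
fn gateway_headroom(pool_size: u32, worker_max_concurrent: u32, reactor_max: u32) -> u32 {
    pool_size.saturating_sub(recommended_pool_size(worker_max_concurrent, reactor_max, 0))
}
```

For example, `gateway_headroom(100, 14, 64)` reproduces the 16 spare
connections quoted above. A pool smaller than the baseline saturates to zero
headroom rather than underflowing.
*/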
//!
//! Note that the gateway's `max_connections` (default 512) is the
//! request-level concurrency cap, not a pool-level one. A real deployment
//! rarely sees 512 concurrent in-flight DB queries because most requests are
//! cached or short-lived. Size by *expected concurrent active queries*, not by
//! the gateway cap.
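/*!
One way to estimate *expected concurrent active queries* is Little's law: the
average number in flight equals the arrival rate multiplied by the mean time
each one spends in the system. The helper below is a hypothetical sketch, and
the figures in the note after it are illustrative inputs, not measurements.

```rust
/// Hypothetical estimate via Little's law (L = lambda * W): average in-flight
/// queries = rate of requests that reach the database * mean query duration.
fn expected_concurrent_queries(requests_per_sec: f64, db_fraction: f64, mean_query_secs: f64) -> f64 {
    requests_per_sec * db_fraction * mean_query_secs
}
```

For instance, 2,000 requests/s, of which 30% hit the database with a 20 ms
mean query time, gives about 12 concurrent queries, far below the 512 gateway
cap. Little's law yields an average, so leave headroom for bursts.
*/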
//!
//! ## What's NOT gated
//!
//! - Migration runner (one-shot at startup, takes one connection).
//! - Replica health monitor (one brief query per replica every 15s).
//! - Signals event batch flush (one task; batches are inserted via `UNNEST`,
//!   so each flush is a single round trip even for large batches).
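/*!
The batch flush avoids one round trip per event by binding each column as an
array and expanding the arrays server-side with `UNNEST`. Below is a std-only
sketch of the row-to-column transposition. `SignalEvent`, `to_columns`, and
the `signal_events` table are hypothetical, since the real signals schema is
not shown in this file.

```rust
/// Hypothetical signal event row.
struct SignalEvent {
    name: String,
    occurred_at_ms: i64,
}

/// Transpose rows into one vector per column, the shape an UNNEST insert binds.
fn to_columns(events: &[SignalEvent]) -> (Vec<String>, Vec<i64>) {
    events
        .iter()
        .map(|e| (e.name.clone(), e.occurred_at_ms))
        .unzip()
}

/// One statement with one array parameter per column, whatever the batch size.
/// Table and column names are illustrative.
const INSERT_SQL: &str = "INSERT INTO signal_events (name, occurred_at_ms) \
     SELECT * FROM UNNEST($1::text[], $2::bigint[])";
```

Regardless of how many events are in the batch, the statement has exactly two
bind parameters.
*/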
//!
//! ## When the model breaks
//!
//! If a production load test shows gateway acquisition timeouts under job
//! load, the answer is to raise `pool_size` and/or lower
//! `worker.max_concurrent`, not to reintroduce per-workload pools. Tagged
//! semaphores at the acquire site will be added only if a real workload turns
//! up that cannot be sized with the existing knobs.

use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::time::Duration;

use sqlx::ConnectOptions;
use sqlx::postgres::{PgConnectOptions, PgPool, PgPoolOptions};
use tokio::sync::broadcast;
use tokio::task::JoinHandle;
use tracing::log::LevelFilter;

use forge_core::config::DatabaseConfig;
use forge_core::error::{ForgeError, Result};

/// A replica pool plus the health flag maintained by the health monitor.
struct ReplicaEntry {
    pool: Arc<PgPool>,
    healthy: Arc<AtomicBool>,
}

/// Database connection wrapper with health-aware replica routing.
#[derive(Clone)]
pub struct Database {
    primary: Arc<PgPool>,
    replicas: Arc<Vec<ReplicaEntry>>,
    config: DatabaseConfig,
    replica_counter: Arc<AtomicUsize>,
}

impl Database {
    /// Create a new database connection using the default service name `forge`.
    pub async fn from_config(config: &DatabaseConfig) -> Result<Self> {
        Self::from_config_with_service(config, "forge").await
    }

    /// Create a new database connection with a service name for tracing.
    ///
    /// The service name is set as PostgreSQL's `application_name`, visible in
    /// `pg_stat_activity` for correlating queries to the originating service.
    pub async fn from_config_with_service(
        config: &DatabaseConfig,
        service_name: &str,
    ) -> Result<Self> {
        if config.url.is_empty() {
            return Err(ForgeError::internal(
                "database.url cannot be empty. Provide a PostgreSQL connection URL.",
            ));
        }

        let primary = Self::create_pool_with_statement_timeout(
            &config.url,
            config.pool_size,
            config.min_pool_size,
            config.pool_timeout.as_secs(),
            config.statement_timeout.as_secs(),
            config.test_before_acquire,
            service_name,
        )
        .await
        .map_err(|e| ForgeError::internal_with("Failed to connect to primary", e))?;

        verify_postgres_version(&primary, "primary").await?;
        detect_pgbouncer(&primary).await?;

        let mut replicas = Vec::new();
        for replica_url in &config.replica_urls {
            // Replicas default to half the primary pool size unless configured.
            let pool = Self::create_pool(
                replica_url,
                config.replica_pool_size.unwrap_or(config.pool_size / 2),
                config.pool_timeout.as_secs(),
                service_name,
            )
            .await
            .map_err(|e| ForgeError::internal_with("Failed to connect to replica", e))?;
            verify_postgres_version(&pool, "replica").await?;
            replicas.push(ReplicaEntry {
                pool: Arc::new(pool),
                healthy: Arc::new(AtomicBool::new(true)),
            });
        }

        Ok(Self {
            primary: Arc::new(primary),
            replicas: Arc::new(replicas),
            config: config.clone(),
            replica_counter: Arc::new(AtomicUsize::new(0)),
        })
    }

    fn connect_options(url: &str, service_name: &str) -> sqlx::Result<PgConnectOptions> {
        let options: PgConnectOptions = url.parse()?;
        Ok(options
            .application_name(service_name)
            .log_statements(LevelFilter::Off)
            .log_slow_statements(LevelFilter::Warn, Duration::from_millis(500)))
    }

    fn connect_options_with_timeout(
        url: &str,
        service_name: &str,
        statement_timeout_secs: u64,
    ) -> sqlx::Result<PgConnectOptions> {
        let options: PgConnectOptions = url.parse()?;
        let mut opts = options
            .application_name(service_name)
            .log_statements(LevelFilter::Off)
            .log_slow_statements(LevelFilter::Warn, Duration::from_millis(500));
        if statement_timeout_secs > 0 {
            // Applied as a per-connection startup option, so every pooled
            // connection enforces the timeout.
            opts = opts.options([("statement_timeout", &format!("{}s", statement_timeout_secs))]);
        }
        Ok(opts)
    }

    async fn create_pool(
        url: &str,
        size: u32,
        timeout_secs: u64,
        service_name: &str,
    ) -> sqlx::Result<PgPool> {
        // Replica pools: no idle minimum, connections tested before acquire,
        // and no statement timeout override.
        Self::create_pool_with_opts(url, size, 0, timeout_secs, true, service_name).await
    }

    async fn create_pool_with_opts(
        url: &str,
        size: u32,
        min_size: u32,
        timeout_secs: u64,
        test_before_acquire: bool,
        service_name: &str,
    ) -> sqlx::Result<PgPool> {
        Self::create_pool_with_statement_timeout(
            url,
            size,
            min_size,
            timeout_secs,
            0, // 0: keep the server's statement_timeout
            test_before_acquire,
            service_name,
        )
        .await
    }

    async fn create_pool_with_statement_timeout(
        url: &str,
        size: u32,
        min_size: u32,
        timeout_secs: u64,
        statement_timeout_secs: u64,
        test_before_acquire: bool,
        service_name: &str,
    ) -> sqlx::Result<PgPool> {
        let options = if statement_timeout_secs > 0 {
            Self::connect_options_with_timeout(url, service_name, statement_timeout_secs)?
        } else {
            Self::connect_options(url, service_name)?
        };
        PgPoolOptions::new()
            .max_connections(size)
            .min_connections(min_size)
            .acquire_timeout(Duration::from_secs(timeout_secs))
            .test_before_acquire(test_before_acquire)
            .connect_with(options)
            .await
    }

    /// The primary pool, shared by all writes and every background workload.
    pub fn primary(&self) -> &PgPool {
        &self.primary
    }

    /// Get a pool for reads. Rotates round-robin across healthy replicas and
    /// falls back to the primary when replica reads are disabled, no replicas
    /// are configured, or none are currently healthy.
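    /**
The rotation in `read_pool` can be modelled without a database.
`pick_replica` is a hypothetical, std-only sketch that works on health flags
and a shared cursor instead of pools. `None` stands for falling back to the
primary, and the `read_from_replica` check is omitted.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

/// Hypothetical model of `read_pool`: the index of the replica to read from,
/// or `None` to fall back to the primary.
fn pick_replica(healthy: &[AtomicBool], counter: &AtomicUsize) -> Option<usize> {
    let len = healthy.len();
    if len == 0 {
        return None;
    }
    // Advance the shared cursor once per call so successive reads rotate.
    let start = counter.fetch_add(1, Ordering::Relaxed) % len;
    // Probe from the cursor onward, wrapping, and take the first healthy one.
    (0..len)
        .map(|offset| (start + offset) % len)
        .find(|&idx| healthy[idx].load(Ordering::Relaxed))
}
```

A skipped replica's turn goes to the next healthy index in order. With one of
three replicas down, the replica after it therefore receives two of every
three reads.
    */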
    pub fn read_pool(&self) -> &PgPool {
        if !self.config.read_from_replica || self.replicas.is_empty() {
            return &self.primary;
        }

        let len = self.replicas.len();
        let start = self.replica_counter.fetch_add(1, Ordering::Relaxed) % len;

        for offset in 0..len {
            let idx = (start + offset) % len;
            if let Some(entry) = self.replicas.get(idx)
                && entry.healthy.load(Ordering::Relaxed)
            {
                return &entry.pool;
            }
        }

        &self.primary
    }

    /// Start background health monitoring for replicas. Returns `None` when no
    /// replicas are configured. The task exits when `shutdown_rx` receives a
    /// message or the broadcast channel closes, so callers can join the handle
    /// during graceful shutdown without leaking the loop.
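    /**
The per-replica bookkeeping inside the monitor loop reduces to one atomic swap
per probe. `record_probe` is a hypothetical std-only sketch of that step. It
returns the log message the monitor would emit, if any.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical model of one probe result for one replica: store the new
/// health state and report only transitions.
fn record_probe(healthy: &AtomicBool, probe_ok: bool) -> Option<&'static str> {
    // `swap` stores the new value and returns the previous one.
    let was_healthy = healthy.swap(probe_ok, Ordering::Relaxed);
    match (was_healthy, probe_ok) {
        (true, false) => Some("Replica marked unhealthy"),
        (false, true) => Some("Replica recovered"),
        _ => None,
    }
}
```

Because only transitions produce a message, a replica that stays down for
many 15-second ticks is reported once rather than on every probe.
    */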
    pub fn start_health_monitor(
        &self,
        mut shutdown_rx: broadcast::Receiver<()>,
    ) -> Option<JoinHandle<()>> {
        if self.replicas.is_empty() {
            return None;
        }

        let replicas = Arc::clone(&self.replicas);
        let handle = tokio::spawn(async move {
            let mut interval = tokio::time::interval(Duration::from_secs(15));
            loop {
                tokio::select! {
                    _ = shutdown_rx.recv() => {
                        tracing::debug!("Replica health monitor shutting down");
                        break;
                    }
                    _ = interval.tick() => {
                        for entry in replicas.iter() {
                            let ok = sqlx::query_scalar!("SELECT 1 as \"v!\"")
                                .fetch_one(entry.pool.as_ref())
                                .await
                                .is_ok();
                            // Log only on state transitions, not on every probe.
                            let was_healthy = entry.healthy.swap(ok, Ordering::Relaxed);
                            if was_healthy && !ok {
                                tracing::warn!("Replica marked unhealthy");
                            } else if !was_healthy && ok {
                                tracing::info!("Replica recovered");
                            }
                        }
                    }
                }
            }
        });

        Some(handle)
    }

    /// Create a Database wrapper from an existing pool. Hidden from rustdoc:
    /// production code goes through `from_config`; this exists for in-crate
    /// tests and external integration harnesses that already own a `PgPool`.
    #[doc(hidden)]
    pub fn from_pool(pool: PgPool) -> Self {
        Self {
            primary: Arc::new(pool),
            replicas: Arc::new(Vec::new()),
            config: DatabaseConfig::default(),
            replica_counter: Arc::new(AtomicUsize::new(0)),
        }
    }

    /// Verify that the primary is reachable by running `SELECT 1` against it.
    pub async fn health_check(&self) -> Result<()> {
        sqlx::query_scalar!("SELECT 1 as \"v!\"")
            .fetch_one(self.primary.as_ref())
            .await
            .map_err(|e| ForgeError::internal_with("Health check failed", e))?;
        Ok(())
    }

    /// Close the primary pool, then every replica pool.
    pub async fn close(&self) {
        self.primary.close().await;
        for entry in self.replicas.iter() {
            entry.pool.close().await;
        }
    }
}

/// Minimum supported PostgreSQL major version.
///
/// Forge v0.9+ relies on features (skip-locked semantics with `NOWAIT`,
/// partitioned-table `SET ACCESS METHOD`, `pg_stat_statements.toplevel`,
/// generated-column stored arrays of UUID, NOTIFY queue introspection via
/// `pg_notification_queue_usage()`) and supports only PostgreSQL 18 or newer.
/// We hard-fail at startup so deployments don't discover an incompatibility
/// at runtime under load.
pub const MIN_POSTGRES_MAJOR: i32 = 18;

/// Read the server's `server_version_num` and refuse to continue when the
/// major version is below [`MIN_POSTGRES_MAJOR`]. The check uses a temporary
/// connection from the supplied pool, so it works against both primary and
/// replica before they're handed to the rest of the system.
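/**
The version arithmetic in isolation: since PostgreSQL 10, `server_version_num`
encodes `major * 10000 + minor`, so integer division by 10,000 yields the
major version. `postgres_major` is a hypothetical helper, not a function in
this module.

```rust
/// Hypothetical helper: derive the major version from `server_version_num`,
/// e.g. "180001" (18.1) gives 18 and "170005" (17.5) gives 17.
fn postgres_major(server_version_num: &str) -> Result<i32, std::num::ParseIntError> {
    Ok(server_version_num.trim().parse::<i32>()? / 10_000)
}
```

A result below `MIN_POSTGRES_MAJOR` is what triggers the startup error.
*/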
async fn verify_postgres_version(pool: &PgPool, role: &str) -> Result<()> {
    let row = sqlx::query_scalar!("SELECT current_setting('server_version_num')")
        .fetch_one(pool)
        .await
        .map_err(|e| {
            ForgeError::internal(format!(
                "Failed to read PostgreSQL server_version_num from {}: {}",
                role, e
            ))
        })?;

    // server_version_num is an integer string such as "180001" (18.1); the
    // major version is that value divided by 10000.
    let version_num: i32 = row
        .ok_or_else(|| {
            ForgeError::internal(format!(
                "PostgreSQL {} returned NULL for server_version_num",
                role
            ))
        })?
        .parse()
        .map_err(|e| {
            ForgeError::internal(format!(
                "Could not parse PostgreSQL server_version_num from {}: {}",
                role, e
            ))
        })?;

    let major = version_num / 10_000;
    if major < MIN_POSTGRES_MAJOR {
        return Err(ForgeError::internal(format!(
            "PostgreSQL {} is at version {} but Forge requires {} or newer. \
             See https://forge.dev/scale/hosting for supported versions.",
            role, major, MIN_POSTGRES_MAJOR
        )));
    }
    tracing::debug!(role, postgres_major = major, "PostgreSQL version verified");
    Ok(())
}

/// Detect PgBouncer proxies and refuse to start when one is found.
///
/// PgBouncer in transaction-pooling mode breaks two Forge primitives:
/// 1. `pg_try_advisory_lock`: advisory locks are per-connection; with
///    transaction pooling the connection changes between calls, so the lock
///    is silently lost at each transaction boundary.
/// 2. `LISTEN/NOTIFY`: persistent listeners require a stable connection;
///    PgBouncer can reassign or close the underlying connection without
///    Forge's listener task knowing, causing silent change-event loss.
///
/// Session-pooling mode does keep the connection, but Forge cannot reliably
/// distinguish it from transaction mode at runtime. If you need connection
/// pooling, use `pgcat` in query-router mode or rely on SQLx's built-in pool,
/// which already amortises connection overhead without the above drawbacks.
///
/// Detection strategy:
/// - `pg_backend_pid()` always returns a positive integer on real Postgres.
///   PgBouncer returns 0 in transaction-pooling mode.
/// - `version()` on PgBouncer returns a string containing "PgBouncer".
///
/// Both checks run so a future PgBouncer version that fixes `pg_backend_pid`
/// can still be detected via the version string.
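/**
The decision itself is a pure predicate over the two values the probe query
returns, which makes it testable without a live connection.
`looks_like_pgbouncer` is a hypothetical extraction of the condition used in
`detect_pgbouncer`, not a function that exists in this module.

```rust
/// Hypothetical pure form of the PgBouncer check: a zero backend PID or a
/// version string mentioning PgBouncer (any capitalisation) counts as a hit.
fn looks_like_pgbouncer(backend_pid: i32, version: &str) -> bool {
    backend_pid == 0 || version.to_lowercase().contains("pgbouncer")
}
```

Lower-casing before the substring check means "PgBouncer", "pgbouncer", and
"PGBOUNCER" all match.
*/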
async fn detect_pgbouncer(pool: &PgPool) -> Result<()> {
    // Runtime query: this is a one-time startup introspection call, not a
    // user-data query. Using sqlx::query_as() here is intentional and
    // correct: we cannot use a checked macro such as sqlx::query_scalar!
    // because the .sqlx/ cache is built in SQLX_OFFLINE mode and this query
    // should not appear in the cache (it is never called after startup).
    #[allow(clippy::disallowed_methods)]
    let (backend_pid, version_str): (i32, String) =
        sqlx::query_as("SELECT pg_backend_pid(), version()")
            .fetch_one(pool)
            .await
            .map_err(|e| {
                ForgeError::internal(format!(
                    "PgBouncer detection query failed: {}. \
                     Forge requires a direct PostgreSQL connection.",
                    e
                ))
            })?;
    let version_lower = version_str.to_lowercase();

    if backend_pid == 0 || version_lower.contains("pgbouncer") {
        return Err(ForgeError::config(
            "Forge detected a PgBouncer proxy in the connection path. \
             Forge requires direct PostgreSQL connections because it relies on \
             `pg_try_advisory_lock` (for leader election) and persistent `LISTEN/NOTIFY` \
             listeners (for real-time reactivity). Both break under PgBouncer's transaction \
             pooling mode. Connect directly to PostgreSQL, or use a session-level pooler \
             that preserves connection identity (e.g. pgcat in query-router mode).",
        ));
    }

    tracing::debug!(
        backend_pid,
        "Direct PostgreSQL connection confirmed (no PgBouncer)"
    );
    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_database_config_clone() {
        let config = DatabaseConfig::new("postgres://localhost/test");

        let cloned = config.clone();
        assert_eq!(cloned.url(), config.url());
        assert_eq!(cloned.pool_size, config.pool_size);
    }
}
469}