// forge_runtime/pg/pool.rs

//! PostgreSQL connection pool management.
//!
//! # Single-pool concurrency model
//!
//! Forge runs every workload (gateway requests, job/cron workers, the
//! workflow scheduler, the reactor, periodic cleanup, signals batch inserts)
//! against a single primary pool. There are no per-workload pool slots and
//! none are coming back: the previous multi-pool layout (`pools.jobs`,
//! `pools.observability`, `pools.analytics`) was removed in Phase 2 because
//! split pools made the connection budget impossible to reason about and
//! produced nasty starvation modes when one pool was full and another idle.
//!
//! The single pool is fairer in steady state but exposes one real risk: a
//! burst of slow background work can drain the budget that gateway requests
//! need. Doctrine still says one pool, so the framework manages contention
//! by throttling at the workload level instead:
//!
//! - **Job/cron worker**: capped at `worker.max_concurrent` running tasks
//!   ([`crate::jobs::worker::WorkerConfig::max_concurrent`]). Each task
//!   holds at most one connection at a time, so this directly bounds the
//!   slice of the pool that workers can take.
//! - **Reactor re-execution**: capped at the realtime semaphore
//!   ([`crate::realtime::reactor`] uses a `Semaphore` sized to the configured
//!   max). Live re-execution can't oversubscribe the pool beyond that.
//! - **Cleanup crons** (refresh-token purge, oauth-code purge, expired-job
//!   delete, invalidation purge, etc.) route through the same worker
//!   semaphore via cron-as-job, so they share the worker cap.
//! - **Workflow execution**: durable workflows run inside `$workflow_resume`
//!   jobs, inheriting the worker semaphore transitively. The workflow
//!   scheduler itself only polls and dispatches.
//! - **Gateway requests**: not throttled at the pool level. They take from
//!   whatever the workers and reactor have left.
//!
//! ## Persistent connection holders
//!
//! Several runtime components hold a connection for the process lifetime
//! rather than acquiring per-operation. These come out of the pool budget
//! the moment the runtime starts and never return until shutdown:
//!
//! - Change listener (`LISTEN forge_changes`), one connection per node.
//! - Workflow scheduler listener (`LISTEN forge_workflow_wakeup`), one per
//!   node when workflows are registered.
//! - Job worker wakeup listener (`LISTEN forge_jobs_available`), one per worker.
//! - Leader election lock holder, one per leader role this node owns. A node
//!   that wins both the cron and signals leader roles holds two.
//!
//! Plan for two to three persistent connections per node in steady state,
//! plus one per leader role this node may win.
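//!
//! As a worked example under assumed conditions (a hypothetical node, not a
//! measured deployment): workflows are registered, one job worker runs, and
//! the node has won both the cron and signals leader roles.
//!
//! ```text
//! change listener                1
//! workflow scheduler listener    1
//! job worker wakeup listener     1
//! leader election lock holders   2
//!                              ---
//! persistent connections         5
//! ```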
//!
//! ## Sizing formula
//!
//! The recommended `database.pool_size` is:
//!
//! ```text
//! pool_size >= worker.max_concurrent
//!            + realtime.max_concurrent_reexecution
//!            + expected_concurrent_gateway_requests
//!            + ~6   // persistent listeners, leader holds, migrations, health check
//! ```
//!
//! With the default worker cap (14: 8 default + 4 workflows + 2 cron) and the
//! default reactor cap (64), the internal baseline alone is 84 connections
//! (14 + 64 + ~6). The 100-connection default covers that baseline plus ~16
//! concurrent gateway requests. Larger deployments should set `pool_size`
//! explicitly.
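//!
//! A sketch of the same arithmetic for a larger node. The inputs are
//! hypothetical values chosen for illustration, not recommendations:
//!
//! ```text
//! worker.max_concurrent                   32
//! realtime.max_concurrent_reexecution     64
//! expected_concurrent_gateway_requests    40
//! persistent and one-shot overhead        ~6
//!                                        ---
//! pool_size >=                           142
//! ```
//!
//! The result exceeds the 100-connection default, so a node like this would
//! need an explicit `pool_size`.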
//!
//! Note that the gateway's `max_connections` (default 512) is the
//! request-level concurrency cap, not the pool-level one. A real deployment
//! rarely sees 512 concurrent in-flight DB queries because most requests are
//! cached or short-lived. Size by *expected concurrent active queries*, not
//! by the gateway cap.
//!
//! ## What's NOT gated
//!
//! - Migration runner (one-shot at startup, takes one connection).
//! - Replica health monitor (one query per replica every 15s, brief).
//! - Signals event batch flush (one task, batches via UNNEST so each insert
//!   is one round trip even for large batches).
//!
//! ## When the model breaks
//!
//! If a production load test shows gateway acquisition timeouts under job
//! load, the answer is to raise `pool_size` and/or lower
//! `worker.max_concurrent`, not to introduce per-workload pools. Tagged
//! semaphores at the acquire site will be added if a real workload is found
//! that can't be sized via the existing knobs.

use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::time::Duration;

use sqlx::ConnectOptions;
use sqlx::postgres::{PgConnectOptions, PgPool, PgPoolOptions};
use tokio::sync::broadcast;
use tokio::task::JoinHandle;
use tracing::log::LevelFilter;

use forge_core::config::DatabaseConfig;
use forge_core::error::{ForgeError, Result};

/// A replica pool paired with the health flag that the monitor task updates.
struct ReplicaEntry {
    pool: Arc<PgPool>,
    healthy: Arc<AtomicBool>,
}

/// Database connection wrapper with health-aware replica routing.
#[derive(Clone)]
pub struct Database {
    primary: Arc<PgPool>,
    replicas: Arc<Vec<ReplicaEntry>>,
    config: DatabaseConfig,
    replica_counter: Arc<AtomicUsize>,
}

impl Database {
    /// Connect using `config` with the default service name `"forge"`.
    pub async fn from_config(config: &DatabaseConfig) -> Result<Self> {
        Self::from_config_with_service(config, "forge").await
    }

    /// Create a new database connection with a service name for tracing.
    ///
    /// The service name is set as PostgreSQL's `application_name`, visible in
    /// `pg_stat_activity` for correlating queries to the originating service.
    pub async fn from_config_with_service(
        config: &DatabaseConfig,
        service_name: &str,
    ) -> Result<Self> {
        if config.url.is_empty() {
            return Err(ForgeError::internal(
                "database.url cannot be empty. Provide a PostgreSQL connection URL.",
            ));
        }

        let primary = Self::create_pool_with_statement_timeout(
            &config.url,
            config.pool_size,
            config.min_pool_size,
            config.pool_timeout.as_secs(),
            config.statement_timeout.as_secs(),
            config.test_before_acquire,
            service_name,
        )
        .await
        .map_err(|e| ForgeError::internal_with("Failed to connect to primary", e))?;

        // Fail fast at startup: unsupported server version or a PgBouncer proxy.
        verify_postgres_version(&primary, "primary").await?;
        detect_pgbouncer(&primary).await?;

        let mut replicas = Vec::new();
        for replica_url in &config.replica_urls {
            // Replica pools default to half the primary pool size.
            let pool = Self::create_pool(
                replica_url,
                config.replica_pool_size.unwrap_or(config.pool_size / 2),
                config.pool_timeout.as_secs(),
                service_name,
            )
            .await
            .map_err(|e| ForgeError::internal_with("Failed to connect to replica", e))?;
            verify_postgres_version(&pool, "replica").await?;
            replicas.push(ReplicaEntry {
                pool: Arc::new(pool),
                healthy: Arc::new(AtomicBool::new(true)),
            });
        }

        Ok(Self {
            primary: Arc::new(primary),
            replicas: Arc::new(replicas),
            config: config.clone(),
            replica_counter: Arc::new(AtomicUsize::new(0)),
        })
    }

    /// Parse `url` and apply the shared logging settings: per-statement
    /// logging off, statements slower than 500 ms logged at WARN.
    fn connect_options(url: &str, service_name: &str) -> sqlx::Result<PgConnectOptions> {
        let options: PgConnectOptions = url.parse()?;
        Ok(options
            .application_name(service_name)
            .log_statements(LevelFilter::Off)
            .log_slow_statements(LevelFilter::Warn, Duration::from_millis(500)))
    }

    /// Same settings as [`Self::connect_options`], plus a server-side
    /// `statement_timeout` when `statement_timeout_secs` is non-zero.
    fn connect_options_with_timeout(
        url: &str,
        service_name: &str,
        statement_timeout_secs: u64,
    ) -> sqlx::Result<PgConnectOptions> {
        let options: PgConnectOptions = url.parse()?;
        let mut opts = options
            .application_name(service_name)
            .log_statements(LevelFilter::Off)
            .log_slow_statements(LevelFilter::Warn, Duration::from_millis(500));
        if statement_timeout_secs > 0 {
            opts = opts.options([("statement_timeout", &format!("{}s", statement_timeout_secs))]);
        }
        Ok(opts)
    }

    /// Build a pool with no minimum size, no statement timeout, and
    /// `test_before_acquire` enabled. Used for replica pools.
    async fn create_pool(
        url: &str,
        size: u32,
        timeout_secs: u64,
        service_name: &str,
    ) -> sqlx::Result<PgPool> {
        Self::create_pool_with_opts(url, size, 0, timeout_secs, true, service_name).await
    }

    /// Build a pool without a statement timeout.
    async fn create_pool_with_opts(
        url: &str,
        size: u32,
        min_size: u32,
        timeout_secs: u64,
        test_before_acquire: bool,
        service_name: &str,
    ) -> sqlx::Result<PgPool> {
        Self::create_pool_with_statement_timeout(
            url,
            size,
            min_size,
            timeout_secs,
            0,
            test_before_acquire,
            service_name,
        )
        .await
    }

    /// Build a pool. A `statement_timeout_secs` of 0 leaves the server's
    /// own `statement_timeout` setting in place.
    async fn create_pool_with_statement_timeout(
        url: &str,
        size: u32,
        min_size: u32,
        timeout_secs: u64,
        statement_timeout_secs: u64,
        test_before_acquire: bool,
        service_name: &str,
    ) -> sqlx::Result<PgPool> {
        let options = if statement_timeout_secs > 0 {
            Self::connect_options_with_timeout(url, service_name, statement_timeout_secs)?
        } else {
            Self::connect_options(url, service_name)?
        };
        PgPoolOptions::new()
            .max_connections(size)
            .min_connections(min_size)
            .acquire_timeout(Duration::from_secs(timeout_secs))
            .test_before_acquire(test_before_acquire)
            .connect_with(options)
            .await
    }

    /// The primary pool.
    pub fn primary(&self) -> &PgPool {
        &self.primary
    }

    /// Get a pool for reads. Rotates round-robin across replicas, skipping
    /// unhealthy ones. Falls back to the primary when replica reads are
    /// disabled, no replica is configured, or no replica is healthy.
    pub fn read_pool(&self) -> &PgPool {
        if !self.config.read_from_replica || self.replicas.is_empty() {
            return &self.primary;
        }

        let len = self.replicas.len();
        let start = self.replica_counter.fetch_add(1, Ordering::Relaxed) % len;

        for offset in 0..len {
            let idx = (start + offset) % len;
            if let Some(entry) = self.replicas.get(idx)
                && entry.healthy.load(Ordering::Relaxed)
            {
                return &entry.pool;
            }
        }

        &self.primary
    }

    /// Start background health monitoring for replicas. Returns `None` when
    /// no replicas are configured. The task exits when `shutdown_rx` receives
    /// a message or the broadcast channel closes, so callers can join the
    /// handle during graceful shutdown without leaking the loop.
    pub fn start_health_monitor(
        &self,
        mut shutdown_rx: broadcast::Receiver<()>,
    ) -> Option<JoinHandle<()>> {
        if self.replicas.is_empty() {
            return None;
        }

        let replicas = Arc::clone(&self.replicas);
        let handle = tokio::spawn(async move {
            let mut interval = tokio::time::interval(Duration::from_secs(15));
            loop {
                tokio::select! {
                    _ = shutdown_rx.recv() => {
                        tracing::debug!("Replica health monitor shutting down");
                        break;
                    }
                    _ = interval.tick() => {
                        for entry in replicas.iter() {
                            let ok = sqlx::query_scalar!("SELECT 1 as \"v!\"")
                                .fetch_one(entry.pool.as_ref())
                                .await
                                .is_ok();
                            // Log only on state transitions, not on every probe.
                            let was_healthy = entry.healthy.swap(ok, Ordering::Relaxed);
                            if was_healthy && !ok {
                                tracing::warn!("Replica marked unhealthy");
                            } else if !was_healthy && ok {
                                tracing::info!("Replica recovered");
                            }
                        }
                    }
                }
            }
        });

        Some(handle)
    }

    /// Create a Database wrapper from an existing pool. Hidden from rustdoc:
    /// production code goes through `from_config`, this exists for in-crate
    /// tests and external integration harnesses that already own a `PgPool`.
    #[doc(hidden)]
    pub fn from_pool(pool: PgPool) -> Self {
        Self {
            primary: Arc::new(pool),
            replicas: Arc::new(Vec::new()),
            config: DatabaseConfig::default(),
            replica_counter: Arc::new(AtomicUsize::new(0)),
        }
    }

    /// Run `SELECT 1` against the primary pool.
    pub async fn health_check(&self) -> Result<()> {
        sqlx::query_scalar!("SELECT 1 as \"v!\"")
            .fetch_one(self.primary.as_ref())
            .await
            .map_err(|e| ForgeError::internal_with("Health check failed", e))?;
        Ok(())
    }

    /// Close the primary pool, then each replica pool.
    pub async fn close(&self) {
        self.primary.close().await;
        for entry in self.replicas.iter() {
            entry.pool.close().await;
        }
    }
}

/// Minimum supported PostgreSQL major version.
///
/// Forge v0.9+ uses features (skip-locked semantics with `NOWAIT`, partitioned
/// table SET ACCESS METHOD, `pg_stat_statements.toplevel`, generated-column
/// stored arrays of UUID, NOTIFY queue introspection via
/// `pg_notification_queue_usage()`) that all require PostgreSQL 18 or newer.
/// We hard-fail at startup so deployments don't discover an incompatibility
/// at runtime under load.
pub const MIN_POSTGRES_MAJOR: i32 = 18;

/// Read the server's `server_version_num` and refuse to continue when the
/// major version is below [`MIN_POSTGRES_MAJOR`]. The check uses a temporary
/// connection from the supplied pool, so it works against both primary and
/// replica before they're handed to the rest of the system.
async fn verify_postgres_version(pool: &PgPool, role: &str) -> Result<()> {
    let row = sqlx::query_scalar!("SELECT current_setting('server_version_num')")
        .fetch_one(pool)
        .await
        .map_err(|e| {
            ForgeError::internal(format!(
                "Failed to read PostgreSQL server_version_num from {}: {}",
                role, e
            ))
        })?;

    let version_num: i32 = row
        .ok_or_else(|| {
            ForgeError::internal(format!(
                "PostgreSQL {} returned NULL for server_version_num",
                role
            ))
        })?
        .parse()
        .map_err(|e| {
            ForgeError::internal(format!(
                "Could not parse PostgreSQL server_version_num from {}: {}",
                role, e
            ))
        })?;

    let major = version_num / 10_000;
    if major < MIN_POSTGRES_MAJOR {
        return Err(ForgeError::internal(format!(
            "PostgreSQL {} is at version {} but Forge requires {} or newer. \
             See https://forge.dev/scale/hosting for supported versions.",
            role, major, MIN_POSTGRES_MAJOR
        )));
    }
    tracing::debug!(role, postgres_major = major, "PostgreSQL version verified");
    Ok(())
}

/// Detect PgBouncer proxies and refuse to start when one is found.
///
/// PgBouncer in transaction-pooling mode breaks two Forge primitives:
///   1. `pg_try_advisory_lock`: advisory locks are per-connection; with
///      transaction pooling the connection changes between calls so the lock
///      is silently lost after each transaction boundary.
///   2. `LISTEN/NOTIFY`: persistent listeners require a stable connection;
///      PgBouncer can reassign or close the underlying connection without
///      Forge's listener task knowing, causing silent change-event loss.
///
/// Session-pooling mode does keep the connection, but Forge cannot distinguish
/// it from transaction mode reliably at runtime. If you need connection pooling,
/// use `pgcat` in query-router mode or rely on SQLx's built-in pool, which
/// already amortises connection overhead without the above drawbacks.
///
/// Detection strategy:
///   - `pg_backend_pid()` always returns a positive integer on real Postgres.
///     PgBouncer returns 0 in transaction-pooling mode.
///   - `version()` on PgBouncer returns a string containing "PgBouncer".
///
/// Both checks run so a future PgBouncer version that fixes `pg_backend_pid`
/// can still be detected via the version string.
async fn detect_pgbouncer(pool: &PgPool) -> Result<()> {
    // Runtime query: this is a one-time startup introspection call, not a
    // user-data query. Using the unchecked sqlx::query_as() here is
    // intentional: the compile-time sqlx::query_scalar! macro would put this
    // query in the .sqlx/ cache used in SQLX_OFFLINE mode, and it should not
    // appear there because it is never called after startup.
    #[allow(clippy::disallowed_methods)]
    let (backend_pid, version_str): (i32, String) =
        sqlx::query_as("SELECT pg_backend_pid(), version()")
            .fetch_one(pool)
            .await
            .map_err(|e| {
                ForgeError::internal(format!(
                    "PgBouncer detection query failed: {}. \
                     Forge requires a direct PostgreSQL connection.",
                    e
                ))
            })?;
    let version_lower = version_str.to_lowercase();

    if backend_pid == 0 || version_lower.contains("pgbouncer") {
        return Err(ForgeError::config(
            "Forge detected a PgBouncer proxy in the connection path. \
             Forge requires direct PostgreSQL connections because it relies on \
             `pg_try_advisory_lock` (for leader election) and persistent `LISTEN/NOTIFY` \
             listeners (for real-time reactivity). Both break under PgBouncer's transaction \
             pooling mode. Connect directly to PostgreSQL, or use a session-level pooler \
             that preserves connection identity (e.g. pgcat in query-router mode).",
        ));
    }

    tracing::debug!(
        backend_pid,
        "Direct PostgreSQL connection confirmed (no PgBouncer)"
    );
    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_database_config_clone() {
        let config = DatabaseConfig::new("postgres://localhost/test");

        let cloned = config.clone();
        assert_eq!(cloned.url(), config.url());
        assert_eq!(cloned.pool_size, config.pool_size);
    }
}