Skip to main content

qail_pg/driver/pool/
lifecycle.rs

1//! Pool lifecycle: PgPoolInner, PgPool core (connect, maintain, close),
2//! hot statement pre-prepare, and connection creation.
3
4use super::ScopedPoolFuture;
5use super::churn::{
6    PoolStats, decrement_active_count_saturating, pool_churn_record_destroy,
7    pool_churn_remaining_open, record_pool_connection_destroy,
8};
9use super::config::PoolConfig;
10use super::connection::PooledConn;
11use super::connection::PooledConnection;
12use super::gss::*;
13use crate::driver::{
14    ConnectOptions, PgConnection, PgError, PgResult, is_ignorable_session_message,
15    unexpected_backend_message,
16};
17use std::sync::Arc;
18use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
19use std::time::{Duration, Instant};
20use tokio::sync::{Mutex, Semaphore};
21
/// Maximum number of hot statements to track globally.
///
/// NOTE(review): not referenced in this section of the file; presumably the
/// cap is enforced wherever `hot_statements` is populated — confirm.
pub(super) const MAX_HOT_STATEMENTS: usize = 32;

/// Inner pool state (shared across clones).
///
/// Every `PgPool` clone and every checked-out `PooledConnection` holds this
/// behind an `Arc`.
pub(super) struct PgPoolInner {
    /// Settings the pool was built from (limits, timeouts, connect parameters).
    pub(super) config: PoolConfig,
    /// Idle connections ready for checkout. Returned connections are pushed
    /// onto the end and checkout pops from the end, so reuse is LIFO.
    pub(super) connections: Mutex<Vec<PooledConn>>,
    /// Checkout slots, created with `max_connections` permits. A permit is
    /// detached with `forget()` at checkout and added back when the
    /// connection is returned.
    pub(super) semaphore: Semaphore,
    /// Set by `close_graceful`; once true, new acquires are rejected and
    /// returned connections are dropped instead of re-pooled.
    pub(super) closed: AtomicBool,
    /// Number of connections currently checked out.
    pub(super) active_count: AtomicUsize,
    /// Connections opened by this pool: initial, on-demand, and replacements.
    pub(super) total_created: AtomicUsize,
    /// NOTE(review): not used in this section; presumably counts in-flight
    /// cleanups of connections dropped without an explicit release — confirm.
    pub(super) leaked_cleanup_inflight: AtomicUsize,
    /// Global registry of frequently-used prepared statements.
    /// Maps sql_hash → (stmt_name, sql_text).
    /// On checkout, any entry missing from the connection's statement cache is
    /// pre-prepared so subsequent executions hit the cache.
    pub(super) hot_statements: std::sync::RwLock<std::collections::HashMap<u64, (String, String)>>,
}
39
40pub(super) fn handle_hot_preprepare_message(
41    msg: &crate::protocol::BackendMessage,
42    parse_complete_count: &mut usize,
43    error: &mut Option<PgError>,
44) -> PgResult<bool> {
45    match msg {
46        crate::protocol::BackendMessage::ParseComplete => {
47            *parse_complete_count += 1;
48            Ok(false)
49        }
50        crate::protocol::BackendMessage::ErrorResponse(err) => {
51            if error.is_none() {
52                *error = Some(PgError::QueryServer(err.clone().into()));
53            }
54            Ok(false)
55        }
56        crate::protocol::BackendMessage::ReadyForQuery(_) => Ok(true),
57        msg if is_ignorable_session_message(msg) => Ok(false),
58        other => Err(unexpected_backend_message("pool hot pre-prepare", other)),
59    }
60}
61
62impl PgPoolInner {
63    pub(super) async fn return_connection(&self, conn: PgConnection, created_at: Instant) {
64        decrement_active_count_saturating(&self.active_count);
65
66        if conn.is_io_desynced() {
67            tracing::warn!(
68                host = %self.config.host,
69                port = self.config.port,
70                user = %self.config.user,
71                db = %self.config.database,
72                "pool_return_desynced: dropping connection due to prior I/O/protocol desync"
73            );
74            record_pool_connection_destroy("pool_desynced_drop");
75            self.semaphore.add_permits(1);
76            pool_churn_record_destroy(&self.config, "return_desynced");
77            return;
78        }
79
80        if self.closed.load(Ordering::Relaxed) {
81            record_pool_connection_destroy("pool_closed_drop");
82            self.semaphore.add_permits(1);
83            return;
84        }
85
86        let mut connections = self.connections.lock().await;
87        if connections.len() < self.config.max_connections {
88            connections.push(PooledConn {
89                conn,
90                created_at,
91                last_used: Instant::now(),
92            });
93        } else {
94            record_pool_connection_destroy("pool_overflow_drop");
95        }
96
97        self.semaphore.add_permits(1);
98    }
99
100    /// Get a healthy connection from the pool, or None if pool is empty.
101    async fn get_healthy_connection(&self) -> Option<PooledConn> {
102        let mut connections = self.connections.lock().await;
103
104        while let Some(pooled) = connections.pop() {
105            if pooled.last_used.elapsed() > self.config.idle_timeout {
106                tracing::debug!(
107                    idle_secs = pooled.last_used.elapsed().as_secs(),
108                    timeout_secs = self.config.idle_timeout.as_secs(),
109                    "pool_checkout_evict: connection exceeded idle timeout"
110                );
111                record_pool_connection_destroy("idle_timeout_evict");
112                continue;
113            }
114
115            if let Some(max_life) = self.config.max_lifetime
116                && pooled.created_at.elapsed() > max_life
117            {
118                tracing::debug!(
119                    age_secs = pooled.created_at.elapsed().as_secs(),
120                    max_lifetime_secs = max_life.as_secs(),
121                    "pool_checkout_evict: connection exceeded max lifetime"
122                );
123                record_pool_connection_destroy("max_lifetime_evict");
124                continue;
125            }
126
127            return Some(pooled);
128        }
129
130        None
131    }
132}
133
/// Cloneable handle to a shared PostgreSQL connection pool.
///
/// All clones share the same inner pool state through an `Arc`. Cloning is
/// cheap, and every clone sees the same idle list, limits, and closed flag.
///
/// # Example
/// ```ignore
/// let config = PoolConfig::new("localhost", 5432, "user", "db")
///     .password("secret")
///     .max_connections(20);
/// let pool = PgPool::connect(config).await?;
/// // Get a connection from the pool
/// let mut conn = pool.acquire_raw().await?;
/// conn.simple_query("SELECT 1").await?;
/// ```
#[derive(Clone)]
pub struct PgPool {
    /// Shared pool state (config, idle list, slot semaphore, counters).
    pub(super) inner: Arc<PgPoolInner>,
}
148
149impl PgPool {
150    /// Create a pool from `qail.toml` (loads and parses automatically).
151    ///
152    /// # Example
153    /// ```ignore
154    /// let pool = PgPool::from_config().await?;
155    /// ```
156    pub async fn from_config() -> PgResult<Self> {
157        let qail = qail_core::config::QailConfig::load()
158            .map_err(|e| PgError::Connection(format!("Config error: {}", e)))?;
159        let config = PoolConfig::from_qail_config(&qail)?;
160        Self::connect(config).await
161    }
162
163    /// Create a new connection pool.
164    pub async fn connect(config: PoolConfig) -> PgResult<Self> {
165        validate_pool_config(&config)?;
166
167        // Semaphore starts with max_connections permits
168        let semaphore = Semaphore::new(config.max_connections);
169
170        let mut initial_connections = Vec::new();
171        for _ in 0..config.min_connections {
172            let conn = Self::create_connection(&config).await?;
173            initial_connections.push(PooledConn {
174                conn,
175                created_at: Instant::now(),
176                last_used: Instant::now(),
177            });
178        }
179
180        let initial_count = initial_connections.len();
181
182        let inner = Arc::new(PgPoolInner {
183            config,
184            connections: Mutex::new(initial_connections),
185            semaphore,
186            closed: AtomicBool::new(false),
187            active_count: AtomicUsize::new(0),
188            total_created: AtomicUsize::new(initial_count),
189            leaked_cleanup_inflight: AtomicUsize::new(0),
190            hot_statements: std::sync::RwLock::new(std::collections::HashMap::new()),
191        });
192
193        Ok(Self { inner })
194    }
195
    /// Acquire a raw connection from the pool (crate-internal only).
    ///
    /// NOTE(review): this method is declared `pub`, so it is reachable from
    /// outside the crate despite the "crate-internal" wording — confirm the
    /// intended visibility.
    ///
    /// # Safety (not `unsafe` in the Rust sense, but security-critical)
    ///
    /// This returns a connection with **no RLS context**. All tenant data
    /// queries on this connection will bypass row-level security.
    ///
    /// **Safe usage**: Pair with `fetch_all_with_rls()` for pipelined
    /// RLS+query execution (single roundtrip). Or use `acquire_with_rls()`
    /// / `acquire_with_rls_timeout()` for the 2-roundtrip path.
    ///
    /// **Unsafe usage**: Running queries directly on a raw connection
    /// without RLS context. Every call site MUST include a `// SAFETY:`
    /// comment explaining why raw acquisition is justified.
    ///
    /// # Errors
    ///
    /// - `PgError::PoolClosed` if the pool is closed before or after waiting
    ///   for a slot, or if the slot semaphore is closed while waiting.
    /// - `PgError::PoolExhausted` while the connection-churn circuit is open.
    /// - `PgError::Timeout` if no slot frees up within `acquire_timeout`.
    /// - Any error from opening a new or replacement connection, or from
    ///   encoding a hot-statement Parse message.
    pub async fn acquire_raw(&self) -> PgResult<PooledConnection> {
        // Reject early, before consulting the churn circuit or the semaphore.
        if self.inner.closed.load(Ordering::Relaxed) {
            return Err(PgError::PoolClosed);
        }

        // While the churn circuit is open, refuse immediately instead of
        // waiting for a slot or opening more connections.
        if let Some(remaining) = pool_churn_remaining_open(&self.inner.config) {
            metrics::counter!("qail_pg_pool_churn_circuit_reject_total").increment(1);
            tracing::warn!(
                host = %self.inner.config.host,
                port = self.inner.config.port,
                user = %self.inner.config.user,
                db = %self.inner.config.database,
                remaining_ms = remaining.as_millis() as u64,
                "pool_connection_churn_circuit_open"
            );
            return Err(PgError::PoolExhausted {
                max: self.inner.config.max_connections,
            });
        }

        // Wait for available slot with timeout
        // (a closed semaphore surfaces as `PoolClosed`).
        let acquire_timeout = self.inner.config.acquire_timeout;
        let permit =
            match tokio::time::timeout(acquire_timeout, self.inner.semaphore.acquire()).await {
                Ok(permit) => permit.map_err(|_| PgError::PoolClosed)?,
                Err(_) => {
                    metrics::counter!("qail_pg_pool_acquire_timeouts_total").increment(1);
                    // NOTE(review): `as_secs()` truncates, so sub-second
                    // timeouts are reported as "0s" in this message.
                    return Err(PgError::Timeout(format!(
                        "pool acquire after {}s ({} max connections)",
                        acquire_timeout.as_secs(),
                        self.inner.config.max_connections
                    )));
                }
            };

        // The pool may have been closed while this task was waiting. Returning
        // here drops `permit`, which gives the slot back.
        if self.inner.closed.load(Ordering::Relaxed) {
            return Err(PgError::PoolClosed);
        }

        // Try to get existing healthy connection
        // (stale idle entries are evicted inside `get_healthy_connection`);
        // otherwise open a fresh one.
        let (mut conn, mut created_at) =
            if let Some(pooled) = self.inner.get_healthy_connection().await {
                (pooled.conn, pooled.created_at)
            } else {
                let conn = Self::create_connection(&self.inner.config).await?;
                self.inner.total_created.fetch_add(1, Ordering::Relaxed);
                (conn, Instant::now())
            };

        // Optional liveness probe. A failed probe replaces the connection
        // instead of surfacing the probe error; only a failure to open the
        // replacement is returned to the caller.
        if self.inner.config.test_on_acquire
            && let Err(e) = execute_simple_with_timeout(
                &mut conn,
                "SELECT 1",
                self.inner.config.connect_timeout,
                "pool checkout health check",
            )
            .await
        {
            tracing::warn!(
                host = %self.inner.config.host,
                port = self.inner.config.port,
                user = %self.inner.config.user,
                db = %self.inner.config.database,
                error = %e,
                "pool_health_check_failed: checkout probe failed, creating replacement connection"
            );
            pool_churn_record_destroy(&self.inner.config, "health_check_failed");
            conn = Self::create_connection(&self.inner.config).await?;
            self.inner.total_created.fetch_add(1, Ordering::Relaxed);
            created_at = Instant::now();
        }

        // Pre-prepare hot statements that this connection doesn't have yet.
        // Collect data synchronously (guard dropped before async work).
        // A poisoned registry lock is treated as "no hot statements".
        let missing: Vec<(u64, String, String)> = {
            if let Ok(hot) = self.inner.hot_statements.read() {
                hot.iter()
                    .filter(|(hash, _)| !conn.stmt_cache.contains(hash))
                    .map(|(hash, (name, sql))| (*hash, name.clone(), sql.clone()))
                    .collect()
            } else {
                Vec::new()
            }
        }; // RwLockReadGuard dropped here — safe across .await

        if !missing.is_empty() {
            use crate::protocol::PgEncoder;
            // One pipelined batch: a Parse per missing statement, then a single Sync.
            let mut buf = bytes::BytesMut::new();
            for (_, name, sql) in &missing {
                let parse_msg = PgEncoder::try_encode_parse(name, sql, &[])?;
                buf.extend_from_slice(&parse_msg);
            }
            PgEncoder::encode_sync_to(&mut buf);
            // The whole send+drain is bounded by the pool's connect_timeout.
            let preprepare_timeout = self.inner.config.connect_timeout;
            let preprepare_result: PgResult<()> = match tokio::time::timeout(
                preprepare_timeout,
                async {
                    conn.send_bytes(&buf).await?;
                    // Drain responses and fail closed on any parse error.
                    let mut parse_complete_count = 0usize;
                    let mut parse_error: Option<PgError> = None;
                    loop {
                        let msg = conn.recv().await?;
                        if handle_hot_preprepare_message(
                            &msg,
                            &mut parse_complete_count,
                            &mut parse_error,
                        )? {
                            if let Some(err) = parse_error {
                                return Err(err);
                            }
                            // Every Parse must be acknowledged before the
                            // statements are trusted as cached.
                            if parse_complete_count != missing.len() {
                                return Err(PgError::Protocol(format!(
                                    "hot pre-prepare completed with {} ParseComplete messages (expected {})",
                                    parse_complete_count,
                                    missing.len()
                                )));
                            }
                            break;
                        }
                    }
                    Ok::<(), PgError>(())
                },
            )
            .await
            {
                Ok(res) => res,
                Err(_) => Err(PgError::Timeout(format!(
                    "hot statement pre-prepare timeout after {:?} (pool config connect_timeout)",
                    preprepare_timeout
                ))),
            };

            if let Err(e) = preprepare_result {
                tracing::warn!(
                    host = %self.inner.config.host,
                    port = self.inner.config.port,
                    user = %self.inner.config.user,
                    db = %self.inner.config.database,
                    timeout_ms = preprepare_timeout.as_millis() as u64,
                    error = %e,
                    "pool_hot_prepare_failed: replacing connection to avoid handing out uncertain protocol state"
                );
                pool_churn_record_destroy(&self.inner.config, "hot_prepare_failed");
                // The replacement is handed out without hot statements pre-prepared.
                conn = Self::create_connection(&self.inner.config).await?;
                self.inner.total_created.fetch_add(1, Ordering::Relaxed);
                created_at = Instant::now();
            } else {
                // Register in local cache
                for (hash, name, sql) in &missing {
                    conn.stmt_cache.put(*hash, name.clone());
                    conn.prepared_statements.insert(name.clone(), sql.clone());
                }
            }
        }

        self.inner.active_count.fetch_add(1, Ordering::Relaxed);
        // Permit is intentionally detached here; returned by `release()` / pool return.
        permit.forget();

        Ok(PooledConnection {
            conn: Some(conn),
            pool: self.inner.clone(),
            rls_dirty: false,
            created_at,
        })
    }
377
378    /// Acquire a connection with RLS context pre-configured.
379    ///
380    /// Sets PostgreSQL session variables for tenant isolation before
381    /// returning the connection. When the connection is dropped, it
382    /// automatically clears the RLS context before returning to the pool.
383    ///
384    /// # Example
385    /// ```ignore
386    /// use qail_core::rls::RlsContext;
387    ///
388    /// let mut conn = pool.acquire_with_rls(
389    ///     RlsContext::tenant("550e8400-e29b-41d4-a716-446655440000")
390    /// ).await?;
391    /// // All queries through `conn` are now scoped to this tenant
392    /// ```
393    pub async fn acquire_with_rls(
394        &self,
395        ctx: qail_core::rls::RlsContext,
396    ) -> PgResult<PooledConnection> {
397        // SAFETY: RLS context is set immediately below via context_to_sql().
398        let mut conn = self.acquire_raw().await?;
399
400        // Set RLS context on the raw connection
401        let sql = crate::driver::rls::context_to_sql(&ctx);
402        let pg_conn = conn.get_mut()?;
403        if let Err(e) = execute_simple_with_timeout(
404            pg_conn,
405            &sql,
406            self.inner.config.connect_timeout,
407            "pool acquire_with_rls setup",
408        )
409        .await
410        {
411            // Attempt recovery ROLLBACK to salvage the connection rather than
412            // letting Drop destroy it (which wastes a TCP connection).
413            if let Ok(pg_conn) = conn.get_mut() {
414                let _ = pg_conn.execute_simple("ROLLBACK").await;
415            }
416            conn.release().await;
417            return Err(e);
418        }
419
420        // Mark dirty so Drop resets context before pool return
421        conn.rls_dirty = true;
422
423        Ok(conn)
424    }
425
426    /// Scoped connection helper that guarantees `release()` after closure execution.
427    ///
428    /// Prefer this over manual `acquire_with_rls()` in normal request handlers.
429    pub async fn with_rls<T, F>(&self, ctx: qail_core::rls::RlsContext, f: F) -> PgResult<T>
430    where
431        F: for<'a> FnOnce(&'a mut PooledConnection) -> ScopedPoolFuture<'a, T>,
432    {
433        let mut conn = self.acquire_with_rls(ctx).await?;
434        let out = f(&mut conn).await;
435        conn.release().await;
436        out
437    }
438
439    /// Scoped helper for system-level operations (`RlsContext::empty()`).
440    pub async fn with_system<T, F>(&self, f: F) -> PgResult<T>
441    where
442        F: for<'a> FnOnce(&'a mut PooledConnection) -> ScopedPoolFuture<'a, T>,
443    {
444        self.with_rls(qail_core::rls::RlsContext::empty(), f).await
445    }
446
447    /// Scoped helper for global/platform row access (`tenant_id IS NULL`).
448    pub async fn with_global<T, F>(&self, f: F) -> PgResult<T>
449    where
450        F: for<'a> FnOnce(&'a mut PooledConnection) -> ScopedPoolFuture<'a, T>,
451    {
452        self.with_rls(qail_core::rls::RlsContext::global(), f).await
453    }
454
455    /// Scoped helper for single-tenant access.
456    pub async fn with_tenant<T, F>(&self, tenant_id: &str, f: F) -> PgResult<T>
457    where
458        F: for<'a> FnOnce(&'a mut PooledConnection) -> ScopedPoolFuture<'a, T>,
459    {
460        self.with_rls(qail_core::rls::RlsContext::tenant(tenant_id), f)
461            .await
462    }
463
464    /// Acquire a connection with RLS context AND statement timeout.
465    ///
466    /// Like `acquire_with_rls()`, but also sets `statement_timeout` to prevent
467    /// runaway queries from holding pool connections indefinitely.
468    pub async fn acquire_with_rls_timeout(
469        &self,
470        ctx: qail_core::rls::RlsContext,
471        timeout_ms: u32,
472    ) -> PgResult<PooledConnection> {
473        // SAFETY: RLS context + timeout set immediately below via context_to_sql_with_timeout().
474        let mut conn = self.acquire_raw().await?;
475
476        // Set RLS context + statement_timeout atomically
477        let sql = crate::driver::rls::context_to_sql_with_timeout(&ctx, timeout_ms);
478        let pg_conn = conn.get_mut()?;
479        if let Err(e) = execute_simple_with_timeout(
480            pg_conn,
481            &sql,
482            self.inner.config.connect_timeout,
483            "pool acquire_with_rls_timeout setup",
484        )
485        .await
486        {
487            if let Ok(pg_conn) = conn.get_mut() {
488                let _ = pg_conn.execute_simple("ROLLBACK").await;
489            }
490            conn.release().await;
491            return Err(e);
492        }
493
494        // Mark dirty so Drop resets context + timeout before pool return
495        conn.rls_dirty = true;
496
497        Ok(conn)
498    }
499
500    /// Scoped connection helper that guarantees `release()` after closure execution.
501    pub async fn with_rls_timeout<T, F>(
502        &self,
503        ctx: qail_core::rls::RlsContext,
504        timeout_ms: u32,
505        f: F,
506    ) -> PgResult<T>
507    where
508        F: for<'a> FnOnce(&'a mut PooledConnection) -> ScopedPoolFuture<'a, T>,
509    {
510        let mut conn = self.acquire_with_rls_timeout(ctx, timeout_ms).await?;
511        let out = f(&mut conn).await;
512        conn.release().await;
513        out
514    }
515
516    /// Acquire a connection with RLS context, statement timeout, AND lock timeout.
517    ///
518    /// Like `acquire_with_rls_timeout()`, but also sets `lock_timeout` to prevent
519    /// queries from blocking indefinitely on row/table locks.
520    /// When `lock_timeout_ms` is 0, the lock_timeout clause is omitted.
521    pub async fn acquire_with_rls_timeouts(
522        &self,
523        ctx: qail_core::rls::RlsContext,
524        statement_timeout_ms: u32,
525        lock_timeout_ms: u32,
526    ) -> PgResult<PooledConnection> {
527        // SAFETY: RLS context + timeouts set immediately below via context_to_sql_with_timeouts().
528        let mut conn = self.acquire_raw().await?;
529
530        let sql = crate::driver::rls::context_to_sql_with_timeouts(
531            &ctx,
532            statement_timeout_ms,
533            lock_timeout_ms,
534        );
535        let pg_conn = conn.get_mut()?;
536        if let Err(e) = execute_simple_with_timeout(
537            pg_conn,
538            &sql,
539            self.inner.config.connect_timeout,
540            "pool acquire_with_rls_timeouts setup",
541        )
542        .await
543        {
544            if let Ok(pg_conn) = conn.get_mut() {
545                let _ = pg_conn.execute_simple("ROLLBACK").await;
546            }
547            conn.release().await;
548            return Err(e);
549        }
550
551        conn.rls_dirty = true;
552
553        Ok(conn)
554    }
555
556    /// Scoped connection helper that guarantees `release()` after closure execution.
557    pub async fn with_rls_timeouts<T, F>(
558        &self,
559        ctx: qail_core::rls::RlsContext,
560        statement_timeout_ms: u32,
561        lock_timeout_ms: u32,
562        f: F,
563    ) -> PgResult<T>
564    where
565        F: for<'a> FnOnce(&'a mut PooledConnection) -> ScopedPoolFuture<'a, T>,
566    {
567        let mut conn = self
568            .acquire_with_rls_timeouts(ctx, statement_timeout_ms, lock_timeout_ms)
569            .await?;
570        let out = f(&mut conn).await;
571        conn.release().await;
572        out
573    }
574
575    /// Acquire a connection for system-level operations (no tenant context).
576    ///
577    /// Sets RLS session variables to maximally restrictive values:
578    /// - `app.current_tenant_id = ''`
579    /// - `app.current_agent_id = ''`  
580    /// - `app.is_super_admin = false`
581    ///
582    /// Use this for startup introspection, migrations, and health checks
583    /// that must not operate within any tenant scope.
584    pub async fn acquire_system(&self) -> PgResult<PooledConnection> {
585        let ctx = qail_core::rls::RlsContext::empty();
586        self.acquire_with_rls(ctx).await
587    }
588
589    /// Acquire a connection scoped to global/platform rows.
590    ///
591    /// Shorthand for `acquire_with_rls(RlsContext::global())`.
592    /// Use this for shared reference data (for example: currencies, ports,
593    /// vessel types) stored as `tenant_id IS NULL`.
594    pub async fn acquire_global(&self) -> PgResult<PooledConnection> {
595        self.acquire_with_rls(qail_core::rls::RlsContext::global())
596            .await
597    }
598
599    /// Acquire a connection scoped to a specific tenant.
600    ///
601    /// Shorthand for `acquire_with_rls(RlsContext::tenant(tenant_id))`.
602    /// Use this when you already know the tenant UUID and want a
603    /// tenant-scoped connection in a single call.
604    ///
605    /// # Example
606    /// ```ignore
607    /// let mut conn = pool.acquire_for_tenant("550e8400-...").await?;
608    /// // All queries through `conn` are now scoped to this tenant
609    /// ```
610    pub async fn acquire_for_tenant(&self, tenant_id: &str) -> PgResult<PooledConnection> {
611        self.acquire_with_rls(qail_core::rls::RlsContext::tenant(tenant_id))
612            .await
613    }
614
615    /// Acquire a connection with branch context pre-configured.
616    ///
617    /// Sets PostgreSQL session variable `app.branch_id` for data virtualization.
618    /// When the connection is dropped, it automatically clears the branch context.
619    ///
620    /// # Example
621    /// ```ignore
622    /// use qail_core::branch::BranchContext;
623    ///
624    /// let ctx = BranchContext::branch("feature-auth");
625    /// let mut conn = pool.acquire_with_branch(&ctx).await?;
626    /// // All queries through `conn` are now branch-aware
627    /// ```
628    pub async fn acquire_with_branch(
629        &self,
630        ctx: &qail_core::branch::BranchContext,
631    ) -> PgResult<PooledConnection> {
632        // SAFETY: Branch context is set immediately below via branch_context_sql().
633        let mut conn = self.acquire_raw().await?;
634
635        if let Some(branch_name) = ctx.branch_name() {
636            let sql = crate::driver::branch_sql::branch_context_sql(branch_name);
637            let pg_conn = conn.get_mut()?;
638            if let Err(e) = execute_simple_with_timeout(
639                pg_conn,
640                &sql,
641                self.inner.config.connect_timeout,
642                "pool acquire_with_branch setup",
643            )
644            .await
645            {
646                if let Ok(pg_conn) = conn.get_mut() {
647                    let _ = pg_conn.execute_simple("ROLLBACK").await;
648                }
649                conn.release().await;
650                return Err(e);
651            }
652            conn.rls_dirty = true; // Reuse dirty flag for auto-reset
653        }
654
655        Ok(conn)
656    }
657
658    /// Get the current number of idle connections.
659    pub async fn idle_count(&self) -> usize {
660        self.inner.connections.lock().await.len()
661    }
662
663    /// Get the number of connections currently in use.
664    pub fn active_count(&self) -> usize {
665        self.inner.active_count.load(Ordering::Relaxed)
666    }
667
668    /// Get the maximum number of connections.
669    pub fn max_connections(&self) -> usize {
670        self.inner.config.max_connections
671    }
672
673    /// Get comprehensive pool statistics.
674    pub async fn stats(&self) -> PoolStats {
675        let idle = self.inner.connections.lock().await.len();
676        let active = self.inner.active_count.load(Ordering::Relaxed);
677        let used_slots = self
678            .inner
679            .config
680            .max_connections
681            .saturating_sub(self.inner.semaphore.available_permits());
682        PoolStats {
683            active,
684            idle,
685            pending: used_slots.saturating_sub(active),
686            max_size: self.inner.config.max_connections,
687            total_created: self.inner.total_created.load(Ordering::Relaxed),
688        }
689    }
690
691    /// Check if the pool is closed.
692    pub fn is_closed(&self) -> bool {
693        self.inner.closed.load(Ordering::Relaxed)
694    }
695
696    /// Close the pool gracefully.
697    ///
698    /// Rejects new acquires immediately, then waits up to `acquire_timeout`
699    /// for in-flight connections to be released before dropping idle
700    /// connections. Connections released after closure are destroyed by
701    /// `return_connection` and not returned to the idle queue.
702    pub async fn close(&self) {
703        self.close_graceful(self.inner.config.acquire_timeout).await;
704    }
705
706    /// Close the pool gracefully with an explicit drain timeout.
707    pub async fn close_graceful(&self, drain_timeout: Duration) {
708        self.inner.closed.store(true, Ordering::Relaxed);
709        // Wake blocked acquires immediately so shutdown doesn't wait on acquire_timeout.
710        self.inner.semaphore.close();
711
712        let deadline = Instant::now() + drain_timeout;
713        loop {
714            let active = self.inner.active_count.load(Ordering::Relaxed);
715            if active == 0 {
716                break;
717            }
718            if Instant::now() >= deadline {
719                tracing::warn!(
720                    active_connections = active,
721                    timeout_ms = drain_timeout.as_millis() as u64,
722                    "pool_close_drain_timeout: forcing idle cleanup while active connections remain"
723                );
724                break;
725            }
726            tokio::time::sleep(Duration::from_millis(25)).await;
727        }
728
729        let mut connections = self.inner.connections.lock().await;
730        let dropped_idle = connections.len();
731        connections.clear();
732        tracing::info!(
733            dropped_idle_connections = dropped_idle,
734            active_connections = self.inner.active_count.load(Ordering::Relaxed),
735            "pool_closed"
736        );
737    }
738
739    /// Create a new connection using the pool configuration.
740    async fn create_connection(config: &PoolConfig) -> PgResult<PgConnection> {
741        if !config.auth_settings.has_any_password_method()
742            && config.mtls.is_none()
743            && config.password.is_some()
744        {
745            return Err(PgError::Auth(
746                "Invalid PoolConfig: all password auth methods are disabled".to_string(),
747            ));
748        }
749
750        let options = ConnectOptions {
751            tls_mode: config.tls_mode,
752            gss_enc_mode: config.gss_enc_mode,
753            tls_ca_cert_pem: config.tls_ca_cert_pem.clone(),
754            mtls: config.mtls.clone(),
755            gss_token_provider: config.gss_token_provider,
756            gss_token_provider_ex: config.gss_token_provider_ex.clone(),
757            auth: config.auth_settings,
758            startup_params: Vec::new(),
759        };
760
761        if let Some(remaining) = gss_circuit_remaining_open(config) {
762            metrics::counter!("qail_pg_gss_circuit_open_total").increment(1);
763            tracing::warn!(
764                host = %config.host,
765                port = config.port,
766                user = %config.user,
767                db = %config.database,
768                remaining_ms = remaining.as_millis() as u64,
769                "gss_connect_circuit_open"
770            );
771            return Err(PgError::Connection(format!(
772                "GSS connection circuit is open; retry after {:?}",
773                remaining
774            )));
775        }
776
777        let mut attempt = 0usize;
778        loop {
779            let connect_result = tokio::time::timeout(
780                config.connect_timeout,
781                PgConnection::connect_with_options(
782                    &config.host,
783                    config.port,
784                    &config.user,
785                    &config.database,
786                    config.password.as_deref(),
787                    options.clone(),
788                ),
789            )
790            .await;
791
792            let connect_result = match connect_result {
793                Ok(result) => result,
794                Err(_) => Err(PgError::Timeout(format!(
795                    "connect timeout after {:?} (pool config connect_timeout)",
796                    config.connect_timeout
797                ))),
798            };
799
800            match connect_result {
801                Ok(conn) => {
802                    metrics::counter!("qail_pg_pool_connect_success_total").increment(1);
803                    gss_circuit_record_success(config);
804                    return Ok(conn);
805                }
806                Err(err) if should_retry_gss_connect_error(config, attempt, &err) => {
807                    metrics::counter!("qail_pg_gss_connect_retries_total").increment(1);
808                    gss_circuit_record_failure(config);
809                    let delay = gss_retry_delay(config.gss_retry_base_delay, attempt);
810                    tracing::warn!(
811                        host = %config.host,
812                        port = config.port,
813                        user = %config.user,
814                        db = %config.database,
815                        attempt = attempt + 1,
816                        delay_ms = delay.as_millis() as u64,
817                        error = %err,
818                        "gss_connect_retry"
819                    );
820                    tokio::time::sleep(delay).await;
821                    attempt += 1;
822                }
823                Err(err) => {
824                    metrics::counter!("qail_pg_pool_connect_failures_total").increment(1);
825                    if should_track_gss_circuit_error(config, &err) {
826                        metrics::counter!("qail_pg_gss_connect_failures_total").increment(1);
827                        gss_circuit_record_failure(config);
828                    }
829                    return Err(err);
830                }
831            }
832        }
833    }
834
835    /// Run one maintenance cycle: evict stale idle connections and backfill
836    /// to `min_connections`. Called periodically by `spawn_pool_maintenance`.
837    pub async fn maintain(&self) {
838        if self.inner.closed.load(Ordering::Relaxed) {
839            return;
840        }
841
842        // Phase 1: Evict idle and expired connections from the pool.
843        let evicted = {
844            let mut connections = self.inner.connections.lock().await;
845            let before = connections.len();
846            connections.retain(|pooled| {
847                if pooled.last_used.elapsed() > self.inner.config.idle_timeout {
848                    record_pool_connection_destroy("idle_sweep_evict");
849                    return false;
850                }
851                if let Some(max_life) = self.inner.config.max_lifetime
852                    && pooled.created_at.elapsed() > max_life
853                {
854                    record_pool_connection_destroy("lifetime_sweep_evict");
855                    return false;
856                }
857                true
858            });
859            before - connections.len()
860        };
861
862        if evicted > 0 {
863            tracing::debug!(evicted, "pool_maintenance: evicted stale idle connections");
864        }
865
866        // Phase 2: Backfill to min_connections if below threshold.
867        let min = self.inner.config.min_connections;
868        if min == 0 {
869            return;
870        }
871
872        let idle_count = self.inner.connections.lock().await.len();
873        if idle_count >= min {
874            return;
875        }
876
877        let deficit = min - idle_count;
878        let mut created = 0usize;
879        for _ in 0..deficit {
880            match Self::create_connection(&self.inner.config).await {
881                Ok(conn) => {
882                    self.inner.total_created.fetch_add(1, Ordering::Relaxed);
883                    let mut connections = self.inner.connections.lock().await;
884                    if connections.len() < self.inner.config.max_connections {
885                        connections.push(PooledConn {
886                            conn,
887                            created_at: Instant::now(),
888                            last_used: Instant::now(),
889                        });
890                        created += 1;
891                    } else {
892                        // Pool filled by concurrent acquires; stop backfill.
893                        break;
894                    }
895                }
896                Err(e) => {
897                    tracing::warn!(error = %e, "pool_maintenance: backfill connection failed");
898                    break; // Transient failure — retry next cycle.
899                }
900            }
901        }
902
903        if created > 0 {
904            tracing::debug!(
905                created,
906                min_connections = min,
907                "pool_maintenance: backfilled idle connections"
908            );
909        }
910    }
911}
912
913/// Spawn a background task that periodically maintains pool health.
914///
915/// Runs every `idle_timeout / 2` (min 5s): evicts stale idle connections and
916/// backfills to `min_connections`. Call once after `PgPool::connect`.
917pub fn spawn_pool_maintenance(pool: PgPool) {
918    let interval_secs = std::cmp::max(pool.inner.config.idle_timeout.as_secs() / 2, 5);
919    tokio::spawn(async move {
920        let mut interval = tokio::time::interval(Duration::from_secs(interval_secs));
921        loop {
922            interval.tick().await;
923            if pool.is_closed() {
924                break;
925            }
926            pool.maintain().await;
927        }
928    });
929}
930
931pub(super) fn validate_pool_config(config: &PoolConfig) -> PgResult<()> {
932    if config.max_connections == 0 {
933        return Err(PgError::Connection(
934            "Invalid PoolConfig: max_connections must be >= 1".to_string(),
935        ));
936    }
937    if config.min_connections > config.max_connections {
938        return Err(PgError::Connection(format!(
939            "Invalid PoolConfig: min_connections ({}) must be <= max_connections ({})",
940            config.min_connections, config.max_connections
941        )));
942    }
943    if config.acquire_timeout.is_zero() {
944        return Err(PgError::Connection(
945            "Invalid PoolConfig: acquire_timeout must be > 0".to_string(),
946        ));
947    }
948    if config.connect_timeout.is_zero() {
949        return Err(PgError::Connection(
950            "Invalid PoolConfig: connect_timeout must be > 0".to_string(),
951        ));
952    }
953    if config.leaked_cleanup_queue == 0 {
954        return Err(PgError::Connection(
955            "Invalid PoolConfig: leaked_cleanup_queue must be >= 1".to_string(),
956        ));
957    }
958    Ok(())
959}
960
961pub(super) async fn execute_simple_with_timeout(
962    conn: &mut PgConnection,
963    sql: &str,
964    timeout: Duration,
965    operation: &str,
966) -> PgResult<()> {
967    match tokio::time::timeout(timeout, conn.execute_simple(sql)).await {
968        Ok(result) => result,
969        Err(_) => {
970            conn.mark_io_desynced();
971            Err(PgError::Timeout(format!(
972                "{} timeout after {:?} (pool config connect_timeout)",
973                operation, timeout
974            )))
975        }
976    }
977}