Skip to main content

qail_pg/driver/pool/
lifecycle.rs

1//! Pool lifecycle: PgPoolInner, PgPool core (connect, maintain, close),
2//! hot statement pre-prepare, and connection creation.
3
4use super::ScopedPoolFuture;
5use super::churn::{
6    PoolStats, decrement_active_count_saturating, pool_churn_record_destroy,
7    pool_churn_remaining_open, record_pool_connection_destroy,
8};
9use super::config::PoolConfig;
10use super::connection::PooledConn;
11use super::connection::PooledConnection;
12use super::gss::*;
13use crate::driver::{
14    AstPipelineMode, AutoCountPath, AutoCountPlan, ConnectOptions, PgConnection, PgError, PgResult,
15    is_ignorable_session_message, unexpected_backend_message,
16};
17use std::sync::Arc;
18use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
19use std::time::{Duration, Instant};
20use tokio::sync::{Mutex, Semaphore};
21use tokio::task::JoinSet;
22
/// Maximum number of hot statements to track globally.
///
/// Upper bound on the number of entries in `PgPoolInner::hot_statements`.
/// NOTE(review): the cap is not applied anywhere in this part of the file;
/// it is presumably checked where hot statements are registered — confirm.
/// If it is enforced there, it also bounds how many Parse messages
/// `acquire_raw` sends in one pre-prepare batch (one per missing entry).
pub(super) const MAX_HOT_STATEMENTS: usize = 32;
25
/// Inner pool state (shared across clones).
///
/// Every `PgPool` clone holds an `Arc` to one of these. All clones therefore see
/// the same idle queue, permits, counters and hot-statement registry.
pub(super) struct PgPoolInner {
    /// Pool configuration: limits and timeouts, plus the host/port/user/db
    /// fields included in log events.
    pub(super) config: PoolConfig,
    /// Idle connections available for checkout.
    ///
    /// `return_connection` pushes onto the end and checkout pops from the end,
    /// so the most recently returned connection is reused first.
    pub(super) connections: Mutex<Vec<PooledConn>>,
    /// One permit per connection slot, created with `max_connections` permits.
    ///
    /// `acquire_raw` detaches its permit with `forget()` on success, and
    /// `return_connection` adds one permit back on every return path.
    pub(super) semaphore: Semaphore,
    /// Closing flag. The code that sets it lives in the close path, outside
    /// this chunk.
    ///
    /// While it is set, `acquire_raw` fails with `PgError::PoolClosed` and
    /// `return_connection` drops returned connections instead of keeping them
    /// idle.
    pub(super) closed: AtomicBool,
    /// Connections currently checked out.
    ///
    /// Incremented at the end of a successful `acquire_raw` and decremented
    /// (saturating) in `return_connection`.
    pub(super) active_count: AtomicUsize,
    /// Connections opened over the pool's lifetime.
    ///
    /// Includes the initial `min_connections` and any replacements opened
    /// during checkout.
    pub(super) total_created: AtomicUsize,
    /// NOTE(review): not read or written in this part of the file. It
    /// presumably counts in-flight cleanups of connections dropped without an
    /// explicit release — confirm against the connection/drop code.
    pub(super) leaked_cleanup_inflight: AtomicUsize,
    /// Global registry of frequently-used prepared statements.
    /// Maps sql_hash → (stmt_name, sql_text).
    /// On checkout, `acquire_raw` pre-prepares every entry missing from the
    /// connection's statement cache, so the first use is a cache hit.
    pub(super) hot_statements: std::sync::RwLock<std::collections::HashMap<u64, (String, String)>>,
}
40
41pub(super) fn handle_hot_preprepare_message(
42    msg: &crate::protocol::BackendMessage,
43    parse_complete_count: &mut usize,
44    error: &mut Option<PgError>,
45) -> PgResult<bool> {
46    match msg {
47        crate::protocol::BackendMessage::ParseComplete => {
48            *parse_complete_count += 1;
49            Ok(false)
50        }
51        crate::protocol::BackendMessage::ErrorResponse(err) => {
52            if error.is_none() {
53                *error = Some(PgError::QueryServer(err.clone().into()));
54            }
55            Ok(false)
56        }
57        crate::protocol::BackendMessage::ReadyForQuery(_) => Ok(true),
58        msg if is_ignorable_session_message(msg) => Ok(false),
59        other => Err(unexpected_backend_message("pool hot pre-prepare", other)),
60    }
61}
62
63impl PgPoolInner {
64    pub(super) async fn return_connection(&self, conn: PgConnection, created_at: Instant) {
65        decrement_active_count_saturating(&self.active_count);
66
67        if conn.is_io_desynced() {
68            tracing::warn!(
69                host = %self.config.host,
70                port = self.config.port,
71                user = %self.config.user,
72                db = %self.config.database,
73                "pool_return_desynced: dropping connection due to prior I/O/protocol desync"
74            );
75            record_pool_connection_destroy("pool_desynced_drop");
76            self.semaphore.add_permits(1);
77            pool_churn_record_destroy(&self.config, "return_desynced");
78            return;
79        }
80
81        if self.closed.load(Ordering::Relaxed) {
82            record_pool_connection_destroy("pool_closed_drop");
83            self.semaphore.add_permits(1);
84            return;
85        }
86
87        let mut connections = self.connections.lock().await;
88        if connections.len() < self.config.max_connections {
89            connections.push(PooledConn {
90                conn,
91                created_at,
92                last_used: Instant::now(),
93            });
94        } else {
95            record_pool_connection_destroy("pool_overflow_drop");
96        }
97
98        self.semaphore.add_permits(1);
99    }
100
101    /// Get a healthy connection from the pool, or None if pool is empty.
102    async fn get_healthy_connection(&self) -> Option<PooledConn> {
103        let mut connections = self.connections.lock().await;
104
105        while let Some(pooled) = connections.pop() {
106            if pooled.last_used.elapsed() > self.config.idle_timeout {
107                tracing::debug!(
108                    idle_secs = pooled.last_used.elapsed().as_secs(),
109                    timeout_secs = self.config.idle_timeout.as_secs(),
110                    "pool_checkout_evict: connection exceeded idle timeout"
111                );
112                record_pool_connection_destroy("idle_timeout_evict");
113                continue;
114            }
115
116            if let Some(max_life) = self.config.max_lifetime
117                && pooled.created_at.elapsed() > max_life
118            {
119                tracing::debug!(
120                    age_secs = pooled.created_at.elapsed().as_secs(),
121                    max_lifetime_secs = max_life.as_secs(),
122                    "pool_checkout_evict: connection exceeded max lifetime"
123                );
124                record_pool_connection_destroy("max_lifetime_evict");
125                continue;
126            }
127
128            return Some(pooled);
129        }
130
131        None
132    }
133}
134
/// Cloneable handle to a shared PostgreSQL connection pool.
///
/// `PgPool` only wraps an `Arc` to the shared pool state. Cloning it is cheap,
/// and every clone uses the same idle connections, slot permits and counters.
///
/// # Example
/// ```ignore
/// let config = PoolConfig::new("localhost", 5432, "user", "db")
///     .password("secret")
///     .max_connections(20);
/// let pool = PgPool::connect(config).await?;
/// // Get a connection from the pool.
/// // SAFETY: `SELECT 1` reads no tenant data, so no RLS context is needed
/// // (raw acquisition requires this justification at every call site).
/// let mut conn = pool.acquire_raw().await?;
/// conn.simple_query("SELECT 1").await?;
/// ```
#[derive(Clone)]
pub struct PgPool {
    /// Shared pool state; every clone of this handle points at the same value.
    pub(super) inner: Arc<PgPoolInner>,
}
149
150impl PgPool {
151    /// Create a pool from `qail.toml` (loads and parses automatically).
152    ///
153    /// # Example
154    /// ```ignore
155    /// let pool = PgPool::from_config().await?;
156    /// ```
157    pub async fn from_config() -> PgResult<Self> {
158        let qail = qail_core::config::QailConfig::load()
159            .map_err(|e| PgError::Connection(format!("Config error: {}", e)))?;
160        let config = PoolConfig::from_qail_config(&qail)?;
161        Self::connect(config).await
162    }
163
164    /// Create a new connection pool.
165    pub async fn connect(config: PoolConfig) -> PgResult<Self> {
166        validate_pool_config(&config)?;
167
168        // Semaphore starts with max_connections permits
169        let semaphore = Semaphore::new(config.max_connections);
170
171        let mut initial_connections = Vec::new();
172        for _ in 0..config.min_connections {
173            let conn = Self::create_connection(&config).await?;
174            initial_connections.push(PooledConn {
175                conn,
176                created_at: Instant::now(),
177                last_used: Instant::now(),
178            });
179        }
180
181        let initial_count = initial_connections.len();
182
183        let inner = Arc::new(PgPoolInner {
184            config,
185            connections: Mutex::new(initial_connections),
186            semaphore,
187            closed: AtomicBool::new(false),
188            active_count: AtomicUsize::new(0),
189            total_created: AtomicUsize::new(initial_count),
190            leaked_cleanup_inflight: AtomicUsize::new(0),
191            hot_statements: std::sync::RwLock::new(std::collections::HashMap::new()),
192        });
193
194        Ok(Self { inner })
195    }
196
197    /// Acquire a raw connection from the pool (crate-internal only).
198    ///
199    /// # Safety (not `unsafe` in the Rust sense, but security-critical)
200    ///
201    /// This returns a connection with **no RLS context**. All tenant data
202    /// queries on this connection will bypass row-level security.
203    ///
204    /// **Safe usage**: Pair with `fetch_all_with_rls()` for pipelined
205    /// RLS+query execution (single roundtrip). Or use `acquire_with_rls()`
206    /// / `acquire_with_rls_timeout()` for the 2-roundtrip path.
207    ///
208    /// **Unsafe usage**: Running queries directly on a raw connection
209    /// without RLS context. Every call site MUST include a `// SAFETY:`
210    /// comment explaining why raw acquisition is justified.
211    pub async fn acquire_raw(&self) -> PgResult<PooledConnection> {
212        if self.inner.closed.load(Ordering::Relaxed) {
213            return Err(PgError::PoolClosed);
214        }
215
216        if let Some(remaining) = pool_churn_remaining_open(&self.inner.config) {
217            metrics::counter!("qail_pg_pool_churn_circuit_reject_total").increment(1);
218            tracing::warn!(
219                host = %self.inner.config.host,
220                port = self.inner.config.port,
221                user = %self.inner.config.user,
222                db = %self.inner.config.database,
223                remaining_ms = remaining.as_millis() as u64,
224                "pool_connection_churn_circuit_open"
225            );
226            return Err(PgError::PoolExhausted {
227                max: self.inner.config.max_connections,
228            });
229        }
230
231        // Wait for available slot with timeout
232        let acquire_timeout = self.inner.config.acquire_timeout;
233        let permit =
234            match tokio::time::timeout(acquire_timeout, self.inner.semaphore.acquire()).await {
235                Ok(permit) => permit.map_err(|_| PgError::PoolClosed)?,
236                Err(_) => {
237                    metrics::counter!("qail_pg_pool_acquire_timeouts_total").increment(1);
238                    return Err(PgError::Timeout(format!(
239                        "pool acquire after {}s ({} max connections)",
240                        acquire_timeout.as_secs(),
241                        self.inner.config.max_connections
242                    )));
243                }
244            };
245
246        if self.inner.closed.load(Ordering::Relaxed) {
247            return Err(PgError::PoolClosed);
248        }
249
250        // Try to get existing healthy connection
251        let (mut conn, mut created_at) =
252            if let Some(pooled) = self.inner.get_healthy_connection().await {
253                (pooled.conn, pooled.created_at)
254            } else {
255                let conn = Self::create_connection(&self.inner.config).await?;
256                self.inner.total_created.fetch_add(1, Ordering::Relaxed);
257                (conn, Instant::now())
258            };
259
260        if self.inner.config.test_on_acquire
261            && let Err(e) = execute_simple_with_timeout(
262                &mut conn,
263                "SELECT 1",
264                self.inner.config.connect_timeout,
265                "pool checkout health check",
266            )
267            .await
268        {
269            tracing::warn!(
270                host = %self.inner.config.host,
271                port = self.inner.config.port,
272                user = %self.inner.config.user,
273                db = %self.inner.config.database,
274                error = %e,
275                "pool_health_check_failed: checkout probe failed, creating replacement connection"
276            );
277            pool_churn_record_destroy(&self.inner.config, "health_check_failed");
278            conn = Self::create_connection(&self.inner.config).await?;
279            self.inner.total_created.fetch_add(1, Ordering::Relaxed);
280            created_at = Instant::now();
281        }
282
283        // Pre-prepare hot statements that this connection doesn't have yet.
284        // Collect data synchronously (guard dropped before async work).
285        let missing: Vec<(u64, String, String)> = {
286            if let Ok(hot) = self.inner.hot_statements.read() {
287                hot.iter()
288                    .filter(|(hash, _)| !conn.stmt_cache.contains(hash))
289                    .map(|(hash, (name, sql))| (*hash, name.clone(), sql.clone()))
290                    .collect()
291            } else {
292                Vec::new()
293            }
294        }; // RwLockReadGuard dropped here — safe across .await
295
296        if !missing.is_empty() {
297            use crate::protocol::PgEncoder;
298            let mut buf = bytes::BytesMut::new();
299            for (_, name, sql) in &missing {
300                let parse_msg = PgEncoder::try_encode_parse(name, sql, &[])?;
301                buf.extend_from_slice(&parse_msg);
302            }
303            PgEncoder::encode_sync_to(&mut buf);
304            let preprepare_timeout = self.inner.config.connect_timeout;
305            let preprepare_result: PgResult<()> = match tokio::time::timeout(
306                preprepare_timeout,
307                async {
308                    conn.send_bytes(&buf).await?;
309                    // Drain responses and fail closed on any parse error.
310                    let mut parse_complete_count = 0usize;
311                    let mut parse_error: Option<PgError> = None;
312                    loop {
313                        let msg = conn.recv().await?;
314                        if handle_hot_preprepare_message(
315                            &msg,
316                            &mut parse_complete_count,
317                            &mut parse_error,
318                        )? {
319                            if let Some(err) = parse_error {
320                                return Err(err);
321                            }
322                            if parse_complete_count != missing.len() {
323                                return Err(PgError::Protocol(format!(
324                                    "hot pre-prepare completed with {} ParseComplete messages (expected {})",
325                                    parse_complete_count,
326                                    missing.len()
327                                )));
328                            }
329                            break;
330                        }
331                    }
332                    Ok::<(), PgError>(())
333                },
334            )
335            .await
336            {
337                Ok(res) => res,
338                Err(_) => Err(PgError::Timeout(format!(
339                    "hot statement pre-prepare timeout after {:?} (pool config connect_timeout)",
340                    preprepare_timeout
341                ))),
342            };
343
344            if let Err(e) = preprepare_result {
345                tracing::warn!(
346                    host = %self.inner.config.host,
347                    port = self.inner.config.port,
348                    user = %self.inner.config.user,
349                    db = %self.inner.config.database,
350                    timeout_ms = preprepare_timeout.as_millis() as u64,
351                    error = %e,
352                    "pool_hot_prepare_failed: replacing connection to avoid handing out uncertain protocol state"
353                );
354                pool_churn_record_destroy(&self.inner.config, "hot_prepare_failed");
355                conn = Self::create_connection(&self.inner.config).await?;
356                self.inner.total_created.fetch_add(1, Ordering::Relaxed);
357                created_at = Instant::now();
358            } else {
359                // Register in local cache
360                for (hash, name, sql) in &missing {
361                    conn.stmt_cache.put(*hash, name.clone());
362                    conn.prepared_statements.insert(name.clone(), sql.clone());
363                }
364            }
365        }
366
367        self.inner.active_count.fetch_add(1, Ordering::Relaxed);
368        // Permit is intentionally detached here; returned by `release()` / pool return.
369        permit.forget();
370
371        Ok(PooledConnection {
372            conn: Some(conn),
373            pool: std::sync::Arc::clone(&self.inner),
374            rls_dirty: false,
375            created_at,
376        })
377    }
378
379    /// Acquire a connection with RLS context pre-configured.
380    ///
381    /// Sets PostgreSQL session variables for tenant isolation before
382    /// returning the connection. When the connection is dropped, it
383    /// automatically clears the RLS context before returning to the pool.
384    ///
385    /// # Example
386    /// ```ignore
387    /// use qail_core::rls::RlsContext;
388    ///
389    /// let mut conn = pool.acquire_with_rls(
390    ///     RlsContext::tenant("550e8400-e29b-41d4-a716-446655440000")
391    /// ).await?;
392    /// // All queries through `conn` are now scoped to this tenant
393    /// ```
394    pub async fn acquire_with_rls(
395        &self,
396        ctx: qail_core::rls::RlsContext,
397    ) -> PgResult<PooledConnection> {
398        // SAFETY: RLS context is set immediately below via context_to_sql().
399        let mut conn = self.acquire_raw().await?;
400
401        // Set RLS context on the raw connection
402        let sql = crate::driver::rls::context_to_sql(&ctx);
403        let pg_conn = conn.get_mut()?;
404        if let Err(e) = execute_simple_with_timeout(
405            pg_conn,
406            &sql,
407            self.inner.config.connect_timeout,
408            "pool acquire_with_rls setup",
409        )
410        .await
411        {
412            // Attempt recovery ROLLBACK to salvage the connection rather than
413            // letting Drop destroy it (which wastes a TCP connection).
414            if let Ok(pg_conn) = conn.get_mut() {
415                let _ = pg_conn.execute_simple("ROLLBACK").await;
416            }
417            conn.release().await;
418            return Err(e);
419        }
420
421        // Mark dirty so Drop resets context before pool return
422        conn.rls_dirty = true;
423
424        Ok(conn)
425    }
426
427    /// Scoped connection helper that guarantees `release()` after closure execution.
428    ///
429    /// Prefer this over manual `acquire_with_rls()` in normal request handlers.
430    pub async fn with_rls<T, F>(&self, ctx: qail_core::rls::RlsContext, f: F) -> PgResult<T>
431    where
432        F: for<'a> FnOnce(&'a mut PooledConnection) -> ScopedPoolFuture<'a, T>,
433    {
434        let mut conn = self.acquire_with_rls(ctx).await?;
435        let out = f(&mut conn).await;
436        match out {
437            Ok(value) => {
438                conn.release_checked().await?;
439                Ok(value)
440            }
441            Err(err) => {
442                let _ = conn.rollback_and_release().await;
443                Err(err)
444            }
445        }
446    }
447
448    /// Scoped helper for system-level operations (`RlsContext::empty()`).
449    pub async fn with_system<T, F>(&self, f: F) -> PgResult<T>
450    where
451        F: for<'a> FnOnce(&'a mut PooledConnection) -> ScopedPoolFuture<'a, T>,
452    {
453        self.with_rls(qail_core::rls::RlsContext::empty(), f).await
454    }
455
456    /// Scoped helper for global/platform row access (`tenant_id IS NULL`).
457    pub async fn with_global<T, F>(&self, f: F) -> PgResult<T>
458    where
459        F: for<'a> FnOnce(&'a mut PooledConnection) -> ScopedPoolFuture<'a, T>,
460    {
461        self.with_rls(qail_core::rls::RlsContext::global(), f).await
462    }
463
464    /// Scoped helper for single-tenant access.
465    pub async fn with_tenant<T, F>(&self, tenant_id: &str, f: F) -> PgResult<T>
466    where
467        F: for<'a> FnOnce(&'a mut PooledConnection) -> ScopedPoolFuture<'a, T>,
468    {
469        self.with_rls(qail_core::rls::RlsContext::tenant(tenant_id), f)
470            .await
471    }
472
473    /// Acquire a connection with RLS context AND statement timeout.
474    ///
475    /// Like `acquire_with_rls()`, but also sets `statement_timeout` to prevent
476    /// runaway queries from holding pool connections indefinitely.
477    pub async fn acquire_with_rls_timeout(
478        &self,
479        ctx: qail_core::rls::RlsContext,
480        timeout_ms: u32,
481    ) -> PgResult<PooledConnection> {
482        // SAFETY: RLS context + timeout set immediately below via context_to_sql_with_timeout().
483        let mut conn = self.acquire_raw().await?;
484
485        // Set RLS context + statement_timeout atomically
486        let sql = crate::driver::rls::context_to_sql_with_timeout(&ctx, timeout_ms);
487        let pg_conn = conn.get_mut()?;
488        if let Err(e) = execute_simple_with_timeout(
489            pg_conn,
490            &sql,
491            self.inner.config.connect_timeout,
492            "pool acquire_with_rls_timeout setup",
493        )
494        .await
495        {
496            if let Ok(pg_conn) = conn.get_mut() {
497                let _ = pg_conn.execute_simple("ROLLBACK").await;
498            }
499            conn.release().await;
500            return Err(e);
501        }
502
503        // Mark dirty so Drop resets context + timeout before pool return
504        conn.rls_dirty = true;
505
506        Ok(conn)
507    }
508
509    /// Scoped connection helper that guarantees `release()` after closure execution.
510    pub async fn with_rls_timeout<T, F>(
511        &self,
512        ctx: qail_core::rls::RlsContext,
513        timeout_ms: u32,
514        f: F,
515    ) -> PgResult<T>
516    where
517        F: for<'a> FnOnce(&'a mut PooledConnection) -> ScopedPoolFuture<'a, T>,
518    {
519        let mut conn = self.acquire_with_rls_timeout(ctx, timeout_ms).await?;
520        let out = f(&mut conn).await;
521        match out {
522            Ok(value) => {
523                conn.release_checked().await?;
524                Ok(value)
525            }
526            Err(err) => {
527                let _ = conn.rollback_and_release().await;
528                Err(err)
529            }
530        }
531    }
532
533    /// Acquire a connection with RLS context, statement timeout, AND lock timeout.
534    ///
535    /// Like `acquire_with_rls_timeout()`, but also sets `lock_timeout` to prevent
536    /// queries from blocking indefinitely on row/table locks.
537    /// When `lock_timeout_ms` is 0, the lock_timeout clause is omitted.
538    pub async fn acquire_with_rls_timeouts(
539        &self,
540        ctx: qail_core::rls::RlsContext,
541        statement_timeout_ms: u32,
542        lock_timeout_ms: u32,
543    ) -> PgResult<PooledConnection> {
544        // SAFETY: RLS context + timeouts set immediately below via context_to_sql_with_timeouts().
545        let mut conn = self.acquire_raw().await?;
546
547        let sql = crate::driver::rls::context_to_sql_with_timeouts(
548            &ctx,
549            statement_timeout_ms,
550            lock_timeout_ms,
551        );
552        let pg_conn = conn.get_mut()?;
553        if let Err(e) = execute_simple_with_timeout(
554            pg_conn,
555            &sql,
556            self.inner.config.connect_timeout,
557            "pool acquire_with_rls_timeouts setup",
558        )
559        .await
560        {
561            if let Ok(pg_conn) = conn.get_mut() {
562                let _ = pg_conn.execute_simple("ROLLBACK").await;
563            }
564            conn.release().await;
565            return Err(e);
566        }
567
568        conn.rls_dirty = true;
569
570        Ok(conn)
571    }
572
573    /// Scoped connection helper that guarantees `release()` after closure execution.
574    pub async fn with_rls_timeouts<T, F>(
575        &self,
576        ctx: qail_core::rls::RlsContext,
577        statement_timeout_ms: u32,
578        lock_timeout_ms: u32,
579        f: F,
580    ) -> PgResult<T>
581    where
582        F: for<'a> FnOnce(&'a mut PooledConnection) -> ScopedPoolFuture<'a, T>,
583    {
584        let mut conn = self
585            .acquire_with_rls_timeouts(ctx, statement_timeout_ms, lock_timeout_ms)
586            .await?;
587        let out = f(&mut conn).await;
588        match out {
589            Ok(value) => {
590                conn.release_checked().await?;
591                Ok(value)
592            }
593            Err(err) => {
594                let _ = conn.rollback_and_release().await;
595                Err(err)
596            }
597        }
598    }
599
600    /// Acquire a connection for system-level operations (no tenant context).
601    ///
602    /// Sets RLS session variables to maximally restrictive values:
603    /// - `app.current_tenant_id = ''`
604    /// - `app.current_agent_id = ''`  
605    /// - `app.is_super_admin = false`
606    ///
607    /// Use this for startup introspection, migrations, and health checks
608    /// that must not operate within any tenant scope.
609    pub async fn acquire_system(&self) -> PgResult<PooledConnection> {
610        let ctx = qail_core::rls::RlsContext::empty();
611        self.acquire_with_rls(ctx).await
612    }
613
614    /// Acquire a connection scoped to global/platform rows.
615    ///
616    /// Shorthand for `acquire_with_rls(RlsContext::global())`.
617    /// Use this for shared reference data (for example: currencies, ports,
618    /// vessel types) stored as `tenant_id IS NULL`.
619    pub async fn acquire_global(&self) -> PgResult<PooledConnection> {
620        self.acquire_with_rls(qail_core::rls::RlsContext::global())
621            .await
622    }
623
624    /// Acquire a connection scoped to a specific tenant.
625    ///
626    /// Shorthand for `acquire_with_rls(RlsContext::tenant(tenant_id))`.
627    /// Use this when you already know the tenant UUID and want a
628    /// tenant-scoped connection in a single call.
629    ///
630    /// # Example
631    /// ```ignore
632    /// let mut conn = pool.acquire_for_tenant("550e8400-...").await?;
633    /// // All queries through `conn` are now scoped to this tenant
634    /// ```
635    pub async fn acquire_for_tenant(&self, tenant_id: &str) -> PgResult<PooledConnection> {
636        self.acquire_with_rls(qail_core::rls::RlsContext::tenant(tenant_id))
637            .await
638    }
639
640    /// Acquire a connection with branch context pre-configured.
641    ///
642    /// Sets PostgreSQL session variable `app.branch_id` for data virtualization.
643    /// When the connection is dropped, it automatically clears the branch context.
644    ///
645    /// # Example
646    /// ```ignore
647    /// use qail_core::branch::BranchContext;
648    ///
649    /// let ctx = BranchContext::branch("feature-auth");
650    /// let mut conn = pool.acquire_with_branch(&ctx).await?;
651    /// // All queries through `conn` are now branch-aware
652    /// ```
653    pub async fn acquire_with_branch(
654        &self,
655        ctx: &qail_core::branch::BranchContext,
656    ) -> PgResult<PooledConnection> {
657        // SAFETY: Branch context is set immediately below via branch_context_sql().
658        let mut conn = self.acquire_raw().await?;
659
660        if let Some(branch_name) = ctx.branch_name() {
661            let sql = crate::driver::branch_sql::branch_context_sql(branch_name);
662            let pg_conn = conn.get_mut()?;
663            if let Err(e) = execute_simple_with_timeout(
664                pg_conn,
665                &sql,
666                self.inner.config.connect_timeout,
667                "pool acquire_with_branch setup",
668            )
669            .await
670            {
671                if let Ok(pg_conn) = conn.get_mut() {
672                    let _ = pg_conn.execute_simple("ROLLBACK").await;
673                }
674                conn.release().await;
675                return Err(e);
676            }
677            conn.rls_dirty = true; // Reuse dirty flag for auto-reset
678        }
679
680        Ok(conn)
681    }
682
683    /// Get the current number of idle connections.
684    pub async fn idle_count(&self) -> usize {
685        self.inner.connections.lock().await.len()
686    }
687
688    /// Get the number of connections currently in use.
689    pub fn active_count(&self) -> usize {
690        self.inner.active_count.load(Ordering::Relaxed)
691    }
692
693    /// Get the maximum number of connections.
694    pub fn max_connections(&self) -> usize {
695        self.inner.config.max_connections
696    }
697
698    /// Plan auto count strategy for a given batch length.
699    pub fn plan_auto_count(&self, batch_len: usize) -> AutoCountPlan {
700        AutoCountPlan::for_pool(
701            batch_len,
702            self.inner.config.max_connections,
703            self.inner.semaphore.available_permits(),
704        )
705    }
706
707    /// Execute commands with runtime auto strategy and return count + plan.
708    pub async fn execute_count_auto_with_plan(
709        &self,
710        cmds: &[qail_core::ast::Qail],
711    ) -> PgResult<(usize, AutoCountPlan)> {
712        let plan = self.plan_auto_count(cmds.len());
713
714        let completed = match plan.path {
715            AutoCountPath::SingleCached => {
716                if cmds.is_empty() {
717                    0
718                } else {
719                    let mut conn = self.acquire_system().await?;
720                    let run_result = conn.fetch_all_cached(&cmds[0]).await;
721                    conn.release().await;
722                    let _ = run_result?;
723                    1
724                }
725            }
726            AutoCountPath::PipelineOneShot | AutoCountPath::PipelineCached => {
727                let mode = if matches!(plan.path, AutoCountPath::PipelineOneShot) {
728                    AstPipelineMode::OneShot
729                } else {
730                    AstPipelineMode::Cached
731                };
732
733                let mut pooled = self.acquire_system().await?;
734                let run_result = {
735                    let conn = pooled.get_mut()?;
736                    conn.pipeline_execute_count_ast_with_mode(cmds, mode).await
737                };
738                pooled.release().await;
739                run_result?
740            }
741            AutoCountPath::PoolParallel => {
742                if cmds.is_empty() {
743                    0
744                } else {
745                    let all_cmds = Arc::new(cmds.to_vec());
746                    let mut tasks: JoinSet<PgResult<usize>> = JoinSet::new();
747
748                    for worker in 0..plan.workers {
749                        let start = worker * plan.chunk_size;
750                        if start >= all_cmds.len() {
751                            break;
752                        }
753                        let end = (start + plan.chunk_size).min(all_cmds.len());
754                        let pool = self.clone();
755                        let all_cmds = Arc::clone(&all_cmds);
756
757                        tasks.spawn(async move {
758                            let mut pooled = pool.acquire_system().await?;
759                            let run_result = {
760                                let conn = pooled.get_mut()?;
761                                conn.pipeline_execute_count_ast_with_mode(
762                                    &all_cmds[start..end],
763                                    AstPipelineMode::Auto,
764                                )
765                                .await
766                            };
767                            pooled.release().await;
768                            run_result
769                        });
770                    }
771
772                    let mut total = 0usize;
773                    while let Some(joined) = tasks.join_next().await {
774                        match joined {
775                            Ok(Ok(count)) => {
776                                total += count;
777                            }
778                            Ok(Err(err)) => return Err(err),
779                            Err(err) => {
780                                return Err(PgError::Connection(format!(
781                                    "auto pool worker join failed: {err}"
782                                )));
783                            }
784                        }
785                    }
786                    total
787                }
788            }
789        };
790
791        Ok((completed, plan))
792    }
793
794    /// Execute commands with runtime auto strategy.
795    #[inline]
796    pub async fn execute_count_auto(&self, cmds: &[qail_core::ast::Qail]) -> PgResult<usize> {
797        let (completed, _plan) = self.execute_count_auto_with_plan(cmds).await?;
798        Ok(completed)
799    }
800
801    /// Get comprehensive pool statistics.
802    pub async fn stats(&self) -> PoolStats {
803        let idle = self.inner.connections.lock().await.len();
804        let active = self.inner.active_count.load(Ordering::Relaxed);
805        let used_slots = self
806            .inner
807            .config
808            .max_connections
809            .saturating_sub(self.inner.semaphore.available_permits());
810        PoolStats {
811            active,
812            idle,
813            pending: used_slots.saturating_sub(active),
814            max_size: self.inner.config.max_connections,
815            total_created: self.inner.total_created.load(Ordering::Relaxed),
816        }
817    }
818
819    /// Check if the pool is closed.
820    pub fn is_closed(&self) -> bool {
821        self.inner.closed.load(Ordering::Relaxed)
822    }
823
824    /// Close the pool gracefully.
825    ///
826    /// Rejects new acquires immediately, then waits up to `acquire_timeout`
827    /// for in-flight connections to be released before dropping idle
828    /// connections. Connections released after closure are destroyed by
829    /// `return_connection` and not returned to the idle queue.
830    pub async fn close(&self) {
831        self.close_graceful(self.inner.config.acquire_timeout).await;
832    }
833
834    /// Close the pool gracefully with an explicit drain timeout.
835    pub async fn close_graceful(&self, drain_timeout: Duration) {
836        self.inner.closed.store(true, Ordering::Relaxed);
837        // Wake blocked acquires immediately so shutdown doesn't wait on acquire_timeout.
838        self.inner.semaphore.close();
839
840        let deadline = Instant::now() + drain_timeout;
841        loop {
842            let active = self.inner.active_count.load(Ordering::Relaxed);
843            if active == 0 {
844                break;
845            }
846            if Instant::now() >= deadline {
847                tracing::warn!(
848                    active_connections = active,
849                    timeout_ms = drain_timeout.as_millis() as u64,
850                    "pool_close_drain_timeout: forcing idle cleanup while active connections remain"
851                );
852                break;
853            }
854            tokio::time::sleep(Duration::from_millis(25)).await;
855        }
856
857        let mut connections = self.inner.connections.lock().await;
858        let dropped_idle = connections.len();
859        connections.clear();
860        tracing::info!(
861            dropped_idle_connections = dropped_idle,
862            active_connections = self.inner.active_count.load(Ordering::Relaxed),
863            "pool_closed"
864        );
865    }
866
867    /// Create a new connection using the pool configuration.
868    async fn create_connection(config: &PoolConfig) -> PgResult<PgConnection> {
869        if !config.auth_settings.has_any_password_method()
870            && config.mtls.is_none()
871            && config.password.is_some()
872        {
873            return Err(PgError::Auth(
874                "Invalid PoolConfig: all password auth methods are disabled".to_string(),
875            ));
876        }
877
878        let options = ConnectOptions {
879            tls_mode: config.tls_mode,
880            gss_enc_mode: config.gss_enc_mode,
881            tls_ca_cert_pem: config.tls_ca_cert_pem.clone(),
882            mtls: config.mtls.clone(),
883            gss_token_provider: config.gss_token_provider,
884            gss_token_provider_ex: config.gss_token_provider_ex.clone(),
885            auth: config.auth_settings,
886            startup_params: Vec::new(),
887        };
888
889        if let Some(remaining) = gss_circuit_remaining_open(config) {
890            metrics::counter!("qail_pg_gss_circuit_open_total").increment(1);
891            tracing::warn!(
892                host = %config.host,
893                port = config.port,
894                user = %config.user,
895                db = %config.database,
896                remaining_ms = remaining.as_millis() as u64,
897                "gss_connect_circuit_open"
898            );
899            return Err(PgError::Connection(format!(
900                "GSS connection circuit is open; retry after {:?}",
901                remaining
902            )));
903        }
904
905        let mut attempt = 0usize;
906        loop {
907            let connect_result = tokio::time::timeout(
908                config.connect_timeout,
909                PgConnection::connect_with_options(
910                    &config.host,
911                    config.port,
912                    &config.user,
913                    &config.database,
914                    config.password.as_deref(),
915                    options.clone(),
916                ),
917            )
918            .await;
919
920            let connect_result = match connect_result {
921                Ok(result) => result,
922                Err(_) => Err(PgError::Timeout(format!(
923                    "connect timeout after {:?} (pool config connect_timeout)",
924                    config.connect_timeout
925                ))),
926            };
927
928            match connect_result {
929                Ok(conn) => {
930                    metrics::counter!("qail_pg_pool_connect_success_total").increment(1);
931                    gss_circuit_record_success(config);
932                    return Ok(conn);
933                }
934                Err(err) if should_retry_gss_connect_error(config, attempt, &err) => {
935                    metrics::counter!("qail_pg_gss_connect_retries_total").increment(1);
936                    gss_circuit_record_failure(config);
937                    let delay = gss_retry_delay(config.gss_retry_base_delay, attempt);
938                    tracing::warn!(
939                        host = %config.host,
940                        port = config.port,
941                        user = %config.user,
942                        db = %config.database,
943                        attempt = attempt + 1,
944                        delay_ms = delay.as_millis() as u64,
945                        error = %err,
946                        "gss_connect_retry"
947                    );
948                    tokio::time::sleep(delay).await;
949                    attempt += 1;
950                }
951                Err(err) => {
952                    metrics::counter!("qail_pg_pool_connect_failures_total").increment(1);
953                    if should_track_gss_circuit_error(config, &err) {
954                        metrics::counter!("qail_pg_gss_connect_failures_total").increment(1);
955                        gss_circuit_record_failure(config);
956                    }
957                    return Err(err);
958                }
959            }
960        }
961    }
962
963    /// Run one maintenance cycle: evict stale idle connections and backfill
964    /// to `min_connections`. Called periodically by `spawn_pool_maintenance`.
965    pub async fn maintain(&self) {
966        if self.inner.closed.load(Ordering::Relaxed) {
967            return;
968        }
969
970        // Phase 1: Evict idle and expired connections from the pool.
971        let evicted = {
972            let mut connections = self.inner.connections.lock().await;
973            let before = connections.len();
974            connections.retain(|pooled| {
975                if pooled.last_used.elapsed() > self.inner.config.idle_timeout {
976                    record_pool_connection_destroy("idle_sweep_evict");
977                    return false;
978                }
979                if let Some(max_life) = self.inner.config.max_lifetime
980                    && pooled.created_at.elapsed() > max_life
981                {
982                    record_pool_connection_destroy("lifetime_sweep_evict");
983                    return false;
984                }
985                true
986            });
987            before - connections.len()
988        };
989
990        if evicted > 0 {
991            tracing::debug!(evicted, "pool_maintenance: evicted stale idle connections");
992        }
993
994        // Phase 2: Backfill to min_connections if below threshold.
995        let min = self.inner.config.min_connections;
996        if min == 0 {
997            return;
998        }
999
1000        let idle_count = self.inner.connections.lock().await.len();
1001        let checked_out_slots = self
1002            .inner
1003            .config
1004            .max_connections
1005            .saturating_sub(self.inner.semaphore.available_permits());
1006        let deficit = maintenance_backfill_deficit(
1007            self.inner.config.max_connections,
1008            min,
1009            idle_count,
1010            checked_out_slots,
1011        );
1012        if deficit == 0 {
1013            return;
1014        }
1015        let mut created = 0usize;
1016        for _ in 0..deficit {
1017            match Self::create_connection(&self.inner.config).await {
1018                Ok(conn) => {
1019                    self.inner.total_created.fetch_add(1, Ordering::Relaxed);
1020                    let mut connections = self.inner.connections.lock().await;
1021                    if connections.len() < self.inner.config.max_connections {
1022                        connections.push(PooledConn {
1023                            conn,
1024                            created_at: Instant::now(),
1025                            last_used: Instant::now(),
1026                        });
1027                        created += 1;
1028                    } else {
1029                        // Pool filled by concurrent acquires; stop backfill.
1030                        break;
1031                    }
1032                }
1033                Err(e) => {
1034                    tracing::warn!(error = %e, "pool_maintenance: backfill connection failed");
1035                    break; // Transient failure — retry next cycle.
1036                }
1037            }
1038        }
1039
1040        if created > 0 {
1041            tracing::debug!(
1042                created,
1043                min_connections = min,
1044                "pool_maintenance: backfilled idle connections"
1045            );
1046        }
1047    }
1048}
1049
1050/// Spawn a background task that periodically maintains pool health.
1051///
1052/// Runs every `idle_timeout / 2` (min 5s): evicts stale idle connections and
1053/// backfills to `min_connections`. Call once after `PgPool::connect`.
1054pub fn spawn_pool_maintenance(pool: PgPool) {
1055    let interval_secs = std::cmp::max(pool.inner.config.idle_timeout.as_secs() / 2, 5);
1056    tokio::spawn(async move {
1057        let mut interval = tokio::time::interval(Duration::from_secs(interval_secs));
1058        loop {
1059            interval.tick().await;
1060            if pool.is_closed() {
1061                break;
1062            }
1063            pool.maintain().await;
1064        }
1065    });
1066}
1067
1068pub(super) fn maintenance_backfill_deficit(
1069    max_connections: usize,
1070    min_connections: usize,
1071    idle_count: usize,
1072    checked_out_slots: usize,
1073) -> usize {
1074    let target_idle = min_connections.min(max_connections);
1075    if idle_count >= target_idle {
1076        return 0;
1077    }
1078
1079    let needed_idle = target_idle - idle_count;
1080    let available_slots =
1081        max_connections.saturating_sub(idle_count.saturating_add(checked_out_slots));
1082    needed_idle.min(available_slots)
1083}
1084
1085pub(super) fn validate_pool_config(config: &PoolConfig) -> PgResult<()> {
1086    if config.max_connections == 0 {
1087        return Err(PgError::Connection(
1088            "Invalid PoolConfig: max_connections must be >= 1".to_string(),
1089        ));
1090    }
1091    if config.min_connections > config.max_connections {
1092        return Err(PgError::Connection(format!(
1093            "Invalid PoolConfig: min_connections ({}) must be <= max_connections ({})",
1094            config.min_connections, config.max_connections
1095        )));
1096    }
1097    if config.acquire_timeout.is_zero() {
1098        return Err(PgError::Connection(
1099            "Invalid PoolConfig: acquire_timeout must be > 0".to_string(),
1100        ));
1101    }
1102    if config.connect_timeout.is_zero() {
1103        return Err(PgError::Connection(
1104            "Invalid PoolConfig: connect_timeout must be > 0".to_string(),
1105        ));
1106    }
1107    if config.leaked_cleanup_queue == 0 {
1108        return Err(PgError::Connection(
1109            "Invalid PoolConfig: leaked_cleanup_queue must be >= 1".to_string(),
1110        ));
1111    }
1112    Ok(())
1113}
1114
1115pub(super) async fn execute_simple_with_timeout(
1116    conn: &mut PgConnection,
1117    sql: &str,
1118    timeout: Duration,
1119    operation: &str,
1120) -> PgResult<()> {
1121    match tokio::time::timeout(timeout, conn.execute_simple(sql)).await {
1122        Ok(result) => result,
1123        Err(_) => {
1124            conn.mark_io_desynced();
1125            Err(PgError::Timeout(format!(
1126                "{} timeout after {:?} (pool config connect_timeout)",
1127                operation, timeout
1128            )))
1129        }
1130    }
1131}