Skip to main content

qail_pg/driver/pool/
lifecycle.rs

1//! Pool lifecycle: PgPoolInner, PgPool core (connect, maintain, close),
2//! hot statement pre-prepare, and connection creation.
3
4use super::ScopedPoolFuture;
5use super::churn::{
6    PoolStats, decrement_active_count_saturating, pool_churn_record_destroy,
7    pool_churn_remaining_open, record_pool_connection_destroy,
8};
9use super::config::PoolConfig;
10use super::connection::PooledConn;
11use super::connection::PooledConnection;
12use super::gss::*;
13use crate::driver::{
14    AstPipelineMode, AutoCountPath, AutoCountPlan, ConnectOptions, PgConnection, PgError, PgResult,
15    is_ignorable_session_message, unexpected_backend_message,
16};
17use std::sync::Arc;
18use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
19use std::time::{Duration, Instant};
20use tokio::sync::{Mutex, Semaphore};
21use tokio::task::JoinSet;
22
/// Maximum number of hot statements to track globally.
///
/// NOTE(review): nothing in this part of the file reads this constant; it is
/// presumably the cap applied where entries are inserted into
/// `PgPoolInner::hot_statements` — confirm against the registering code.
pub(super) const MAX_HOT_STATEMENTS: usize = 32;
25
/// Inner pool state (shared across clones).
///
/// Every `PgPool` handle holds an `Arc` pointing at one instance of this
/// struct, so all clones observe the same connections and counters.
pub(super) struct PgPoolInner {
    /// Pool configuration supplied when the pool was created.
    pub(super) config: PoolConfig,
    /// Idle connections. Checkout pops from the back of the vector and
    /// `return_connection` pushes onto it.
    pub(super) connections: Mutex<Vec<PooledConn>>,
    /// Checkout slots. Created with `max_connections` permits; a permit is
    /// detached on checkout and added back when the connection is returned.
    pub(super) semaphore: Semaphore,
    /// Closed flag, consulted by both the acquire and the return paths.
    pub(super) closed: AtomicBool,
    /// Number of connections currently checked out.
    pub(super) active_count: AtomicUsize,
    /// Running total of connections opened by this pool.
    pub(super) total_created: AtomicUsize,
    /// NOTE(review): not touched in this part of the file; presumably counts
    /// cleanups of leaked connections that are still in flight — confirm.
    pub(super) leaked_cleanup_inflight: AtomicUsize,
    /// Global registry of frequently-used prepared statements.
    /// Maps sql_hash → (stmt_name, sql_text).
    /// New connections pre-prepare these on checkout for instant cache hits.
    pub(super) hot_statements: std::sync::RwLock<std::collections::HashMap<u64, (String, String)>>,
}
40
41pub(super) fn handle_hot_preprepare_message(
42    msg: &crate::protocol::BackendMessage,
43    parse_complete_count: &mut usize,
44    error: &mut Option<PgError>,
45) -> PgResult<bool> {
46    match msg {
47        crate::protocol::BackendMessage::ParseComplete => {
48            *parse_complete_count += 1;
49            Ok(false)
50        }
51        crate::protocol::BackendMessage::ErrorResponse(err) => {
52            if error.is_none() {
53                *error = Some(PgError::QueryServer(err.clone().into()));
54            }
55            Ok(false)
56        }
57        crate::protocol::BackendMessage::ReadyForQuery(_) => Ok(true),
58        msg if is_ignorable_session_message(msg) => Ok(false),
59        other => Err(unexpected_backend_message("pool hot pre-prepare", other)),
60    }
61}
62
63fn evict_failed_hot_preprepare_entries(
64    pool: &PgPoolInner,
65    missing: &[(u64, String, String)],
66) -> usize {
67    let Ok(mut hot) = pool.hot_statements.write() else {
68        return 0;
69    };
70
71    let mut evicted = 0usize;
72    for (hash, _, _) in missing {
73        if hot.remove(hash).is_some() {
74            evicted += 1;
75        }
76    }
77    evicted
78}
79
80impl PgPoolInner {
81    pub(super) async fn return_connection(&self, conn: PgConnection, created_at: Instant) {
82        decrement_active_count_saturating(&self.active_count);
83
84        if conn.is_io_desynced() {
85            tracing::warn!(
86                host = %self.config.host,
87                port = self.config.port,
88                user = %self.config.user,
89                db = %self.config.database,
90                "pool_return_desynced: dropping connection due to prior I/O/protocol desync"
91            );
92            record_pool_connection_destroy("pool_desynced_drop");
93            self.semaphore.add_permits(1);
94            pool_churn_record_destroy(&self.config, "return_desynced");
95            return;
96        }
97
98        if self.closed.load(Ordering::Relaxed) {
99            record_pool_connection_destroy("pool_closed_drop");
100            self.semaphore.add_permits(1);
101            return;
102        }
103
104        let mut connections = self.connections.lock().await;
105        if connections.len() < self.config.max_connections {
106            connections.push(PooledConn {
107                conn,
108                created_at,
109                last_used: Instant::now(),
110            });
111        } else {
112            record_pool_connection_destroy("pool_overflow_drop");
113        }
114
115        self.semaphore.add_permits(1);
116    }
117
118    /// Get a healthy connection from the pool, or None if pool is empty.
119    async fn get_healthy_connection(&self) -> Option<PooledConn> {
120        let mut connections = self.connections.lock().await;
121
122        while let Some(pooled) = connections.pop() {
123            if pooled.last_used.elapsed() > self.config.idle_timeout {
124                tracing::debug!(
125                    idle_secs = pooled.last_used.elapsed().as_secs(),
126                    timeout_secs = self.config.idle_timeout.as_secs(),
127                    "pool_checkout_evict: connection exceeded idle timeout"
128                );
129                record_pool_connection_destroy("idle_timeout_evict");
130                continue;
131            }
132
133            if let Some(max_life) = self.config.max_lifetime
134                && pooled.created_at.elapsed() > max_life
135            {
136                tracing::debug!(
137                    age_secs = pooled.created_at.elapsed().as_secs(),
138                    max_lifetime_secs = max_life.as_secs(),
139                    "pool_checkout_evict: connection exceeded max lifetime"
140                );
141                record_pool_connection_destroy("max_lifetime_evict");
142                continue;
143            }
144
145            return Some(pooled);
146        }
147
148        None
149    }
150}
151
/// Cloneable handle to a PostgreSQL connection pool.
///
/// Cloning is cheap: every clone shares the same inner state through an `Arc`.
///
/// # Example
/// ```ignore
/// let config = PoolConfig::new("localhost", 5432, "user", "db")
///     .password("secret")
///     .max_connections(20);
/// let pool = PgPool::connect(config).await?;
/// // Get a connection from the pool
/// let mut conn = pool.acquire_raw().await?;
/// conn.simple_query("SELECT 1").await?;
/// ```
#[derive(Clone)]
pub struct PgPool {
    /// Shared pool state; one instance per pool, referenced by every clone.
    pub(super) inner: Arc<PgPoolInner>,
}
166
167impl PgPool {
168    /// Create a pool from `qail.toml` (loads and parses automatically).
169    ///
170    /// # Example
171    /// ```ignore
172    /// let pool = PgPool::from_config().await?;
173    /// ```
174    pub async fn from_config() -> PgResult<Self> {
175        let qail = qail_core::config::QailConfig::load()
176            .map_err(|e| PgError::Connection(format!("Config error: {}", e)))?;
177        let config = PoolConfig::from_qail_config(&qail)?;
178        Self::connect(config).await
179    }
180
181    /// Create a new connection pool.
182    pub async fn connect(config: PoolConfig) -> PgResult<Self> {
183        validate_pool_config(&config)?;
184
185        // Semaphore starts with max_connections permits
186        let semaphore = Semaphore::new(config.max_connections);
187
188        let mut initial_connections = Vec::new();
189        for _ in 0..config.min_connections {
190            let conn = Self::create_connection(&config).await?;
191            initial_connections.push(PooledConn {
192                conn,
193                created_at: Instant::now(),
194                last_used: Instant::now(),
195            });
196        }
197
198        let initial_count = initial_connections.len();
199
200        let inner = Arc::new(PgPoolInner {
201            config,
202            connections: Mutex::new(initial_connections),
203            semaphore,
204            closed: AtomicBool::new(false),
205            active_count: AtomicUsize::new(0),
206            total_created: AtomicUsize::new(initial_count),
207            leaked_cleanup_inflight: AtomicUsize::new(0),
208            hot_statements: std::sync::RwLock::new(std::collections::HashMap::new()),
209        });
210
211        Ok(Self { inner })
212    }
213
    /// Acquire a raw connection from the pool (crate-internal only).
    ///
    /// Waits up to `acquire_timeout` for a free slot, then reuses a healthy
    /// idle connection or opens a new one. Optionally probes it with
    /// `SELECT 1` (`test_on_acquire`) and pre-prepares any hot statements the
    /// connection does not have cached yet.
    ///
    /// # Safety (not `unsafe` in the Rust sense, but security-critical)
    ///
    /// This returns a connection with **no RLS context**. All tenant data
    /// queries on this connection will bypass row-level security.
    ///
    /// **Safe usage**: Pair with `fetch_all_with_rls()` for pipelined
    /// RLS+query execution (single roundtrip). Or use `acquire_with_rls()`
    /// / `acquire_with_rls_timeout()` for the 2-roundtrip path.
    ///
    /// **Unsafe usage**: Running queries directly on a raw connection
    /// without RLS context. Every call site MUST include a `// SAFETY:`
    /// comment explaining why raw acquisition is justified.
    ///
    /// # Errors
    ///
    /// - `PgError::PoolClosed` if the pool is closed before or after the wait,
    ///   or if the semaphore itself has been closed.
    /// - `PgError::PoolExhausted` while the churn circuit reports itself open.
    /// - `PgError::Timeout` if no slot frees up within `acquire_timeout`.
    /// - Any error from creating a (replacement) connection or from encoding
    ///   the pre-prepare `Parse` messages.
    pub async fn acquire_raw(&self) -> PgResult<PooledConnection> {
        // Fast path: refuse immediately if the pool is already closed.
        if self.inner.closed.load(Ordering::Relaxed) {
            return Err(PgError::PoolClosed);
        }

        // Churn circuit: while it reports remaining open time, reject the
        // checkout up front instead of waiting for a slot.
        if let Some(remaining) = pool_churn_remaining_open(&self.inner.config) {
            metrics::counter!("qail_pg_pool_churn_circuit_reject_total").increment(1);
            tracing::warn!(
                host = %self.inner.config.host,
                port = self.inner.config.port,
                user = %self.inner.config.user,
                db = %self.inner.config.database,
                remaining_ms = remaining.as_millis() as u64,
                "pool_connection_churn_circuit_open"
            );
            return Err(PgError::PoolExhausted {
                max: self.inner.config.max_connections,
            });
        }

        // Wait for available slot with timeout
        let acquire_timeout = self.inner.config.acquire_timeout;
        let permit =
            match tokio::time::timeout(acquire_timeout, self.inner.semaphore.acquire()).await {
                // `acquire()` only fails when the semaphore has been closed.
                Ok(permit) => permit.map_err(|_| PgError::PoolClosed)?,
                Err(_) => {
                    metrics::counter!("qail_pg_pool_acquire_timeouts_total").increment(1);
                    return Err(PgError::Timeout(format!(
                        "pool acquire after {}s ({} max connections)",
                        acquire_timeout.as_secs(),
                        self.inner.config.max_connections
                    )));
                }
            };

        // The pool may have been closed while this task was waiting. From here
        // on, every early return drops `permit`, which hands the slot back.
        if self.inner.closed.load(Ordering::Relaxed) {
            return Err(PgError::PoolClosed);
        }

        // Try to get existing healthy connection
        let (mut conn, mut created_at) =
            if let Some(pooled) = self.inner.get_healthy_connection().await {
                (pooled.conn, pooled.created_at)
            } else {
                let conn = Self::create_connection(&self.inner.config).await?;
                self.inner.total_created.fetch_add(1, Ordering::Relaxed);
                (conn, Instant::now())
            };

        // Optional liveness probe, bounded by `connect_timeout`. On failure the
        // connection is replaced; the old one is dropped by the assignment.
        if self.inner.config.test_on_acquire
            && let Err(e) = execute_simple_with_timeout(
                &mut conn,
                "SELECT 1",
                self.inner.config.connect_timeout,
                "pool checkout health check",
            )
            .await
        {
            tracing::warn!(
                host = %self.inner.config.host,
                port = self.inner.config.port,
                user = %self.inner.config.user,
                db = %self.inner.config.database,
                error = %e,
                "pool_health_check_failed: checkout probe failed, creating replacement connection"
            );
            pool_churn_record_destroy(&self.inner.config, "health_check_failed");
            conn = Self::create_connection(&self.inner.config).await?;
            self.inner.total_created.fetch_add(1, Ordering::Relaxed);
            created_at = Instant::now();
        }

        // Pre-prepare hot statements that this connection doesn't have yet.
        // Collect data synchronously (guard dropped before async work).
        // A poisoned registry lock is treated as "nothing to prepare".
        let missing: Vec<(u64, String, String)> = {
            if let Ok(hot) = self.inner.hot_statements.read() {
                hot.iter()
                    .filter(|(hash, _)| !conn.stmt_cache.contains(hash))
                    .map(|(hash, (name, sql))| (*hash, name.clone(), sql.clone()))
                    .collect()
            } else {
                Vec::new()
            }
        }; // RwLockReadGuard dropped here — safe across .await

        if !missing.is_empty() {
            use crate::protocol::PgEncoder;
            // Batch one Parse per missing statement followed by a single Sync,
            // so the whole set goes out in one write.
            let mut buf = bytes::BytesMut::new();
            for (_, name, sql) in &missing {
                let parse_msg = PgEncoder::try_encode_parse(name, sql, &[])?;
                buf.extend_from_slice(&parse_msg);
            }
            PgEncoder::encode_sync_to(&mut buf);
            let preprepare_timeout = self.inner.config.connect_timeout;
            let preprepare_result: PgResult<()> = match tokio::time::timeout(
                preprepare_timeout,
                async {
                    conn.send_bytes(&buf).await?;
                    // Drain responses and fail closed on any parse error.
                    let mut parse_complete_count = 0usize;
                    let mut parse_error: Option<PgError> = None;
                    loop {
                        let msg = conn.recv().await?;
                        // `true` means ReadyForQuery arrived: the batch is over.
                        if handle_hot_preprepare_message(
                            &msg,
                            &mut parse_complete_count,
                            &mut parse_error,
                        )? {
                            if let Some(err) = parse_error {
                                return Err(err);
                            }
                            // Every Parse must have been acknowledged.
                            if parse_complete_count != missing.len() {
                                return Err(PgError::Protocol(format!(
                                    "hot pre-prepare completed with {} ParseComplete messages (expected {})",
                                    parse_complete_count,
                                    missing.len()
                                )));
                            }
                            break;
                        }
                    }
                    Ok::<(), PgError>(())
                },
            )
            .await
            {
                Ok(res) => res,
                Err(_) => Err(PgError::Timeout(format!(
                    "hot statement pre-prepare timeout after {:?} (pool config connect_timeout)",
                    preprepare_timeout
                ))),
            };

            if let Err(e) = preprepare_result {
                // Any failure (server error, protocol error, I/O error or
                // timeout) evicts all attempted statements from the registry
                // and swaps in a fresh connection.
                let evicted_hot_statements =
                    evict_failed_hot_preprepare_entries(&self.inner, &missing);
                tracing::warn!(
                    host = %self.inner.config.host,
                    port = self.inner.config.port,
                    user = %self.inner.config.user,
                    db = %self.inner.config.database,
                    timeout_ms = preprepare_timeout.as_millis() as u64,
                    evicted_hot_statements,
                    error = %e,
                    "pool_hot_prepare_failed: replacing connection to avoid handing out uncertain protocol state"
                );
                pool_churn_record_destroy(&self.inner.config, "hot_prepare_failed");
                conn = Self::create_connection(&self.inner.config).await?;
                self.inner.total_created.fetch_add(1, Ordering::Relaxed);
                created_at = Instant::now();
            } else {
                // Register in local cache
                for (hash, name, sql) in &missing {
                    conn.stmt_cache.put(*hash, name.clone());
                    conn.prepared_statements.insert(name.clone(), sql.clone());
                }
            }
        }

        self.inner.active_count.fetch_add(1, Ordering::Relaxed);
        // Permit is intentionally detached here; returned by `release()` / pool return.
        permit.forget();

        Ok(PooledConnection {
            conn: Some(conn),
            pool: std::sync::Arc::clone(&self.inner),
            rls_dirty: false,
            created_at,
        })
    }
398
399    /// Acquire a connection with RLS context pre-configured.
400    ///
401    /// Sets PostgreSQL session variables for tenant isolation before
402    /// returning the connection. When the connection is dropped, it
403    /// automatically clears the RLS context before returning to the pool.
404    ///
405    /// # Example
406    /// ```ignore
407    /// use qail_core::rls::RlsContext;
408    ///
409    /// let mut conn = pool.acquire_with_rls(
410    ///     RlsContext::tenant("550e8400-e29b-41d4-a716-446655440000")
411    /// ).await?;
412    /// // All queries through `conn` are now scoped to this tenant
413    /// ```
414    pub async fn acquire_with_rls(
415        &self,
416        ctx: qail_core::rls::RlsContext,
417    ) -> PgResult<PooledConnection> {
418        // SAFETY: RLS context is set immediately below via context_to_sql().
419        let mut conn = self.acquire_raw().await?;
420
421        // Set RLS context on the raw connection
422        let sql = crate::driver::rls::context_to_sql(&ctx);
423        let pg_conn = conn.get_mut()?;
424        if let Err(e) = execute_simple_with_timeout(
425            pg_conn,
426            &sql,
427            self.inner.config.connect_timeout,
428            "pool acquire_with_rls setup",
429        )
430        .await
431        {
432            // Attempt recovery ROLLBACK to salvage the connection rather than
433            // letting Drop destroy it (which wastes a TCP connection).
434            if let Ok(pg_conn) = conn.get_mut() {
435                let _ = pg_conn.execute_simple("ROLLBACK").await;
436            }
437            conn.release().await;
438            return Err(e);
439        }
440
441        // Mark dirty so Drop resets context before pool return
442        conn.rls_dirty = true;
443
444        Ok(conn)
445    }
446
447    /// Scoped connection helper that guarantees `release()` after closure execution.
448    ///
449    /// Prefer this over manual `acquire_with_rls()` in normal request handlers.
450    pub async fn with_rls<T, F>(&self, ctx: qail_core::rls::RlsContext, f: F) -> PgResult<T>
451    where
452        F: for<'a> FnOnce(&'a mut PooledConnection) -> ScopedPoolFuture<'a, T>,
453    {
454        let mut conn = self.acquire_with_rls(ctx).await?;
455        let out = f(&mut conn).await;
456        match out {
457            Ok(value) => {
458                conn.release_checked().await?;
459                Ok(value)
460            }
461            Err(err) => {
462                let _ = conn.rollback_and_release().await;
463                Err(err)
464            }
465        }
466    }
467
468    /// Scoped helper for system-level operations (`RlsContext::empty()`).
469    pub async fn with_system<T, F>(&self, f: F) -> PgResult<T>
470    where
471        F: for<'a> FnOnce(&'a mut PooledConnection) -> ScopedPoolFuture<'a, T>,
472    {
473        self.with_rls(qail_core::rls::RlsContext::empty(), f).await
474    }
475
476    /// Scoped helper for global/platform row access (`tenant_id IS NULL`).
477    pub async fn with_global<T, F>(&self, f: F) -> PgResult<T>
478    where
479        F: for<'a> FnOnce(&'a mut PooledConnection) -> ScopedPoolFuture<'a, T>,
480    {
481        self.with_rls(qail_core::rls::RlsContext::global(), f).await
482    }
483
484    /// Scoped helper for single-tenant access.
485    pub async fn with_tenant<T, F>(&self, tenant_id: &str, f: F) -> PgResult<T>
486    where
487        F: for<'a> FnOnce(&'a mut PooledConnection) -> ScopedPoolFuture<'a, T>,
488    {
489        self.with_rls(qail_core::rls::RlsContext::tenant(tenant_id), f)
490            .await
491    }
492
493    /// Acquire a connection with RLS context AND statement timeout.
494    ///
495    /// Like `acquire_with_rls()`, but also sets `statement_timeout` to prevent
496    /// runaway queries from holding pool connections indefinitely.
497    pub async fn acquire_with_rls_timeout(
498        &self,
499        ctx: qail_core::rls::RlsContext,
500        timeout_ms: u32,
501    ) -> PgResult<PooledConnection> {
502        // SAFETY: RLS context + timeout set immediately below via context_to_sql_with_timeout().
503        let mut conn = self.acquire_raw().await?;
504
505        // Set RLS context + statement_timeout atomically
506        let sql = crate::driver::rls::context_to_sql_with_timeout(&ctx, timeout_ms);
507        let pg_conn = conn.get_mut()?;
508        if let Err(e) = execute_simple_with_timeout(
509            pg_conn,
510            &sql,
511            self.inner.config.connect_timeout,
512            "pool acquire_with_rls_timeout setup",
513        )
514        .await
515        {
516            if let Ok(pg_conn) = conn.get_mut() {
517                let _ = pg_conn.execute_simple("ROLLBACK").await;
518            }
519            conn.release().await;
520            return Err(e);
521        }
522
523        // Mark dirty so Drop resets context + timeout before pool return
524        conn.rls_dirty = true;
525
526        Ok(conn)
527    }
528
529    /// Scoped connection helper that guarantees `release()` after closure execution.
530    pub async fn with_rls_timeout<T, F>(
531        &self,
532        ctx: qail_core::rls::RlsContext,
533        timeout_ms: u32,
534        f: F,
535    ) -> PgResult<T>
536    where
537        F: for<'a> FnOnce(&'a mut PooledConnection) -> ScopedPoolFuture<'a, T>,
538    {
539        let mut conn = self.acquire_with_rls_timeout(ctx, timeout_ms).await?;
540        let out = f(&mut conn).await;
541        match out {
542            Ok(value) => {
543                conn.release_checked().await?;
544                Ok(value)
545            }
546            Err(err) => {
547                let _ = conn.rollback_and_release().await;
548                Err(err)
549            }
550        }
551    }
552
553    /// Acquire a connection with RLS context, statement timeout, AND lock timeout.
554    ///
555    /// Like `acquire_with_rls_timeout()`, but also sets `lock_timeout` to prevent
556    /// queries from blocking indefinitely on row/table locks.
557    /// When `lock_timeout_ms` is 0, the lock_timeout clause is omitted.
558    pub async fn acquire_with_rls_timeouts(
559        &self,
560        ctx: qail_core::rls::RlsContext,
561        statement_timeout_ms: u32,
562        lock_timeout_ms: u32,
563    ) -> PgResult<PooledConnection> {
564        // SAFETY: RLS context + timeouts set immediately below via context_to_sql_with_timeouts().
565        let mut conn = self.acquire_raw().await?;
566
567        let sql = crate::driver::rls::context_to_sql_with_timeouts(
568            &ctx,
569            statement_timeout_ms,
570            lock_timeout_ms,
571        );
572        let pg_conn = conn.get_mut()?;
573        if let Err(e) = execute_simple_with_timeout(
574            pg_conn,
575            &sql,
576            self.inner.config.connect_timeout,
577            "pool acquire_with_rls_timeouts setup",
578        )
579        .await
580        {
581            if let Ok(pg_conn) = conn.get_mut() {
582                let _ = pg_conn.execute_simple("ROLLBACK").await;
583            }
584            conn.release().await;
585            return Err(e);
586        }
587
588        conn.rls_dirty = true;
589
590        Ok(conn)
591    }
592
593    /// Scoped connection helper that guarantees `release()` after closure execution.
594    pub async fn with_rls_timeouts<T, F>(
595        &self,
596        ctx: qail_core::rls::RlsContext,
597        statement_timeout_ms: u32,
598        lock_timeout_ms: u32,
599        f: F,
600    ) -> PgResult<T>
601    where
602        F: for<'a> FnOnce(&'a mut PooledConnection) -> ScopedPoolFuture<'a, T>,
603    {
604        let mut conn = self
605            .acquire_with_rls_timeouts(ctx, statement_timeout_ms, lock_timeout_ms)
606            .await?;
607        let out = f(&mut conn).await;
608        match out {
609            Ok(value) => {
610                conn.release_checked().await?;
611                Ok(value)
612            }
613            Err(err) => {
614                let _ = conn.rollback_and_release().await;
615                Err(err)
616            }
617        }
618    }
619
620    /// Acquire a connection for system-level operations (no tenant context).
621    ///
622    /// Sets RLS session variables to maximally restrictive values:
623    /// - `app.current_tenant_id = ''`
624    /// - `app.current_agent_id = ''`  
625    /// - `app.is_super_admin = false`
626    ///
627    /// Use this for startup introspection, migrations, and health checks
628    /// that must not operate within any tenant scope.
629    pub async fn acquire_system(&self) -> PgResult<PooledConnection> {
630        let ctx = qail_core::rls::RlsContext::empty();
631        self.acquire_with_rls(ctx).await
632    }
633
634    /// Acquire a connection scoped to global/platform rows.
635    ///
636    /// Shorthand for `acquire_with_rls(RlsContext::global())`.
637    /// Use this for shared reference data (for example: currencies, ports,
638    /// vessel types) stored as `tenant_id IS NULL`.
639    pub async fn acquire_global(&self) -> PgResult<PooledConnection> {
640        self.acquire_with_rls(qail_core::rls::RlsContext::global())
641            .await
642    }
643
644    /// Acquire a connection scoped to a specific tenant.
645    ///
646    /// Shorthand for `acquire_with_rls(RlsContext::tenant(tenant_id))`.
647    /// Use this when you already know the tenant UUID and want a
648    /// tenant-scoped connection in a single call.
649    ///
650    /// # Example
651    /// ```ignore
652    /// let mut conn = pool.acquire_for_tenant("550e8400-...").await?;
653    /// // All queries through `conn` are now scoped to this tenant
654    /// ```
655    pub async fn acquire_for_tenant(&self, tenant_id: &str) -> PgResult<PooledConnection> {
656        self.acquire_with_rls(qail_core::rls::RlsContext::tenant(tenant_id))
657            .await
658    }
659
660    /// Acquire a connection with branch context pre-configured.
661    ///
662    /// Sets PostgreSQL session variable `app.branch_id` for data virtualization.
663    /// When the connection is dropped, it automatically clears the branch context.
664    ///
665    /// # Example
666    /// ```ignore
667    /// use qail_core::branch::BranchContext;
668    ///
669    /// let ctx = BranchContext::branch("feature-auth");
670    /// let mut conn = pool.acquire_with_branch(&ctx).await?;
671    /// // All queries through `conn` are now branch-aware
672    /// ```
673    pub async fn acquire_with_branch(
674        &self,
675        ctx: &qail_core::branch::BranchContext,
676    ) -> PgResult<PooledConnection> {
677        // SAFETY: Branch context is set immediately below via branch_context_sql().
678        let mut conn = self.acquire_raw().await?;
679
680        if let Some(branch_name) = ctx.branch_name() {
681            let sql = crate::driver::branch_sql::branch_context_sql(branch_name);
682            let pg_conn = conn.get_mut()?;
683            if let Err(e) = execute_simple_with_timeout(
684                pg_conn,
685                &sql,
686                self.inner.config.connect_timeout,
687                "pool acquire_with_branch setup",
688            )
689            .await
690            {
691                if let Ok(pg_conn) = conn.get_mut() {
692                    let _ = pg_conn.execute_simple("ROLLBACK").await;
693                }
694                conn.release().await;
695                return Err(e);
696            }
697            conn.rls_dirty = true; // Reuse dirty flag for auto-reset
698        }
699
700        Ok(conn)
701    }
702
703    /// Get the current number of idle connections.
704    pub async fn idle_count(&self) -> usize {
705        self.inner.connections.lock().await.len()
706    }
707
708    /// Get the number of connections currently in use.
709    pub fn active_count(&self) -> usize {
710        self.inner.active_count.load(Ordering::Relaxed)
711    }
712
713    /// Get the maximum number of connections.
714    pub fn max_connections(&self) -> usize {
715        self.inner.config.max_connections
716    }
717
718    /// Plan auto count strategy for a given batch length.
719    pub fn plan_auto_count(&self, batch_len: usize) -> AutoCountPlan {
720        AutoCountPlan::for_pool(
721            batch_len,
722            self.inner.config.max_connections,
723            self.inner.semaphore.available_permits(),
724        )
725    }
726
    /// Execute commands with runtime auto strategy and return count + plan.
    ///
    /// The strategy comes from `plan_auto_count(cmds.len())`:
    /// - `SingleCached`: runs only `cmds[0]` on one system connection and
    ///   reports `1` (or `0` when `cmds` is empty).
    /// - `PipelineOneShot` / `PipelineCached`: pipelines all commands on one
    ///   system connection in the matching `AstPipelineMode`.
    /// - `PoolParallel`: splits `cmds` into chunks of `plan.chunk_size`, runs
    ///   up to `plan.workers` chunks as spawned tasks (each with its own
    ///   system connection), and sums the per-task counts.
    ///
    /// # Errors
    /// Returns the first acquisition or execution error. In the parallel path a
    /// task join failure is reported as `PgError::Connection`; returning early
    /// drops the `JoinSet`, which aborts the tasks still running.
    pub async fn execute_count_auto_with_plan(
        &self,
        cmds: &[qail_core::ast::Qail],
    ) -> PgResult<(usize, AutoCountPlan)> {
        let plan = self.plan_auto_count(cmds.len());

        let completed = match plan.path {
            AutoCountPath::SingleCached => {
                if cmds.is_empty() {
                    0
                } else {
                    let mut conn = self.acquire_system().await?;
                    let run_result = conn.fetch_all_cached(&cmds[0]).await;
                    // Release before inspecting the result so the connection
                    // goes back to the pool on failure too.
                    conn.release().await;
                    let _ = run_result?;
                    1
                }
            }
            AutoCountPath::PipelineOneShot | AutoCountPath::PipelineCached => {
                let mode = if matches!(plan.path, AutoCountPath::PipelineOneShot) {
                    AstPipelineMode::OneShot
                } else {
                    AstPipelineMode::Cached
                };

                let mut pooled = self.acquire_system().await?;
                let run_result = {
                    let conn = pooled.get_mut()?;
                    conn.pipeline_execute_count_ast_with_mode(cmds, mode).await
                };
                // Release first; the error (if any) is propagated afterwards.
                pooled.release().await;
                run_result?
            }
            AutoCountPath::PoolParallel => {
                if cmds.is_empty() {
                    0
                } else {
                    // Spawned tasks need owned data, so the commands are
                    // copied once and shared through an `Arc`.
                    let all_cmds = Arc::new(cmds.to_vec());
                    let mut tasks: JoinSet<PgResult<usize>> = JoinSet::new();

                    for worker in 0..plan.workers {
                        let start = worker * plan.chunk_size;
                        // No commands left for this or any later worker.
                        if start >= all_cmds.len() {
                            break;
                        }
                        // The last chunk may be shorter than `chunk_size`.
                        let end = (start + plan.chunk_size).min(all_cmds.len());
                        let pool = self.clone();
                        let all_cmds = Arc::clone(&all_cmds);

                        tasks.spawn(async move {
                            let mut pooled = pool.acquire_system().await?;
                            let run_result = {
                                let conn = pooled.get_mut()?;
                                conn.pipeline_execute_count_ast_with_mode(
                                    &all_cmds[start..end],
                                    AstPipelineMode::Auto,
                                )
                                .await
                            };
                            pooled.release().await;
                            run_result
                        });
                    }

                    // Collect results in completion order; the first failure
                    // ends the whole operation.
                    let mut total = 0usize;
                    while let Some(joined) = tasks.join_next().await {
                        match joined {
                            Ok(Ok(count)) => {
                                total += count;
                            }
                            Ok(Err(err)) => return Err(err),
                            Err(err) => {
                                return Err(PgError::Connection(format!(
                                    "auto pool worker join failed: {err}"
                                )));
                            }
                        }
                    }
                    total
                }
            }
        };

        Ok((completed, plan))
    }
813
814    /// Execute commands with runtime auto strategy.
815    #[inline]
816    pub async fn execute_count_auto(&self, cmds: &[qail_core::ast::Qail]) -> PgResult<usize> {
817        let (completed, _plan) = self.execute_count_auto_with_plan(cmds).await?;
818        Ok(completed)
819    }
820
821    /// Get comprehensive pool statistics.
822    pub async fn stats(&self) -> PoolStats {
823        let idle = self.inner.connections.lock().await.len();
824        let active = self.inner.active_count.load(Ordering::Relaxed);
825        let used_slots = self
826            .inner
827            .config
828            .max_connections
829            .saturating_sub(self.inner.semaphore.available_permits());
830        PoolStats {
831            active,
832            idle,
833            pending: used_slots.saturating_sub(active),
834            max_size: self.inner.config.max_connections,
835            total_created: self.inner.total_created.load(Ordering::Relaxed),
836        }
837    }
838
839    /// Check if the pool is closed.
840    pub fn is_closed(&self) -> bool {
841        self.inner.closed.load(Ordering::Relaxed)
842    }
843
844    /// Close the pool gracefully.
845    ///
846    /// Rejects new acquires immediately, then waits up to `acquire_timeout`
847    /// for in-flight connections to be released before dropping idle
848    /// connections. Connections released after closure are destroyed by
849    /// `return_connection` and not returned to the idle queue.
850    pub async fn close(&self) {
851        self.close_graceful(self.inner.config.acquire_timeout).await;
852    }
853
854    /// Close the pool gracefully with an explicit drain timeout.
855    pub async fn close_graceful(&self, drain_timeout: Duration) {
856        self.inner.closed.store(true, Ordering::Relaxed);
857        // Wake blocked acquires immediately so shutdown doesn't wait on acquire_timeout.
858        self.inner.semaphore.close();
859
860        let deadline = Instant::now() + drain_timeout;
861        loop {
862            let active = self.inner.active_count.load(Ordering::Relaxed);
863            if active == 0 {
864                break;
865            }
866            if Instant::now() >= deadline {
867                tracing::warn!(
868                    active_connections = active,
869                    timeout_ms = drain_timeout.as_millis() as u64,
870                    "pool_close_drain_timeout: forcing idle cleanup while active connections remain"
871                );
872                break;
873            }
874            tokio::time::sleep(Duration::from_millis(25)).await;
875        }
876
877        let mut connections = self.inner.connections.lock().await;
878        let dropped_idle = connections.len();
879        connections.clear();
880        tracing::info!(
881            dropped_idle_connections = dropped_idle,
882            active_connections = self.inner.active_count.load(Ordering::Relaxed),
883            "pool_closed"
884        );
885    }
886
887    /// Create a new connection using the pool configuration.
888    async fn create_connection(config: &PoolConfig) -> PgResult<PgConnection> {
889        if !config.auth_settings.has_any_password_method()
890            && config.mtls.is_none()
891            && config.password.is_some()
892        {
893            return Err(PgError::Auth(
894                "Invalid PoolConfig: all password auth methods are disabled".to_string(),
895            ));
896        }
897
898        let options = ConnectOptions {
899            tls_mode: config.tls_mode,
900            gss_enc_mode: config.gss_enc_mode,
901            tls_ca_cert_pem: config.tls_ca_cert_pem.clone(),
902            mtls: config.mtls.clone(),
903            gss_token_provider: config.gss_token_provider,
904            gss_token_provider_ex: config.gss_token_provider_ex.clone(),
905            auth: config.auth_settings,
906            startup_params: Vec::new(),
907        };
908
909        if let Some(remaining) = gss_circuit_remaining_open(config) {
910            metrics::counter!("qail_pg_gss_circuit_open_total").increment(1);
911            tracing::warn!(
912                host = %config.host,
913                port = config.port,
914                user = %config.user,
915                db = %config.database,
916                remaining_ms = remaining.as_millis() as u64,
917                "gss_connect_circuit_open"
918            );
919            return Err(PgError::Connection(format!(
920                "GSS connection circuit is open; retry after {:?}",
921                remaining
922            )));
923        }
924
925        let mut attempt = 0usize;
926        loop {
927            let connect_result = tokio::time::timeout(
928                config.connect_timeout,
929                PgConnection::connect_with_options(
930                    &config.host,
931                    config.port,
932                    &config.user,
933                    &config.database,
934                    config.password.as_deref(),
935                    options.clone(),
936                ),
937            )
938            .await;
939
940            let connect_result = match connect_result {
941                Ok(result) => result,
942                Err(_) => Err(PgError::Timeout(format!(
943                    "connect timeout after {:?} (pool config connect_timeout)",
944                    config.connect_timeout
945                ))),
946            };
947
948            match connect_result {
949                Ok(conn) => {
950                    metrics::counter!("qail_pg_pool_connect_success_total").increment(1);
951                    gss_circuit_record_success(config);
952                    return Ok(conn);
953                }
954                Err(err) if should_retry_gss_connect_error(config, attempt, &err) => {
955                    metrics::counter!("qail_pg_gss_connect_retries_total").increment(1);
956                    gss_circuit_record_failure(config);
957                    let delay = gss_retry_delay(config.gss_retry_base_delay, attempt);
958                    tracing::warn!(
959                        host = %config.host,
960                        port = config.port,
961                        user = %config.user,
962                        db = %config.database,
963                        attempt = attempt + 1,
964                        delay_ms = delay.as_millis() as u64,
965                        error = %err,
966                        "gss_connect_retry"
967                    );
968                    tokio::time::sleep(delay).await;
969                    attempt += 1;
970                }
971                Err(err) => {
972                    metrics::counter!("qail_pg_pool_connect_failures_total").increment(1);
973                    if should_track_gss_circuit_error(config, &err) {
974                        metrics::counter!("qail_pg_gss_connect_failures_total").increment(1);
975                        gss_circuit_record_failure(config);
976                    }
977                    return Err(err);
978                }
979            }
980        }
981    }
982
983    /// Run one maintenance cycle: evict stale idle connections and backfill
984    /// to `min_connections`. Called periodically by `spawn_pool_maintenance`.
985    pub async fn maintain(&self) {
986        if self.inner.closed.load(Ordering::Relaxed) {
987            return;
988        }
989
990        // Phase 1: Evict idle and expired connections from the pool.
991        let evicted = {
992            let mut connections = self.inner.connections.lock().await;
993            let before = connections.len();
994            connections.retain(|pooled| {
995                if pooled.last_used.elapsed() > self.inner.config.idle_timeout {
996                    record_pool_connection_destroy("idle_sweep_evict");
997                    return false;
998                }
999                if let Some(max_life) = self.inner.config.max_lifetime
1000                    && pooled.created_at.elapsed() > max_life
1001                {
1002                    record_pool_connection_destroy("lifetime_sweep_evict");
1003                    return false;
1004                }
1005                true
1006            });
1007            before - connections.len()
1008        };
1009
1010        if evicted > 0 {
1011            tracing::debug!(evicted, "pool_maintenance: evicted stale idle connections");
1012        }
1013
1014        // Phase 2: Backfill to min_connections if below threshold.
1015        let min = self.inner.config.min_connections;
1016        if min == 0 {
1017            return;
1018        }
1019
1020        let idle_count = self.inner.connections.lock().await.len();
1021        let checked_out_slots = self
1022            .inner
1023            .config
1024            .max_connections
1025            .saturating_sub(self.inner.semaphore.available_permits());
1026        let deficit = maintenance_backfill_deficit(
1027            self.inner.config.max_connections,
1028            min,
1029            idle_count,
1030            checked_out_slots,
1031        );
1032        if deficit == 0 {
1033            return;
1034        }
1035        let mut created = 0usize;
1036        for _ in 0..deficit {
1037            match Self::create_connection(&self.inner.config).await {
1038                Ok(conn) => {
1039                    self.inner.total_created.fetch_add(1, Ordering::Relaxed);
1040                    let mut connections = self.inner.connections.lock().await;
1041                    if connections.len() < self.inner.config.max_connections {
1042                        connections.push(PooledConn {
1043                            conn,
1044                            created_at: Instant::now(),
1045                            last_used: Instant::now(),
1046                        });
1047                        created += 1;
1048                    } else {
1049                        // Pool filled by concurrent acquires; stop backfill.
1050                        break;
1051                    }
1052                }
1053                Err(e) => {
1054                    tracing::warn!(error = %e, "pool_maintenance: backfill connection failed");
1055                    break; // Transient failure — retry next cycle.
1056                }
1057            }
1058        }
1059
1060        if created > 0 {
1061            tracing::debug!(
1062                created,
1063                min_connections = min,
1064                "pool_maintenance: backfilled idle connections"
1065            );
1066        }
1067    }
1068}
1069
1070/// Spawn a background task that periodically maintains pool health.
1071///
1072/// Runs every `idle_timeout / 2` (min 5s): evicts stale idle connections and
1073/// backfills to `min_connections`. Call once after `PgPool::connect`.
1074pub fn spawn_pool_maintenance(pool: PgPool) {
1075    let interval_secs = std::cmp::max(pool.inner.config.idle_timeout.as_secs() / 2, 5);
1076    tokio::spawn(async move {
1077        let mut interval = tokio::time::interval(Duration::from_secs(interval_secs));
1078        loop {
1079            interval.tick().await;
1080            if pool.is_closed() {
1081                break;
1082            }
1083            pool.maintain().await;
1084        }
1085    });
1086}
1087
1088pub(super) fn maintenance_backfill_deficit(
1089    max_connections: usize,
1090    min_connections: usize,
1091    idle_count: usize,
1092    checked_out_slots: usize,
1093) -> usize {
1094    let target_idle = min_connections.min(max_connections);
1095    if idle_count >= target_idle {
1096        return 0;
1097    }
1098
1099    let needed_idle = target_idle - idle_count;
1100    let available_slots =
1101        max_connections.saturating_sub(idle_count.saturating_add(checked_out_slots));
1102    needed_idle.min(available_slots)
1103}
1104
1105pub(super) fn validate_pool_config(config: &PoolConfig) -> PgResult<()> {
1106    if config.max_connections == 0 {
1107        return Err(PgError::Connection(
1108            "Invalid PoolConfig: max_connections must be >= 1".to_string(),
1109        ));
1110    }
1111    if config.min_connections > config.max_connections {
1112        return Err(PgError::Connection(format!(
1113            "Invalid PoolConfig: min_connections ({}) must be <= max_connections ({})",
1114            config.min_connections, config.max_connections
1115        )));
1116    }
1117    if config.acquire_timeout.is_zero() {
1118        return Err(PgError::Connection(
1119            "Invalid PoolConfig: acquire_timeout must be > 0".to_string(),
1120        ));
1121    }
1122    if config.connect_timeout.is_zero() {
1123        return Err(PgError::Connection(
1124            "Invalid PoolConfig: connect_timeout must be > 0".to_string(),
1125        ));
1126    }
1127    if config.leaked_cleanup_queue == 0 {
1128        return Err(PgError::Connection(
1129            "Invalid PoolConfig: leaked_cleanup_queue must be >= 1".to_string(),
1130        ));
1131    }
1132    Ok(())
1133}
1134
1135pub(super) async fn execute_simple_with_timeout(
1136    conn: &mut PgConnection,
1137    sql: &str,
1138    timeout: Duration,
1139    operation: &str,
1140) -> PgResult<()> {
1141    match tokio::time::timeout(timeout, conn.execute_simple(sql)).await {
1142        Ok(result) => result,
1143        Err(_) => {
1144            conn.mark_io_desynced();
1145            Err(PgError::Timeout(format!(
1146                "{} timeout after {:?} (pool config connect_timeout)",
1147                operation, timeout
1148            )))
1149        }
1150    }
1151}