// qail_pg/driver/pool/lifecycle.rs
1//! Pool lifecycle: PgPoolInner, PgPool core (connect, maintain, close),
2//! hot statement pre-prepare, and connection creation.
3
4use super::ScopedPoolFuture;
5use super::churn::{
6    PoolStats, decrement_active_count_saturating, pool_churn_record_destroy,
7    pool_churn_remaining_open, record_pool_connection_destroy,
8};
9use super::config::PoolConfig;
10use super::connection::PooledConn;
11use super::connection::PooledConnection;
12use super::gss::*;
13use crate::driver::{
14    AstPipelineMode, AutoCountPath, AutoCountPlan, ConnectOptions, PgConnection, PgError, PgResult,
15    is_ignorable_session_message, unexpected_backend_message,
16};
17use std::sync::Arc;
18use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
19use std::time::{Duration, Instant};
20use tokio::sync::{Mutex, Semaphore};
21use tokio::task::JoinSet;
22
/// Maximum number of hot statements to track globally.
/// Cap on the shared `hot_statements` registry so per-checkout pre-prepare
/// work stays bounded — presumably enforced where statements are registered
/// (not visible in this chunk); TODO confirm at the insertion site.
pub(super) const MAX_HOT_STATEMENTS: usize = 32;
25
/// Inner pool state (shared across clones).
pub(super) struct PgPoolInner {
    // Immutable pool configuration (host, sizing, timeouts).
    pub(super) config: PoolConfig,
    // Idle connections available for checkout (LIFO: returned via push, checked out via pop).
    pub(super) connections: Mutex<Vec<PooledConn>>,
    // Capacity gate: one permit per allowed connection slot.
    pub(super) semaphore: Semaphore,
    // Set once on close; further acquires are rejected.
    pub(super) closed: AtomicBool,
    // Number of connections currently checked out.
    pub(super) active_count: AtomicUsize,
    // Total connections ever created (monotonic; reported in stats).
    pub(super) total_created: AtomicUsize,
    // Count of in-flight cleanup work for leaked connections — exact
    // semantics live outside this chunk; TODO confirm at use sites.
    pub(super) leaked_cleanup_inflight: AtomicUsize,
    /// Global registry of frequently-used prepared statements.
    /// Maps sql_hash → (stmt_name, sql_text).
    /// New connections pre-prepare these on checkout for instant cache hits.
    pub(super) hot_statements: std::sync::RwLock<std::collections::HashMap<u64, (String, String)>>,
}
40
41pub(super) fn handle_hot_preprepare_message(
42    msg: &crate::protocol::BackendMessage,
43    parse_complete_count: &mut usize,
44    error: &mut Option<PgError>,
45) -> PgResult<bool> {
46    match msg {
47        crate::protocol::BackendMessage::ParseComplete => {
48            *parse_complete_count += 1;
49            Ok(false)
50        }
51        crate::protocol::BackendMessage::ErrorResponse(err) => {
52            if error.is_none() {
53                *error = Some(PgError::QueryServer(err.clone().into()));
54            }
55            Ok(false)
56        }
57        crate::protocol::BackendMessage::ReadyForQuery(_) => Ok(true),
58        msg if is_ignorable_session_message(msg) => Ok(false),
59        other => Err(unexpected_backend_message("pool hot pre-prepare", other)),
60    }
61}
62
63impl PgPoolInner {
64    pub(super) async fn return_connection(&self, conn: PgConnection, created_at: Instant) {
65        decrement_active_count_saturating(&self.active_count);
66
67        if conn.is_io_desynced() {
68            tracing::warn!(
69                host = %self.config.host,
70                port = self.config.port,
71                user = %self.config.user,
72                db = %self.config.database,
73                "pool_return_desynced: dropping connection due to prior I/O/protocol desync"
74            );
75            record_pool_connection_destroy("pool_desynced_drop");
76            self.semaphore.add_permits(1);
77            pool_churn_record_destroy(&self.config, "return_desynced");
78            return;
79        }
80
81        if self.closed.load(Ordering::Relaxed) {
82            record_pool_connection_destroy("pool_closed_drop");
83            self.semaphore.add_permits(1);
84            return;
85        }
86
87        let mut connections = self.connections.lock().await;
88        if connections.len() < self.config.max_connections {
89            connections.push(PooledConn {
90                conn,
91                created_at,
92                last_used: Instant::now(),
93            });
94        } else {
95            record_pool_connection_destroy("pool_overflow_drop");
96        }
97
98        self.semaphore.add_permits(1);
99    }
100
101    /// Get a healthy connection from the pool, or None if pool is empty.
102    async fn get_healthy_connection(&self) -> Option<PooledConn> {
103        let mut connections = self.connections.lock().await;
104
105        while let Some(pooled) = connections.pop() {
106            if pooled.last_used.elapsed() > self.config.idle_timeout {
107                tracing::debug!(
108                    idle_secs = pooled.last_used.elapsed().as_secs(),
109                    timeout_secs = self.config.idle_timeout.as_secs(),
110                    "pool_checkout_evict: connection exceeded idle timeout"
111                );
112                record_pool_connection_destroy("idle_timeout_evict");
113                continue;
114            }
115
116            if let Some(max_life) = self.config.max_lifetime
117                && pooled.created_at.elapsed() > max_life
118            {
119                tracing::debug!(
120                    age_secs = pooled.created_at.elapsed().as_secs(),
121                    max_lifetime_secs = max_life.as_secs(),
122                    "pool_checkout_evict: connection exceeded max lifetime"
123                );
124                record_pool_connection_destroy("max_lifetime_evict");
125                continue;
126            }
127
128            return Some(pooled);
129        }
130
131        None
132    }
133}
134
/// Cloneable handle to a shared connection pool — clones share one
/// [`PgPoolInner`] via `Arc`, so cloning is cheap (refcount bump).
///
/// # Example
/// ```ignore
/// let config = PoolConfig::new("localhost", 5432, "user", "db")
///     .password("secret")
///     .max_connections(20);
/// let pool = PgPool::connect(config).await?;
/// // Get a connection from the pool
/// let mut conn = pool.acquire_raw().await?;
/// conn.simple_query("SELECT 1").await?;
/// ```
#[derive(Clone)]
pub struct PgPool {
    pub(super) inner: Arc<PgPoolInner>,
}
149
150impl PgPool {
151    /// Create a pool from `qail.toml` (loads and parses automatically).
152    ///
153    /// # Example
154    /// ```ignore
155    /// let pool = PgPool::from_config().await?;
156    /// ```
157    pub async fn from_config() -> PgResult<Self> {
158        let qail = qail_core::config::QailConfig::load()
159            .map_err(|e| PgError::Connection(format!("Config error: {}", e)))?;
160        let config = PoolConfig::from_qail_config(&qail)?;
161        Self::connect(config).await
162    }
163
164    /// Create a new connection pool.
165    pub async fn connect(config: PoolConfig) -> PgResult<Self> {
166        validate_pool_config(&config)?;
167
168        // Semaphore starts with max_connections permits
169        let semaphore = Semaphore::new(config.max_connections);
170
171        let mut initial_connections = Vec::new();
172        for _ in 0..config.min_connections {
173            let conn = Self::create_connection(&config).await?;
174            initial_connections.push(PooledConn {
175                conn,
176                created_at: Instant::now(),
177                last_used: Instant::now(),
178            });
179        }
180
181        let initial_count = initial_connections.len();
182
183        let inner = Arc::new(PgPoolInner {
184            config,
185            connections: Mutex::new(initial_connections),
186            semaphore,
187            closed: AtomicBool::new(false),
188            active_count: AtomicUsize::new(0),
189            total_created: AtomicUsize::new(initial_count),
190            leaked_cleanup_inflight: AtomicUsize::new(0),
191            hot_statements: std::sync::RwLock::new(std::collections::HashMap::new()),
192        });
193
194        Ok(Self { inner })
195    }
196
    /// Acquire a raw connection from the pool (crate-internal only).
    ///
    /// # Safety (not `unsafe` in the Rust sense, but security-critical)
    ///
    /// This returns a connection with **no RLS context**. All tenant data
    /// queries on this connection will bypass row-level security.
    ///
    /// **Safe usage**: Pair with `fetch_all_with_rls()` for pipelined
    /// RLS+query execution (single roundtrip). Or use `acquire_with_rls()`
    /// / `acquire_with_rls_timeout()` for the 2-roundtrip path.
    ///
    /// **Unsafe usage**: Running queries directly on a raw connection
    /// without RLS context. Every call site MUST include a `// SAFETY:`
    /// comment explaining why raw acquisition is justified.
    ///
    /// # Errors
    /// - [`PgError::PoolClosed`] if the pool is (or becomes) closed.
    /// - [`PgError::PoolExhausted`] while the churn circuit breaker is open.
    /// - [`PgError::Timeout`] when no slot frees up within `acquire_timeout`.
    /// - Connection/protocol errors from creating or validating a connection.
    pub async fn acquire_raw(&self) -> PgResult<PooledConnection> {
        // Fast-fail before waiting on the semaphore.
        if self.inner.closed.load(Ordering::Relaxed) {
            return Err(PgError::PoolClosed);
        }

        // Churn circuit breaker: while open, reject immediately (surfaced as
        // PoolExhausted) instead of creating yet another doomed connection.
        if let Some(remaining) = pool_churn_remaining_open(&self.inner.config) {
            metrics::counter!("qail_pg_pool_churn_circuit_reject_total").increment(1);
            tracing::warn!(
                host = %self.inner.config.host,
                port = self.inner.config.port,
                user = %self.inner.config.user,
                db = %self.inner.config.database,
                remaining_ms = remaining.as_millis() as u64,
                "pool_connection_churn_circuit_open"
            );
            return Err(PgError::PoolExhausted {
                max: self.inner.config.max_connections,
            });
        }

        // Wait for available slot with timeout
        let acquire_timeout = self.inner.config.acquire_timeout;
        let permit =
            match tokio::time::timeout(acquire_timeout, self.inner.semaphore.acquire()).await {
                // acquire() errs only when the semaphore is closed (pool shutdown).
                Ok(permit) => permit.map_err(|_| PgError::PoolClosed)?,
                Err(_) => {
                    metrics::counter!("qail_pg_pool_acquire_timeouts_total").increment(1);
                    return Err(PgError::Timeout(format!(
                        "pool acquire after {}s ({} max connections)",
                        acquire_timeout.as_secs(),
                        self.inner.config.max_connections
                    )));
                }
            };

        // Re-check: the pool may have closed while we waited for a permit.
        // (Early `?`/return drops `permit`, giving the slot back automatically.)
        if self.inner.closed.load(Ordering::Relaxed) {
            return Err(PgError::PoolClosed);
        }

        // Try to get existing healthy connection
        let (mut conn, mut created_at) =
            if let Some(pooled) = self.inner.get_healthy_connection().await {
                (pooled.conn, pooled.created_at)
            } else {
                let conn = Self::create_connection(&self.inner.config).await?;
                self.inner.total_created.fetch_add(1, Ordering::Relaxed);
                (conn, Instant::now())
            };

        // Optional checkout probe: a failed `SELECT 1` replaces the connection
        // instead of handing a broken one to the caller.
        if self.inner.config.test_on_acquire
            && let Err(e) = execute_simple_with_timeout(
                &mut conn,
                "SELECT 1",
                self.inner.config.connect_timeout,
                "pool checkout health check",
            )
            .await
        {
            tracing::warn!(
                host = %self.inner.config.host,
                port = self.inner.config.port,
                user = %self.inner.config.user,
                db = %self.inner.config.database,
                error = %e,
                "pool_health_check_failed: checkout probe failed, creating replacement connection"
            );
            pool_churn_record_destroy(&self.inner.config, "health_check_failed");
            // NOTE(review): the failed connection is dropped here without a
            // record_pool_connection_destroy() call, unlike other destroy
            // paths — confirm whether that metric gap is intentional.
            conn = Self::create_connection(&self.inner.config).await?;
            self.inner.total_created.fetch_add(1, Ordering::Relaxed);
            created_at = Instant::now();
        }

        // Pre-prepare hot statements that this connection doesn't have yet.
        // Collect data synchronously (guard dropped before async work).
        let missing: Vec<(u64, String, String)> = {
            if let Ok(hot) = self.inner.hot_statements.read() {
                hot.iter()
                    .filter(|(hash, _)| !conn.stmt_cache.contains(hash))
                    .map(|(hash, (name, sql))| (*hash, name.clone(), sql.clone()))
                    .collect()
            } else {
                Vec::new()
            }
        }; // RwLockReadGuard dropped here — safe across .await

        if !missing.is_empty() {
            use crate::protocol::PgEncoder;
            // Batch every Parse message plus one Sync into a single buffer so
            // the whole pre-prepare costs one network roundtrip.
            let mut buf = bytes::BytesMut::new();
            for (_, name, sql) in &missing {
                let parse_msg = PgEncoder::try_encode_parse(name, sql, &[])?;
                buf.extend_from_slice(&parse_msg);
            }
            PgEncoder::encode_sync_to(&mut buf);
            let preprepare_timeout = self.inner.config.connect_timeout;
            let preprepare_result: PgResult<()> = match tokio::time::timeout(
                preprepare_timeout,
                async {
                    conn.send_bytes(&buf).await?;
                    // Drain responses and fail closed on any parse error.
                    let mut parse_complete_count = 0usize;
                    let mut parse_error: Option<PgError> = None;
                    loop {
                        let msg = conn.recv().await?;
                        if handle_hot_preprepare_message(
                            &msg,
                            &mut parse_complete_count,
                            &mut parse_error,
                        )? {
                            // ReadyForQuery seen: verify every Parse succeeded.
                            if let Some(err) = parse_error {
                                return Err(err);
                            }
                            if parse_complete_count != missing.len() {
                                return Err(PgError::Protocol(format!(
                                    "hot pre-prepare completed with {} ParseComplete messages (expected {})",
                                    parse_complete_count,
                                    missing.len()
                                )));
                            }
                            break;
                        }
                    }
                    Ok::<(), PgError>(())
                },
            )
            .await
            {
                Ok(res) => res,
                Err(_) => Err(PgError::Timeout(format!(
                    "hot statement pre-prepare timeout after {:?} (pool config connect_timeout)",
                    preprepare_timeout
                ))),
            };

            // On any pre-prepare failure, replace the connection: its protocol
            // state is uncertain (some statements may or may not exist).
            if let Err(e) = preprepare_result {
                tracing::warn!(
                    host = %self.inner.config.host,
                    port = self.inner.config.port,
                    user = %self.inner.config.user,
                    db = %self.inner.config.database,
                    timeout_ms = preprepare_timeout.as_millis() as u64,
                    error = %e,
                    "pool_hot_prepare_failed: replacing connection to avoid handing out uncertain protocol state"
                );
                pool_churn_record_destroy(&self.inner.config, "hot_prepare_failed");
                conn = Self::create_connection(&self.inner.config).await?;
                self.inner.total_created.fetch_add(1, Ordering::Relaxed);
                created_at = Instant::now();
            } else {
                // Register in local cache
                for (hash, name, sql) in &missing {
                    conn.stmt_cache.put(*hash, name.clone());
                    conn.prepared_statements.insert(name.clone(), sql.clone());
                }
            }
        }

        self.inner.active_count.fetch_add(1, Ordering::Relaxed);
        // Permit is intentionally detached here; returned by `release()` / pool return.
        permit.forget();

        Ok(PooledConnection {
            conn: Some(conn),
            pool: self.inner.clone(),
            rls_dirty: false,
            created_at,
        })
    }
378
379    /// Acquire a connection with RLS context pre-configured.
380    ///
381    /// Sets PostgreSQL session variables for tenant isolation before
382    /// returning the connection. When the connection is dropped, it
383    /// automatically clears the RLS context before returning to the pool.
384    ///
385    /// # Example
386    /// ```ignore
387    /// use qail_core::rls::RlsContext;
388    ///
389    /// let mut conn = pool.acquire_with_rls(
390    ///     RlsContext::tenant("550e8400-e29b-41d4-a716-446655440000")
391    /// ).await?;
392    /// // All queries through `conn` are now scoped to this tenant
393    /// ```
394    pub async fn acquire_with_rls(
395        &self,
396        ctx: qail_core::rls::RlsContext,
397    ) -> PgResult<PooledConnection> {
398        // SAFETY: RLS context is set immediately below via context_to_sql().
399        let mut conn = self.acquire_raw().await?;
400
401        // Set RLS context on the raw connection
402        let sql = crate::driver::rls::context_to_sql(&ctx);
403        let pg_conn = conn.get_mut()?;
404        if let Err(e) = execute_simple_with_timeout(
405            pg_conn,
406            &sql,
407            self.inner.config.connect_timeout,
408            "pool acquire_with_rls setup",
409        )
410        .await
411        {
412            // Attempt recovery ROLLBACK to salvage the connection rather than
413            // letting Drop destroy it (which wastes a TCP connection).
414            if let Ok(pg_conn) = conn.get_mut() {
415                let _ = pg_conn.execute_simple("ROLLBACK").await;
416            }
417            conn.release().await;
418            return Err(e);
419        }
420
421        // Mark dirty so Drop resets context before pool return
422        conn.rls_dirty = true;
423
424        Ok(conn)
425    }
426
427    /// Scoped connection helper that guarantees `release()` after closure execution.
428    ///
429    /// Prefer this over manual `acquire_with_rls()` in normal request handlers.
430    pub async fn with_rls<T, F>(&self, ctx: qail_core::rls::RlsContext, f: F) -> PgResult<T>
431    where
432        F: for<'a> FnOnce(&'a mut PooledConnection) -> ScopedPoolFuture<'a, T>,
433    {
434        let mut conn = self.acquire_with_rls(ctx).await?;
435        let out = f(&mut conn).await;
436        conn.release().await;
437        out
438    }
439
440    /// Scoped helper for system-level operations (`RlsContext::empty()`).
441    pub async fn with_system<T, F>(&self, f: F) -> PgResult<T>
442    where
443        F: for<'a> FnOnce(&'a mut PooledConnection) -> ScopedPoolFuture<'a, T>,
444    {
445        self.with_rls(qail_core::rls::RlsContext::empty(), f).await
446    }
447
448    /// Scoped helper for global/platform row access (`tenant_id IS NULL`).
449    pub async fn with_global<T, F>(&self, f: F) -> PgResult<T>
450    where
451        F: for<'a> FnOnce(&'a mut PooledConnection) -> ScopedPoolFuture<'a, T>,
452    {
453        self.with_rls(qail_core::rls::RlsContext::global(), f).await
454    }
455
456    /// Scoped helper for single-tenant access.
457    pub async fn with_tenant<T, F>(&self, tenant_id: &str, f: F) -> PgResult<T>
458    where
459        F: for<'a> FnOnce(&'a mut PooledConnection) -> ScopedPoolFuture<'a, T>,
460    {
461        self.with_rls(qail_core::rls::RlsContext::tenant(tenant_id), f)
462            .await
463    }
464
465    /// Acquire a connection with RLS context AND statement timeout.
466    ///
467    /// Like `acquire_with_rls()`, but also sets `statement_timeout` to prevent
468    /// runaway queries from holding pool connections indefinitely.
469    pub async fn acquire_with_rls_timeout(
470        &self,
471        ctx: qail_core::rls::RlsContext,
472        timeout_ms: u32,
473    ) -> PgResult<PooledConnection> {
474        // SAFETY: RLS context + timeout set immediately below via context_to_sql_with_timeout().
475        let mut conn = self.acquire_raw().await?;
476
477        // Set RLS context + statement_timeout atomically
478        let sql = crate::driver::rls::context_to_sql_with_timeout(&ctx, timeout_ms);
479        let pg_conn = conn.get_mut()?;
480        if let Err(e) = execute_simple_with_timeout(
481            pg_conn,
482            &sql,
483            self.inner.config.connect_timeout,
484            "pool acquire_with_rls_timeout setup",
485        )
486        .await
487        {
488            if let Ok(pg_conn) = conn.get_mut() {
489                let _ = pg_conn.execute_simple("ROLLBACK").await;
490            }
491            conn.release().await;
492            return Err(e);
493        }
494
495        // Mark dirty so Drop resets context + timeout before pool return
496        conn.rls_dirty = true;
497
498        Ok(conn)
499    }
500
501    /// Scoped connection helper that guarantees `release()` after closure execution.
502    pub async fn with_rls_timeout<T, F>(
503        &self,
504        ctx: qail_core::rls::RlsContext,
505        timeout_ms: u32,
506        f: F,
507    ) -> PgResult<T>
508    where
509        F: for<'a> FnOnce(&'a mut PooledConnection) -> ScopedPoolFuture<'a, T>,
510    {
511        let mut conn = self.acquire_with_rls_timeout(ctx, timeout_ms).await?;
512        let out = f(&mut conn).await;
513        conn.release().await;
514        out
515    }
516
517    /// Acquire a connection with RLS context, statement timeout, AND lock timeout.
518    ///
519    /// Like `acquire_with_rls_timeout()`, but also sets `lock_timeout` to prevent
520    /// queries from blocking indefinitely on row/table locks.
521    /// When `lock_timeout_ms` is 0, the lock_timeout clause is omitted.
522    pub async fn acquire_with_rls_timeouts(
523        &self,
524        ctx: qail_core::rls::RlsContext,
525        statement_timeout_ms: u32,
526        lock_timeout_ms: u32,
527    ) -> PgResult<PooledConnection> {
528        // SAFETY: RLS context + timeouts set immediately below via context_to_sql_with_timeouts().
529        let mut conn = self.acquire_raw().await?;
530
531        let sql = crate::driver::rls::context_to_sql_with_timeouts(
532            &ctx,
533            statement_timeout_ms,
534            lock_timeout_ms,
535        );
536        let pg_conn = conn.get_mut()?;
537        if let Err(e) = execute_simple_with_timeout(
538            pg_conn,
539            &sql,
540            self.inner.config.connect_timeout,
541            "pool acquire_with_rls_timeouts setup",
542        )
543        .await
544        {
545            if let Ok(pg_conn) = conn.get_mut() {
546                let _ = pg_conn.execute_simple("ROLLBACK").await;
547            }
548            conn.release().await;
549            return Err(e);
550        }
551
552        conn.rls_dirty = true;
553
554        Ok(conn)
555    }
556
557    /// Scoped connection helper that guarantees `release()` after closure execution.
558    pub async fn with_rls_timeouts<T, F>(
559        &self,
560        ctx: qail_core::rls::RlsContext,
561        statement_timeout_ms: u32,
562        lock_timeout_ms: u32,
563        f: F,
564    ) -> PgResult<T>
565    where
566        F: for<'a> FnOnce(&'a mut PooledConnection) -> ScopedPoolFuture<'a, T>,
567    {
568        let mut conn = self
569            .acquire_with_rls_timeouts(ctx, statement_timeout_ms, lock_timeout_ms)
570            .await?;
571        let out = f(&mut conn).await;
572        conn.release().await;
573        out
574    }
575
576    /// Acquire a connection for system-level operations (no tenant context).
577    ///
578    /// Sets RLS session variables to maximally restrictive values:
579    /// - `app.current_tenant_id = ''`
580    /// - `app.current_agent_id = ''`  
581    /// - `app.is_super_admin = false`
582    ///
583    /// Use this for startup introspection, migrations, and health checks
584    /// that must not operate within any tenant scope.
585    pub async fn acquire_system(&self) -> PgResult<PooledConnection> {
586        let ctx = qail_core::rls::RlsContext::empty();
587        self.acquire_with_rls(ctx).await
588    }
589
590    /// Acquire a connection scoped to global/platform rows.
591    ///
592    /// Shorthand for `acquire_with_rls(RlsContext::global())`.
593    /// Use this for shared reference data (for example: currencies, ports,
594    /// vessel types) stored as `tenant_id IS NULL`.
595    pub async fn acquire_global(&self) -> PgResult<PooledConnection> {
596        self.acquire_with_rls(qail_core::rls::RlsContext::global())
597            .await
598    }
599
600    /// Acquire a connection scoped to a specific tenant.
601    ///
602    /// Shorthand for `acquire_with_rls(RlsContext::tenant(tenant_id))`.
603    /// Use this when you already know the tenant UUID and want a
604    /// tenant-scoped connection in a single call.
605    ///
606    /// # Example
607    /// ```ignore
608    /// let mut conn = pool.acquire_for_tenant("550e8400-...").await?;
609    /// // All queries through `conn` are now scoped to this tenant
610    /// ```
611    pub async fn acquire_for_tenant(&self, tenant_id: &str) -> PgResult<PooledConnection> {
612        self.acquire_with_rls(qail_core::rls::RlsContext::tenant(tenant_id))
613            .await
614    }
615
616    /// Acquire a connection with branch context pre-configured.
617    ///
618    /// Sets PostgreSQL session variable `app.branch_id` for data virtualization.
619    /// When the connection is dropped, it automatically clears the branch context.
620    ///
621    /// # Example
622    /// ```ignore
623    /// use qail_core::branch::BranchContext;
624    ///
625    /// let ctx = BranchContext::branch("feature-auth");
626    /// let mut conn = pool.acquire_with_branch(&ctx).await?;
627    /// // All queries through `conn` are now branch-aware
628    /// ```
629    pub async fn acquire_with_branch(
630        &self,
631        ctx: &qail_core::branch::BranchContext,
632    ) -> PgResult<PooledConnection> {
633        // SAFETY: Branch context is set immediately below via branch_context_sql().
634        let mut conn = self.acquire_raw().await?;
635
636        if let Some(branch_name) = ctx.branch_name() {
637            let sql = crate::driver::branch_sql::branch_context_sql(branch_name);
638            let pg_conn = conn.get_mut()?;
639            if let Err(e) = execute_simple_with_timeout(
640                pg_conn,
641                &sql,
642                self.inner.config.connect_timeout,
643                "pool acquire_with_branch setup",
644            )
645            .await
646            {
647                if let Ok(pg_conn) = conn.get_mut() {
648                    let _ = pg_conn.execute_simple("ROLLBACK").await;
649                }
650                conn.release().await;
651                return Err(e);
652            }
653            conn.rls_dirty = true; // Reuse dirty flag for auto-reset
654        }
655
656        Ok(conn)
657    }
658
    /// Get the current number of idle connections.
    ///
    /// Takes the idle-queue lock briefly; the value may be stale immediately.
    pub async fn idle_count(&self) -> usize {
        self.inner.connections.lock().await.len()
    }
663
    /// Get the number of connections currently in use.
    ///
    /// Relaxed atomic read — a point-in-time snapshot, not a synchronized count.
    pub fn active_count(&self) -> usize {
        self.inner.active_count.load(Ordering::Relaxed)
    }
668
    /// Get the maximum number of connections (from pool config; constant for
    /// the pool's lifetime).
    pub fn max_connections(&self) -> usize {
        self.inner.config.max_connections
    }
673
674    /// Plan auto count strategy for a given batch length.
675    pub fn plan_auto_count(&self, batch_len: usize) -> AutoCountPlan {
676        AutoCountPlan::for_pool(
677            batch_len,
678            self.inner.config.max_connections,
679            self.inner.semaphore.available_permits(),
680        )
681    }
682
    /// Execute commands with runtime auto strategy and return count + plan.
    ///
    /// Dispatches on the plan from [`Self::plan_auto_count`]: a single cached
    /// execution, a pipelined batch over one connection, or chunked fan-out
    /// across parallel pool workers. Returns the number of completed commands
    /// together with the plan that was used.
    pub async fn execute_count_auto_with_plan(
        &self,
        cmds: &[qail_core::ast::Qail],
    ) -> PgResult<(usize, AutoCountPlan)> {
        let plan = self.plan_auto_count(cmds.len());

        let completed = match plan.path {
            // One command: run it cached on a system-scoped connection.
            AutoCountPath::SingleCached => {
                if cmds.is_empty() {
                    0
                } else {
                    let mut conn = self.acquire_system().await?;
                    let run_result = conn.fetch_all_cached(&cmds[0]).await;
                    // Release before propagating so the slot is never leaked.
                    conn.release().await;
                    let _ = run_result?;
                    1
                }
            }
            // Pipeline the whole batch over a single connection.
            AutoCountPath::PipelineOneShot | AutoCountPath::PipelineCached => {
                let mode = if matches!(plan.path, AutoCountPath::PipelineOneShot) {
                    AstPipelineMode::OneShot
                } else {
                    AstPipelineMode::Cached
                };

                let mut pooled = self.acquire_system().await?;
                let run_result = {
                    let conn = pooled.get_mut()?;
                    conn.pipeline_execute_count_ast_with_mode(cmds, mode).await
                };
                pooled.release().await;
                run_result?
            }
            // Large batch: split into `chunk_size` slices and run them on
            // up to `plan.workers` parallel tasks, one pool connection each.
            AutoCountPath::PoolParallel => {
                if cmds.is_empty() {
                    0
                } else {
                    let all_cmds = Arc::new(cmds.to_vec());
                    let mut tasks: JoinSet<PgResult<usize>> = JoinSet::new();

                    for worker in 0..plan.workers {
                        let start = worker * plan.chunk_size;
                        if start >= all_cmds.len() {
                            break;
                        }
                        let end = (start + plan.chunk_size).min(all_cmds.len());
                        let pool = self.clone();
                        let all_cmds = Arc::clone(&all_cmds);

                        tasks.spawn(async move {
                            let mut pooled = pool.acquire_system().await?;
                            let run_result = {
                                let conn = pooled.get_mut()?;
                                conn.pipeline_execute_count_ast_with_mode(
                                    &all_cmds[start..end],
                                    AstPipelineMode::Auto,
                                )
                                .await
                            };
                            pooled.release().await;
                            run_result
                        });
                    }

                    // Sum worker counts; the first error (or join failure) aborts.
                    let mut total = 0usize;
                    while let Some(joined) = tasks.join_next().await {
                        match joined {
                            Ok(Ok(count)) => {
                                total += count;
                            }
                            Ok(Err(err)) => return Err(err),
                            Err(err) => {
                                return Err(PgError::Connection(format!(
                                    "auto pool worker join failed: {err}"
                                )));
                            }
                        }
                    }
                    total
                }
            }
        };

        Ok((completed, plan))
    }
769
770    /// Execute commands with runtime auto strategy.
771    #[inline]
772    pub async fn execute_count_auto(&self, cmds: &[qail_core::ast::Qail]) -> PgResult<usize> {
773        let (completed, _plan) = self.execute_count_auto_with_plan(cmds).await?;
774        Ok(completed)
775    }
776
777    /// Get comprehensive pool statistics.
778    pub async fn stats(&self) -> PoolStats {
779        let idle = self.inner.connections.lock().await.len();
780        let active = self.inner.active_count.load(Ordering::Relaxed);
781        let used_slots = self
782            .inner
783            .config
784            .max_connections
785            .saturating_sub(self.inner.semaphore.available_permits());
786        PoolStats {
787            active,
788            idle,
789            pending: used_slots.saturating_sub(active),
790            max_size: self.inner.config.max_connections,
791            total_created: self.inner.total_created.load(Ordering::Relaxed),
792        }
793    }
794
    /// Check if the pool is closed.
    ///
    /// Relaxed load: callers only need an eventually-consistent view of the
    /// shutdown flag, not ordering with other pool state.
    pub fn is_closed(&self) -> bool {
        self.inner.closed.load(Ordering::Relaxed)
    }
799
800    /// Close the pool gracefully.
801    ///
802    /// Rejects new acquires immediately, then waits up to `acquire_timeout`
803    /// for in-flight connections to be released before dropping idle
804    /// connections. Connections released after closure are destroyed by
805    /// `return_connection` and not returned to the idle queue.
806    pub async fn close(&self) {
807        self.close_graceful(self.inner.config.acquire_timeout).await;
808    }
809
810    /// Close the pool gracefully with an explicit drain timeout.
811    pub async fn close_graceful(&self, drain_timeout: Duration) {
812        self.inner.closed.store(true, Ordering::Relaxed);
813        // Wake blocked acquires immediately so shutdown doesn't wait on acquire_timeout.
814        self.inner.semaphore.close();
815
816        let deadline = Instant::now() + drain_timeout;
817        loop {
818            let active = self.inner.active_count.load(Ordering::Relaxed);
819            if active == 0 {
820                break;
821            }
822            if Instant::now() >= deadline {
823                tracing::warn!(
824                    active_connections = active,
825                    timeout_ms = drain_timeout.as_millis() as u64,
826                    "pool_close_drain_timeout: forcing idle cleanup while active connections remain"
827                );
828                break;
829            }
830            tokio::time::sleep(Duration::from_millis(25)).await;
831        }
832
833        let mut connections = self.inner.connections.lock().await;
834        let dropped_idle = connections.len();
835        connections.clear();
836        tracing::info!(
837            dropped_idle_connections = dropped_idle,
838            active_connections = self.inner.active_count.load(Ordering::Relaxed),
839            "pool_closed"
840        );
841    }
842
    /// Create a new connection using the pool configuration.
    ///
    /// Flow: reject configurations that can never authenticate, honor the
    /// GSS circuit breaker, then dial (bounded by `connect_timeout`),
    /// retrying with backoff only for errors the GSS retry policy accepts.
    async fn create_connection(config: &PoolConfig) -> PgResult<PgConnection> {
        // A password is configured but every password-based auth method is
        // disabled and there is no mTLS fallback: authentication can never
        // succeed, so fail fast with a config error instead of dialing.
        if !config.auth_settings.has_any_password_method()
            && config.mtls.is_none()
            && config.password.is_some()
        {
            return Err(PgError::Auth(
                "Invalid PoolConfig: all password auth methods are disabled".to_string(),
            ));
        }

        // Per-attempt connect options, cloned for each retry below.
        let options = ConnectOptions {
            tls_mode: config.tls_mode,
            gss_enc_mode: config.gss_enc_mode,
            tls_ca_cert_pem: config.tls_ca_cert_pem.clone(),
            mtls: config.mtls.clone(),
            gss_token_provider: config.gss_token_provider,
            gss_token_provider_ex: config.gss_token_provider_ex.clone(),
            auth: config.auth_settings,
            startup_params: Vec::new(),
        };

        // GSS circuit breaker: if recent failures opened the circuit, refuse
        // to dial at all until the remaining cool-down elapses.
        if let Some(remaining) = gss_circuit_remaining_open(config) {
            metrics::counter!("qail_pg_gss_circuit_open_total").increment(1);
            tracing::warn!(
                host = %config.host,
                port = config.port,
                user = %config.user,
                db = %config.database,
                remaining_ms = remaining.as_millis() as u64,
                "gss_connect_circuit_open"
            );
            return Err(PgError::Connection(format!(
                "GSS connection circuit is open; retry after {:?}",
                remaining
            )));
        }

        let mut attempt = 0usize;
        loop {
            // Bound each individual dial attempt by connect_timeout; the
            // timeout is per-attempt, not across all retries.
            let connect_result = tokio::time::timeout(
                config.connect_timeout,
                PgConnection::connect_with_options(
                    &config.host,
                    config.port,
                    &config.user,
                    &config.database,
                    config.password.as_deref(),
                    options.clone(),
                ),
            )
            .await;

            // Flatten the timeout layer into the connection Result.
            let connect_result = match connect_result {
                Ok(result) => result,
                Err(_) => Err(PgError::Timeout(format!(
                    "connect timeout after {:?} (pool config connect_timeout)",
                    config.connect_timeout
                ))),
            };

            match connect_result {
                Ok(conn) => {
                    metrics::counter!("qail_pg_pool_connect_success_total").increment(1);
                    // A successful connect feeds back into the circuit
                    // breaker (e.g. to close it after transient failures).
                    gss_circuit_record_success(config);
                    return Ok(conn);
                }
                // Retryable GSS errors: record the failure, back off
                // exponentially, and loop. Note this branch does not bump
                // qail_pg_pool_connect_failures_total — only the terminal
                // error below does.
                Err(err) if should_retry_gss_connect_error(config, attempt, &err) => {
                    metrics::counter!("qail_pg_gss_connect_retries_total").increment(1);
                    gss_circuit_record_failure(config);
                    let delay = gss_retry_delay(config.gss_retry_base_delay, attempt);
                    tracing::warn!(
                        host = %config.host,
                        port = config.port,
                        user = %config.user,
                        db = %config.database,
                        attempt = attempt + 1,
                        delay_ms = delay.as_millis() as u64,
                        error = %err,
                        "gss_connect_retry"
                    );
                    tokio::time::sleep(delay).await;
                    attempt += 1;
                }
                // Terminal failure: count it, and feed the circuit breaker
                // only when the error is GSS-relevant.
                Err(err) => {
                    metrics::counter!("qail_pg_pool_connect_failures_total").increment(1);
                    if should_track_gss_circuit_error(config, &err) {
                        metrics::counter!("qail_pg_gss_connect_failures_total").increment(1);
                        gss_circuit_record_failure(config);
                    }
                    return Err(err);
                }
            }
        }
    }
938
    /// Run one maintenance cycle: evict stale idle connections and backfill
    /// to `min_connections`. Called periodically by `spawn_pool_maintenance`.
    pub async fn maintain(&self) {
        // Closed pools get no maintenance; close_graceful owns teardown.
        if self.inner.closed.load(Ordering::Relaxed) {
            return;
        }

        // Phase 1: Evict idle and expired connections from the pool.
        // The connections lock is held only for this retain pass.
        let evicted = {
            let mut connections = self.inner.connections.lock().await;
            let before = connections.len();
            connections.retain(|pooled| {
                // Idle too long since last checkout → drop it.
                if pooled.last_used.elapsed() > self.inner.config.idle_timeout {
                    record_pool_connection_destroy("idle_sweep_evict");
                    return false;
                }
                // Past its configured maximum lifetime (if any) → drop it.
                if let Some(max_life) = self.inner.config.max_lifetime
                    && pooled.created_at.elapsed() > max_life
                {
                    record_pool_connection_destroy("lifetime_sweep_evict");
                    return false;
                }
                true
            });
            before - connections.len()
        };

        if evicted > 0 {
            tracing::debug!(evicted, "pool_maintenance: evicted stale idle connections");
        }

        // Phase 2: Backfill to min_connections if below threshold.
        let min = self.inner.config.min_connections;
        if min == 0 {
            return;
        }

        // The deficit is computed from a snapshot (lock released between
        // reads); concurrent acquires are tolerated via the re-check below.
        let idle_count = self.inner.connections.lock().await.len();
        let checked_out_slots = self
            .inner
            .config
            .max_connections
            .saturating_sub(self.inner.semaphore.available_permits());
        let deficit = maintenance_backfill_deficit(
            self.inner.config.max_connections,
            min,
            idle_count,
            checked_out_slots,
        );
        if deficit == 0 {
            return;
        }
        let mut created = 0usize;
        for _ in 0..deficit {
            // Connections are created outside the lock, then inserted only
            // if the pool still has room.
            match Self::create_connection(&self.inner.config).await {
                Ok(conn) => {
                    self.inner.total_created.fetch_add(1, Ordering::Relaxed);
                    let mut connections = self.inner.connections.lock().await;
                    if connections.len() < self.inner.config.max_connections {
                        connections.push(PooledConn {
                            conn,
                            created_at: Instant::now(),
                            last_used: Instant::now(),
                        });
                        created += 1;
                    } else {
                        // Pool filled by concurrent acquires; stop backfill.
                        break;
                    }
                }
                Err(e) => {
                    tracing::warn!(error = %e, "pool_maintenance: backfill connection failed");
                    break; // Transient failure — retry next cycle.
                }
            }
        }

        if created > 0 {
            tracing::debug!(
                created,
                min_connections = min,
                "pool_maintenance: backfilled idle connections"
            );
        }
    }
1024}
1025
1026/// Spawn a background task that periodically maintains pool health.
1027///
1028/// Runs every `idle_timeout / 2` (min 5s): evicts stale idle connections and
1029/// backfills to `min_connections`. Call once after `PgPool::connect`.
1030pub fn spawn_pool_maintenance(pool: PgPool) {
1031    let interval_secs = std::cmp::max(pool.inner.config.idle_timeout.as_secs() / 2, 5);
1032    tokio::spawn(async move {
1033        let mut interval = tokio::time::interval(Duration::from_secs(interval_secs));
1034        loop {
1035            interval.tick().await;
1036            if pool.is_closed() {
1037                break;
1038            }
1039            pool.maintain().await;
1040        }
1041    });
1042}
1043
1044pub(super) fn maintenance_backfill_deficit(
1045    max_connections: usize,
1046    min_connections: usize,
1047    idle_count: usize,
1048    checked_out_slots: usize,
1049) -> usize {
1050    let target_idle = min_connections.min(max_connections);
1051    if idle_count >= target_idle {
1052        return 0;
1053    }
1054
1055    let needed_idle = target_idle - idle_count;
1056    let available_slots =
1057        max_connections.saturating_sub(idle_count.saturating_add(checked_out_slots));
1058    needed_idle.min(available_slots)
1059}
1060
1061pub(super) fn validate_pool_config(config: &PoolConfig) -> PgResult<()> {
1062    if config.max_connections == 0 {
1063        return Err(PgError::Connection(
1064            "Invalid PoolConfig: max_connections must be >= 1".to_string(),
1065        ));
1066    }
1067    if config.min_connections > config.max_connections {
1068        return Err(PgError::Connection(format!(
1069            "Invalid PoolConfig: min_connections ({}) must be <= max_connections ({})",
1070            config.min_connections, config.max_connections
1071        )));
1072    }
1073    if config.acquire_timeout.is_zero() {
1074        return Err(PgError::Connection(
1075            "Invalid PoolConfig: acquire_timeout must be > 0".to_string(),
1076        ));
1077    }
1078    if config.connect_timeout.is_zero() {
1079        return Err(PgError::Connection(
1080            "Invalid PoolConfig: connect_timeout must be > 0".to_string(),
1081        ));
1082    }
1083    if config.leaked_cleanup_queue == 0 {
1084        return Err(PgError::Connection(
1085            "Invalid PoolConfig: leaked_cleanup_queue must be >= 1".to_string(),
1086        ));
1087    }
1088    Ok(())
1089}
1090
1091pub(super) async fn execute_simple_with_timeout(
1092    conn: &mut PgConnection,
1093    sql: &str,
1094    timeout: Duration,
1095    operation: &str,
1096) -> PgResult<()> {
1097    match tokio::time::timeout(timeout, conn.execute_simple(sql)).await {
1098        Ok(result) => result,
1099        Err(_) => {
1100            conn.mark_io_desynced();
1101            Err(PgError::Timeout(format!(
1102                "{} timeout after {:?} (pool config connect_timeout)",
1103                operation, timeout
1104            )))
1105        }
1106    }
1107}