1use super::ScopedPoolFuture;
5use super::churn::{
6 PoolStats, decrement_active_count_saturating, pool_churn_record_destroy,
7 pool_churn_remaining_open, record_pool_connection_destroy,
8};
9use super::config::PoolConfig;
10use super::connection::PooledConn;
11use super::connection::PooledConnection;
12use super::gss::*;
13use crate::driver::{
14 AstPipelineMode, AutoCountPath, AutoCountPlan, ConnectOptions, PgConnection, PgError, PgResult,
15 is_ignorable_session_message, unexpected_backend_message,
16};
17use std::sync::Arc;
18use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
19use std::time::{Duration, Instant};
20use tokio::sync::{Mutex, Semaphore};
21use tokio::task::JoinSet;
22
/// Upper bound on the number of pool-wide "hot" statements.
///
/// NOTE(review): not referenced in this part of the file; presumably it caps
/// the size of `PgPoolInner::hot_statements` where entries are inserted —
/// confirm at the insertion site.
pub(super) const MAX_HOT_STATEMENTS: usize = 32;
25
/// Shared state behind every `PgPool` handle and checked-out connection.
pub(super) struct PgPoolInner {
    /// Validated pool configuration (limits, timeouts, connection options).
    pub(super) config: PoolConfig,
    /// Idle connections; new returns are pushed and checkouts pop from the
    /// end, so the most recently returned connection is reused first.
    pub(super) connections: Mutex<Vec<PooledConn>>,
    /// Created with `max_connections` permits. A permit is forgotten on
    /// checkout and added back when the connection is returned.
    pub(super) semaphore: Semaphore,
    /// Set once the pool starts closing; new checkouts are rejected.
    pub(super) closed: AtomicBool,
    /// Number of connections currently checked out.
    pub(super) active_count: AtomicUsize,
    /// Total connections opened over the pool's lifetime (statistics only).
    pub(super) total_created: AtomicUsize,
    /// NOTE(review): not touched in this part of the file; presumably counts
    /// background cleanups of connections dropped without an explicit
    /// release (bounded by `config.leaked_cleanup_queue`) — confirm.
    pub(super) leaked_cleanup_inflight: AtomicUsize,
    /// Statement hash -> (statement name, SQL) for statements that every
    /// connection should have prepared; missing ones are pre-prepared on
    /// checkout.
    pub(super) hot_statements: std::sync::RwLock<std::collections::HashMap<u64, (String, String)>>,
}
40
41pub(super) fn handle_hot_preprepare_message(
42 msg: &crate::protocol::BackendMessage,
43 parse_complete_count: &mut usize,
44 error: &mut Option<PgError>,
45) -> PgResult<bool> {
46 match msg {
47 crate::protocol::BackendMessage::ParseComplete => {
48 *parse_complete_count += 1;
49 Ok(false)
50 }
51 crate::protocol::BackendMessage::ErrorResponse(err) => {
52 if error.is_none() {
53 *error = Some(PgError::QueryServer(err.clone().into()));
54 }
55 Ok(false)
56 }
57 crate::protocol::BackendMessage::ReadyForQuery(_) => Ok(true),
58 msg if is_ignorable_session_message(msg) => Ok(false),
59 other => Err(unexpected_backend_message("pool hot pre-prepare", other)),
60 }
61}
62
63fn evict_failed_hot_preprepare_entries(
64 pool: &PgPoolInner,
65 missing: &[(u64, String, String)],
66) -> usize {
67 let Ok(mut hot) = pool.hot_statements.write() else {
68 return 0;
69 };
70
71 let mut evicted = 0usize;
72 for (hash, _, _) in missing {
73 if hot.remove(hash).is_some() {
74 evicted += 1;
75 }
76 }
77 evicted
78}
79
80impl PgPoolInner {
81 pub(super) async fn return_connection(&self, conn: PgConnection, created_at: Instant) {
82 decrement_active_count_saturating(&self.active_count);
83
84 if conn.is_io_desynced() {
85 tracing::warn!(
86 host = %self.config.host,
87 port = self.config.port,
88 user = %self.config.user,
89 db = %self.config.database,
90 "pool_return_desynced: dropping connection due to prior I/O/protocol desync"
91 );
92 record_pool_connection_destroy("pool_desynced_drop");
93 self.semaphore.add_permits(1);
94 pool_churn_record_destroy(&self.config, "return_desynced");
95 return;
96 }
97
98 if self.closed.load(Ordering::Relaxed) {
99 record_pool_connection_destroy("pool_closed_drop");
100 self.semaphore.add_permits(1);
101 return;
102 }
103
104 let mut connections = self.connections.lock().await;
105 if connections.len() < self.config.max_connections {
106 connections.push(PooledConn {
107 conn,
108 created_at,
109 last_used: Instant::now(),
110 });
111 } else {
112 record_pool_connection_destroy("pool_overflow_drop");
113 }
114
115 self.semaphore.add_permits(1);
116 }
117
118 async fn get_healthy_connection(&self) -> Option<PooledConn> {
120 let mut connections = self.connections.lock().await;
121
122 while let Some(pooled) = connections.pop() {
123 if pooled.last_used.elapsed() > self.config.idle_timeout {
124 tracing::debug!(
125 idle_secs = pooled.last_used.elapsed().as_secs(),
126 timeout_secs = self.config.idle_timeout.as_secs(),
127 "pool_checkout_evict: connection exceeded idle timeout"
128 );
129 record_pool_connection_destroy("idle_timeout_evict");
130 continue;
131 }
132
133 if let Some(max_life) = self.config.max_lifetime
134 && pooled.created_at.elapsed() > max_life
135 {
136 tracing::debug!(
137 age_secs = pooled.created_at.elapsed().as_secs(),
138 max_lifetime_secs = max_life.as_secs(),
139 "pool_checkout_evict: connection exceeded max lifetime"
140 );
141 record_pool_connection_destroy("max_lifetime_evict");
142 continue;
143 }
144
145 return Some(pooled);
146 }
147
148 None
149 }
150}
151
/// Cheaply cloneable handle to a PostgreSQL connection pool.
///
/// All clones share the same `PgPoolInner` through an `Arc`, so cloning does
/// not create a new pool.
#[derive(Clone)]
pub struct PgPool {
    /// Shared pool state; also referenced by every checked-out connection.
    pub(super) inner: Arc<PgPoolInner>,
}
166
167impl PgPool {
168 pub async fn from_config() -> PgResult<Self> {
175 let qail = qail_core::config::QailConfig::load()
176 .map_err(|e| PgError::Connection(format!("Config error: {}", e)))?;
177 let config = PoolConfig::from_qail_config(&qail)?;
178 Self::connect(config).await
179 }
180
181 pub async fn connect(config: PoolConfig) -> PgResult<Self> {
183 validate_pool_config(&config)?;
184
185 let semaphore = Semaphore::new(config.max_connections);
187
188 let mut initial_connections = Vec::new();
189 for _ in 0..config.min_connections {
190 let conn = Self::create_connection(&config).await?;
191 initial_connections.push(PooledConn {
192 conn,
193 created_at: Instant::now(),
194 last_used: Instant::now(),
195 });
196 }
197
198 let initial_count = initial_connections.len();
199
200 let inner = Arc::new(PgPoolInner {
201 config,
202 connections: Mutex::new(initial_connections),
203 semaphore,
204 closed: AtomicBool::new(false),
205 active_count: AtomicUsize::new(0),
206 total_created: AtomicUsize::new(initial_count),
207 leaked_cleanup_inflight: AtomicUsize::new(0),
208 hot_statements: std::sync::RwLock::new(std::collections::HashMap::new()),
209 });
210
211 Ok(Self { inner })
212 }
213
214 pub async fn acquire_raw(&self) -> PgResult<PooledConnection> {
229 if self.inner.closed.load(Ordering::Relaxed) {
230 return Err(PgError::PoolClosed);
231 }
232
233 if let Some(remaining) = pool_churn_remaining_open(&self.inner.config) {
234 metrics::counter!("qail_pg_pool_churn_circuit_reject_total").increment(1);
235 tracing::warn!(
236 host = %self.inner.config.host,
237 port = self.inner.config.port,
238 user = %self.inner.config.user,
239 db = %self.inner.config.database,
240 remaining_ms = remaining.as_millis() as u64,
241 "pool_connection_churn_circuit_open"
242 );
243 return Err(PgError::PoolExhausted {
244 max: self.inner.config.max_connections,
245 });
246 }
247
248 let acquire_timeout = self.inner.config.acquire_timeout;
250 let permit =
251 match tokio::time::timeout(acquire_timeout, self.inner.semaphore.acquire()).await {
252 Ok(permit) => permit.map_err(|_| PgError::PoolClosed)?,
253 Err(_) => {
254 metrics::counter!("qail_pg_pool_acquire_timeouts_total").increment(1);
255 return Err(PgError::Timeout(format!(
256 "pool acquire after {}s ({} max connections)",
257 acquire_timeout.as_secs(),
258 self.inner.config.max_connections
259 )));
260 }
261 };
262
263 if self.inner.closed.load(Ordering::Relaxed) {
264 return Err(PgError::PoolClosed);
265 }
266
267 let (mut conn, mut created_at) =
269 if let Some(pooled) = self.inner.get_healthy_connection().await {
270 (pooled.conn, pooled.created_at)
271 } else {
272 let conn = Self::create_connection(&self.inner.config).await?;
273 self.inner.total_created.fetch_add(1, Ordering::Relaxed);
274 (conn, Instant::now())
275 };
276
277 if self.inner.config.test_on_acquire
278 && let Err(e) = execute_simple_with_timeout(
279 &mut conn,
280 "SELECT 1",
281 self.inner.config.connect_timeout,
282 "pool checkout health check",
283 )
284 .await
285 {
286 tracing::warn!(
287 host = %self.inner.config.host,
288 port = self.inner.config.port,
289 user = %self.inner.config.user,
290 db = %self.inner.config.database,
291 error = %e,
292 "pool_health_check_failed: checkout probe failed, creating replacement connection"
293 );
294 pool_churn_record_destroy(&self.inner.config, "health_check_failed");
295 conn = Self::create_connection(&self.inner.config).await?;
296 self.inner.total_created.fetch_add(1, Ordering::Relaxed);
297 created_at = Instant::now();
298 }
299
300 let missing: Vec<(u64, String, String)> = {
303 if let Ok(hot) = self.inner.hot_statements.read() {
304 hot.iter()
305 .filter(|(hash, _)| !conn.stmt_cache.contains(hash))
306 .map(|(hash, (name, sql))| (*hash, name.clone(), sql.clone()))
307 .collect()
308 } else {
309 Vec::new()
310 }
311 }; if !missing.is_empty() {
314 use crate::protocol::PgEncoder;
315 let mut buf = bytes::BytesMut::new();
316 for (_, name, sql) in &missing {
317 let parse_msg = PgEncoder::try_encode_parse(name, sql, &[])?;
318 buf.extend_from_slice(&parse_msg);
319 }
320 PgEncoder::encode_sync_to(&mut buf);
321 let preprepare_timeout = self.inner.config.connect_timeout;
322 let preprepare_result: PgResult<()> = match tokio::time::timeout(
323 preprepare_timeout,
324 async {
325 conn.send_bytes(&buf).await?;
326 let mut parse_complete_count = 0usize;
328 let mut parse_error: Option<PgError> = None;
329 loop {
330 let msg = conn.recv().await?;
331 if handle_hot_preprepare_message(
332 &msg,
333 &mut parse_complete_count,
334 &mut parse_error,
335 )? {
336 if let Some(err) = parse_error {
337 return Err(err);
338 }
339 if parse_complete_count != missing.len() {
340 return Err(PgError::Protocol(format!(
341 "hot pre-prepare completed with {} ParseComplete messages (expected {})",
342 parse_complete_count,
343 missing.len()
344 )));
345 }
346 break;
347 }
348 }
349 Ok::<(), PgError>(())
350 },
351 )
352 .await
353 {
354 Ok(res) => res,
355 Err(_) => Err(PgError::Timeout(format!(
356 "hot statement pre-prepare timeout after {:?} (pool config connect_timeout)",
357 preprepare_timeout
358 ))),
359 };
360
361 if let Err(e) = preprepare_result {
362 let evicted_hot_statements =
363 evict_failed_hot_preprepare_entries(&self.inner, &missing);
364 tracing::warn!(
365 host = %self.inner.config.host,
366 port = self.inner.config.port,
367 user = %self.inner.config.user,
368 db = %self.inner.config.database,
369 timeout_ms = preprepare_timeout.as_millis() as u64,
370 evicted_hot_statements,
371 error = %e,
372 "pool_hot_prepare_failed: replacing connection to avoid handing out uncertain protocol state"
373 );
374 pool_churn_record_destroy(&self.inner.config, "hot_prepare_failed");
375 conn = Self::create_connection(&self.inner.config).await?;
376 self.inner.total_created.fetch_add(1, Ordering::Relaxed);
377 created_at = Instant::now();
378 } else {
379 for (hash, name, sql) in &missing {
381 conn.stmt_cache.put(*hash, name.clone());
382 conn.prepared_statements.insert(name.clone(), sql.clone());
383 }
384 }
385 }
386
387 self.inner.active_count.fetch_add(1, Ordering::Relaxed);
388 permit.forget();
390
391 Ok(PooledConnection {
392 conn: Some(conn),
393 pool: std::sync::Arc::clone(&self.inner),
394 rls_dirty: false,
395 created_at,
396 })
397 }
398
399 pub async fn acquire_with_rls(
415 &self,
416 ctx: qail_core::rls::RlsContext,
417 ) -> PgResult<PooledConnection> {
418 let mut conn = self.acquire_raw().await?;
420
421 let sql = crate::driver::rls::context_to_sql(&ctx);
423 let pg_conn = conn.get_mut()?;
424 if let Err(e) = execute_simple_with_timeout(
425 pg_conn,
426 &sql,
427 self.inner.config.connect_timeout,
428 "pool acquire_with_rls setup",
429 )
430 .await
431 {
432 if let Ok(pg_conn) = conn.get_mut() {
435 let _ = pg_conn.execute_simple("ROLLBACK").await;
436 }
437 conn.release().await;
438 return Err(e);
439 }
440
441 conn.rls_dirty = true;
443
444 Ok(conn)
445 }
446
447 pub async fn with_rls<T, F>(&self, ctx: qail_core::rls::RlsContext, f: F) -> PgResult<T>
451 where
452 F: for<'a> FnOnce(&'a mut PooledConnection) -> ScopedPoolFuture<'a, T>,
453 {
454 let mut conn = self.acquire_with_rls(ctx).await?;
455 let out = f(&mut conn).await;
456 match out {
457 Ok(value) => {
458 conn.release_checked().await?;
459 Ok(value)
460 }
461 Err(err) => {
462 let _ = conn.rollback_and_release().await;
463 Err(err)
464 }
465 }
466 }
467
468 pub async fn with_system<T, F>(&self, f: F) -> PgResult<T>
470 where
471 F: for<'a> FnOnce(&'a mut PooledConnection) -> ScopedPoolFuture<'a, T>,
472 {
473 self.with_rls(qail_core::rls::RlsContext::empty(), f).await
474 }
475
476 pub async fn with_global<T, F>(&self, f: F) -> PgResult<T>
478 where
479 F: for<'a> FnOnce(&'a mut PooledConnection) -> ScopedPoolFuture<'a, T>,
480 {
481 self.with_rls(qail_core::rls::RlsContext::global(), f).await
482 }
483
484 pub async fn with_tenant<T, F>(&self, tenant_id: &str, f: F) -> PgResult<T>
486 where
487 F: for<'a> FnOnce(&'a mut PooledConnection) -> ScopedPoolFuture<'a, T>,
488 {
489 self.with_rls(qail_core::rls::RlsContext::tenant(tenant_id), f)
490 .await
491 }
492
493 pub async fn acquire_with_rls_timeout(
498 &self,
499 ctx: qail_core::rls::RlsContext,
500 timeout_ms: u32,
501 ) -> PgResult<PooledConnection> {
502 let mut conn = self.acquire_raw().await?;
504
505 let sql = crate::driver::rls::context_to_sql_with_timeout(&ctx, timeout_ms);
507 let pg_conn = conn.get_mut()?;
508 if let Err(e) = execute_simple_with_timeout(
509 pg_conn,
510 &sql,
511 self.inner.config.connect_timeout,
512 "pool acquire_with_rls_timeout setup",
513 )
514 .await
515 {
516 if let Ok(pg_conn) = conn.get_mut() {
517 let _ = pg_conn.execute_simple("ROLLBACK").await;
518 }
519 conn.release().await;
520 return Err(e);
521 }
522
523 conn.rls_dirty = true;
525
526 Ok(conn)
527 }
528
529 pub async fn with_rls_timeout<T, F>(
531 &self,
532 ctx: qail_core::rls::RlsContext,
533 timeout_ms: u32,
534 f: F,
535 ) -> PgResult<T>
536 where
537 F: for<'a> FnOnce(&'a mut PooledConnection) -> ScopedPoolFuture<'a, T>,
538 {
539 let mut conn = self.acquire_with_rls_timeout(ctx, timeout_ms).await?;
540 let out = f(&mut conn).await;
541 match out {
542 Ok(value) => {
543 conn.release_checked().await?;
544 Ok(value)
545 }
546 Err(err) => {
547 let _ = conn.rollback_and_release().await;
548 Err(err)
549 }
550 }
551 }
552
553 pub async fn acquire_with_rls_timeouts(
559 &self,
560 ctx: qail_core::rls::RlsContext,
561 statement_timeout_ms: u32,
562 lock_timeout_ms: u32,
563 ) -> PgResult<PooledConnection> {
564 let mut conn = self.acquire_raw().await?;
566
567 let sql = crate::driver::rls::context_to_sql_with_timeouts(
568 &ctx,
569 statement_timeout_ms,
570 lock_timeout_ms,
571 );
572 let pg_conn = conn.get_mut()?;
573 if let Err(e) = execute_simple_with_timeout(
574 pg_conn,
575 &sql,
576 self.inner.config.connect_timeout,
577 "pool acquire_with_rls_timeouts setup",
578 )
579 .await
580 {
581 if let Ok(pg_conn) = conn.get_mut() {
582 let _ = pg_conn.execute_simple("ROLLBACK").await;
583 }
584 conn.release().await;
585 return Err(e);
586 }
587
588 conn.rls_dirty = true;
589
590 Ok(conn)
591 }
592
593 pub async fn with_rls_timeouts<T, F>(
595 &self,
596 ctx: qail_core::rls::RlsContext,
597 statement_timeout_ms: u32,
598 lock_timeout_ms: u32,
599 f: F,
600 ) -> PgResult<T>
601 where
602 F: for<'a> FnOnce(&'a mut PooledConnection) -> ScopedPoolFuture<'a, T>,
603 {
604 let mut conn = self
605 .acquire_with_rls_timeouts(ctx, statement_timeout_ms, lock_timeout_ms)
606 .await?;
607 let out = f(&mut conn).await;
608 match out {
609 Ok(value) => {
610 conn.release_checked().await?;
611 Ok(value)
612 }
613 Err(err) => {
614 let _ = conn.rollback_and_release().await;
615 Err(err)
616 }
617 }
618 }
619
620 pub async fn acquire_system(&self) -> PgResult<PooledConnection> {
630 let ctx = qail_core::rls::RlsContext::empty();
631 self.acquire_with_rls(ctx).await
632 }
633
634 pub async fn acquire_global(&self) -> PgResult<PooledConnection> {
640 self.acquire_with_rls(qail_core::rls::RlsContext::global())
641 .await
642 }
643
644 pub async fn acquire_for_tenant(&self, tenant_id: &str) -> PgResult<PooledConnection> {
656 self.acquire_with_rls(qail_core::rls::RlsContext::tenant(tenant_id))
657 .await
658 }
659
660 pub async fn acquire_with_branch(
674 &self,
675 ctx: &qail_core::branch::BranchContext,
676 ) -> PgResult<PooledConnection> {
677 let mut conn = self.acquire_raw().await?;
679
680 if let Some(branch_name) = ctx.branch_name() {
681 let sql = crate::driver::branch_sql::branch_context_sql(branch_name);
682 let pg_conn = conn.get_mut()?;
683 if let Err(e) = execute_simple_with_timeout(
684 pg_conn,
685 &sql,
686 self.inner.config.connect_timeout,
687 "pool acquire_with_branch setup",
688 )
689 .await
690 {
691 if let Ok(pg_conn) = conn.get_mut() {
692 let _ = pg_conn.execute_simple("ROLLBACK").await;
693 }
694 conn.release().await;
695 return Err(e);
696 }
697 conn.rls_dirty = true; }
699
700 Ok(conn)
701 }
702
703 pub async fn idle_count(&self) -> usize {
705 self.inner.connections.lock().await.len()
706 }
707
708 pub fn active_count(&self) -> usize {
710 self.inner.active_count.load(Ordering::Relaxed)
711 }
712
713 pub fn max_connections(&self) -> usize {
715 self.inner.config.max_connections
716 }
717
718 pub fn plan_auto_count(&self, batch_len: usize) -> AutoCountPlan {
720 AutoCountPlan::for_pool(
721 batch_len,
722 self.inner.config.max_connections,
723 self.inner.semaphore.available_permits(),
724 )
725 }
726
727 pub async fn execute_count_auto_with_plan(
729 &self,
730 cmds: &[qail_core::ast::Qail],
731 ) -> PgResult<(usize, AutoCountPlan)> {
732 let plan = self.plan_auto_count(cmds.len());
733
734 let completed = match plan.path {
735 AutoCountPath::SingleCached => {
736 if cmds.is_empty() {
737 0
738 } else {
739 let mut conn = self.acquire_system().await?;
740 let run_result = conn.fetch_all_cached(&cmds[0]).await;
741 conn.release().await;
742 let _ = run_result?;
743 1
744 }
745 }
746 AutoCountPath::PipelineOneShot | AutoCountPath::PipelineCached => {
747 let mode = if matches!(plan.path, AutoCountPath::PipelineOneShot) {
748 AstPipelineMode::OneShot
749 } else {
750 AstPipelineMode::Cached
751 };
752
753 let mut pooled = self.acquire_system().await?;
754 let run_result = {
755 let conn = pooled.get_mut()?;
756 conn.pipeline_execute_count_ast_with_mode(cmds, mode).await
757 };
758 pooled.release().await;
759 run_result?
760 }
761 AutoCountPath::PoolParallel => {
762 if cmds.is_empty() {
763 0
764 } else {
765 let all_cmds = Arc::new(cmds.to_vec());
766 let mut tasks: JoinSet<PgResult<usize>> = JoinSet::new();
767
768 for worker in 0..plan.workers {
769 let start = worker * plan.chunk_size;
770 if start >= all_cmds.len() {
771 break;
772 }
773 let end = (start + plan.chunk_size).min(all_cmds.len());
774 let pool = self.clone();
775 let all_cmds = Arc::clone(&all_cmds);
776
777 tasks.spawn(async move {
778 let mut pooled = pool.acquire_system().await?;
779 let run_result = {
780 let conn = pooled.get_mut()?;
781 conn.pipeline_execute_count_ast_with_mode(
782 &all_cmds[start..end],
783 AstPipelineMode::Auto,
784 )
785 .await
786 };
787 pooled.release().await;
788 run_result
789 });
790 }
791
792 let mut total = 0usize;
793 while let Some(joined) = tasks.join_next().await {
794 match joined {
795 Ok(Ok(count)) => {
796 total += count;
797 }
798 Ok(Err(err)) => return Err(err),
799 Err(err) => {
800 return Err(PgError::Connection(format!(
801 "auto pool worker join failed: {err}"
802 )));
803 }
804 }
805 }
806 total
807 }
808 }
809 };
810
811 Ok((completed, plan))
812 }
813
814 #[inline]
816 pub async fn execute_count_auto(&self, cmds: &[qail_core::ast::Qail]) -> PgResult<usize> {
817 let (completed, _plan) = self.execute_count_auto_with_plan(cmds).await?;
818 Ok(completed)
819 }
820
821 pub async fn stats(&self) -> PoolStats {
823 let idle = self.inner.connections.lock().await.len();
824 let active = self.inner.active_count.load(Ordering::Relaxed);
825 let used_slots = self
826 .inner
827 .config
828 .max_connections
829 .saturating_sub(self.inner.semaphore.available_permits());
830 PoolStats {
831 active,
832 idle,
833 pending: used_slots.saturating_sub(active),
834 max_size: self.inner.config.max_connections,
835 total_created: self.inner.total_created.load(Ordering::Relaxed),
836 }
837 }
838
839 pub fn is_closed(&self) -> bool {
841 self.inner.closed.load(Ordering::Relaxed)
842 }
843
844 pub async fn close(&self) {
851 self.close_graceful(self.inner.config.acquire_timeout).await;
852 }
853
854 pub async fn close_graceful(&self, drain_timeout: Duration) {
856 self.inner.closed.store(true, Ordering::Relaxed);
857 self.inner.semaphore.close();
859
860 let deadline = Instant::now() + drain_timeout;
861 loop {
862 let active = self.inner.active_count.load(Ordering::Relaxed);
863 if active == 0 {
864 break;
865 }
866 if Instant::now() >= deadline {
867 tracing::warn!(
868 active_connections = active,
869 timeout_ms = drain_timeout.as_millis() as u64,
870 "pool_close_drain_timeout: forcing idle cleanup while active connections remain"
871 );
872 break;
873 }
874 tokio::time::sleep(Duration::from_millis(25)).await;
875 }
876
877 let mut connections = self.inner.connections.lock().await;
878 let dropped_idle = connections.len();
879 connections.clear();
880 tracing::info!(
881 dropped_idle_connections = dropped_idle,
882 active_connections = self.inner.active_count.load(Ordering::Relaxed),
883 "pool_closed"
884 );
885 }
886
887 async fn create_connection(config: &PoolConfig) -> PgResult<PgConnection> {
889 if !config.auth_settings.has_any_password_method()
890 && config.mtls.is_none()
891 && config.password.is_some()
892 {
893 return Err(PgError::Auth(
894 "Invalid PoolConfig: all password auth methods are disabled".to_string(),
895 ));
896 }
897
898 let options = ConnectOptions {
899 tls_mode: config.tls_mode,
900 gss_enc_mode: config.gss_enc_mode,
901 tls_ca_cert_pem: config.tls_ca_cert_pem.clone(),
902 mtls: config.mtls.clone(),
903 gss_token_provider: config.gss_token_provider,
904 gss_token_provider_ex: config.gss_token_provider_ex.clone(),
905 auth: config.auth_settings,
906 startup_params: Vec::new(),
907 };
908
909 if let Some(remaining) = gss_circuit_remaining_open(config) {
910 metrics::counter!("qail_pg_gss_circuit_open_total").increment(1);
911 tracing::warn!(
912 host = %config.host,
913 port = config.port,
914 user = %config.user,
915 db = %config.database,
916 remaining_ms = remaining.as_millis() as u64,
917 "gss_connect_circuit_open"
918 );
919 return Err(PgError::Connection(format!(
920 "GSS connection circuit is open; retry after {:?}",
921 remaining
922 )));
923 }
924
925 let mut attempt = 0usize;
926 loop {
927 let connect_result = tokio::time::timeout(
928 config.connect_timeout,
929 PgConnection::connect_with_options(
930 &config.host,
931 config.port,
932 &config.user,
933 &config.database,
934 config.password.as_deref(),
935 options.clone(),
936 ),
937 )
938 .await;
939
940 let connect_result = match connect_result {
941 Ok(result) => result,
942 Err(_) => Err(PgError::Timeout(format!(
943 "connect timeout after {:?} (pool config connect_timeout)",
944 config.connect_timeout
945 ))),
946 };
947
948 match connect_result {
949 Ok(conn) => {
950 metrics::counter!("qail_pg_pool_connect_success_total").increment(1);
951 gss_circuit_record_success(config);
952 return Ok(conn);
953 }
954 Err(err) if should_retry_gss_connect_error(config, attempt, &err) => {
955 metrics::counter!("qail_pg_gss_connect_retries_total").increment(1);
956 gss_circuit_record_failure(config);
957 let delay = gss_retry_delay(config.gss_retry_base_delay, attempt);
958 tracing::warn!(
959 host = %config.host,
960 port = config.port,
961 user = %config.user,
962 db = %config.database,
963 attempt = attempt + 1,
964 delay_ms = delay.as_millis() as u64,
965 error = %err,
966 "gss_connect_retry"
967 );
968 tokio::time::sleep(delay).await;
969 attempt += 1;
970 }
971 Err(err) => {
972 metrics::counter!("qail_pg_pool_connect_failures_total").increment(1);
973 if should_track_gss_circuit_error(config, &err) {
974 metrics::counter!("qail_pg_gss_connect_failures_total").increment(1);
975 gss_circuit_record_failure(config);
976 }
977 return Err(err);
978 }
979 }
980 }
981 }
982
983 pub async fn maintain(&self) {
986 if self.inner.closed.load(Ordering::Relaxed) {
987 return;
988 }
989
990 let evicted = {
992 let mut connections = self.inner.connections.lock().await;
993 let before = connections.len();
994 connections.retain(|pooled| {
995 if pooled.last_used.elapsed() > self.inner.config.idle_timeout {
996 record_pool_connection_destroy("idle_sweep_evict");
997 return false;
998 }
999 if let Some(max_life) = self.inner.config.max_lifetime
1000 && pooled.created_at.elapsed() > max_life
1001 {
1002 record_pool_connection_destroy("lifetime_sweep_evict");
1003 return false;
1004 }
1005 true
1006 });
1007 before - connections.len()
1008 };
1009
1010 if evicted > 0 {
1011 tracing::debug!(evicted, "pool_maintenance: evicted stale idle connections");
1012 }
1013
1014 let min = self.inner.config.min_connections;
1016 if min == 0 {
1017 return;
1018 }
1019
1020 let idle_count = self.inner.connections.lock().await.len();
1021 let checked_out_slots = self
1022 .inner
1023 .config
1024 .max_connections
1025 .saturating_sub(self.inner.semaphore.available_permits());
1026 let deficit = maintenance_backfill_deficit(
1027 self.inner.config.max_connections,
1028 min,
1029 idle_count,
1030 checked_out_slots,
1031 );
1032 if deficit == 0 {
1033 return;
1034 }
1035 let mut created = 0usize;
1036 for _ in 0..deficit {
1037 match Self::create_connection(&self.inner.config).await {
1038 Ok(conn) => {
1039 self.inner.total_created.fetch_add(1, Ordering::Relaxed);
1040 let mut connections = self.inner.connections.lock().await;
1041 if connections.len() < self.inner.config.max_connections {
1042 connections.push(PooledConn {
1043 conn,
1044 created_at: Instant::now(),
1045 last_used: Instant::now(),
1046 });
1047 created += 1;
1048 } else {
1049 break;
1051 }
1052 }
1053 Err(e) => {
1054 tracing::warn!(error = %e, "pool_maintenance: backfill connection failed");
1055 break; }
1057 }
1058 }
1059
1060 if created > 0 {
1061 tracing::debug!(
1062 created,
1063 min_connections = min,
1064 "pool_maintenance: backfilled idle connections"
1065 );
1066 }
1067 }
1068}
1069
1070pub fn spawn_pool_maintenance(pool: PgPool) {
1075 let interval_secs = std::cmp::max(pool.inner.config.idle_timeout.as_secs() / 2, 5);
1076 tokio::spawn(async move {
1077 let mut interval = tokio::time::interval(Duration::from_secs(interval_secs));
1078 loop {
1079 interval.tick().await;
1080 if pool.is_closed() {
1081 break;
1082 }
1083 pool.maintain().await;
1084 }
1085 });
1086}
1087
1088pub(super) fn maintenance_backfill_deficit(
1089 max_connections: usize,
1090 min_connections: usize,
1091 idle_count: usize,
1092 checked_out_slots: usize,
1093) -> usize {
1094 let target_idle = min_connections.min(max_connections);
1095 if idle_count >= target_idle {
1096 return 0;
1097 }
1098
1099 let needed_idle = target_idle - idle_count;
1100 let available_slots =
1101 max_connections.saturating_sub(idle_count.saturating_add(checked_out_slots));
1102 needed_idle.min(available_slots)
1103}
1104
1105pub(super) fn validate_pool_config(config: &PoolConfig) -> PgResult<()> {
1106 if config.max_connections == 0 {
1107 return Err(PgError::Connection(
1108 "Invalid PoolConfig: max_connections must be >= 1".to_string(),
1109 ));
1110 }
1111 if config.min_connections > config.max_connections {
1112 return Err(PgError::Connection(format!(
1113 "Invalid PoolConfig: min_connections ({}) must be <= max_connections ({})",
1114 config.min_connections, config.max_connections
1115 )));
1116 }
1117 if config.acquire_timeout.is_zero() {
1118 return Err(PgError::Connection(
1119 "Invalid PoolConfig: acquire_timeout must be > 0".to_string(),
1120 ));
1121 }
1122 if config.connect_timeout.is_zero() {
1123 return Err(PgError::Connection(
1124 "Invalid PoolConfig: connect_timeout must be > 0".to_string(),
1125 ));
1126 }
1127 if config.leaked_cleanup_queue == 0 {
1128 return Err(PgError::Connection(
1129 "Invalid PoolConfig: leaked_cleanup_queue must be >= 1".to_string(),
1130 ));
1131 }
1132 Ok(())
1133}
1134
1135pub(super) async fn execute_simple_with_timeout(
1136 conn: &mut PgConnection,
1137 sql: &str,
1138 timeout: Duration,
1139 operation: &str,
1140) -> PgResult<()> {
1141 match tokio::time::timeout(timeout, conn.execute_simple(sql)).await {
1142 Ok(result) => result,
1143 Err(_) => {
1144 conn.mark_io_desynced();
1145 Err(PgError::Timeout(format!(
1146 "{} timeout after {:?} (pool config connect_timeout)",
1147 operation, timeout
1148 )))
1149 }
1150 }
1151}