1use super::ScopedPoolFuture;
5use super::churn::{
6 PoolStats, decrement_active_count_saturating, pool_churn_record_destroy,
7 pool_churn_remaining_open, record_pool_connection_destroy,
8};
9use super::config::PoolConfig;
10use super::connection::PooledConn;
11use super::connection::PooledConnection;
12use super::gss::*;
13use crate::driver::{
14 ConnectOptions, PgConnection, PgError, PgResult, is_ignorable_session_message,
15 unexpected_backend_message,
16};
17use std::sync::Arc;
18use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
19use std::time::{Duration, Instant};
20use tokio::sync::{Mutex, Semaphore};
21
/// Numeric limit related to hot statements.
///
/// NOTE(review): presumably the maximum number of entries kept in
/// `PgPoolInner::hot_statements`; the code that enforces it is not part of
/// this section — confirm at the use site.
pub(super) const MAX_HOT_STATEMENTS: usize = 32;
24
/// Shared state behind every [`PgPool`] handle.
pub(super) struct PgPoolInner {
    /// Pool configuration; checked by `validate_pool_config` before the pool is built.
    pub(super) config: PoolConfig,
    /// Idle connections. Checkout pops from the back, return pushes to the back.
    pub(super) connections: Mutex<Vec<PooledConn>>,
    /// Created with `max_connections` permits. A checkout takes one permit and
    /// forgets it; `return_connection` restores it with `add_permits(1)`.
    pub(super) semaphore: Semaphore,
    /// Set to `true` by `close_graceful`; checked on checkout, return and maintenance.
    pub(super) closed: AtomicBool,
    /// Number of connections currently handed out (incremented at the end of
    /// `acquire_raw`, decremented at the start of `return_connection`).
    pub(super) active_count: AtomicUsize,
    /// Running count of connections this pool has created.
    pub(super) total_created: AtomicUsize,
    /// NOTE(review): not touched in this section beyond initialisation to 0;
    /// presumably tracks in-flight cleanup of leaked connections — confirm.
    pub(super) leaked_cleanup_inflight: AtomicUsize,
    /// Hot statements keyed by hash; each value is `(statement name, SQL text)`.
    /// Read at checkout to pre-prepare statements the connection is missing.
    pub(super) hot_statements: std::sync::RwLock<std::collections::HashMap<u64, (String, String)>>,
}
39
40pub(super) fn handle_hot_preprepare_message(
41 msg: &crate::protocol::BackendMessage,
42 parse_complete_count: &mut usize,
43 error: &mut Option<PgError>,
44) -> PgResult<bool> {
45 match msg {
46 crate::protocol::BackendMessage::ParseComplete => {
47 *parse_complete_count += 1;
48 Ok(false)
49 }
50 crate::protocol::BackendMessage::ErrorResponse(err) => {
51 if error.is_none() {
52 *error = Some(PgError::QueryServer(err.clone().into()));
53 }
54 Ok(false)
55 }
56 crate::protocol::BackendMessage::ReadyForQuery(_) => Ok(true),
57 msg if is_ignorable_session_message(msg) => Ok(false),
58 other => Err(unexpected_backend_message("pool hot pre-prepare", other)),
59 }
60}
61
62impl PgPoolInner {
63 pub(super) async fn return_connection(&self, conn: PgConnection, created_at: Instant) {
64 decrement_active_count_saturating(&self.active_count);
65
66 if conn.is_io_desynced() {
67 tracing::warn!(
68 host = %self.config.host,
69 port = self.config.port,
70 user = %self.config.user,
71 db = %self.config.database,
72 "pool_return_desynced: dropping connection due to prior I/O/protocol desync"
73 );
74 record_pool_connection_destroy("pool_desynced_drop");
75 self.semaphore.add_permits(1);
76 pool_churn_record_destroy(&self.config, "return_desynced");
77 return;
78 }
79
80 if self.closed.load(Ordering::Relaxed) {
81 record_pool_connection_destroy("pool_closed_drop");
82 self.semaphore.add_permits(1);
83 return;
84 }
85
86 let mut connections = self.connections.lock().await;
87 if connections.len() < self.config.max_connections {
88 connections.push(PooledConn {
89 conn,
90 created_at,
91 last_used: Instant::now(),
92 });
93 } else {
94 record_pool_connection_destroy("pool_overflow_drop");
95 }
96
97 self.semaphore.add_permits(1);
98 }
99
100 async fn get_healthy_connection(&self) -> Option<PooledConn> {
102 let mut connections = self.connections.lock().await;
103
104 while let Some(pooled) = connections.pop() {
105 if pooled.last_used.elapsed() > self.config.idle_timeout {
106 tracing::debug!(
107 idle_secs = pooled.last_used.elapsed().as_secs(),
108 timeout_secs = self.config.idle_timeout.as_secs(),
109 "pool_checkout_evict: connection exceeded idle timeout"
110 );
111 record_pool_connection_destroy("idle_timeout_evict");
112 continue;
113 }
114
115 if let Some(max_life) = self.config.max_lifetime
116 && pooled.created_at.elapsed() > max_life
117 {
118 tracing::debug!(
119 age_secs = pooled.created_at.elapsed().as_secs(),
120 max_lifetime_secs = max_life.as_secs(),
121 "pool_checkout_evict: connection exceeded max lifetime"
122 );
123 record_pool_connection_destroy("max_lifetime_evict");
124 continue;
125 }
126
127 return Some(pooled);
128 }
129
130 None
131 }
132}
133
/// Cloneable handle to a connection pool.
///
/// Cloning copies the `Arc`, so every clone refers to the same [`PgPoolInner`].
#[derive(Clone)]
pub struct PgPool {
    /// Shared pool state.
    pub(super) inner: Arc<PgPoolInner>,
}
148
149impl PgPool {
150 pub async fn from_config() -> PgResult<Self> {
157 let qail = qail_core::config::QailConfig::load()
158 .map_err(|e| PgError::Connection(format!("Config error: {}", e)))?;
159 let config = PoolConfig::from_qail_config(&qail)?;
160 Self::connect(config).await
161 }
162
163 pub async fn connect(config: PoolConfig) -> PgResult<Self> {
165 validate_pool_config(&config)?;
166
167 let semaphore = Semaphore::new(config.max_connections);
169
170 let mut initial_connections = Vec::new();
171 for _ in 0..config.min_connections {
172 let conn = Self::create_connection(&config).await?;
173 initial_connections.push(PooledConn {
174 conn,
175 created_at: Instant::now(),
176 last_used: Instant::now(),
177 });
178 }
179
180 let initial_count = initial_connections.len();
181
182 let inner = Arc::new(PgPoolInner {
183 config,
184 connections: Mutex::new(initial_connections),
185 semaphore,
186 closed: AtomicBool::new(false),
187 active_count: AtomicUsize::new(0),
188 total_created: AtomicUsize::new(initial_count),
189 leaked_cleanup_inflight: AtomicUsize::new(0),
190 hot_statements: std::sync::RwLock::new(std::collections::HashMap::new()),
191 });
192
193 Ok(Self { inner })
194 }
195
    /// Checks out a connection without applying any session context.
    ///
    /// Steps, in order:
    /// 1. Reject if the pool is closed or the churn circuit is open.
    /// 2. Wait up to `acquire_timeout` for a semaphore permit.
    /// 3. Reuse a healthy idle connection, or create a new one.
    /// 4. If `test_on_acquire` is set, probe with `SELECT 1` and replace the
    ///    connection when the probe fails.
    /// 5. Pre-prepare any hot statements the connection's cache is missing,
    ///    replacing the connection when that fails.
    /// 6. Count the connection as active and forget the permit.
    ///
    /// The returned guard starts with `rls_dirty == false`.
    ///
    /// # Errors
    ///
    /// * `PgError::PoolClosed` — pool closed before or while waiting.
    /// * `PgError::PoolExhausted` — churn circuit currently open.
    /// * `PgError::Timeout` — no permit within `acquire_timeout`.
    /// * Any error from `create_connection` or from encoding a Parse message.
    pub async fn acquire_raw(&self) -> PgResult<PooledConnection> {
        if self.inner.closed.load(Ordering::Relaxed) {
            return Err(PgError::PoolClosed);
        }

        // Refuse new checkouts while the churn circuit reports time remaining.
        if let Some(remaining) = pool_churn_remaining_open(&self.inner.config) {
            metrics::counter!("qail_pg_pool_churn_circuit_reject_total").increment(1);
            tracing::warn!(
                host = %self.inner.config.host,
                port = self.inner.config.port,
                user = %self.inner.config.user,
                db = %self.inner.config.database,
                remaining_ms = remaining.as_millis() as u64,
                "pool_connection_churn_circuit_open"
            );
            return Err(PgError::PoolExhausted {
                max: self.inner.config.max_connections,
            });
        }

        let acquire_timeout = self.inner.config.acquire_timeout;
        // An acquire error (semaphore closed) is reported as `PoolClosed`.
        let permit =
            match tokio::time::timeout(acquire_timeout, self.inner.semaphore.acquire()).await {
                Ok(permit) => permit.map_err(|_| PgError::PoolClosed)?,
                Err(_) => {
                    metrics::counter!("qail_pg_pool_acquire_timeouts_total").increment(1);
                    return Err(PgError::Timeout(format!(
                        "pool acquire after {}s ({} max connections)",
                        acquire_timeout.as_secs(),
                        self.inner.config.max_connections
                    )));
                }
            };

        // The pool may have been closed while this task waited for a permit.
        if self.inner.closed.load(Ordering::Relaxed) {
            return Err(PgError::PoolClosed);
        }

        // Prefer an idle connection; otherwise create one and stamp it now.
        let (mut conn, mut created_at) =
            if let Some(pooled) = self.inner.get_healthy_connection().await {
                (pooled.conn, pooled.created_at)
            } else {
                let conn = Self::create_connection(&self.inner.config).await?;
                self.inner.total_created.fetch_add(1, Ordering::Relaxed);
                (conn, Instant::now())
            };

        // Optional liveness probe, bounded by `connect_timeout`.
        if self.inner.config.test_on_acquire
            && let Err(e) = execute_simple_with_timeout(
                &mut conn,
                "SELECT 1",
                self.inner.config.connect_timeout,
                "pool checkout health check",
            )
            .await
        {
            tracing::warn!(
                host = %self.inner.config.host,
                port = self.inner.config.port,
                user = %self.inner.config.user,
                db = %self.inner.config.database,
                error = %e,
                "pool_health_check_failed: checkout probe failed, creating replacement connection"
            );
            pool_churn_record_destroy(&self.inner.config, "health_check_failed");
            // Assigning drops the failed connection.
            conn = Self::create_connection(&self.inner.config).await?;
            self.inner.total_created.fetch_add(1, Ordering::Relaxed);
            created_at = Instant::now();
        }

        // Hot statements whose hash is not in this connection's statement cache.
        // The read guard is released at the end of this block, before any await.
        // A poisoned lock is treated as "nothing to prepare".
        let missing: Vec<(u64, String, String)> = {
            if let Ok(hot) = self.inner.hot_statements.read() {
                hot.iter()
                    .filter(|(hash, _)| !conn.stmt_cache.contains(hash))
                    .map(|(hash, (name, sql))| (*hash, name.clone(), sql.clone()))
                    .collect()
            } else {
                Vec::new()
            }
        };
        if !missing.is_empty() {
            use crate::protocol::PgEncoder;
            // One Parse message per missing statement followed by a single Sync.
            let mut buf = bytes::BytesMut::new();
            for (_, name, sql) in &missing {
                let parse_msg = PgEncoder::try_encode_parse(name, sql, &[])?;
                buf.extend_from_slice(&parse_msg);
            }
            PgEncoder::encode_sync_to(&mut buf);
            let preprepare_timeout = self.inner.config.connect_timeout;
            let preprepare_result: PgResult<()> = match tokio::time::timeout(
                preprepare_timeout,
                async {
                    conn.send_bytes(&buf).await?;
                    let mut parse_complete_count = 0usize;
                    let mut parse_error: Option<PgError> = None;
                    // Read until the handler reports the batch is finished.
                    loop {
                        let msg = conn.recv().await?;
                        if handle_hot_preprepare_message(
                            &msg,
                            &mut parse_complete_count,
                            &mut parse_error,
                        )? {
                            if let Some(err) = parse_error {
                                return Err(err);
                            }
                            // Every Parse must have been acknowledged.
                            if parse_complete_count != missing.len() {
                                return Err(PgError::Protocol(format!(
                                    "hot pre-prepare completed with {} ParseComplete messages (expected {})",
                                    parse_complete_count,
                                    missing.len()
                                )));
                            }
                            break;
                        }
                    }
                    Ok::<(), PgError>(())
                },
            )
            .await
            {
                Ok(res) => res,
                Err(_) => Err(PgError::Timeout(format!(
                    "hot statement pre-prepare timeout after {:?} (pool config connect_timeout)",
                    preprepare_timeout
                ))),
            };

            if let Err(e) = preprepare_result {
                tracing::warn!(
                    host = %self.inner.config.host,
                    port = self.inner.config.port,
                    user = %self.inner.config.user,
                    db = %self.inner.config.database,
                    timeout_ms = preprepare_timeout.as_millis() as u64,
                    error = %e,
                    "pool_hot_prepare_failed: replacing connection to avoid handing out uncertain protocol state"
                );
                pool_churn_record_destroy(&self.inner.config, "hot_prepare_failed");
                // The replacement is handed out without hot statements registered.
                conn = Self::create_connection(&self.inner.config).await?;
                self.inner.total_created.fetch_add(1, Ordering::Relaxed);
                created_at = Instant::now();
            } else {
                // Record the statements as prepared on this connection.
                for (hash, name, sql) in &missing {
                    conn.stmt_cache.put(*hash, name.clone());
                    conn.prepared_statements.insert(name.clone(), sql.clone());
                }
            }
        }

        self.inner.active_count.fetch_add(1, Ordering::Relaxed);
        // Keep the slot taken after this function returns; `return_connection`
        // restores it with `add_permits(1)`.
        permit.forget();

        Ok(PooledConnection {
            conn: Some(conn),
            pool: self.inner.clone(),
            rls_dirty: false,
            created_at,
        })
    }
377
378 pub async fn acquire_with_rls(
394 &self,
395 ctx: qail_core::rls::RlsContext,
396 ) -> PgResult<PooledConnection> {
397 let mut conn = self.acquire_raw().await?;
399
400 let sql = crate::driver::rls::context_to_sql(&ctx);
402 let pg_conn = conn.get_mut()?;
403 if let Err(e) = execute_simple_with_timeout(
404 pg_conn,
405 &sql,
406 self.inner.config.connect_timeout,
407 "pool acquire_with_rls setup",
408 )
409 .await
410 {
411 if let Ok(pg_conn) = conn.get_mut() {
414 let _ = pg_conn.execute_simple("ROLLBACK").await;
415 }
416 conn.release().await;
417 return Err(e);
418 }
419
420 conn.rls_dirty = true;
422
423 Ok(conn)
424 }
425
426 pub async fn with_rls<T, F>(&self, ctx: qail_core::rls::RlsContext, f: F) -> PgResult<T>
430 where
431 F: for<'a> FnOnce(&'a mut PooledConnection) -> ScopedPoolFuture<'a, T>,
432 {
433 let mut conn = self.acquire_with_rls(ctx).await?;
434 let out = f(&mut conn).await;
435 conn.release().await;
436 out
437 }
438
439 pub async fn with_system<T, F>(&self, f: F) -> PgResult<T>
441 where
442 F: for<'a> FnOnce(&'a mut PooledConnection) -> ScopedPoolFuture<'a, T>,
443 {
444 self.with_rls(qail_core::rls::RlsContext::empty(), f).await
445 }
446
447 pub async fn with_global<T, F>(&self, f: F) -> PgResult<T>
449 where
450 F: for<'a> FnOnce(&'a mut PooledConnection) -> ScopedPoolFuture<'a, T>,
451 {
452 self.with_rls(qail_core::rls::RlsContext::global(), f).await
453 }
454
455 pub async fn with_tenant<T, F>(&self, tenant_id: &str, f: F) -> PgResult<T>
457 where
458 F: for<'a> FnOnce(&'a mut PooledConnection) -> ScopedPoolFuture<'a, T>,
459 {
460 self.with_rls(qail_core::rls::RlsContext::tenant(tenant_id), f)
461 .await
462 }
463
464 pub async fn acquire_with_rls_timeout(
469 &self,
470 ctx: qail_core::rls::RlsContext,
471 timeout_ms: u32,
472 ) -> PgResult<PooledConnection> {
473 let mut conn = self.acquire_raw().await?;
475
476 let sql = crate::driver::rls::context_to_sql_with_timeout(&ctx, timeout_ms);
478 let pg_conn = conn.get_mut()?;
479 if let Err(e) = execute_simple_with_timeout(
480 pg_conn,
481 &sql,
482 self.inner.config.connect_timeout,
483 "pool acquire_with_rls_timeout setup",
484 )
485 .await
486 {
487 if let Ok(pg_conn) = conn.get_mut() {
488 let _ = pg_conn.execute_simple("ROLLBACK").await;
489 }
490 conn.release().await;
491 return Err(e);
492 }
493
494 conn.rls_dirty = true;
496
497 Ok(conn)
498 }
499
500 pub async fn with_rls_timeout<T, F>(
502 &self,
503 ctx: qail_core::rls::RlsContext,
504 timeout_ms: u32,
505 f: F,
506 ) -> PgResult<T>
507 where
508 F: for<'a> FnOnce(&'a mut PooledConnection) -> ScopedPoolFuture<'a, T>,
509 {
510 let mut conn = self.acquire_with_rls_timeout(ctx, timeout_ms).await?;
511 let out = f(&mut conn).await;
512 conn.release().await;
513 out
514 }
515
516 pub async fn acquire_with_rls_timeouts(
522 &self,
523 ctx: qail_core::rls::RlsContext,
524 statement_timeout_ms: u32,
525 lock_timeout_ms: u32,
526 ) -> PgResult<PooledConnection> {
527 let mut conn = self.acquire_raw().await?;
529
530 let sql = crate::driver::rls::context_to_sql_with_timeouts(
531 &ctx,
532 statement_timeout_ms,
533 lock_timeout_ms,
534 );
535 let pg_conn = conn.get_mut()?;
536 if let Err(e) = execute_simple_with_timeout(
537 pg_conn,
538 &sql,
539 self.inner.config.connect_timeout,
540 "pool acquire_with_rls_timeouts setup",
541 )
542 .await
543 {
544 if let Ok(pg_conn) = conn.get_mut() {
545 let _ = pg_conn.execute_simple("ROLLBACK").await;
546 }
547 conn.release().await;
548 return Err(e);
549 }
550
551 conn.rls_dirty = true;
552
553 Ok(conn)
554 }
555
556 pub async fn with_rls_timeouts<T, F>(
558 &self,
559 ctx: qail_core::rls::RlsContext,
560 statement_timeout_ms: u32,
561 lock_timeout_ms: u32,
562 f: F,
563 ) -> PgResult<T>
564 where
565 F: for<'a> FnOnce(&'a mut PooledConnection) -> ScopedPoolFuture<'a, T>,
566 {
567 let mut conn = self
568 .acquire_with_rls_timeouts(ctx, statement_timeout_ms, lock_timeout_ms)
569 .await?;
570 let out = f(&mut conn).await;
571 conn.release().await;
572 out
573 }
574
575 pub async fn acquire_system(&self) -> PgResult<PooledConnection> {
585 let ctx = qail_core::rls::RlsContext::empty();
586 self.acquire_with_rls(ctx).await
587 }
588
589 pub async fn acquire_global(&self) -> PgResult<PooledConnection> {
595 self.acquire_with_rls(qail_core::rls::RlsContext::global())
596 .await
597 }
598
599 pub async fn acquire_for_tenant(&self, tenant_id: &str) -> PgResult<PooledConnection> {
611 self.acquire_with_rls(qail_core::rls::RlsContext::tenant(tenant_id))
612 .await
613 }
614
615 pub async fn acquire_with_branch(
629 &self,
630 ctx: &qail_core::branch::BranchContext,
631 ) -> PgResult<PooledConnection> {
632 let mut conn = self.acquire_raw().await?;
634
635 if let Some(branch_name) = ctx.branch_name() {
636 let sql = crate::driver::branch_sql::branch_context_sql(branch_name);
637 let pg_conn = conn.get_mut()?;
638 if let Err(e) = execute_simple_with_timeout(
639 pg_conn,
640 &sql,
641 self.inner.config.connect_timeout,
642 "pool acquire_with_branch setup",
643 )
644 .await
645 {
646 if let Ok(pg_conn) = conn.get_mut() {
647 let _ = pg_conn.execute_simple("ROLLBACK").await;
648 }
649 conn.release().await;
650 return Err(e);
651 }
652 conn.rls_dirty = true; }
654
655 Ok(conn)
656 }
657
658 pub async fn idle_count(&self) -> usize {
660 self.inner.connections.lock().await.len()
661 }
662
663 pub fn active_count(&self) -> usize {
665 self.inner.active_count.load(Ordering::Relaxed)
666 }
667
668 pub fn max_connections(&self) -> usize {
670 self.inner.config.max_connections
671 }
672
673 pub async fn stats(&self) -> PoolStats {
675 let idle = self.inner.connections.lock().await.len();
676 let active = self.inner.active_count.load(Ordering::Relaxed);
677 let used_slots = self
678 .inner
679 .config
680 .max_connections
681 .saturating_sub(self.inner.semaphore.available_permits());
682 PoolStats {
683 active,
684 idle,
685 pending: used_slots.saturating_sub(active),
686 max_size: self.inner.config.max_connections,
687 total_created: self.inner.total_created.load(Ordering::Relaxed),
688 }
689 }
690
691 pub fn is_closed(&self) -> bool {
693 self.inner.closed.load(Ordering::Relaxed)
694 }
695
696 pub async fn close(&self) {
703 self.close_graceful(self.inner.config.acquire_timeout).await;
704 }
705
706 pub async fn close_graceful(&self, drain_timeout: Duration) {
708 self.inner.closed.store(true, Ordering::Relaxed);
709 self.inner.semaphore.close();
711
712 let deadline = Instant::now() + drain_timeout;
713 loop {
714 let active = self.inner.active_count.load(Ordering::Relaxed);
715 if active == 0 {
716 break;
717 }
718 if Instant::now() >= deadline {
719 tracing::warn!(
720 active_connections = active,
721 timeout_ms = drain_timeout.as_millis() as u64,
722 "pool_close_drain_timeout: forcing idle cleanup while active connections remain"
723 );
724 break;
725 }
726 tokio::time::sleep(Duration::from_millis(25)).await;
727 }
728
729 let mut connections = self.inner.connections.lock().await;
730 let dropped_idle = connections.len();
731 connections.clear();
732 tracing::info!(
733 dropped_idle_connections = dropped_idle,
734 active_connections = self.inner.active_count.load(Ordering::Relaxed),
735 "pool_closed"
736 );
737 }
738
739 async fn create_connection(config: &PoolConfig) -> PgResult<PgConnection> {
741 if !config.auth_settings.has_any_password_method()
742 && config.mtls.is_none()
743 && config.password.is_some()
744 {
745 return Err(PgError::Auth(
746 "Invalid PoolConfig: all password auth methods are disabled".to_string(),
747 ));
748 }
749
750 let options = ConnectOptions {
751 tls_mode: config.tls_mode,
752 gss_enc_mode: config.gss_enc_mode,
753 tls_ca_cert_pem: config.tls_ca_cert_pem.clone(),
754 mtls: config.mtls.clone(),
755 gss_token_provider: config.gss_token_provider,
756 gss_token_provider_ex: config.gss_token_provider_ex.clone(),
757 auth: config.auth_settings,
758 startup_params: Vec::new(),
759 };
760
761 if let Some(remaining) = gss_circuit_remaining_open(config) {
762 metrics::counter!("qail_pg_gss_circuit_open_total").increment(1);
763 tracing::warn!(
764 host = %config.host,
765 port = config.port,
766 user = %config.user,
767 db = %config.database,
768 remaining_ms = remaining.as_millis() as u64,
769 "gss_connect_circuit_open"
770 );
771 return Err(PgError::Connection(format!(
772 "GSS connection circuit is open; retry after {:?}",
773 remaining
774 )));
775 }
776
777 let mut attempt = 0usize;
778 loop {
779 let connect_result = tokio::time::timeout(
780 config.connect_timeout,
781 PgConnection::connect_with_options(
782 &config.host,
783 config.port,
784 &config.user,
785 &config.database,
786 config.password.as_deref(),
787 options.clone(),
788 ),
789 )
790 .await;
791
792 let connect_result = match connect_result {
793 Ok(result) => result,
794 Err(_) => Err(PgError::Timeout(format!(
795 "connect timeout after {:?} (pool config connect_timeout)",
796 config.connect_timeout
797 ))),
798 };
799
800 match connect_result {
801 Ok(conn) => {
802 metrics::counter!("qail_pg_pool_connect_success_total").increment(1);
803 gss_circuit_record_success(config);
804 return Ok(conn);
805 }
806 Err(err) if should_retry_gss_connect_error(config, attempt, &err) => {
807 metrics::counter!("qail_pg_gss_connect_retries_total").increment(1);
808 gss_circuit_record_failure(config);
809 let delay = gss_retry_delay(config.gss_retry_base_delay, attempt);
810 tracing::warn!(
811 host = %config.host,
812 port = config.port,
813 user = %config.user,
814 db = %config.database,
815 attempt = attempt + 1,
816 delay_ms = delay.as_millis() as u64,
817 error = %err,
818 "gss_connect_retry"
819 );
820 tokio::time::sleep(delay).await;
821 attempt += 1;
822 }
823 Err(err) => {
824 metrics::counter!("qail_pg_pool_connect_failures_total").increment(1);
825 if should_track_gss_circuit_error(config, &err) {
826 metrics::counter!("qail_pg_gss_connect_failures_total").increment(1);
827 gss_circuit_record_failure(config);
828 }
829 return Err(err);
830 }
831 }
832 }
833 }
834
835 pub async fn maintain(&self) {
838 if self.inner.closed.load(Ordering::Relaxed) {
839 return;
840 }
841
842 let evicted = {
844 let mut connections = self.inner.connections.lock().await;
845 let before = connections.len();
846 connections.retain(|pooled| {
847 if pooled.last_used.elapsed() > self.inner.config.idle_timeout {
848 record_pool_connection_destroy("idle_sweep_evict");
849 return false;
850 }
851 if let Some(max_life) = self.inner.config.max_lifetime
852 && pooled.created_at.elapsed() > max_life
853 {
854 record_pool_connection_destroy("lifetime_sweep_evict");
855 return false;
856 }
857 true
858 });
859 before - connections.len()
860 };
861
862 if evicted > 0 {
863 tracing::debug!(evicted, "pool_maintenance: evicted stale idle connections");
864 }
865
866 let min = self.inner.config.min_connections;
868 if min == 0 {
869 return;
870 }
871
872 let idle_count = self.inner.connections.lock().await.len();
873 if idle_count >= min {
874 return;
875 }
876
877 let deficit = min - idle_count;
878 let mut created = 0usize;
879 for _ in 0..deficit {
880 match Self::create_connection(&self.inner.config).await {
881 Ok(conn) => {
882 self.inner.total_created.fetch_add(1, Ordering::Relaxed);
883 let mut connections = self.inner.connections.lock().await;
884 if connections.len() < self.inner.config.max_connections {
885 connections.push(PooledConn {
886 conn,
887 created_at: Instant::now(),
888 last_used: Instant::now(),
889 });
890 created += 1;
891 } else {
892 break;
894 }
895 }
896 Err(e) => {
897 tracing::warn!(error = %e, "pool_maintenance: backfill connection failed");
898 break; }
900 }
901 }
902
903 if created > 0 {
904 tracing::debug!(
905 created,
906 min_connections = min,
907 "pool_maintenance: backfilled idle connections"
908 );
909 }
910 }
911}
912
913pub fn spawn_pool_maintenance(pool: PgPool) {
918 let interval_secs = std::cmp::max(pool.inner.config.idle_timeout.as_secs() / 2, 5);
919 tokio::spawn(async move {
920 let mut interval = tokio::time::interval(Duration::from_secs(interval_secs));
921 loop {
922 interval.tick().await;
923 if pool.is_closed() {
924 break;
925 }
926 pool.maintain().await;
927 }
928 });
929}
930
931pub(super) fn validate_pool_config(config: &PoolConfig) -> PgResult<()> {
932 if config.max_connections == 0 {
933 return Err(PgError::Connection(
934 "Invalid PoolConfig: max_connections must be >= 1".to_string(),
935 ));
936 }
937 if config.min_connections > config.max_connections {
938 return Err(PgError::Connection(format!(
939 "Invalid PoolConfig: min_connections ({}) must be <= max_connections ({})",
940 config.min_connections, config.max_connections
941 )));
942 }
943 if config.acquire_timeout.is_zero() {
944 return Err(PgError::Connection(
945 "Invalid PoolConfig: acquire_timeout must be > 0".to_string(),
946 ));
947 }
948 if config.connect_timeout.is_zero() {
949 return Err(PgError::Connection(
950 "Invalid PoolConfig: connect_timeout must be > 0".to_string(),
951 ));
952 }
953 if config.leaked_cleanup_queue == 0 {
954 return Err(PgError::Connection(
955 "Invalid PoolConfig: leaked_cleanup_queue must be >= 1".to_string(),
956 ));
957 }
958 Ok(())
959}
960
961pub(super) async fn execute_simple_with_timeout(
962 conn: &mut PgConnection,
963 sql: &str,
964 timeout: Duration,
965 operation: &str,
966) -> PgResult<()> {
967 match tokio::time::timeout(timeout, conn.execute_simple(sql)).await {
968 Ok(result) => result,
969 Err(_) => {
970 conn.mark_io_desynced();
971 Err(PgError::Timeout(format!(
972 "{} timeout after {:?} (pool config connect_timeout)",
973 operation, timeout
974 )))
975 }
976 }
977}