1use std::collections::VecDeque;
27use std::time::{Duration, Instant};
28
29use crate::connection::{PgConfig, PgConnection};
30use crate::error::{PgError, PgResult};
31
/// Settings that control how a [`PgPool`] sizes itself, expires connections and
/// validates them at checkout.
///
/// Every field is public, and each one also has a consuming builder-style setter so a
/// configuration can be assembled fluently starting from [`PgPoolConfig::new`].
#[derive(Debug, Clone)]
pub struct PgPoolConfig {
    /// Upper bound on the connections the pool holds (idle plus checked out).
    pub max_size: usize,
    /// Number of connections the pool tries to keep open: pre-opened by
    /// `PgPool::connect_with_config` and restored by `PgPool::reap`.
    pub min_size: usize,
    /// Maximum age of a connection, measured from its creation; `None` disables the limit.
    pub max_lifetime: Option<Duration>,
    /// Maximum time a connection may sit unused in the pool; `None` disables the limit.
    pub idle_timeout: Option<Duration>,
    /// How long `PgPool::get` keeps retrying before it fails; when `None` the pool falls
    /// back to five seconds.
    pub checkout_timeout: Option<Duration>,
    /// Time budget for establishing a connection.
    /// NOTE(review): not read by the pool code in this file; presumably consumed by the
    /// connection layer. Confirm.
    pub connection_timeout: Option<Duration>,
    /// Run `validation_query` on an idle connection before handing it out.
    pub test_on_checkout: bool,
    /// SQL statement sent when `test_on_checkout` is enabled.
    pub validation_query: String,
    /// When validation fails, open a replacement connection instead of failing the checkout.
    pub auto_reconnect: bool,
}

impl Default for PgPoolConfig {
    /// Ten connections at most, one kept warm, a 30-minute lifetime, a 10-minute idle
    /// timeout, 5-second checkout and connection timeouts, no validation on checkout.
    fn default() -> Self {
        Self {
            max_size: 10,
            min_size: 1,
            max_lifetime: Some(Duration::from_secs(30 * 60)),
            idle_timeout: Some(Duration::from_secs(10 * 60)),
            checkout_timeout: Some(Duration::from_secs(5)),
            connection_timeout: Some(Duration::from_secs(5)),
            test_on_checkout: false,
            validation_query: "SELECT 1".to_string(),
            auto_reconnect: true,
        }
    }
}

impl PgPoolConfig {
    /// Starts a configuration from the default values.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the maximum number of connections.
    #[must_use]
    pub fn max_size(mut self, size: usize) -> Self {
        self.max_size = size;
        self
    }

    /// Sets the number of connections the pool tries to keep open.
    #[must_use]
    pub fn min_size(mut self, size: usize) -> Self {
        self.min_size = size;
        self
    }

    /// Limits the age of a connection to `duration`.
    #[must_use]
    pub fn max_lifetime(mut self, duration: Duration) -> Self {
        self.max_lifetime = Some(duration);
        self
    }

    /// Limits how long a connection may stay idle to `duration`.
    #[must_use]
    pub fn idle_timeout(mut self, duration: Duration) -> Self {
        self.idle_timeout = Some(duration);
        self
    }

    /// Sets how long a blocking checkout waits before giving up.
    #[must_use]
    pub fn checkout_timeout(mut self, duration: Duration) -> Self {
        self.checkout_timeout = Some(duration);
        self
    }

    /// Sets the connection-establishment timeout.
    #[must_use]
    pub fn connection_timeout(mut self, duration: Duration) -> Self {
        self.connection_timeout = Some(duration);
        self
    }

    /// Enables or disables running the validation query at checkout.
    #[must_use]
    pub fn test_on_checkout(mut self, enable: bool) -> Self {
        self.test_on_checkout = enable;
        self
    }

    /// Sets the SQL statement used to validate a connection at checkout.
    #[must_use]
    pub fn validation_query(mut self, query: impl Into<String>) -> Self {
        self.validation_query = query.into();
        self
    }

    /// Chooses whether a failed validation is answered with a fresh connection (`true`)
    /// or with an error (`false`).
    #[must_use]
    pub fn auto_reconnect(mut self, enable: bool) -> Self {
        self.auto_reconnect = enable;
        self
    }

    /// Removes the lifetime limit.
    #[must_use]
    pub fn no_max_lifetime(mut self) -> Self {
        self.max_lifetime = None;
        self
    }

    /// Removes the idle timeout.
    #[must_use]
    pub fn no_idle_timeout(mut self) -> Self {
        self.idle_timeout = None;
        self
    }
}
133
134struct PooledConn {
138 conn: PgConnection,
139 created_at: Instant,
140 last_used: Instant,
141}
142
143impl PooledConn {
144 fn new(conn: PgConnection) -> Self {
145 let now = Instant::now();
146 Self {
147 conn,
148 created_at: now,
149 last_used: now,
150 }
151 }
152
153 fn is_lifetime_expired(&self, max_lifetime: Option<Duration>) -> bool {
155 max_lifetime.is_some_and(|max| self.created_at.elapsed() > max)
156 }
157
158 fn is_idle_expired(&self, idle_timeout: Option<Duration>) -> bool {
160 idle_timeout.is_some_and(|timeout| self.last_used.elapsed() > timeout)
161 }
162}
163
/// Running counters describing what a pool has done since it was created.
#[derive(Debug, Clone, Default)]
pub struct PoolStats {
    /// Checkout attempts made against the pool, whether or not they succeeded.
    pub total_checkouts: u64,
    /// Connections the pool has opened.
    pub total_connections_created: u64,
    /// Connections the pool has discarded for any reason.
    pub total_connections_closed: u64,
    /// Idle connections that failed the validation query at checkout.
    pub validation_failures: u64,
    /// Connections discarded because they exceeded the maximum lifetime.
    pub lifetime_expirations: u64,
    /// Connections discarded because they exceeded the idle timeout.
    pub idle_expirations: u64,
    /// Blocking checkouts that gave up after the checkout timeout elapsed.
    pub checkout_timeouts: u64,
}
177
178pub struct PgPool {
186 config: PgConfig,
187 pool_config: PgPoolConfig,
188 idle: VecDeque<PooledConn>,
190 active: usize,
193 stats: PoolStats,
195}
196
197impl PgPool {
198 pub fn new(config: PgConfig, size: usize) -> Self {
201 let pool_config = PgPoolConfig::default().max_size(size);
202 Self {
203 config,
204 pool_config,
205 idle: VecDeque::with_capacity(size),
206 active: 0,
207 stats: PoolStats::default(),
208 }
209 }
210
211 pub fn with_config(config: PgConfig, pool_config: PgPoolConfig) -> Self {
213 Self {
214 idle: VecDeque::with_capacity(pool_config.max_size),
215 config,
216 pool_config,
217 active: 0,
218 stats: PoolStats::default(),
219 }
220 }
221
222 pub fn connect(config: PgConfig, size: usize) -> PgResult<Self> {
224 let mut pool = Self::new(config, size);
225 for _ in 0..size {
226 let conn = PgConnection::connect(&pool.config)?;
227 pool.idle.push_back(PooledConn::new(conn));
228 pool.stats.total_connections_created += 1;
229 }
230 Ok(pool)
231 }
232
233 pub fn connect_with_config(config: PgConfig, pool_config: PgPoolConfig) -> PgResult<Self> {
235 let min = pool_config.min_size.min(pool_config.max_size);
236 let mut pool = Self::with_config(config, pool_config);
237 for _ in 0..min {
238 let conn = PgConnection::connect(&pool.config)?;
239 pool.idle.push_back(PooledConn::new(conn));
240 pool.stats.total_connections_created += 1;
241 }
242 Ok(pool)
243 }
244
245 fn try_checkout(&mut self) -> PgResult<PooledConn> {
251 self.stats.total_checkouts += 1;
252
253 while let Some(mut pooled) = self.idle.pop_front() {
255 if pooled.is_lifetime_expired(self.pool_config.max_lifetime) {
257 self.stats.lifetime_expirations += 1;
258 self.stats.total_connections_closed += 1;
259 continue; }
261 if pooled.is_idle_expired(self.pool_config.idle_timeout) {
262 self.stats.idle_expirations += 1;
263 self.stats.total_connections_closed += 1;
264 continue;
265 }
266
267 if self.pool_config.test_on_checkout
269 && pooled
270 .conn
271 .query_simple(&self.pool_config.validation_query)
272 .is_err()
273 {
274 self.stats.validation_failures += 1;
275 self.stats.total_connections_closed += 1;
276 if self.pool_config.auto_reconnect {
277 match PgConnection::connect(&self.config) {
279 Ok(new_conn) => {
280 pooled = PooledConn::new(new_conn);
281 self.stats.total_connections_created += 1;
282 }
283 Err(e) => return Err(e),
284 }
285 } else {
286 return Err(PgError::PoolValidationFailed);
287 }
288 }
289
290 pooled.last_used = Instant::now();
291 return Ok(pooled);
292 }
293
294 let total = self.active + self.idle.len();
296 if total < self.pool_config.max_size {
297 let conn = PgConnection::connect(&self.config)?;
298 self.stats.total_connections_created += 1;
299 let pooled = PooledConn::new(conn);
300 return Ok(pooled);
301 }
302
303 Err(PgError::PoolExhausted)
305 }
306
307 pub fn try_get(&mut self) -> PgResult<ConnectionGuard<'_>> {
315 let pooled = self.try_checkout()?;
316 self.active += 1;
317 Ok(ConnectionGuard {
318 pool: self as *mut PgPool,
319 conn: Some(pooled),
320 _marker: std::marker::PhantomData,
321 })
322 }
323
324 pub fn get(&mut self) -> PgResult<ConnectionGuard<'_>> {
330 let timeout = self
331 .pool_config
332 .checkout_timeout
333 .unwrap_or(Duration::from_secs(5));
334 let start = Instant::now();
335
336 match self.try_checkout() {
338 Ok(pooled) => {
339 self.active += 1;
340 return Ok(ConnectionGuard {
341 pool: self as *mut PgPool,
342 conn: Some(pooled),
343 _marker: std::marker::PhantomData,
344 });
345 }
346 Err(PgError::PoolExhausted) => { }
347 Err(e) => return Err(e),
348 }
349
350 let backoff_us = [100u64, 250, 500, 1000];
352 let mut attempt = 0usize;
353 loop {
354 if start.elapsed() >= timeout {
355 self.stats.checkout_timeouts += 1;
356 return Err(PgError::PoolTimeout);
357 }
358
359 let sleep_us = backoff_us[attempt.min(backoff_us.len() - 1)];
360 std::thread::sleep(Duration::from_micros(sleep_us));
361 attempt += 1;
362
363 match self.try_checkout() {
364 Ok(pooled) => {
365 self.active += 1;
366 return Ok(ConnectionGuard {
367 pool: self as *mut PgPool,
368 conn: Some(pooled),
369 _marker: std::marker::PhantomData,
370 });
371 }
372 Err(PgError::PoolExhausted) => continue,
373 Err(e) => return Err(e),
374 }
375 }
376 }
377
378 fn return_conn(&mut self, mut pooled: PooledConn) {
380 self.active = self.active.saturating_sub(1);
381
382 if pooled.conn.is_broken() {
384 self.stats.total_connections_closed += 1;
385 return; }
387
388 pooled.last_used = Instant::now();
389
390 if self.idle.len() + self.active < self.pool_config.max_size {
392 self.idle.push_back(pooled);
393 } else {
394 self.stats.total_connections_closed += 1;
395 }
397 }
398
399 pub fn reap(&mut self) {
406 let mut i = 0;
407 while i < self.idle.len() {
408 let expired = {
409 let pooled = &self.idle[i];
410 pooled.is_lifetime_expired(self.pool_config.max_lifetime)
411 || pooled.is_idle_expired(self.pool_config.idle_timeout)
412 };
413 if expired {
414 self.idle.remove(i);
415 self.stats.total_connections_closed += 1;
416 } else {
417 i += 1;
418 }
419 }
420
421 let total = self.active + self.idle.len();
423 if total < self.pool_config.min_size {
424 let need = self.pool_config.min_size - total;
425 for _ in 0..need {
426 if let Ok(conn) = PgConnection::connect(&self.config) {
427 self.idle.push_back(PooledConn::new(conn));
428 self.stats.total_connections_created += 1;
429 }
430 }
431 }
432 }
433
434 pub fn config(&self) -> &PgConfig {
438 &self.config
439 }
440
441 pub fn pool_config(&self) -> &PgPoolConfig {
443 &self.pool_config
444 }
445
446 pub fn pool_size(&self) -> usize {
448 self.pool_config.max_size
449 }
450
451 pub fn set_max_size(&mut self, new_size: usize) {
458 self.pool_config.max_size = new_size;
459 while self.idle.len() + self.active > new_size && !self.idle.is_empty() {
461 self.idle.pop_front();
462 self.stats.total_connections_closed += 1;
463 }
464 }
465
466 pub fn idle_connections(&self) -> usize {
468 self.idle.len()
469 }
470
471 pub fn active_connections(&self) -> usize {
473 self.active
474 }
475
476 pub fn total_connections(&self) -> usize {
478 self.idle.len() + self.active
479 }
480
481 pub fn stats(&self) -> &PoolStats {
483 &self.stats
484 }
485
486 pub fn close_all(&mut self) {
488 let closed = self.idle.len();
489 self.idle.clear();
490 self.stats.total_connections_closed += closed as u64;
491 }
492}
493
494pub struct ConnectionGuard<'a> {
508 pool: *mut PgPool,
514 conn: Option<PooledConn>,
515 _marker: std::marker::PhantomData<&'a mut PgPool>,
517}
518
519impl<'a> ConnectionGuard<'a> {
520 #[inline]
522 pub fn conn(&mut self) -> &mut PgConnection {
523 &mut self
524 .conn
525 .as_mut()
526 .expect("ConnectionGuard used after take")
527 .conn
528 }
529}
530
531impl<'a> std::ops::Deref for ConnectionGuard<'a> {
532 type Target = PgConnection;
533 fn deref(&self) -> &PgConnection {
534 &self
535 .conn
536 .as_ref()
537 .expect("ConnectionGuard used after take")
538 .conn
539 }
540}
541
542impl<'a> std::ops::DerefMut for ConnectionGuard<'a> {
543 fn deref_mut(&mut self) -> &mut PgConnection {
544 &mut self
545 .conn
546 .as_mut()
547 .expect("ConnectionGuard used after take")
548 .conn
549 }
550}
551
552impl<'a> Drop for ConnectionGuard<'a> {
553 fn drop(&mut self) {
554 if let Some(pooled) = self.conn.take() {
555 unsafe {
558 (*self.pool).return_conn(pooled);
559 }
560 }
561 }
562}
563
#[cfg(test)]
mod tests {
    use super::*;
    use crate::connection::PgConfig;
    use crate::error::PgError;

    /// Connection settings aimed at a local server.
    ///
    /// NOTE(review): `test_reap_on_empty_pool_no_panic` expects zero idle connections
    /// after `reap`, which appears to rely on nothing accepting connections at this
    /// address. Confirm.
    fn dummy_config() -> PgConfig {
        PgConfig::new("127.0.0.1", 5432, "test", "test", "testdb")
    }

    /// An unconnected pool capped at `max` connections.
    fn empty_pool(max: usize) -> PgPool {
        PgPool::new(dummy_config(), max)
    }

    /// A zero-capacity pool whose blocking checkout gives up after one millisecond.
    fn impatient_zero_pool() -> PgPool {
        let settings = PgPoolConfig::new()
            .max_size(0)
            .checkout_timeout(Duration::from_millis(1));
        PgPool::with_config(dummy_config(), settings)
    }

    // ---- PgPoolConfig defaults ----

    #[test]
    fn test_pool_config_default_values() {
        let defaults = PgPoolConfig::default();
        assert_eq!(defaults.max_size, 10);
        assert_eq!(defaults.min_size, 1);
        assert!(
            defaults.max_lifetime.is_some(),
            "default max_lifetime should be set"
        );
        assert!(
            defaults.idle_timeout.is_some(),
            "default idle_timeout should be set"
        );
        assert!(
            defaults.checkout_timeout.is_some(),
            "default checkout_timeout should be set"
        );
        assert!(defaults.connection_timeout.is_some());
        assert!(
            !defaults.test_on_checkout,
            "test_on_checkout should default to false"
        );
        assert!(
            defaults.auto_reconnect,
            "auto_reconnect should default to true"
        );
        assert_eq!(defaults.validation_query, "SELECT 1");
    }

    #[test]
    fn test_pool_config_new_equals_default() {
        let from_new = PgPoolConfig::new();
        let from_default = PgPoolConfig::default();
        assert_eq!(from_new.max_size, from_default.max_size);
        assert_eq!(from_new.min_size, from_default.min_size);
    }

    // ---- PgPoolConfig builder ----

    #[test]
    fn test_builder_max_size() {
        assert_eq!(PgPoolConfig::new().max_size(25).max_size, 25);
    }

    #[test]
    fn test_builder_min_size() {
        assert_eq!(PgPoolConfig::new().min_size(3).min_size, 3);
    }

    #[test]
    fn test_builder_max_lifetime() {
        let lifetime = Duration::from_secs(900);
        let built = PgPoolConfig::new().max_lifetime(lifetime);
        assert_eq!(built.max_lifetime, Some(lifetime));
    }

    #[test]
    fn test_builder_no_max_lifetime() {
        let built = PgPoolConfig::new().no_max_lifetime();
        assert!(built.max_lifetime.is_none());
    }

    #[test]
    fn test_builder_idle_timeout() {
        let idle = Duration::from_secs(300);
        let built = PgPoolConfig::new().idle_timeout(idle);
        assert_eq!(built.idle_timeout, Some(idle));
    }

    #[test]
    fn test_builder_no_idle_timeout() {
        let built = PgPoolConfig::new().no_idle_timeout();
        assert!(built.idle_timeout.is_none());
    }

    #[test]
    fn test_builder_checkout_timeout() {
        let wait = Duration::from_secs(10);
        let built = PgPoolConfig::new().checkout_timeout(wait);
        assert_eq!(built.checkout_timeout, Some(wait));
    }

    #[test]
    fn test_builder_connection_timeout() {
        let wait = Duration::from_secs(3);
        let built = PgPoolConfig::new().connection_timeout(wait);
        assert_eq!(built.connection_timeout, Some(wait));
    }

    #[test]
    fn test_builder_test_on_checkout() {
        let enabled = PgPoolConfig::new().test_on_checkout(true);
        assert!(enabled.test_on_checkout);
        let disabled = PgPoolConfig::new().test_on_checkout(false);
        assert!(!disabled.test_on_checkout);
    }

    #[test]
    fn test_builder_auto_reconnect_false() {
        // The flag is toggled through the public field.
        let mut settings = PgPoolConfig::new();
        settings.auto_reconnect = false;
        assert!(!settings.auto_reconnect);
        settings.auto_reconnect = true;
        assert!(settings.auto_reconnect);
    }

    #[test]
    fn test_builder_validation_query() {
        let mut settings = PgPoolConfig::new();
        settings.validation_query = "SELECT version()".to_string();
        assert_eq!(settings.validation_query, "SELECT version()");
    }

    #[test]
    fn test_builder_chained() {
        let mut settings = PgPoolConfig::new()
            .max_size(20)
            .min_size(2)
            .checkout_timeout(Duration::from_secs(5))
            .test_on_checkout(true)
            .no_idle_timeout();
        settings.auto_reconnect = false;
        settings.validation_query = "SELECT 1+1".to_string();

        assert_eq!(settings.max_size, 20);
        assert_eq!(settings.min_size, 2);
        assert!(settings.test_on_checkout);
        assert!(!settings.auto_reconnect);
        assert!(settings.idle_timeout.is_none());
        assert_eq!(settings.validation_query, "SELECT 1+1");
    }

    #[test]
    fn test_builder_clone() {
        let original = PgPoolConfig::new().max_size(7).min_size(2);
        let copy = original.clone();
        assert_eq!(copy.max_size, 7);
        assert_eq!(copy.min_size, 2);
    }

    // ---- PoolStats ----

    #[test]
    fn test_pool_stats_all_zero_initially() {
        let counters = PoolStats::default();
        assert_eq!(counters.total_checkouts, 0);
        assert_eq!(counters.total_connections_created, 0);
        assert_eq!(counters.total_connections_closed, 0);
        assert_eq!(counters.validation_failures, 0);
        assert_eq!(counters.lifetime_expirations, 0);
        assert_eq!(counters.idle_expirations, 0);
        assert_eq!(counters.checkout_timeouts, 0);
    }

    // ---- Pool construction ----

    #[test]
    fn test_pool_new_starts_empty() {
        let pool = empty_pool(10);
        assert_eq!(pool.idle_connections(), 0);
        assert_eq!(pool.active_connections(), 0);
        assert_eq!(pool.total_connections(), 0);
    }

    #[test]
    fn test_pool_stats_initially_zeroed() {
        let pool = empty_pool(5);
        let counters = pool.stats();
        assert_eq!(counters.total_checkouts, 0);
        assert_eq!(counters.total_connections_created, 0);
        assert_eq!(counters.total_connections_closed, 0);
    }

    #[test]
    fn test_pool_total_equals_idle_plus_active() {
        let pool = empty_pool(10);
        let expected = pool.idle_connections() + pool.active_connections();
        assert_eq!(pool.total_connections(), expected);
    }

    // ---- Checkout on an exhausted pool ----

    #[test]
    fn test_try_get_returns_pool_exhausted_when_at_capacity() {
        let mut pool = empty_pool(0);
        let outcome = pool.try_get();
        assert!(
            matches!(outcome, Err(PgError::PoolExhausted)),
            "Expected PoolExhausted, got: {:?}",
            outcome.err()
        );
    }

    #[test]
    fn test_try_get_never_returns_would_block() {
        let mut pool = empty_pool(0);
        let outcome = pool.try_get();
        assert!(
            !matches!(outcome, Err(PgError::WouldBlock)),
            "try_get must NOT return WouldBlock — pool should return PoolExhausted"
        );
    }

    #[test]
    fn test_get_with_short_timeout_returns_pool_timeout_when_empty() {
        let mut pool = impatient_zero_pool();
        let outcome = pool.get();
        assert!(
            matches!(outcome, Err(PgError::PoolTimeout)),
            "Expected PoolTimeout after checkout_timeout exceeded, got: {:?}",
            outcome.err()
        );
    }

    #[test]
    fn test_get_timeout_increments_checkout_timeout_counter() {
        let mut pool = impatient_zero_pool();
        // Dropped right away so the pool can be inspected afterwards.
        drop(pool.get());
        assert_eq!(pool.stats().checkout_timeouts, 1);
    }

    // ---- set_max_size ----

    #[test]
    fn test_set_max_size_to_zero_makes_pool_exhausted() {
        let mut pool = empty_pool(10);
        pool.set_max_size(0);
        assert!(matches!(pool.try_get(), Err(PgError::PoolExhausted)));
    }

    #[test]
    fn test_set_max_size_grow_does_not_panic() {
        let mut pool = empty_pool(5);
        pool.set_max_size(100);
        assert_eq!(pool.idle_connections(), 0);
    }

    #[test]
    fn test_set_max_size_shrink_with_empty_idle_is_noop() {
        let mut pool = empty_pool(10);
        pool.set_max_size(1);
        assert_eq!(pool.idle_connections(), 0);
    }

    // ---- close_all ----

    #[test]
    fn test_close_all_on_empty_pool_no_panic() {
        let mut pool = empty_pool(10);
        pool.close_all();
        assert_eq!(pool.idle_connections(), 0);
    }

    #[test]
    fn test_close_all_does_not_affect_active_count() {
        let mut pool = empty_pool(10);
        pool.close_all();
        assert_eq!(pool.active_connections(), 0);
    }

    #[test]
    fn test_close_all_increments_closed_stats() {
        // The pool holds no connections, so the closed counter stays at zero.
        let mut pool = empty_pool(5);
        pool.close_all();
        assert_eq!(pool.stats().total_connections_closed, 0);
    }

    // ---- Checkout statistics ----

    #[test]
    fn test_try_get_increments_checkout_counter() {
        let mut pool = empty_pool(0);
        drop(pool.try_get());
        assert_eq!(pool.stats().total_checkouts, 1);
    }

    #[test]
    fn test_try_get_multiple_exhausted_increments_counter() {
        let mut pool = empty_pool(0);
        (0..5).for_each(|_| drop(pool.try_get()));
        assert_eq!(pool.stats().total_checkouts, 5);
    }

    // ---- with_config ----

    #[test]
    fn test_pool_with_config_respects_max_size() {
        let settings = PgPoolConfig::new().max_size(3);
        let pool = PgPool::with_config(dummy_config(), settings);
        assert_eq!(pool.idle_connections(), 0);
        assert_eq!(pool.active_connections(), 0);
    }

    // ---- reap ----

    #[test]
    fn test_reap_on_empty_pool_no_panic() {
        let mut pool = empty_pool(10);
        pool.reap();
        assert_eq!(pool.idle_connections(), 0);
    }
}