1use std::sync::Arc;
12use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
13use std::time::Duration;
14
15use crate::DriverError;
16use crate::arena::Arena;
17use crate::codec::Encode;
18use crate::conn::Connection;
19use crate::types::{Config, PgDataRow, QueryResult, SimpleRow};
20
21pub struct Pool {
37 inner: Arc<PoolInner>,
38}
39
40struct PoolInner {
41 stack: std::sync::Mutex<Vec<Connection>>,
45 max_size: usize,
46 open_count: AtomicUsize,
47 config: Arc<Config>,
48 closed: AtomicBool,
50 release_pair: (std::sync::Mutex<()>, std::sync::Condvar),
53 max_lifetime: Option<Duration>,
56 acquire_timeout: Option<Duration>,
58 min_idle: usize,
60 warmup_sqls: std::sync::Mutex<Arc<Vec<Box<str>>>>,
62 max_stmt_cache_size: usize,
64}
65
66impl Pool {
67 pub fn connect(url: &str) -> Result<Self, DriverError> {
71 PoolBuilder::new().url(url).build()
72 }
73
74 pub fn builder() -> PoolBuilder {
76 PoolBuilder::new()
77 }
78
79 #[inline]
87 pub fn acquire(&self) -> Result<PoolGuard, DriverError> {
88 if self.inner.closed.load(Ordering::Acquire) {
89 return Err(DriverError::Pool("pool is closed".into()));
90 }
91
92 if let Some(guard) = self.try_pop_idle()? {
94 return Ok(guard);
95 }
96
97 loop {
99 let current = self.inner.open_count.load(Ordering::Acquire);
100 if current >= self.inner.max_size {
101 if let Some(timeout) = self.inner.acquire_timeout {
102 let (lock, cvar) = &self.inner.release_pair;
103 let guard = lock.lock().unwrap_or_else(|e| e.into_inner());
104 let (_guard, result) = cvar
105 .wait_timeout(guard, timeout)
106 .unwrap_or_else(|e| e.into_inner());
107 if result.timed_out() {
108 return Err(DriverError::Pool(
109 "pool exhausted: acquire timeout expired".into(),
110 ));
111 }
112 if let Some(guard) = self.try_pop_idle()? {
114 return Ok(guard);
115 }
116 continue;
118 }
119 return Err(DriverError::Pool(
120 "pool exhausted: all connections in use".into(),
121 ));
122 }
123 if self
124 .inner
125 .open_count
126 .compare_exchange(current, current + 1, Ordering::AcqRel, Ordering::Acquire)
127 .is_ok()
128 {
129 break;
130 }
131 }
133
134 let conn_result = Connection::connect_arc(self.inner.config.clone());
136 match conn_result {
137 Ok(mut conn) => {
138 conn.set_max_stmt_cache_size(self.inner.max_stmt_cache_size);
140 self.warmup_conn(&mut conn);
142
143 Ok(PoolGuard {
144 conn: Some(conn),
145 pool: self.inner.clone(),
146 discard: false,
147 })
148 }
149 Err(e) => {
150 self.inner.open_count.fetch_sub(1, Ordering::AcqRel);
152 Err(e)
153 }
154 }
155 }
156
157 #[inline]
159 fn try_pop_idle(&self) -> Result<Option<PoolGuard>, DriverError> {
160 let mut stack = self.inner.stack.lock().unwrap_or_else(|e| e.into_inner());
161 while let Some(conn) = stack.pop() {
162 if let Some(max_lifetime) = self.inner.max_lifetime {
163 if conn.created_at().elapsed() >= max_lifetime {
164 self.inner.open_count.fetch_sub(1, Ordering::AcqRel);
165 continue;
166 }
167 }
168 if conn.idle_duration() < Duration::from_secs(30) {
169 return Ok(Some(PoolGuard {
170 conn: Some(conn),
171 pool: self.inner.clone(),
172 discard: false,
173 }));
174 }
175 self.inner.open_count.fetch_sub(1, Ordering::AcqRel);
177 }
178 Ok(None)
179 }
180
181 pub fn is_uds(&self) -> bool {
186 #[cfg(unix)]
187 {
188 self.inner.config.host_is_uds()
189 }
190 #[cfg(not(unix))]
191 {
192 false
193 }
194 }
195
196 pub fn begin(&self) -> Result<Transaction, DriverError> {
198 let mut guard = self.acquire()?;
199 guard.simple_query("BEGIN")?;
200 Ok(Transaction {
201 guard,
202 committed: false,
203 deferred_buf: Vec::new(),
204 deferred_count: 0,
205 })
206 }
207
208 pub fn open_count(&self) -> usize {
210 self.inner.open_count.load(Ordering::Relaxed)
211 }
212
213 pub fn max_size(&self) -> usize {
215 self.inner.max_size
216 }
217
218 pub fn status(&self) -> PoolStatus {
220 let idle = self
221 .inner
222 .stack
223 .lock()
224 .unwrap_or_else(|e| e.into_inner())
225 .len();
226 let open = self.inner.open_count.load(Ordering::Relaxed);
227 let active = open.saturating_sub(idle);
228 PoolStatus {
229 idle,
230 active,
231 open,
232 max_size: self.inner.max_size,
233 }
234 }
235
236 fn warmup_conn(&self, conn: &mut Connection) {
244 let sqls = self
245 .inner
246 .warmup_sqls
247 .lock()
248 .unwrap_or_else(|e| e.into_inner())
249 .clone();
250
251 if sqls.is_empty() {
252 return;
253 }
254
255 for sql in sqls.iter() {
256 let sql_hash = crate::types::hash_sql(sql);
257 let _ = conn.prepare_only(sql, sql_hash);
258 }
259 }
260
261 pub fn set_warmup_sqls(&self, sqls: &[&str]) {
283 let boxed: Arc<Vec<Box<str>>> =
284 Arc::new(sqls.iter().map(|s| (*s).into()).collect::<Vec<_>>());
285 *self
286 .inner
287 .warmup_sqls
288 .lock()
289 .unwrap_or_else(|e| e.into_inner()) = boxed;
290 }
291
292 pub fn close(&self) {
295 self.inner.closed.store(true, Ordering::Release);
296 let conns: Vec<Connection> = {
298 let mut stack = self.inner.stack.lock().unwrap_or_else(|e| e.into_inner());
299 std::mem::take(&mut *stack)
300 };
301 for conn in conns {
302 self.inner.open_count.fetch_sub(1, Ordering::AcqRel);
303 let _ = conn.close();
304 }
305 let (_, cvar) = &self.inner.release_pair;
307 cvar.notify_all();
308 }
309
310 pub fn is_closed(&self) -> bool {
312 self.inner.closed.load(Ordering::Acquire)
313 }
314}
315
316impl Clone for Pool {
317 fn clone(&self) -> Self {
318 Pool {
319 inner: self.inner.clone(),
320 }
321 }
322}
323
324#[derive(Debug, Clone, Copy)]
328pub struct PoolStatus {
329 pub idle: usize,
331 pub active: usize,
333 pub open: usize,
335 pub max_size: usize,
337}
338
339pub struct PoolBuilder {
343 url: Option<String>,
344 max_size: usize,
345 max_lifetime: Option<Duration>,
347 acquire_timeout: Option<Duration>,
349 min_idle: usize,
351 max_stmt_cache_size: usize,
353}
354
355impl PoolBuilder {
356 fn new() -> Self {
357 Self {
358 url: None,
359 max_size: 10,
360 max_lifetime: Some(Duration::from_secs(30 * 60)), acquire_timeout: None, min_idle: 0, max_stmt_cache_size: 256, }
365 }
366
367 pub fn url(mut self, url: &str) -> Self {
369 self.url = Some(url.to_owned());
370 self
371 }
372
373 pub fn max_size(mut self, size: usize) -> Self {
377 self.max_size = size;
378 self
379 }
380
381 pub fn max_lifetime(mut self, lifetime: Option<Duration>) -> Self {
384 self.max_lifetime = lifetime;
385 self
386 }
387
388 pub fn acquire_timeout(mut self, timeout: Option<Duration>) -> Self {
391 self.acquire_timeout = timeout;
392 self
393 }
394
395 pub fn min_idle(mut self, count: usize) -> Self {
398 self.min_idle = count;
399 self
400 }
401
402 pub fn max_stmt_cache_size(mut self, size: usize) -> Self {
406 self.max_stmt_cache_size = size;
407 self
408 }
409
410 pub fn build(self) -> Result<Pool, DriverError> {
412 let url = self
413 .url
414 .ok_or_else(|| DriverError::Pool("pool builder requires a URL".into()))?;
415
416 let config = Arc::new(Config::from_url(&url)?);
417
418 let pool = Pool {
419 inner: Arc::new(PoolInner {
420 stack: std::sync::Mutex::new(Vec::with_capacity(self.max_size)),
421 max_size: self.max_size,
422 open_count: AtomicUsize::new(0),
423 config,
424 closed: AtomicBool::new(false),
425 release_pair: (std::sync::Mutex::new(()), std::sync::Condvar::new()),
426 max_lifetime: self.max_lifetime,
427 acquire_timeout: self.acquire_timeout,
428 min_idle: self.min_idle,
429 warmup_sqls: std::sync::Mutex::new(Arc::new(Vec::new())),
430 max_stmt_cache_size: self.max_stmt_cache_size,
431 }),
432 };
433
434 if self.min_idle > 0 {
435 let inner = pool.inner.clone();
436 std::thread::spawn(move || {
437 maintain_min_idle(inner);
438 });
439 }
440
441 Ok(pool)
442 }
443}
444
445fn maintain_min_idle(inner: Arc<PoolInner>) {
447 loop {
448 if inner.closed.load(Ordering::Acquire) {
449 return;
450 }
451
452 let idle_count = inner.stack.lock().unwrap_or_else(|e| e.into_inner()).len();
453 let needed = inner.min_idle.saturating_sub(idle_count);
454
455 for _ in 0..needed {
456 if inner.closed.load(Ordering::Acquire) {
457 return;
458 }
459 let current = inner.open_count.load(Ordering::Acquire);
460 if current >= inner.max_size {
461 break;
462 }
463 if inner
464 .open_count
465 .compare_exchange(current, current + 1, Ordering::AcqRel, Ordering::Acquire)
466 .is_err()
467 {
468 continue;
469 }
470
471 match Connection::connect_arc(inner.config.clone()) {
472 Ok(conn) => {
473 let mut stack = inner.stack.lock().unwrap_or_else(|e| e.into_inner());
474 stack.push(conn);
475 let (_, cvar) = &inner.release_pair;
476 cvar.notify_one();
477 }
478 Err(_) => {
479 inner.open_count.fetch_sub(1, Ordering::AcqRel);
480 }
481 }
482 }
483
484 std::thread::sleep(Duration::from_secs(5));
486 }
487}
488
489pub struct PoolGuard {
496 conn: Option<Connection>,
497 pool: Arc<PoolInner>,
498 discard: bool,
500}
501
502impl PoolGuard {
503 pub fn mark_discard(&mut self) {
506 self.discard = true;
507 }
508
509 pub fn cancel(&self) -> Result<(), DriverError> {
514 let conn = self
515 .conn
516 .as_ref()
517 .ok_or_else(|| DriverError::Pool("connection already taken".into()))?;
518 conn.cancel()
519 }
520
521 pub fn pid(&self) -> i32 {
525 self.conn.as_ref().expect("connection taken").pid()
526 }
527
528 pub fn is_idle(&self) -> bool {
530 self.conn.as_ref().expect("connection taken").is_idle()
531 }
532
533 pub fn is_in_transaction(&self) -> bool {
535 self.conn
536 .as_ref()
537 .expect("connection taken")
538 .is_in_transaction()
539 }
540
541 #[inline]
545 pub fn query(
546 &mut self,
547 sql: &str,
548 sql_hash: u64,
549 params: &[&(dyn Encode + Sync)],
550 ) -> Result<QueryResult, DriverError> {
551 self.conn
552 .as_mut()
553 .ok_or_else(|| DriverError::Pool("connection already taken".into()))?
554 .query(sql, sql_hash, params)
555 }
556
557 #[inline]
559 pub fn execute(
560 &mut self,
561 sql: &str,
562 sql_hash: u64,
563 params: &[&(dyn Encode + Sync)],
564 ) -> Result<u64, DriverError> {
565 self.conn
566 .as_mut()
567 .ok_or_else(|| DriverError::Pool("connection already taken".into()))?
568 .execute(sql, sql_hash, params)
569 }
570
571 pub fn execute_pipeline(
576 &mut self,
577 sql: &str,
578 sql_hash: u64,
579 param_sets: &[&[&(dyn Encode + Sync)]],
580 ) -> Result<Vec<u64>, DriverError> {
581 self.conn
582 .as_mut()
583 .ok_or_else(|| DriverError::Pool("connection already taken".into()))?
584 .execute_pipeline(sql, sql_hash, param_sets)
585 }
586
587 pub fn simple_query(&mut self, sql: &str) -> Result<(), DriverError> {
589 self.conn
590 .as_mut()
591 .ok_or_else(|| DriverError::Pool("connection already taken".into()))?
592 .simple_query(sql)
593 }
594
595 pub fn simple_query_rows(&mut self, sql: &str) -> Result<Vec<SimpleRow>, DriverError> {
599 self.conn
600 .as_mut()
601 .ok_or_else(|| DriverError::Pool("connection already taken".into()))?
602 .simple_query_rows(sql)
603 }
604
605 pub fn for_each<F>(
607 &mut self,
608 sql: &str,
609 sql_hash: u64,
610 params: &[&(dyn Encode + Sync)],
611 f: F,
612 ) -> Result<(), DriverError>
613 where
614 F: FnMut(PgDataRow<'_>) -> Result<(), DriverError>,
615 {
616 self.conn
617 .as_mut()
618 .ok_or_else(|| DriverError::Pool("connection already taken".into()))?
619 .for_each(sql, sql_hash, params, f)
620 }
621
622 pub fn for_each_raw<F>(
624 &mut self,
625 sql: &str,
626 sql_hash: u64,
627 params: &[&(dyn Encode + Sync)],
628 f: F,
629 ) -> Result<(), DriverError>
630 where
631 F: FnMut(&[u8]) -> Result<(), DriverError>,
632 {
633 self.conn
634 .as_mut()
635 .ok_or_else(|| DriverError::Pool("connection already taken".into()))?
636 .for_each_raw(sql, sql_hash, params, f)
637 }
638
639 pub fn query_streaming_start(
643 &mut self,
644 sql: &str,
645 sql_hash: u64,
646 params: &[&(dyn Encode + Sync)],
647 chunk_size: i32,
648 ) -> Result<(std::sync::Arc<[crate::types::ColumnDesc]>, bool), DriverError> {
649 self.conn
650 .as_mut()
651 .ok_or_else(|| DriverError::Pool("connection already taken".into()))?
652 .query_streaming_start(sql, sql_hash, params, chunk_size)
653 }
654
655 pub fn streaming_send_execute(&mut self, chunk_size: i32) -> Result<(), DriverError> {
657 self.conn
658 .as_mut()
659 .ok_or_else(|| DriverError::Pool("connection already taken".into()))?
660 .streaming_send_execute(chunk_size)
661 }
662
663 pub fn streaming_next_chunk(
665 &mut self,
666 arena: &mut Arena,
667 all_col_offsets: &mut Vec<(usize, i32)>,
668 ) -> Result<bool, DriverError> {
669 self.conn
670 .as_mut()
671 .ok_or_else(|| DriverError::Pool("connection already taken".into()))?
672 .streaming_next_chunk(arena, all_col_offsets)
673 }
674
675 pub fn is_sync(&self) -> bool {
677 true
678 }
679
680 pub(crate) fn ensure_stmt_prepared(
684 &mut self,
685 sql: &str,
686 sql_hash: u64,
687 params: &[&(dyn Encode + Sync)],
688 ) -> Result<[u8; 18], DriverError> {
689 self.conn
690 .as_mut()
691 .ok_or_else(|| DriverError::Pool("connection already taken".into()))?
692 .ensure_stmt_prepared(sql, sql_hash, params)
693 }
694
695 pub(crate) fn write_deferred_bind_execute(
697 &self,
698 sql_hash: u64,
699 params: &[&(dyn Encode + Sync)],
700 buf: &mut Vec<u8>,
701 ) {
702 let conn = self.conn.as_ref().expect("connection taken");
703 conn.write_deferred_bind_execute(sql_hash, params, buf);
704 }
705
706 pub(crate) fn flush_deferred_pipeline(
708 &mut self,
709 buf: &mut Vec<u8>,
710 count: usize,
711 ) -> Result<Vec<u64>, DriverError> {
712 self.conn
713 .as_mut()
714 .ok_or_else(|| DriverError::Pool("connection already taken".into()))?
715 .flush_deferred_pipeline(buf, count)
716 }
717}
718
719impl Drop for PoolGuard {
720 fn drop(&mut self) {
721 if let Some(mut conn) = self.conn.take() {
722 if self.discard
729 || conn.is_in_failed_transaction()
730 || conn.is_in_transaction()
731 || conn.is_streaming()
732 || self.pool.closed.load(Ordering::Acquire)
733 {
734 self.pool.open_count.fetch_sub(1, Ordering::AcqRel);
735 return;
736 }
737
738 if conn.query_counter() & 63 == 0 {
742 conn.touch();
743 }
744
745 {
747 let mut stack = self.pool.stack.lock().unwrap_or_else(|e| e.into_inner());
748 stack.push(conn);
749 }
750
751 if self.pool.open_count.load(Ordering::Relaxed) >= self.pool.max_size {
754 let (_, cvar) = &self.pool.release_pair;
755 cvar.notify_one();
756 }
757 }
758 }
759}
760
761pub struct Transaction {
777 guard: PoolGuard,
778 committed: bool,
779 deferred_buf: Vec<u8>,
781 deferred_count: usize,
783}
784
785impl Transaction {
786 pub fn commit(mut self) -> Result<(), DriverError> {
790 if self.deferred_count > 0 {
791 self.flush_deferred()?;
792 }
793 self.guard.simple_query("COMMIT")?;
794 self.committed = true;
795 Ok(())
796 }
797
798 pub fn rollback(mut self) -> Result<(), DriverError> {
802 self.deferred_buf.clear();
803 self.deferred_count = 0;
804 self.guard.simple_query("ROLLBACK")?;
805 self.committed = true; Ok(())
807 }
808
809 pub fn query(
814 &mut self,
815 sql: &str,
816 sql_hash: u64,
817 params: &[&(dyn Encode + Sync)],
818 ) -> Result<QueryResult, DriverError> {
819 if self.deferred_count > 0 {
820 self.flush_deferred()?;
821 }
822 self.guard.query(sql, sql_hash, params)
823 }
824
825 pub fn execute(
827 &mut self,
828 sql: &str,
829 sql_hash: u64,
830 params: &[&(dyn Encode + Sync)],
831 ) -> Result<u64, DriverError> {
832 self.guard.execute(sql, sql_hash, params)
833 }
834
835 pub fn execute_pipeline(
837 &mut self,
838 sql: &str,
839 sql_hash: u64,
840 param_sets: &[&[&(dyn Encode + Sync)]],
841 ) -> Result<Vec<u64>, DriverError> {
842 self.guard.execute_pipeline(sql, sql_hash, param_sets)
843 }
844
845 pub fn for_each<F>(
849 &mut self,
850 sql: &str,
851 sql_hash: u64,
852 params: &[&(dyn Encode + Sync)],
853 f: F,
854 ) -> Result<(), DriverError>
855 where
856 F: FnMut(crate::types::PgDataRow<'_>) -> Result<(), DriverError>,
857 {
858 if self.deferred_count > 0 {
859 self.flush_deferred()?;
860 }
861 self.guard.for_each(sql, sql_hash, params, f)
862 }
863
864 pub fn for_each_raw<F>(
868 &mut self,
869 sql: &str,
870 sql_hash: u64,
871 params: &[&(dyn Encode + Sync)],
872 f: F,
873 ) -> Result<(), DriverError>
874 where
875 F: FnMut(&[u8]) -> Result<(), DriverError>,
876 {
877 if self.deferred_count > 0 {
878 self.flush_deferred()?;
879 }
880 self.guard.for_each_raw(sql, sql_hash, params, f)
881 }
882
883 pub fn simple_query(&mut self, sql: &str) -> Result<(), DriverError> {
887 if self.deferred_count > 0 {
888 self.flush_deferred()?;
889 }
890 self.guard.simple_query(sql)
891 }
892
893 pub fn defer_execute(
922 &mut self,
923 sql: &str,
924 sql_hash: u64,
925 params: &[&(dyn Encode + Sync)],
926 ) -> Result<(), DriverError> {
927 if params.len() > i16::MAX as usize {
928 return Err(DriverError::Protocol(format!(
929 "parameter count {} exceeds maximum {}",
930 params.len(),
931 i16::MAX
932 )));
933 }
934
935 self.guard.ensure_stmt_prepared(sql, sql_hash, params)?;
937
938 self.guard
940 .write_deferred_bind_execute(sql_hash, params, &mut self.deferred_buf);
941 self.deferred_count += 1;
942 Ok(())
943 }
944
945 pub fn flush_deferred(&mut self) -> Result<Vec<u64>, DriverError> {
950 let count = self.deferred_count;
951 self.deferred_count = 0;
952 self.guard
953 .flush_deferred_pipeline(&mut self.deferred_buf, count)
954 }
955
956 pub fn deferred_count(&self) -> usize {
958 self.deferred_count
959 }
960}
961
962impl Drop for Transaction {
963 fn drop(&mut self) {
964 if !self.committed {
965 if let Some(_conn) = self.guard.conn.take() {
968 self.guard.pool.open_count.fetch_sub(1, Ordering::AcqRel);
969 }
971 }
972 }
973}
974
975#[cfg(test)]
976mod tests {
977 use super::*;
978
979 #[test]
980 fn pool_builder_requires_url() {
981 let result = PoolBuilder::new().build();
982 assert!(result.is_err());
983 }
984
985 #[test]
986 fn pool_builder_validates_url() {
987 let result = PoolBuilder::new().url("not_a_url").build();
988 assert!(result.is_err());
989 }
990
991 #[test]
992 fn pool_builder_accepts_valid_url() {
993 let pool = PoolBuilder::new()
994 .url("postgres://user:pass@localhost/db")
995 .max_size(5)
996 .build()
997 .unwrap();
998 assert_eq!(pool.max_size(), 5);
999 assert_eq!(pool.open_count(), 0);
1000 }
1001
1002 #[test]
1003 fn pool_connect_validates_url() {
1004 let result = Pool::connect("not_a_url");
1005 assert!(result.is_err());
1006 }
1007
1008 #[test]
1009 fn pool_max_size_zero() {
1010 let pool = PoolBuilder::new()
1011 .url("postgres://user:pass@localhost/db")
1012 .max_size(0)
1013 .build()
1014 .unwrap();
1015
1016 let result = pool.acquire();
1017 assert!(result.is_err());
1018 match result {
1019 Err(DriverError::Pool(msg)) => assert!(msg.contains("exhausted")),
1020 Err(e) => panic!("expected Pool error, got: {e:?}"),
1021 Ok(_) => panic!("expected error, got Ok"),
1022 }
1023 }
1024
1025 #[test]
1026 fn pool_clone_shares_state() {
1027 let pool = PoolBuilder::new()
1028 .url("postgres://user:pass@localhost/db")
1029 .max_size(5)
1030 .build()
1031 .unwrap();
1032
1033 let pool2 = pool.clone();
1034 assert_eq!(pool.max_size(), pool2.max_size());
1035 }
1036
1037 #[test]
1041 fn pool_builder_max_lifetime() {
1042 let pool = PoolBuilder::new()
1043 .url("postgres://user:pass@localhost/db")
1044 .max_lifetime(Some(Duration::from_secs(60)))
1045 .build()
1046 .unwrap();
1047 assert_eq!(pool.inner.max_lifetime, Some(Duration::from_secs(60)));
1048 }
1049
1050 #[test]
1052 fn pool_builder_max_lifetime_none() {
1053 let pool = PoolBuilder::new()
1054 .url("postgres://user:pass@localhost/db")
1055 .max_lifetime(None)
1056 .build()
1057 .unwrap();
1058 assert_eq!(pool.inner.max_lifetime, None);
1059 }
1060
1061 #[test]
1063 fn pool_builder_acquire_timeout_none() {
1064 let pool = PoolBuilder::new()
1065 .url("postgres://user:pass@localhost/db")
1066 .acquire_timeout(None)
1067 .build()
1068 .unwrap();
1069 assert_eq!(pool.inner.acquire_timeout, None);
1070 }
1071
1072 #[test]
1074 fn pool_builder_acquire_timeout_custom() {
1075 let pool = PoolBuilder::new()
1076 .url("postgres://user:pass@localhost/db")
1077 .acquire_timeout(Some(Duration::from_secs(10)))
1078 .build()
1079 .unwrap();
1080 assert_eq!(pool.inner.acquire_timeout, Some(Duration::from_secs(10)));
1081 }
1082
1083 #[test]
1085 fn pool_builder_min_idle() {
1086 let pool = PoolBuilder::new()
1087 .url("postgres://user:pass@localhost/db")
1088 .min_idle(2)
1089 .build()
1090 .unwrap();
1091 assert_eq!(pool.inner.min_idle, 2);
1092 }
1093
1094 #[test]
1096 fn pool_close_marks_closed() {
1097 let pool = PoolBuilder::new()
1098 .url("postgres://user:pass@localhost/db")
1099 .max_size(5)
1100 .build()
1101 .unwrap();
1102
1103 assert!(!pool.is_closed());
1104 pool.close();
1105 assert!(pool.is_closed());
1106
1107 let result = pool.acquire();
1109 assert!(result.is_err());
1110 match result {
1111 Err(DriverError::Pool(msg)) => assert!(msg.contains("closed")),
1112 Err(e) => panic!("expected Pool(closed) error, got: {e:?}"),
1113 Ok(_) => panic!("expected error, got Ok"),
1114 }
1115 }
1116
1117 #[test]
1119 fn pool_status_initial() {
1120 let pool = PoolBuilder::new()
1121 .url("postgres://user:pass@localhost/db")
1122 .max_size(10)
1123 .build()
1124 .unwrap();
1125
1126 let status = pool.status();
1127 assert_eq!(status.idle, 0);
1128 assert_eq!(status.active, 0);
1129 assert_eq!(status.open, 0);
1130 assert_eq!(status.max_size, 10);
1131 }
1132
1133 #[test]
1135 fn pool_builder_defaults() {
1136 let pool = PoolBuilder::new()
1137 .url("postgres://user:pass@localhost/db")
1138 .build()
1139 .unwrap();
1140
1141 assert_eq!(pool.max_size(), 10);
1142 assert_eq!(pool.inner.max_lifetime, Some(Duration::from_secs(30 * 60)));
1143 assert_eq!(pool.inner.acquire_timeout, None); assert_eq!(pool.inner.min_idle, 0);
1145 }
1146
1147 #[test]
1149 fn pool_open_count_initial() {
1150 let pool = Pool::connect("postgres://user:pass@localhost/db").unwrap();
1151 assert_eq!(pool.open_count(), 0);
1152 }
1153
1154 #[test]
1157 fn pool_builder_max_stmt_cache_size_default() {
1158 let pool = PoolBuilder::new()
1159 .url("postgres://user:pass@localhost/db")
1160 .build()
1161 .unwrap();
1162 assert_eq!(pool.inner.max_stmt_cache_size, 256);
1163 }
1164
1165 #[test]
1166 fn pool_builder_max_stmt_cache_size_custom() {
1167 let pool = PoolBuilder::new()
1168 .url("postgres://user:pass@localhost/db")
1169 .max_stmt_cache_size(512)
1170 .build()
1171 .unwrap();
1172 assert_eq!(pool.inner.max_stmt_cache_size, 512);
1173 }
1174
1175 #[test]
1178 fn pool_is_uds_false_for_tcp() {
1179 let pool = Pool::connect("postgres://user:pass@localhost/db").unwrap();
1180 assert!(!pool.is_uds());
1181 }
1182
1183 #[cfg(unix)]
1184 #[test]
1185 fn pool_is_uds_true_for_unix_socket() {
1186 let pool = Pool::connect("postgres://user@localhost/db?host=/tmp").unwrap();
1187 assert!(pool.is_uds());
1188 }
1189
1190 #[cfg(unix)]
1191 #[test]
1192 fn pool_is_uds_true_for_var_run_socket() {
1193 let pool = Pool::connect("postgres://user@localhost/db?host=/var/run/postgresql").unwrap();
1194 assert!(pool.is_uds());
1195 }
1196
1197 #[test]
1198 fn pool_is_uds_false_for_ip_address() {
1199 let pool = Pool::connect("postgres://user:pass@127.0.0.1/db").unwrap();
1200 assert!(!pool.is_uds());
1201 }
1202
1203 #[cfg(unix)]
1204 #[test]
1205 fn pool_slot_sync_created_for_uds_config() {
1206 let config = Config::from_url("postgres://user@localhost/db?host=/tmp").unwrap();
1207 assert!(config.host_is_uds());
1208 }
1209
1210 #[test]
1211 fn pool_slot_tcp_config() {
1212 let config = Config::from_url("postgres://user:pass@localhost/db").unwrap();
1213 assert!(!config.host_is_uds());
1214 }
1215
1216 #[test]
1221 fn pool_is_uds_false_for_hostname() {
1222 let pool = Pool::connect("postgres://user:pass@db.example.com/db").unwrap();
1223 assert!(!pool.is_uds());
1224 }
1225
1226 #[cfg(unix)]
1227 #[test]
1228 fn pool_is_uds_true_for_tmp() {
1229 let pool = Pool::connect("postgres://user@localhost/db?host=/tmp").unwrap();
1230 assert!(pool.is_uds());
1231 }
1232
1233 #[test]
1238 fn pool_close_then_acquire_fails() {
1239 let pool = PoolBuilder::new()
1240 .url("postgres://user:pass@localhost/db")
1241 .max_size(5)
1242 .build()
1243 .unwrap();
1244 pool.close();
1245 let result = pool.acquire();
1246 assert!(result.is_err());
1247 match result {
1248 Err(DriverError::Pool(msg)) => {
1249 assert!(msg.contains("closed"), "should say closed: {msg}")
1250 }
1251 Err(e) => panic!("expected Pool error, got: {e:?}"),
1252 Ok(_) => panic!("expected error"),
1253 }
1254 }
1255
1256 #[test]
1257 fn pool_is_closed_before_and_after() {
1258 let pool = Pool::connect("postgres://user:pass@localhost/db").unwrap();
1259 assert!(!pool.is_closed());
1260 pool.close();
1261 assert!(pool.is_closed());
1262 }
1263
1264 #[test]
1269 fn pool_exhausted_no_timeout() {
1270 let pool = PoolBuilder::new()
1271 .url("postgres://user:pass@localhost/db")
1272 .max_size(0)
1273 .acquire_timeout(None) .build()
1275 .unwrap();
1276 let result = pool.acquire();
1277 assert!(result.is_err());
1278 match result {
1279 Err(DriverError::Pool(msg)) => {
1280 assert!(msg.contains("exhausted"), "should say exhausted: {msg}")
1281 }
1282 Err(e) => panic!("expected Pool error, got: {e:?}"),
1283 Ok(_) => panic!("expected error"),
1284 }
1285 }
1286
1287 #[test]
1292 fn pool_builder_no_url_error() {
1293 let result = PoolBuilder::new().max_size(5).build();
1294 assert!(result.is_err());
1295 match result {
1296 Err(DriverError::Pool(msg)) => {
1297 assert!(msg.contains("URL"), "should mention URL: {msg}")
1298 }
1299 Err(e) => panic!("expected Pool error, got: {e:?}"),
1300 Ok(_) => panic!("expected error"),
1301 }
1302 }
1303
1304 #[test]
1305 fn pool_builder_invalid_url_error() {
1306 let result = PoolBuilder::new().url("ftp://something").build();
1307 assert!(result.is_err());
1308 }
1309
1310 #[test]
1311 fn pool_builder_stmt_cache_size_zero() {
1312 let pool = PoolBuilder::new()
1313 .url("postgres://user:pass@localhost/db")
1314 .max_stmt_cache_size(0)
1315 .build()
1316 .unwrap();
1317 assert_eq!(pool.inner.max_stmt_cache_size, 0);
1318 }
1319
1320 #[test]
1325 fn pool_status_reflects_max_size() {
1326 let pool = PoolBuilder::new()
1327 .url("postgres://user:pass@localhost/db")
1328 .max_size(20)
1329 .build()
1330 .unwrap();
1331 let status = pool.status();
1332 assert_eq!(status.max_size, 20);
1333 assert_eq!(status.idle, 0);
1334 assert_eq!(status.active, 0);
1335 assert_eq!(status.open, 0);
1336 }
1337
1338 #[test]
1343 fn pool_clone_shares_config() {
1344 let pool = PoolBuilder::new()
1345 .url("postgres://user:pass@localhost/db")
1346 .max_size(7)
1347 .build()
1348 .unwrap();
1349 let p2 = pool.clone();
1350 assert_eq!(pool.max_size(), 7);
1351 assert_eq!(p2.max_size(), 7);
1352 assert_eq!(pool.open_count(), p2.open_count());
1353 }
1354
1355 #[test]
1360 fn pool_set_warmup_sqls_empty() {
1361 let pool = Pool::connect("postgres://user:pass@localhost/db").unwrap();
1362 pool.set_warmup_sqls(&[]);
1363 let sqls = pool
1364 .inner
1365 .warmup_sqls
1366 .lock()
1367 .unwrap_or_else(|e| e.into_inner())
1368 .clone();
1369 assert!(sqls.is_empty());
1370 }
1371
1372 #[test]
1373 fn pool_set_warmup_sqls_multiple() {
1374 let pool = Pool::connect("postgres://user:pass@localhost/db").unwrap();
1375 pool.set_warmup_sqls(&["SELECT 1", "SELECT 2", "SELECT 3"]);
1376 let sqls = pool
1377 .inner
1378 .warmup_sqls
1379 .lock()
1380 .unwrap_or_else(|e| e.into_inner())
1381 .clone();
1382 assert_eq!(sqls.len(), 3);
1383 assert_eq!(&*sqls[0], "SELECT 1");
1384 assert_eq!(&*sqls[1], "SELECT 2");
1385 assert_eq!(&*sqls[2], "SELECT 3");
1386 }
1387
1388 #[test]
1389 fn pool_set_warmup_sqls_overwrite() {
1390 let pool = Pool::connect("postgres://user:pass@localhost/db").unwrap();
1391 pool.set_warmup_sqls(&["SELECT 1"]);
1392 pool.set_warmup_sqls(&["SELECT 99"]);
1393 let sqls = pool
1394 .inner
1395 .warmup_sqls
1396 .lock()
1397 .unwrap_or_else(|e| e.into_inner())
1398 .clone();
1399 assert_eq!(sqls.len(), 1);
1400 assert_eq!(&*sqls[0], "SELECT 99");
1401 }
1402
1403 #[test]
1408 fn pool_status_debug() {
1409 let pool = Pool::connect("postgres://user:pass@localhost/db").unwrap();
1410 let status = pool.status();
1411 let dbg = format!("{status:?}");
1412 assert!(dbg.contains("PoolStatus"));
1413 assert!(dbg.contains("idle"));
1414 assert!(dbg.contains("active"));
1415 assert!(dbg.contains("open"));
1416 assert!(dbg.contains("max_size"));
1417 }
1418
1419 #[test]
1424 fn config_host_is_uds_returns_true_for_slash() {
1425 let config = Config::from_url("postgres://user@localhost/db?host=/tmp").unwrap();
1426 assert!(config.host_is_uds());
1427 }
1428
1429 #[test]
1430 fn config_host_is_uds_returns_false_for_tcp() {
1431 let config = Config::from_url("postgres://user:pass@localhost/db").unwrap();
1432 assert!(!config.host_is_uds());
1433 }
1434
1435 #[test]
1436 fn config_host_is_uds_returns_false_for_ip() {
1437 let config = Config::from_url("postgres://user:pass@192.168.1.1/db").unwrap();
1438 assert!(!config.host_is_uds());
1439 }
1440
1441 #[test]
1446 fn pool_builder_full_chain() {
1447 let pool = PoolBuilder::new()
1448 .url("postgres://user:pass@localhost/db")
1449 .max_size(3)
1450 .max_lifetime(Some(Duration::from_secs(600)))
1451 .acquire_timeout(Some(Duration::from_secs(5)))
1452 .min_idle(1)
1453 .max_stmt_cache_size(128)
1454 .build()
1455 .unwrap();
1456 assert_eq!(pool.max_size(), 3);
1457 assert_eq!(pool.inner.max_lifetime, Some(Duration::from_secs(600)));
1458 assert_eq!(pool.inner.acquire_timeout, Some(Duration::from_secs(5)));
1459 assert_eq!(pool.inner.min_idle, 1);
1460 assert_eq!(pool.inner.max_stmt_cache_size, 128);
1461 }
1462
1463 #[test]
1466 fn pool_max_size_zero_rejects_all_acquires() {
1467 let pool = PoolBuilder::new()
1468 .url("postgres://user:pass@localhost/db")
1469 .max_size(0)
1470 .build()
1471 .unwrap();
1472 let result = pool.acquire();
1473 assert!(result.is_err());
1474 match &result {
1475 Err(DriverError::Pool(msg)) => assert!(msg.contains("exhausted")),
1476 _ => panic!("expected pool exhausted error"),
1477 }
1478 }
1479
1480 #[test]
1483 fn url_parse_unknown_sslmode_returns_error() {
1484 let result = Config::from_url("postgres://u:p@h/d?sslmode=bogus");
1485 assert!(result.is_err());
1486 let msg = format!("{}", result.unwrap_err());
1487 assert!(msg.contains("unknown sslmode"));
1488 }
1489
1490 #[test]
1491 fn url_parse_invalid_port_returns_error() {
1492 let result = Config::from_url("postgres://u:p@h:abc/d");
1493 assert!(result.is_err());
1494 let msg = format!("{}", result.unwrap_err());
1495 assert!(msg.contains("invalid port"));
1496 }
1497
1498 #[test]
1499 fn url_parse_missing_at_sign_returns_error() {
1500 let result = Config::from_url("postgres://u:plocalhost/d");
1501 assert!(result.is_err());
1502 let msg = format!("{}", result.unwrap_err());
1503 assert!(msg.contains("missing @"));
1504 }
1505
1506 #[test]
1507 fn url_parse_empty_host_returns_error() {
1508 let result = Config::from_url("postgres://u:p@/d");
1509 assert!(result.is_err());
1510 }
1511
1512 #[test]
1513 fn url_parse_empty_user_returns_error() {
1514 let result = Config::from_url("postgres://:p@h/d");
1515 assert!(result.is_err());
1516 }
1517
1518 #[test]
1519 fn url_parse_statement_timeout_invalid_uses_default() {
1520 let config = Config::from_url("postgres://u:p@h/d?statement_timeout=notnum").unwrap();
1521 assert_eq!(config.statement_timeout_secs, 30);
1522 }
1523
1524 #[test]
1525 fn url_parse_malformed_percent_encoding() {
1526 let result = Config::from_url("postgres://u%:p@h/d");
1527 assert!(result.is_err());
1528 }
1529
1530 #[test]
1531 fn url_parse_invalid_hex_in_percent_encoding() {
1532 let result = Config::from_url("postgres://u%ZZ:p@h/d");
1533 assert!(result.is_err());
1534 }
1535}