1use std::sync::Arc;
12use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
13use std::time::Duration;
14
15use crate::DriverError;
16use crate::arena::Arena;
17use crate::codec::Encode;
18use crate::conn::Connection;
19use crate::types::{Config, PgDataRow, QueryResult, SimpleRow};
20
/// Handle to a connection pool.
///
/// The handle only holds a reference-counted pointer to the shared
/// `PoolInner`, so cloning it yields another handle to the same pool.
pub struct Pool {
    /// Shared pool state.
    inner: Arc<PoolInner>,
}
39
/// State shared by every `Pool` handle, every `PoolGuard` and the optional
/// min-idle maintenance thread.
struct PoolInner {
    /// Idle connections; the most recently returned one is popped first.
    stack: std::sync::Mutex<Vec<Connection>>,
    /// Upper bound that `open_count` is not allowed to exceed.
    max_size: usize,
    /// Number of connections currently accounted for (idle plus checked out,
    /// including connections that are still being established).
    open_count: AtomicUsize,
    /// Connection parameters handed to `Connection::connect_arc`.
    config: Arc<Config>,
    /// Set once by `Pool::close`; `acquire` fails after that.
    closed: AtomicBool,
    /// Mutex/condvar pair that `acquire` blocks on while the pool is exhausted.
    release_pair: (std::sync::Mutex<()>, std::sync::Condvar),
    /// Idle connections at least this old (by `created_at`) are dropped instead
    /// of being reused; `None` disables the check.
    max_lifetime: Option<Duration>,
    /// How long `acquire` may wait when the pool is exhausted; `None` means
    /// fail immediately.
    acquire_timeout: Option<Duration>,
    /// Number of idle connections the maintenance thread tries to keep ready.
    min_idle: usize,
    /// Statements prepared on every connection freshly opened by `acquire`.
    warmup_sqls: std::sync::Mutex<Arc<Vec<Box<str>>>>,
    /// Value passed to `Connection::set_max_stmt_cache_size` for new connections.
    max_stmt_cache_size: usize,
}
65
66impl Pool {
67 pub fn connect(url: &str) -> Result<Self, DriverError> {
71 PoolBuilder::new().url(url).build()
72 }
73
74 pub fn builder() -> PoolBuilder {
76 PoolBuilder::new()
77 }
78
79 #[inline]
87 pub fn acquire(&self) -> Result<PoolGuard, DriverError> {
88 if self.inner.closed.load(Ordering::Acquire) {
89 return Err(DriverError::Pool("pool is closed".into()));
90 }
91
92 if let Some(guard) = self.try_pop_idle()? {
94 return Ok(guard);
95 }
96
97 loop {
99 let current = self.inner.open_count.load(Ordering::Acquire);
100 if current >= self.inner.max_size {
101 if let Some(timeout) = self.inner.acquire_timeout {
102 let (lock, cvar) = &self.inner.release_pair;
103 let guard = lock.lock().unwrap_or_else(|e| e.into_inner());
104 let (_guard, result) = cvar
105 .wait_timeout(guard, timeout)
106 .unwrap_or_else(|e| e.into_inner());
107 if result.timed_out() {
108 return Err(DriverError::Pool(
109 "pool exhausted: acquire timeout expired".into(),
110 ));
111 }
112 if let Some(guard) = self.try_pop_idle()? {
114 return Ok(guard);
115 }
116 continue;
118 }
119 return Err(DriverError::Pool(
120 "pool exhausted: all connections in use".into(),
121 ));
122 }
123 if self
124 .inner
125 .open_count
126 .compare_exchange(current, current + 1, Ordering::AcqRel, Ordering::Acquire)
127 .is_ok()
128 {
129 break;
130 }
131 }
133
134 let conn_result = Connection::connect_arc(self.inner.config.clone());
136 match conn_result {
137 Ok(mut conn) => {
138 conn.set_max_stmt_cache_size(self.inner.max_stmt_cache_size);
140 self.warmup_conn(&mut conn);
142
143 Ok(PoolGuard {
144 conn: Some(conn),
145 pool: self.inner.clone(),
146 discard: false,
147 })
148 }
149 Err(e) => {
150 self.inner.open_count.fetch_sub(1, Ordering::AcqRel);
152 Err(e)
153 }
154 }
155 }
156
157 #[inline]
159 fn try_pop_idle(&self) -> Result<Option<PoolGuard>, DriverError> {
160 let mut stack = self.inner.stack.lock().unwrap_or_else(|e| e.into_inner());
161 while let Some(conn) = stack.pop() {
162 if let Some(max_lifetime) = self.inner.max_lifetime {
163 if conn.created_at().elapsed() >= max_lifetime {
164 self.inner.open_count.fetch_sub(1, Ordering::AcqRel);
165 continue;
166 }
167 }
168 if conn.idle_duration() < Duration::from_secs(30) {
169 return Ok(Some(PoolGuard {
170 conn: Some(conn),
171 pool: self.inner.clone(),
172 discard: false,
173 }));
174 }
175 self.inner.open_count.fetch_sub(1, Ordering::AcqRel);
177 }
178 Ok(None)
179 }
180
181 pub fn is_uds(&self) -> bool {
186 #[cfg(unix)]
187 {
188 self.inner.config.host_is_uds()
189 }
190 #[cfg(not(unix))]
191 {
192 false
193 }
194 }
195
196 pub fn begin(&self) -> Result<Transaction, DriverError> {
198 let mut guard = self.acquire()?;
199 guard.simple_query("BEGIN")?;
200 Ok(Transaction {
201 guard,
202 committed: false,
203 deferred_buf: Vec::new(),
204 deferred_count: 0,
205 })
206 }
207
208 pub fn open_count(&self) -> usize {
210 self.inner.open_count.load(Ordering::Relaxed)
211 }
212
213 pub fn max_size(&self) -> usize {
215 self.inner.max_size
216 }
217
218 pub fn status(&self) -> PoolStatus {
220 let idle = self
221 .inner
222 .stack
223 .lock()
224 .unwrap_or_else(|e| e.into_inner())
225 .len();
226 let open = self.inner.open_count.load(Ordering::Relaxed);
227 let active = open.saturating_sub(idle);
228 PoolStatus {
229 idle,
230 active,
231 open,
232 max_size: self.inner.max_size,
233 }
234 }
235
236 fn warmup_conn(&self, conn: &mut Connection) {
244 let sqls = self
245 .inner
246 .warmup_sqls
247 .lock()
248 .unwrap_or_else(|e| e.into_inner())
249 .clone();
250
251 if sqls.is_empty() {
252 return;
253 }
254
255 for sql in sqls.iter() {
256 let sql_hash = crate::types::hash_sql(sql);
257 let _ = conn.prepare_only(sql, sql_hash);
258 }
259 }
260
261 pub fn set_warmup_sqls(&self, sqls: &[&str]) {
283 let boxed: Arc<Vec<Box<str>>> =
284 Arc::new(sqls.iter().map(|s| (*s).into()).collect::<Vec<_>>());
285 *self
286 .inner
287 .warmup_sqls
288 .lock()
289 .unwrap_or_else(|e| e.into_inner()) = boxed;
290 }
291
292 pub fn close(&self) {
295 self.inner.closed.store(true, Ordering::Release);
296 let conns: Vec<Connection> = {
298 let mut stack = self.inner.stack.lock().unwrap_or_else(|e| e.into_inner());
299 std::mem::take(&mut *stack)
300 };
301 for conn in conns {
302 self.inner.open_count.fetch_sub(1, Ordering::AcqRel);
303 let _ = conn.close();
304 }
305 let (_, cvar) = &self.inner.release_pair;
307 cvar.notify_all();
308 }
309
310 pub fn is_closed(&self) -> bool {
312 self.inner.closed.load(Ordering::Acquire)
313 }
314}
315
316impl Clone for Pool {
317 fn clone(&self) -> Self {
318 Pool {
319 inner: self.inner.clone(),
320 }
321 }
322}
323
/// Point-in-time counters reported by `Pool::status`.
#[derive(Debug, Clone, Copy)]
pub struct PoolStatus {
    /// Connections sitting in the idle stack.
    pub idle: usize,
    /// `open` minus `idle`, saturating at zero.
    pub active: usize,
    /// Connections currently accounted for by the pool.
    pub open: usize,
    /// Configured upper bound on open connections.
    pub max_size: usize,
}
338
/// Collects pool settings before the pool is created with `build`.
pub struct PoolBuilder {
    /// Connection URL; `build` fails when it is missing.
    url: Option<String>,
    /// Upper bound on open connections.
    max_size: usize,
    /// Maximum age of a reusable connection; `None` disables the check.
    max_lifetime: Option<Duration>,
    /// How long `acquire` may wait on an exhausted pool; `None` fails immediately.
    acquire_timeout: Option<Duration>,
    /// Idle connections to keep ready; a value above zero starts a background thread.
    min_idle: usize,
    /// Statement cache size applied to new connections.
    max_stmt_cache_size: usize,
}
354
355impl PoolBuilder {
356 fn new() -> Self {
357 Self {
358 url: None,
359 max_size: 10,
360 max_lifetime: Some(Duration::from_secs(30 * 60)), acquire_timeout: None, min_idle: 0, max_stmt_cache_size: 256, }
365 }
366
367 pub fn url(mut self, url: &str) -> Self {
369 self.url = Some(url.to_owned());
370 self
371 }
372
373 pub fn max_size(mut self, size: usize) -> Self {
377 self.max_size = size;
378 self
379 }
380
381 pub fn max_lifetime(mut self, lifetime: Option<Duration>) -> Self {
384 self.max_lifetime = lifetime;
385 self
386 }
387
388 pub fn acquire_timeout(mut self, timeout: Option<Duration>) -> Self {
391 self.acquire_timeout = timeout;
392 self
393 }
394
395 pub fn min_idle(mut self, count: usize) -> Self {
398 self.min_idle = count;
399 self
400 }
401
402 pub fn max_stmt_cache_size(mut self, size: usize) -> Self {
406 self.max_stmt_cache_size = size;
407 self
408 }
409
410 pub fn build(self) -> Result<Pool, DriverError> {
412 let url = self
413 .url
414 .ok_or_else(|| DriverError::Pool("pool builder requires a URL".into()))?;
415
416 let config = Arc::new(Config::from_url(&url)?);
417
418 let pool = Pool {
419 inner: Arc::new(PoolInner {
420 stack: std::sync::Mutex::new(Vec::with_capacity(self.max_size)),
421 max_size: self.max_size,
422 open_count: AtomicUsize::new(0),
423 config,
424 closed: AtomicBool::new(false),
425 release_pair: (std::sync::Mutex::new(()), std::sync::Condvar::new()),
426 max_lifetime: self.max_lifetime,
427 acquire_timeout: self.acquire_timeout,
428 min_idle: self.min_idle,
429 warmup_sqls: std::sync::Mutex::new(Arc::new(Vec::new())),
430 max_stmt_cache_size: self.max_stmt_cache_size,
431 }),
432 };
433
434 if self.min_idle > 0 {
435 let inner = pool.inner.clone();
436 std::thread::spawn(move || {
437 maintain_min_idle(inner);
438 });
439 }
440
441 Ok(pool)
442 }
443}
444
445fn maintain_min_idle(inner: Arc<PoolInner>) {
447 loop {
448 if inner.closed.load(Ordering::Acquire) {
449 return;
450 }
451
452 let idle_count = inner.stack.lock().unwrap_or_else(|e| e.into_inner()).len();
453 let needed = inner.min_idle.saturating_sub(idle_count);
454
455 for _ in 0..needed {
456 if inner.closed.load(Ordering::Acquire) {
457 return;
458 }
459 let current = inner.open_count.load(Ordering::Acquire);
460 if current >= inner.max_size {
461 break;
462 }
463 if inner
464 .open_count
465 .compare_exchange(current, current + 1, Ordering::AcqRel, Ordering::Acquire)
466 .is_err()
467 {
468 continue;
469 }
470
471 match Connection::connect_arc(inner.config.clone()) {
472 Ok(conn) => {
473 let mut stack = inner.stack.lock().unwrap_or_else(|e| e.into_inner());
474 stack.push(conn);
475 let (_, cvar) = &inner.release_pair;
476 cvar.notify_one();
477 }
478 Err(_) => {
479 inner.open_count.fetch_sub(1, Ordering::AcqRel);
480 }
481 }
482 }
483
484 std::thread::sleep(Duration::from_secs(5));
486 }
487}
488
/// A connection checked out of the pool.
///
/// Dropping the guard either returns the connection to the idle stack or
/// discards it (see the `Drop` implementation).
pub struct PoolGuard {
    /// The checked-out connection; `None` once it has been taken during drop.
    conn: Option<Connection>,
    /// Pool the connection belongs to.
    pool: Arc<PoolInner>,
    /// When set, the connection is not returned to the pool on drop.
    discard: bool,
}
501
502impl PoolGuard {
503 pub fn mark_discard(&mut self) {
506 self.discard = true;
507 }
508
509 pub fn cancel(&self) -> Result<(), DriverError> {
514 let conn = self
515 .conn
516 .as_ref()
517 .ok_or_else(|| DriverError::Pool("connection already taken".into()))?;
518 conn.cancel()
519 }
520
521 pub fn pid(&self) -> i32 {
525 self.conn.as_ref().expect("connection taken").pid()
526 }
527
528 pub fn is_idle(&self) -> bool {
530 self.conn.as_ref().expect("connection taken").is_idle()
531 }
532
533 pub fn is_in_transaction(&self) -> bool {
535 self.conn
536 .as_ref()
537 .expect("connection taken")
538 .is_in_transaction()
539 }
540
541 #[inline]
545 pub fn query(
546 &mut self,
547 sql: &str,
548 sql_hash: u64,
549 params: &[&(dyn Encode + Sync)],
550 ) -> Result<QueryResult, DriverError> {
551 self.conn
552 .as_mut()
553 .ok_or_else(|| DriverError::Pool("connection already taken".into()))?
554 .query(sql, sql_hash, params)
555 }
556
557 #[inline]
559 pub fn execute(
560 &mut self,
561 sql: &str,
562 sql_hash: u64,
563 params: &[&(dyn Encode + Sync)],
564 ) -> Result<u64, DriverError> {
565 self.conn
566 .as_mut()
567 .ok_or_else(|| DriverError::Pool("connection already taken".into()))?
568 .execute(sql, sql_hash, params)
569 }
570
571 pub fn execute_pipeline(
576 &mut self,
577 sql: &str,
578 sql_hash: u64,
579 param_sets: &[&[&(dyn Encode + Sync)]],
580 ) -> Result<Vec<u64>, DriverError> {
581 self.conn
582 .as_mut()
583 .ok_or_else(|| DriverError::Pool("connection already taken".into()))?
584 .execute_pipeline(sql, sql_hash, param_sets)
585 }
586
587 pub fn simple_query(&mut self, sql: &str) -> Result<(), DriverError> {
589 self.conn
590 .as_mut()
591 .ok_or_else(|| DriverError::Pool("connection already taken".into()))?
592 .simple_query(sql)
593 }
594
595 pub fn simple_query_rows(&mut self, sql: &str) -> Result<Vec<SimpleRow>, DriverError> {
599 self.conn
600 .as_mut()
601 .ok_or_else(|| DriverError::Pool("connection already taken".into()))?
602 .simple_query_rows(sql)
603 }
604
605 pub fn for_each<F>(
607 &mut self,
608 sql: &str,
609 sql_hash: u64,
610 params: &[&(dyn Encode + Sync)],
611 f: F,
612 ) -> Result<(), DriverError>
613 where
614 F: FnMut(PgDataRow<'_>) -> Result<(), DriverError>,
615 {
616 self.conn
617 .as_mut()
618 .ok_or_else(|| DriverError::Pool("connection already taken".into()))?
619 .for_each(sql, sql_hash, params, f)
620 }
621
622 pub fn for_each_raw<F>(
624 &mut self,
625 sql: &str,
626 sql_hash: u64,
627 params: &[&(dyn Encode + Sync)],
628 f: F,
629 ) -> Result<(), DriverError>
630 where
631 F: FnMut(&[u8]) -> Result<(), DriverError>,
632 {
633 self.conn
634 .as_mut()
635 .ok_or_else(|| DriverError::Pool("connection already taken".into()))?
636 .for_each_raw(sql, sql_hash, params, f)
637 }
638
639 pub fn query_streaming_start(
643 &mut self,
644 sql: &str,
645 sql_hash: u64,
646 params: &[&(dyn Encode + Sync)],
647 chunk_size: i32,
648 ) -> Result<(std::sync::Arc<[crate::types::ColumnDesc]>, bool), DriverError> {
649 self.conn
650 .as_mut()
651 .ok_or_else(|| DriverError::Pool("connection already taken".into()))?
652 .query_streaming_start(sql, sql_hash, params, chunk_size)
653 }
654
655 pub fn streaming_send_execute(&mut self, chunk_size: i32) -> Result<(), DriverError> {
657 self.conn
658 .as_mut()
659 .ok_or_else(|| DriverError::Pool("connection already taken".into()))?
660 .streaming_send_execute(chunk_size)
661 }
662
663 pub fn streaming_next_chunk(
665 &mut self,
666 arena: &mut Arena,
667 all_col_offsets: &mut Vec<(usize, i32)>,
668 ) -> Result<bool, DriverError> {
669 self.conn
670 .as_mut()
671 .ok_or_else(|| DriverError::Pool("connection already taken".into()))?
672 .streaming_next_chunk(arena, all_col_offsets)
673 }
674
675 pub fn copy_in<'a, I>(
681 &mut self,
682 table: &str,
683 columns: &[&str],
684 rows: I,
685 ) -> Result<u64, DriverError>
686 where
687 I: IntoIterator<Item = &'a str>,
688 {
689 self.conn
690 .as_mut()
691 .ok_or_else(|| DriverError::Pool("connection already taken".into()))?
692 .copy_in(table, columns, rows)
693 }
694
695 pub fn copy_out<W: std::io::Write>(
699 &mut self,
700 query: &str,
701 writer: &mut W,
702 ) -> Result<u64, DriverError> {
703 self.conn
704 .as_mut()
705 .ok_or_else(|| DriverError::Pool("connection already taken".into()))?
706 .copy_out(query, writer)
707 }
708
709 pub fn is_sync(&self) -> bool {
711 true
712 }
713
714 pub(crate) fn ensure_stmt_prepared(
718 &mut self,
719 sql: &str,
720 sql_hash: u64,
721 params: &[&(dyn Encode + Sync)],
722 ) -> Result<[u8; 18], DriverError> {
723 self.conn
724 .as_mut()
725 .ok_or_else(|| DriverError::Pool("connection already taken".into()))?
726 .ensure_stmt_prepared(sql, sql_hash, params)
727 }
728
729 pub(crate) fn write_deferred_bind_execute(
731 &self,
732 sql_hash: u64,
733 params: &[&(dyn Encode + Sync)],
734 buf: &mut Vec<u8>,
735 ) {
736 let conn = self.conn.as_ref().expect("connection taken");
737 conn.write_deferred_bind_execute(sql_hash, params, buf);
738 }
739
740 pub(crate) fn flush_deferred_pipeline(
742 &mut self,
743 buf: &mut Vec<u8>,
744 count: usize,
745 ) -> Result<Vec<u64>, DriverError> {
746 self.conn
747 .as_mut()
748 .ok_or_else(|| DriverError::Pool("connection already taken".into()))?
749 .flush_deferred_pipeline(buf, count)
750 }
751}
752
753impl Drop for PoolGuard {
754 fn drop(&mut self) {
755 if let Some(mut conn) = self.conn.take() {
756 if self.discard
763 || conn.is_in_failed_transaction()
764 || conn.is_in_transaction()
765 || conn.is_streaming()
766 || self.pool.closed.load(Ordering::Acquire)
767 {
768 self.pool.open_count.fetch_sub(1, Ordering::AcqRel);
769 return;
770 }
771
772 if conn.query_counter() & 63 == 0 {
776 conn.touch();
777 }
778
779 {
781 let mut stack = self.pool.stack.lock().unwrap_or_else(|e| e.into_inner());
782 stack.push(conn);
783 }
784
785 if self.pool.open_count.load(Ordering::Relaxed) >= self.pool.max_size {
788 let (_, cvar) = &self.pool.release_pair;
789 cvar.notify_one();
790 }
791 }
792 }
793}
794
/// A transaction running on a connection checked out of the pool.
///
/// Created by `Pool::begin`; finished with `commit` or `rollback`.
pub struct Transaction {
    /// The connection the transaction runs on.
    guard: PoolGuard,
    /// Set once `commit` or `rollback` has completed successfully.
    committed: bool,
    /// Bytes written by `defer_execute` that have not been sent yet.
    deferred_buf: Vec<u8>,
    /// Number of deferred statements currently in `deferred_buf`.
    deferred_count: usize,
}
818
819impl Transaction {
820 pub fn commit(mut self) -> Result<(), DriverError> {
824 if self.deferred_count > 0 {
825 self.flush_deferred()?;
826 }
827 self.guard.simple_query("COMMIT")?;
828 self.committed = true;
829 Ok(())
830 }
831
832 pub fn rollback(mut self) -> Result<(), DriverError> {
836 self.deferred_buf.clear();
837 self.deferred_count = 0;
838 self.guard.simple_query("ROLLBACK")?;
839 self.committed = true; Ok(())
841 }
842
843 pub fn query(
848 &mut self,
849 sql: &str,
850 sql_hash: u64,
851 params: &[&(dyn Encode + Sync)],
852 ) -> Result<QueryResult, DriverError> {
853 if self.deferred_count > 0 {
854 self.flush_deferred()?;
855 }
856 self.guard.query(sql, sql_hash, params)
857 }
858
859 pub fn execute(
861 &mut self,
862 sql: &str,
863 sql_hash: u64,
864 params: &[&(dyn Encode + Sync)],
865 ) -> Result<u64, DriverError> {
866 self.guard.execute(sql, sql_hash, params)
867 }
868
869 pub fn execute_pipeline(
871 &mut self,
872 sql: &str,
873 sql_hash: u64,
874 param_sets: &[&[&(dyn Encode + Sync)]],
875 ) -> Result<Vec<u64>, DriverError> {
876 self.guard.execute_pipeline(sql, sql_hash, param_sets)
877 }
878
879 pub fn for_each<F>(
883 &mut self,
884 sql: &str,
885 sql_hash: u64,
886 params: &[&(dyn Encode + Sync)],
887 f: F,
888 ) -> Result<(), DriverError>
889 where
890 F: FnMut(crate::types::PgDataRow<'_>) -> Result<(), DriverError>,
891 {
892 if self.deferred_count > 0 {
893 self.flush_deferred()?;
894 }
895 self.guard.for_each(sql, sql_hash, params, f)
896 }
897
898 pub fn for_each_raw<F>(
902 &mut self,
903 sql: &str,
904 sql_hash: u64,
905 params: &[&(dyn Encode + Sync)],
906 f: F,
907 ) -> Result<(), DriverError>
908 where
909 F: FnMut(&[u8]) -> Result<(), DriverError>,
910 {
911 if self.deferred_count > 0 {
912 self.flush_deferred()?;
913 }
914 self.guard.for_each_raw(sql, sql_hash, params, f)
915 }
916
917 pub fn simple_query(&mut self, sql: &str) -> Result<(), DriverError> {
921 if self.deferred_count > 0 {
922 self.flush_deferred()?;
923 }
924 self.guard.simple_query(sql)
925 }
926
927 pub fn defer_execute(
956 &mut self,
957 sql: &str,
958 sql_hash: u64,
959 params: &[&(dyn Encode + Sync)],
960 ) -> Result<(), DriverError> {
961 if params.len() > i16::MAX as usize {
962 return Err(DriverError::Protocol(format!(
963 "parameter count {} exceeds maximum {}",
964 params.len(),
965 i16::MAX
966 )));
967 }
968
969 self.guard.ensure_stmt_prepared(sql, sql_hash, params)?;
971
972 self.guard
974 .write_deferred_bind_execute(sql_hash, params, &mut self.deferred_buf);
975 self.deferred_count += 1;
976 Ok(())
977 }
978
979 pub fn flush_deferred(&mut self) -> Result<Vec<u64>, DriverError> {
984 let count = self.deferred_count;
985 self.deferred_count = 0;
986 self.guard
987 .flush_deferred_pipeline(&mut self.deferred_buf, count)
988 }
989
990 pub fn deferred_count(&self) -> usize {
992 self.deferred_count
993 }
994}
995
996impl Drop for Transaction {
997 fn drop(&mut self) {
998 if !self.committed {
999 if let Some(_conn) = self.guard.conn.take() {
1002 self.guard.pool.open_count.fetch_sub(1, Ordering::AcqRel);
1003 }
1005 }
1006 }
1007}
1008
#[cfg(test)]
mod tests {
    //! Unit tests that never need a reachable database server: they exercise
    //! builder validation, pool bookkeeping and URL parsing only.

    use super::*;

    /// TCP connection URL shared by most tests.
    const TCP_URL: &str = "postgres://user:pass@localhost/db";
    /// URL whose `host` query parameter is the directory `/tmp`.
    const UDS_TMP_URL: &str = "postgres://user@localhost/db?host=/tmp";

    /// Builder preloaded with [`TCP_URL`].
    fn tcp_builder() -> PoolBuilder {
        PoolBuilder::new().url(TCP_URL)
    }

    #[test]
    fn pool_builder_requires_url() {
        let result = PoolBuilder::new().build();
        assert!(result.is_err());
    }

    #[test]
    fn pool_builder_validates_url() {
        let result = PoolBuilder::new().url("not_a_url").build();
        assert!(result.is_err());
    }

    #[test]
    fn pool_builder_accepts_valid_url() {
        let pool = tcp_builder().max_size(5).build().unwrap();
        assert_eq!(pool.max_size(), 5);
        assert_eq!(pool.open_count(), 0);
    }

    #[test]
    fn pool_connect_validates_url() {
        let result = Pool::connect("not_a_url");
        assert!(result.is_err());
    }

    #[test]
    fn pool_max_size_zero() {
        let pool = tcp_builder().max_size(0).build().unwrap();

        let result = pool.acquire();
        assert!(result.is_err());
        match result {
            Err(DriverError::Pool(msg)) => assert!(msg.contains("exhausted")),
            Err(e) => panic!("expected Pool error, got: {e:?}"),
            Ok(_) => panic!("expected error, got Ok"),
        }
    }

    #[test]
    fn pool_clone_shares_state() {
        let pool = tcp_builder().max_size(5).build().unwrap();

        let pool2 = pool.clone();
        assert_eq!(pool.max_size(), pool2.max_size());
    }

    #[test]
    fn pool_builder_max_lifetime() {
        let pool = tcp_builder()
            .max_lifetime(Some(Duration::from_secs(60)))
            .build()
            .unwrap();
        assert_eq!(pool.inner.max_lifetime, Some(Duration::from_secs(60)));
    }

    #[test]
    fn pool_builder_max_lifetime_none() {
        let pool = tcp_builder().max_lifetime(None).build().unwrap();
        assert_eq!(pool.inner.max_lifetime, None);
    }

    #[test]
    fn pool_builder_acquire_timeout_none() {
        let pool = tcp_builder().acquire_timeout(None).build().unwrap();
        assert_eq!(pool.inner.acquire_timeout, None);
    }

    #[test]
    fn pool_builder_acquire_timeout_custom() {
        let pool = tcp_builder()
            .acquire_timeout(Some(Duration::from_secs(10)))
            .build()
            .unwrap();
        assert_eq!(pool.inner.acquire_timeout, Some(Duration::from_secs(10)));
    }

    #[test]
    fn pool_builder_min_idle() {
        let pool = tcp_builder().min_idle(2).build().unwrap();
        assert_eq!(pool.inner.min_idle, 2);
        // `min_idle > 0` starts a background thread; closing the pool lets it
        // exit instead of dialling localhost for the rest of the test run.
        pool.close();
    }

    #[test]
    fn pool_close_marks_closed() {
        let pool = tcp_builder().max_size(5).build().unwrap();

        assert!(!pool.is_closed());
        pool.close();
        assert!(pool.is_closed());

        let result = pool.acquire();
        assert!(result.is_err());
        match result {
            Err(DriverError::Pool(msg)) => assert!(msg.contains("closed")),
            Err(e) => panic!("expected Pool(closed) error, got: {e:?}"),
            Ok(_) => panic!("expected error, got Ok"),
        }
    }

    #[test]
    fn pool_status_initial() {
        let pool = tcp_builder().max_size(10).build().unwrap();

        let status = pool.status();
        assert_eq!(status.idle, 0);
        assert_eq!(status.active, 0);
        assert_eq!(status.open, 0);
        assert_eq!(status.max_size, 10);
    }

    #[test]
    fn pool_builder_defaults() {
        let pool = tcp_builder().build().unwrap();

        assert_eq!(pool.max_size(), 10);
        assert_eq!(pool.inner.max_lifetime, Some(Duration::from_secs(30 * 60)));
        assert_eq!(pool.inner.acquire_timeout, None);
        assert_eq!(pool.inner.min_idle, 0);
    }

    #[test]
    fn pool_open_count_initial() {
        let pool = Pool::connect(TCP_URL).unwrap();
        assert_eq!(pool.open_count(), 0);
    }

    #[test]
    fn pool_builder_max_stmt_cache_size_default() {
        let pool = tcp_builder().build().unwrap();
        assert_eq!(pool.inner.max_stmt_cache_size, 256);
    }

    #[test]
    fn pool_builder_max_stmt_cache_size_custom() {
        let pool = tcp_builder().max_stmt_cache_size(512).build().unwrap();
        assert_eq!(pool.inner.max_stmt_cache_size, 512);
    }

    #[test]
    fn pool_is_uds_false_for_tcp() {
        let pool = Pool::connect(TCP_URL).unwrap();
        assert!(!pool.is_uds());
    }

    #[cfg(unix)]
    #[test]
    fn pool_is_uds_true_for_unix_socket() {
        let pool = Pool::connect(UDS_TMP_URL).unwrap();
        assert!(pool.is_uds());
    }

    #[cfg(unix)]
    #[test]
    fn pool_is_uds_true_for_var_run_socket() {
        let pool = Pool::connect("postgres://user@localhost/db?host=/var/run/postgresql").unwrap();
        assert!(pool.is_uds());
    }

    #[test]
    fn pool_is_uds_false_for_ip_address() {
        let pool = Pool::connect("postgres://user:pass@127.0.0.1/db").unwrap();
        assert!(!pool.is_uds());
    }

    #[cfg(unix)]
    #[test]
    fn pool_slot_sync_created_for_uds_config() {
        let config = Config::from_url(UDS_TMP_URL).unwrap();
        assert!(config.host_is_uds());
    }

    #[test]
    fn pool_slot_tcp_config() {
        let config = Config::from_url(TCP_URL).unwrap();
        assert!(!config.host_is_uds());
    }

    #[test]
    fn pool_is_uds_false_for_hostname() {
        let pool = Pool::connect("postgres://user:pass@db.example.com/db").unwrap();
        assert!(!pool.is_uds());
    }

    #[cfg(unix)]
    #[test]
    fn pool_is_uds_true_for_tmp() {
        let pool = Pool::connect(UDS_TMP_URL).unwrap();
        assert!(pool.is_uds());
    }

    #[test]
    fn pool_close_then_acquire_fails() {
        let pool = tcp_builder().max_size(5).build().unwrap();
        pool.close();
        let result = pool.acquire();
        assert!(result.is_err());
        match result {
            Err(DriverError::Pool(msg)) => {
                assert!(msg.contains("closed"), "should say closed: {msg}")
            }
            Err(e) => panic!("expected Pool error, got: {e:?}"),
            Ok(_) => panic!("expected error"),
        }
    }

    #[test]
    fn pool_is_closed_before_and_after() {
        let pool = Pool::connect(TCP_URL).unwrap();
        assert!(!pool.is_closed());
        pool.close();
        assert!(pool.is_closed());
    }

    #[test]
    fn pool_exhausted_no_timeout() {
        let pool = tcp_builder()
            .max_size(0)
            .acquire_timeout(None)
            .build()
            .unwrap();
        let result = pool.acquire();
        assert!(result.is_err());
        match result {
            Err(DriverError::Pool(msg)) => {
                assert!(msg.contains("exhausted"), "should say exhausted: {msg}")
            }
            Err(e) => panic!("expected Pool error, got: {e:?}"),
            Ok(_) => panic!("expected error"),
        }
    }

    #[test]
    fn pool_builder_no_url_error() {
        let result = PoolBuilder::new().max_size(5).build();
        assert!(result.is_err());
        match result {
            Err(DriverError::Pool(msg)) => {
                assert!(msg.contains("URL"), "should mention URL: {msg}")
            }
            Err(e) => panic!("expected Pool error, got: {e:?}"),
            Ok(_) => panic!("expected error"),
        }
    }

    #[test]
    fn pool_builder_invalid_url_error() {
        let result = PoolBuilder::new().url("ftp://something").build();
        assert!(result.is_err());
    }

    #[test]
    fn pool_builder_stmt_cache_size_zero() {
        let pool = tcp_builder().max_stmt_cache_size(0).build().unwrap();
        assert_eq!(pool.inner.max_stmt_cache_size, 0);
    }

    #[test]
    fn pool_status_reflects_max_size() {
        let pool = tcp_builder().max_size(20).build().unwrap();
        let status = pool.status();
        assert_eq!(status.max_size, 20);
        assert_eq!(status.idle, 0);
        assert_eq!(status.active, 0);
        assert_eq!(status.open, 0);
    }

    #[test]
    fn pool_clone_shares_config() {
        let pool = tcp_builder().max_size(7).build().unwrap();
        let p2 = pool.clone();
        assert_eq!(pool.max_size(), 7);
        assert_eq!(p2.max_size(), 7);
        assert_eq!(pool.open_count(), p2.open_count());
    }

    #[test]
    fn pool_set_warmup_sqls_empty() {
        let pool = Pool::connect(TCP_URL).unwrap();
        pool.set_warmup_sqls(&[]);
        let sqls = pool
            .inner
            .warmup_sqls
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .clone();
        assert!(sqls.is_empty());
    }

    #[test]
    fn pool_set_warmup_sqls_multiple() {
        let pool = Pool::connect(TCP_URL).unwrap();
        pool.set_warmup_sqls(&["SELECT 1", "SELECT 2", "SELECT 3"]);
        let sqls = pool
            .inner
            .warmup_sqls
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .clone();
        assert_eq!(sqls.len(), 3);
        assert_eq!(&*sqls[0], "SELECT 1");
        assert_eq!(&*sqls[1], "SELECT 2");
        assert_eq!(&*sqls[2], "SELECT 3");
    }

    #[test]
    fn pool_set_warmup_sqls_overwrite() {
        let pool = Pool::connect(TCP_URL).unwrap();
        pool.set_warmup_sqls(&["SELECT 1"]);
        pool.set_warmup_sqls(&["SELECT 99"]);
        let sqls = pool
            .inner
            .warmup_sqls
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .clone();
        assert_eq!(sqls.len(), 1);
        assert_eq!(&*sqls[0], "SELECT 99");
    }

    #[test]
    fn pool_status_debug() {
        let pool = Pool::connect(TCP_URL).unwrap();
        let status = pool.status();
        let dbg = format!("{status:?}");
        assert!(dbg.contains("PoolStatus"));
        assert!(dbg.contains("idle"));
        assert!(dbg.contains("active"));
        assert!(dbg.contains("open"));
        assert!(dbg.contains("max_size"));
    }

    #[test]
    fn config_host_is_uds_returns_true_for_slash() {
        let config = Config::from_url(UDS_TMP_URL).unwrap();
        assert!(config.host_is_uds());
    }

    #[test]
    fn config_host_is_uds_returns_false_for_tcp() {
        let config = Config::from_url(TCP_URL).unwrap();
        assert!(!config.host_is_uds());
    }

    #[test]
    fn config_host_is_uds_returns_false_for_ip() {
        let config = Config::from_url("postgres://user:pass@192.168.1.1/db").unwrap();
        assert!(!config.host_is_uds());
    }

    #[test]
    fn pool_builder_full_chain() {
        let pool = tcp_builder()
            .max_size(3)
            .max_lifetime(Some(Duration::from_secs(600)))
            .acquire_timeout(Some(Duration::from_secs(5)))
            .min_idle(1)
            .max_stmt_cache_size(128)
            .build()
            .unwrap();
        assert_eq!(pool.max_size(), 3);
        assert_eq!(pool.inner.max_lifetime, Some(Duration::from_secs(600)));
        assert_eq!(pool.inner.acquire_timeout, Some(Duration::from_secs(5)));
        assert_eq!(pool.inner.min_idle, 1);
        assert_eq!(pool.inner.max_stmt_cache_size, 128);
        // Stop the min-idle background thread started by `build`.
        pool.close();
    }

    #[test]
    fn pool_max_size_zero_rejects_all_acquires() {
        let pool = tcp_builder().max_size(0).build().unwrap();
        let result = pool.acquire();
        assert!(result.is_err());
        match &result {
            Err(DriverError::Pool(msg)) => assert!(msg.contains("exhausted")),
            _ => panic!("expected pool exhausted error"),
        }
    }

    #[test]
    fn url_parse_unknown_sslmode_returns_error() {
        let result = Config::from_url("postgres://u:p@h/d?sslmode=bogus");
        assert!(result.is_err());
        let msg = format!("{}", result.unwrap_err());
        assert!(msg.contains("unknown sslmode"));
    }

    #[test]
    fn url_parse_invalid_port_returns_error() {
        let result = Config::from_url("postgres://u:p@h:abc/d");
        assert!(result.is_err());
        let msg = format!("{}", result.unwrap_err());
        assert!(msg.contains("invalid port"));
    }

    #[test]
    fn url_parse_missing_at_sign_returns_error() {
        let result = Config::from_url("postgres://u:plocalhost/d");
        assert!(result.is_err());
        let msg = format!("{}", result.unwrap_err());
        assert!(msg.contains("missing @"));
    }

    #[test]
    fn url_parse_empty_host_returns_error() {
        let result = Config::from_url("postgres://u:p@/d");
        assert!(result.is_err());
    }

    #[test]
    fn url_parse_empty_user_returns_error() {
        let result = Config::from_url("postgres://:p@h/d");
        assert!(result.is_err());
    }

    #[test]
    fn url_parse_statement_timeout_invalid_uses_default() {
        let config = Config::from_url("postgres://u:p@h/d?statement_timeout=notnum").unwrap();
        assert_eq!(config.statement_timeout_secs, 30);
    }

    #[test]
    fn url_parse_malformed_percent_encoding() {
        let result = Config::from_url("postgres://u%:p@h/d");
        assert!(result.is_err());
    }

    #[test]
    fn url_parse_invalid_hex_in_percent_encoding() {
        let result = Config::from_url("postgres://u%ZZ:p@h/d");
        assert!(result.is_err());
    }
}