1pub mod replica;
46pub use replica::{ReplicaPool, ReplicaStrategy};
47
48pub mod sharding;
49pub use sharding::{ModuloShardChooser, QueryHints, ShardChooser, ShardedPool, ShardedPoolStats};
50
51use std::collections::VecDeque;
52use std::future::Future;
53use std::sync::atomic::{AtomicU64, Ordering};
54use std::sync::{Arc, Condvar, Mutex, Weak};
55use std::time::{Duration, Instant};
56
57use asupersync::{CancelReason, Cx, Outcome};
58use sqlmodel_core::error::{ConnectionError, ConnectionErrorKind, PoolError, PoolErrorKind};
59use sqlmodel_core::{Connection, Error};
60
/// Tunable settings for a connection pool.
///
/// Every duration is expressed in milliseconds. Build a value with
/// [`PoolConfig::new`] (or [`Default::default`]) and refine it with the chained,
/// by-value setters.
#[derive(Debug, Clone)]
pub struct PoolConfig {
    /// Desired minimum number of connections.
    ///
    /// NOTE(review): nothing visible in this file reads this value -- confirm where
    /// (or whether) it is enforced.
    pub min_connections: usize,
    /// Upper bound on the number of connections (idle plus checked out) the pool owns.
    pub max_connections: usize,
    /// How long a connection may sit idle before it is discarded, in milliseconds.
    pub idle_timeout_ms: u64,
    /// How long an acquire may wait for a connection, in milliseconds.
    pub acquire_timeout_ms: u64,
    /// Maximum age of a connection before it is discarded, in milliseconds.
    pub max_lifetime_ms: u64,
    /// Whether a reused connection is pinged before it is handed out.
    pub test_on_checkout: bool,
    /// Whether a connection should be validated when it is returned.
    ///
    /// NOTE(review): nothing visible in this file reads this flag -- confirm.
    pub test_on_return: bool,
}

impl Default for PoolConfig {
    /// Baseline settings: 1..=10 connections, 10 minute idle timeout, 30 second
    /// acquire timeout, 30 minute maximum lifetime, validation on checkout only.
    fn default() -> Self {
        const SECOND_MS: u64 = 1_000;
        const MINUTE_MS: u64 = 60 * SECOND_MS;

        Self {
            min_connections: 1,
            max_connections: 10,
            idle_timeout_ms: 10 * MINUTE_MS,
            acquire_timeout_ms: 30 * SECOND_MS,
            max_lifetime_ms: 30 * MINUTE_MS,
            test_on_checkout: true,
            test_on_return: false,
        }
    }
}

impl PoolConfig {
    /// Creates a configuration with the given connection cap and default values for
    /// everything else.
    #[must_use]
    pub fn new(max_connections: usize) -> Self {
        let mut config = Self::default();
        config.max_connections = max_connections;
        config
    }

    /// Sets the minimum number of connections.
    #[must_use]
    pub fn min_connections(self, n: usize) -> Self {
        Self {
            min_connections: n,
            ..self
        }
    }

    /// Sets the idle timeout, in milliseconds.
    #[must_use]
    pub fn idle_timeout(self, ms: u64) -> Self {
        Self {
            idle_timeout_ms: ms,
            ..self
        }
    }

    /// Sets the acquire timeout, in milliseconds.
    #[must_use]
    pub fn acquire_timeout(self, ms: u64) -> Self {
        Self {
            acquire_timeout_ms: ms,
            ..self
        }
    }

    /// Sets the maximum connection lifetime, in milliseconds.
    #[must_use]
    pub fn max_lifetime(self, ms: u64) -> Self {
        Self {
            max_lifetime_ms: ms,
            ..self
        }
    }

    /// Enables or disables validation when a connection is checked out.
    #[must_use]
    pub fn test_on_checkout(self, enabled: bool) -> Self {
        Self {
            test_on_checkout: enabled,
            ..self
        }
    }

    /// Enables or disables validation when a connection is returned.
    #[must_use]
    pub fn test_on_return(self, enabled: bool) -> Self {
        Self {
            test_on_return: enabled,
            ..self
        }
    }
}
146
/// Point-in-time snapshot of a pool's gauges and lifetime counters.
#[derive(Debug, Clone, Default)]
pub struct PoolStats {
    /// Connections currently owned by the pool (idle plus checked out).
    pub total_connections: usize,
    /// Connections sitting in the idle queue.
    pub idle_connections: usize,
    /// Connections currently checked out.
    pub active_connections: usize,
    /// Callers currently waiting for a connection.
    pub pending_requests: usize,
    /// Connections created over the pool's lifetime.
    pub connections_created: u64,
    /// Connections discarded over the pool's lifetime.
    pub connections_closed: u64,
    /// Successful acquires over the pool's lifetime.
    pub acquires: u64,
    /// Acquires that gave up because the acquire timeout elapsed.
    pub timeouts: u64,
}
167
/// A connection together with the timestamps the pool needs for expiry decisions.
#[derive(Debug)]
struct ConnectionMeta<C> {
    /// The wrapped connection.
    conn: C,
    /// When the wrapper (and, by construction, the connection) was created.
    created_at: Instant,
    /// When the connection was last handed out or returned.
    last_used: Instant,
}

impl<C> ConnectionMeta<C> {
    /// Wraps `conn`, stamping both timestamps with the current instant.
    fn new(conn: C) -> Self {
        let created_at = Instant::now();
        Self {
            conn,
            created_at,
            last_used: created_at,
        }
    }

    /// Marks the connection as used right now.
    fn touch(&mut self) {
        self.last_used = Instant::now();
    }

    /// Time elapsed since the connection was created.
    fn age(&self) -> Duration {
        Instant::now().saturating_duration_since(self.created_at)
    }

    /// Time elapsed since the connection was last touched.
    fn idle_time(&self) -> Duration {
        Instant::now().saturating_duration_since(self.last_used)
    }
}
201
202struct PoolInner<C> {
204 config: PoolConfig,
206 idle: VecDeque<ConnectionMeta<C>>,
208 active_count: usize,
210 total_count: usize,
212 waiter_count: usize,
214 closed: bool,
216}
217
218impl<C> PoolInner<C> {
219 fn new(config: PoolConfig) -> Self {
220 Self {
221 config,
222 idle: VecDeque::new(),
223 active_count: 0,
224 total_count: 0,
225 waiter_count: 0,
226 closed: false,
227 }
228 }
229
230 fn can_create_new(&self) -> bool {
231 !self.closed && self.total_count < self.config.max_connections
232 }
233
234 fn stats(&self) -> PoolStats {
235 PoolStats {
236 total_connections: self.total_count,
237 idle_connections: self.idle.len(),
238 active_connections: self.active_count,
239 pending_requests: self.waiter_count,
240 ..Default::default()
241 }
242 }
243}
244
245struct PoolShared<C> {
247 inner: Mutex<PoolInner<C>>,
249 conn_available: Condvar,
251 connections_created: AtomicU64,
253 connections_closed: AtomicU64,
254 acquires: AtomicU64,
255 timeouts: AtomicU64,
256}
257
258impl<C> PoolShared<C> {
259 fn new(config: PoolConfig) -> Self {
260 Self {
261 inner: Mutex::new(PoolInner::new(config)),
262 conn_available: Condvar::new(),
263 connections_created: AtomicU64::new(0),
264 connections_closed: AtomicU64::new(0),
265 acquires: AtomicU64::new(0),
266 timeouts: AtomicU64::new(0),
267 }
268 }
269
270 fn lock_or_recover(&self) -> std::sync::MutexGuard<'_, PoolInner<C>> {
279 self.inner.lock().unwrap_or_else(|poisoned| {
280 tracing::error!(
281 "Pool mutex poisoned; recovering for read-only access. \
282 A thread panicked while holding the lock."
283 );
284 poisoned.into_inner()
285 })
286 }
287
288 #[allow(clippy::result_large_err)] fn lock_or_error(
295 &self,
296 operation: &'static str,
297 ) -> Result<std::sync::MutexGuard<'_, PoolInner<C>>, Error> {
298 self.inner
299 .lock()
300 .map_err(|_| Error::Pool(PoolError::poisoned(operation)))
301 }
302}
303
304pub struct Pool<C: Connection> {
319 shared: Arc<PoolShared<C>>,
320}
321
322impl<C: Connection> Pool<C> {
323 #[must_use]
325 pub fn new(config: PoolConfig) -> Self {
326 Self {
327 shared: Arc::new(PoolShared::new(config)),
328 }
329 }
330
331 #[must_use]
333 pub fn config(&self) -> PoolConfig {
334 let inner = self.shared.lock_or_recover();
335 inner.config.clone()
336 }
337
338 #[must_use]
340 pub fn stats(&self) -> PoolStats {
341 let inner = self.shared.lock_or_recover();
342 let mut stats = inner.stats();
343 stats.connections_created = self.shared.connections_created.load(Ordering::Relaxed);
344 stats.connections_closed = self.shared.connections_closed.load(Ordering::Relaxed);
345 stats.acquires = self.shared.acquires.load(Ordering::Relaxed);
346 stats.timeouts = self.shared.timeouts.load(Ordering::Relaxed);
347 stats
348 }
349
350 #[must_use]
352 pub fn at_capacity(&self) -> bool {
353 let inner = self.shared.lock_or_recover();
354 inner.total_count >= inner.config.max_connections
355 }
356
357 #[must_use]
359 pub fn is_closed(&self) -> bool {
360 let inner = self.shared.lock_or_recover();
361 inner.closed
362 }
363
364 pub async fn acquire<F, Fut>(&self, cx: &Cx, factory: F) -> Outcome<PooledConnection<C>, Error>
379 where
380 F: Fn() -> Fut,
381 Fut: Future<Output = Outcome<C, Error>>,
382 {
383 let deadline = Instant::now() + Duration::from_millis(self.config().acquire_timeout_ms);
384 let test_on_checkout = self.config().test_on_checkout;
385 let max_lifetime = Duration::from_millis(self.config().max_lifetime_ms);
386 let idle_timeout = Duration::from_millis(self.config().idle_timeout_ms);
387
388 loop {
389 if cx.is_cancel_requested() {
391 return Outcome::Cancelled(CancelReason::user("pool acquire cancelled"));
392 }
393
394 if Instant::now() >= deadline {
396 self.shared.timeouts.fetch_add(1, Ordering::Relaxed);
397 return Outcome::Err(Error::Pool(PoolError {
398 kind: PoolErrorKind::Timeout,
399 message: "acquire timeout: no connections available".to_string(),
400 source: None,
401 }));
402 }
403
404 let action = {
406 let mut inner = match self.shared.lock_or_error("acquire") {
407 Ok(guard) => guard,
408 Err(e) => return Outcome::Err(e),
409 };
410
411 if inner.closed {
412 AcquireAction::PoolClosed
413 } else {
414 let mut found_conn = None;
416 while let Some(mut meta) = inner.idle.pop_front() {
417 if meta.age() > max_lifetime {
419 inner.total_count -= 1;
420 self.shared
421 .connections_closed
422 .fetch_add(1, Ordering::Relaxed);
423 continue;
424 }
425
426 if meta.idle_time() > idle_timeout {
428 inner.total_count -= 1;
429 self.shared
430 .connections_closed
431 .fetch_add(1, Ordering::Relaxed);
432 continue;
433 }
434
435 meta.touch();
437 inner.active_count += 1;
438 found_conn = Some(meta);
439 break;
440 }
441
442 if let Some(meta) = found_conn {
443 AcquireAction::ValidateExisting(meta)
444 } else if inner.can_create_new() {
445 inner.total_count += 1;
447 inner.active_count += 1;
448 AcquireAction::CreateNew
449 } else {
450 inner.waiter_count += 1;
452 AcquireAction::Wait
453 }
454 }
455 };
456
457 match action {
458 AcquireAction::PoolClosed => {
459 return Outcome::Err(Error::Pool(PoolError {
460 kind: PoolErrorKind::Closed,
461 message: "pool has been closed".to_string(),
462 source: None,
463 }));
464 }
465 AcquireAction::ValidateExisting(meta) => {
466 return self.validate_and_wrap(cx, meta, test_on_checkout).await;
468 }
469 AcquireAction::CreateNew => {
470 match factory().await {
472 Outcome::Ok(conn) => {
473 self.shared
474 .connections_created
475 .fetch_add(1, Ordering::Relaxed);
476 self.shared.acquires.fetch_add(1, Ordering::Relaxed);
477 let meta = ConnectionMeta::new(conn);
478 return Outcome::Ok(PooledConnection::new(
479 meta,
480 Arc::downgrade(&self.shared),
481 ));
482 }
483 Outcome::Err(e) => {
484 if let Ok(mut inner) = self.shared.lock_or_error("acquire_cleanup") {
486 inner.total_count -= 1;
487 inner.active_count -= 1;
488 }
489 return Outcome::Err(e);
491 }
492 Outcome::Cancelled(reason) => {
493 if let Ok(mut inner) = self.shared.lock_or_error("acquire_cleanup") {
494 inner.total_count -= 1;
495 inner.active_count -= 1;
496 }
497 return Outcome::Cancelled(reason);
498 }
499 Outcome::Panicked(info) => {
500 if let Ok(mut inner) = self.shared.lock_or_error("acquire_cleanup") {
501 inner.total_count -= 1;
502 inner.active_count -= 1;
503 }
504 return Outcome::Panicked(info);
505 }
506 }
507 }
508 AcquireAction::Wait => {
509 let remaining = deadline.saturating_duration_since(Instant::now());
511 if remaining.is_zero() {
512 if let Ok(mut inner) = self.shared.lock_or_error("acquire_timeout") {
513 inner.waiter_count -= 1;
514 }
515 self.shared.timeouts.fetch_add(1, Ordering::Relaxed);
516 return Outcome::Err(Error::Pool(PoolError {
517 kind: PoolErrorKind::Timeout,
518 message: "acquire timeout: no connections available".to_string(),
519 source: None,
520 }));
521 }
522
523 let wait_time = remaining.min(Duration::from_millis(100));
525 {
526 let inner = match self.shared.lock_or_error("acquire_wait") {
527 Ok(guard) => guard,
528 Err(e) => return Outcome::Err(e),
529 };
530 let _ = self
532 .shared
533 .conn_available
534 .wait_timeout(inner, wait_time)
535 .map_err(|_| {
536 tracing::error!("Pool mutex poisoned during wait_timeout");
537 });
538 }
539
540 {
542 if let Ok(mut inner) = self.shared.lock_or_error("acquire_wake") {
543 inner.waiter_count = inner.waiter_count.saturating_sub(1);
544 }
545 }
546
547 }
549 }
550 }
551 }
552
553 async fn validate_and_wrap(
555 &self,
556 cx: &Cx,
557 meta: ConnectionMeta<C>,
558 test_on_checkout: bool,
559 ) -> Outcome<PooledConnection<C>, Error> {
560 if test_on_checkout {
561 match meta.conn.ping(cx).await {
563 Outcome::Ok(()) => {
564 self.shared.acquires.fetch_add(1, Ordering::Relaxed);
565 Outcome::Ok(PooledConnection::new(meta, Arc::downgrade(&self.shared)))
566 }
567 Outcome::Err(_) | Outcome::Cancelled(_) | Outcome::Panicked(_) => {
568 {
570 if let Ok(mut inner) = self.shared.lock_or_error("validate_cleanup") {
571 inner.total_count -= 1;
572 inner.active_count -= 1;
573 }
574 }
575 self.shared
576 .connections_closed
577 .fetch_add(1, Ordering::Relaxed);
578 Outcome::Err(Error::Connection(ConnectionError {
580 kind: ConnectionErrorKind::Disconnected,
581 message: "connection validation failed".to_string(),
582 source: None,
583 }))
584 }
585 }
586 } else {
587 self.shared.acquires.fetch_add(1, Ordering::Relaxed);
588 Outcome::Ok(PooledConnection::new(meta, Arc::downgrade(&self.shared)))
589 }
590 }
591
592 pub fn clear_idle(&self) {
596 if let Ok(mut inner) = self.shared.inner.lock() {
597 let idle_count = inner.idle.len();
598 inner.idle.clear();
599 inner.total_count -= idle_count;
600 self.shared.connections_closed.fetch_add(idle_count as u64, Ordering::Relaxed);
601 }
602 }
603
604 pub fn close(&self) {
605 match self.shared.inner.lock() {
606 Ok(mut inner) => {
607 inner.closed = true;
608
609 let idle_count = inner.idle.len();
611 inner.idle.clear();
612 inner.total_count -= idle_count;
613 self.shared
614 .connections_closed
615 .fetch_add(idle_count as u64, Ordering::Relaxed);
616 drop(inner);
617 }
618 Err(poisoned) => {
619 tracing::error!(
622 "Pool mutex poisoned during close; attempting recovery. \
623 Pool state may be inconsistent."
624 );
625 let mut inner = poisoned.into_inner();
626 inner.closed = true;
627 let idle_count = inner.idle.len();
628 inner.idle.clear();
629 inner.total_count -= idle_count;
630 self.shared
631 .connections_closed
632 .fetch_add(idle_count as u64, Ordering::Relaxed);
633 }
634 }
635
636 self.shared.conn_available.notify_all();
638 }
639
640 #[must_use]
642 pub fn idle_count(&self) -> usize {
643 let inner = self.shared.lock_or_recover();
644 inner.idle.len()
645 }
646
647 #[must_use]
649 pub fn active_count(&self) -> usize {
650 let inner = self.shared.lock_or_recover();
651 inner.active_count
652 }
653
654 #[must_use]
656 pub fn total_count(&self) -> usize {
657 let inner = self.shared.lock_or_recover();
658 inner.total_count
659 }
660}
661
/// What `Pool::acquire` decided to do while it held the pool lock.
enum AcquireAction<C> {
    /// The pool is closed; the acquire fails.
    PoolClosed,
    /// An idle connection was checked out and may need validating before use.
    ValidateExisting(ConnectionMeta<C>),
    /// A slot was reserved; a new connection must be created.
    CreateNew,
    /// No connection or capacity is available; the caller has to wait and retry.
    Wait,
}
673
674pub struct PooledConnection<C: Connection> {
679 meta: Option<ConnectionMeta<C>>,
681 pool: Weak<PoolShared<C>>,
683}
684
685impl<C: Connection> PooledConnection<C> {
686 fn new(meta: ConnectionMeta<C>, pool: Weak<PoolShared<C>>) -> Self {
687 Self {
688 meta: Some(meta),
689 pool,
690 }
691 }
692
693 pub fn detach(mut self) -> C {
698 if let Some(pool) = self.pool.upgrade() {
699 match pool.inner.lock() {
702 Ok(mut inner) => {
703 inner.total_count -= 1;
704 inner.active_count -= 1;
705 pool.connections_closed.fetch_add(1, Ordering::Relaxed);
706 }
707 Err(_poisoned) => {
708 tracing::error!(
709 "Pool mutex poisoned during detach; pool counters will be inconsistent"
710 );
711 pool.connections_closed.fetch_add(1, Ordering::Relaxed);
713 }
714 }
715 }
716 self.meta.take().expect("connection already detached").conn
717 }
718
719 #[must_use]
721 pub fn age(&self) -> Duration {
722 self.meta.as_ref().map_or(Duration::ZERO, |m| m.age())
723 }
724
725 #[must_use]
727 pub fn idle_time(&self) -> Duration {
728 self.meta.as_ref().map_or(Duration::ZERO, |m| m.idle_time())
729 }
730}
731
732impl<C: Connection> std::ops::Deref for PooledConnection<C> {
733 type Target = C;
734
735 fn deref(&self) -> &Self::Target {
736 &self
737 .meta
738 .as_ref()
739 .expect("connection already returned to pool")
740 .conn
741 }
742}
743
744impl<C: Connection> std::ops::DerefMut for PooledConnection<C> {
745 fn deref_mut(&mut self) -> &mut Self::Target {
746 &mut self
747 .meta
748 .as_mut()
749 .expect("connection already returned to pool")
750 .conn
751 }
752}
753
754impl<C: Connection> Drop for PooledConnection<C> {
755 fn drop(&mut self) {
756 if let Some(mut meta) = self.meta.take() {
757 meta.touch(); if let Some(pool) = self.pool.upgrade() {
759 let mut inner = match pool.inner.lock() {
762 Ok(guard) => guard,
763 Err(_poisoned) => {
764 tracing::error!(
765 "Pool mutex poisoned during connection return; \
766 connection will be leaked. A thread panicked while holding the lock."
767 );
768 return;
771 }
772 };
773
774 if inner.closed {
775 inner.total_count -= 1;
776 inner.active_count -= 1;
777 pool.connections_closed.fetch_add(1, Ordering::Relaxed);
778 return;
779 }
780
781 let max_lifetime = Duration::from_millis(inner.config.max_lifetime_ms);
783 if meta.age() > max_lifetime {
784 inner.total_count -= 1;
785 inner.active_count -= 1;
786 pool.connections_closed.fetch_add(1, Ordering::Relaxed);
787 return;
788 }
789
790 inner.active_count -= 1;
791 inner.idle.push_back(meta);
792
793 drop(inner);
794 pool.conn_available.notify_one();
795 }
796 }
798 }
799}
800
801impl<C: Connection + std::fmt::Debug> std::fmt::Debug for PooledConnection<C> {
802 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
803 f.debug_struct("PooledConnection")
804 .field("conn", &self.meta.as_ref().map(|m| &m.conn))
805 .field("age", &self.age())
806 .field("idle_time", &self.idle_time())
807 .finish_non_exhaustive()
808 }
809}
810
811#[cfg(test)]
812mod tests {
813 use super::*;
814 use sqlmodel_core::connection::{IsolationLevel, PreparedStatement, TransactionOps};
815 use sqlmodel_core::{Row, Value};
816 use std::sync::atomic::AtomicBool;
817
    /// In-memory stand-in for a real connection, used to exercise the pool.
    #[derive(Debug)]
    struct MockConnection {
        /// Identifier tests use to tell instances apart.
        id: u32,
        /// While set, `ping` reports a disconnected-connection error.
        ping_should_fail: Arc<AtomicBool>,
    }
824
825 impl MockConnection {
826 fn new(id: u32) -> Self {
827 Self {
828 id,
829 ping_should_fail: Arc::new(AtomicBool::new(false)),
830 }
831 }
832
833 #[allow(dead_code)]
834 fn with_ping_behavior(id: u32, should_fail: Arc<AtomicBool>) -> Self {
835 Self {
836 id,
837 ping_should_fail: should_fail,
838 }
839 }
840 }
841
    /// Transaction handle returned by `MockConnection`; carries no state.
    struct MockTx;
844
845 impl TransactionOps for MockTx {
846 async fn query(&self, _cx: &Cx, _sql: &str, _params: &[Value]) -> Outcome<Vec<Row>, Error> {
847 Outcome::Ok(vec![])
848 }
849
850 async fn query_one(
851 &self,
852 _cx: &Cx,
853 _sql: &str,
854 _params: &[Value],
855 ) -> Outcome<Option<Row>, Error> {
856 Outcome::Ok(None)
857 }
858
859 async fn execute(&self, _cx: &Cx, _sql: &str, _params: &[Value]) -> Outcome<u64, Error> {
860 Outcome::Ok(0)
861 }
862
863 async fn savepoint(&self, _cx: &Cx, _name: &str) -> Outcome<(), Error> {
864 Outcome::Ok(())
865 }
866
867 async fn rollback_to(&self, _cx: &Cx, _name: &str) -> Outcome<(), Error> {
868 Outcome::Ok(())
869 }
870
871 async fn release(&self, _cx: &Cx, _name: &str) -> Outcome<(), Error> {
872 Outcome::Ok(())
873 }
874
875 async fn commit(self, _cx: &Cx) -> Outcome<(), Error> {
876 Outcome::Ok(())
877 }
878
879 async fn rollback(self, _cx: &Cx) -> Outcome<(), Error> {
880 Outcome::Ok(())
881 }
882 }
883
884 impl Connection for MockConnection {
885 type Tx<'conn> = MockTx;
886
887 async fn query(&self, _cx: &Cx, _sql: &str, _params: &[Value]) -> Outcome<Vec<Row>, Error> {
888 Outcome::Ok(vec![])
889 }
890
891 async fn query_one(
892 &self,
893 _cx: &Cx,
894 _sql: &str,
895 _params: &[Value],
896 ) -> Outcome<Option<Row>, Error> {
897 Outcome::Ok(None)
898 }
899
900 async fn execute(&self, _cx: &Cx, _sql: &str, _params: &[Value]) -> Outcome<u64, Error> {
901 Outcome::Ok(0)
902 }
903
904 async fn insert(&self, _cx: &Cx, _sql: &str, _params: &[Value]) -> Outcome<i64, Error> {
905 Outcome::Ok(0)
906 }
907
908 async fn batch(
909 &self,
910 _cx: &Cx,
911 _statements: &[(String, Vec<Value>)],
912 ) -> Outcome<Vec<u64>, Error> {
913 Outcome::Ok(vec![])
914 }
915
916 async fn begin(&self, _cx: &Cx) -> Outcome<Self::Tx<'_>, Error> {
917 Outcome::Ok(MockTx)
918 }
919
920 async fn begin_with(
921 &self,
922 _cx: &Cx,
923 _isolation: IsolationLevel,
924 ) -> Outcome<Self::Tx<'_>, Error> {
925 Outcome::Ok(MockTx)
926 }
927
928 async fn prepare(&self, _cx: &Cx, _sql: &str) -> Outcome<PreparedStatement, Error> {
929 Outcome::Ok(PreparedStatement::new(1, String::new(), 0))
930 }
931
932 async fn query_prepared(
933 &self,
934 _cx: &Cx,
935 _stmt: &PreparedStatement,
936 _params: &[Value],
937 ) -> Outcome<Vec<Row>, Error> {
938 Outcome::Ok(vec![])
939 }
940
941 async fn execute_prepared(
942 &self,
943 _cx: &Cx,
944 _stmt: &PreparedStatement,
945 _params: &[Value],
946 ) -> Outcome<u64, Error> {
947 Outcome::Ok(0)
948 }
949
950 async fn ping(&self, _cx: &Cx) -> Outcome<(), Error> {
951 if self.ping_should_fail.load(Ordering::Relaxed) {
952 Outcome::Err(Error::Connection(ConnectionError {
953 kind: ConnectionErrorKind::Disconnected,
954 message: "mock ping failed".to_string(),
955 source: None,
956 }))
957 } else {
958 Outcome::Ok(())
959 }
960 }
961
962 async fn close(self, _cx: &Cx) -> Result<(), Error> {
963 Ok(())
964 }
965 }
966
967 #[test]
968 fn test_config_default() {
969 let config = PoolConfig::default();
970 assert_eq!(config.min_connections, 1);
971 assert_eq!(config.max_connections, 10);
972 assert_eq!(config.idle_timeout_ms, 600_000);
973 assert_eq!(config.acquire_timeout_ms, 30_000);
974 assert_eq!(config.max_lifetime_ms, 1_800_000);
975 assert!(config.test_on_checkout);
976 assert!(!config.test_on_return);
977 }
978
979 #[test]
980 fn test_config_builder() {
981 let config = PoolConfig::new(20)
982 .min_connections(5)
983 .idle_timeout(60_000)
984 .acquire_timeout(5_000)
985 .max_lifetime(300_000)
986 .test_on_checkout(false)
987 .test_on_return(true);
988
989 assert_eq!(config.min_connections, 5);
990 assert_eq!(config.max_connections, 20);
991 assert_eq!(config.idle_timeout_ms, 60_000);
992 assert_eq!(config.acquire_timeout_ms, 5_000);
993 assert_eq!(config.max_lifetime_ms, 300_000);
994 assert!(!config.test_on_checkout);
995 assert!(config.test_on_return);
996 }
997
998 #[test]
999 fn test_config_clone() {
1000 let config = PoolConfig::new(15).min_connections(3);
1001 let cloned = config.clone();
1002 assert_eq!(config.max_connections, cloned.max_connections);
1003 assert_eq!(config.min_connections, cloned.min_connections);
1004 }
1005
1006 #[test]
1007 fn test_stats_default() {
1008 let stats = PoolStats::default();
1009 assert_eq!(stats.total_connections, 0);
1010 assert_eq!(stats.idle_connections, 0);
1011 assert_eq!(stats.active_connections, 0);
1012 assert_eq!(stats.pending_requests, 0);
1013 assert_eq!(stats.connections_created, 0);
1014 assert_eq!(stats.connections_closed, 0);
1015 assert_eq!(stats.acquires, 0);
1016 assert_eq!(stats.timeouts, 0);
1017 }
1018
1019 #[test]
1020 fn test_stats_clone() {
1021 let stats = PoolStats {
1022 total_connections: 5,
1023 acquires: 100,
1024 ..Default::default()
1025 };
1026 let cloned = stats.clone();
1027 assert_eq!(stats.total_connections, cloned.total_connections);
1028 assert_eq!(stats.acquires, cloned.acquires);
1029 }
1030
1031 #[test]
1032 fn test_connection_meta_timing() {
1033 use std::thread;
1034
1035 struct DummyConn;
1037
1038 let meta = ConnectionMeta::new(DummyConn);
1039 let initial_age = meta.age();
1040
1041 thread::sleep(Duration::from_millis(10));
1043
1044 assert!(meta.age() > initial_age);
1046 assert!(meta.idle_time() > Duration::ZERO);
1047 }
1048
1049 #[test]
1050 fn test_connection_meta_touch() {
1051 use std::thread;
1052
1053 struct DummyConn;
1054
1055 let mut meta = ConnectionMeta::new(DummyConn);
1056
1057 thread::sleep(Duration::from_millis(10));
1059 let idle_before_touch = meta.idle_time();
1060 assert!(idle_before_touch > Duration::ZERO);
1061
1062 meta.touch();
1064 let idle_after_touch = meta.idle_time();
1065
1066 assert!(idle_after_touch < idle_before_touch);
1068 }
1069
1070 #[test]
1071 fn test_pool_new() {
1072 let config = PoolConfig::new(5);
1073 let pool: Pool<MockConnection> = Pool::new(config);
1074
1075 assert_eq!(pool.idle_count(), 0);
1077 assert_eq!(pool.active_count(), 0);
1078 assert_eq!(pool.total_count(), 0);
1079 assert!(!pool.is_closed());
1080 assert!(!pool.at_capacity());
1081 }
1082
1083 #[test]
1084 fn test_pool_config() {
1085 let config = PoolConfig::new(7).min_connections(2);
1086 let pool: Pool<MockConnection> = Pool::new(config);
1087
1088 let retrieved_config = pool.config();
1089 assert_eq!(retrieved_config.max_connections, 7);
1090 assert_eq!(retrieved_config.min_connections, 2);
1091 }
1092
1093 #[test]
1094 fn test_pool_stats_initial() {
1095 let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1096
1097 let stats = pool.stats();
1098 assert_eq!(stats.total_connections, 0);
1099 assert_eq!(stats.idle_connections, 0);
1100 assert_eq!(stats.active_connections, 0);
1101 assert_eq!(stats.pending_requests, 0);
1102 assert_eq!(stats.connections_created, 0);
1103 assert_eq!(stats.connections_closed, 0);
1104 assert_eq!(stats.acquires, 0);
1105 assert_eq!(stats.timeouts, 0);
1106 }
1107
1108 #[test]
1109 fn test_pool_close() {
1110 let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1111
1112 assert!(!pool.is_closed());
1113 pool.close();
1114 assert!(pool.is_closed());
1115 }
1116
1117 #[test]
1118 fn test_pool_inner_can_create_new() {
1119 let mut inner = PoolInner::<MockConnection>::new(PoolConfig::new(3));
1120
1121 assert!(inner.can_create_new());
1123
1124 inner.total_count = 3;
1126 assert!(!inner.can_create_new());
1127
1128 inner.total_count = 2;
1130 assert!(inner.can_create_new());
1131
1132 inner.closed = true;
1134 assert!(!inner.can_create_new());
1135 }
1136
1137 #[test]
1138 fn test_pool_inner_stats() {
1139 let mut inner = PoolInner::<MockConnection>::new(PoolConfig::new(10));
1140
1141 inner.total_count = 5;
1142 inner.active_count = 3;
1143 inner.waiter_count = 2;
1144 inner
1145 .idle
1146 .push_back(ConnectionMeta::new(MockConnection::new(1)));
1147 inner
1148 .idle
1149 .push_back(ConnectionMeta::new(MockConnection::new(2)));
1150
1151 let stats = inner.stats();
1152 assert_eq!(stats.total_connections, 5);
1153 assert_eq!(stats.idle_connections, 2);
1154 assert_eq!(stats.active_connections, 3);
1155 assert_eq!(stats.pending_requests, 2);
1156 }
1157
1158 #[test]
1159 fn test_pooled_connection_age_and_idle_time() {
1160 use std::thread;
1161
1162 let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1163
1164 {
1166 let mut inner = pool.shared.inner.lock().unwrap();
1167 inner.total_count = 1;
1168 inner.active_count = 1;
1169 }
1170
1171 let meta = ConnectionMeta::new(MockConnection::new(1));
1172 let pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
1173
1174 assert!(pooled.age() >= Duration::ZERO);
1176
1177 thread::sleep(Duration::from_millis(5));
1178 assert!(pooled.age() > Duration::ZERO);
1179 }
1180
1181 #[test]
1182 fn test_pooled_connection_detach() {
1183 let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1184
1185 {
1187 let mut inner = pool.shared.inner.lock().unwrap();
1188 inner.total_count = 1;
1189 inner.active_count = 1;
1190 }
1191
1192 let meta = ConnectionMeta::new(MockConnection::new(42));
1193 let pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
1194
1195 assert_eq!(pool.total_count(), 1);
1197 assert_eq!(pool.active_count(), 1);
1198
1199 let conn = pooled.detach();
1201 assert_eq!(conn.id, 42);
1202
1203 assert_eq!(pool.total_count(), 0);
1205 assert_eq!(pool.active_count(), 0);
1206
1207 let stats = pool.stats();
1209 assert_eq!(stats.connections_closed, 1);
1210 }
1211
1212 #[test]
1213 fn test_pooled_connection_drop_returns_to_pool() {
1214 let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1215
1216 {
1218 let mut inner = pool.shared.inner.lock().unwrap();
1219 inner.total_count = 1;
1220 inner.active_count = 1;
1221 }
1222
1223 let meta = ConnectionMeta::new(MockConnection::new(1));
1224 let pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
1225
1226 assert_eq!(pool.active_count(), 1);
1228 assert_eq!(pool.idle_count(), 0);
1229
1230 drop(pooled);
1232
1233 assert_eq!(pool.active_count(), 0);
1235 assert_eq!(pool.idle_count(), 1);
1236 assert_eq!(pool.total_count(), 1); }
1238
1239 #[test]
1240 fn test_pooled_connection_drop_when_pool_closed() {
1241 let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1242
1243 {
1245 let mut inner = pool.shared.inner.lock().unwrap();
1246 inner.total_count = 1;
1247 inner.active_count = 1;
1248 }
1249
1250 let meta = ConnectionMeta::new(MockConnection::new(1));
1251 let pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
1252
1253 pool.close();
1255
1256 drop(pooled);
1258
1259 assert_eq!(pool.idle_count(), 0);
1261 assert_eq!(pool.active_count(), 0);
1262 assert_eq!(pool.total_count(), 0);
1263
1264 assert_eq!(pool.stats().connections_closed, 1);
1266 }
1267
1268 #[test]
1269 fn test_pooled_connection_deref() {
1270 let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1271
1272 {
1274 let mut inner = pool.shared.inner.lock().unwrap();
1275 inner.total_count = 1;
1276 inner.active_count = 1;
1277 }
1278
1279 let meta = ConnectionMeta::new(MockConnection::new(99));
1280 let pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
1281
1282 assert_eq!(pooled.id, 99);
1284 }
1285
1286 #[test]
1287 fn test_pooled_connection_deref_mut() {
1288 let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1289
1290 {
1292 let mut inner = pool.shared.inner.lock().unwrap();
1293 inner.total_count = 1;
1294 inner.active_count = 1;
1295 }
1296
1297 let meta = ConnectionMeta::new(MockConnection::new(1));
1298 let mut pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
1299
1300 pooled.id = 50;
1302 assert_eq!(pooled.id, 50);
1303 }
1304
1305 #[test]
1306 fn test_pooled_connection_debug() {
1307 let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1308
1309 {
1311 let mut inner = pool.shared.inner.lock().unwrap();
1312 inner.total_count = 1;
1313 inner.active_count = 1;
1314 }
1315
1316 let meta = ConnectionMeta::new(MockConnection::new(1));
1317 let pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
1318
1319 let debug_str = format!("{:?}", pooled);
1320 assert!(debug_str.contains("PooledConnection"));
1321 assert!(debug_str.contains("age"));
1322 }
1323
1324 #[test]
1325 fn test_pool_at_capacity() {
1326 let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(2));
1327
1328 assert!(!pool.at_capacity());
1329
1330 {
1332 let mut inner = pool.shared.inner.lock().unwrap();
1333 inner.total_count = 1;
1334 }
1335 assert!(!pool.at_capacity());
1336
1337 {
1338 let mut inner = pool.shared.inner.lock().unwrap();
1339 inner.total_count = 2;
1340 }
1341 assert!(pool.at_capacity());
1342 }
1343
1344 #[test]
1345 fn test_acquire_action_enum() {
1346 let closed: AcquireAction<MockConnection> = AcquireAction::PoolClosed;
1348 assert!(matches!(closed, AcquireAction::PoolClosed));
1349
1350 let create: AcquireAction<MockConnection> = AcquireAction::CreateNew;
1351 assert!(matches!(create, AcquireAction::CreateNew));
1352
1353 let wait: AcquireAction<MockConnection> = AcquireAction::Wait;
1354 assert!(matches!(wait, AcquireAction::Wait));
1355
1356 let meta = ConnectionMeta::new(MockConnection::new(1));
1357 let validate: AcquireAction<MockConnection> = AcquireAction::ValidateExisting(meta);
1358 assert!(matches!(validate, AcquireAction::ValidateExisting(_)));
1359 }
1360
1361 #[test]
1362 fn test_pool_shared_atomic_counters() {
1363 let shared = PoolShared::<MockConnection>::new(PoolConfig::new(5));
1364
1365 assert_eq!(shared.connections_created.load(Ordering::Relaxed), 0);
1367 assert_eq!(shared.connections_closed.load(Ordering::Relaxed), 0);
1368 assert_eq!(shared.acquires.load(Ordering::Relaxed), 0);
1369 assert_eq!(shared.timeouts.load(Ordering::Relaxed), 0);
1370
1371 shared.connections_created.fetch_add(1, Ordering::Relaxed);
1373 shared.connections_closed.fetch_add(2, Ordering::Relaxed);
1374 shared.acquires.fetch_add(10, Ordering::Relaxed);
1375 shared.timeouts.fetch_add(3, Ordering::Relaxed);
1376
1377 assert_eq!(shared.connections_created.load(Ordering::Relaxed), 1);
1378 assert_eq!(shared.connections_closed.load(Ordering::Relaxed), 2);
1379 assert_eq!(shared.acquires.load(Ordering::Relaxed), 10);
1380 assert_eq!(shared.timeouts.load(Ordering::Relaxed), 3);
1381 }
1382
1383 #[test]
1384 fn test_pool_close_clears_idle() {
1385 let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1386
1387 {
1389 let mut inner = pool.shared.inner.lock().unwrap();
1390 inner.total_count = 3;
1391 inner
1392 .idle
1393 .push_back(ConnectionMeta::new(MockConnection::new(1)));
1394 inner
1395 .idle
1396 .push_back(ConnectionMeta::new(MockConnection::new(2)));
1397 inner
1398 .idle
1399 .push_back(ConnectionMeta::new(MockConnection::new(3)));
1400 }
1401
1402 assert_eq!(pool.idle_count(), 3);
1403 assert_eq!(pool.total_count(), 3);
1404
1405 pool.close();
1406
1407 assert_eq!(pool.idle_count(), 0);
1409 assert_eq!(pool.total_count(), 0);
1410 assert!(pool.is_closed());
1411
1412 assert_eq!(pool.stats().connections_closed, 3);
1414 }
1415
1416 fn poison_pool_mutex() -> Pool<MockConnection> {
1429 use std::panic;
1430 use std::thread;
1431
1432 let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1433
1434 {
1436 let mut inner = pool.shared.inner.lock().unwrap();
1437 inner.total_count = 2;
1438 inner.active_count = 1;
1439 inner
1440 .idle
1441 .push_back(ConnectionMeta::new(MockConnection::new(1)));
1442 }
1443
1444 let shared_clone = Arc::clone(&pool.shared);
1446 let handle = thread::spawn(move || {
1447 let _guard = shared_clone.inner.lock().unwrap();
1448 panic!("intentional panic to poison mutex");
1450 });
1451
1452 let _ = handle.join();
1454
1455 assert!(pool.shared.inner.lock().is_err());
1457
1458 pool
1459 }
1460
/// Reading the configuration must keep working once the pool mutex has been
/// poisoned, and must still reflect the value the pool was built with.
#[test]
fn test_config_after_poisoning_returns_valid_data() {
    let poisoned = poison_pool_mutex();

    // `poison_pool_mutex` constructs the pool with `PoolConfig::new(5)`.
    let cfg = poisoned.config();
    assert_eq!(cfg.max_connections, 5);
}
1471
/// `stats()` on a poisoned pool must still report the state that was seeded
/// before the poisoning happened.
#[test]
fn test_stats_after_poisoning_returns_valid_data() {
    let poisoned = poison_pool_mutex();

    // The helper seeds: total = 2, active = 1, one connection in the idle queue.
    let snapshot = poisoned.stats();
    assert_eq!(snapshot.total_connections, 2);
    assert_eq!(snapshot.active_connections, 1);
    assert_eq!(snapshot.idle_connections, 1);
}
1483
/// `at_capacity()` must be answerable on a poisoned pool.
#[test]
fn test_at_capacity_after_poisoning() {
    let poisoned = poison_pool_mutex();

    // The helper seeds 2 total connections against a maximum of 5, so the
    // pool is expected to report spare capacity.
    let full = poisoned.at_capacity();
    assert!(!full);
}
1492
/// `is_closed()` must be answerable on a poisoned pool.
#[test]
fn test_is_closed_after_poisoning() {
    let poisoned = poison_pool_mutex();

    // The helper never calls `close()`, so the pool should still be open.
    let closed = poisoned.is_closed();
    assert!(!closed);
}
1500
/// `idle_count()` must still see the idle queue of a poisoned pool.
#[test]
fn test_idle_count_after_poisoning() {
    let poisoned = poison_pool_mutex();

    // Exactly one idle connection is queued by the helper.
    let idle = poisoned.idle_count();
    assert_eq!(idle, 1);
}
1508
/// `active_count()` must still be readable on a poisoned pool.
#[test]
fn test_active_count_after_poisoning() {
    let poisoned = poison_pool_mutex();

    // The helper sets `active_count` to 1 before poisoning.
    let active = poisoned.active_count();
    assert_eq!(active, 1);
}
1516
/// `total_count()` must still be readable on a poisoned pool.
#[test]
fn test_total_count_after_poisoning() {
    let poisoned = poison_pool_mutex();

    // The helper sets `total_count` to 2 before poisoning.
    let total = poisoned.total_count();
    assert_eq!(total, 2);
}
1524
/// `lock_or_error` must surface mutex poisoning as an `Error::Pool` whose
/// kind is `PoolErrorKind::Poisoned` and whose message mentions "poisoned".
#[test]
fn test_lock_or_error_returns_error_when_poisoned() {
    use std::thread;

    let shared = Arc::new(PoolShared::<MockConnection>::new(PoolConfig::new(5)));

    // Poison the mutex: panic on a helper thread while it owns the guard.
    let poisoner = {
        let shared = Arc::clone(&shared);
        thread::spawn(move || {
            let _guard = shared.inner.lock().unwrap();
            panic!("intentional panic to poison mutex");
        })
    };
    // The join result carries the expected panic payload; ignore it.
    let _ = poisoner.join();

    let result = shared.lock_or_error("test_operation");

    // Anything other than a pool-level error is a test failure.
    let pool_err = match result {
        Err(Error::Pool(pool_err)) => pool_err,
        Err(other) => panic!("Expected Pool error, got: {:?}", other),
        Ok(_) => panic!("Expected error, got Ok"),
    };

    assert!(matches!(pool_err.kind, PoolErrorKind::Poisoned));
    assert!(pool_err.message.contains("poisoned"));
}
1554
/// `lock_or_recover` must hand back a usable guard even when the mutex is
/// poisoned, exposing the data written before the poisoning.
#[test]
fn test_lock_or_recover_succeeds_when_poisoned() {
    use std::thread;

    let shared = Arc::new(PoolShared::<MockConnection>::new(PoolConfig::new(5)));

    // Write a recognisable marker value while the lock is still healthy.
    {
        let mut state = shared.inner.lock().unwrap();
        state.total_count = 42;
    }

    // Poison the mutex: panic on a helper thread while it owns the guard.
    let poisoner = {
        let shared = Arc::clone(&shared);
        thread::spawn(move || {
            let _guard = shared.inner.lock().unwrap();
            panic!("intentional panic to poison mutex");
        })
    };
    // The join result carries the expected panic payload; ignore it.
    let _ = poisoner.join();

    // Precondition: a plain `lock()` now reports the poisoning.
    assert!(shared.inner.lock().is_err());

    // The recovering accessor must still see the marker value.
    let recovered = shared.lock_or_recover();
    assert_eq!(recovered.total_count, 42);
}
1582
/// `close()` must work on a poisoned pool: afterwards the pool reports itself
/// closed and its idle queue is empty.
#[test]
fn test_close_after_poisoning_recovers_and_closes() {
    let poisoned = poison_pool_mutex();

    // Must not panic despite the poisoned mutex.
    poisoned.close();

    assert!(poisoned.is_closed());

    // The single idle connection seeded by the helper must be gone.
    assert_eq!(poisoned.idle_count(), 0);
}
1596
/// Dropping a `PooledConnection` whose pool mutex has been poisoned must not
/// panic.
#[test]
fn test_drop_pooled_connection_after_poisoning_does_not_panic() {
    use std::panic::{self, AssertUnwindSafe};
    use std::thread;

    let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));

    // Pretend one connection is currently checked out.
    {
        let mut state = pool.shared.inner.lock().unwrap();
        state.total_count = 1;
        state.active_count = 1;
    }

    // Build the checked-out handle by hand; it only holds a weak reference to
    // the shared pool state.
    let pooled = PooledConnection::new(
        ConnectionMeta::new(MockConnection::new(1)),
        Arc::downgrade(&pool.shared),
    );

    // Poison the mutex: panic on a helper thread while it owns the guard.
    let poisoner = {
        let shared = Arc::clone(&pool.shared);
        thread::spawn(move || {
            let _guard = shared.inner.lock().unwrap();
            panic!("intentional panic to poison mutex");
        })
    };
    // The join result carries the expected panic payload; ignore it.
    let _ = poisoner.join();

    // Precondition: the mutex really is poisoned.
    assert!(pool.shared.inner.lock().is_err());

    // Catch any unwind coming out of the destructor so it can be reported as
    // an assertion failure with a clear message.
    let drop_result = panic::catch_unwind(AssertUnwindSafe(|| drop(pooled)));

    assert!(
        drop_result.is_ok(),
        "Dropping PooledConnection after mutex poisoning should not panic"
    );
}
1640
/// `detach()` on a `PooledConnection` whose pool mutex has been poisoned must
/// not panic and must still hand back the wrapped connection.
#[test]
fn test_detach_after_poisoning_does_not_panic() {
    use std::panic::{self, AssertUnwindSafe};
    use std::thread;

    let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));

    // Pretend one connection is currently checked out.
    {
        let mut state = pool.shared.inner.lock().unwrap();
        state.total_count = 1;
        state.active_count = 1;
    }

    // Build the checked-out handle by hand around connection id 42; it only
    // holds a weak reference to the shared pool state.
    let pooled = PooledConnection::new(
        ConnectionMeta::new(MockConnection::new(42)),
        Arc::downgrade(&pool.shared),
    );

    // Poison the mutex: panic on a helper thread while it owns the guard.
    let poisoner = {
        let shared = Arc::clone(&pool.shared);
        thread::spawn(move || {
            let _guard = shared.inner.lock().unwrap();
            panic!("intentional panic to poison mutex");
        })
    };
    // The join result carries the expected panic payload; ignore it.
    let _ = poisoner.join();

    // Precondition: the mutex really is poisoned.
    assert!(pool.shared.inner.lock().is_err());

    // Catch any unwind so a panic inside `detach()` becomes a readable
    // assertion failure.
    let outcome = panic::catch_unwind(AssertUnwindSafe(|| pooled.detach()));

    assert!(
        outcome.is_ok(),
        "detach() after mutex poisoning should not panic"
    );

    // The detached connection must be the one that was wrapped.
    let detached = outcome.unwrap();
    assert_eq!(detached.id, 42);
}
1682
/// A worker thread that panics while holding the pool lock must not render
/// the pool unusable for other threads: read accessors have to keep working
/// and report the state the worker wrote before it panicked.
#[test]
fn test_pool_survives_thread_panic_during_acquire() {
    use std::thread;

    let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
    let pool_arc = Arc::new(pool);

    let pool_clone = Arc::clone(&pool_arc);
    let handle = thread::spawn(move || {
        // First critical section: record one active connection, then release
        // the lock normally.
        {
            let mut inner = pool_clone.shared.inner.lock().unwrap();
            inner.total_count = 1;
            inner.active_count = 1;
        }

        // Second critical section: panic while the guard is alive, which
        // poisons the mutex.
        let _guard = pool_clone.shared.inner.lock().unwrap();
        panic!("simulated panic during database operation");
    });

    // The join result carries the expected panic payload; ignore it.
    let _ = handle.join();

    // Precondition: without a poisoned mutex the assertions below would pass
    // trivially and the test would not exercise the recovery path at all.
    assert!(
        pool_arc.shared.inner.lock().is_err(),
        "mutex should be poisoned after the worker thread panicked"
    );

    // Read accessors must still work and see the worker's writes.
    assert_eq!(pool_arc.total_count(), 1);
    assert_eq!(pool_arc.config().max_connections, 5);

    let stats = pool_arc.stats();
    assert_eq!(stats.total_connections, 1);
}
1720
/// `close()` must succeed after another thread panicked while holding the
/// pool lock, leaving the pool closed with an empty idle queue.
#[test]
fn test_pool_close_after_thread_panic() {
    use std::thread;

    let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));

    // Seed two idle connections while the lock is still healthy.
    {
        let mut inner = pool.shared.inner.lock().unwrap();
        inner.total_count = 2;
        for id in 1..=2 {
            inner
                .idle
                .push_back(ConnectionMeta::new(MockConnection::new(id)));
        }
    }

    // Poison the mutex: panic on a helper thread while it owns the guard.
    let shared_clone = Arc::clone(&pool.shared);
    let handle = thread::spawn(move || {
        let _guard = shared_clone.inner.lock().unwrap();
        panic!("intentional panic");
    });
    // The join result carries the expected panic payload; ignore it.
    let _ = handle.join();

    // Precondition: make sure the close below really runs against a poisoned
    // mutex; otherwise this test would only cover the ordinary close path.
    assert!(
        pool.shared.inner.lock().is_err(),
        "mutex should be poisoned after the helper thread panicked"
    );

    pool.close();

    assert!(pool.is_closed());
    assert_eq!(pool.idle_count(), 0);
}
1754
/// Repeatedly calling every read accessor on a poisoned pool must neither
/// panic nor disturb the stored state.
#[test]
fn test_multiple_reads_after_poisoning() {
    let poisoned = poison_pool_mutex();

    // Ten rounds over all read-only accessors; only the absence of a panic
    // matters here, so every result is discarded.
    for _round in 0..10 {
        let _ = poisoned.config();
        let _ = poisoned.stats();
        let _ = poisoned.at_capacity();
        let _ = poisoned.is_closed();
        let _ = poisoned.idle_count();
        let _ = poisoned.active_count();
        let _ = poisoned.total_count();
    }

    // The helper seeded `total_count = 2`; the reads must not have changed it.
    assert_eq!(poisoned.total_count(), 2);
}
1773
/// The waiter count stored in the pool state must still be reported through
/// `stats().pending_requests` after the mutex has been poisoned.
#[test]
fn test_waiters_count_after_poisoning() {
    use std::thread;

    let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));

    // Record three waiters while the lock is still healthy.
    {
        let mut inner = pool.shared.inner.lock().unwrap();
        inner.waiter_count = 3;
    }

    // Poison the mutex: panic on a helper thread while it owns the guard.
    let shared_clone = Arc::clone(&pool.shared);
    let handle = thread::spawn(move || {
        let _guard = shared_clone.inner.lock().unwrap();
        panic!("intentional panic");
    });
    // The join result carries the expected panic payload; ignore it.
    let _ = handle.join();

    // Precondition: confirm the mutex is poisoned so the stats read below
    // actually exercises the poisoned path.
    assert!(
        pool.shared.inner.lock().is_err(),
        "mutex should be poisoned after the helper thread panicked"
    );

    let stats = pool.stats();
    assert_eq!(stats.pending_requests, 3);
}
1798}