1pub mod replica;
46pub use replica::{ReplicaPool, ReplicaStrategy};
47
48pub mod sharding;
49pub use sharding::{ModuloShardChooser, QueryHints, ShardChooser, ShardedPool, ShardedPoolStats};
50
51use std::collections::VecDeque;
52use std::future::Future;
53use std::sync::atomic::{AtomicU64, Ordering};
54use std::sync::{Arc, Condvar, Mutex, Weak};
55use std::time::{Duration, Instant};
56
57use asupersync::{CancelReason, Cx, Outcome};
58use sqlmodel_core::error::{ConnectionError, ConnectionErrorKind, PoolError, PoolErrorKind};
59use sqlmodel_core::{Connection, Error};
60
61#[derive(Debug, Clone)]
63pub struct PoolConfig {
64 pub min_connections: usize,
66 pub max_connections: usize,
68 pub idle_timeout_ms: u64,
70 pub acquire_timeout_ms: u64,
72 pub max_lifetime_ms: u64,
74 pub test_on_checkout: bool,
76 pub test_on_return: bool,
78}
79
80impl Default for PoolConfig {
81 fn default() -> Self {
82 Self {
83 min_connections: 1,
84 max_connections: 10,
85 idle_timeout_ms: 600_000, acquire_timeout_ms: 30_000, max_lifetime_ms: 1_800_000, test_on_checkout: true,
89 test_on_return: false,
90 }
91 }
92}
93
94impl PoolConfig {
95 #[must_use]
97 pub fn new(max_connections: usize) -> Self {
98 Self {
99 max_connections,
100 ..Default::default()
101 }
102 }
103
104 #[must_use]
106 pub fn min_connections(mut self, n: usize) -> Self {
107 self.min_connections = n;
108 self
109 }
110
111 #[must_use]
113 pub fn idle_timeout(mut self, ms: u64) -> Self {
114 self.idle_timeout_ms = ms;
115 self
116 }
117
118 #[must_use]
120 pub fn acquire_timeout(mut self, ms: u64) -> Self {
121 self.acquire_timeout_ms = ms;
122 self
123 }
124
125 #[must_use]
127 pub fn max_lifetime(mut self, ms: u64) -> Self {
128 self.max_lifetime_ms = ms;
129 self
130 }
131
132 #[must_use]
134 pub fn test_on_checkout(mut self, enabled: bool) -> Self {
135 self.test_on_checkout = enabled;
136 self
137 }
138
139 #[must_use]
141 pub fn test_on_return(mut self, enabled: bool) -> Self {
142 self.test_on_return = enabled;
143 self
144 }
145}
146
147#[derive(Debug, Clone, Default)]
149pub struct PoolStats {
150 pub total_connections: usize,
152 pub idle_connections: usize,
154 pub active_connections: usize,
156 pub pending_requests: usize,
158 pub connections_created: u64,
160 pub connections_closed: u64,
162 pub acquires: u64,
164 pub timeouts: u64,
166}
167
168#[derive(Debug)]
170struct ConnectionMeta<C> {
171 conn: C,
173 created_at: Instant,
175 last_used: Instant,
177}
178
179impl<C> ConnectionMeta<C> {
180 fn new(conn: C) -> Self {
181 let now = Instant::now();
182 Self {
183 conn,
184 created_at: now,
185 last_used: now,
186 }
187 }
188
189 fn touch(&mut self) {
190 self.last_used = Instant::now();
191 }
192
193 fn age(&self) -> Duration {
194 self.created_at.elapsed()
195 }
196
197 fn idle_time(&self) -> Duration {
198 self.last_used.elapsed()
199 }
200}
201
202struct PoolInner<C> {
204 config: PoolConfig,
206 idle: VecDeque<ConnectionMeta<C>>,
208 active_count: usize,
210 total_count: usize,
212 waiter_count: usize,
214 closed: bool,
216}
217
218impl<C> PoolInner<C> {
219 fn new(config: PoolConfig) -> Self {
220 Self {
221 config,
222 idle: VecDeque::new(),
223 active_count: 0,
224 total_count: 0,
225 waiter_count: 0,
226 closed: false,
227 }
228 }
229
230 fn can_create_new(&self) -> bool {
231 !self.closed && self.total_count < self.config.max_connections
232 }
233
234 fn stats(&self) -> PoolStats {
235 PoolStats {
236 total_connections: self.total_count,
237 idle_connections: self.idle.len(),
238 active_connections: self.active_count,
239 pending_requests: self.waiter_count,
240 ..Default::default()
241 }
242 }
243}
244
245struct PoolShared<C> {
247 inner: Mutex<PoolInner<C>>,
249 conn_available: Condvar,
251 connections_created: AtomicU64,
253 connections_closed: AtomicU64,
254 acquires: AtomicU64,
255 timeouts: AtomicU64,
256}
257
258impl<C> PoolShared<C> {
259 fn new(config: PoolConfig) -> Self {
260 Self {
261 inner: Mutex::new(PoolInner::new(config)),
262 conn_available: Condvar::new(),
263 connections_created: AtomicU64::new(0),
264 connections_closed: AtomicU64::new(0),
265 acquires: AtomicU64::new(0),
266 timeouts: AtomicU64::new(0),
267 }
268 }
269
270 fn lock_or_recover(&self) -> std::sync::MutexGuard<'_, PoolInner<C>> {
279 self.inner.lock().unwrap_or_else(|poisoned| {
280 tracing::error!(
281 "Pool mutex poisoned; recovering for read-only access. \
282 A thread panicked while holding the lock."
283 );
284 poisoned.into_inner()
285 })
286 }
287
288 #[allow(clippy::result_large_err)] fn lock_or_error(
295 &self,
296 operation: &'static str,
297 ) -> Result<std::sync::MutexGuard<'_, PoolInner<C>>, Error> {
298 self.inner
299 .lock()
300 .map_err(|_| Error::Pool(PoolError::poisoned(operation)))
301 }
302}
303
304pub struct Pool<C: Connection> {
319 shared: Arc<PoolShared<C>>,
320}
321
322impl<C: Connection> Pool<C> {
323 #[must_use]
325 pub fn new(config: PoolConfig) -> Self {
326 Self {
327 shared: Arc::new(PoolShared::new(config)),
328 }
329 }
330
331 #[must_use]
333 pub fn config(&self) -> PoolConfig {
334 let inner = self.shared.lock_or_recover();
335 inner.config.clone()
336 }
337
338 #[must_use]
340 pub fn stats(&self) -> PoolStats {
341 let inner = self.shared.lock_or_recover();
342 let mut stats = inner.stats();
343 stats.connections_created = self.shared.connections_created.load(Ordering::Relaxed);
344 stats.connections_closed = self.shared.connections_closed.load(Ordering::Relaxed);
345 stats.acquires = self.shared.acquires.load(Ordering::Relaxed);
346 stats.timeouts = self.shared.timeouts.load(Ordering::Relaxed);
347 stats
348 }
349
350 #[must_use]
352 pub fn at_capacity(&self) -> bool {
353 let inner = self.shared.lock_or_recover();
354 inner.total_count >= inner.config.max_connections
355 }
356
357 #[must_use]
359 pub fn is_closed(&self) -> bool {
360 let inner = self.shared.lock_or_recover();
361 inner.closed
362 }
363
364 pub async fn acquire<F, Fut>(&self, cx: &Cx, factory: F) -> Outcome<PooledConnection<C>, Error>
379 where
380 F: Fn() -> Fut,
381 Fut: Future<Output = Outcome<C, Error>>,
382 {
383 let deadline = Instant::now() + Duration::from_millis(self.config().acquire_timeout_ms);
384 let test_on_checkout = self.config().test_on_checkout;
385 let max_lifetime = Duration::from_millis(self.config().max_lifetime_ms);
386 let idle_timeout = Duration::from_millis(self.config().idle_timeout_ms);
387
388 loop {
389 if cx.is_cancel_requested() {
391 return Outcome::Cancelled(CancelReason::user("pool acquire cancelled"));
392 }
393
394 if Instant::now() >= deadline {
396 self.shared.timeouts.fetch_add(1, Ordering::Relaxed);
397 return Outcome::Err(Error::Pool(PoolError {
398 kind: PoolErrorKind::Timeout,
399 message: "acquire timeout: no connections available".to_string(),
400 source: None,
401 }));
402 }
403
404 let action = {
406 let mut inner = match self.shared.lock_or_error("acquire") {
407 Ok(guard) => guard,
408 Err(e) => return Outcome::Err(e),
409 };
410
411 if inner.closed {
412 AcquireAction::PoolClosed
413 } else {
414 let mut found_conn = None;
416 while let Some(mut meta) = inner.idle.pop_front() {
417 if meta.age() > max_lifetime {
419 inner.total_count -= 1;
420 self.shared
421 .connections_closed
422 .fetch_add(1, Ordering::Relaxed);
423 continue;
424 }
425
426 if meta.idle_time() > idle_timeout {
428 inner.total_count -= 1;
429 self.shared
430 .connections_closed
431 .fetch_add(1, Ordering::Relaxed);
432 continue;
433 }
434
435 meta.touch();
437 inner.active_count += 1;
438 found_conn = Some(meta);
439 break;
440 }
441
442 if let Some(meta) = found_conn {
443 AcquireAction::ValidateExisting(meta)
444 } else if inner.can_create_new() {
445 inner.total_count += 1;
447 inner.active_count += 1;
448 AcquireAction::CreateNew
449 } else {
450 inner.waiter_count += 1;
452 AcquireAction::Wait
453 }
454 }
455 };
456
457 match action {
458 AcquireAction::PoolClosed => {
459 return Outcome::Err(Error::Pool(PoolError {
460 kind: PoolErrorKind::Closed,
461 message: "pool has been closed".to_string(),
462 source: None,
463 }));
464 }
465 AcquireAction::ValidateExisting(meta) => {
466 return self.validate_and_wrap(cx, meta, test_on_checkout).await;
468 }
469 AcquireAction::CreateNew => {
470 match factory().await {
472 Outcome::Ok(conn) => {
473 self.shared
474 .connections_created
475 .fetch_add(1, Ordering::Relaxed);
476 self.shared.acquires.fetch_add(1, Ordering::Relaxed);
477 let meta = ConnectionMeta::new(conn);
478 return Outcome::Ok(PooledConnection::new(
479 meta,
480 Arc::downgrade(&self.shared),
481 ));
482 }
483 Outcome::Err(e) => {
484 if let Ok(mut inner) = self.shared.lock_or_error("acquire_cleanup") {
486 inner.total_count -= 1;
487 inner.active_count -= 1;
488 }
489 return Outcome::Err(e);
491 }
492 Outcome::Cancelled(reason) => {
493 if let Ok(mut inner) = self.shared.lock_or_error("acquire_cleanup") {
494 inner.total_count -= 1;
495 inner.active_count -= 1;
496 }
497 return Outcome::Cancelled(reason);
498 }
499 Outcome::Panicked(info) => {
500 if let Ok(mut inner) = self.shared.lock_or_error("acquire_cleanup") {
501 inner.total_count -= 1;
502 inner.active_count -= 1;
503 }
504 return Outcome::Panicked(info);
505 }
506 }
507 }
508 AcquireAction::Wait => {
509 let remaining = deadline.saturating_duration_since(Instant::now());
511 if remaining.is_zero() {
512 if let Ok(mut inner) = self.shared.lock_or_error("acquire_timeout") {
513 inner.waiter_count -= 1;
514 }
515 self.shared.timeouts.fetch_add(1, Ordering::Relaxed);
516 return Outcome::Err(Error::Pool(PoolError {
517 kind: PoolErrorKind::Timeout,
518 message: "acquire timeout: no connections available".to_string(),
519 source: None,
520 }));
521 }
522
523 let wait_time = remaining.min(Duration::from_millis(100));
525 {
526 let inner = match self.shared.lock_or_error("acquire_wait") {
527 Ok(guard) => guard,
528 Err(e) => return Outcome::Err(e),
529 };
530 let _ = self
532 .shared
533 .conn_available
534 .wait_timeout(inner, wait_time)
535 .map_err(|_| {
536 tracing::error!("Pool mutex poisoned during wait_timeout");
537 });
538 }
539
540 {
542 if let Ok(mut inner) = self.shared.lock_or_error("acquire_wake") {
543 inner.waiter_count = inner.waiter_count.saturating_sub(1);
544 }
545 }
546
547 }
549 }
550 }
551 }
552
553 async fn validate_and_wrap(
555 &self,
556 cx: &Cx,
557 meta: ConnectionMeta<C>,
558 test_on_checkout: bool,
559 ) -> Outcome<PooledConnection<C>, Error> {
560 if test_on_checkout {
561 match meta.conn.ping(cx).await {
563 Outcome::Ok(()) => {
564 self.shared.acquires.fetch_add(1, Ordering::Relaxed);
565 Outcome::Ok(PooledConnection::new(meta, Arc::downgrade(&self.shared)))
566 }
567 Outcome::Err(_) | Outcome::Cancelled(_) | Outcome::Panicked(_) => {
568 {
570 if let Ok(mut inner) = self.shared.lock_or_error("validate_cleanup") {
571 inner.total_count -= 1;
572 inner.active_count -= 1;
573 }
574 }
575 self.shared
576 .connections_closed
577 .fetch_add(1, Ordering::Relaxed);
578 Outcome::Err(Error::Connection(ConnectionError {
580 kind: ConnectionErrorKind::Disconnected,
581 message: "connection validation failed".to_string(),
582 source: None,
583 }))
584 }
585 }
586 } else {
587 self.shared.acquires.fetch_add(1, Ordering::Relaxed);
588 Outcome::Ok(PooledConnection::new(meta, Arc::downgrade(&self.shared)))
589 }
590 }
591
592 pub fn clear_idle(&self) {
596 if let Ok(mut inner) = self.shared.inner.lock() {
597 let idle_count = inner.idle.len();
598 inner.idle.clear();
599 inner.total_count -= idle_count;
600 self.shared
601 .connections_closed
602 .fetch_add(idle_count as u64, Ordering::Relaxed);
603 }
604 }
605
606 pub fn close(&self) {
607 match self.shared.inner.lock() {
608 Ok(mut inner) => {
609 inner.closed = true;
610
611 let idle_count = inner.idle.len();
613 inner.idle.clear();
614 inner.total_count -= idle_count;
615 self.shared
616 .connections_closed
617 .fetch_add(idle_count as u64, Ordering::Relaxed);
618 drop(inner);
619 }
620 Err(poisoned) => {
621 tracing::error!(
624 "Pool mutex poisoned during close; attempting recovery. \
625 Pool state may be inconsistent."
626 );
627 let mut inner = poisoned.into_inner();
628 inner.closed = true;
629 let idle_count = inner.idle.len();
630 inner.idle.clear();
631 inner.total_count -= idle_count;
632 self.shared
633 .connections_closed
634 .fetch_add(idle_count as u64, Ordering::Relaxed);
635 }
636 }
637
638 self.shared.conn_available.notify_all();
640 }
641
642 #[must_use]
644 pub fn idle_count(&self) -> usize {
645 let inner = self.shared.lock_or_recover();
646 inner.idle.len()
647 }
648
649 #[must_use]
651 pub fn active_count(&self) -> usize {
652 let inner = self.shared.lock_or_recover();
653 inner.active_count
654 }
655
656 #[must_use]
658 pub fn total_count(&self) -> usize {
659 let inner = self.shared.lock_or_recover();
660 inner.total_count
661 }
662}
663
664enum AcquireAction<C> {
666 PoolClosed,
668 ValidateExisting(ConnectionMeta<C>),
670 CreateNew,
672 Wait,
674}
675
676pub struct PooledConnection<C: Connection> {
681 meta: Option<ConnectionMeta<C>>,
683 pool: Weak<PoolShared<C>>,
685}
686
687impl<C: Connection> PooledConnection<C> {
688 fn new(meta: ConnectionMeta<C>, pool: Weak<PoolShared<C>>) -> Self {
689 Self {
690 meta: Some(meta),
691 pool,
692 }
693 }
694
695 pub fn detach(mut self) -> C {
700 if let Some(pool) = self.pool.upgrade() {
701 match pool.inner.lock() {
704 Ok(mut inner) => {
705 inner.total_count -= 1;
706 inner.active_count -= 1;
707 pool.connections_closed.fetch_add(1, Ordering::Relaxed);
708 }
709 Err(_poisoned) => {
710 tracing::error!(
711 "Pool mutex poisoned during detach; pool counters will be inconsistent"
712 );
713 pool.connections_closed.fetch_add(1, Ordering::Relaxed);
715 }
716 }
717 }
718 self.meta.take().expect("connection already detached").conn
719 }
720
721 #[must_use]
723 pub fn age(&self) -> Duration {
724 self.meta.as_ref().map_or(Duration::ZERO, |m| m.age())
725 }
726
727 #[must_use]
729 pub fn idle_time(&self) -> Duration {
730 self.meta.as_ref().map_or(Duration::ZERO, |m| m.idle_time())
731 }
732}
733
734impl<C: Connection> std::ops::Deref for PooledConnection<C> {
735 type Target = C;
736
737 fn deref(&self) -> &Self::Target {
738 &self
739 .meta
740 .as_ref()
741 .expect("connection already returned to pool")
742 .conn
743 }
744}
745
746impl<C: Connection> std::ops::DerefMut for PooledConnection<C> {
747 fn deref_mut(&mut self) -> &mut Self::Target {
748 &mut self
749 .meta
750 .as_mut()
751 .expect("connection already returned to pool")
752 .conn
753 }
754}
755
756impl<C: Connection> Drop for PooledConnection<C> {
757 fn drop(&mut self) {
758 if let Some(mut meta) = self.meta.take() {
759 meta.touch(); if let Some(pool) = self.pool.upgrade() {
761 let mut inner = match pool.inner.lock() {
764 Ok(guard) => guard,
765 Err(_poisoned) => {
766 tracing::error!(
767 "Pool mutex poisoned during connection return; \
768 connection will be leaked. A thread panicked while holding the lock."
769 );
770 return;
773 }
774 };
775
776 if inner.closed {
777 inner.total_count -= 1;
778 inner.active_count -= 1;
779 pool.connections_closed.fetch_add(1, Ordering::Relaxed);
780 return;
781 }
782
783 let max_lifetime = Duration::from_millis(inner.config.max_lifetime_ms);
785 if meta.age() > max_lifetime {
786 inner.total_count -= 1;
787 inner.active_count -= 1;
788 pool.connections_closed.fetch_add(1, Ordering::Relaxed);
789 return;
790 }
791
792 inner.active_count -= 1;
793 inner.idle.push_back(meta);
794
795 drop(inner);
796 pool.conn_available.notify_one();
797 }
798 }
800 }
801}
802
803impl<C: Connection + std::fmt::Debug> std::fmt::Debug for PooledConnection<C> {
804 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
805 f.debug_struct("PooledConnection")
806 .field("conn", &self.meta.as_ref().map(|m| &m.conn))
807 .field("age", &self.age())
808 .field("idle_time", &self.idle_time())
809 .finish_non_exhaustive()
810 }
811}
812
813#[cfg(test)]
814mod tests {
815 use super::*;
816 use sqlmodel_core::connection::{IsolationLevel, PreparedStatement, TransactionOps};
817 use sqlmodel_core::{Row, Value};
818 use std::sync::atomic::AtomicBool;
819
820 #[derive(Debug)]
822 struct MockConnection {
823 id: u32,
824 ping_should_fail: Arc<AtomicBool>,
825 }
826
827 impl MockConnection {
828 fn new(id: u32) -> Self {
829 Self {
830 id,
831 ping_should_fail: Arc::new(AtomicBool::new(false)),
832 }
833 }
834
835 #[allow(dead_code)]
836 fn with_ping_behavior(id: u32, should_fail: Arc<AtomicBool>) -> Self {
837 Self {
838 id,
839 ping_should_fail: should_fail,
840 }
841 }
842 }
843
844 struct MockTx;
846
847 impl TransactionOps for MockTx {
848 async fn query(&self, _cx: &Cx, _sql: &str, _params: &[Value]) -> Outcome<Vec<Row>, Error> {
849 Outcome::Ok(vec![])
850 }
851
852 async fn query_one(
853 &self,
854 _cx: &Cx,
855 _sql: &str,
856 _params: &[Value],
857 ) -> Outcome<Option<Row>, Error> {
858 Outcome::Ok(None)
859 }
860
861 async fn execute(&self, _cx: &Cx, _sql: &str, _params: &[Value]) -> Outcome<u64, Error> {
862 Outcome::Ok(0)
863 }
864
865 async fn savepoint(&self, _cx: &Cx, _name: &str) -> Outcome<(), Error> {
866 Outcome::Ok(())
867 }
868
869 async fn rollback_to(&self, _cx: &Cx, _name: &str) -> Outcome<(), Error> {
870 Outcome::Ok(())
871 }
872
873 async fn release(&self, _cx: &Cx, _name: &str) -> Outcome<(), Error> {
874 Outcome::Ok(())
875 }
876
877 async fn commit(self, _cx: &Cx) -> Outcome<(), Error> {
878 Outcome::Ok(())
879 }
880
881 async fn rollback(self, _cx: &Cx) -> Outcome<(), Error> {
882 Outcome::Ok(())
883 }
884 }
885
886 impl Connection for MockConnection {
887 type Tx<'conn> = MockTx;
888
889 async fn query(&self, _cx: &Cx, _sql: &str, _params: &[Value]) -> Outcome<Vec<Row>, Error> {
890 Outcome::Ok(vec![])
891 }
892
893 async fn query_one(
894 &self,
895 _cx: &Cx,
896 _sql: &str,
897 _params: &[Value],
898 ) -> Outcome<Option<Row>, Error> {
899 Outcome::Ok(None)
900 }
901
902 async fn execute(&self, _cx: &Cx, _sql: &str, _params: &[Value]) -> Outcome<u64, Error> {
903 Outcome::Ok(0)
904 }
905
906 async fn insert(&self, _cx: &Cx, _sql: &str, _params: &[Value]) -> Outcome<i64, Error> {
907 Outcome::Ok(0)
908 }
909
910 async fn batch(
911 &self,
912 _cx: &Cx,
913 _statements: &[(String, Vec<Value>)],
914 ) -> Outcome<Vec<u64>, Error> {
915 Outcome::Ok(vec![])
916 }
917
918 async fn begin(&self, _cx: &Cx) -> Outcome<Self::Tx<'_>, Error> {
919 Outcome::Ok(MockTx)
920 }
921
922 async fn begin_with(
923 &self,
924 _cx: &Cx,
925 _isolation: IsolationLevel,
926 ) -> Outcome<Self::Tx<'_>, Error> {
927 Outcome::Ok(MockTx)
928 }
929
930 async fn prepare(&self, _cx: &Cx, _sql: &str) -> Outcome<PreparedStatement, Error> {
931 Outcome::Ok(PreparedStatement::new(1, String::new(), 0))
932 }
933
934 async fn query_prepared(
935 &self,
936 _cx: &Cx,
937 _stmt: &PreparedStatement,
938 _params: &[Value],
939 ) -> Outcome<Vec<Row>, Error> {
940 Outcome::Ok(vec![])
941 }
942
943 async fn execute_prepared(
944 &self,
945 _cx: &Cx,
946 _stmt: &PreparedStatement,
947 _params: &[Value],
948 ) -> Outcome<u64, Error> {
949 Outcome::Ok(0)
950 }
951
952 async fn ping(&self, _cx: &Cx) -> Outcome<(), Error> {
953 if self.ping_should_fail.load(Ordering::Relaxed) {
954 Outcome::Err(Error::Connection(ConnectionError {
955 kind: ConnectionErrorKind::Disconnected,
956 message: "mock ping failed".to_string(),
957 source: None,
958 }))
959 } else {
960 Outcome::Ok(())
961 }
962 }
963
964 async fn close(self, _cx: &Cx) -> Result<(), Error> {
965 Ok(())
966 }
967 }
968
969 #[test]
970 fn test_config_default() {
971 let config = PoolConfig::default();
972 assert_eq!(config.min_connections, 1);
973 assert_eq!(config.max_connections, 10);
974 assert_eq!(config.idle_timeout_ms, 600_000);
975 assert_eq!(config.acquire_timeout_ms, 30_000);
976 assert_eq!(config.max_lifetime_ms, 1_800_000);
977 assert!(config.test_on_checkout);
978 assert!(!config.test_on_return);
979 }
980
981 #[test]
982 fn test_config_builder() {
983 let config = PoolConfig::new(20)
984 .min_connections(5)
985 .idle_timeout(60_000)
986 .acquire_timeout(5_000)
987 .max_lifetime(300_000)
988 .test_on_checkout(false)
989 .test_on_return(true);
990
991 assert_eq!(config.min_connections, 5);
992 assert_eq!(config.max_connections, 20);
993 assert_eq!(config.idle_timeout_ms, 60_000);
994 assert_eq!(config.acquire_timeout_ms, 5_000);
995 assert_eq!(config.max_lifetime_ms, 300_000);
996 assert!(!config.test_on_checkout);
997 assert!(config.test_on_return);
998 }
999
1000 #[test]
1001 fn test_config_clone() {
1002 let config = PoolConfig::new(15).min_connections(3);
1003 let cloned = config.clone();
1004 assert_eq!(config.max_connections, cloned.max_connections);
1005 assert_eq!(config.min_connections, cloned.min_connections);
1006 }
1007
1008 #[test]
1009 fn test_stats_default() {
1010 let stats = PoolStats::default();
1011 assert_eq!(stats.total_connections, 0);
1012 assert_eq!(stats.idle_connections, 0);
1013 assert_eq!(stats.active_connections, 0);
1014 assert_eq!(stats.pending_requests, 0);
1015 assert_eq!(stats.connections_created, 0);
1016 assert_eq!(stats.connections_closed, 0);
1017 assert_eq!(stats.acquires, 0);
1018 assert_eq!(stats.timeouts, 0);
1019 }
1020
1021 #[test]
1022 fn test_stats_clone() {
1023 let stats = PoolStats {
1024 total_connections: 5,
1025 acquires: 100,
1026 ..Default::default()
1027 };
1028 let cloned = stats.clone();
1029 assert_eq!(stats.total_connections, cloned.total_connections);
1030 assert_eq!(stats.acquires, cloned.acquires);
1031 }
1032
1033 #[test]
1034 fn test_connection_meta_timing() {
1035 use std::thread;
1036
1037 struct DummyConn;
1039
1040 let meta = ConnectionMeta::new(DummyConn);
1041 let initial_age = meta.age();
1042
1043 thread::sleep(Duration::from_millis(10));
1045
1046 assert!(meta.age() > initial_age);
1048 assert!(meta.idle_time() > Duration::ZERO);
1049 }
1050
1051 #[test]
1052 fn test_connection_meta_touch() {
1053 use std::thread;
1054
1055 struct DummyConn;
1056
1057 let mut meta = ConnectionMeta::new(DummyConn);
1058
1059 thread::sleep(Duration::from_millis(10));
1061 let idle_before_touch = meta.idle_time();
1062 assert!(idle_before_touch > Duration::ZERO);
1063
1064 meta.touch();
1066 let idle_after_touch = meta.idle_time();
1067
1068 assert!(idle_after_touch < idle_before_touch);
1070 }
1071
1072 #[test]
1073 fn test_pool_new() {
1074 let config = PoolConfig::new(5);
1075 let pool: Pool<MockConnection> = Pool::new(config);
1076
1077 assert_eq!(pool.idle_count(), 0);
1079 assert_eq!(pool.active_count(), 0);
1080 assert_eq!(pool.total_count(), 0);
1081 assert!(!pool.is_closed());
1082 assert!(!pool.at_capacity());
1083 }
1084
1085 #[test]
1086 fn test_pool_config() {
1087 let config = PoolConfig::new(7).min_connections(2);
1088 let pool: Pool<MockConnection> = Pool::new(config);
1089
1090 let retrieved_config = pool.config();
1091 assert_eq!(retrieved_config.max_connections, 7);
1092 assert_eq!(retrieved_config.min_connections, 2);
1093 }
1094
1095 #[test]
1096 fn test_pool_stats_initial() {
1097 let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1098
1099 let stats = pool.stats();
1100 assert_eq!(stats.total_connections, 0);
1101 assert_eq!(stats.idle_connections, 0);
1102 assert_eq!(stats.active_connections, 0);
1103 assert_eq!(stats.pending_requests, 0);
1104 assert_eq!(stats.connections_created, 0);
1105 assert_eq!(stats.connections_closed, 0);
1106 assert_eq!(stats.acquires, 0);
1107 assert_eq!(stats.timeouts, 0);
1108 }
1109
1110 #[test]
1111 fn test_pool_close() {
1112 let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1113
1114 assert!(!pool.is_closed());
1115 pool.close();
1116 assert!(pool.is_closed());
1117 }
1118
1119 #[test]
1120 fn test_pool_inner_can_create_new() {
1121 let mut inner = PoolInner::<MockConnection>::new(PoolConfig::new(3));
1122
1123 assert!(inner.can_create_new());
1125
1126 inner.total_count = 3;
1128 assert!(!inner.can_create_new());
1129
1130 inner.total_count = 2;
1132 assert!(inner.can_create_new());
1133
1134 inner.closed = true;
1136 assert!(!inner.can_create_new());
1137 }
1138
1139 #[test]
1140 fn test_pool_inner_stats() {
1141 let mut inner = PoolInner::<MockConnection>::new(PoolConfig::new(10));
1142
1143 inner.total_count = 5;
1144 inner.active_count = 3;
1145 inner.waiter_count = 2;
1146 inner
1147 .idle
1148 .push_back(ConnectionMeta::new(MockConnection::new(1)));
1149 inner
1150 .idle
1151 .push_back(ConnectionMeta::new(MockConnection::new(2)));
1152
1153 let stats = inner.stats();
1154 assert_eq!(stats.total_connections, 5);
1155 assert_eq!(stats.idle_connections, 2);
1156 assert_eq!(stats.active_connections, 3);
1157 assert_eq!(stats.pending_requests, 2);
1158 }
1159
1160 #[test]
1161 fn test_pooled_connection_age_and_idle_time() {
1162 use std::thread;
1163
1164 let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1165
1166 {
1168 let mut inner = pool.shared.inner.lock().unwrap();
1169 inner.total_count = 1;
1170 inner.active_count = 1;
1171 }
1172
1173 let meta = ConnectionMeta::new(MockConnection::new(1));
1174 let pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
1175
1176 assert!(pooled.age() >= Duration::ZERO);
1178
1179 thread::sleep(Duration::from_millis(5));
1180 assert!(pooled.age() > Duration::ZERO);
1181 }
1182
1183 #[test]
1184 fn test_pooled_connection_detach() {
1185 let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1186
1187 {
1189 let mut inner = pool.shared.inner.lock().unwrap();
1190 inner.total_count = 1;
1191 inner.active_count = 1;
1192 }
1193
1194 let meta = ConnectionMeta::new(MockConnection::new(42));
1195 let pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
1196
1197 assert_eq!(pool.total_count(), 1);
1199 assert_eq!(pool.active_count(), 1);
1200
1201 let conn = pooled.detach();
1203 assert_eq!(conn.id, 42);
1204
1205 assert_eq!(pool.total_count(), 0);
1207 assert_eq!(pool.active_count(), 0);
1208
1209 let stats = pool.stats();
1211 assert_eq!(stats.connections_closed, 1);
1212 }
1213
1214 #[test]
1215 fn test_pooled_connection_drop_returns_to_pool() {
1216 let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1217
1218 {
1220 let mut inner = pool.shared.inner.lock().unwrap();
1221 inner.total_count = 1;
1222 inner.active_count = 1;
1223 }
1224
1225 let meta = ConnectionMeta::new(MockConnection::new(1));
1226 let pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
1227
1228 assert_eq!(pool.active_count(), 1);
1230 assert_eq!(pool.idle_count(), 0);
1231
1232 drop(pooled);
1234
1235 assert_eq!(pool.active_count(), 0);
1237 assert_eq!(pool.idle_count(), 1);
1238 assert_eq!(pool.total_count(), 1); }
1240
1241 #[test]
1242 fn test_pooled_connection_drop_when_pool_closed() {
1243 let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1244
1245 {
1247 let mut inner = pool.shared.inner.lock().unwrap();
1248 inner.total_count = 1;
1249 inner.active_count = 1;
1250 }
1251
1252 let meta = ConnectionMeta::new(MockConnection::new(1));
1253 let pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
1254
1255 pool.close();
1257
1258 drop(pooled);
1260
1261 assert_eq!(pool.idle_count(), 0);
1263 assert_eq!(pool.active_count(), 0);
1264 assert_eq!(pool.total_count(), 0);
1265
1266 assert_eq!(pool.stats().connections_closed, 1);
1268 }
1269
1270 #[test]
1271 fn test_pooled_connection_deref() {
1272 let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1273
1274 {
1276 let mut inner = pool.shared.inner.lock().unwrap();
1277 inner.total_count = 1;
1278 inner.active_count = 1;
1279 }
1280
1281 let meta = ConnectionMeta::new(MockConnection::new(99));
1282 let pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
1283
1284 assert_eq!(pooled.id, 99);
1286 }
1287
1288 #[test]
1289 fn test_pooled_connection_deref_mut() {
1290 let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1291
1292 {
1294 let mut inner = pool.shared.inner.lock().unwrap();
1295 inner.total_count = 1;
1296 inner.active_count = 1;
1297 }
1298
1299 let meta = ConnectionMeta::new(MockConnection::new(1));
1300 let mut pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
1301
1302 pooled.id = 50;
1304 assert_eq!(pooled.id, 50);
1305 }
1306
1307 #[test]
1308 fn test_pooled_connection_debug() {
1309 let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1310
1311 {
1313 let mut inner = pool.shared.inner.lock().unwrap();
1314 inner.total_count = 1;
1315 inner.active_count = 1;
1316 }
1317
1318 let meta = ConnectionMeta::new(MockConnection::new(1));
1319 let pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
1320
1321 let debug_str = format!("{:?}", pooled);
1322 assert!(debug_str.contains("PooledConnection"));
1323 assert!(debug_str.contains("age"));
1324 }
1325
1326 #[test]
1327 fn test_pool_at_capacity() {
1328 let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(2));
1329
1330 assert!(!pool.at_capacity());
1331
1332 {
1334 let mut inner = pool.shared.inner.lock().unwrap();
1335 inner.total_count = 1;
1336 }
1337 assert!(!pool.at_capacity());
1338
1339 {
1340 let mut inner = pool.shared.inner.lock().unwrap();
1341 inner.total_count = 2;
1342 }
1343 assert!(pool.at_capacity());
1344 }
1345
1346 #[test]
1347 fn test_acquire_action_enum() {
1348 let closed: AcquireAction<MockConnection> = AcquireAction::PoolClosed;
1350 assert!(matches!(closed, AcquireAction::PoolClosed));
1351
1352 let create: AcquireAction<MockConnection> = AcquireAction::CreateNew;
1353 assert!(matches!(create, AcquireAction::CreateNew));
1354
1355 let wait: AcquireAction<MockConnection> = AcquireAction::Wait;
1356 assert!(matches!(wait, AcquireAction::Wait));
1357
1358 let meta = ConnectionMeta::new(MockConnection::new(1));
1359 let validate: AcquireAction<MockConnection> = AcquireAction::ValidateExisting(meta);
1360 assert!(matches!(validate, AcquireAction::ValidateExisting(_)));
1361 }
1362
1363 #[test]
1364 fn test_pool_shared_atomic_counters() {
1365 let shared = PoolShared::<MockConnection>::new(PoolConfig::new(5));
1366
1367 assert_eq!(shared.connections_created.load(Ordering::Relaxed), 0);
1369 assert_eq!(shared.connections_closed.load(Ordering::Relaxed), 0);
1370 assert_eq!(shared.acquires.load(Ordering::Relaxed), 0);
1371 assert_eq!(shared.timeouts.load(Ordering::Relaxed), 0);
1372
1373 shared.connections_created.fetch_add(1, Ordering::Relaxed);
1375 shared.connections_closed.fetch_add(2, Ordering::Relaxed);
1376 shared.acquires.fetch_add(10, Ordering::Relaxed);
1377 shared.timeouts.fetch_add(3, Ordering::Relaxed);
1378
1379 assert_eq!(shared.connections_created.load(Ordering::Relaxed), 1);
1380 assert_eq!(shared.connections_closed.load(Ordering::Relaxed), 2);
1381 assert_eq!(shared.acquires.load(Ordering::Relaxed), 10);
1382 assert_eq!(shared.timeouts.load(Ordering::Relaxed), 3);
1383 }
1384
1385 #[test]
1386 fn test_pool_close_clears_idle() {
1387 let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1388
1389 {
1391 let mut inner = pool.shared.inner.lock().unwrap();
1392 inner.total_count = 3;
1393 inner
1394 .idle
1395 .push_back(ConnectionMeta::new(MockConnection::new(1)));
1396 inner
1397 .idle
1398 .push_back(ConnectionMeta::new(MockConnection::new(2)));
1399 inner
1400 .idle
1401 .push_back(ConnectionMeta::new(MockConnection::new(3)));
1402 }
1403
1404 assert_eq!(pool.idle_count(), 3);
1405 assert_eq!(pool.total_count(), 3);
1406
1407 pool.close();
1408
1409 assert_eq!(pool.idle_count(), 0);
1411 assert_eq!(pool.total_count(), 0);
1412 assert!(pool.is_closed());
1413
1414 assert_eq!(pool.stats().connections_closed, 3);
1416 }
1417
1418 fn poison_pool_mutex() -> Pool<MockConnection> {
1431 use std::panic;
1432 use std::thread;
1433
1434 let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1435
1436 {
1438 let mut inner = pool.shared.inner.lock().unwrap();
1439 inner.total_count = 2;
1440 inner.active_count = 1;
1441 inner
1442 .idle
1443 .push_back(ConnectionMeta::new(MockConnection::new(1)));
1444 }
1445
1446 let shared_clone = Arc::clone(&pool.shared);
1448 let handle = thread::spawn(move || {
1449 let _guard = shared_clone.inner.lock().unwrap();
1450 panic!("intentional panic to poison mutex");
1452 });
1453
1454 let _ = handle.join();
1456
1457 assert!(pool.shared.inner.lock().is_err());
1459
1460 pool
1461 }
1462
1463 #[test]
1466 fn test_config_after_poisoning_returns_valid_data() {
1467 let pool = poison_pool_mutex();
1468
1469 let config = pool.config();
1471 assert_eq!(config.max_connections, 5);
1472 }
1473
1474 #[test]
1475 fn test_stats_after_poisoning_returns_valid_data() {
1476 let pool = poison_pool_mutex();
1477
1478 let stats = pool.stats();
1480 assert_eq!(stats.total_connections, 2);
1482 assert_eq!(stats.active_connections, 1);
1483 assert_eq!(stats.idle_connections, 1);
1484 }
1485
1486 #[test]
1487 fn test_at_capacity_after_poisoning() {
1488 let pool = poison_pool_mutex();
1489
1490 assert!(!pool.at_capacity());
1493 }
1494
1495 #[test]
1496 fn test_is_closed_after_poisoning() {
1497 let pool = poison_pool_mutex();
1498
1499 assert!(!pool.is_closed());
1501 }
1502
1503 #[test]
1504 fn test_idle_count_after_poisoning() {
1505 let pool = poison_pool_mutex();
1506
1507 assert_eq!(pool.idle_count(), 1);
1509 }
1510
1511 #[test]
1512 fn test_active_count_after_poisoning() {
1513 let pool = poison_pool_mutex();
1514
1515 assert_eq!(pool.active_count(), 1);
1517 }
1518
1519 #[test]
1520 fn test_total_count_after_poisoning() {
1521 let pool = poison_pool_mutex();
1522
1523 assert_eq!(pool.total_count(), 2);
1525 }
1526
1527 #[test]
1530 fn test_lock_or_error_returns_error_when_poisoned() {
1531 use std::thread;
1532
1533 let shared = Arc::new(PoolShared::<MockConnection>::new(PoolConfig::new(5)));
1534
1535 let shared_clone = Arc::clone(&shared);
1537 let handle = thread::spawn(move || {
1538 let _guard = shared_clone.inner.lock().unwrap();
1539 panic!("intentional panic to poison mutex");
1540 });
1541 let _ = handle.join();
1542
1543 let result = shared.lock_or_error("test_operation");
1545
1546 match result {
1548 Err(Error::Pool(pool_err)) => {
1549 assert!(matches!(pool_err.kind, PoolErrorKind::Poisoned));
1550 assert!(pool_err.message.contains("poisoned"));
1551 }
1552 Err(other) => panic!("Expected Pool error, got: {:?}", other),
1553 Ok(_) => panic!("Expected error, got Ok"),
1554 }
1555 }
1556
1557 #[test]
1558 fn test_lock_or_recover_succeeds_when_poisoned() {
1559 use std::thread;
1560
1561 let shared = Arc::new(PoolShared::<MockConnection>::new(PoolConfig::new(5)));
1562
1563 {
1565 let mut inner = shared.inner.lock().unwrap();
1566 inner.total_count = 42;
1567 }
1568
1569 let shared_clone = Arc::clone(&shared);
1571 let handle = thread::spawn(move || {
1572 let _guard = shared_clone.inner.lock().unwrap();
1573 panic!("intentional panic to poison mutex");
1574 });
1575 let _ = handle.join();
1576
1577 assert!(shared.inner.lock().is_err());
1579
1580 let inner = shared.lock_or_recover();
1582 assert_eq!(inner.total_count, 42);
1583 }
1584
1585 #[test]
1586 fn test_close_after_poisoning_recovers_and_closes() {
1587 let pool = poison_pool_mutex();
1588
1589 pool.close();
1591
1592 assert!(pool.is_closed());
1594
1595 assert_eq!(pool.idle_count(), 0);
1597 }
1598
1599 #[test]
1602 fn test_drop_pooled_connection_after_poisoning_does_not_panic() {
1603 use std::panic;
1604 use std::thread;
1605
1606 let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1607
1608 {
1610 let mut inner = pool.shared.inner.lock().unwrap();
1611 inner.total_count = 1;
1612 inner.active_count = 1;
1613 }
1614
1615 let meta = ConnectionMeta::new(MockConnection::new(1));
1617 let pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
1618
1619 let shared_clone = Arc::clone(&pool.shared);
1621 let handle = thread::spawn(move || {
1622 let _guard = shared_clone.inner.lock().unwrap();
1623 panic!("intentional panic to poison mutex");
1624 });
1625 let _ = handle.join();
1626
1627 assert!(pool.shared.inner.lock().is_err());
1629
1630 let drop_result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
1633 drop(pooled);
1634 }));
1635
1636 assert!(
1638 drop_result.is_ok(),
1639 "Dropping PooledConnection after mutex poisoning should not panic"
1640 );
1641 }
1642
1643 #[test]
1644 fn test_detach_after_poisoning_does_not_panic() {
1645 use std::panic;
1646 use std::thread;
1647
1648 let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1649
1650 {
1652 let mut inner = pool.shared.inner.lock().unwrap();
1653 inner.total_count = 1;
1654 inner.active_count = 1;
1655 }
1656
1657 let meta = ConnectionMeta::new(MockConnection::new(42));
1659 let pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
1660
1661 let shared_clone = Arc::clone(&pool.shared);
1663 let handle = thread::spawn(move || {
1664 let _guard = shared_clone.inner.lock().unwrap();
1665 panic!("intentional panic to poison mutex");
1666 });
1667 let _ = handle.join();
1668
1669 assert!(pool.shared.inner.lock().is_err());
1671
1672 let detach_result = panic::catch_unwind(panic::AssertUnwindSafe(|| pooled.detach()));
1674
1675 assert!(
1676 detach_result.is_ok(),
1677 "detach() after mutex poisoning should not panic"
1678 );
1679
1680 let conn = detach_result.unwrap();
1682 assert_eq!(conn.id, 42);
1683 }
1684
1685 #[test]
1688 fn test_pool_survives_thread_panic_during_acquire() {
1689 use std::thread;
1690
1691 let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1692 let pool_arc = Arc::new(pool);
1693
1694 let pool_clone = Arc::clone(&pool_arc);
1697 let handle = thread::spawn(move || {
1698 {
1700 let mut inner = pool_clone.shared.inner.lock().unwrap();
1701 inner.total_count = 1;
1702 inner.active_count = 1;
1703 }
1704
1705 let _guard = pool_clone.shared.inner.lock().unwrap();
1708 panic!("simulated panic during database operation");
1709 });
1710
1711 let _ = handle.join();
1713
1714 assert_eq!(pool_arc.total_count(), 1);
1716 assert_eq!(pool_arc.config().max_connections, 5);
1717
1718 let stats = pool_arc.stats();
1720 assert_eq!(stats.total_connections, 1);
1721 }
1722
1723 #[test]
1724 fn test_pool_close_after_thread_panic() {
1725 use std::thread;
1726
1727 let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1728
1729 {
1731 let mut inner = pool.shared.inner.lock().unwrap();
1732 inner.total_count = 2;
1733 inner
1734 .idle
1735 .push_back(ConnectionMeta::new(MockConnection::new(1)));
1736 inner
1737 .idle
1738 .push_back(ConnectionMeta::new(MockConnection::new(2)));
1739 }
1740
1741 let shared_clone = Arc::clone(&pool.shared);
1743 let handle = thread::spawn(move || {
1744 let _guard = shared_clone.inner.lock().unwrap();
1745 panic!("intentional panic");
1746 });
1747 let _ = handle.join();
1748
1749 pool.close();
1751
1752 assert!(pool.is_closed());
1754 assert_eq!(pool.idle_count(), 0);
1755 }
1756
1757 #[test]
1758 fn test_multiple_reads_after_poisoning() {
1759 let pool = poison_pool_mutex();
1760
1761 for _ in 0..10 {
1763 let _ = pool.config();
1764 let _ = pool.stats();
1765 let _ = pool.at_capacity();
1766 let _ = pool.is_closed();
1767 let _ = pool.idle_count();
1768 let _ = pool.active_count();
1769 let _ = pool.total_count();
1770 }
1771
1772 assert_eq!(pool.total_count(), 2);
1774 }
1775
1776 #[test]
1777 fn test_waiters_count_after_poisoning() {
1778 use std::thread;
1779
1780 let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1781
1782 {
1784 let mut inner = pool.shared.inner.lock().unwrap();
1785 inner.waiter_count = 3;
1786 }
1787
1788 let shared_clone = Arc::clone(&pool.shared);
1790 let handle = thread::spawn(move || {
1791 let _guard = shared_clone.inner.lock().unwrap();
1792 panic!("intentional panic");
1793 });
1794 let _ = handle.join();
1795
1796 let stats = pool.stats();
1798 assert_eq!(stats.pending_requests, 3);
1799 }
1800}