1pub mod replica;
46pub use replica::{ReplicaPool, ReplicaStrategy};
47
48pub mod sharding;
49pub use sharding::{ModuloShardChooser, QueryHints, ShardChooser, ShardedPool, ShardedPoolStats};
50
51use std::collections::VecDeque;
52use std::future::Future;
53use std::sync::atomic::{AtomicU64, Ordering};
54use std::sync::{Arc, Condvar, Mutex, Weak};
55use std::time::{Duration, Instant};
56
57use asupersync::{CancelReason, Cx, Outcome};
58use sqlmodel_core::error::{ConnectionError, ConnectionErrorKind, PoolError, PoolErrorKind};
59use sqlmodel_core::{Connection, Error};
60
61#[derive(Debug, Clone)]
63pub struct PoolConfig {
64 pub min_connections: usize,
66 pub max_connections: usize,
68 pub idle_timeout_ms: u64,
70 pub acquire_timeout_ms: u64,
72 pub max_lifetime_ms: u64,
74 pub test_on_checkout: bool,
76 pub test_on_return: bool,
78}
79
80impl Default for PoolConfig {
81 fn default() -> Self {
82 Self {
83 min_connections: 1,
84 max_connections: 10,
85 idle_timeout_ms: 600_000, acquire_timeout_ms: 30_000, max_lifetime_ms: 1_800_000, test_on_checkout: true,
89 test_on_return: false,
90 }
91 }
92}
93
94impl PoolConfig {
95 #[must_use]
97 pub fn new(max_connections: usize) -> Self {
98 Self {
99 max_connections,
100 ..Default::default()
101 }
102 }
103
104 #[must_use]
106 pub fn min_connections(mut self, n: usize) -> Self {
107 self.min_connections = n;
108 self
109 }
110
111 #[must_use]
113 pub fn idle_timeout(mut self, ms: u64) -> Self {
114 self.idle_timeout_ms = ms;
115 self
116 }
117
118 #[must_use]
120 pub fn acquire_timeout(mut self, ms: u64) -> Self {
121 self.acquire_timeout_ms = ms;
122 self
123 }
124
125 #[must_use]
127 pub fn max_lifetime(mut self, ms: u64) -> Self {
128 self.max_lifetime_ms = ms;
129 self
130 }
131
132 #[must_use]
134 pub fn test_on_checkout(mut self, enabled: bool) -> Self {
135 self.test_on_checkout = enabled;
136 self
137 }
138
139 #[must_use]
141 pub fn test_on_return(mut self, enabled: bool) -> Self {
142 self.test_on_return = enabled;
143 self
144 }
145}
146
147#[derive(Debug, Clone, Default)]
149pub struct PoolStats {
150 pub total_connections: usize,
152 pub idle_connections: usize,
154 pub active_connections: usize,
156 pub pending_requests: usize,
158 pub connections_created: u64,
160 pub connections_closed: u64,
162 pub acquires: u64,
164 pub timeouts: u64,
166}
167
168#[derive(Debug)]
170struct ConnectionMeta<C> {
171 conn: C,
173 created_at: Instant,
175 last_used: Instant,
177}
178
179impl<C> ConnectionMeta<C> {
180 fn new(conn: C) -> Self {
181 let now = Instant::now();
182 Self {
183 conn,
184 created_at: now,
185 last_used: now,
186 }
187 }
188
189 fn touch(&mut self) {
190 self.last_used = Instant::now();
191 }
192
193 fn age(&self) -> Duration {
194 self.created_at.elapsed()
195 }
196
197 fn idle_time(&self) -> Duration {
198 self.last_used.elapsed()
199 }
200}
201
202struct PoolInner<C> {
204 config: PoolConfig,
206 idle: VecDeque<ConnectionMeta<C>>,
208 active_count: usize,
210 total_count: usize,
212 waiter_count: usize,
214 closed: bool,
216}
217
218impl<C> PoolInner<C> {
219 fn new(config: PoolConfig) -> Self {
220 Self {
221 config,
222 idle: VecDeque::new(),
223 active_count: 0,
224 total_count: 0,
225 waiter_count: 0,
226 closed: false,
227 }
228 }
229
230 fn can_create_new(&self) -> bool {
231 !self.closed && self.total_count < self.config.max_connections
232 }
233
234 fn stats(&self) -> PoolStats {
235 PoolStats {
236 total_connections: self.total_count,
237 idle_connections: self.idle.len(),
238 active_connections: self.active_count,
239 pending_requests: self.waiter_count,
240 ..Default::default()
241 }
242 }
243}
244
245struct PoolShared<C> {
247 inner: Mutex<PoolInner<C>>,
249 conn_available: Condvar,
251 connections_created: AtomicU64,
253 connections_closed: AtomicU64,
254 acquires: AtomicU64,
255 timeouts: AtomicU64,
256}
257
258impl<C> PoolShared<C> {
259 fn new(config: PoolConfig) -> Self {
260 Self {
261 inner: Mutex::new(PoolInner::new(config)),
262 conn_available: Condvar::new(),
263 connections_created: AtomicU64::new(0),
264 connections_closed: AtomicU64::new(0),
265 acquires: AtomicU64::new(0),
266 timeouts: AtomicU64::new(0),
267 }
268 }
269
270 fn lock_or_recover(&self) -> std::sync::MutexGuard<'_, PoolInner<C>> {
279 self.inner.lock().unwrap_or_else(|poisoned| {
280 tracing::error!(
281 "Pool mutex poisoned; recovering for read-only access. \
282 A thread panicked while holding the lock."
283 );
284 poisoned.into_inner()
285 })
286 }
287
288 #[allow(clippy::result_large_err)] fn lock_or_error(
295 &self,
296 operation: &'static str,
297 ) -> Result<std::sync::MutexGuard<'_, PoolInner<C>>, Error> {
298 self.inner
299 .lock()
300 .map_err(|_| Error::Pool(PoolError::poisoned(operation)))
301 }
302}
303
304pub struct Pool<C: Connection> {
319 shared: Arc<PoolShared<C>>,
320}
321
322impl<C: Connection> Pool<C> {
323 #[must_use]
325 pub fn new(config: PoolConfig) -> Self {
326 Self {
327 shared: Arc::new(PoolShared::new(config)),
328 }
329 }
330
331 #[must_use]
333 pub fn config(&self) -> PoolConfig {
334 let inner = self.shared.lock_or_recover();
335 inner.config.clone()
336 }
337
338 #[must_use]
340 pub fn stats(&self) -> PoolStats {
341 let inner = self.shared.lock_or_recover();
342 let mut stats = inner.stats();
343 stats.connections_created = self.shared.connections_created.load(Ordering::Relaxed);
344 stats.connections_closed = self.shared.connections_closed.load(Ordering::Relaxed);
345 stats.acquires = self.shared.acquires.load(Ordering::Relaxed);
346 stats.timeouts = self.shared.timeouts.load(Ordering::Relaxed);
347 stats
348 }
349
350 #[must_use]
352 pub fn at_capacity(&self) -> bool {
353 let inner = self.shared.lock_or_recover();
354 inner.total_count >= inner.config.max_connections
355 }
356
357 #[must_use]
359 pub fn is_closed(&self) -> bool {
360 let inner = self.shared.lock_or_recover();
361 inner.closed
362 }
363
364 pub async fn acquire<F, Fut>(&self, cx: &Cx, factory: F) -> Outcome<PooledConnection<C>, Error>
379 where
380 F: Fn() -> Fut,
381 Fut: Future<Output = Outcome<C, Error>>,
382 {
383 let deadline = Instant::now() + Duration::from_millis(self.config().acquire_timeout_ms);
384 let test_on_checkout = self.config().test_on_checkout;
385 let max_lifetime = Duration::from_millis(self.config().max_lifetime_ms);
386 let idle_timeout = Duration::from_millis(self.config().idle_timeout_ms);
387
388 loop {
389 if cx.is_cancel_requested() {
391 return Outcome::Cancelled(CancelReason::user("pool acquire cancelled"));
392 }
393
394 if Instant::now() >= deadline {
396 self.shared.timeouts.fetch_add(1, Ordering::Relaxed);
397 return Outcome::Err(Error::Pool(PoolError {
398 kind: PoolErrorKind::Timeout,
399 message: "acquire timeout: no connections available".to_string(),
400 source: None,
401 }));
402 }
403
404 let action = {
406 let mut inner = match self.shared.lock_or_error("acquire") {
407 Ok(guard) => guard,
408 Err(e) => return Outcome::Err(e),
409 };
410
411 if inner.closed {
412 AcquireAction::PoolClosed
413 } else {
414 let mut found_conn = None;
416 while let Some(mut meta) = inner.idle.pop_front() {
417 if meta.age() > max_lifetime {
419 inner.total_count -= 1;
420 self.shared
421 .connections_closed
422 .fetch_add(1, Ordering::Relaxed);
423 continue;
424 }
425
426 if meta.idle_time() > idle_timeout {
428 inner.total_count -= 1;
429 self.shared
430 .connections_closed
431 .fetch_add(1, Ordering::Relaxed);
432 continue;
433 }
434
435 meta.touch();
437 inner.active_count += 1;
438 found_conn = Some(meta);
439 break;
440 }
441
442 if let Some(meta) = found_conn {
443 AcquireAction::ValidateExisting(meta)
444 } else if inner.can_create_new() {
445 inner.total_count += 1;
447 inner.active_count += 1;
448 AcquireAction::CreateNew
449 } else {
450 inner.waiter_count += 1;
452 AcquireAction::Wait
453 }
454 }
455 };
456
457 match action {
458 AcquireAction::PoolClosed => {
459 return Outcome::Err(Error::Pool(PoolError {
460 kind: PoolErrorKind::Closed,
461 message: "pool has been closed".to_string(),
462 source: None,
463 }));
464 }
465 AcquireAction::ValidateExisting(meta) => {
466 return self.validate_and_wrap(cx, meta, test_on_checkout).await;
468 }
469 AcquireAction::CreateNew => {
470 match factory().await {
472 Outcome::Ok(conn) => {
473 self.shared
474 .connections_created
475 .fetch_add(1, Ordering::Relaxed);
476 self.shared.acquires.fetch_add(1, Ordering::Relaxed);
477 let meta = ConnectionMeta::new(conn);
478 return Outcome::Ok(PooledConnection::new(
479 meta,
480 Arc::downgrade(&self.shared),
481 ));
482 }
483 Outcome::Err(e) => {
484 if let Ok(mut inner) = self.shared.lock_or_error("acquire_cleanup") {
486 inner.total_count -= 1;
487 inner.active_count -= 1;
488 }
489 return Outcome::Err(e);
491 }
492 Outcome::Cancelled(reason) => {
493 if let Ok(mut inner) = self.shared.lock_or_error("acquire_cleanup") {
494 inner.total_count -= 1;
495 inner.active_count -= 1;
496 }
497 return Outcome::Cancelled(reason);
498 }
499 Outcome::Panicked(info) => {
500 if let Ok(mut inner) = self.shared.lock_or_error("acquire_cleanup") {
501 inner.total_count -= 1;
502 inner.active_count -= 1;
503 }
504 return Outcome::Panicked(info);
505 }
506 }
507 }
508 AcquireAction::Wait => {
509 let remaining = deadline.saturating_duration_since(Instant::now());
511 if remaining.is_zero() {
512 if let Ok(mut inner) = self.shared.lock_or_error("acquire_timeout") {
513 inner.waiter_count -= 1;
514 }
515 self.shared.timeouts.fetch_add(1, Ordering::Relaxed);
516 return Outcome::Err(Error::Pool(PoolError {
517 kind: PoolErrorKind::Timeout,
518 message: "acquire timeout: no connections available".to_string(),
519 source: None,
520 }));
521 }
522
523 let wait_time = remaining.min(Duration::from_millis(100));
525 {
526 let inner = match self.shared.lock_or_error("acquire_wait") {
527 Ok(guard) => guard,
528 Err(e) => return Outcome::Err(e),
529 };
530 let _ = self
532 .shared
533 .conn_available
534 .wait_timeout(inner, wait_time)
535 .map_err(|_| {
536 tracing::error!("Pool mutex poisoned during wait_timeout");
537 });
538 }
539
540 {
542 if let Ok(mut inner) = self.shared.lock_or_error("acquire_wake") {
543 inner.waiter_count = inner.waiter_count.saturating_sub(1);
544 }
545 }
546
547 }
549 }
550 }
551 }
552
553 async fn validate_and_wrap(
555 &self,
556 cx: &Cx,
557 meta: ConnectionMeta<C>,
558 test_on_checkout: bool,
559 ) -> Outcome<PooledConnection<C>, Error> {
560 if test_on_checkout {
561 match meta.conn.ping(cx).await {
563 Outcome::Ok(()) => {
564 self.shared.acquires.fetch_add(1, Ordering::Relaxed);
565 Outcome::Ok(PooledConnection::new(meta, Arc::downgrade(&self.shared)))
566 }
567 Outcome::Err(_) | Outcome::Cancelled(_) | Outcome::Panicked(_) => {
568 {
570 if let Ok(mut inner) = self.shared.lock_or_error("validate_cleanup") {
571 inner.total_count -= 1;
572 inner.active_count -= 1;
573 }
574 }
575 self.shared
576 .connections_closed
577 .fetch_add(1, Ordering::Relaxed);
578 Outcome::Err(Error::Connection(ConnectionError {
580 kind: ConnectionErrorKind::Disconnected,
581 message: "connection validation failed".to_string(),
582 source: None,
583 }))
584 }
585 }
586 } else {
587 self.shared.acquires.fetch_add(1, Ordering::Relaxed);
588 Outcome::Ok(PooledConnection::new(meta, Arc::downgrade(&self.shared)))
589 }
590 }
591
592 pub fn close(&self) {
596 match self.shared.inner.lock() {
597 Ok(mut inner) => {
598 inner.closed = true;
599
600 let idle_count = inner.idle.len();
602 inner.idle.clear();
603 inner.total_count -= idle_count;
604 self.shared
605 .connections_closed
606 .fetch_add(idle_count as u64, Ordering::Relaxed);
607 drop(inner);
608 }
609 Err(poisoned) => {
610 tracing::error!(
613 "Pool mutex poisoned during close; attempting recovery. \
614 Pool state may be inconsistent."
615 );
616 let mut inner = poisoned.into_inner();
617 inner.closed = true;
618 let idle_count = inner.idle.len();
619 inner.idle.clear();
620 inner.total_count -= idle_count;
621 self.shared
622 .connections_closed
623 .fetch_add(idle_count as u64, Ordering::Relaxed);
624 }
625 }
626
627 self.shared.conn_available.notify_all();
629 }
630
631 #[must_use]
633 pub fn idle_count(&self) -> usize {
634 let inner = self.shared.lock_or_recover();
635 inner.idle.len()
636 }
637
638 #[must_use]
640 pub fn active_count(&self) -> usize {
641 let inner = self.shared.lock_or_recover();
642 inner.active_count
643 }
644
645 #[must_use]
647 pub fn total_count(&self) -> usize {
648 let inner = self.shared.lock_or_recover();
649 inner.total_count
650 }
651}
652
653enum AcquireAction<C> {
655 PoolClosed,
657 ValidateExisting(ConnectionMeta<C>),
659 CreateNew,
661 Wait,
663}
664
665pub struct PooledConnection<C: Connection> {
670 meta: Option<ConnectionMeta<C>>,
672 pool: Weak<PoolShared<C>>,
674}
675
676impl<C: Connection> PooledConnection<C> {
677 fn new(meta: ConnectionMeta<C>, pool: Weak<PoolShared<C>>) -> Self {
678 Self {
679 meta: Some(meta),
680 pool,
681 }
682 }
683
684 pub fn detach(mut self) -> C {
689 if let Some(pool) = self.pool.upgrade() {
690 match pool.inner.lock() {
693 Ok(mut inner) => {
694 inner.total_count -= 1;
695 inner.active_count -= 1;
696 pool.connections_closed.fetch_add(1, Ordering::Relaxed);
697 }
698 Err(_poisoned) => {
699 tracing::error!(
700 "Pool mutex poisoned during detach; pool counters will be inconsistent"
701 );
702 pool.connections_closed.fetch_add(1, Ordering::Relaxed);
704 }
705 }
706 }
707 self.meta.take().expect("connection already detached").conn
708 }
709
710 #[must_use]
712 pub fn age(&self) -> Duration {
713 self.meta.as_ref().map_or(Duration::ZERO, |m| m.age())
714 }
715
716 #[must_use]
718 pub fn idle_time(&self) -> Duration {
719 self.meta.as_ref().map_or(Duration::ZERO, |m| m.idle_time())
720 }
721}
722
723impl<C: Connection> std::ops::Deref for PooledConnection<C> {
724 type Target = C;
725
726 fn deref(&self) -> &Self::Target {
727 &self
728 .meta
729 .as_ref()
730 .expect("connection already returned to pool")
731 .conn
732 }
733}
734
735impl<C: Connection> std::ops::DerefMut for PooledConnection<C> {
736 fn deref_mut(&mut self) -> &mut Self::Target {
737 &mut self
738 .meta
739 .as_mut()
740 .expect("connection already returned to pool")
741 .conn
742 }
743}
744
745impl<C: Connection> Drop for PooledConnection<C> {
746 fn drop(&mut self) {
747 if let Some(mut meta) = self.meta.take() {
748 meta.touch(); if let Some(pool) = self.pool.upgrade() {
750 let mut inner = match pool.inner.lock() {
753 Ok(guard) => guard,
754 Err(_poisoned) => {
755 tracing::error!(
756 "Pool mutex poisoned during connection return; \
757 connection will be leaked. A thread panicked while holding the lock."
758 );
759 return;
762 }
763 };
764
765 if inner.closed {
766 inner.total_count -= 1;
767 inner.active_count -= 1;
768 pool.connections_closed.fetch_add(1, Ordering::Relaxed);
769 return;
770 }
771
772 let max_lifetime = Duration::from_millis(inner.config.max_lifetime_ms);
774 if meta.age() > max_lifetime {
775 inner.total_count -= 1;
776 inner.active_count -= 1;
777 pool.connections_closed.fetch_add(1, Ordering::Relaxed);
778 return;
779 }
780
781 inner.active_count -= 1;
782 inner.idle.push_back(meta);
783
784 drop(inner);
785 pool.conn_available.notify_one();
786 }
787 }
789 }
790}
791
792impl<C: Connection + std::fmt::Debug> std::fmt::Debug for PooledConnection<C> {
793 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
794 f.debug_struct("PooledConnection")
795 .field("conn", &self.meta.as_ref().map(|m| &m.conn))
796 .field("age", &self.age())
797 .field("idle_time", &self.idle_time())
798 .finish_non_exhaustive()
799 }
800}
801
802#[cfg(test)]
803mod tests {
804 use super::*;
805 use sqlmodel_core::connection::{IsolationLevel, PreparedStatement, TransactionOps};
806 use sqlmodel_core::{Row, Value};
807 use std::sync::atomic::AtomicBool;
808
809 #[derive(Debug)]
811 struct MockConnection {
812 id: u32,
813 ping_should_fail: Arc<AtomicBool>,
814 }
815
816 impl MockConnection {
817 fn new(id: u32) -> Self {
818 Self {
819 id,
820 ping_should_fail: Arc::new(AtomicBool::new(false)),
821 }
822 }
823
824 #[allow(dead_code)]
825 fn with_ping_behavior(id: u32, should_fail: Arc<AtomicBool>) -> Self {
826 Self {
827 id,
828 ping_should_fail: should_fail,
829 }
830 }
831 }
832
833 struct MockTx;
835
836 impl TransactionOps for MockTx {
837 async fn query(&self, _cx: &Cx, _sql: &str, _params: &[Value]) -> Outcome<Vec<Row>, Error> {
838 Outcome::Ok(vec![])
839 }
840
841 async fn query_one(
842 &self,
843 _cx: &Cx,
844 _sql: &str,
845 _params: &[Value],
846 ) -> Outcome<Option<Row>, Error> {
847 Outcome::Ok(None)
848 }
849
850 async fn execute(&self, _cx: &Cx, _sql: &str, _params: &[Value]) -> Outcome<u64, Error> {
851 Outcome::Ok(0)
852 }
853
854 async fn savepoint(&self, _cx: &Cx, _name: &str) -> Outcome<(), Error> {
855 Outcome::Ok(())
856 }
857
858 async fn rollback_to(&self, _cx: &Cx, _name: &str) -> Outcome<(), Error> {
859 Outcome::Ok(())
860 }
861
862 async fn release(&self, _cx: &Cx, _name: &str) -> Outcome<(), Error> {
863 Outcome::Ok(())
864 }
865
866 async fn commit(self, _cx: &Cx) -> Outcome<(), Error> {
867 Outcome::Ok(())
868 }
869
870 async fn rollback(self, _cx: &Cx) -> Outcome<(), Error> {
871 Outcome::Ok(())
872 }
873 }
874
875 impl Connection for MockConnection {
876 type Tx<'conn> = MockTx;
877
878 async fn query(&self, _cx: &Cx, _sql: &str, _params: &[Value]) -> Outcome<Vec<Row>, Error> {
879 Outcome::Ok(vec![])
880 }
881
882 async fn query_one(
883 &self,
884 _cx: &Cx,
885 _sql: &str,
886 _params: &[Value],
887 ) -> Outcome<Option<Row>, Error> {
888 Outcome::Ok(None)
889 }
890
891 async fn execute(&self, _cx: &Cx, _sql: &str, _params: &[Value]) -> Outcome<u64, Error> {
892 Outcome::Ok(0)
893 }
894
895 async fn insert(&self, _cx: &Cx, _sql: &str, _params: &[Value]) -> Outcome<i64, Error> {
896 Outcome::Ok(0)
897 }
898
899 async fn batch(
900 &self,
901 _cx: &Cx,
902 _statements: &[(String, Vec<Value>)],
903 ) -> Outcome<Vec<u64>, Error> {
904 Outcome::Ok(vec![])
905 }
906
907 async fn begin(&self, _cx: &Cx) -> Outcome<Self::Tx<'_>, Error> {
908 Outcome::Ok(MockTx)
909 }
910
911 async fn begin_with(
912 &self,
913 _cx: &Cx,
914 _isolation: IsolationLevel,
915 ) -> Outcome<Self::Tx<'_>, Error> {
916 Outcome::Ok(MockTx)
917 }
918
919 async fn prepare(&self, _cx: &Cx, _sql: &str) -> Outcome<PreparedStatement, Error> {
920 Outcome::Ok(PreparedStatement::new(1, String::new(), 0))
921 }
922
923 async fn query_prepared(
924 &self,
925 _cx: &Cx,
926 _stmt: &PreparedStatement,
927 _params: &[Value],
928 ) -> Outcome<Vec<Row>, Error> {
929 Outcome::Ok(vec![])
930 }
931
932 async fn execute_prepared(
933 &self,
934 _cx: &Cx,
935 _stmt: &PreparedStatement,
936 _params: &[Value],
937 ) -> Outcome<u64, Error> {
938 Outcome::Ok(0)
939 }
940
941 async fn ping(&self, _cx: &Cx) -> Outcome<(), Error> {
942 if self.ping_should_fail.load(Ordering::Relaxed) {
943 Outcome::Err(Error::Connection(ConnectionError {
944 kind: ConnectionErrorKind::Disconnected,
945 message: "mock ping failed".to_string(),
946 source: None,
947 }))
948 } else {
949 Outcome::Ok(())
950 }
951 }
952
953 async fn close(self, _cx: &Cx) -> Result<(), Error> {
954 Ok(())
955 }
956 }
957
958 #[test]
959 fn test_config_default() {
960 let config = PoolConfig::default();
961 assert_eq!(config.min_connections, 1);
962 assert_eq!(config.max_connections, 10);
963 assert_eq!(config.idle_timeout_ms, 600_000);
964 assert_eq!(config.acquire_timeout_ms, 30_000);
965 assert_eq!(config.max_lifetime_ms, 1_800_000);
966 assert!(config.test_on_checkout);
967 assert!(!config.test_on_return);
968 }
969
970 #[test]
971 fn test_config_builder() {
972 let config = PoolConfig::new(20)
973 .min_connections(5)
974 .idle_timeout(60_000)
975 .acquire_timeout(5_000)
976 .max_lifetime(300_000)
977 .test_on_checkout(false)
978 .test_on_return(true);
979
980 assert_eq!(config.min_connections, 5);
981 assert_eq!(config.max_connections, 20);
982 assert_eq!(config.idle_timeout_ms, 60_000);
983 assert_eq!(config.acquire_timeout_ms, 5_000);
984 assert_eq!(config.max_lifetime_ms, 300_000);
985 assert!(!config.test_on_checkout);
986 assert!(config.test_on_return);
987 }
988
989 #[test]
990 fn test_config_clone() {
991 let config = PoolConfig::new(15).min_connections(3);
992 let cloned = config.clone();
993 assert_eq!(config.max_connections, cloned.max_connections);
994 assert_eq!(config.min_connections, cloned.min_connections);
995 }
996
997 #[test]
998 fn test_stats_default() {
999 let stats = PoolStats::default();
1000 assert_eq!(stats.total_connections, 0);
1001 assert_eq!(stats.idle_connections, 0);
1002 assert_eq!(stats.active_connections, 0);
1003 assert_eq!(stats.pending_requests, 0);
1004 assert_eq!(stats.connections_created, 0);
1005 assert_eq!(stats.connections_closed, 0);
1006 assert_eq!(stats.acquires, 0);
1007 assert_eq!(stats.timeouts, 0);
1008 }
1009
1010 #[test]
1011 fn test_stats_clone() {
1012 let stats = PoolStats {
1013 total_connections: 5,
1014 acquires: 100,
1015 ..Default::default()
1016 };
1017 let cloned = stats.clone();
1018 assert_eq!(stats.total_connections, cloned.total_connections);
1019 assert_eq!(stats.acquires, cloned.acquires);
1020 }
1021
1022 #[test]
1023 fn test_connection_meta_timing() {
1024 use std::thread;
1025
1026 struct DummyConn;
1028
1029 let meta = ConnectionMeta::new(DummyConn);
1030 let initial_age = meta.age();
1031
1032 thread::sleep(Duration::from_millis(10));
1034
1035 assert!(meta.age() > initial_age);
1037 assert!(meta.idle_time() > Duration::ZERO);
1038 }
1039
1040 #[test]
1041 fn test_connection_meta_touch() {
1042 use std::thread;
1043
1044 struct DummyConn;
1045
1046 let mut meta = ConnectionMeta::new(DummyConn);
1047
1048 thread::sleep(Duration::from_millis(10));
1050 let idle_before_touch = meta.idle_time();
1051 assert!(idle_before_touch > Duration::ZERO);
1052
1053 meta.touch();
1055 let idle_after_touch = meta.idle_time();
1056
1057 assert!(idle_after_touch < idle_before_touch);
1059 }
1060
1061 #[test]
1062 fn test_pool_new() {
1063 let config = PoolConfig::new(5);
1064 let pool: Pool<MockConnection> = Pool::new(config);
1065
1066 assert_eq!(pool.idle_count(), 0);
1068 assert_eq!(pool.active_count(), 0);
1069 assert_eq!(pool.total_count(), 0);
1070 assert!(!pool.is_closed());
1071 assert!(!pool.at_capacity());
1072 }
1073
1074 #[test]
1075 fn test_pool_config() {
1076 let config = PoolConfig::new(7).min_connections(2);
1077 let pool: Pool<MockConnection> = Pool::new(config);
1078
1079 let retrieved_config = pool.config();
1080 assert_eq!(retrieved_config.max_connections, 7);
1081 assert_eq!(retrieved_config.min_connections, 2);
1082 }
1083
1084 #[test]
1085 fn test_pool_stats_initial() {
1086 let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1087
1088 let stats = pool.stats();
1089 assert_eq!(stats.total_connections, 0);
1090 assert_eq!(stats.idle_connections, 0);
1091 assert_eq!(stats.active_connections, 0);
1092 assert_eq!(stats.pending_requests, 0);
1093 assert_eq!(stats.connections_created, 0);
1094 assert_eq!(stats.connections_closed, 0);
1095 assert_eq!(stats.acquires, 0);
1096 assert_eq!(stats.timeouts, 0);
1097 }
1098
1099 #[test]
1100 fn test_pool_close() {
1101 let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1102
1103 assert!(!pool.is_closed());
1104 pool.close();
1105 assert!(pool.is_closed());
1106 }
1107
1108 #[test]
1109 fn test_pool_inner_can_create_new() {
1110 let mut inner = PoolInner::<MockConnection>::new(PoolConfig::new(3));
1111
1112 assert!(inner.can_create_new());
1114
1115 inner.total_count = 3;
1117 assert!(!inner.can_create_new());
1118
1119 inner.total_count = 2;
1121 assert!(inner.can_create_new());
1122
1123 inner.closed = true;
1125 assert!(!inner.can_create_new());
1126 }
1127
1128 #[test]
1129 fn test_pool_inner_stats() {
1130 let mut inner = PoolInner::<MockConnection>::new(PoolConfig::new(10));
1131
1132 inner.total_count = 5;
1133 inner.active_count = 3;
1134 inner.waiter_count = 2;
1135 inner
1136 .idle
1137 .push_back(ConnectionMeta::new(MockConnection::new(1)));
1138 inner
1139 .idle
1140 .push_back(ConnectionMeta::new(MockConnection::new(2)));
1141
1142 let stats = inner.stats();
1143 assert_eq!(stats.total_connections, 5);
1144 assert_eq!(stats.idle_connections, 2);
1145 assert_eq!(stats.active_connections, 3);
1146 assert_eq!(stats.pending_requests, 2);
1147 }
1148
1149 #[test]
1150 fn test_pooled_connection_age_and_idle_time() {
1151 use std::thread;
1152
1153 let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1154
1155 {
1157 let mut inner = pool.shared.inner.lock().unwrap();
1158 inner.total_count = 1;
1159 inner.active_count = 1;
1160 }
1161
1162 let meta = ConnectionMeta::new(MockConnection::new(1));
1163 let pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
1164
1165 assert!(pooled.age() >= Duration::ZERO);
1167
1168 thread::sleep(Duration::from_millis(5));
1169 assert!(pooled.age() > Duration::ZERO);
1170 }
1171
1172 #[test]
1173 fn test_pooled_connection_detach() {
1174 let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1175
1176 {
1178 let mut inner = pool.shared.inner.lock().unwrap();
1179 inner.total_count = 1;
1180 inner.active_count = 1;
1181 }
1182
1183 let meta = ConnectionMeta::new(MockConnection::new(42));
1184 let pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
1185
1186 assert_eq!(pool.total_count(), 1);
1188 assert_eq!(pool.active_count(), 1);
1189
1190 let conn = pooled.detach();
1192 assert_eq!(conn.id, 42);
1193
1194 assert_eq!(pool.total_count(), 0);
1196 assert_eq!(pool.active_count(), 0);
1197
1198 let stats = pool.stats();
1200 assert_eq!(stats.connections_closed, 1);
1201 }
1202
1203 #[test]
1204 fn test_pooled_connection_drop_returns_to_pool() {
1205 let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1206
1207 {
1209 let mut inner = pool.shared.inner.lock().unwrap();
1210 inner.total_count = 1;
1211 inner.active_count = 1;
1212 }
1213
1214 let meta = ConnectionMeta::new(MockConnection::new(1));
1215 let pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
1216
1217 assert_eq!(pool.active_count(), 1);
1219 assert_eq!(pool.idle_count(), 0);
1220
1221 drop(pooled);
1223
1224 assert_eq!(pool.active_count(), 0);
1226 assert_eq!(pool.idle_count(), 1);
1227 assert_eq!(pool.total_count(), 1); }
1229
1230 #[test]
1231 fn test_pooled_connection_drop_when_pool_closed() {
1232 let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1233
1234 {
1236 let mut inner = pool.shared.inner.lock().unwrap();
1237 inner.total_count = 1;
1238 inner.active_count = 1;
1239 }
1240
1241 let meta = ConnectionMeta::new(MockConnection::new(1));
1242 let pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
1243
1244 pool.close();
1246
1247 drop(pooled);
1249
1250 assert_eq!(pool.idle_count(), 0);
1252 assert_eq!(pool.active_count(), 0);
1253 assert_eq!(pool.total_count(), 0);
1254
1255 assert_eq!(pool.stats().connections_closed, 1);
1257 }
1258
1259 #[test]
1260 fn test_pooled_connection_deref() {
1261 let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1262
1263 {
1265 let mut inner = pool.shared.inner.lock().unwrap();
1266 inner.total_count = 1;
1267 inner.active_count = 1;
1268 }
1269
1270 let meta = ConnectionMeta::new(MockConnection::new(99));
1271 let pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
1272
1273 assert_eq!(pooled.id, 99);
1275 }
1276
1277 #[test]
1278 fn test_pooled_connection_deref_mut() {
1279 let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1280
1281 {
1283 let mut inner = pool.shared.inner.lock().unwrap();
1284 inner.total_count = 1;
1285 inner.active_count = 1;
1286 }
1287
1288 let meta = ConnectionMeta::new(MockConnection::new(1));
1289 let mut pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
1290
1291 pooled.id = 50;
1293 assert_eq!(pooled.id, 50);
1294 }
1295
1296 #[test]
1297 fn test_pooled_connection_debug() {
1298 let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1299
1300 {
1302 let mut inner = pool.shared.inner.lock().unwrap();
1303 inner.total_count = 1;
1304 inner.active_count = 1;
1305 }
1306
1307 let meta = ConnectionMeta::new(MockConnection::new(1));
1308 let pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
1309
1310 let debug_str = format!("{:?}", pooled);
1311 assert!(debug_str.contains("PooledConnection"));
1312 assert!(debug_str.contains("age"));
1313 }
1314
1315 #[test]
1316 fn test_pool_at_capacity() {
1317 let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(2));
1318
1319 assert!(!pool.at_capacity());
1320
1321 {
1323 let mut inner = pool.shared.inner.lock().unwrap();
1324 inner.total_count = 1;
1325 }
1326 assert!(!pool.at_capacity());
1327
1328 {
1329 let mut inner = pool.shared.inner.lock().unwrap();
1330 inner.total_count = 2;
1331 }
1332 assert!(pool.at_capacity());
1333 }
1334
1335 #[test]
1336 fn test_acquire_action_enum() {
1337 let closed: AcquireAction<MockConnection> = AcquireAction::PoolClosed;
1339 assert!(matches!(closed, AcquireAction::PoolClosed));
1340
1341 let create: AcquireAction<MockConnection> = AcquireAction::CreateNew;
1342 assert!(matches!(create, AcquireAction::CreateNew));
1343
1344 let wait: AcquireAction<MockConnection> = AcquireAction::Wait;
1345 assert!(matches!(wait, AcquireAction::Wait));
1346
1347 let meta = ConnectionMeta::new(MockConnection::new(1));
1348 let validate: AcquireAction<MockConnection> = AcquireAction::ValidateExisting(meta);
1349 assert!(matches!(validate, AcquireAction::ValidateExisting(_)));
1350 }
1351
1352 #[test]
1353 fn test_pool_shared_atomic_counters() {
1354 let shared = PoolShared::<MockConnection>::new(PoolConfig::new(5));
1355
1356 assert_eq!(shared.connections_created.load(Ordering::Relaxed), 0);
1358 assert_eq!(shared.connections_closed.load(Ordering::Relaxed), 0);
1359 assert_eq!(shared.acquires.load(Ordering::Relaxed), 0);
1360 assert_eq!(shared.timeouts.load(Ordering::Relaxed), 0);
1361
1362 shared.connections_created.fetch_add(1, Ordering::Relaxed);
1364 shared.connections_closed.fetch_add(2, Ordering::Relaxed);
1365 shared.acquires.fetch_add(10, Ordering::Relaxed);
1366 shared.timeouts.fetch_add(3, Ordering::Relaxed);
1367
1368 assert_eq!(shared.connections_created.load(Ordering::Relaxed), 1);
1369 assert_eq!(shared.connections_closed.load(Ordering::Relaxed), 2);
1370 assert_eq!(shared.acquires.load(Ordering::Relaxed), 10);
1371 assert_eq!(shared.timeouts.load(Ordering::Relaxed), 3);
1372 }
1373
1374 #[test]
1375 fn test_pool_close_clears_idle() {
1376 let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1377
1378 {
1380 let mut inner = pool.shared.inner.lock().unwrap();
1381 inner.total_count = 3;
1382 inner
1383 .idle
1384 .push_back(ConnectionMeta::new(MockConnection::new(1)));
1385 inner
1386 .idle
1387 .push_back(ConnectionMeta::new(MockConnection::new(2)));
1388 inner
1389 .idle
1390 .push_back(ConnectionMeta::new(MockConnection::new(3)));
1391 }
1392
1393 assert_eq!(pool.idle_count(), 3);
1394 assert_eq!(pool.total_count(), 3);
1395
1396 pool.close();
1397
1398 assert_eq!(pool.idle_count(), 0);
1400 assert_eq!(pool.total_count(), 0);
1401 assert!(pool.is_closed());
1402
1403 assert_eq!(pool.stats().connections_closed, 3);
1405 }
1406
1407 fn poison_pool_mutex() -> Pool<MockConnection> {
1420 use std::panic;
1421 use std::thread;
1422
1423 let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1424
1425 {
1427 let mut inner = pool.shared.inner.lock().unwrap();
1428 inner.total_count = 2;
1429 inner.active_count = 1;
1430 inner
1431 .idle
1432 .push_back(ConnectionMeta::new(MockConnection::new(1)));
1433 }
1434
1435 let shared_clone = Arc::clone(&pool.shared);
1437 let handle = thread::spawn(move || {
1438 let _guard = shared_clone.inner.lock().unwrap();
1439 panic!("intentional panic to poison mutex");
1441 });
1442
1443 let _ = handle.join();
1445
1446 assert!(pool.shared.inner.lock().is_err());
1448
1449 pool
1450 }
1451
1452 #[test]
1455 fn test_config_after_poisoning_returns_valid_data() {
1456 let pool = poison_pool_mutex();
1457
1458 let config = pool.config();
1460 assert_eq!(config.max_connections, 5);
1461 }
1462
1463 #[test]
1464 fn test_stats_after_poisoning_returns_valid_data() {
1465 let pool = poison_pool_mutex();
1466
1467 let stats = pool.stats();
1469 assert_eq!(stats.total_connections, 2);
1471 assert_eq!(stats.active_connections, 1);
1472 assert_eq!(stats.idle_connections, 1);
1473 }
1474
1475 #[test]
1476 fn test_at_capacity_after_poisoning() {
1477 let pool = poison_pool_mutex();
1478
1479 assert!(!pool.at_capacity());
1482 }
1483
1484 #[test]
1485 fn test_is_closed_after_poisoning() {
1486 let pool = poison_pool_mutex();
1487
1488 assert!(!pool.is_closed());
1490 }
1491
1492 #[test]
1493 fn test_idle_count_after_poisoning() {
1494 let pool = poison_pool_mutex();
1495
1496 assert_eq!(pool.idle_count(), 1);
1498 }
1499
1500 #[test]
1501 fn test_active_count_after_poisoning() {
1502 let pool = poison_pool_mutex();
1503
1504 assert_eq!(pool.active_count(), 1);
1506 }
1507
1508 #[test]
1509 fn test_total_count_after_poisoning() {
1510 let pool = poison_pool_mutex();
1511
1512 assert_eq!(pool.total_count(), 2);
1514 }
1515
1516 #[test]
1519 fn test_lock_or_error_returns_error_when_poisoned() {
1520 use std::thread;
1521
1522 let shared = Arc::new(PoolShared::<MockConnection>::new(PoolConfig::new(5)));
1523
1524 let shared_clone = Arc::clone(&shared);
1526 let handle = thread::spawn(move || {
1527 let _guard = shared_clone.inner.lock().unwrap();
1528 panic!("intentional panic to poison mutex");
1529 });
1530 let _ = handle.join();
1531
1532 let result = shared.lock_or_error("test_operation");
1534
1535 match result {
1537 Err(Error::Pool(pool_err)) => {
1538 assert!(matches!(pool_err.kind, PoolErrorKind::Poisoned));
1539 assert!(pool_err.message.contains("poisoned"));
1540 }
1541 Err(other) => panic!("Expected Pool error, got: {:?}", other),
1542 Ok(_) => panic!("Expected error, got Ok"),
1543 }
1544 }
1545
1546 #[test]
1547 fn test_lock_or_recover_succeeds_when_poisoned() {
1548 use std::thread;
1549
1550 let shared = Arc::new(PoolShared::<MockConnection>::new(PoolConfig::new(5)));
1551
1552 {
1554 let mut inner = shared.inner.lock().unwrap();
1555 inner.total_count = 42;
1556 }
1557
1558 let shared_clone = Arc::clone(&shared);
1560 let handle = thread::spawn(move || {
1561 let _guard = shared_clone.inner.lock().unwrap();
1562 panic!("intentional panic to poison mutex");
1563 });
1564 let _ = handle.join();
1565
1566 assert!(shared.inner.lock().is_err());
1568
1569 let inner = shared.lock_or_recover();
1571 assert_eq!(inner.total_count, 42);
1572 }
1573
1574 #[test]
1575 fn test_close_after_poisoning_recovers_and_closes() {
1576 let pool = poison_pool_mutex();
1577
1578 pool.close();
1580
1581 assert!(pool.is_closed());
1583
1584 assert_eq!(pool.idle_count(), 0);
1586 }
1587
1588 #[test]
1591 fn test_drop_pooled_connection_after_poisoning_does_not_panic() {
1592 use std::panic;
1593 use std::thread;
1594
1595 let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1596
1597 {
1599 let mut inner = pool.shared.inner.lock().unwrap();
1600 inner.total_count = 1;
1601 inner.active_count = 1;
1602 }
1603
1604 let meta = ConnectionMeta::new(MockConnection::new(1));
1606 let pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
1607
1608 let shared_clone = Arc::clone(&pool.shared);
1610 let handle = thread::spawn(move || {
1611 let _guard = shared_clone.inner.lock().unwrap();
1612 panic!("intentional panic to poison mutex");
1613 });
1614 let _ = handle.join();
1615
1616 assert!(pool.shared.inner.lock().is_err());
1618
1619 let drop_result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
1622 drop(pooled);
1623 }));
1624
1625 assert!(
1627 drop_result.is_ok(),
1628 "Dropping PooledConnection after mutex poisoning should not panic"
1629 );
1630 }
1631
1632 #[test]
1633 fn test_detach_after_poisoning_does_not_panic() {
1634 use std::panic;
1635 use std::thread;
1636
1637 let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1638
1639 {
1641 let mut inner = pool.shared.inner.lock().unwrap();
1642 inner.total_count = 1;
1643 inner.active_count = 1;
1644 }
1645
1646 let meta = ConnectionMeta::new(MockConnection::new(42));
1648 let pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
1649
1650 let shared_clone = Arc::clone(&pool.shared);
1652 let handle = thread::spawn(move || {
1653 let _guard = shared_clone.inner.lock().unwrap();
1654 panic!("intentional panic to poison mutex");
1655 });
1656 let _ = handle.join();
1657
1658 assert!(pool.shared.inner.lock().is_err());
1660
1661 let detach_result = panic::catch_unwind(panic::AssertUnwindSafe(|| pooled.detach()));
1663
1664 assert!(
1665 detach_result.is_ok(),
1666 "detach() after mutex poisoning should not panic"
1667 );
1668
1669 let conn = detach_result.unwrap();
1671 assert_eq!(conn.id, 42);
1672 }
1673
1674 #[test]
1677 fn test_pool_survives_thread_panic_during_acquire() {
1678 use std::thread;
1679
1680 let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1681 let pool_arc = Arc::new(pool);
1682
1683 let pool_clone = Arc::clone(&pool_arc);
1686 let handle = thread::spawn(move || {
1687 {
1689 let mut inner = pool_clone.shared.inner.lock().unwrap();
1690 inner.total_count = 1;
1691 inner.active_count = 1;
1692 }
1693
1694 let _guard = pool_clone.shared.inner.lock().unwrap();
1697 panic!("simulated panic during database operation");
1698 });
1699
1700 let _ = handle.join();
1702
1703 assert_eq!(pool_arc.total_count(), 1);
1705 assert_eq!(pool_arc.config().max_connections, 5);
1706
1707 let stats = pool_arc.stats();
1709 assert_eq!(stats.total_connections, 1);
1710 }
1711
1712 #[test]
1713 fn test_pool_close_after_thread_panic() {
1714 use std::thread;
1715
1716 let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1717
1718 {
1720 let mut inner = pool.shared.inner.lock().unwrap();
1721 inner.total_count = 2;
1722 inner
1723 .idle
1724 .push_back(ConnectionMeta::new(MockConnection::new(1)));
1725 inner
1726 .idle
1727 .push_back(ConnectionMeta::new(MockConnection::new(2)));
1728 }
1729
1730 let shared_clone = Arc::clone(&pool.shared);
1732 let handle = thread::spawn(move || {
1733 let _guard = shared_clone.inner.lock().unwrap();
1734 panic!("intentional panic");
1735 });
1736 let _ = handle.join();
1737
1738 pool.close();
1740
1741 assert!(pool.is_closed());
1743 assert_eq!(pool.idle_count(), 0);
1744 }
1745
1746 #[test]
1747 fn test_multiple_reads_after_poisoning() {
1748 let pool = poison_pool_mutex();
1749
1750 for _ in 0..10 {
1752 let _ = pool.config();
1753 let _ = pool.stats();
1754 let _ = pool.at_capacity();
1755 let _ = pool.is_closed();
1756 let _ = pool.idle_count();
1757 let _ = pool.active_count();
1758 let _ = pool.total_count();
1759 }
1760
1761 assert_eq!(pool.total_count(), 2);
1763 }
1764
1765 #[test]
1766 fn test_waiters_count_after_poisoning() {
1767 use std::thread;
1768
1769 let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1770
1771 {
1773 let mut inner = pool.shared.inner.lock().unwrap();
1774 inner.waiter_count = 3;
1775 }
1776
1777 let shared_clone = Arc::clone(&pool.shared);
1779 let handle = thread::spawn(move || {
1780 let _guard = shared_clone.inner.lock().unwrap();
1781 panic!("intentional panic");
1782 });
1783 let _ = handle.join();
1784
1785 let stats = pool.stats();
1787 assert_eq!(stats.pending_requests, 3);
1788 }
1789}