1use std::fmt;
18use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
19use std::sync::{Arc, Condvar, Mutex};
20use std::time::{Duration, Instant};
21
22use crate::pool_core::{Circuit, ConnState, ExhaustionPolicy, PoolPhase, ValidationMode};
23use crate::{Node, NodeOpt, Opts, Value, VoltError, VoltTable, block_for_result, node};
24
// Trace-level pool logging: with the `tracing` feature on, the tokens are handed to
// `tracing::trace!` unchanged.
#[cfg(feature = "tracing")]
macro_rules! pool_trace {
    ($($event:tt)*) => {
        tracing::trace!($($event)*)
    };
}
// Without the `tracing` feature the invocation expands to nothing, so the tokens passed in
// are never evaluated.
#[cfg(not(feature = "tracing"))]
macro_rules! pool_trace {
    ($($ignored:tt)*) => {};
}
37
// Debug-level pool logging: with the `tracing` feature on, the tokens are handed to
// `tracing::debug!` unchanged.
#[cfg(feature = "tracing")]
macro_rules! pool_debug {
    ($($event:tt)*) => {
        tracing::debug!($($event)*)
    };
}
// Without the `tracing` feature the invocation expands to nothing, so the tokens passed in
// are never evaluated.
#[cfg(not(feature = "tracing"))]
macro_rules! pool_debug {
    ($($ignored:tt)*) => {};
}
46
// Info-level pool logging: with the `tracing` feature on, the tokens are handed to
// `tracing::info!` unchanged.
#[cfg(feature = "tracing")]
macro_rules! pool_info {
    ($($event:tt)*) => {
        tracing::info!($($event)*)
    };
}
// Without the `tracing` feature the invocation expands to nothing, so the tokens passed in
// are never evaluated.
#[cfg(not(feature = "tracing"))]
macro_rules! pool_info {
    ($($ignored:tt)*) => {};
}
55
// Warn-level pool logging: with the `tracing` feature on, the tokens are handed to
// `tracing::warn!` unchanged.
#[cfg(feature = "tracing")]
macro_rules! pool_warn {
    ($($event:tt)*) => {
        tracing::warn!($($event)*)
    };
}
// Without the `tracing` feature the invocation expands to nothing, so the tokens passed in
// are never evaluated.
#[cfg(not(feature = "tracing"))]
macro_rules! pool_warn {
    ($($ignored:tt)*) => {};
}
64
// Error-level pool logging: with the `tracing` feature on, the tokens are handed to
// `tracing::error!` unchanged.
#[cfg(feature = "tracing")]
macro_rules! pool_error {
    ($($event:tt)*) => {
        tracing::error!($($event)*)
    };
}
// Without the `tracing` feature the invocation expands to nothing, so the tokens passed in
// are never evaluated.
#[cfg(not(feature = "tracing"))]
macro_rules! pool_error {
    ($($ignored:tt)*) => {};
}
73
// Pool instrumentation compiled when the `metrics` feature is enabled. Each helper forwards
// to a gauge or counter macro from the `metrics` crate under a `voltdb_pool_*` name.
#[cfg(feature = "metrics")]
mod pool_metrics {
    use metrics::{counter, gauge};

    /// Sets the `voltdb_pool_connections_total` gauge to `count` (converted to `f64`).
    pub fn set_connections_total(count: usize) {
        gauge!("voltdb_pool_connections_total").set(count as f64);
    }

    /// Sets the `voltdb_pool_connections_healthy` gauge to `count` (converted to `f64`).
    pub fn set_connections_healthy(count: usize) {
        gauge!("voltdb_pool_connections_healthy").set(count as f64);
    }

    /// Adds one to the `voltdb_pool_reconnect_total` counter.
    pub fn inc_reconnect_total() {
        counter!("voltdb_pool_reconnect_total").increment(1);
    }

    /// Adds one to the `voltdb_pool_circuit_open_total` counter.
    pub fn inc_circuit_open_total() {
        counter!("voltdb_pool_circuit_open_total").increment(1);
    }

    /// Adds one to the `voltdb_pool_requests_failed_total` counter.
    pub fn inc_requests_failed_total() {
        counter!("voltdb_pool_requests_failed_total").increment(1);
    }

    /// Adds one to the `voltdb_pool_requests_total` counter.
    pub fn inc_requests_total() {
        counter!("voltdb_pool_requests_total").increment(1);
    }
}
106
// Stand-in used when the `metrics` feature is disabled: the same function names as the
// instrumented module, with empty bodies, so call sites need no `cfg` of their own.
#[cfg(not(feature = "metrics"))]
mod pool_metrics {
    /// No-op counterpart of the total-connections gauge setter.
    pub fn set_connections_total(_count: usize) {}
    /// No-op counterpart of the healthy-connections gauge setter.
    pub fn set_connections_healthy(_count: usize) {}
    /// No-op counterpart of the reconnect counter.
    pub fn inc_reconnect_total() {}
    /// No-op counterpart of the circuit-open counter.
    pub fn inc_circuit_open_total() {}
    /// No-op counterpart of the failed-requests counter.
    pub fn inc_requests_failed_total() {}
    /// No-op counterpart of the total-requests counter.
    pub fn inc_requests_total() {}
}
116
/// Tunable settings for a [`Pool`].
///
/// Values can be set directly (all fields are public) or through the chainable setters
/// of the same names.
#[derive(Debug, Clone)]
pub struct PoolConfig {
    /// Number of connection slots the pool creates.
    pub size: usize,
    /// Minimum time that must pass since a slot's last reconnect attempt before another
    /// attempt is considered.
    pub reconnect_backoff: Duration,
    /// Duration handed to the slot's circuit when it is opened after repeated failures.
    pub circuit_open_duration: Duration,
    /// Chooses between failing immediately and blocking when no slot is available.
    pub exhaustion_policy: ExhaustionPolicy,
    /// Decides whether a failed initial connection aborts pool creation or only leaves
    /// that slot unhealthy.
    pub validation_mode: ValidationMode,
    /// Number of consecutive failures on a slot at which its circuit is opened.
    pub circuit_failure_threshold: u32,
    /// NOTE(review): no code in this file reads this field; confirm where it is consumed.
    pub shutdown_timeout: Duration,
}
142
143impl Default for PoolConfig {
144 fn default() -> Self {
145 Self {
146 size: 10,
147 reconnect_backoff: Duration::from_secs(5),
148 circuit_open_duration: Duration::from_secs(30),
149 exhaustion_policy: ExhaustionPolicy::FailFast,
150 validation_mode: ValidationMode::FailFast,
151 circuit_failure_threshold: 3,
152 shutdown_timeout: Duration::from_secs(30),
153 }
154 }
155}
156
157impl PoolConfig {
158 pub fn new() -> Self {
159 Self::default()
160 }
161
162 pub fn size(mut self, size: usize) -> Self {
163 self.size = size;
164 self
165 }
166
167 pub fn reconnect_backoff(mut self, duration: Duration) -> Self {
168 self.reconnect_backoff = duration;
169 self
170 }
171
172 pub fn circuit_open_duration(mut self, duration: Duration) -> Self {
173 self.circuit_open_duration = duration;
174 self
175 }
176
177 pub fn exhaustion_policy(mut self, policy: ExhaustionPolicy) -> Self {
178 self.exhaustion_policy = policy;
179 self
180 }
181
182 pub fn validation_mode(mut self, mode: ValidationMode) -> Self {
183 self.validation_mode = mode;
184 self
185 }
186
187 pub fn circuit_failure_threshold(mut self, threshold: u32) -> Self {
188 self.circuit_failure_threshold = threshold;
189 self
190 }
191
192 pub fn shutdown_timeout(mut self, timeout: Duration) -> Self {
193 self.shutdown_timeout = timeout;
194 self
195 }
196}
197
/// Bookkeeping for one pool slot; the connection itself lives in `InnerPool::nodes`.
#[derive(Debug)]
struct SlotMeta {
    /// Current connection state of the slot.
    state: ConnState,
    /// Circuit consulted, together with `state`, to decide whether the slot may be handed out.
    circuit: Circuit,
    /// Failures recorded since the last recorded success.
    consecutive_failures: u32,
    /// When a reconnect was last started for this slot, if ever.
    last_reconnect_attempt: Option<Instant>,
    /// Index into the configured host list that this slot connects to.
    host_idx: usize,
}
216
217impl SlotMeta {
218 fn new_healthy(host_idx: usize) -> Self {
219 Self {
220 state: ConnState::Healthy,
221 circuit: Circuit::Closed,
222 consecutive_failures: 0,
223 last_reconnect_attempt: None,
224 host_idx,
225 }
226 }
227
228 fn new_unhealthy(host_idx: usize) -> Self {
229 Self {
230 state: ConnState::Unhealthy {
231 since: Instant::now(),
232 },
233 circuit: Circuit::Open {
234 until: Instant::now() + Duration::from_secs(5),
235 },
236 consecutive_failures: 1,
237 last_reconnect_attempt: None,
238 host_idx,
239 }
240 }
241
242 fn is_available(&self) -> bool {
245 self.state.is_healthy() && self.circuit.should_allow()
246 }
247
248 fn needs_reconnect(&self, backoff: Duration) -> bool {
250 if self.state.is_healthy() || self.state.is_reconnecting() {
251 return false;
252 }
253
254 match self.last_reconnect_attempt {
255 None => true,
256 Some(last) => Instant::now().duration_since(last) >= backoff,
257 }
258 }
259
260 fn record_success(&mut self) {
261 self.consecutive_failures = 0;
262 self.state = ConnState::Healthy;
263 self.circuit.close();
264 }
265
266 fn record_failure(&mut self, config: &PoolConfig) {
267 self.consecutive_failures += 1;
268 self.state = ConnState::Unhealthy {
269 since: Instant::now(),
270 };
271
272 if self.consecutive_failures >= config.circuit_failure_threshold {
273 self.circuit.open(config.circuit_open_duration);
274 pool_metrics::inc_circuit_open_total();
275 }
276 }
277}
278
/// Mutable pool state; `Pool` keeps it behind a mutex.
struct InnerPool {
    /// Connection options (host list, credentials, timeouts) used to build each node.
    opts: Opts,
    /// Configuration the pool was created with.
    config: PoolConfig,
    /// Per-slot metadata; entry `i` describes the node at `nodes[i]`.
    slots: Vec<SlotMeta>,
    /// Per-slot connection handles; `None` inside the mutex means no live node.
    nodes: Vec<Arc<Mutex<Option<Node>>>>,
    /// Lifecycle phase; connections are only handed out while it is `Running`.
    phase: PoolPhase,
}
293
294impl fmt::Debug for InnerPool {
295 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
296 f.debug_struct("InnerPool")
297 .field("config", &self.config)
298 .field("slots_count", &self.slots.len())
299 .field("phase", &self.phase)
300 .finish()
301 }
302}
303
304impl InnerPool {
305 fn node_opt(&self, host_idx: usize) -> Result<NodeOpt, VoltError> {
306 let ip_port = self
307 .opts
308 .0
309 .ip_ports
310 .get(host_idx)
311 .cloned()
312 .ok_or(VoltError::InvalidConfig)?;
313 Ok(NodeOpt {
314 ip_port,
315 pass: self.opts.0.pass.clone(),
316 user: self.opts.0.user.clone(),
317 connect_timeout: self.opts.0.connect_timeout,
318 read_timeout: self.opts.0.read_timeout,
319 })
320 }
321
322 fn new(opts: Opts, config: PoolConfig) -> Result<Self, VoltError> {
323 let num_hosts = opts.0.ip_ports.len();
324 let mut inner = InnerPool {
325 opts,
326 config: config.clone(),
327 slots: Vec::with_capacity(config.size),
328 nodes: Vec::with_capacity(config.size),
329 phase: PoolPhase::Running,
330 };
331
332 for i in 0..config.size {
333 let host_idx = i % num_hosts;
334 let node_opt = inner.node_opt(host_idx)?;
335
336 pool_debug!(slot = i, host = host_idx, "creating connection");
337
338 match node::Node::new(node_opt) {
339 Ok(node) => {
340 inner.slots.push(SlotMeta::new_healthy(host_idx));
341 inner.nodes.push(Arc::new(Mutex::new(Some(node))));
342 pool_info!(slot = i, "connection established");
343 }
344 Err(e) => match config.validation_mode {
345 ValidationMode::FailFast => {
346 pool_error!(slot = i, error = ?e, "connection failed, aborting pool creation");
347 return Err(e);
348 }
349 ValidationMode::BestEffort => {
350 pool_warn!(slot = i, error = ?e, "connection failed, marking unhealthy");
351 inner.slots.push(SlotMeta::new_unhealthy(host_idx));
352 inner.nodes.push(Arc::new(Mutex::new(None)));
353 }
354 },
355 }
356 }
357
358 inner.update_metrics();
359 pool_info!(
360 size = config.size,
361 healthy = inner.healthy_count(),
362 "pool initialized"
363 );
364
365 Ok(inner)
366 }
367
368 fn healthy_count(&self) -> usize {
369 self.slots.iter().filter(|s| s.state.is_healthy()).count()
370 }
371
372 fn update_metrics(&self) {
373 pool_metrics::set_connections_total(self.slots.len());
374 pool_metrics::set_connections_healthy(self.healthy_count());
375 }
376}
377
/// Fixed-size pool of VoltDB node connections.
///
/// Checking out a connection does not reserve its slot: `get_conn` hands out a shared
/// handle to the slot's node, so several `PooledConn`s may refer to the same slot.
pub struct Pool {
    /// Slot state behind a mutex, paired with the condvar that blocking `get_conn`
    /// callers wait on.
    inner: Arc<(Mutex<InnerPool>, Condvar)>,
    /// Incremented once per `get_conn` call that passes the shutdown check; used to pick
    /// the preferred slot and reported as `total_requests` in `stats`.
    counter: AtomicUsize,
    /// Set by `shutdown`; read without taking the lock.
    shutdown_flag: AtomicBool,
    /// Copy of the configuration the pool was built with.
    config: PoolConfig,
}
407
408impl fmt::Debug for Pool {
409 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
410 let inner = self.inner.0.lock().ok();
411 f.debug_struct("Pool")
412 .field("counter", &self.counter.load(Ordering::Relaxed))
413 .field("shutdown", &self.shutdown_flag.load(Ordering::Relaxed))
414 .field(
415 "healthy",
416 &inner.as_ref().map(|i| i.healthy_count()).unwrap_or(0),
417 )
418 .field("config", &self.config)
419 .finish()
420 }
421}
422
423impl Pool {
424 pub fn new<T: Into<Opts>>(opts: T) -> Result<Pool, VoltError> {
426 Pool::with_config(opts, PoolConfig::default())
427 }
428
429 pub fn new_manual<T: Into<Opts>>(size: usize, opts: T) -> Result<Pool, VoltError> {
431 Pool::with_config(opts, PoolConfig::new().size(size))
432 }
433
434 pub fn with_config<T: Into<Opts>>(opts: T, config: PoolConfig) -> Result<Pool, VoltError> {
436 let inner = InnerPool::new(opts.into(), config.clone())?;
437 Ok(Pool {
438 inner: Arc::new((Mutex::new(inner), Condvar::new())),
439 counter: AtomicUsize::new(0),
440 shutdown_flag: AtomicBool::new(false),
441 config,
442 })
443 }
444
445 pub fn get_conn(&self) -> Result<PooledConn<'_>, VoltError> {
452 if self.shutdown_flag.load(Ordering::Relaxed) {
453 pool_warn!("get_conn called on shutdown pool");
454 return Err(VoltError::PoolShutdown);
455 }
456
457 pool_metrics::inc_requests_total();
458
459 let preferred_idx = self.counter.fetch_add(1, Ordering::Relaxed) % self.config.size;
460
461 match self.config.exhaustion_policy {
462 ExhaustionPolicy::FailFast => self.get_conn_failfast(preferred_idx),
463 ExhaustionPolicy::Block { timeout } => self.get_conn_blocking(preferred_idx, timeout),
464 }
465 }
466
467 fn get_conn_failfast(&self, preferred_idx: usize) -> Result<PooledConn<'_>, VoltError> {
468 let (lock, _cvar) = &*self.inner;
469 let inner = lock
470 .lock()
471 .map_err(|_| VoltError::PoisonError("Pool lock poisoned".into()))?;
472
473 if inner.phase != PoolPhase::Running {
474 return Err(VoltError::PoolShutdown);
475 }
476
477 if inner.slots[preferred_idx].is_available() {
479 return self.checkout_slot(&inner, preferred_idx);
480 }
481
482 for i in 1..self.config.size {
484 let idx = (preferred_idx + i) % self.config.size;
485 if inner.slots[idx].is_available() {
486 pool_debug!(
487 preferred = preferred_idx,
488 actual = idx,
489 "using alternate connection"
490 );
491 return self.checkout_slot(&inner, idx);
492 }
493 }
494
495 pool_warn!("no healthy connections available");
496 pool_metrics::inc_requests_failed_total();
497 Err(VoltError::PoolExhausted)
498 }
499
500 fn get_conn_blocking(
501 &self,
502 preferred_idx: usize,
503 timeout: Duration,
504 ) -> Result<PooledConn<'_>, VoltError> {
505 let deadline = Instant::now() + timeout;
506 let (lock, cvar) = &*self.inner;
507
508 let mut inner = lock
509 .lock()
510 .map_err(|_| VoltError::PoisonError("Pool lock poisoned".into()))?;
511
512 loop {
513 if inner.phase != PoolPhase::Running {
514 return Err(VoltError::PoolShutdown);
515 }
516
517 if inner.slots[preferred_idx].is_available() {
519 return self.checkout_slot(&inner, preferred_idx);
520 }
521
522 for i in 1..self.config.size {
524 let idx = (preferred_idx + i) % self.config.size;
525 if inner.slots[idx].is_available() {
526 return self.checkout_slot(&inner, idx);
527 }
528 }
529
530 let remaining = deadline.saturating_duration_since(Instant::now());
532 if remaining.is_zero() {
533 pool_warn!(timeout = ?timeout, "connection wait timed out");
534 pool_metrics::inc_requests_failed_total();
535 return Err(VoltError::Timeout);
536 }
537
538 pool_trace!("waiting for available connection");
539 let (guard, _timeout_result) = cvar
540 .wait_timeout(inner, remaining)
541 .map_err(|_| VoltError::PoisonError("Pool lock poisoned".into()))?;
542 inner = guard;
543 }
544 }
545
546 fn checkout_slot<'a>(
549 &'a self,
550 inner: &InnerPool,
551 idx: usize,
552 ) -> Result<PooledConn<'a>, VoltError> {
553 let node = Arc::clone(&inner.nodes[idx]);
555 let config = inner.config.clone();
556 let host_idx = inner.slots[idx].host_idx;
557
558 pool_trace!(slot = idx, "connection acquired");
559
560 Ok(PooledConn {
562 pool: self,
563 idx,
564 node,
565 config,
566 host_idx,
567 })
568 }
569
570 fn report_fatal_error(&self, idx: usize) {
572 let (lock, cvar) = &*self.inner;
573
574 #[allow(clippy::type_complexity)]
576 let mut reconnect_info: Option<(Arc<Mutex<Option<Node>>>, NodeOpt, PoolConfig)> = None;
577
578 if let Ok(mut inner) = lock.lock() {
579 let config = inner.config.clone();
581 inner.slots[idx].record_failure(&config);
582 pool_debug!(slot = idx, "fatal error reported");
583
584 inner.update_metrics();
585
586 cvar.notify_all();
588
589 if !self.shutdown_flag.load(Ordering::Relaxed) {
591 let backoff = inner.config.reconnect_backoff;
592 if inner.slots[idx].needs_reconnect(backoff) {
593 let node_arc = Arc::clone(&inner.nodes[idx]);
595 let host_idx = inner.slots[idx].host_idx;
596 if let Ok(node_opt) = inner.node_opt(host_idx) {
597 inner.slots[idx].state = ConnState::Reconnecting;
598 inner.slots[idx].last_reconnect_attempt = Some(Instant::now());
599
600 reconnect_info = Some((node_arc, node_opt, config));
601 }
602 }
603 }
604 }
605
606 if let Some((node_arc, node_opt, config)) = reconnect_info {
608 self.do_reconnect(idx, node_arc, node_opt, config);
609 }
610 }
611
612 fn do_reconnect(
614 &self,
615 idx: usize,
616 node_arc: Arc<Mutex<Option<Node>>>,
617 node_opt: NodeOpt,
618 config: PoolConfig,
619 ) {
620 pool_info!(slot = idx, "attempting reconnection");
621 pool_metrics::inc_reconnect_total();
622
623 match node::Node::new(node_opt) {
625 Ok(new_node) => {
626 if let Ok(mut node_guard) = node_arc.lock() {
628 *node_guard = Some(new_node);
629 }
630
631 let (lock, cvar) = &*self.inner;
633 if let Ok(mut inner) = lock.lock() {
634 inner.slots[idx].record_success();
635 inner.update_metrics();
636 cvar.notify_all(); }
638 pool_info!(slot = idx, "reconnection successful");
639 }
640 Err(_e) => {
641 let (lock, _) = &*self.inner;
643 if let Ok(mut inner) = lock.lock() {
644 inner.slots[idx].record_failure(&config);
645 inner.update_metrics();
646 }
647 pool_error!(slot = idx, error = ?_e, "reconnection failed");
648 }
649 }
650 }
651
652 fn mark_success(&self, idx: usize) {
654 let (lock, _) = &*self.inner;
655 if let Ok(mut inner) = lock.lock() {
656 inner.slots[idx].record_success();
657 inner.update_metrics();
658 }
659 }
660
661 pub fn shutdown(&self) {
671 pool_info!("initiating pool shutdown");
672 self.shutdown_flag.store(true, Ordering::Relaxed);
673
674 let (lock, cvar) = &*self.inner;
675
676 if let Ok(mut inner) = lock.lock() {
677 inner.phase = PoolPhase::Shutdown;
679 pool_info!("entering shutdown phase");
680
681 for slot in &mut inner.slots {
683 slot.state = ConnState::Unhealthy {
684 since: Instant::now(),
685 };
686 }
687
688 for node_arc in &inner.nodes {
690 if let Ok(mut node_guard) = node_arc.lock() {
691 if let Some(ref mut node) = *node_guard {
692 let _ = node.shutdown();
693 }
694 *node_guard = None;
695 }
696 }
697 inner.update_metrics();
698
699 cvar.notify_all();
701 }
702
703 pool_info!("pool shutdown complete");
704 }
705
706 pub fn is_shutdown(&self) -> bool {
708 self.shutdown_flag.load(Ordering::Relaxed)
709 }
710
711 pub fn stats(&self) -> PoolStats {
713 let (lock, _) = &*self.inner;
714 let inner = lock.lock().ok();
715 PoolStats {
716 size: self.config.size,
717 healthy: inner.as_ref().map(|i| i.healthy_count()).unwrap_or(0),
718 total_requests: self.counter.load(Ordering::Relaxed),
719 is_shutdown: self.shutdown_flag.load(Ordering::Relaxed),
720 }
721 }
722}
723
/// Point-in-time counters returned by `Pool::stats`.
#[derive(Debug, Clone)]
pub struct PoolStats {
    /// Configured number of slots.
    pub size: usize,
    /// Number of slots whose state was healthy when the snapshot was taken.
    pub healthy: usize,
    /// Value of the pool's request counter when the snapshot was taken.
    pub total_requests: usize,
    /// Whether the pool's shutdown flag was set.
    pub is_shutdown: bool,
}
732
/// Handle to one pool slot, obtained from `Pool::get_conn`.
///
/// Operations run on the slot's node and report their outcome back to the pool.
pub struct PooledConn<'a> {
    /// Pool that issued this handle.
    pool: &'a Pool,
    /// Index of the slot this handle refers to.
    idx: usize,
    /// Shared handle to the slot's node; `None` inside the mutex means no live connection.
    node: Arc<Mutex<Option<Node>>>,
    // Copy of the pool configuration taken at checkout; not read by the methods in this file.
    #[allow(dead_code)]
    config: PoolConfig,
    // Index of the host the slot connects to; shown in the `Debug` output.
    #[allow(dead_code)]
    host_idx: usize,
}
753
754impl fmt::Debug for PooledConn<'_> {
755 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
756 f.debug_struct("PooledConn")
757 .field("idx", &self.idx)
758 .field("host_idx", &self.host_idx)
759 .finish()
760 }
761}
762
763impl PooledConn<'_> {
764 pub fn query(&mut self, sql: &str) -> Result<VoltTable, VoltError> {
766 pool_trace!(slot = self.idx, sql = sql, "executing query");
767
768 let mut node_guard = self
769 .node
770 .lock()
771 .map_err(|_| VoltError::PoisonError("Node lock poisoned".to_string()))?;
772
773 let node = node_guard
774 .as_mut()
775 .ok_or(VoltError::ConnectionNotAvailable)?;
776
777 let result = node.query(sql).and_then(|r| block_for_result(&r));
778 drop(node_guard);
779
780 self.handle_result(&result);
781 result
782 }
783
784 pub fn list_procedures(&mut self) -> Result<VoltTable, VoltError> {
786 pool_trace!(slot = self.idx, "listing procedures");
787
788 let mut node_guard = self
789 .node
790 .lock()
791 .map_err(|_| VoltError::PoisonError("Node lock poisoned".to_string()))?;
792
793 let node = node_guard
794 .as_mut()
795 .ok_or(VoltError::ConnectionNotAvailable)?;
796
797 let result = node.list_procedures().and_then(|r| block_for_result(&r));
798 drop(node_guard);
799
800 self.handle_result(&result);
801 result
802 }
803
804 pub fn call_sp(&mut self, proc: &str, params: Vec<&dyn Value>) -> Result<VoltTable, VoltError> {
806 pool_trace!(
807 slot = self.idx,
808 procedure = proc,
809 "calling stored procedure"
810 );
811
812 let mut node_guard = self
813 .node
814 .lock()
815 .map_err(|_| VoltError::PoisonError("Node lock poisoned".to_string()))?;
816
817 let node = node_guard
818 .as_mut()
819 .ok_or(VoltError::ConnectionNotAvailable)?;
820
821 let result = node
822 .call_sp(proc, params)
823 .and_then(|r| block_for_result(&r));
824 drop(node_guard);
825
826 self.handle_result(&result);
827 result
828 }
829
830 pub fn upload_jar(&mut self, bs: Vec<u8>) -> Result<VoltTable, VoltError> {
832 pool_trace!(slot = self.idx, size = bs.len(), "uploading jar");
833
834 let mut node_guard = self
835 .node
836 .lock()
837 .map_err(|_| VoltError::PoisonError("Node lock poisoned".to_string()))?;
838
839 let node = node_guard
840 .as_mut()
841 .ok_or(VoltError::ConnectionNotAvailable)?;
842
843 let result = node.upload_jar(bs).and_then(|r| block_for_result(&r));
844 drop(node_guard);
845
846 self.handle_result(&result);
847 result
848 }
849
850 fn handle_result<T>(&self, result: &Result<T, VoltError>) {
852 match result {
853 Ok(_) => {
854 self.pool.mark_success(self.idx);
855 }
856 Err(e) if e.is_connection_fatal() => {
857 pool_error!(slot = self.idx, error = ?e, "fatal connection error detected");
858 if let Ok(mut guard) = self.node.lock() {
860 *guard = None;
861 }
862 self.pool.report_fatal_error(self.idx);
863 }
864 Err(_) => {
865 }
867 }
868 }
869
870 pub fn slot_index(&self) -> usize {
872 self.idx
873 }
874}
875
#[cfg(test)]
mod tests {
    use super::*;

    // Every builder setter must store its argument in the field of the same name.
    #[test]
    fn test_pool_config_builder() {
        let one_minute = Duration::from_secs(60);
        let five_second_block = || ExhaustionPolicy::Block {
            timeout: Duration::from_secs(5),
        };

        let built = PoolConfig::new()
            .size(20)
            .reconnect_backoff(Duration::from_secs(10))
            .circuit_open_duration(one_minute)
            .exhaustion_policy(five_second_block())
            .validation_mode(ValidationMode::BestEffort)
            .circuit_failure_threshold(5)
            .shutdown_timeout(one_minute);

        assert_eq!(built.size, 20);
        assert_eq!(built.reconnect_backoff, Duration::from_secs(10));
        assert_eq!(built.circuit_open_duration, one_minute);
        assert_eq!(built.exhaustion_policy, five_second_block());
        assert_eq!(built.validation_mode, ValidationMode::BestEffort);
        assert_eq!(built.circuit_failure_threshold, 5);
        assert_eq!(built.shutdown_timeout, one_minute);
    }

    // Spot-checks the defaults that other code relies on.
    #[test]
    fn test_pool_config_default() {
        let defaults = PoolConfig::default();

        assert_eq!(defaults.size, 10);
        assert_eq!(defaults.exhaustion_policy, ExhaustionPolicy::FailFast);
        assert_eq!(defaults.validation_mode, ValidationMode::FailFast);
    }

    // A slot is available only when it is healthy and its circuit allows traffic.
    #[test]
    fn test_slot_meta_is_available() {
        let healthy = SlotMeta::new_healthy(0);
        assert!(healthy.is_available());

        let unhealthy = SlotMeta::new_unhealthy(0);
        assert!(!unhealthy.is_available());

        // Healthy state, but the circuit stays open for another minute.
        let mut tripped = SlotMeta::new_healthy(0);
        tripped.circuit = Circuit::Open {
            until: Instant::now() + Duration::from_secs(60),
        };
        assert!(!tripped.is_available());
    }

    // Only an unhealthy slot that is not already reconnecting asks for a reconnect.
    #[test]
    fn test_slot_meta_needs_reconnect() {
        let backoff = Duration::from_millis(100);
        let mut slot = SlotMeta::new_unhealthy(0);

        // No attempt recorded yet, so the backoff does not apply.
        assert!(slot.needs_reconnect(backoff));

        slot.state = ConnState::Reconnecting;
        assert!(!slot.needs_reconnect(backoff));

        slot.state = ConnState::Healthy;
        assert!(!slot.needs_reconnect(backoff));
    }

    // A success clears the failure streak, restores health and closes the circuit.
    #[test]
    fn test_slot_meta_record_success() {
        let mut slot = SlotMeta::new_unhealthy(0);
        slot.consecutive_failures = 5;

        slot.record_success();

        assert!(matches!(slot.state, ConnState::Healthy));
        assert!(matches!(slot.circuit, Circuit::Closed));
        assert_eq!(slot.consecutive_failures, 0);
    }

    // The failure that reaches the threshold opens the circuit.
    #[test]
    fn test_slot_meta_record_failure_opens_circuit() {
        let threshold_of_three = PoolConfig::default().circuit_failure_threshold(3);

        let mut slot = SlotMeta::new_healthy(0);
        slot.consecutive_failures = 2;

        slot.record_failure(&threshold_of_three);

        assert_eq!(slot.consecutive_failures, 3);
        assert!(matches!(slot.circuit, Circuit::Open { .. }));
    }
}