1use std::fmt;
18use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
19use std::sync::{Arc, Condvar, Mutex};
20use std::time::{Duration, Instant};
21
22use crate::pool_core::{
23 Circuit, ConnState, ExhaustionPolicy, PoolError, PoolPhase, ValidationMode,
24};
25use crate::{Node, NodeOpt, Opts, Value, VoltError, VoltTable, block_for_result, node};
26
27#[cfg(feature = "tracing")]
32macro_rules! pool_trace {
33 ($($arg:tt)*) => { tracing::trace!($($arg)*) };
34}
35#[cfg(not(feature = "tracing"))]
36macro_rules! pool_trace {
37 ($($arg:tt)*) => {};
38}
39
40#[cfg(feature = "tracing")]
41macro_rules! pool_debug {
42 ($($arg:tt)*) => { tracing::debug!($($arg)*) };
43}
44#[cfg(not(feature = "tracing"))]
45macro_rules! pool_debug {
46 ($($arg:tt)*) => {};
47}
48
49#[cfg(feature = "tracing")]
50macro_rules! pool_info {
51 ($($arg:tt)*) => { tracing::info!($($arg)*) };
52}
53#[cfg(not(feature = "tracing"))]
54macro_rules! pool_info {
55 ($($arg:tt)*) => {};
56}
57
58#[cfg(feature = "tracing")]
59macro_rules! pool_warn {
60 ($($arg:tt)*) => { tracing::warn!($($arg)*) };
61}
62#[cfg(not(feature = "tracing"))]
63macro_rules! pool_warn {
64 ($($arg:tt)*) => {};
65}
66
67#[cfg(feature = "tracing")]
68macro_rules! pool_error {
69 ($($arg:tt)*) => { tracing::error!($($arg)*) };
70}
71#[cfg(not(feature = "tracing"))]
72macro_rules! pool_error {
73 ($($arg:tt)*) => {};
74}
75
76#[cfg(feature = "metrics")]
81mod pool_metrics {
82 use metrics::{counter, gauge};
83
84 pub fn set_connections_total(count: usize) {
85 gauge!("voltdb_pool_connections_total").set(count as f64);
86 }
87
88 pub fn set_connections_healthy(count: usize) {
89 gauge!("voltdb_pool_connections_healthy").set(count as f64);
90 }
91
92 pub fn inc_reconnect_total() {
93 counter!("voltdb_pool_reconnect_total").increment(1);
94 }
95
96 pub fn inc_circuit_open_total() {
97 counter!("voltdb_pool_circuit_open_total").increment(1);
98 }
99
100 pub fn inc_requests_failed_total() {
101 counter!("voltdb_pool_requests_failed_total").increment(1);
102 }
103
104 pub fn inc_requests_total() {
105 counter!("voltdb_pool_requests_total").increment(1);
106 }
107}
108
109#[cfg(not(feature = "metrics"))]
110mod pool_metrics {
111 pub fn set_connections_total(_count: usize) {}
112 pub fn set_connections_healthy(_count: usize) {}
113 pub fn inc_reconnect_total() {}
114 pub fn inc_circuit_open_total() {}
115 pub fn inc_requests_failed_total() {}
116 pub fn inc_requests_total() {}
117}
118
119#[derive(Debug, Clone)]
125pub struct PoolConfig {
126 pub size: usize,
128 pub reconnect_backoff: Duration,
130 pub circuit_open_duration: Duration,
132 pub exhaustion_policy: ExhaustionPolicy,
134 pub validation_mode: ValidationMode,
136 pub circuit_failure_threshold: u32,
138 pub shutdown_timeout: Duration,
143}
144
145impl Default for PoolConfig {
146 fn default() -> Self {
147 Self {
148 size: 10,
149 reconnect_backoff: Duration::from_secs(5),
150 circuit_open_duration: Duration::from_secs(30),
151 exhaustion_policy: ExhaustionPolicy::FailFast,
152 validation_mode: ValidationMode::FailFast,
153 circuit_failure_threshold: 3,
154 shutdown_timeout: Duration::from_secs(30),
155 }
156 }
157}
158
159impl PoolConfig {
160 pub fn new() -> Self {
161 Self::default()
162 }
163
164 pub fn size(mut self, size: usize) -> Self {
165 self.size = size;
166 self
167 }
168
169 pub fn reconnect_backoff(mut self, duration: Duration) -> Self {
170 self.reconnect_backoff = duration;
171 self
172 }
173
174 pub fn circuit_open_duration(mut self, duration: Duration) -> Self {
175 self.circuit_open_duration = duration;
176 self
177 }
178
179 pub fn exhaustion_policy(mut self, policy: ExhaustionPolicy) -> Self {
180 self.exhaustion_policy = policy;
181 self
182 }
183
184 pub fn validation_mode(mut self, mode: ValidationMode) -> Self {
185 self.validation_mode = mode;
186 self
187 }
188
189 pub fn circuit_failure_threshold(mut self, threshold: u32) -> Self {
190 self.circuit_failure_threshold = threshold;
191 self
192 }
193
194 pub fn shutdown_timeout(mut self, timeout: Duration) -> Self {
195 self.shutdown_timeout = timeout;
196 self
197 }
198}
199
200#[derive(Debug)]
211struct SlotMeta {
212 state: ConnState,
213 circuit: Circuit,
214 consecutive_failures: u32,
215 last_reconnect_attempt: Option<Instant>,
216 host_idx: usize,
217}
218
219impl SlotMeta {
220 fn new_healthy(host_idx: usize) -> Self {
221 Self {
222 state: ConnState::Healthy,
223 circuit: Circuit::Closed,
224 consecutive_failures: 0,
225 last_reconnect_attempt: None,
226 host_idx,
227 }
228 }
229
230 fn new_unhealthy(host_idx: usize) -> Self {
231 Self {
232 state: ConnState::Unhealthy {
233 since: Instant::now(),
234 },
235 circuit: Circuit::Open {
236 until: Instant::now() + Duration::from_secs(5),
237 },
238 consecutive_failures: 1,
239 last_reconnect_attempt: None,
240 host_idx,
241 }
242 }
243
244 fn is_available(&self) -> bool {
247 self.state.is_healthy() && self.circuit.should_allow()
248 }
249
250 fn needs_reconnect(&self, backoff: Duration) -> bool {
252 if self.state.is_healthy() || self.state.is_reconnecting() {
253 return false;
254 }
255
256 match self.last_reconnect_attempt {
257 None => true,
258 Some(last) => Instant::now().duration_since(last) >= backoff,
259 }
260 }
261
262 fn record_success(&mut self) {
263 self.consecutive_failures = 0;
264 self.state = ConnState::Healthy;
265 self.circuit.close();
266 }
267
268 fn record_failure(&mut self, config: &PoolConfig) {
269 self.consecutive_failures += 1;
270 self.state = ConnState::Unhealthy {
271 since: Instant::now(),
272 };
273
274 if self.consecutive_failures >= config.circuit_failure_threshold {
275 self.circuit.open(config.circuit_open_duration);
276 pool_metrics::inc_circuit_open_total();
277 }
278 }
279}
280
281struct InnerPool {
286 opts: Opts,
287 config: PoolConfig,
288 slots: Vec<SlotMeta>,
289 nodes: Vec<Arc<Mutex<Option<Node>>>>,
293 phase: PoolPhase,
294}
295
296impl fmt::Debug for InnerPool {
297 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
298 f.debug_struct("InnerPool")
299 .field("config", &self.config)
300 .field("slots_count", &self.slots.len())
301 .field("phase", &self.phase)
302 .finish()
303 }
304}
305
306impl InnerPool {
307 fn node_opt(&self, host_idx: usize) -> Result<NodeOpt, VoltError> {
308 let ip_port = self
309 .opts
310 .0
311 .ip_ports
312 .get(host_idx)
313 .cloned()
314 .ok_or(VoltError::InvalidConfig)?;
315 Ok(NodeOpt {
316 ip_port,
317 pass: self.opts.0.pass.clone(),
318 user: self.opts.0.user.clone(),
319 connect_timeout: self.opts.0.connect_timeout,
320 read_timeout: self.opts.0.read_timeout,
321 })
322 }
323
324 fn new(opts: Opts, config: PoolConfig) -> Result<Self, VoltError> {
325 let num_hosts = opts.0.ip_ports.len();
326 let mut inner = InnerPool {
327 opts,
328 config: config.clone(),
329 slots: Vec::with_capacity(config.size),
330 nodes: Vec::with_capacity(config.size),
331 phase: PoolPhase::Running,
332 };
333
334 for i in 0..config.size {
335 let host_idx = i % num_hosts;
336 let node_opt = inner.node_opt(host_idx)?;
337
338 pool_debug!(slot = i, host = host_idx, "creating connection");
339
340 match node::Node::new(node_opt) {
341 Ok(node) => {
342 inner.slots.push(SlotMeta::new_healthy(host_idx));
343 inner.nodes.push(Arc::new(Mutex::new(Some(node))));
344 pool_info!(slot = i, "connection established");
345 }
346 Err(e) => match config.validation_mode {
347 ValidationMode::FailFast => {
348 pool_error!(slot = i, error = ?e, "connection failed, aborting pool creation");
349 return Err(e);
350 }
351 ValidationMode::BestEffort => {
352 pool_warn!(slot = i, error = ?e, "connection failed, marking unhealthy");
353 inner.slots.push(SlotMeta::new_unhealthy(host_idx));
354 inner.nodes.push(Arc::new(Mutex::new(None)));
355 }
356 },
357 }
358 }
359
360 inner.update_metrics();
361 pool_info!(
362 size = config.size,
363 healthy = inner.healthy_count(),
364 "pool initialized"
365 );
366
367 Ok(inner)
368 }
369
370 fn healthy_count(&self) -> usize {
371 self.slots.iter().filter(|s| s.state.is_healthy()).count()
372 }
373
374 fn update_metrics(&self) {
375 pool_metrics::set_connections_total(self.slots.len());
376 pool_metrics::set_connections_healthy(self.healthy_count());
377 }
378}
379
380pub struct Pool {
404 inner: Arc<(Mutex<InnerPool>, Condvar)>,
405 counter: AtomicUsize,
406 shutdown_flag: AtomicBool,
407 config: PoolConfig,
408}
409
410impl fmt::Debug for Pool {
411 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
412 let inner = self.inner.0.lock().ok();
413 f.debug_struct("Pool")
414 .field("counter", &self.counter.load(Ordering::Relaxed))
415 .field("shutdown", &self.shutdown_flag.load(Ordering::Relaxed))
416 .field(
417 "healthy",
418 &inner.as_ref().map(|i| i.healthy_count()).unwrap_or(0),
419 )
420 .field("config", &self.config)
421 .finish()
422 }
423}
424
425impl Pool {
426 pub fn new<T: Into<Opts>>(opts: T) -> Result<Pool, VoltError> {
428 Pool::with_config(opts, PoolConfig::default())
429 }
430
431 pub fn new_manual<T: Into<Opts>>(size: usize, opts: T) -> Result<Pool, VoltError> {
433 Pool::with_config(opts, PoolConfig::new().size(size))
434 }
435
436 pub fn with_config<T: Into<Opts>>(opts: T, config: PoolConfig) -> Result<Pool, VoltError> {
438 let inner = InnerPool::new(opts.into(), config.clone())?;
439 Ok(Pool {
440 inner: Arc::new((Mutex::new(inner), Condvar::new())),
441 counter: AtomicUsize::new(0),
442 shutdown_flag: AtomicBool::new(false),
443 config,
444 })
445 }
446
447 pub fn get_conn(&self) -> Result<PooledConn<'_>, VoltError> {
454 if self.shutdown_flag.load(Ordering::Relaxed) {
455 pool_warn!("get_conn called on shutdown pool");
456 return Err(PoolError::PoolShutdown.into());
457 }
458
459 pool_metrics::inc_requests_total();
460
461 let preferred_idx = self.counter.fetch_add(1, Ordering::Relaxed) % self.config.size;
462
463 match self.config.exhaustion_policy {
464 ExhaustionPolicy::FailFast => self.get_conn_failfast(preferred_idx),
465 ExhaustionPolicy::Block { timeout } => self.get_conn_blocking(preferred_idx, timeout),
466 }
467 }
468
469 fn get_conn_failfast(&self, preferred_idx: usize) -> Result<PooledConn<'_>, VoltError> {
470 let (lock, _cvar) = &*self.inner;
471 let inner = lock.lock().map_err(|_| PoolError::LockPoisoned)?;
472
473 if inner.phase != PoolPhase::Running {
474 return Err(PoolError::PoolShutdown.into());
475 }
476
477 if inner.slots[preferred_idx].is_available() {
479 return self.checkout_slot(&inner, preferred_idx);
480 }
481
482 for i in 1..self.config.size {
484 let idx = (preferred_idx + i) % self.config.size;
485 if inner.slots[idx].is_available() {
486 pool_debug!(
487 preferred = preferred_idx,
488 actual = idx,
489 "using alternate connection"
490 );
491 return self.checkout_slot(&inner, idx);
492 }
493 }
494
495 pool_warn!("no healthy connections available");
496 pool_metrics::inc_requests_failed_total();
497 Err(PoolError::PoolExhausted.into())
498 }
499
500 fn get_conn_blocking(
501 &self,
502 preferred_idx: usize,
503 timeout: Duration,
504 ) -> Result<PooledConn<'_>, VoltError> {
505 let deadline = Instant::now() + timeout;
506 let (lock, cvar) = &*self.inner;
507
508 let mut inner = lock.lock().map_err(|_| PoolError::LockPoisoned)?;
509
510 loop {
511 if inner.phase != PoolPhase::Running {
512 return Err(PoolError::PoolShutdown.into());
513 }
514
515 if inner.slots[preferred_idx].is_available() {
517 return self.checkout_slot(&inner, preferred_idx);
518 }
519
520 for i in 1..self.config.size {
522 let idx = (preferred_idx + i) % self.config.size;
523 if inner.slots[idx].is_available() {
524 return self.checkout_slot(&inner, idx);
525 }
526 }
527
528 let remaining = deadline.saturating_duration_since(Instant::now());
530 if remaining.is_zero() {
531 pool_warn!(timeout = ?timeout, "connection wait timed out");
532 pool_metrics::inc_requests_failed_total();
533 return Err(PoolError::Timeout.into());
534 }
535
536 pool_trace!("waiting for available connection");
537 let (guard, _timeout_result) = cvar
538 .wait_timeout(inner, remaining)
539 .map_err(|_| PoolError::LockPoisoned)?;
540 inner = guard;
541 }
542 }
543
544 fn checkout_slot<'a>(
547 &'a self,
548 inner: &InnerPool,
549 idx: usize,
550 ) -> Result<PooledConn<'a>, VoltError> {
551 let node = Arc::clone(&inner.nodes[idx]);
553 let config = inner.config.clone();
554 let host_idx = inner.slots[idx].host_idx;
555
556 pool_trace!(slot = idx, "connection acquired");
557
558 Ok(PooledConn {
560 pool: self,
561 idx,
562 node,
563 config,
564 host_idx,
565 })
566 }
567
568 fn report_fatal_error(&self, idx: usize) {
570 let (lock, cvar) = &*self.inner;
571
572 #[allow(clippy::type_complexity)]
574 let mut reconnect_info: Option<(Arc<Mutex<Option<Node>>>, NodeOpt, PoolConfig)> = None;
575
576 if let Ok(mut inner) = lock.lock() {
577 let config = inner.config.clone();
579 inner.slots[idx].record_failure(&config);
580 pool_debug!(slot = idx, "fatal error reported");
581
582 inner.update_metrics();
583
584 cvar.notify_all();
586
587 if !self.shutdown_flag.load(Ordering::Relaxed) {
589 let backoff = inner.config.reconnect_backoff;
590 if inner.slots[idx].needs_reconnect(backoff) {
591 let node_arc = Arc::clone(&inner.nodes[idx]);
593 let host_idx = inner.slots[idx].host_idx;
594 if let Ok(node_opt) = inner.node_opt(host_idx) {
595 inner.slots[idx].state = ConnState::Reconnecting;
596 inner.slots[idx].last_reconnect_attempt = Some(Instant::now());
597
598 reconnect_info = Some((node_arc, node_opt, config));
599 }
600 }
601 }
602 }
603
604 if let Some((node_arc, node_opt, config)) = reconnect_info {
606 self.do_reconnect(idx, node_arc, node_opt, config);
607 }
608 }
609
610 fn do_reconnect(
612 &self,
613 idx: usize,
614 node_arc: Arc<Mutex<Option<Node>>>,
615 node_opt: NodeOpt,
616 config: PoolConfig,
617 ) {
618 pool_info!(slot = idx, "attempting reconnection");
619 pool_metrics::inc_reconnect_total();
620
621 match node::Node::new(node_opt) {
623 Ok(new_node) => {
624 if let Ok(mut node_guard) = node_arc.lock() {
626 *node_guard = Some(new_node);
627 }
628
629 let (lock, cvar) = &*self.inner;
631 if let Ok(mut inner) = lock.lock() {
632 inner.slots[idx].record_success();
633 inner.update_metrics();
634 cvar.notify_all(); }
636 pool_info!(slot = idx, "reconnection successful");
637 }
638 Err(_e) => {
639 let (lock, _) = &*self.inner;
641 if let Ok(mut inner) = lock.lock() {
642 inner.slots[idx].record_failure(&config);
643 inner.update_metrics();
644 }
645 pool_error!(slot = idx, error = ?_e, "reconnection failed");
646 }
647 }
648 }
649
650 fn mark_success(&self, idx: usize) {
652 let (lock, _) = &*self.inner;
653 if let Ok(mut inner) = lock.lock() {
654 inner.slots[idx].record_success();
655 inner.update_metrics();
656 }
657 }
658
659 pub fn shutdown(&self) {
669 pool_info!("initiating pool shutdown");
670 self.shutdown_flag.store(true, Ordering::Relaxed);
671
672 let (lock, cvar) = &*self.inner;
673
674 if let Ok(mut inner) = lock.lock() {
675 inner.phase = PoolPhase::Shutdown;
677 pool_info!("entering shutdown phase");
678
679 for slot in &mut inner.slots {
681 slot.state = ConnState::Unhealthy {
682 since: Instant::now(),
683 };
684 }
685
686 for node_arc in &inner.nodes {
688 if let Ok(mut node_guard) = node_arc.lock() {
689 *node_guard = None;
690 }
691 }
692 inner.update_metrics();
693
694 cvar.notify_all();
696 }
697
698 pool_info!("pool shutdown complete");
699 }
700
701 pub fn is_shutdown(&self) -> bool {
703 self.shutdown_flag.load(Ordering::Relaxed)
704 }
705
706 pub fn stats(&self) -> PoolStats {
708 let (lock, _) = &*self.inner;
709 let inner = lock.lock().ok();
710 PoolStats {
711 size: self.config.size,
712 healthy: inner.as_ref().map(|i| i.healthy_count()).unwrap_or(0),
713 total_requests: self.counter.load(Ordering::Relaxed),
714 is_shutdown: self.shutdown_flag.load(Ordering::Relaxed),
715 }
716 }
717}
718
719#[derive(Debug, Clone)]
721pub struct PoolStats {
722 pub size: usize,
723 pub healthy: usize,
724 pub total_requests: usize,
725 pub is_shutdown: bool,
726}
727
728pub struct PooledConn<'a> {
740 pool: &'a Pool,
741 idx: usize,
742 node: Arc<Mutex<Option<Node>>>,
743 #[allow(dead_code)] config: PoolConfig,
745 #[allow(dead_code)]
746 host_idx: usize,
747}
748
749impl fmt::Debug for PooledConn<'_> {
750 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
751 f.debug_struct("PooledConn")
752 .field("idx", &self.idx)
753 .field("host_idx", &self.host_idx)
754 .finish()
755 }
756}
757
758impl PooledConn<'_> {
759 pub fn query(&mut self, sql: &str) -> Result<VoltTable, VoltError> {
761 pool_trace!(slot = self.idx, sql = sql, "executing query");
762
763 let mut node_guard = self
764 .node
765 .lock()
766 .map_err(|_| VoltError::PoisonError("Node lock poisoned".to_string()))?;
767
768 let node = node_guard
769 .as_mut()
770 .ok_or(VoltError::ConnectionNotAvailable)?;
771
772 let result = node.query(sql).and_then(|r| block_for_result(&r));
773 drop(node_guard);
774
775 self.handle_result(&result);
776 result
777 }
778
779 pub fn list_procedures(&mut self) -> Result<VoltTable, VoltError> {
781 pool_trace!(slot = self.idx, "listing procedures");
782
783 let mut node_guard = self
784 .node
785 .lock()
786 .map_err(|_| VoltError::PoisonError("Node lock poisoned".to_string()))?;
787
788 let node = node_guard
789 .as_mut()
790 .ok_or(VoltError::ConnectionNotAvailable)?;
791
792 let result = node.list_procedures().and_then(|r| block_for_result(&r));
793 drop(node_guard);
794
795 self.handle_result(&result);
796 result
797 }
798
799 pub fn call_sp(&mut self, proc: &str, params: Vec<&dyn Value>) -> Result<VoltTable, VoltError> {
801 pool_trace!(
802 slot = self.idx,
803 procedure = proc,
804 "calling stored procedure"
805 );
806
807 let mut node_guard = self
808 .node
809 .lock()
810 .map_err(|_| VoltError::PoisonError("Node lock poisoned".to_string()))?;
811
812 let node = node_guard
813 .as_mut()
814 .ok_or(VoltError::ConnectionNotAvailable)?;
815
816 let result = node
817 .call_sp(proc, params)
818 .and_then(|r| block_for_result(&r));
819 drop(node_guard);
820
821 self.handle_result(&result);
822 result
823 }
824
825 pub fn upload_jar(&mut self, bs: Vec<u8>) -> Result<VoltTable, VoltError> {
827 pool_trace!(slot = self.idx, size = bs.len(), "uploading jar");
828
829 let mut node_guard = self
830 .node
831 .lock()
832 .map_err(|_| VoltError::PoisonError("Node lock poisoned".to_string()))?;
833
834 let node = node_guard
835 .as_mut()
836 .ok_or(VoltError::ConnectionNotAvailable)?;
837
838 let result = node.upload_jar(bs).and_then(|r| block_for_result(&r));
839 drop(node_guard);
840
841 self.handle_result(&result);
842 result
843 }
844
845 fn handle_result<T>(&self, result: &Result<T, VoltError>) {
847 match result {
848 Ok(_) => {
849 self.pool.mark_success(self.idx);
850 }
851 Err(e) if e.is_connection_fatal() => {
852 pool_error!(slot = self.idx, error = ?e, "fatal connection error detected");
853 if let Ok(mut guard) = self.node.lock() {
855 *guard = None;
856 }
857 self.pool.report_fatal_error(self.idx);
858 }
859 Err(_) => {
860 }
862 }
863 }
864
865 pub fn slot_index(&self) -> usize {
867 self.idx
868 }
869}
870
871#[cfg(test)]
879mod tests {
880 use super::*;
881
882 #[test]
883 fn test_pool_config_builder() {
884 let config = PoolConfig::new()
885 .size(20)
886 .reconnect_backoff(Duration::from_secs(10))
887 .circuit_open_duration(Duration::from_secs(60))
888 .exhaustion_policy(ExhaustionPolicy::Block {
889 timeout: Duration::from_secs(5),
890 })
891 .validation_mode(ValidationMode::BestEffort)
892 .circuit_failure_threshold(5)
893 .shutdown_timeout(Duration::from_secs(60));
894
895 assert_eq!(config.size, 20);
896 assert_eq!(config.reconnect_backoff, Duration::from_secs(10));
897 assert_eq!(config.circuit_open_duration, Duration::from_secs(60));
898 assert_eq!(
899 config.exhaustion_policy,
900 ExhaustionPolicy::Block {
901 timeout: Duration::from_secs(5)
902 }
903 );
904 assert_eq!(config.validation_mode, ValidationMode::BestEffort);
905 assert_eq!(config.circuit_failure_threshold, 5);
906 assert_eq!(config.shutdown_timeout, Duration::from_secs(60));
907 }
908
909 #[test]
910 fn test_pool_config_default() {
911 let config = PoolConfig::default();
912 assert_eq!(config.size, 10);
913 assert_eq!(config.exhaustion_policy, ExhaustionPolicy::FailFast);
914 assert_eq!(config.validation_mode, ValidationMode::FailFast);
915 }
916
917 #[test]
918 fn test_slot_meta_is_available() {
919 let slot = SlotMeta::new_healthy(0);
921 assert!(slot.is_available());
922
923 let slot = SlotMeta::new_unhealthy(0);
925 assert!(!slot.is_available());
926
927 let mut slot = SlotMeta::new_healthy(0);
929 slot.circuit = Circuit::Open {
930 until: Instant::now() + Duration::from_secs(60),
931 };
932 assert!(!slot.is_available());
933 }
934
935 #[test]
936 fn test_slot_meta_needs_reconnect() {
937 let mut slot = SlotMeta::new_unhealthy(0);
938 let backoff = Duration::from_millis(100);
939
940 assert!(slot.needs_reconnect(backoff));
942
943 slot.state = ConnState::Reconnecting;
945 assert!(!slot.needs_reconnect(backoff));
946
947 slot.state = ConnState::Healthy;
949 assert!(!slot.needs_reconnect(backoff));
950 }
951
952 #[test]
953 fn test_slot_meta_record_success() {
954 let mut slot = SlotMeta::new_unhealthy(0);
955 slot.consecutive_failures = 5;
956
957 slot.record_success();
958
959 assert_eq!(slot.consecutive_failures, 0);
960 assert!(matches!(slot.state, ConnState::Healthy));
961 assert!(matches!(slot.circuit, Circuit::Closed));
962 }
963
964 #[test]
965 fn test_slot_meta_record_failure_opens_circuit() {
966 let mut slot = SlotMeta::new_healthy(0);
967 slot.consecutive_failures = 2;
968
969 let config = PoolConfig::default().circuit_failure_threshold(3);
970
971 slot.record_failure(&config);
972
973 assert_eq!(slot.consecutive_failures, 3);
974 assert!(matches!(slot.circuit, Circuit::Open { .. }));
975 }
976}