1#![cfg(feature = "tokio")]
18
19use std::fmt;
20use std::sync::Arc;
21use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
22use std::time::{Duration, Instant};
23use tokio::sync::{Mutex, Notify};
24use tokio::time::timeout;
25
26use crate::async_node::{AsyncNode, async_block_for_result};
27use crate::{NodeOpt, Opts, Value, VoltError, VoltTable};
28
29#[cfg(feature = "tracing")]
34macro_rules! async_pool_trace {
35 ($($arg:tt)*) => { tracing::trace!($($arg)*) };
36}
37#[cfg(not(feature = "tracing"))]
38macro_rules! async_pool_trace {
39 ($($arg:tt)*) => {};
40}
41
42#[cfg(feature = "tracing")]
43macro_rules! async_pool_debug {
44 ($($arg:tt)*) => { tracing::debug!($($arg)*) };
45}
46#[cfg(not(feature = "tracing"))]
47macro_rules! async_pool_debug {
48 ($($arg:tt)*) => {};
49}
50
51#[cfg(feature = "tracing")]
52macro_rules! async_pool_info {
53 ($($arg:tt)*) => { tracing::info!($($arg)*) };
54}
55#[cfg(not(feature = "tracing"))]
56macro_rules! async_pool_info {
57 ($($arg:tt)*) => {};
58}
59
60#[cfg(feature = "tracing")]
61macro_rules! async_pool_warn {
62 ($($arg:tt)*) => { tracing::warn!($($arg)*) };
63}
64#[cfg(not(feature = "tracing"))]
65macro_rules! async_pool_warn {
66 ($($arg:tt)*) => {};
67}
68
69#[cfg(feature = "tracing")]
70macro_rules! async_pool_error {
71 ($($arg:tt)*) => { tracing::error!($($arg)*) };
72}
73#[cfg(not(feature = "tracing"))]
74macro_rules! async_pool_error {
75 ($($arg:tt)*) => {};
76}
77
78#[cfg(feature = "metrics")]
83mod async_pool_metrics {
84 use metrics::{counter, gauge};
85
86 pub fn set_connections_total(count: usize) {
87 gauge!("voltdb_async_pool_connections_total").set(count as f64);
88 }
89
90 pub fn set_connections_healthy(count: usize) {
91 gauge!("voltdb_async_pool_connections_healthy").set(count as f64);
92 }
93
94 pub fn inc_reconnect_total() {
95 counter!("voltdb_async_pool_reconnect_total").increment(1);
96 }
97
98 pub fn inc_circuit_open_total() {
99 counter!("voltdb_async_pool_circuit_open_total").increment(1);
100 }
101
102 pub fn inc_requests_failed_total() {
103 counter!("voltdb_async_pool_requests_failed_total").increment(1);
104 }
105
106 pub fn inc_requests_total() {
107 counter!("voltdb_async_pool_requests_total").increment(1);
108 }
109}
110
111#[cfg(not(feature = "metrics"))]
112mod async_pool_metrics {
113 pub fn set_connections_total(_count: usize) {}
114 pub fn set_connections_healthy(_count: usize) {}
115 pub fn inc_reconnect_total() {}
116 pub fn inc_circuit_open_total() {}
117 pub fn inc_requests_failed_total() {}
118 pub fn inc_requests_total() {}
119}
120
121#[derive(Debug, Clone, PartialEq, Eq)]
127pub enum AsyncPoolError {
128 PoolShutdown,
130 CircuitOpen,
132 PoolExhausted,
134 Timeout,
136 LockPoisoned,
138}
139
140impl fmt::Display for AsyncPoolError {
141 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
142 match self {
143 AsyncPoolError::PoolShutdown => write!(f, "Pool is shutting down"),
144 AsyncPoolError::CircuitOpen => write!(f, "Circuit breaker is open"),
145 AsyncPoolError::PoolExhausted => write!(f, "Pool exhausted, no healthy connections"),
146 AsyncPoolError::Timeout => write!(f, "Timed out waiting for connection"),
147 AsyncPoolError::LockPoisoned => write!(f, "Internal lock poisoned"),
148 }
149 }
150}
151
152impl std::error::Error for AsyncPoolError {}
153
154impl From<AsyncPoolError> for VoltError {
155 fn from(e: AsyncPoolError) -> Self {
156 match e {
157 AsyncPoolError::PoolShutdown => VoltError::ConnectionNotAvailable,
158 AsyncPoolError::CircuitOpen => VoltError::ConnectionNotAvailable,
159 AsyncPoolError::PoolExhausted => VoltError::ConnectionNotAvailable,
160 AsyncPoolError::Timeout => VoltError::Timeout,
161 AsyncPoolError::LockPoisoned => {
162 VoltError::PoisonError("Pool lock poisoned".to_string())
163 }
164 }
165 }
166}
167
168#[derive(Debug, Clone)]
174pub enum ConnState {
175 Healthy,
177 Unhealthy { since: Instant },
179 Reconnecting,
181}
182
183impl ConnState {
184 fn is_healthy(&self) -> bool {
185 matches!(self, ConnState::Healthy)
186 }
187
188 fn is_reconnecting(&self) -> bool {
189 matches!(self, ConnState::Reconnecting)
190 }
191}
192
193#[derive(Debug, Clone)]
199pub enum Circuit {
200 Closed,
202 Open { until: Instant },
204 HalfOpen,
206}
207
208impl Circuit {
209 fn should_allow(&self) -> bool {
211 match self {
212 Circuit::Closed => true,
213 Circuit::Open { until } => Instant::now() >= *until,
214 Circuit::HalfOpen => true,
215 }
216 }
217
218 fn open(&mut self, duration: Duration) {
220 *self = Circuit::Open {
221 until: Instant::now() + duration,
222 };
223 async_pool_metrics::inc_circuit_open_total();
224 async_pool_warn!("circuit breaker opened");
225 }
226
227 #[allow(dead_code)]
229 fn half_open(&mut self) {
230 *self = Circuit::HalfOpen;
231 async_pool_debug!("circuit breaker half-open");
232 }
233
234 fn close(&mut self) {
236 *self = Circuit::Closed;
237 async_pool_info!("circuit breaker closed");
238 }
239}
240
241#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
247pub enum ExhaustionPolicy {
248 #[default]
250 FailFast,
251 Block { timeout: Duration },
253}
254
255#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
257pub enum ValidationMode {
258 #[default]
260 FailFast,
261 BestEffort,
263}
264
265#[derive(Debug, Clone)]
267pub struct AsyncPoolConfig {
268 pub size: usize,
270 pub reconnect_backoff: Duration,
272 pub circuit_open_duration: Duration,
274 pub exhaustion_policy: ExhaustionPolicy,
276 pub validation_mode: ValidationMode,
278 pub circuit_failure_threshold: u32,
280 pub shutdown_timeout: Duration,
285}
286
287impl Default for AsyncPoolConfig {
288 fn default() -> Self {
289 Self {
290 size: 10,
291 reconnect_backoff: Duration::from_secs(5),
292 circuit_open_duration: Duration::from_secs(30),
293 exhaustion_policy: ExhaustionPolicy::FailFast,
294 validation_mode: ValidationMode::FailFast,
295 circuit_failure_threshold: 3,
296 shutdown_timeout: Duration::from_secs(30),
297 }
298 }
299}
300
301impl AsyncPoolConfig {
302 pub fn new() -> Self {
303 Self::default()
304 }
305
306 pub fn size(mut self, size: usize) -> Self {
307 self.size = size;
308 self
309 }
310
311 pub fn reconnect_backoff(mut self, duration: Duration) -> Self {
312 self.reconnect_backoff = duration;
313 self
314 }
315
316 pub fn circuit_open_duration(mut self, duration: Duration) -> Self {
317 self.circuit_open_duration = duration;
318 self
319 }
320
321 pub fn exhaustion_policy(mut self, policy: ExhaustionPolicy) -> Self {
322 self.exhaustion_policy = policy;
323 self
324 }
325
326 pub fn validation_mode(mut self, mode: ValidationMode) -> Self {
327 self.validation_mode = mode;
328 self
329 }
330
331 pub fn circuit_failure_threshold(mut self, threshold: u32) -> Self {
332 self.circuit_failure_threshold = threshold;
333 self
334 }
335
336 pub fn shutdown_timeout(mut self, timeout: Duration) -> Self {
337 self.shutdown_timeout = timeout;
338 self
339 }
340}
341
342#[derive(Debug, Clone, Copy, PartialEq, Eq)]
348enum PoolPhase {
349 Running,
351 Shutdown,
353}
354
355#[derive(Debug)]
361struct SlotMeta {
362 state: ConnState,
363 circuit: Circuit,
364 consecutive_failures: u32,
365 last_reconnect_attempt: Option<Instant>,
366 host_idx: usize,
367}
368
369impl SlotMeta {
370 fn new_healthy(host_idx: usize) -> Self {
371 Self {
372 state: ConnState::Healthy,
373 circuit: Circuit::Closed,
374 consecutive_failures: 0,
375 last_reconnect_attempt: None,
376 host_idx,
377 }
378 }
379
380 fn new_unhealthy(host_idx: usize) -> Self {
381 Self {
382 state: ConnState::Unhealthy {
383 since: Instant::now(),
384 },
385 circuit: Circuit::Open {
386 until: Instant::now() + Duration::from_secs(5),
387 },
388 consecutive_failures: 1,
389 last_reconnect_attempt: None,
390 host_idx,
391 }
392 }
393
394 fn is_available(&self) -> bool {
396 self.state.is_healthy() && self.circuit.should_allow()
397 }
398
399 fn needs_reconnect(&self, backoff: Duration) -> bool {
401 if self.state.is_healthy() || self.state.is_reconnecting() {
402 return false;
403 }
404
405 match self.last_reconnect_attempt {
406 None => true,
407 Some(last) => Instant::now().duration_since(last) >= backoff,
408 }
409 }
410
411 fn record_success(&mut self) {
412 self.consecutive_failures = 0;
413 self.state = ConnState::Healthy;
414 self.circuit.close();
415 }
416
417 fn record_failure(&mut self, config: &AsyncPoolConfig) {
418 self.consecutive_failures += 1;
419 self.state = ConnState::Unhealthy {
420 since: Instant::now(),
421 };
422
423 if self.consecutive_failures >= config.circuit_failure_threshold {
424 self.circuit.open(config.circuit_open_duration);
425 }
426 }
427}
428
429struct AsyncInnerPool {
434 opts: Opts,
435 config: AsyncPoolConfig,
436 slots: Vec<SlotMeta>,
437 nodes: Vec<Arc<Mutex<Option<AsyncNode>>>>,
438 phase: PoolPhase,
439}
440
441impl fmt::Debug for AsyncInnerPool {
442 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
443 f.debug_struct("AsyncInnerPool")
444 .field("config", &self.config)
445 .field("slots_count", &self.slots.len())
446 .field("phase", &self.phase)
447 .finish()
448 }
449}
450
451impl AsyncInnerPool {
452 fn node_opt(&self, host_idx: usize) -> Result<NodeOpt, VoltError> {
453 let ip_port = self
454 .opts
455 .0
456 .ip_ports
457 .get(host_idx)
458 .cloned()
459 .ok_or(VoltError::InvalidConfig)?;
460 Ok(NodeOpt {
461 ip_port,
462 pass: self.opts.0.pass.clone(),
463 user: self.opts.0.user.clone(),
464 connect_timeout: self.opts.0.connect_timeout,
465 read_timeout: self.opts.0.read_timeout,
466 })
467 }
468
469 async fn new(opts: Opts, config: AsyncPoolConfig) -> Result<Self, VoltError> {
470 let num_hosts = opts.0.ip_ports.len();
471 let mut inner = AsyncInnerPool {
472 opts,
473 config: config.clone(),
474 slots: Vec::with_capacity(config.size),
475 nodes: Vec::with_capacity(config.size),
476 phase: PoolPhase::Running,
477 };
478
479 for i in 0..config.size {
480 let host_idx = i % num_hosts;
481 let node_opt = inner.node_opt(host_idx)?;
482
483 async_pool_debug!(slot = i, host = host_idx, "creating connection");
484
485 match AsyncNode::new(node_opt).await {
486 Ok(node) => {
487 inner.slots.push(SlotMeta::new_healthy(host_idx));
488 inner.nodes.push(Arc::new(Mutex::new(Some(node))));
489 async_pool_info!(slot = i, "connection established");
490 }
491 Err(e) => match config.validation_mode {
492 ValidationMode::FailFast => {
493 async_pool_error!(slot = i, error = ?e, "connection failed, aborting pool creation");
494 return Err(e);
495 }
496 ValidationMode::BestEffort => {
497 async_pool_warn!(slot = i, error = ?e, "connection failed, marking unhealthy");
498 inner.slots.push(SlotMeta::new_unhealthy(host_idx));
499 inner.nodes.push(Arc::new(Mutex::new(None)));
500 }
501 },
502 }
503 }
504
505 inner.update_metrics();
506 async_pool_info!(
507 size = config.size,
508 healthy = inner.healthy_count(),
509 "async pool initialized"
510 );
511
512 Ok(inner)
513 }
514
515 fn healthy_count(&self) -> usize {
516 self.slots.iter().filter(|s| s.state.is_healthy()).count()
517 }
518
519 fn update_metrics(&self) {
520 async_pool_metrics::set_connections_total(self.slots.len());
521 async_pool_metrics::set_connections_healthy(self.healthy_count());
522 }
523}
524
525pub struct AsyncPool {
543 inner: Arc<Mutex<AsyncInnerPool>>,
544 notify: Arc<Notify>,
545 counter: AtomicUsize,
546 shutdown_flag: AtomicBool,
547 config: AsyncPoolConfig,
548}
549
550impl fmt::Debug for AsyncPool {
551 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
552 f.debug_struct("AsyncPool")
553 .field("counter", &self.counter.load(Ordering::Relaxed))
554 .field("shutdown", &self.shutdown_flag.load(Ordering::Relaxed))
555 .field("config", &self.config)
556 .finish()
557 }
558}
559
560impl AsyncPool {
561 pub async fn new<T: Into<Opts>>(opts: T) -> Result<AsyncPool, VoltError> {
563 AsyncPool::with_config(opts, AsyncPoolConfig::default()).await
564 }
565
566 pub async fn new_manual<T: Into<Opts>>(size: usize, opts: T) -> Result<AsyncPool, VoltError> {
568 AsyncPool::with_config(opts, AsyncPoolConfig::new().size(size)).await
569 }
570
571 pub async fn with_config<T: Into<Opts>>(
573 opts: T,
574 config: AsyncPoolConfig,
575 ) -> Result<AsyncPool, VoltError> {
576 let inner = AsyncInnerPool::new(opts.into(), config.clone()).await?;
577 Ok(AsyncPool {
578 inner: Arc::new(Mutex::new(inner)),
579 notify: Arc::new(Notify::new()),
580 counter: AtomicUsize::new(0),
581 shutdown_flag: AtomicBool::new(false),
582 config,
583 })
584 }
585
586 pub async fn get_conn(&self) -> Result<AsyncPooledConn<'_>, VoltError> {
588 if self.shutdown_flag.load(Ordering::Relaxed) {
589 async_pool_warn!("get_conn called on shutdown pool");
590 return Err(AsyncPoolError::PoolShutdown.into());
591 }
592
593 async_pool_metrics::inc_requests_total();
594
595 let preferred_idx = self.counter.fetch_add(1, Ordering::Relaxed) % self.config.size;
596
597 match self.config.exhaustion_policy {
598 ExhaustionPolicy::FailFast => self.get_conn_failfast(preferred_idx).await,
599 ExhaustionPolicy::Block {
600 timeout: wait_timeout,
601 } => self.get_conn_blocking(preferred_idx, wait_timeout).await,
602 }
603 }
604
605 async fn get_conn_failfast(
606 &self,
607 preferred_idx: usize,
608 ) -> Result<AsyncPooledConn<'_>, VoltError> {
609 let inner = self.inner.lock().await;
610
611 if inner.phase != PoolPhase::Running {
612 return Err(AsyncPoolError::PoolShutdown.into());
613 }
614
615 if inner.slots[preferred_idx].is_available() {
617 return self.checkout_slot(&inner, preferred_idx).await;
618 }
619
620 for i in 1..self.config.size {
622 let idx = (preferred_idx + i) % self.config.size;
623 if inner.slots[idx].is_available() {
624 async_pool_debug!(
625 preferred = preferred_idx,
626 actual = idx,
627 "using alternate connection"
628 );
629 return self.checkout_slot(&inner, idx).await;
630 }
631 }
632
633 async_pool_warn!("no healthy connections available");
634 async_pool_metrics::inc_requests_failed_total();
635 Err(AsyncPoolError::PoolExhausted.into())
636 }
637
638 async fn get_conn_blocking(
639 &self,
640 preferred_idx: usize,
641 wait_timeout: Duration,
642 ) -> Result<AsyncPooledConn<'_>, VoltError> {
643 let deadline = Instant::now() + wait_timeout;
644
645 loop {
646 let inner = self.inner.lock().await;
647
648 if inner.phase != PoolPhase::Running {
649 return Err(AsyncPoolError::PoolShutdown.into());
650 }
651
652 if inner.slots[preferred_idx].is_available() {
654 return self.checkout_slot(&inner, preferred_idx).await;
655 }
656
657 for i in 1..self.config.size {
659 let idx = (preferred_idx + i) % self.config.size;
660 if inner.slots[idx].is_available() {
661 return self.checkout_slot(&inner, idx).await;
662 }
663 }
664
665 let remaining = deadline.saturating_duration_since(Instant::now());
667 if remaining.is_zero() {
668 async_pool_warn!(timeout = ?wait_timeout, "connection wait timed out");
669 async_pool_metrics::inc_requests_failed_total();
670 return Err(AsyncPoolError::Timeout.into());
671 }
672
673 async_pool_trace!("waiting for available connection");
674 drop(inner);
675
676 let _ = timeout(remaining, self.notify.notified()).await;
678 }
679 }
680
681 async fn checkout_slot(
682 &self,
683 inner: &AsyncInnerPool,
684 idx: usize,
685 ) -> Result<AsyncPooledConn<'_>, VoltError> {
686 let node = Arc::clone(&inner.nodes[idx]);
687 let config = inner.config.clone();
688 let host_idx = inner.slots[idx].host_idx;
689
690 async_pool_trace!(slot = idx, "connection acquired");
691
692 Ok(AsyncPooledConn {
693 pool: self,
694 idx,
695 node,
696 config,
697 host_idx,
698 })
699 }
700
701 async fn report_fatal_error(&self, idx: usize) {
703 #[allow(clippy::type_complexity)]
704 let reconnect_info: Option<(
705 Arc<Mutex<Option<AsyncNode>>>,
706 NodeOpt,
707 AsyncPoolConfig,
708 )>;
709
710 {
711 let mut inner = self.inner.lock().await;
712 let config = inner.config.clone();
713 inner.slots[idx].record_failure(&config);
714 async_pool_debug!(slot = idx, "fatal error reported");
715
716 inner.update_metrics();
717 self.notify.notify_waiters();
718
719 if !self.shutdown_flag.load(Ordering::Relaxed) {
720 let backoff = inner.config.reconnect_backoff;
721 if inner.slots[idx].needs_reconnect(backoff) {
722 let node_arc = Arc::clone(&inner.nodes[idx]);
723 let host_idx = inner.slots[idx].host_idx;
724 if let Ok(node_opt) = inner.node_opt(host_idx) {
725 inner.slots[idx].state = ConnState::Reconnecting;
726 inner.slots[idx].last_reconnect_attempt = Some(Instant::now());
727
728 reconnect_info = Some((node_arc, node_opt, config));
729 } else {
730 reconnect_info = None;
731 }
732 } else {
733 reconnect_info = None;
734 }
735 } else {
736 reconnect_info = None;
737 }
738 }
739
740 if let Some((node_arc, node_opt, config)) = reconnect_info {
741 self.do_reconnect(idx, node_arc, node_opt, config).await;
742 }
743 }
744
745 async fn do_reconnect(
746 &self,
747 idx: usize,
748 node_arc: Arc<Mutex<Option<AsyncNode>>>,
749 node_opt: NodeOpt,
750 config: AsyncPoolConfig,
751 ) {
752 async_pool_info!(slot = idx, "attempting reconnection");
753 async_pool_metrics::inc_reconnect_total();
754
755 match AsyncNode::new(node_opt).await {
756 Ok(new_node) => {
757 {
758 let mut node_guard = node_arc.lock().await;
759 *node_guard = Some(new_node);
760 }
761
762 {
763 let mut inner = self.inner.lock().await;
764 inner.slots[idx].record_success();
765 inner.update_metrics();
766 }
767
768 self.notify.notify_waiters();
769 async_pool_info!(slot = idx, "reconnection successful");
770 }
771 Err(_e) => {
772 {
773 let mut inner = self.inner.lock().await;
774 inner.slots[idx].record_failure(&config);
775 inner.update_metrics();
776 }
777 async_pool_error!(slot = idx, error = ?_e, "reconnection failed");
778 }
779 }
780 }
781
782 async fn mark_success(&self, idx: usize) {
783 let mut inner = self.inner.lock().await;
784 inner.slots[idx].record_success();
785 inner.update_metrics();
786 }
787
788 pub async fn shutdown(&self) {
790 async_pool_info!("initiating pool shutdown");
791 self.shutdown_flag.store(true, Ordering::Relaxed);
792
793 let mut inner = self.inner.lock().await;
794 inner.phase = PoolPhase::Shutdown;
795 async_pool_info!("entering shutdown phase");
796
797 for slot in &mut inner.slots {
798 slot.state = ConnState::Unhealthy {
799 since: Instant::now(),
800 };
801 }
802
803 for node_arc in &inner.nodes {
804 let mut node_guard = node_arc.lock().await;
805 *node_guard = None;
806 }
807
808 inner.update_metrics();
809 self.notify.notify_waiters();
810
811 async_pool_info!("pool shutdown complete");
812 }
813
814 pub fn is_shutdown(&self) -> bool {
816 self.shutdown_flag.load(Ordering::Relaxed)
817 }
818
819 pub async fn stats(&self) -> AsyncPoolStats {
821 let inner = self.inner.lock().await;
822 AsyncPoolStats {
823 size: self.config.size,
824 healthy: inner.healthy_count(),
825 total_requests: self.counter.load(Ordering::Relaxed),
826 is_shutdown: self.shutdown_flag.load(Ordering::Relaxed),
827 }
828 }
829}
830
831#[derive(Debug, Clone)]
833pub struct AsyncPoolStats {
834 pub size: usize,
835 pub healthy: usize,
836 pub total_requests: usize,
837 pub is_shutdown: bool,
838}
839
840pub struct AsyncPooledConn<'a> {
846 pool: &'a AsyncPool,
847 idx: usize,
848 node: Arc<Mutex<Option<AsyncNode>>>,
849 #[allow(dead_code)]
850 config: AsyncPoolConfig,
851 #[allow(dead_code)]
852 host_idx: usize,
853}
854
855impl fmt::Debug for AsyncPooledConn<'_> {
856 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
857 f.debug_struct("AsyncPooledConn")
858 .field("idx", &self.idx)
859 .field("host_idx", &self.host_idx)
860 .finish()
861 }
862}
863
864impl AsyncPooledConn<'_> {
865 pub async fn query(&self, sql: &str) -> Result<VoltTable, VoltError> {
867 async_pool_trace!(slot = self.idx, sql = sql, "executing query");
868
869 let mut node_guard = self.node.lock().await;
870 let node = node_guard
871 .as_mut()
872 .ok_or(VoltError::ConnectionNotAvailable)?;
873 let mut rx = node.query(sql).await?;
874 drop(node_guard);
875 let result = async_block_for_result(&mut rx).await;
876 self.handle_result(&result).await;
877 result
878 }
880
881 pub async fn list_procedures(&mut self) -> Result<VoltTable, VoltError> {
882 async_pool_trace!(slot = self.idx, "listing procedures");
883
884 let mut node_guard = self.node.lock().await;
885 let node = node_guard
886 .as_mut()
887 .ok_or(VoltError::ConnectionNotAvailable)?;
888 let mut rx = node.list_procedures().await?;
889 drop(node_guard);
890 let result = async_block_for_result(&mut rx).await;
891 self.handle_result(&result).await;
892 result
893 }
894
895 pub async fn call_sp(
896 &mut self,
897 proc: &str,
898 params: Vec<&dyn Value>,
899 ) -> Result<VoltTable, VoltError> {
900 async_pool_trace!(
901 slot = self.idx,
902 procedure = proc,
903 "calling stored procedure"
904 );
905 let mut node_guard = self.node.lock().await;
906 let node = node_guard
907 .as_mut()
908 .ok_or(VoltError::ConnectionNotAvailable)?;
909 let mut rx = node.call_sp(proc, params).await?;
910 drop(node_guard);
911
912 let result = async_block_for_result(&mut rx).await;
913 self.handle_result(&result).await;
914 result
915 }
916
917 pub async fn upload_jar(&self, bs: Vec<u8>) -> Result<VoltTable, VoltError> {
919 async_pool_trace!(slot = self.idx, size = bs.len(), "uploading jar");
920
921 let mut node_guard = self.node.lock().await;
922 let node = node_guard
923 .as_mut()
924 .ok_or(VoltError::ConnectionNotAvailable)?;
925
926 let mut rx = node.upload_jar(bs).await?;
927 drop(node_guard);
928
929 let result = async_block_for_result(&mut rx).await;
930 self.handle_result(&result).await;
931 result
932 }
933
934 async fn handle_result<T>(&self, result: &Result<T, VoltError>) {
936 match result {
937 Ok(_) => {
938 self.pool.mark_success(self.idx).await;
939 }
940 Err(e) if e.is_connection_fatal() => {
941 async_pool_error!(slot = self.idx, error = ?e, "fatal connection error detected");
942 {
943 let mut guard = self.node.lock().await;
944 *guard = None;
945 }
946 self.pool.report_fatal_error(self.idx).await;
947 }
948 Err(_) => {
949 }
951 }
952 }
953
954 pub fn slot_index(&self) -> usize {
956 self.idx
957 }
958}
959
960#[cfg(test)]
965mod tests {
966 use super::*;
967
968 #[test]
969 fn test_conn_state_is_healthy() {
970 assert!(ConnState::Healthy.is_healthy());
971 assert!(
972 !ConnState::Unhealthy {
973 since: Instant::now()
974 }
975 .is_healthy()
976 );
977 assert!(!ConnState::Reconnecting.is_healthy());
978 }
979
980 #[test]
981 fn test_circuit_should_allow_closed() {
982 let circuit = Circuit::Closed;
983 assert!(circuit.should_allow());
984 }
985
986 #[test]
987 fn test_circuit_should_allow_open_not_expired() {
988 let circuit = Circuit::Open {
989 until: Instant::now() + Duration::from_secs(60),
990 };
991 assert!(!circuit.should_allow());
992 }
993
994 #[test]
995 fn test_circuit_should_allow_open_expired() {
996 let circuit = Circuit::Open {
997 until: Instant::now() - Duration::from_secs(1),
998 };
999 assert!(circuit.should_allow());
1000 }
1001
1002 #[test]
1003 fn test_circuit_should_allow_half_open() {
1004 let circuit = Circuit::HalfOpen;
1005 assert!(circuit.should_allow());
1006 }
1007
1008 #[test]
1009 fn test_circuit_transitions() {
1010 let mut circuit = Circuit::Closed;
1011
1012 circuit.open(Duration::from_secs(30));
1013 assert!(matches!(circuit, Circuit::Open { .. }));
1014
1015 circuit.half_open();
1016 assert!(matches!(circuit, Circuit::HalfOpen));
1017
1018 circuit.close();
1019 assert!(matches!(circuit, Circuit::Closed));
1020 }
1021
1022 #[test]
1023 fn test_pool_config_builder() {
1024 let config = AsyncPoolConfig::new()
1025 .size(20)
1026 .reconnect_backoff(Duration::from_secs(10))
1027 .circuit_open_duration(Duration::from_secs(60))
1028 .exhaustion_policy(ExhaustionPolicy::Block {
1029 timeout: Duration::from_secs(5),
1030 })
1031 .validation_mode(ValidationMode::BestEffort)
1032 .circuit_failure_threshold(5)
1033 .shutdown_timeout(Duration::from_secs(60));
1034
1035 assert_eq!(config.size, 20);
1036 assert_eq!(config.reconnect_backoff, Duration::from_secs(10));
1037 assert_eq!(config.circuit_open_duration, Duration::from_secs(60));
1038 assert_eq!(
1039 config.exhaustion_policy,
1040 ExhaustionPolicy::Block {
1041 timeout: Duration::from_secs(5)
1042 }
1043 );
1044 assert_eq!(config.validation_mode, ValidationMode::BestEffort);
1045 assert_eq!(config.circuit_failure_threshold, 5);
1046 assert_eq!(config.shutdown_timeout, Duration::from_secs(60));
1047 }
1048
1049 #[test]
1050 fn test_pool_config_default() {
1051 let config = AsyncPoolConfig::default();
1052 assert_eq!(config.size, 10);
1053 assert_eq!(config.exhaustion_policy, ExhaustionPolicy::FailFast);
1054 assert_eq!(config.validation_mode, ValidationMode::FailFast);
1055 }
1056
1057 #[test]
1058 fn test_slot_meta_is_available() {
1059 let slot = SlotMeta::new_healthy(0);
1060 assert!(slot.is_available());
1061
1062 let slot = SlotMeta::new_unhealthy(0);
1063 assert!(!slot.is_available());
1064
1065 let mut slot = SlotMeta::new_healthy(0);
1066 slot.circuit = Circuit::Open {
1067 until: Instant::now() + Duration::from_secs(60),
1068 };
1069 assert!(!slot.is_available());
1070 }
1071
1072 #[test]
1073 fn test_slot_meta_needs_reconnect() {
1074 let mut slot = SlotMeta::new_unhealthy(0);
1075 let backoff = Duration::from_millis(100);
1076
1077 assert!(slot.needs_reconnect(backoff));
1078
1079 slot.state = ConnState::Reconnecting;
1080 assert!(!slot.needs_reconnect(backoff));
1081
1082 slot.state = ConnState::Healthy;
1083 assert!(!slot.needs_reconnect(backoff));
1084 }
1085
1086 #[test]
1087 fn test_slot_meta_record_success() {
1088 let mut slot = SlotMeta::new_unhealthy(0);
1089 slot.consecutive_failures = 5;
1090
1091 slot.record_success();
1092
1093 assert_eq!(slot.consecutive_failures, 0);
1094 assert!(matches!(slot.state, ConnState::Healthy));
1095 assert!(matches!(slot.circuit, Circuit::Closed));
1096 }
1097
1098 #[test]
1099 fn test_slot_meta_record_failure_opens_circuit() {
1100 let mut slot = SlotMeta::new_healthy(0);
1101 slot.consecutive_failures = 2;
1102
1103 let config = AsyncPoolConfig::default().circuit_failure_threshold(3);
1104
1105 slot.record_failure(&config);
1106
1107 assert_eq!(slot.consecutive_failures, 3);
1108 assert!(matches!(slot.circuit, Circuit::Open { .. }));
1109 }
1110
1111 #[test]
1112 fn test_async_pool_error_display() {
1113 assert_eq!(
1114 format!("{}", AsyncPoolError::PoolShutdown),
1115 "Pool is shutting down"
1116 );
1117 assert_eq!(
1118 format!("{}", AsyncPoolError::CircuitOpen),
1119 "Circuit breaker is open"
1120 );
1121 assert_eq!(
1122 format!("{}", AsyncPoolError::PoolExhausted),
1123 "Pool exhausted, no healthy connections"
1124 );
1125 assert_eq!(
1126 format!("{}", AsyncPoolError::Timeout),
1127 "Timed out waiting for connection"
1128 );
1129 }
1130
1131 #[test]
1132 fn test_pool_phase() {
1133 assert_eq!(PoolPhase::Running, PoolPhase::Running);
1134 assert_ne!(PoolPhase::Running, PoolPhase::Shutdown);
1135 }
1136}