1#![cfg(feature = "tokio")]
18
19use std::fmt;
20use std::sync::Arc;
21use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
22use std::time::{Duration, Instant};
23use tokio::sync::{Mutex, Notify};
24use tokio::time::timeout;
25
26use crate::async_node::{AsyncNode, async_block_for_result_with_timeout};
27use crate::{NodeOpt, Opts, Value, VoltError, VoltTable};
28
29#[cfg(feature = "tracing")]
34macro_rules! async_pool_trace {
35 ($($arg:tt)*) => { tracing::trace!($($arg)*) };
36}
37#[cfg(not(feature = "tracing"))]
38macro_rules! async_pool_trace {
39 ($($arg:tt)*) => {};
40}
41
42#[cfg(feature = "tracing")]
43macro_rules! async_pool_debug {
44 ($($arg:tt)*) => { tracing::debug!($($arg)*) };
45}
46#[cfg(not(feature = "tracing"))]
47macro_rules! async_pool_debug {
48 ($($arg:tt)*) => {};
49}
50
51#[cfg(feature = "tracing")]
52macro_rules! async_pool_info {
53 ($($arg:tt)*) => { tracing::info!($($arg)*) };
54}
55#[cfg(not(feature = "tracing"))]
56macro_rules! async_pool_info {
57 ($($arg:tt)*) => {};
58}
59
60#[cfg(feature = "tracing")]
61macro_rules! async_pool_warn {
62 ($($arg:tt)*) => { tracing::warn!($($arg)*) };
63}
64#[cfg(not(feature = "tracing"))]
65macro_rules! async_pool_warn {
66 ($($arg:tt)*) => {};
67}
68
69#[cfg(feature = "tracing")]
70macro_rules! async_pool_error {
71 ($($arg:tt)*) => { tracing::error!($($arg)*) };
72}
73#[cfg(not(feature = "tracing"))]
74macro_rules! async_pool_error {
75 ($($arg:tt)*) => {};
76}
77
78#[cfg(feature = "metrics")]
83mod async_pool_metrics {
84 use metrics::{counter, gauge};
85
86 pub fn set_connections_total(count: usize) {
87 gauge!("voltdb_async_pool_connections_total").set(count as f64);
88 }
89
90 pub fn set_connections_healthy(count: usize) {
91 gauge!("voltdb_async_pool_connections_healthy").set(count as f64);
92 }
93
94 pub fn inc_reconnect_total() {
95 counter!("voltdb_async_pool_reconnect_total").increment(1);
96 }
97
98 pub fn inc_circuit_open_total() {
99 counter!("voltdb_async_pool_circuit_open_total").increment(1);
100 }
101
102 pub fn inc_requests_failed_total() {
103 counter!("voltdb_async_pool_requests_failed_total").increment(1);
104 }
105
106 pub fn inc_requests_total() {
107 counter!("voltdb_async_pool_requests_total").increment(1);
108 }
109}
110
111#[cfg(not(feature = "metrics"))]
112mod async_pool_metrics {
113 pub fn set_connections_total(_count: usize) {}
114 pub fn set_connections_healthy(_count: usize) {}
115 pub fn inc_reconnect_total() {}
116 pub fn inc_circuit_open_total() {}
117 pub fn inc_requests_failed_total() {}
118 pub fn inc_requests_total() {}
119}
120
121#[derive(Debug, Clone)]
127pub enum ConnState {
128 Healthy,
130 Unhealthy { since: Instant },
132 Reconnecting,
134}
135
136impl ConnState {
137 fn is_healthy(&self) -> bool {
138 matches!(self, ConnState::Healthy)
139 }
140
141 fn is_reconnecting(&self) -> bool {
142 matches!(self, ConnState::Reconnecting)
143 }
144}
145
146#[derive(Debug, Clone)]
152pub enum Circuit {
153 Closed,
155 Open { until: Instant },
157 HalfOpen,
159}
160
161impl Circuit {
162 fn should_allow(&self) -> bool {
164 match self {
165 Circuit::Closed => true,
166 Circuit::Open { until } => Instant::now() >= *until,
167 Circuit::HalfOpen => true,
168 }
169 }
170
171 fn open(&mut self, duration: Duration) {
173 *self = Circuit::Open {
174 until: Instant::now() + duration,
175 };
176 async_pool_metrics::inc_circuit_open_total();
177 async_pool_warn!("circuit breaker opened");
178 }
179
180 #[allow(dead_code)]
182 fn half_open(&mut self) {
183 *self = Circuit::HalfOpen;
184 async_pool_debug!("circuit breaker half-open");
185 }
186
187 fn close(&mut self) {
189 *self = Circuit::Closed;
190 async_pool_info!("circuit breaker closed");
191 }
192}
193
194#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
200pub enum ExhaustionPolicy {
201 #[default]
203 FailFast,
204 Block { timeout: Duration },
206}
207
208#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
210pub enum ValidationMode {
211 #[default]
213 FailFast,
214 BestEffort,
216}
217
218#[derive(Debug, Clone)]
220pub struct AsyncPoolConfig {
221 pub size: usize,
223 pub reconnect_backoff: Duration,
225 pub circuit_open_duration: Duration,
227 pub exhaustion_policy: ExhaustionPolicy,
229 pub validation_mode: ValidationMode,
231 pub circuit_failure_threshold: u32,
233 pub shutdown_timeout: Duration,
238 pub request_timeout: Duration,
241}
242
243impl Default for AsyncPoolConfig {
244 fn default() -> Self {
245 Self {
246 size: 10,
247 reconnect_backoff: Duration::from_secs(5),
248 circuit_open_duration: Duration::from_secs(30),
249 exhaustion_policy: ExhaustionPolicy::FailFast,
250 validation_mode: ValidationMode::FailFast,
251 circuit_failure_threshold: 3,
252 shutdown_timeout: Duration::from_secs(30),
253 request_timeout: Duration::from_secs(30),
254 }
255 }
256}
257
258impl AsyncPoolConfig {
259 pub fn new() -> Self {
260 Self::default()
261 }
262
263 pub fn size(mut self, size: usize) -> Self {
264 self.size = size;
265 self
266 }
267
268 pub fn reconnect_backoff(mut self, duration: Duration) -> Self {
269 self.reconnect_backoff = duration;
270 self
271 }
272
273 pub fn circuit_open_duration(mut self, duration: Duration) -> Self {
274 self.circuit_open_duration = duration;
275 self
276 }
277
278 pub fn exhaustion_policy(mut self, policy: ExhaustionPolicy) -> Self {
279 self.exhaustion_policy = policy;
280 self
281 }
282
283 pub fn validation_mode(mut self, mode: ValidationMode) -> Self {
284 self.validation_mode = mode;
285 self
286 }
287
288 pub fn circuit_failure_threshold(mut self, threshold: u32) -> Self {
289 self.circuit_failure_threshold = threshold;
290 self
291 }
292
293 pub fn shutdown_timeout(mut self, timeout: Duration) -> Self {
294 self.shutdown_timeout = timeout;
295 self
296 }
297
298 pub fn request_timeout(mut self, duration: Duration) -> Self {
299 self.request_timeout = duration;
300 self
301 }
302}
303
304#[derive(Debug, Clone, Copy, PartialEq, Eq)]
310enum PoolPhase {
311 Running,
313 Shutdown,
315}
316
317#[derive(Debug)]
323struct SlotMeta {
324 state: ConnState,
325 circuit: Circuit,
326 consecutive_failures: u32,
327 last_reconnect_attempt: Option<Instant>,
328 host_idx: usize,
329}
330
331impl SlotMeta {
332 fn new_healthy(host_idx: usize) -> Self {
333 Self {
334 state: ConnState::Healthy,
335 circuit: Circuit::Closed,
336 consecutive_failures: 0,
337 last_reconnect_attempt: None,
338 host_idx,
339 }
340 }
341
342 fn new_unhealthy(host_idx: usize) -> Self {
343 Self {
344 state: ConnState::Unhealthy {
345 since: Instant::now(),
346 },
347 circuit: Circuit::Open {
348 until: Instant::now() + Duration::from_secs(5),
349 },
350 consecutive_failures: 1,
351 last_reconnect_attempt: None,
352 host_idx,
353 }
354 }
355
356 fn is_available(&self) -> bool {
358 self.state.is_healthy() && self.circuit.should_allow()
359 }
360
361 fn needs_reconnect(&self, backoff: Duration) -> bool {
363 if self.state.is_healthy() || self.state.is_reconnecting() {
364 return false;
365 }
366
367 match self.last_reconnect_attempt {
368 None => true,
369 Some(last) => Instant::now().duration_since(last) >= backoff,
370 }
371 }
372
373 fn record_success(&mut self) {
374 self.consecutive_failures = 0;
375 self.state = ConnState::Healthy;
376 self.circuit.close();
377 }
378
379 fn record_failure(&mut self, config: &AsyncPoolConfig) {
380 self.consecutive_failures += 1;
381 self.state = ConnState::Unhealthy {
382 since: Instant::now(),
383 };
384
385 if self.consecutive_failures >= config.circuit_failure_threshold {
386 self.circuit.open(config.circuit_open_duration);
387 }
388 }
389}
390
391struct AsyncInnerPool {
396 opts: Opts,
397 config: AsyncPoolConfig,
398 slots: Vec<SlotMeta>,
399 nodes: Vec<Arc<Mutex<Option<AsyncNode>>>>,
400 phase: PoolPhase,
401}
402
403impl fmt::Debug for AsyncInnerPool {
404 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
405 f.debug_struct("AsyncInnerPool")
406 .field("config", &self.config)
407 .field("slots_count", &self.slots.len())
408 .field("phase", &self.phase)
409 .finish()
410 }
411}
412
413impl AsyncInnerPool {
414 fn node_opt(&self, host_idx: usize) -> Result<NodeOpt, VoltError> {
415 let ip_port = self
416 .opts
417 .0
418 .ip_ports
419 .get(host_idx)
420 .cloned()
421 .ok_or(VoltError::InvalidConfig)?;
422 Ok(NodeOpt {
423 ip_port,
424 pass: self.opts.0.pass.clone(),
425 user: self.opts.0.user.clone(),
426 connect_timeout: self.opts.0.connect_timeout,
427 read_timeout: self.opts.0.read_timeout,
428 })
429 }
430
431 async fn new(opts: Opts, config: AsyncPoolConfig) -> Result<Self, VoltError> {
432 let num_hosts = opts.0.ip_ports.len();
433 let mut inner = AsyncInnerPool {
434 opts,
435 config: config.clone(),
436 slots: Vec::with_capacity(config.size),
437 nodes: Vec::with_capacity(config.size),
438 phase: PoolPhase::Running,
439 };
440
441 for i in 0..config.size {
442 let host_idx = i % num_hosts;
443 let node_opt = inner.node_opt(host_idx)?;
444
445 async_pool_debug!(slot = i, host = host_idx, "creating connection");
446
447 match AsyncNode::new(node_opt).await {
448 Ok(node) => {
449 inner.slots.push(SlotMeta::new_healthy(host_idx));
450 inner.nodes.push(Arc::new(Mutex::new(Some(node))));
451 async_pool_info!(slot = i, "connection established");
452 }
453 Err(e) => match config.validation_mode {
454 ValidationMode::FailFast => {
455 async_pool_error!(slot = i, error = ?e, "connection failed, aborting pool creation");
456 return Err(e);
457 }
458 ValidationMode::BestEffort => {
459 async_pool_warn!(slot = i, error = ?e, "connection failed, marking unhealthy");
460 inner.slots.push(SlotMeta::new_unhealthy(host_idx));
461 inner.nodes.push(Arc::new(Mutex::new(None)));
462 }
463 },
464 }
465 }
466
467 inner.update_metrics();
468 async_pool_info!(
469 size = config.size,
470 healthy = inner.healthy_count(),
471 "async pool initialized"
472 );
473
474 Ok(inner)
475 }
476
477 fn healthy_count(&self) -> usize {
478 self.slots.iter().filter(|s| s.state.is_healthy()).count()
479 }
480
481 fn update_metrics(&self) {
482 async_pool_metrics::set_connections_total(self.slots.len());
483 async_pool_metrics::set_connections_healthy(self.healthy_count());
484 }
485}
486
487pub struct AsyncPool {
505 inner: Arc<Mutex<AsyncInnerPool>>,
506 notify: Arc<Notify>,
507 counter: AtomicUsize,
508 shutdown_flag: AtomicBool,
509 config: AsyncPoolConfig,
510}
511
512impl fmt::Debug for AsyncPool {
513 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
514 f.debug_struct("AsyncPool")
515 .field("counter", &self.counter.load(Ordering::Relaxed))
516 .field("shutdown", &self.shutdown_flag.load(Ordering::Relaxed))
517 .field("config", &self.config)
518 .finish()
519 }
520}
521
522impl AsyncPool {
523 pub async fn new<T: Into<Opts>>(opts: T) -> Result<AsyncPool, VoltError> {
525 AsyncPool::with_config(opts, AsyncPoolConfig::default()).await
526 }
527
528 pub async fn new_manual<T: Into<Opts>>(size: usize, opts: T) -> Result<AsyncPool, VoltError> {
530 AsyncPool::with_config(opts, AsyncPoolConfig::new().size(size)).await
531 }
532
533 pub async fn with_config<T: Into<Opts>>(
535 opts: T,
536 config: AsyncPoolConfig,
537 ) -> Result<AsyncPool, VoltError> {
538 let inner = AsyncInnerPool::new(opts.into(), config.clone()).await?;
539 Ok(AsyncPool {
540 inner: Arc::new(Mutex::new(inner)),
541 notify: Arc::new(Notify::new()),
542 counter: AtomicUsize::new(0),
543 shutdown_flag: AtomicBool::new(false),
544 config,
545 })
546 }
547
548 pub async fn get_conn(&self) -> Result<AsyncPooledConn<'_>, VoltError> {
550 if self.shutdown_flag.load(Ordering::Relaxed) {
551 async_pool_warn!("get_conn called on shutdown pool");
552 return Err(VoltError::PoolShutdown);
553 }
554
555 async_pool_metrics::inc_requests_total();
556
557 let preferred_idx = self.counter.fetch_add(1, Ordering::Relaxed) % self.config.size;
558
559 match self.config.exhaustion_policy {
560 ExhaustionPolicy::FailFast => self.get_conn_failfast(preferred_idx).await,
561 ExhaustionPolicy::Block {
562 timeout: wait_timeout,
563 } => self.get_conn_blocking(preferred_idx, wait_timeout).await,
564 }
565 }
566
567 async fn get_conn_failfast(
568 &self,
569 preferred_idx: usize,
570 ) -> Result<AsyncPooledConn<'_>, VoltError> {
571 let inner = self.inner.lock().await;
572
573 if inner.phase != PoolPhase::Running {
574 return Err(VoltError::PoolShutdown);
575 }
576
577 if inner.slots[preferred_idx].is_available() {
579 return self.checkout_slot(&inner, preferred_idx).await;
580 }
581
582 for i in 1..self.config.size {
584 let idx = (preferred_idx + i) % self.config.size;
585 if inner.slots[idx].is_available() {
586 async_pool_debug!(
587 preferred = preferred_idx,
588 actual = idx,
589 "using alternate connection"
590 );
591 return self.checkout_slot(&inner, idx).await;
592 }
593 }
594
595 async_pool_warn!("no healthy connections available");
596 async_pool_metrics::inc_requests_failed_total();
597 Err(VoltError::PoolExhausted)
598 }
599
600 async fn get_conn_blocking(
601 &self,
602 preferred_idx: usize,
603 wait_timeout: Duration,
604 ) -> Result<AsyncPooledConn<'_>, VoltError> {
605 let deadline = Instant::now() + wait_timeout;
606
607 loop {
608 let inner = self.inner.lock().await;
609
610 if inner.phase != PoolPhase::Running {
611 return Err(VoltError::PoolShutdown);
612 }
613
614 if inner.slots[preferred_idx].is_available() {
616 return self.checkout_slot(&inner, preferred_idx).await;
617 }
618
619 for i in 1..self.config.size {
621 let idx = (preferred_idx + i) % self.config.size;
622 if inner.slots[idx].is_available() {
623 return self.checkout_slot(&inner, idx).await;
624 }
625 }
626
627 let remaining = deadline.saturating_duration_since(Instant::now());
629 if remaining.is_zero() {
630 async_pool_warn!(timeout = ?wait_timeout, "connection wait timed out");
631 async_pool_metrics::inc_requests_failed_total();
632 return Err(VoltError::Timeout);
633 }
634
635 async_pool_trace!("waiting for available connection");
636 drop(inner);
637
638 let _ = timeout(remaining, self.notify.notified()).await;
640 }
641 }
642
643 async fn checkout_slot(
644 &self,
645 inner: &AsyncInnerPool,
646 idx: usize,
647 ) -> Result<AsyncPooledConn<'_>, VoltError> {
648 let node = Arc::clone(&inner.nodes[idx]);
649 let config = inner.config.clone();
650 let host_idx = inner.slots[idx].host_idx;
651
652 async_pool_trace!(slot = idx, "connection acquired");
653
654 Ok(AsyncPooledConn {
655 pool: self,
656 idx,
657 node,
658 config,
659 host_idx,
660 })
661 }
662
663 async fn report_fatal_error(&self, idx: usize) {
665 #[allow(clippy::type_complexity)]
666 let reconnect_info: Option<(
667 Arc<Mutex<Option<AsyncNode>>>,
668 NodeOpt,
669 AsyncPoolConfig,
670 )>;
671
672 {
673 let mut inner = self.inner.lock().await;
674 let config = inner.config.clone();
675 inner.slots[idx].record_failure(&config);
676 async_pool_debug!(slot = idx, "fatal error reported");
677
678 inner.update_metrics();
679 self.notify.notify_waiters();
680
681 if !self.shutdown_flag.load(Ordering::Relaxed) {
682 let backoff = inner.config.reconnect_backoff;
683 if inner.slots[idx].needs_reconnect(backoff) {
684 let node_arc = Arc::clone(&inner.nodes[idx]);
685 let host_idx = inner.slots[idx].host_idx;
686 if let Ok(node_opt) = inner.node_opt(host_idx) {
687 inner.slots[idx].state = ConnState::Reconnecting;
688 inner.slots[idx].last_reconnect_attempt = Some(Instant::now());
689
690 reconnect_info = Some((node_arc, node_opt, config));
691 } else {
692 reconnect_info = None;
693 }
694 } else {
695 reconnect_info = None;
696 }
697 } else {
698 reconnect_info = None;
699 }
700 }
701
702 if let Some((node_arc, node_opt, config)) = reconnect_info {
703 self.do_reconnect(idx, node_arc, node_opt, config).await;
704 }
705 }
706
707 async fn do_reconnect(
708 &self,
709 idx: usize,
710 node_arc: Arc<Mutex<Option<AsyncNode>>>,
711 node_opt: NodeOpt,
712 config: AsyncPoolConfig,
713 ) {
714 async_pool_info!(slot = idx, "attempting reconnection");
715 async_pool_metrics::inc_reconnect_total();
716
717 match AsyncNode::new(node_opt).await {
718 Ok(new_node) => {
719 {
720 let mut node_guard = node_arc.lock().await;
721 *node_guard = Some(new_node);
722 }
723
724 {
725 let mut inner = self.inner.lock().await;
726 inner.slots[idx].record_success();
727 inner.update_metrics();
728 }
729
730 self.notify.notify_waiters();
731 async_pool_info!(slot = idx, "reconnection successful");
732 }
733 Err(_e) => {
734 {
735 let mut inner = self.inner.lock().await;
736 inner.slots[idx].record_failure(&config);
737 inner.update_metrics();
738 }
739 async_pool_error!(slot = idx, error = ?_e, "reconnection failed");
740 }
741 }
742 }
743
744 async fn mark_success(&self, idx: usize) {
745 let mut inner = self.inner.lock().await;
746 inner.slots[idx].record_success();
747 inner.update_metrics();
748 }
749
750 pub async fn shutdown(&self) {
753 async_pool_info!("initiating pool shutdown");
754 self.shutdown_flag.store(true, Ordering::Relaxed);
755
756 let nodes: Vec<Arc<Mutex<Option<AsyncNode>>>>;
758 {
759 let mut inner = self.inner.lock().await;
760 inner.phase = PoolPhase::Shutdown;
761 async_pool_info!("entering shutdown phase");
762
763 for slot in &mut inner.slots {
764 slot.state = ConnState::Unhealthy {
765 since: Instant::now(),
766 };
767 }
768
769 nodes = inner.nodes.iter().map(Arc::clone).collect();
770 } for node_arc in &nodes {
774 let mut node_guard = node_arc.lock().await;
775 if let Some(ref node) = *node_guard {
776 let _ = node.shutdown().await;
777 }
778 *node_guard = None;
779 }
780
781 {
782 let inner = self.inner.lock().await;
783 inner.update_metrics();
784 }
785 self.notify.notify_waiters();
786
787 async_pool_info!("pool shutdown complete");
788 }
789
790 pub fn is_shutdown(&self) -> bool {
792 self.shutdown_flag.load(Ordering::Relaxed)
793 }
794
795 pub async fn stats(&self) -> AsyncPoolStats {
797 let inner = self.inner.lock().await;
798 AsyncPoolStats {
799 size: self.config.size,
800 healthy: inner.healthy_count(),
801 total_requests: self.counter.load(Ordering::Relaxed),
802 is_shutdown: self.shutdown_flag.load(Ordering::Relaxed),
803 }
804 }
805}
806
807#[derive(Debug, Clone)]
809pub struct AsyncPoolStats {
810 pub size: usize,
811 pub healthy: usize,
812 pub total_requests: usize,
813 pub is_shutdown: bool,
814}
815
816pub struct AsyncPooledConn<'a> {
822 pool: &'a AsyncPool,
823 idx: usize,
824 node: Arc<Mutex<Option<AsyncNode>>>,
825 #[allow(dead_code)]
826 config: AsyncPoolConfig,
827 #[allow(dead_code)]
828 host_idx: usize,
829}
830
831impl fmt::Debug for AsyncPooledConn<'_> {
832 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
833 f.debug_struct("AsyncPooledConn")
834 .field("idx", &self.idx)
835 .field("host_idx", &self.host_idx)
836 .finish()
837 }
838}
839
840impl AsyncPooledConn<'_> {
841 pub async fn query(&self, sql: &str) -> Result<VoltTable, VoltError> {
843 async_pool_trace!(slot = self.idx, sql = sql, "executing query");
844
845 let mut node_guard = self.node.lock().await;
846 let node = node_guard
847 .as_mut()
848 .ok_or(VoltError::ConnectionNotAvailable)?;
849 let mut rx = node.query(sql).await?;
850 drop(node_guard);
851 let result =
852 async_block_for_result_with_timeout(&mut rx, self.config.request_timeout).await;
853 self.handle_result(&result).await;
854 result
855 }
856
857 pub async fn list_procedures(&mut self) -> Result<VoltTable, VoltError> {
858 async_pool_trace!(slot = self.idx, "listing procedures");
859
860 let mut node_guard = self.node.lock().await;
861 let node = node_guard
862 .as_mut()
863 .ok_or(VoltError::ConnectionNotAvailable)?;
864 let mut rx = node.list_procedures().await?;
865 drop(node_guard);
866 let result =
867 async_block_for_result_with_timeout(&mut rx, self.config.request_timeout).await;
868 self.handle_result(&result).await;
869 result
870 }
871
872 pub async fn call_sp(
873 &mut self,
874 proc: &str,
875 params: Vec<&dyn Value>,
876 ) -> Result<VoltTable, VoltError> {
877 async_pool_trace!(
878 slot = self.idx,
879 procedure = proc,
880 "calling stored procedure"
881 );
882 let mut node_guard = self.node.lock().await;
883 let node = node_guard
884 .as_mut()
885 .ok_or(VoltError::ConnectionNotAvailable)?;
886 let mut rx = node.call_sp(proc, params).await?;
887 drop(node_guard);
888
889 let result =
890 async_block_for_result_with_timeout(&mut rx, self.config.request_timeout).await;
891 self.handle_result(&result).await;
892 result
893 }
894
895 pub async fn upload_jar(&self, bs: Vec<u8>) -> Result<VoltTable, VoltError> {
897 async_pool_trace!(slot = self.idx, size = bs.len(), "uploading jar");
898
899 let mut node_guard = self.node.lock().await;
900 let node = node_guard
901 .as_mut()
902 .ok_or(VoltError::ConnectionNotAvailable)?;
903
904 let mut rx = node.upload_jar(bs).await?;
905 drop(node_guard);
906
907 let result =
908 async_block_for_result_with_timeout(&mut rx, self.config.request_timeout).await;
909 self.handle_result(&result).await;
910 result
911 }
912
913 async fn handle_result<T>(&self, result: &Result<T, VoltError>) {
915 match result {
916 Ok(_) => {
917 self.pool.mark_success(self.idx).await;
918 }
919 Err(e) if e.is_connection_fatal() => {
920 async_pool_error!(slot = self.idx, error = ?e, "fatal connection error detected");
921 {
922 let mut guard = self.node.lock().await;
923 *guard = None;
924 }
925 self.pool.report_fatal_error(self.idx).await;
926 }
927 Err(_) => {
928 }
930 }
931 }
932
933 pub fn slot_index(&self) -> usize {
935 self.idx
936 }
937}
938
939#[cfg(test)]
944mod tests {
945 use super::*;
946
947 #[test]
948 fn test_conn_state_is_healthy() {
949 assert!(ConnState::Healthy.is_healthy());
950 assert!(
951 !ConnState::Unhealthy {
952 since: Instant::now()
953 }
954 .is_healthy()
955 );
956 assert!(!ConnState::Reconnecting.is_healthy());
957 }
958
959 #[test]
960 fn test_circuit_should_allow_closed() {
961 let circuit = Circuit::Closed;
962 assert!(circuit.should_allow());
963 }
964
965 #[test]
966 fn test_circuit_should_allow_open_not_expired() {
967 let circuit = Circuit::Open {
968 until: Instant::now() + Duration::from_secs(60),
969 };
970 assert!(!circuit.should_allow());
971 }
972
973 #[test]
974 fn test_circuit_should_allow_open_expired() {
975 let circuit = Circuit::Open {
976 until: Instant::now() - Duration::from_secs(1),
977 };
978 assert!(circuit.should_allow());
979 }
980
981 #[test]
982 fn test_circuit_should_allow_half_open() {
983 let circuit = Circuit::HalfOpen;
984 assert!(circuit.should_allow());
985 }
986
987 #[test]
988 fn test_circuit_transitions() {
989 let mut circuit = Circuit::Closed;
990
991 circuit.open(Duration::from_secs(30));
992 assert!(matches!(circuit, Circuit::Open { .. }));
993
994 circuit.half_open();
995 assert!(matches!(circuit, Circuit::HalfOpen));
996
997 circuit.close();
998 assert!(matches!(circuit, Circuit::Closed));
999 }
1000
1001 #[test]
1002 fn test_pool_config_builder() {
1003 let config = AsyncPoolConfig::new()
1004 .size(20)
1005 .reconnect_backoff(Duration::from_secs(10))
1006 .circuit_open_duration(Duration::from_secs(60))
1007 .exhaustion_policy(ExhaustionPolicy::Block {
1008 timeout: Duration::from_secs(5),
1009 })
1010 .validation_mode(ValidationMode::BestEffort)
1011 .circuit_failure_threshold(5)
1012 .shutdown_timeout(Duration::from_secs(60))
1013 .request_timeout(Duration::from_secs(15));
1014
1015 assert_eq!(config.size, 20);
1016 assert_eq!(config.reconnect_backoff, Duration::from_secs(10));
1017 assert_eq!(config.circuit_open_duration, Duration::from_secs(60));
1018 assert_eq!(
1019 config.exhaustion_policy,
1020 ExhaustionPolicy::Block {
1021 timeout: Duration::from_secs(5)
1022 }
1023 );
1024 assert_eq!(config.validation_mode, ValidationMode::BestEffort);
1025 assert_eq!(config.circuit_failure_threshold, 5);
1026 assert_eq!(config.shutdown_timeout, Duration::from_secs(60));
1027 assert_eq!(config.request_timeout, Duration::from_secs(15));
1028 }
1029
1030 #[test]
1031 fn test_pool_config_default() {
1032 let config = AsyncPoolConfig::default();
1033 assert_eq!(config.size, 10);
1034 assert_eq!(config.exhaustion_policy, ExhaustionPolicy::FailFast);
1035 assert_eq!(config.validation_mode, ValidationMode::FailFast);
1036 assert_eq!(config.request_timeout, Duration::from_secs(30));
1037 }
1038
1039 #[test]
1040 fn test_slot_meta_is_available() {
1041 let slot = SlotMeta::new_healthy(0);
1042 assert!(slot.is_available());
1043
1044 let slot = SlotMeta::new_unhealthy(0);
1045 assert!(!slot.is_available());
1046
1047 let mut slot = SlotMeta::new_healthy(0);
1048 slot.circuit = Circuit::Open {
1049 until: Instant::now() + Duration::from_secs(60),
1050 };
1051 assert!(!slot.is_available());
1052 }
1053
1054 #[test]
1055 fn test_slot_meta_needs_reconnect() {
1056 let mut slot = SlotMeta::new_unhealthy(0);
1057 let backoff = Duration::from_millis(100);
1058
1059 assert!(slot.needs_reconnect(backoff));
1060
1061 slot.state = ConnState::Reconnecting;
1062 assert!(!slot.needs_reconnect(backoff));
1063
1064 slot.state = ConnState::Healthy;
1065 assert!(!slot.needs_reconnect(backoff));
1066 }
1067
1068 #[test]
1069 fn test_slot_meta_record_success() {
1070 let mut slot = SlotMeta::new_unhealthy(0);
1071 slot.consecutive_failures = 5;
1072
1073 slot.record_success();
1074
1075 assert_eq!(slot.consecutive_failures, 0);
1076 assert!(matches!(slot.state, ConnState::Healthy));
1077 assert!(matches!(slot.circuit, Circuit::Closed));
1078 }
1079
1080 #[test]
1081 fn test_slot_meta_record_failure_opens_circuit() {
1082 let mut slot = SlotMeta::new_healthy(0);
1083 slot.consecutive_failures = 2;
1084
1085 let config = AsyncPoolConfig::default().circuit_failure_threshold(3);
1086
1087 slot.record_failure(&config);
1088
1089 assert_eq!(slot.consecutive_failures, 3);
1090 assert!(matches!(slot.circuit, Circuit::Open { .. }));
1091 }
1092
1093 #[test]
1094 fn test_pool_phase() {
1095 assert_eq!(PoolPhase::Running, PoolPhase::Running);
1096 assert_ne!(PoolPhase::Running, PoolPhase::Shutdown);
1097 }
1098}