Skip to main content

voltdb_client_rust/
async_pool.rs

1//! Production-ready async connection pool for VoltDB.
2//!
3//! # Features
4//! - Thread-safe with fine-grained locking
5//! - Connection state machine (Healthy, Unhealthy, Reconnecting)
6//! - Per-connection circuit breaker
7//! - Configurable exhaustion policy (FailFast or Block with async waiting)
//! - Graceful shutdown (connections are closed immediately; drain mode is not yet implemented)
9//! - Optional structured logging (`tracing` feature)
10//! - Optional metrics (`metrics` feature)
11//!
12//! # Design
13//! - Pool lock only guards metadata (state, circuit breaker)
14//! - Each AsyncNode has its own lock for I/O operations
15//! - Network I/O never holds the pool lock
16
17#![cfg(feature = "tokio")]
18
19use std::fmt;
20use std::sync::Arc;
21use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
22use std::time::{Duration, Instant};
23use tokio::sync::{Mutex, Notify};
24use tokio::time::timeout;
25
26use crate::async_node::{AsyncNode, async_block_for_result};
27use crate::{NodeOpt, Opts, Value, VoltError, VoltTable};
28
29// ============================================================================
30// Logging macros - use tracing if available, otherwise no-op
31// ============================================================================
32
// Each `async_pool_*` macro forwards its arguments verbatim to the matching
// `tracing` macro when the `tracing` feature is enabled. Without that feature
// the macro expands to nothing, so its arguments are neither type-checked nor
// evaluated; locals used only inside log calls should be `_`-prefixed to avoid
// unused-variable warnings in that configuration.

// Trace level: forwards to `tracing::trace!`.
#[cfg(feature = "tracing")]
macro_rules! async_pool_trace {
    ($($arg:tt)*) => { tracing::trace!($($arg)*) };
}
// Trace level, `tracing` disabled: expands to nothing.
#[cfg(not(feature = "tracing"))]
macro_rules! async_pool_trace {
    ($($arg:tt)*) => {};
}

// Debug level: forwards to `tracing::debug!`.
#[cfg(feature = "tracing")]
macro_rules! async_pool_debug {
    ($($arg:tt)*) => { tracing::debug!($($arg)*) };
}
// Debug level, `tracing` disabled: expands to nothing.
#[cfg(not(feature = "tracing"))]
macro_rules! async_pool_debug {
    ($($arg:tt)*) => {};
}

// Info level: forwards to `tracing::info!`.
#[cfg(feature = "tracing")]
macro_rules! async_pool_info {
    ($($arg:tt)*) => { tracing::info!($($arg)*) };
}
// Info level, `tracing` disabled: expands to nothing.
#[cfg(not(feature = "tracing"))]
macro_rules! async_pool_info {
    ($($arg:tt)*) => {};
}

// Warn level: forwards to `tracing::warn!`.
#[cfg(feature = "tracing")]
macro_rules! async_pool_warn {
    ($($arg:tt)*) => { tracing::warn!($($arg)*) };
}
// Warn level, `tracing` disabled: expands to nothing.
#[cfg(not(feature = "tracing"))]
macro_rules! async_pool_warn {
    ($($arg:tt)*) => {};
}

// Error level: forwards to `tracing::error!`.
#[cfg(feature = "tracing")]
macro_rules! async_pool_error {
    ($($arg:tt)*) => { tracing::error!($($arg)*) };
}
// Error level, `tracing` disabled: expands to nothing.
#[cfg(not(feature = "tracing"))]
macro_rules! async_pool_error {
    ($($arg:tt)*) => {};
}
77
78// ============================================================================
79// Metrics - use metrics crate if available, otherwise no-op
80// ============================================================================
81
/// Pool metric recorders backed by the `metrics` crate (enabled by the
/// `metrics` feature). Gauges are overwritten with the latest value via
/// `.set(..)`; counters are incremented by one per call.
#[cfg(feature = "metrics")]
mod async_pool_metrics {
    use metrics::{counter, gauge};

    /// Sets the gauge holding the number of connection slots in the pool.
    pub fn set_connections_total(count: usize) {
        // Gauges take `f64`; the `usize` slot count is cast directly.
        gauge!("voltdb_async_pool_connections_total").set(count as f64);
    }

    /// Sets the gauge holding the number of slots currently healthy.
    pub fn set_connections_healthy(count: usize) {
        gauge!("voltdb_async_pool_connections_healthy").set(count as f64);
    }

    /// Counts one reconnection attempt.
    pub fn inc_reconnect_total() {
        counter!("voltdb_async_pool_reconnect_total").increment(1);
    }

    /// Counts one transition of a circuit breaker into the open state.
    pub fn inc_circuit_open_total() {
        counter!("voltdb_async_pool_circuit_open_total").increment(1);
    }

    /// Counts one `get_conn` request that could not be served.
    pub fn inc_requests_failed_total() {
        counter!("voltdb_async_pool_requests_failed_total").increment(1);
    }

    /// Counts one `get_conn` request.
    pub fn inc_requests_total() {
        counter!("voltdb_async_pool_requests_total").increment(1);
    }
}
110
/// Stand-in metric recorders used when the `metrics` feature is off.
///
/// Every function has the same signature as its instrumented counterpart and
/// does nothing, so call sites never need their own `cfg` guards.
#[cfg(not(feature = "metrics"))]
mod async_pool_metrics {
    /// No-op: the total slot count is not recorded.
    pub fn set_connections_total(_: usize) {}

    /// No-op: the healthy slot count is not recorded.
    pub fn set_connections_healthy(_: usize) {}

    /// No-op reconnect counter.
    pub fn inc_reconnect_total() {}

    /// No-op circuit-open counter.
    pub fn inc_circuit_open_total() {}

    /// No-op failed-request counter.
    pub fn inc_requests_failed_total() {}

    /// No-op request counter.
    pub fn inc_requests_total() {}
}
120
121// ============================================================================
122// Pool-specific errors
123// ============================================================================
124
/// Failure modes specific to acquiring a connection from the async pool.
///
/// Converted into `VoltError` through its `From` impl, so pool methods can
/// propagate these with `?`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AsyncPoolError {
    /// Pool is shutting down, no new connections allowed
    PoolShutdown,
    /// Circuit breaker is open for the requested connection
    CircuitOpen,
    /// All connections are busy or unhealthy
    PoolExhausted,
    /// Timed out waiting for a connection
    Timeout,
    /// Internal lock was poisoned
    LockPoisoned,
}

impl fmt::Display for AsyncPoolError {
    /// Writes a short, human-readable description of the condition.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let message = match self {
            Self::PoolShutdown => "Pool is shutting down",
            Self::CircuitOpen => "Circuit breaker is open",
            Self::PoolExhausted => "Pool exhausted, no healthy connections",
            Self::Timeout => "Timed out waiting for connection",
            Self::LockPoisoned => "Internal lock poisoned",
        };
        f.write_str(message)
    }
}

impl std::error::Error for AsyncPoolError {}
153
154impl From<AsyncPoolError> for VoltError {
155    fn from(e: AsyncPoolError) -> Self {
156        match e {
157            AsyncPoolError::PoolShutdown => VoltError::ConnectionNotAvailable,
158            AsyncPoolError::CircuitOpen => VoltError::ConnectionNotAvailable,
159            AsyncPoolError::PoolExhausted => VoltError::ConnectionNotAvailable,
160            AsyncPoolError::Timeout => VoltError::Timeout,
161            AsyncPoolError::LockPoisoned => {
162                VoltError::PoisonError("Pool lock poisoned".to_string())
163            }
164        }
165    }
166}
167
168// ============================================================================
169// Connection State Machine
170// ============================================================================
171
/// Health of a single pooled connection slot.
#[derive(Debug, Clone)]
pub enum ConnState {
    /// Connection is working normally
    Healthy,
    /// Connection has failed; `since` records when it was marked unhealthy
    Unhealthy { since: Instant },
    /// Connection is being replaced (reconnection in progress)
    Reconnecting,
}

impl ConnState {
    /// `true` only for `ConnState::Healthy`.
    fn is_healthy(&self) -> bool {
        match self {
            Self::Healthy => true,
            Self::Unhealthy { .. } | Self::Reconnecting => false,
        }
    }

    /// `true` only for `ConnState::Reconnecting`.
    fn is_reconnecting(&self) -> bool {
        match self {
            Self::Reconnecting => true,
            Self::Healthy | Self::Unhealthy { .. } => false,
        }
    }
}
192
193// ============================================================================
194// Circuit Breaker
195// ============================================================================
196
197/// Per-connection circuit breaker state.
198#[derive(Debug, Clone)]
199pub enum Circuit {
200    /// Normal operation - requests flow through
201    Closed,
202    /// Circuit is open - fail fast until `until` time
203    Open { until: Instant },
204    /// Allow one probe request to test recovery
205    HalfOpen,
206}
207
208impl Circuit {
209    /// Check if request should be allowed through
210    fn should_allow(&self) -> bool {
211        match self {
212            Circuit::Closed => true,
213            Circuit::Open { until } => Instant::now() >= *until,
214            Circuit::HalfOpen => true,
215        }
216    }
217
218    /// Transition to Open state
219    fn open(&mut self, duration: Duration) {
220        *self = Circuit::Open {
221            until: Instant::now() + duration,
222        };
223        async_pool_metrics::inc_circuit_open_total();
224        async_pool_warn!("circuit breaker opened");
225    }
226
227    /// Transition to HalfOpen state (for probing)
228    #[allow(dead_code)]
229    fn half_open(&mut self) {
230        *self = Circuit::HalfOpen;
231        async_pool_debug!("circuit breaker half-open");
232    }
233
234    /// Transition to Closed state (healthy)
235    fn close(&mut self) {
236        *self = Circuit::Closed;
237        async_pool_info!("circuit breaker closed");
238    }
239}
240
241// ============================================================================
242// Configuration
243// ============================================================================
244
/// What `AsyncPool::get_conn` does when no slot is available.
///
/// The pool does not track busy connections (several handles may share one
/// slot). "No slot available" means every slot is unhealthy or has an open
/// circuit breaker.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ExhaustionPolicy {
    /// Fail with `PoolExhausted` rather than waiting for a slot to recover.
    #[default]
    FailFast,
    /// Wait up to `timeout` for a slot to become available, then fail with `Timeout`.
    Block { timeout: Duration },
}

/// How pool creation reacts to connections that fail to open.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ValidationMode {
    /// Abort pool creation and return the first connection error (no panic).
    #[default]
    FailFast,
    /// Mark failed connections as unhealthy and continue startup.
    BestEffort,
}

/// Async pool configuration.
///
/// Build with `AsyncPoolConfig::new()` (equivalent to `Default`) and the
/// chained setters, e.g. `AsyncPoolConfig::new().size(5)`.
#[derive(Debug, Clone)]
pub struct AsyncPoolConfig {
    /// Number of connection slots in the pool; should be at least 1.
    pub size: usize,
    /// Minimum time between reconnection attempts for the same slot.
    pub reconnect_backoff: Duration,
    /// How long a slot's circuit breaker stays open once tripped.
    pub circuit_open_duration: Duration,
    /// What to do when no slot is available.
    pub exhaustion_policy: ExhaustionPolicy,
    /// How to handle connection failures during pool creation.
    pub validation_mode: ValidationMode,
    /// Number of consecutive failures before a slot's circuit opens.
    pub circuit_failure_threshold: u32,
    /// Timeout for graceful shutdown drain.
    ///
    /// **Note:** This field is currently reserved for future use.
    /// The current shutdown implementation closes connections immediately.
    pub shutdown_timeout: Duration,
}

impl Default for AsyncPoolConfig {
    /// 10 slots, 5 s reconnect backoff, circuit opens for 30 s after 3
    /// consecutive failures, fail-fast exhaustion and validation, 30 s
    /// (reserved) shutdown timeout.
    fn default() -> Self {
        Self {
            size: 10,
            reconnect_backoff: Duration::from_secs(5),
            circuit_open_duration: Duration::from_secs(30),
            exhaustion_policy: ExhaustionPolicy::FailFast,
            validation_mode: ValidationMode::FailFast,
            circuit_failure_threshold: 3,
            shutdown_timeout: Duration::from_secs(30),
        }
    }
}

impl AsyncPoolConfig {
    /// Same as `AsyncPoolConfig::default()`.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the number of connection slots.
    #[must_use]
    pub fn size(mut self, size: usize) -> Self {
        self.size = size;
        self
    }

    /// Sets the minimum time between reconnection attempts per slot.
    #[must_use]
    pub fn reconnect_backoff(mut self, duration: Duration) -> Self {
        self.reconnect_backoff = duration;
        self
    }

    /// Sets how long a tripped circuit breaker stays open.
    #[must_use]
    pub fn circuit_open_duration(mut self, duration: Duration) -> Self {
        self.circuit_open_duration = duration;
        self
    }

    /// Sets the behavior when no slot is available.
    #[must_use]
    pub fn exhaustion_policy(mut self, policy: ExhaustionPolicy) -> Self {
        self.exhaustion_policy = policy;
        self
    }

    /// Sets how pool creation handles connection failures.
    #[must_use]
    pub fn validation_mode(mut self, mode: ValidationMode) -> Self {
        self.validation_mode = mode;
        self
    }

    /// Sets the consecutive-failure count that opens a slot's circuit.
    #[must_use]
    pub fn circuit_failure_threshold(mut self, threshold: u32) -> Self {
        self.circuit_failure_threshold = threshold;
        self
    }

    /// Sets the (currently reserved) graceful-shutdown drain timeout.
    #[must_use]
    pub fn shutdown_timeout(mut self, timeout: Duration) -> Self {
        self.shutdown_timeout = timeout;
        self
    }
}
341
342// ============================================================================
343// Pool Phase (Lifecycle)
344// ============================================================================
345
/// Lifecycle of the pool, checked under the pool lock before a connection is
/// handed out.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PoolPhase {
    /// Accepting `get_conn` requests.
    Running,
    /// `shutdown` has run; no further connections are handed out.
    Shutdown,
}
354
355// ============================================================================
356// Connection Slot (Metadata only - AsyncNode is separate)
357// ============================================================================
358
359/// Metadata for a connection slot. The actual AsyncNode is stored separately.
360#[derive(Debug)]
361struct SlotMeta {
362    state: ConnState,
363    circuit: Circuit,
364    consecutive_failures: u32,
365    last_reconnect_attempt: Option<Instant>,
366    host_idx: usize,
367}
368
369impl SlotMeta {
370    fn new_healthy(host_idx: usize) -> Self {
371        Self {
372            state: ConnState::Healthy,
373            circuit: Circuit::Closed,
374            consecutive_failures: 0,
375            last_reconnect_attempt: None,
376            host_idx,
377        }
378    }
379
380    fn new_unhealthy(host_idx: usize) -> Self {
381        Self {
382            state: ConnState::Unhealthy {
383                since: Instant::now(),
384            },
385            circuit: Circuit::Open {
386                until: Instant::now() + Duration::from_secs(5),
387            },
388            consecutive_failures: 1,
389            last_reconnect_attempt: None,
390            host_idx,
391        }
392    }
393
394    /// Check if this slot can be used (healthy and circuit allows)
395    fn is_available(&self) -> bool {
396        self.state.is_healthy() && self.circuit.should_allow()
397    }
398
399    /// Check if this slot needs reconnection and can attempt it
400    fn needs_reconnect(&self, backoff: Duration) -> bool {
401        if self.state.is_healthy() || self.state.is_reconnecting() {
402            return false;
403        }
404
405        match self.last_reconnect_attempt {
406            None => true,
407            Some(last) => Instant::now().duration_since(last) >= backoff,
408        }
409    }
410
411    fn record_success(&mut self) {
412        self.consecutive_failures = 0;
413        self.state = ConnState::Healthy;
414        self.circuit.close();
415    }
416
417    fn record_failure(&mut self, config: &AsyncPoolConfig) {
418        self.consecutive_failures += 1;
419        self.state = ConnState::Unhealthy {
420            since: Instant::now(),
421        };
422
423        if self.consecutive_failures >= config.circuit_failure_threshold {
424            self.circuit.open(config.circuit_open_duration);
425        }
426    }
427}
428
429// ============================================================================
430// Inner Pool (protected by Mutex)
431// ============================================================================
432
433struct AsyncInnerPool {
434    opts: Opts,
435    config: AsyncPoolConfig,
436    slots: Vec<SlotMeta>,
437    nodes: Vec<Arc<Mutex<Option<AsyncNode>>>>,
438    phase: PoolPhase,
439}
440
441impl fmt::Debug for AsyncInnerPool {
442    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
443        f.debug_struct("AsyncInnerPool")
444            .field("config", &self.config)
445            .field("slots_count", &self.slots.len())
446            .field("phase", &self.phase)
447            .finish()
448    }
449}
450
451impl AsyncInnerPool {
452    fn node_opt(&self, host_idx: usize) -> Result<NodeOpt, VoltError> {
453        let ip_port = self
454            .opts
455            .0
456            .ip_ports
457            .get(host_idx)
458            .cloned()
459            .ok_or(VoltError::InvalidConfig)?;
460        Ok(NodeOpt {
461            ip_port,
462            pass: self.opts.0.pass.clone(),
463            user: self.opts.0.user.clone(),
464            connect_timeout: self.opts.0.connect_timeout,
465            read_timeout: self.opts.0.read_timeout,
466        })
467    }
468
469    async fn new(opts: Opts, config: AsyncPoolConfig) -> Result<Self, VoltError> {
470        let num_hosts = opts.0.ip_ports.len();
471        let mut inner = AsyncInnerPool {
472            opts,
473            config: config.clone(),
474            slots: Vec::with_capacity(config.size),
475            nodes: Vec::with_capacity(config.size),
476            phase: PoolPhase::Running,
477        };
478
479        for i in 0..config.size {
480            let host_idx = i % num_hosts;
481            let node_opt = inner.node_opt(host_idx)?;
482
483            async_pool_debug!(slot = i, host = host_idx, "creating connection");
484
485            match AsyncNode::new(node_opt).await {
486                Ok(node) => {
487                    inner.slots.push(SlotMeta::new_healthy(host_idx));
488                    inner.nodes.push(Arc::new(Mutex::new(Some(node))));
489                    async_pool_info!(slot = i, "connection established");
490                }
491                Err(e) => match config.validation_mode {
492                    ValidationMode::FailFast => {
493                        async_pool_error!(slot = i, error = ?e, "connection failed, aborting pool creation");
494                        return Err(e);
495                    }
496                    ValidationMode::BestEffort => {
497                        async_pool_warn!(slot = i, error = ?e, "connection failed, marking unhealthy");
498                        inner.slots.push(SlotMeta::new_unhealthy(host_idx));
499                        inner.nodes.push(Arc::new(Mutex::new(None)));
500                    }
501                },
502            }
503        }
504
505        inner.update_metrics();
506        async_pool_info!(
507            size = config.size,
508            healthy = inner.healthy_count(),
509            "async pool initialized"
510        );
511
512        Ok(inner)
513    }
514
515    fn healthy_count(&self) -> usize {
516        self.slots.iter().filter(|s| s.state.is_healthy()).count()
517    }
518
519    fn update_metrics(&self) {
520        async_pool_metrics::set_connections_total(self.slots.len());
521        async_pool_metrics::set_connections_healthy(self.healthy_count());
522    }
523}
524
525// ============================================================================
526// Pool (Thread-safe)
527// ============================================================================
528
529/// Thread-safe async connection pool for VoltDB.
530///
531/// # Example
532/// ```ignore
533/// use voltdb_client_rust::{AsyncPool, AsyncPoolConfig, Opts, IpPort};
534///
535/// let hosts = vec![IpPort::new("localhost".to_string(), 21212)];
536/// let config = AsyncPoolConfig::new().size(5);
537/// let pool = AsyncPool::with_config(Opts::new(hosts), config).await?;
538///
539/// let conn = pool.get_conn().await?;
540/// let table = conn.query("SELECT * FROM foo").await?;
541/// ```
542pub struct AsyncPool {
543    inner: Arc<Mutex<AsyncInnerPool>>,
544    notify: Arc<Notify>,
545    counter: AtomicUsize,
546    shutdown_flag: AtomicBool,
547    config: AsyncPoolConfig,
548}
549
550impl fmt::Debug for AsyncPool {
551    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
552        f.debug_struct("AsyncPool")
553            .field("counter", &self.counter.load(Ordering::Relaxed))
554            .field("shutdown", &self.shutdown_flag.load(Ordering::Relaxed))
555            .field("config", &self.config)
556            .finish()
557    }
558}
559
560impl AsyncPool {
561    /// Create a new pool with default configuration (10 connections).
562    pub async fn new<T: Into<Opts>>(opts: T) -> Result<AsyncPool, VoltError> {
563        AsyncPool::with_config(opts, AsyncPoolConfig::default()).await
564    }
565
566    /// Create a new pool with custom size (convenience method).
567    pub async fn new_manual<T: Into<Opts>>(size: usize, opts: T) -> Result<AsyncPool, VoltError> {
568        AsyncPool::with_config(opts, AsyncPoolConfig::new().size(size)).await
569    }
570
571    /// Create a new pool with full configuration.
572    pub async fn with_config<T: Into<Opts>>(
573        opts: T,
574        config: AsyncPoolConfig,
575    ) -> Result<AsyncPool, VoltError> {
576        let inner = AsyncInnerPool::new(opts.into(), config.clone()).await?;
577        Ok(AsyncPool {
578            inner: Arc::new(Mutex::new(inner)),
579            notify: Arc::new(Notify::new()),
580            counter: AtomicUsize::new(0),
581            shutdown_flag: AtomicBool::new(false),
582            config,
583        })
584    }
585
586    /// Get a connection from the pool.
587    pub async fn get_conn(&self) -> Result<AsyncPooledConn<'_>, VoltError> {
588        if self.shutdown_flag.load(Ordering::Relaxed) {
589            async_pool_warn!("get_conn called on shutdown pool");
590            return Err(AsyncPoolError::PoolShutdown.into());
591        }
592
593        async_pool_metrics::inc_requests_total();
594
595        let preferred_idx = self.counter.fetch_add(1, Ordering::Relaxed) % self.config.size;
596
597        match self.config.exhaustion_policy {
598            ExhaustionPolicy::FailFast => self.get_conn_failfast(preferred_idx).await,
599            ExhaustionPolicy::Block {
600                timeout: wait_timeout,
601            } => self.get_conn_blocking(preferred_idx, wait_timeout).await,
602        }
603    }
604
605    async fn get_conn_failfast(
606        &self,
607        preferred_idx: usize,
608    ) -> Result<AsyncPooledConn<'_>, VoltError> {
609        let inner = self.inner.lock().await;
610
611        if inner.phase != PoolPhase::Running {
612            return Err(AsyncPoolError::PoolShutdown.into());
613        }
614
615        // Try preferred index first
616        if inner.slots[preferred_idx].is_available() {
617            return self.checkout_slot(&inner, preferred_idx).await;
618        }
619
620        // Try to find any usable connection
621        for i in 1..self.config.size {
622            let idx = (preferred_idx + i) % self.config.size;
623            if inner.slots[idx].is_available() {
624                async_pool_debug!(
625                    preferred = preferred_idx,
626                    actual = idx,
627                    "using alternate connection"
628                );
629                return self.checkout_slot(&inner, idx).await;
630            }
631        }
632
633        async_pool_warn!("no healthy connections available");
634        async_pool_metrics::inc_requests_failed_total();
635        Err(AsyncPoolError::PoolExhausted.into())
636    }
637
638    async fn get_conn_blocking(
639        &self,
640        preferred_idx: usize,
641        wait_timeout: Duration,
642    ) -> Result<AsyncPooledConn<'_>, VoltError> {
643        let deadline = Instant::now() + wait_timeout;
644
645        loop {
646            let inner = self.inner.lock().await;
647
648            if inner.phase != PoolPhase::Running {
649                return Err(AsyncPoolError::PoolShutdown.into());
650            }
651
652            // Try preferred index first
653            if inner.slots[preferred_idx].is_available() {
654                return self.checkout_slot(&inner, preferred_idx).await;
655            }
656
657            // Try any available connection
658            for i in 1..self.config.size {
659                let idx = (preferred_idx + i) % self.config.size;
660                if inner.slots[idx].is_available() {
661                    return self.checkout_slot(&inner, idx).await;
662                }
663            }
664
665            // No connection available, wait
666            let remaining = deadline.saturating_duration_since(Instant::now());
667            if remaining.is_zero() {
668                async_pool_warn!(timeout = ?wait_timeout, "connection wait timed out");
669                async_pool_metrics::inc_requests_failed_total();
670                return Err(AsyncPoolError::Timeout.into());
671            }
672
673            async_pool_trace!("waiting for available connection");
674            drop(inner);
675
676            // Wait for notification or timeout
677            let _ = timeout(remaining, self.notify.notified()).await;
678        }
679    }
680
681    async fn checkout_slot(
682        &self,
683        inner: &AsyncInnerPool,
684        idx: usize,
685    ) -> Result<AsyncPooledConn<'_>, VoltError> {
686        let node = Arc::clone(&inner.nodes[idx]);
687        let config = inner.config.clone();
688        let host_idx = inner.slots[idx].host_idx;
689
690        async_pool_trace!(slot = idx, "connection acquired");
691
692        Ok(AsyncPooledConn {
693            pool: self,
694            idx,
695            node,
696            config,
697            host_idx,
698        })
699    }
700
701    /// Report a fatal error on a connection slot
702    async fn report_fatal_error(&self, idx: usize) {
703        #[allow(clippy::type_complexity)]
704        let reconnect_info: Option<(
705            Arc<Mutex<Option<AsyncNode>>>,
706            NodeOpt,
707            AsyncPoolConfig,
708        )>;
709
710        {
711            let mut inner = self.inner.lock().await;
712            let config = inner.config.clone();
713            inner.slots[idx].record_failure(&config);
714            async_pool_debug!(slot = idx, "fatal error reported");
715
716            inner.update_metrics();
717            self.notify.notify_waiters();
718
719            if !self.shutdown_flag.load(Ordering::Relaxed) {
720                let backoff = inner.config.reconnect_backoff;
721                if inner.slots[idx].needs_reconnect(backoff) {
722                    let node_arc = Arc::clone(&inner.nodes[idx]);
723                    let host_idx = inner.slots[idx].host_idx;
724                    if let Ok(node_opt) = inner.node_opt(host_idx) {
725                        inner.slots[idx].state = ConnState::Reconnecting;
726                        inner.slots[idx].last_reconnect_attempt = Some(Instant::now());
727
728                        reconnect_info = Some((node_arc, node_opt, config));
729                    } else {
730                        reconnect_info = None;
731                    }
732                } else {
733                    reconnect_info = None;
734                }
735            } else {
736                reconnect_info = None;
737            }
738        }
739
740        if let Some((node_arc, node_opt, config)) = reconnect_info {
741            self.do_reconnect(idx, node_arc, node_opt, config).await;
742        }
743    }
744
745    async fn do_reconnect(
746        &self,
747        idx: usize,
748        node_arc: Arc<Mutex<Option<AsyncNode>>>,
749        node_opt: NodeOpt,
750        config: AsyncPoolConfig,
751    ) {
752        async_pool_info!(slot = idx, "attempting reconnection");
753        async_pool_metrics::inc_reconnect_total();
754
755        match AsyncNode::new(node_opt).await {
756            Ok(new_node) => {
757                {
758                    let mut node_guard = node_arc.lock().await;
759                    *node_guard = Some(new_node);
760                }
761
762                {
763                    let mut inner = self.inner.lock().await;
764                    inner.slots[idx].record_success();
765                    inner.update_metrics();
766                }
767
768                self.notify.notify_waiters();
769                async_pool_info!(slot = idx, "reconnection successful");
770            }
771            Err(_e) => {
772                {
773                    let mut inner = self.inner.lock().await;
774                    inner.slots[idx].record_failure(&config);
775                    inner.update_metrics();
776                }
777                async_pool_error!(slot = idx, error = ?_e, "reconnection failed");
778            }
779        }
780    }
781
782    async fn mark_success(&self, idx: usize) {
783        let mut inner = self.inner.lock().await;
784        inner.slots[idx].record_success();
785        inner.update_metrics();
786    }
787
788    /// Initiate graceful shutdown.
789    pub async fn shutdown(&self) {
790        async_pool_info!("initiating pool shutdown");
791        self.shutdown_flag.store(true, Ordering::Relaxed);
792
793        let mut inner = self.inner.lock().await;
794        inner.phase = PoolPhase::Shutdown;
795        async_pool_info!("entering shutdown phase");
796
797        for slot in &mut inner.slots {
798            slot.state = ConnState::Unhealthy {
799                since: Instant::now(),
800            };
801        }
802
803        for node_arc in &inner.nodes {
804            let mut node_guard = node_arc.lock().await;
805            *node_guard = None;
806        }
807
808        inner.update_metrics();
809        self.notify.notify_waiters();
810
811        async_pool_info!("pool shutdown complete");
812    }
813
814    /// Check if pool is shut down.
815    pub fn is_shutdown(&self) -> bool {
816        self.shutdown_flag.load(Ordering::Relaxed)
817    }
818
819    /// Get current pool statistics.
820    pub async fn stats(&self) -> AsyncPoolStats {
821        let inner = self.inner.lock().await;
822        AsyncPoolStats {
823            size: self.config.size,
824            healthy: inner.healthy_count(),
825            total_requests: self.counter.load(Ordering::Relaxed),
826            is_shutdown: self.shutdown_flag.load(Ordering::Relaxed),
827        }
828    }
829}
830
/// Point-in-time snapshot of pool health, as returned by `AsyncPool::stats`.
#[derive(Debug, Clone)]
pub struct AsyncPoolStats {
    /// Configured number of connection slots.
    pub size: usize,
    /// Slots currently in the `Healthy` state.
    pub healthy: usize,
    /// Current value of the pool's round-robin request counter.
    pub total_requests: usize,
    /// Whether `shutdown` has been called.
    pub is_shutdown: bool,
}
839
840// ============================================================================
841// Pooled Connection
842// ============================================================================
843
844/// A connection handle from the async pool.
845pub struct AsyncPooledConn<'a> {
846    pool: &'a AsyncPool,
847    idx: usize,
848    node: Arc<Mutex<Option<AsyncNode>>>,
849    #[allow(dead_code)]
850    config: AsyncPoolConfig,
851    #[allow(dead_code)]
852    host_idx: usize,
853}
854
855impl fmt::Debug for AsyncPooledConn<'_> {
856    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
857        f.debug_struct("AsyncPooledConn")
858            .field("idx", &self.idx)
859            .field("host_idx", &self.host_idx)
860            .finish()
861    }
862}
863
864impl AsyncPooledConn<'_> {
865    /// Execute a SQL query.
866    pub async fn query(&self, sql: &str) -> Result<VoltTable, VoltError> {
867        async_pool_trace!(slot = self.idx, sql = sql, "executing query");
868
869        let mut node_guard = self.node.lock().await;
870        let node = node_guard
871            .as_mut()
872            .ok_or(VoltError::ConnectionNotAvailable)?;
873        let mut rx = node.query(sql).await?;
874        drop(node_guard);
875        let result = async_block_for_result(&mut rx).await;
876        self.handle_result(&result).await;
877        result
878        // result
879    }
880
881    pub async fn list_procedures(&mut self) -> Result<VoltTable, VoltError> {
882        async_pool_trace!(slot = self.idx, "listing procedures");
883
884        let mut node_guard = self.node.lock().await;
885        let node = node_guard
886            .as_mut()
887            .ok_or(VoltError::ConnectionNotAvailable)?;
888        let mut rx = node.list_procedures().await?;
889        drop(node_guard);
890        let result = async_block_for_result(&mut rx).await;
891        self.handle_result(&result).await;
892        result
893    }
894
895    pub async fn call_sp(
896        &mut self,
897        proc: &str,
898        params: Vec<&dyn Value>,
899    ) -> Result<VoltTable, VoltError> {
900        async_pool_trace!(
901            slot = self.idx,
902            procedure = proc,
903            "calling stored procedure"
904        );
905        let mut node_guard = self.node.lock().await;
906        let node = node_guard
907            .as_mut()
908            .ok_or(VoltError::ConnectionNotAvailable)?;
909        let mut rx = node.call_sp(proc, params).await?;
910        drop(node_guard);
911
912        let result = async_block_for_result(&mut rx).await;
913        self.handle_result(&result).await;
914        result
915    }
916
917    /// Upload a JAR file.
918    pub async fn upload_jar(&self, bs: Vec<u8>) -> Result<VoltTable, VoltError> {
919        async_pool_trace!(slot = self.idx, size = bs.len(), "uploading jar");
920
921        let mut node_guard = self.node.lock().await;
922        let node = node_guard
923            .as_mut()
924            .ok_or(VoltError::ConnectionNotAvailable)?;
925
926        let mut rx = node.upload_jar(bs).await?;
927        drop(node_guard);
928
929        let result = async_block_for_result(&mut rx).await;
930        self.handle_result(&result).await;
931        result
932    }
933
934    /// Handle operation result - update state and trigger reconnection if needed
935    async fn handle_result<T>(&self, result: &Result<T, VoltError>) {
936        match result {
937            Ok(_) => {
938                self.pool.mark_success(self.idx).await;
939            }
940            Err(e) if e.is_connection_fatal() => {
941                async_pool_error!(slot = self.idx, error = ?e, "fatal connection error detected");
942                {
943                    let mut guard = self.node.lock().await;
944                    *guard = None;
945                }
946                self.pool.report_fatal_error(self.idx).await;
947            }
948            Err(_) => {
949                // Non-fatal error, no action needed
950            }
951        }
952    }
953
954    /// Get the slot index of this connection (for debugging).
955    pub fn slot_index(&self) -> usize {
956        self.idx
957    }
958}
959
960// ============================================================================
961// Tests
962// ============================================================================
963
#[cfg(test)]
mod tests {
    use super::*;

    /// Timeout used for the `Block` exhaustion policy in the builder test.
    const BLOCK_TIMEOUT: Duration = Duration::from_secs(5);

    #[test]
    fn test_conn_state_is_healthy() {
        let healthy = ConnState::Healthy;
        let unhealthy = ConnState::Unhealthy {
            since: Instant::now(),
        };
        let reconnecting = ConnState::Reconnecting;

        assert!(healthy.is_healthy(), "Healthy must report healthy");
        assert!(!unhealthy.is_healthy(), "Unhealthy must not report healthy");
        assert!(
            !reconnecting.is_healthy(),
            "Reconnecting must not report healthy"
        );
    }

    #[test]
    fn test_circuit_should_allow_closed() {
        let closed = Circuit::Closed;
        assert!(closed.should_allow(), "closed circuit lets traffic through");
    }

    #[test]
    fn test_circuit_should_allow_open_not_expired() {
        let until = Instant::now() + Duration::from_secs(60);
        let open = Circuit::Open { until };
        assert!(!open.should_allow(), "open circuit blocks before expiry");
    }

    #[test]
    fn test_circuit_should_allow_open_expired() {
        let until = Instant::now() - Duration::from_secs(1);
        let expired = Circuit::Open { until };
        assert!(expired.should_allow(), "open circuit allows after expiry");
    }

    #[test]
    fn test_circuit_should_allow_half_open() {
        let probing = Circuit::HalfOpen;
        assert!(probing.should_allow(), "half-open circuit allows a probe");
    }

    #[test]
    fn test_circuit_transitions() {
        let mut breaker = Circuit::Closed;

        breaker.open(Duration::from_secs(30));
        assert!(matches!(breaker, Circuit::Open { .. }), "expected Open");

        breaker.half_open();
        assert!(matches!(breaker, Circuit::HalfOpen), "expected HalfOpen");

        breaker.close();
        assert!(matches!(breaker, Circuit::Closed), "expected Closed");
    }

    #[test]
    fn test_pool_config_builder() {
        let backoff = Duration::from_secs(10);
        let open_for = Duration::from_secs(60);
        let drain_for = Duration::from_secs(60);

        let cfg = AsyncPoolConfig::new()
            .size(20)
            .reconnect_backoff(backoff)
            .circuit_open_duration(open_for)
            .exhaustion_policy(ExhaustionPolicy::Block {
                timeout: BLOCK_TIMEOUT,
            })
            .validation_mode(ValidationMode::BestEffort)
            .circuit_failure_threshold(5)
            .shutdown_timeout(drain_for);

        assert_eq!(cfg.size, 20);
        assert_eq!(cfg.reconnect_backoff, backoff);
        assert_eq!(cfg.circuit_open_duration, open_for);
        assert_eq!(
            cfg.exhaustion_policy,
            ExhaustionPolicy::Block {
                timeout: BLOCK_TIMEOUT,
            }
        );
        assert_eq!(cfg.validation_mode, ValidationMode::BestEffort);
        assert_eq!(cfg.circuit_failure_threshold, 5);
        assert_eq!(cfg.shutdown_timeout, drain_for);
    }

    #[test]
    fn test_pool_config_default() {
        let cfg = AsyncPoolConfig::default();

        assert_eq!(cfg.size, 10);
        assert_eq!(cfg.exhaustion_policy, ExhaustionPolicy::FailFast);
        assert_eq!(cfg.validation_mode, ValidationMode::FailFast);
    }

    #[test]
    fn test_slot_meta_is_available() {
        let healthy = SlotMeta::new_healthy(0);
        assert!(healthy.is_available(), "healthy slot should be available");

        let unhealthy = SlotMeta::new_unhealthy(0);
        assert!(!unhealthy.is_available(), "unhealthy slot is unavailable");

        let mut tripped = SlotMeta::new_healthy(0);
        tripped.circuit = Circuit::Open {
            until: Instant::now() + Duration::from_secs(60),
        };
        assert!(!tripped.is_available(), "open circuit hides the slot");
    }

    #[test]
    fn test_slot_meta_needs_reconnect() {
        let backoff = Duration::from_millis(100);
        let mut slot = SlotMeta::new_unhealthy(0);

        assert!(slot.needs_reconnect(backoff), "unhealthy slot wants reconnect");

        slot.state = ConnState::Reconnecting;
        assert!(
            !slot.needs_reconnect(backoff),
            "already reconnecting: no second attempt"
        );

        slot.state = ConnState::Healthy;
        assert!(!slot.needs_reconnect(backoff), "healthy slot needs nothing");
    }

    #[test]
    fn test_slot_meta_record_success() {
        let mut slot = SlotMeta::new_unhealthy(0);
        slot.consecutive_failures = 5;

        slot.record_success();

        assert_eq!(slot.consecutive_failures, 0);
        assert!(matches!(slot.state, ConnState::Healthy));
        assert!(matches!(slot.circuit, Circuit::Closed));
    }

    #[test]
    fn test_slot_meta_record_failure_opens_circuit() {
        let cfg = AsyncPoolConfig::default().circuit_failure_threshold(3);
        let mut slot = SlotMeta::new_healthy(0);
        slot.consecutive_failures = 2;

        slot.record_failure(&cfg);

        assert_eq!(slot.consecutive_failures, 3);
        assert!(matches!(slot.circuit, Circuit::Open { .. }));
    }

    #[test]
    fn test_async_pool_error_display() {
        let cases = [
            (AsyncPoolError::PoolShutdown, "Pool is shutting down"),
            (AsyncPoolError::CircuitOpen, "Circuit breaker is open"),
            (
                AsyncPoolError::PoolExhausted,
                "Pool exhausted, no healthy connections",
            ),
            (AsyncPoolError::Timeout, "Timed out waiting for connection"),
        ];

        for (err, expected) in cases {
            assert_eq!(err.to_string(), expected);
        }
    }

    #[test]
    fn test_pool_phase() {
        let running = PoolPhase::Running;

        assert_eq!(running, PoolPhase::Running);
        assert_ne!(running, PoolPhase::Shutdown);
    }
}