Skip to main content

voltdb_client_rust/
async_pool.rs

//! Production-ready async connection pool for VoltDB.
//!
//! # Features
//! - Thread-safe with fine-grained locking
//! - Connection state machine (Healthy, Unhealthy, Reconnecting)
//! - Per-connection circuit breaker
//! - Configurable exhaustion policy (FailFast or Block with async waiting)
//! - Shutdown that closes every connection (no drain phase yet; `shutdown_timeout` is reserved)
//! - Optional structured logging (`tracing` feature)
//! - Optional metrics (`metrics` feature)
//!
//! # Design
//! - Pool lock only guards metadata (state, circuit breaker)
//! - Each AsyncNode has its own lock for I/O operations
//! - Network I/O never holds the pool lock
16
17#![cfg(feature = "tokio")]
18
19use std::fmt;
20use std::sync::Arc;
21use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
22use std::time::{Duration, Instant};
23use tokio::sync::{Mutex, Notify};
24use tokio::time::timeout;
25
26use crate::async_node::{AsyncNode, async_block_for_result_with_timeout};
27use crate::{NodeOpt, Opts, Value, VoltError, VoltTable};
28
29// ============================================================================
30// Logging macros - use tracing if available, otherwise no-op
31// ============================================================================
32
// Each logging macro below comes in two flavours selected at compile time:
// with the `tracing` feature it forwards every token to the matching
// `tracing` macro, without it the invocation expands to nothing.

// TRACE level.
#[cfg(feature = "tracing")]
macro_rules! async_pool_trace {
    ($($tokens:tt)*) => {
        tracing::trace!($($tokens)*)
    };
}
#[cfg(not(feature = "tracing"))]
macro_rules! async_pool_trace {
    ($($tokens:tt)*) => {};
}

// DEBUG level.
#[cfg(feature = "tracing")]
macro_rules! async_pool_debug {
    ($($tokens:tt)*) => {
        tracing::debug!($($tokens)*)
    };
}
#[cfg(not(feature = "tracing"))]
macro_rules! async_pool_debug {
    ($($tokens:tt)*) => {};
}

// INFO level.
#[cfg(feature = "tracing")]
macro_rules! async_pool_info {
    ($($tokens:tt)*) => {
        tracing::info!($($tokens)*)
    };
}
#[cfg(not(feature = "tracing"))]
macro_rules! async_pool_info {
    ($($tokens:tt)*) => {};
}

// WARN level.
#[cfg(feature = "tracing")]
macro_rules! async_pool_warn {
    ($($tokens:tt)*) => {
        tracing::warn!($($tokens)*)
    };
}
#[cfg(not(feature = "tracing"))]
macro_rules! async_pool_warn {
    ($($tokens:tt)*) => {};
}

// ERROR level.
#[cfg(feature = "tracing")]
macro_rules! async_pool_error {
    ($($tokens:tt)*) => {
        tracing::error!($($tokens)*)
    };
}
#[cfg(not(feature = "tracing"))]
macro_rules! async_pool_error {
    ($($tokens:tt)*) => {};
}
77
78// ============================================================================
79// Metrics - use metrics crate if available, otherwise no-op
80// ============================================================================
81
/// Metric recorders backed by the `metrics` crate (enabled by the `metrics` feature).
#[cfg(feature = "metrics")]
mod async_pool_metrics {
    /// Sets the `voltdb_async_pool_connections_total` gauge.
    pub fn set_connections_total(count: usize) {
        let value = count as f64;
        metrics::gauge!("voltdb_async_pool_connections_total").set(value);
    }

    /// Sets the `voltdb_async_pool_connections_healthy` gauge.
    pub fn set_connections_healthy(count: usize) {
        let value = count as f64;
        metrics::gauge!("voltdb_async_pool_connections_healthy").set(value);
    }

    /// Adds one to the `voltdb_async_pool_reconnect_total` counter.
    pub fn inc_reconnect_total() {
        metrics::counter!("voltdb_async_pool_reconnect_total").increment(1);
    }

    /// Adds one to the `voltdb_async_pool_circuit_open_total` counter.
    pub fn inc_circuit_open_total() {
        metrics::counter!("voltdb_async_pool_circuit_open_total").increment(1);
    }

    /// Adds one to the `voltdb_async_pool_requests_failed_total` counter.
    pub fn inc_requests_failed_total() {
        metrics::counter!("voltdb_async_pool_requests_failed_total").increment(1);
    }

    /// Adds one to the `voltdb_async_pool_requests_total` counter.
    pub fn inc_requests_total() {
        metrics::counter!("voltdb_async_pool_requests_total").increment(1);
    }
}
110
/// No-op stand-ins used when the `metrics` feature is disabled, so call sites
/// need no conditional compilation of their own.
#[cfg(not(feature = "metrics"))]
mod async_pool_metrics {
    /// Does nothing; the gauge only exists with the `metrics` feature.
    pub fn set_connections_total(_count: usize) {}

    /// Does nothing; the gauge only exists with the `metrics` feature.
    pub fn set_connections_healthy(_count: usize) {}

    /// Does nothing; the counter only exists with the `metrics` feature.
    pub fn inc_reconnect_total() {}

    /// Does nothing; the counter only exists with the `metrics` feature.
    pub fn inc_circuit_open_total() {}

    /// Does nothing; the counter only exists with the `metrics` feature.
    pub fn inc_requests_failed_total() {}

    /// Does nothing; the counter only exists with the `metrics` feature.
    pub fn inc_requests_total() {}
}
120
121// ============================================================================
122// Connection State Machine
123// ============================================================================
124
/// Health of a single pooled connection.
#[derive(Debug, Clone)]
pub enum ConnState {
    /// The connection is usable.
    Healthy,
    /// The connection has failed; `since` records when it was marked unhealthy.
    Unhealthy { since: Instant },
    /// A replacement connection is currently being established.
    Reconnecting,
}

impl ConnState {
    /// Returns `true` only for [`ConnState::Healthy`].
    fn is_healthy(&self) -> bool {
        match self {
            ConnState::Healthy => true,
            ConnState::Unhealthy { .. } | ConnState::Reconnecting => false,
        }
    }

    /// Returns `true` only for [`ConnState::Reconnecting`].
    fn is_reconnecting(&self) -> bool {
        match self {
            ConnState::Reconnecting => true,
            ConnState::Healthy | ConnState::Unhealthy { .. } => false,
        }
    }
}
145
146// ============================================================================
147// Circuit Breaker
148// ============================================================================
149
150/// Per-connection circuit breaker state.
151#[derive(Debug, Clone)]
152pub enum Circuit {
153    /// Normal operation - requests flow through
154    Closed,
155    /// Circuit is open - fail fast until `until` time
156    Open { until: Instant },
157    /// Allow one probe request to test recovery
158    HalfOpen,
159}
160
161impl Circuit {
162    /// Check if request should be allowed through
163    fn should_allow(&self) -> bool {
164        match self {
165            Circuit::Closed => true,
166            Circuit::Open { until } => Instant::now() >= *until,
167            Circuit::HalfOpen => true,
168        }
169    }
170
171    /// Transition to Open state
172    fn open(&mut self, duration: Duration) {
173        *self = Circuit::Open {
174            until: Instant::now() + duration,
175        };
176        async_pool_metrics::inc_circuit_open_total();
177        async_pool_warn!("circuit breaker opened");
178    }
179
180    /// Transition to HalfOpen state (for probing)
181    #[allow(dead_code)]
182    fn half_open(&mut self) {
183        *self = Circuit::HalfOpen;
184        async_pool_debug!("circuit breaker half-open");
185    }
186
187    /// Transition to Closed state (healthy)
188    fn close(&mut self) {
189        *self = Circuit::Closed;
190        async_pool_info!("circuit breaker closed");
191    }
192}
193
194// ============================================================================
195// Configuration
196// ============================================================================
197
/// Policy applied when no connection can be handed out.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExhaustionPolicy {
    /// Give up at once and return an error.
    FailFast,
    /// Wait for a connection, for at most `timeout`.
    Block { timeout: Duration },
}

impl Default for ExhaustionPolicy {
    /// The default policy is [`ExhaustionPolicy::FailFast`].
    fn default() -> Self {
        ExhaustionPolicy::FailFast
    }
}
207
/// How to handle connection failures while the pool is being created.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ValidationMode {
    /// Abort pool creation and return the connection error as soon as any
    /// connection fails (an `Err` is returned; nothing panics).
    #[default]
    FailFast,
    /// Mark failed connections as unhealthy and continue startup.
    BestEffort,
}
217
218/// Async pool configuration.
219#[derive(Debug, Clone)]
220pub struct AsyncPoolConfig {
221    /// Number of connections in the pool
222    pub size: usize,
223    /// Backoff duration before retry reconnection
224    pub reconnect_backoff: Duration,
225    /// How long circuit breaker stays open
226    pub circuit_open_duration: Duration,
227    /// What to do when pool is exhausted
228    pub exhaustion_policy: ExhaustionPolicy,
229    /// How to handle startup failures
230    pub validation_mode: ValidationMode,
231    /// Number of consecutive failures before opening circuit
232    pub circuit_failure_threshold: u32,
233    /// Timeout for graceful shutdown drain.
234    ///
235    /// **Note:** This field is currently reserved for future use.
236    /// The current shutdown implementation closes connections immediately.
237    pub shutdown_timeout: Duration,
238    /// Timeout for individual requests (query, call_sp, etc.).
239    /// If a response is not received within this duration, the request fails with `VoltError::Timeout`.
240    pub request_timeout: Duration,
241}
242
243impl Default for AsyncPoolConfig {
244    fn default() -> Self {
245        Self {
246            size: 10,
247            reconnect_backoff: Duration::from_secs(5),
248            circuit_open_duration: Duration::from_secs(30),
249            exhaustion_policy: ExhaustionPolicy::FailFast,
250            validation_mode: ValidationMode::FailFast,
251            circuit_failure_threshold: 3,
252            shutdown_timeout: Duration::from_secs(30),
253            request_timeout: Duration::from_secs(30),
254        }
255    }
256}
257
258impl AsyncPoolConfig {
259    pub fn new() -> Self {
260        Self::default()
261    }
262
263    pub fn size(mut self, size: usize) -> Self {
264        self.size = size;
265        self
266    }
267
268    pub fn reconnect_backoff(mut self, duration: Duration) -> Self {
269        self.reconnect_backoff = duration;
270        self
271    }
272
273    pub fn circuit_open_duration(mut self, duration: Duration) -> Self {
274        self.circuit_open_duration = duration;
275        self
276    }
277
278    pub fn exhaustion_policy(mut self, policy: ExhaustionPolicy) -> Self {
279        self.exhaustion_policy = policy;
280        self
281    }
282
283    pub fn validation_mode(mut self, mode: ValidationMode) -> Self {
284        self.validation_mode = mode;
285        self
286    }
287
288    pub fn circuit_failure_threshold(mut self, threshold: u32) -> Self {
289        self.circuit_failure_threshold = threshold;
290        self
291    }
292
293    pub fn shutdown_timeout(mut self, timeout: Duration) -> Self {
294        self.shutdown_timeout = timeout;
295        self
296    }
297
298    pub fn request_timeout(mut self, duration: Duration) -> Self {
299        self.request_timeout = duration;
300        self
301    }
302}
303
304// ============================================================================
305// Pool Phase (Lifecycle)
306// ============================================================================
307
/// Lifecycle phase of the pool; consulted before a connection is handed out.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PoolPhase {
    /// The pool is serving requests.
    Running,
    /// The pool has been shut down.
    Shutdown,
}
316
317// ============================================================================
318// Connection Slot (Metadata only - AsyncNode is separate)
319// ============================================================================
320
321/// Metadata for a connection slot. The actual AsyncNode is stored separately.
322#[derive(Debug)]
323struct SlotMeta {
324    state: ConnState,
325    circuit: Circuit,
326    consecutive_failures: u32,
327    last_reconnect_attempt: Option<Instant>,
328    host_idx: usize,
329}
330
331impl SlotMeta {
332    fn new_healthy(host_idx: usize) -> Self {
333        Self {
334            state: ConnState::Healthy,
335            circuit: Circuit::Closed,
336            consecutive_failures: 0,
337            last_reconnect_attempt: None,
338            host_idx,
339        }
340    }
341
342    fn new_unhealthy(host_idx: usize) -> Self {
343        Self {
344            state: ConnState::Unhealthy {
345                since: Instant::now(),
346            },
347            circuit: Circuit::Open {
348                until: Instant::now() + Duration::from_secs(5),
349            },
350            consecutive_failures: 1,
351            last_reconnect_attempt: None,
352            host_idx,
353        }
354    }
355
356    /// Check if this slot can be used (healthy and circuit allows)
357    fn is_available(&self) -> bool {
358        self.state.is_healthy() && self.circuit.should_allow()
359    }
360
361    /// Check if this slot needs reconnection and can attempt it
362    fn needs_reconnect(&self, backoff: Duration) -> bool {
363        if self.state.is_healthy() || self.state.is_reconnecting() {
364            return false;
365        }
366
367        match self.last_reconnect_attempt {
368            None => true,
369            Some(last) => Instant::now().duration_since(last) >= backoff,
370        }
371    }
372
373    fn record_success(&mut self) {
374        self.consecutive_failures = 0;
375        self.state = ConnState::Healthy;
376        self.circuit.close();
377    }
378
379    fn record_failure(&mut self, config: &AsyncPoolConfig) {
380        self.consecutive_failures += 1;
381        self.state = ConnState::Unhealthy {
382            since: Instant::now(),
383        };
384
385        if self.consecutive_failures >= config.circuit_failure_threshold {
386            self.circuit.open(config.circuit_open_duration);
387        }
388    }
389}
390
391// ============================================================================
392// Inner Pool (protected by Mutex)
393// ============================================================================
394
395struct AsyncInnerPool {
396    opts: Opts,
397    config: AsyncPoolConfig,
398    slots: Vec<SlotMeta>,
399    nodes: Vec<Arc<Mutex<Option<AsyncNode>>>>,
400    phase: PoolPhase,
401}
402
403impl fmt::Debug for AsyncInnerPool {
404    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
405        f.debug_struct("AsyncInnerPool")
406            .field("config", &self.config)
407            .field("slots_count", &self.slots.len())
408            .field("phase", &self.phase)
409            .finish()
410    }
411}
412
413impl AsyncInnerPool {
414    fn node_opt(&self, host_idx: usize) -> Result<NodeOpt, VoltError> {
415        let ip_port = self
416            .opts
417            .0
418            .ip_ports
419            .get(host_idx)
420            .cloned()
421            .ok_or(VoltError::InvalidConfig)?;
422        Ok(NodeOpt {
423            ip_port,
424            pass: self.opts.0.pass.clone(),
425            user: self.opts.0.user.clone(),
426            connect_timeout: self.opts.0.connect_timeout,
427            read_timeout: self.opts.0.read_timeout,
428        })
429    }
430
431    async fn new(opts: Opts, config: AsyncPoolConfig) -> Result<Self, VoltError> {
432        let num_hosts = opts.0.ip_ports.len();
433        let mut inner = AsyncInnerPool {
434            opts,
435            config: config.clone(),
436            slots: Vec::with_capacity(config.size),
437            nodes: Vec::with_capacity(config.size),
438            phase: PoolPhase::Running,
439        };
440
441        for i in 0..config.size {
442            let host_idx = i % num_hosts;
443            let node_opt = inner.node_opt(host_idx)?;
444
445            async_pool_debug!(slot = i, host = host_idx, "creating connection");
446
447            match AsyncNode::new(node_opt).await {
448                Ok(node) => {
449                    inner.slots.push(SlotMeta::new_healthy(host_idx));
450                    inner.nodes.push(Arc::new(Mutex::new(Some(node))));
451                    async_pool_info!(slot = i, "connection established");
452                }
453                Err(e) => match config.validation_mode {
454                    ValidationMode::FailFast => {
455                        async_pool_error!(slot = i, error = ?e, "connection failed, aborting pool creation");
456                        return Err(e);
457                    }
458                    ValidationMode::BestEffort => {
459                        async_pool_warn!(slot = i, error = ?e, "connection failed, marking unhealthy");
460                        inner.slots.push(SlotMeta::new_unhealthy(host_idx));
461                        inner.nodes.push(Arc::new(Mutex::new(None)));
462                    }
463                },
464            }
465        }
466
467        inner.update_metrics();
468        async_pool_info!(
469            size = config.size,
470            healthy = inner.healthy_count(),
471            "async pool initialized"
472        );
473
474        Ok(inner)
475    }
476
477    fn healthy_count(&self) -> usize {
478        self.slots.iter().filter(|s| s.state.is_healthy()).count()
479    }
480
481    fn update_metrics(&self) {
482        async_pool_metrics::set_connections_total(self.slots.len());
483        async_pool_metrics::set_connections_healthy(self.healthy_count());
484    }
485}
486
487// ============================================================================
488// Pool (Thread-safe)
489// ============================================================================
490
491/// Thread-safe async connection pool for VoltDB.
492///
493/// # Example
494/// ```ignore
495/// use voltdb_client_rust::{AsyncPool, AsyncPoolConfig, Opts, IpPort};
496///
497/// let hosts = vec![IpPort::new("localhost".to_string(), 21212)];
498/// let config = AsyncPoolConfig::new().size(5);
499/// let pool = AsyncPool::with_config(Opts::new(hosts), config).await?;
500///
501/// let conn = pool.get_conn().await?;
502/// let table = conn.query("SELECT * FROM foo").await?;
503/// ```
504pub struct AsyncPool {
505    inner: Arc<Mutex<AsyncInnerPool>>,
506    notify: Arc<Notify>,
507    counter: AtomicUsize,
508    shutdown_flag: AtomicBool,
509    config: AsyncPoolConfig,
510}
511
512impl fmt::Debug for AsyncPool {
513    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
514        f.debug_struct("AsyncPool")
515            .field("counter", &self.counter.load(Ordering::Relaxed))
516            .field("shutdown", &self.shutdown_flag.load(Ordering::Relaxed))
517            .field("config", &self.config)
518            .finish()
519    }
520}
521
522impl AsyncPool {
523    /// Create a new pool with default configuration (10 connections).
524    pub async fn new<T: Into<Opts>>(opts: T) -> Result<AsyncPool, VoltError> {
525        AsyncPool::with_config(opts, AsyncPoolConfig::default()).await
526    }
527
528    /// Create a new pool with custom size (convenience method).
529    pub async fn new_manual<T: Into<Opts>>(size: usize, opts: T) -> Result<AsyncPool, VoltError> {
530        AsyncPool::with_config(opts, AsyncPoolConfig::new().size(size)).await
531    }
532
533    /// Create a new pool with full configuration.
534    pub async fn with_config<T: Into<Opts>>(
535        opts: T,
536        config: AsyncPoolConfig,
537    ) -> Result<AsyncPool, VoltError> {
538        let inner = AsyncInnerPool::new(opts.into(), config.clone()).await?;
539        Ok(AsyncPool {
540            inner: Arc::new(Mutex::new(inner)),
541            notify: Arc::new(Notify::new()),
542            counter: AtomicUsize::new(0),
543            shutdown_flag: AtomicBool::new(false),
544            config,
545        })
546    }
547
548    /// Get a connection from the pool.
549    pub async fn get_conn(&self) -> Result<AsyncPooledConn<'_>, VoltError> {
550        if self.shutdown_flag.load(Ordering::Relaxed) {
551            async_pool_warn!("get_conn called on shutdown pool");
552            return Err(VoltError::PoolShutdown);
553        }
554
555        async_pool_metrics::inc_requests_total();
556
557        let preferred_idx = self.counter.fetch_add(1, Ordering::Relaxed) % self.config.size;
558
559        match self.config.exhaustion_policy {
560            ExhaustionPolicy::FailFast => self.get_conn_failfast(preferred_idx).await,
561            ExhaustionPolicy::Block {
562                timeout: wait_timeout,
563            } => self.get_conn_blocking(preferred_idx, wait_timeout).await,
564        }
565    }
566
567    async fn get_conn_failfast(
568        &self,
569        preferred_idx: usize,
570    ) -> Result<AsyncPooledConn<'_>, VoltError> {
571        let inner = self.inner.lock().await;
572
573        if inner.phase != PoolPhase::Running {
574            return Err(VoltError::PoolShutdown);
575        }
576
577        // Try preferred index first
578        if inner.slots[preferred_idx].is_available() {
579            return self.checkout_slot(&inner, preferred_idx).await;
580        }
581
582        // Try to find any usable connection
583        for i in 1..self.config.size {
584            let idx = (preferred_idx + i) % self.config.size;
585            if inner.slots[idx].is_available() {
586                async_pool_debug!(
587                    preferred = preferred_idx,
588                    actual = idx,
589                    "using alternate connection"
590                );
591                return self.checkout_slot(&inner, idx).await;
592            }
593        }
594
595        async_pool_warn!("no healthy connections available");
596        async_pool_metrics::inc_requests_failed_total();
597        Err(VoltError::PoolExhausted)
598    }
599
600    async fn get_conn_blocking(
601        &self,
602        preferred_idx: usize,
603        wait_timeout: Duration,
604    ) -> Result<AsyncPooledConn<'_>, VoltError> {
605        let deadline = Instant::now() + wait_timeout;
606
607        loop {
608            let inner = self.inner.lock().await;
609
610            if inner.phase != PoolPhase::Running {
611                return Err(VoltError::PoolShutdown);
612            }
613
614            // Try preferred index first
615            if inner.slots[preferred_idx].is_available() {
616                return self.checkout_slot(&inner, preferred_idx).await;
617            }
618
619            // Try any available connection
620            for i in 1..self.config.size {
621                let idx = (preferred_idx + i) % self.config.size;
622                if inner.slots[idx].is_available() {
623                    return self.checkout_slot(&inner, idx).await;
624                }
625            }
626
627            // No connection available, wait
628            let remaining = deadline.saturating_duration_since(Instant::now());
629            if remaining.is_zero() {
630                async_pool_warn!(timeout = ?wait_timeout, "connection wait timed out");
631                async_pool_metrics::inc_requests_failed_total();
632                return Err(VoltError::Timeout);
633            }
634
635            async_pool_trace!("waiting for available connection");
636            drop(inner);
637
638            // Wait for notification or timeout
639            let _ = timeout(remaining, self.notify.notified()).await;
640        }
641    }
642
643    async fn checkout_slot(
644        &self,
645        inner: &AsyncInnerPool,
646        idx: usize,
647    ) -> Result<AsyncPooledConn<'_>, VoltError> {
648        let node = Arc::clone(&inner.nodes[idx]);
649        let config = inner.config.clone();
650        let host_idx = inner.slots[idx].host_idx;
651
652        async_pool_trace!(slot = idx, "connection acquired");
653
654        Ok(AsyncPooledConn {
655            pool: self,
656            idx,
657            node,
658            config,
659            host_idx,
660        })
661    }
662
663    /// Report a fatal error on a connection slot
664    async fn report_fatal_error(&self, idx: usize) {
665        #[allow(clippy::type_complexity)]
666        let reconnect_info: Option<(
667            Arc<Mutex<Option<AsyncNode>>>,
668            NodeOpt,
669            AsyncPoolConfig,
670        )>;
671
672        {
673            let mut inner = self.inner.lock().await;
674            let config = inner.config.clone();
675            inner.slots[idx].record_failure(&config);
676            async_pool_debug!(slot = idx, "fatal error reported");
677
678            inner.update_metrics();
679            self.notify.notify_waiters();
680
681            if !self.shutdown_flag.load(Ordering::Relaxed) {
682                let backoff = inner.config.reconnect_backoff;
683                if inner.slots[idx].needs_reconnect(backoff) {
684                    let node_arc = Arc::clone(&inner.nodes[idx]);
685                    let host_idx = inner.slots[idx].host_idx;
686                    if let Ok(node_opt) = inner.node_opt(host_idx) {
687                        inner.slots[idx].state = ConnState::Reconnecting;
688                        inner.slots[idx].last_reconnect_attempt = Some(Instant::now());
689
690                        reconnect_info = Some((node_arc, node_opt, config));
691                    } else {
692                        reconnect_info = None;
693                    }
694                } else {
695                    reconnect_info = None;
696                }
697            } else {
698                reconnect_info = None;
699            }
700        }
701
702        if let Some((node_arc, node_opt, config)) = reconnect_info {
703            self.do_reconnect(idx, node_arc, node_opt, config).await;
704        }
705    }
706
707    async fn do_reconnect(
708        &self,
709        idx: usize,
710        node_arc: Arc<Mutex<Option<AsyncNode>>>,
711        node_opt: NodeOpt,
712        config: AsyncPoolConfig,
713    ) {
714        async_pool_info!(slot = idx, "attempting reconnection");
715        async_pool_metrics::inc_reconnect_total();
716
717        match AsyncNode::new(node_opt).await {
718            Ok(new_node) => {
719                {
720                    let mut node_guard = node_arc.lock().await;
721                    *node_guard = Some(new_node);
722                }
723
724                {
725                    let mut inner = self.inner.lock().await;
726                    inner.slots[idx].record_success();
727                    inner.update_metrics();
728                }
729
730                self.notify.notify_waiters();
731                async_pool_info!(slot = idx, "reconnection successful");
732            }
733            Err(_e) => {
734                {
735                    let mut inner = self.inner.lock().await;
736                    inner.slots[idx].record_failure(&config);
737                    inner.update_metrics();
738                }
739                async_pool_error!(slot = idx, error = ?_e, "reconnection failed");
740            }
741        }
742    }
743
744    async fn mark_success(&self, idx: usize) {
745        let mut inner = self.inner.lock().await;
746        inner.slots[idx].record_success();
747        inner.update_metrics();
748    }
749
750    /// Initiate graceful shutdown.
751    /// Signals all background tasks to stop and waits for them to finish.
752    pub async fn shutdown(&self) {
753        async_pool_info!("initiating pool shutdown");
754        self.shutdown_flag.store(true, Ordering::Relaxed);
755
756        // Collect node arcs while holding pool lock, then release before awaiting shutdowns
757        let nodes: Vec<Arc<Mutex<Option<AsyncNode>>>>;
758        {
759            let mut inner = self.inner.lock().await;
760            inner.phase = PoolPhase::Shutdown;
761            async_pool_info!("entering shutdown phase");
762
763            for slot in &mut inner.slots {
764                slot.state = ConnState::Unhealthy {
765                    since: Instant::now(),
766                };
767            }
768
769            nodes = inner.nodes.iter().map(Arc::clone).collect();
770        } // pool lock released to prevent deadlocks during node shutdown
771
772        // Shutdown each node (awaits background tasks)
773        for node_arc in &nodes {
774            let mut node_guard = node_arc.lock().await;
775            if let Some(ref node) = *node_guard {
776                let _ = node.shutdown().await;
777            }
778            *node_guard = None;
779        }
780
781        {
782            let inner = self.inner.lock().await;
783            inner.update_metrics();
784        }
785        self.notify.notify_waiters();
786
787        async_pool_info!("pool shutdown complete");
788    }
789
790    /// Check if pool is shut down.
791    pub fn is_shutdown(&self) -> bool {
792        self.shutdown_flag.load(Ordering::Relaxed)
793    }
794
795    /// Get current pool statistics.
796    pub async fn stats(&self) -> AsyncPoolStats {
797        let inner = self.inner.lock().await;
798        AsyncPoolStats {
799            size: self.config.size,
800            healthy: inner.healthy_count(),
801            total_requests: self.counter.load(Ordering::Relaxed),
802            is_shutdown: self.shutdown_flag.load(Ordering::Relaxed),
803        }
804    }
805}
806
/// Point-in-time view of the pool, as returned by `AsyncPool::stats`.
#[derive(Debug, Clone)]
pub struct AsyncPoolStats {
    /// Configured number of connection slots.
    pub size: usize,
    /// Slots that were healthy when the snapshot was taken.
    pub healthy: usize,
    /// Value of the pool's request counter at snapshot time.
    pub total_requests: usize,
    /// Whether the shutdown flag was set at snapshot time.
    pub is_shutdown: bool,
}
815
816// ============================================================================
817// Pooled Connection
818// ============================================================================
819
820/// A connection handle from the async pool.
821pub struct AsyncPooledConn<'a> {
822    pool: &'a AsyncPool,
823    idx: usize,
824    node: Arc<Mutex<Option<AsyncNode>>>,
825    #[allow(dead_code)]
826    config: AsyncPoolConfig,
827    #[allow(dead_code)]
828    host_idx: usize,
829}
830
831impl fmt::Debug for AsyncPooledConn<'_> {
832    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
833        f.debug_struct("AsyncPooledConn")
834            .field("idx", &self.idx)
835            .field("host_idx", &self.host_idx)
836            .finish()
837    }
838}
839
840impl AsyncPooledConn<'_> {
841    /// Execute a SQL query.
842    pub async fn query(&self, sql: &str) -> Result<VoltTable, VoltError> {
843        async_pool_trace!(slot = self.idx, sql = sql, "executing query");
844
845        let mut node_guard = self.node.lock().await;
846        let node = node_guard
847            .as_mut()
848            .ok_or(VoltError::ConnectionNotAvailable)?;
849        let mut rx = node.query(sql).await?;
850        drop(node_guard);
851        let result =
852            async_block_for_result_with_timeout(&mut rx, self.config.request_timeout).await;
853        self.handle_result(&result).await;
854        result
855    }
856
857    pub async fn list_procedures(&mut self) -> Result<VoltTable, VoltError> {
858        async_pool_trace!(slot = self.idx, "listing procedures");
859
860        let mut node_guard = self.node.lock().await;
861        let node = node_guard
862            .as_mut()
863            .ok_or(VoltError::ConnectionNotAvailable)?;
864        let mut rx = node.list_procedures().await?;
865        drop(node_guard);
866        let result =
867            async_block_for_result_with_timeout(&mut rx, self.config.request_timeout).await;
868        self.handle_result(&result).await;
869        result
870    }
871
872    pub async fn call_sp(
873        &mut self,
874        proc: &str,
875        params: Vec<&dyn Value>,
876    ) -> Result<VoltTable, VoltError> {
877        async_pool_trace!(
878            slot = self.idx,
879            procedure = proc,
880            "calling stored procedure"
881        );
882        let mut node_guard = self.node.lock().await;
883        let node = node_guard
884            .as_mut()
885            .ok_or(VoltError::ConnectionNotAvailable)?;
886        let mut rx = node.call_sp(proc, params).await?;
887        drop(node_guard);
888
889        let result =
890            async_block_for_result_with_timeout(&mut rx, self.config.request_timeout).await;
891        self.handle_result(&result).await;
892        result
893    }
894
895    /// Upload a JAR file.
896    pub async fn upload_jar(&self, bs: Vec<u8>) -> Result<VoltTable, VoltError> {
897        async_pool_trace!(slot = self.idx, size = bs.len(), "uploading jar");
898
899        let mut node_guard = self.node.lock().await;
900        let node = node_guard
901            .as_mut()
902            .ok_or(VoltError::ConnectionNotAvailable)?;
903
904        let mut rx = node.upload_jar(bs).await?;
905        drop(node_guard);
906
907        let result =
908            async_block_for_result_with_timeout(&mut rx, self.config.request_timeout).await;
909        self.handle_result(&result).await;
910        result
911    }
912
913    /// Handle operation result - update state and trigger reconnection if needed
914    async fn handle_result<T>(&self, result: &Result<T, VoltError>) {
915        match result {
916            Ok(_) => {
917                self.pool.mark_success(self.idx).await;
918            }
919            Err(e) if e.is_connection_fatal() => {
920                async_pool_error!(slot = self.idx, error = ?e, "fatal connection error detected");
921                {
922                    let mut guard = self.node.lock().await;
923                    *guard = None;
924                }
925                self.pool.report_fatal_error(self.idx).await;
926            }
927            Err(_) => {
928                // Non-fatal error, no action needed
929            }
930        }
931    }
932
933    /// Get the slot index of this connection (for debugging).
934    pub fn slot_index(&self) -> usize {
935        self.idx
936    }
937}
938
939// ============================================================================
940// Tests
941// ============================================================================
942
#[cfg(test)]
mod tests {
    use super::*;

    /// Only `ConnState::Healthy` reports itself as healthy.
    #[test]
    fn test_conn_state_is_healthy() {
        let unhealthy = ConnState::Unhealthy {
            since: Instant::now(),
        };

        assert!(ConnState::Healthy.is_healthy());
        assert!(!unhealthy.is_healthy());
        assert!(!ConnState::Reconnecting.is_healthy());
    }

    /// A closed circuit lets requests through.
    #[test]
    fn test_circuit_should_allow_closed() {
        assert!(Circuit::Closed.should_allow());
    }

    /// An open circuit whose deadline is still in the future rejects requests.
    #[test]
    fn test_circuit_should_allow_open_not_expired() {
        let until = Instant::now() + Duration::from_secs(60);
        let breaker = Circuit::Open { until };
        assert!(!breaker.should_allow());
    }

    /// An open circuit whose deadline has already passed lets requests through.
    #[test]
    fn test_circuit_should_allow_open_expired() {
        let until = Instant::now() - Duration::from_secs(1);
        let breaker = Circuit::Open { until };
        assert!(breaker.should_allow());
    }

    /// A half-open circuit lets requests through.
    #[test]
    fn test_circuit_should_allow_half_open() {
        assert!(Circuit::HalfOpen.should_allow());
    }

    /// `open`, `half_open` and `close` move the circuit into the matching variant.
    #[test]
    fn test_circuit_transitions() {
        let mut breaker = Circuit::Closed;

        breaker.open(Duration::from_secs(30));
        assert!(matches!(breaker, Circuit::Open { .. }));

        breaker.half_open();
        assert!(matches!(breaker, Circuit::HalfOpen));

        breaker.close();
        assert!(matches!(breaker, Circuit::Closed));
    }

    /// Every builder setter is reflected in the corresponding config field.
    #[test]
    fn test_pool_config_builder() {
        let ten_secs = Duration::from_secs(10);
        let one_minute = Duration::from_secs(60);
        let fifteen_secs = Duration::from_secs(15);

        let built = AsyncPoolConfig::new()
            .size(20)
            .reconnect_backoff(ten_secs)
            .circuit_open_duration(one_minute)
            .exhaustion_policy(ExhaustionPolicy::Block {
                timeout: Duration::from_secs(5),
            })
            .validation_mode(ValidationMode::BestEffort)
            .circuit_failure_threshold(5)
            .shutdown_timeout(one_minute)
            .request_timeout(fifteen_secs);

        assert_eq!(built.size, 20);
        assert_eq!(built.reconnect_backoff, ten_secs);
        assert_eq!(built.circuit_open_duration, one_minute);
        assert_eq!(
            built.exhaustion_policy,
            ExhaustionPolicy::Block {
                timeout: Duration::from_secs(5)
            }
        );
        assert_eq!(built.validation_mode, ValidationMode::BestEffort);
        assert_eq!(built.circuit_failure_threshold, 5);
        assert_eq!(built.shutdown_timeout, one_minute);
        assert_eq!(built.request_timeout, fifteen_secs);
    }

    /// Spot-check the default configuration values.
    #[test]
    fn test_pool_config_default() {
        let defaults = AsyncPoolConfig::default();

        assert_eq!(defaults.size, 10);
        assert_eq!(defaults.exhaustion_policy, ExhaustionPolicy::FailFast);
        assert_eq!(defaults.validation_mode, ValidationMode::FailFast);
        assert_eq!(defaults.request_timeout, Duration::from_secs(30));
    }

    /// A slot is available only when it is healthy and its circuit is not open.
    #[test]
    fn test_slot_meta_is_available() {
        let healthy = SlotMeta::new_healthy(0);
        assert!(healthy.is_available());

        let unhealthy = SlotMeta::new_unhealthy(0);
        assert!(!unhealthy.is_available());

        // Healthy state, but the circuit is open for another minute.
        let mut tripped = SlotMeta::new_healthy(0);
        tripped.circuit = Circuit::Open {
            until: Instant::now() + Duration::from_secs(60),
        };
        assert!(!tripped.is_available());
    }

    /// A freshly unhealthy slot asks for a reconnect; reconnecting or healthy slots do not.
    #[test]
    fn test_slot_meta_needs_reconnect() {
        let backoff = Duration::from_millis(100);
        let mut meta = SlotMeta::new_unhealthy(0);

        assert!(meta.needs_reconnect(backoff));

        for state in [ConnState::Reconnecting, ConnState::Healthy] {
            meta.state = state;
            assert!(!meta.needs_reconnect(backoff));
        }
    }

    /// Recording a success resets the failure counter, the state and the circuit.
    #[test]
    fn test_slot_meta_record_success() {
        let mut meta = SlotMeta::new_unhealthy(0);
        meta.consecutive_failures = 5;

        meta.record_success();

        assert_eq!(meta.consecutive_failures, 0);
        assert!(matches!(meta.state, ConnState::Healthy));
        assert!(matches!(meta.circuit, Circuit::Closed));
    }

    /// The failure that reaches the configured threshold opens the circuit.
    #[test]
    fn test_slot_meta_record_failure_opens_circuit() {
        let threshold_of_three = AsyncPoolConfig::default().circuit_failure_threshold(3);
        let mut meta = SlotMeta::new_healthy(0);
        meta.consecutive_failures = 2;

        meta.record_failure(&threshold_of_three);

        assert_eq!(meta.consecutive_failures, 3);
        assert!(matches!(meta.circuit, Circuit::Open { .. }));
    }

    /// `PoolPhase` variants compare equal to themselves and unequal to each other.
    #[test]
    fn test_pool_phase() {
        assert_eq!(PoolPhase::Running, PoolPhase::Running);
        assert_ne!(PoolPhase::Running, PoolPhase::Shutdown);
    }
}