Skip to main content

voltdb_client_rust/
pool.rs

1//! Production-ready connection pool for VoltDB.
2//!
3//! # Features
4//! - Thread-safe with fine-grained locking
5//! - Connection state machine (Healthy, Unhealthy, Reconnecting)
6//! - Per-connection circuit breaker
7//! - Configurable exhaustion policy (FailFast or Block with Condvar)
8//! - Graceful shutdown with drain mode
9//! - Optional structured logging (`tracing` feature)
10//! - Optional metrics (`metrics` feature)
11//!
12//! # Design
13//! - Pool lock only guards metadata (state, circuit breaker)
14//! - Each Node has its own lock for I/O operations
15//! - Network I/O never holds the pool lock
16
17use std::fmt;
18use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
19use std::sync::{Arc, Condvar, Mutex};
20use std::time::{Duration, Instant};
21
22use crate::pool_core::{
23    Circuit, ConnState, ExhaustionPolicy, PoolError, PoolPhase, ValidationMode,
24};
25use crate::{Node, NodeOpt, Opts, Value, VoltError, VoltTable, block_for_result, node};
26
27// ============================================================================
28// Logging macros - use tracing if available, otherwise no-op
29// ============================================================================
30
31#[cfg(feature = "tracing")]
32macro_rules! pool_trace {
33    ($($arg:tt)*) => { tracing::trace!($($arg)*) };
34}
35#[cfg(not(feature = "tracing"))]
36macro_rules! pool_trace {
37    ($($arg:tt)*) => {};
38}
39
40#[cfg(feature = "tracing")]
41macro_rules! pool_debug {
42    ($($arg:tt)*) => { tracing::debug!($($arg)*) };
43}
44#[cfg(not(feature = "tracing"))]
45macro_rules! pool_debug {
46    ($($arg:tt)*) => {};
47}
48
49#[cfg(feature = "tracing")]
50macro_rules! pool_info {
51    ($($arg:tt)*) => { tracing::info!($($arg)*) };
52}
53#[cfg(not(feature = "tracing"))]
54macro_rules! pool_info {
55    ($($arg:tt)*) => {};
56}
57
58#[cfg(feature = "tracing")]
59macro_rules! pool_warn {
60    ($($arg:tt)*) => { tracing::warn!($($arg)*) };
61}
62#[cfg(not(feature = "tracing"))]
63macro_rules! pool_warn {
64    ($($arg:tt)*) => {};
65}
66
67#[cfg(feature = "tracing")]
68macro_rules! pool_error {
69    ($($arg:tt)*) => { tracing::error!($($arg)*) };
70}
71#[cfg(not(feature = "tracing"))]
72macro_rules! pool_error {
73    ($($arg:tt)*) => {};
74}
75
76// ============================================================================
77// Metrics - use metrics crate if available, otherwise no-op
78// ============================================================================
79
80#[cfg(feature = "metrics")]
81mod pool_metrics {
82    use metrics::{counter, gauge};
83
84    pub fn set_connections_total(count: usize) {
85        gauge!("voltdb_pool_connections_total").set(count as f64);
86    }
87
88    pub fn set_connections_healthy(count: usize) {
89        gauge!("voltdb_pool_connections_healthy").set(count as f64);
90    }
91
92    pub fn inc_reconnect_total() {
93        counter!("voltdb_pool_reconnect_total").increment(1);
94    }
95
96    pub fn inc_circuit_open_total() {
97        counter!("voltdb_pool_circuit_open_total").increment(1);
98    }
99
100    pub fn inc_requests_failed_total() {
101        counter!("voltdb_pool_requests_failed_total").increment(1);
102    }
103
104    pub fn inc_requests_total() {
105        counter!("voltdb_pool_requests_total").increment(1);
106    }
107}
108
109#[cfg(not(feature = "metrics"))]
110mod pool_metrics {
111    pub fn set_connections_total(_count: usize) {}
112    pub fn set_connections_healthy(_count: usize) {}
113    pub fn inc_reconnect_total() {}
114    pub fn inc_circuit_open_total() {}
115    pub fn inc_requests_failed_total() {}
116    pub fn inc_requests_total() {}
117}
118
119// ============================================================================
120// Configuration
121// ============================================================================
122
123/// Pool configuration.
124#[derive(Debug, Clone)]
125pub struct PoolConfig {
126    /// Number of connections in the pool
127    pub size: usize,
128    /// Backoff duration before retry reconnection
129    pub reconnect_backoff: Duration,
130    /// How long circuit breaker stays open
131    pub circuit_open_duration: Duration,
132    /// What to do when pool is exhausted
133    pub exhaustion_policy: ExhaustionPolicy,
134    /// How to handle startup failures
135    pub validation_mode: ValidationMode,
136    /// Number of consecutive failures before opening circuit
137    pub circuit_failure_threshold: u32,
138    /// Timeout for graceful shutdown drain.
139    ///
140    /// **Note:** This field is currently reserved for future use.
141    /// The current shutdown implementation closes connections immediately.
142    pub shutdown_timeout: Duration,
143}
144
145impl Default for PoolConfig {
146    fn default() -> Self {
147        Self {
148            size: 10,
149            reconnect_backoff: Duration::from_secs(5),
150            circuit_open_duration: Duration::from_secs(30),
151            exhaustion_policy: ExhaustionPolicy::FailFast,
152            validation_mode: ValidationMode::FailFast,
153            circuit_failure_threshold: 3,
154            shutdown_timeout: Duration::from_secs(30),
155        }
156    }
157}
158
159impl PoolConfig {
160    pub fn new() -> Self {
161        Self::default()
162    }
163
164    pub fn size(mut self, size: usize) -> Self {
165        self.size = size;
166        self
167    }
168
169    pub fn reconnect_backoff(mut self, duration: Duration) -> Self {
170        self.reconnect_backoff = duration;
171        self
172    }
173
174    pub fn circuit_open_duration(mut self, duration: Duration) -> Self {
175        self.circuit_open_duration = duration;
176        self
177    }
178
179    pub fn exhaustion_policy(mut self, policy: ExhaustionPolicy) -> Self {
180        self.exhaustion_policy = policy;
181        self
182    }
183
184    pub fn validation_mode(mut self, mode: ValidationMode) -> Self {
185        self.validation_mode = mode;
186        self
187    }
188
189    pub fn circuit_failure_threshold(mut self, threshold: u32) -> Self {
190        self.circuit_failure_threshold = threshold;
191        self
192    }
193
194    pub fn shutdown_timeout(mut self, timeout: Duration) -> Self {
195        self.shutdown_timeout = timeout;
196        self
197    }
198}
199
200// ============================================================================
201// Pool Phase (Lifecycle)
202// ============================================================================
203// Connection Slot (Metadata only - Node is separate)
204// ============================================================================
205
206/// Metadata for a connection slot. The actual Node is stored separately.
207///
208/// Note: VoltDB connections support concurrent requests via handle-based
209/// request tracking. Multiple threads can share the same connection.
210#[derive(Debug)]
211struct SlotMeta {
212    state: ConnState,
213    circuit: Circuit,
214    consecutive_failures: u32,
215    last_reconnect_attempt: Option<Instant>,
216    host_idx: usize,
217}
218
219impl SlotMeta {
220    fn new_healthy(host_idx: usize) -> Self {
221        Self {
222            state: ConnState::Healthy,
223            circuit: Circuit::Closed,
224            consecutive_failures: 0,
225            last_reconnect_attempt: None,
226            host_idx,
227        }
228    }
229
230    fn new_unhealthy(host_idx: usize) -> Self {
231        Self {
232            state: ConnState::Unhealthy {
233                since: Instant::now(),
234            },
235            circuit: Circuit::Open {
236                until: Instant::now() + Duration::from_secs(5),
237            },
238            consecutive_failures: 1,
239            last_reconnect_attempt: None,
240            host_idx,
241        }
242    }
243
244    /// Check if this slot can be used (healthy and circuit allows)
245    /// Note: Multiple threads can use the same slot concurrently
246    fn is_available(&self) -> bool {
247        self.state.is_healthy() && self.circuit.should_allow()
248    }
249
250    /// Check if this slot needs reconnection and can attempt it
251    fn needs_reconnect(&self, backoff: Duration) -> bool {
252        if self.state.is_healthy() || self.state.is_reconnecting() {
253            return false;
254        }
255
256        match self.last_reconnect_attempt {
257            None => true,
258            Some(last) => Instant::now().duration_since(last) >= backoff,
259        }
260    }
261
262    fn record_success(&mut self) {
263        self.consecutive_failures = 0;
264        self.state = ConnState::Healthy;
265        self.circuit.close();
266    }
267
268    fn record_failure(&mut self, config: &PoolConfig) {
269        self.consecutive_failures += 1;
270        self.state = ConnState::Unhealthy {
271            since: Instant::now(),
272        };
273
274        if self.consecutive_failures >= config.circuit_failure_threshold {
275            self.circuit.open(config.circuit_open_duration);
276            pool_metrics::inc_circuit_open_total();
277        }
278    }
279}
280
281// ============================================================================
282// Inner Pool (protected by Mutex)
283// ============================================================================
284
285struct InnerPool {
286    opts: Opts,
287    config: PoolConfig,
288    slots: Vec<SlotMeta>,
289    // Nodes are stored separately with their own locks.
290    // Multiple threads can share the same node concurrently -
291    // VoltDB uses handle-based request tracking for multiplexing.
292    nodes: Vec<Arc<Mutex<Option<Node>>>>,
293    phase: PoolPhase,
294}
295
296impl fmt::Debug for InnerPool {
297    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
298        f.debug_struct("InnerPool")
299            .field("config", &self.config)
300            .field("slots_count", &self.slots.len())
301            .field("phase", &self.phase)
302            .finish()
303    }
304}
305
306impl InnerPool {
307    fn node_opt(&self, host_idx: usize) -> Result<NodeOpt, VoltError> {
308        let ip_port = self
309            .opts
310            .0
311            .ip_ports
312            .get(host_idx)
313            .cloned()
314            .ok_or(VoltError::InvalidConfig)?;
315        Ok(NodeOpt {
316            ip_port,
317            pass: self.opts.0.pass.clone(),
318            user: self.opts.0.user.clone(),
319            connect_timeout: self.opts.0.connect_timeout,
320            read_timeout: self.opts.0.read_timeout,
321        })
322    }
323
324    fn new(opts: Opts, config: PoolConfig) -> Result<Self, VoltError> {
325        let num_hosts = opts.0.ip_ports.len();
326        let mut inner = InnerPool {
327            opts,
328            config: config.clone(),
329            slots: Vec::with_capacity(config.size),
330            nodes: Vec::with_capacity(config.size),
331            phase: PoolPhase::Running,
332        };
333
334        for i in 0..config.size {
335            let host_idx = i % num_hosts;
336            let node_opt = inner.node_opt(host_idx)?;
337
338            pool_debug!(slot = i, host = host_idx, "creating connection");
339
340            match node::Node::new(node_opt) {
341                Ok(node) => {
342                    inner.slots.push(SlotMeta::new_healthy(host_idx));
343                    inner.nodes.push(Arc::new(Mutex::new(Some(node))));
344                    pool_info!(slot = i, "connection established");
345                }
346                Err(e) => match config.validation_mode {
347                    ValidationMode::FailFast => {
348                        pool_error!(slot = i, error = ?e, "connection failed, aborting pool creation");
349                        return Err(e);
350                    }
351                    ValidationMode::BestEffort => {
352                        pool_warn!(slot = i, error = ?e, "connection failed, marking unhealthy");
353                        inner.slots.push(SlotMeta::new_unhealthy(host_idx));
354                        inner.nodes.push(Arc::new(Mutex::new(None)));
355                    }
356                },
357            }
358        }
359
360        inner.update_metrics();
361        pool_info!(
362            size = config.size,
363            healthy = inner.healthy_count(),
364            "pool initialized"
365        );
366
367        Ok(inner)
368    }
369
370    fn healthy_count(&self) -> usize {
371        self.slots.iter().filter(|s| s.state.is_healthy()).count()
372    }
373
374    fn update_metrics(&self) {
375        pool_metrics::set_connections_total(self.slots.len());
376        pool_metrics::set_connections_healthy(self.healthy_count());
377    }
378}
379
380// ============================================================================
381// Pool (Thread-safe)
382// ============================================================================
383
384/// Thread-safe connection pool for VoltDB.
385///
386/// # Thread Safety
387/// `Pool` is designed for high concurrency:
388/// - Pool lock only guards metadata (microseconds)
389/// - Each connection has its own lock for I/O
390/// - Network I/O never blocks other threads from getting connections
391///
392/// # Example
393/// ```ignore
394/// use voltdb_client_rust::{Pool, PoolConfig, Opts, IpPort};
395///
396/// let hosts = vec![IpPort::new("localhost".to_string(), 21212)];
397/// let config = PoolConfig::new().size(5);
398/// let pool = Pool::with_config(Opts::new(hosts), config)?;
399///
400/// let mut conn = pool.get_conn()?;
401/// let table = conn.query("SELECT * FROM foo")?;
402/// ```
403pub struct Pool {
404    inner: Arc<(Mutex<InnerPool>, Condvar)>,
405    counter: AtomicUsize,
406    shutdown_flag: AtomicBool,
407    config: PoolConfig,
408}
409
410impl fmt::Debug for Pool {
411    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
412        let inner = self.inner.0.lock().ok();
413        f.debug_struct("Pool")
414            .field("counter", &self.counter.load(Ordering::Relaxed))
415            .field("shutdown", &self.shutdown_flag.load(Ordering::Relaxed))
416            .field(
417                "healthy",
418                &inner.as_ref().map(|i| i.healthy_count()).unwrap_or(0),
419            )
420            .field("config", &self.config)
421            .finish()
422    }
423}
424
425impl Pool {
426    /// Create a new pool with default configuration (10 connections).
427    pub fn new<T: Into<Opts>>(opts: T) -> Result<Pool, VoltError> {
428        Pool::with_config(opts, PoolConfig::default())
429    }
430
431    /// Create a new pool with custom size (convenience method).
432    pub fn new_manual<T: Into<Opts>>(size: usize, opts: T) -> Result<Pool, VoltError> {
433        Pool::with_config(opts, PoolConfig::new().size(size))
434    }
435
436    /// Create a new pool with full configuration.
437    pub fn with_config<T: Into<Opts>>(opts: T, config: PoolConfig) -> Result<Pool, VoltError> {
438        let inner = InnerPool::new(opts.into(), config.clone())?;
439        Ok(Pool {
440            inner: Arc::new((Mutex::new(inner), Condvar::new())),
441            counter: AtomicUsize::new(0),
442            shutdown_flag: AtomicBool::new(false),
443            config,
444        })
445    }
446
447    /// Get a connection from the pool.
448    ///
449    /// # Errors
450    /// - `PoolError::PoolShutdown` if pool is shutting down
451    /// - `PoolError::PoolExhausted` if no healthy connections (FailFast policy)
452    /// - `PoolError::Timeout` if wait times out (Block policy)
453    pub fn get_conn(&self) -> Result<PooledConn<'_>, VoltError> {
454        if self.shutdown_flag.load(Ordering::Relaxed) {
455            pool_warn!("get_conn called on shutdown pool");
456            return Err(PoolError::PoolShutdown.into());
457        }
458
459        pool_metrics::inc_requests_total();
460
461        let preferred_idx = self.counter.fetch_add(1, Ordering::Relaxed) % self.config.size;
462
463        match self.config.exhaustion_policy {
464            ExhaustionPolicy::FailFast => self.get_conn_failfast(preferred_idx),
465            ExhaustionPolicy::Block { timeout } => self.get_conn_blocking(preferred_idx, timeout),
466        }
467    }
468
469    fn get_conn_failfast(&self, preferred_idx: usize) -> Result<PooledConn<'_>, VoltError> {
470        let (lock, _cvar) = &*self.inner;
471        let inner = lock.lock().map_err(|_| PoolError::LockPoisoned)?;
472
473        if inner.phase != PoolPhase::Running {
474            return Err(PoolError::PoolShutdown.into());
475        }
476
477        // Try preferred index first
478        if inner.slots[preferred_idx].is_available() {
479            return self.checkout_slot(&inner, preferred_idx);
480        }
481
482        // Try to find any usable connection (with health-aware rotation)
483        for i in 1..self.config.size {
484            let idx = (preferred_idx + i) % self.config.size;
485            if inner.slots[idx].is_available() {
486                pool_debug!(
487                    preferred = preferred_idx,
488                    actual = idx,
489                    "using alternate connection"
490                );
491                return self.checkout_slot(&inner, idx);
492            }
493        }
494
495        pool_warn!("no healthy connections available");
496        pool_metrics::inc_requests_failed_total();
497        Err(PoolError::PoolExhausted.into())
498    }
499
500    fn get_conn_blocking(
501        &self,
502        preferred_idx: usize,
503        timeout: Duration,
504    ) -> Result<PooledConn<'_>, VoltError> {
505        let deadline = Instant::now() + timeout;
506        let (lock, cvar) = &*self.inner;
507
508        let mut inner = lock.lock().map_err(|_| PoolError::LockPoisoned)?;
509
510        loop {
511            if inner.phase != PoolPhase::Running {
512                return Err(PoolError::PoolShutdown.into());
513            }
514
515            // Try preferred index first
516            if inner.slots[preferred_idx].is_available() {
517                return self.checkout_slot(&inner, preferred_idx);
518            }
519
520            // Try any available connection
521            for i in 1..self.config.size {
522                let idx = (preferred_idx + i) % self.config.size;
523                if inner.slots[idx].is_available() {
524                    return self.checkout_slot(&inner, idx);
525                }
526            }
527
528            // No connection available, wait on condvar
529            let remaining = deadline.saturating_duration_since(Instant::now());
530            if remaining.is_zero() {
531                pool_warn!(timeout = ?timeout, "connection wait timed out");
532                pool_metrics::inc_requests_failed_total();
533                return Err(PoolError::Timeout.into());
534            }
535
536            pool_trace!("waiting for available connection");
537            let (guard, _timeout_result) = cvar
538                .wait_timeout(inner, remaining)
539                .map_err(|_| PoolError::LockPoisoned)?;
540            inner = guard;
541        }
542    }
543
544    /// Check out a slot - returns a connection handle
545    /// Note: VoltDB allows concurrent requests per connection, so we don't track "in_use"
546    fn checkout_slot<'a>(
547        &'a self,
548        inner: &InnerPool,
549        idx: usize,
550    ) -> Result<PooledConn<'a>, VoltError> {
551        // Get the node Arc (cheap clone - multiple threads can share)
552        let node = Arc::clone(&inner.nodes[idx]);
553        let config = inner.config.clone();
554        let host_idx = inner.slots[idx].host_idx;
555
556        pool_trace!(slot = idx, "connection acquired");
557
558        // Release pool lock here - PooledConn does NOT hold it
559        Ok(PooledConn {
560            pool: self,
561            idx,
562            node,
563            config,
564            host_idx,
565        })
566    }
567
568    /// Report a fatal error on a connection slot and trigger reconnection if needed
569    fn report_fatal_error(&self, idx: usize) {
570        let (lock, cvar) = &*self.inner;
571
572        // Variables for reconnection (populated inside lock, used outside)
573        #[allow(clippy::type_complexity)]
574        let mut reconnect_info: Option<(Arc<Mutex<Option<Node>>>, NodeOpt, PoolConfig)> = None;
575
576        if let Ok(mut inner) = lock.lock() {
577            // Mark as needing reconnection
578            let config = inner.config.clone();
579            inner.slots[idx].record_failure(&config);
580            pool_debug!(slot = idx, "fatal error reported");
581
582            inner.update_metrics();
583
584            // Wake up any waiters (in case they're waiting for healthy connections)
585            cvar.notify_all();
586
587            // Check if we should trigger background reconnection
588            if !self.shutdown_flag.load(Ordering::Relaxed) {
589                let backoff = inner.config.reconnect_backoff;
590                if inner.slots[idx].needs_reconnect(backoff) {
591                    // Gather info for reconnection
592                    let node_arc = Arc::clone(&inner.nodes[idx]);
593                    let host_idx = inner.slots[idx].host_idx;
594                    if let Ok(node_opt) = inner.node_opt(host_idx) {
595                        inner.slots[idx].state = ConnState::Reconnecting;
596                        inner.slots[idx].last_reconnect_attempt = Some(Instant::now());
597
598                        reconnect_info = Some((node_arc, node_opt, config));
599                    }
600                }
601            }
602        }
603
604        // Reconnection happens OUTSIDE the pool lock
605        if let Some((node_arc, node_opt, config)) = reconnect_info {
606            self.do_reconnect(idx, node_arc, node_opt, config);
607        }
608    }
609
610    /// Perform reconnection OUTSIDE the pool lock
611    fn do_reconnect(
612        &self,
613        idx: usize,
614        node_arc: Arc<Mutex<Option<Node>>>,
615        node_opt: NodeOpt,
616        config: PoolConfig,
617    ) {
618        pool_info!(slot = idx, "attempting reconnection");
619        pool_metrics::inc_reconnect_total();
620
621        // This network I/O happens OUTSIDE the pool lock
622        match node::Node::new(node_opt) {
623            Ok(new_node) => {
624                // Install new node (only node lock held)
625                if let Ok(mut node_guard) = node_arc.lock() {
626                    *node_guard = Some(new_node);
627                }
628
629                // Update metadata (pool lock)
630                let (lock, cvar) = &*self.inner;
631                if let Ok(mut inner) = lock.lock() {
632                    inner.slots[idx].record_success();
633                    inner.update_metrics();
634                    cvar.notify_all(); // Wake waiters
635                }
636                pool_info!(slot = idx, "reconnection successful");
637            }
638            Err(_e) => {
639                // Update metadata only
640                let (lock, _) = &*self.inner;
641                if let Ok(mut inner) = lock.lock() {
642                    inner.slots[idx].record_failure(&config);
643                    inner.update_metrics();
644                }
645                pool_error!(slot = idx, error = ?_e, "reconnection failed");
646            }
647        }
648    }
649
650    /// Mark success for a slot
651    fn mark_success(&self, idx: usize) {
652        let (lock, _) = &*self.inner;
653        if let Ok(mut inner) = lock.lock() {
654            inner.slots[idx].record_success();
655            inner.update_metrics();
656        }
657    }
658
659    /// Initiate graceful shutdown.
660    ///
661    /// This method:
662    /// 1. Stops accepting new connections
663    /// 2. Closes all connections
664    ///
665    /// Note: Since VoltDB connections are shared (multiple threads can use the same
666    /// connection concurrently), we don't track "active" connections. Shutdown
667    /// simply prevents new checkouts and clears the connections.
668    pub fn shutdown(&self) {
669        pool_info!("initiating pool shutdown");
670        self.shutdown_flag.store(true, Ordering::Relaxed);
671
672        let (lock, cvar) = &*self.inner;
673
674        if let Ok(mut inner) = lock.lock() {
675            // Set phase to Shutdown - reject new connections
676            inner.phase = PoolPhase::Shutdown;
677            pool_info!("entering shutdown phase");
678
679            // Mark all slots as unhealthy
680            for slot in &mut inner.slots {
681                slot.state = ConnState::Unhealthy {
682                    since: Instant::now(),
683                };
684            }
685
686            // Clear all nodes
687            for node_arc in &inner.nodes {
688                if let Ok(mut node_guard) = node_arc.lock() {
689                    *node_guard = None;
690                }
691            }
692            inner.update_metrics();
693
694            // Wake any waiters so they get shutdown error
695            cvar.notify_all();
696        }
697
698        pool_info!("pool shutdown complete");
699    }
700
701    /// Check if pool is shut down.
702    pub fn is_shutdown(&self) -> bool {
703        self.shutdown_flag.load(Ordering::Relaxed)
704    }
705
706    /// Get current pool statistics.
707    pub fn stats(&self) -> PoolStats {
708        let (lock, _) = &*self.inner;
709        let inner = lock.lock().ok();
710        PoolStats {
711            size: self.config.size,
712            healthy: inner.as_ref().map(|i| i.healthy_count()).unwrap_or(0),
713            total_requests: self.counter.load(Ordering::Relaxed),
714            is_shutdown: self.shutdown_flag.load(Ordering::Relaxed),
715        }
716    }
717}
718
719/// Pool statistics snapshot.
720#[derive(Debug, Clone)]
721pub struct PoolStats {
722    pub size: usize,
723    pub healthy: usize,
724    pub total_requests: usize,
725    pub is_shutdown: bool,
726}
727
728// ============================================================================
729// Pooled Connection
730// ============================================================================
731
732/// A connection handle from the pool.
733///
734/// # Important
735/// - Does NOT hold the pool lock during I/O operations
736/// - Only the connection's own mutex is held during queries
737/// - Multiple PooledConn instances can share the same underlying connection
738///   (VoltDB supports concurrent requests via handle-based tracking)
739pub struct PooledConn<'a> {
740    pool: &'a Pool,
741    idx: usize,
742    node: Arc<Mutex<Option<Node>>>,
743    #[allow(dead_code)] // Reserved for future per-connection config
744    config: PoolConfig,
745    #[allow(dead_code)]
746    host_idx: usize,
747}
748
749impl fmt::Debug for PooledConn<'_> {
750    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
751        f.debug_struct("PooledConn")
752            .field("idx", &self.idx)
753            .field("host_idx", &self.host_idx)
754            .finish()
755    }
756}
757
758impl PooledConn<'_> {
759    /// Execute a SQL query.
760    pub fn query(&mut self, sql: &str) -> Result<VoltTable, VoltError> {
761        pool_trace!(slot = self.idx, sql = sql, "executing query");
762
763        let mut node_guard = self
764            .node
765            .lock()
766            .map_err(|_| VoltError::PoisonError("Node lock poisoned".to_string()))?;
767
768        let node = node_guard
769            .as_mut()
770            .ok_or(VoltError::ConnectionNotAvailable)?;
771
772        let result = node.query(sql).and_then(|r| block_for_result(&r));
773        drop(node_guard);
774
775        self.handle_result(&result);
776        result
777    }
778
779    /// List all stored procedures.
780    pub fn list_procedures(&mut self) -> Result<VoltTable, VoltError> {
781        pool_trace!(slot = self.idx, "listing procedures");
782
783        let mut node_guard = self
784            .node
785            .lock()
786            .map_err(|_| VoltError::PoisonError("Node lock poisoned".to_string()))?;
787
788        let node = node_guard
789            .as_mut()
790            .ok_or(VoltError::ConnectionNotAvailable)?;
791
792        let result = node.list_procedures().and_then(|r| block_for_result(&r));
793        drop(node_guard);
794
795        self.handle_result(&result);
796        result
797    }
798
799    /// Call a stored procedure with parameters.
800    pub fn call_sp(&mut self, proc: &str, params: Vec<&dyn Value>) -> Result<VoltTable, VoltError> {
801        pool_trace!(
802            slot = self.idx,
803            procedure = proc,
804            "calling stored procedure"
805        );
806
807        let mut node_guard = self
808            .node
809            .lock()
810            .map_err(|_| VoltError::PoisonError("Node lock poisoned".to_string()))?;
811
812        let node = node_guard
813            .as_mut()
814            .ok_or(VoltError::ConnectionNotAvailable)?;
815
816        let result = node
817            .call_sp(proc, params)
818            .and_then(|r| block_for_result(&r));
819        drop(node_guard);
820
821        self.handle_result(&result);
822        result
823    }
824
825    /// Upload a JAR file.
826    pub fn upload_jar(&mut self, bs: Vec<u8>) -> Result<VoltTable, VoltError> {
827        pool_trace!(slot = self.idx, size = bs.len(), "uploading jar");
828
829        let mut node_guard = self
830            .node
831            .lock()
832            .map_err(|_| VoltError::PoisonError("Node lock poisoned".to_string()))?;
833
834        let node = node_guard
835            .as_mut()
836            .ok_or(VoltError::ConnectionNotAvailable)?;
837
838        let result = node.upload_jar(bs).and_then(|r| block_for_result(&r));
839        drop(node_guard);
840
841        self.handle_result(&result);
842        result
843    }
844
845    /// Handle operation result - update state and trigger reconnection if needed
846    fn handle_result<T>(&self, result: &Result<T, VoltError>) {
847        match result {
848            Ok(_) => {
849                self.pool.mark_success(self.idx);
850            }
851            Err(e) if e.is_connection_fatal() => {
852                pool_error!(slot = self.idx, error = ?e, "fatal connection error detected");
853                // Clear the node and trigger reconnection
854                if let Ok(mut guard) = self.node.lock() {
855                    *guard = None;
856                }
857                self.pool.report_fatal_error(self.idx);
858            }
859            Err(_) => {
860                // Non-fatal error, no action needed
861            }
862        }
863    }
864
865    /// Get the slot index of this connection (for debugging).
866    pub fn slot_index(&self) -> usize {
867        self.idx
868    }
869}
870
871// No Drop implementation needed - VoltDB connections are shared and
872// don't need to be "returned" to the pool. The Arc<Node> handles cleanup.
873
874// ============================================================================
875// Tests
876// ============================================================================
877
878#[cfg(test)]
879mod tests {
880    use super::*;
881
882    #[test]
883    fn test_pool_config_builder() {
884        let config = PoolConfig::new()
885            .size(20)
886            .reconnect_backoff(Duration::from_secs(10))
887            .circuit_open_duration(Duration::from_secs(60))
888            .exhaustion_policy(ExhaustionPolicy::Block {
889                timeout: Duration::from_secs(5),
890            })
891            .validation_mode(ValidationMode::BestEffort)
892            .circuit_failure_threshold(5)
893            .shutdown_timeout(Duration::from_secs(60));
894
895        assert_eq!(config.size, 20);
896        assert_eq!(config.reconnect_backoff, Duration::from_secs(10));
897        assert_eq!(config.circuit_open_duration, Duration::from_secs(60));
898        assert_eq!(
899            config.exhaustion_policy,
900            ExhaustionPolicy::Block {
901                timeout: Duration::from_secs(5)
902            }
903        );
904        assert_eq!(config.validation_mode, ValidationMode::BestEffort);
905        assert_eq!(config.circuit_failure_threshold, 5);
906        assert_eq!(config.shutdown_timeout, Duration::from_secs(60));
907    }
908
909    #[test]
910    fn test_pool_config_default() {
911        let config = PoolConfig::default();
912        assert_eq!(config.size, 10);
913        assert_eq!(config.exhaustion_policy, ExhaustionPolicy::FailFast);
914        assert_eq!(config.validation_mode, ValidationMode::FailFast);
915    }
916
917    #[test]
918    fn test_slot_meta_is_available() {
919        // Healthy slot is available
920        let slot = SlotMeta::new_healthy(0);
921        assert!(slot.is_available());
922
923        // Unhealthy slot is not available
924        let slot = SlotMeta::new_unhealthy(0);
925        assert!(!slot.is_available());
926
927        // Healthy slot with open circuit is not available
928        let mut slot = SlotMeta::new_healthy(0);
929        slot.circuit = Circuit::Open {
930            until: Instant::now() + Duration::from_secs(60),
931        };
932        assert!(!slot.is_available());
933    }
934
935    #[test]
936    fn test_slot_meta_needs_reconnect() {
937        let mut slot = SlotMeta::new_unhealthy(0);
938        let backoff = Duration::from_millis(100);
939
940        // Unhealthy slot needs reconnect
941        assert!(slot.needs_reconnect(backoff));
942
943        // After marking reconnecting, doesn't need it
944        slot.state = ConnState::Reconnecting;
945        assert!(!slot.needs_reconnect(backoff));
946
947        // Healthy slot doesn't need reconnect
948        slot.state = ConnState::Healthy;
949        assert!(!slot.needs_reconnect(backoff));
950    }
951
952    #[test]
953    fn test_slot_meta_record_success() {
954        let mut slot = SlotMeta::new_unhealthy(0);
955        slot.consecutive_failures = 5;
956
957        slot.record_success();
958
959        assert_eq!(slot.consecutive_failures, 0);
960        assert!(matches!(slot.state, ConnState::Healthy));
961        assert!(matches!(slot.circuit, Circuit::Closed));
962    }
963
964    #[test]
965    fn test_slot_meta_record_failure_opens_circuit() {
966        let mut slot = SlotMeta::new_healthy(0);
967        slot.consecutive_failures = 2;
968
969        let config = PoolConfig::default().circuit_failure_threshold(3);
970
971        slot.record_failure(&config);
972
973        assert_eq!(slot.consecutive_failures, 3);
974        assert!(matches!(slot.circuit, Circuit::Open { .. }));
975    }
976}