Skip to main content

voltdb_client_rust/
pool.rs

1//! Production-ready connection pool for VoltDB.
2//!
3//! # Features
4//! - Thread-safe with fine-grained locking
5//! - Connection state machine (Healthy, Unhealthy, Reconnecting)
6//! - Per-connection circuit breaker
7//! - Configurable exhaustion policy (FailFast or Block with Condvar)
8//! - Graceful shutdown with drain mode
9//! - Optional structured logging (`tracing` feature)
10//! - Optional metrics (`metrics` feature)
11//!
12//! # Design
13//! - Pool lock only guards metadata (state, circuit breaker)
14//! - Each Node has its own lock for I/O operations
15//! - Network I/O never holds the pool lock
16
17use std::fmt;
18use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
19use std::sync::{Arc, Condvar, Mutex};
20use std::time::{Duration, Instant};
21
22use crate::pool_core::{Circuit, ConnState, ExhaustionPolicy, PoolPhase, ValidationMode};
23use crate::{Node, NodeOpt, Opts, Value, VoltError, VoltTable, block_for_result, node};
24
25// ============================================================================
26// Logging macros - use tracing if available, otherwise no-op
27// ============================================================================
28
// With the `tracing` feature enabled, each pool_* macro forwards its tokens
// verbatim to the `tracing` macro of the same level.
#[cfg(feature = "tracing")]
macro_rules! pool_trace {
    ($($tokens:tt)*) => {
        tracing::trace!($($tokens)*)
    };
}
#[cfg(feature = "tracing")]
macro_rules! pool_debug {
    ($($tokens:tt)*) => {
        tracing::debug!($($tokens)*)
    };
}
#[cfg(feature = "tracing")]
macro_rules! pool_info {
    ($($tokens:tt)*) => {
        tracing::info!($($tokens)*)
    };
}
#[cfg(feature = "tracing")]
macro_rules! pool_warn {
    ($($tokens:tt)*) => {
        tracing::warn!($($tokens)*)
    };
}
#[cfg(feature = "tracing")]
macro_rules! pool_error {
    ($($tokens:tt)*) => {
        tracing::error!($($tokens)*)
    };
}

// Without `tracing`, each pool_* macro expands to nothing, so its arguments
// are never evaluated.
#[cfg(not(feature = "tracing"))]
macro_rules! pool_trace {
    ($($tokens:tt)*) => {};
}
#[cfg(not(feature = "tracing"))]
macro_rules! pool_debug {
    ($($tokens:tt)*) => {};
}
#[cfg(not(feature = "tracing"))]
macro_rules! pool_info {
    ($($tokens:tt)*) => {};
}
#[cfg(not(feature = "tracing"))]
macro_rules! pool_warn {
    ($($tokens:tt)*) => {};
}
#[cfg(not(feature = "tracing"))]
macro_rules! pool_error {
    ($($tokens:tt)*) => {};
}
73
74// ============================================================================
75// Metrics - use metrics crate if available, otherwise no-op
76// ============================================================================
77
#[cfg(feature = "metrics")]
mod pool_metrics {
    //! Thin wrappers that publish pool gauges and counters via the `metrics` crate.
    use metrics::{counter, gauge};

    /// Publishes the number of connection slots in the pool.
    pub fn set_connections_total(count: usize) {
        let value = count as f64;
        gauge!("voltdb_pool_connections_total").set(value);
    }

    /// Publishes the number of slots currently in a healthy state.
    pub fn set_connections_healthy(count: usize) {
        let value = count as f64;
        gauge!("voltdb_pool_connections_healthy").set(value);
    }

    /// Counts one reconnection attempt.
    pub fn inc_reconnect_total() {
        counter!("voltdb_pool_reconnect_total").increment(1);
    }

    /// Counts one circuit-breaker open event.
    pub fn inc_circuit_open_total() {
        counter!("voltdb_pool_circuit_open_total").increment(1);
    }

    /// Counts one `get_conn` request that could not be served.
    pub fn inc_requests_failed_total() {
        counter!("voltdb_pool_requests_failed_total").increment(1);
    }

    /// Counts one `get_conn` request.
    pub fn inc_requests_total() {
        counter!("voltdb_pool_requests_total").increment(1);
    }
}

#[cfg(not(feature = "metrics"))]
mod pool_metrics {
    //! Stand-ins used when the `metrics` feature is disabled: every function has
    //! an empty body, so callers need no `cfg` of their own.
    pub fn set_connections_total(_: usize) {}
    pub fn set_connections_healthy(_: usize) {}
    pub fn inc_reconnect_total() {}
    pub fn inc_circuit_open_total() {}
    pub fn inc_requests_failed_total() {}
    pub fn inc_requests_total() {}
}
116
117// ============================================================================
118// Configuration
119// ============================================================================
120
121/// Pool configuration.
122#[derive(Debug, Clone)]
123pub struct PoolConfig {
124    /// Number of connections in the pool
125    pub size: usize,
126    /// Backoff duration before retry reconnection
127    pub reconnect_backoff: Duration,
128    /// How long circuit breaker stays open
129    pub circuit_open_duration: Duration,
130    /// What to do when pool is exhausted
131    pub exhaustion_policy: ExhaustionPolicy,
132    /// How to handle startup failures
133    pub validation_mode: ValidationMode,
134    /// Number of consecutive failures before opening circuit
135    pub circuit_failure_threshold: u32,
136    /// Timeout for graceful shutdown drain.
137    ///
138    /// **Note:** This field is currently reserved for future use.
139    /// The current shutdown implementation closes connections immediately.
140    pub shutdown_timeout: Duration,
141}
142
143impl Default for PoolConfig {
144    fn default() -> Self {
145        Self {
146            size: 10,
147            reconnect_backoff: Duration::from_secs(5),
148            circuit_open_duration: Duration::from_secs(30),
149            exhaustion_policy: ExhaustionPolicy::FailFast,
150            validation_mode: ValidationMode::FailFast,
151            circuit_failure_threshold: 3,
152            shutdown_timeout: Duration::from_secs(30),
153        }
154    }
155}
156
157impl PoolConfig {
158    pub fn new() -> Self {
159        Self::default()
160    }
161
162    pub fn size(mut self, size: usize) -> Self {
163        self.size = size;
164        self
165    }
166
167    pub fn reconnect_backoff(mut self, duration: Duration) -> Self {
168        self.reconnect_backoff = duration;
169        self
170    }
171
172    pub fn circuit_open_duration(mut self, duration: Duration) -> Self {
173        self.circuit_open_duration = duration;
174        self
175    }
176
177    pub fn exhaustion_policy(mut self, policy: ExhaustionPolicy) -> Self {
178        self.exhaustion_policy = policy;
179        self
180    }
181
182    pub fn validation_mode(mut self, mode: ValidationMode) -> Self {
183        self.validation_mode = mode;
184        self
185    }
186
187    pub fn circuit_failure_threshold(mut self, threshold: u32) -> Self {
188        self.circuit_failure_threshold = threshold;
189        self
190    }
191
192    pub fn shutdown_timeout(mut self, timeout: Duration) -> Self {
193        self.shutdown_timeout = timeout;
194        self
195    }
196}
197
// ============================================================================
// Pool Phase (Lifecycle): `PoolPhase` is defined in `crate::pool_core`
// ============================================================================
// Connection Slot (Metadata only - Node is separate)
// ============================================================================
203
204/// Metadata for a connection slot. The actual Node is stored separately.
205///
206/// Note: VoltDB connections support concurrent requests via handle-based
207/// request tracking. Multiple threads can share the same connection.
208#[derive(Debug)]
209struct SlotMeta {
210    state: ConnState,
211    circuit: Circuit,
212    consecutive_failures: u32,
213    last_reconnect_attempt: Option<Instant>,
214    host_idx: usize,
215}
216
217impl SlotMeta {
218    fn new_healthy(host_idx: usize) -> Self {
219        Self {
220            state: ConnState::Healthy,
221            circuit: Circuit::Closed,
222            consecutive_failures: 0,
223            last_reconnect_attempt: None,
224            host_idx,
225        }
226    }
227
228    fn new_unhealthy(host_idx: usize) -> Self {
229        Self {
230            state: ConnState::Unhealthy {
231                since: Instant::now(),
232            },
233            circuit: Circuit::Open {
234                until: Instant::now() + Duration::from_secs(5),
235            },
236            consecutive_failures: 1,
237            last_reconnect_attempt: None,
238            host_idx,
239        }
240    }
241
242    /// Check if this slot can be used (healthy and circuit allows)
243    /// Note: Multiple threads can use the same slot concurrently
244    fn is_available(&self) -> bool {
245        self.state.is_healthy() && self.circuit.should_allow()
246    }
247
248    /// Check if this slot needs reconnection and can attempt it
249    fn needs_reconnect(&self, backoff: Duration) -> bool {
250        if self.state.is_healthy() || self.state.is_reconnecting() {
251            return false;
252        }
253
254        match self.last_reconnect_attempt {
255            None => true,
256            Some(last) => Instant::now().duration_since(last) >= backoff,
257        }
258    }
259
260    fn record_success(&mut self) {
261        self.consecutive_failures = 0;
262        self.state = ConnState::Healthy;
263        self.circuit.close();
264    }
265
266    fn record_failure(&mut self, config: &PoolConfig) {
267        self.consecutive_failures += 1;
268        self.state = ConnState::Unhealthy {
269            since: Instant::now(),
270        };
271
272        if self.consecutive_failures >= config.circuit_failure_threshold {
273            self.circuit.open(config.circuit_open_duration);
274            pool_metrics::inc_circuit_open_total();
275        }
276    }
277}
278
279// ============================================================================
280// Inner Pool (protected by Mutex)
281// ============================================================================
282
283struct InnerPool {
284    opts: Opts,
285    config: PoolConfig,
286    slots: Vec<SlotMeta>,
287    // Nodes are stored separately with their own locks.
288    // Multiple threads can share the same node concurrently -
289    // VoltDB uses handle-based request tracking for multiplexing.
290    nodes: Vec<Arc<Mutex<Option<Node>>>>,
291    phase: PoolPhase,
292}
293
294impl fmt::Debug for InnerPool {
295    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
296        f.debug_struct("InnerPool")
297            .field("config", &self.config)
298            .field("slots_count", &self.slots.len())
299            .field("phase", &self.phase)
300            .finish()
301    }
302}
303
304impl InnerPool {
305    fn node_opt(&self, host_idx: usize) -> Result<NodeOpt, VoltError> {
306        let ip_port = self
307            .opts
308            .0
309            .ip_ports
310            .get(host_idx)
311            .cloned()
312            .ok_or(VoltError::InvalidConfig)?;
313        Ok(NodeOpt {
314            ip_port,
315            pass: self.opts.0.pass.clone(),
316            user: self.opts.0.user.clone(),
317            connect_timeout: self.opts.0.connect_timeout,
318            read_timeout: self.opts.0.read_timeout,
319        })
320    }
321
322    fn new(opts: Opts, config: PoolConfig) -> Result<Self, VoltError> {
323        let num_hosts = opts.0.ip_ports.len();
324        let mut inner = InnerPool {
325            opts,
326            config: config.clone(),
327            slots: Vec::with_capacity(config.size),
328            nodes: Vec::with_capacity(config.size),
329            phase: PoolPhase::Running,
330        };
331
332        for i in 0..config.size {
333            let host_idx = i % num_hosts;
334            let node_opt = inner.node_opt(host_idx)?;
335
336            pool_debug!(slot = i, host = host_idx, "creating connection");
337
338            match node::Node::new(node_opt) {
339                Ok(node) => {
340                    inner.slots.push(SlotMeta::new_healthy(host_idx));
341                    inner.nodes.push(Arc::new(Mutex::new(Some(node))));
342                    pool_info!(slot = i, "connection established");
343                }
344                Err(e) => match config.validation_mode {
345                    ValidationMode::FailFast => {
346                        pool_error!(slot = i, error = ?e, "connection failed, aborting pool creation");
347                        return Err(e);
348                    }
349                    ValidationMode::BestEffort => {
350                        pool_warn!(slot = i, error = ?e, "connection failed, marking unhealthy");
351                        inner.slots.push(SlotMeta::new_unhealthy(host_idx));
352                        inner.nodes.push(Arc::new(Mutex::new(None)));
353                    }
354                },
355            }
356        }
357
358        inner.update_metrics();
359        pool_info!(
360            size = config.size,
361            healthy = inner.healthy_count(),
362            "pool initialized"
363        );
364
365        Ok(inner)
366    }
367
368    fn healthy_count(&self) -> usize {
369        self.slots.iter().filter(|s| s.state.is_healthy()).count()
370    }
371
372    fn update_metrics(&self) {
373        pool_metrics::set_connections_total(self.slots.len());
374        pool_metrics::set_connections_healthy(self.healthy_count());
375    }
376}
377
378// ============================================================================
379// Pool (Thread-safe)
380// ============================================================================
381
382/// Thread-safe connection pool for VoltDB.
383///
384/// # Thread Safety
385/// `Pool` is designed for high concurrency:
386/// - Pool lock only guards metadata (microseconds)
387/// - Each connection has its own lock for I/O
388/// - Network I/O never blocks other threads from getting connections
389///
390/// # Example
391/// ```ignore
392/// use voltdb_client_rust::{Pool, PoolConfig, Opts, IpPort};
393///
394/// let hosts = vec![IpPort::new("localhost".to_string(), 21212)];
395/// let config = PoolConfig::new().size(5);
396/// let pool = Pool::with_config(Opts::new(hosts), config)?;
397///
398/// let mut conn = pool.get_conn()?;
399/// let table = conn.query("SELECT * FROM foo")?;
400/// ```
401pub struct Pool {
402    inner: Arc<(Mutex<InnerPool>, Condvar)>,
403    counter: AtomicUsize,
404    shutdown_flag: AtomicBool,
405    config: PoolConfig,
406}
407
408impl fmt::Debug for Pool {
409    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
410        let inner = self.inner.0.lock().ok();
411        f.debug_struct("Pool")
412            .field("counter", &self.counter.load(Ordering::Relaxed))
413            .field("shutdown", &self.shutdown_flag.load(Ordering::Relaxed))
414            .field(
415                "healthy",
416                &inner.as_ref().map(|i| i.healthy_count()).unwrap_or(0),
417            )
418            .field("config", &self.config)
419            .finish()
420    }
421}
422
423impl Pool {
424    /// Create a new pool with default configuration (10 connections).
425    pub fn new<T: Into<Opts>>(opts: T) -> Result<Pool, VoltError> {
426        Pool::with_config(opts, PoolConfig::default())
427    }
428
429    /// Create a new pool with custom size (convenience method).
430    pub fn new_manual<T: Into<Opts>>(size: usize, opts: T) -> Result<Pool, VoltError> {
431        Pool::with_config(opts, PoolConfig::new().size(size))
432    }
433
434    /// Create a new pool with full configuration.
435    pub fn with_config<T: Into<Opts>>(opts: T, config: PoolConfig) -> Result<Pool, VoltError> {
436        let inner = InnerPool::new(opts.into(), config.clone())?;
437        Ok(Pool {
438            inner: Arc::new((Mutex::new(inner), Condvar::new())),
439            counter: AtomicUsize::new(0),
440            shutdown_flag: AtomicBool::new(false),
441            config,
442        })
443    }
444
445    /// Get a connection from the pool.
446    ///
447    /// # Errors
448    /// - `PoolError::PoolShutdown` if pool is shutting down
449    /// - `PoolError::PoolExhausted` if no healthy connections (FailFast policy)
450    /// - `PoolError::Timeout` if wait times out (Block policy)
451    pub fn get_conn(&self) -> Result<PooledConn<'_>, VoltError> {
452        if self.shutdown_flag.load(Ordering::Relaxed) {
453            pool_warn!("get_conn called on shutdown pool");
454            return Err(VoltError::PoolShutdown);
455        }
456
457        pool_metrics::inc_requests_total();
458
459        let preferred_idx = self.counter.fetch_add(1, Ordering::Relaxed) % self.config.size;
460
461        match self.config.exhaustion_policy {
462            ExhaustionPolicy::FailFast => self.get_conn_failfast(preferred_idx),
463            ExhaustionPolicy::Block { timeout } => self.get_conn_blocking(preferred_idx, timeout),
464        }
465    }
466
467    fn get_conn_failfast(&self, preferred_idx: usize) -> Result<PooledConn<'_>, VoltError> {
468        let (lock, _cvar) = &*self.inner;
469        let inner = lock
470            .lock()
471            .map_err(|_| VoltError::PoisonError("Pool lock poisoned".into()))?;
472
473        if inner.phase != PoolPhase::Running {
474            return Err(VoltError::PoolShutdown);
475        }
476
477        // Try preferred index first
478        if inner.slots[preferred_idx].is_available() {
479            return self.checkout_slot(&inner, preferred_idx);
480        }
481
482        // Try to find any usable connection (with health-aware rotation)
483        for i in 1..self.config.size {
484            let idx = (preferred_idx + i) % self.config.size;
485            if inner.slots[idx].is_available() {
486                pool_debug!(
487                    preferred = preferred_idx,
488                    actual = idx,
489                    "using alternate connection"
490                );
491                return self.checkout_slot(&inner, idx);
492            }
493        }
494
495        pool_warn!("no healthy connections available");
496        pool_metrics::inc_requests_failed_total();
497        Err(VoltError::PoolExhausted)
498    }
499
500    fn get_conn_blocking(
501        &self,
502        preferred_idx: usize,
503        timeout: Duration,
504    ) -> Result<PooledConn<'_>, VoltError> {
505        let deadline = Instant::now() + timeout;
506        let (lock, cvar) = &*self.inner;
507
508        let mut inner = lock
509            .lock()
510            .map_err(|_| VoltError::PoisonError("Pool lock poisoned".into()))?;
511
512        loop {
513            if inner.phase != PoolPhase::Running {
514                return Err(VoltError::PoolShutdown);
515            }
516
517            // Try preferred index first
518            if inner.slots[preferred_idx].is_available() {
519                return self.checkout_slot(&inner, preferred_idx);
520            }
521
522            // Try any available connection
523            for i in 1..self.config.size {
524                let idx = (preferred_idx + i) % self.config.size;
525                if inner.slots[idx].is_available() {
526                    return self.checkout_slot(&inner, idx);
527                }
528            }
529
530            // No connection available, wait on condvar
531            let remaining = deadline.saturating_duration_since(Instant::now());
532            if remaining.is_zero() {
533                pool_warn!(timeout = ?timeout, "connection wait timed out");
534                pool_metrics::inc_requests_failed_total();
535                return Err(VoltError::Timeout);
536            }
537
538            pool_trace!("waiting for available connection");
539            let (guard, _timeout_result) = cvar
540                .wait_timeout(inner, remaining)
541                .map_err(|_| VoltError::PoisonError("Pool lock poisoned".into()))?;
542            inner = guard;
543        }
544    }
545
546    /// Check out a slot - returns a connection handle
547    /// Note: VoltDB allows concurrent requests per connection, so we don't track "in_use"
548    fn checkout_slot<'a>(
549        &'a self,
550        inner: &InnerPool,
551        idx: usize,
552    ) -> Result<PooledConn<'a>, VoltError> {
553        // Get the node Arc (cheap clone - multiple threads can share)
554        let node = Arc::clone(&inner.nodes[idx]);
555        let config = inner.config.clone();
556        let host_idx = inner.slots[idx].host_idx;
557
558        pool_trace!(slot = idx, "connection acquired");
559
560        // Release pool lock here - PooledConn does NOT hold it
561        Ok(PooledConn {
562            pool: self,
563            idx,
564            node,
565            config,
566            host_idx,
567        })
568    }
569
570    /// Report a fatal error on a connection slot and trigger reconnection if needed
571    fn report_fatal_error(&self, idx: usize) {
572        let (lock, cvar) = &*self.inner;
573
574        // Variables for reconnection (populated inside lock, used outside)
575        #[allow(clippy::type_complexity)]
576        let mut reconnect_info: Option<(Arc<Mutex<Option<Node>>>, NodeOpt, PoolConfig)> = None;
577
578        if let Ok(mut inner) = lock.lock() {
579            // Mark as needing reconnection
580            let config = inner.config.clone();
581            inner.slots[idx].record_failure(&config);
582            pool_debug!(slot = idx, "fatal error reported");
583
584            inner.update_metrics();
585
586            // Wake up any waiters (in case they're waiting for healthy connections)
587            cvar.notify_all();
588
589            // Check if we should trigger background reconnection
590            if !self.shutdown_flag.load(Ordering::Relaxed) {
591                let backoff = inner.config.reconnect_backoff;
592                if inner.slots[idx].needs_reconnect(backoff) {
593                    // Gather info for reconnection
594                    let node_arc = Arc::clone(&inner.nodes[idx]);
595                    let host_idx = inner.slots[idx].host_idx;
596                    if let Ok(node_opt) = inner.node_opt(host_idx) {
597                        inner.slots[idx].state = ConnState::Reconnecting;
598                        inner.slots[idx].last_reconnect_attempt = Some(Instant::now());
599
600                        reconnect_info = Some((node_arc, node_opt, config));
601                    }
602                }
603            }
604        }
605
606        // Reconnection happens OUTSIDE the pool lock
607        if let Some((node_arc, node_opt, config)) = reconnect_info {
608            self.do_reconnect(idx, node_arc, node_opt, config);
609        }
610    }
611
612    /// Perform reconnection OUTSIDE the pool lock
613    fn do_reconnect(
614        &self,
615        idx: usize,
616        node_arc: Arc<Mutex<Option<Node>>>,
617        node_opt: NodeOpt,
618        config: PoolConfig,
619    ) {
620        pool_info!(slot = idx, "attempting reconnection");
621        pool_metrics::inc_reconnect_total();
622
623        // This network I/O happens OUTSIDE the pool lock
624        match node::Node::new(node_opt) {
625            Ok(new_node) => {
626                // Install new node (only node lock held)
627                if let Ok(mut node_guard) = node_arc.lock() {
628                    *node_guard = Some(new_node);
629                }
630
631                // Update metadata (pool lock)
632                let (lock, cvar) = &*self.inner;
633                if let Ok(mut inner) = lock.lock() {
634                    inner.slots[idx].record_success();
635                    inner.update_metrics();
636                    cvar.notify_all(); // Wake waiters
637                }
638                pool_info!(slot = idx, "reconnection successful");
639            }
640            Err(_e) => {
641                // Update metadata only
642                let (lock, _) = &*self.inner;
643                if let Ok(mut inner) = lock.lock() {
644                    inner.slots[idx].record_failure(&config);
645                    inner.update_metrics();
646                }
647                pool_error!(slot = idx, error = ?_e, "reconnection failed");
648            }
649        }
650    }
651
652    /// Mark success for a slot
653    fn mark_success(&self, idx: usize) {
654        let (lock, _) = &*self.inner;
655        if let Ok(mut inner) = lock.lock() {
656            inner.slots[idx].record_success();
657            inner.update_metrics();
658        }
659    }
660
661    /// Initiate graceful shutdown.
662    ///
663    /// This method:
664    /// 1. Stops accepting new connections
665    /// 2. Closes all connections
666    ///
667    /// Note: Since VoltDB connections are shared (multiple threads can use the same
668    /// connection concurrently), we don't track "active" connections. Shutdown
669    /// simply prevents new checkouts and clears the connections.
670    pub fn shutdown(&self) {
671        pool_info!("initiating pool shutdown");
672        self.shutdown_flag.store(true, Ordering::Relaxed);
673
674        let (lock, cvar) = &*self.inner;
675
676        if let Ok(mut inner) = lock.lock() {
677            // Set phase to Shutdown - reject new connections
678            inner.phase = PoolPhase::Shutdown;
679            pool_info!("entering shutdown phase");
680
681            // Mark all slots as unhealthy
682            for slot in &mut inner.slots {
683                slot.state = ConnState::Unhealthy {
684                    since: Instant::now(),
685                };
686            }
687
688            // Shutdown and clear all nodes
689            for node_arc in &inner.nodes {
690                if let Ok(mut node_guard) = node_arc.lock() {
691                    if let Some(ref mut node) = *node_guard {
692                        let _ = node.shutdown();
693                    }
694                    *node_guard = None;
695                }
696            }
697            inner.update_metrics();
698
699            // Wake any waiters so they get shutdown error
700            cvar.notify_all();
701        }
702
703        pool_info!("pool shutdown complete");
704    }
705
706    /// Check if pool is shut down.
707    pub fn is_shutdown(&self) -> bool {
708        self.shutdown_flag.load(Ordering::Relaxed)
709    }
710
711    /// Get current pool statistics.
712    pub fn stats(&self) -> PoolStats {
713        let (lock, _) = &*self.inner;
714        let inner = lock.lock().ok();
715        PoolStats {
716            size: self.config.size,
717            healthy: inner.as_ref().map(|i| i.healthy_count()).unwrap_or(0),
718            total_requests: self.counter.load(Ordering::Relaxed),
719            is_shutdown: self.shutdown_flag.load(Ordering::Relaxed),
720        }
721    }
722}
723
/// Point-in-time snapshot of pool statistics, as returned by `Pool::stats`.
#[derive(Debug, Clone)]
pub struct PoolStats {
    /// Configured number of connection slots.
    pub size: usize,
    /// Slots in a healthy state when the snapshot was taken.
    pub healthy: usize,
    /// Value of the pool's request counter (calls to `get_conn` that passed
    /// the shutdown check, whether or not they obtained a connection).
    pub total_requests: usize,
    /// Whether `shutdown` has been called.
    pub is_shutdown: bool,
}
732
733// ============================================================================
734// Pooled Connection
735// ============================================================================
736
737/// A connection handle from the pool.
738///
739/// # Important
740/// - Does NOT hold the pool lock during I/O operations
741/// - Only the connection's own mutex is held during queries
742/// - Multiple PooledConn instances can share the same underlying connection
743///   (VoltDB supports concurrent requests via handle-based tracking)
744pub struct PooledConn<'a> {
745    pool: &'a Pool,
746    idx: usize,
747    node: Arc<Mutex<Option<Node>>>,
748    #[allow(dead_code)] // Reserved for future per-connection config
749    config: PoolConfig,
750    #[allow(dead_code)]
751    host_idx: usize,
752}
753
754impl fmt::Debug for PooledConn<'_> {
755    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
756        f.debug_struct("PooledConn")
757            .field("idx", &self.idx)
758            .field("host_idx", &self.host_idx)
759            .finish()
760    }
761}
762
763impl PooledConn<'_> {
764    /// Execute a SQL query.
765    pub fn query(&mut self, sql: &str) -> Result<VoltTable, VoltError> {
766        pool_trace!(slot = self.idx, sql = sql, "executing query");
767
768        let mut node_guard = self
769            .node
770            .lock()
771            .map_err(|_| VoltError::PoisonError("Node lock poisoned".to_string()))?;
772
773        let node = node_guard
774            .as_mut()
775            .ok_or(VoltError::ConnectionNotAvailable)?;
776
777        let result = node.query(sql).and_then(|r| block_for_result(&r));
778        drop(node_guard);
779
780        self.handle_result(&result);
781        result
782    }
783
784    /// List all stored procedures.
785    pub fn list_procedures(&mut self) -> Result<VoltTable, VoltError> {
786        pool_trace!(slot = self.idx, "listing procedures");
787
788        let mut node_guard = self
789            .node
790            .lock()
791            .map_err(|_| VoltError::PoisonError("Node lock poisoned".to_string()))?;
792
793        let node = node_guard
794            .as_mut()
795            .ok_or(VoltError::ConnectionNotAvailable)?;
796
797        let result = node.list_procedures().and_then(|r| block_for_result(&r));
798        drop(node_guard);
799
800        self.handle_result(&result);
801        result
802    }
803
804    /// Call a stored procedure with parameters.
805    pub fn call_sp(&mut self, proc: &str, params: Vec<&dyn Value>) -> Result<VoltTable, VoltError> {
806        pool_trace!(
807            slot = self.idx,
808            procedure = proc,
809            "calling stored procedure"
810        );
811
812        let mut node_guard = self
813            .node
814            .lock()
815            .map_err(|_| VoltError::PoisonError("Node lock poisoned".to_string()))?;
816
817        let node = node_guard
818            .as_mut()
819            .ok_or(VoltError::ConnectionNotAvailable)?;
820
821        let result = node
822            .call_sp(proc, params)
823            .and_then(|r| block_for_result(&r));
824        drop(node_guard);
825
826        self.handle_result(&result);
827        result
828    }
829
830    /// Upload a JAR file.
831    pub fn upload_jar(&mut self, bs: Vec<u8>) -> Result<VoltTable, VoltError> {
832        pool_trace!(slot = self.idx, size = bs.len(), "uploading jar");
833
834        let mut node_guard = self
835            .node
836            .lock()
837            .map_err(|_| VoltError::PoisonError("Node lock poisoned".to_string()))?;
838
839        let node = node_guard
840            .as_mut()
841            .ok_or(VoltError::ConnectionNotAvailable)?;
842
843        let result = node.upload_jar(bs).and_then(|r| block_for_result(&r));
844        drop(node_guard);
845
846        self.handle_result(&result);
847        result
848    }
849
850    /// Handle operation result - update state and trigger reconnection if needed
851    fn handle_result<T>(&self, result: &Result<T, VoltError>) {
852        match result {
853            Ok(_) => {
854                self.pool.mark_success(self.idx);
855            }
856            Err(e) if e.is_connection_fatal() => {
857                pool_error!(slot = self.idx, error = ?e, "fatal connection error detected");
858                // Clear the node and trigger reconnection
859                if let Ok(mut guard) = self.node.lock() {
860                    *guard = None;
861                }
862                self.pool.report_fatal_error(self.idx);
863            }
864            Err(_) => {
865                // Non-fatal error, no action needed
866            }
867        }
868    }
869
870    /// Get the slot index of this connection (for debugging).
871    pub fn slot_index(&self) -> usize {
872        self.idx
873    }
874}
875
// No Drop implementation needed: connections are shared and are never
// "returned" to the pool. Dropping a PooledConn only drops its Arc handle to the
// slot's node; the connection itself stays owned by the pool's node list.
878
879// ============================================================================
880// Tests
881// ============================================================================
882
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_pool_config_builder() {
        let cfg = PoolConfig::new()
            .size(20)
            .reconnect_backoff(Duration::from_secs(10))
            .circuit_open_duration(Duration::from_secs(60))
            .exhaustion_policy(ExhaustionPolicy::Block {
                timeout: Duration::from_secs(5),
            })
            .validation_mode(ValidationMode::BestEffort)
            .circuit_failure_threshold(5)
            .shutdown_timeout(Duration::from_secs(60));

        // Scalar settings.
        assert_eq!(cfg.size, 20);
        assert_eq!(cfg.circuit_failure_threshold, 5);

        // Duration settings, paired with the expected number of seconds.
        let durations = [
            (cfg.reconnect_backoff, 10),
            (cfg.circuit_open_duration, 60),
            (cfg.shutdown_timeout, 60),
        ];
        for (actual, secs) in durations {
            assert_eq!(actual, Duration::from_secs(secs));
        }

        // Policy enums.
        let expected_policy = ExhaustionPolicy::Block {
            timeout: Duration::from_secs(5),
        };
        assert_eq!(cfg.exhaustion_policy, expected_policy);
        assert_eq!(cfg.validation_mode, ValidationMode::BestEffort);
    }

    #[test]
    fn test_pool_config_default() {
        let PoolConfig {
            size,
            exhaustion_policy,
            validation_mode,
            ..
        } = PoolConfig::default();
        assert_eq!(size, 10);
        assert_eq!(exhaustion_policy, ExhaustionPolicy::FailFast);
        assert_eq!(validation_mode, ValidationMode::FailFast);
    }

    #[test]
    fn test_slot_meta_is_available() {
        // Healthy slots can be handed out; unhealthy ones cannot.
        assert!(SlotMeta::new_healthy(0).is_available());
        assert!(!SlotMeta::new_unhealthy(0).is_available());

        // A healthy state is not enough when the circuit is open.
        let tripped = SlotMeta {
            circuit: Circuit::Open {
                until: Instant::now() + Duration::from_secs(60),
            },
            ..SlotMeta::new_healthy(0)
        };
        assert!(!tripped.is_available());
    }

    #[test]
    fn test_slot_meta_needs_reconnect() {
        let backoff = Duration::from_millis(100);
        let mut slot = SlotMeta::new_unhealthy(0);

        // Unhealthy with no prior attempt: eligible immediately.
        assert!(slot.needs_reconnect(backoff));

        // Neither an in-flight reconnect nor a healthy slot needs another one.
        for state in [ConnState::Reconnecting, ConnState::Healthy] {
            slot.state = state;
            assert!(!slot.needs_reconnect(backoff));
        }
    }

    #[test]
    fn test_slot_meta_record_success() {
        let mut slot = SlotMeta {
            consecutive_failures: 5,
            ..SlotMeta::new_unhealthy(0)
        };

        slot.record_success();

        assert_eq!(slot.consecutive_failures, 0);
        assert!(matches!(slot.state, ConnState::Healthy));
        assert!(matches!(slot.circuit, Circuit::Closed));
    }

    #[test]
    fn test_slot_meta_record_failure_opens_circuit() {
        let config = PoolConfig::default().circuit_failure_threshold(3);
        let mut slot = SlotMeta {
            consecutive_failures: 2,
            ..SlotMeta::new_healthy(0)
        };

        slot.record_failure(&config);

        assert_eq!(slot.consecutive_failures, 3);
        assert!(matches!(slot.circuit, Circuit::Open { .. }));
    }
}