Skip to main content

voltdb_client_rust/
pool_core.rs

1//! Shared pool components for sync and async connection pools.
2//!
3//! This module contains types and traits that are shared between
4//! `Pool` (sync) and `AsyncPool` (async) implementations.
5
6use std::fmt;
7use std::time::{Duration, Instant};
8
9use crate::encode::VoltError;
10
11// ============================================================================
12// Pool-specific errors
13// ============================================================================
14
15/// Pool-specific error conditions.
16#[derive(Debug, Clone, PartialEq, Eq)]
17pub enum PoolError {
18    /// Pool is shutting down, no new connections allowed
19    PoolShutdown,
20    /// Circuit breaker is open for the requested connection
21    CircuitOpen,
22    /// All connections are busy or unhealthy
23    PoolExhausted,
24    /// Timed out waiting for a connection
25    Timeout,
26    /// Internal lock was poisoned
27    LockPoisoned,
28}
29
30impl fmt::Display for PoolError {
31    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
32        match self {
33            PoolError::PoolShutdown => write!(f, "Pool is shutting down"),
34            PoolError::CircuitOpen => write!(f, "Circuit breaker is open"),
35            PoolError::PoolExhausted => write!(f, "Pool exhausted, no healthy connections"),
36            PoolError::Timeout => write!(f, "Timed out waiting for connection"),
37            PoolError::LockPoisoned => write!(f, "Internal lock poisoned"),
38        }
39    }
40}
41
42impl std::error::Error for PoolError {}
43
44impl From<PoolError> for VoltError {
45    fn from(e: PoolError) -> Self {
46        match e {
47            PoolError::PoolShutdown => VoltError::ConnectionNotAvailable,
48            PoolError::CircuitOpen => VoltError::ConnectionNotAvailable,
49            PoolError::PoolExhausted => VoltError::ConnectionNotAvailable,
50            PoolError::Timeout => VoltError::Timeout,
51            PoolError::LockPoisoned => VoltError::PoisonError("Pool lock poisoned".to_string()),
52        }
53    }
54}
55
56// ============================================================================
57// Connection State Machine
58// ============================================================================
59
60/// Connection health state.
61#[derive(Debug, Clone)]
62pub enum ConnState {
63    /// Connection is working normally
64    Healthy,
65    /// Connection has failed, tracking when it became unhealthy
66    Unhealthy { since: Instant },
67    /// Connection is being replaced (reconnection in progress)
68    Reconnecting,
69}
70
71impl ConnState {
72    /// Check if the connection is in a healthy state.
73    pub fn is_healthy(&self) -> bool {
74        matches!(self, ConnState::Healthy)
75    }
76
77    /// Check if the connection is currently reconnecting.
78    pub fn is_reconnecting(&self) -> bool {
79        matches!(self, ConnState::Reconnecting)
80    }
81}
82
83// ============================================================================
84// Circuit Breaker
85// ============================================================================
86
87/// Per-connection circuit breaker state.
88#[derive(Debug, Clone)]
89pub enum Circuit {
90    /// Normal operation - requests flow through
91    Closed,
92    /// Circuit is open - fail fast until `until` time
93    Open { until: Instant },
94    /// Allow one probe request to test recovery
95    HalfOpen,
96}
97
98impl Circuit {
99    /// Check if request should be allowed through.
100    pub fn should_allow(&self) -> bool {
101        match self {
102            Circuit::Closed => true,
103            Circuit::Open { until } => Instant::now() >= *until,
104            Circuit::HalfOpen => true,
105        }
106    }
107
108    /// Transition to Open state.
109    pub fn open(&mut self, duration: Duration) {
110        *self = Circuit::Open {
111            until: Instant::now() + duration,
112        };
113    }
114
115    /// Transition to HalfOpen state (for probing).
116    #[allow(dead_code)]
117    pub fn half_open(&mut self) {
118        *self = Circuit::HalfOpen;
119    }
120
121    /// Transition to Closed state (healthy).
122    pub fn close(&mut self) {
123        *self = Circuit::Closed;
124    }
125}
126
127// ============================================================================
128// Configuration
129// ============================================================================
130
131/// What to do when all connections are busy.
132#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
133pub enum ExhaustionPolicy {
134    /// Return error immediately
135    #[default]
136    FailFast,
137    /// Block up to the specified duration waiting for a connection
138    Block { timeout: Duration },
139}
140
141/// How to handle startup connection failures.
142#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
143pub enum ValidationMode {
144    /// Panic if any connection fails during pool creation
145    #[default]
146    FailFast,
147    /// Mark failed connections as unhealthy, continue startup
148    BestEffort,
149}
150
151// ============================================================================
152// Pool Phase (Lifecycle)
153// ============================================================================
154
155/// Pool lifecycle phase for graceful shutdown.
156#[derive(Debug, Clone, Copy, PartialEq, Eq)]
157pub enum PoolPhase {
158    /// Normal operation
159    Running,
160    /// Fully shut down
161    Shutdown,
162}
163
164// ============================================================================
165// Pool Statistics
166// ============================================================================
167
168/// Pool statistics snapshot.
169#[derive(Debug, Clone)]
170pub struct PoolStats {
171    /// Total number of slots in the pool
172    pub size: usize,
173    /// Number of healthy connections
174    pub healthy: usize,
175    /// Total number of requests made
176    pub total_requests: usize,
177    /// Whether the pool is shut down
178    pub is_shutdown: bool,
179}
180
181// ============================================================================
182// Logging Macros (for use by pool implementations)
183// ============================================================================
184
185/// Generate logging macros for a pool module.
186///
187/// Usage: `define_pool_logging_macros!(pool);` generates `pool_trace!`, `pool_debug!`, etc.
188#[macro_export]
189macro_rules! define_pool_logging_macros {
190    ($prefix:ident) => {
191        #[cfg(feature = "tracing")]
192        macro_rules! $prefix_trace {
193            ($($arg:tt)*) => { tracing::trace!($($arg)*) };
194        }
195        #[cfg(not(feature = "tracing"))]
196        macro_rules! $prefix_trace {
197            ($($arg:tt)*) => {};
198        }
199
200        #[cfg(feature = "tracing")]
201        macro_rules! $prefix_debug {
202            ($($arg:tt)*) => { tracing::debug!($($arg)*) };
203        }
204        #[cfg(not(feature = "tracing"))]
205        macro_rules! $prefix_debug {
206            ($($arg:tt)*) => {};
207        }
208
209        #[cfg(feature = "tracing")]
210        macro_rules! $prefix_info {
211            ($($arg:tt)*) => { tracing::info!($($arg)*) };
212        }
213        #[cfg(not(feature = "tracing"))]
214        macro_rules! $prefix_info {
215            ($($arg:tt)*) => {};
216        }
217
218        #[cfg(feature = "tracing")]
219        macro_rules! $prefix_warn {
220            ($($arg:tt)*) => { tracing::warn!($($arg)*) };
221        }
222        #[cfg(not(feature = "tracing"))]
223        macro_rules! $prefix_warn {
224            ($($arg:tt)*) => {};
225        }
226
227        #[cfg(feature = "tracing")]
228        macro_rules! $prefix_error {
229            ($($arg:tt)*) => { tracing::error!($($arg)*) };
230        }
231        #[cfg(not(feature = "tracing"))]
232        macro_rules! $prefix_error {
233            ($($arg:tt)*) => {};
234        }
235    };
236}
237
238// ============================================================================
239// Tests
240// ============================================================================
241
242#[cfg(test)]
243mod tests {
244    use super::*;
245
246    #[test]
247    fn test_conn_state_is_healthy() {
248        assert!(ConnState::Healthy.is_healthy());
249        assert!(
250            !ConnState::Unhealthy {
251                since: Instant::now()
252            }
253            .is_healthy()
254        );
255        assert!(!ConnState::Reconnecting.is_healthy());
256    }
257
258    #[test]
259    fn test_conn_state_is_reconnecting() {
260        assert!(!ConnState::Healthy.is_reconnecting());
261        assert!(
262            !ConnState::Unhealthy {
263                since: Instant::now()
264            }
265            .is_reconnecting()
266        );
267        assert!(ConnState::Reconnecting.is_reconnecting());
268    }
269
270    #[test]
271    fn test_circuit_should_allow_closed() {
272        let circuit = Circuit::Closed;
273        assert!(circuit.should_allow());
274    }
275
276    #[test]
277    fn test_circuit_should_allow_open_not_expired() {
278        let circuit = Circuit::Open {
279            until: Instant::now() + Duration::from_secs(60),
280        };
281        assert!(!circuit.should_allow());
282    }
283
284    #[test]
285    fn test_circuit_should_allow_open_expired() {
286        let circuit = Circuit::Open {
287            until: Instant::now() - Duration::from_secs(1),
288        };
289        assert!(circuit.should_allow());
290    }
291
292    #[test]
293    fn test_circuit_should_allow_half_open() {
294        let circuit = Circuit::HalfOpen;
295        assert!(circuit.should_allow());
296    }
297
298    #[test]
299    fn test_circuit_transitions() {
300        let mut circuit = Circuit::Closed;
301
302        circuit.open(Duration::from_secs(30));
303        assert!(matches!(circuit, Circuit::Open { .. }));
304
305        circuit.half_open();
306        assert!(matches!(circuit, Circuit::HalfOpen));
307
308        circuit.close();
309        assert!(matches!(circuit, Circuit::Closed));
310    }
311
312    #[test]
313    fn test_exhaustion_policy_default() {
314        let policy = ExhaustionPolicy::default();
315        assert_eq!(policy, ExhaustionPolicy::FailFast);
316    }
317
318    #[test]
319    fn test_validation_mode_default() {
320        let mode = ValidationMode::default();
321        assert_eq!(mode, ValidationMode::FailFast);
322    }
323
324    #[test]
325    fn test_pool_stats() {
326        let stats = PoolStats {
327            size: 10,
328            healthy: 8,
329            total_requests: 100,
330            is_shutdown: false,
331        };
332
333        assert_eq!(stats.size, 10);
334        assert_eq!(stats.healthy, 8);
335        assert_eq!(stats.total_requests, 100);
336        assert!(!stats.is_shutdown);
337    }
338
339    #[test]
340    fn test_pool_error_display() {
341        assert_eq!(
342            format!("{}", PoolError::PoolShutdown),
343            "Pool is shutting down"
344        );
345        assert_eq!(
346            format!("{}", PoolError::CircuitOpen),
347            "Circuit breaker is open"
348        );
349        assert_eq!(
350            format!("{}", PoolError::PoolExhausted),
351            "Pool exhausted, no healthy connections"
352        );
353        assert_eq!(
354            format!("{}", PoolError::Timeout),
355            "Timed out waiting for connection"
356        );
357    }
358
359    #[test]
360    fn test_pool_phase() {
361        assert_eq!(PoolPhase::Running, PoolPhase::Running);
362        assert_ne!(PoolPhase::Running, PoolPhase::Shutdown);
363    }
364
365    #[test]
366    fn test_pool_error_into_volt_error() {
367        let err: VoltError = PoolError::PoolShutdown.into();
368        assert!(matches!(err, VoltError::ConnectionNotAvailable));
369
370        let err: VoltError = PoolError::Timeout.into();
371        assert!(matches!(err, VoltError::Timeout));
372    }
373}