Skip to main content

voltdb_client_rust/
pool_core.rs

1//! Shared pool components for sync and async connection pools.
2//!
3//! This module contains types and traits that are shared between
4//! `Pool` (sync) and `AsyncPool` (async) implementations.
5
6use std::time::{Duration, Instant};
7
8// ============================================================================
9// Connection State Machine
10// ============================================================================
11
/// Health of a single pooled connection.
#[derive(Clone, Debug)]
pub enum ConnState {
    /// The connection is usable.
    Healthy,
    /// The connection has failed; `since` records when it became unhealthy.
    Unhealthy { since: Instant },
    /// A replacement connection is currently being established.
    Reconnecting,
}

impl ConnState {
    /// Returns `true` only for [`ConnState::Healthy`].
    pub fn is_healthy(&self) -> bool {
        matches!(*self, Self::Healthy)
    }

    /// Returns `true` only for [`ConnState::Reconnecting`].
    pub fn is_reconnecting(&self) -> bool {
        matches!(*self, Self::Reconnecting)
    }
}
34
35// ============================================================================
36// Circuit Breaker
37// ============================================================================
38
/// Per-connection circuit breaker state.
#[derive(Debug, Clone)]
pub enum Circuit {
    /// Normal operation - requests flow through
    Closed,
    /// Circuit is open - fail fast until `until` time
    Open { until: Instant },
    /// Recovery is being probed
    HalfOpen,
}

impl Circuit {
    /// Check if a request should be allowed through.
    ///
    /// Returns `true` for `Closed` and `HalfOpen`, and for `Open` once the
    /// `until` deadline has been reached. This method takes `&self` and never
    /// changes the state: an expired `Open` circuit stays `Open`, and the
    /// number of requests admitted while `HalfOpen` is not limited here.
    pub fn should_allow(&self) -> bool {
        match self {
            Circuit::Closed | Circuit::HalfOpen => true,
            Circuit::Open { until } => Instant::now() >= *until,
        }
    }

    /// Transition to the `Open` state for (at least approximately) `duration`.
    ///
    /// `Instant + Duration` panics when the result is not representable, so a
    /// very large `duration` (for example `Duration::MAX`) is saturated instead:
    /// the wait is halved until `now + wait` fits, which yields a deadline
    /// far in the future rather than a panic.
    pub fn open(&mut self, duration: Duration) {
        let now = Instant::now();
        let mut wait = duration;
        let until = loop {
            match now.checked_add(wait) {
                Some(deadline) => break deadline,
                // `now + 0` is always representable, so halving terminates.
                None => wait /= 2,
            }
        };
        *self = Circuit::Open { until };
    }

    /// Transition to the `HalfOpen` state (for probing).
    #[allow(dead_code)]
    pub fn half_open(&mut self) {
        *self = Circuit::HalfOpen;
    }

    /// Transition to the `Closed` state (healthy).
    pub fn close(&mut self) {
        *self = Circuit::Closed;
    }
}
78
79// ============================================================================
80// Configuration
81// ============================================================================
82
/// Policy applied when every connection in the pool is busy.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum ExhaustionPolicy {
    /// Give up at once and report an error (the default).
    #[default]
    FailFast,
    /// Wait for a connection, but no longer than `timeout`.
    Block { timeout: Duration },
}
92
/// Strategy for connection failures that occur while the pool starts up.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum ValidationMode {
    /// Panic if any connection fails during pool creation (the default).
    #[default]
    FailFast,
    /// Keep starting up; connections that failed are marked unhealthy.
    BestEffort,
}
102
103// ============================================================================
104// Pool Phase (Lifecycle)
105// ============================================================================
106
/// Lifecycle phase of a pool, used to support graceful shutdown.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum PoolPhase {
    /// The pool is operating normally.
    Running,
    /// The pool has been fully shut down.
    Shutdown,
}
115
116// ============================================================================
117// Pool Statistics
118// ============================================================================
119
/// Point-in-time snapshot of pool statistics.
#[derive(Clone, Debug)]
pub struct PoolStats {
    /// How many slots the pool has in total.
    pub size: usize,
    /// How many connections are currently healthy.
    pub healthy: usize,
    /// How many requests have been made in total.
    pub total_requests: usize,
    /// `true` once the pool has been shut down.
    pub is_shutdown: bool,
}
132
133// ============================================================================
134// Logging Macros (for use by pool implementations)
135// ============================================================================
136
/// Generate the five level-specific logging macros for a pool module.
///
/// Pass the names the generated macros should have, in the order
/// trace, debug, info, warn, error:
///
/// ```ignore
/// define_pool_logging_macros!(pool_trace, pool_debug, pool_info, pool_warn, pool_error);
/// pool_debug!("acquired slot {}", idx);
/// ```
///
/// With `feature = "tracing"` enabled, each generated macro forwards its
/// arguments to the matching `tracing` macro (`tracing::trace!`, ...).
/// Otherwise it expands to nothing, so its arguments are never evaluated.
///
/// The single-identifier form (`define_pool_logging_macros!(pool)`) is rejected
/// with an explanatory error: `macro_rules!` cannot join `pool` and `_trace`
/// into one identifier, so that form could never expand successfully.
#[macro_export]
macro_rules! define_pool_logging_macros {
    // Internal: emit one logging macro named `$name` that forwards to
    // `tracing::$level!`. `$d` holds a literal `$` token so the repetition
    // below belongs to the generated macro rather than to this one.
    (@emit ($d:tt) $name:ident => $level:ident) => {
        #[cfg(feature = "tracing")]
        macro_rules! $name {
            ($d($d arg:tt)*) => { tracing::$level!($d($d arg)*) };
        }
        #[cfg(not(feature = "tracing"))]
        macro_rules! $name {
            ($d($d arg:tt)*) => {};
        }
    };

    // Public entry point: explicit names for all five levels.
    ($trace:ident, $debug:ident, $info:ident, $warn:ident, $error:ident $(,)?) => {
        $crate::define_pool_logging_macros!(@emit ($) $trace => trace);
        $crate::define_pool_logging_macros!(@emit ($) $debug => debug);
        $crate::define_pool_logging_macros!(@emit ($) $info => info);
        $crate::define_pool_logging_macros!(@emit ($) $warn => warn);
        $crate::define_pool_logging_macros!(@emit ($) $error => error);
    };

    // Legacy prefix form: identifier concatenation is not possible here, so
    // fail with a message that says what to write instead.
    ($prefix:ident) => {
        compile_error!(
            "define_pool_logging_macros! needs the five macro names spelled out, e.g. \
             `define_pool_logging_macros!(pool_trace, pool_debug, pool_info, pool_warn, pool_error)`"
        );
    };
}
189
190// ============================================================================
191// Tests
192// ============================================================================
193
#[cfg(test)]
mod tests {
    use super::*;

    /// Only `Healthy` counts as healthy.
    #[test]
    fn test_conn_state_is_healthy() {
        let unhealthy = ConnState::Unhealthy {
            since: Instant::now(),
        };

        assert!(ConnState::Healthy.is_healthy());
        assert!(!unhealthy.is_healthy());
        assert!(!ConnState::Reconnecting.is_healthy());
    }

    /// Only `Reconnecting` counts as reconnecting.
    #[test]
    fn test_conn_state_is_reconnecting() {
        let unhealthy = ConnState::Unhealthy {
            since: Instant::now(),
        };

        assert!(!ConnState::Healthy.is_reconnecting());
        assert!(!unhealthy.is_reconnecting());
        assert!(ConnState::Reconnecting.is_reconnecting());
    }

    /// A closed circuit lets requests through.
    #[test]
    fn test_circuit_should_allow_closed() {
        assert!(Circuit::Closed.should_allow());
    }

    /// An open circuit whose deadline is still ahead rejects requests.
    #[test]
    fn test_circuit_should_allow_open_not_expired() {
        let until = Instant::now() + Duration::from_secs(60);
        let allowed = Circuit::Open { until }.should_allow();
        assert!(!allowed);
    }

    /// An open circuit whose deadline has passed lets requests through.
    #[test]
    fn test_circuit_should_allow_open_expired() {
        let until = Instant::now() - Duration::from_secs(1);
        let allowed = Circuit::Open { until }.should_allow();
        assert!(allowed);
    }

    /// A half-open circuit lets requests through.
    #[test]
    fn test_circuit_should_allow_half_open() {
        assert!(Circuit::HalfOpen.should_allow());
    }

    /// Closed -> Open -> HalfOpen -> Closed.
    #[test]
    fn test_circuit_transitions() {
        let mut breaker = Circuit::Closed;

        breaker.open(Duration::from_secs(30));
        assert!(matches!(breaker, Circuit::Open { .. }));

        breaker.half_open();
        assert!(matches!(breaker, Circuit::HalfOpen));

        breaker.close();
        assert!(matches!(breaker, Circuit::Closed));
    }

    /// The default exhaustion policy is `FailFast`.
    #[test]
    fn test_exhaustion_policy_default() {
        assert_eq!(ExhaustionPolicy::default(), ExhaustionPolicy::FailFast);
    }

    /// The default validation mode is `FailFast`.
    #[test]
    fn test_validation_mode_default() {
        assert_eq!(ValidationMode::default(), ValidationMode::FailFast);
    }

    /// Fields of a `PoolStats` value read back as written.
    #[test]
    fn test_pool_stats() {
        let PoolStats {
            size,
            healthy,
            total_requests,
            is_shutdown,
        } = PoolStats {
            size: 10,
            healthy: 8,
            total_requests: 100,
            is_shutdown: false,
        };

        assert_eq!(size, 10);
        assert_eq!(healthy, 8);
        assert_eq!(total_requests, 100);
        assert!(!is_shutdown);
    }

    /// `PoolPhase` variants compare by identity.
    #[test]
    fn test_pool_phase() {
        let running = PoolPhase::Running;
        assert_eq!(running, PoolPhase::Running);
        assert_ne!(running, PoolPhase::Shutdown);
    }
}