Skip to main content

oxirs_stream/
connection_pool_types.rs

1//! Connection Pool — Type definitions
2//!
3//! Pool configuration, connection state types, health check types,
4//! load balancing strategies, and pool status/metrics structures.
5
6use crate::circuit_breaker::CircuitBreakerConfig;
7use serde::{Deserialize, Serialize};
8use std::collections::{HashMap, VecDeque};
9use std::time::{Duration, Instant};
10
11/// Connection pool configuration with advanced features
12#[derive(Debug, Clone, Serialize, Deserialize)]
13pub struct PoolConfig {
14    pub min_connections: usize,
15    pub max_connections: usize,
16    pub connection_timeout: Duration,
17    pub idle_timeout: Duration,
18    pub max_lifetime: Duration,
19    pub health_check_interval: Duration,
20    pub retry_attempts: u32,
21    /// Enable adaptive pool sizing based on load
22    pub adaptive_sizing: bool,
23    /// Target response time for adaptive sizing (milliseconds)
24    pub target_response_time_ms: u64,
25    /// Load balancing strategy for connection distribution
26    pub load_balancing: LoadBalancingStrategy,
27    /// Enable circuit breaker for connection failures
28    pub enable_circuit_breaker: bool,
29    /// Circuit breaker configuration
30    pub circuit_breaker_config: Option<CircuitBreakerConfig>,
31    /// Enable comprehensive metrics collection
32    pub enable_metrics: bool,
33    /// Connection validation timeout
34    pub validation_timeout: Duration,
35    /// Maximum wait time for acquiring a connection
36    pub acquire_timeout: Duration,
37}
38
39impl Default for PoolConfig {
40    fn default() -> Self {
41        Self {
42            min_connections: 1,
43            max_connections: 10,
44            connection_timeout: Duration::from_secs(30),
45            idle_timeout: Duration::from_secs(300),
46            max_lifetime: Duration::from_secs(1800),
47            health_check_interval: Duration::from_secs(60),
48            retry_attempts: 3,
49            adaptive_sizing: true,
50            target_response_time_ms: 100,
51            load_balancing: LoadBalancingStrategy::RoundRobin,
52            enable_circuit_breaker: true,
53            circuit_breaker_config: Some(CircuitBreakerConfig::default()),
54            enable_metrics: true,
55            validation_timeout: Duration::from_secs(5),
56            acquire_timeout: Duration::from_secs(30),
57        }
58    }
59}
60
61/// Load balancing strategies for connection distribution
62#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
63pub enum LoadBalancingStrategy {
64    /// Round-robin selection
65    RoundRobin,
66    /// Least recently used
67    LeastRecentlyUsed,
68    /// Random selection
69    Random,
70    /// Least connections (best for varying load)
71    LeastConnections,
72    /// Weighted round-robin based on response times
73    WeightedRoundRobin,
74}
75
76/// Connection pool status with comprehensive metrics
77#[derive(Debug, Clone, Serialize, Deserialize)]
78pub struct PoolStatus {
79    pub total_connections: usize,
80    pub active_connections: usize,
81    pub idle_connections: usize,
82    pub pending_requests: usize,
83    pub is_healthy: bool,
84    #[serde(skip)]
85    pub last_health_check: Option<Instant>,
86    /// Current pool utilization percentage
87    pub utilization_percent: f64,
88    /// Average response time across all connections
89    pub avg_response_time_ms: f64,
90    /// Current load balancing strategy
91    pub load_balancing_strategy: LoadBalancingStrategy,
92    /// Circuit breaker status
93    pub circuit_breaker_open: bool,
94    /// Pool configuration hash for validation
95    pub config_hash: u64,
96}
97
98/// Pool statistics with enhanced metrics
99#[derive(Debug, Default, Clone)]
100pub struct PoolStats {
101    pub total_created: u64,
102    pub total_destroyed: u64,
103    pub total_borrowed: u64,
104    pub total_returned: u64,
105    pub creation_failures: u64,
106    pub health_check_failures: u64,
107    pub timeouts: u64,
108    pub circuit_breaker_failures: u64,
109    pub adaptive_scaling_events: u64,
110    pub load_balancing_decisions: u64,
111    pub failover_count: u64,
112}
113
114/// Comprehensive pool metrics for monitoring (crate-internal)
115#[derive(Debug, Clone)]
116pub(crate) struct PoolMetrics {
117    pub(crate) current_size: usize,
118    pub(crate) peak_size: usize,
119    pub(crate) total_requests: u64,
120    pub(crate) avg_wait_time_ms: f64,
121    pub(crate) utilization_history: VecDeque<(Instant, f64)>,
122    pub(crate) response_time_p50: Duration,
123    pub(crate) response_time_p95: Duration,
124    pub(crate) response_time_p99: Duration,
125    pub(crate) error_rates: HashMap<String, f64>,
126    pub(crate) last_updated: Instant,
127}
128
129impl Default for PoolMetrics {
130    fn default() -> Self {
131        Self {
132            current_size: 0,
133            peak_size: 0,
134            total_requests: 0,
135            avg_wait_time_ms: 0.0,
136            utilization_history: VecDeque::new(),
137            response_time_p50: Duration::ZERO,
138            response_time_p95: Duration::ZERO,
139            response_time_p99: Duration::ZERO,
140            error_rates: HashMap::new(),
141            last_updated: Instant::now(),
142        }
143    }
144}
145
146/// Adaptive sizing controller (crate-internal)
147#[derive(Debug, Clone)]
148pub(crate) struct AdaptiveController {
149    pub(crate) enabled: bool,
150    pub(crate) target_response_time: Duration,
151    pub(crate) last_adjustment: Instant,
152    pub(crate) adjustment_cooldown: Duration,
153    pub(crate) current_target_size: usize,
154    pub(crate) response_time_samples: VecDeque<Duration>,
155    pub(crate) utilization_samples: VecDeque<f64>,
156}
157
158impl Default for AdaptiveController {
159    fn default() -> Self {
160        Self {
161            enabled: false,
162            target_response_time: Duration::from_millis(100),
163            last_adjustment: Instant::now(),
164            adjustment_cooldown: Duration::from_secs(60),
165            current_target_size: 1,
166            response_time_samples: VecDeque::with_capacity(100),
167            utilization_samples: VecDeque::with_capacity(100),
168        }
169    }
170}
171
172impl AdaptiveController {
173    pub(crate) fn should_scale_up(
174        &self,
175        _current_size: usize,
176        avg_response_time: Duration,
177        utilization: f64,
178    ) -> bool {
179        if !self.enabled || self.last_adjustment.elapsed() < self.adjustment_cooldown {
180            return false;
181        }
182        avg_response_time > self.target_response_time && utilization > 0.8
183    }
184
185    pub(crate) fn should_scale_down(
186        &self,
187        current_size: usize,
188        avg_response_time: Duration,
189        utilization: f64,
190    ) -> bool {
191        if !self.enabled
192            || self.last_adjustment.elapsed() < self.adjustment_cooldown
193            || current_size <= 1
194        {
195            return false;
196        }
197        avg_response_time < self.target_response_time / 2 && utilization < 0.3
198    }
199
200    pub(crate) fn record_metrics(&mut self, response_time: Duration, utilization: f64) {
201        self.response_time_samples.push_back(response_time);
202        if self.response_time_samples.len() > 100 {
203            self.response_time_samples.pop_front();
204        }
205        self.utilization_samples.push_back(utilization);
206        if self.utilization_samples.len() > 100 {
207            self.utilization_samples.pop_front();
208        }
209    }
210}
211
212/// Generic connection trait
213#[async_trait::async_trait]
214pub trait PooledConnection: Send + Sync + 'static {
215    async fn is_healthy(&self) -> bool;
216    async fn close(&mut self) -> anyhow::Result<()>;
217    fn created_at(&self) -> Instant;
218    fn last_activity(&self) -> Instant;
219    fn update_activity(&mut self);
220    fn clone_connection(&self) -> Box<dyn PooledConnection>;
221}
222
223/// Implement PooledConnection for `Box<dyn PooledConnection>`
224#[async_trait::async_trait]
225impl PooledConnection for Box<dyn PooledConnection> {
226    async fn is_healthy(&self) -> bool {
227        self.as_ref().is_healthy().await
228    }
229    async fn close(&mut self) -> anyhow::Result<()> {
230        self.as_mut().close().await
231    }
232    fn created_at(&self) -> Instant {
233        self.as_ref().created_at()
234    }
235    fn last_activity(&self) -> Instant {
236        self.as_ref().last_activity()
237    }
238    fn update_activity(&mut self) {
239        self.as_mut().update_activity()
240    }
241    fn clone_connection(&self) -> Box<dyn PooledConnection> {
242        self.as_ref().clone_connection()
243    }
244}
245
246/// Connection factory trait
247#[async_trait::async_trait]
248pub trait ConnectionFactory<T: PooledConnection + Clone>: Send + Sync {
249    async fn create_connection(&self) -> anyhow::Result<T>;
250}
251
252/// Detailed pool metrics for comprehensive monitoring
253#[derive(Debug, Clone, Serialize, Deserialize)]
254pub struct DetailedPoolMetrics {
255    pub status: PoolStatus,
256    pub total_requests: u64,
257    pub peak_size: usize,
258    pub avg_wait_time_ms: f64,
259    #[serde(skip)]
260    pub response_time_p50: Duration,
261    #[serde(skip)]
262    pub response_time_p95: Duration,
263    #[serde(skip)]
264    pub response_time_p99: Duration,
265    pub adaptive_scaling_events: u64,
266    pub circuit_breaker_failures: u64,
267    pub load_balancing_decisions: u64,
268    pub current_target_size: usize,
269    #[serde(skip)]
270    pub pool_uptime: Duration,
271}
272
273/// Connection wrapper with comprehensive metadata and monitoring (crate-internal)
274pub(crate) struct PooledConnectionWrapper<T: PooledConnection> {
275    pub(crate) connection: T,
276    pub(crate) created_at: Instant,
277    pub(crate) last_activity: Instant,
278    pub(crate) is_in_use: bool,
279    pub(crate) connection_id: String,
280    pub(crate) usage_count: u64,
281    pub(crate) total_execution_time: Duration,
282    pub(crate) avg_response_time: Duration,
283    pub(crate) failure_count: u32,
284    pub(crate) last_health_check: Option<(Instant, bool)>,
285    pub(crate) weight: f64,
286}
287
288impl<T: PooledConnection> PooledConnectionWrapper<T> {
289    pub(crate) fn new(connection: T) -> Self {
290        let now = Instant::now();
291        Self {
292            connection,
293            created_at: now,
294            last_activity: now,
295            is_in_use: false,
296            connection_id: uuid::Uuid::new_v4().to_string(),
297            usage_count: 0,
298            total_execution_time: Duration::ZERO,
299            avg_response_time: Duration::from_millis(50),
300            failure_count: 0,
301            last_health_check: None,
302            weight: 1.0,
303        }
304    }
305
306    pub(crate) fn record_usage(&mut self, execution_time: Duration, success: bool) {
307        self.usage_count += 1;
308        self.last_activity = Instant::now();
309        self.total_execution_time += execution_time;
310
311        let alpha = 0.1;
312        let new_time_ms = execution_time.as_millis() as f64;
313        let current_avg_ms = self.avg_response_time.as_millis() as f64;
314        let updated_avg_ms = alpha * new_time_ms + (1.0 - alpha) * current_avg_ms;
315        self.avg_response_time = Duration::from_millis(updated_avg_ms as u64);
316
317        if !success {
318            self.failure_count += 1;
319            self.weight = (self.weight * 0.9).max(0.1);
320        } else if self.failure_count > 0 {
321            self.weight = (self.weight * 1.01).min(1.0);
322        }
323    }
324
325    pub(crate) fn efficiency_score(&self) -> f64 {
326        if self.usage_count == 0 {
327            return 1.0;
328        }
329        let failure_rate = self.failure_count as f64 / self.usage_count as f64;
330        let response_time_penalty = (self.avg_response_time.as_millis() as f64).ln() / 10.0;
331        (1.0 - failure_rate) * self.weight / (1.0 + response_time_penalty)
332    }
333
334    pub(crate) fn is_expired(&self, max_lifetime: Duration, idle_timeout: Duration) -> bool {
335        let now = Instant::now();
336        now.duration_since(self.created_at) > max_lifetime
337            || (!self.is_in_use && now.duration_since(self.last_activity) > idle_timeout)
338    }
339
340    pub(crate) async fn is_healthy(&self) -> bool {
341        self.connection.is_healthy().await
342    }
343}