1use crate::circuit_breaker::CircuitBreakerConfig;
7use serde::{Deserialize, Serialize};
8use std::collections::{HashMap, VecDeque};
9use std::time::{Duration, Instant};
10
/// Configuration for a connection pool.
///
/// This section only declares the settings and their defaults (see the
/// `Default` impl); the code that consumes them is not shown here, so the
/// per-field notes describe the intent suggested by each name and should be
/// confirmed against the pool implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PoolConfig {
    /// Minimum number of connections. Defaults to 1.
    pub min_connections: usize,
    /// Maximum number of connections. Defaults to 10.
    pub max_connections: usize,
    /// Timeout for a connection attempt (presumably establishing a new
    /// connection — TODO confirm). Defaults to 30 seconds.
    pub connection_timeout: Duration,
    /// Idle-time limit for a connection. Defaults to 300 seconds.
    pub idle_timeout: Duration,
    /// Age limit for a connection. Defaults to 1800 seconds.
    pub max_lifetime: Duration,
    /// Interval between health checks. Defaults to 60 seconds.
    pub health_check_interval: Duration,
    /// Number of retry attempts. Defaults to 3. Which operation is retried is
    /// not visible in this section — TODO confirm.
    pub retry_attempts: u32,
    /// Toggles adaptive pool sizing. Defaults to `true`.
    pub adaptive_sizing: bool,
    /// Target response time in milliseconds. Defaults to 100.
    pub target_response_time_ms: u64,
    /// Strategy used to choose between connections. Defaults to
    /// `LoadBalancingStrategy::RoundRobin`.
    pub load_balancing: LoadBalancingStrategy,
    /// Toggles the circuit breaker. Defaults to `true`.
    pub enable_circuit_breaker: bool,
    /// Circuit-breaker settings. Defaults to
    /// `Some(CircuitBreakerConfig::default())`.
    pub circuit_breaker_config: Option<CircuitBreakerConfig>,
    /// Toggles metrics collection. Defaults to `true`.
    pub enable_metrics: bool,
    /// Timeout for validation (presumably validating a connection — TODO
    /// confirm). Defaults to 5 seconds.
    pub validation_timeout: Duration,
    /// Timeout for acquiring (presumably a caller waiting for a connection —
    /// TODO confirm). Defaults to 30 seconds.
    pub acquire_timeout: Duration,
}
38
39impl Default for PoolConfig {
40 fn default() -> Self {
41 Self {
42 min_connections: 1,
43 max_connections: 10,
44 connection_timeout: Duration::from_secs(30),
45 idle_timeout: Duration::from_secs(300),
46 max_lifetime: Duration::from_secs(1800),
47 health_check_interval: Duration::from_secs(60),
48 retry_attempts: 3,
49 adaptive_sizing: true,
50 target_response_time_ms: 100,
51 load_balancing: LoadBalancingStrategy::RoundRobin,
52 enable_circuit_breaker: true,
53 circuit_breaker_config: Some(CircuitBreakerConfig::default()),
54 enable_metrics: true,
55 validation_timeout: Duration::from_secs(5),
56 acquire_timeout: Duration::from_secs(30),
57 }
58 }
59}
60
/// Policy names for choosing between pooled connections.
///
/// The selection logic itself is not part of this section; the notes on each
/// variant restate what the name suggests and should be confirmed against the
/// code that matches on this enum.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum LoadBalancingStrategy {
    /// Presumably cycles through connections in turn. This is the default
    /// chosen by `PoolConfig::default`.
    RoundRobin,
    /// Presumably prefers the connection that has gone unused the longest.
    LeastRecentlyUsed,
    /// Presumably picks a connection at random.
    Random,
    /// Presumably prefers the connection with the least load.
    LeastConnections,
    /// Presumably round-robin biased by a per-connection weight.
    WeightedRoundRobin,
}
75
/// Point-in-time summary of a pool's state.
///
/// The code that fills this struct in is not part of this section, so units
/// and ranges noted below are taken from the field names only.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PoolStatus {
    /// Total number of connections.
    pub total_connections: usize,
    /// Number of connections currently in use.
    pub active_connections: usize,
    /// Number of connections currently idle.
    pub idle_connections: usize,
    /// Number of requests waiting (presumably for a connection — TODO confirm).
    pub pending_requests: usize,
    /// Overall health flag.
    pub is_healthy: bool,
    /// When the last health check ran, if any. Skipped by serde: it is left
    /// out of serialized output and comes back as `None` on deserialization.
    #[serde(skip)]
    pub last_health_check: Option<Instant>,
    /// Pool utilization as a percentage (presumably 0–100 — TODO confirm).
    pub utilization_percent: f64,
    /// Average response time in milliseconds.
    pub avg_response_time_ms: f64,
    /// Load-balancing strategy in effect.
    pub load_balancing_strategy: LoadBalancingStrategy,
    /// Whether the circuit breaker is open.
    pub circuit_breaker_open: bool,
    /// Hash of the configuration; how it is computed is not visible here.
    pub config_hash: u64,
}
97
/// Event counters for a pool.
///
/// The derived `Default` starts every counter at zero. The code that
/// increments them is not part of this section; each note restates the event
/// suggested by the field name.
#[derive(Debug, Default, Clone)]
pub struct PoolStats {
    /// Connections created.
    pub total_created: u64,
    /// Connections destroyed.
    pub total_destroyed: u64,
    /// Connections handed out.
    pub total_borrowed: u64,
    /// Connections handed back.
    pub total_returned: u64,
    /// Failed connection-creation attempts.
    pub creation_failures: u64,
    /// Failed health checks.
    pub health_check_failures: u64,
    /// Timeouts (which operation timed out is not visible here).
    pub timeouts: u64,
    /// Failures attributed to the circuit breaker.
    pub circuit_breaker_failures: u64,
    /// Adaptive scaling events.
    pub adaptive_scaling_events: u64,
    /// Load-balancing decisions made.
    pub load_balancing_decisions: u64,
    /// Failovers performed.
    pub failover_count: u64,
}
113
/// Crate-internal performance metrics for a pool.
///
/// Only the data layout and its initial state are defined here; the code that
/// updates these values lives elsewhere.
#[derive(Debug, Clone)]
pub(crate) struct PoolMetrics {
    /// Current number of connections.
    pub(crate) current_size: usize,
    /// Largest size observed.
    pub(crate) peak_size: usize,
    /// Number of requests seen.
    pub(crate) total_requests: u64,
    /// Average wait time in milliseconds.
    pub(crate) avg_wait_time_ms: f64,
    /// Timestamped utilization samples.
    pub(crate) utilization_history: VecDeque<(Instant, f64)>,
    /// Median (50th percentile) response time.
    pub(crate) response_time_p50: Duration,
    /// 95th percentile response time.
    pub(crate) response_time_p95: Duration,
    /// 99th percentile response time.
    pub(crate) response_time_p99: Duration,
    /// Error rates keyed by a string label (label scheme not visible here).
    pub(crate) error_rates: HashMap<String, f64>,
    /// When these metrics were last refreshed.
    pub(crate) last_updated: Instant,
}

impl Default for PoolMetrics {
    /// Starts with zeroed counters and percentiles, empty collections, and
    /// `last_updated` stamped with the moment of construction.
    fn default() -> Self {
        let no_time = Duration::ZERO;
        let constructed_at = Instant::now();

        Self {
            // Sizes and counters.
            current_size: 0,
            peak_size: 0,
            total_requests: 0,
            avg_wait_time_ms: 0.0,

            // Latency percentiles.
            response_time_p50: no_time,
            response_time_p95: no_time,
            response_time_p99: no_time,

            // Collections start empty and allocate lazily.
            utilization_history: VecDeque::new(),
            error_rates: HashMap::new(),

            last_updated: constructed_at,
        }
    }
}
145
/// State and decision rules for adaptive pool sizing.
///
/// The controller keeps bounded windows of recent response-time and
/// utilization samples and answers "should the pool grow/shrink?" based on
/// the values passed in by the caller. It never resizes anything itself.
#[derive(Debug, Clone)]
pub(crate) struct AdaptiveController {
    /// Master switch; when `false` both scaling checks always return `false`.
    pub(crate) enabled: bool,
    /// Response time the scaling checks compare against.
    pub(crate) target_response_time: Duration,
    /// When the last adjustment happened; the scaling checks measure the
    /// cooldown from this instant.
    pub(crate) last_adjustment: Instant,
    /// Minimum time that must elapse after `last_adjustment` before another
    /// scaling decision can be positive.
    pub(crate) adjustment_cooldown: Duration,
    /// Pool size the controller is currently aiming for.
    pub(crate) current_target_size: usize,
    /// Most recent response-time samples, oldest first.
    pub(crate) response_time_samples: VecDeque<Duration>,
    /// Most recent utilization samples, oldest first.
    pub(crate) utilization_samples: VecDeque<f64>,
}

impl Default for AdaptiveController {
    /// Disabled controller with a 100 ms target, a 60 s cooldown starting
    /// now, a target size of 1 and empty, preallocated sample windows.
    fn default() -> Self {
        Self {
            enabled: false,
            target_response_time: Duration::from_millis(100),
            last_adjustment: Instant::now(),
            adjustment_cooldown: Duration::from_secs(60),
            current_target_size: 1,
            response_time_samples: VecDeque::with_capacity(Self::MAX_SAMPLES),
            utilization_samples: VecDeque::with_capacity(Self::MAX_SAMPLES),
        }
    }
}

impl AdaptiveController {
    /// Maximum number of samples retained in each window.
    const MAX_SAMPLES: usize = 100;

    /// Returns `true` when the pool should grow.
    ///
    /// Requires the controller to be enabled, the cooldown to have elapsed,
    /// `avg_response_time` to exceed the target, and `utilization` to be
    /// above 0.8. The current size is accepted for signature symmetry with
    /// [`Self::should_scale_down`] but is not consulted.
    pub(crate) fn should_scale_up(
        &self,
        _current_size: usize,
        avg_response_time: Duration,
        utilization: f64,
    ) -> bool {
        self.may_adjust() && avg_response_time > self.target_response_time && utilization > 0.8
    }

    /// Returns `true` when the pool should shrink.
    ///
    /// Requires the controller to be enabled, the cooldown to have elapsed,
    /// more than one connection in the pool, `avg_response_time` below half
    /// the target, and `utilization` below 0.3.
    pub(crate) fn should_scale_down(
        &self,
        current_size: usize,
        avg_response_time: Duration,
        utilization: f64,
    ) -> bool {
        // Never shrink a pool that is already at a single connection.
        if current_size <= 1 || !self.may_adjust() {
            return false;
        }
        avg_response_time < self.target_response_time / 2 && utilization < 0.3
    }

    /// Appends one response-time and one utilization sample, discarding the
    /// oldest entries so each window holds at most [`Self::MAX_SAMPLES`].
    pub(crate) fn record_metrics(&mut self, response_time: Duration, utilization: f64) {
        Self::push_bounded(&mut self.response_time_samples, response_time);
        Self::push_bounded(&mut self.utilization_samples, utilization);
    }

    /// True when the controller is enabled and the cooldown since the last
    /// adjustment has fully elapsed.
    fn may_adjust(&self) -> bool {
        self.enabled && self.last_adjustment.elapsed() >= self.adjustment_cooldown
    }

    /// Pushes `sample` onto the back of `window`, evicting from the front
    /// first so the window never exceeds `MAX_SAMPLES` entries and therefore
    /// never has to grow beyond its preallocated capacity.
    fn push_bounded<S>(window: &mut VecDeque<S>, sample: S) {
        while window.len() >= Self::MAX_SAMPLES {
            window.pop_front();
        }
        window.push_back(sample);
    }
}
211
/// Interface a connection type must implement to be used with the pool types
/// in this module.
///
/// Implementors must be `Send + Sync + 'static`. The async methods are
/// expanded by `async_trait`. The per-method notes restate what the
/// signatures suggest; exact semantics are up to each implementor.
#[async_trait::async_trait]
pub trait PooledConnection: Send + Sync + 'static {
    /// Reports whether the connection is healthy.
    async fn is_healthy(&self) -> bool;
    /// Closes the connection, returning an error if that fails.
    async fn close(&mut self) -> anyhow::Result<()>;
    /// Instant at which the connection was created.
    fn created_at(&self) -> Instant;
    /// Instant of the most recent activity on the connection.
    fn last_activity(&self) -> Instant;
    /// Records activity on the connection (presumably refreshing the value
    /// reported by `last_activity` — confirm with implementors).
    fn update_activity(&mut self);
    /// Produces a boxed trait-object copy of this connection.
    fn clone_connection(&self) -> Box<dyn PooledConnection>;
}
222
223#[async_trait::async_trait]
225impl PooledConnection for Box<dyn PooledConnection> {
226 async fn is_healthy(&self) -> bool {
227 self.as_ref().is_healthy().await
228 }
229 async fn close(&mut self) -> anyhow::Result<()> {
230 self.as_mut().close().await
231 }
232 fn created_at(&self) -> Instant {
233 self.as_ref().created_at()
234 }
235 fn last_activity(&self) -> Instant {
236 self.as_ref().last_activity()
237 }
238 fn update_activity(&mut self) {
239 self.as_mut().update_activity()
240 }
241 fn clone_connection(&self) -> Box<dyn PooledConnection> {
242 self.as_ref().clone_connection()
243 }
244}
245
/// Asynchronous factory for connections of type `T`.
///
/// `T` must implement both `PooledConnection` and `Clone`, and the factory
/// itself must be `Send + Sync`.
#[async_trait::async_trait]
pub trait ConnectionFactory<T: PooledConnection + Clone>: Send + Sync {
    /// Creates a new connection, or returns an error if one cannot be made.
    async fn create_connection(&self) -> anyhow::Result<T>;
}
251
/// Extended metrics report that bundles a `PoolStatus` with additional
/// counters and timings.
///
/// The `Duration` fields are marked `#[serde(skip)]`: they are omitted from
/// serialized output and are restored as `Duration::default()` (zero) when
/// deserializing. The code that populates this struct is not part of this
/// section.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DetailedPoolMetrics {
    /// Summary status of the pool.
    pub status: PoolStatus,
    /// Number of requests seen.
    pub total_requests: u64,
    /// Largest pool size observed.
    pub peak_size: usize,
    /// Average wait time in milliseconds.
    pub avg_wait_time_ms: f64,
    /// Median (50th percentile) response time. Not serialized.
    #[serde(skip)]
    pub response_time_p50: Duration,
    /// 95th percentile response time. Not serialized.
    #[serde(skip)]
    pub response_time_p95: Duration,
    /// 99th percentile response time. Not serialized.
    #[serde(skip)]
    pub response_time_p99: Duration,
    /// Adaptive scaling events.
    pub adaptive_scaling_events: u64,
    /// Failures attributed to the circuit breaker.
    pub circuit_breaker_failures: u64,
    /// Load-balancing decisions made.
    pub load_balancing_decisions: u64,
    /// Pool size currently targeted (presumably by the adaptive controller —
    /// TODO confirm).
    pub current_target_size: usize,
    /// How long the pool has been running. Not serialized.
    #[serde(skip)]
    pub pool_uptime: Duration,
}
272
/// Crate-internal bookkeeping wrapped around a single pooled connection.
pub(crate) struct PooledConnectionWrapper<T: PooledConnection> {
    /// The wrapped connection.
    pub(crate) connection: T,
    /// When the wrapper was constructed; `is_expired` measures lifetime from
    /// this instant.
    pub(crate) created_at: Instant,
    /// When the connection was last used; refreshed by `record_usage` and
    /// compared against the idle timeout in `is_expired`.
    pub(crate) last_activity: Instant,
    /// Whether the connection is currently checked out. An in-use connection
    /// is never considered idle-expired.
    pub(crate) is_in_use: bool,
    /// Identifier assigned at construction (a random v4 UUID string).
    pub(crate) connection_id: String,
    /// Number of usages recorded through `record_usage`.
    pub(crate) usage_count: u64,
    /// Sum of all execution times recorded through `record_usage`.
    pub(crate) total_execution_time: Duration,
    /// Exponentially weighted moving average of execution times, seeded at
    /// 50 ms and updated by `record_usage`.
    pub(crate) avg_response_time: Duration,
    /// Number of unsuccessful usages recorded through `record_usage`.
    pub(crate) failure_count: u32,
    /// Time and outcome of the most recent health check, if any. Nothing in
    /// this section writes it after construction.
    pub(crate) last_health_check: Option<(Instant, bool)>,
    /// Weight in `0.1..=1.0`, starting at 1.0; reduced on failures and slowly
    /// restored on later successes by `record_usage`.
    pub(crate) weight: f64,
}
287
288impl<T: PooledConnection> PooledConnectionWrapper<T> {
289 pub(crate) fn new(connection: T) -> Self {
290 let now = Instant::now();
291 Self {
292 connection,
293 created_at: now,
294 last_activity: now,
295 is_in_use: false,
296 connection_id: uuid::Uuid::new_v4().to_string(),
297 usage_count: 0,
298 total_execution_time: Duration::ZERO,
299 avg_response_time: Duration::from_millis(50),
300 failure_count: 0,
301 last_health_check: None,
302 weight: 1.0,
303 }
304 }
305
306 pub(crate) fn record_usage(&mut self, execution_time: Duration, success: bool) {
307 self.usage_count += 1;
308 self.last_activity = Instant::now();
309 self.total_execution_time += execution_time;
310
311 let alpha = 0.1;
312 let new_time_ms = execution_time.as_millis() as f64;
313 let current_avg_ms = self.avg_response_time.as_millis() as f64;
314 let updated_avg_ms = alpha * new_time_ms + (1.0 - alpha) * current_avg_ms;
315 self.avg_response_time = Duration::from_millis(updated_avg_ms as u64);
316
317 if !success {
318 self.failure_count += 1;
319 self.weight = (self.weight * 0.9).max(0.1);
320 } else if self.failure_count > 0 {
321 self.weight = (self.weight * 1.01).min(1.0);
322 }
323 }
324
325 pub(crate) fn efficiency_score(&self) -> f64 {
326 if self.usage_count == 0 {
327 return 1.0;
328 }
329 let failure_rate = self.failure_count as f64 / self.usage_count as f64;
330 let response_time_penalty = (self.avg_response_time.as_millis() as f64).ln() / 10.0;
331 (1.0 - failure_rate) * self.weight / (1.0 + response_time_penalty)
332 }
333
334 pub(crate) fn is_expired(&self, max_lifetime: Duration, idle_timeout: Duration) -> bool {
335 let now = Instant::now();
336 now.duration_since(self.created_at) > max_lifetime
337 || (!self.is_in_use && now.duration_since(self.last_activity) > idle_timeout)
338 }
339
340 pub(crate) async fn is_healthy(&self) -> bool {
341 self.connection.is_healthy().await
342 }
343}