1use crate::{
7 circuit_breaker::{
8 new_shared_circuit_breaker, FailureType, SharedCircuitBreaker, SharedCircuitBreakerExt,
9 },
10 failover::{ConnectionEndpoint, FailoverConfig, FailoverManager},
11 health_monitor::{HealthCheckConfig, HealthMonitor},
12 reconnect::{ReconnectConfig, ReconnectManager, ReconnectStrategy},
13 StreamConfig,
14};
15use anyhow::{anyhow, Result};
16use fastrand;
17use std::collections::{HashMap, VecDeque};
18use std::future::Future;
19use std::pin::Pin;
20use std::sync::{
21 atomic::{AtomicUsize, Ordering},
22 Arc,
23};
24use std::time::{Duration, Instant};
25use tokio::sync::{broadcast, Mutex, RwLock, Semaphore};
26use tracing::{debug, error, info, warn};
27
28use super::connection_pool_types::{
29 AdaptiveController, ConnectionFactory, DetailedPoolMetrics, LoadBalancingStrategy, PoolConfig,
30 PoolMetrics, PoolStats, PoolStatus, PooledConnection, PooledConnectionWrapper,
31};
32
/// Asynchronous connection pool with optional circuit breaking, adaptive sizing,
/// health monitoring, reconnection support and failover.
pub struct ConnectionPool<T: PooledConnection + Clone> {
    /// Configuration the pool was built with.
    pub(super) config: PoolConfig,
    /// Idle connections waiting to be borrowed.
    pub(super) connections: Arc<Mutex<VecDeque<PooledConnectionWrapper<T>>>>,
    /// Number of connections currently borrowed through handles.
    pub(super) active_count: Arc<Mutex<usize>>,
    /// Semaphore created with `max_connections` permits; acquired while borrowing.
    pub(super) semaphore: Arc<Semaphore>,
    /// Lifetime counters (created, destroyed, borrowed, returned, ...).
    pub(super) stats: Arc<RwLock<PoolStats>>,
    /// Factory used to open new connections.
    pub(super) connection_factory: Arc<dyn ConnectionFactory<T>>,
    /// Present only when `enable_circuit_breaker` is set in the configuration.
    pub(super) circuit_breaker: Option<SharedCircuitBreaker>,
    /// Monotonic counter driving the round-robin load-balancing strategy.
    pub(super) round_robin_counter: Arc<AtomicUsize>,
    /// Rolling metrics (wait times, utilization history, pool size).
    pub(super) metrics: Arc<RwLock<PoolMetrics>>,
    /// Number of `get_connection` calls currently in flight.
    pub(super) pending_requests: Arc<AtomicUsize>,
    /// Creation time of the pool, used to report uptime.
    pub(super) created_at: Instant,
    /// State of the adaptive sizing controller.
    pub(super) adaptive_controller: Arc<RwLock<AdaptiveController>>,
    /// Health monitor configured from the pool's health-check settings.
    pub(super) health_monitor: Arc<HealthMonitor<T>>,
    /// Reconnection manager (exponential backoff strategy).
    pub(super) reconnect_manager: Arc<ReconnectManager<T>>,
    /// Set only for pools built with `new_with_failover`.
    pub(super) failover_manager: Option<Arc<FailoverManager<T>>>,
}
51
52impl<T: PooledConnection + Clone> ConnectionPool<T> {
53 pub async fn new(config: PoolConfig, factory: Arc<dyn ConnectionFactory<T>>) -> Result<Self> {
55 let circuit_breaker = if config.enable_circuit_breaker {
56 Some(new_shared_circuit_breaker(
57 config.circuit_breaker_config.clone().unwrap_or_default(),
58 ))
59 } else {
60 None
61 };
62
63 let adaptive_controller = AdaptiveController {
64 enabled: config.adaptive_sizing,
65 target_response_time: Duration::from_millis(config.target_response_time_ms),
66 current_target_size: config.min_connections,
67 ..Default::default()
68 };
69
70 let health_check_config = HealthCheckConfig {
71 check_interval: config.health_check_interval,
72 check_timeout: config.validation_timeout,
73 enable_statistics: config.enable_metrics,
74 ..Default::default()
75 };
76 let health_monitor = Arc::new(HealthMonitor::new(health_check_config));
77
78 let reconnect_config = ReconnectConfig {
79 initial_delay: Duration::from_millis(100),
80 max_delay: Duration::from_secs(30),
81 max_attempts: config.retry_attempts,
82 connection_timeout: config.connection_timeout,
83 ..Default::default()
84 };
85 let reconnect_manager = Arc::new(ReconnectManager::new(
86 reconnect_config,
87 ReconnectStrategy::ExponentialBackoff,
88 ));
89
90 let pool = Self {
91 semaphore: Arc::new(Semaphore::new(config.max_connections)),
92 connections: Arc::new(Mutex::new(VecDeque::new())),
93 active_count: Arc::new(Mutex::new(0)),
94 stats: Arc::new(RwLock::new(PoolStats::default())),
95 connection_factory: factory,
96 circuit_breaker,
97 round_robin_counter: Arc::new(AtomicUsize::new(0)),
98 metrics: Arc::new(RwLock::new(PoolMetrics::default())),
99 pending_requests: Arc::new(AtomicUsize::new(0)),
100 created_at: Instant::now(),
101 adaptive_controller: Arc::new(RwLock::new(adaptive_controller)),
102 health_monitor,
103 reconnect_manager,
104 failover_manager: None,
105 config,
106 };
107
108 pool.ensure_min_connections().await?;
109 pool.start_maintenance_task().await;
110
111 if pool.config.adaptive_sizing {
112 pool.start_adaptive_sizing_task().await;
113 }
114
115 pool.start_health_monitoring().await;
116
117 info!(
118 "Created advanced connection pool with health monitoring, automatic reconnection, and {} features",
119 if pool.circuit_breaker.is_some() { "circuit breaker" } else { "standard" }
120 );
121
122 Ok(pool)
123 }
124
125 pub async fn get_connection(&self) -> Result<PooledConnectionHandle<T>> {
127 let start_time = Instant::now();
128 self.pending_requests.fetch_add(1, Ordering::Relaxed);
129
130 if let Some(cb) = &self.circuit_breaker {
131 if !cb.can_execute().await {
132 self.pending_requests.fetch_sub(1, Ordering::Relaxed);
133 return Err(anyhow!(
134 "Circuit breaker is open - connection pool unavailable"
135 ));
136 }
137 }
138
139 let _permit = tokio::time::timeout(self.config.acquire_timeout, self.semaphore.acquire())
140 .await
141 .map_err(|_| anyhow!("Timeout acquiring connection from pool"))?
142 .map_err(|_| anyhow!("Failed to acquire semaphore permit"))?;
143
144 let connection = match self.try_get_existing_connection_with_lb().await {
145 Some(conn) => {
146 if let Some(cb) = &self.circuit_breaker {
147 cb.record_success_with_duration(start_time.elapsed()).await;
148 }
149 conn
150 }
151 None => match self.create_new_connection().await {
152 Ok(conn) => {
153 if let Some(cb) = &self.circuit_breaker {
154 cb.record_success_with_duration(start_time.elapsed()).await;
155 }
156 conn
157 }
158 Err(e) => {
159 if let Some(cb) = &self.circuit_breaker {
160 cb.record_failure_with_type(FailureType::NetworkError).await;
161 }
162 self.pending_requests.fetch_sub(1, Ordering::Relaxed);
163 return Err(e);
164 }
165 },
166 };
167
168 *self.active_count.lock().await += 1;
169 let mut stats = self.stats.write().await;
170 stats.total_borrowed += 1;
171 stats.load_balancing_decisions += 1;
172 drop(stats);
173
174 let wait_time = start_time.elapsed();
175 self.update_metrics(wait_time).await;
176 self.pending_requests.fetch_sub(1, Ordering::Relaxed);
177
178 Ok(PooledConnectionHandle::new(
179 connection,
180 self.connections.clone(),
181 self.active_count.clone(),
182 self.stats.clone(),
183 self.metrics.clone(),
184 self.adaptive_controller.clone(),
185 ))
186 }
187
/// Tries to reuse an idle connection, picking the starting candidate with the
/// configured load-balancing strategy.
///
/// Expired or unhealthy candidates are closed and discarded, and the scan moves on to
/// the next slot. Returns `None` when the idle queue is empty or no candidate passes
/// validation.
///
/// The idle-queue lock is held for the whole scan, including the awaited `close` and
/// health-check calls.
async fn try_get_existing_connection_with_lb(&self) -> Option<T> {
    let mut connections = self.connections.lock().await;

    if connections.is_empty() {
        return None;
    }

    // Index of the preferred candidate; `connections` is non-empty here, so the
    // modulo and the range below are well defined.
    let selected_index = match self.config.load_balancing {
        LoadBalancingStrategy::RoundRobin => {
            self.round_robin_counter.fetch_add(1, Ordering::Relaxed) % connections.len()
        }
        LoadBalancingStrategy::Random => fastrand::usize(..connections.len()),
        // Smallest `last_activity` instant, i.e. the longest-idle connection.
        LoadBalancingStrategy::LeastRecentlyUsed => connections
            .iter()
            .enumerate()
            .min_by_key(|(_, wrapper)| wrapper.last_activity)
            .map(|(idx, _)| idx)
            .unwrap_or(0),
        // Smallest `usage_count`.
        LoadBalancingStrategy::LeastConnections => connections
            .iter()
            .enumerate()
            .min_by_key(|(_, wrapper)| wrapper.usage_count)
            .map(|(idx, _)| idx)
            .unwrap_or(0),
        // Highest efficiency score; incomparable scores are treated as equal.
        LoadBalancingStrategy::WeightedRoundRobin => connections
            .iter()
            .enumerate()
            .max_by(|(_, a), (_, b)| {
                a.efficiency_score()
                    .partial_cmp(&b.efficiency_score())
                    .unwrap_or(std::cmp::Ordering::Equal)
            })
            .map(|(idx, _)| idx)
            .unwrap_or(0),
    };

    // The range is fixed at the initial length, while every iteration removes exactly
    // one element. The queue therefore still holds at least one element whenever the
    // modulo is evaluated, and every original element is visited at most once.
    for attempt in 0..connections.len() {
        let index = (selected_index + attempt) % connections.len();

        if let Some(mut wrapper) = connections.remove(index) {
            if wrapper.is_expired(self.config.max_lifetime, self.config.idle_timeout) {
                if let Err(e) = wrapper.connection.close().await {
                    warn!("Failed to close expired connection: {}", e);
                }
                self.stats.write().await.total_destroyed += 1;
                continue;
            }

            // A health check that exceeds `validation_timeout` counts as a failure.
            let health_check =
                tokio::time::timeout(self.config.validation_timeout, wrapper.is_healthy())
                    .await;

            match health_check {
                Ok(true) => {
                    wrapper.is_in_use = true;
                    wrapper.last_activity = Instant::now();
                    wrapper.last_health_check = Some((Instant::now(), true));
                    debug!(
                        "Selected connection {} using {:?} strategy",
                        wrapper.connection_id, self.config.load_balancing
                    );
                    // Only the inner connection leaves the pool; the wrapper and the
                    // bookkeeping just written to it are dropped here.
                    return Some(wrapper.connection);
                }
                Ok(false) | Err(_) => {
                    if let Err(e) = wrapper.connection.close().await {
                        warn!("Failed to close unhealthy connection: {}", e);
                    }
                    let mut stats = self.stats.write().await;
                    stats.health_check_failures += 1;
                    stats.total_destroyed += 1;
                    continue;
                }
            }
        }
    }

    None
}
267
268 pub(super) async fn create_new_connection(&self) -> Result<T> {
270 match self.connection_factory.create_connection().await {
271 Ok(connection) => {
272 self.stats.write().await.total_created += 1;
273 debug!("Created new connection");
274 Ok(connection)
275 }
276 Err(e) => {
277 self.stats.write().await.creation_failures += 1;
278 error!("Failed to create connection: {}", e);
279 Err(e)
280 }
281 }
282 }
283
284 async fn return_connection_with_metrics(
286 &self,
287 mut connection: T,
288 execution_time: Duration,
289 success: bool,
290 ) {
291 connection.update_activity();
292
293 let mut wrapper = PooledConnectionWrapper::new(connection);
294 wrapper.record_usage(execution_time, success);
295 wrapper.is_in_use = false;
296
297 self.connections.lock().await.push_back(wrapper);
298
299 let mut active_count = self.active_count.lock().await;
300 if *active_count > 0 {
301 *active_count -= 1;
302 }
303
304 self.stats.write().await.total_returned += 1;
305
306 let mut controller = self.adaptive_controller.write().await;
307 let utilization = (*active_count as f64) / (self.config.max_connections as f64);
308 controller.record_metrics(execution_time, utilization);
309
310 debug!(
311 "Returned connection to pool with metrics: exec_time={:?}, success={}",
312 execution_time, success
313 );
314 }
315
316 #[allow(dead_code)]
318 async fn return_connection(&self, connection: T) {
319 self.return_connection_with_metrics(connection, Duration::from_millis(100), true)
320 .await;
321 }
322
323 pub(super) async fn ensure_min_connections(&self) -> Result<()> {
325 let current_count = self.connections.lock().await.len();
326 let active_count = *self.active_count.lock().await;
327 let total_count = current_count + active_count;
328
329 if total_count < self.config.min_connections {
330 let needed = self.config.min_connections - total_count;
331 for _ in 0..needed {
332 match self.create_new_connection().await {
333 Ok(connection) => {
334 let wrapper = PooledConnectionWrapper::new(connection);
335 self.connections.lock().await.push_back(wrapper);
336 }
337 Err(e) => {
338 warn!("Failed to create minimum connection: {}", e);
339 break;
340 }
341 }
342 }
343 }
344
345 Ok(())
346 }
347
348 pub(super) async fn update_metrics(&self, wait_time: Duration) {
350 let mut metrics = self.metrics.write().await;
351 metrics.total_requests += 1;
352 let wait_time_ms = wait_time.as_millis() as f64;
353 let alpha = 0.1;
354 metrics.avg_wait_time_ms = alpha * wait_time_ms + (1.0 - alpha) * metrics.avg_wait_time_ms;
355
356 let connections = self.connections.lock().await;
357 let active_count = *self.active_count.lock().await;
358 let utilization = (active_count as f64) / (self.config.max_connections as f64);
359
360 metrics
361 .utilization_history
362 .push_back((Instant::now(), utilization));
363 if metrics.utilization_history.len() > 1000 {
364 metrics.utilization_history.pop_front();
365 }
366
367 metrics.current_size = connections.len() + active_count;
368 metrics.peak_size = metrics.peak_size.max(metrics.current_size);
369 metrics.last_updated = Instant::now();
370 }
371
372 pub async fn status(&self) -> PoolStatus {
374 let connections = self.connections.lock().await;
375 let active_count = *self.active_count.lock().await;
376 let metrics = self.metrics.read().await;
377 let pending = self.pending_requests.load(Ordering::Relaxed);
378
379 let total_connections = connections.len() + active_count;
380 let utilization = if self.config.max_connections > 0 {
381 (total_connections as f64 / self.config.max_connections as f64) * 100.0
382 } else {
383 0.0
384 };
385
386 let circuit_breaker_open = if let Some(cb) = &self.circuit_breaker {
387 !cb.is_healthy().await
388 } else {
389 false
390 };
391
392 let is_healthy =
393 !circuit_breaker_open && utilization < 95.0 && metrics.avg_wait_time_ms < 1000.0;
394
395 PoolStatus {
396 total_connections,
397 active_connections: active_count,
398 idle_connections: connections.len(),
399 pending_requests: pending,
400 is_healthy,
401 last_health_check: Some(Instant::now()),
402 utilization_percent: utilization,
403 avg_response_time_ms: metrics.avg_wait_time_ms,
404 load_balancing_strategy: self.config.load_balancing.clone(),
405 circuit_breaker_open,
406 config_hash: self.calculate_config_hash(),
407 }
408 }
409
410 fn calculate_config_hash(&self) -> u64 {
411 use std::collections::hash_map::DefaultHasher;
412 use std::hash::{Hash, Hasher};
413 let mut hasher = DefaultHasher::new();
414 self.config.min_connections.hash(&mut hasher);
415 self.config.max_connections.hash(&mut hasher);
416 self.config.adaptive_sizing.hash(&mut hasher);
417 hasher.finish()
418 }
419
/// Returns a clone of the pool's current statistics counters.
pub async fn stats(&self) -> PoolStats {
    self.stats.read().await.clone()
}
424
425 pub async fn new_from_config(
427 config: &StreamConfig,
428 factory: Arc<dyn ConnectionFactory<T>>,
429 ) -> Result<Self> {
430 let pool_config = PoolConfig {
431 min_connections: 1,
432 max_connections: config.max_connections,
433 connection_timeout: config.connection_timeout,
434 adaptive_sizing: true,
435 enable_circuit_breaker: true,
436 enable_metrics: true,
437 ..Default::default()
438 };
439 Self::new(pool_config, factory).await
440 }
441
/// Returns the same summary as `status`; it does not probe any connection.
pub async fn health_check(&self) -> PoolStatus {
    self.status().await
}
446
447 pub async fn get_detailed_metrics(&self) -> DetailedPoolMetrics {
449 let status = self.status().await;
450 let metrics = self.metrics.read().await;
451 let stats = self.stats.read().await;
452 let controller = self.adaptive_controller.read().await;
453
454 DetailedPoolMetrics {
455 status,
456 total_requests: metrics.total_requests,
457 peak_size: metrics.peak_size,
458 avg_wait_time_ms: metrics.avg_wait_time_ms,
459 response_time_p50: metrics.response_time_p50,
460 response_time_p95: metrics.response_time_p95,
461 response_time_p99: metrics.response_time_p99,
462 adaptive_scaling_events: stats.adaptive_scaling_events,
463 circuit_breaker_failures: stats.circuit_breaker_failures,
464 load_balancing_decisions: stats.load_balancing_decisions,
465 current_target_size: controller.current_target_size,
466 pool_uptime: self.created_at.elapsed(),
467 }
468 }
469
470 pub async fn reset_statistics(&self) {
472 *self.stats.write().await = PoolStats::default();
473 *self.metrics.write().await = PoolMetrics::default();
474 info!("Pool statistics reset");
475 }
476
477 pub async fn resize(&self, new_size: usize) -> Result<()> {
479 if new_size < self.config.min_connections || new_size > self.config.max_connections {
480 return Err(anyhow!(
481 "New size {} outside allowed range [{}, {}]",
482 new_size,
483 self.config.min_connections,
484 self.config.max_connections
485 ));
486 }
487 let mut controller = self.adaptive_controller.write().await;
488 controller.current_target_size = new_size;
489 controller.last_adjustment = Instant::now();
490 info!("Pool manually resized to {}", new_size);
491 Ok(())
492 }
493
494 pub async fn new_with_failover(
496 config: PoolConfig,
497 primary_factory: Arc<dyn ConnectionFactory<T>>,
498 secondary_factory: Arc<dyn ConnectionFactory<T>>,
499 failover_config: FailoverConfig,
500 ) -> Result<Self> {
501 let primary_endpoint = ConnectionEndpoint {
502 name: "primary".to_string(),
503 factory: primary_factory.clone(),
504 priority: 1,
505 metadata: HashMap::new(),
506 };
507 let secondary_endpoint = ConnectionEndpoint {
508 name: "secondary".to_string(),
509 factory: secondary_factory,
510 priority: 2,
511 metadata: HashMap::new(),
512 };
513
514 let failover_manager = Arc::new(
515 FailoverManager::new(failover_config, primary_endpoint, secondary_endpoint).await?,
516 );
517
518 let mut pool = Self::new(config, primary_factory).await?;
519 pool.failover_manager = Some(failover_manager.clone());
520
521 let mut failover_events = failover_manager.subscribe();
522 let stats = pool.stats.clone();
523
524 tokio::spawn(async move {
525 while let Ok(event) = failover_events.recv().await {
526 match event {
527 crate::failover::FailoverEvent::FailoverCompleted { from, to, duration } => {
528 info!(
529 "Failover completed from {} to {} in {:?}",
530 from, to, duration
531 );
532 stats.write().await.failover_count += 1;
533 }
534 crate::failover::FailoverEvent::FailbackCompleted { from, to, duration } => {
535 info!(
536 "Failback completed from {} to {} in {:?}",
537 from, to, duration
538 );
539 }
540 crate::failover::FailoverEvent::AllConnectionsUnavailable => {
541 error!("All connections unavailable!");
542 }
543 _ => {}
544 }
545 }
546 });
547
548 Ok(pool)
549 }
550
/// Returns the overall statistics reported by the pool's health monitor.
pub async fn get_health_statistics(&self) -> crate::health_monitor::OverallHealthStatistics {
    self.health_monitor.get_overall_statistics().await
}
555
/// Returns the statistics reported by the pool's reconnect manager.
pub async fn get_reconnection_statistics(&self) -> crate::reconnect::ReconnectStatistics {
    self.reconnect_manager.get_statistics().await
}
560
561 pub async fn get_failover_statistics(&self) -> Option<crate::failover::FailoverStatistics> {
563 if let Some(fm) = &self.failover_manager {
564 Some(fm.get_statistics().await)
565 } else {
566 None
567 }
568 }
569
/// Registers `callback` with the pool's reconnect manager.
///
/// The callback receives two strings and a `u32` and returns a boxed future.
// NOTE(review): the meaning of the three arguments is defined by `ReconnectManager`,
// which is not visible here — confirm before documenting them for callers.
pub async fn register_failure_callback<F>(&self, callback: F)
where
    F: Fn(String, String, u32) -> Pin<Box<dyn Future<Output = ()> + Send>>
        + Send
        + Sync
        + 'static,
{
    self.reconnect_manager
        .register_failure_callback(callback)
        .await;
}
582
583 pub async fn trigger_failover(&self) -> Result<()> {
585 if let Some(fm) = &self.failover_manager {
586 fm.trigger_failover().await
587 } else {
588 Err(anyhow!("Failover not configured for this pool"))
589 }
590 }
591
/// Returns `true` when the pool has a failover manager.
pub fn has_failover(&self) -> bool {
    self.failover_manager.is_some()
}
596
/// Returns the list of unhealthy connections as reported by the health monitor.
pub async fn get_unhealthy_connections(&self) -> Vec<String> {
    self.health_monitor.get_unhealthy_connections().await
}
601
/// Returns a broadcast receiver for the health monitor's events.
pub fn subscribe_health_events(
    &self,
) -> broadcast::Receiver<crate::health_monitor::HealthEvent> {
    self.health_monitor.subscribe()
}
608
/// Returns a broadcast receiver for the reconnect manager's events.
pub fn subscribe_reconnect_events(
    &self,
) -> broadcast::Receiver<crate::reconnect::ReconnectEvent> {
    self.reconnect_manager.subscribe()
}
615}
616
/// Handle to a borrowed connection; dropping it hands the connection back to the pool.
pub struct PooledConnectionHandle<T: PooledConnection> {
    /// The borrowed connection; `None` once it has been taken out or returned.
    connection: Option<T>,
    /// The pool's idle queue, where the connection is pushed back on drop.
    pool_connections: Arc<Mutex<VecDeque<PooledConnectionWrapper<T>>>>,
    /// The pool's borrowed-connection counter, decremented on drop.
    active_count: Arc<Mutex<usize>>,
    /// The pool's statistics counters.
    stats: Arc<RwLock<PoolStats>>,
    /// The pool's metrics.
    metrics: Arc<RwLock<PoolMetrics>>,
    /// The pool's adaptive controller, fed with usage data on drop.
    adaptive_controller: Arc<RwLock<AdaptiveController>>,
    /// When this handle was created.
    acquired_at: Instant,
    /// Execution times reported through `record_operation`.
    execution_times: Vec<Duration>,
    /// Number of operations reported through `record_operation`.
    operation_count: u32,
    /// Number of those operations that were reported as successful.
    success_count: u32,
}
632
633impl<T: PooledConnection> PooledConnectionHandle<T> {
634 pub(super) fn new(
635 connection: T,
636 pool_connections: Arc<Mutex<VecDeque<PooledConnectionWrapper<T>>>>,
637 active_count: Arc<Mutex<usize>>,
638 stats: Arc<RwLock<PoolStats>>,
639 metrics: Arc<RwLock<PoolMetrics>>,
640 adaptive_controller: Arc<RwLock<AdaptiveController>>,
641 ) -> Self {
642 Self {
643 connection: Some(connection),
644 pool_connections,
645 active_count,
646 stats,
647 metrics,
648 adaptive_controller,
649 acquired_at: Instant::now(),
650 execution_times: Vec::new(),
651 operation_count: 0,
652 success_count: 0,
653 }
654 }
655
656 pub fn record_operation(&mut self, execution_time: Duration, success: bool) {
657 self.execution_times.push(execution_time);
658 self.operation_count += 1;
659 if success {
660 self.success_count += 1;
661 }
662 debug!(
663 "Recorded operation: time={:?}, success={}, total_ops={}",
664 execution_time, success, self.operation_count
665 );
666 }
667
668 pub fn get_operation_stats(&self) -> (u32, u32, Duration) {
669 let avg_time = if !self.execution_times.is_empty() {
670 self.execution_times.iter().sum::<Duration>() / self.execution_times.len() as u32
671 } else {
672 Duration::ZERO
673 };
674 (self.operation_count, self.success_count, avg_time)
675 }
676
677 pub fn held_duration(&self) -> Duration {
678 self.acquired_at.elapsed()
679 }
680
681 pub fn as_ref(&self) -> Option<&T> {
682 self.connection.as_ref()
683 }
684
685 pub fn as_mut(&mut self) -> Option<&mut T> {
686 self.connection.as_mut()
687 }
688
689 pub fn take(mut self) -> Option<T> {
690 self.connection.take()
691 }
692}
693
694impl<T: PooledConnection> Drop for PooledConnectionHandle<T> {
695 fn drop(&mut self) {
696 if let Some(connection) = self.connection.take() {
697 let pool_connections = self.pool_connections.clone();
698 let active_count = self.active_count.clone();
699 let stats = self.stats.clone();
700 let _metrics = self.metrics.clone();
701 let adaptive_controller = self.adaptive_controller.clone();
702
703 let total_held_time = self.acquired_at.elapsed();
704 let avg_execution_time = if !self.execution_times.is_empty() {
705 self.execution_times.iter().sum::<Duration>() / self.execution_times.len() as u32
706 } else {
707 Duration::from_millis(50)
708 };
709
710 let success_rate = if self.operation_count > 0 {
711 self.success_count as f64 / self.operation_count as f64
712 } else {
713 1.0
714 };
715 let overall_success = success_rate > 0.8;
716
717 tokio::spawn(async move {
718 let mut wrapper = PooledConnectionWrapper::new(connection);
719 wrapper.record_usage(avg_execution_time, overall_success);
720 wrapper.is_in_use = false;
721
722 let usage_count = wrapper.usage_count;
723 pool_connections.lock().await.push_back(wrapper);
724
725 let mut active = active_count.lock().await;
726 if *active > 0 {
727 *active -= 1;
728 }
729
730 stats.write().await.total_returned += 1;
731
732 let utilization = (*active as f64) / 10.0;
733 adaptive_controller
734 .write()
735 .await
736 .record_metrics(avg_execution_time, utilization);
737
738 debug!(
739 "Returned connection to pool: held_time={:?}, ops={}, success_rate={:.2}",
740 total_held_time, usage_count, success_rate
741 );
742 });
743 }
744 }
745}