1use crate::{
8 circuit_breaker::{
9 new_shared_circuit_breaker, CircuitBreakerConfig, FailureType, SharedCircuitBreaker,
10 SharedCircuitBreakerExt,
11 },
12 failover::{ConnectionEndpoint, FailoverConfig, FailoverManager},
13 health_monitor::{HealthCheckConfig, HealthMonitor, HealthStatus},
14 reconnect::{ReconnectConfig, ReconnectManager, ReconnectStrategy},
15 StreamConfig,
16};
17use anyhow::{anyhow, Result};
18use fastrand;
19use serde::{Deserialize, Serialize};
20use std::collections::{HashMap, VecDeque};
21use std::future::Future;
22use std::pin::Pin;
23use std::sync::{
24 atomic::{AtomicUsize, Ordering},
25 Arc,
26};
27
28#[cfg(test)]
29use futures_util;
30use std::time::{Duration, Instant};
31use tokio::sync::{broadcast, Mutex, RwLock, Semaphore};
32use tracing::{debug, error, info, warn};
33use uuid::Uuid;
34
/// Tuning knobs for a [`ConnectionPool`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PoolConfig {
    /// Lower bound the pool tries to keep warm (see `ensure_min_connections`).
    pub min_connections: usize,
    /// Hard upper bound, enforced via the pool's semaphore.
    pub max_connections: usize,
    /// Timeout used when establishing a brand-new connection (forwarded to
    /// the reconnect manager).
    pub connection_timeout: Duration,
    /// Idle connections inactive longer than this are evicted.
    pub idle_timeout: Duration,
    /// Connections older than this are evicted regardless of activity.
    pub max_lifetime: Duration,
    /// Period of the background maintenance/health-check task.
    pub health_check_interval: Duration,
    /// Max reconnect attempts (forwarded to `ReconnectConfig`).
    pub retry_attempts: u32,
    /// Enables the adaptive-sizing background task.
    pub adaptive_sizing: bool,
    /// Response-time goal the adaptive controller steers toward.
    pub target_response_time_ms: u64,
    /// Strategy used when picking an idle connection on checkout.
    pub load_balancing: LoadBalancingStrategy,
    /// Wraps acquisition in a circuit breaker when true.
    pub enable_circuit_breaker: bool,
    /// Breaker tuning; `None` falls back to `CircuitBreakerConfig::default()`.
    pub circuit_breaker_config: Option<CircuitBreakerConfig>,
    /// Enables health-monitor statistics collection.
    pub enable_metrics: bool,
    /// Timeout for a single health probe on an idle connection.
    pub validation_timeout: Duration,
    /// Max time a caller waits for a semaphore permit in `get_connection`.
    pub acquire_timeout: Duration,
}
62
63impl Default for PoolConfig {
64 fn default() -> Self {
65 Self {
66 min_connections: 1,
67 max_connections: 10,
68 connection_timeout: Duration::from_secs(30),
69 idle_timeout: Duration::from_secs(300), max_lifetime: Duration::from_secs(1800), health_check_interval: Duration::from_secs(60),
72 retry_attempts: 3,
73 adaptive_sizing: true,
74 target_response_time_ms: 100,
75 load_balancing: LoadBalancingStrategy::RoundRobin,
76 enable_circuit_breaker: true,
77 circuit_breaker_config: Some(CircuitBreakerConfig::default()),
78 enable_metrics: true,
79 validation_timeout: Duration::from_secs(5),
80 acquire_timeout: Duration::from_secs(30),
81 }
82 }
83}
84
/// How `try_get_existing_connection_with_lb` picks among idle connections.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum LoadBalancingStrategy {
    /// Rotate through the idle queue via an atomic counter.
    RoundRobin,
    /// Pick the connection with the oldest `last_activity`.
    LeastRecentlyUsed,
    /// Uniform random choice (via `fastrand`).
    Random,
    /// Pick the connection with the smallest `usage_count`.
    LeastConnections,
    /// Pick the connection with the highest `efficiency_score()`.
    WeightedRoundRobin,
}
99
/// Point-in-time snapshot returned by [`ConnectionPool::status`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PoolStatus {
    /// Idle plus checked-out connections.
    pub total_connections: usize,
    /// Connections currently checked out.
    pub active_connections: usize,
    /// Connections sitting in the idle queue.
    pub idle_connections: usize,
    /// Callers currently inside `get_connection`.
    pub pending_requests: usize,
    /// Composite: breaker closed, utilization < 95%, avg wait < 1s.
    pub is_healthy: bool,
    /// Not serialized: `Instant` has no stable wire representation.
    #[serde(skip)]
    pub last_health_check: Option<Instant>,
    /// total_connections / max_connections, as a percentage.
    pub utilization_percent: f64,
    /// EWMA of acquisition wait time (see `update_metrics`).
    pub avg_response_time_ms: f64,
    /// Strategy the pool was configured with.
    pub load_balancing_strategy: LoadBalancingStrategy,
    /// True when the circuit breaker currently reports unhealthy.
    pub circuit_breaker_open: bool,
    /// Hash of selected config fields (see `calculate_config_hash`).
    pub config_hash: u64,
}
121
/// Contract a connection type must satisfy to live in [`ConnectionPool`].
#[async_trait::async_trait]
pub trait PooledConnection: Send + Sync + 'static {
    /// Cheap liveness probe; the pool calls this under `validation_timeout`.
    async fn is_healthy(&self) -> bool;

    /// Gracefully tear down the underlying resource.
    async fn close(&mut self) -> Result<()>;

    /// When the connection was established.
    fn created_at(&self) -> Instant;

    /// Last time the connection did useful work (drives idle eviction).
    fn last_activity(&self) -> Instant;

    /// Bump the activity timestamp to now.
    fn update_activity(&mut self);

    /// Produce a boxed duplicate of this connection.
    fn clone_connection(&self) -> Box<dyn PooledConnection>;
}
143
144#[async_trait::async_trait]
146impl PooledConnection for Box<dyn PooledConnection> {
147 async fn is_healthy(&self) -> bool {
148 self.as_ref().is_healthy().await
149 }
150
151 async fn close(&mut self) -> Result<()> {
152 self.as_mut().close().await
153 }
154
155 fn created_at(&self) -> Instant {
156 self.as_ref().created_at()
157 }
158
159 fn last_activity(&self) -> Instant {
160 self.as_ref().last_activity()
161 }
162
163 fn update_activity(&mut self) {
164 self.as_mut().update_activity()
165 }
166
167 fn clone_connection(&self) -> Box<dyn PooledConnection> {
168 self.as_ref().clone_connection()
169 }
170}
171
/// Idle-queue entry: a raw connection plus the per-connection statistics
/// used by eviction and load balancing.
struct PooledConnectionWrapper<T: PooledConnection> {
    connection: T,
    // Wrapper creation time; drives `max_lifetime` eviction.
    created_at: Instant,
    // Last checkout/return; drives `idle_timeout` eviction.
    last_activity: Instant,
    // True while checked out to a caller.
    is_in_use: bool,
    // Random UUID keying the health monitor / reconnect manager.
    connection_id: String,
    // Number of recorded uses.
    usage_count: u64,
    // Sum of all recorded execution times.
    total_execution_time: Duration,
    // EWMA of execution time (alpha = 0.1, seeded at 50ms).
    avg_response_time: Duration,
    // Failed operations recorded against this connection.
    failure_count: u32,
    // Timestamp and outcome of the most recent health probe.
    last_health_check: Option<(Instant, bool)>,
    // Load-balancing weight in [0.1, 1.0]; decays on failure.
    weight: f64,
}
193
194impl<T: PooledConnection> PooledConnectionWrapper<T> {
195 fn new(connection: T) -> Self {
196 let now = Instant::now();
197 Self {
198 connection,
199 created_at: now,
200 last_activity: now,
201 is_in_use: false,
202 connection_id: Uuid::new_v4().to_string(),
203 usage_count: 0,
204 total_execution_time: Duration::ZERO,
205 avg_response_time: Duration::from_millis(50), failure_count: 0,
207 last_health_check: None,
208 weight: 1.0, }
210 }
211
212 fn record_usage(&mut self, execution_time: Duration, success: bool) {
214 self.usage_count += 1;
215 self.last_activity = Instant::now();
216 self.total_execution_time += execution_time;
217
218 let alpha = 0.1; let new_time_ms = execution_time.as_millis() as f64;
221 let current_avg_ms = self.avg_response_time.as_millis() as f64;
222 let updated_avg_ms = alpha * new_time_ms + (1.0 - alpha) * current_avg_ms;
223 self.avg_response_time = Duration::from_millis(updated_avg_ms as u64);
224
225 if !success {
226 self.failure_count += 1;
227 self.weight = (self.weight * 0.9).max(0.1);
229 } else if self.failure_count > 0 {
230 self.weight = (self.weight * 1.01).min(1.0);
232 }
233 }
234
235 fn efficiency_score(&self) -> f64 {
237 if self.usage_count == 0 {
238 return 1.0;
239 }
240
241 let failure_rate = self.failure_count as f64 / self.usage_count as f64;
242 let response_time_penalty = (self.avg_response_time.as_millis() as f64).ln() / 10.0;
243
244 (1.0 - failure_rate) * self.weight / (1.0 + response_time_penalty)
245 }
246
247 fn is_expired(&self, max_lifetime: Duration, idle_timeout: Duration) -> bool {
248 let now = Instant::now();
249 now.duration_since(self.created_at) > max_lifetime
250 || (!self.is_in_use && now.duration_since(self.last_activity) > idle_timeout)
251 }
252
253 async fn is_healthy(&self) -> bool {
254 self.connection.is_healthy().await
255 }
256}
257
/// Generic async connection pool with load balancing, adaptive sizing,
/// health monitoring, automatic reconnection and optional circuit breaking.
pub struct ConnectionPool<T: PooledConnection + Clone> {
    config: PoolConfig,
    // Idle connections; checked-out ones are removed from this queue.
    connections: Arc<Mutex<VecDeque<PooledConnectionWrapper<T>>>>,
    // Number of connections currently checked out.
    active_count: Arc<Mutex<usize>>,
    // Caps concurrent acquisitions at `max_connections` permits.
    semaphore: Arc<Semaphore>,
    // Lifetime counters (created/destroyed/borrowed/...).
    stats: Arc<RwLock<PoolStats>>,
    // Creates new raw connections on demand.
    connection_factory: Arc<dyn ConnectionFactory<T>>,
    // Present only when `config.enable_circuit_breaker` is true.
    circuit_breaker: Option<SharedCircuitBreaker>,
    // Counter backing the RoundRobin strategy.
    round_robin_counter: Arc<AtomicUsize>,
    // Rolling metrics (sizes, wait times, utilization history).
    metrics: Arc<RwLock<PoolMetrics>>,
    // Callers currently inside `get_connection`.
    pending_requests: Arc<AtomicUsize>,
    // Pool construction time (reported as uptime).
    created_at: Instant,
    // State for the adaptive sizing task.
    adaptive_controller: Arc<RwLock<AdaptiveController>>,
    // Per-connection health tracking keyed by connection id.
    health_monitor: Arc<HealthMonitor<T>>,
    // Rebuilds unhealthy connections with backoff.
    reconnect_manager: Arc<ReconnectManager<T>>,
    // Optional primary/secondary failover (set by `new_with_failover`).
    failover_manager: Option<Arc<FailoverManager<T>>>,
}
285
/// Factory the pool uses to establish new raw connections.
#[async_trait::async_trait]
pub trait ConnectionFactory<T: PooledConnection + Clone>: Send + Sync {
    /// Create one new connection, or fail with the underlying error.
    async fn create_connection(&self) -> Result<T>;
}
291
/// Monotonic lifetime counters for the pool (cleared by `reset_statistics`).
#[derive(Debug, Default, Clone)]
pub struct PoolStats {
    // Connections successfully created by the factory.
    total_created: u64,
    // Connections closed and dropped by the pool.
    total_destroyed: u64,
    // Successful checkouts.
    total_borrowed: u64,
    // Connections returned to the idle queue.
    total_returned: u64,
    // Factory failures.
    creation_failures: u64,
    // Probes that found a connection unhealthy or timed out.
    health_check_failures: u64,
    // Acquire timeouts. NOTE(review): never incremented in this chunk.
    timeouts: u64,
    // Failures attributed to the circuit breaker.
    // NOTE(review): never incremented in this chunk.
    circuit_breaker_failures: u64,
    // Adaptive scale-up/down decisions taken.
    adaptive_scaling_events: u64,
    // Load-balancing selections performed.
    load_balancing_decisions: u64,
    // Failovers executed. NOTE(review): never incremented in this chunk.
    failover_count: u64,
}
307
/// Rolling operational metrics maintained by `update_metrics`.
#[derive(Debug, Clone)]
struct PoolMetrics {
    // Idle + active connections at last update.
    current_size: usize,
    // High-water mark of `current_size`.
    peak_size: usize,
    // Total `get_connection` calls observed.
    total_requests: u64,
    // EWMA (alpha = 0.1) of acquisition wait time.
    avg_wait_time_ms: f64,
    // Up to the 1000 most recent (timestamp, utilization) samples.
    utilization_history: VecDeque<(Instant, f64)>,
    // Latency percentiles. NOTE(review): never computed in this chunk;
    // they read as zero.
    response_time_p50: Duration,
    response_time_p95: Duration,
    response_time_p99: Duration,
    // Per-category error rates. NOTE(review): never populated in this chunk.
    error_rates: HashMap<String, f64>,
    // When any metric was last written.
    last_updated: Instant,
}
330
331impl Default for PoolMetrics {
332 fn default() -> Self {
333 Self {
334 current_size: 0,
335 peak_size: 0,
336 total_requests: 0,
337 avg_wait_time_ms: 0.0,
338 utilization_history: VecDeque::new(),
339 response_time_p50: Duration::ZERO,
340 response_time_p95: Duration::ZERO,
341 response_time_p99: Duration::ZERO,
342 error_rates: HashMap::new(),
343 last_updated: Instant::now(),
344 }
345 }
346}
347
/// State for the adaptive pool-sizing loop (see `start_adaptive_sizing_task`).
#[derive(Debug, Clone)]
struct AdaptiveController {
    // Mirrors `PoolConfig::adaptive_sizing`.
    enabled: bool,
    // Response-time goal scaling decisions steer toward.
    target_response_time: Duration,
    // Last time the target size changed; gates the cooldown.
    last_adjustment: Instant,
    // Minimum gap between two scaling decisions.
    adjustment_cooldown: Duration,
    // Desired pool size. NOTE(review): recorded but never enacted — no
    // code in this chunk grows/shrinks the pool to match it.
    current_target_size: usize,
    // Last 100 response-time samples.
    response_time_samples: VecDeque<Duration>,
    // Last 100 utilization samples.
    utilization_samples: VecDeque<f64>,
}
359
360impl Default for AdaptiveController {
361 fn default() -> Self {
362 Self {
363 enabled: false,
364 target_response_time: Duration::from_millis(100),
365 last_adjustment: Instant::now(),
366 adjustment_cooldown: Duration::from_secs(60),
367 current_target_size: 1,
368 response_time_samples: VecDeque::with_capacity(100),
369 utilization_samples: VecDeque::with_capacity(100),
370 }
371 }
372}
373
374impl AdaptiveController {
375 fn should_scale_up(
376 &self,
377 _current_size: usize,
378 avg_response_time: Duration,
379 utilization: f64,
380 ) -> bool {
381 if !self.enabled || self.last_adjustment.elapsed() < self.adjustment_cooldown {
382 return false;
383 }
384
385 avg_response_time > self.target_response_time && utilization > 0.8
386 }
387
388 fn should_scale_down(
389 &self,
390 current_size: usize,
391 avg_response_time: Duration,
392 utilization: f64,
393 ) -> bool {
394 if !self.enabled
395 || self.last_adjustment.elapsed() < self.adjustment_cooldown
396 || current_size <= 1
397 {
398 return false;
399 }
400
401 avg_response_time < self.target_response_time / 2 && utilization < 0.3
402 }
403
404 fn record_metrics(&mut self, response_time: Duration, utilization: f64) {
405 self.response_time_samples.push_back(response_time);
406 if self.response_time_samples.len() > 100 {
407 self.response_time_samples.pop_front();
408 }
409
410 self.utilization_samples.push_back(utilization);
411 if self.utilization_samples.len() > 100 {
412 self.utilization_samples.pop_front();
413 }
414 }
415}
416
417impl<T: PooledConnection + Clone> ConnectionPool<T> {
    /// Build a pool, pre-warm it to `min_connections`, and spawn the
    /// background maintenance / adaptive-sizing / health-event tasks.
    ///
    /// # Errors
    /// Propagates failures from `ensure_min_connections` (which itself
    /// tolerates individual factory failures with a warning).
    ///
    /// NOTE(review): the spawned background tasks have no shutdown signal
    /// and keep running for the life of the runtime even after the pool
    /// is dropped — confirm this is acceptable.
    pub async fn new(config: PoolConfig, factory: Arc<dyn ConnectionFactory<T>>) -> Result<Self> {
        // Breaker is optional; absent when disabled in config.
        let circuit_breaker = if config.enable_circuit_breaker {
            Some(new_shared_circuit_breaker(
                config.circuit_breaker_config.clone().unwrap_or_default(),
            ))
        } else {
            None
        };

        let adaptive_controller = AdaptiveController {
            enabled: config.adaptive_sizing,
            target_response_time: Duration::from_millis(config.target_response_time_ms),
            current_target_size: config.min_connections,
            ..Default::default()
        };

        let health_check_config = HealthCheckConfig {
            check_interval: config.health_check_interval,
            check_timeout: config.validation_timeout,
            enable_statistics: config.enable_metrics,
            ..Default::default()
        };
        let health_monitor = Arc::new(HealthMonitor::new(health_check_config));

        let reconnect_config = ReconnectConfig {
            initial_delay: Duration::from_millis(100),
            max_delay: Duration::from_secs(30),
            max_attempts: config.retry_attempts,
            connection_timeout: config.connection_timeout,
            ..Default::default()
        };
        let reconnect_manager = Arc::new(ReconnectManager::new(
            reconnect_config,
            ReconnectStrategy::ExponentialBackoff,
        ));

        let pool = Self {
            // Permits cap concurrent acquisitions at max_connections.
            semaphore: Arc::new(Semaphore::new(config.max_connections)),
            connections: Arc::new(Mutex::new(VecDeque::new())),
            active_count: Arc::new(Mutex::new(0)),
            stats: Arc::new(RwLock::new(PoolStats::default())),
            connection_factory: factory,
            circuit_breaker,
            round_robin_counter: Arc::new(AtomicUsize::new(0)),
            metrics: Arc::new(RwLock::new(PoolMetrics::default())),
            pending_requests: Arc::new(AtomicUsize::new(0)),
            created_at: Instant::now(),
            adaptive_controller: Arc::new(RwLock::new(adaptive_controller)),
            health_monitor,
            reconnect_manager,
            failover_manager: None,
            config,
        };

        // Pre-warm before the maintenance task starts probing.
        pool.ensure_min_connections().await?;

        pool.start_maintenance_task().await;

        if pool.config.adaptive_sizing {
            pool.start_adaptive_sizing_task().await;
        }

        pool.start_health_monitoring().await;

        info!(
            "Created advanced connection pool with health monitoring, automatic reconnection, and {} features",
            if pool.circuit_breaker.is_some() {
                "circuit breaker"
            } else {
                "standard"
            }
        );

        Ok(pool)
    }
502
503 pub async fn get_connection(&self) -> Result<PooledConnectionHandle<T>> {
505 let start_time = Instant::now();
506 self.pending_requests.fetch_add(1, Ordering::Relaxed);
507
508 if let Some(cb) = &self.circuit_breaker {
510 if !cb.can_execute().await {
511 self.pending_requests.fetch_sub(1, Ordering::Relaxed);
512 return Err(anyhow!(
513 "Circuit breaker is open - connection pool unavailable"
514 ));
515 }
516 }
517
518 let _permit = tokio::time::timeout(self.config.acquire_timeout, self.semaphore.acquire())
520 .await
521 .map_err(|_| anyhow!("Timeout acquiring connection from pool"))?
522 .map_err(|_| anyhow!("Failed to acquire semaphore permit"))?;
523
524 let connection = match self.try_get_existing_connection_with_lb().await {
525 Some(conn) => {
526 if let Some(cb) = &self.circuit_breaker {
528 cb.record_success_with_duration(start_time.elapsed()).await;
529 }
530 conn
531 }
532 None => match self.create_new_connection().await {
533 Ok(conn) => {
534 if let Some(cb) = &self.circuit_breaker {
535 cb.record_success_with_duration(start_time.elapsed()).await;
536 }
537 conn
538 }
539 Err(e) => {
540 if let Some(cb) = &self.circuit_breaker {
541 cb.record_failure_with_type(FailureType::NetworkError).await;
542 }
543 self.pending_requests.fetch_sub(1, Ordering::Relaxed);
544 return Err(e);
545 }
546 },
547 };
548
549 *self.active_count.lock().await += 1;
550 let mut stats = self.stats.write().await;
551 stats.total_borrowed += 1;
552 stats.load_balancing_decisions += 1;
553 drop(stats);
554
555 let wait_time = start_time.elapsed();
557 self.update_metrics(wait_time).await;
558
559 self.pending_requests.fetch_sub(1, Ordering::Relaxed);
560
561 Ok(PooledConnectionHandle::new(
562 connection,
563 self.connections.clone(),
564 self.active_count.clone(),
565 self.stats.clone(),
566 self.metrics.clone(),
567 self.adaptive_controller.clone(),
568 ))
569 }
570
    /// Pick an idle connection according to the configured load-balancing
    /// strategy, validating it before handing it out.
    ///
    /// Expired or unhealthy candidates are closed and the scan moves to
    /// the next index; returns `None` once the idle queue is exhausted.
    ///
    /// NOTE(review): the selected wrapper is removed from the queue and
    /// only the raw connection is returned, so the wrapper's usage/weight
    /// history is discarded on every checkout — confirm this is intended.
    async fn try_get_existing_connection_with_lb(&self) -> Option<T> {
        let mut connections = self.connections.lock().await;

        if connections.is_empty() {
            return None;
        }

        // Choose a starting index per the configured strategy.
        let selected_index = match self.config.load_balancing {
            LoadBalancingStrategy::RoundRobin => {
                self.round_robin_counter.fetch_add(1, Ordering::Relaxed) % connections.len()
            }
            LoadBalancingStrategy::Random => fastrand::usize(..connections.len()),
            LoadBalancingStrategy::LeastRecentlyUsed => {
                // Oldest last_activity wins.
                connections
                    .iter()
                    .enumerate()
                    .min_by_key(|(_, wrapper)| wrapper.last_activity)
                    .map(|(idx, _)| idx)
                    .unwrap_or(0)
            }
            LoadBalancingStrategy::LeastConnections => {
                // Fewest recorded uses wins.
                connections
                    .iter()
                    .enumerate()
                    .min_by_key(|(_, wrapper)| wrapper.usage_count)
                    .map(|(idx, _)| idx)
                    .unwrap_or(0)
            }
            LoadBalancingStrategy::WeightedRoundRobin => {
                // Highest efficiency score wins (NaN-tolerant comparison).
                connections
                    .iter()
                    .enumerate()
                    .max_by(|(_, a), (_, b)| {
                        a.efficiency_score()
                            .partial_cmp(&b.efficiency_score())
                            .unwrap_or(std::cmp::Ordering::Equal)
                    })
                    .map(|(idx, _)| idx)
                    .unwrap_or(0)
            }
        };

        // Walk forward from the selected index. Every iteration removes
        // exactly one entry (healthy => returned, otherwise closed), so
        // len() shrinks by one per attempt and stays >= 1 while the loop
        // runs — no modulo-by-zero.
        for attempt in 0..connections.len() {
            let index = (selected_index + attempt) % connections.len();

            if let Some(mut wrapper) = connections.remove(index) {
                if wrapper.is_expired(self.config.max_lifetime, self.config.idle_timeout) {
                    if let Err(e) = wrapper.connection.close().await {
                        warn!("Failed to close expired connection: {}", e);
                    }
                    self.stats.write().await.total_destroyed += 1;
                    continue;
                }

                // Bound the probe so a hung connection cannot stall every
                // checkout.
                let health_check =
                    tokio::time::timeout(self.config.validation_timeout, wrapper.is_healthy())
                        .await;

                match health_check {
                    Ok(true) => {
                        wrapper.is_in_use = true;
                        wrapper.last_activity = Instant::now();
                        wrapper.last_health_check = Some((Instant::now(), true));
                        debug!(
                            "Selected connection {} using {:?} strategy",
                            wrapper.connection_id, self.config.load_balancing
                        );
                        return Some(wrapper.connection);
                    }
                    // Unhealthy, or the probe timed out: discard and retry.
                    Ok(false) | Err(_) => {
                        if let Err(e) = wrapper.connection.close().await {
                            warn!("Failed to close unhealthy connection: {}", e);
                        }
                        let mut stats = self.stats.write().await;
                        stats.health_check_failures += 1;
                        stats.total_destroyed += 1;
                        continue;
                    }
                }
            }
        }

        None
    }
664
665 async fn create_new_connection(&self) -> Result<T> {
667 match self.connection_factory.create_connection().await {
668 Ok(connection) => {
669 self.stats.write().await.total_created += 1;
670 debug!("Created new connection");
671 Ok(connection)
672 }
673 Err(e) => {
674 self.stats.write().await.creation_failures += 1;
675 error!("Failed to create connection: {}", e);
676 Err(e)
677 }
678 }
679 }
680
681 async fn return_connection_with_metrics(
683 &self,
684 mut connection: T,
685 execution_time: Duration,
686 success: bool,
687 ) {
688 connection.update_activity();
689
690 let mut wrapper = PooledConnectionWrapper::new(connection);
691 wrapper.record_usage(execution_time, success);
692 wrapper.is_in_use = false;
693
694 self.connections.lock().await.push_back(wrapper);
695
696 let mut active_count = self.active_count.lock().await;
697 if *active_count > 0 {
698 *active_count -= 1;
699 }
700
701 self.stats.write().await.total_returned += 1;
702
703 let mut controller = self.adaptive_controller.write().await;
705 let utilization = (*active_count as f64) / (self.config.max_connections as f64);
706 controller.record_metrics(execution_time, utilization);
707
708 debug!(
709 "Returned connection to pool with metrics: exec_time={:?}, success={}",
710 execution_time, success
711 );
712 }
713
714 async fn return_connection(&self, connection: T) {
716 self.return_connection_with_metrics(connection, Duration::from_millis(100), true)
717 .await;
718 }
719
720 async fn ensure_min_connections(&self) -> Result<()> {
722 let current_count = self.connections.lock().await.len();
723 let active_count = *self.active_count.lock().await;
724 let total_count = current_count + active_count;
725
726 if total_count < self.config.min_connections {
727 let needed = self.config.min_connections - total_count;
728
729 for _ in 0..needed {
730 match self.create_new_connection().await {
731 Ok(connection) => {
732 let wrapper = PooledConnectionWrapper::new(connection);
733 self.connections.lock().await.push_back(wrapper);
734 }
735 Err(e) => {
736 warn!("Failed to create minimum connection: {}", e);
737 break;
738 }
739 }
740 }
741 }
742
743 Ok(())
744 }
745
746 async fn start_maintenance_task(&self) {
748 let connections = self.connections.clone();
749 let stats = self.stats.clone();
750 let config = self.config.clone();
751 let health_monitor = self.health_monitor.clone();
752 let reconnect_manager = self.reconnect_manager.clone();
753 let connection_factory = self.connection_factory.clone();
754
755 tokio::spawn(async move {
756 let mut interval = tokio::time::interval(config.health_check_interval);
757
758 loop {
759 interval.tick().await;
760
761 let mut connections_guard = connections.lock().await;
762 let mut to_remove = Vec::new();
763 let mut to_reconnect = Vec::new();
764
765 for (index, wrapper) in connections_guard.iter().enumerate() {
766 let conn_id = wrapper.connection_id.clone();
767
768 if wrapper.is_expired(config.max_lifetime, config.idle_timeout) {
770 to_remove.push(index);
771 health_monitor.unregister_connection(&conn_id).await;
772 } else {
773 let health_status = health_monitor
775 .check_connection_health(&conn_id, &wrapper.connection)
776 .await
777 .unwrap_or(HealthStatus::Unknown);
778
779 match health_status {
780 HealthStatus::Dead => {
781 to_remove.push(index);
782 health_monitor.unregister_connection(&conn_id).await;
783 }
784 HealthStatus::Unhealthy => {
785 to_reconnect.push((index, conn_id.clone()));
786 }
787 _ => {}
788 }
789 }
790 }
791
792 for &index in to_remove.iter().rev() {
794 if let Some(mut wrapper) = connections_guard.remove(index) {
795 if let Err(e) = wrapper.connection.close().await {
796 warn!("Failed to close connection during maintenance: {}", e);
797 }
798 stats.write().await.total_destroyed += 1;
799 }
800 }
801
802 let to_reconnect_count = to_reconnect.len();
804 for (index, conn_id) in &to_reconnect {
805 if *index < connections_guard.len() {
806 match reconnect_manager
807 .reconnect(conn_id.clone(), connection_factory.clone())
808 .await
809 {
810 Ok(new_conn) => {
811 let mut new_wrapper = PooledConnectionWrapper::new(new_conn);
812 new_wrapper.connection_id = conn_id.clone();
813
814 let mut metadata = HashMap::new();
816 metadata.insert("pool_id".to_string(), "main".to_string());
817 health_monitor
818 .register_connection(conn_id.clone(), metadata)
819 .await;
820
821 connections_guard[*index] = new_wrapper;
822 info!("Successfully reconnected connection {}", conn_id);
823 }
824 Err(e) => {
825 warn!("Failed to reconnect connection {}: {}", conn_id, e);
826 connections_guard.remove(*index);
828 stats.write().await.total_destroyed += 1;
829 }
830 }
831 }
832 }
833
834 let dead_connections = health_monitor.get_dead_connections().await;
836 if !dead_connections.is_empty() {
837 warn!(
838 "Health monitor detected {} dead connections",
839 dead_connections.len()
840 );
841 }
842
843 debug!(
844 "Pool maintenance completed, removed {} connections, attempted {} reconnections",
845 to_remove.len(),
846 to_reconnect_count
847 );
848 }
849 });
850 }
851
    /// Register existing idle connections with the health monitor and
    /// spawn a task that translates health events into logs/statistics.
    async fn start_health_monitoring(&self) {
        // Register whatever is already sitting in the idle queue.
        let connections = self.connections.lock().await;
        for wrapper in connections.iter() {
            let mut metadata = HashMap::new();
            metadata.insert("pool_id".to_string(), "main".to_string());
            metadata.insert(
                "created_at".to_string(),
                wrapper.created_at.elapsed().as_secs().to_string(),
            );

            self.health_monitor
                .register_connection(wrapper.connection_id.clone(), metadata)
                .await;
        }

        // NOTE(review): any recv() error ends this loop — including a
        // broadcast Lagged error, not only channel closure. Confirm that
        // silently stopping event processing on lag is acceptable.
        let mut health_events = self.health_monitor.subscribe();
        let stats = self.stats.clone();

        tokio::spawn(async move {
            while let Ok(event) = health_events.recv().await {
                match event {
                    crate::health_monitor::HealthEvent::ConnectionDead {
                        connection_id,
                        reason,
                    } => {
                        error!("Connection {} marked as dead: {}", connection_id, reason);
                        stats.write().await.health_check_failures += 1;
                    }
                    crate::health_monitor::HealthEvent::ConnectionRecovered { connection_id } => {
                        info!("Connection {} recovered", connection_id);
                    }
                    crate::health_monitor::HealthEvent::StatusChanged {
                        connection_id,
                        old_status,
                        new_status,
                    } => {
                        debug!(
                            "Connection {} status changed from {:?} to {:?}",
                            connection_id, old_status, new_status
                        );
                    }
                    _ => {}
                }
            }
        });
    }
901
902 pub async fn status(&self) -> PoolStatus {
904 let connections = self.connections.lock().await;
905 let active_count = *self.active_count.lock().await;
906 let metrics = self.metrics.read().await;
907 let pending = self.pending_requests.load(Ordering::Relaxed);
908
909 let total_connections = connections.len() + active_count;
910 let utilization = if self.config.max_connections > 0 {
911 (total_connections as f64 / self.config.max_connections as f64) * 100.0
912 } else {
913 0.0
914 };
915
916 let circuit_breaker_open = if let Some(cb) = &self.circuit_breaker {
917 !cb.is_healthy().await
918 } else {
919 false
920 };
921
922 let is_healthy =
923 !circuit_breaker_open && utilization < 95.0 && metrics.avg_wait_time_ms < 1000.0;
924
925 PoolStatus {
926 total_connections,
927 active_connections: active_count,
928 idle_connections: connections.len(),
929 pending_requests: pending,
930 is_healthy,
931 last_health_check: Some(Instant::now()),
932 utilization_percent: utilization,
933 avg_response_time_ms: metrics.avg_wait_time_ms,
934 load_balancing_strategy: self.config.load_balancing.clone(),
935 circuit_breaker_open,
936 config_hash: self.calculate_config_hash(),
937 }
938 }
939
940 fn calculate_config_hash(&self) -> u64 {
942 use std::collections::hash_map::DefaultHasher;
943 use std::hash::{Hash, Hasher};
944
945 let mut hasher = DefaultHasher::new();
946 self.config.min_connections.hash(&mut hasher);
947 self.config.max_connections.hash(&mut hasher);
948 self.config.adaptive_sizing.hash(&mut hasher);
949 hasher.finish()
950 }
951
952 pub async fn stats(&self) -> PoolStats {
954 self.stats.read().await.clone()
955 }
956
957 async fn update_metrics(&self, wait_time: Duration) {
959 let mut metrics = self.metrics.write().await;
960
961 metrics.total_requests += 1;
962 let wait_time_ms = wait_time.as_millis() as f64;
963
964 let alpha = 0.1;
966 metrics.avg_wait_time_ms = alpha * wait_time_ms + (1.0 - alpha) * metrics.avg_wait_time_ms;
967
968 let connections = self.connections.lock().await;
970 let active_count = *self.active_count.lock().await;
971 let utilization = (active_count as f64) / (self.config.max_connections as f64);
972
973 metrics
974 .utilization_history
975 .push_back((Instant::now(), utilization));
976 if metrics.utilization_history.len() > 1000 {
977 metrics.utilization_history.pop_front();
978 }
979
980 metrics.current_size = connections.len() + active_count;
981 metrics.peak_size = metrics.peak_size.max(metrics.current_size);
982 metrics.last_updated = Instant::now();
983 }
984
    /// Spawn the adaptive sizing loop: every 30s, compare the recent wait
    /// time and utilization against the controller's targets and adjust
    /// `current_target_size` within `[min_connections, max_connections]`.
    ///
    /// NOTE(review): only the target size is updated; no code in this
    /// chunk creates or closes connections to converge on it.
    async fn start_adaptive_sizing_task(&self) {
        let pool_metrics = self.metrics.clone();
        let adaptive_controller = self.adaptive_controller.clone();
        let pool_config = self.config.clone();
        let stats = self.stats.clone();

        tokio::spawn(async move {
            let mut interval = tokio::time::interval(Duration::from_secs(30));

            loop {
                interval.tick().await;

                let metrics = pool_metrics.read().await;
                let mut controller = adaptive_controller.write().await;

                if !controller.enabled {
                    continue;
                }

                // Proxy for response time: the EWMA of acquisition waits.
                let avg_response_time = Duration::from_millis(metrics.avg_wait_time_ms as u64);
                let current_utilization =
                    if let Some((_, util)) = metrics.utilization_history.back() {
                        *util
                    } else {
                        0.0
                    };

                let should_scale_up = controller.should_scale_up(
                    metrics.current_size,
                    avg_response_time,
                    current_utilization,
                );

                let should_scale_down = controller.should_scale_down(
                    metrics.current_size,
                    avg_response_time,
                    current_utilization,
                );

                // Step the target by one in either direction, clamped to
                // the configured bounds; cooldown restarts on each change.
                if should_scale_up && metrics.current_size < pool_config.max_connections {
                    controller.current_target_size =
                        (controller.current_target_size + 1).min(pool_config.max_connections);
                    controller.last_adjustment = Instant::now();
                    stats.write().await.adaptive_scaling_events += 1;
                    info!(
                        "Adaptive scaling: scaling UP to {}",
                        controller.current_target_size
                    );
                } else if should_scale_down && metrics.current_size > pool_config.min_connections {
                    controller.current_target_size =
                        (controller.current_target_size.saturating_sub(1))
                            .max(pool_config.min_connections);
                    controller.last_adjustment = Instant::now();
                    stats.write().await.adaptive_scaling_events += 1;
                    info!(
                        "Adaptive scaling: scaling DOWN to {}",
                        controller.current_target_size
                    );
                }
            }
        });
    }
1048}
1049
/// RAII guard around a checked-out connection; returns it to the pool
/// (with recorded statistics) when dropped, unless `take`n.
pub struct PooledConnectionHandle<T: PooledConnection> {
    // None after `take()`; Drop only returns Some connections.
    connection: Option<T>,
    // Idle queue shared with the owning pool.
    pool_connections: Arc<Mutex<VecDeque<PooledConnectionWrapper<T>>>>,
    // Shared checked-out counter.
    active_count: Arc<Mutex<usize>>,
    // Shared lifetime counters.
    stats: Arc<RwLock<PoolStats>>,
    // Shared rolling metrics (currently unused by Drop).
    metrics: Arc<RwLock<PoolMetrics>>,
    // Fed with this handle's timings on drop.
    adaptive_controller: Arc<RwLock<AdaptiveController>>,
    // Checkout timestamp.
    acquired_at: Instant,
    // Per-operation timings recorded via `record_operation`.
    execution_times: Vec<Duration>,
    // Operations recorded, and successes among them.
    operation_count: u32,
    success_count: u32,
}
1063
1064impl<T: PooledConnection> PooledConnectionHandle<T> {
1065 fn new(
1066 connection: T,
1067 pool_connections: Arc<Mutex<VecDeque<PooledConnectionWrapper<T>>>>,
1068 active_count: Arc<Mutex<usize>>,
1069 stats: Arc<RwLock<PoolStats>>,
1070 metrics: Arc<RwLock<PoolMetrics>>,
1071 adaptive_controller: Arc<RwLock<AdaptiveController>>,
1072 ) -> Self {
1073 Self {
1074 connection: Some(connection),
1075 pool_connections,
1076 active_count,
1077 stats,
1078 metrics,
1079 adaptive_controller,
1080 acquired_at: Instant::now(),
1081 execution_times: Vec::new(),
1082 operation_count: 0,
1083 success_count: 0,
1084 }
1085 }
1086
1087 pub fn record_operation(&mut self, execution_time: Duration, success: bool) {
1089 self.execution_times.push(execution_time);
1090 self.operation_count += 1;
1091 if success {
1092 self.success_count += 1;
1093 }
1094
1095 debug!(
1096 "Recorded operation: time={:?}, success={}, total_ops={}",
1097 execution_time, success, self.operation_count
1098 );
1099 }
1100
1101 pub fn get_operation_stats(&self) -> (u32, u32, Duration) {
1103 let avg_time = if !self.execution_times.is_empty() {
1104 self.execution_times.iter().sum::<Duration>() / self.execution_times.len() as u32
1105 } else {
1106 Duration::ZERO
1107 };
1108
1109 (self.operation_count, self.success_count, avg_time)
1110 }
1111
1112 pub fn held_duration(&self) -> Duration {
1114 self.acquired_at.elapsed()
1115 }
1116
1117 pub fn as_ref(&self) -> Option<&T> {
1119 self.connection.as_ref()
1120 }
1121
1122 pub fn as_mut(&mut self) -> Option<&mut T> {
1124 self.connection.as_mut()
1125 }
1126
1127 pub fn take(mut self) -> Option<T> {
1129 self.connection.take()
1130 }
1131}
1132
1133impl<T: PooledConnection> Drop for PooledConnectionHandle<T> {
1134 fn drop(&mut self) {
1135 if let Some(connection) = self.connection.take() {
1136 let pool_connections = self.pool_connections.clone();
1137 let active_count = self.active_count.clone();
1138 let stats = self.stats.clone();
1139 let _metrics = self.metrics.clone();
1140 let adaptive_controller = self.adaptive_controller.clone();
1141
1142 let total_held_time = self.acquired_at.elapsed();
1144 let avg_execution_time = if !self.execution_times.is_empty() {
1145 self.execution_times.iter().sum::<Duration>() / self.execution_times.len() as u32
1146 } else {
1147 Duration::from_millis(50) };
1149
1150 let success_rate = if self.operation_count > 0 {
1151 self.success_count as f64 / self.operation_count as f64
1152 } else {
1153 1.0 };
1155
1156 let overall_success = success_rate > 0.8; tokio::spawn(async move {
1159 let mut wrapper = PooledConnectionWrapper::new(connection);
1160 wrapper.record_usage(avg_execution_time, overall_success);
1161 wrapper.is_in_use = false;
1162
1163 let usage_count = wrapper.usage_count;
1164 pool_connections.lock().await.push_back(wrapper);
1165
1166 let mut active = active_count.lock().await;
1167 if *active > 0 {
1168 *active -= 1;
1169 }
1170
1171 stats.write().await.total_returned += 1;
1173
1174 let utilization = (*active as f64) / 10.0; adaptive_controller
1177 .write()
1178 .await
1179 .record_metrics(avg_execution_time, utilization);
1180
1181 debug!(
1182 "Returned connection to pool: held_time={:?}, ops={}, success_rate={:.2}",
1183 total_held_time, usage_count, success_rate
1184 );
1185 });
1186 }
1187 }
1188}
1189
1190impl<T: PooledConnection + Clone> ConnectionPool<T> {
    /// Convenience constructor deriving a [`PoolConfig`] from a
    /// [`StreamConfig`]: its connection limit and timeout, with adaptive
    /// sizing, circuit breaking and metrics enabled.
    pub async fn new_from_config(
        config: &StreamConfig,
        factory: Arc<dyn ConnectionFactory<T>>,
    ) -> Result<Self> {
        let pool_config = PoolConfig {
            min_connections: 1,
            max_connections: config.max_connections,
            connection_timeout: config.connection_timeout,
            adaptive_sizing: true,
            enable_circuit_breaker: true,
            enable_metrics: true,
            ..Default::default()
        };

        Self::new(pool_config, factory).await
    }
1209
    /// Alias for [`ConnectionPool::status`].
    pub async fn health_check(&self) -> PoolStatus {
        self.status().await
    }
1214
    /// Extended snapshot combining status, rolling metrics, lifetime
    /// counters and the adaptive controller's current target.
    ///
    /// NOTE(review): the pXX latency fields are never computed in this
    /// chunk and will read as zero.
    pub async fn get_detailed_metrics(&self) -> DetailedPoolMetrics {
        let status = self.status().await;
        let metrics = self.metrics.read().await;
        let stats = self.stats.read().await;
        let controller = self.adaptive_controller.read().await;

        DetailedPoolMetrics {
            status,
            total_requests: metrics.total_requests,
            peak_size: metrics.peak_size,
            avg_wait_time_ms: metrics.avg_wait_time_ms,
            response_time_p50: metrics.response_time_p50,
            response_time_p95: metrics.response_time_p95,
            response_time_p99: metrics.response_time_p99,
            adaptive_scaling_events: stats.adaptive_scaling_events,
            circuit_breaker_failures: stats.circuit_breaker_failures,
            load_balancing_decisions: stats.load_balancing_decisions,
            current_target_size: controller.current_target_size,
            pool_uptime: self.created_at.elapsed(),
        }
    }
1237
1238 pub async fn reset_statistics(&self) {
1240 *self.stats.write().await = PoolStats::default();
1241 *self.metrics.write().await = PoolMetrics::default();
1242 info!("Pool statistics reset");
1243 }
1244
1245 pub async fn resize(&self, new_size: usize) -> Result<()> {
1247 if new_size < self.config.min_connections || new_size > self.config.max_connections {
1248 return Err(anyhow!(
1249 "New size {} outside allowed range [{}, {}]",
1250 new_size,
1251 self.config.min_connections,
1252 self.config.max_connections
1253 ));
1254 }
1255
1256 let mut controller = self.adaptive_controller.write().await;
1257 controller.current_target_size = new_size;
1258 controller.last_adjustment = Instant::now();
1259
1260 info!("Pool manually resized to {}", new_size);
1261 Ok(())
1262 }
1263
1264 pub async fn new_with_failover(
1266 config: PoolConfig,
1267 primary_factory: Arc<dyn ConnectionFactory<T>>,
1268 secondary_factory: Arc<dyn ConnectionFactory<T>>,
1269 failover_config: FailoverConfig,
1270 ) -> Result<Self> {
1271 let primary_endpoint = ConnectionEndpoint {
1273 name: "primary".to_string(),
1274 factory: primary_factory.clone(),
1275 priority: 1,
1276 metadata: HashMap::new(),
1277 };
1278
1279 let secondary_endpoint = ConnectionEndpoint {
1280 name: "secondary".to_string(),
1281 factory: secondary_factory,
1282 priority: 2,
1283 metadata: HashMap::new(),
1284 };
1285
1286 let failover_manager = Arc::new(
1288 FailoverManager::new(failover_config, primary_endpoint, secondary_endpoint).await?,
1289 );
1290
1291 let mut pool = Self::new(config, primary_factory).await?;
1293 pool.failover_manager = Some(failover_manager.clone());
1294
1295 let mut failover_events = failover_manager.subscribe();
1297 let stats = pool.stats.clone();
1298
1299 tokio::spawn(async move {
1300 while let Ok(event) = failover_events.recv().await {
1301 match event {
1302 crate::failover::FailoverEvent::FailoverCompleted { from, to, duration } => {
1303 info!(
1304 "Failover completed from {} to {} in {:?}",
1305 from, to, duration
1306 );
1307 stats.write().await.failover_count += 1;
1308 }
1309 crate::failover::FailoverEvent::FailbackCompleted { from, to, duration } => {
1310 info!(
1311 "Failback completed from {} to {} in {:?}",
1312 from, to, duration
1313 );
1314 }
1315 crate::failover::FailoverEvent::AllConnectionsUnavailable => {
1316 error!("All connections unavailable!");
1317 }
1318 _ => {}
1319 }
1320 }
1321 });
1322
1323 Ok(pool)
1324 }
1325
1326 pub async fn get_health_statistics(&self) -> crate::health_monitor::OverallHealthStatistics {
1328 self.health_monitor.get_overall_statistics().await
1329 }
1330
1331 pub async fn get_reconnection_statistics(&self) -> crate::reconnect::ReconnectStatistics {
1333 self.reconnect_manager.get_statistics().await
1334 }
1335
1336 pub async fn get_failover_statistics(&self) -> Option<crate::failover::FailoverStatistics> {
1338 if let Some(fm) = &self.failover_manager {
1339 Some(fm.get_statistics().await)
1340 } else {
1341 None
1342 }
1343 }
1344
1345 pub async fn register_failure_callback<F>(&self, callback: F)
1347 where
1348 F: Fn(String, String, u32) -> Pin<Box<dyn Future<Output = ()> + Send>>
1349 + Send
1350 + Sync
1351 + 'static,
1352 {
1353 self.reconnect_manager
1354 .register_failure_callback(callback)
1355 .await;
1356 }
1357
1358 pub async fn trigger_failover(&self) -> Result<()> {
1360 if let Some(fm) = &self.failover_manager {
1361 fm.trigger_failover().await
1362 } else {
1363 Err(anyhow!("Failover not configured for this pool"))
1364 }
1365 }
1366
1367 pub fn has_failover(&self) -> bool {
1369 self.failover_manager.is_some()
1370 }
1371
1372 pub async fn get_unhealthy_connections(&self) -> Vec<String> {
1374 self.health_monitor.get_unhealthy_connections().await
1375 }
1376
1377 pub fn subscribe_health_events(
1379 &self,
1380 ) -> broadcast::Receiver<crate::health_monitor::HealthEvent> {
1381 self.health_monitor.subscribe()
1382 }
1383
1384 pub fn subscribe_reconnect_events(
1386 &self,
1387 ) -> broadcast::Receiver<crate::reconnect::ReconnectEvent> {
1388 self.reconnect_manager.subscribe()
1389 }
1390}
1391
/// Point-in-time snapshot of pool health and performance counters, as
/// assembled by `ConnectionPool::get_detailed_metrics`.
///
/// `Duration` fields are marked `#[serde(skip)]`, so they are omitted from
/// serialized output and come back as `Duration::default()` on deserialize.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DetailedPoolMetrics {
    /// Current pool status (connection counts, utilization, health flags).
    pub status: PoolStatus,
    /// Total connection requests served since creation (or last reset).
    pub total_requests: u64,
    /// Largest pool size observed.
    pub peak_size: usize,
    /// Average time callers waited to acquire a connection, in milliseconds.
    pub avg_wait_time_ms: f64,
    /// Median response time (not serialized).
    #[serde(skip)]
    pub response_time_p50: Duration,
    /// 95th-percentile response time (not serialized).
    #[serde(skip)]
    pub response_time_p95: Duration,
    /// 99th-percentile response time (not serialized).
    #[serde(skip)]
    pub response_time_p99: Duration,
    /// Number of automatic resize events triggered by the adaptive controller.
    pub adaptive_scaling_events: u64,
    /// Failures recorded by the circuit breaker.
    pub circuit_breaker_failures: u64,
    /// Number of load-balancing selections made.
    pub load_balancing_decisions: u64,
    /// The adaptive controller's current target pool size.
    pub current_target_size: usize,
    /// Time since the pool was created (not serialized).
    #[serde(skip)]
    pub pool_uptime: Duration,
}
1412
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicBool, Ordering};

    /// Minimal in-memory connection used to exercise the pool without real I/O.
    #[derive(Debug, Clone)]
    struct TestConnection {
        id: u32,
        created_at: Instant,
        last_activity: Instant,
        // Arc so clones of a connection share the same health flag.
        is_healthy: Arc<AtomicBool>,
        is_closed: bool,
    }

    impl TestConnection {
        /// Creates a healthy, open connection stamped with the current time.
        fn new(id: u32) -> Self {
            let now = Instant::now();
            Self {
                id,
                created_at: now,
                last_activity: now,
                is_healthy: Arc::new(AtomicBool::new(true)),
                is_closed: false,
            }
        }
    }

    #[async_trait::async_trait]
    impl PooledConnection for TestConnection {
        async fn is_healthy(&self) -> bool {
            // Healthy only while open AND the shared flag is still set.
            !self.is_closed && self.is_healthy.load(Ordering::Relaxed)
        }

        async fn close(&mut self) -> Result<()> {
            self.is_closed = true;
            Ok(())
        }

        fn created_at(&self) -> Instant {
            self.created_at
        }

        fn last_activity(&self) -> Instant {
            self.last_activity
        }

        fn update_activity(&mut self) {
            self.last_activity = Instant::now();
        }

        fn clone_connection(&self) -> Box<dyn PooledConnection> {
            Box::new(self.clone())
        }
    }

    /// Factory handing out `TestConnection`s with sequential ids.
    struct TestConnectionFactory {
        counter: Arc<Mutex<u32>>,
    }

    impl TestConnectionFactory {
        fn new() -> Self {
            Self {
                counter: Arc::new(Mutex::new(0)),
            }
        }
    }

    #[async_trait::async_trait]
    impl ConnectionFactory<TestConnection> for TestConnectionFactory {
        async fn create_connection(&self) -> Result<TestConnection> {
            // Ids start at 1; the mutex makes creation safe under concurrency.
            let mut counter = self.counter.lock().await;
            *counter += 1;
            Ok(TestConnection::new(*counter))
        }
    }

    /// A new pool should be pre-warmed to `min_connections` idle connections.
    #[tokio::test]
    async fn test_pool_creation() {
        let config = PoolConfig {
            min_connections: 2,
            max_connections: 5,
            ..Default::default()
        };

        let factory = Arc::new(TestConnectionFactory::new());
        let pool = ConnectionPool::new(config, factory).await.unwrap();

        let status = pool.status().await;
        assert_eq!(status.idle_connections, 2);
        assert_eq!(status.active_connections, 0);
    }

    /// Borrowing moves a connection from idle to active; dropping the handle
    /// eventually returns it to the idle set.
    #[tokio::test]
    async fn test_connection_borrowing() {
        let config = PoolConfig {
            min_connections: 1,
            max_connections: 3,
            ..Default::default()
        };

        let factory = Arc::new(TestConnectionFactory::new());
        let pool = ConnectionPool::new(config, factory).await.unwrap();

        let mut handle = pool.get_connection().await.unwrap();

        let status = pool.status().await;
        assert_eq!(status.active_connections, 1);
        assert_eq!(status.idle_connections, 0);
        assert!(status.is_healthy);

        // Per-handle operation bookkeeping should accumulate.
        handle.record_operation(Duration::from_millis(50), true);
        handle.record_operation(Duration::from_millis(75), true);

        let (ops, successes, avg_time) = handle.get_operation_stats();
        assert_eq!(ops, 2);
        assert_eq!(successes, 2);
        assert!(avg_time > Duration::ZERO);

        drop(handle);

        // Return-to-pool happens in a spawned task; give it a moment to run.
        tokio::time::sleep(Duration::from_millis(10)).await;

        let status = pool.status().await;
        assert_eq!(status.active_connections, 0);
        assert_eq!(status.idle_connections, 1);
    }

    /// Every load-balancing strategy should serve concurrent borrows and
    /// report itself in the pool status.
    #[tokio::test]
    async fn test_load_balancing_strategies() {
        for strategy in [
            LoadBalancingStrategy::RoundRobin,
            LoadBalancingStrategy::Random,
            LoadBalancingStrategy::LeastRecentlyUsed,
            LoadBalancingStrategy::LeastConnections,
            LoadBalancingStrategy::WeightedRoundRobin,
        ] {
            let config = PoolConfig {
                min_connections: 3,
                max_connections: 5,
                load_balancing: strategy.clone(),
                ..Default::default()
            };

            let factory = Arc::new(TestConnectionFactory::new());
            let pool = ConnectionPool::new(config, factory).await.unwrap();

            // Borrow three connections concurrently; any Err fails the test.
            let handles: Vec<_> =
                futures_util::future::join_all((0..3).map(|_| pool.get_connection()))
                    .await
                    .into_iter()
                    .collect::<Result<Vec<_>, _>>()
                    .unwrap();

            let status = pool.status().await;
            assert_eq!(status.active_connections, 3);
            assert_eq!(status.load_balancing_strategy, strategy);

            drop(handles);
            // Let the spawned return tasks settle before the next iteration.
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
    }

    /// A healthy borrow/return cycle must not trip the circuit breaker.
    #[tokio::test]
    async fn test_circuit_breaker_integration() {
        let config = PoolConfig {
            min_connections: 1,
            max_connections: 2,
            enable_circuit_breaker: true,
            circuit_breaker_config: Some(CircuitBreakerConfig {
                failure_threshold: 2,
                timeout: Duration::from_millis(50),
                ..Default::default()
            }),
            ..Default::default()
        };

        let factory = Arc::new(TestConnectionFactory::new());
        let pool = ConnectionPool::new(config, factory).await.unwrap();

        let handle = pool.get_connection().await;
        assert!(handle.is_ok());
        drop(handle);

        // No failures recorded, so the breaker should still be closed.
        let status = pool.status().await;
        assert!(!status.circuit_breaker_open);
    }

    /// The adaptive target starts at `min_connections` and can be moved
    /// manually via `resize`.
    #[tokio::test]
    async fn test_adaptive_sizing() {
        let config = PoolConfig {
            min_connections: 1,
            max_connections: 5,
            adaptive_sizing: true,
            target_response_time_ms: 50,
            ..Default::default()
        };

        let factory = Arc::new(TestConnectionFactory::new());
        let pool = ConnectionPool::new(config, factory).await.unwrap();

        let metrics = pool.get_detailed_metrics().await;
        assert_eq!(metrics.current_target_size, 1); assert!(metrics.adaptive_scaling_events == 0);

        pool.resize(3).await.unwrap();
        let metrics = pool.get_detailed_metrics().await;
        assert_eq!(metrics.current_target_size, 3);
    }

    /// Detailed metrics should reflect activity and be clearable via
    /// `reset_statistics`.
    #[tokio::test]
    async fn test_detailed_metrics() {
        let config = PoolConfig {
            min_connections: 2,
            max_connections: 4,
            enable_metrics: true,
            ..Default::default()
        };

        let factory = Arc::new(TestConnectionFactory::new());
        let pool = ConnectionPool::new(config, factory).await.unwrap();

        let handles: Vec<_> = futures_util::future::join_all((0..3).map(|_| pool.get_connection()))
            .await
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        let metrics = pool.get_detailed_metrics().await;
        assert!(metrics.total_requests >= 3);
        assert!(metrics.status.utilization_percent > 0.0);
        assert!(metrics.pool_uptime > Duration::ZERO);
        assert_eq!(metrics.status.active_connections, 3);

        drop(handles);

        // After a reset, counters should be back to their defaults.
        pool.reset_statistics().await;
        let metrics = pool.get_detailed_metrics().await;
        assert_eq!(metrics.adaptive_scaling_events, 0);
    }
}
1660}