// oxirs_stream/connection_pool.rs

//! # Advanced Connection Pool Implementation
//!
//! Enterprise-grade connection pooling with adaptive sizing, health monitoring,
//! metrics collection, circuit breaker integration, and intelligent load balancing
//! for high-throughput streaming scenarios.
6
use crate::{
    circuit_breaker::{
        new_shared_circuit_breaker, CircuitBreakerConfig, FailureType, SharedCircuitBreaker,
        SharedCircuitBreakerExt,
    },
    failover::{ConnectionEndpoint, FailoverConfig, FailoverManager},
    health_monitor::{HealthCheckConfig, HealthMonitor, HealthStatus},
    reconnect::{ReconnectConfig, ReconnectManager, ReconnectStrategy},
    StreamConfig,
};
use anyhow::{anyhow, Result};
use fastrand;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::future::Future;
use std::pin::Pin;
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};

#[cfg(test)]
use futures_util;
use std::time::{Duration, Instant};
use tokio::sync::{broadcast, Mutex, RwLock, Semaphore};
use tracing::{debug, error, info, warn};
use uuid::Uuid;
34
35/// Connection pool configuration with advanced features
36#[derive(Debug, Clone, Serialize, Deserialize)]
37pub struct PoolConfig {
38    pub min_connections: usize,
39    pub max_connections: usize,
40    pub connection_timeout: Duration,
41    pub idle_timeout: Duration,
42    pub max_lifetime: Duration,
43    pub health_check_interval: Duration,
44    pub retry_attempts: u32,
45    /// Enable adaptive pool sizing based on load
46    pub adaptive_sizing: bool,
47    /// Target response time for adaptive sizing (milliseconds)
48    pub target_response_time_ms: u64,
49    /// Load balancing strategy for connection distribution
50    pub load_balancing: LoadBalancingStrategy,
51    /// Enable circuit breaker for connection failures
52    pub enable_circuit_breaker: bool,
53    /// Circuit breaker configuration
54    pub circuit_breaker_config: Option<CircuitBreakerConfig>,
55    /// Enable comprehensive metrics collection
56    pub enable_metrics: bool,
57    /// Connection validation timeout
58    pub validation_timeout: Duration,
59    /// Maximum wait time for acquiring a connection
60    pub acquire_timeout: Duration,
61}
62
63impl Default for PoolConfig {
64    fn default() -> Self {
65        Self {
66            min_connections: 1,
67            max_connections: 10,
68            connection_timeout: Duration::from_secs(30),
69            idle_timeout: Duration::from_secs(300), // 5 minutes
70            max_lifetime: Duration::from_secs(1800), // 30 minutes
71            health_check_interval: Duration::from_secs(60),
72            retry_attempts: 3,
73            adaptive_sizing: true,
74            target_response_time_ms: 100,
75            load_balancing: LoadBalancingStrategy::RoundRobin,
76            enable_circuit_breaker: true,
77            circuit_breaker_config: Some(CircuitBreakerConfig::default()),
78            enable_metrics: true,
79            validation_timeout: Duration::from_secs(5),
80            acquire_timeout: Duration::from_secs(30),
81        }
82    }
83}
84
85/// Load balancing strategies for connection distribution
86#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
87pub enum LoadBalancingStrategy {
88    /// Round-robin selection
89    RoundRobin,
90    /// Least recently used
91    LeastRecentlyUsed,
92    /// Random selection
93    Random,
94    /// Least connections (best for varying load)
95    LeastConnections,
96    /// Weighted round-robin based on response times
97    WeightedRoundRobin,
98}
99
100/// Connection pool status with comprehensive metrics
101#[derive(Debug, Clone, Serialize, Deserialize)]
102pub struct PoolStatus {
103    pub total_connections: usize,
104    pub active_connections: usize,
105    pub idle_connections: usize,
106    pub pending_requests: usize,
107    pub is_healthy: bool,
108    #[serde(skip)]
109    pub last_health_check: Option<Instant>,
110    /// Current pool utilization percentage
111    pub utilization_percent: f64,
112    /// Average response time across all connections
113    pub avg_response_time_ms: f64,
114    /// Current load balancing strategy
115    pub load_balancing_strategy: LoadBalancingStrategy,
116    /// Circuit breaker status
117    pub circuit_breaker_open: bool,
118    /// Pool configuration hash for validation
119    pub config_hash: u64,
120}
121
122/// Generic connection trait
123#[async_trait::async_trait]
124pub trait PooledConnection: Send + Sync + 'static {
125    /// Check if connection is healthy
126    async fn is_healthy(&self) -> bool;
127
128    /// Close the connection
129    async fn close(&mut self) -> Result<()>;
130
131    /// Get connection creation time
132    fn created_at(&self) -> Instant;
133
134    /// Get last activity time
135    fn last_activity(&self) -> Instant;
136
137    /// Update last activity time
138    fn update_activity(&mut self);
139
140    /// Clone the connection (custom method for trait object compatibility)
141    fn clone_connection(&self) -> Box<dyn PooledConnection>;
142}
143
144/// Implement PooledConnection for `Box<dyn PooledConnection>` to enable trait object usage
145#[async_trait::async_trait]
146impl PooledConnection for Box<dyn PooledConnection> {
147    async fn is_healthy(&self) -> bool {
148        self.as_ref().is_healthy().await
149    }
150
151    async fn close(&mut self) -> Result<()> {
152        self.as_mut().close().await
153    }
154
155    fn created_at(&self) -> Instant {
156        self.as_ref().created_at()
157    }
158
159    fn last_activity(&self) -> Instant {
160        self.as_ref().last_activity()
161    }
162
163    fn update_activity(&mut self) {
164        self.as_mut().update_activity()
165    }
166
167    fn clone_connection(&self) -> Box<dyn PooledConnection> {
168        self.as_ref().clone_connection()
169    }
170}
171
172/// Connection wrapper with comprehensive metadata and monitoring
173struct PooledConnectionWrapper<T: PooledConnection> {
174    connection: T,
175    created_at: Instant,
176    last_activity: Instant,
177    is_in_use: bool,
178    /// Unique connection identifier
179    connection_id: String,
180    /// Connection usage statistics
181    usage_count: u64,
182    /// Total time spent executing operations
183    total_execution_time: Duration,
184    /// Average response time for this connection
185    avg_response_time: Duration,
186    /// Number of failures on this connection
187    failure_count: u32,
188    /// Last known health status
189    last_health_check: Option<(Instant, bool)>,
190    /// Connection weight for load balancing
191    weight: f64,
192}
193
194impl<T: PooledConnection> PooledConnectionWrapper<T> {
195    fn new(connection: T) -> Self {
196        let now = Instant::now();
197        Self {
198            connection,
199            created_at: now,
200            last_activity: now,
201            is_in_use: false,
202            connection_id: Uuid::new_v4().to_string(),
203            usage_count: 0,
204            total_execution_time: Duration::ZERO,
205            avg_response_time: Duration::from_millis(50), // Default baseline
206            failure_count: 0,
207            last_health_check: None,
208            weight: 1.0, // Default weight
209        }
210    }
211
212    /// Update connection usage statistics
213    fn record_usage(&mut self, execution_time: Duration, success: bool) {
214        self.usage_count += 1;
215        self.last_activity = Instant::now();
216        self.total_execution_time += execution_time;
217
218        // Update average response time with exponential moving average
219        let alpha = 0.1; // Smoothing factor
220        let new_time_ms = execution_time.as_millis() as f64;
221        let current_avg_ms = self.avg_response_time.as_millis() as f64;
222        let updated_avg_ms = alpha * new_time_ms + (1.0 - alpha) * current_avg_ms;
223        self.avg_response_time = Duration::from_millis(updated_avg_ms as u64);
224
225        if !success {
226            self.failure_count += 1;
227            // Reduce weight for failing connections
228            self.weight = (self.weight * 0.9).max(0.1);
229        } else if self.failure_count > 0 {
230            // Gradually restore weight for recovering connections
231            self.weight = (self.weight * 1.01).min(1.0);
232        }
233    }
234
235    /// Get connection efficiency score for load balancing
236    fn efficiency_score(&self) -> f64 {
237        if self.usage_count == 0 {
238            return 1.0;
239        }
240
241        let failure_rate = self.failure_count as f64 / self.usage_count as f64;
242        let response_time_penalty = (self.avg_response_time.as_millis() as f64).ln() / 10.0;
243
244        (1.0 - failure_rate) * self.weight / (1.0 + response_time_penalty)
245    }
246
247    fn is_expired(&self, max_lifetime: Duration, idle_timeout: Duration) -> bool {
248        let now = Instant::now();
249        now.duration_since(self.created_at) > max_lifetime
250            || (!self.is_in_use && now.duration_since(self.last_activity) > idle_timeout)
251    }
252
253    async fn is_healthy(&self) -> bool {
254        self.connection.is_healthy().await
255    }
256}
257
258/// Advanced connection pool implementation with monitoring and load balancing
259pub struct ConnectionPool<T: PooledConnection + Clone> {
260    config: PoolConfig,
261    connections: Arc<Mutex<VecDeque<PooledConnectionWrapper<T>>>>,
262    active_count: Arc<Mutex<usize>>,
263    semaphore: Arc<Semaphore>,
264    stats: Arc<RwLock<PoolStats>>,
265    connection_factory: Arc<dyn ConnectionFactory<T>>,
266    /// Circuit breaker for connection failures
267    circuit_breaker: Option<SharedCircuitBreaker>,
268    /// Round-robin counter for load balancing
269    round_robin_counter: Arc<AtomicUsize>,
270    /// Pool metrics and monitoring
271    metrics: Arc<RwLock<PoolMetrics>>,
272    /// Pending connection requests queue
273    pending_requests: Arc<AtomicUsize>,
274    /// Pool creation timestamp
275    created_at: Instant,
276    /// Adaptive sizing controller
277    adaptive_controller: Arc<RwLock<AdaptiveController>>,
278    /// Health monitor for connection health tracking
279    health_monitor: Arc<HealthMonitor<T>>,
280    /// Reconnection manager for automatic reconnection
281    reconnect_manager: Arc<ReconnectManager<T>>,
282    /// Failover manager for primary/secondary failover
283    failover_manager: Option<Arc<FailoverManager<T>>>,
284}
285
286/// Connection factory trait
287#[async_trait::async_trait]
288pub trait ConnectionFactory<T: PooledConnection + Clone>: Send + Sync {
289    async fn create_connection(&self) -> Result<T>;
290}
291
/// Pool statistics with enhanced metrics
///
/// Monotonic counters; all start at zero via `Default`.
#[derive(Debug, Default, Clone)]
pub struct PoolStats {
    /// Connections successfully created by the factory
    total_created: u64,
    /// Connections discarded (expired, unhealthy, or failed to reconnect)
    total_destroyed: u64,
    /// Successful checkouts
    total_borrowed: u64,
    /// Connections handed back to the pool
    total_returned: u64,
    /// Factory calls that returned an error
    creation_failures: u64,
    /// Idle connections rejected on checkout because the health check failed
    /// or timed out
    health_check_failures: u64,
    /// Timeouts (presumably acquire timeouts; confirm where this is updated)
    timeouts: u64,
    /// Requests rejected by the circuit breaker (confirm where this is updated)
    circuit_breaker_failures: u64,
    /// Adaptive sizing adjustments (confirm where this is updated)
    adaptive_scaling_events: u64,
    /// Checkouts; incremented together with `total_borrowed`
    load_balancing_decisions: u64,
    /// Failovers performed (confirm where this is updated)
    failover_count: u64,
}
307
/// Comprehensive pool metrics for monitoring
#[derive(Debug, Clone)]
struct PoolMetrics {
    /// Current pool size
    current_size: usize,
    /// Peak pool size reached
    peak_size: usize,
    /// Total connection requests
    total_requests: u64,
    /// Average wait time for connections
    avg_wait_time_ms: f64,
    /// Pool utilization over time
    utilization_history: VecDeque<(Instant, f64)>,
    /// Response time percentiles
    response_time_p50: Duration,
    response_time_p95: Duration,
    response_time_p99: Duration,
    /// Error rates by type
    error_rates: HashMap<String, f64>,
    /// Last update timestamp
    last_updated: Instant,
}

impl Default for PoolMetrics {
    /// Empty metrics: every counter and percentile is zero, both collections
    /// are empty, and `last_updated` is stamped with the current instant.
    fn default() -> Self {
        Self {
            current_size: 0,
            peak_size: 0,
            total_requests: 0,
            avg_wait_time_ms: 0.0,
            utilization_history: VecDeque::new(),
            response_time_p50: Duration::ZERO,
            response_time_p95: Duration::ZERO,
            response_time_p99: Duration::ZERO,
            error_rates: HashMap::new(),
            last_updated: Instant::now(),
        }
    }
}
347
/// Adaptive sizing controller
///
/// Holds the thresholds and recent samples used to decide whether the pool
/// should grow or shrink.
#[derive(Debug, Clone)]
struct AdaptiveController {
    /// Master switch; while `false` neither scaling predicate ever fires
    enabled: bool,
    /// Response time the pool tries to stay under
    target_response_time: Duration,
    /// Start of the current cooldown window (initialised to construction time)
    last_adjustment: Instant,
    /// Minimum time that must pass after `last_adjustment` before scaling
    adjustment_cooldown: Duration,
    /// Pool size the controller is currently aiming for
    current_target_size: usize,
    /// Sliding window of the most recent response times
    response_time_samples: VecDeque<Duration>,
    /// Sliding window of the most recent utilization readings
    utilization_samples: VecDeque<f64>,
}

impl Default for AdaptiveController {
    /// Disabled controller with a 100 ms target, a 60 s cooldown, a target
    /// size of 1 and empty sample windows.
    fn default() -> Self {
        Self {
            enabled: false,
            target_response_time: Duration::from_millis(100),
            last_adjustment: Instant::now(),
            adjustment_cooldown: Duration::from_secs(60),
            current_target_size: 1,
            response_time_samples: VecDeque::with_capacity(Self::SAMPLE_WINDOW),
            utilization_samples: VecDeque::with_capacity(Self::SAMPLE_WINDOW),
        }
    }
}

impl AdaptiveController {
    /// Maximum number of samples kept in each sliding window.
    const SAMPLE_WINDOW: usize = 100;

    /// `true` when the controller is enabled and the cooldown has elapsed.
    fn may_adjust(&self) -> bool {
        self.enabled && self.last_adjustment.elapsed() >= self.adjustment_cooldown
    }

    /// Whether the pool should grow: adjustments are allowed, the average
    /// response time exceeds the target, and utilization is above 80%.
    fn should_scale_up(
        &self,
        _current_size: usize,
        avg_response_time: Duration,
        utilization: f64,
    ) -> bool {
        if !self.may_adjust() {
            return false;
        }

        avg_response_time > self.target_response_time && utilization > 0.8
    }

    /// Whether the pool should shrink: adjustments are allowed, more than one
    /// connection exists, the average response time is below half the target,
    /// and utilization is below 30%.
    fn should_scale_down(
        &self,
        current_size: usize,
        avg_response_time: Duration,
        utilization: f64,
    ) -> bool {
        if !self.may_adjust() || current_size <= 1 {
            return false;
        }

        avg_response_time < self.target_response_time / 2 && utilization < 0.3
    }

    /// Record one response-time / utilization observation, keeping only the
    /// most recent [`Self::SAMPLE_WINDOW`] entries of each.
    fn record_metrics(&mut self, response_time: Duration, utilization: f64) {
        Self::push_bounded(&mut self.response_time_samples, response_time);
        Self::push_bounded(&mut self.utilization_samples, utilization);
    }

    /// Append `sample`, evicting the oldest entry first when the window is
    /// full so the deque never grows past its reserved capacity.
    fn push_bounded<S>(window: &mut VecDeque<S>, sample: S) {
        if window.len() >= Self::SAMPLE_WINDOW {
            window.pop_front();
        }
        window.push_back(sample);
    }
}
416
417impl<T: PooledConnection + Clone> ConnectionPool<T> {
418    /// Create a new advanced connection pool with monitoring and circuit breaker
419    pub async fn new(config: PoolConfig, factory: Arc<dyn ConnectionFactory<T>>) -> Result<Self> {
420        // Initialize circuit breaker if enabled
421        let circuit_breaker = if config.enable_circuit_breaker {
422            Some(new_shared_circuit_breaker(
423                config.circuit_breaker_config.clone().unwrap_or_default(),
424            ))
425        } else {
426            None
427        };
428
429        // Initialize adaptive controller
430        let adaptive_controller = AdaptiveController {
431            enabled: config.adaptive_sizing,
432            target_response_time: Duration::from_millis(config.target_response_time_ms),
433            current_target_size: config.min_connections,
434            ..Default::default()
435        };
436
437        // Initialize health monitor
438        let health_check_config = HealthCheckConfig {
439            check_interval: config.health_check_interval,
440            check_timeout: config.validation_timeout,
441            enable_statistics: config.enable_metrics,
442            ..Default::default()
443        };
444        let health_monitor = Arc::new(HealthMonitor::new(health_check_config));
445
446        // Initialize reconnection manager
447        let reconnect_config = ReconnectConfig {
448            initial_delay: Duration::from_millis(100),
449            max_delay: Duration::from_secs(30),
450            max_attempts: config.retry_attempts,
451            connection_timeout: config.connection_timeout,
452            ..Default::default()
453        };
454        let reconnect_manager = Arc::new(ReconnectManager::new(
455            reconnect_config,
456            ReconnectStrategy::ExponentialBackoff,
457        ));
458
459        let pool = Self {
460            semaphore: Arc::new(Semaphore::new(config.max_connections)),
461            connections: Arc::new(Mutex::new(VecDeque::new())),
462            active_count: Arc::new(Mutex::new(0)),
463            stats: Arc::new(RwLock::new(PoolStats::default())),
464            connection_factory: factory,
465            circuit_breaker,
466            round_robin_counter: Arc::new(AtomicUsize::new(0)),
467            metrics: Arc::new(RwLock::new(PoolMetrics::default())),
468            pending_requests: Arc::new(AtomicUsize::new(0)),
469            created_at: Instant::now(),
470            adaptive_controller: Arc::new(RwLock::new(adaptive_controller)),
471            health_monitor,
472            reconnect_manager,
473            failover_manager: None,
474            config,
475        };
476
477        // Initialize minimum connections
478        pool.ensure_min_connections().await?;
479
480        // Start background maintenance task
481        pool.start_maintenance_task().await;
482
483        // Start adaptive sizing task if enabled
484        if pool.config.adaptive_sizing {
485            pool.start_adaptive_sizing_task().await;
486        }
487
488        // Start health monitoring
489        pool.start_health_monitoring().await;
490
491        info!(
492            "Created advanced connection pool with health monitoring, automatic reconnection, and {} features",
493            if pool.circuit_breaker.is_some() {
494                "circuit breaker"
495            } else {
496                "standard"
497            }
498        );
499
500        Ok(pool)
501    }
502
503    /// Get a connection from the pool with advanced load balancing and monitoring
504    pub async fn get_connection(&self) -> Result<PooledConnectionHandle<T>> {
505        let start_time = Instant::now();
506        self.pending_requests.fetch_add(1, Ordering::Relaxed);
507
508        // Check circuit breaker if enabled
509        if let Some(cb) = &self.circuit_breaker {
510            if !cb.can_execute().await {
511                self.pending_requests.fetch_sub(1, Ordering::Relaxed);
512                return Err(anyhow!(
513                    "Circuit breaker is open - connection pool unavailable"
514                ));
515            }
516        }
517
518        // Acquire permit with timeout
519        let _permit = tokio::time::timeout(self.config.acquire_timeout, self.semaphore.acquire())
520            .await
521            .map_err(|_| anyhow!("Timeout acquiring connection from pool"))?
522            .map_err(|_| anyhow!("Failed to acquire semaphore permit"))?;
523
524        let connection = match self.try_get_existing_connection_with_lb().await {
525            Some(conn) => {
526                // Record successful circuit breaker operation
527                if let Some(cb) = &self.circuit_breaker {
528                    cb.record_success_with_duration(start_time.elapsed()).await;
529                }
530                conn
531            }
532            None => match self.create_new_connection().await {
533                Ok(conn) => {
534                    if let Some(cb) = &self.circuit_breaker {
535                        cb.record_success_with_duration(start_time.elapsed()).await;
536                    }
537                    conn
538                }
539                Err(e) => {
540                    if let Some(cb) = &self.circuit_breaker {
541                        cb.record_failure_with_type(FailureType::NetworkError).await;
542                    }
543                    self.pending_requests.fetch_sub(1, Ordering::Relaxed);
544                    return Err(e);
545                }
546            },
547        };
548
549        *self.active_count.lock().await += 1;
550        let mut stats = self.stats.write().await;
551        stats.total_borrowed += 1;
552        stats.load_balancing_decisions += 1;
553        drop(stats);
554
555        // Update metrics
556        let wait_time = start_time.elapsed();
557        self.update_metrics(wait_time).await;
558
559        self.pending_requests.fetch_sub(1, Ordering::Relaxed);
560
561        Ok(PooledConnectionHandle::new(
562            connection,
563            self.connections.clone(),
564            self.active_count.clone(),
565            self.stats.clone(),
566            self.metrics.clone(),
567            self.adaptive_controller.clone(),
568        ))
569    }
570
571    /// Try to get an existing healthy connection with load balancing
572    async fn try_get_existing_connection_with_lb(&self) -> Option<T> {
573        let mut connections = self.connections.lock().await;
574
575        if connections.is_empty() {
576            return None;
577        }
578
579        // Apply load balancing strategy
580        let selected_index = match self.config.load_balancing {
581            LoadBalancingStrategy::RoundRobin => {
582                self.round_robin_counter.fetch_add(1, Ordering::Relaxed) % connections.len()
583            }
584            LoadBalancingStrategy::Random => fastrand::usize(..connections.len()),
585            LoadBalancingStrategy::LeastRecentlyUsed => {
586                // Find connection with oldest last_activity
587                connections
588                    .iter()
589                    .enumerate()
590                    .min_by_key(|(_, wrapper)| wrapper.last_activity)
591                    .map(|(idx, _)| idx)
592                    .unwrap_or(0)
593            }
594            LoadBalancingStrategy::LeastConnections => {
595                // Find connection with lowest usage_count
596                connections
597                    .iter()
598                    .enumerate()
599                    .min_by_key(|(_, wrapper)| wrapper.usage_count)
600                    .map(|(idx, _)| idx)
601                    .unwrap_or(0)
602            }
603            LoadBalancingStrategy::WeightedRoundRobin => {
604                // Select based on efficiency score
605                connections
606                    .iter()
607                    .enumerate()
608                    .max_by(|(_, a), (_, b)| {
609                        a.efficiency_score()
610                            .partial_cmp(&b.efficiency_score())
611                            .unwrap_or(std::cmp::Ordering::Equal)
612                    })
613                    .map(|(idx, _)| idx)
614                    .unwrap_or(0)
615            }
616        };
617
618        // Try to get the selected connection, falling back to linear search
619        for attempt in 0..connections.len() {
620            let index = (selected_index + attempt) % connections.len();
621
622            if let Some(mut wrapper) = connections.remove(index) {
623                if wrapper.is_expired(self.config.max_lifetime, self.config.idle_timeout) {
624                    // Connection expired, destroy it
625                    if let Err(e) = wrapper.connection.close().await {
626                        warn!("Failed to close expired connection: {}", e);
627                    }
628                    self.stats.write().await.total_destroyed += 1;
629                    continue;
630                }
631
632                // Validate connection health with timeout
633                let health_check =
634                    tokio::time::timeout(self.config.validation_timeout, wrapper.is_healthy())
635                        .await;
636
637                match health_check {
638                    Ok(true) => {
639                        wrapper.is_in_use = true;
640                        wrapper.last_activity = Instant::now();
641                        wrapper.last_health_check = Some((Instant::now(), true));
642                        debug!(
643                            "Selected connection {} using {:?} strategy",
644                            wrapper.connection_id, self.config.load_balancing
645                        );
646                        return Some(wrapper.connection);
647                    }
648                    Ok(false) | Err(_) => {
649                        // Connection unhealthy or timed out, destroy it
650                        if let Err(e) = wrapper.connection.close().await {
651                            warn!("Failed to close unhealthy connection: {}", e);
652                        }
653                        let mut stats = self.stats.write().await;
654                        stats.health_check_failures += 1;
655                        stats.total_destroyed += 1;
656                        continue;
657                    }
658                }
659            }
660        }
661
662        None
663    }
664
665    /// Create a new connection
666    async fn create_new_connection(&self) -> Result<T> {
667        match self.connection_factory.create_connection().await {
668            Ok(connection) => {
669                self.stats.write().await.total_created += 1;
670                debug!("Created new connection");
671                Ok(connection)
672            }
673            Err(e) => {
674                self.stats.write().await.creation_failures += 1;
675                error!("Failed to create connection: {}", e);
676                Err(e)
677            }
678        }
679    }
680
681    /// Return a connection to the pool with usage tracking
682    async fn return_connection_with_metrics(
683        &self,
684        mut connection: T,
685        execution_time: Duration,
686        success: bool,
687    ) {
688        connection.update_activity();
689
690        let mut wrapper = PooledConnectionWrapper::new(connection);
691        wrapper.record_usage(execution_time, success);
692        wrapper.is_in_use = false;
693
694        self.connections.lock().await.push_back(wrapper);
695
696        let mut active_count = self.active_count.lock().await;
697        if *active_count > 0 {
698            *active_count -= 1;
699        }
700
701        self.stats.write().await.total_returned += 1;
702
703        // Record metrics for adaptive sizing
704        let mut controller = self.adaptive_controller.write().await;
705        let utilization = (*active_count as f64) / (self.config.max_connections as f64);
706        controller.record_metrics(execution_time, utilization);
707
708        debug!(
709            "Returned connection to pool with metrics: exec_time={:?}, success={}",
710            execution_time, success
711        );
712    }
713
714    /// Legacy method for backward compatibility
715    async fn return_connection(&self, connection: T) {
716        self.return_connection_with_metrics(connection, Duration::from_millis(100), true)
717            .await;
718    }
719
720    /// Ensure minimum connections are available
721    async fn ensure_min_connections(&self) -> Result<()> {
722        let current_count = self.connections.lock().await.len();
723        let active_count = *self.active_count.lock().await;
724        let total_count = current_count + active_count;
725
726        if total_count < self.config.min_connections {
727            let needed = self.config.min_connections - total_count;
728
729            for _ in 0..needed {
730                match self.create_new_connection().await {
731                    Ok(connection) => {
732                        let wrapper = PooledConnectionWrapper::new(connection);
733                        self.connections.lock().await.push_back(wrapper);
734                    }
735                    Err(e) => {
736                        warn!("Failed to create minimum connection: {}", e);
737                        break;
738                    }
739                }
740            }
741        }
742
743        Ok(())
744    }
745
746    /// Start background maintenance task with health monitoring
747    async fn start_maintenance_task(&self) {
748        let connections = self.connections.clone();
749        let stats = self.stats.clone();
750        let config = self.config.clone();
751        let health_monitor = self.health_monitor.clone();
752        let reconnect_manager = self.reconnect_manager.clone();
753        let connection_factory = self.connection_factory.clone();
754
755        tokio::spawn(async move {
756            let mut interval = tokio::time::interval(config.health_check_interval);
757
758            loop {
759                interval.tick().await;
760
761                let mut connections_guard = connections.lock().await;
762                let mut to_remove = Vec::new();
763                let mut to_reconnect = Vec::new();
764
765                for (index, wrapper) in connections_guard.iter().enumerate() {
766                    let conn_id = wrapper.connection_id.clone();
767
768                    // Check if connection is expired
769                    if wrapper.is_expired(config.max_lifetime, config.idle_timeout) {
770                        to_remove.push(index);
771                        health_monitor.unregister_connection(&conn_id).await;
772                    } else {
773                        // Perform health check via health monitor
774                        let health_status = health_monitor
775                            .check_connection_health(&conn_id, &wrapper.connection)
776                            .await
777                            .unwrap_or(HealthStatus::Unknown);
778
779                        match health_status {
780                            HealthStatus::Dead => {
781                                to_remove.push(index);
782                                health_monitor.unregister_connection(&conn_id).await;
783                            }
784                            HealthStatus::Unhealthy => {
785                                to_reconnect.push((index, conn_id.clone()));
786                            }
787                            _ => {}
788                        }
789                    }
790                }
791
792                // Remove dead connections in reverse order
793                for &index in to_remove.iter().rev() {
794                    if let Some(mut wrapper) = connections_guard.remove(index) {
795                        if let Err(e) = wrapper.connection.close().await {
796                            warn!("Failed to close connection during maintenance: {}", e);
797                        }
798                        stats.write().await.total_destroyed += 1;
799                    }
800                }
801
802                // Attempt to reconnect unhealthy connections
803                let to_reconnect_count = to_reconnect.len();
804                for (index, conn_id) in &to_reconnect {
805                    if *index < connections_guard.len() {
806                        match reconnect_manager
807                            .reconnect(conn_id.clone(), connection_factory.clone())
808                            .await
809                        {
810                            Ok(new_conn) => {
811                                let mut new_wrapper = PooledConnectionWrapper::new(new_conn);
812                                new_wrapper.connection_id = conn_id.clone();
813
814                                // Register new connection with health monitor
815                                let mut metadata = HashMap::new();
816                                metadata.insert("pool_id".to_string(), "main".to_string());
817                                health_monitor
818                                    .register_connection(conn_id.clone(), metadata)
819                                    .await;
820
821                                connections_guard[*index] = new_wrapper;
822                                info!("Successfully reconnected connection {}", conn_id);
823                            }
824                            Err(e) => {
825                                warn!("Failed to reconnect connection {}: {}", conn_id, e);
826                                // Remove the failed connection
827                                connections_guard.remove(*index);
828                                stats.write().await.total_destroyed += 1;
829                            }
830                        }
831                    }
832                }
833
834                // Get dead connections from health monitor
835                let dead_connections = health_monitor.get_dead_connections().await;
836                if !dead_connections.is_empty() {
837                    warn!(
838                        "Health monitor detected {} dead connections",
839                        dead_connections.len()
840                    );
841                }
842
843                debug!(
844                    "Pool maintenance completed, removed {} connections, attempted {} reconnections",
845                    to_remove.len(),
846                    to_reconnect_count
847                );
848            }
849        });
850    }
851
852    /// Start health monitoring for all connections
853    async fn start_health_monitoring(&self) {
854        // Register existing connections with health monitor
855        let connections = self.connections.lock().await;
856        for wrapper in connections.iter() {
857            let mut metadata = HashMap::new();
858            metadata.insert("pool_id".to_string(), "main".to_string());
859            metadata.insert(
860                "created_at".to_string(),
861                wrapper.created_at.elapsed().as_secs().to_string(),
862            );
863
864            self.health_monitor
865                .register_connection(wrapper.connection_id.clone(), metadata)
866                .await;
867        }
868
869        // Subscribe to health events
870        let mut health_events = self.health_monitor.subscribe();
871        let stats = self.stats.clone();
872
873        tokio::spawn(async move {
874            while let Ok(event) = health_events.recv().await {
875                match event {
876                    crate::health_monitor::HealthEvent::ConnectionDead {
877                        connection_id,
878                        reason,
879                    } => {
880                        error!("Connection {} marked as dead: {}", connection_id, reason);
881                        stats.write().await.health_check_failures += 1;
882                    }
883                    crate::health_monitor::HealthEvent::ConnectionRecovered { connection_id } => {
884                        info!("Connection {} recovered", connection_id);
885                    }
886                    crate::health_monitor::HealthEvent::StatusChanged {
887                        connection_id,
888                        old_status,
889                        new_status,
890                    } => {
891                        debug!(
892                            "Connection {} status changed from {:?} to {:?}",
893                            connection_id, old_status, new_status
894                        );
895                    }
896                    _ => {}
897                }
898            }
899        });
900    }
901
902    /// Get comprehensive pool status with advanced metrics
903    pub async fn status(&self) -> PoolStatus {
904        let connections = self.connections.lock().await;
905        let active_count = *self.active_count.lock().await;
906        let metrics = self.metrics.read().await;
907        let pending = self.pending_requests.load(Ordering::Relaxed);
908
909        let total_connections = connections.len() + active_count;
910        let utilization = if self.config.max_connections > 0 {
911            (total_connections as f64 / self.config.max_connections as f64) * 100.0
912        } else {
913            0.0
914        };
915
916        let circuit_breaker_open = if let Some(cb) = &self.circuit_breaker {
917            !cb.is_healthy().await
918        } else {
919            false
920        };
921
922        let is_healthy =
923            !circuit_breaker_open && utilization < 95.0 && metrics.avg_wait_time_ms < 1000.0;
924
925        PoolStatus {
926            total_connections,
927            active_connections: active_count,
928            idle_connections: connections.len(),
929            pending_requests: pending,
930            is_healthy,
931            last_health_check: Some(Instant::now()),
932            utilization_percent: utilization,
933            avg_response_time_ms: metrics.avg_wait_time_ms,
934            load_balancing_strategy: self.config.load_balancing.clone(),
935            circuit_breaker_open,
936            config_hash: self.calculate_config_hash(),
937        }
938    }
939
940    /// Calculate hash of current configuration for validation
941    fn calculate_config_hash(&self) -> u64 {
942        use std::collections::hash_map::DefaultHasher;
943        use std::hash::{Hash, Hasher};
944
945        let mut hasher = DefaultHasher::new();
946        self.config.min_connections.hash(&mut hasher);
947        self.config.max_connections.hash(&mut hasher);
948        self.config.adaptive_sizing.hash(&mut hasher);
949        hasher.finish()
950    }
951
952    /// Get pool statistics
953    pub async fn stats(&self) -> PoolStats {
954        self.stats.read().await.clone()
955    }
956
957    /// Update pool metrics for monitoring
958    async fn update_metrics(&self, wait_time: Duration) {
959        let mut metrics = self.metrics.write().await;
960
961        metrics.total_requests += 1;
962        let wait_time_ms = wait_time.as_millis() as f64;
963
964        // Update average wait time with exponential moving average
965        let alpha = 0.1;
966        metrics.avg_wait_time_ms = alpha * wait_time_ms + (1.0 - alpha) * metrics.avg_wait_time_ms;
967
968        // Update utilization history
969        let connections = self.connections.lock().await;
970        let active_count = *self.active_count.lock().await;
971        let utilization = (active_count as f64) / (self.config.max_connections as f64);
972
973        metrics
974            .utilization_history
975            .push_back((Instant::now(), utilization));
976        if metrics.utilization_history.len() > 1000 {
977            metrics.utilization_history.pop_front();
978        }
979
980        metrics.current_size = connections.len() + active_count;
981        metrics.peak_size = metrics.peak_size.max(metrics.current_size);
982        metrics.last_updated = Instant::now();
983    }
984
985    /// Start adaptive sizing background task
986    async fn start_adaptive_sizing_task(&self) {
987        let pool_metrics = self.metrics.clone();
988        let adaptive_controller = self.adaptive_controller.clone();
989        let pool_config = self.config.clone();
990        let stats = self.stats.clone();
991
992        tokio::spawn(async move {
993            let mut interval = tokio::time::interval(Duration::from_secs(30));
994
995            loop {
996                interval.tick().await;
997
998                let metrics = pool_metrics.read().await;
999                let mut controller = adaptive_controller.write().await;
1000
1001                if !controller.enabled {
1002                    continue;
1003                }
1004
1005                let avg_response_time = Duration::from_millis(metrics.avg_wait_time_ms as u64);
1006                let current_utilization =
1007                    if let Some((_, util)) = metrics.utilization_history.back() {
1008                        *util
1009                    } else {
1010                        0.0
1011                    };
1012
1013                let should_scale_up = controller.should_scale_up(
1014                    metrics.current_size,
1015                    avg_response_time,
1016                    current_utilization,
1017                );
1018
1019                let should_scale_down = controller.should_scale_down(
1020                    metrics.current_size,
1021                    avg_response_time,
1022                    current_utilization,
1023                );
1024
1025                if should_scale_up && metrics.current_size < pool_config.max_connections {
1026                    controller.current_target_size =
1027                        (controller.current_target_size + 1).min(pool_config.max_connections);
1028                    controller.last_adjustment = Instant::now();
1029                    stats.write().await.adaptive_scaling_events += 1;
1030                    info!(
1031                        "Adaptive scaling: scaling UP to {}",
1032                        controller.current_target_size
1033                    );
1034                } else if should_scale_down && metrics.current_size > pool_config.min_connections {
1035                    controller.current_target_size =
1036                        (controller.current_target_size.saturating_sub(1))
1037                            .max(pool_config.min_connections);
1038                    controller.last_adjustment = Instant::now();
1039                    stats.write().await.adaptive_scaling_events += 1;
1040                    info!(
1041                        "Adaptive scaling: scaling DOWN to {}",
1042                        controller.current_target_size
1043                    );
1044                }
1045            }
1046        });
1047    }
1048}
1049
/// Enhanced connection handle with usage tracking and metrics
///
/// The handle owns a checked-out connection together with shared references
/// to the pool state it needs in order to hand the connection back. When the
/// handle is dropped while still holding a connection, the `Drop` impl spawns
/// a task that pushes the connection back onto the pool queue and updates the
/// counters.
pub struct PooledConnectionHandle<T: PooledConnection> {
    /// The borrowed connection; `None` once it has been removed via `take`
    /// or handed back during drop.
    connection: Option<T>,
    /// Shared queue of pooled connections that the connection is pushed back
    /// onto on drop.
    pool_connections: Arc<Mutex<VecDeque<PooledConnectionWrapper<T>>>>,
    /// Shared counter of checked-out connections; decremented on drop.
    active_count: Arc<Mutex<usize>>,
    /// Shared pool statistics; `total_returned` is incremented on drop.
    stats: Arc<RwLock<PoolStats>>,
    /// Shared pool metrics. Cloned in the `Drop` impl but not otherwise read
    /// or written by this handle.
    metrics: Arc<RwLock<PoolMetrics>>,
    /// Shared adaptive controller; receives one `record_metrics` call on drop.
    adaptive_controller: Arc<RwLock<AdaptiveController>>,
    /// Instant at which the handle was constructed.
    acquired_at: Instant,
    /// Execution time of every operation reported via `record_operation`.
    execution_times: Vec<Duration>,
    /// Number of operations reported via `record_operation`.
    operation_count: u32,
    /// Number of reported operations that were flagged as successful.
    success_count: u32,
}
1063
1064impl<T: PooledConnection> PooledConnectionHandle<T> {
1065    fn new(
1066        connection: T,
1067        pool_connections: Arc<Mutex<VecDeque<PooledConnectionWrapper<T>>>>,
1068        active_count: Arc<Mutex<usize>>,
1069        stats: Arc<RwLock<PoolStats>>,
1070        metrics: Arc<RwLock<PoolMetrics>>,
1071        adaptive_controller: Arc<RwLock<AdaptiveController>>,
1072    ) -> Self {
1073        Self {
1074            connection: Some(connection),
1075            pool_connections,
1076            active_count,
1077            stats,
1078            metrics,
1079            adaptive_controller,
1080            acquired_at: Instant::now(),
1081            execution_times: Vec::new(),
1082            operation_count: 0,
1083            success_count: 0,
1084        }
1085    }
1086
1087    /// Record an operation execution time for metrics
1088    pub fn record_operation(&mut self, execution_time: Duration, success: bool) {
1089        self.execution_times.push(execution_time);
1090        self.operation_count += 1;
1091        if success {
1092            self.success_count += 1;
1093        }
1094
1095        debug!(
1096            "Recorded operation: time={:?}, success={}, total_ops={}",
1097            execution_time, success, self.operation_count
1098        );
1099    }
1100
1101    /// Get operation statistics for this handle
1102    pub fn get_operation_stats(&self) -> (u32, u32, Duration) {
1103        let avg_time = if !self.execution_times.is_empty() {
1104            self.execution_times.iter().sum::<Duration>() / self.execution_times.len() as u32
1105        } else {
1106            Duration::ZERO
1107        };
1108
1109        (self.operation_count, self.success_count, avg_time)
1110    }
1111
1112    /// Get the total time this connection has been held
1113    pub fn held_duration(&self) -> Duration {
1114        self.acquired_at.elapsed()
1115    }
1116
1117    /// Get reference to the connection
1118    pub fn as_ref(&self) -> Option<&T> {
1119        self.connection.as_ref()
1120    }
1121
1122    /// Get mutable reference to the connection
1123    pub fn as_mut(&mut self) -> Option<&mut T> {
1124        self.connection.as_mut()
1125    }
1126
1127    /// Take the connection out of the handle (won't be returned to pool)
1128    pub fn take(mut self) -> Option<T> {
1129        self.connection.take()
1130    }
1131}
1132
1133impl<T: PooledConnection> Drop for PooledConnectionHandle<T> {
1134    fn drop(&mut self) {
1135        if let Some(connection) = self.connection.take() {
1136            let pool_connections = self.pool_connections.clone();
1137            let active_count = self.active_count.clone();
1138            let stats = self.stats.clone();
1139            let _metrics = self.metrics.clone();
1140            let adaptive_controller = self.adaptive_controller.clone();
1141
1142            // Calculate metrics for this connection usage
1143            let total_held_time = self.acquired_at.elapsed();
1144            let avg_execution_time = if !self.execution_times.is_empty() {
1145                self.execution_times.iter().sum::<Duration>() / self.execution_times.len() as u32
1146            } else {
1147                Duration::from_millis(50) // Default assumption
1148            };
1149
1150            let success_rate = if self.operation_count > 0 {
1151                self.success_count as f64 / self.operation_count as f64
1152            } else {
1153                1.0 // Assume success if no operations recorded
1154            };
1155
1156            let overall_success = success_rate > 0.8; // Consider successful if >80% operations succeeded
1157
1158            tokio::spawn(async move {
1159                let mut wrapper = PooledConnectionWrapper::new(connection);
1160                wrapper.record_usage(avg_execution_time, overall_success);
1161                wrapper.is_in_use = false;
1162
1163                let usage_count = wrapper.usage_count;
1164                pool_connections.lock().await.push_back(wrapper);
1165
1166                let mut active = active_count.lock().await;
1167                if *active > 0 {
1168                    *active -= 1;
1169                }
1170
1171                // Update pool statistics
1172                stats.write().await.total_returned += 1;
1173
1174                // Update adaptive controller metrics
1175                let utilization = (*active as f64) / 10.0; // Simplified calculation
1176                adaptive_controller
1177                    .write()
1178                    .await
1179                    .record_metrics(avg_execution_time, utilization);
1180
1181                debug!(
1182                    "Returned connection to pool: held_time={:?}, ops={}, success_rate={:.2}",
1183                    total_held_time, usage_count, success_rate
1184                );
1185            });
1186        }
1187    }
1188}
1189
1190/// Helper for creating connection pools from stream config
1191impl<T: PooledConnection + Clone> ConnectionPool<T> {
1192    /// Create a connection pool from stream configuration
1193    pub async fn new_from_config(
1194        config: &StreamConfig,
1195        factory: Arc<dyn ConnectionFactory<T>>,
1196    ) -> Result<Self> {
1197        let pool_config = PoolConfig {
1198            min_connections: 1,
1199            max_connections: config.max_connections,
1200            connection_timeout: config.connection_timeout,
1201            adaptive_sizing: true,
1202            enable_circuit_breaker: true,
1203            enable_metrics: true,
1204            ..Default::default()
1205        };
1206
1207        Self::new(pool_config, factory).await
1208    }
1209
1210    /// Health check with enhanced status
1211    pub async fn health_check(&self) -> PoolStatus {
1212        self.status().await
1213    }
1214
1215    /// Get detailed pool metrics for monitoring
1216    pub async fn get_detailed_metrics(&self) -> DetailedPoolMetrics {
1217        let status = self.status().await;
1218        let metrics = self.metrics.read().await;
1219        let stats = self.stats.read().await;
1220        let controller = self.adaptive_controller.read().await;
1221
1222        DetailedPoolMetrics {
1223            status,
1224            total_requests: metrics.total_requests,
1225            peak_size: metrics.peak_size,
1226            avg_wait_time_ms: metrics.avg_wait_time_ms,
1227            response_time_p50: metrics.response_time_p50,
1228            response_time_p95: metrics.response_time_p95,
1229            response_time_p99: metrics.response_time_p99,
1230            adaptive_scaling_events: stats.adaptive_scaling_events,
1231            circuit_breaker_failures: stats.circuit_breaker_failures,
1232            load_balancing_decisions: stats.load_balancing_decisions,
1233            current_target_size: controller.current_target_size,
1234            pool_uptime: self.created_at.elapsed(),
1235        }
1236    }
1237
1238    /// Reset pool statistics
1239    pub async fn reset_statistics(&self) {
1240        *self.stats.write().await = PoolStats::default();
1241        *self.metrics.write().await = PoolMetrics::default();
1242        info!("Pool statistics reset");
1243    }
1244
1245    /// Force resize the pool
1246    pub async fn resize(&self, new_size: usize) -> Result<()> {
1247        if new_size < self.config.min_connections || new_size > self.config.max_connections {
1248            return Err(anyhow!(
1249                "New size {} outside allowed range [{}, {}]",
1250                new_size,
1251                self.config.min_connections,
1252                self.config.max_connections
1253            ));
1254        }
1255
1256        let mut controller = self.adaptive_controller.write().await;
1257        controller.current_target_size = new_size;
1258        controller.last_adjustment = Instant::now();
1259
1260        info!("Pool manually resized to {}", new_size);
1261        Ok(())
1262    }
1263
1264    /// Create a connection pool with failover support
1265    pub async fn new_with_failover(
1266        config: PoolConfig,
1267        primary_factory: Arc<dyn ConnectionFactory<T>>,
1268        secondary_factory: Arc<dyn ConnectionFactory<T>>,
1269        failover_config: FailoverConfig,
1270    ) -> Result<Self> {
1271        // Create primary and secondary endpoints
1272        let primary_endpoint = ConnectionEndpoint {
1273            name: "primary".to_string(),
1274            factory: primary_factory.clone(),
1275            priority: 1,
1276            metadata: HashMap::new(),
1277        };
1278
1279        let secondary_endpoint = ConnectionEndpoint {
1280            name: "secondary".to_string(),
1281            factory: secondary_factory,
1282            priority: 2,
1283            metadata: HashMap::new(),
1284        };
1285
1286        // Create failover manager
1287        let failover_manager = Arc::new(
1288            FailoverManager::new(failover_config, primary_endpoint, secondary_endpoint).await?,
1289        );
1290
1291        // Create pool with primary factory
1292        let mut pool = Self::new(config, primary_factory).await?;
1293        pool.failover_manager = Some(failover_manager.clone());
1294
1295        // Subscribe to failover events
1296        let mut failover_events = failover_manager.subscribe();
1297        let stats = pool.stats.clone();
1298
1299        tokio::spawn(async move {
1300            while let Ok(event) = failover_events.recv().await {
1301                match event {
1302                    crate::failover::FailoverEvent::FailoverCompleted { from, to, duration } => {
1303                        info!(
1304                            "Failover completed from {} to {} in {:?}",
1305                            from, to, duration
1306                        );
1307                        stats.write().await.failover_count += 1;
1308                    }
1309                    crate::failover::FailoverEvent::FailbackCompleted { from, to, duration } => {
1310                        info!(
1311                            "Failback completed from {} to {} in {:?}",
1312                            from, to, duration
1313                        );
1314                    }
1315                    crate::failover::FailoverEvent::AllConnectionsUnavailable => {
1316                        error!("All connections unavailable!");
1317                    }
1318                    _ => {}
1319                }
1320            }
1321        });
1322
1323        Ok(pool)
1324    }
1325
1326    /// Get health statistics from the health monitor
1327    pub async fn get_health_statistics(&self) -> crate::health_monitor::OverallHealthStatistics {
1328        self.health_monitor.get_overall_statistics().await
1329    }
1330
1331    /// Get reconnection statistics
1332    pub async fn get_reconnection_statistics(&self) -> crate::reconnect::ReconnectStatistics {
1333        self.reconnect_manager.get_statistics().await
1334    }
1335
1336    /// Get failover statistics if failover is enabled
1337    pub async fn get_failover_statistics(&self) -> Option<crate::failover::FailoverStatistics> {
1338        if let Some(fm) = &self.failover_manager {
1339            Some(fm.get_statistics().await)
1340        } else {
1341            None
1342        }
1343    }
1344
1345    /// Register a connection failure callback for automatic reconnection
1346    pub async fn register_failure_callback<F>(&self, callback: F)
1347    where
1348        F: Fn(String, String, u32) -> Pin<Box<dyn Future<Output = ()> + Send>>
1349            + Send
1350            + Sync
1351            + 'static,
1352    {
1353        self.reconnect_manager
1354            .register_failure_callback(callback)
1355            .await;
1356    }
1357
1358    /// Manually trigger failover (if configured)
1359    pub async fn trigger_failover(&self) -> Result<()> {
1360        if let Some(fm) = &self.failover_manager {
1361            fm.trigger_failover().await
1362        } else {
1363            Err(anyhow!("Failover not configured for this pool"))
1364        }
1365    }
1366
1367    /// Check if the pool has failover configured
1368    pub fn has_failover(&self) -> bool {
1369        self.failover_manager.is_some()
1370    }
1371
1372    /// Get unhealthy connections from health monitor
1373    pub async fn get_unhealthy_connections(&self) -> Vec<String> {
1374        self.health_monitor.get_unhealthy_connections().await
1375    }
1376
1377    /// Subscribe to health monitoring events
1378    pub fn subscribe_health_events(
1379        &self,
1380    ) -> broadcast::Receiver<crate::health_monitor::HealthEvent> {
1381        self.health_monitor.subscribe()
1382    }
1383
1384    /// Subscribe to reconnection events
1385    pub fn subscribe_reconnect_events(
1386        &self,
1387    ) -> broadcast::Receiver<crate::reconnect::ReconnectEvent> {
1388        self.reconnect_manager.subscribe()
1389    }
1390}
1391
/// Detailed pool metrics for comprehensive monitoring
///
/// Produced by `ConnectionPool::get_detailed_metrics`. The `Duration` fields
/// are marked `#[serde(skip)]`, so they are left out of serialized output.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DetailedPoolMetrics {
    /// Pool status captured when the metrics were collected.
    pub status: PoolStatus,
    /// Copied from the pool metrics' `total_requests`.
    pub total_requests: u64,
    /// Copied from the pool metrics' `peak_size`.
    pub peak_size: usize,
    /// Copied from the pool metrics' `avg_wait_time_ms`.
    pub avg_wait_time_ms: f64,
    /// Copied from the pool metrics' `response_time_p50`; not serialized.
    #[serde(skip)]
    pub response_time_p50: Duration,
    /// Copied from the pool metrics' `response_time_p95`; not serialized.
    #[serde(skip)]
    pub response_time_p95: Duration,
    /// Copied from the pool metrics' `response_time_p99`; not serialized.
    #[serde(skip)]
    pub response_time_p99: Duration,
    /// Copied from the pool statistics' `adaptive_scaling_events`.
    pub adaptive_scaling_events: u64,
    /// Copied from the pool statistics' `circuit_breaker_failures`.
    pub circuit_breaker_failures: u64,
    /// Copied from the pool statistics' `load_balancing_decisions`.
    pub load_balancing_decisions: u64,
    /// The adaptive controller's `current_target_size` at collection time.
    pub current_target_size: usize,
    /// Time elapsed since the pool's `created_at`; not serialized.
    #[serde(skip)]
    pub pool_uptime: Duration,
}
1412
#[cfg(test)]
mod tests {
    //! Tests for the connection pool, driven by an in-memory connection type
    //! and a counting factory.

    use super::*;
    use std::sync::atomic::{AtomicBool, Ordering};

    /// In-memory connection used to exercise the pool without real I/O.
    #[derive(Debug, Clone)]
    struct TestConnection {
        id: u32,
        created_at: Instant,
        last_activity: Instant,
        is_healthy: Arc<AtomicBool>,
        is_closed: bool,
    }

    impl TestConnection {
        /// Create a healthy, open connection stamped with the current time.
        fn new(id: u32) -> Self {
            let now = Instant::now();
            Self {
                id,
                created_at: now,
                last_activity: now,
                is_healthy: Arc::new(AtomicBool::new(true)),
                is_closed: false,
            }
        }
    }

    #[async_trait::async_trait]
    impl PooledConnection for TestConnection {
        /// Healthy means: not closed and the shared health flag is set.
        async fn is_healthy(&self) -> bool {
            !self.is_closed && self.is_healthy.load(Ordering::Relaxed)
        }

        async fn close(&mut self) -> Result<()> {
            self.is_closed = true;
            Ok(())
        }

        fn created_at(&self) -> Instant {
            self.created_at
        }

        fn last_activity(&self) -> Instant {
            self.last_activity
        }

        fn update_activity(&mut self) {
            self.last_activity = Instant::now();
        }

        fn clone_connection(&self) -> Box<dyn PooledConnection> {
            Box::new(self.clone())
        }
    }

    /// Factory that hands out `TestConnection`s with increasing ids.
    struct TestConnectionFactory {
        counter: Arc<Mutex<u32>>,
    }

    impl TestConnectionFactory {
        fn new() -> Self {
            Self {
                counter: Arc::new(Mutex::new(0)),
            }
        }
    }

    #[async_trait::async_trait]
    impl ConnectionFactory<TestConnection> for TestConnectionFactory {
        async fn create_connection(&self) -> Result<TestConnection> {
            let mut counter = self.counter.lock().await;
            *counter += 1;
            Ok(TestConnection::new(*counter))
        }
    }

    /// Poll the pool until no connection is active and `expected_idle`
    /// connections are idle, or until roughly one second has passed.
    ///
    /// Connections are returned by a task spawned from the handle's `Drop`,
    /// so the return is asynchronous; polling avoids depending on a single
    /// fixed sleep being long enough. The last observed status is returned so
    /// the caller's assertions report the actual values on timeout.
    async fn wait_for_returned(
        pool: &ConnectionPool<TestConnection>,
        expected_idle: usize,
    ) -> PoolStatus {
        for _ in 0..100 {
            let status = pool.status().await;
            if status.active_connections == 0 && status.idle_connections == expected_idle {
                return status;
            }
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
        pool.status().await
    }

    /// A new pool starts with `min_connections` idle and none active.
    #[tokio::test]
    async fn test_pool_creation() {
        let config = PoolConfig {
            min_connections: 2,
            max_connections: 5,
            ..Default::default()
        };

        let factory = Arc::new(TestConnectionFactory::new());
        let pool = ConnectionPool::new(config, factory).await.unwrap();

        let status = pool.status().await;
        assert_eq!(status.idle_connections, 2);
        assert_eq!(status.active_connections, 0);
    }

    /// Borrowing moves a connection from idle to active; dropping the handle
    /// moves it back.
    #[tokio::test]
    async fn test_connection_borrowing() {
        let config = PoolConfig {
            min_connections: 1,
            max_connections: 3,
            ..Default::default()
        };

        let factory = Arc::new(TestConnectionFactory::new());
        let pool = ConnectionPool::new(config, factory).await.unwrap();

        let mut handle = pool.get_connection().await.unwrap();

        let status = pool.status().await;
        assert_eq!(status.active_connections, 1);
        assert_eq!(status.idle_connections, 0);
        assert!(status.is_healthy);

        // Record some operations
        handle.record_operation(Duration::from_millis(50), true);
        handle.record_operation(Duration::from_millis(75), true);

        let (ops, successes, avg_time) = handle.get_operation_stats();
        assert_eq!(ops, 2);
        assert_eq!(successes, 2);
        assert!(avg_time > Duration::ZERO);

        drop(handle);

        // Wait for the connection to be returned
        let status = wait_for_returned(&pool, 1).await;
        assert_eq!(status.active_connections, 0);
        assert_eq!(status.idle_connections, 1);
    }

    /// Every load balancing strategy can serve three concurrent borrows and
    /// is reported back unchanged in the pool status.
    #[tokio::test]
    async fn test_load_balancing_strategies() {
        for strategy in [
            LoadBalancingStrategy::RoundRobin,
            LoadBalancingStrategy::Random,
            LoadBalancingStrategy::LeastRecentlyUsed,
            LoadBalancingStrategy::LeastConnections,
            LoadBalancingStrategy::WeightedRoundRobin,
        ] {
            let config = PoolConfig {
                min_connections: 3,
                max_connections: 5,
                load_balancing: strategy.clone(),
                ..Default::default()
            };

            let factory = Arc::new(TestConnectionFactory::new());
            let pool = ConnectionPool::new(config, factory).await.unwrap();

            // Get multiple connections to test load balancing
            let handles: Vec<_> =
                futures_util::future::join_all((0..3).map(|_| pool.get_connection()))
                    .await
                    .into_iter()
                    .collect::<Result<Vec<_>, _>>()
                    .unwrap();

            let status = pool.status().await;
            assert_eq!(status.active_connections, 3);
            assert_eq!(status.load_balancing_strategy, strategy);

            drop(handles);
            // Give the spawned return tasks a chance to run before the pool
            // for this strategy goes away; nothing is asserted afterwards.
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
    }

    /// With the circuit breaker enabled, a healthy pool still serves
    /// connections and reports the breaker as closed.
    #[tokio::test]
    async fn test_circuit_breaker_integration() {
        let config = PoolConfig {
            min_connections: 1,
            max_connections: 2,
            enable_circuit_breaker: true,
            circuit_breaker_config: Some(CircuitBreakerConfig {
                failure_threshold: 2,
                timeout: Duration::from_millis(50),
                ..Default::default()
            }),
            ..Default::default()
        };

        let factory = Arc::new(TestConnectionFactory::new());
        let pool = ConnectionPool::new(config, factory).await.unwrap();

        // Should work normally initially
        let handle = pool.get_connection().await;
        assert!(handle.is_ok());
        drop(handle);

        // Test that circuit breaker status is included in pool status
        let status = pool.status().await;
        assert!(!status.circuit_breaker_open);
    }

    /// The adaptive target starts at `min_connections` and follows a manual
    /// resize.
    #[tokio::test]
    async fn test_adaptive_sizing() {
        let config = PoolConfig {
            min_connections: 1,
            max_connections: 5,
            adaptive_sizing: true,
            target_response_time_ms: 50,
            ..Default::default()
        };

        let factory = Arc::new(TestConnectionFactory::new());
        let pool = ConnectionPool::new(config, factory).await.unwrap();

        let metrics = pool.get_detailed_metrics().await;
        assert_eq!(metrics.current_target_size, 1); // Should start at min_connections
        assert!(metrics.adaptive_scaling_events == 0);

        // Test pool resizing
        pool.resize(3).await.unwrap();
        let metrics = pool.get_detailed_metrics().await;
        assert_eq!(metrics.current_target_size, 3);
    }

    /// Detailed metrics reflect borrow activity and can be reset.
    #[tokio::test]
    async fn test_detailed_metrics() {
        let config = PoolConfig {
            min_connections: 2,
            max_connections: 4,
            enable_metrics: true,
            ..Default::default()
        };

        let factory = Arc::new(TestConnectionFactory::new());
        let pool = ConnectionPool::new(config, factory).await.unwrap();

        // Generate some activity
        let handles: Vec<_> = futures_util::future::join_all((0..3).map(|_| pool.get_connection()))
            .await
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        let metrics = pool.get_detailed_metrics().await;
        assert!(metrics.total_requests >= 3);
        assert!(metrics.status.utilization_percent > 0.0);
        assert!(metrics.pool_uptime > Duration::ZERO);
        assert_eq!(metrics.status.active_connections, 3);

        drop(handles);

        // Test statistics reset
        pool.reset_statistics().await;
        let metrics = pool.get_detailed_metrics().await;
        assert_eq!(metrics.adaptive_scaling_events, 0);
    }
}