Skip to main content

oxirs_stream/
connection_pool_manager.rs

1//! Connection Pool — Pool lifecycle management
2//!
3//! Acquire/release, connection reuse, idle cleanup, adaptive sizing,
4//! the ConnectionPool struct and PooledConnectionHandle.
5
6use crate::{
7    circuit_breaker::{
8        new_shared_circuit_breaker, FailureType, SharedCircuitBreaker, SharedCircuitBreakerExt,
9    },
10    failover::{ConnectionEndpoint, FailoverConfig, FailoverManager},
11    health_monitor::{HealthCheckConfig, HealthMonitor},
12    reconnect::{ReconnectConfig, ReconnectManager, ReconnectStrategy},
13    StreamConfig,
14};
15use anyhow::{anyhow, Result};
16use fastrand;
17use std::collections::{HashMap, VecDeque};
18use std::future::Future;
19use std::pin::Pin;
20use std::sync::{
21    atomic::{AtomicUsize, Ordering},
22    Arc,
23};
24use std::time::{Duration, Instant};
25use tokio::sync::{broadcast, Mutex, RwLock, Semaphore};
26use tracing::{debug, error, info, warn};
27
28use super::connection_pool_types::{
29    AdaptiveController, ConnectionFactory, DetailedPoolMetrics, LoadBalancingStrategy, PoolConfig,
30    PoolMetrics, PoolStats, PoolStatus, PooledConnection, PooledConnectionWrapper,
31};
32
33/// Advanced connection pool implementation with monitoring and load balancing
34pub struct ConnectionPool<T: PooledConnection + Clone> {
35    pub(super) config: PoolConfig,
36    pub(super) connections: Arc<Mutex<VecDeque<PooledConnectionWrapper<T>>>>,
37    pub(super) active_count: Arc<Mutex<usize>>,
38    pub(super) semaphore: Arc<Semaphore>,
39    pub(super) stats: Arc<RwLock<PoolStats>>,
40    pub(super) connection_factory: Arc<dyn ConnectionFactory<T>>,
41    pub(super) circuit_breaker: Option<SharedCircuitBreaker>,
42    pub(super) round_robin_counter: Arc<AtomicUsize>,
43    pub(super) metrics: Arc<RwLock<PoolMetrics>>,
44    pub(super) pending_requests: Arc<AtomicUsize>,
45    pub(super) created_at: Instant,
46    pub(super) adaptive_controller: Arc<RwLock<AdaptiveController>>,
47    pub(super) health_monitor: Arc<HealthMonitor<T>>,
48    pub(super) reconnect_manager: Arc<ReconnectManager<T>>,
49    pub(super) failover_manager: Option<Arc<FailoverManager<T>>>,
50}
51
52impl<T: PooledConnection + Clone> ConnectionPool<T> {
53    /// Create a new advanced connection pool
54    pub async fn new(config: PoolConfig, factory: Arc<dyn ConnectionFactory<T>>) -> Result<Self> {
55        let circuit_breaker = if config.enable_circuit_breaker {
56            Some(new_shared_circuit_breaker(
57                config.circuit_breaker_config.clone().unwrap_or_default(),
58            ))
59        } else {
60            None
61        };
62
63        let adaptive_controller = AdaptiveController {
64            enabled: config.adaptive_sizing,
65            target_response_time: Duration::from_millis(config.target_response_time_ms),
66            current_target_size: config.min_connections,
67            ..Default::default()
68        };
69
70        let health_check_config = HealthCheckConfig {
71            check_interval: config.health_check_interval,
72            check_timeout: config.validation_timeout,
73            enable_statistics: config.enable_metrics,
74            ..Default::default()
75        };
76        let health_monitor = Arc::new(HealthMonitor::new(health_check_config));
77
78        let reconnect_config = ReconnectConfig {
79            initial_delay: Duration::from_millis(100),
80            max_delay: Duration::from_secs(30),
81            max_attempts: config.retry_attempts,
82            connection_timeout: config.connection_timeout,
83            ..Default::default()
84        };
85        let reconnect_manager = Arc::new(ReconnectManager::new(
86            reconnect_config,
87            ReconnectStrategy::ExponentialBackoff,
88        ));
89
90        let pool = Self {
91            semaphore: Arc::new(Semaphore::new(config.max_connections)),
92            connections: Arc::new(Mutex::new(VecDeque::new())),
93            active_count: Arc::new(Mutex::new(0)),
94            stats: Arc::new(RwLock::new(PoolStats::default())),
95            connection_factory: factory,
96            circuit_breaker,
97            round_robin_counter: Arc::new(AtomicUsize::new(0)),
98            metrics: Arc::new(RwLock::new(PoolMetrics::default())),
99            pending_requests: Arc::new(AtomicUsize::new(0)),
100            created_at: Instant::now(),
101            adaptive_controller: Arc::new(RwLock::new(adaptive_controller)),
102            health_monitor,
103            reconnect_manager,
104            failover_manager: None,
105            config,
106        };
107
108        pool.ensure_min_connections().await?;
109        pool.start_maintenance_task().await;
110
111        if pool.config.adaptive_sizing {
112            pool.start_adaptive_sizing_task().await;
113        }
114
115        pool.start_health_monitoring().await;
116
117        info!(
118            "Created advanced connection pool with health monitoring, automatic reconnection, and {} features",
119            if pool.circuit_breaker.is_some() { "circuit breaker" } else { "standard" }
120        );
121
122        Ok(pool)
123    }
124
125    /// Get a connection from the pool
126    pub async fn get_connection(&self) -> Result<PooledConnectionHandle<T>> {
127        let start_time = Instant::now();
128        self.pending_requests.fetch_add(1, Ordering::Relaxed);
129
130        if let Some(cb) = &self.circuit_breaker {
131            if !cb.can_execute().await {
132                self.pending_requests.fetch_sub(1, Ordering::Relaxed);
133                return Err(anyhow!(
134                    "Circuit breaker is open - connection pool unavailable"
135                ));
136            }
137        }
138
139        let _permit = tokio::time::timeout(self.config.acquire_timeout, self.semaphore.acquire())
140            .await
141            .map_err(|_| anyhow!("Timeout acquiring connection from pool"))?
142            .map_err(|_| anyhow!("Failed to acquire semaphore permit"))?;
143
144        let connection = match self.try_get_existing_connection_with_lb().await {
145            Some(conn) => {
146                if let Some(cb) = &self.circuit_breaker {
147                    cb.record_success_with_duration(start_time.elapsed()).await;
148                }
149                conn
150            }
151            None => match self.create_new_connection().await {
152                Ok(conn) => {
153                    if let Some(cb) = &self.circuit_breaker {
154                        cb.record_success_with_duration(start_time.elapsed()).await;
155                    }
156                    conn
157                }
158                Err(e) => {
159                    if let Some(cb) = &self.circuit_breaker {
160                        cb.record_failure_with_type(FailureType::NetworkError).await;
161                    }
162                    self.pending_requests.fetch_sub(1, Ordering::Relaxed);
163                    return Err(e);
164                }
165            },
166        };
167
168        *self.active_count.lock().await += 1;
169        let mut stats = self.stats.write().await;
170        stats.total_borrowed += 1;
171        stats.load_balancing_decisions += 1;
172        drop(stats);
173
174        let wait_time = start_time.elapsed();
175        self.update_metrics(wait_time).await;
176        self.pending_requests.fetch_sub(1, Ordering::Relaxed);
177
178        Ok(PooledConnectionHandle::new(
179            connection,
180            self.connections.clone(),
181            self.active_count.clone(),
182            self.stats.clone(),
183            self.metrics.clone(),
184            self.adaptive_controller.clone(),
185        ))
186    }
187
188    /// Try to get an existing healthy connection with load balancing
189    async fn try_get_existing_connection_with_lb(&self) -> Option<T> {
190        let mut connections = self.connections.lock().await;
191
192        if connections.is_empty() {
193            return None;
194        }
195
196        let selected_index = match self.config.load_balancing {
197            LoadBalancingStrategy::RoundRobin => {
198                self.round_robin_counter.fetch_add(1, Ordering::Relaxed) % connections.len()
199            }
200            LoadBalancingStrategy::Random => fastrand::usize(..connections.len()),
201            LoadBalancingStrategy::LeastRecentlyUsed => connections
202                .iter()
203                .enumerate()
204                .min_by_key(|(_, wrapper)| wrapper.last_activity)
205                .map(|(idx, _)| idx)
206                .unwrap_or(0),
207            LoadBalancingStrategy::LeastConnections => connections
208                .iter()
209                .enumerate()
210                .min_by_key(|(_, wrapper)| wrapper.usage_count)
211                .map(|(idx, _)| idx)
212                .unwrap_or(0),
213            LoadBalancingStrategy::WeightedRoundRobin => connections
214                .iter()
215                .enumerate()
216                .max_by(|(_, a), (_, b)| {
217                    a.efficiency_score()
218                        .partial_cmp(&b.efficiency_score())
219                        .unwrap_or(std::cmp::Ordering::Equal)
220                })
221                .map(|(idx, _)| idx)
222                .unwrap_or(0),
223        };
224
225        for attempt in 0..connections.len() {
226            let index = (selected_index + attempt) % connections.len();
227
228            if let Some(mut wrapper) = connections.remove(index) {
229                if wrapper.is_expired(self.config.max_lifetime, self.config.idle_timeout) {
230                    if let Err(e) = wrapper.connection.close().await {
231                        warn!("Failed to close expired connection: {}", e);
232                    }
233                    self.stats.write().await.total_destroyed += 1;
234                    continue;
235                }
236
237                let health_check =
238                    tokio::time::timeout(self.config.validation_timeout, wrapper.is_healthy())
239                        .await;
240
241                match health_check {
242                    Ok(true) => {
243                        wrapper.is_in_use = true;
244                        wrapper.last_activity = Instant::now();
245                        wrapper.last_health_check = Some((Instant::now(), true));
246                        debug!(
247                            "Selected connection {} using {:?} strategy",
248                            wrapper.connection_id, self.config.load_balancing
249                        );
250                        return Some(wrapper.connection);
251                    }
252                    Ok(false) | Err(_) => {
253                        if let Err(e) = wrapper.connection.close().await {
254                            warn!("Failed to close unhealthy connection: {}", e);
255                        }
256                        let mut stats = self.stats.write().await;
257                        stats.health_check_failures += 1;
258                        stats.total_destroyed += 1;
259                        continue;
260                    }
261                }
262            }
263        }
264
265        None
266    }
267
268    /// Create a new connection
269    pub(super) async fn create_new_connection(&self) -> Result<T> {
270        match self.connection_factory.create_connection().await {
271            Ok(connection) => {
272                self.stats.write().await.total_created += 1;
273                debug!("Created new connection");
274                Ok(connection)
275            }
276            Err(e) => {
277                self.stats.write().await.creation_failures += 1;
278                error!("Failed to create connection: {}", e);
279                Err(e)
280            }
281        }
282    }
283
284    /// Return a connection to the pool with usage tracking
285    async fn return_connection_with_metrics(
286        &self,
287        mut connection: T,
288        execution_time: Duration,
289        success: bool,
290    ) {
291        connection.update_activity();
292
293        let mut wrapper = PooledConnectionWrapper::new(connection);
294        wrapper.record_usage(execution_time, success);
295        wrapper.is_in_use = false;
296
297        self.connections.lock().await.push_back(wrapper);
298
299        let mut active_count = self.active_count.lock().await;
300        if *active_count > 0 {
301            *active_count -= 1;
302        }
303
304        self.stats.write().await.total_returned += 1;
305
306        let mut controller = self.adaptive_controller.write().await;
307        let utilization = (*active_count as f64) / (self.config.max_connections as f64);
308        controller.record_metrics(execution_time, utilization);
309
310        debug!(
311            "Returned connection to pool with metrics: exec_time={:?}, success={}",
312            execution_time, success
313        );
314    }
315
316    /// Legacy method for backward compatibility
317    #[allow(dead_code)]
318    async fn return_connection(&self, connection: T) {
319        self.return_connection_with_metrics(connection, Duration::from_millis(100), true)
320            .await;
321    }
322
323    /// Ensure minimum connections are available
324    pub(super) async fn ensure_min_connections(&self) -> Result<()> {
325        let current_count = self.connections.lock().await.len();
326        let active_count = *self.active_count.lock().await;
327        let total_count = current_count + active_count;
328
329        if total_count < self.config.min_connections {
330            let needed = self.config.min_connections - total_count;
331            for _ in 0..needed {
332                match self.create_new_connection().await {
333                    Ok(connection) => {
334                        let wrapper = PooledConnectionWrapper::new(connection);
335                        self.connections.lock().await.push_back(wrapper);
336                    }
337                    Err(e) => {
338                        warn!("Failed to create minimum connection: {}", e);
339                        break;
340                    }
341                }
342            }
343        }
344
345        Ok(())
346    }
347
348    /// Update pool metrics
349    pub(super) async fn update_metrics(&self, wait_time: Duration) {
350        let mut metrics = self.metrics.write().await;
351        metrics.total_requests += 1;
352        let wait_time_ms = wait_time.as_millis() as f64;
353        let alpha = 0.1;
354        metrics.avg_wait_time_ms = alpha * wait_time_ms + (1.0 - alpha) * metrics.avg_wait_time_ms;
355
356        let connections = self.connections.lock().await;
357        let active_count = *self.active_count.lock().await;
358        let utilization = (active_count as f64) / (self.config.max_connections as f64);
359
360        metrics
361            .utilization_history
362            .push_back((Instant::now(), utilization));
363        if metrics.utilization_history.len() > 1000 {
364            metrics.utilization_history.pop_front();
365        }
366
367        metrics.current_size = connections.len() + active_count;
368        metrics.peak_size = metrics.peak_size.max(metrics.current_size);
369        metrics.last_updated = Instant::now();
370    }
371
372    /// Get comprehensive pool status
373    pub async fn status(&self) -> PoolStatus {
374        let connections = self.connections.lock().await;
375        let active_count = *self.active_count.lock().await;
376        let metrics = self.metrics.read().await;
377        let pending = self.pending_requests.load(Ordering::Relaxed);
378
379        let total_connections = connections.len() + active_count;
380        let utilization = if self.config.max_connections > 0 {
381            (total_connections as f64 / self.config.max_connections as f64) * 100.0
382        } else {
383            0.0
384        };
385
386        let circuit_breaker_open = if let Some(cb) = &self.circuit_breaker {
387            !cb.is_healthy().await
388        } else {
389            false
390        };
391
392        let is_healthy =
393            !circuit_breaker_open && utilization < 95.0 && metrics.avg_wait_time_ms < 1000.0;
394
395        PoolStatus {
396            total_connections,
397            active_connections: active_count,
398            idle_connections: connections.len(),
399            pending_requests: pending,
400            is_healthy,
401            last_health_check: Some(Instant::now()),
402            utilization_percent: utilization,
403            avg_response_time_ms: metrics.avg_wait_time_ms,
404            load_balancing_strategy: self.config.load_balancing.clone(),
405            circuit_breaker_open,
406            config_hash: self.calculate_config_hash(),
407        }
408    }
409
410    fn calculate_config_hash(&self) -> u64 {
411        use std::collections::hash_map::DefaultHasher;
412        use std::hash::{Hash, Hasher};
413        let mut hasher = DefaultHasher::new();
414        self.config.min_connections.hash(&mut hasher);
415        self.config.max_connections.hash(&mut hasher);
416        self.config.adaptive_sizing.hash(&mut hasher);
417        hasher.finish()
418    }
419
420    /// Get pool statistics
421    pub async fn stats(&self) -> PoolStats {
422        self.stats.read().await.clone()
423    }
424
425    /// Create a connection pool from stream configuration
426    pub async fn new_from_config(
427        config: &StreamConfig,
428        factory: Arc<dyn ConnectionFactory<T>>,
429    ) -> Result<Self> {
430        let pool_config = PoolConfig {
431            min_connections: 1,
432            max_connections: config.max_connections,
433            connection_timeout: config.connection_timeout,
434            adaptive_sizing: true,
435            enable_circuit_breaker: true,
436            enable_metrics: true,
437            ..Default::default()
438        };
439        Self::new(pool_config, factory).await
440    }
441
442    /// Health check with enhanced status
443    pub async fn health_check(&self) -> PoolStatus {
444        self.status().await
445    }
446
447    /// Get detailed pool metrics
448    pub async fn get_detailed_metrics(&self) -> DetailedPoolMetrics {
449        let status = self.status().await;
450        let metrics = self.metrics.read().await;
451        let stats = self.stats.read().await;
452        let controller = self.adaptive_controller.read().await;
453
454        DetailedPoolMetrics {
455            status,
456            total_requests: metrics.total_requests,
457            peak_size: metrics.peak_size,
458            avg_wait_time_ms: metrics.avg_wait_time_ms,
459            response_time_p50: metrics.response_time_p50,
460            response_time_p95: metrics.response_time_p95,
461            response_time_p99: metrics.response_time_p99,
462            adaptive_scaling_events: stats.adaptive_scaling_events,
463            circuit_breaker_failures: stats.circuit_breaker_failures,
464            load_balancing_decisions: stats.load_balancing_decisions,
465            current_target_size: controller.current_target_size,
466            pool_uptime: self.created_at.elapsed(),
467        }
468    }
469
470    /// Reset pool statistics
471    pub async fn reset_statistics(&self) {
472        *self.stats.write().await = PoolStats::default();
473        *self.metrics.write().await = PoolMetrics::default();
474        info!("Pool statistics reset");
475    }
476
477    /// Force resize the pool
478    pub async fn resize(&self, new_size: usize) -> Result<()> {
479        if new_size < self.config.min_connections || new_size > self.config.max_connections {
480            return Err(anyhow!(
481                "New size {} outside allowed range [{}, {}]",
482                new_size,
483                self.config.min_connections,
484                self.config.max_connections
485            ));
486        }
487        let mut controller = self.adaptive_controller.write().await;
488        controller.current_target_size = new_size;
489        controller.last_adjustment = Instant::now();
490        info!("Pool manually resized to {}", new_size);
491        Ok(())
492    }
493
494    /// Create a connection pool with failover support
495    pub async fn new_with_failover(
496        config: PoolConfig,
497        primary_factory: Arc<dyn ConnectionFactory<T>>,
498        secondary_factory: Arc<dyn ConnectionFactory<T>>,
499        failover_config: FailoverConfig,
500    ) -> Result<Self> {
501        let primary_endpoint = ConnectionEndpoint {
502            name: "primary".to_string(),
503            factory: primary_factory.clone(),
504            priority: 1,
505            metadata: HashMap::new(),
506        };
507        let secondary_endpoint = ConnectionEndpoint {
508            name: "secondary".to_string(),
509            factory: secondary_factory,
510            priority: 2,
511            metadata: HashMap::new(),
512        };
513
514        let failover_manager = Arc::new(
515            FailoverManager::new(failover_config, primary_endpoint, secondary_endpoint).await?,
516        );
517
518        let mut pool = Self::new(config, primary_factory).await?;
519        pool.failover_manager = Some(failover_manager.clone());
520
521        let mut failover_events = failover_manager.subscribe();
522        let stats = pool.stats.clone();
523
524        tokio::spawn(async move {
525            while let Ok(event) = failover_events.recv().await {
526                match event {
527                    crate::failover::FailoverEvent::FailoverCompleted { from, to, duration } => {
528                        info!(
529                            "Failover completed from {} to {} in {:?}",
530                            from, to, duration
531                        );
532                        stats.write().await.failover_count += 1;
533                    }
534                    crate::failover::FailoverEvent::FailbackCompleted { from, to, duration } => {
535                        info!(
536                            "Failback completed from {} to {} in {:?}",
537                            from, to, duration
538                        );
539                    }
540                    crate::failover::FailoverEvent::AllConnectionsUnavailable => {
541                        error!("All connections unavailable!");
542                    }
543                    _ => {}
544                }
545            }
546        });
547
548        Ok(pool)
549    }
550
551    /// Get health statistics from the health monitor
552    pub async fn get_health_statistics(&self) -> crate::health_monitor::OverallHealthStatistics {
553        self.health_monitor.get_overall_statistics().await
554    }
555
556    /// Get reconnection statistics
557    pub async fn get_reconnection_statistics(&self) -> crate::reconnect::ReconnectStatistics {
558        self.reconnect_manager.get_statistics().await
559    }
560
561    /// Get failover statistics if failover is enabled
562    pub async fn get_failover_statistics(&self) -> Option<crate::failover::FailoverStatistics> {
563        if let Some(fm) = &self.failover_manager {
564            Some(fm.get_statistics().await)
565        } else {
566            None
567        }
568    }
569
570    /// Register a connection failure callback for automatic reconnection
571    pub async fn register_failure_callback<F>(&self, callback: F)
572    where
573        F: Fn(String, String, u32) -> Pin<Box<dyn Future<Output = ()> + Send>>
574            + Send
575            + Sync
576            + 'static,
577    {
578        self.reconnect_manager
579            .register_failure_callback(callback)
580            .await;
581    }
582
583    /// Manually trigger failover (if configured)
584    pub async fn trigger_failover(&self) -> Result<()> {
585        if let Some(fm) = &self.failover_manager {
586            fm.trigger_failover().await
587        } else {
588            Err(anyhow!("Failover not configured for this pool"))
589        }
590    }
591
592    /// Check if the pool has failover configured
593    pub fn has_failover(&self) -> bool {
594        self.failover_manager.is_some()
595    }
596
597    /// Get unhealthy connections from health monitor
598    pub async fn get_unhealthy_connections(&self) -> Vec<String> {
599        self.health_monitor.get_unhealthy_connections().await
600    }
601
602    /// Subscribe to health monitoring events
603    pub fn subscribe_health_events(
604        &self,
605    ) -> broadcast::Receiver<crate::health_monitor::HealthEvent> {
606        self.health_monitor.subscribe()
607    }
608
609    /// Subscribe to reconnection events
610    pub fn subscribe_reconnect_events(
611        &self,
612    ) -> broadcast::Receiver<crate::reconnect::ReconnectEvent> {
613        self.reconnect_manager.subscribe()
614    }
615}
616
617// ── PooledConnectionHandle ────────────────────────────────────────────────────
618
619/// Enhanced connection handle with usage tracking and metrics
620pub struct PooledConnectionHandle<T: PooledConnection> {
621    connection: Option<T>,
622    pool_connections: Arc<Mutex<VecDeque<PooledConnectionWrapper<T>>>>,
623    active_count: Arc<Mutex<usize>>,
624    stats: Arc<RwLock<PoolStats>>,
625    metrics: Arc<RwLock<PoolMetrics>>,
626    adaptive_controller: Arc<RwLock<AdaptiveController>>,
627    acquired_at: Instant,
628    execution_times: Vec<Duration>,
629    operation_count: u32,
630    success_count: u32,
631}
632
633impl<T: PooledConnection> PooledConnectionHandle<T> {
634    pub(super) fn new(
635        connection: T,
636        pool_connections: Arc<Mutex<VecDeque<PooledConnectionWrapper<T>>>>,
637        active_count: Arc<Mutex<usize>>,
638        stats: Arc<RwLock<PoolStats>>,
639        metrics: Arc<RwLock<PoolMetrics>>,
640        adaptive_controller: Arc<RwLock<AdaptiveController>>,
641    ) -> Self {
642        Self {
643            connection: Some(connection),
644            pool_connections,
645            active_count,
646            stats,
647            metrics,
648            adaptive_controller,
649            acquired_at: Instant::now(),
650            execution_times: Vec::new(),
651            operation_count: 0,
652            success_count: 0,
653        }
654    }
655
656    pub fn record_operation(&mut self, execution_time: Duration, success: bool) {
657        self.execution_times.push(execution_time);
658        self.operation_count += 1;
659        if success {
660            self.success_count += 1;
661        }
662        debug!(
663            "Recorded operation: time={:?}, success={}, total_ops={}",
664            execution_time, success, self.operation_count
665        );
666    }
667
668    pub fn get_operation_stats(&self) -> (u32, u32, Duration) {
669        let avg_time = if !self.execution_times.is_empty() {
670            self.execution_times.iter().sum::<Duration>() / self.execution_times.len() as u32
671        } else {
672            Duration::ZERO
673        };
674        (self.operation_count, self.success_count, avg_time)
675    }
676
677    pub fn held_duration(&self) -> Duration {
678        self.acquired_at.elapsed()
679    }
680
681    pub fn as_ref(&self) -> Option<&T> {
682        self.connection.as_ref()
683    }
684
685    pub fn as_mut(&mut self) -> Option<&mut T> {
686        self.connection.as_mut()
687    }
688
689    pub fn take(mut self) -> Option<T> {
690        self.connection.take()
691    }
692}
693
694impl<T: PooledConnection> Drop for PooledConnectionHandle<T> {
695    fn drop(&mut self) {
696        if let Some(connection) = self.connection.take() {
697            let pool_connections = self.pool_connections.clone();
698            let active_count = self.active_count.clone();
699            let stats = self.stats.clone();
700            let _metrics = self.metrics.clone();
701            let adaptive_controller = self.adaptive_controller.clone();
702
703            let total_held_time = self.acquired_at.elapsed();
704            let avg_execution_time = if !self.execution_times.is_empty() {
705                self.execution_times.iter().sum::<Duration>() / self.execution_times.len() as u32
706            } else {
707                Duration::from_millis(50)
708            };
709
710            let success_rate = if self.operation_count > 0 {
711                self.success_count as f64 / self.operation_count as f64
712            } else {
713                1.0
714            };
715            let overall_success = success_rate > 0.8;
716
717            tokio::spawn(async move {
718                let mut wrapper = PooledConnectionWrapper::new(connection);
719                wrapper.record_usage(avg_execution_time, overall_success);
720                wrapper.is_in_use = false;
721
722                let usage_count = wrapper.usage_count;
723                pool_connections.lock().await.push_back(wrapper);
724
725                let mut active = active_count.lock().await;
726                if *active > 0 {
727                    *active -= 1;
728                }
729
730                stats.write().await.total_returned += 1;
731
732                let utilization = (*active as f64) / 10.0;
733                adaptive_controller
734                    .write()
735                    .await
736                    .record_metrics(avg_execution_time, utilization);
737
738                debug!(
739                    "Returned connection to pool: held_time={:?}, ops={}, success_rate={:.2}",
740                    total_held_time, usage_count, success_rate
741                );
742            });
743        }
744    }
745}