Skip to main content

mockforge_core/
connection_pool.rs

1use std::sync::Arc;
2use std::time::{Duration, Instant};
3use tokio::sync::{RwLock, Semaphore};
4use tracing::{debug, warn};
5
6/// Connection pool configuration
7#[derive(Debug, Clone)]
8pub struct PoolConfig {
9    /// Maximum number of connections
10    pub max_connections: usize,
11    /// Minimum number of idle connections to maintain
12    pub min_idle: usize,
13    /// Maximum idle time before connection is closed
14    pub max_idle_time: Duration,
15    /// Connection timeout
16    pub connection_timeout: Duration,
17    /// Enable connection health checks
18    pub health_check_enabled: bool,
19    /// Health check interval
20    pub health_check_interval: Duration,
21}
22
23impl Default for PoolConfig {
24    fn default() -> Self {
25        Self {
26            max_connections: 100,
27            min_idle: 10,
28            max_idle_time: Duration::from_secs(600), // 10 minutes
29            connection_timeout: Duration::from_secs(30),
30            health_check_enabled: true,
31            health_check_interval: Duration::from_secs(60),
32        }
33    }
34}
35
36/// Connection wrapper with metadata
37pub struct PooledConnection<T> {
38    inner: T,
39    created_at: Instant,
40    last_used: Instant,
41}
42
43impl<T> PooledConnection<T> {
44    /// Creates a new pooled connection wrapper
45    pub fn new(connection: T) -> Self {
46        let now = Instant::now();
47        Self {
48            inner: connection,
49            created_at: now,
50            last_used: now,
51        }
52    }
53
54    /// Gets a reference to the underlying connection
55    pub fn get(&self) -> &T {
56        &self.inner
57    }
58
59    /// Gets a mutable reference to the underlying connection and updates last used time
60    pub fn get_mut(&mut self) -> &mut T {
61        self.last_used = Instant::now();
62        &mut self.inner
63    }
64
65    /// Checks if the connection is stale based on idle time
66    pub fn is_stale(&self, max_idle_time: Duration) -> bool {
67        self.last_used.elapsed() > max_idle_time
68    }
69
70    /// Returns the age of the connection since creation
71    pub fn age(&self) -> Duration {
72        self.created_at.elapsed()
73    }
74}
75
76/// Generic connection pool
77pub struct ConnectionPool<T> {
78    config: PoolConfig,
79    available: Arc<RwLock<Vec<PooledConnection<T>>>>,
80    semaphore: Arc<Semaphore>,
81    metrics: Arc<RwLock<PoolMetrics>>,
82}
83
84/// Metrics for connection pool usage and health
85#[derive(Debug, Default, Clone)]
86pub struct PoolMetrics {
87    /// Number of currently active connections
88    pub active_connections: usize,
89    /// Number of idle connections available
90    pub idle_connections: usize,
91    /// Total number of connection acquisitions
92    pub total_acquired: u64,
93    /// Total number of connection releases
94    pub total_released: u64,
95    /// Total number of connections created
96    pub total_created: u64,
97    /// Total number of connections closed
98    pub total_closed: u64,
99    /// Number of acquire timeouts
100    pub acquire_timeouts: u64,
101    /// Number of health check failures
102    pub health_check_failures: u64,
103}
104
105impl<T> ConnectionPool<T>
106where
107    T: Send + 'static,
108{
109    /// Creates a new connection pool with the given configuration
110    pub fn new(config: PoolConfig) -> Self {
111        Self {
112            semaphore: Arc::new(Semaphore::new(config.max_connections)),
113            available: Arc::new(RwLock::new(Vec::with_capacity(config.max_connections))),
114            metrics: Arc::new(RwLock::new(PoolMetrics::default())),
115            config,
116        }
117    }
118
119    /// Acquire a connection from the pool
120    pub async fn acquire<F, Fut>(&self, create_fn: F) -> Result<PooledConnection<T>, PoolError>
121    where
122        F: FnOnce() -> Fut,
123        Fut: std::future::Future<Output = Result<T, PoolError>>,
124    {
125        // Wait for available slot
126        let permit = tokio::time::timeout(
127            self.config.connection_timeout,
128            self.semaphore.clone().acquire_owned(),
129        )
130        .await
131        .map_err(|_| {
132            debug!("Connection pool acquire timeout");
133            PoolError::Timeout
134        })?
135        .map_err(|_| PoolError::Closed)?;
136
137        // Try to get an existing connection
138        let mut available = self.available.write().await;
139
140        // Remove stale connections
141        available.retain(|conn| !conn.is_stale(self.config.max_idle_time));
142
143        let connection = if let Some(mut conn) = available.pop() {
144            // Reuse existing connection
145            conn.last_used = Instant::now();
146            drop(available);
147
148            let mut metrics = self.metrics.write().await;
149            metrics.total_acquired += 1;
150            metrics.active_connections += 1;
151            metrics.idle_connections = metrics.idle_connections.saturating_sub(1);
152            drop(metrics);
153
154            debug!("Reusing pooled connection");
155            conn
156        } else {
157            drop(available);
158
159            // Create new connection
160            let inner = create_fn().await?;
161            let conn = PooledConnection::new(inner);
162
163            let mut metrics = self.metrics.write().await;
164            metrics.total_created += 1;
165            metrics.total_acquired += 1;
166            metrics.active_connections += 1;
167            drop(metrics);
168
169            debug!("Created new pooled connection");
170            conn
171        };
172
173        // Permit will be returned when connection is released
174        std::mem::forget(permit);
175
176        Ok(connection)
177    }
178
179    /// Release a connection back to the pool
180    pub async fn release(&self, connection: PooledConnection<T>) {
181        let mut available = self.available.write().await;
182
183        // Don't return to pool if we're above max idle or connection is stale
184        if available.len() >= self.config.min_idle && connection.is_stale(self.config.max_idle_time)
185        {
186            drop(available);
187
188            let mut metrics = self.metrics.write().await;
189            metrics.total_closed += 1;
190            metrics.active_connections = metrics.active_connections.saturating_sub(1);
191            drop(metrics);
192
193            self.semaphore.add_permits(1);
194            debug!("Closed stale connection");
195            return;
196        }
197
198        available.push(connection);
199        drop(available);
200
201        let mut metrics = self.metrics.write().await;
202        metrics.total_released += 1;
203        metrics.active_connections = metrics.active_connections.saturating_sub(1);
204        metrics.idle_connections += 1;
205        drop(metrics);
206
207        self.semaphore.add_permits(1);
208        debug!("Released connection to pool");
209    }
210
211    /// Get current pool metrics
212    pub async fn metrics(&self) -> PoolMetrics {
213        self.metrics.read().await.clone()
214    }
215
216    /// Get current pool size
217    pub async fn size(&self) -> usize {
218        self.available.read().await.len()
219    }
220
221    /// Run health checks on idle connections
222    pub async fn health_check<F, Fut>(&self, check_fn: F)
223    where
224        F: Fn(&T) -> Fut,
225        Fut: std::future::Future<Output = bool>,
226    {
227        if !self.config.health_check_enabled {
228            return;
229        }
230
231        let mut available = self.available.write().await;
232        let mut healthy = Vec::new();
233        let mut failures = 0;
234
235        for conn in available.drain(..) {
236            if check_fn(conn.get()).await {
237                healthy.push(conn);
238            } else {
239                failures += 1;
240                warn!("Connection failed health check");
241            }
242        }
243
244        *available = healthy;
245        drop(available);
246
247        if failures > 0 {
248            let mut metrics = self.metrics.write().await;
249            metrics.health_check_failures += failures;
250            metrics.total_closed += failures;
251            metrics.idle_connections = metrics.idle_connections.saturating_sub(failures as usize);
252            drop(metrics);
253
254            self.semaphore.add_permits(failures as usize);
255        }
256    }
257
258    /// Maintain minimum idle connections
259    pub async fn maintain_idle<F, Fut>(&self, create_fn: F)
260    where
261        F: Fn() -> Fut,
262        Fut: std::future::Future<Output = Result<T, PoolError>>,
263    {
264        let current_idle = self.available.read().await.len();
265
266        if current_idle < self.config.min_idle {
267            let needed = self.config.min_idle - current_idle;
268
269            for _ in 0..needed {
270                if let Ok(permit) = self.semaphore.clone().try_acquire_owned() {
271                    match create_fn().await {
272                        Ok(conn) => {
273                            let pooled = PooledConnection::new(conn);
274                            self.available.write().await.push(pooled);
275
276                            let mut metrics = self.metrics.write().await;
277                            metrics.total_created += 1;
278                            metrics.idle_connections += 1;
279
280                            std::mem::forget(permit);
281                        }
282                        Err(e) => {
283                            warn!("Failed to create idle connection: {:?}", e);
284                            drop(permit);
285                        }
286                    }
287                } else {
288                    break;
289                }
290            }
291        }
292    }
293}
294
295/// Errors that can occur in connection pool operations
296#[derive(Debug, thiserror::Error)]
297pub enum PoolError {
298    /// Connection acquisition timed out
299    #[error("Connection pool timeout")]
300    Timeout,
301
302    /// Connection pool has been closed
303    #[error("Connection pool closed")]
304    Closed,
305
306    /// Failed to create a new connection
307    #[error("Failed to create connection: {0}")]
308    CreateError(String),
309
310    /// Error during connection operation
311    #[error("Connection error: {0}")]
312    ConnectionError(String),
313}
314
315/// HTTP client connection pool (example usage)
316pub type HttpClientPool = ConnectionPool<reqwest::Client>;
317
318impl HttpClientPool {
319    /// Creates a new HTTP client pool with the given configuration
320    pub fn new_http(config: PoolConfig) -> Self {
321        Self::new(config)
322    }
323
324    /// Acquires an HTTP client from the pool
325    pub async fn acquire_client(&self) -> Result<PooledConnection<reqwest::Client>, PoolError> {
326        self.acquire(|| async {
327            reqwest::Client::builder()
328                .timeout(Duration::from_secs(30))
329                .pool_max_idle_per_host(10)
330                .build()
331                .map_err(|e| PoolError::CreateError(e.to_string()))
332        })
333        .await
334    }
335}
336
337#[cfg(test)]
338mod tests {
339    use super::*;
340    use std::sync::atomic::{AtomicUsize, Ordering};
341
342    // PoolConfig tests
343    #[test]
344    fn test_pool_config_default() {
345        let config = PoolConfig::default();
346        assert_eq!(config.max_connections, 100);
347        assert_eq!(config.min_idle, 10);
348        assert_eq!(config.max_idle_time, Duration::from_secs(600));
349        assert_eq!(config.connection_timeout, Duration::from_secs(30));
350        assert!(config.health_check_enabled);
351        assert_eq!(config.health_check_interval, Duration::from_secs(60));
352    }
353
354    #[test]
355    fn test_pool_config_clone() {
356        let config = PoolConfig {
357            max_connections: 50,
358            min_idle: 5,
359            ..Default::default()
360        };
361        let cloned = config.clone();
362        assert_eq!(cloned.max_connections, config.max_connections);
363        assert_eq!(cloned.min_idle, config.min_idle);
364    }
365
366    #[test]
367    fn test_pool_config_debug() {
368        let config = PoolConfig::default();
369        let debug = format!("{:?}", config);
370        assert!(debug.contains("PoolConfig"));
371        assert!(debug.contains("max_connections"));
372    }
373
374    // PooledConnection tests
375    #[test]
376    fn test_pooled_connection_new() {
377        let conn = PooledConnection::new(42u32);
378        assert_eq!(*conn.get(), 42);
379    }
380
381    #[test]
382    fn test_pooled_connection_get() {
383        let conn = PooledConnection::new("test".to_string());
384        assert_eq!(*conn.get(), "test");
385    }
386
387    #[test]
388    fn test_pooled_connection_get_mut() {
389        let mut conn = PooledConnection::new(vec![1, 2, 3]);
390        conn.get_mut().push(4);
391        assert_eq!(*conn.get(), vec![1, 2, 3, 4]);
392    }
393
394    #[test]
395    fn test_pooled_connection_is_stale() {
396        let conn = PooledConnection::new(42u32);
397        // Just created, should not be stale
398        assert!(!conn.is_stale(Duration::from_secs(1)));
399        // With zero duration, should be stale immediately
400        assert!(conn.is_stale(Duration::from_nanos(0)));
401    }
402
403    #[test]
404    fn test_pooled_connection_age() {
405        let conn = PooledConnection::new(42u32);
406        let age = conn.age();
407        // Should be very small (just created)
408        assert!(age < Duration::from_secs(1));
409    }
410
411    // PoolMetrics tests
412    #[test]
413    fn test_pool_metrics_default() {
414        let metrics = PoolMetrics::default();
415        assert_eq!(metrics.active_connections, 0);
416        assert_eq!(metrics.idle_connections, 0);
417        assert_eq!(metrics.total_acquired, 0);
418        assert_eq!(metrics.total_released, 0);
419        assert_eq!(metrics.total_created, 0);
420        assert_eq!(metrics.total_closed, 0);
421        assert_eq!(metrics.acquire_timeouts, 0);
422        assert_eq!(metrics.health_check_failures, 0);
423    }
424
425    #[test]
426    fn test_pool_metrics_clone() {
427        let mut metrics = PoolMetrics::default();
428        metrics.total_acquired = 10;
429        metrics.active_connections = 5;
430        let cloned = metrics.clone();
431        assert_eq!(cloned.total_acquired, 10);
432        assert_eq!(cloned.active_connections, 5);
433    }
434
435    #[test]
436    fn test_pool_metrics_debug() {
437        let metrics = PoolMetrics::default();
438        let debug = format!("{:?}", metrics);
439        assert!(debug.contains("PoolMetrics"));
440        assert!(debug.contains("active_connections"));
441    }
442
443    // PoolError tests
444    #[test]
445    fn test_pool_error_timeout() {
446        let error = PoolError::Timeout;
447        assert!(error.to_string().contains("timeout"));
448    }
449
450    #[test]
451    fn test_pool_error_closed() {
452        let error = PoolError::Closed;
453        assert!(error.to_string().contains("closed"));
454    }
455
456    #[test]
457    fn test_pool_error_create_error() {
458        let error = PoolError::CreateError("connection failed".to_string());
459        let msg = error.to_string();
460        assert!(msg.contains("create connection"));
461        assert!(msg.contains("connection failed"));
462    }
463
464    #[test]
465    fn test_pool_error_connection_error() {
466        let error = PoolError::ConnectionError("network issue".to_string());
467        let msg = error.to_string();
468        assert!(msg.contains("Connection error"));
469        assert!(msg.contains("network issue"));
470    }
471
472    #[test]
473    fn test_pool_error_debug() {
474        let error = PoolError::Timeout;
475        let debug = format!("{:?}", error);
476        assert!(debug.contains("Timeout"));
477    }
478
479    // ConnectionPool tests
480    #[tokio::test]
481    async fn test_connection_pool() {
482        let config = PoolConfig {
483            max_connections: 5,
484            min_idle: 2,
485            ..Default::default()
486        };
487
488        let pool = ConnectionPool::<u32>::new(config);
489
490        // Acquire connection
491        let conn1 = pool.acquire(|| async { Ok(42) }).await.unwrap();
492
493        assert_eq!(*conn1.get(), 42);
494
495        // Release connection
496        pool.release(conn1).await;
497
498        // Verify metrics
499        let metrics = pool.metrics().await;
500        assert_eq!(metrics.total_created, 1);
501        assert_eq!(metrics.total_acquired, 1);
502        assert_eq!(metrics.total_released, 1);
503    }
504
505    #[tokio::test]
506    async fn test_connection_pool_new() {
507        let config = PoolConfig {
508            max_connections: 10,
509            min_idle: 2,
510            ..Default::default()
511        };
512        let pool = ConnectionPool::<u32>::new(config);
513
514        // Pool should start empty
515        assert_eq!(pool.size().await, 0);
516    }
517
518    #[tokio::test]
519    async fn test_connection_pool_acquire_creates_connection() {
520        let config = PoolConfig::default();
521        let pool = ConnectionPool::<String>::new(config);
522
523        let conn = pool.acquire(|| async { Ok("test-connection".to_string()) }).await.unwrap();
524
525        assert_eq!(*conn.get(), "test-connection");
526
527        let metrics = pool.metrics().await;
528        assert_eq!(metrics.total_created, 1);
529        assert_eq!(metrics.total_acquired, 1);
530    }
531
532    #[tokio::test]
533    async fn test_connection_pool_reuses_connection() {
534        let config = PoolConfig {
535            max_connections: 5,
536            min_idle: 1,
537            ..Default::default()
538        };
539        let pool = ConnectionPool::<u32>::new(config);
540        let create_count = Arc::new(AtomicUsize::new(0));
541
542        // First acquire - creates connection
543        let create_count_clone = create_count.clone();
544        let conn1 = pool
545            .acquire(move || {
546                let count = create_count_clone.clone();
547                async move {
548                    count.fetch_add(1, Ordering::SeqCst);
549                    Ok(42u32)
550                }
551            })
552            .await
553            .unwrap();
554
555        // Release it
556        pool.release(conn1).await;
557
558        // Second acquire - should reuse
559        let create_count_clone = create_count.clone();
560        let conn2 = pool
561            .acquire(move || {
562                let count = create_count_clone.clone();
563                async move {
564                    count.fetch_add(1, Ordering::SeqCst);
565                    Ok(100u32)
566                }
567            })
568            .await
569            .unwrap();
570
571        // Should still be the original connection (value 42), not a new one
572        assert_eq!(*conn2.get(), 42);
573        assert_eq!(create_count.load(Ordering::SeqCst), 1); // Only created once
574
575        let metrics = pool.metrics().await;
576        assert_eq!(metrics.total_created, 1);
577        assert_eq!(metrics.total_acquired, 2);
578    }
579
580    #[tokio::test]
581    async fn test_connection_pool_release() {
582        let config = PoolConfig::default();
583        let pool = ConnectionPool::<u32>::new(config);
584
585        let conn = pool.acquire(|| async { Ok(42) }).await.unwrap();
586        assert_eq!(pool.size().await, 0); // Connection in use, not in pool
587
588        pool.release(conn).await;
589        assert_eq!(pool.size().await, 1); // Connection returned to pool
590
591        let metrics = pool.metrics().await;
592        assert_eq!(metrics.total_released, 1);
593        assert_eq!(metrics.idle_connections, 1);
594    }
595
596    #[tokio::test]
597    async fn test_connection_pool_metrics() {
598        let config = PoolConfig::default();
599        let pool = ConnectionPool::<u32>::new(config);
600
601        // Initial metrics
602        let metrics = pool.metrics().await;
603        assert_eq!(metrics.total_created, 0);
604
605        // Acquire and release
606        let conn = pool.acquire(|| async { Ok(1) }).await.unwrap();
607        pool.release(conn).await;
608
609        let metrics = pool.metrics().await;
610        assert_eq!(metrics.total_created, 1);
611        assert_eq!(metrics.total_acquired, 1);
612        assert_eq!(metrics.total_released, 1);
613    }
614
615    #[tokio::test]
616    async fn test_connection_pool_size() {
617        let config = PoolConfig::default();
618        let pool = ConnectionPool::<u32>::new(config);
619
620        assert_eq!(pool.size().await, 0);
621
622        let conn1 = pool.acquire(|| async { Ok(1) }).await.unwrap();
623        let conn2 = pool.acquire(|| async { Ok(2) }).await.unwrap();
624
625        // Connections in use, pool is empty
626        assert_eq!(pool.size().await, 0);
627
628        pool.release(conn1).await;
629        assert_eq!(pool.size().await, 1);
630
631        pool.release(conn2).await;
632        assert_eq!(pool.size().await, 2);
633    }
634
635    #[tokio::test]
636    async fn test_connection_pool_multiple_concurrent_acquires() {
637        let config = PoolConfig {
638            max_connections: 10,
639            ..Default::default()
640        };
641        let pool = Arc::new(ConnectionPool::<u32>::new(config));
642
643        let mut handles = vec![];
644        for i in 0..5 {
645            let pool_clone = pool.clone();
646            let handle = tokio::spawn(async move {
647                let conn = pool_clone.acquire(move || async move { Ok(i as u32) }).await.unwrap();
648                // Simulate some work
649                tokio::time::sleep(Duration::from_millis(10)).await;
650                pool_clone.release(conn).await;
651            });
652            handles.push(handle);
653        }
654
655        for handle in handles {
656            handle.await.unwrap();
657        }
658
659        let metrics = pool.metrics().await;
660        assert_eq!(metrics.total_acquired, 5);
661        assert_eq!(metrics.total_released, 5);
662    }
663
664    #[tokio::test]
665    async fn test_connection_pool_acquire_error() {
666        let config = PoolConfig::default();
667        let pool = ConnectionPool::<u32>::new(config);
668
669        let result = pool
670            .acquire(|| async { Err(PoolError::CreateError("test error".to_string())) })
671            .await;
672
673        assert!(result.is_err());
674        if let Err(PoolError::CreateError(msg)) = result {
675            assert_eq!(msg, "test error");
676        }
677    }
678
679    #[tokio::test]
680    async fn test_connection_pool_health_check() {
681        let config = PoolConfig {
682            max_connections: 5,
683            min_idle: 0,
684            health_check_enabled: true,
685            ..Default::default()
686        };
687        let pool = ConnectionPool::<u32>::new(config);
688
689        // Add some connections to pool
690        let conn1 = pool.acquire(|| async { Ok(1) }).await.unwrap();
691        let conn2 = pool.acquire(|| async { Ok(2) }).await.unwrap();
692        pool.release(conn1).await;
693        pool.release(conn2).await;
694
695        // Health check - all connections pass
696        pool.health_check(|_| async { true }).await;
697
698        assert_eq!(pool.size().await, 2);
699
700        // Health check - all connections fail
701        pool.health_check(|_| async { false }).await;
702
703        assert_eq!(pool.size().await, 0);
704
705        let metrics = pool.metrics().await;
706        assert_eq!(metrics.health_check_failures, 2);
707    }
708
709    #[tokio::test]
710    async fn test_connection_pool_health_check_disabled() {
711        let config = PoolConfig {
712            health_check_enabled: false,
713            ..Default::default()
714        };
715        let pool = ConnectionPool::<u32>::new(config);
716
717        let conn = pool.acquire(|| async { Ok(1) }).await.unwrap();
718        pool.release(conn).await;
719
720        // Health check should do nothing when disabled
721        pool.health_check(|_| async { false }).await;
722
723        // Connection should still be there
724        assert_eq!(pool.size().await, 1);
725    }
726
727    #[tokio::test]
728    async fn test_connection_pool_maintain_idle() {
729        let config = PoolConfig {
730            max_connections: 10,
731            min_idle: 3,
732            ..Default::default()
733        };
734        let pool = ConnectionPool::<u32>::new(config);
735
736        // Pool starts empty
737        assert_eq!(pool.size().await, 0);
738
739        // Maintain idle should create min_idle connections
740        pool.maintain_idle(|| async { Ok(42u32) }).await;
741
742        assert_eq!(pool.size().await, 3);
743
744        let metrics = pool.metrics().await;
745        assert_eq!(metrics.total_created, 3);
746        assert_eq!(metrics.idle_connections, 3);
747    }
748
749    #[tokio::test]
750    async fn test_connection_pool_maintain_idle_already_sufficient() {
751        let config = PoolConfig {
752            max_connections: 10,
753            min_idle: 2,
754            ..Default::default()
755        };
756        let pool = ConnectionPool::<u32>::new(config);
757
758        // Manually add 3 connections
759        let conn1 = pool.acquire(|| async { Ok(1) }).await.unwrap();
760        let conn2 = pool.acquire(|| async { Ok(2) }).await.unwrap();
761        let conn3 = pool.acquire(|| async { Ok(3) }).await.unwrap();
762        pool.release(conn1).await;
763        pool.release(conn2).await;
764        pool.release(conn3).await;
765
766        let initial_created = pool.metrics().await.total_created;
767
768        // Maintain idle should not create more since we have 3 > min_idle(2)
769        pool.maintain_idle(|| async { Ok(100u32) }).await;
770
771        let final_created = pool.metrics().await.total_created;
772        assert_eq!(initial_created, final_created);
773    }
774
775    #[tokio::test]
776    async fn test_connection_pool_maintain_idle_error() {
777        let config = PoolConfig {
778            max_connections: 10,
779            min_idle: 3,
780            ..Default::default()
781        };
782        let pool = ConnectionPool::<u32>::new(config);
783
784        // maintain_idle with failing create function
785        pool.maintain_idle(|| async { Err(PoolError::CreateError("test".to_string())) })
786            .await;
787
788        // Pool should still be empty
789        assert_eq!(pool.size().await, 0);
790    }
791
792    // HttpClientPool tests
793    #[tokio::test]
794    async fn test_http_client_pool_new() {
795        let config = PoolConfig::default();
796        let pool = HttpClientPool::new_http(config);
797        assert_eq!(pool.size().await, 0);
798    }
799
800    #[tokio::test]
801    async fn test_http_client_pool_acquire() {
802        let config = PoolConfig::default();
803        let pool = HttpClientPool::new_http(config);
804
805        let result = pool.acquire_client().await;
806        assert!(result.is_ok());
807
808        let conn = result.unwrap();
809        // Verify it's a valid reqwest client
810        let _client: &reqwest::Client = conn.get();
811    }
812
813    // Edge cases
814    #[tokio::test]
815    async fn test_connection_pool_stale_connection_not_returned() {
816        let config = PoolConfig {
817            max_connections: 5,
818            min_idle: 0, // Set to 0 so stale connections get closed
819            max_idle_time: Duration::from_millis(1), // Very short idle time
820            ..Default::default()
821        };
822        let pool = ConnectionPool::<u32>::new(config);
823
824        let conn = pool.acquire(|| async { Ok(42) }).await.unwrap();
825
826        // Wait for connection to become stale
827        tokio::time::sleep(Duration::from_millis(10)).await;
828
829        // Release stale connection
830        pool.release(conn).await;
831
832        // Stale connection should be closed, not returned to pool
833        let metrics = pool.metrics().await;
834        assert_eq!(metrics.total_closed, 1);
835    }
836
837    #[tokio::test]
838    async fn test_connection_pool_with_complex_type() {
839        #[derive(Debug, Clone)]
840        struct ComplexConnection {
841            id: u32,
842            data: Vec<String>,
843        }
844
845        let config = PoolConfig::default();
846        let pool = ConnectionPool::<ComplexConnection>::new(config);
847
848        let conn = pool
849            .acquire(|| async {
850                Ok(ComplexConnection {
851                    id: 123,
852                    data: vec!["test".to_string()],
853                })
854            })
855            .await
856            .unwrap();
857
858        assert_eq!(conn.get().id, 123);
859        assert_eq!(conn.get().data, vec!["test".to_string()]);
860    }
861
862    #[tokio::test]
863    async fn test_pooled_connection_updates_last_used() {
864        let mut conn = PooledConnection::new(42u32);
865        let initial_time = conn.last_used;
866
867        // Sleep a tiny bit
868        tokio::time::sleep(Duration::from_millis(1)).await;
869
870        // get_mut should update last_used
871        let _ = conn.get_mut();
872
873        assert!(conn.last_used > initial_time);
874    }
875
876    #[tokio::test]
877    async fn test_connection_pool_partial_health_check() {
878        let config = PoolConfig {
879            max_connections: 10,
880            min_idle: 0,
881            health_check_enabled: true,
882            ..Default::default()
883        };
884        let pool = ConnectionPool::<u32>::new(config);
885
886        // Add connections with different values
887        let conn1 = pool.acquire(|| async { Ok(1) }).await.unwrap();
888        let conn2 = pool.acquire(|| async { Ok(2) }).await.unwrap();
889        let conn3 = pool.acquire(|| async { Ok(3) }).await.unwrap();
890        pool.release(conn1).await;
891        pool.release(conn2).await;
892        pool.release(conn3).await;
893
894        // Health check that fails only even numbers
895        pool.health_check(|val| {
896            let v = *val;
897            async move { v % 2 != 0 }
898        })
899        .await;
900
901        // Only odd-valued connections should remain
902        assert_eq!(pool.size().await, 2); // 1 and 3
903
904        let metrics = pool.metrics().await;
905        assert_eq!(metrics.health_check_failures, 1); // Connection with value 2
906    }
907}