Skip to main content

mockforge_core/
connection_pool.rs

1use std::sync::Arc;
2use std::time::{Duration, Instant};
3use tokio::sync::{RwLock, Semaphore};
4use tracing::{debug, warn};
5
/// Tunable limits and timings for a connection pool.
///
/// Two configurations compare equal when every field matches, which makes it easy to
/// assert on configuration in tests or to detect a changed configuration at runtime.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PoolConfig {
    /// Number of slots the pool hands out; the pool's semaphore starts with this many
    /// permits and its idle list is pre-sized to this capacity.
    pub max_connections: usize,
    /// Number of idle connections the pool tries to keep ready (see `maintain_idle`).
    /// Also used by `release`: a stale connection is only discarded once at least this
    /// many idle connections are already pooled.
    pub min_idle: usize,
    /// How long a connection may go unused before it is considered stale.
    pub max_idle_time: Duration,
    /// Longest time `acquire` waits for a free slot before giving up.
    pub connection_timeout: Duration,
    /// When `false`, `health_check` returns immediately without probing anything.
    pub health_check_enabled: bool,
    /// Intended spacing between health checks.
    ///
    /// NOTE(review): nothing in this file reads this field; presumably the caller that
    /// schedules `health_check` uses it — confirm.
    pub health_check_interval: Duration,
}

impl Default for PoolConfig {
    /// Returns the stock configuration: 100 slots, 10 warm idle connections, a 10 minute
    /// idle limit, a 30 second acquire timeout, and health checks enabled every 60 seconds.
    fn default() -> Self {
        Self {
            max_connections: 100,
            min_idle: 10,
            max_idle_time: Duration::from_secs(600), // 10 minutes
            connection_timeout: Duration::from_secs(30),
            health_check_enabled: true,
            health_check_interval: Duration::from_secs(60),
        }
    }
}
35
/// A connection of type `T` together with the timestamps the pool uses to age it.
///
/// `Debug` is derived (available whenever `T: Debug`) so that results carrying a pooled
/// connection can be used with `unwrap_err`, `assert_eq!`, and `{:?}` formatting.
#[derive(Debug)]
pub struct PooledConnection<T> {
    /// The wrapped connection.
    inner: T,
    /// Moment the wrapper was constructed.
    created_at: Instant,
    /// Moment the connection was last handed out or mutably accessed.
    last_used: Instant,
}

impl<T> PooledConnection<T> {
    /// Wraps `connection`, stamping both its creation time and last-used time with "now".
    pub fn new(connection: T) -> Self {
        let now = Instant::now();
        Self {
            inner: connection,
            created_at: now,
            last_used: now,
        }
    }

    /// Borrows the underlying connection without touching the last-used timestamp.
    pub fn get(&self) -> &T {
        &self.inner
    }

    /// Mutably borrows the underlying connection and records the access as a use,
    /// resetting the idle clock consulted by [`is_stale`](Self::is_stale).
    pub fn get_mut(&mut self) -> &mut T {
        self.last_used = Instant::now();
        &mut self.inner
    }

    /// Returns `true` when strictly more than `max_idle_time` has passed since the
    /// connection was last used.
    pub fn is_stale(&self, max_idle_time: Duration) -> bool {
        self.last_used.elapsed() > max_idle_time
    }

    /// Returns how long ago this wrapper was created.
    pub fn age(&self) -> Duration {
        self.created_at.elapsed()
    }
}
75
76/// Generic connection pool
77pub struct ConnectionPool<T> {
78    config: PoolConfig,
79    available: Arc<RwLock<Vec<PooledConnection<T>>>>,
80    semaphore: Arc<Semaphore>,
81    metrics: Arc<RwLock<PoolMetrics>>,
82}
83
/// Counters describing how a connection pool is being used.
///
/// The struct is a plain value: `ConnectionPool::metrics` returns a cloned snapshot.
/// Snapshots compare equal field-by-field, so two of them can be checked with
/// `assert_eq!` or diffed to detect activity.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct PoolMetrics {
    /// Connections currently checked out by callers.
    pub active_connections: usize,
    /// Connections currently sitting idle in the pool.
    pub idle_connections: usize,
    /// Running total of successful acquisitions (new or reused).
    pub total_acquired: u64,
    /// Running total of connections handed back and kept in the pool.
    pub total_released: u64,
    /// Running total of connections created.
    pub total_created: u64,
    /// Running total of connections discarded by the pool.
    pub total_closed: u64,
    /// Number of acquire timeouts.
    pub acquire_timeouts: u64,
    /// Running total of idle connections that failed a health check.
    pub health_check_failures: u64,
}
104
105impl<T> ConnectionPool<T>
106where
107    T: Send + 'static,
108{
109    /// Creates a new connection pool with the given configuration
110    pub fn new(config: PoolConfig) -> Self {
111        Self {
112            semaphore: Arc::new(Semaphore::new(config.max_connections)),
113            available: Arc::new(RwLock::new(Vec::with_capacity(config.max_connections))),
114            metrics: Arc::new(RwLock::new(PoolMetrics::default())),
115            config,
116        }
117    }
118
119    /// Acquire a connection from the pool
120    pub async fn acquire<F, Fut>(&self, create_fn: F) -> Result<PooledConnection<T>, PoolError>
121    where
122        F: FnOnce() -> Fut,
123        Fut: std::future::Future<Output = Result<T, PoolError>>,
124    {
125        // Wait for available slot
126        let permit = tokio::time::timeout(
127            self.config.connection_timeout,
128            self.semaphore.clone().acquire_owned(),
129        )
130        .await
131        .map_err(|_| {
132            debug!("Connection pool acquire timeout");
133            PoolError::Timeout
134        })?
135        .map_err(|_| PoolError::Closed)?;
136
137        // Try to get an existing connection
138        let mut available = self.available.write().await;
139
140        // Remove stale connections
141        available.retain(|conn| !conn.is_stale(self.config.max_idle_time));
142
143        let connection = if let Some(mut conn) = available.pop() {
144            // Reuse existing connection
145            conn.last_used = Instant::now();
146            drop(available);
147
148            let mut metrics = self.metrics.write().await;
149            metrics.total_acquired += 1;
150            metrics.active_connections += 1;
151            metrics.idle_connections = metrics.idle_connections.saturating_sub(1);
152            drop(metrics);
153
154            debug!("Reusing pooled connection");
155            conn
156        } else {
157            drop(available);
158
159            // Create new connection
160            let inner = create_fn().await?;
161            let conn = PooledConnection::new(inner);
162
163            let mut metrics = self.metrics.write().await;
164            metrics.total_created += 1;
165            metrics.total_acquired += 1;
166            metrics.active_connections += 1;
167            drop(metrics);
168
169            debug!("Created new pooled connection");
170            conn
171        };
172
173        // Permit will be returned when connection is released
174        std::mem::forget(permit);
175
176        Ok(connection)
177    }
178
179    /// Release a connection back to the pool
180    pub async fn release(&self, connection: PooledConnection<T>) {
181        let mut available = self.available.write().await;
182
183        // Don't return to pool if we're above max idle or connection is stale
184        if available.len() >= self.config.min_idle && connection.is_stale(self.config.max_idle_time)
185        {
186            drop(available);
187
188            let mut metrics = self.metrics.write().await;
189            metrics.total_closed += 1;
190            metrics.active_connections = metrics.active_connections.saturating_sub(1);
191            drop(metrics);
192
193            self.semaphore.add_permits(1);
194            debug!("Closed stale connection");
195            return;
196        }
197
198        available.push(connection);
199        drop(available);
200
201        let mut metrics = self.metrics.write().await;
202        metrics.total_released += 1;
203        metrics.active_connections = metrics.active_connections.saturating_sub(1);
204        metrics.idle_connections += 1;
205        drop(metrics);
206
207        self.semaphore.add_permits(1);
208        debug!("Released connection to pool");
209    }
210
211    /// Get current pool metrics
212    pub async fn metrics(&self) -> PoolMetrics {
213        self.metrics.read().await.clone()
214    }
215
216    /// Get current pool size
217    pub async fn size(&self) -> usize {
218        self.available.read().await.len()
219    }
220
221    /// Run health checks on idle connections
222    pub async fn health_check<F, Fut>(&self, check_fn: F)
223    where
224        F: Fn(&T) -> Fut,
225        Fut: std::future::Future<Output = bool>,
226    {
227        if !self.config.health_check_enabled {
228            return;
229        }
230
231        let mut available = self.available.write().await;
232        let mut healthy = Vec::new();
233        let mut failures = 0;
234
235        for conn in available.drain(..) {
236            if check_fn(conn.get()).await {
237                healthy.push(conn);
238            } else {
239                failures += 1;
240                warn!("Connection failed health check");
241            }
242        }
243
244        *available = healthy;
245        drop(available);
246
247        if failures > 0 {
248            let mut metrics = self.metrics.write().await;
249            metrics.health_check_failures += failures;
250            metrics.total_closed += failures;
251            metrics.idle_connections = metrics.idle_connections.saturating_sub(failures as usize);
252            drop(metrics);
253
254            self.semaphore.add_permits(failures as usize);
255        }
256    }
257
258    /// Maintain minimum idle connections
259    pub async fn maintain_idle<F, Fut>(&self, create_fn: F)
260    where
261        F: Fn() -> Fut,
262        Fut: std::future::Future<Output = Result<T, PoolError>>,
263    {
264        let current_idle = self.available.read().await.len();
265
266        if current_idle < self.config.min_idle {
267            let needed = self.config.min_idle - current_idle;
268
269            for _ in 0..needed {
270                if let Ok(permit) = self.semaphore.clone().try_acquire_owned() {
271                    match create_fn().await {
272                        Ok(conn) => {
273                            let pooled = PooledConnection::new(conn);
274                            self.available.write().await.push(pooled);
275
276                            let mut metrics = self.metrics.write().await;
277                            metrics.total_created += 1;
278                            metrics.idle_connections += 1;
279
280                            std::mem::forget(permit);
281                        }
282                        Err(e) => {
283                            warn!("Failed to create idle connection: {:?}", e);
284                            drop(permit);
285                        }
286                    }
287                } else {
288                    break;
289                }
290            }
291        }
292    }
293}
294
295/// Errors that can occur in connection pool operations
296#[derive(Debug, thiserror::Error)]
297pub enum PoolError {
298    /// Connection acquisition timed out
299    #[error("Connection pool timeout")]
300    Timeout,
301
302    /// Connection pool has been closed
303    #[error("Connection pool closed")]
304    Closed,
305
306    /// Failed to create a new connection
307    #[error("Failed to create connection: {0}")]
308    CreateError(String),
309
310    /// Error during connection operation
311    #[error("Connection error: {0}")]
312    ConnectionError(String),
313}
314
315/// HTTP client connection pool (example usage)
316pub type HttpClientPool = ConnectionPool<reqwest::Client>;
317
318impl HttpClientPool {
319    /// Creates a new HTTP client pool with the given configuration
320    pub fn new_http(config: PoolConfig) -> Self {
321        Self::new(config)
322    }
323
324    /// Acquires an HTTP client from the pool
325    pub async fn acquire_client(&self) -> Result<PooledConnection<reqwest::Client>, PoolError> {
326        self.acquire(|| async {
327            reqwest::Client::builder()
328                .timeout(Duration::from_secs(30))
329                .pool_max_idle_per_host(10)
330                .build()
331                .map_err(|e| PoolError::CreateError(e.to_string()))
332        })
333        .await
334    }
335}
336
/// Unit tests for the pool configuration, connection wrapper, metrics, error type, and
/// the pool itself.
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    // ---------------------------------------------------------------------
    // PoolConfig
    // ---------------------------------------------------------------------

    #[test]
    fn test_pool_config_default() {
        let config = PoolConfig::default();
        assert_eq!(config.max_connections, 100);
        assert_eq!(config.min_idle, 10);
        assert_eq!(config.max_idle_time, Duration::from_secs(600));
        assert_eq!(config.connection_timeout, Duration::from_secs(30));
        assert!(config.health_check_enabled);
        assert_eq!(config.health_check_interval, Duration::from_secs(60));
    }

    #[test]
    fn test_pool_config_clone() {
        let config = PoolConfig {
            max_connections: 50,
            min_idle: 5,
            ..Default::default()
        };
        let cloned = config.clone();
        assert_eq!(cloned.max_connections, config.max_connections);
        assert_eq!(cloned.min_idle, config.min_idle);
    }

    #[test]
    fn test_pool_config_debug() {
        let config = PoolConfig::default();
        let debug = format!("{:?}", config);
        assert!(debug.contains("PoolConfig"));
        assert!(debug.contains("max_connections"));
    }

    // ---------------------------------------------------------------------
    // PooledConnection
    // ---------------------------------------------------------------------

    #[test]
    fn test_pooled_connection_new() {
        let conn = PooledConnection::new(42u32);
        assert_eq!(*conn.get(), 42);
    }

    #[test]
    fn test_pooled_connection_get() {
        let conn = PooledConnection::new("test".to_string());
        assert_eq!(*conn.get(), "test");
    }

    #[test]
    fn test_pooled_connection_get_mut() {
        let mut conn = PooledConnection::new(vec![1, 2, 3]);
        conn.get_mut().push(4);
        assert_eq!(*conn.get(), vec![1, 2, 3, 4]);
    }

    #[test]
    fn test_pooled_connection_is_stale() {
        let conn = PooledConnection::new(42u32);
        // Just created, should not be stale
        assert!(!conn.is_stale(Duration::from_secs(1)));

        // `is_stale` uses a strict comparison, so let the clock advance measurably
        // before checking against a zero limit; otherwise the result depends on the
        // platform's timer granularity.
        std::thread::sleep(Duration::from_millis(1));
        assert!(conn.is_stale(Duration::from_nanos(0)));
    }

    #[test]
    fn test_pooled_connection_age() {
        let conn = PooledConnection::new(42u32);
        let age = conn.age();
        // Should be very small (just created)
        assert!(age < Duration::from_secs(1));
    }

    // ---------------------------------------------------------------------
    // PoolMetrics
    // ---------------------------------------------------------------------

    #[test]
    fn test_pool_metrics_default() {
        let metrics = PoolMetrics::default();
        assert_eq!(metrics.active_connections, 0);
        assert_eq!(metrics.idle_connections, 0);
        assert_eq!(metrics.total_acquired, 0);
        assert_eq!(metrics.total_released, 0);
        assert_eq!(metrics.total_created, 0);
        assert_eq!(metrics.total_closed, 0);
        assert_eq!(metrics.acquire_timeouts, 0);
        assert_eq!(metrics.health_check_failures, 0);
    }

    #[test]
    fn test_pool_metrics_clone() {
        let metrics = PoolMetrics {
            total_acquired: 10,
            active_connections: 5,
            ..Default::default()
        };
        let cloned = metrics.clone();
        assert_eq!(cloned.total_acquired, 10);
        assert_eq!(cloned.active_connections, 5);
    }

    #[test]
    fn test_pool_metrics_debug() {
        let metrics = PoolMetrics::default();
        let debug = format!("{:?}", metrics);
        assert!(debug.contains("PoolMetrics"));
        assert!(debug.contains("active_connections"));
    }

    // ---------------------------------------------------------------------
    // PoolError
    // ---------------------------------------------------------------------

    #[test]
    fn test_pool_error_timeout() {
        let error = PoolError::Timeout;
        assert!(error.to_string().contains("timeout"));
    }

    #[test]
    fn test_pool_error_closed() {
        let error = PoolError::Closed;
        assert!(error.to_string().contains("closed"));
    }

    #[test]
    fn test_pool_error_create_error() {
        let error = PoolError::CreateError("connection failed".to_string());
        let msg = error.to_string();
        assert!(msg.contains("create connection"));
        assert!(msg.contains("connection failed"));
    }

    #[test]
    fn test_pool_error_connection_error() {
        let error = PoolError::ConnectionError("network issue".to_string());
        let msg = error.to_string();
        assert!(msg.contains("Connection error"));
        assert!(msg.contains("network issue"));
    }

    #[test]
    fn test_pool_error_debug() {
        let error = PoolError::Timeout;
        let debug = format!("{:?}", error);
        assert!(debug.contains("Timeout"));
    }

    // ---------------------------------------------------------------------
    // ConnectionPool
    // ---------------------------------------------------------------------

    #[tokio::test]
    async fn test_connection_pool() {
        let config = PoolConfig {
            max_connections: 5,
            min_idle: 2,
            ..Default::default()
        };

        let pool = ConnectionPool::<u32>::new(config);

        // Acquire connection
        let conn1 = pool.acquire(|| async { Ok(42) }).await.unwrap();

        assert_eq!(*conn1.get(), 42);

        // Release connection
        pool.release(conn1).await;

        // Verify metrics
        let metrics = pool.metrics().await;
        assert_eq!(metrics.total_created, 1);
        assert_eq!(metrics.total_acquired, 1);
        assert_eq!(metrics.total_released, 1);
    }

    #[tokio::test]
    async fn test_connection_pool_new() {
        let config = PoolConfig {
            max_connections: 10,
            min_idle: 2,
            ..Default::default()
        };
        let pool = ConnectionPool::<u32>::new(config);

        // Pool should start empty
        assert_eq!(pool.size().await, 0);
    }

    #[tokio::test]
    async fn test_connection_pool_acquire_creates_connection() {
        let config = PoolConfig::default();
        let pool = ConnectionPool::<String>::new(config);

        let conn = pool.acquire(|| async { Ok("test-connection".to_string()) }).await.unwrap();

        assert_eq!(*conn.get(), "test-connection");

        let metrics = pool.metrics().await;
        assert_eq!(metrics.total_created, 1);
        assert_eq!(metrics.total_acquired, 1);
    }

    #[tokio::test]
    async fn test_connection_pool_reuses_connection() {
        let config = PoolConfig {
            max_connections: 5,
            min_idle: 1,
            ..Default::default()
        };
        let pool = ConnectionPool::<u32>::new(config);
        let create_count = Arc::new(AtomicUsize::new(0));

        // First acquire - creates connection
        let create_count_clone = create_count.clone();
        let conn1 = pool
            .acquire(move || {
                let count = create_count_clone.clone();
                async move {
                    count.fetch_add(1, Ordering::SeqCst);
                    Ok(42u32)
                }
            })
            .await
            .unwrap();

        // Release it
        pool.release(conn1).await;

        // Second acquire - should reuse
        let create_count_clone = create_count.clone();
        let conn2 = pool
            .acquire(move || {
                let count = create_count_clone.clone();
                async move {
                    count.fetch_add(1, Ordering::SeqCst);
                    Ok(100u32)
                }
            })
            .await
            .unwrap();

        // Should still be the original connection (value 42), not a new one
        assert_eq!(*conn2.get(), 42);
        assert_eq!(create_count.load(Ordering::SeqCst), 1); // Only created once

        let metrics = pool.metrics().await;
        assert_eq!(metrics.total_created, 1);
        assert_eq!(metrics.total_acquired, 2);
    }

    #[tokio::test]
    async fn test_connection_pool_release() {
        let config = PoolConfig::default();
        let pool = ConnectionPool::<u32>::new(config);

        let conn = pool.acquire(|| async { Ok(42) }).await.unwrap();
        assert_eq!(pool.size().await, 0); // Connection in use, not in pool

        pool.release(conn).await;
        assert_eq!(pool.size().await, 1); // Connection returned to pool

        let metrics = pool.metrics().await;
        assert_eq!(metrics.total_released, 1);
        assert_eq!(metrics.idle_connections, 1);
    }

    #[tokio::test]
    async fn test_connection_pool_metrics() {
        let config = PoolConfig::default();
        let pool = ConnectionPool::<u32>::new(config);

        // Initial metrics
        let metrics = pool.metrics().await;
        assert_eq!(metrics.total_created, 0);

        // Acquire and release
        let conn = pool.acquire(|| async { Ok(1) }).await.unwrap();
        pool.release(conn).await;

        let metrics = pool.metrics().await;
        assert_eq!(metrics.total_created, 1);
        assert_eq!(metrics.total_acquired, 1);
        assert_eq!(metrics.total_released, 1);
    }

    #[tokio::test]
    async fn test_connection_pool_size() {
        let config = PoolConfig::default();
        let pool = ConnectionPool::<u32>::new(config);

        assert_eq!(pool.size().await, 0);

        let conn1 = pool.acquire(|| async { Ok(1) }).await.unwrap();
        let conn2 = pool.acquire(|| async { Ok(2) }).await.unwrap();

        // Connections in use, pool is empty
        assert_eq!(pool.size().await, 0);

        pool.release(conn1).await;
        assert_eq!(pool.size().await, 1);

        pool.release(conn2).await;
        assert_eq!(pool.size().await, 2);
    }

    #[tokio::test]
    async fn test_connection_pool_multiple_concurrent_acquires() {
        let config = PoolConfig {
            max_connections: 10,
            ..Default::default()
        };
        let pool = Arc::new(ConnectionPool::<u32>::new(config));

        let mut handles = vec![];
        for i in 0..5 {
            let pool_clone = pool.clone();
            let handle = tokio::spawn(async move {
                let conn = pool_clone.acquire(move || async move { Ok(i as u32) }).await.unwrap();
                // Simulate some work
                tokio::time::sleep(Duration::from_millis(10)).await;
                pool_clone.release(conn).await;
            });
            handles.push(handle);
        }

        for handle in handles {
            handle.await.unwrap();
        }

        let metrics = pool.metrics().await;
        assert_eq!(metrics.total_acquired, 5);
        assert_eq!(metrics.total_released, 5);
    }

    #[tokio::test]
    async fn test_connection_pool_acquire_error() {
        let config = PoolConfig::default();
        let pool = ConnectionPool::<u32>::new(config);

        let result = pool
            .acquire(|| async { Err(PoolError::CreateError("test error".to_string())) })
            .await;

        // Match exhaustively so that a different error variant fails the test instead of
        // slipping through an `if let` that simply does not match.
        match result {
            Err(PoolError::CreateError(msg)) => assert_eq!(msg, "test error"),
            Err(other) => panic!("expected CreateError, got {:?}", other),
            Ok(_) => panic!("expected CreateError, got a connection"),
        }
    }

    #[tokio::test]
    async fn test_connection_pool_acquire_error_frees_slot() {
        let config = PoolConfig {
            max_connections: 1,
            connection_timeout: Duration::from_millis(50),
            ..Default::default()
        };
        let pool = ConnectionPool::<u32>::new(config);

        let failed = pool
            .acquire(|| async { Err(PoolError::CreateError("boom".to_string())) })
            .await;
        assert!(failed.is_err());

        // The only slot must be usable again after the failed creation.
        let conn = pool.acquire(|| async { Ok(7) }).await.unwrap();
        assert_eq!(*conn.get(), 7);
    }

    #[tokio::test]
    async fn test_connection_pool_acquire_timeout() {
        let config = PoolConfig {
            max_connections: 1,
            connection_timeout: Duration::from_millis(20),
            ..Default::default()
        };
        let pool = ConnectionPool::<u32>::new(config);

        // Occupy the only slot, then a second acquire has to time out.
        let held = pool.acquire(|| async { Ok(1) }).await.unwrap();
        let result = pool.acquire(|| async { Ok(2) }).await;
        assert!(matches!(result, Err(PoolError::Timeout)));

        // Releasing frees the slot and the pooled connection is reused.
        pool.release(held).await;
        let conn = pool.acquire(|| async { Ok(3) }).await.unwrap();
        assert_eq!(*conn.get(), 1);
    }

    #[tokio::test]
    async fn test_connection_pool_health_check() {
        let config = PoolConfig {
            max_connections: 5,
            min_idle: 0,
            health_check_enabled: true,
            ..Default::default()
        };
        let pool = ConnectionPool::<u32>::new(config);

        // Add some connections to pool
        let conn1 = pool.acquire(|| async { Ok(1) }).await.unwrap();
        let conn2 = pool.acquire(|| async { Ok(2) }).await.unwrap();
        pool.release(conn1).await;
        pool.release(conn2).await;

        // Health check - all connections pass
        pool.health_check(|_| async { true }).await;

        assert_eq!(pool.size().await, 2);

        // Health check - all connections fail
        pool.health_check(|_| async { false }).await;

        assert_eq!(pool.size().await, 0);

        let metrics = pool.metrics().await;
        assert_eq!(metrics.health_check_failures, 2);
    }

    #[tokio::test]
    async fn test_connection_pool_health_check_disabled() {
        let config = PoolConfig {
            health_check_enabled: false,
            ..Default::default()
        };
        let pool = ConnectionPool::<u32>::new(config);

        let conn = pool.acquire(|| async { Ok(1) }).await.unwrap();
        pool.release(conn).await;

        // Health check should do nothing when disabled
        pool.health_check(|_| async { false }).await;

        // Connection should still be there
        assert_eq!(pool.size().await, 1);
    }

    #[tokio::test]
    async fn test_connection_pool_maintain_idle() {
        let config = PoolConfig {
            max_connections: 10,
            min_idle: 3,
            ..Default::default()
        };
        let pool = ConnectionPool::<u32>::new(config);

        // Pool starts empty
        assert_eq!(pool.size().await, 0);

        // Maintain idle should create min_idle connections
        pool.maintain_idle(|| async { Ok(42u32) }).await;

        assert_eq!(pool.size().await, 3);

        let metrics = pool.metrics().await;
        assert_eq!(metrics.total_created, 3);
        assert_eq!(metrics.idle_connections, 3);
    }

    #[tokio::test]
    async fn test_connection_pool_maintain_idle_already_sufficient() {
        let config = PoolConfig {
            max_connections: 10,
            min_idle: 2,
            ..Default::default()
        };
        let pool = ConnectionPool::<u32>::new(config);

        // Manually add 3 connections
        let conn1 = pool.acquire(|| async { Ok(1) }).await.unwrap();
        let conn2 = pool.acquire(|| async { Ok(2) }).await.unwrap();
        let conn3 = pool.acquire(|| async { Ok(3) }).await.unwrap();
        pool.release(conn1).await;
        pool.release(conn2).await;
        pool.release(conn3).await;

        let initial_created = pool.metrics().await.total_created;

        // Maintain idle should not create more since we have 3 > min_idle(2)
        pool.maintain_idle(|| async { Ok(100u32) }).await;

        let final_created = pool.metrics().await.total_created;
        assert_eq!(initial_created, final_created);
    }

    #[tokio::test]
    async fn test_connection_pool_maintain_idle_error() {
        let config = PoolConfig {
            max_connections: 10,
            min_idle: 3,
            ..Default::default()
        };
        let pool = ConnectionPool::<u32>::new(config);

        // maintain_idle with failing create function
        pool.maintain_idle(|| async { Err(PoolError::CreateError("test".to_string())) })
            .await;

        // Pool should still be empty
        assert_eq!(pool.size().await, 0);
    }

    // ---------------------------------------------------------------------
    // HttpClientPool
    // ---------------------------------------------------------------------

    #[tokio::test]
    async fn test_http_client_pool_new() {
        let config = PoolConfig::default();
        let pool = HttpClientPool::new_http(config);
        assert_eq!(pool.size().await, 0);
    }

    #[tokio::test]
    async fn test_http_client_pool_acquire() {
        let config = PoolConfig::default();
        let pool = HttpClientPool::new_http(config);

        let result = pool.acquire_client().await;
        assert!(result.is_ok());

        let conn = result.unwrap();
        // Verify it's a valid reqwest client
        let _client: &reqwest::Client = conn.get();
    }

    // ---------------------------------------------------------------------
    // Edge cases
    // ---------------------------------------------------------------------

    #[tokio::test]
    async fn test_connection_pool_stale_connection_not_returned() {
        let config = PoolConfig {
            max_connections: 5,
            min_idle: 0, // Set to 0 so stale connections get closed
            max_idle_time: Duration::from_millis(1), // Very short idle time
            ..Default::default()
        };
        let pool = ConnectionPool::<u32>::new(config);

        let conn = pool.acquire(|| async { Ok(42) }).await.unwrap();

        // Wait for connection to become stale
        tokio::time::sleep(Duration::from_millis(10)).await;

        // Release stale connection
        pool.release(conn).await;

        // Stale connection should be closed, not returned to pool
        let metrics = pool.metrics().await;
        assert_eq!(metrics.total_closed, 1);
    }

    #[tokio::test]
    async fn test_connection_pool_with_complex_type() {
        #[derive(Debug, Clone)]
        struct ComplexConnection {
            id: u32,
            data: Vec<String>,
        }

        let config = PoolConfig::default();
        let pool = ConnectionPool::<ComplexConnection>::new(config);

        let conn = pool
            .acquire(|| async {
                Ok(ComplexConnection {
                    id: 123,
                    data: vec!["test".to_string()],
                })
            })
            .await
            .unwrap();

        assert_eq!(conn.get().id, 123);
        assert_eq!(conn.get().data, vec!["test".to_string()]);
    }

    #[tokio::test]
    async fn test_pooled_connection_updates_last_used() {
        let mut conn = PooledConnection::new(42u32);
        let initial_time = conn.last_used;

        // Sleep a tiny bit
        tokio::time::sleep(Duration::from_millis(1)).await;

        // get_mut should update last_used
        let _ = conn.get_mut();

        assert!(conn.last_used > initial_time);
    }

    #[tokio::test]
    async fn test_connection_pool_partial_health_check() {
        let config = PoolConfig {
            max_connections: 10,
            min_idle: 0,
            health_check_enabled: true,
            ..Default::default()
        };
        let pool = ConnectionPool::<u32>::new(config);

        // Add connections with different values
        let conn1 = pool.acquire(|| async { Ok(1) }).await.unwrap();
        let conn2 = pool.acquire(|| async { Ok(2) }).await.unwrap();
        let conn3 = pool.acquire(|| async { Ok(3) }).await.unwrap();
        pool.release(conn1).await;
        pool.release(conn2).await;
        pool.release(conn3).await;

        // Health check that fails only even numbers
        pool.health_check(|val| {
            let v = *val;
            async move { v % 2 != 0 }
        })
        .await;

        // Only odd-valued connections should remain
        assert_eq!(pool.size().await, 2); // 1 and 3

        let metrics = pool.metrics().await;
        assert_eq!(metrics.health_check_failures, 1); // Connection with value 2
    }
}