mockforge_core/
connection_pool.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::{Duration, Instant};
4use tokio::sync::{RwLock, Semaphore};
5use tracing::{debug, warn};
6
7/// Connection pool configuration
8#[derive(Debug, Clone)]
9pub struct PoolConfig {
10    /// Maximum number of connections
11    pub max_connections: usize,
12    /// Minimum number of idle connections to maintain
13    pub min_idle: usize,
14    /// Maximum idle time before connection is closed
15    pub max_idle_time: Duration,
16    /// Connection timeout
17    pub connection_timeout: Duration,
18    /// Enable connection health checks
19    pub health_check_enabled: bool,
20    /// Health check interval
21    pub health_check_interval: Duration,
22}
23
24impl Default for PoolConfig {
25    fn default() -> Self {
26        Self {
27            max_connections: 100,
28            min_idle: 10,
29            max_idle_time: Duration::from_secs(600), // 10 minutes
30            connection_timeout: Duration::from_secs(30),
31            health_check_enabled: true,
32            health_check_interval: Duration::from_secs(60),
33        }
34    }
35}
36
37/// Connection wrapper with metadata
38pub struct PooledConnection<T> {
39    inner: T,
40    created_at: Instant,
41    last_used: Instant,
42}
43
44impl<T> PooledConnection<T> {
45    /// Creates a new pooled connection wrapper
46    pub fn new(connection: T) -> Self {
47        let now = Instant::now();
48        Self {
49            inner: connection,
50            created_at: now,
51            last_used: now,
52        }
53    }
54
55    /// Gets a reference to the underlying connection
56    pub fn get(&self) -> &T {
57        &self.inner
58    }
59
60    /// Gets a mutable reference to the underlying connection and updates last used time
61    pub fn get_mut(&mut self) -> &mut T {
62        self.last_used = Instant::now();
63        &mut self.inner
64    }
65
66    /// Checks if the connection is stale based on idle time
67    pub fn is_stale(&self, max_idle_time: Duration) -> bool {
68        self.last_used.elapsed() > max_idle_time
69    }
70
71    /// Returns the age of the connection since creation
72    pub fn age(&self) -> Duration {
73        self.created_at.elapsed()
74    }
75}
76
77/// Generic connection pool
78pub struct ConnectionPool<T> {
79    config: PoolConfig,
80    available: Arc<RwLock<Vec<PooledConnection<T>>>>,
81    semaphore: Arc<Semaphore>,
82    metrics: Arc<RwLock<PoolMetrics>>,
83}
84
85/// Metrics for connection pool usage and health
86#[derive(Debug, Default, Clone)]
87pub struct PoolMetrics {
88    /// Number of currently active connections
89    pub active_connections: usize,
90    /// Number of idle connections available
91    pub idle_connections: usize,
92    /// Total number of connection acquisitions
93    pub total_acquired: u64,
94    /// Total number of connection releases
95    pub total_released: u64,
96    /// Total number of connections created
97    pub total_created: u64,
98    /// Total number of connections closed
99    pub total_closed: u64,
100    /// Number of acquire timeouts
101    pub acquire_timeouts: u64,
102    /// Number of health check failures
103    pub health_check_failures: u64,
104}
105
106impl<T> ConnectionPool<T>
107where
108    T: Send + 'static,
109{
110    /// Creates a new connection pool with the given configuration
111    pub fn new(config: PoolConfig) -> Self {
112        Self {
113            semaphore: Arc::new(Semaphore::new(config.max_connections)),
114            available: Arc::new(RwLock::new(Vec::with_capacity(config.max_connections))),
115            metrics: Arc::new(RwLock::new(PoolMetrics::default())),
116            config,
117        }
118    }
119
120    /// Acquire a connection from the pool
121    pub async fn acquire<F, Fut>(&self, create_fn: F) -> Result<PooledConnection<T>, PoolError>
122    where
123        F: FnOnce() -> Fut,
124        Fut: std::future::Future<Output = Result<T, PoolError>>,
125    {
126        // Wait for available slot
127        let permit = tokio::time::timeout(
128            self.config.connection_timeout,
129            self.semaphore.clone().acquire_owned(),
130        )
131        .await
132        .map_err(|_| {
133            debug!("Connection pool acquire timeout");
134            PoolError::Timeout
135        })?
136        .map_err(|_| PoolError::Closed)?;
137
138        // Try to get an existing connection
139        let mut available = self.available.write().await;
140
141        // Remove stale connections
142        available.retain(|conn| !conn.is_stale(self.config.max_idle_time));
143
144        let connection = if let Some(mut conn) = available.pop() {
145            // Reuse existing connection
146            conn.last_used = Instant::now();
147            drop(available);
148
149            let mut metrics = self.metrics.write().await;
150            metrics.total_acquired += 1;
151            metrics.active_connections += 1;
152            metrics.idle_connections = metrics.idle_connections.saturating_sub(1);
153            drop(metrics);
154
155            debug!("Reusing pooled connection");
156            conn
157        } else {
158            drop(available);
159
160            // Create new connection
161            let inner = create_fn().await?;
162            let conn = PooledConnection::new(inner);
163
164            let mut metrics = self.metrics.write().await;
165            metrics.total_created += 1;
166            metrics.total_acquired += 1;
167            metrics.active_connections += 1;
168            drop(metrics);
169
170            debug!("Created new pooled connection");
171            conn
172        };
173
174        // Permit will be returned when connection is released
175        std::mem::forget(permit);
176
177        Ok(connection)
178    }
179
180    /// Release a connection back to the pool
181    pub async fn release(&self, connection: PooledConnection<T>) {
182        let mut available = self.available.write().await;
183
184        // Don't return to pool if we're above max idle or connection is stale
185        if available.len() >= self.config.min_idle && connection.is_stale(self.config.max_idle_time)
186        {
187            drop(available);
188
189            let mut metrics = self.metrics.write().await;
190            metrics.total_closed += 1;
191            metrics.active_connections = metrics.active_connections.saturating_sub(1);
192            drop(metrics);
193
194            self.semaphore.add_permits(1);
195            debug!("Closed stale connection");
196            return;
197        }
198
199        available.push(connection);
200        drop(available);
201
202        let mut metrics = self.metrics.write().await;
203        metrics.total_released += 1;
204        metrics.active_connections = metrics.active_connections.saturating_sub(1);
205        metrics.idle_connections += 1;
206        drop(metrics);
207
208        self.semaphore.add_permits(1);
209        debug!("Released connection to pool");
210    }
211
212    /// Get current pool metrics
213    pub async fn metrics(&self) -> PoolMetrics {
214        self.metrics.read().await.clone()
215    }
216
217    /// Get current pool size
218    pub async fn size(&self) -> usize {
219        self.available.read().await.len()
220    }
221
222    /// Run health checks on idle connections
223    pub async fn health_check<F, Fut>(&self, check_fn: F)
224    where
225        F: Fn(&T) -> Fut,
226        Fut: std::future::Future<Output = bool>,
227    {
228        if !self.config.health_check_enabled {
229            return;
230        }
231
232        let mut available = self.available.write().await;
233        let mut healthy = Vec::new();
234        let mut failures = 0;
235
236        for conn in available.drain(..) {
237            if check_fn(conn.get()).await {
238                healthy.push(conn);
239            } else {
240                failures += 1;
241                warn!("Connection failed health check");
242            }
243        }
244
245        *available = healthy;
246        drop(available);
247
248        if failures > 0 {
249            let mut metrics = self.metrics.write().await;
250            metrics.health_check_failures += failures;
251            metrics.total_closed += failures;
252            metrics.idle_connections = metrics.idle_connections.saturating_sub(failures as usize);
253            drop(metrics);
254
255            self.semaphore.add_permits(failures as usize);
256        }
257    }
258
259    /// Maintain minimum idle connections
260    pub async fn maintain_idle<F, Fut>(&self, create_fn: F)
261    where
262        F: Fn() -> Fut,
263        Fut: std::future::Future<Output = Result<T, PoolError>>,
264    {
265        let current_idle = self.available.read().await.len();
266
267        if current_idle < self.config.min_idle {
268            let needed = self.config.min_idle - current_idle;
269
270            for _ in 0..needed {
271                if let Ok(permit) = self.semaphore.clone().try_acquire_owned() {
272                    match create_fn().await {
273                        Ok(conn) => {
274                            let pooled = PooledConnection::new(conn);
275                            self.available.write().await.push(pooled);
276
277                            let mut metrics = self.metrics.write().await;
278                            metrics.total_created += 1;
279                            metrics.idle_connections += 1;
280
281                            std::mem::forget(permit);
282                        }
283                        Err(e) => {
284                            warn!("Failed to create idle connection: {:?}", e);
285                            drop(permit);
286                        }
287                    }
288                } else {
289                    break;
290                }
291            }
292        }
293    }
294}
295
296/// Errors that can occur in connection pool operations
297#[derive(Debug, thiserror::Error)]
298pub enum PoolError {
299    /// Connection acquisition timed out
300    #[error("Connection pool timeout")]
301    Timeout,
302
303    /// Connection pool has been closed
304    #[error("Connection pool closed")]
305    Closed,
306
307    /// Failed to create a new connection
308    #[error("Failed to create connection: {0}")]
309    CreateError(String),
310
311    /// Error during connection operation
312    #[error("Connection error: {0}")]
313    ConnectionError(String),
314}
315
316/// HTTP client connection pool (example usage)
317pub type HttpClientPool = ConnectionPool<reqwest::Client>;
318
319impl HttpClientPool {
320    /// Creates a new HTTP client pool with the given configuration
321    pub fn new_http(config: PoolConfig) -> Self {
322        Self::new(config)
323    }
324
325    /// Acquires an HTTP client from the pool
326    pub async fn acquire_client(&self) -> Result<PooledConnection<reqwest::Client>, PoolError> {
327        self.acquire(|| async {
328            reqwest::Client::builder()
329                .timeout(Duration::from_secs(30))
330                .pool_max_idle_per_host(10)
331                .build()
332                .map_err(|e| PoolError::CreateError(e.to_string()))
333        })
334        .await
335    }
336}
337
338#[cfg(test)]
339mod tests {
340    use super::*;
341    use std::sync::atomic::{AtomicUsize, Ordering};
342
343    // PoolConfig tests
344    #[test]
345    fn test_pool_config_default() {
346        let config = PoolConfig::default();
347        assert_eq!(config.max_connections, 100);
348        assert_eq!(config.min_idle, 10);
349        assert_eq!(config.max_idle_time, Duration::from_secs(600));
350        assert_eq!(config.connection_timeout, Duration::from_secs(30));
351        assert!(config.health_check_enabled);
352        assert_eq!(config.health_check_interval, Duration::from_secs(60));
353    }
354
355    #[test]
356    fn test_pool_config_clone() {
357        let config = PoolConfig {
358            max_connections: 50,
359            min_idle: 5,
360            ..Default::default()
361        };
362        let cloned = config.clone();
363        assert_eq!(cloned.max_connections, config.max_connections);
364        assert_eq!(cloned.min_idle, config.min_idle);
365    }
366
367    #[test]
368    fn test_pool_config_debug() {
369        let config = PoolConfig::default();
370        let debug = format!("{:?}", config);
371        assert!(debug.contains("PoolConfig"));
372        assert!(debug.contains("max_connections"));
373    }
374
375    // PooledConnection tests
376    #[test]
377    fn test_pooled_connection_new() {
378        let conn = PooledConnection::new(42u32);
379        assert_eq!(*conn.get(), 42);
380    }
381
382    #[test]
383    fn test_pooled_connection_get() {
384        let conn = PooledConnection::new("test".to_string());
385        assert_eq!(*conn.get(), "test");
386    }
387
388    #[test]
389    fn test_pooled_connection_get_mut() {
390        let mut conn = PooledConnection::new(vec![1, 2, 3]);
391        conn.get_mut().push(4);
392        assert_eq!(*conn.get(), vec![1, 2, 3, 4]);
393    }
394
395    #[test]
396    fn test_pooled_connection_is_stale() {
397        let conn = PooledConnection::new(42u32);
398        // Just created, should not be stale
399        assert!(!conn.is_stale(Duration::from_secs(1)));
400        // With zero duration, should be stale immediately
401        assert!(conn.is_stale(Duration::from_nanos(0)));
402    }
403
404    #[test]
405    fn test_pooled_connection_age() {
406        let conn = PooledConnection::new(42u32);
407        let age = conn.age();
408        // Should be very small (just created)
409        assert!(age < Duration::from_secs(1));
410    }
411
412    // PoolMetrics tests
413    #[test]
414    fn test_pool_metrics_default() {
415        let metrics = PoolMetrics::default();
416        assert_eq!(metrics.active_connections, 0);
417        assert_eq!(metrics.idle_connections, 0);
418        assert_eq!(metrics.total_acquired, 0);
419        assert_eq!(metrics.total_released, 0);
420        assert_eq!(metrics.total_created, 0);
421        assert_eq!(metrics.total_closed, 0);
422        assert_eq!(metrics.acquire_timeouts, 0);
423        assert_eq!(metrics.health_check_failures, 0);
424    }
425
426    #[test]
427    fn test_pool_metrics_clone() {
428        let mut metrics = PoolMetrics::default();
429        metrics.total_acquired = 10;
430        metrics.active_connections = 5;
431        let cloned = metrics.clone();
432        assert_eq!(cloned.total_acquired, 10);
433        assert_eq!(cloned.active_connections, 5);
434    }
435
436    #[test]
437    fn test_pool_metrics_debug() {
438        let metrics = PoolMetrics::default();
439        let debug = format!("{:?}", metrics);
440        assert!(debug.contains("PoolMetrics"));
441        assert!(debug.contains("active_connections"));
442    }
443
444    // PoolError tests
445    #[test]
446    fn test_pool_error_timeout() {
447        let error = PoolError::Timeout;
448        assert!(error.to_string().contains("timeout"));
449    }
450
451    #[test]
452    fn test_pool_error_closed() {
453        let error = PoolError::Closed;
454        assert!(error.to_string().contains("closed"));
455    }
456
457    #[test]
458    fn test_pool_error_create_error() {
459        let error = PoolError::CreateError("connection failed".to_string());
460        let msg = error.to_string();
461        assert!(msg.contains("create connection"));
462        assert!(msg.contains("connection failed"));
463    }
464
465    #[test]
466    fn test_pool_error_connection_error() {
467        let error = PoolError::ConnectionError("network issue".to_string());
468        let msg = error.to_string();
469        assert!(msg.contains("Connection error"));
470        assert!(msg.contains("network issue"));
471    }
472
473    #[test]
474    fn test_pool_error_debug() {
475        let error = PoolError::Timeout;
476        let debug = format!("{:?}", error);
477        assert!(debug.contains("Timeout"));
478    }
479
480    // ConnectionPool tests
481    #[tokio::test]
482    async fn test_connection_pool() {
483        let config = PoolConfig {
484            max_connections: 5,
485            min_idle: 2,
486            ..Default::default()
487        };
488
489        let pool = ConnectionPool::<u32>::new(config);
490
491        // Acquire connection
492        let conn1 = pool.acquire(|| async { Ok(42) }).await.unwrap();
493
494        assert_eq!(*conn1.get(), 42);
495
496        // Release connection
497        pool.release(conn1).await;
498
499        // Verify metrics
500        let metrics = pool.metrics().await;
501        assert_eq!(metrics.total_created, 1);
502        assert_eq!(metrics.total_acquired, 1);
503        assert_eq!(metrics.total_released, 1);
504    }
505
506    #[tokio::test]
507    async fn test_connection_pool_new() {
508        let config = PoolConfig {
509            max_connections: 10,
510            min_idle: 2,
511            ..Default::default()
512        };
513        let pool = ConnectionPool::<u32>::new(config);
514
515        // Pool should start empty
516        assert_eq!(pool.size().await, 0);
517    }
518
519    #[tokio::test]
520    async fn test_connection_pool_acquire_creates_connection() {
521        let config = PoolConfig::default();
522        let pool = ConnectionPool::<String>::new(config);
523
524        let conn = pool.acquire(|| async { Ok("test-connection".to_string()) }).await.unwrap();
525
526        assert_eq!(*conn.get(), "test-connection");
527
528        let metrics = pool.metrics().await;
529        assert_eq!(metrics.total_created, 1);
530        assert_eq!(metrics.total_acquired, 1);
531    }
532
533    #[tokio::test]
534    async fn test_connection_pool_reuses_connection() {
535        let config = PoolConfig {
536            max_connections: 5,
537            min_idle: 1,
538            ..Default::default()
539        };
540        let pool = ConnectionPool::<u32>::new(config);
541        let create_count = Arc::new(AtomicUsize::new(0));
542
543        // First acquire - creates connection
544        let create_count_clone = create_count.clone();
545        let conn1 = pool
546            .acquire(move || {
547                let count = create_count_clone.clone();
548                async move {
549                    count.fetch_add(1, Ordering::SeqCst);
550                    Ok(42u32)
551                }
552            })
553            .await
554            .unwrap();
555
556        // Release it
557        pool.release(conn1).await;
558
559        // Second acquire - should reuse
560        let create_count_clone = create_count.clone();
561        let conn2 = pool
562            .acquire(move || {
563                let count = create_count_clone.clone();
564                async move {
565                    count.fetch_add(1, Ordering::SeqCst);
566                    Ok(100u32)
567                }
568            })
569            .await
570            .unwrap();
571
572        // Should still be the original connection (value 42), not a new one
573        assert_eq!(*conn2.get(), 42);
574        assert_eq!(create_count.load(Ordering::SeqCst), 1); // Only created once
575
576        let metrics = pool.metrics().await;
577        assert_eq!(metrics.total_created, 1);
578        assert_eq!(metrics.total_acquired, 2);
579    }
580
581    #[tokio::test]
582    async fn test_connection_pool_release() {
583        let config = PoolConfig::default();
584        let pool = ConnectionPool::<u32>::new(config);
585
586        let conn = pool.acquire(|| async { Ok(42) }).await.unwrap();
587        assert_eq!(pool.size().await, 0); // Connection in use, not in pool
588
589        pool.release(conn).await;
590        assert_eq!(pool.size().await, 1); // Connection returned to pool
591
592        let metrics = pool.metrics().await;
593        assert_eq!(metrics.total_released, 1);
594        assert_eq!(metrics.idle_connections, 1);
595    }
596
597    #[tokio::test]
598    async fn test_connection_pool_metrics() {
599        let config = PoolConfig::default();
600        let pool = ConnectionPool::<u32>::new(config);
601
602        // Initial metrics
603        let metrics = pool.metrics().await;
604        assert_eq!(metrics.total_created, 0);
605
606        // Acquire and release
607        let conn = pool.acquire(|| async { Ok(1) }).await.unwrap();
608        pool.release(conn).await;
609
610        let metrics = pool.metrics().await;
611        assert_eq!(metrics.total_created, 1);
612        assert_eq!(metrics.total_acquired, 1);
613        assert_eq!(metrics.total_released, 1);
614    }
615
616    #[tokio::test]
617    async fn test_connection_pool_size() {
618        let config = PoolConfig::default();
619        let pool = ConnectionPool::<u32>::new(config);
620
621        assert_eq!(pool.size().await, 0);
622
623        let conn1 = pool.acquire(|| async { Ok(1) }).await.unwrap();
624        let conn2 = pool.acquire(|| async { Ok(2) }).await.unwrap();
625
626        // Connections in use, pool is empty
627        assert_eq!(pool.size().await, 0);
628
629        pool.release(conn1).await;
630        assert_eq!(pool.size().await, 1);
631
632        pool.release(conn2).await;
633        assert_eq!(pool.size().await, 2);
634    }
635
636    #[tokio::test]
637    async fn test_connection_pool_multiple_concurrent_acquires() {
638        let config = PoolConfig {
639            max_connections: 10,
640            ..Default::default()
641        };
642        let pool = Arc::new(ConnectionPool::<u32>::new(config));
643
644        let mut handles = vec![];
645        for i in 0..5 {
646            let pool_clone = pool.clone();
647            let handle = tokio::spawn(async move {
648                let conn = pool_clone.acquire(move || async move { Ok(i as u32) }).await.unwrap();
649                // Simulate some work
650                tokio::time::sleep(Duration::from_millis(10)).await;
651                pool_clone.release(conn).await;
652            });
653            handles.push(handle);
654        }
655
656        for handle in handles {
657            handle.await.unwrap();
658        }
659
660        let metrics = pool.metrics().await;
661        assert_eq!(metrics.total_acquired, 5);
662        assert_eq!(metrics.total_released, 5);
663    }
664
665    #[tokio::test]
666    async fn test_connection_pool_acquire_error() {
667        let config = PoolConfig::default();
668        let pool = ConnectionPool::<u32>::new(config);
669
670        let result = pool
671            .acquire(|| async { Err(PoolError::CreateError("test error".to_string())) })
672            .await;
673
674        assert!(result.is_err());
675        if let Err(PoolError::CreateError(msg)) = result {
676            assert_eq!(msg, "test error");
677        }
678    }
679
680    #[tokio::test]
681    async fn test_connection_pool_health_check() {
682        let config = PoolConfig {
683            max_connections: 5,
684            min_idle: 0,
685            health_check_enabled: true,
686            ..Default::default()
687        };
688        let pool = ConnectionPool::<u32>::new(config);
689
690        // Add some connections to pool
691        let conn1 = pool.acquire(|| async { Ok(1) }).await.unwrap();
692        let conn2 = pool.acquire(|| async { Ok(2) }).await.unwrap();
693        pool.release(conn1).await;
694        pool.release(conn2).await;
695
696        // Health check - all connections pass
697        pool.health_check(|_| async { true }).await;
698
699        assert_eq!(pool.size().await, 2);
700
701        // Health check - all connections fail
702        pool.health_check(|_| async { false }).await;
703
704        assert_eq!(pool.size().await, 0);
705
706        let metrics = pool.metrics().await;
707        assert_eq!(metrics.health_check_failures, 2);
708    }
709
710    #[tokio::test]
711    async fn test_connection_pool_health_check_disabled() {
712        let config = PoolConfig {
713            health_check_enabled: false,
714            ..Default::default()
715        };
716        let pool = ConnectionPool::<u32>::new(config);
717
718        let conn = pool.acquire(|| async { Ok(1) }).await.unwrap();
719        pool.release(conn).await;
720
721        // Health check should do nothing when disabled
722        pool.health_check(|_| async { false }).await;
723
724        // Connection should still be there
725        assert_eq!(pool.size().await, 1);
726    }
727
728    #[tokio::test]
729    async fn test_connection_pool_maintain_idle() {
730        let config = PoolConfig {
731            max_connections: 10,
732            min_idle: 3,
733            ..Default::default()
734        };
735        let pool = ConnectionPool::<u32>::new(config);
736
737        // Pool starts empty
738        assert_eq!(pool.size().await, 0);
739
740        // Maintain idle should create min_idle connections
741        pool.maintain_idle(|| async { Ok(42u32) }).await;
742
743        assert_eq!(pool.size().await, 3);
744
745        let metrics = pool.metrics().await;
746        assert_eq!(metrics.total_created, 3);
747        assert_eq!(metrics.idle_connections, 3);
748    }
749
750    #[tokio::test]
751    async fn test_connection_pool_maintain_idle_already_sufficient() {
752        let config = PoolConfig {
753            max_connections: 10,
754            min_idle: 2,
755            ..Default::default()
756        };
757        let pool = ConnectionPool::<u32>::new(config);
758
759        // Manually add 3 connections
760        let conn1 = pool.acquire(|| async { Ok(1) }).await.unwrap();
761        let conn2 = pool.acquire(|| async { Ok(2) }).await.unwrap();
762        let conn3 = pool.acquire(|| async { Ok(3) }).await.unwrap();
763        pool.release(conn1).await;
764        pool.release(conn2).await;
765        pool.release(conn3).await;
766
767        let initial_created = pool.metrics().await.total_created;
768
769        // Maintain idle should not create more since we have 3 > min_idle(2)
770        pool.maintain_idle(|| async { Ok(100u32) }).await;
771
772        let final_created = pool.metrics().await.total_created;
773        assert_eq!(initial_created, final_created);
774    }
775
776    #[tokio::test]
777    async fn test_connection_pool_maintain_idle_error() {
778        let config = PoolConfig {
779            max_connections: 10,
780            min_idle: 3,
781            ..Default::default()
782        };
783        let pool = ConnectionPool::<u32>::new(config);
784
785        // maintain_idle with failing create function
786        pool.maintain_idle(|| async { Err(PoolError::CreateError("test".to_string())) })
787            .await;
788
789        // Pool should still be empty
790        assert_eq!(pool.size().await, 0);
791    }
792
793    // HttpClientPool tests
794    #[tokio::test]
795    async fn test_http_client_pool_new() {
796        let config = PoolConfig::default();
797        let pool = HttpClientPool::new_http(config);
798        assert_eq!(pool.size().await, 0);
799    }
800
801    #[tokio::test]
802    async fn test_http_client_pool_acquire() {
803        let config = PoolConfig::default();
804        let pool = HttpClientPool::new_http(config);
805
806        let result = pool.acquire_client().await;
807        assert!(result.is_ok());
808
809        let conn = result.unwrap();
810        // Verify it's a valid reqwest client
811        let _client: &reqwest::Client = conn.get();
812    }
813
814    // Edge cases
815    #[tokio::test]
816    async fn test_connection_pool_stale_connection_not_returned() {
817        let config = PoolConfig {
818            max_connections: 5,
819            min_idle: 0, // Set to 0 so stale connections get closed
820            max_idle_time: Duration::from_millis(1), // Very short idle time
821            ..Default::default()
822        };
823        let pool = ConnectionPool::<u32>::new(config);
824
825        let conn = pool.acquire(|| async { Ok(42) }).await.unwrap();
826
827        // Wait for connection to become stale
828        tokio::time::sleep(Duration::from_millis(10)).await;
829
830        // Release stale connection
831        pool.release(conn).await;
832
833        // Stale connection should be closed, not returned to pool
834        let metrics = pool.metrics().await;
835        assert_eq!(metrics.total_closed, 1);
836    }
837
838    #[tokio::test]
839    async fn test_connection_pool_with_complex_type() {
840        #[derive(Debug, Clone)]
841        struct ComplexConnection {
842            id: u32,
843            data: Vec<String>,
844        }
845
846        let config = PoolConfig::default();
847        let pool = ConnectionPool::<ComplexConnection>::new(config);
848
849        let conn = pool
850            .acquire(|| async {
851                Ok(ComplexConnection {
852                    id: 123,
853                    data: vec!["test".to_string()],
854                })
855            })
856            .await
857            .unwrap();
858
859        assert_eq!(conn.get().id, 123);
860        assert_eq!(conn.get().data, vec!["test".to_string()]);
861    }
862
863    #[tokio::test]
864    async fn test_pooled_connection_updates_last_used() {
865        let mut conn = PooledConnection::new(42u32);
866        let initial_time = conn.last_used;
867
868        // Sleep a tiny bit
869        tokio::time::sleep(Duration::from_millis(1)).await;
870
871        // get_mut should update last_used
872        let _ = conn.get_mut();
873
874        assert!(conn.last_used > initial_time);
875    }
876
877    #[tokio::test]
878    async fn test_connection_pool_partial_health_check() {
879        let config = PoolConfig {
880            max_connections: 10,
881            min_idle: 0,
882            health_check_enabled: true,
883            ..Default::default()
884        };
885        let pool = ConnectionPool::<u32>::new(config);
886
887        // Add connections with different values
888        let conn1 = pool.acquire(|| async { Ok(1) }).await.unwrap();
889        let conn2 = pool.acquire(|| async { Ok(2) }).await.unwrap();
890        let conn3 = pool.acquire(|| async { Ok(3) }).await.unwrap();
891        pool.release(conn1).await;
892        pool.release(conn2).await;
893        pool.release(conn3).await;
894
895        // Health check that fails only even numbers
896        pool.health_check(|val| {
897            let v = *val;
898            async move { v % 2 != 0 }
899        })
900        .await;
901
902        // Only odd-valued connections should remain
903        assert_eq!(pool.size().await, 2); // 1 and 3
904
905        let metrics = pool.metrics().await;
906        assert_eq!(metrics.health_check_failures, 1); // Connection with value 2
907    }
908}