easy_http_proxy_server/
pool.rs

1//! Connection pool implementation for HTTP proxy
2
3use std::collections::HashMap;
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6use tokio::net::TcpStream;
7use tokio::sync::Mutex;
8use log::debug;
9
10/// Connection pool for managing reusable TCP connections
11#[derive(Debug)]
12pub struct ConnectionPool {
13    connections: Arc<Mutex<HashMap<String, Vec<(TcpStream, Instant)>>>>,
14    max_idle_time: Duration,
15}
16
17impl ConnectionPool {
18    /// Create a new connection pool
19    pub fn new() -> Self {
20        Self {
21            connections: Arc::new(Mutex::new(HashMap::new())),
22            max_idle_time: Duration::from_secs(30),
23        }
24    }
25
26    /// Create a new connection pool with custom idle timeout
27    pub fn with_idle_timeout(timeout: Duration) -> Self {
28        Self {
29            connections: Arc::new(Mutex::new(HashMap::new())),
30            max_idle_time: timeout,
31        }
32    }
33
34    /// Get a connection from the pool or create a new one
35    pub async fn get_or_create(&self, target_addr: &str) -> Result<TcpStream, std::io::Error> {
36        // Try to get from pool first
37        if let Some(stream) = self.get(target_addr).await {
38            debug!("Reusing connection from pool for {}", target_addr);
39            return Ok(stream);
40        }
41
42        // Create new connection
43        debug!("Creating new connection to {}", target_addr);
44        TcpStream::connect(target_addr).await
45    }
46
47    /// Get a connection from the pool if available
48    pub async fn get(&self, target_addr: &str) -> Option<TcpStream> {
49        let mut pool = self.connections.lock().await;
50        
51        if let Some(connections) = pool.get_mut(target_addr) {
52            // Clean up expired connections
53            let now = Instant::now();
54            connections.retain(|(_, instant)| now.duration_since(*instant) < self.max_idle_time);
55            
56            if let Some((stream, _)) = connections.pop() {
57                return Some(stream);
58            }
59        }
60        
61        None
62    }
63
64    /// Return a connection to the pool
65    pub async fn put(&self, target_addr: String, stream: TcpStream) {
66        let mut pool = self.connections.lock().await;
67        let connections = pool.entry(target_addr).or_default();
68        connections.push((stream, Instant::now()));
69        debug!("Returned connection to pool, total: {}", connections.len());
70    }
71
72    /// Clear all connections from the pool
73    pub async fn clear(&self) {
74        let mut pool = self.connections.lock().await;
75        pool.clear();
76    }
77
78    /// Get the number of connections in the pool
79    pub async fn len(&self) -> usize {
80        let pool = self.connections.lock().await;
81        pool.values().map(|v| v.len()).sum()
82    }
83
84    /// Check if the pool is empty
85    pub async fn is_empty(&self) -> bool {
86        self.len().await == 0
87    }
88}
89
90impl Default for ConnectionPool {
91    fn default() -> Self {
92        Self::new()
93    }
94}