Skip to main content

rust_web_server/proxy/
pool.rs

1use std::collections::{HashMap, VecDeque};
2use std::net::TcpStream;
3use std::sync::Mutex;
4use std::time::{Duration, Instant};
5
/// Idle connections kept per backend key when the pool is built with
/// `ConnPool::new_default`.
const DEFAULT_MAX_IDLE: usize = 8;

/// How long a pooled connection may sit idle before it is treated as stale
/// when the pool is built with `ConnPool::new_default`.
const DEFAULT_IDLE_TIMEOUT: Duration = Duration::from_secs(60);
8
9/// Per-backend HTTP/1.1 connection pool.
10///
11/// Holds idle `TcpStream` connections keyed by `"host:port"`.  When a
12/// backend responds with `Connection: keep-alive` (or the HTTP/1.1 default),
13/// the stream is returned here and reused for the next request to the same
14/// backend, eliminating the TCP-handshake cost and reducing ephemeral-port
15/// exhaustion under load.
16///
17/// # Thread safety
18///
19/// All methods take `&self` and are safe to call from multiple threads.
20/// The inner map is protected by a `Mutex`.
21///
22/// # Example
23///
24/// ```rust,ignore
25/// use std::sync::Arc;
26/// use std::time::Duration;
27/// use rust_web_server::proxy::ConnPool;
28/// use rust_web_server::proxy::ReverseProxy;
29///
30/// let pool = Arc::new(ConnPool::new(16, Duration::from_secs(30)));
31/// let _proxy = ReverseProxy::new(["http://backend:8080"])
32///     .with_pool(Arc::clone(&pool));
33/// ```
34pub struct ConnPool {
35    inner: Mutex<HashMap<String, VecDeque<PoolEntry>>>,
36    max_idle: usize,
37    idle_timeout: Duration,
38}
39
/// One idle connection together with the moment it entered the pool.
struct PoolEntry {
    /// When the connection was handed to the pool; its idle age is measured
    /// from this instant.
    added: Instant,
    /// The idle backend connection. Dropping the entry closes it.
    stream: TcpStream,
}
44
45impl ConnPool {
46    /// Create a pool with the given per-backend idle limit and idle timeout.
47    pub fn new(max_idle: usize, idle_timeout: Duration) -> Self {
48        ConnPool {
49            inner: Mutex::new(HashMap::new()),
50            max_idle,
51            idle_timeout,
52        }
53    }
54
55    /// Create a pool with defaults: 8 idle connections per backend, 60-second timeout.
56    pub fn new_default() -> Self {
57        Self::new(DEFAULT_MAX_IDLE, DEFAULT_IDLE_TIMEOUT)
58    }
59
60    /// Try to acquire an idle connection for `key = "host:port"`.
61    ///
62    /// Stale entries (older than `idle_timeout`) are discarded automatically.
63    /// Returns `None` if no usable connection is available.
64    pub fn acquire(&self, key: &str) -> Option<TcpStream> {
65        let mut map = self.inner.lock().unwrap_or_else(|e| e.into_inner());
66        let queue = map.get_mut(key)?;
67        let now = Instant::now();
68        while let Some(entry) = queue.pop_front() {
69            if now.duration_since(entry.added) < self.idle_timeout {
70                return Some(entry.stream);
71            }
72            // stale — drop, which closes the TCP connection
73        }
74        None
75    }
76
77    /// Return a keep-alive connection to the pool.
78    ///
79    /// If the backend slot is already at `max_idle`, the stream is dropped
80    /// (the TCP connection closes) rather than exceeding the limit.
81    pub fn release(&self, key: &str, stream: TcpStream) {
82        let mut map = self.inner.lock().unwrap_or_else(|e| e.into_inner());
83        let queue = map.entry(key.to_string()).or_default();
84        if queue.len() < self.max_idle {
85            queue.push_back(PoolEntry { stream, added: Instant::now() });
86        }
87        // over limit — stream dropped here, closing the connection
88    }
89
90    /// Total idle connections across all backends (useful for testing/metrics).
91    pub fn idle_count(&self) -> usize {
92        let map = self.inner.lock().unwrap_or_else(|e| e.into_inner());
93        map.values().map(|q| q.len()).sum()
94    }
95}