rust_web_server/proxy/pool.rs
1use std::collections::{HashMap, VecDeque};
2use std::net::TcpStream;
3use std::sync::Mutex;
4use std::time::{Duration, Instant};
5
/// Per-backend idle-connection limit used by `ConnPool::new_default`.
const DEFAULT_MAX_IDLE: usize = 8;

/// Idle timeout used by `ConnPool::new_default` (one minute).
const DEFAULT_IDLE_TIMEOUT: Duration = Duration::from_secs(60);
8
9/// Per-backend HTTP/1.1 connection pool.
10///
11/// Holds idle `TcpStream` connections keyed by `"host:port"`. When a
12/// backend responds with `Connection: keep-alive` (or the HTTP/1.1 default),
13/// the stream is returned here and reused for the next request to the same
14/// backend, eliminating the TCP-handshake cost and reducing ephemeral-port
15/// exhaustion under load.
16///
17/// # Thread safety
18///
19/// All methods take `&self` and are safe to call from multiple threads.
20/// The inner map is protected by a `Mutex`.
21///
22/// # Example
23///
24/// ```rust,ignore
25/// use std::sync::Arc;
26/// use std::time::Duration;
27/// use rust_web_server::proxy::ConnPool;
28/// use rust_web_server::proxy::ReverseProxy;
29///
30/// let pool = Arc::new(ConnPool::new(16, Duration::from_secs(30)));
31/// let _proxy = ReverseProxy::new(["http://backend:8080"])
32/// .with_pool(Arc::clone(&pool));
33/// ```
34pub struct ConnPool {
35 inner: Mutex<HashMap<String, VecDeque<PoolEntry>>>,
36 max_idle: usize,
37 idle_timeout: Duration,
38}
39
/// One parked connection together with the moment it entered the pool.
#[derive(Debug)]
struct PoolEntry {
    /// The idle backend connection; dropping it closes the socket.
    stream: TcpStream,
    /// When the stream was handed to the pool; its age is compared against
    /// the pool's `idle_timeout`.
    added: Instant,
}
44
45impl ConnPool {
46 /// Create a pool with the given per-backend idle limit and idle timeout.
47 pub fn new(max_idle: usize, idle_timeout: Duration) -> Self {
48 ConnPool {
49 inner: Mutex::new(HashMap::new()),
50 max_idle,
51 idle_timeout,
52 }
53 }
54
55 /// Create a pool with defaults: 8 idle connections per backend, 60-second timeout.
56 pub fn new_default() -> Self {
57 Self::new(DEFAULT_MAX_IDLE, DEFAULT_IDLE_TIMEOUT)
58 }
59
60 /// Try to acquire an idle connection for `key = "host:port"`.
61 ///
62 /// Stale entries (older than `idle_timeout`) are discarded automatically.
63 /// Returns `None` if no usable connection is available.
64 pub fn acquire(&self, key: &str) -> Option<TcpStream> {
65 let mut map = self.inner.lock().unwrap_or_else(|e| e.into_inner());
66 let queue = map.get_mut(key)?;
67 let now = Instant::now();
68 while let Some(entry) = queue.pop_front() {
69 if now.duration_since(entry.added) < self.idle_timeout {
70 return Some(entry.stream);
71 }
72 // stale — drop, which closes the TCP connection
73 }
74 None
75 }
76
77 /// Return a keep-alive connection to the pool.
78 ///
79 /// If the backend slot is already at `max_idle`, the stream is dropped
80 /// (the TCP connection closes) rather than exceeding the limit.
81 pub fn release(&self, key: &str, stream: TcpStream) {
82 let mut map = self.inner.lock().unwrap_or_else(|e| e.into_inner());
83 let queue = map.entry(key.to_string()).or_default();
84 if queue.len() < self.max_idle {
85 queue.push_back(PoolEntry { stream, added: Instant::now() });
86 }
87 // over limit — stream dropped here, closing the connection
88 }
89
90 /// Total idle connections across all backends (useful for testing/metrics).
91 pub fn idle_count(&self) -> usize {
92 let map = self.inner.lock().unwrap_or_else(|e| e.into_inner());
93 map.values().map(|q| q.len()).sum()
94 }
95}