Skip to main content

cloudflare_quick_tunnel/
pool.rs

1//! Bounded idle-TCP-connection pool against `127.0.0.1:<port>`.
2//!
3//! Reduces socket() + connect() overhead per inbound stream by
4//! reusing keep-alive connections to the local origin. Each pooled
5//! entry carries a release timestamp; entries older than `idle_ttl`
6//! are reaped on acquire so we never hand back a connection the
7//! origin has already half-closed by idle timeout.
8//!
9//! The pool only stores connections that are known to be in a
10//! healthy "ready for next request" state — the proxy must
11//! call [`Pool::release`] **only** after fully reading a response
12//! whose framing it understood (Content-Length-bounded).
13
14use std::time::{Duration, Instant};
15
16use tokio::net::TcpStream;
17use tokio::sync::Mutex;
18use tracing::trace;
19
/// Default maximum age of a pooled idle socket; [`Pool::new`] uses
/// this value as the pool's `idle_ttl`.
///
/// Most servers (e.g. axum, hyper, nginx) close idle keep-alive
/// connections after 60-75s. Expiring our side at 30s leaves a safe
/// margin below that and matches the typical client-side default.
pub const DEFAULT_IDLE_TTL: Duration = Duration::from_secs(30);
24
/// Default soft limit on how many idle sockets one pool retains.
///
/// A socket released while the pool is already at this limit is
/// simply dropped; a later acquire opens a new connection if it
/// needs one.
pub const DEFAULT_MAX_IDLE: usize = 16;
29
30struct Idle {
31    stream: TcpStream,
32    released_at: Instant,
33}
34
35pub struct Pool {
36    port: u16,
37    idle_ttl: Duration,
38    max_idle: usize,
39    idle: Mutex<Vec<Idle>>,
40}
41
42impl Pool {
43    pub fn new(port: u16) -> Self {
44        Self {
45            port,
46            idle_ttl: DEFAULT_IDLE_TTL,
47            max_idle: DEFAULT_MAX_IDLE,
48            idle: Mutex::new(Vec::new()),
49        }
50    }
51
52    /// Acquire a socket: pop a fresh idle entry if available,
53    /// otherwise open a new TCP connection.
54    pub async fn acquire(&self) -> std::io::Result<TcpStream> {
55        // Drain stale entries up-front so callers never see a
56        // stream older than `idle_ttl`. Locking inside the loop
57        // gives the reaper write-access without blocking other
58        // acquires for the connect path.
59        {
60            let mut g = self.idle.lock().await;
61            while let Some(entry) = g.last() {
62                if entry.released_at.elapsed() <= self.idle_ttl {
63                    let entry = g.pop().expect("checked not-empty");
64                    trace!(port = self.port, pool_size = g.len(), "pool hit");
65                    return Ok(entry.stream);
66                }
67                g.pop();
68            }
69        }
70        trace!(port = self.port, "pool miss; opening fresh TCP");
71        TcpStream::connect(("127.0.0.1", self.port)).await
72    }
73
74    /// Return a socket to the pool. Drop on overflow.
75    pub async fn release(&self, stream: TcpStream) {
76        let mut g = self.idle.lock().await;
77        if g.len() >= self.max_idle {
78            trace!(port = self.port, "pool full; dropping released stream");
79            return;
80        }
81        g.push(Idle {
82            stream,
83            released_at: Instant::now(),
84        });
85        trace!(port = self.port, pool_size = g.len(), "pool released");
86    }
87}
88
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::io::AsyncWriteExt;
    use tokio::net::TcpListener;

    /// Binds a listener on an OS-assigned loopback port and returns
    /// it together with that port.
    async fn bind_loopback() -> (TcpListener, u16) {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let port = listener.local_addr().unwrap().port();
        (listener, port)
    }

    /// Pool hits should reuse the same socket.
    #[tokio::test]
    async fn acquire_after_release_returns_same_stream() {
        let (listener, port) = bind_loopback().await;
        // Stand-in origin: writes `ping` to each accepted connection
        // from a dedicated task.
        tokio::spawn(async move {
            loop {
                let (mut conn, _) = listener.accept().await.unwrap();
                tokio::spawn(async move {
                    let _ = conn.write_all(b"ping").await;
                });
            }
        });

        let pool = Pool::new(port);
        let first = pool.acquire().await.unwrap();
        let first_addr = first.local_addr().unwrap();
        pool.release(first).await;

        let second = pool.acquire().await.unwrap();
        assert_eq!(second.local_addr().unwrap(), first_addr, "should reuse socket");
    }

    /// An entry that outlives `idle_ttl` must not be handed out again.
    #[tokio::test]
    async fn pool_evicts_stale_entries() {
        let (listener, port) = bind_loopback().await;
        // Stand-in origin: accepts connections and ignores them.
        tokio::spawn(async move {
            loop {
                let _ = listener.accept().await;
            }
        });
        // Shorten the TTL so the entry expires within the test.
        let pool = Pool {
            idle_ttl: Duration::from_millis(50),
            ..Pool::new(port)
        };

        let first = pool.acquire().await.unwrap();
        let first_addr = first.local_addr().unwrap();
        pool.release(first).await;

        // Wait past the 50ms TTL before acquiring again.
        tokio::time::sleep(Duration::from_millis(100)).await;
        let second = pool.acquire().await.unwrap();
        assert_ne!(
            second.local_addr().unwrap(),
            first_addr,
            "stale entry should have been evicted"
        );
    }
}
142}