cloudflare_quick_tunnel/
pool.rs1use std::time::{Duration, Instant};
15
16use tokio::net::TcpStream;
17use tokio::sync::Mutex;
18use tracing::trace;
19
/// Default time an idle stream may sit in a [`Pool`] before `acquire`
/// discards it instead of handing it out (30 seconds).
pub const DEFAULT_IDLE_TTL: Duration = Duration::from_secs(30);
24
/// Default cap on how many idle streams a [`Pool`] retains at once.
pub const DEFAULT_MAX_IDLE: usize = 16;
29
/// An idle connection parked in the pool, together with the time it was parked.
struct Idle {
    /// The parked TCP connection.
    stream: TcpStream,
    /// When the stream was handed back via `Pool::release`; compared against
    /// the pool's idle TTL on `acquire`.
    released_at: Instant,
}
34
/// A small pool of idle TCP connections to a single port on `127.0.0.1`.
pub struct Pool {
    /// Loopback port that fresh connections are opened against.
    port: u16,
    /// Maximum age of an idle entry before it is considered stale.
    idle_ttl: Duration,
    /// Maximum number of idle entries kept at once.
    max_idle: usize,
    /// Idle entries in release order (most recently released last), guarded
    /// by an async mutex.
    idle: Mutex<Vec<Idle>>,
}
41
42impl Pool {
43 pub fn new(port: u16) -> Self {
44 Self {
45 port,
46 idle_ttl: DEFAULT_IDLE_TTL,
47 max_idle: DEFAULT_MAX_IDLE,
48 idle: Mutex::new(Vec::new()),
49 }
50 }
51
52 pub async fn acquire(&self) -> std::io::Result<TcpStream> {
55 {
60 let mut g = self.idle.lock().await;
61 while let Some(entry) = g.last() {
62 if entry.released_at.elapsed() <= self.idle_ttl {
63 let entry = g.pop().expect("checked not-empty");
64 trace!(port = self.port, pool_size = g.len(), "pool hit");
65 return Ok(entry.stream);
66 }
67 g.pop();
68 }
69 }
70 trace!(port = self.port, "pool miss; opening fresh TCP");
71 TcpStream::connect(("127.0.0.1", self.port)).await
72 }
73
74 pub async fn release(&self, stream: TcpStream) {
76 let mut g = self.idle.lock().await;
77 if g.len() >= self.max_idle {
78 trace!(port = self.port, "pool full; dropping released stream");
79 return;
80 }
81 g.push(Idle {
82 stream,
83 released_at: Instant::now(),
84 });
85 trace!(port = self.port, pool_size = g.len(), "pool released");
86 }
87}
88
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::io::AsyncWriteExt;
    use tokio::net::TcpListener;

    /// A stream that was just released is the one the next `acquire` returns.
    #[tokio::test]
    async fn acquire_after_release_returns_same_stream() {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let port = listener.local_addr().unwrap().port();
        // Background acceptor: every connection gets a short greeting written
        // from its own task.
        tokio::spawn(async move {
            loop {
                let (mut conn, _) = listener.accept().await.unwrap();
                tokio::spawn(async move {
                    let _ = conn.write_all(b"ping").await;
                });
            }
        });

        let pool = Pool::new(port);
        let first = pool.acquire().await.unwrap();
        let first_addr = first.local_addr().unwrap();
        pool.release(first).await;

        let second = pool.acquire().await.unwrap();
        assert_eq!(second.local_addr().unwrap(), first_addr, "should reuse socket");
    }

    /// An entry older than the idle TTL is discarded and a new socket opened.
    #[tokio::test]
    async fn pool_evicts_stale_entries() {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let port = listener.local_addr().unwrap().port();
        // Background acceptor that discards each accept result immediately.
        tokio::spawn(async move {
            loop {
                let _ = listener.accept().await;
            }
        });
        // Shorten the TTL so the test only has to wait a few milliseconds.
        let pool = Pool {
            idle_ttl: Duration::from_millis(50),
            ..Pool::new(port)
        };

        let first = pool.acquire().await.unwrap();
        let first_addr = first.local_addr().unwrap();
        pool.release(first).await;

        // Wait past the TTL so the parked entry is stale.
        tokio::time::sleep(Duration::from_millis(100)).await;
        let second = pool.acquire().await.unwrap();
        assert_ne!(
            second.local_addr().unwrap(),
            first_addr,
            "stale entry should have been evicted"
        );
    }
}