microsandbox_network/conn.rs
1//! Connection tracker: manages smoltcp TCP sockets for the poll loop.
2//!
3//! Creates sockets on SYN detection, tracks connection lifecycle, relays data
4//! between smoltcp sockets and proxy task channels, and cleans up closed
5//! connections.
6
7use std::collections::{HashMap, HashSet};
8use std::net::SocketAddr;
9use std::sync::Arc;
10use std::sync::atomic::{AtomicBool, Ordering};
11
12use bytes::Bytes;
13use smoltcp::iface::{SocketHandle, SocketSet};
14use smoltcp::socket::tcp;
15use smoltcp::wire::IpListenEndpoint;
16use tokio::sync::mpsc;
17
18//--------------------------------------------------------------------------------------------------
19// Constants
20//--------------------------------------------------------------------------------------------------
21
22/// TCP socket receive buffer size (64 KiB).
23const TCP_RX_BUF_SIZE: usize = 65536;
24
25/// TCP socket transmit buffer size (64 KiB).
26const TCP_TX_BUF_SIZE: usize = 65536;
27
28/// Default max concurrent connections.
29const DEFAULT_MAX_CONNECTIONS: usize = 256;
30
31/// Capacity of the mpsc channels between the poll loop and proxy tasks.
32const CHANNEL_CAPACITY: usize = 32;
33
34/// Buffer size for reading from smoltcp sockets.
35const RELAY_BUF_SIZE: usize = 16384;
36
37//--------------------------------------------------------------------------------------------------
38// Types
39//--------------------------------------------------------------------------------------------------
40
41/// Tracks TCP connections between guest and proxy tasks.
42///
43/// Each guest TCP connection maps to a smoltcp socket and a pair of channels
44/// connecting it to a tokio proxy task. The tracker handles:
45///
46/// - **Socket creation** — on SYN detection, before smoltcp processes the frame.
47/// - **Data relay** — shuttles bytes between smoltcp sockets and channels.
48/// - **Lifecycle detection** — identifies newly-established connections for
49/// proxy spawning.
50/// - **Cleanup** — removes closed sockets from the socket set.
51pub struct ConnectionTracker {
52 /// Active connections keyed by smoltcp socket handle.
53 connections: HashMap<SocketHandle, Connection>,
54 /// Secondary index for O(1) duplicate-SYN detection by (src, dst) 4-tuple.
55 connection_keys: HashSet<(SocketAddr, SocketAddr)>,
56 /// Max concurrent connections (from NetworkConfig).
57 max_connections: usize,
58}
59
60/// Maximum number of poll iterations to attempt flushing remaining data
61/// after the proxy task has exited before force-aborting the socket.
62const DEFERRED_CLOSE_LIMIT: u16 = 64;
63
64/// Internal state for a single tracked TCP connection.
65struct Connection {
66 /// Guest source address (from the guest's SYN).
67 src: SocketAddr,
68 /// Original destination (from the guest's SYN).
69 dst: SocketAddr,
70 /// Sends data from smoltcp socket to proxy task (guest → server).
71 to_proxy: mpsc::Sender<Bytes>,
72 /// Receives data from proxy task to write to smoltcp socket (server → guest).
73 from_proxy: mpsc::Receiver<Bytes>,
74 /// Proxy-side channel ends, held until the connection is ESTABLISHED.
75 /// Taken by [`ConnectionTracker::take_new_connections()`].
76 proxy_channels: Option<ProxyChannels>,
77 /// Whether a proxy task has been spawned for this connection.
78 proxy_spawned: bool,
79 /// Set `true` by the proxy task once its `TcpStream::connect` to
80 /// the upstream succeeds. If the task exits with this still
81 /// `false`, the smoltcp socket is aborted (RST) so the guest
82 /// client gets ECONNREFUSED and happy-eyeballs clients fall back
83 /// to another address family, instead of being stuck on a
84 /// half-open connection that closed cleanly with FIN.
85 upstream_connected: Arc<AtomicBool>,
86 /// Partial data from proxy that couldn't be fully written to smoltcp socket.
87 write_buf: Option<(Bytes, usize)>,
88 /// Data read from smoltcp socket that couldn't be sent to proxy (channel full).
89 /// Must be sent before reading more from the socket to preserve stream order.
90 read_buf: Option<Bytes>,
91 /// Counter for deferred close attempts (prevents stalling forever).
92 close_attempts: u16,
93}
94
95/// Proxy-side channel ends, created at socket creation time and taken when
96/// the connection becomes ESTABLISHED.
97struct ProxyChannels {
98 /// Receive data from smoltcp socket (guest → proxy task).
99 from_smoltcp: mpsc::Receiver<Bytes>,
100 /// Send data to smoltcp socket (proxy task → guest).
101 to_smoltcp: mpsc::Sender<Bytes>,
102}
103
104/// Information for spawning a proxy task for a newly established connection.
105///
106/// Returned by [`ConnectionTracker::take_new_connections()`]. The poll loop
107/// passes this to the proxy task spawner.
108pub struct NewConnection {
109 /// Original destination the guest was connecting to.
110 pub dst: SocketAddr,
111 /// Receive data from smoltcp socket (guest → proxy task).
112 pub from_smoltcp: mpsc::Receiver<Bytes>,
113 /// Send data to smoltcp socket (proxy task → guest).
114 pub to_smoltcp: mpsc::Sender<Bytes>,
115 /// Flag the proxy task flips to `true` after a successful upstream
116 /// connect. Read by the connection tracker on proxy exit to decide
117 /// between FIN (clean close) and RST (upstream never reached).
118 pub upstream_connected: Arc<AtomicBool>,
119}
120
121//--------------------------------------------------------------------------------------------------
122// Methods
123//--------------------------------------------------------------------------------------------------
124
125impl ConnectionTracker {
126 /// Create a new tracker with the given connection limit.
127 pub fn new(max_connections: Option<usize>) -> Self {
128 Self {
129 connections: HashMap::new(),
130 connection_keys: HashSet::new(),
131 max_connections: max_connections.unwrap_or(DEFAULT_MAX_CONNECTIONS),
132 }
133 }
134
135 /// Returns `true` if a tracked socket already exists for this exact
136 /// connection (same source AND destination). O(1) via HashSet lookup.
137 pub fn has_socket_for(&self, src: &SocketAddr, dst: &SocketAddr) -> bool {
138 self.connection_keys.contains(&(*src, *dst))
139 }
140
141 /// Create a smoltcp TCP socket for an incoming SYN and register it.
142 ///
143 /// The socket is put into LISTEN state on the destination IP + port so
144 /// smoltcp will complete the three-way handshake when it processes the
145 /// SYN frame. Binding to the specific destination IP (not just port)
146 /// prevents socket dispatch ambiguity when multiple connections target
147 /// different IPs on the same port.
148 ///
149 /// Returns `false` if at `max_connections` limit.
150 pub fn create_tcp_socket(
151 &mut self,
152 src: SocketAddr,
153 dst: SocketAddr,
154 sockets: &mut SocketSet<'_>,
155 ) -> bool {
156 if self.connections.len() >= self.max_connections {
157 return false;
158 }
159
160 // Create smoltcp TCP socket with buffers.
161 let rx_buf = tcp::SocketBuffer::new(vec![0u8; TCP_RX_BUF_SIZE]);
162 let tx_buf = tcp::SocketBuffer::new(vec![0u8; TCP_TX_BUF_SIZE]);
163 let mut socket = tcp::Socket::new(rx_buf, tx_buf);
164
165 // Listen on the specific destination IP + port. With any_ip mode,
166 // binding to the IP ensures the correct socket accepts each SYN
167 // when multiple connections target the same port on different IPs.
168 let listen_endpoint = IpListenEndpoint {
169 addr: Some(dst.ip().into()),
170 port: dst.port(),
171 };
172 if socket.listen(listen_endpoint).is_err() {
173 return false;
174 }
175
176 let handle = sockets.add(socket);
177
178 // Create channel pairs for proxy task communication.
179 //
180 // smoltcp → proxy (guest sends data, proxy relays to server):
181 let (to_proxy_tx, to_proxy_rx) = mpsc::channel(CHANNEL_CAPACITY);
182 // proxy → smoltcp (server sends data, proxy relays to guest):
183 let (from_proxy_tx, from_proxy_rx) = mpsc::channel(CHANNEL_CAPACITY);
184
185 self.connection_keys.insert((src, dst));
186 self.connections.insert(
187 handle,
188 Connection {
189 src,
190 dst,
191 to_proxy: to_proxy_tx,
192 from_proxy: from_proxy_rx,
193 proxy_channels: Some(ProxyChannels {
194 from_smoltcp: to_proxy_rx,
195 to_smoltcp: from_proxy_tx,
196 }),
197 proxy_spawned: false,
198 upstream_connected: Arc::new(AtomicBool::new(false)),
199 write_buf: None,
200 read_buf: None,
201 close_attempts: 0,
202 },
203 );
204
205 true
206 }
207
208 /// Relay data between smoltcp sockets and proxy task channels.
209 ///
210 /// For each connection with a spawned proxy:
211 /// - Reads data from the smoltcp socket and sends it to the proxy channel.
212 /// - Receives data from the proxy channel and writes it to the smoltcp socket.
213 pub fn relay_data(&mut self, sockets: &mut SocketSet<'_>) {
214 let mut relay_buf = [0u8; RELAY_BUF_SIZE];
215
216 for (&handle, conn) in &mut self.connections {
217 if !conn.proxy_spawned {
218 continue;
219 }
220
221 let socket = sockets.get_mut::<tcp::Socket>(handle);
222
223 // Already torn down (e.g. abort fired on a previous pass).
224 // Leave it for `cleanup_closed` to evict.
225 if matches!(socket.state(), tcp::State::Closed) {
226 continue;
227 }
228
229 // Detect proxy task exit: when the proxy drops its channel
230 // ends, close the smoltcp socket so the guest gets a FIN.
231 //
232 // If the proxy never successfully reached the upstream,
233 // an RST via `abort()` is instead sent so happy-eyeballs
234 // clients fall back to another family instead of committing
235 // to this half-open connection.
236 if conn.to_proxy.is_closed() {
237 if !conn.upstream_connected.load(Ordering::Acquire) {
238 tracing::debug!(
239 src = %conn.src,
240 dst = %conn.dst,
241 "upstream never connected; aborting smoltcp socket (RST to guest)"
242 );
243 socket.abort();
244 continue;
245 }
246 write_proxy_data(socket, conn);
247 if conn.write_buf.is_none() {
248 socket.close();
249 } else {
250 // Abort if we've been trying to flush for too long
251 // (guest stopped reading, socket send buffer full).
252 conn.close_attempts += 1;
253 if conn.close_attempts >= DEFERRED_CLOSE_LIMIT {
254 socket.abort();
255 }
256 }
257 continue;
258 }
259
260 // smoltcp → proxy: flush read_buf first, then read from socket.
261 if let Some(pending) = conn.read_buf.take()
262 && let Err(e) = conn.to_proxy.try_send(pending)
263 {
264 conn.read_buf = Some(e.into_inner());
265 }
266
267 if conn.read_buf.is_none() {
268 while socket.can_recv() {
269 match socket.recv_slice(&mut relay_buf) {
270 Ok(n) if n > 0 => {
271 let data = Bytes::copy_from_slice(&relay_buf[..n]);
272 if let Err(e) = conn.to_proxy.try_send(data) {
273 conn.read_buf = Some(e.into_inner());
274 break;
275 }
276 }
277 _ => break,
278 }
279 }
280 }
281
282 // proxy → smoltcp: write pending data, then drain channel.
283 write_proxy_data(socket, conn);
284 }
285 }
286
287 /// Collect newly-established connections that need proxy tasks.
288 ///
289 /// Returns a list of [`NewConnection`] structs containing the channel ends
290 /// for the proxy task. The poll loop is responsible for spawning the task.
291 pub fn take_new_connections(&mut self, sockets: &mut SocketSet<'_>) -> Vec<NewConnection> {
292 let mut new = Vec::new();
293
294 for (&handle, conn) in &mut self.connections {
295 if conn.proxy_spawned {
296 continue;
297 }
298
299 let socket = sockets.get::<tcp::Socket>(handle);
300 if socket.state() == tcp::State::Established {
301 conn.proxy_spawned = true;
302
303 if let Some(channels) = conn.proxy_channels.take() {
304 new.push(NewConnection {
305 dst: conn.dst,
306 from_smoltcp: channels.from_smoltcp,
307 to_smoltcp: channels.to_smoltcp,
308 upstream_connected: conn.upstream_connected.clone(),
309 });
310 }
311 }
312 }
313
314 new
315 }
316
317 /// Remove closed connections and their sockets.
318 ///
319 /// Only removes sockets in the `Closed` state. Sockets in `TimeWait`
320 /// are left for smoltcp to handle naturally (2*MSL timer), preventing
321 /// delayed duplicate segments from being accepted by a reused port.
322 pub fn cleanup_closed(&mut self, sockets: &mut SocketSet<'_>) {
323 let keys = &mut self.connection_keys;
324 self.connections.retain(|&handle, conn| {
325 let socket = sockets.get::<tcp::Socket>(handle);
326 if matches!(socket.state(), tcp::State::Closed) {
327 keys.remove(&(conn.src, conn.dst));
328 sockets.remove(handle);
329 false
330 } else {
331 true
332 }
333 });
334 }
335}
336
337//--------------------------------------------------------------------------------------------------
338// Functions
339//--------------------------------------------------------------------------------------------------
340
341/// Try to write proxy data to the smoltcp socket.
342fn write_proxy_data(socket: &mut tcp::Socket<'_>, conn: &mut Connection) {
343 // First, try to finish writing any pending partial data.
344 if let Some((data, offset)) = &mut conn.write_buf {
345 if socket.can_send() {
346 match socket.send_slice(&data[*offset..]) {
347 Ok(written) => {
348 *offset += written;
349 if *offset >= data.len() {
350 conn.write_buf = None;
351 }
352 }
353 Err(_) => return,
354 }
355 } else {
356 return;
357 }
358 }
359
360 // Then drain the channel.
361 while conn.write_buf.is_none() {
362 match conn.from_proxy.try_recv() {
363 Ok(data) => {
364 if socket.can_send() {
365 match socket.send_slice(&data) {
366 Ok(written) if written < data.len() => {
367 conn.write_buf = Some((data, written));
368 }
369 Err(_) => {
370 conn.write_buf = Some((data, 0));
371 }
372 _ => {}
373 }
374 } else {
375 conn.write_buf = Some((data, 0));
376 }
377 }
378 Err(_) => break,
379 }
380 }
381}