Skip to main content

microsandbox_network/
conn.rs

1//! Connection tracker: manages smoltcp TCP sockets for the poll loop.
2//!
3//! Creates sockets on SYN detection, tracks connection lifecycle, relays data
4//! between smoltcp sockets and proxy task channels, and cleans up closed
5//! connections.
6
7use std::collections::{HashMap, HashSet};
8use std::net::SocketAddr;
9use std::sync::Arc;
10use std::sync::atomic::{AtomicBool, Ordering};
11
12use bytes::Bytes;
13use smoltcp::iface::{SocketHandle, SocketSet};
14use smoltcp::socket::tcp;
15use smoltcp::wire::IpListenEndpoint;
16use tokio::sync::mpsc;
17
18//--------------------------------------------------------------------------------------------------
19// Constants
20//--------------------------------------------------------------------------------------------------
21
22/// TCP socket receive buffer size (64 KiB).
23const TCP_RX_BUF_SIZE: usize = 65536;
24
25/// TCP socket transmit buffer size (64 KiB).
26const TCP_TX_BUF_SIZE: usize = 65536;
27
28/// Default max concurrent connections.
29const DEFAULT_MAX_CONNECTIONS: usize = 256;
30
31/// Capacity of the mpsc channels between the poll loop and proxy tasks.
32const CHANNEL_CAPACITY: usize = 32;
33
34/// Buffer size for reading from smoltcp sockets.
35const RELAY_BUF_SIZE: usize = 16384;
36
37//--------------------------------------------------------------------------------------------------
38// Types
39//--------------------------------------------------------------------------------------------------
40
41/// Tracks TCP connections between guest and proxy tasks.
42///
43/// Each guest TCP connection maps to a smoltcp socket and a pair of channels
44/// connecting it to a tokio proxy task. The tracker handles:
45///
46/// - **Socket creation** — on SYN detection, before smoltcp processes the frame.
47/// - **Data relay** — shuttles bytes between smoltcp sockets and channels.
48/// - **Lifecycle detection** — identifies newly-established connections for
49///   proxy spawning.
50/// - **Cleanup** — removes closed sockets from the socket set.
51pub struct ConnectionTracker {
52    /// Active connections keyed by smoltcp socket handle.
53    connections: HashMap<SocketHandle, Connection>,
54    /// Secondary index for O(1) duplicate-SYN detection by (src, dst) 4-tuple.
55    connection_keys: HashSet<(SocketAddr, SocketAddr)>,
56    /// Max concurrent connections (from NetworkConfig).
57    max_connections: usize,
58}
59
60/// Maximum number of poll iterations to attempt flushing remaining data
61/// after the proxy task has exited before force-aborting the socket.
62const DEFERRED_CLOSE_LIMIT: u16 = 64;
63
64/// Internal state for a single tracked TCP connection.
65struct Connection {
66    /// Guest source address (from the guest's SYN).
67    src: SocketAddr,
68    /// Original destination (from the guest's SYN).
69    dst: SocketAddr,
70    /// Sends data from smoltcp socket to proxy task (guest → server).
71    to_proxy: mpsc::Sender<Bytes>,
72    /// Receives data from proxy task to write to smoltcp socket (server → guest).
73    from_proxy: mpsc::Receiver<Bytes>,
74    /// Proxy-side channel ends, held until the connection is ESTABLISHED.
75    /// Taken by [`ConnectionTracker::take_new_connections()`].
76    proxy_channels: Option<ProxyChannels>,
77    /// Whether a proxy task has been spawned for this connection.
78    proxy_spawned: bool,
79    /// Set `true` by the proxy task once its `TcpStream::connect` to
80    /// the upstream succeeds. If the task exits with this still
81    /// `false`, the smoltcp socket is aborted (RST) so the guest
82    /// client gets ECONNREFUSED and happy-eyeballs clients fall back
83    /// to another address family, instead of being stuck on a
84    /// half-open connection that closed cleanly with FIN.
85    upstream_connected: Arc<AtomicBool>,
86    /// Partial data from proxy that couldn't be fully written to smoltcp socket.
87    write_buf: Option<(Bytes, usize)>,
88    /// Data read from smoltcp socket that couldn't be sent to proxy (channel full).
89    /// Must be sent before reading more from the socket to preserve stream order.
90    read_buf: Option<Bytes>,
91    /// Counter for deferred close attempts (prevents stalling forever).
92    close_attempts: u16,
93}
94
95/// Proxy-side channel ends, created at socket creation time and taken when
96/// the connection becomes ESTABLISHED.
97struct ProxyChannels {
98    /// Receive data from smoltcp socket (guest → proxy task).
99    from_smoltcp: mpsc::Receiver<Bytes>,
100    /// Send data to smoltcp socket (proxy task → guest).
101    to_smoltcp: mpsc::Sender<Bytes>,
102}
103
104/// Information for spawning a proxy task for a newly established connection.
105///
106/// Returned by [`ConnectionTracker::take_new_connections()`]. The poll loop
107/// passes this to the proxy task spawner.
108pub struct NewConnection {
109    /// Original destination the guest was connecting to.
110    pub dst: SocketAddr,
111    /// Receive data from smoltcp socket (guest → proxy task).
112    pub from_smoltcp: mpsc::Receiver<Bytes>,
113    /// Send data to smoltcp socket (proxy task → guest).
114    pub to_smoltcp: mpsc::Sender<Bytes>,
115    /// Flag the proxy task flips to `true` after a successful upstream
116    /// connect. Read by the connection tracker on proxy exit to decide
117    /// between FIN (clean close) and RST (upstream never reached).
118    pub upstream_connected: Arc<AtomicBool>,
119}
120
121//--------------------------------------------------------------------------------------------------
122// Methods
123//--------------------------------------------------------------------------------------------------
124
125impl ConnectionTracker {
126    /// Create a new tracker with the given connection limit.
127    pub fn new(max_connections: Option<usize>) -> Self {
128        Self {
129            connections: HashMap::new(),
130            connection_keys: HashSet::new(),
131            max_connections: max_connections.unwrap_or(DEFAULT_MAX_CONNECTIONS),
132        }
133    }
134
135    /// Returns `true` if a tracked socket already exists for this exact
136    /// connection (same source AND destination). O(1) via HashSet lookup.
137    pub fn has_socket_for(&self, src: &SocketAddr, dst: &SocketAddr) -> bool {
138        self.connection_keys.contains(&(*src, *dst))
139    }
140
141    /// Create a smoltcp TCP socket for an incoming SYN and register it.
142    ///
143    /// The socket is put into LISTEN state on the destination IP + port so
144    /// smoltcp will complete the three-way handshake when it processes the
145    /// SYN frame. Binding to the specific destination IP (not just port)
146    /// prevents socket dispatch ambiguity when multiple connections target
147    /// different IPs on the same port.
148    ///
149    /// Returns `false` if at `max_connections` limit.
150    pub fn create_tcp_socket(
151        &mut self,
152        src: SocketAddr,
153        dst: SocketAddr,
154        sockets: &mut SocketSet<'_>,
155    ) -> bool {
156        if self.connections.len() >= self.max_connections {
157            return false;
158        }
159
160        // Create smoltcp TCP socket with buffers.
161        let rx_buf = tcp::SocketBuffer::new(vec![0u8; TCP_RX_BUF_SIZE]);
162        let tx_buf = tcp::SocketBuffer::new(vec![0u8; TCP_TX_BUF_SIZE]);
163        let mut socket = tcp::Socket::new(rx_buf, tx_buf);
164
165        // Listen on the specific destination IP + port. With any_ip mode,
166        // binding to the IP ensures the correct socket accepts each SYN
167        // when multiple connections target the same port on different IPs.
168        let listen_endpoint = IpListenEndpoint {
169            addr: Some(dst.ip().into()),
170            port: dst.port(),
171        };
172        if socket.listen(listen_endpoint).is_err() {
173            return false;
174        }
175
176        let handle = sockets.add(socket);
177
178        // Create channel pairs for proxy task communication.
179        //
180        // smoltcp → proxy (guest sends data, proxy relays to server):
181        let (to_proxy_tx, to_proxy_rx) = mpsc::channel(CHANNEL_CAPACITY);
182        // proxy → smoltcp (server sends data, proxy relays to guest):
183        let (from_proxy_tx, from_proxy_rx) = mpsc::channel(CHANNEL_CAPACITY);
184
185        self.connection_keys.insert((src, dst));
186        self.connections.insert(
187            handle,
188            Connection {
189                src,
190                dst,
191                to_proxy: to_proxy_tx,
192                from_proxy: from_proxy_rx,
193                proxy_channels: Some(ProxyChannels {
194                    from_smoltcp: to_proxy_rx,
195                    to_smoltcp: from_proxy_tx,
196                }),
197                proxy_spawned: false,
198                upstream_connected: Arc::new(AtomicBool::new(false)),
199                write_buf: None,
200                read_buf: None,
201                close_attempts: 0,
202            },
203        );
204
205        true
206    }
207
208    /// Relay data between smoltcp sockets and proxy task channels.
209    ///
210    /// For each connection with a spawned proxy:
211    /// - Reads data from the smoltcp socket and sends it to the proxy channel.
212    /// - Receives data from the proxy channel and writes it to the smoltcp socket.
213    pub fn relay_data(&mut self, sockets: &mut SocketSet<'_>) {
214        let mut relay_buf = [0u8; RELAY_BUF_SIZE];
215
216        for (&handle, conn) in &mut self.connections {
217            if !conn.proxy_spawned {
218                continue;
219            }
220
221            let socket = sockets.get_mut::<tcp::Socket>(handle);
222
223            // Already torn down (e.g. abort fired on a previous pass).
224            // Leave it for `cleanup_closed` to evict.
225            if matches!(socket.state(), tcp::State::Closed) {
226                continue;
227            }
228
229            // Detect proxy task exit: when the proxy drops its channel
230            // ends, close the smoltcp socket so the guest gets a FIN.
231            //
232            // If the proxy never successfully reached the upstream,
233            // an RST via `abort()` is instead sent so happy-eyeballs
234            // clients fall back to another family instead of committing
235            // to this half-open connection.
236            if conn.to_proxy.is_closed() {
237                if !conn.upstream_connected.load(Ordering::Acquire) {
238                    tracing::debug!(
239                        src = %conn.src,
240                        dst = %conn.dst,
241                        "upstream never connected; aborting smoltcp socket (RST to guest)"
242                    );
243                    socket.abort();
244                    continue;
245                }
246                write_proxy_data(socket, conn);
247                if conn.write_buf.is_none() {
248                    socket.close();
249                } else {
250                    // Abort if we've been trying to flush for too long
251                    // (guest stopped reading, socket send buffer full).
252                    conn.close_attempts += 1;
253                    if conn.close_attempts >= DEFERRED_CLOSE_LIMIT {
254                        socket.abort();
255                    }
256                }
257                continue;
258            }
259
260            // smoltcp → proxy: flush read_buf first, then read from socket.
261            if let Some(pending) = conn.read_buf.take()
262                && let Err(e) = conn.to_proxy.try_send(pending)
263            {
264                conn.read_buf = Some(e.into_inner());
265            }
266
267            if conn.read_buf.is_none() {
268                while socket.can_recv() {
269                    match socket.recv_slice(&mut relay_buf) {
270                        Ok(n) if n > 0 => {
271                            let data = Bytes::copy_from_slice(&relay_buf[..n]);
272                            if let Err(e) = conn.to_proxy.try_send(data) {
273                                conn.read_buf = Some(e.into_inner());
274                                break;
275                            }
276                        }
277                        _ => break,
278                    }
279                }
280            }
281
282            // proxy → smoltcp: write pending data, then drain channel.
283            write_proxy_data(socket, conn);
284        }
285    }
286
287    /// Collect newly-established connections that need proxy tasks.
288    ///
289    /// Returns a list of [`NewConnection`] structs containing the channel ends
290    /// for the proxy task. The poll loop is responsible for spawning the task.
291    pub fn take_new_connections(&mut self, sockets: &mut SocketSet<'_>) -> Vec<NewConnection> {
292        let mut new = Vec::new();
293
294        for (&handle, conn) in &mut self.connections {
295            if conn.proxy_spawned {
296                continue;
297            }
298
299            let socket = sockets.get::<tcp::Socket>(handle);
300            if socket.state() == tcp::State::Established {
301                conn.proxy_spawned = true;
302
303                if let Some(channels) = conn.proxy_channels.take() {
304                    new.push(NewConnection {
305                        dst: conn.dst,
306                        from_smoltcp: channels.from_smoltcp,
307                        to_smoltcp: channels.to_smoltcp,
308                        upstream_connected: conn.upstream_connected.clone(),
309                    });
310                }
311            }
312        }
313
314        new
315    }
316
317    /// Remove closed connections and their sockets.
318    ///
319    /// Only removes sockets in the `Closed` state. Sockets in `TimeWait`
320    /// are left for smoltcp to handle naturally (2*MSL timer), preventing
321    /// delayed duplicate segments from being accepted by a reused port.
322    pub fn cleanup_closed(&mut self, sockets: &mut SocketSet<'_>) {
323        let keys = &mut self.connection_keys;
324        self.connections.retain(|&handle, conn| {
325            let socket = sockets.get::<tcp::Socket>(handle);
326            if matches!(socket.state(), tcp::State::Closed) {
327                keys.remove(&(conn.src, conn.dst));
328                sockets.remove(handle);
329                false
330            } else {
331                true
332            }
333        });
334    }
335}
336
337//--------------------------------------------------------------------------------------------------
338// Functions
339//--------------------------------------------------------------------------------------------------
340
341/// Try to write proxy data to the smoltcp socket.
342fn write_proxy_data(socket: &mut tcp::Socket<'_>, conn: &mut Connection) {
343    // First, try to finish writing any pending partial data.
344    if let Some((data, offset)) = &mut conn.write_buf {
345        if socket.can_send() {
346            match socket.send_slice(&data[*offset..]) {
347                Ok(written) => {
348                    *offset += written;
349                    if *offset >= data.len() {
350                        conn.write_buf = None;
351                    }
352                }
353                Err(_) => return,
354            }
355        } else {
356            return;
357        }
358    }
359
360    // Then drain the channel.
361    while conn.write_buf.is_none() {
362        match conn.from_proxy.try_recv() {
363            Ok(data) => {
364                if socket.can_send() {
365                    match socket.send_slice(&data) {
366                        Ok(written) if written < data.len() => {
367                            conn.write_buf = Some((data, written));
368                        }
369                        Err(_) => {
370                            conn.write_buf = Some((data, 0));
371                        }
372                        _ => {}
373                    }
374                } else {
375                    conn.write_buf = Some((data, 0));
376                }
377            }
378            Err(_) => break,
379        }
380    }
381}