Skip to main content

sozu_lib/
socket.rs

//! Socket I/O wrappers and TCP option helpers.
//!
//! Hosts the `SocketHandler` trait, its plain-TCP implementations
//! (`TcpStream` and the session-tagged `SessionTcpStream`), the
//! `FrontRustls` wrapper that drives a rustls `ServerConnection` over a
//! `TcpStream`, plus the ancillary `getsockopt(TCP_INFO)` / TCP-keepalive
//! helpers. The `FrontRustls::socket_write` / `socket_write_vectored` pair
//! is a known truncation hot spot — keep the two paths structurally
//! symmetric (see the per-method `///` invariants).
9
10use std::{
11    io::{ErrorKind, Read, Write},
12    net::SocketAddr,
13};
14
15use mio::net::{TcpListener, TcpStream, UdpSocket};
16use rustls::{ProtocolVersion, ServerConnection};
17use rusty_ulid::Ulid;
18use socket2::{Domain, Protocol, Socket, Type};
19use sozu_command::{config::MAX_LOOP_ITERATIONS, logging::ansi_palette};
20
21use crate::metrics::names;
22
23#[derive(thiserror::Error, Debug)]
24pub enum ServerBindError {
25    #[error("could not set bind to socket: {0}")]
26    BindError(std::io::Error),
27    #[error("could not listen on socket: {0}")]
28    Listen(std::io::Error),
29    #[error("could not set socket to nonblocking: {0}")]
30    SetNonBlocking(std::io::Error),
31    #[error("could not set reuse address: {0}")]
32    SetReuseAddress(std::io::Error),
33    #[error("could not set reuse address: {0}")]
34    SetReusePort(std::io::Error),
35    #[error("Could not create socket: {0}")]
36    SocketCreationError(std::io::Error),
37    #[error("Invalid socket address '{address}': {error}")]
38    InvalidSocketAddress { address: String, error: String },
39}
40
/// Why a socket I/O helper stopped, reported alongside the number of bytes
/// it moved before stopping.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum SocketResult {
    /// No blocking, closing or error condition was hit.
    Continue,
    /// The peer closed, reset or refused the connection.
    Closed,
    /// The socket reported `WouldBlock`; wait for the next readiness event.
    WouldBlock,
    /// Any other failure.
    Error,
}
48
/// Protocol spoken over a socket: plain TCP, or a specific SSL/TLS version.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum TransportProtocol {
    /// Unencrypted TCP.
    Tcp,
    /// SSL 2.0.
    Ssl2,
    /// SSL 3.0.
    Ssl3,
    /// TLS 1.0.
    Tls1_0,
    /// TLS 1.1.
    Tls1_1,
    /// TLS 1.2.
    Tls1_2,
    /// TLS 1.3.
    Tls1_3,
}
59
60pub trait SocketHandler {
61    fn socket_read(&mut self, buf: &mut [u8]) -> (usize, SocketResult);
62    fn socket_write(&mut self, buf: &[u8]) -> (usize, SocketResult);
63    fn socket_write_vectored(&mut self, _buf: &[std::io::IoSlice]) -> (usize, SocketResult);
64    fn socket_wants_write(&self) -> bool {
65        false
66    }
67    fn socket_close(&mut self) {}
68    fn socket_ref(&self) -> &TcpStream;
69    fn socket_mut(&mut self) -> &mut TcpStream;
70    fn protocol(&self) -> TransportProtocol;
71    fn read_error(&self);
72    fn write_error(&self);
73    /// Returns the owning connection's session ULID when known. Used by
74    /// [`log_socket_context!`] to render the `[<session_ulid> - - -]` segment
75    /// of the socket-layer log prefix, matching the format used by the
76    /// rest of the mux stack. Returns `None` for contextless implementations
77    /// (e.g. raw `mio::TcpStream`); the macro renders `-` in the ULID slot.
78    fn session_ulid(&self) -> Option<Ulid> {
79        None
80    }
81}
82
/// Builds the socket-layer log prefix
/// `[<session_ulid_or_->]\tSOCKET\tSession(peer=..., local=..., rtt=..., state=..., protocol=...)\t >>>`
/// from any expression implementing [`SocketHandler`] (in practice `self`).
///
/// - The ULID slot shows `-` when `session_ulid()` returns `None` (e.g. the
///   raw [`TcpStream`] impl), which keeps the column layout stable. The
///   `[ulid - - -]` block comes first so the line aligns with `MUX-*`,
///   `PIPE` and `RUSTLS` output.
/// - `peer` / `local` come from live `getpeername(2)` / `getsockname(2)`
///   lookups on `socket_ref()`. There is no cached peer here, unlike in
///   [`log_socket_module_prefix`]; this macro is used by [`FrontRustls`],
///   whose accepted socket reports a reliable peer.
/// - `rtt` / `state` come from a single `getsockopt(TCP_INFO)` snapshot and
///   render as `None` when no snapshot is available.
/// - `protocol` is `$self.protocol()`.
///
/// Colours come from [`sozu_command::logging::ansi_palette`], the single
/// palette shared by every `log_*_context!` macro in the proxy.
macro_rules! log_socket_context {
    ($self:expr) => {{
        let (open, reset, grey, gray, white) = ansi_palette();
        let ulid = $self
            .session_ulid()
            .map_or_else(|| "-".to_string(), |id| id.to_string());

        // One TCP_INFO lookup feeds both the rtt and state slots.
        let snapshot = crate::socket::stats::socket_snapshot($self.socket_ref());
        let rtt = snapshot.as_ref().map(|snap| snap.rtt);
        let state = snapshot.as_ref().map(|snap| snap.state);

        let peer = $self.socket_ref().peer_addr().ok();
        let local = $self.socket_ref().local_addr().ok();
        let protocol = $self.protocol();

        format!(
            "[{ulid} - - -]\t{open}SOCKET{reset}\t{grey}Session{reset}({gray}peer{reset}={white}{peer:?}{reset}, {gray}local{reset}={white}{local:?}{reset}, {gray}rtt{reset}={white}{rtt:?}{reset}, {gray}state{reset}={white}{state:?}{reset}, {gray}protocol{reset}={white}{protocol:?}{reset})\t >>>"
        )
    }};
}
123
124/// Module-level socket log prefix used from free functions (e.g. the shared
125/// `tcp_socket_*` helpers) where `self` is not in scope but the caller can
126/// still thread a session `Ulid`, a cached peer address, and the underlying
127/// [`TcpStream`] through as parameters. Renders the same
128/// `[<ulid> - - -]\tSOCKET\tSession(peer=..., local=..., rtt=..., state=..., protocol=Tcp)\t >>>`
129/// prefix as [`log_socket_context!`]; colour scheme via
130/// [`sozu_command::logging::ansi_palette`].
131///
132/// Per-slot semantics:
133///
134/// - `peer` — prefers the caller-supplied `configured_peer` (cached at
135///   [`SessionTcpStream`] construction, immune to ENOTCONN on a socket that
136///   failed an asynchronous `connect()`) and falls back to a live
137///   `getpeername(2)` lookup when no cache was provided.
138/// - `local` — `getsockname(2)`, stays valid across failed connects.
139/// - `rtt` / `state` — a single `getsockopt(TCP_INFO)` call via
140///   [`stats::socket_snapshot`]; both render as `None` on an FSM state
141///   where the kernel rejects the call. `state="SYN_SENT"` is the
142///   clearest signal for a failed outbound `connect()`.
143/// - `protocol` — hardcoded to `Tcp` (raw-TCP helpers only).
144fn log_socket_module_prefix(
145    stream: &TcpStream,
146    session_ulid: Option<Ulid>,
147    configured_peer: Option<SocketAddr>,
148) -> String {
149    let (open, reset, grey, gray, white) = ansi_palette();
150    let ulid = match session_ulid {
151        Some(ulid) => ulid.to_string(),
152        None => "-".to_string(),
153    };
154    let snapshot = crate::socket::stats::socket_snapshot(stream);
155    let rtt = snapshot.as_ref().map(|s| s.rtt);
156    let state = snapshot.as_ref().map(|s| s.state);
157    format!(
158        "[{ulid} - - -]\t{open}SOCKET{reset}\t{grey}Session{reset}({gray}peer{reset}={white}{peer:?}{reset}, {gray}local{reset}={white}{local:?}{reset}, {gray}rtt{reset}={white}{rtt:?}{reset}, {gray}state{reset}={white}{state:?}{reset}, {gray}protocol{reset}={white}Tcp{reset})\t >>>",
159        peer = configured_peer.or_else(|| stream.peer_addr().ok()),
160        local = stream.local_addr().ok(),
161    )
162}
163
164/// Shared read/write/vectored-write logic used by both
165/// [`impl SocketHandler for TcpStream`] and
166/// [`impl SocketHandler for SessionTcpStream`]. Free-function entry point:
167/// `self` is out of scope here, so error logs use [`log_socket_module_prefix`]
168/// which renders the same `Session(peer, rtt, protocol)` context as
169/// [`log_socket_context!`] by reading from the `stream` + `session_ulid`
170/// parameters threaded through each helper.
171fn tcp_socket_read(
172    stream: &mut TcpStream,
173    buf: &mut [u8],
174    session_ulid: Option<Ulid>,
175    configured_peer: Option<SocketAddr>,
176) -> (usize, SocketResult) {
177    let mut size = 0usize;
178    let mut counter = 0;
179    loop {
180        counter += 1;
181        if counter > MAX_LOOP_ITERATIONS {
182            error!(
183                "{} MAX_LOOP_ITERATION reached in TcpStream::socket_read",
184                log_socket_module_prefix(stream, session_ulid, configured_peer)
185            );
186            incr!(names::socket::READ_INFINITE_LOOP_ERROR);
187            return (size, SocketResult::Error);
188        }
189        // Loop invariant: the running cursor never overshoots the buffer, so the
190        // `&mut buf[size..]` slice below can never panic on a bad offset.
191        debug_assert!(
192            size <= buf.len(),
193            "read cursor {size} overran buffer len {} (would slice out of bounds)",
194            buf.len()
195        );
196        if size == buf.len() {
197            return (size, SocketResult::Continue);
198        }
199        match stream.read(&mut buf[size..]) {
200            Ok(0) => return (size, SocketResult::Closed),
201            Ok(sz) => {
202                // `read` cannot report more bytes than the slice it was given.
203                debug_assert!(
204                    sz <= buf.len() - size,
205                    "read reported {sz} bytes into a {}-byte remaining slice",
206                    buf.len() - size
207                );
208                size += sz;
209            }
210            Err(e) => match e.kind() {
211                ErrorKind::WouldBlock => return (size, SocketResult::WouldBlock),
212                // Treat `ConnectionRefused` as a closed socket, mirroring the
213                // write path. On Linux a failed asynchronous `connect()`
214                // surfaces as `ECONNREFUSED` on the first read; it is
215                // operationally identical to any other benign peer-initiated
216                // close and does not warrant a log line on every backend
217                // that happens to be down.
218                ErrorKind::ConnectionReset
219                | ErrorKind::ConnectionAborted
220                | ErrorKind::BrokenPipe
221                | ErrorKind::ConnectionRefused => return (size, SocketResult::Closed),
222                // Noisy-expected transport failures: backend unreachable,
223                // TCP_USER_TIMEOUT expiry, post-close reads. Keep a log line
224                // so operators can still trend the rate, but `warn!` — this
225                // is reality-at-scale, not a sozu invariant break.
226                ErrorKind::HostUnreachable
227                | ErrorKind::NetworkUnreachable
228                | ErrorKind::TimedOut
229                | ErrorKind::NotConnected => {
230                    warn!(
231                        "{} socket_read error={:?}",
232                        log_socket_module_prefix(stream, session_ulid, configured_peer),
233                        e
234                    );
235                    return (size, SocketResult::Error);
236                }
237                // Genuinely loud variants (`PermissionDenied`, `AddrNotAvailable`,
238                // `InvalidInput`/`Data`, …) and the unknown catch-all stay at
239                // `error!` so operators keep paging on real misconfig.
240                _ => {
241                    error!(
242                        "{} socket_read error={:?}",
243                        log_socket_module_prefix(stream, session_ulid, configured_peer),
244                        e
245                    );
246                    return (size, SocketResult::Error);
247                }
248            },
249        }
250    }
251}
252
253fn tcp_socket_write(
254    stream: &mut TcpStream,
255    buf: &[u8],
256    session_ulid: Option<Ulid>,
257    configured_peer: Option<SocketAddr>,
258) -> (usize, SocketResult) {
259    let mut size = 0usize;
260    let mut counter = 0;
261    loop {
262        counter += 1;
263        if counter > MAX_LOOP_ITERATIONS {
264            error!(
265                "{} MAX_LOOP_ITERATION reached in TcpStream::socket_write",
266                log_socket_module_prefix(stream, session_ulid, configured_peer)
267            );
268            incr!(names::socket::WRITE_INFINITE_LOOP_ERROR);
269            return (size, SocketResult::Error);
270        }
271        // Loop invariant: the cursor never overshoots the buffer, so the
272        // `&buf[size..]` slice below can never panic on a bad offset.
273        debug_assert!(
274            size <= buf.len(),
275            "write cursor {size} overran buffer len {} (would slice out of bounds)",
276            buf.len()
277        );
278        if size == buf.len() {
279            return (size, SocketResult::Continue);
280        }
281        match stream.write(&buf[size..]) {
282            Ok(0) => return (size, SocketResult::Continue),
283            Ok(sz) => {
284                // `write` cannot report more bytes than the slice it was given.
285                debug_assert!(
286                    sz <= buf.len() - size,
287                    "write reported {sz} bytes from a {}-byte remaining slice",
288                    buf.len() - size
289                );
290                size += sz;
291            }
292            Err(e) => match e.kind() {
293                ErrorKind::WouldBlock => return (size, SocketResult::WouldBlock),
294                ErrorKind::ConnectionReset
295                | ErrorKind::ConnectionAborted
296                | ErrorKind::BrokenPipe
297                | ErrorKind::ConnectionRefused => {
298                    incr!(names::tcp::WRITE_ERROR);
299                    return (size, SocketResult::Closed);
300                }
301                // Noisy-expected transport failures (see `tcp_socket_read`
302                // for rationale). Log at `warn!` and still bump the
303                // `tcp.write.error` counter so rate-based dashboards stay
304                // accurate.
305                ErrorKind::HostUnreachable
306                | ErrorKind::NetworkUnreachable
307                | ErrorKind::TimedOut
308                | ErrorKind::NotConnected => {
309                    warn!(
310                        "{} socket_write error={:?}",
311                        log_socket_module_prefix(stream, session_ulid, configured_peer),
312                        e
313                    );
314                    incr!(names::tcp::WRITE_ERROR);
315                    return (size, SocketResult::Error);
316                }
317                _ => {
318                    //FIXME: timeout and other common errors should be sent up
319                    error!(
320                        "{} socket_write error={:?}",
321                        log_socket_module_prefix(stream, session_ulid, configured_peer),
322                        e
323                    );
324                    incr!(names::tcp::WRITE_ERROR);
325                    return (size, SocketResult::Error);
326                }
327            },
328        }
329    }
330}
331
332fn tcp_socket_write_vectored(
333    stream: &mut TcpStream,
334    bufs: &[std::io::IoSlice],
335    session_ulid: Option<Ulid>,
336    configured_peer: Option<SocketAddr>,
337) -> (usize, SocketResult) {
338    match stream.write_vectored(bufs) {
339        Ok(sz) => {
340            // `write_vectored` cannot report more bytes than the slices held.
341            debug_assert!(
342                sz <= bufs.iter().map(|b| b.len()).sum::<usize>(),
343                "write_vectored reported {sz} bytes from {}-byte slices",
344                bufs.iter().map(|b| b.len()).sum::<usize>()
345            );
346            (sz, SocketResult::Continue)
347        }
348        Err(e) => match e.kind() {
349            ErrorKind::WouldBlock => (0, SocketResult::WouldBlock),
350            ErrorKind::ConnectionReset
351            | ErrorKind::ConnectionAborted
352            | ErrorKind::BrokenPipe
353            | ErrorKind::ConnectionRefused => {
354                incr!(names::tcp::WRITE_ERROR);
355                (0, SocketResult::Closed)
356            }
357            // Noisy-expected transport failures (see `tcp_socket_read` for
358            // rationale). Same tiering as the scalar write path.
359            ErrorKind::HostUnreachable
360            | ErrorKind::NetworkUnreachable
361            | ErrorKind::TimedOut
362            | ErrorKind::NotConnected => {
363                warn!(
364                    "{} socket_write error={:?}",
365                    log_socket_module_prefix(stream, session_ulid, configured_peer),
366                    e
367                );
368                incr!(names::tcp::WRITE_ERROR);
369                (0, SocketResult::Error)
370            }
371            _ => {
372                //FIXME: timeout and other common errors should be sent up
373                error!(
374                    "{} socket_write error={:?}",
375                    log_socket_module_prefix(stream, session_ulid, configured_peer),
376                    e
377                );
378                incr!(names::tcp::WRITE_ERROR);
379                (0, SocketResult::Error)
380            }
381        },
382    }
383}
384
385impl SocketHandler for TcpStream {
386    fn socket_read(&mut self, buf: &mut [u8]) -> (usize, SocketResult) {
387        tcp_socket_read(self, buf, None, None)
388    }
389
390    fn socket_write(&mut self, buf: &[u8]) -> (usize, SocketResult) {
391        tcp_socket_write(self, buf, None, None)
392    }
393
394    fn socket_write_vectored(&mut self, bufs: &[std::io::IoSlice]) -> (usize, SocketResult) {
395        tcp_socket_write_vectored(self, bufs, None, None)
396    }
397
398    fn socket_ref(&self) -> &TcpStream {
399        self
400    }
401
402    fn socket_mut(&mut self) -> &mut TcpStream {
403        self
404    }
405
406    fn protocol(&self) -> TransportProtocol {
407        TransportProtocol::Tcp
408    }
409
410    fn read_error(&self) {
411        incr!(names::tcp::READ_ERROR);
412    }
413
414    fn write_error(&self) {
415        incr!(names::tcp::WRITE_ERROR);
416    }
417}
418
419/// [`TcpStream`] wrapped with the owning session's ULID. Exists so plain-TCP
420/// frontends and backends inside the mux stack can prefix SOCKET-layer error
421/// logs with `[<session_ulid> - - -]`, matching what TLS-wrapped frontends
422/// already do via [`FrontRustls::session_ulid`].
423///
424/// The inner [`TcpStream`] is exposed directly so mio registration sites can
425/// borrow it as-is; the outer type only participates in the [`SocketHandler`]
426/// trait dispatch.
427#[derive(Debug)]
428pub struct SessionTcpStream {
429    pub stream: TcpStream,
430    pub session_ulid: Ulid,
431    /// Peer address cached at construction. For backend-facing sockets
432    /// (created from a nonblocking `connect()` in `Router::connect`) this is
433    /// the cluster-configured backend address — reliable across ENOTCONN
434    /// after a failed handshake, which is the sharp case that motivates the
435    /// cache. For frontend-facing sockets constructed from an accepted
436    /// `TcpStream`, this is the client's peer address — identical to what a
437    /// live `getpeername(2)` would return, but threaded through the same
438    /// plumbing for uniformity. Used as the preferred source of truth for
439    /// the `peer=` slot in [`log_socket_module_prefix`], falling back to a
440    /// live lookup when `None`.
441    pub configured_peer: Option<SocketAddr>,
442}
443
444impl SessionTcpStream {
445    pub fn new(stream: TcpStream, session_ulid: Ulid, configured_peer: Option<SocketAddr>) -> Self {
446        Self {
447            stream,
448            session_ulid,
449            configured_peer,
450        }
451    }
452}
453
454impl SocketHandler for SessionTcpStream {
455    fn socket_read(&mut self, buf: &mut [u8]) -> (usize, SocketResult) {
456        tcp_socket_read(
457            &mut self.stream,
458            buf,
459            Some(self.session_ulid),
460            self.configured_peer,
461        )
462    }
463
464    fn socket_write(&mut self, buf: &[u8]) -> (usize, SocketResult) {
465        tcp_socket_write(
466            &mut self.stream,
467            buf,
468            Some(self.session_ulid),
469            self.configured_peer,
470        )
471    }
472
473    fn socket_write_vectored(&mut self, bufs: &[std::io::IoSlice]) -> (usize, SocketResult) {
474        tcp_socket_write_vectored(
475            &mut self.stream,
476            bufs,
477            Some(self.session_ulid),
478            self.configured_peer,
479        )
480    }
481
482    fn socket_ref(&self) -> &TcpStream {
483        &self.stream
484    }
485
486    fn socket_mut(&mut self) -> &mut TcpStream {
487        &mut self.stream
488    }
489
490    fn protocol(&self) -> TransportProtocol {
491        TransportProtocol::Tcp
492    }
493
494    fn read_error(&self) {
495        incr!(names::tcp::READ_ERROR);
496    }
497
498    fn write_error(&self) {
499        incr!(names::tcp::WRITE_ERROR);
500    }
501
502    fn session_ulid(&self) -> Option<Ulid> {
503        Some(self.session_ulid)
504    }
505}
506
507pub struct FrontRustls {
508    pub stream: TcpStream,
509    pub session: ServerConnection,
510    /// Peer sent a graceful FIN on the read side (`read()` returned `Ok(0)`).
511    /// We can no longer receive plaintext, but may still have rustls-buffered
512    /// records to flush on the write side — do NOT abort pending writes.
513    pub peer_disconnected: bool,
514    /// Peer reset the connection (RST/ConnectionAborted/BrokenPipe). The TCP
515    /// channel is dead; further writes are pointless and should short-circuit.
516    pub peer_reset: bool,
517    /// Connection/session ULID propagated from the enclosing mux session.
518    /// Rendered into SOCKET-layer error logs via [`Self::session_ulid`].
519    pub session_ulid: Ulid,
520}
521
522impl std::fmt::Debug for FrontRustls {
523    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
524        f.debug_struct("FrontRustls")
525            .field("stream", &self.stream)
526            .finish_non_exhaustive()
527    }
528}
529
530impl SocketHandler for FrontRustls {
    /// Reads decrypted plaintext into `buf`.
    ///
    /// Each pass of the outer loop does three things:
    /// 1. Pulls TLS records from the socket with `session.read_tls`.
    /// 2. Decrypts them with `session.process_new_packets`.
    /// 3. Drains plaintext from `session.reader()` into `buf[size..]` while
    ///    `session.wants_read()` is false.
    ///
    /// The loop stops when `buf` is full, the socket would block, the peer
    /// closed or reset, an error occurred, or the `MAX_LOOP_ITERATIONS`
    /// guard trips.
    ///
    /// Returns the number of plaintext bytes stored in `buf` together with:
    /// - `Error`: the loop guard tripped, or `read_tls` failed with an
    ///   unexpected kind, or `process_new_packets` failed, or the plaintext
    ///   reader failed with an unexpected kind;
    /// - `Closed`: FIN (`read_tls` returned `Ok(0)`), a reset/abort/
    ///   broken-pipe from `read_tls`, or the same kinds from the plaintext
    ///   reader. The `read_tls` cases also set `peer_disconnected`, and the
    ///   reset/abort cases set `peer_reset`;
    /// - `Continue`: `buf` was filled completely;
    /// - `WouldBlock`: the socket ran dry before `buf` filled.
    fn socket_read(&mut self, buf: &mut [u8]) -> (usize, SocketResult) {
        // `size` is the number of plaintext bytes already copied into `buf`.
        let mut size = 0usize;
        // Cleared once `read_tls` reports WouldBlock or FIN: no more
        // ciphertext will arrive during this call.
        let mut can_read = true;
        let mut is_error = false;
        let mut is_closed = false;

        let mut counter = 0;
        loop {
            counter += 1;
            if counter > MAX_LOOP_ITERATIONS {
                error!(
                    "{} MAX_LOOP_ITERATION reached in FrontRustls::socket_read",
                    log_socket_context!(self)
                );
                incr!(names::rustls::READ_INFINITE_LOOP_ERROR);
                is_error = true;
                break;
            }

            // Loop invariant: the plaintext cursor never overshoots the caller's
            // buffer, so every `&mut buf[size..]` below is a valid slice.
            debug_assert!(
                size <= buf.len(),
                "rustls read cursor {size} overran buffer len {} (would slice out of bounds)",
                buf.len()
            );
            if size == buf.len() {
                break;
            }

            // The flags set in the previous pass are only checked here. So
            // after a WouldBlock/FIN/RST, the plaintext already available
            // was still processed and drained in that same pass.
            if !can_read | is_error | is_closed {
                break;
            }

            match self.session.read_tls(&mut self.stream) {
                Ok(0) => {
                    // Graceful FIN on the read side: peer closed its write
                    // half. Keep `peer_reset` unset so outbound writes can
                    // still flush rustls's buffered records (half-close).
                    can_read = false;
                    is_closed = true;
                    self.peer_disconnected = true;
                }
                Ok(_sz) => {}
                Err(e) => match e.kind() {
                    ErrorKind::WouldBlock => {
                        can_read = false;
                    }
                    ErrorKind::ConnectionReset
                    | ErrorKind::ConnectionAborted
                    | ErrorKind::BrokenPipe => {
                        // Full RST/abort: the TCP channel is dead. Mark
                        // `peer_reset` so writes short-circuit (nothing can
                        // reach the peer anymore) but still set
                        // `peer_disconnected` for back-compatible read-side
                        // logic. Note that there is no `break` here: this
                        // pass still runs `process_new_packets` and the
                        // plaintext drain below.
                        is_closed = true;
                        self.peer_disconnected = true;
                        self.peer_reset = true;
                    }
                    // https://github.com/rustls/rustls/blob/main/rustls/src/conn.rs#L482-L500
                    // rustls's 16 KB received_plaintext buffer is full — expected
                    // under H2 where frame-at-a-time reads drain less than a full
                    // TLS record. The outer loop will drain plaintext next iteration.
                    ErrorKind::Other => {}
                    _ => {
                        error!(
                            "{} could not read TLS stream from socket: {:?}",
                            log_socket_context!(self),
                            e
                        );
                        is_error = true;
                        break;
                    }
                },
            }

            // Decrypt whatever ciphertext rustls has buffered so that
            // plaintext becomes readable through `reader()`.
            if let Err(e) = self.session.process_new_packets() {
                error!(
                    "{} could not process read TLS packets: {:?}",
                    log_socket_context!(self),
                    e
                );
                is_error = true;
                break;
            }

            // Drain plaintext while rustls says it does not want more TLS
            // input. `Ok(0)` (which includes a full `buf`, since the slice is
            // then empty) ends the drain for this pass.
            while !self.session.wants_read() {
                match self.session.reader().read(&mut buf[size..]) {
                    Ok(0) => break,
                    Ok(sz) => {
                        // The rustls reader cannot return more plaintext than
                        // the remaining slice it was handed.
                        debug_assert!(
                            sz <= buf.len() - size,
                            "rustls reader returned {sz} bytes into a {}-byte remaining slice",
                            buf.len() - size
                        );
                        size += sz;
                    }
                    Err(e) => match e.kind() {
                        ErrorKind::WouldBlock => {
                            break;
                        }
                        ErrorKind::ConnectionReset
                        | ErrorKind::ConnectionAborted
                        | ErrorKind::BrokenPipe => {
                            // Unlike the `read_tls` arm, this path does not
                            // touch `peer_disconnected` / `peer_reset`.
                            is_closed = true;
                            break;
                        }
                        _ => {
                            error!(
                                "{} could not read data from TLS stream: {:?}",
                                log_socket_context!(self),
                                e
                            );
                            is_error = true;
                            break;
                        }
                    },
                }
            }
        }

        // Post-condition: we never report more plaintext than the caller
        // asked for.
        //
        // NOTE(review): the second assert claims Error and Closed are
        // mutually exclusive, but the FIN and RST arms of `read_tls` set
        // `is_closed` without breaking. A `process_new_packets` or reader
        // failure later in the same pass would then also set `is_error` and
        // trip this assert in debug builds. Release builds report `Error`,
        // because it is checked first below. Confirm whether that sequence
        // is reachable in practice.
        debug_assert!(
            size <= buf.len(),
            "rustls socket_read returned {size} bytes for a {}-byte buffer",
            buf.len()
        );
        debug_assert!(
            !(is_error && is_closed),
            "rustls socket_read cannot be both Error and Closed"
        );
        if is_error {
            (size, SocketResult::Error)
        } else if is_closed {
            (size, SocketResult::Closed)
        } else if size == buf.len() {
            // The full requested amount was read (possibly from the rustls
            // plaintext buffer). Report Continue so the caller keeps
            // READABLE in the readiness set — there may be more decrypted
            // data available without a new mio event.
            (size, SocketResult::Continue)
        } else if !can_read {
            (size, SocketResult::WouldBlock)
        } else {
            // Every outer-loop `break` above sets `is_error`, `is_closed`
            // or `!can_read`, or leaves `buf` full. So this arm appears
            // unreachable and acts as a defensive default.
            (size, SocketResult::Continue)
        }
    }
683
684    /// Keep these two functions structurally symmetric — a divergence
685    /// caused the 4.5 MB H2 truncation bug. Tests
686    /// `e2e::tests::h2_correctness_tests::*` and
687    /// `e2e::tests::h2_tests::test_h2_large_*` are the regression guard.
688    fn socket_write(&mut self, buf: &[u8]) -> (usize, SocketResult) {
689        // Abort only on a true RST — a FIN on the read side still permits
690        // flushing rustls's plaintext buffer (TLS half-close).
691        if self.peer_reset {
692            return (0, SocketResult::Closed);
693        }
694
695        let mut buffered_size = 0usize;
696        let mut can_write = true;
697        let mut is_error = false;
698        let mut is_closed = false;
699
700        let mut counter = 0;
701        loop {
702            counter += 1;
703            if counter > MAX_LOOP_ITERATIONS {
704                error!(
705                    "{} MAX_LOOP_ITERATION reached in FrontRustls::socket_write",
706                    log_socket_context!(self)
707                );
708                incr!(names::rustls::WRITE_INFINITE_LOOP_ERROR);
709                is_error = true;
710                break;
711            }
712            // Loop invariant: the absorbed-plaintext cursor never overshoots the
713            // caller's buffer, so `&buf[buffered_size..]` is always a valid slice.
714            debug_assert!(
715                buffered_size <= buf.len(),
716                "rustls write cursor {buffered_size} overran buffer len {} (would slice out of bounds)",
717                buf.len()
718            );
719            if buffered_size == buf.len() {
720                break;
721            }
722
723            if !can_write | is_error | is_closed {
724                break;
725            }
726
727            match self.session.writer().write(&buf[buffered_size..]) {
728                Ok(0) => {} // zero byte written means that the Rustls buffers are full, we will try to write on the socket and try again
729                Ok(sz) => {
730                    // rustls cannot absorb more plaintext than the remaining slice.
731                    debug_assert!(
732                        sz <= buf.len() - buffered_size,
733                        "rustls writer absorbed {sz} bytes from a {}-byte remaining slice",
734                        buf.len() - buffered_size
735                    );
736                    buffered_size += sz;
737                }
738                Err(e) => match e.kind() {
739                    ErrorKind::WouldBlock => {
740                        // we don't need to do anything, the session will return false in wants_write?
741                        //error!("rustls socket_write wouldblock");
742                    }
743                    ErrorKind::ConnectionReset
744                    | ErrorKind::ConnectionAborted
745                    | ErrorKind::BrokenPipe => {
746                        //FIXME: this should probably not happen here
747                        incr!(names::rustls::WRITE_ERROR);
748                        is_closed = true;
749                        self.peer_reset = true;
750                        break;
751                    }
752                    _ => {
753                        error!(
754                            "{} could not write data to TLS stream: {:?}",
755                            log_socket_context!(self),
756                            e
757                        );
758                        incr!(names::rustls::WRITE_ERROR);
759                        is_error = true;
760                        break;
761                    }
762                },
763            }
764
765            loop {
766                match self.session.write_tls(&mut self.stream) {
767                    Ok(0) => {
768                        //can_write = false;
769                        break;
770                    }
771                    Ok(_sz) => {}
772                    Err(e) => match e.kind() {
773                        ErrorKind::WouldBlock => {
774                            can_write = false;
775                            break;
776                        }
777                        ErrorKind::ConnectionReset
778                        | ErrorKind::ConnectionAborted
779                        | ErrorKind::BrokenPipe => {
780                            incr!(names::rustls::WRITE_ERROR);
781                            is_closed = true;
782                            self.peer_reset = true;
783                            break;
784                        }
785                        _ => {
786                            error!(
787                                "{} could not write TLS stream to socket: {:?}",
788                                log_socket_context!(self),
789                                e
790                            );
791                            incr!(names::rustls::WRITE_ERROR);
792                            is_error = true;
793                            break;
794                        }
795                    },
796                }
797            }
798        }
799
800        // Flush any pending TLS records even if no application data was written.
801        // This handles the case where h2.rs calls socket_write(&[]) to flush
802        // buffered TLS data (e.g. NewSessionTicket, key updates). Without this,
803        // the main loop above exits immediately for empty buffers and write_tls
804        // is never called.
805        if !is_error && !is_closed && can_write && self.session.wants_write() {
806            loop {
807                match self.session.write_tls(&mut self.stream) {
808                    Ok(0) => break,
809                    Ok(_) => {}
810                    Err(e) => match e.kind() {
811                        ErrorKind::WouldBlock => {
812                            can_write = false;
813                            break;
814                        }
815                        ErrorKind::ConnectionReset
816                        | ErrorKind::ConnectionAborted
817                        | ErrorKind::BrokenPipe => {
818                            incr!(names::rustls::WRITE_ERROR);
819                            is_closed = true;
820                            self.peer_reset = true;
821                            break;
822                        }
823                        _ => {
824                            error!(
825                                "{} could not flush TLS stream to socket: {:?}",
826                                log_socket_context!(self),
827                                e
828                            );
829                            incr!(names::rustls::WRITE_ERROR);
830                            is_error = true;
831                            break;
832                        }
833                    },
834                }
835            }
836        }
837
838        // Post-condition: we never report absorbing more plaintext than the
839        // caller handed us — over-reporting is exactly the truncation-class bug
840        // these two symmetric paths exist to avoid.
841        debug_assert!(
842            buffered_size <= buf.len(),
843            "rustls socket_write reported {buffered_size} bytes for a {}-byte buffer",
844            buf.len()
845        );
846        debug_assert!(
847            !(is_error && is_closed),
848            "rustls socket_write cannot be both Error and Closed"
849        );
850        if is_error {
851            (buffered_size, SocketResult::Error)
852        } else if is_closed {
853            (buffered_size, SocketResult::Closed)
854        } else if !can_write {
855            (buffered_size, SocketResult::WouldBlock)
856        } else {
857            (buffered_size, SocketResult::Continue)
858        }
859    }
860
    /// Write a list of plaintext slices through the rustls session.
    ///
    /// Returns `(absorbed, status)`. `absorbed` is the number of plaintext
    /// bytes rustls accepted from the front of `bufs` (never more than their
    /// summed length); a partial count means the caller must advance its
    /// slices past `absorbed` bytes and call again. `status` is `Error` or
    /// `Closed` on a write failure / peer reset, `WouldBlock` when the socket
    /// stopped accepting TLS output, and `Continue` otherwise. Once
    /// `peer_reset` is set the call returns `(0, Closed)` without touching
    /// the session.
    ///
    /// Empty-buffer invariant: callers may legitimately pass `bufs.is_empty()`
    /// or an all-empty slice list to request a pure flush pass. In that case
    /// `total_len == 0`, so the top-of-loop `buffered_size == total_len` guard
    /// fires on the very first iteration, before the loop ever calls
    /// `write_tls`. Any TLS records the session still has buffered (e.g. the
    /// remainder of a record split by the previous call, or `close_notify`
    /// output) are instead drained by the trailing `wants_write()` flush after
    /// the loop. This mirrors [`Self::socket_write`], which has the same
    /// trailing flush for the same reason: both entry points must stay
    /// structurally symmetric so that a zero-byte flush never returns without
    /// giving rustls a chance to emit bytes.
    ///
    /// Keep these two functions structurally symmetric — a divergence
    /// caused the 4.5 MB H2 truncation bug. Tests
    /// `e2e::tests::h2_correctness_tests::*` and
    /// `e2e::tests::h2_tests::test_h2_large_*` are the regression guard.
    fn socket_write_vectored(&mut self, bufs: &[std::io::IoSlice]) -> (usize, SocketResult) {
        // A previous RST/abort/broken pipe latched `peer_reset`; never write again.
        if self.peer_reset {
            return (0, SocketResult::Closed);
        }

        let total_len: usize = bufs.iter().map(|b| b.len()).sum();
        let mut buffered_size = 0usize;
        let mut can_write = true;
        let mut is_error = false;
        let mut is_closed = false;

        // Bounded retry loop: each pass offers plaintext to rustls (only while
        // nothing has been absorbed yet) and then drains TLS records to the socket.
        let mut counter = 0;
        loop {
            counter += 1;
            if counter > MAX_LOOP_ITERATIONS {
                error!(
                    "{} MAX_LOOP_ITERATION reached in FrontRustls::socket_write_vectored",
                    log_socket_context!(self)
                );
                incr!(names::rustls::WRITE_INFINITE_LOOP_ERROR);
                is_error = true;
                break;
            }
            // Loop invariant: the absorbed-plaintext cursor never overshoots the
            // summed slice length we computed up front (mirrors the scalar path).
            debug_assert!(
                buffered_size <= total_len,
                "rustls vectored write cursor {buffered_size} overran total slice len {total_len}"
            );
            // Everything absorbed (or nothing to absorb: total_len == 0).
            if buffered_size == total_len {
                break;
            }

            if !can_write | is_error | is_closed {
                break;
            }

            // rustls's Writer does not expose a "write from offset across slices"
            // helper, so we push plaintext once and then drain via write_tls.
            // If rustls only partially absorbs the slices, we break and return
            // the partial count so the caller can advance its buffers and retry.
            if buffered_size == 0 {
                match self.session.writer().write_vectored(bufs) {
                    // rustls buffers are full: nothing absorbed, so the next pass
                    // re-offers all slices after write_tls below frees space.
                    Ok(0) => {}
                    Ok(sz) => {
                        // rustls cannot absorb more plaintext than the slices held.
                        debug_assert!(
                            sz <= total_len,
                            "rustls writer absorbed {sz} bytes from {total_len}-byte slices"
                        );
                        buffered_size += sz;
                    }
                    Err(e) => match e.kind() {
                        ErrorKind::WouldBlock => {}
                        ErrorKind::ConnectionReset
                        | ErrorKind::ConnectionAborted
                        | ErrorKind::BrokenPipe => {
                            incr!(names::rustls::WRITE_ERROR);
                            is_closed = true;
                            self.peer_reset = true;
                            break;
                        }
                        _ => {
                            error!(
                                "{} could not write data to TLS stream: {:?}",
                                log_socket_context!(self),
                                e
                            );
                            incr!(names::rustls::WRITE_ERROR);
                            is_error = true;
                            break;
                        }
                    },
                }
            }

            // Plaintext was partially absorbed — we cannot re-call write_vectored
            // because the IoSlice pointers have not been advanced. Drain whatever
            // rustls buffered to the socket, then return the partial count so the
            // caller can consume and retry with adjusted slices. The returned
            // status reflects only this drain (Continue if it completed cleanly).
            if buffered_size > 0 && buffered_size < total_len {
                loop {
                    match self.session.write_tls(&mut self.stream) {
                        Ok(0) => break,
                        Ok(_) => {}
                        Err(e) => match e.kind() {
                            ErrorKind::WouldBlock => {
                                can_write = false;
                                break;
                            }
                            ErrorKind::ConnectionReset
                            | ErrorKind::ConnectionAborted
                            | ErrorKind::BrokenPipe => {
                                incr!(names::rustls::WRITE_ERROR);
                                is_closed = true;
                                self.peer_reset = true;
                                break;
                            }
                            _ => {
                                error!(
                                    "{} could not write TLS stream to socket: {:?}",
                                    log_socket_context!(self),
                                    e
                                );
                                incr!(names::rustls::WRITE_ERROR);
                                is_error = true;
                                break;
                            }
                        },
                    }
                }
                break;
            }

            // Drain TLS records to the socket until rustls has nothing left
            // (Ok(0)) or the socket pushes back (WouldBlock) / fails.
            loop {
                match self.session.write_tls(&mut self.stream) {
                    Ok(0) => {
                        break;
                    }
                    Ok(_sz) => {}
                    Err(e) => match e.kind() {
                        ErrorKind::WouldBlock => {
                            can_write = false;
                            break;
                        }
                        ErrorKind::ConnectionReset
                        | ErrorKind::ConnectionAborted
                        | ErrorKind::BrokenPipe => {
                            incr!(names::rustls::WRITE_ERROR);
                            is_closed = true;
                            self.peer_reset = true;
                            break;
                        }
                        _ => {
                            error!(
                                "{} could not write TLS stream to socket: {:?}",
                                log_socket_context!(self),
                                e
                            );
                            incr!(names::rustls::WRITE_ERROR);
                            is_error = true;
                            break;
                        }
                    },
                }
            }
        }

        // Flush any pending TLS records even if no application data was written
        // (the empty-`bufs` flush case exits the loop above before any write_tls).
        if !is_error && !is_closed && can_write && self.session.wants_write() {
            loop {
                match self.session.write_tls(&mut self.stream) {
                    Ok(0) => break,
                    Ok(_) => {}
                    Err(e) => match e.kind() {
                        ErrorKind::WouldBlock => {
                            can_write = false;
                            break;
                        }
                        ErrorKind::ConnectionReset
                        | ErrorKind::ConnectionAborted
                        | ErrorKind::BrokenPipe => {
                            incr!(names::rustls::WRITE_ERROR);
                            is_closed = true;
                            self.peer_reset = true;
                            break;
                        }
                        _ => {
                            error!(
                                "{} could not flush TLS stream to socket: {:?}",
                                log_socket_context!(self),
                                e
                            );
                            incr!(names::rustls::WRITE_ERROR);
                            is_error = true;
                            break;
                        }
                    },
                }
            }
        }

        // Post-condition: report no more than the summed slice length, and keep
        // Error/Closed mutually exclusive — must stay structurally symmetric with
        // `socket_write` (divergence here is the 4.5 MB truncation-class bug).
        debug_assert!(
            buffered_size <= total_len,
            "rustls socket_write_vectored reported {buffered_size} bytes for {total_len}-byte slices"
        );
        debug_assert!(
            !(is_error && is_closed),
            "rustls socket_write_vectored cannot be both Error and Closed"
        );
        // Status priority: Error > Closed > WouldBlock > Continue.
        if is_error {
            (buffered_size, SocketResult::Error)
        } else if is_closed {
            (buffered_size, SocketResult::Closed)
        } else if !can_write {
            (buffered_size, SocketResult::WouldBlock)
        } else {
            (buffered_size, SocketResult::Continue)
        }
    }
1079
1080    fn socket_close(&mut self) {
1081        self.session.send_close_notify();
1082    }
1083
1084    fn socket_wants_write(&self) -> bool {
1085        // Only a true RST stops us wanting to write — a peer FIN still
1086        // allows flushing TLS plaintext buffered in rustls (half-close).
1087        !self.peer_reset && self.session.wants_write()
1088    }
1089
1090    fn socket_ref(&self) -> &TcpStream {
1091        &self.stream
1092    }
1093
1094    fn socket_mut(&mut self) -> &mut TcpStream {
1095        &mut self.stream
1096    }
1097
1098    fn protocol(&self) -> TransportProtocol {
1099        self.session
1100            .protocol_version()
1101            .map(|version| match version {
1102                ProtocolVersion::SSLv2 => TransportProtocol::Ssl2,
1103                ProtocolVersion::SSLv3 => TransportProtocol::Ssl3,
1104                ProtocolVersion::TLSv1_0 => TransportProtocol::Tls1_0,
1105                ProtocolVersion::TLSv1_1 => TransportProtocol::Tls1_1,
1106                ProtocolVersion::TLSv1_2 => TransportProtocol::Tls1_2,
1107                ProtocolVersion::TLSv1_3 => TransportProtocol::Tls1_3,
1108                _ => TransportProtocol::Tls1_3,
1109            })
1110            .unwrap_or(TransportProtocol::Tcp)
1111    }
1112
    /// Record a read failure on this rustls front socket by bumping the
    /// `names::rustls::READ_ERROR` metric.
    fn read_error(&self) {
        incr!(names::rustls::READ_ERROR);
    }
1116
    /// Record a write failure on this rustls front socket by bumping the
    /// `names::rustls::WRITE_ERROR` metric (the same counter the write paths
    /// above increment on their own failures).
    fn write_error(&self) {
        incr!(names::rustls::WRITE_ERROR);
    }
1120
1121    fn session_ulid(&self) -> Option<Ulid> {
1122        Some(self.session_ulid)
1123    }
1124}
1125
1126pub fn server_bind(addr: SocketAddr) -> Result<TcpListener, ServerBindError> {
1127    let sock = Socket::new(Domain::for_address(addr), Type::STREAM, Some(Protocol::TCP))
1128        .map_err(ServerBindError::SocketCreationError)?;
1129
1130    // set so_reuseaddr, but only on unix (mirrors what libstd does)
1131    if cfg!(unix) {
1132        sock.set_reuse_address(true)
1133            .map_err(ServerBindError::SetReuseAddress)?;
1134    }
1135
1136    sock.set_reuse_port(true)
1137        .map_err(ServerBindError::SetReusePort)?;
1138
1139    sock.bind(&addr.into())
1140        .map_err(ServerBindError::BindError)?;
1141
1142    sock.set_nonblocking(true)
1143        .map_err(ServerBindError::SetNonBlocking)?;
1144
1145    // listen
1146    // FIXME: make the backlog configurable?
1147    sock.listen(1024).map_err(ServerBindError::Listen)?;
1148
1149    // Post-conditions (invariant violations only — every fallible syscall above
1150    // already returns an error; these `debug_assert!`s catch a flag we *set*
1151    // silently not sticking, which would be our own logic bug, not a syscall
1152    // failure on network input). The getters return `io::Result`; we only
1153    // assert when the kernel answers, degrading to a no-op on the rare query
1154    // failure so we never panic on a dying fd.
1155    if let Ok(nonblocking) = sock.nonblocking() {
1156        debug_assert!(
1157            nonblocking,
1158            "server_bind must return a non-blocking socket (the worker event loop is edge-triggered)"
1159        );
1160    }
1161    // `SO_REUSEPORT` is set on every platform; assert it stuck so a SCM hand-off
1162    // across a hot-upgrade can re-bind the same address.
1163    #[cfg(unix)]
1164    if let Ok(reuse_port) = sock.reuse_port() {
1165        debug_assert!(
1166            reuse_port,
1167            "server_bind must set SO_REUSEPORT so the listener survives a hot-upgrade re-bind"
1168        );
1169    }
1170    // `SO_REUSEADDR` is unix-only here (mirrors libstd).
1171    #[cfg(unix)]
1172    if let Ok(reuse_address) = sock.reuse_address() {
1173        debug_assert!(
1174            reuse_address,
1175            "server_bind must set SO_REUSEADDR on unix (mirrors libstd)"
1176        );
1177    }
1178    // A bound STREAM socket carries a local address in the requested family.
1179    if let Ok(local) = sock.local_addr() {
1180        debug_assert_eq!(
1181            local.is_ipv4(),
1182            addr.is_ipv4(),
1183            "bound socket family must match the requested address family"
1184        );
1185        debug_assert_eq!(
1186            local.is_ipv6(),
1187            addr.is_ipv6(),
1188            "bound socket family must match the requested address family"
1189        );
1190    }
1191
1192    Ok(TcpListener::from_std(sock.into()))
1193}
1194
1195/// Bind a non-blocking UDP listener socket on `addr`.
1196///
1197/// Mirrors [`server_bind`] but for DGRAM: `SO_REUSEADDR` (unix) + `SO_REUSEPORT`
1198/// so the socket can be SCM-passed and re-bound across a hot-upgrade, then
1199/// `bind` + non-blocking. Unlike TCP there is **no `listen()`** — a UDP socket
1200/// receives datagrams directly. The returned `mio::net::UdpSocket` is the one
1201/// listener socket the UDP datapath demuxes many flows over (one-socket-many-
1202/// flows; per-flow return sockets are created by [`udp_connect`]).
1203pub fn udp_bind(addr: SocketAddr) -> Result<UdpSocket, ServerBindError> {
1204    let sock = Socket::new(Domain::for_address(addr), Type::DGRAM, Some(Protocol::UDP))
1205        .map_err(ServerBindError::SocketCreationError)?;
1206
1207    // set so_reuseaddr, but only on unix (mirrors what libstd does)
1208    if cfg!(unix) {
1209        sock.set_reuse_address(true)
1210            .map_err(ServerBindError::SetReuseAddress)?;
1211    }
1212
1213    sock.set_reuse_port(true)
1214        .map_err(ServerBindError::SetReusePort)?;
1215
1216    sock.bind(&addr.into())
1217        .map_err(ServerBindError::BindError)?;
1218
1219    sock.set_nonblocking(true)
1220        .map_err(ServerBindError::SetNonBlocking)?;
1221
1222    // No `listen()` for DGRAM sockets.
1223
1224    // Post-conditions — same rationale as `server_bind`: assert the flags we set
1225    // stuck (logic bug if not), degrading to a no-op when the kernel refuses the
1226    // query so a dying fd never panics. There is deliberately no `listen()`
1227    // check here: DGRAM sockets are never listened on.
1228    if let Ok(nonblocking) = sock.nonblocking() {
1229        debug_assert!(
1230            nonblocking,
1231            "udp_bind must return a non-blocking socket (the worker event loop is edge-triggered)"
1232        );
1233    }
1234    #[cfg(unix)]
1235    if let Ok(reuse_port) = sock.reuse_port() {
1236        debug_assert!(
1237            reuse_port,
1238            "udp_bind must set SO_REUSEPORT so the listener survives a hot-upgrade re-bind"
1239        );
1240    }
1241    #[cfg(unix)]
1242    if let Ok(reuse_address) = sock.reuse_address() {
1243        debug_assert!(
1244            reuse_address,
1245            "udp_bind must set SO_REUSEADDR on unix (mirrors libstd / server_bind)"
1246        );
1247    }
1248    if let Ok(local) = sock.local_addr() {
1249        debug_assert_eq!(
1250            local.is_ipv4(),
1251            addr.is_ipv4(),
1252            "bound UDP socket family must match the requested address family"
1253        );
1254        debug_assert_eq!(
1255            local.is_ipv6(),
1256            addr.is_ipv6(),
1257            "bound UDP socket family must match the requested address family"
1258        );
1259    }
1260
1261    Ok(UdpSocket::from_std(sock.into()))
1262}
1263
1264/// Create a non-blocking **connected** per-flow upstream UDP socket toward
1265/// `backend`.
1266///
1267/// The socket is bound to an ephemeral local port (family matched to the
1268/// backend) and `connect`-ed to the backend address. A connected UDP socket
1269/// "only receives from the connected address" (`connect(2)`), so its fd is the
1270/// symmetric-NAT return-demux key for one flow: the shell registers
1271/// `upstream_token -> FlowId` and feeds anything that arrives on it back into
1272/// the manager as a `BackendDatagram`. `send` (not `send_to`) is then used for
1273/// the forward path. Errors (`EMFILE`/`ENFILE`/connect refusal) bubble up so
1274/// the caller can shed the flow rather than panic.
1275pub fn udp_connect(backend: SocketAddr) -> Result<UdpSocket, ServerBindError> {
1276    let unspecified: SocketAddr = match backend {
1277        SocketAddr::V4(_) => (std::net::Ipv4Addr::UNSPECIFIED, 0).into(),
1278        SocketAddr::V6(_) => (std::net::Ipv6Addr::UNSPECIFIED, 0).into(),
1279    };
1280    // The ephemeral bind address must be in the backend's family with port 0,
1281    // or the subsequent `connect` would mix families / pin a wrong local port.
1282    debug_assert_eq!(
1283        unspecified.is_ipv4(),
1284        backend.is_ipv4(),
1285        "ephemeral bind family must match the backend family"
1286    );
1287    debug_assert_eq!(
1288        unspecified.port(),
1289        0,
1290        "ephemeral bind must use port 0 so the kernel picks the source port"
1291    );
1292    let sock = Socket::new(
1293        Domain::for_address(backend),
1294        Type::DGRAM,
1295        Some(Protocol::UDP),
1296    )
1297    .map_err(ServerBindError::SocketCreationError)?;
1298
1299    sock.bind(&unspecified.into())
1300        .map_err(ServerBindError::BindError)?;
1301    sock.set_nonblocking(true)
1302        .map_err(ServerBindError::SetNonBlocking)?;
1303    // `connect` on a DGRAM socket pins the return 4-tuple; a non-blocking
1304    // connect on UDP completes immediately (no handshake).
1305    sock.connect(&backend.into())
1306        .map_err(ServerBindError::BindError)?;
1307
1308    // Post-conditions — assert the flag/connect state stuck (logic bug if not),
1309    // degrading to a no-op when the kernel refuses the query so a dying fd never
1310    // panics on this network-facing path.
1311    if let Ok(nonblocking) = sock.nonblocking() {
1312        debug_assert!(
1313            nonblocking,
1314            "udp_connect must return a non-blocking socket (the worker event loop is edge-triggered)"
1315        );
1316    }
1317    // The connected return socket's local addr family must match the backend,
1318    // and the kernel must have assigned a concrete source port (no longer 0).
1319    if let Ok(local) = sock.local_addr() {
1320        debug_assert_eq!(
1321            local.is_ipv4(),
1322            backend.is_ipv4(),
1323            "connected UDP socket family must match the backend family"
1324        );
1325        if let Some(local) = local.as_socket() {
1326            debug_assert_ne!(
1327                local.port(),
1328                0,
1329                "connect must bind a concrete ephemeral source port (the return-demux key)"
1330            );
1331        }
1332    }
1333    // `connect` pinned the peer 4-tuple — `getpeername(2)` must echo the backend.
1334    if let Ok(peer) = sock.peer_addr() {
1335        if let Some(peer) = peer.as_socket() {
1336            debug_assert_eq!(
1337                peer, backend,
1338                "connect must pin the peer to the requested backend (symmetric-NAT return-demux key)"
1339            );
1340        }
1341    }
1342
1343    Ok(UdpSocket::from_std(sock.into()))
1344}
1345
1346/// Socket statistics
1347pub mod stats {
1348    use std::{os::fd::AsRawFd, time::Duration};
1349
1350    use internal::{OPT_LEVEL, OPT_NAME, TcpInfo};
1351
    /// Point-in-time snapshot of kernel TCP bookkeeping for a socket. Populated
    /// from a single `getsockopt(TCP_INFO)` syscall so callers that want both
    /// the smoothed RTT and the FSM state don't pay for two trips into the
    /// kernel. Field set is deliberately narrow — extend with more `tcp_info`
    /// members if the log prefix grows.
    #[derive(Clone, Debug)]
    pub struct TcpSnapshot {
        /// Smoothed round-trip time: the platform `TcpInfo::rtt()` value
        /// interpreted as microseconds (see `socket_snapshot`).
        pub rtt: Duration,
        /// Human-readable TCP FSM state name (e.g. `"ESTABLISHED"`), as
        /// decoded by the platform `TcpInfo::state()`; unrecognised values
        /// are reported as a fallback label rather than an error.
        pub state: &'static str,
    }
1362
1363    /// Round trip time for a TCP socket. Kept for existing metric callers;
1364    /// log-prefix callers should prefer [`socket_snapshot`] which returns the
1365    /// RTT **and** the TCP FSM state from a single syscall.
1366    pub fn socket_rtt<A: AsRawFd>(socket: &A) -> Option<Duration> {
1367        socket_info(socket.as_raw_fd()).map(|info| Duration::from_micros(info.rtt() as u64))
1368    }
1369
1370    /// Smoothed RTT + human-readable TCP state (`"ESTABLISHED"`, `"SYN_SENT"`,
1371    /// `"CLOSE_WAIT"`, …) pulled from a single `getsockopt(TCP_INFO)` call.
1372    /// Returns `None` when the kernel refuses the call — e.g. the socket has
1373    /// been closed, or the FSM is in a state where `TCP_INFO` is not usable.
1374    /// Safe on dying/refused sockets: the inner syscall's `status != 0`
1375    /// branch is the only failure mode and it degrades to `None`.
1376    pub fn socket_snapshot<A: AsRawFd>(socket: &A) -> Option<TcpSnapshot> {
1377        socket_info(socket.as_raw_fd()).map(|info| TcpSnapshot {
1378            rtt: Duration::from_micros(info.rtt() as u64),
1379            state: info.state(),
1380        })
1381    }
1382
    /// Query `getsockopt(TCP_INFO)` on the raw descriptor `fd`.
    ///
    /// Returns `Some` with the kernel-populated [`TcpInfo`] on success and
    /// `None` when the call reports failure (non-zero status), e.g. on a
    /// closed descriptor. Because the struct is zero-initialised first, any
    /// trailing fields the kernel did not write (a write-back `len` smaller
    /// than the struct) read as zero.
    #[cfg(unix)]
    pub fn socket_info(fd: libc::c_int) -> Option<TcpInfo> {
        // SAFETY: `TcpInfo` is a `#[repr(C)]` struct made only of plain
        // integer fields, for which the all-zero bit pattern is a valid
        // value, so `std::mem::zeroed` produces a sound instance.
        let mut tcp_info: TcpInfo = unsafe { std::mem::zeroed() };
        let struct_len = std::mem::size_of::<TcpInfo>() as libc::socklen_t;
        let mut len = struct_len;
        // SAFETY: the out-pointer targets `tcp_info`, which is valid for
        // writes of `len` (= size_of::<TcpInfo>()) bytes, and `len` is a valid
        // `socklen_t`. `getsockopt` writes at most `len` bytes into the buffer
        // and stores the number actually written back into `len`. A non-zero
        // return value signals failure and is mapped to `None` below.
        let status = unsafe {
            libc::getsockopt(
                fd,
                OPT_LEVEL,
                OPT_NAME,
                &mut tcp_info as *mut _ as *mut _,
                &mut len,
            )
        };
        if status != 0 {
            None
        } else {
            // The kernel writes back the number of bytes it populated. It must
            // never claim to have written more than the buffer we handed it —
            // that would mean it overran `tcp_info`, an out-of-bounds write we
            // could not have detected by the return code alone.
            debug_assert!(
                len <= struct_len,
                "getsockopt(TCP_INFO) wrote back len {len} > struct size {struct_len} (buffer overrun)"
            );
            Some(tcp_info)
        }
    }
1418    #[cfg(not(unix))]
1419    pub fn socketinfo(fd: libc::c_int) -> Option<TcpInfo> {
1420        None
1421    }
1422
1423    #[cfg(unix)]
1424    #[cfg(not(any(target_os = "macos", target_os = "ios")))]
1425    mod internal {
1426        #[cfg(target_os = "linux")]
1427        pub const OPT_LEVEL: libc::c_int = libc::SOL_TCP;
1428
1429        #[cfg(any(
1430            target_os = "freebsd",
1431            target_os = "dragonfly",
1432            target_os = "openbsd",
1433            target_os = "netbsd"
1434        ))]
1435        pub const OPT_LEVEL: libc::c_int = libc::IPPROTO_TCP;
1436
1437        pub const OPT_NAME: libc::c_int = libc::TCP_INFO;
1438
1439        #[derive(Clone, Debug)]
1440        #[repr(C)]
1441        pub struct TcpInfo {
1442            // State
1443            tcpi_state: u8,
1444            tcpi_ca_state: u8,
1445            tcpi_retransmits: u8,
1446            tcpi_probes: u8,
1447            tcpi_backoff: u8,
1448            tcpi_options: u8,
1449            tcpi_snd_rcv_wscale: u8, // 4bits|4bits
1450
1451            tcpi_rto: u32,
1452            tcpi_ato: u32,
1453            tcpi_snd_mss: u32,
1454            tcpi_rcv_mss: u32,
1455
1456            tcpi_unacked: u32,
1457            tcpi_sacked: u32,
1458            tcpi_lost: u32,
1459            tcpi_retrans: u32,
1460            tcpi_fackets: u32,
1461
1462            // Times
1463            tcpi_last_data_sent: u32,
1464            tcpi_last_ack_sent: u32, // Not remembered
1465            tcpi_last_data_recv: u32,
1466            tcpi_last_ack_recv: u32,
1467
1468            // Metrics
1469            tcpi_pmtu: u32,
1470            tcpi_rcv_ssthresh: u32,
1471            tcpi_rtt: u32,
1472            tcpi_rttvar: u32,
1473            tcpi_snd_ssthresh: u32,
1474            tcpi_snd_cwnd: u32,
1475            tcpi_advmss: u32,
1476            tcpi_reordering: u32,
1477        }
1478        impl TcpInfo {
1479            pub fn rtt(&self) -> u32 {
1480                self.tcpi_rtt
1481            }
1482
1483            /// Human-readable Linux TCP FSM state. Values follow
1484            /// `include/net/tcp_states.h` (`TCP_ESTABLISHED = 1`,
1485            /// `TCP_SYN_SENT = 2`, …). Anything unexpected falls back to
1486            /// `"UNKNOWN"` rather than panicking — the log prefix is a
1487            /// best-effort diagnostic and must not add failure modes.
1488            pub fn state(&self) -> &'static str {
1489                match self.tcpi_state {
1490                    1 => "ESTABLISHED",
1491                    2 => "SYN_SENT",
1492                    3 => "SYN_RECV",
1493                    4 => "FIN_WAIT1",
1494                    5 => "FIN_WAIT2",
1495                    6 => "TIME_WAIT",
1496                    7 => "CLOSE",
1497                    8 => "CLOSE_WAIT",
1498                    9 => "LAST_ACK",
1499                    10 => "LISTEN",
1500                    11 => "CLOSING",
1501                    12 => "NEW_SYN_RECV",
1502                    _ => "UNKNOWN",
1503                }
1504            }
1505        }
1506    }
1507
1508    #[cfg(unix)]
1509    #[cfg(any(target_os = "macos", target_os = "ios"))]
1510    mod internal {
1511        pub const OPT_LEVEL: libc::c_int = libc::IPPROTO_TCP;
1512        pub const OPT_NAME: libc::c_int = 0x106;
1513
1514        #[derive(Clone, Debug)]
1515        #[repr(C)]
1516        pub struct TcpInfo {
1517            tcpi_state: u8,
1518            tcpi_snd_wscale: u8,
1519            tcpi_rcv_wscale: u8,
1520            __pad1: u8,
1521            tcpi_options: u32,
1522            tcpi_flags: u32,
1523            tcpi_rto: u32,
1524            tcpi_maxseg: u32,
1525            tcpi_snd_ssthresh: u32,
1526            tcpi_snd_cwnd: u32,
1527            tcpi_snd_wnd: u32,
1528            tcpi_snd_sbbytes: u32,
1529            tcpi_rcv_wnd: u32,
1530            tcpi_rttcur: u32,
1531            tcpi_srtt: u32,
1532            tcpi_rttvar: u32,
1533            tcpi_tfo: u32,
1534            tcpi_txpackets: u64,
1535            tcpi_txbytes: u64,
1536            tcpi_txretransmitbytes: u64,
1537            tcpi_rxpackets: u64,
1538            tcpi_rxbytes: u64,
1539            tcpi_rxoutoforderbytes: u64,
1540            tcpi_txretransmitpackets: u64,
1541        }
1542        impl TcpInfo {
1543            pub fn rtt(&self) -> u32 {
1544                // tcpi_srtt is in milliseconds not microseconds
1545                self.tcpi_srtt * 1000
1546            }
1547
1548            /// Human-readable Darwin TCP FSM state. Values follow
1549            /// `netinet/tcp_fsm.h` (`TCPS_CLOSED = 0`, `TCPS_LISTEN = 1`,
1550            /// `TCPS_SYN_SENT = 2`, …). Differs from Linux numbering —
1551            /// macOS counts from 0, Linux from 1.
1552            pub fn state(&self) -> &'static str {
1553                match self.tcpi_state {
1554                    0 => "CLOSED",
1555                    1 => "LISTEN",
1556                    2 => "SYN_SENT",
1557                    3 => "SYN_RECEIVED",
1558                    4 => "ESTABLISHED",
1559                    5 => "CLOSE_WAIT",
1560                    6 => "FIN_WAIT_1",
1561                    7 => "CLOSING",
1562                    8 => "LAST_ACK",
1563                    9 => "FIN_WAIT_2",
1564                    10 => "TIME_WAIT",
1565                    _ => "UNKNOWN",
1566                }
1567            }
1568        }
1569    }
1570
1571    #[cfg(not(unix))]
1572    #[derive(Clone, Debug)]
1573    struct TcpInfo {}
1574
1575    #[test]
1576    #[serial_test::serial]
1577    fn test_rtt() {
1578        let sock = std::net::TcpStream::connect("google.com:80").unwrap();
1579        let fd = sock.as_raw_fd();
1580        let info = socket_info(fd);
1581        assert!(info.is_some());
1582        println!("{info:#?}");
1583        println!(
1584            "rtt: {}",
1585            sozu_command::logging::LogDuration(socket_rtt(&sock))
1586        );
1587    }
1588}