sozu_lib/socket.rs
1//! Socket I/O wrappers and TCP option helpers.
2//!
3//! Hosts the `SocketHandler` trait, the `FrontRustls` wrapper that drives
4//! a rustls `ServerConnection` over a `TcpStream`, plus the ancillary
5//! `getsockopt(TCP_INFO)` / TCP-keepalive helpers. The
6//! `FrontRustls::socket_write` / `socket_write_vectored` pair is a known
7//! truncation hot spot — keep the two paths structurally symmetric (see
8//! the per-method `///` invariants).
9
10use std::{
11 io::{ErrorKind, Read, Write},
12 net::SocketAddr,
13};
14
15use mio::net::{TcpListener, TcpStream, UdpSocket};
16use rustls::{ProtocolVersion, ServerConnection};
17use rusty_ulid::Ulid;
18use socket2::{Domain, Protocol, Socket, Type};
19use sozu_command::{config::MAX_LOOP_ITERATIONS, logging::ansi_palette};
20
21use crate::metrics::names;
22
23#[derive(thiserror::Error, Debug)]
24pub enum ServerBindError {
25 #[error("could not set bind to socket: {0}")]
26 BindError(std::io::Error),
27 #[error("could not listen on socket: {0}")]
28 Listen(std::io::Error),
29 #[error("could not set socket to nonblocking: {0}")]
30 SetNonBlocking(std::io::Error),
31 #[error("could not set reuse address: {0}")]
32 SetReuseAddress(std::io::Error),
33 #[error("could not set reuse address: {0}")]
34 SetReusePort(std::io::Error),
35 #[error("Could not create socket: {0}")]
36 SocketCreationError(std::io::Error),
37 #[error("Invalid socket address '{address}': {error}")]
38 InvalidSocketAddress { address: String, error: String },
39}
40
/// Status returned next to the byte count by every [`SocketHandler`]
/// read/write call.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SocketResult {
    /// The call completed without hitting a blocking or failure condition.
    Continue,
    /// The peer closed or reset the connection.
    Closed,
    /// The underlying socket reported `WouldBlock`.
    WouldBlock,
    /// An I/O failure (or the loop-iteration guard) stopped the call.
    Error,
}
48
/// Transport protocol label reported by [`SocketHandler::protocol`] and
/// rendered in the `protocol=` slot of the socket log prefix.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TransportProtocol {
    /// Plain, unencrypted TCP.
    Tcp,
    /// SSL 2.
    Ssl2,
    /// SSL 3.
    Ssl3,
    /// TLS 1.0.
    Tls1_0,
    /// TLS 1.1.
    Tls1_1,
    /// TLS 1.2.
    Tls1_2,
    /// TLS 1.3.
    Tls1_3,
}
59
60pub trait SocketHandler {
61 fn socket_read(&mut self, buf: &mut [u8]) -> (usize, SocketResult);
62 fn socket_write(&mut self, buf: &[u8]) -> (usize, SocketResult);
63 fn socket_write_vectored(&mut self, _buf: &[std::io::IoSlice]) -> (usize, SocketResult);
64 fn socket_wants_write(&self) -> bool {
65 false
66 }
67 fn socket_close(&mut self) {}
68 fn socket_ref(&self) -> &TcpStream;
69 fn socket_mut(&mut self) -> &mut TcpStream;
70 fn protocol(&self) -> TransportProtocol;
71 fn read_error(&self);
72 fn write_error(&self);
73 /// Returns the owning connection's session ULID when known. Used by
74 /// [`log_socket_context!`] to render the `[<session_ulid> - - -]` segment
75 /// of the socket-layer log prefix, matching the format used by the
76 /// rest of the mux stack. Returns `None` for contextless implementations
77 /// (e.g. raw `mio::TcpStream`); the macro renders `-` in the ULID slot.
78 fn session_ulid(&self) -> Option<Ulid> {
79 None
80 }
81}
82
/// Builds the socket-layer log prefix
/// `[<ulid> - - -]\tSOCKET\tSession(peer=..., local=..., rtt=..., state=..., protocol=...)\t >>>`
/// for a [`SocketHandler`] implementor that has `self` in scope.
///
/// The `[ulid - - -]` context is emitted first so the line stays aligned
/// with the `MUX-*`, `PIPE` and `RUSTLS` log lines. If
/// `$self.session_ulid()` is `None` (as with the raw [`TcpStream`] impl,
/// which has no session context) the ULID slot is printed as `-`, keeping
/// the column layout stable. Colours come from
/// [`sozu_command::logging::ansi_palette`], the single palette shared by
/// every `log_*_context!` macro in the proxy.
///
/// Slot sources:
///
/// - `peer` — live `getpeername(2)` lookup through `peer_addr()`. This macro
///   is used by [`FrontRustls`], where the accepted socket's peer is
///   reliable; backend-facing sockets go through
///   [`log_socket_module_prefix`], which accepts a cached peer instead.
/// - `local` — `local_addr()` on the socket.
/// - `rtt` / `state` — both taken from one `stats::socket_snapshot` call.
/// - `protocol` — `$self.protocol()`.
macro_rules! log_socket_context {
    ($self:expr) => {{
        let (open, reset, grey, gray, white) = ansi_palette();
        let ulid = $self
            .session_ulid()
            .map_or_else(|| "-".to_string(), |ulid| ulid.to_string());
        let snapshot = crate::socket::stats::socket_snapshot($self.socket_ref());
        let rtt = snapshot.as_ref().map(|s| s.rtt);
        let state = snapshot.as_ref().map(|s| s.state);
        let peer = $self.socket_ref().peer_addr().ok();
        let local = $self.socket_ref().local_addr().ok();
        let protocol = $self.protocol();
        format!(
            "[{ulid} - - -]\t{open}SOCKET{reset}\t{grey}Session{reset}({gray}peer{reset}={white}{peer:?}{reset}, {gray}local{reset}={white}{local:?}{reset}, {gray}rtt{reset}={white}{rtt:?}{reset}, {gray}state{reset}={white}{state:?}{reset}, {gray}protocol{reset}={white}{protocol:?}{reset})\t >>>",
            open = open,
            reset = reset,
            grey = grey,
            gray = gray,
            white = white,
            ulid = ulid,
            peer = peer,
            local = local,
            rtt = rtt,
            state = state,
            protocol = protocol,
        )
    }};
}
123
124/// Module-level socket log prefix used from free functions (e.g. the shared
125/// `tcp_socket_*` helpers) where `self` is not in scope but the caller can
126/// still thread a session `Ulid`, a cached peer address, and the underlying
127/// [`TcpStream`] through as parameters. Renders the same
128/// `[<ulid> - - -]\tSOCKET\tSession(peer=..., local=..., rtt=..., state=..., protocol=Tcp)\t >>>`
129/// prefix as [`log_socket_context!`]; colour scheme via
130/// [`sozu_command::logging::ansi_palette`].
131///
132/// Per-slot semantics:
133///
134/// - `peer` — prefers the caller-supplied `configured_peer` (cached at
135/// [`SessionTcpStream`] construction, immune to ENOTCONN on a socket that
136/// failed an asynchronous `connect()`) and falls back to a live
137/// `getpeername(2)` lookup when no cache was provided.
138/// - `local` — `getsockname(2)`, stays valid across failed connects.
139/// - `rtt` / `state` — a single `getsockopt(TCP_INFO)` call via
140/// [`stats::socket_snapshot`]; both render as `None` on an FSM state
141/// where the kernel rejects the call. `state="SYN_SENT"` is the
142/// clearest signal for a failed outbound `connect()`.
143/// - `protocol` — hardcoded to `Tcp` (raw-TCP helpers only).
144fn log_socket_module_prefix(
145 stream: &TcpStream,
146 session_ulid: Option<Ulid>,
147 configured_peer: Option<SocketAddr>,
148) -> String {
149 let (open, reset, grey, gray, white) = ansi_palette();
150 let ulid = match session_ulid {
151 Some(ulid) => ulid.to_string(),
152 None => "-".to_string(),
153 };
154 let snapshot = crate::socket::stats::socket_snapshot(stream);
155 let rtt = snapshot.as_ref().map(|s| s.rtt);
156 let state = snapshot.as_ref().map(|s| s.state);
157 format!(
158 "[{ulid} - - -]\t{open}SOCKET{reset}\t{grey}Session{reset}({gray}peer{reset}={white}{peer:?}{reset}, {gray}local{reset}={white}{local:?}{reset}, {gray}rtt{reset}={white}{rtt:?}{reset}, {gray}state{reset}={white}{state:?}{reset}, {gray}protocol{reset}={white}Tcp{reset})\t >>>",
159 peer = configured_peer.or_else(|| stream.peer_addr().ok()),
160 local = stream.local_addr().ok(),
161 )
162}
163
164/// Shared read/write/vectored-write logic used by both
165/// [`impl SocketHandler for TcpStream`] and
166/// [`impl SocketHandler for SessionTcpStream`]. Free-function entry point:
167/// `self` is out of scope here, so error logs use [`log_socket_module_prefix`]
168/// which renders the same `Session(peer, rtt, protocol)` context as
169/// [`log_socket_context!`] by reading from the `stream` + `session_ulid`
170/// parameters threaded through each helper.
171fn tcp_socket_read(
172 stream: &mut TcpStream,
173 buf: &mut [u8],
174 session_ulid: Option<Ulid>,
175 configured_peer: Option<SocketAddr>,
176) -> (usize, SocketResult) {
177 let mut size = 0usize;
178 let mut counter = 0;
179 loop {
180 counter += 1;
181 if counter > MAX_LOOP_ITERATIONS {
182 error!(
183 "{} MAX_LOOP_ITERATION reached in TcpStream::socket_read",
184 log_socket_module_prefix(stream, session_ulid, configured_peer)
185 );
186 incr!(names::socket::READ_INFINITE_LOOP_ERROR);
187 return (size, SocketResult::Error);
188 }
189 // Loop invariant: the running cursor never overshoots the buffer, so the
190 // `&mut buf[size..]` slice below can never panic on a bad offset.
191 debug_assert!(
192 size <= buf.len(),
193 "read cursor {size} overran buffer len {} (would slice out of bounds)",
194 buf.len()
195 );
196 if size == buf.len() {
197 return (size, SocketResult::Continue);
198 }
199 match stream.read(&mut buf[size..]) {
200 Ok(0) => return (size, SocketResult::Closed),
201 Ok(sz) => {
202 // `read` cannot report more bytes than the slice it was given.
203 debug_assert!(
204 sz <= buf.len() - size,
205 "read reported {sz} bytes into a {}-byte remaining slice",
206 buf.len() - size
207 );
208 size += sz;
209 }
210 Err(e) => match e.kind() {
211 ErrorKind::WouldBlock => return (size, SocketResult::WouldBlock),
212 // Treat `ConnectionRefused` as a closed socket, mirroring the
213 // write path. On Linux a failed asynchronous `connect()`
214 // surfaces as `ECONNREFUSED` on the first read; it is
215 // operationally identical to any other benign peer-initiated
216 // close and does not warrant a log line on every backend
217 // that happens to be down.
218 ErrorKind::ConnectionReset
219 | ErrorKind::ConnectionAborted
220 | ErrorKind::BrokenPipe
221 | ErrorKind::ConnectionRefused => return (size, SocketResult::Closed),
222 // Noisy-expected transport failures: backend unreachable,
223 // TCP_USER_TIMEOUT expiry, post-close reads. Keep a log line
224 // so operators can still trend the rate, but `warn!` — this
225 // is reality-at-scale, not a sozu invariant break.
226 ErrorKind::HostUnreachable
227 | ErrorKind::NetworkUnreachable
228 | ErrorKind::TimedOut
229 | ErrorKind::NotConnected => {
230 warn!(
231 "{} socket_read error={:?}",
232 log_socket_module_prefix(stream, session_ulid, configured_peer),
233 e
234 );
235 return (size, SocketResult::Error);
236 }
237 // Genuinely loud variants (`PermissionDenied`, `AddrNotAvailable`,
238 // `InvalidInput`/`Data`, …) and the unknown catch-all stay at
239 // `error!` so operators keep paging on real misconfig.
240 _ => {
241 error!(
242 "{} socket_read error={:?}",
243 log_socket_module_prefix(stream, session_ulid, configured_peer),
244 e
245 );
246 return (size, SocketResult::Error);
247 }
248 },
249 }
250 }
251}
252
253fn tcp_socket_write(
254 stream: &mut TcpStream,
255 buf: &[u8],
256 session_ulid: Option<Ulid>,
257 configured_peer: Option<SocketAddr>,
258) -> (usize, SocketResult) {
259 let mut size = 0usize;
260 let mut counter = 0;
261 loop {
262 counter += 1;
263 if counter > MAX_LOOP_ITERATIONS {
264 error!(
265 "{} MAX_LOOP_ITERATION reached in TcpStream::socket_write",
266 log_socket_module_prefix(stream, session_ulid, configured_peer)
267 );
268 incr!(names::socket::WRITE_INFINITE_LOOP_ERROR);
269 return (size, SocketResult::Error);
270 }
271 // Loop invariant: the cursor never overshoots the buffer, so the
272 // `&buf[size..]` slice below can never panic on a bad offset.
273 debug_assert!(
274 size <= buf.len(),
275 "write cursor {size} overran buffer len {} (would slice out of bounds)",
276 buf.len()
277 );
278 if size == buf.len() {
279 return (size, SocketResult::Continue);
280 }
281 match stream.write(&buf[size..]) {
282 Ok(0) => return (size, SocketResult::Continue),
283 Ok(sz) => {
284 // `write` cannot report more bytes than the slice it was given.
285 debug_assert!(
286 sz <= buf.len() - size,
287 "write reported {sz} bytes from a {}-byte remaining slice",
288 buf.len() - size
289 );
290 size += sz;
291 }
292 Err(e) => match e.kind() {
293 ErrorKind::WouldBlock => return (size, SocketResult::WouldBlock),
294 ErrorKind::ConnectionReset
295 | ErrorKind::ConnectionAborted
296 | ErrorKind::BrokenPipe
297 | ErrorKind::ConnectionRefused => {
298 incr!(names::tcp::WRITE_ERROR);
299 return (size, SocketResult::Closed);
300 }
301 // Noisy-expected transport failures (see `tcp_socket_read`
302 // for rationale). Log at `warn!` and still bump the
303 // `tcp.write.error` counter so rate-based dashboards stay
304 // accurate.
305 ErrorKind::HostUnreachable
306 | ErrorKind::NetworkUnreachable
307 | ErrorKind::TimedOut
308 | ErrorKind::NotConnected => {
309 warn!(
310 "{} socket_write error={:?}",
311 log_socket_module_prefix(stream, session_ulid, configured_peer),
312 e
313 );
314 incr!(names::tcp::WRITE_ERROR);
315 return (size, SocketResult::Error);
316 }
317 _ => {
318 //FIXME: timeout and other common errors should be sent up
319 error!(
320 "{} socket_write error={:?}",
321 log_socket_module_prefix(stream, session_ulid, configured_peer),
322 e
323 );
324 incr!(names::tcp::WRITE_ERROR);
325 return (size, SocketResult::Error);
326 }
327 },
328 }
329 }
330}
331
332fn tcp_socket_write_vectored(
333 stream: &mut TcpStream,
334 bufs: &[std::io::IoSlice],
335 session_ulid: Option<Ulid>,
336 configured_peer: Option<SocketAddr>,
337) -> (usize, SocketResult) {
338 match stream.write_vectored(bufs) {
339 Ok(sz) => {
340 // `write_vectored` cannot report more bytes than the slices held.
341 debug_assert!(
342 sz <= bufs.iter().map(|b| b.len()).sum::<usize>(),
343 "write_vectored reported {sz} bytes from {}-byte slices",
344 bufs.iter().map(|b| b.len()).sum::<usize>()
345 );
346 (sz, SocketResult::Continue)
347 }
348 Err(e) => match e.kind() {
349 ErrorKind::WouldBlock => (0, SocketResult::WouldBlock),
350 ErrorKind::ConnectionReset
351 | ErrorKind::ConnectionAborted
352 | ErrorKind::BrokenPipe
353 | ErrorKind::ConnectionRefused => {
354 incr!(names::tcp::WRITE_ERROR);
355 (0, SocketResult::Closed)
356 }
357 // Noisy-expected transport failures (see `tcp_socket_read` for
358 // rationale). Same tiering as the scalar write path.
359 ErrorKind::HostUnreachable
360 | ErrorKind::NetworkUnreachable
361 | ErrorKind::TimedOut
362 | ErrorKind::NotConnected => {
363 warn!(
364 "{} socket_write error={:?}",
365 log_socket_module_prefix(stream, session_ulid, configured_peer),
366 e
367 );
368 incr!(names::tcp::WRITE_ERROR);
369 (0, SocketResult::Error)
370 }
371 _ => {
372 //FIXME: timeout and other common errors should be sent up
373 error!(
374 "{} socket_write error={:?}",
375 log_socket_module_prefix(stream, session_ulid, configured_peer),
376 e
377 );
378 incr!(names::tcp::WRITE_ERROR);
379 (0, SocketResult::Error)
380 }
381 },
382 }
383}
384
385impl SocketHandler for TcpStream {
386 fn socket_read(&mut self, buf: &mut [u8]) -> (usize, SocketResult) {
387 tcp_socket_read(self, buf, None, None)
388 }
389
390 fn socket_write(&mut self, buf: &[u8]) -> (usize, SocketResult) {
391 tcp_socket_write(self, buf, None, None)
392 }
393
394 fn socket_write_vectored(&mut self, bufs: &[std::io::IoSlice]) -> (usize, SocketResult) {
395 tcp_socket_write_vectored(self, bufs, None, None)
396 }
397
398 fn socket_ref(&self) -> &TcpStream {
399 self
400 }
401
402 fn socket_mut(&mut self) -> &mut TcpStream {
403 self
404 }
405
406 fn protocol(&self) -> TransportProtocol {
407 TransportProtocol::Tcp
408 }
409
410 fn read_error(&self) {
411 incr!(names::tcp::READ_ERROR);
412 }
413
414 fn write_error(&self) {
415 incr!(names::tcp::WRITE_ERROR);
416 }
417}
418
419/// [`TcpStream`] wrapped with the owning session's ULID. Exists so plain-TCP
420/// frontends and backends inside the mux stack can prefix SOCKET-layer error
421/// logs with `[<session_ulid> - - -]`, matching what TLS-wrapped frontends
422/// already do via [`FrontRustls::session_ulid`].
423///
424/// The inner [`TcpStream`] is exposed directly so mio registration sites can
425/// borrow it as-is; the outer type only participates in the [`SocketHandler`]
426/// trait dispatch.
427#[derive(Debug)]
428pub struct SessionTcpStream {
429 pub stream: TcpStream,
430 pub session_ulid: Ulid,
431 /// Peer address cached at construction. For backend-facing sockets
432 /// (created from a nonblocking `connect()` in `Router::connect`) this is
433 /// the cluster-configured backend address — reliable across ENOTCONN
434 /// after a failed handshake, which is the sharp case that motivates the
435 /// cache. For frontend-facing sockets constructed from an accepted
436 /// `TcpStream`, this is the client's peer address — identical to what a
437 /// live `getpeername(2)` would return, but threaded through the same
438 /// plumbing for uniformity. Used as the preferred source of truth for
439 /// the `peer=` slot in [`log_socket_module_prefix`], falling back to a
440 /// live lookup when `None`.
441 pub configured_peer: Option<SocketAddr>,
442}
443
444impl SessionTcpStream {
445 pub fn new(stream: TcpStream, session_ulid: Ulid, configured_peer: Option<SocketAddr>) -> Self {
446 Self {
447 stream,
448 session_ulid,
449 configured_peer,
450 }
451 }
452}
453
454impl SocketHandler for SessionTcpStream {
455 fn socket_read(&mut self, buf: &mut [u8]) -> (usize, SocketResult) {
456 tcp_socket_read(
457 &mut self.stream,
458 buf,
459 Some(self.session_ulid),
460 self.configured_peer,
461 )
462 }
463
464 fn socket_write(&mut self, buf: &[u8]) -> (usize, SocketResult) {
465 tcp_socket_write(
466 &mut self.stream,
467 buf,
468 Some(self.session_ulid),
469 self.configured_peer,
470 )
471 }
472
473 fn socket_write_vectored(&mut self, bufs: &[std::io::IoSlice]) -> (usize, SocketResult) {
474 tcp_socket_write_vectored(
475 &mut self.stream,
476 bufs,
477 Some(self.session_ulid),
478 self.configured_peer,
479 )
480 }
481
482 fn socket_ref(&self) -> &TcpStream {
483 &self.stream
484 }
485
486 fn socket_mut(&mut self) -> &mut TcpStream {
487 &mut self.stream
488 }
489
490 fn protocol(&self) -> TransportProtocol {
491 TransportProtocol::Tcp
492 }
493
494 fn read_error(&self) {
495 incr!(names::tcp::READ_ERROR);
496 }
497
498 fn write_error(&self) {
499 incr!(names::tcp::WRITE_ERROR);
500 }
501
502 fn session_ulid(&self) -> Option<Ulid> {
503 Some(self.session_ulid)
504 }
505}
506
507pub struct FrontRustls {
508 pub stream: TcpStream,
509 pub session: ServerConnection,
510 /// Peer sent a graceful FIN on the read side (`read()` returned `Ok(0)`).
511 /// We can no longer receive plaintext, but may still have rustls-buffered
512 /// records to flush on the write side — do NOT abort pending writes.
513 pub peer_disconnected: bool,
514 /// Peer reset the connection (RST/ConnectionAborted/BrokenPipe). The TCP
515 /// channel is dead; further writes are pointless and should short-circuit.
516 pub peer_reset: bool,
517 /// Connection/session ULID propagated from the enclosing mux session.
518 /// Rendered into SOCKET-layer error logs via [`Self::session_ulid`].
519 pub session_ulid: Ulid,
520}
521
522impl std::fmt::Debug for FrontRustls {
523 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
524 f.debug_struct("FrontRustls")
525 .field("stream", &self.stream)
526 .finish_non_exhaustive()
527 }
528}
529
530impl SocketHandler for FrontRustls {
531 fn socket_read(&mut self, buf: &mut [u8]) -> (usize, SocketResult) {
532 let mut size = 0usize;
533 let mut can_read = true;
534 let mut is_error = false;
535 let mut is_closed = false;
536
537 let mut counter = 0;
538 loop {
539 counter += 1;
540 if counter > MAX_LOOP_ITERATIONS {
541 error!(
542 "{} MAX_LOOP_ITERATION reached in FrontRustls::socket_read",
543 log_socket_context!(self)
544 );
545 incr!(names::rustls::READ_INFINITE_LOOP_ERROR);
546 is_error = true;
547 break;
548 }
549
550 // Loop invariant: the plaintext cursor never overshoots the caller's
551 // buffer, so every `&mut buf[size..]` below is a valid slice.
552 debug_assert!(
553 size <= buf.len(),
554 "rustls read cursor {size} overran buffer len {} (would slice out of bounds)",
555 buf.len()
556 );
557 if size == buf.len() {
558 break;
559 }
560
561 if !can_read | is_error | is_closed {
562 break;
563 }
564
565 match self.session.read_tls(&mut self.stream) {
566 Ok(0) => {
567 // Graceful FIN on the read side: peer closed its write
568 // half. Keep `peer_reset` unset so outbound writes can
569 // still flush rustls's buffered records (half-close).
570 can_read = false;
571 is_closed = true;
572 self.peer_disconnected = true;
573 }
574 Ok(_sz) => {}
575 Err(e) => match e.kind() {
576 ErrorKind::WouldBlock => {
577 can_read = false;
578 }
579 ErrorKind::ConnectionReset
580 | ErrorKind::ConnectionAborted
581 | ErrorKind::BrokenPipe => {
582 // Full RST/abort: the TCP channel is dead. Mark
583 // `peer_reset` so writes short-circuit (nothing can
584 // reach the peer anymore) but still set
585 // `peer_disconnected` for back-compatible read-side
586 // logic.
587 is_closed = true;
588 self.peer_disconnected = true;
589 self.peer_reset = true;
590 }
591 // https://github.com/rustls/rustls/blob/main/rustls/src/conn.rs#L482-L500
592 // rustls's 16 KB received_plaintext buffer is full — expected
593 // under H2 where frame-at-a-time reads drain less than a full
594 // TLS record. The outer loop will drain plaintext next iteration.
595 ErrorKind::Other => {}
596 _ => {
597 error!(
598 "{} could not read TLS stream from socket: {:?}",
599 log_socket_context!(self),
600 e
601 );
602 is_error = true;
603 break;
604 }
605 },
606 }
607
608 if let Err(e) = self.session.process_new_packets() {
609 error!(
610 "{} could not process read TLS packets: {:?}",
611 log_socket_context!(self),
612 e
613 );
614 is_error = true;
615 break;
616 }
617
618 while !self.session.wants_read() {
619 match self.session.reader().read(&mut buf[size..]) {
620 Ok(0) => break,
621 Ok(sz) => {
622 // The rustls reader cannot return more plaintext than
623 // the remaining slice it was handed.
624 debug_assert!(
625 sz <= buf.len() - size,
626 "rustls reader returned {sz} bytes into a {}-byte remaining slice",
627 buf.len() - size
628 );
629 size += sz;
630 }
631 Err(e) => match e.kind() {
632 ErrorKind::WouldBlock => {
633 break;
634 }
635 ErrorKind::ConnectionReset
636 | ErrorKind::ConnectionAborted
637 | ErrorKind::BrokenPipe => {
638 is_closed = true;
639 break;
640 }
641 _ => {
642 error!(
643 "{} could not read data from TLS stream: {:?}",
644 log_socket_context!(self),
645 e
646 );
647 is_error = true;
648 break;
649 }
650 },
651 }
652 }
653 }
654
655 // Post-condition: we never report more plaintext than the caller asked
656 // for, and Error/Closed are mutually exclusive (the loop `break`s on the
657 // first one set, so both can never be true on the same pass).
658 debug_assert!(
659 size <= buf.len(),
660 "rustls socket_read returned {size} bytes for a {}-byte buffer",
661 buf.len()
662 );
663 debug_assert!(
664 !(is_error && is_closed),
665 "rustls socket_read cannot be both Error and Closed"
666 );
667 if is_error {
668 (size, SocketResult::Error)
669 } else if is_closed {
670 (size, SocketResult::Closed)
671 } else if size == buf.len() {
672 // The full requested amount was read (possibly from the rustls
673 // plaintext buffer). Report Continue so the caller keeps
674 // READABLE in the readiness set — there may be more decrypted
675 // data available without a new mio event.
676 (size, SocketResult::Continue)
677 } else if !can_read {
678 (size, SocketResult::WouldBlock)
679 } else {
680 (size, SocketResult::Continue)
681 }
682 }
683
684 /// Keep these two functions structurally symmetric — a divergence
685 /// caused the 4.5 MB H2 truncation bug. Tests
686 /// `e2e::tests::h2_correctness_tests::*` and
687 /// `e2e::tests::h2_tests::test_h2_large_*` are the regression guard.
688 fn socket_write(&mut self, buf: &[u8]) -> (usize, SocketResult) {
689 // Abort only on a true RST — a FIN on the read side still permits
690 // flushing rustls's plaintext buffer (TLS half-close).
691 if self.peer_reset {
692 return (0, SocketResult::Closed);
693 }
694
695 let mut buffered_size = 0usize;
696 let mut can_write = true;
697 let mut is_error = false;
698 let mut is_closed = false;
699
700 let mut counter = 0;
701 loop {
702 counter += 1;
703 if counter > MAX_LOOP_ITERATIONS {
704 error!(
705 "{} MAX_LOOP_ITERATION reached in FrontRustls::socket_write",
706 log_socket_context!(self)
707 );
708 incr!(names::rustls::WRITE_INFINITE_LOOP_ERROR);
709 is_error = true;
710 break;
711 }
712 // Loop invariant: the absorbed-plaintext cursor never overshoots the
713 // caller's buffer, so `&buf[buffered_size..]` is always a valid slice.
714 debug_assert!(
715 buffered_size <= buf.len(),
716 "rustls write cursor {buffered_size} overran buffer len {} (would slice out of bounds)",
717 buf.len()
718 );
719 if buffered_size == buf.len() {
720 break;
721 }
722
723 if !can_write | is_error | is_closed {
724 break;
725 }
726
727 match self.session.writer().write(&buf[buffered_size..]) {
728 Ok(0) => {} // zero byte written means that the Rustls buffers are full, we will try to write on the socket and try again
729 Ok(sz) => {
730 // rustls cannot absorb more plaintext than the remaining slice.
731 debug_assert!(
732 sz <= buf.len() - buffered_size,
733 "rustls writer absorbed {sz} bytes from a {}-byte remaining slice",
734 buf.len() - buffered_size
735 );
736 buffered_size += sz;
737 }
738 Err(e) => match e.kind() {
739 ErrorKind::WouldBlock => {
740 // we don't need to do anything, the session will return false in wants_write?
741 //error!("rustls socket_write wouldblock");
742 }
743 ErrorKind::ConnectionReset
744 | ErrorKind::ConnectionAborted
745 | ErrorKind::BrokenPipe => {
746 //FIXME: this should probably not happen here
747 incr!(names::rustls::WRITE_ERROR);
748 is_closed = true;
749 self.peer_reset = true;
750 break;
751 }
752 _ => {
753 error!(
754 "{} could not write data to TLS stream: {:?}",
755 log_socket_context!(self),
756 e
757 );
758 incr!(names::rustls::WRITE_ERROR);
759 is_error = true;
760 break;
761 }
762 },
763 }
764
765 loop {
766 match self.session.write_tls(&mut self.stream) {
767 Ok(0) => {
768 //can_write = false;
769 break;
770 }
771 Ok(_sz) => {}
772 Err(e) => match e.kind() {
773 ErrorKind::WouldBlock => {
774 can_write = false;
775 break;
776 }
777 ErrorKind::ConnectionReset
778 | ErrorKind::ConnectionAborted
779 | ErrorKind::BrokenPipe => {
780 incr!(names::rustls::WRITE_ERROR);
781 is_closed = true;
782 self.peer_reset = true;
783 break;
784 }
785 _ => {
786 error!(
787 "{} could not write TLS stream to socket: {:?}",
788 log_socket_context!(self),
789 e
790 );
791 incr!(names::rustls::WRITE_ERROR);
792 is_error = true;
793 break;
794 }
795 },
796 }
797 }
798 }
799
800 // Flush any pending TLS records even if no application data was written.
801 // This handles the case where h2.rs calls socket_write(&[]) to flush
802 // buffered TLS data (e.g. NewSessionTicket, key updates). Without this,
803 // the main loop above exits immediately for empty buffers and write_tls
804 // is never called.
805 if !is_error && !is_closed && can_write && self.session.wants_write() {
806 loop {
807 match self.session.write_tls(&mut self.stream) {
808 Ok(0) => break,
809 Ok(_) => {}
810 Err(e) => match e.kind() {
811 ErrorKind::WouldBlock => {
812 can_write = false;
813 break;
814 }
815 ErrorKind::ConnectionReset
816 | ErrorKind::ConnectionAborted
817 | ErrorKind::BrokenPipe => {
818 incr!(names::rustls::WRITE_ERROR);
819 is_closed = true;
820 self.peer_reset = true;
821 break;
822 }
823 _ => {
824 error!(
825 "{} could not flush TLS stream to socket: {:?}",
826 log_socket_context!(self),
827 e
828 );
829 incr!(names::rustls::WRITE_ERROR);
830 is_error = true;
831 break;
832 }
833 },
834 }
835 }
836 }
837
838 // Post-condition: we never report absorbing more plaintext than the
839 // caller handed us — over-reporting is exactly the truncation-class bug
840 // these two symmetric paths exist to avoid.
841 debug_assert!(
842 buffered_size <= buf.len(),
843 "rustls socket_write reported {buffered_size} bytes for a {}-byte buffer",
844 buf.len()
845 );
846 debug_assert!(
847 !(is_error && is_closed),
848 "rustls socket_write cannot be both Error and Closed"
849 );
850 if is_error {
851 (buffered_size, SocketResult::Error)
852 } else if is_closed {
853 (buffered_size, SocketResult::Closed)
854 } else if !can_write {
855 (buffered_size, SocketResult::WouldBlock)
856 } else {
857 (buffered_size, SocketResult::Continue)
858 }
859 }
860
861 /// Write a list of plaintext slices through the rustls session.
862 ///
863 /// Empty-buffer invariant: callers may legitimately pass `bufs.is_empty()`
864 /// or an all-empty slice to request a pure flush pass. In that case
865 /// `total_len == 0`, the top-of-loop `buffered_size == total_len` guard
866 /// fires immediately after `write_tls` drains any pending TLS records the
867 /// session still has buffered (e.g. the remainder of a record split by
868 /// the previous call, or `close_notify` output). This mirrors
869 /// [`Self::socket_write`]: both entry points must stay structurally
870 /// symmetric so that a zero-byte flush never early-returns without giving
871 /// rustls a chance to emit bytes.
872 ///
873 /// Keep these two functions structurally symmetric — a divergence
874 /// caused the 4.5 MB H2 truncation bug. Tests
875 /// `e2e::tests::h2_correctness_tests::*` and
876 /// `e2e::tests::h2_tests::test_h2_large_*` are the regression guard.
877 fn socket_write_vectored(&mut self, bufs: &[std::io::IoSlice]) -> (usize, SocketResult) {
878 if self.peer_reset {
879 return (0, SocketResult::Closed);
880 }
881
882 let total_len: usize = bufs.iter().map(|b| b.len()).sum();
883 let mut buffered_size = 0usize;
884 let mut can_write = true;
885 let mut is_error = false;
886 let mut is_closed = false;
887
888 let mut counter = 0;
889 loop {
890 counter += 1;
891 if counter > MAX_LOOP_ITERATIONS {
892 error!(
893 "{} MAX_LOOP_ITERATION reached in FrontRustls::socket_write_vectored",
894 log_socket_context!(self)
895 );
896 incr!(names::rustls::WRITE_INFINITE_LOOP_ERROR);
897 is_error = true;
898 break;
899 }
900 // Loop invariant: the absorbed-plaintext cursor never overshoots the
901 // summed slice length we computed up front (mirrors the scalar path).
902 debug_assert!(
903 buffered_size <= total_len,
904 "rustls vectored write cursor {buffered_size} overran total slice len {total_len}"
905 );
906 if buffered_size == total_len {
907 break;
908 }
909
910 if !can_write | is_error | is_closed {
911 break;
912 }
913
914 // rustls's Writer does not expose a "write from offset across slices"
915 // helper, so we push plaintext once and then drain via write_tls.
916 // If rustls only partially absorbs the slices, we break and return
917 // the partial count so the caller can advance its buffers and retry.
918 if buffered_size == 0 {
919 match self.session.writer().write_vectored(bufs) {
920 Ok(0) => {}
921 Ok(sz) => {
922 // rustls cannot absorb more plaintext than the slices held.
923 debug_assert!(
924 sz <= total_len,
925 "rustls writer absorbed {sz} bytes from {total_len}-byte slices"
926 );
927 buffered_size += sz;
928 }
929 Err(e) => match e.kind() {
930 ErrorKind::WouldBlock => {}
931 ErrorKind::ConnectionReset
932 | ErrorKind::ConnectionAborted
933 | ErrorKind::BrokenPipe => {
934 incr!(names::rustls::WRITE_ERROR);
935 is_closed = true;
936 self.peer_reset = true;
937 break;
938 }
939 _ => {
940 error!(
941 "{} could not write data to TLS stream: {:?}",
942 log_socket_context!(self),
943 e
944 );
945 incr!(names::rustls::WRITE_ERROR);
946 is_error = true;
947 break;
948 }
949 },
950 }
951 }
952
953 // Plaintext was partially absorbed — we cannot re-call write_vectored
954 // because the IoSlice pointers have not been advanced. Drain whatever
955 // rustls buffered to the socket, then return the partial count so the
956 // caller can consume and retry with adjusted slices.
957 if buffered_size > 0 && buffered_size < total_len {
958 loop {
959 match self.session.write_tls(&mut self.stream) {
960 Ok(0) => break,
961 Ok(_) => {}
962 Err(e) => match e.kind() {
963 ErrorKind::WouldBlock => {
964 can_write = false;
965 break;
966 }
967 ErrorKind::ConnectionReset
968 | ErrorKind::ConnectionAborted
969 | ErrorKind::BrokenPipe => {
970 incr!(names::rustls::WRITE_ERROR);
971 is_closed = true;
972 self.peer_reset = true;
973 break;
974 }
975 _ => {
976 error!(
977 "{} could not write TLS stream to socket: {:?}",
978 log_socket_context!(self),
979 e
980 );
981 incr!(names::rustls::WRITE_ERROR);
982 is_error = true;
983 break;
984 }
985 },
986 }
987 }
988 break;
989 }
990
991 loop {
992 match self.session.write_tls(&mut self.stream) {
993 Ok(0) => {
994 break;
995 }
996 Ok(_sz) => {}
997 Err(e) => match e.kind() {
998 ErrorKind::WouldBlock => {
999 can_write = false;
1000 break;
1001 }
1002 ErrorKind::ConnectionReset
1003 | ErrorKind::ConnectionAborted
1004 | ErrorKind::BrokenPipe => {
1005 incr!(names::rustls::WRITE_ERROR);
1006 is_closed = true;
1007 self.peer_reset = true;
1008 break;
1009 }
1010 _ => {
1011 error!(
1012 "{} could not write TLS stream to socket: {:?}",
1013 log_socket_context!(self),
1014 e
1015 );
1016 incr!(names::rustls::WRITE_ERROR);
1017 is_error = true;
1018 break;
1019 }
1020 },
1021 }
1022 }
1023 }
1024
1025 if !is_error && !is_closed && can_write && self.session.wants_write() {
1026 loop {
1027 match self.session.write_tls(&mut self.stream) {
1028 Ok(0) => break,
1029 Ok(_) => {}
1030 Err(e) => match e.kind() {
1031 ErrorKind::WouldBlock => {
1032 can_write = false;
1033 break;
1034 }
1035 ErrorKind::ConnectionReset
1036 | ErrorKind::ConnectionAborted
1037 | ErrorKind::BrokenPipe => {
1038 incr!(names::rustls::WRITE_ERROR);
1039 is_closed = true;
1040 self.peer_reset = true;
1041 break;
1042 }
1043 _ => {
1044 error!(
1045 "{} could not flush TLS stream to socket: {:?}",
1046 log_socket_context!(self),
1047 e
1048 );
1049 incr!(names::rustls::WRITE_ERROR);
1050 is_error = true;
1051 break;
1052 }
1053 },
1054 }
1055 }
1056 }
1057
1058 // Post-condition: report no more than the summed slice length, and keep
1059 // Error/Closed mutually exclusive — must stay structurally symmetric with
1060 // `socket_write` (divergence here is the 4.5 MB truncation-class bug).
1061 debug_assert!(
1062 buffered_size <= total_len,
1063 "rustls socket_write_vectored reported {buffered_size} bytes for {total_len}-byte slices"
1064 );
1065 debug_assert!(
1066 !(is_error && is_closed),
1067 "rustls socket_write_vectored cannot be both Error and Closed"
1068 );
1069 if is_error {
1070 (buffered_size, SocketResult::Error)
1071 } else if is_closed {
1072 (buffered_size, SocketResult::Closed)
1073 } else if !can_write {
1074 (buffered_size, SocketResult::WouldBlock)
1075 } else {
1076 (buffered_size, SocketResult::Continue)
1077 }
1078 }
1079
1080 fn socket_close(&mut self) {
1081 self.session.send_close_notify();
1082 }
1083
1084 fn socket_wants_write(&self) -> bool {
1085 // Only a true RST stops us wanting to write — a peer FIN still
1086 // allows flushing TLS plaintext buffered in rustls (half-close).
1087 !self.peer_reset && self.session.wants_write()
1088 }
1089
1090 fn socket_ref(&self) -> &TcpStream {
1091 &self.stream
1092 }
1093
1094 fn socket_mut(&mut self) -> &mut TcpStream {
1095 &mut self.stream
1096 }
1097
1098 fn protocol(&self) -> TransportProtocol {
1099 self.session
1100 .protocol_version()
1101 .map(|version| match version {
1102 ProtocolVersion::SSLv2 => TransportProtocol::Ssl2,
1103 ProtocolVersion::SSLv3 => TransportProtocol::Ssl3,
1104 ProtocolVersion::TLSv1_0 => TransportProtocol::Tls1_0,
1105 ProtocolVersion::TLSv1_1 => TransportProtocol::Tls1_1,
1106 ProtocolVersion::TLSv1_2 => TransportProtocol::Tls1_2,
1107 ProtocolVersion::TLSv1_3 => TransportProtocol::Tls1_3,
1108 _ => TransportProtocol::Tls1_3,
1109 })
1110 .unwrap_or(TransportProtocol::Tcp)
1111 }
1112
1113 fn read_error(&self) {
1114 incr!(names::rustls::READ_ERROR);
1115 }
1116
1117 fn write_error(&self) {
1118 incr!(names::rustls::WRITE_ERROR);
1119 }
1120
1121 fn session_ulid(&self) -> Option<Ulid> {
1122 Some(self.session_ulid)
1123 }
1124}
1125
1126pub fn server_bind(addr: SocketAddr) -> Result<TcpListener, ServerBindError> {
1127 let sock = Socket::new(Domain::for_address(addr), Type::STREAM, Some(Protocol::TCP))
1128 .map_err(ServerBindError::SocketCreationError)?;
1129
1130 // set so_reuseaddr, but only on unix (mirrors what libstd does)
1131 if cfg!(unix) {
1132 sock.set_reuse_address(true)
1133 .map_err(ServerBindError::SetReuseAddress)?;
1134 }
1135
1136 sock.set_reuse_port(true)
1137 .map_err(ServerBindError::SetReusePort)?;
1138
1139 sock.bind(&addr.into())
1140 .map_err(ServerBindError::BindError)?;
1141
1142 sock.set_nonblocking(true)
1143 .map_err(ServerBindError::SetNonBlocking)?;
1144
1145 // listen
1146 // FIXME: make the backlog configurable?
1147 sock.listen(1024).map_err(ServerBindError::Listen)?;
1148
1149 // Post-conditions (invariant violations only — every fallible syscall above
1150 // already returns an error; these `debug_assert!`s catch a flag we *set*
1151 // silently not sticking, which would be our own logic bug, not a syscall
1152 // failure on network input). The getters return `io::Result`; we only
1153 // assert when the kernel answers, degrading to a no-op on the rare query
1154 // failure so we never panic on a dying fd.
1155 if let Ok(nonblocking) = sock.nonblocking() {
1156 debug_assert!(
1157 nonblocking,
1158 "server_bind must return a non-blocking socket (the worker event loop is edge-triggered)"
1159 );
1160 }
1161 // `SO_REUSEPORT` is set on every platform; assert it stuck so a SCM hand-off
1162 // across a hot-upgrade can re-bind the same address.
1163 #[cfg(unix)]
1164 if let Ok(reuse_port) = sock.reuse_port() {
1165 debug_assert!(
1166 reuse_port,
1167 "server_bind must set SO_REUSEPORT so the listener survives a hot-upgrade re-bind"
1168 );
1169 }
1170 // `SO_REUSEADDR` is unix-only here (mirrors libstd).
1171 #[cfg(unix)]
1172 if let Ok(reuse_address) = sock.reuse_address() {
1173 debug_assert!(
1174 reuse_address,
1175 "server_bind must set SO_REUSEADDR on unix (mirrors libstd)"
1176 );
1177 }
1178 // A bound STREAM socket carries a local address in the requested family.
1179 if let Ok(local) = sock.local_addr() {
1180 debug_assert_eq!(
1181 local.is_ipv4(),
1182 addr.is_ipv4(),
1183 "bound socket family must match the requested address family"
1184 );
1185 debug_assert_eq!(
1186 local.is_ipv6(),
1187 addr.is_ipv6(),
1188 "bound socket family must match the requested address family"
1189 );
1190 }
1191
1192 Ok(TcpListener::from_std(sock.into()))
1193}
1194
1195/// Bind a non-blocking UDP listener socket on `addr`.
1196///
1197/// Mirrors [`server_bind`] but for DGRAM: `SO_REUSEADDR` (unix) + `SO_REUSEPORT`
1198/// so the socket can be SCM-passed and re-bound across a hot-upgrade, then
1199/// `bind` + non-blocking. Unlike TCP there is **no `listen()`** — a UDP socket
1200/// receives datagrams directly. The returned `mio::net::UdpSocket` is the one
1201/// listener socket the UDP datapath demuxes many flows over (one-socket-many-
1202/// flows; per-flow return sockets are created by [`udp_connect`]).
1203pub fn udp_bind(addr: SocketAddr) -> Result<UdpSocket, ServerBindError> {
1204 let sock = Socket::new(Domain::for_address(addr), Type::DGRAM, Some(Protocol::UDP))
1205 .map_err(ServerBindError::SocketCreationError)?;
1206
1207 // set so_reuseaddr, but only on unix (mirrors what libstd does)
1208 if cfg!(unix) {
1209 sock.set_reuse_address(true)
1210 .map_err(ServerBindError::SetReuseAddress)?;
1211 }
1212
1213 sock.set_reuse_port(true)
1214 .map_err(ServerBindError::SetReusePort)?;
1215
1216 sock.bind(&addr.into())
1217 .map_err(ServerBindError::BindError)?;
1218
1219 sock.set_nonblocking(true)
1220 .map_err(ServerBindError::SetNonBlocking)?;
1221
1222 // No `listen()` for DGRAM sockets.
1223
1224 // Post-conditions — same rationale as `server_bind`: assert the flags we set
1225 // stuck (logic bug if not), degrading to a no-op when the kernel refuses the
1226 // query so a dying fd never panics. There is deliberately no `listen()`
1227 // check here: DGRAM sockets are never listened on.
1228 if let Ok(nonblocking) = sock.nonblocking() {
1229 debug_assert!(
1230 nonblocking,
1231 "udp_bind must return a non-blocking socket (the worker event loop is edge-triggered)"
1232 );
1233 }
1234 #[cfg(unix)]
1235 if let Ok(reuse_port) = sock.reuse_port() {
1236 debug_assert!(
1237 reuse_port,
1238 "udp_bind must set SO_REUSEPORT so the listener survives a hot-upgrade re-bind"
1239 );
1240 }
1241 #[cfg(unix)]
1242 if let Ok(reuse_address) = sock.reuse_address() {
1243 debug_assert!(
1244 reuse_address,
1245 "udp_bind must set SO_REUSEADDR on unix (mirrors libstd / server_bind)"
1246 );
1247 }
1248 if let Ok(local) = sock.local_addr() {
1249 debug_assert_eq!(
1250 local.is_ipv4(),
1251 addr.is_ipv4(),
1252 "bound UDP socket family must match the requested address family"
1253 );
1254 debug_assert_eq!(
1255 local.is_ipv6(),
1256 addr.is_ipv6(),
1257 "bound UDP socket family must match the requested address family"
1258 );
1259 }
1260
1261 Ok(UdpSocket::from_std(sock.into()))
1262}
1263
1264/// Create a non-blocking **connected** per-flow upstream UDP socket toward
1265/// `backend`.
1266///
1267/// The socket is bound to an ephemeral local port (family matched to the
1268/// backend) and `connect`-ed to the backend address. A connected UDP socket
1269/// "only receives from the connected address" (`connect(2)`), so its fd is the
1270/// symmetric-NAT return-demux key for one flow: the shell registers
1271/// `upstream_token -> FlowId` and feeds anything that arrives on it back into
1272/// the manager as a `BackendDatagram`. `send` (not `send_to`) is then used for
1273/// the forward path. Errors (`EMFILE`/`ENFILE`/connect refusal) bubble up so
1274/// the caller can shed the flow rather than panic.
1275pub fn udp_connect(backend: SocketAddr) -> Result<UdpSocket, ServerBindError> {
1276 let unspecified: SocketAddr = match backend {
1277 SocketAddr::V4(_) => (std::net::Ipv4Addr::UNSPECIFIED, 0).into(),
1278 SocketAddr::V6(_) => (std::net::Ipv6Addr::UNSPECIFIED, 0).into(),
1279 };
1280 // The ephemeral bind address must be in the backend's family with port 0,
1281 // or the subsequent `connect` would mix families / pin a wrong local port.
1282 debug_assert_eq!(
1283 unspecified.is_ipv4(),
1284 backend.is_ipv4(),
1285 "ephemeral bind family must match the backend family"
1286 );
1287 debug_assert_eq!(
1288 unspecified.port(),
1289 0,
1290 "ephemeral bind must use port 0 so the kernel picks the source port"
1291 );
1292 let sock = Socket::new(
1293 Domain::for_address(backend),
1294 Type::DGRAM,
1295 Some(Protocol::UDP),
1296 )
1297 .map_err(ServerBindError::SocketCreationError)?;
1298
1299 sock.bind(&unspecified.into())
1300 .map_err(ServerBindError::BindError)?;
1301 sock.set_nonblocking(true)
1302 .map_err(ServerBindError::SetNonBlocking)?;
1303 // `connect` on a DGRAM socket pins the return 4-tuple; a non-blocking
1304 // connect on UDP completes immediately (no handshake).
1305 sock.connect(&backend.into())
1306 .map_err(ServerBindError::BindError)?;
1307
1308 // Post-conditions — assert the flag/connect state stuck (logic bug if not),
1309 // degrading to a no-op when the kernel refuses the query so a dying fd never
1310 // panics on this network-facing path.
1311 if let Ok(nonblocking) = sock.nonblocking() {
1312 debug_assert!(
1313 nonblocking,
1314 "udp_connect must return a non-blocking socket (the worker event loop is edge-triggered)"
1315 );
1316 }
1317 // The connected return socket's local addr family must match the backend,
1318 // and the kernel must have assigned a concrete source port (no longer 0).
1319 if let Ok(local) = sock.local_addr() {
1320 debug_assert_eq!(
1321 local.is_ipv4(),
1322 backend.is_ipv4(),
1323 "connected UDP socket family must match the backend family"
1324 );
1325 if let Some(local) = local.as_socket() {
1326 debug_assert_ne!(
1327 local.port(),
1328 0,
1329 "connect must bind a concrete ephemeral source port (the return-demux key)"
1330 );
1331 }
1332 }
1333 // `connect` pinned the peer 4-tuple — `getpeername(2)` must echo the backend.
1334 if let Ok(peer) = sock.peer_addr() {
1335 if let Some(peer) = peer.as_socket() {
1336 debug_assert_eq!(
1337 peer, backend,
1338 "connect must pin the peer to the requested backend (symmetric-NAT return-demux key)"
1339 );
1340 }
1341 }
1342
1343 Ok(UdpSocket::from_std(sock.into()))
1344}
1345
1346/// Socket statistics
1347pub mod stats {
1348 use std::{os::fd::AsRawFd, time::Duration};
1349
1350 use internal::{OPT_LEVEL, OPT_NAME, TcpInfo};
1351
1352 /// Point-in-time snapshot of kernel TCP bookkeeping for a socket. Populated
1353 /// from a single `getsockopt(TCP_INFO)` syscall so callers that want both
1354 /// the smoothed RTT and the FSM state don't pay for two trips into the
1355 /// kernel. Field set is deliberately narrow — extend with more `tcp_info`
1356 /// members if the log prefix grows.
1357 #[derive(Clone, Debug)]
1358 pub struct TcpSnapshot {
1359 pub rtt: Duration,
1360 pub state: &'static str,
1361 }
1362
1363 /// Round trip time for a TCP socket. Kept for existing metric callers;
1364 /// log-prefix callers should prefer [`socket_snapshot`] which returns the
1365 /// RTT **and** the TCP FSM state from a single syscall.
1366 pub fn socket_rtt<A: AsRawFd>(socket: &A) -> Option<Duration> {
1367 socket_info(socket.as_raw_fd()).map(|info| Duration::from_micros(info.rtt() as u64))
1368 }
1369
1370 /// Smoothed RTT + human-readable TCP state (`"ESTABLISHED"`, `"SYN_SENT"`,
1371 /// `"CLOSE_WAIT"`, …) pulled from a single `getsockopt(TCP_INFO)` call.
1372 /// Returns `None` when the kernel refuses the call — e.g. the socket has
1373 /// been closed, or the FSM is in a state where `TCP_INFO` is not usable.
1374 /// Safe on dying/refused sockets: the inner syscall's `status != 0`
1375 /// branch is the only failure mode and it degrades to `None`.
1376 pub fn socket_snapshot<A: AsRawFd>(socket: &A) -> Option<TcpSnapshot> {
1377 socket_info(socket.as_raw_fd()).map(|info| TcpSnapshot {
1378 rtt: Duration::from_micros(info.rtt() as u64),
1379 state: info.state(),
1380 })
1381 }
1382
1383 #[cfg(unix)]
1384 pub fn socket_info(fd: libc::c_int) -> Option<TcpInfo> {
1385 // SAFETY: `TcpInfo` is a C POD whose every byte pattern is a legal
1386 // representation; zero-init satisfies `assume_init`'s invariant
1387 // (and `std::mem::zeroed` is the canonical idiom for that).
1388 let mut tcp_info: TcpInfo = unsafe { std::mem::zeroed() };
1389 let struct_len = std::mem::size_of::<TcpInfo>() as libc::socklen_t;
1390 let mut len = struct_len;
1391 // SAFETY: `tcp_info` and `len` are fully initialised above; libc
1392 // reads only `len` bytes through the pointer and writes back the
1393 // resulting length. We check the return value (`status != 0`) to
1394 // distinguish success from validation failure.
1395 let status = unsafe {
1396 libc::getsockopt(
1397 fd,
1398 OPT_LEVEL,
1399 OPT_NAME,
1400 &mut tcp_info as *mut _ as *mut _,
1401 &mut len,
1402 )
1403 };
1404 if status != 0 {
1405 None
1406 } else {
1407 // The kernel writes back the number of bytes it populated. It must
1408 // never claim to have written more than the buffer we handed it —
1409 // that would mean it overran `tcp_info`, an out-of-bounds write we
1410 // could not have detected by the return code alone.
1411 debug_assert!(
1412 len <= struct_len,
1413 "getsockopt(TCP_INFO) wrote back len {len} > struct size {struct_len} (buffer overrun)"
1414 );
1415 Some(tcp_info)
1416 }
1417 }
1418 #[cfg(not(unix))]
1419 pub fn socketinfo(fd: libc::c_int) -> Option<TcpInfo> {
1420 None
1421 }
1422
1423 #[cfg(unix)]
1424 #[cfg(not(any(target_os = "macos", target_os = "ios")))]
1425 mod internal {
1426 #[cfg(target_os = "linux")]
1427 pub const OPT_LEVEL: libc::c_int = libc::SOL_TCP;
1428
1429 #[cfg(any(
1430 target_os = "freebsd",
1431 target_os = "dragonfly",
1432 target_os = "openbsd",
1433 target_os = "netbsd"
1434 ))]
1435 pub const OPT_LEVEL: libc::c_int = libc::IPPROTO_TCP;
1436
1437 pub const OPT_NAME: libc::c_int = libc::TCP_INFO;
1438
1439 #[derive(Clone, Debug)]
1440 #[repr(C)]
1441 pub struct TcpInfo {
1442 // State
1443 tcpi_state: u8,
1444 tcpi_ca_state: u8,
1445 tcpi_retransmits: u8,
1446 tcpi_probes: u8,
1447 tcpi_backoff: u8,
1448 tcpi_options: u8,
1449 tcpi_snd_rcv_wscale: u8, // 4bits|4bits
1450
1451 tcpi_rto: u32,
1452 tcpi_ato: u32,
1453 tcpi_snd_mss: u32,
1454 tcpi_rcv_mss: u32,
1455
1456 tcpi_unacked: u32,
1457 tcpi_sacked: u32,
1458 tcpi_lost: u32,
1459 tcpi_retrans: u32,
1460 tcpi_fackets: u32,
1461
1462 // Times
1463 tcpi_last_data_sent: u32,
1464 tcpi_last_ack_sent: u32, // Not remembered
1465 tcpi_last_data_recv: u32,
1466 tcpi_last_ack_recv: u32,
1467
1468 // Metrics
1469 tcpi_pmtu: u32,
1470 tcpi_rcv_ssthresh: u32,
1471 tcpi_rtt: u32,
1472 tcpi_rttvar: u32,
1473 tcpi_snd_ssthresh: u32,
1474 tcpi_snd_cwnd: u32,
1475 tcpi_advmss: u32,
1476 tcpi_reordering: u32,
1477 }
1478 impl TcpInfo {
1479 pub fn rtt(&self) -> u32 {
1480 self.tcpi_rtt
1481 }
1482
1483 /// Human-readable Linux TCP FSM state. Values follow
1484 /// `include/net/tcp_states.h` (`TCP_ESTABLISHED = 1`,
1485 /// `TCP_SYN_SENT = 2`, …). Anything unexpected falls back to
1486 /// `"UNKNOWN"` rather than panicking — the log prefix is a
1487 /// best-effort diagnostic and must not add failure modes.
1488 pub fn state(&self) -> &'static str {
1489 match self.tcpi_state {
1490 1 => "ESTABLISHED",
1491 2 => "SYN_SENT",
1492 3 => "SYN_RECV",
1493 4 => "FIN_WAIT1",
1494 5 => "FIN_WAIT2",
1495 6 => "TIME_WAIT",
1496 7 => "CLOSE",
1497 8 => "CLOSE_WAIT",
1498 9 => "LAST_ACK",
1499 10 => "LISTEN",
1500 11 => "CLOSING",
1501 12 => "NEW_SYN_RECV",
1502 _ => "UNKNOWN",
1503 }
1504 }
1505 }
1506 }
1507
1508 #[cfg(unix)]
1509 #[cfg(any(target_os = "macos", target_os = "ios"))]
1510 mod internal {
1511 pub const OPT_LEVEL: libc::c_int = libc::IPPROTO_TCP;
1512 pub const OPT_NAME: libc::c_int = 0x106;
1513
1514 #[derive(Clone, Debug)]
1515 #[repr(C)]
1516 pub struct TcpInfo {
1517 tcpi_state: u8,
1518 tcpi_snd_wscale: u8,
1519 tcpi_rcv_wscale: u8,
1520 __pad1: u8,
1521 tcpi_options: u32,
1522 tcpi_flags: u32,
1523 tcpi_rto: u32,
1524 tcpi_maxseg: u32,
1525 tcpi_snd_ssthresh: u32,
1526 tcpi_snd_cwnd: u32,
1527 tcpi_snd_wnd: u32,
1528 tcpi_snd_sbbytes: u32,
1529 tcpi_rcv_wnd: u32,
1530 tcpi_rttcur: u32,
1531 tcpi_srtt: u32,
1532 tcpi_rttvar: u32,
1533 tcpi_tfo: u32,
1534 tcpi_txpackets: u64,
1535 tcpi_txbytes: u64,
1536 tcpi_txretransmitbytes: u64,
1537 tcpi_rxpackets: u64,
1538 tcpi_rxbytes: u64,
1539 tcpi_rxoutoforderbytes: u64,
1540 tcpi_txretransmitpackets: u64,
1541 }
1542 impl TcpInfo {
1543 pub fn rtt(&self) -> u32 {
1544 // tcpi_srtt is in milliseconds not microseconds
1545 self.tcpi_srtt * 1000
1546 }
1547
1548 /// Human-readable Darwin TCP FSM state. Values follow
1549 /// `netinet/tcp_fsm.h` (`TCPS_CLOSED = 0`, `TCPS_LISTEN = 1`,
1550 /// `TCPS_SYN_SENT = 2`, …). Differs from Linux numbering —
1551 /// macOS counts from 0, Linux from 1.
1552 pub fn state(&self) -> &'static str {
1553 match self.tcpi_state {
1554 0 => "CLOSED",
1555 1 => "LISTEN",
1556 2 => "SYN_SENT",
1557 3 => "SYN_RECEIVED",
1558 4 => "ESTABLISHED",
1559 5 => "CLOSE_WAIT",
1560 6 => "FIN_WAIT_1",
1561 7 => "CLOSING",
1562 8 => "LAST_ACK",
1563 9 => "FIN_WAIT_2",
1564 10 => "TIME_WAIT",
1565 _ => "UNKNOWN",
1566 }
1567 }
1568 }
1569 }
1570
1571 #[cfg(not(unix))]
1572 #[derive(Clone, Debug)]
1573 struct TcpInfo {}
1574
1575 #[test]
1576 #[serial_test::serial]
1577 fn test_rtt() {
1578 let sock = std::net::TcpStream::connect("google.com:80").unwrap();
1579 let fd = sock.as_raw_fd();
1580 let info = socket_info(fd);
1581 assert!(info.is_some());
1582 println!("{info:#?}");
1583 println!(
1584 "rtt: {}",
1585 sozu_command::logging::LogDuration(socket_rtt(&sock))
1586 );
1587 }
1588}