sozu_lib/protocol/mux/mod.rs
1//! HTTP/1.1 and HTTP/2 multiplexing layer.
2//!
3//! This module unifies HTTP/1.1 and HTTP/2 behind a single [`Mux`] session
4//! state machine that integrates with sozu's mio event loop. The key types:
5//!
6//! - [`Mux`]: The top-level session state, generic over socket (`TcpStream` or
7//! `FrontRustls`) and listener. Implements `SessionState`.
8//! - [`Connection`]: Enum dispatching to [`ConnectionH1`] or [`ConnectionH2`]
9//! for protocol-specific readable/writable logic.
10//! - [`Stream`]: Per-request state with front/back kawa buffers, metrics, and
11//! lifecycle tracking. Shared between H1 and H2 paths.
12//! - [`Context`]: Per-session context (cluster, backends, routing, timeouts).
13//!
14//! The H2 implementation handles RFC 9113 framing, HPACK (RFC 7541), flow
15//! control, flood detection (CVE-2023-44487, CVE-2019-9512/9514/9515/9518,
16//! CVE-2024-27316), and graceful shutdown (double-GOAWAY per RFC 9113 §6.8).
17
18use std::{
19 cell::RefCell,
20 collections::{HashMap, VecDeque},
21 fmt::Debug,
22 io::ErrorKind,
23 net::{Shutdown, SocketAddr},
24 rc::{Rc, Weak},
25 sync::Arc,
26 time::{Duration, Instant},
27};
28
29use mio::{Token, net::TcpStream};
30use rusty_ulid::Ulid;
31use sozu_command::{
32 logging::ansi_palette,
33 proto::command::{Event, EventKind},
34 ready::Ready,
35};
36
/// Build the log-line prefix used by every [`Mux`] message.
///
/// Shape: `[<ulid> - - -]\tMUX\tSession(...)\t >>>`, which follows the same
/// bracket convention as the RUSTLS log context. The colours come from
/// [`ansi_palette`]: the protocol label is bold bright-white (identical for
/// every protocol), and the session details are light grey. Both are empty
/// strings when colored output is disabled.
///
/// The `Session(...)` block reports:
/// - `frontend`: mio token of the frontend socket.
/// - `peer`: peer address of the frontend socket (`None` once it is gone).
/// - `streams`: number of stream slots held by the [`Context`].
/// - `backends`: number of backend connections registered in the [`Router`].
/// - `pending_links`: streams still waiting for a backend link.
/// - `readiness`: snapshot of the frontend mio readiness.
macro_rules! log_context {
    ($self:expr) => {{
        let (open, reset, grey, gray, white) = ansi_palette();
        // Gather every field first so the format string below can capture
        // them by name.
        let ulid = &$self.session_ulid;
        let frontend = $self.frontend_token.0;
        let peer = $self.frontend.socket().peer_addr().ok();
        let streams = $self.context.streams.len();
        let backends = $self.router.backends.len();
        let pending_links = $self.context.pending_links.len();
        let readiness = $self.frontend.readiness();
        format!(
            "[{ulid} - - -]\t{open}MUX{reset}\t{grey}Session{reset}({gray}frontend{reset}={white}{frontend}{reset}, {gray}peer{reset}={white}{peer:?}{reset}, {gray}streams{reset}={white}{streams}{reset}, {gray}backends{reset}={white}{backends}{reset}, {gray}pending_links{reset}={white}{pending_links}{reset}, {gray}readiness{reset}={white}{readiness}{reset})\t >>>"
        )
    }};
}
71
/// Reduced form of [`log_context!`] that leaves out the
/// `streams`/`backends`/`pending_links` counters.
///
/// Use it where the borrow checker will not allow reads of
/// `self.router.backends` or `self.context.streams`, for example while
/// iterating `self.router.backends` mutably. The session ULID plus the
/// frontend token, peer and readiness are still enough to correlate the line
/// with the rest of the session's logs.
macro_rules! log_context_lite {
    ($self:expr) => {{
        let (open, reset, grey, gray, white) = ansi_palette();
        // Only frontend-side fields are read, so the macro stays usable while
        // the router or the stream table is mutably borrowed.
        let ulid = &$self.session_ulid;
        let frontend = $self.frontend_token.0;
        let peer = $self.frontend.socket().peer_addr().ok();
        let readiness = $self.frontend.readiness();
        format!(
            "[{ulid} - - -]\t{open}MUX{reset}\t{grey}Session{reset}({gray}frontend{reset}={white}{frontend}{reset}, {gray}peer{reset}={white}{peer:?}{reset}, {gray}readiness{reset}={white}{readiness}{reset})\t >>>"
        )
    }};
}
95
/// Log prefix for code paths that have no [`Mux`] in scope, such as free
/// functions or routing blocks. It respects the colored-output setting.
///
/// Two arms:
/// * `log_module_context!()` takes no arguments and emits the legacy
///   `MUX\t >>>` prefix. Use it where no `HttpContext` is available, for
///   example the generic `trace!` that fires before the variant-specific match.
/// * `log_module_context!($http_context)` is the rich form. `$http_context`
///   must be an `HttpContext` (it is borrowed as `&HttpContext`). The output
///   starts with the same `[session req cluster backend]` bracket as
///   RUSTLS/PIPE/TCP, followed by a `Session(...)` block, so MUX lines from
///   variant match arms can still be filtered by session or request ULID.
///   This mirrors `router.rs:log_module_context!($http_context)`.
macro_rules! log_module_context {
    () => {{
        let (open, reset, _, _, _) = ansi_palette();
        format!("{open}MUX{reset}\t >>>")
    }};
    ($http_context:expr) => {{
        let (open, reset, grey, gray, white) = ansi_palette();
        let http_ctx: &HttpContext = &$http_context;
        // Borrow the fields shown in the session block; `{:?}` on a
        // reference prints exactly what it prints for the value.
        let ctx = http_ctx.log_context();
        let frontend = &http_ctx.session_address;
        let method = &http_ctx.method;
        let authority = &http_ctx.authority;
        format!(
            "{gray}{ctx}{reset}\t{open}MUX{reset}\t{grey}Session{reset}({gray}frontend{reset}={white}{frontend:?}{reset}, {gray}method{reset}={white}{method:?}{reset}, {gray}authority{reset}={white}{authority:?}{reset})\t >>>"
        )
    }};
}
132
133pub mod answers;
134pub mod auth;
135pub mod connection;
136mod converter;
137pub mod debug;
138mod h1;
139mod h2;
140pub mod parser;
141mod pkawa;
142pub mod router;
143pub(crate) mod serializer;
144mod shared;
145pub mod stream;
146
147use crate::metrics::names;
148use crate::{
149 BackendConnectionError, FrontendFromRequestError, L7ListenerHandler, L7Proxy, ListenerHandler,
150 ProxySession, Readiness, RetrieveClusterError, SessionIsToBeClosed, SessionMetrics,
151 SessionResult, StateResult,
152 backends::{Backend, BackendError},
153 http::HttpListener,
154 https::HttpsListener,
155 pool::{Checkout, Pool},
156 protocol::{SessionState, http::editor::HttpContext},
157 retry::RetryPolicy,
158 server::push_event,
159 socket::{FrontRustls, SessionTcpStream, SocketHandler, SocketResult, stats::socket_rtt},
160};
161
162pub(crate) use crate::protocol::mux::answers::{
163 forcefully_terminate_answer, set_default_answer, set_default_answer_with_retry_after,
164};
165use crate::protocol::mux::connection::{EndpointClient, EndpointServer};
166pub use crate::protocol::mux::{
167 answers::terminate_default_answer,
168 connection::Connection,
169 debug::{DebugEvent, DebugHistory},
170 h1::ConnectionH1,
171 h2::ConnectionH2,
172 h2::H2ByteAccounting,
173 h2::H2ConnectionConfig,
174 h2::H2DrainState,
175 h2::H2FloodConfig,
176 h2::H2FlowControl,
177 parser::H2Error,
178 router::Router,
179 stream::{Stream, StreamParts, StreamState},
180};
181
// ── Tuning Constants ─────────────────────────────────────────────────────────

/// Maximum event loop iterations before forcefully closing a session.
/// Prevents infinite loops from consuming the single-threaded worker.
///
/// It also caps the frontend write loop in `Mux::drive_frontend_shutdown_io`.
/// That loop simply stops once the cap is reached; it does not close the
/// session itself.
const MAX_LOOP_ITERATIONS: i32 = 10_000;
// ─────────────────────────────────────────────────────────────────────────────

/// Kawa HTTP representation whose storage is a sozu buffer-pool `Checkout`.
type GenericHttpStream = kawa::Kawa<Checkout>;
/// Protocol-level stream identifier (presumably the H2 wire stream id — confirm).
type StreamId = u32;
/// Index of a stream slot in `Context::streams`.
type GlobalStreamId = usize;
/// [`Mux`] over a plaintext socket accepted by an [`HttpListener`].
pub type MuxClear = Mux<SessionTcpStream, HttpListener>;
/// [`Mux`] over a rustls-wrapped socket accepted by an [`HttpsListener`].
pub type MuxTls = Mux<FrontRustls, HttpsListener>;
195
/// Which side of the proxy a connection sits on.
///
/// - `Client(cluster_id, backend, status)`: a backend connection. Sozu acts as
///   the client here. The variant carries the cluster id, the shared
///   [`Backend`] handle and the connection's [`BackendStatus`].
/// - `Server`: the frontend connection, where sozu acts as the server.
pub enum Position {
    Client(String, Rc<RefCell<Backend>>, BackendStatus),
    Server,
}
200
201impl Debug for Position {
202 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
203 match self {
204 Self::Client(cluster_id, _, status) => f
205 .debug_tuple("Client")
206 .field(cluster_id)
207 .field(status)
208 .finish(),
209 Self::Server => write!(f, "Server"),
210 }
211 }
212}
213
214impl Position {
215 fn is_server(&self) -> bool {
216 match self {
217 Position::Client(..) => false,
218 Position::Server => true,
219 }
220 }
221 fn is_client(&self) -> bool {
222 !self.is_server()
223 }
224
225 /// Increment the global `count!()` counter for bytes read on this side.
226 pub fn count_bytes_in_counter(&self, size: usize) {
227 match self {
228 Position::Client(..) => count!(names::backend::BACK_BYTES_IN, size as i64),
229 Position::Server => count!(names::backend::BYTES_IN, size as i64),
230 }
231 }
232
233 /// Increment the global `count!()` counter for bytes written on this side.
234 pub fn count_bytes_out_counter(&self, size: usize) {
235 match self {
236 Position::Client(..) => count!(names::backend::BACK_BYTES_OUT, size as i64),
237 Position::Server => count!(names::backend::BYTES_OUT, size as i64),
238 }
239 }
240
241 /// Attribute `size` bytes read to the appropriate `SessionMetrics` field.
242 pub fn count_bytes_in(&self, metrics: &mut SessionMetrics, size: usize) {
243 match self {
244 Position::Client(..) => metrics.backend_bin += size,
245 Position::Server => metrics.bin += size,
246 }
247 }
248
249 /// Attribute `size` bytes written to the appropriate `SessionMetrics` field.
250 pub fn count_bytes_out(&self, metrics: &mut SessionMetrics, size: usize) {
251 match self {
252 Position::Client(..) => metrics.backend_bout += size,
253 Position::Server => metrics.bout += size,
254 }
255 }
256}
257
/// Lifecycle of a backend connection, stored in [`Position::Client`].
#[derive(Debug)]
pub enum BackendStatus {
    /// TCP connect in progress. The `Instant` is when the attempt started;
    /// its elapsed time is recorded as the backend connection time on success.
    Connecting(Instant),
    /// Connect succeeded and the connection is usable.
    Connected,
    /// Presumably an idle connection kept open for reuse — confirm with callers.
    KeepAlive,
    /// Presumably the connection is being torn down — confirm with callers.
    Disconnecting,
}
265
/// Outcome of a connection's `readable`/`writable` pass, consumed by the
/// session event loop.
#[derive(Debug, Clone, Copy)]
pub enum MuxResult {
    /// Keep driving the session.
    Continue,
    /// Hand the session over to another protocol. Only frontend connections
    /// may return this; a backend returning it is logged as an error.
    Upgrade,
    /// Close the session. The caller may first delay the close to flush
    /// pending frontend bytes.
    CloseSession,
}
272
/// The "other side" of a stream as seen by a connection's I/O code.
///
/// A frontend connection is driven with an [`EndpointClient`] that wraps the
/// backend [`Router`]. A backend connection is driven with an
/// [`EndpointServer`] that wraps the single frontend connection.
pub trait Endpoint: Debug {
    /// Shared readiness of the connection identified by `token`.
    fn readiness(&self, token: Token) -> &Readiness;
    /// Mutable readiness of the connection identified by `token`.
    fn readiness_mut(&mut self, token: Token) -> &mut Readiness;
    /// Returns the underlying TCP socket for the peer side of a stream.
    ///
    /// Used by access-log emission to capture the TCP_INFO RTT of the side
    /// the caller does NOT own directly. A frontend connection
    /// (`Position::Server`) reads the backend socket through this method,
    /// and a backend connection (`Position::Client`) reads the frontend
    /// socket the same way. [`EndpointServer`] ignores `token`, since it has
    /// a single frontend connection. [`EndpointClient`] uses `token` as the
    /// key, since it stores backends by token.
    /// Returns `None` when the token does not resolve, mirroring the existing
    /// fallback paths in `readiness`/`readiness_mut`.
    fn socket(&self, token: Token) -> Option<&TcpStream>;
    /// Called on a client: the stream has PROPERLY finished. The server has
    /// completed the response and tells this endpoint the stream will not be
    /// used anymore.
    /// Called on a server: the stream was BROKEN, most likely because the
    /// client disconnected or hit an error. The server decides whether the
    /// stream can be retried or an error must be sent. All bytes from the
    /// backend are GUARANTEED to have been read, but they have almost
    /// certainly not all been sent to the client yet.
    fn end_stream<L: ListenerHandler + L7ListenerHandler>(
        &mut self,
        token: Token,
        stream: GlobalStreamId,
        context: &mut Context<L>,
    );
    /// Called on a client: attach the stream to this endpoint. The stream may
    /// be recovering from a disconnection; either way its response MUST be
    /// empty at this point.
    /// Called on an H2 server: the stream is a server push, and its request
    /// MUST be empty.
    /// Returns false if the stream could not be started (e.g. max concurrent streams reached).
    fn start_stream<L: ListenerHandler + L7ListenerHandler>(
        &mut self,
        token: Token,
        stream: GlobalStreamId,
        context: &mut Context<L>,
    ) -> bool;
}
310
311/// Shared logic for half-close accounting: clear `bit` from `readiness.event` on
312/// socket errors/would-block, and return `true` (yield) when no bytes were
313/// transferred so the caller can park the half. Rationale for clearing only
314/// `bit` (not both halves) on `Closed`: the opposite half may still need one
315/// last pass to flush queued frames or the TLS close_notify.
316fn update_readiness(
317 size: usize,
318 status: SocketResult,
319 readiness: &mut Readiness,
320 bit: Ready,
321) -> bool {
322 trace!(
323 "{} size={}, status={:?}",
324 log_module_context!(),
325 size,
326 status
327 );
328 match status {
329 SocketResult::Continue => {}
330 SocketResult::Closed | SocketResult::Error | SocketResult::WouldBlock => {
331 readiness.event.remove(bit);
332 }
333 }
334 if size > 0 {
335 false
336 } else {
337 readiness.event.remove(bit);
338 true
339 }
340}
341
342fn update_readiness_after_read(
343 size: usize,
344 status: SocketResult,
345 readiness: &mut Readiness,
346) -> bool {
347 update_readiness(size, status, readiness, Ready::READABLE)
348}
349
350fn update_readiness_after_write(
351 size: usize,
352 status: SocketResult,
353 readiness: &mut Readiness,
354) -> bool {
355 update_readiness(size, status, readiness, Ready::WRITABLE)
356}
/// Per-session state shared by the frontend connection and all backend
/// connections of a [`Mux`]. It holds the stream table, indexes over that
/// table, and connection-scoped settings that are copied onto every
/// per-stream [`HttpContext`].
pub struct Context<L: ListenerHandler + L7ListenerHandler> {
    /// Stream slots, indexed by [`GlobalStreamId`]. [`Self::create_stream`]
    /// reuses slots in `StreamState::Recycle`, and
    /// [`Self::shrink_trailing_recycle`] drops such slots from the tail.
    pub streams: Vec<Stream>,
    /// Streams whose state is `StreamState::Link` and need backend connection.
    /// Replaces the O(n) scan of `streams` in the ready loop.
    pub pending_links: VecDeque<GlobalStreamId>,
    /// Reverse index: backend token -> global stream IDs currently in
    /// `StreamState::Linked(token)`. Eliminates O(n) scans of `streams`
    /// when handling backend connect/disconnect/timeout/close events.
    pub backend_streams: HashMap<Token, Vec<GlobalStreamId>>,
    /// Weak handle to the buffer pool, cloned into each `Stream::new` call.
    /// Presumably weak so sessions do not extend the pool's lifetime — confirm.
    pub pool: Weak<RefCell<Pool>>,
    /// Listener that accepted this session. It is borrowed to read settings
    /// such as the protocol, sticky name, sozu id header and H2 config.
    pub listener: Rc<RefCell<L>>,
    /// Connection/session ULID — mirrors `Mux.session_ulid`. Stored here so
    /// per-stream `HttpContext` construction in [`Self::create_stream`] can
    /// stamp the session slot of the log-context bracket without reaching
    /// back into the parent [`Mux`].
    pub session_ulid: Ulid,
    /// Session address when known; forwarded to every per-stream `HttpContext`.
    pub session_address: Option<SocketAddr>,
    /// Address forwarded to every per-stream `HttpContext` (presumably the
    /// listener's public address — confirm).
    pub public_address: SocketAddr,
    /// History of [`DebugEvent`]s pushed by the session event loop.
    pub debug: DebugHistory,
    /// Shrink threshold ratio for recycled stream slots.
    /// Vec is shrunk when total_slots > active_streams * ratio.
    pub h2_stream_shrink_ratio: usize,
    /// TLS SNI value negotiated at handshake, propagated to every
    /// per-stream [`HttpContext`] so the routing layer can enforce
    /// the SNI ↔ `:authority` binding on every H2 stream (and the
    /// single H1 request). `None` for plaintext listeners or when
    /// the client omitted the SNI extension. Stored pre-lowercased
    /// and without a port for cheap exact-match comparison.
    pub tls_server_name: Option<String>,
    /// Snapshot of the SAN set of the certificate Sōzu actually served at
    /// the TLS handshake. Captured once in `https.rs::upgrade_handshake`
    /// from the resolver and frozen for the connection lifetime so H2
    /// stream coalescing (RFC 7540 §9.1.1 / RFC 9113 §9.1.1) accepts any
    /// `:authority` covered by the certificate, with RFC 6125 §6.4.3
    /// wildcard handling. `None` for plaintext listeners or when SNI was
    /// absent. `Some(empty)` when the default cert was served — every
    /// `:authority` is rejected. `Arc` so the snapshot is shared across
    /// every per-stream `HttpContext` without re-allocation.
    pub tls_cert_names: Option<Arc<Vec<String>>>,
    /// Whether the routing layer must reject any request whose authority
    /// host does not exact-match `tls_server_name` (CWE-346 / CWE-444).
    /// Mirrors `HttpsListenerConfig::strict_sni_binding`; captured once
    /// at `Context::new` so routing decisions on each stream avoid a
    /// per-stream `listener.borrow()`.
    pub strict_sni_binding: bool,
    /// Whether the request-side block walk must strip any client-supplied
    /// `X-Real-IP` header before forwarding (anti-spoofing). Mirrors
    /// `HttpListenerConfig::elide_x_real_ip` /
    /// `HttpsListenerConfig::elide_x_real_ip`; captured once at
    /// `Context::new` so per-stream `HttpContext`s do not need to call
    /// `listener.borrow()` again. Independent of `send_x_real_ip`.
    pub elide_x_real_ip: bool,
    /// Whether `on_request_headers` injects a proxy-generated `X-Real-IP`
    /// header carrying the connection peer IP (post-PROXY-v2 unwrap).
    /// Mirrors `HttpListenerConfig::send_x_real_ip` /
    /// `HttpsListenerConfig::send_x_real_ip`; captured once at
    /// `Context::new`. Independent of `elide_x_real_ip`.
    pub send_x_real_ip: bool,
    /// Negotiated TLS protocol version short-form (e.g. `"TLSv1.3"`).
    /// Captured once at handshake completion in `https.rs` and propagated
    /// to every per-stream [`HttpContext`] so the access log can record it
    /// without reaching back into the rustls session per request. `None`
    /// for plaintext listeners.
    pub tls_version: Option<&'static str>,
    /// Negotiated TLS cipher suite short-form (e.g.
    /// `"TLS_AES_128_GCM_SHA256"`). Captured once at handshake completion
    /// and propagated to every per-stream [`HttpContext`]. `None` for
    /// plaintext listeners.
    pub tls_cipher: Option<&'static str>,
    /// Negotiated ALPN protocol short-form (e.g. `"h2"`, `"http/1.1"`).
    /// Captured once at handshake completion and propagated to every
    /// per-stream [`HttpContext`]. `None` for plaintext listeners or when
    /// no ALPN was negotiated.
    pub tls_alpn: Option<&'static str>,
}
432
433impl<L: ListenerHandler + L7ListenerHandler> Context<L> {
434 pub fn new(
435 session_ulid: Ulid,
436 pool: Weak<RefCell<Pool>>,
437 listener: Rc<RefCell<L>>,
438 session_address: Option<SocketAddr>,
439 public_address: SocketAddr,
440 ) -> Self {
441 let h2_stream_shrink_ratio = listener
442 .borrow()
443 .get_h2_connection_config()
444 .stream_shrink_ratio as usize;
445 let strict_sni_binding = listener.borrow().get_strict_sni_binding();
446 let elide_x_real_ip = listener.borrow().get_elide_x_real_ip();
447 let send_x_real_ip = listener.borrow().get_send_x_real_ip();
448 Self {
449 streams: Vec::new(),
450 pending_links: VecDeque::new(),
451 backend_streams: HashMap::new(),
452 pool,
453 listener,
454 session_ulid,
455 session_address,
456 public_address,
457 debug: DebugHistory::new(),
458 h2_stream_shrink_ratio,
459 tls_server_name: None,
460 tls_cert_names: None,
461 strict_sni_binding,
462 elide_x_real_ip,
463 send_x_real_ip,
464 tls_version: None,
465 tls_cipher: None,
466 tls_alpn: None,
467 }
468 }
469
470 pub fn active_len(&self) -> usize {
471 self.streams
472 .iter()
473 .filter(|s| !matches!(s.state, StreamState::Recycle))
474 .count()
475 }
476
477 /// Shared accessor for the [`HttpContext`] owned by a stream.
478 ///
479 /// Prefer this over `&self.streams[stream_id].context` at call sites
480 /// that only need read access — it keeps the `Stream`/`HttpContext`
481 /// relationship encapsulated and reads the same regardless of whether
482 /// the caller is inside `Router::connect`, the H2 mux, or a free
483 /// helper. Panics on an out-of-bounds `stream_id`, which is the same
484 /// behaviour as the raw `streams[sid]` indexing it replaces.
485 pub fn http_context(&self, stream_id: GlobalStreamId) -> &HttpContext {
486 &self.streams[stream_id].context
487 }
488
489 /// Mutable sibling of [`Self::http_context`]. Use when routing
490 /// decisions need to stamp `cluster_id` / `backend_id` on the stream's
491 /// [`HttpContext`] (e.g. `Router::connect` at the fill-cluster /
492 /// fill-backend points).
493 pub fn http_context_mut(&mut self, stream_id: GlobalStreamId) -> &mut HttpContext {
494 &mut self.streams[stream_id].context
495 }
496
497 /// Register a stream as linked to a backend token in the reverse index.
498 pub fn link_stream(&mut self, stream_id: GlobalStreamId, token: Token) {
499 self.streams[stream_id].state = StreamState::Linked(token);
500 self.backend_streams
501 .entry(token)
502 .or_default()
503 .push(stream_id);
504 }
505
506 /// Remove a stream from the backend reverse index if it is currently
507 /// `Linked`. Returns the backend token if one was removed.
508 pub fn unlink_stream(&mut self, stream_id: GlobalStreamId) -> Option<Token> {
509 if let StreamState::Linked(token) = self.streams[stream_id].state {
510 remove_backend_stream(&mut self.backend_streams, token, stream_id);
511 Some(token)
512 } else {
513 None
514 }
515 }
516
517 pub fn create_stream(&mut self, request_id: Ulid, window: u32) -> Option<GlobalStreamId> {
518 let http_context = {
519 let listener = self.listener.borrow();
520 let mut http_context = HttpContext::new(
521 self.session_ulid,
522 request_id,
523 listener.protocol(),
524 self.public_address,
525 self.session_address,
526 listener.get_sticky_name().to_string(),
527 listener.get_sozu_id_header().to_string(),
528 self.elide_x_real_ip,
529 self.send_x_real_ip,
530 );
531 // Propagate the connection-scoped TLS SNI onto every per-stream
532 // HttpContext so `route_from_request` can enforce the SNI ↔
533 // `:authority` binding for each H2 stream independently.
534 http_context.tls_server_name = self.tls_server_name.clone();
535 // Mirror the frozen-at-handshake SAN snapshot. `Arc` clone is a
536 // refcount bump, not a deep copy — every per-stream
537 // `HttpContext` shares the same `Vec<String>`.
538 http_context.tls_cert_names = self.tls_cert_names.clone();
539 // Mirror the listener's strict_sni_binding flag onto each
540 // HttpContext so the routing layer can honor operator opt-outs
541 // without reaching back into the listener on every request.
542 http_context.strict_sni_binding = self.strict_sni_binding;
543 // Propagate the connection-scoped TLS metadata onto every
544 // per-stream HttpContext so the access log can record it without
545 // touching the rustls session on every request. These are
546 // `&'static str` borrows from the rustls label tables — copy is
547 // a pointer move.
548 http_context.tls_version = self.tls_version;
549 http_context.tls_cipher = self.tls_cipher;
550 http_context.tls_alpn = self.tls_alpn;
551 http_context
552 };
553 let recycle_slot = self
554 .streams
555 .iter()
556 .position(|s| s.state == StreamState::Recycle);
557 if let Some(stream_id) = recycle_slot {
558 let stream = &mut self.streams[stream_id];
559 trace!("{} Reuse stream: {}", log_module_context!(), stream_id);
560 stream.state = StreamState::Idle;
561 stream.attempts = 0;
562 stream.front_received_end_of_stream = false;
563 stream.back_received_end_of_stream = false;
564 stream.front_data_received = 0;
565 stream.back_data_received = 0;
566 stream.request_counted = false;
567 stream.window = i32::try_from(window).unwrap_or(i32::MAX);
568 stream.context = http_context;
569 stream.back.clear();
570 stream.back.storage.clear();
571 stream.front.clear();
572 stream.front.storage.clear();
573 stream.metrics.reset();
574 stream.metrics.start = Some(Instant::now());
575 // After recycling a slot, check if the Vec has excessive trailing
576 // Recycle entries (more than 2x active streams of total capacity).
577 let active = self.active_len();
578 let total = self.streams.len();
579 if total > 1 && active > 0 && total > active * self.h2_stream_shrink_ratio {
580 self.shrink_trailing_recycle();
581 }
582 return Some(stream_id);
583 }
584 self.streams
585 .push(Stream::new(self.pool.clone(), http_context, window)?);
586 Some(self.streams.len() - 1)
587 }
588
589 /// Remove consecutive `Recycle` entries from the end of the streams Vec.
590 ///
591 /// This prevents unbounded growth when H2 streams are created and recycled
592 /// over time, reclaiming memory from slots that are no longer needed.
593 pub fn shrink_trailing_recycle(&mut self) {
594 while self
595 .streams
596 .last()
597 .is_some_and(|s| s.state == StreamState::Recycle)
598 {
599 self.streams.pop();
600 }
601 }
602}
603
604/// Remove `stream_id` from the backend-token reverse index for `token`.
605/// Free function to allow split borrows when `context.streams` is already
606/// mutably borrowed (preventing a `Context::unlink_stream` call).
607pub(super) fn remove_backend_stream(
608 index: &mut HashMap<Token, Vec<GlobalStreamId>>,
609 token: Token,
610 stream_id: GlobalStreamId,
611) {
612 if let Some(ids) = index.get_mut(&token) {
613 ids.retain(|&id| id != stream_id);
614 if ids.is_empty() {
615 index.remove(&token);
616 }
617 }
618}
619
/// Top-level HTTP/1.1 + HTTP/2 session: one frontend connection plus the
/// backend connections opened on its behalf.
pub struct Mux<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> {
    /// Configured frontend timeout (presumably applied to the frontend
    /// connection's timeout container — confirm).
    pub configured_frontend_timeout: Duration,
    /// mio token of the frontend socket; used in logs and debug events.
    pub frontend_token: Token,
    /// The frontend connection, either H1 or H2.
    pub frontend: Connection<Front>,
    /// Backend connections plus backend-wide settings (e.g. the configured
    /// backend timeout).
    pub router: Router,
    /// Per-session context: streams, indexes and connection-scoped settings.
    pub context: Context<L>,
    /// Per-session correlation ID generated at construction time. Included in
    /// every log line emitted from this module so all events for a single
    /// frontend connection can be reassembled (independent of the ephemeral
    /// per-stream request id used by access logs).
    pub session_ulid: Ulid,
}
632
633impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Mux<Front, L> {
634 pub fn front_socket(&self) -> &TcpStream {
635 self.frontend.socket()
636 }
637}
638
639impl<Front: SocketHandler + std::fmt::Debug, L: ListenerHandler + L7ListenerHandler> Mux<Front, L> {
640 fn delay_close_for_frontend_flush(&mut self, reason: &'static str) -> bool {
641 let _ = self.frontend.initiate_close_notify();
642 // LIFECYCLE §9 invariant 16: consult per-stream back-buffers in
643 // addition to the connection-level pending-write predicate so
644 // shutdown does not close while any open H2 stream still has
645 // kawa bytes queued after a voluntary scheduler yield.
646 if self
647 .frontend
648 .has_pending_write_including_streams(&self.context)
649 {
650 let readiness = self.frontend.readiness_mut();
651 readiness.interest = Ready::WRITABLE | Ready::HUP | Ready::ERROR;
652 readiness.signal_pending_write();
653 debug!(
654 "{} Mux delaying close on {}: {:?}",
655 log_context!(self),
656 reason,
657 self.frontend
658 );
659 true
660 } else {
661 false
662 }
663 }
664
665 /// Drive the frontend I/O path during shutdown, when the server is polling
666 /// `shutting_down()` outside the normal epoll readiness loop.
667 ///
668 /// This is required for H2 graceful shutdown because a stream may still
669 /// need one last readable pass to observe the peer's END_STREAM or one last
670 /// writable pass to retire the stream, emit GOAWAY, or flush TLS records.
671 fn drive_frontend_shutdown_io(&mut self) -> SessionIsToBeClosed {
672 let force_h2_read = matches!(self.frontend, Connection::H2(_));
673 let force_h2_write = matches!(self.frontend, Connection::H2(_));
674 let readiness = self.frontend.readiness().clone();
675 if !force_h2_read
676 && !force_h2_write
677 && readiness.event.is_empty()
678 && !self.frontend.has_pending_write()
679 {
680 return false;
681 }
682
683 if force_h2_read || self.frontend.readiness().event.is_readable() {
684 self.frontend
685 .readiness_mut()
686 .interest
687 .insert(Ready::READABLE);
688 match self
689 .frontend
690 .readable(&mut self.context, EndpointClient(&mut self.router))
691 {
692 MuxResult::Continue => {}
693 MuxResult::CloseSession | MuxResult::Upgrade => return true,
694 }
695 }
696
697 if !force_h2_write
698 && !self.frontend.has_pending_write()
699 && !self.frontend.readiness().event.is_writable()
700 {
701 return false;
702 }
703
704 let mut iterations = 0;
705 loop {
706 self.frontend
707 .readiness_mut()
708 .interest
709 .insert(Ready::WRITABLE);
710 if force_h2_write {
711 self.frontend.readiness_mut().signal_pending_write();
712 }
713 match self
714 .frontend
715 .writable(&mut self.context, EndpointClient(&mut self.router))
716 {
717 MuxResult::Continue => {}
718 MuxResult::CloseSession | MuxResult::Upgrade => return true,
719 }
720
721 iterations += 1;
722 if iterations >= MAX_LOOP_ITERATIONS
723 || (!self.frontend.has_pending_write()
724 && !self.frontend.readiness().event.is_writable())
725 {
726 break;
727 }
728 }
729 false
730 }
731}
732
733impl<Front: SocketHandler + std::fmt::Debug, L: ListenerHandler + L7ListenerHandler> SessionState
734 for Mux<Front, L>
735{
736 fn ready(
737 &mut self,
738 session: Rc<RefCell<dyn ProxySession>>,
739 proxy: Rc<RefCell<dyn L7Proxy>>,
740 _metrics: &mut SessionMetrics,
741 ) -> SessionResult {
742 let mut counter = 0;
743
744 if self.frontend.readiness().event.is_hup()
745 && !self.delay_close_for_frontend_flush("frontend HUP")
746 {
747 debug!(
748 "{} Mux closing on frontend HUP: {:?}",
749 log_context!(self),
750 self.frontend
751 );
752 return SessionResult::Close;
753 }
754
755 // Start service timers on all active streams after the HUP check.
756 // This mirrors session-level service_start/service_stop in Http(s)Session::ready()
757 // to measure only CPU processing time, excluding epoll wait between cycles.
758 for stream in &mut self.context.streams {
759 if stream.state.is_open() {
760 stream.metrics.service_start();
761 }
762 }
763
764 let start = Instant::now();
765 self.context.debug.push(DebugEvent::ReadyTimestamp(
766 std::time::SystemTime::now()
767 .duration_since(std::time::UNIX_EPOCH)
768 .unwrap_or_default()
769 .as_millis() as usize,
770 ));
771 trace!("{} {:?}", log_context!(self), start);
772 loop {
773 self.context.debug.push(DebugEvent::LoopStart);
774 loop {
775 self.context.debug.push(DebugEvent::LoopIteration(counter));
776 if self.frontend.readiness().filter_interest().is_readable() {
777 let res = {
778 let context = &mut self.context;
779 let res = self
780 .frontend
781 .readable(context, EndpointClient(&mut self.router));
782 context.debug.push(DebugEvent::SR(
783 self.frontend_token,
784 res,
785 self.frontend.readiness().clone(),
786 ));
787 res
788 };
789 match res {
790 MuxResult::Continue => {}
791 MuxResult::CloseSession => {
792 if !self.delay_close_for_frontend_flush("frontend readable") {
793 debug!(
794 "{} Mux close from frontend readable: {:?}",
795 log_context!(self),
796 self.frontend
797 );
798 return SessionResult::Close;
799 }
800 }
801 MuxResult::Upgrade => return SessionResult::Upgrade,
802 }
803 }
804
805 let mut all_backends_readiness_are_empty = true;
806 let mut dead_backends = Vec::new();
807 let mut backend_close: Option<(&'static str, Token)> = None;
808 for (token, client) in self.router.backends.iter_mut() {
809 let readiness = client.readiness_mut();
810 // Check the raw event for HUP/ERROR — not filter_interest(),
811 // because interest only contains READABLE|WRITABLE and would
812 // always mask out HUP (0b01000) and ERROR (0b00100).
813 let dead = readiness.event.is_hup() || readiness.event.is_error();
814 if dead {
815 trace!(
816 "{} Backend({:?}) -> {:?}",
817 log_context_lite!(self),
818 token,
819 readiness
820 );
821 readiness.event.remove(Ready::WRITABLE);
822 }
823
824 if client.readiness().filter_interest().is_writable() {
825 let position = client.position_mut();
826 match position {
827 Position::Client(
828 cluster_id,
829 backend,
830 BackendStatus::Connecting(start),
831 ) => {
832 #[cfg(debug_assertions)]
833 self.context
834 .debug
835 .push(DebugEvent::CCS(*token, cluster_id.clone()));
836
837 let mut backend_borrow = backend.borrow_mut();
838 if backend_borrow.retry_policy.is_down() {
839 info!(
840 "{} backend server {} at {} is up",
841 log_context_lite!(self),
842 backend_borrow.backend_id,
843 backend_borrow.address
844 );
845 incr!(
846 "backend.up",
847 Some(cluster_id),
848 Some(&backend_borrow.backend_id)
849 );
850 gauge!(
851 names::backend::AVAILABLE,
852 1,
853 Some(cluster_id),
854 Some(&backend_borrow.backend_id)
855 );
856 push_event(Event {
857 kind: EventKind::BackendUp as i32,
858 backend_id: Some(backend_borrow.backend_id.to_owned()),
859 address: Some(backend_borrow.address.into()),
860 cluster_id: Some(cluster_id.to_owned()),
861 metric_detail: None,
862 });
863 }
864
865 //successful connection, reset failure counter
866 backend_borrow.failures = 0;
867 backend_borrow.set_connection_time(start.elapsed());
868 backend_borrow.retry_policy.succeed();
869
870 if let Some(ids) = self.context.backend_streams.get(token) {
871 for &stream_id in ids {
872 self.context.streams[stream_id].metrics.backend_connected();
873 backend_borrow.active_requests += 1;
874 }
875 }
876 trace!(
877 "{} connection success: {:#?}",
878 log_context_lite!(self),
879 backend_borrow
880 );
881 drop(backend_borrow);
882 *position = Position::Client(
883 std::mem::take(cluster_id),
884 backend.clone(),
885 BackendStatus::Connected,
886 );
887 client
888 .timeout_container()
889 .set_duration(self.router.configured_backend_timeout);
890 }
891 Position::Client(..) => {}
892 Position::Server => {
893 error!(
894 "{} backend connection cannot be in Server position",
895 log_context_lite!(self)
896 );
897 }
898 }
899 let res = {
900 let context = &mut self.context;
901 let res = client.writable(context, EndpointServer(&mut self.frontend));
902 context.debug.push(DebugEvent::CW(
903 *token,
904 res,
905 client.readiness().clone(),
906 ));
907 res
908 };
909 match res {
910 MuxResult::Continue => {}
911 MuxResult::Upgrade => {
912 error!(
913 "{} only frontend connections can trigger Upgrade",
914 log_context_lite!(self)
915 );
916 }
917 MuxResult::CloseSession => {
918 backend_close = Some(("backend writable", *token));
919 break;
920 }
921 }
922 // Cross-readiness: backend wrote → wake frontend reader
923 let context = &mut self.context;
924 self.frontend.try_resume_reading(context);
925 }
926
927 if client.readiness().filter_interest().is_readable() {
928 let res = {
929 let context = &mut self.context;
930 let res = client.readable(context, EndpointServer(&mut self.frontend));
931 context.debug.push(DebugEvent::CR(
932 *token,
933 res,
934 client.readiness().clone(),
935 ));
936 res
937 };
938 match res {
939 MuxResult::Continue => {}
940 MuxResult::Upgrade => {
941 error!(
942 "{} only frontend connections can trigger Upgrade (readable)",
943 log_context_lite!(self)
944 );
945 }
946 MuxResult::CloseSession => {
947 backend_close = Some(("backend readable", *token));
948 break;
949 }
950 }
951 }
952
953 if dead
954 && !client.readiness().filter_interest().is_readable()
955 && !client.has_buffer_pressure(&self.context)
956 {
957 self.context
958 .debug
959 .push(DebugEvent::CH(*token, client.readiness().clone()));
960 trace!("{} Closing {:#?}", log_context_lite!(self), client);
961 match client.position() {
962 Position::Client(cluster_id, backend, BackendStatus::Connecting(_)) => {
963 let mut backend_borrow = backend.borrow_mut();
964 backend_borrow.failures += 1;
965
966 let already_unavailable = backend_borrow.retry_policy.is_down();
967 backend_borrow.retry_policy.fail();
968 incr!(
969 "backend.connections.error",
970 Some(cluster_id),
971 Some(&backend_borrow.backend_id)
972 );
973 if !already_unavailable && backend_borrow.retry_policy.is_down() {
974 error!(
975 "{} backend server {} at {} is down",
976 log_context_lite!(self),
977 backend_borrow.backend_id,
978 backend_borrow.address
979 );
980 incr!(
981 "backend.down",
982 Some(cluster_id),
983 Some(&backend_borrow.backend_id)
984 );
985 gauge!(
986 names::backend::AVAILABLE,
987 0,
988 Some(cluster_id),
989 Some(&backend_borrow.backend_id)
990 );
991 push_event(Event {
992 kind: EventKind::BackendDown as i32,
993 backend_id: Some(backend_borrow.backend_id.to_owned()),
994 address: Some(backend_borrow.address.into()),
995 cluster_id: Some(cluster_id.to_owned()),
996 metric_detail: None,
997 });
998 }
999 trace!(
1000 "{} connection fail: {:#?}",
1001 log_context_lite!(self),
1002 backend_borrow
1003 );
1004 }
1005 Position::Client(_, backend, _) => {
1006 let mut backend_borrow = backend.borrow_mut();
1007 let count = self
1008 .context
1009 .backend_streams
1010 .get(token)
1011 .map_or(0, |ids| ids.len());
1012 backend_borrow.active_requests =
1013 backend_borrow.active_requests.saturating_sub(count);
1014 }
1015 Position::Server => {
1016 error!(
1017 "{} dead backend cannot be in Server position",
1018 log_context_lite!(self)
1019 );
1020 }
1021 }
1022 client.close(&mut self.context, EndpointServer(&mut self.frontend));
1023 dead_backends.push(*token);
1024 }
1025
1026 if !client.readiness().filter_interest().is_empty() {
1027 all_backends_readiness_are_empty = false;
1028 }
1029 }
1030 // Remove dead backends from the map BEFORE handling
1031 // backend_close. client.close() already decremented
1032 // connections_per_backend / backend.connections gauges in
1033 // the loop above; if we return SessionResult::Close before
1034 // removing them, Mux::close() would decrement again
1035 // (double-decrement → gauge underflow).
1036 if !dead_backends.is_empty() {
1037 for token in &dead_backends {
1038 let proxy_borrow = proxy.borrow();
1039 if let Some(mut client) = self.router.backends.remove(token) {
1040 client.timeout_container().cancel();
1041 let socket = client.socket_mut();
1042 if let Err(e) = proxy_borrow.deregister_socket(socket) {
1043 error!(
1044 "{} error deregistering back socket({:?}): {:?}",
1045 log_context!(self),
1046 socket,
1047 e
1048 );
1049 }
1050 // invariant: write-only shutdown — Shutdown::Both on a TLS frontend
1051 // discards the receive buffer and elicits TCP RST, truncating the
1052 // already-queued response. Canonical write-up: `lib/src/https.rs:650-655`.
1053 // Backend sockets follow the same discipline for symmetry.
1054 if let Err(e) = socket.shutdown(Shutdown::Write) {
1055 if e.kind() != ErrorKind::NotConnected {
1056 error!(
1057 "{} error shutting down back socket({:?}): {:?}",
1058 log_context!(self),
1059 socket,
1060 e
1061 );
1062 }
1063 }
1064 } else {
1065 error!("{} session {:?} has no backend!", log_context!(self), token);
1066 }
1067 if !proxy_borrow.remove_session(*token) {
1068 error!(
1069 "{} session {:?} was already removed!",
1070 log_context!(self),
1071 token
1072 );
1073 }
1074 }
1075 trace!("{} FRONTEND: {:#?}", log_context!(self), self.frontend);
1076 trace!(
1077 "{} BACKENDS: {:#?}",
1078 log_context!(self),
1079 self.router.backends
1080 );
1081 }
1082 if let Some((reason, token)) = backend_close {
1083 if !self.delay_close_for_frontend_flush(reason) {
1084 debug!(
1085 "{} Mux close from {} token={:?}: frontend={:?}",
1086 log_context!(self),
1087 reason,
1088 token,
1089 self.frontend
1090 );
1091 return SessionResult::Close;
1092 }
1093 all_backends_readiness_are_empty = false;
1094 }
1095
1096 if self.frontend.readiness().filter_interest().is_writable() {
1097 let res = {
1098 let context = &mut self.context;
1099 let res = self
1100 .frontend
1101 .writable(context, EndpointClient(&mut self.router));
1102 context.debug.push(DebugEvent::SW(
1103 self.frontend_token,
1104 res,
1105 self.frontend.readiness().clone(),
1106 ));
1107 res
1108 };
1109 match res {
1110 MuxResult::Continue => {}
1111 MuxResult::CloseSession => {
1112 if !self.delay_close_for_frontend_flush("frontend writable") {
1113 debug!(
1114 "{} Mux close from frontend writable: {:?}",
1115 log_context!(self),
1116 self.frontend
1117 );
1118 return SessionResult::Close;
1119 }
1120 }
1121 MuxResult::Upgrade => return SessionResult::Upgrade,
1122 }
1123 // Cross-readiness: frontend wrote → wake parked backends.
1124 // If any backend resumes, invalidate the stale readiness
1125 // flag so the inner loop continues instead of breaking.
1126 let context = &mut self.context;
1127 for (_token, backend) in self.router.backends.iter_mut() {
1128 if backend.try_resume_reading(context) {
1129 all_backends_readiness_are_empty = false;
1130 }
1131 }
1132 }
1133
1134 if self.frontend.readiness().filter_interest().is_empty()
1135 && all_backends_readiness_are_empty
1136 {
1137 break;
1138 }
1139
1140 counter += 1;
1141 if counter >= MAX_LOOP_ITERATIONS {
1142 incr!(names::http::INFINITE_LOOP_ERROR);
1143 if self.frontend.has_pending_write() {
1144 debug!(
1145 "{} Mux loop budget exhausted while frontend flush pending: {:?}",
1146 log_context!(self),
1147 self.frontend
1148 );
1149 self.frontend.readiness_mut().event.remove(Ready::WRITABLE);
1150 self.frontend.timeout_container().set(self.frontend_token);
1151 break;
1152 }
1153 return SessionResult::Close;
1154 }
1155 }
1156
1157 let context = &mut self.context;
1158 let answers_rc = context.listener.borrow().get_answers().clone();
1159 let mut dirty = false;
1160 while let Some(stream_id) = context.pending_links.pop_front() {
1161 let Some(stream) = context.streams.get(stream_id) else {
1162 continue;
1163 };
1164 if stream.state != StreamState::Link {
1165 continue;
1166 }
1167 // Before the first request triggers a stream Link, the frontend timeout is set
1168 // to a shorter request_timeout, here we switch to the longer nominal timeout
1169 self.frontend
1170 .timeout_container()
1171 .set_duration(self.configured_frontend_timeout);
1172 let front_readiness = self.frontend.readiness_mut();
1173 dirty = true;
1174 match self.router.connect(
1175 stream_id,
1176 context,
1177 session.clone(),
1178 proxy.clone(),
1179 self.frontend_token,
1180 ) {
1181 Ok(_) => {
1182 let state = context.streams[stream_id].state;
1183 context.debug.push(DebugEvent::CC(stream_id, state));
1184 }
1185 Err(error) => {
1186 trace!("{} Connection error: {}", log_module_context!(), error);
1187 let stream = &mut context.streams[stream_id];
1188 let answers = answers_rc.borrow();
1189 use BackendConnectionError as BE;
1190 match error {
1191 BE::MaxConnectionRetries(_)
1192 | BE::MaxSessionsMemory
1193 | BE::MaxBuffers => {
1194 warn!(
1195 "{} backend retry budget exhausted: {}",
1196 log_module_context!(stream.context),
1197 error
1198 );
1199 set_default_answer(stream, front_readiness, 503, &answers);
1200 }
1201 BE::Backend(BackendError::NoBackendForCluster(_)) => {
1202 set_default_answer(stream, front_readiness, 503, &answers);
1203 }
1204 BE::RetrieveClusterError(RetrieveClusterError::RetrieveFrontend(
1205 ref err,
1206 )) => {
1207 // RFC 9110 §15.5.1: a malformed authority is a
1208 // 400. A syntactically valid authority that
1209 // simply has no matching frontend stays on the
1210 // historical 404 path.
1211 let code = match err {
1212 FrontendFromRequestError::HostParse { .. }
1213 | FrontendFromRequestError::InvalidCharsAfterHost(_) => 400,
1214 FrontendFromRequestError::NoClusterFound(_) => 404,
1215 };
1216 set_default_answer(stream, front_readiness, code, &answers);
1217 }
1218 BE::RetrieveClusterError(RetrieveClusterError::UnauthorizedRoute) => {
1219 set_default_answer(stream, front_readiness, 401, &answers);
1220 }
1221 BE::RetrieveClusterError(
1222 RetrieveClusterError::SniAuthorityMismatch { .. },
1223 ) => {
1224 // RFC 9110 §15.5.20: 421 Misdirected Request is the
1225 // semantically correct status for an authority that
1226 // does not belong to this TLS connection. The
1227 // http.sni_authority_mismatch metric emitted in
1228 // `route_from_request` remains the durable signal;
1229 // the 421 body here is what a client sees and may
1230 // retry on a fresh TLS connection with a matching SNI.
1231 set_default_answer(stream, front_readiness, 421, &answers);
1232 }
1233 BE::RetrieveClusterError(RetrieveClusterError::HttpsRedirect) => {
1234 // Use the redirect status stashed by `Router::route_from_request`
1235 // (#1009). Falls back to 301 for the legacy
1236 // `cluster.https_redirect = true` path that does
1237 // not set the field.
1238 let code = stream.context.redirect_status.unwrap_or(301);
1239 set_default_answer(stream, front_readiness, code, &answers);
1240 }
1241
1242 BE::Backend(ref e) => {
1243 error!("{} backend connection error: {}", log_module_context!(), e);
1244 set_default_answer(stream, front_readiness, 503, &answers);
1245 }
1246 BE::RetrieveClusterError(ref other) => {
1247 error!(
1248 "{} unexpected RetrieveClusterError variant: {:?}",
1249 log_module_context!(),
1250 other
1251 );
1252 set_default_answer(stream, front_readiness, 503, &answers);
1253 }
1254 // TCP specific error
1255 BE::NotFound(ref msg) => {
1256 error!(
1257 "{} NotFound is TCP-specific, not reachable in mux: {:?}",
1258 log_module_context!(),
1259 msg
1260 );
1261 set_default_answer(stream, front_readiness, 503, &answers);
1262 }
1263 // Per-(cluster, source-IP) connection limit reached.
1264 // Emit HTTP 429 with the resolved `Retry-After`. The
1265 // value is computed in `Router::connect` (where the
1266 // SessionManager + cluster override are reachable)
1267 // and stashed on the stream context just before the
1268 // error is returned, so the answer engine can render
1269 // (or elide) the header without re-deriving the
1270 // resolution chain here.
1271 BE::TooManyConnectionsPerIp { ref cluster_id } => {
1272 debug!(
1273 "{} per-(cluster, source-IP) limit hit for cluster {:?}",
1274 log_module_context!(),
1275 cluster_id
1276 );
1277 let retry_after = stream.context.retry_after_seconds;
1278 set_default_answer_with_retry_after(
1279 stream,
1280 front_readiness,
1281 429,
1282 &answers,
1283 retry_after,
1284 );
1285 }
1286 }
1287 context.debug.push(DebugEvent::CCF(stream_id, error));
1288 }
1289 }
1290 // All routing error arms now set a default answer, transitioning
1291 // the stream out of Link state. No re-enqueue needed.
1292 }
1293 if !dirty {
1294 break;
1295 }
1296 }
1297
1298 // Stop service timers before yielding to epoll, so idle wait time is excluded
1299 // from the service_time metric. For Close/Upgrade returns, close() handles cleanup.
1300 for stream in &mut self.context.streams {
1301 if stream.state.is_open() {
1302 stream.metrics.service_stop();
1303 }
1304 }
1305
1306 #[cfg(debug_assertions)]
1307 {
1308 // Verify backend_streams index matches actual stream states.
1309 let mut expected: HashMap<Token, Vec<GlobalStreamId>> = HashMap::new();
1310 for (id, stream) in self.context.streams.iter().enumerate() {
1311 if let StreamState::Linked(token) = stream.state {
1312 expected.entry(token).or_default().push(id);
1313 }
1314 }
1315 assert_eq!(
1316 expected.len(),
1317 self.context.backend_streams.len(),
1318 "backend_streams index key count mismatch: expected={:?}, actual={:?}",
1319 expected,
1320 self.context.backend_streams
1321 );
1322 for (token, mut expected_ids) in expected {
1323 let mut actual_ids = self
1324 .context
1325 .backend_streams
1326 .get(&token)
1327 .cloned()
1328 .unwrap_or_default();
1329 expected_ids.sort();
1330 actual_ids.sort();
1331 assert_eq!(
1332 expected_ids, actual_ids,
1333 "backend_streams index mismatch for token {token:?}",
1334 );
1335 }
1336 }
1337
1338 SessionResult::Continue
1339 }
1340
1341 fn update_readiness(&mut self, token: Token, events: Ready) {
1342 trace!("{} EVENTS: {:?} on {:?}", log_context!(self), events, token);
1343 self.context.debug.push(DebugEvent::EV(token, events));
1344 if token == self.frontend_token {
1345 self.frontend.readiness_mut().event |= events;
1346 } else if let Some(c) = self.router.backends.get_mut(&token) {
1347 c.readiness_mut().event |= events;
1348 }
1349 }
1350
1351 fn timeout(&mut self, token: Token, _metrics: &mut SessionMetrics) -> StateResult {
1352 trace!("{} MuxState::timeout({:?})", log_context!(self), token);
1353 let front_is_h2 = match self.frontend {
1354 Connection::H1(_) => false,
1355 Connection::H2(_) => true,
1356 };
1357 let answers_rc = self.context.listener.borrow().get_answers().clone();
1358 let mut should_close = true;
1359 let mut should_write = false;
1360 if self.frontend_token == token {
1361 trace!(
1362 "{} MuxState::timeout_frontend({:#?})",
1363 log_context!(self),
1364 self.frontend
1365 );
1366 self.frontend.timeout_container().triggered();
1367 let front_readiness = self.frontend.readiness_mut();
1368 for stream_id in 0..self.context.streams.len() {
1369 match self.context.streams[stream_id].state {
1370 StreamState::Idle => {
1371 // In h1 an Idle stream is always the first request, so we can send a 408
1372 // In h2 an Idle stream doesn't necessarily hold a request yet,
1373 // in most cases it was just reserved, so we can just ignore them.
1374 if !front_is_h2 {
1375 let answers = answers_rc.borrow();
1376 let stream = &mut self.context.streams[stream_id];
1377 stream.context.access_log_message = Some("client_timeout");
1378 set_default_answer(stream, front_readiness, 408, &answers);
1379 should_write = true;
1380 }
1381 }
1382 StreamState::Link => {
1383 // This is an unusual case, as we have both a complete request and no
1384 // available backend yet. For now, we answer with 503.
1385 // Not a timeout-driven outcome from the operator's
1386 // perspective — leave access_log_message as None.
1387 let answers = answers_rc.borrow();
1388 let stream = &mut self.context.streams[stream_id];
1389 set_default_answer(stream, front_readiness, 503, &answers);
1390 should_write = true;
1391 }
1392 StreamState::Linked(_) => {
1393 // The frontend timed out while a stream is linked to a backend.
1394 // The backend timeout should handle this, but in case the backend
1395 // is also stalled, send a 504 and terminate the stream.
1396 if !self.context.streams[stream_id].back.consumed {
1397 self.context.unlink_stream(stream_id);
1398 let answers = answers_rc.borrow();
1399 let stream = &mut self.context.streams[stream_id];
1400 stream.context.access_log_message =
1401 Some("client_timeout_during_response");
1402 set_default_answer(stream, front_readiness, 504, &answers);
1403 should_write = true;
1404 } else if self.context.streams[stream_id].back.is_completed() {
1405 // Response fully proxied, stream can be closed
1406 } else if self.context.streams[stream_id].back.is_terminated()
1407 || self.context.streams[stream_id].back.is_error()
1408 {
1409 // Response is terminated/error but not fully written to frontend.
1410 // Keep the session alive briefly to flush remaining data.
1411 should_close = false;
1412 } else {
1413 // Partial response in progress — forcefully terminate
1414 self.context.unlink_stream(stream_id);
1415 let stream = &mut self.context.streams[stream_id];
1416 stream.context.access_log_message =
1417 Some("client_timeout_during_response");
1418 forcefully_terminate_answer(
1419 stream,
1420 front_readiness,
1421 H2Error::InternalError,
1422 );
1423 should_write = true;
1424 }
1425 // end_stream is called in a second pass below to avoid
1426 // borrow conflicts on context.streams.
1427 }
1428 StreamState::Unlinked => {
1429 // A stream Unlinked already has a response and its backend closed.
1430 // In case it hasn't finished proxying we wait. Otherwise it is a stream
1431 // kept alive for a new request, which can be killed.
1432 if !self.context.streams[stream_id].back.is_completed() {
1433 should_close = false;
1434 }
1435 }
1436 StreamState::Recycle => {
1437 // A recycled stream is an h2 stream which doesn't hold a request anymore.
1438 // We can ignore it.
1439 }
1440 }
1441 }
1442 // Second pass: end streams that were linked to backends.
1443 // This is done separately to avoid borrow conflicts on context.streams.
1444 let linked_streams: Vec<(GlobalStreamId, Token)> = self
1445 .context
1446 .streams
1447 .iter()
1448 .enumerate()
1449 .filter_map(|(id, stream)| {
1450 if let StreamState::Linked(back_token) = stream.state {
1451 Some((id, back_token))
1452 } else {
1453 None
1454 }
1455 })
1456 .collect();
1457 for (stream_id, back_token) in linked_streams {
1458 if let Some(backend) = self.router.backends.get_mut(&back_token) {
1459 backend.end_stream(stream_id, &mut self.context);
1460 }
1461 }
1462 } else if let Some(backend) = self.router.backends.get_mut(&token) {
1463 trace!(
1464 "{} MuxState::timeout_backend({:#?})",
1465 log_context_lite!(self),
1466 backend
1467 );
1468 backend.timeout_container().triggered();
1469 let front_readiness = self.frontend.readiness_mut();
1470 let linked_ids: Vec<GlobalStreamId> = self
1471 .context
1472 .backend_streams
1473 .get(&token)
1474 .map_or_else(Vec::new, |ids| ids.to_owned());
1475 for stream_id in linked_ids {
1476 // This stream is linked to the backend that timedout
1477 if self.context.streams[stream_id].back.is_terminated()
1478 || self.context.streams[stream_id].back.is_error()
1479 {
1480 trace!(
1481 "{} Stream terminated or in error, do nothing, just wait a bit more",
1482 log_module_context!()
1483 );
1484 // Nothing to do, simply wait for the remaining bytes to be proxied
1485 if !self.context.streams[stream_id].back.is_completed() {
1486 should_close = false;
1487 }
1488 } else if !self.context.streams[stream_id].back.consumed {
1489 // The response has not started yet
1490 trace!(
1491 "{} Stream still waiting for response, send 504",
1492 log_module_context!()
1493 );
1494 self.context.unlink_stream(stream_id);
1495 let answers = answers_rc.borrow();
1496 let stream = &mut self.context.streams[stream_id];
1497 stream.context.access_log_message = Some("backend_timeout");
1498 set_default_answer(stream, front_readiness, 504, &answers);
1499 should_write = true;
1500 } else {
1501 trace!(
1502 "{} Stream waiting for end of response, forcefully terminate it",
1503 log_module_context!()
1504 );
1505 self.context.unlink_stream(stream_id);
1506 let stream = &mut self.context.streams[stream_id];
1507 stream.context.access_log_message = Some("backend_response_timeout");
1508 forcefully_terminate_answer(stream, front_readiness, H2Error::InternalError);
1509 should_write = true;
1510 }
1511 backend.end_stream(stream_id, &mut self.context);
1512 }
1513 // Re-arm the backend timeout if the session stays alive (draining streams).
1514 // Without this, the timeout is consumed and the session becomes immortal
1515 // until the zombie checker runs.
1516 if !should_close {
1517 backend.timeout_container().set(token);
1518 }
1519 } else {
1520 // Session received a timeout for an unknown token, ignore it
1521 return StateResult::Continue;
1522 }
1523 if should_write {
1524 // Drain as much pending data as possible before closing.
1525 // A single writable() call is insufficient for large responses —
1526 // the TLS buffer may need multiple flushes. Without this loop,
1527 // the session is killed with unflushed TLS data, causing the
1528 // client to receive a truncated TLS record ("decode error").
1529 //
1530 // The constant 16 is empirical: it papers over a missing
1531 // invariant-15 hop in the H2 mux state machine where the
1532 // writable readiness signal is not always re-armed after a
1533 // partial flush. Long-term plan: reach invariant-15 closure
1534 // and remove this loop. See `lib/src/protocol/mux/LIFECYCLE.md`.
1535 let mut result = StateResult::Continue;
1536 for _ in 0..16 {
1537 result = match self
1538 .frontend
1539 .writable(&mut self.context, EndpointClient(&mut self.router))
1540 {
1541 MuxResult::Continue => StateResult::Continue,
1542 MuxResult::Upgrade => StateResult::Upgrade,
1543 MuxResult::CloseSession => StateResult::CloseSession,
1544 };
1545 if result != StateResult::Continue
1546 || !self.frontend.readiness_mut().interest.is_writable()
1547 {
1548 break;
1549 }
1550 }
1551 // Re-arm the frontend timeout so the session doesn't become immortal.
1552 // The writable call may have partially flushed the response — we need
1553 // the timeout to fire again if the flush stalls.
1554 if result == StateResult::Continue {
1555 self.frontend.timeout_container().set(self.frontend_token);
1556 }
1557 return result;
1558 }
1559 if should_close {
1560 if self.delay_close_for_frontend_flush("timeout") {
1561 debug!(
1562 "{} Mux timeout delaying close for frontend flush: token={:?}, frontend={:?}",
1563 log_context!(self),
1564 token,
1565 self.frontend
1566 );
1567 self.frontend.timeout_container().set(self.frontend_token);
1568 return StateResult::Continue;
1569 }
1570 if front_is_h2 {
1571 debug!(
1572 "{} Mux timeout returning CloseSession: token={:?}, frontend={:?}",
1573 log_context!(self),
1574 token,
1575 self.frontend
1576 );
1577 for (idx, stream) in self.context.streams.iter().enumerate() {
1578 if stream.state != StreamState::Recycle {
1579 debug!(
1580 "{} timeout stream[{}]: state={:?}, front_phase={:?}, back_phase={:?}, front_completed={}, back_completed={}",
1581 log_context!(self),
1582 idx,
1583 stream.state,
1584 stream.front.parsing_phase,
1585 stream.back.parsing_phase,
1586 stream.front.is_completed(),
1587 stream.back.is_completed()
1588 );
1589 }
1590 }
1591 }
1592 StateResult::CloseSession
1593 } else {
1594 // Re-arm the frontend timeout. Without this, the timeout is consumed
1595 // by triggered() and the session stays alive indefinitely until the
1596 // zombie checker runs (default: 30 minutes).
1597 self.frontend.timeout_container().set(self.frontend_token);
1598 StateResult::Continue
1599 }
1600 }
1601
1602 fn cancel_timeouts(&mut self) {
1603 trace!("{} MuxState::cancel_timeouts", log_context!(self));
1604 self.frontend.timeout_container().cancel();
1605 for backend in self.router.backends.values_mut() {
1606 backend.timeout_container().cancel();
1607 }
1608 }
1609
1610 fn print_state(&self, _context: &str) {
1611 // The trait-required `context: &str` parameter (protocol tag like
1612 // "HTTPS"/"HTTP" passed by callers) predates the unified
1613 // `log_context!(self)` envelope. The canonical `MUX` tag lives
1614 // inside `log_context!`, so we ignore the parameter here and emit
1615 // the bracketed Session(...) block instead, mirroring the second
1616 // `error!` in this function.
1617 error!(
1618 "\
1619{} Session(Mux)
1620\tFrontend:
1621\t\ttoken: {:?}\treadiness: {:?}
1622\tBackend(s):",
1623 log_context!(self),
1624 self.frontend_token,
1625 self.frontend.readiness()
1626 );
1627 for (backend_token, backend) in &self.router.backends {
1628 error!(
1629 "{} \t\ttoken: {:?}\treadiness: {:?}",
1630 log_context!(self),
1631 backend_token,
1632 backend.readiness()
1633 )
1634 }
1635 }
1636
1637 fn close(&mut self, proxy: Rc<RefCell<dyn L7Proxy>>, _metrics: &mut SessionMetrics) {
1638 if self.context.debug.is_interesting() {
1639 warn!("{} {:?}", log_context!(self), self.context.debug.events);
1640 }
1641 debug!("{} MUX CLOSE", log_context!(self));
1642 trace!("{} FRONTEND: {:#?}", log_context!(self), self.frontend);
1643 trace!(
1644 "{} BACKENDS: {:#?}",
1645 log_context!(self),
1646 self.router.backends
1647 );
1648
1649 // Log active streams at session teardown for timeout diagnosis
1650 let active_count = self
1651 .context
1652 .streams
1653 .iter()
1654 .filter(|s| s.state.is_open() && s.metrics.start.is_some())
1655 .count();
1656 if active_count > 0 {
1657 debug!(
1658 "{} Session close with {} active stream(s)",
1659 log_context!(self),
1660 active_count
1661 );
1662 for (idx, stream) in self
1663 .context
1664 .streams
1665 .iter()
1666 .enumerate()
1667 .filter(|(_, s)| s.state.is_open() && s.metrics.start.is_some())
1668 {
1669 let elapsed = stream.metrics.service_time();
1670 debug!(
1671 "{} active stream[{}]: state={:?} service_time={:?} method={:?} path={:?} status={:?}",
1672 log_context!(self),
1673 idx,
1674 stream.state,
1675 elapsed,
1676 stream.context.method,
1677 stream.context.path,
1678 stream.context.status,
1679 );
1680 }
1681 incr!(names::h2::CLOSE_WITH_ACTIVE_STREAMS);
1682 }
1683
1684 // Distribute H2 connection-level overhead (control frames) across in-flight
1685 // streams so that access log bytes_in/bytes_out reflect actual wire cost.
1686 // Integer division may lose up to (active_count - 1) bytes, which is acceptable.
1687 let active_count = active_count.max(1);
1688 let (total_overhead_in, total_overhead_out) = self.frontend.overhead_bytes();
1689 let share_in = total_overhead_in / active_count;
1690 let share_out = total_overhead_out / active_count;
1691
1692 // Generate access logs for in-flight streams on session teardown.
1693 // Skip streams that already had their access log emitted (metrics.start is
1694 // set to None by metrics.reset() after generate_access_log in the happy path).
1695 // Frontend RTT is the same for every stream on this session — snapshot
1696 // it once outside the loop instead of paying one TCP_INFO syscall per
1697 // open stream.
1698 let client_rtt = socket_rtt(self.frontend.socket());
1699 for stream in &mut self.context.streams {
1700 if stream.state.is_open() && stream.metrics.start.is_some() {
1701 stream.metrics.bin += share_in;
1702 stream.metrics.bout += share_out;
1703 stream.metrics.service_stop();
1704 if stream.metrics.backend_stop.is_none() {
1705 stream.metrics.backend_stop();
1706 }
1707 // Only mark as error if the stream had an actual protocol/processing failure
1708 // (kawa parse error, backend error). Normal timeouts, client disconnects,
1709 // and graceful connection closures are not errors.
1710 let is_error = stream.front.is_error() || stream.back.is_error();
1711 let server_rtt = stream.linked_token().and_then(|token| {
1712 self.router
1713 .backends
1714 .get(&token)
1715 .and_then(|c| socket_rtt(c.socket()))
1716 });
1717 stream.generate_access_log(
1718 is_error,
1719 Some("session close"),
1720 self.context.listener.clone(),
1721 client_rtt,
1722 server_rtt,
1723 );
1724 stream.state = StreamState::Recycle;
1725 }
1726 }
1727
1728 self.frontend
1729 .close(&mut self.context, EndpointClient(&mut self.router));
1730
1731 for (token, client) in &mut self.router.backends {
1732 let proxy_borrow = proxy.borrow();
1733 client.timeout_container().cancel();
1734 let socket = client.socket_mut();
1735 if let Err(e) = proxy_borrow.deregister_socket(socket) {
1736 error!(
1737 "{} error deregistering back socket({:?}): {:?}",
1738 log_context_lite!(self),
1739 socket,
1740 e
1741 );
1742 }
1743 // invariant: write-only shutdown — Shutdown::Both on a TLS frontend
1744 // discards the receive buffer and elicits TCP RST, truncating the
1745 // already-queued response. Canonical write-up: `lib/src/https.rs:650-655`.
1746 // Backend sockets follow the same discipline for symmetry.
1747 if let Err(e) = socket.shutdown(Shutdown::Write) {
1748 if e.kind() != ErrorKind::NotConnected {
1749 error!(
1750 "{} error shutting down back socket({:?}): {:?}",
1751 log_context_lite!(self),
1752 socket,
1753 e
1754 );
1755 }
1756 }
1757 if !proxy_borrow.remove_session(*token) {
1758 error!(
1759 "{} session {:?} was already removed!",
1760 log_context_lite!(self),
1761 token
1762 );
1763 }
1764
1765 match client.position() {
1766 Position::Client(cluster_id, backend, _) => {
1767 let mut backend_borrow = backend.borrow_mut();
1768 backend_borrow.dec_connections();
1769 gauge_add!(names::backend::CONNECTIONS, -1);
1770 // Second `-1` site for `backend.pool.size` (the first is
1771 // in `connection.rs::pre_close_client_bookkeeping`). This
1772 // path runs during session teardown when the frontend
1773 // session iterates the backends map directly without
1774 // routing through `Connection::close`. Both `-1` sites
1775 // mirror the single `+1` in router.rs::connect and the
1776 // matching `backend.connections, -1` calls already
1777 // present here, so symmetry follows from
1778 // `backend.connections` correctness.
1779 gauge_add!(names::backend::POOL_SIZE, -1);
1780 gauge_add!(
1781 names::backend::CONNECTIONS_PER_BACKEND,
1782 -1,
1783 Some(cluster_id),
1784 Some(&backend_borrow.backend_id)
1785 );
1786 let count = self
1787 .context
1788 .backend_streams
1789 .get(token)
1790 .map_or(0, |ids| ids.len());
1791 backend_borrow.active_requests =
1792 backend_borrow.active_requests.saturating_sub(count);
1793 trace!(
1794 "{} connection (session) closed: {:#?}",
1795 log_context_lite!(self),
1796 backend_borrow
1797 );
1798 }
1799 Position::Server => {
1800 error!(
1801 "{} close_backend called on Server position",
1802 log_context_lite!(self)
1803 );
1804 }
1805 }
1806 }
1807 // Clear the reverse index after all backends have decremented their
1808 // active_requests counters (which depend on the index for stream counts).
1809 self.context.backend_streams.clear();
1810 }
1811
1812 fn shutting_down(&mut self) -> SessionIsToBeClosed {
1813 // RFC 9113 §6.8: initiate graceful shutdown with double-GOAWAY pattern.
1814 // Only send the initial GOAWAY once. The final GOAWAY (with the real
1815 // last_stream_id) is handled by finalize_write() when all streams drain.
1816 // Calling graceful_goaway() again would send the final GOAWAY
1817 // prematurely and force-disconnect before in-flight streams complete.
1818 if !self.frontend.is_draining() {
1819 match self.frontend.graceful_goaway() {
1820 MuxResult::CloseSession => return true,
1821 MuxResult::Continue => {
1822 // graceful_goaway() queued a GOAWAY frame. Flush it directly
1823 // since the event loop uses edge-triggered epoll and won't
1824 // deliver a new WRITABLE event for an already-writable socket.
1825 self.frontend.flush_zero_buffer();
1826 }
1827 _ => {}
1828 }
1829 } else {
1830 trace!(
1831 "{} shutting_down: already draining, skipping duplicate GOAWAY",
1832 log_context!(self)
1833 );
1834 // shut_down_sessions() runs outside ready(), so retry flushing any
1835 // previously-buffered GOAWAY/TLS records on each pass.
1836 self.frontend.flush_zero_buffer();
1837 }
1838 if self.drive_frontend_shutdown_io() {
1839 return true;
1840 }
1841 // Forced-close deadline: once the H2 listener's
1842 // `h2_graceful_shutdown_deadline_seconds` budget has elapsed from
1843 // the moment `graceful_goaway` armed `drain.started_at`, stop
1844 // waiting for streams and tear the session down. `drive_frontend_
1845 // shutdown_io` above already had a chance to flush any pending
1846 // TLS/GOAWAY records; this branch accepts that some bytes may be
1847 // lost in exchange for honoring the operator-configured SLA.
1848 // Listeners that disable the knob (`= 0` → `None`) short-circuit
1849 // the check inside `graceful_shutdown_deadline_elapsed`.
1850 if self.frontend.graceful_shutdown_deadline_elapsed() {
1851 debug!(
1852 "{} Mux shutting_down: graceful-shutdown deadline elapsed, forcing close",
1853 log_context!(self)
1854 );
1855 return true;
1856 }
1857 if matches!(self.frontend, Connection::H2(_)) && self.frontend.is_draining() {
1858 for stream in &mut self.context.streams {
1859 if stream.front_received_end_of_stream {
1860 continue;
1861 }
1862 if !matches!(stream.state, StreamState::Linked(_) | StreamState::Unlinked) {
1863 continue;
1864 }
1865 if stream.front.consumed
1866 && stream.front.storage.is_empty()
1867 && stream.front.is_completed()
1868 {
1869 stream.front_received_end_of_stream = true;
1870 self.frontend
1871 .readiness_mut()
1872 .interest
1873 .insert(Ready::WRITABLE);
1874 self.frontend.readiness_mut().signal_pending_write();
1875 }
1876 }
1877 }
1878 let mut can_stop = true;
1879 for stream in &mut self.context.streams {
1880 match stream.state {
1881 StreamState::Linked(_) => {
1882 can_stop = false;
1883 }
1884 StreamState::Unlinked => {
1885 kawa::debug_kawa(&stream.front);
1886 kawa::debug_kawa(&stream.back);
1887 if stream.is_quiesced() {
1888 continue;
1889 }
1890 stream.context.closing = true;
1891 can_stop = false;
1892 }
1893 _ => {}
1894 }
1895 }
1896 if self.frontend.has_pending_write() {
1897 return false;
1898 }
1899 if can_stop {
1900 let active_h2_streams = self
1901 .context
1902 .streams
1903 .iter()
1904 .enumerate()
1905 .filter(|(_, s)| {
1906 if s.state == StreamState::Recycle {
1907 return false;
1908 }
1909 if s.state == StreamState::Unlinked && s.is_quiesced() {
1910 return false;
1911 }
1912 true
1913 })
1914 .collect::<Vec<_>>();
1915 if matches!(self.frontend, Connection::H2(_)) && !active_h2_streams.is_empty() {
1916 debug!(
1917 "{} Mux shutting_down returning true with active H2 streams: {:?}",
1918 log_context!(self),
1919 self.frontend
1920 );
1921 for (idx, stream) in active_h2_streams {
1922 debug!(
1923 "{} shutdown stream[{}]: state={:?}, front_phase={:?}, back_phase={:?}, front_completed={}, back_completed={}",
1924 log_context!(self),
1925 idx,
1926 stream.state,
1927 stream.front.parsing_phase,
1928 stream.back.parsing_phase,
1929 stream.front.is_completed(),
1930 stream.back.is_completed()
1931 );
1932 }
1933 }
1934 }
1935 if can_stop {
1936 return true;
1937 }
1938
1939 false
1940 }
1941}
1942
#[cfg(test)]
mod tests {
    use super::*;

    /// A read that ends with `SocketResult::Closed` must drop READABLE from
    /// the event set, leave WRITABLE and HUP set, and not ask the caller to
    /// yield.
    #[test]
    fn update_readiness_after_read_closed_keeps_writable() {
        let everything = Ready::READABLE | Ready::WRITABLE | Ready::HUP;
        let mut readiness = Readiness {
            event: everything,
            interest: everything,
        };

        assert!(!update_readiness_after_read(
            17,
            SocketResult::Closed,
            &mut readiness
        ));

        let observed = readiness.event;
        assert!(!observed.is_readable());
        assert!(observed.is_writable());
        assert!(observed.is_hup());
    }
}