Skip to main content

sozu_lib/protocol/mux/
mod.rs

1//! HTTP/1.1 and HTTP/2 multiplexing layer.
2//!
3//! This module unifies HTTP/1.1 and HTTP/2 behind a single [`Mux`] session
4//! state machine that integrates with sozu's mio event loop. The key types:
5//!
6//! - [`Mux`]: The top-level session state, generic over socket (`TcpStream` or
7//!   `FrontRustls`) and listener. Implements `SessionState`.
8//! - [`Connection`]: Enum dispatching to [`ConnectionH1`] or [`ConnectionH2`]
9//!   for protocol-specific readable/writable logic.
10//! - [`Stream`]: Per-request state with front/back kawa buffers, metrics, and
11//!   lifecycle tracking. Shared between H1 and H2 paths.
12//! - [`Context`]: Per-session context (cluster, backends, routing, timeouts).
13//!
14//! The H2 implementation handles RFC 9113 framing, HPACK (RFC 7541), flow
15//! control, flood detection (CVE-2023-44487, CVE-2019-9512/9514/9515/9518,
16//! CVE-2024-27316), and graceful shutdown (double-GOAWAY per RFC 9113 §6.8).
17
18use std::{
19    cell::RefCell,
20    collections::{HashMap, VecDeque},
21    fmt::Debug,
22    io::ErrorKind,
23    net::{Shutdown, SocketAddr},
24    rc::{Rc, Weak},
25    sync::Arc,
26    time::{Duration, Instant},
27};
28
29use mio::{Token, net::TcpStream};
30use rusty_ulid::Ulid;
31use sozu_command::{
32    logging::ansi_palette,
33    proto::command::{Event, EventKind},
34    ready::Ready,
35};
36
/// Builds the prefix that starts every [`Mux`] log line:
/// `[<ulid> - - -]\tMUX\tSession(...)\t >>>`, following the same
/// log-context convention as RUSTLS.
///
/// Colours come from [`ansi_palette`]. When colored output is enabled, the
/// `MUX` label is wrapped in bold bright-white ANSI (uniform across every
/// protocol) and the session block is rendered in light grey.
///
/// `$self` must expose the same fields as [`Mux`]. The session block reports:
/// - `frontend` — mio token of the frontend socket
/// - `peer` — peer address of the frontend socket (`None` when it cannot be read)
/// - `streams` — number of stream slots held by the [`Context`] (recycled slots included)
/// - `backends` — number of backend connections in the [`Router`]
/// - `pending_links` — streams waiting to be linked to a backend
/// - `readiness` — snapshot of the frontend mio readiness
macro_rules! log_context {
    ($self:expr) => {{
        // Palette entries are captured straight from these locals by the
        // format string. Session values stay named arguments so each one only
        // borrows the field it reads, for as long as the `format!` call lasts.
        let (open, reset, grey, gray, white) = ansi_palette();
        format!(
            "[{ulid} - - -]\t{open}MUX{reset}\t{grey}Session{reset}({gray}frontend{reset}={white}{frontend}{reset}, {gray}peer{reset}={white}{peer:?}{reset}, {gray}streams{reset}={white}{streams}{reset}, {gray}backends{reset}={white}{backends}{reset}, {gray}pending_links{reset}={white}{pending_links}{reset}, {gray}readiness{reset}={white}{readiness}{reset})\t >>>",
            ulid = $self.session_ulid,
            frontend = $self.frontend_token.0,
            peer = $self.frontend.socket().peer_addr().ok(),
            streams = $self.context.streams.len(),
            backends = $self.router.backends.len(),
            pending_links = $self.context.pending_links.len(),
            readiness = $self.frontend.readiness(),
        )
    }};
}
71
/// Reduced form of [`log_context!`] that leaves out the
/// `streams`/`backends`/`pending_links` counts.
///
/// Use it where the borrow checker will not allow reading
/// `self.router.backends` or `self.context.streams`, for example inside a
/// method that already holds a mutable borrow on one of them. The ULID plus
/// the frontend token, peer and readiness are still enough to tie the line
/// back to the rest of the session.
macro_rules! log_context_lite {
    ($self:expr) => {{
        // Only `session_ulid`, `frontend_token` and `frontend` are touched,
        // each as a field-level borrow, so live borrows on `router` or
        // `context` at the call site do not conflict.
        let (open, reset, grey, gray, white) = ansi_palette();
        format!(
            "[{ulid} - - -]\t{open}MUX{reset}\t{grey}Session{reset}({gray}frontend{reset}={white}{frontend}{reset}, {gray}peer{reset}={white}{peer:?}{reset}, {gray}readiness{reset}={white}{readiness}{reset})\t >>>",
            ulid = $self.session_ulid,
            frontend = $self.frontend_token.0,
            peer = $self.frontend.socket().peer_addr().ok(),
            readiness = $self.frontend.readiness(),
        )
    }};
}
95
/// Log prefix for code that has no [`Mux`] in scope, such as free functions
/// and routing blocks. Colours follow [`ansi_palette`], so the colored-output
/// flag is honoured.
///
/// Two forms:
/// * `log_module_context!()` — the bare `MUX\t >>>` prefix. Used at sites
///   without an `HttpContext`, e.g. the generic `trace!` emitted before the
///   variant-specific match.
/// * `log_module_context!($http_context)` — the rich form. `$http_context`
///   must be an `HttpContext` or a reference to one; it is re-borrowed as
///   `&HttpContext`. The output is the `[session req cluster backend]`
///   bracket returned by `HttpContext::log_context` (the same bracket used by
///   RUSTLS/PIPE/TCP), followed by a `Session(frontend, method, authority)`
///   block. This keeps MUX lines emitted from variant match arms filterable
///   by session ULID or request ULID. Mirrors
///   `router.rs:log_module_context!($http_context)`.
macro_rules! log_module_context {
    () => {{
        let (open, reset, _, _, _) = ansi_palette();
        format!("{open}MUX{reset}\t >>>")
    }};
    ($http_context:expr) => {{
        let (open, reset, grey, gray, white) = ansi_palette();
        let http_ctx: &HttpContext = &$http_context;
        let ctx = http_ctx.log_context();
        // Borrow (not move) the fields so non-`Copy` values are never moved
        // out of the context; `{:?}` prints `&T` exactly like `T`.
        let frontend = &http_ctx.session_address;
        let method = &http_ctx.method;
        let authority = &http_ctx.authority;
        format!(
            "{gray}{ctx}{reset}\t{open}MUX{reset}\t{grey}Session{reset}({gray}frontend{reset}={white}{frontend:?}{reset}, {gray}method{reset}={white}{method:?}{reset}, {gray}authority{reset}={white}{authority:?}{reset})\t >>>"
        )
    }};
}
132
133pub mod answers;
134pub mod auth;
135pub mod connection;
136mod converter;
137pub mod debug;
138mod h1;
139mod h2;
140pub mod parser;
141mod pkawa;
142pub mod router;
143pub(crate) mod serializer;
144mod shared;
145pub mod stream;
146
147use crate::metrics::names;
148use crate::{
149    BackendConnectionError, FrontendFromRequestError, L7ListenerHandler, L7Proxy, ListenerHandler,
150    ProxySession, Readiness, RetrieveClusterError, SessionIsToBeClosed, SessionMetrics,
151    SessionResult, StateResult,
152    backends::{Backend, BackendError},
153    http::HttpListener,
154    https::HttpsListener,
155    pool::{Checkout, Pool},
156    protocol::{SessionState, http::editor::HttpContext},
157    retry::RetryPolicy,
158    server::push_event,
159    socket::{FrontRustls, SessionTcpStream, SocketHandler, SocketResult, stats::socket_rtt},
160};
161
162pub(crate) use crate::protocol::mux::answers::{
163    forcefully_terminate_answer, set_default_answer, set_default_answer_with_retry_after,
164};
165use crate::protocol::mux::connection::{EndpointClient, EndpointServer};
166pub use crate::protocol::mux::{
167    answers::terminate_default_answer,
168    connection::Connection,
169    debug::{DebugEvent, DebugHistory},
170    h1::ConnectionH1,
171    h2::ConnectionH2,
172    h2::H2ByteAccounting,
173    h2::H2ConnectionConfig,
174    h2::H2DrainState,
175    h2::H2FloodConfig,
176    h2::H2FlowControl,
177    parser::H2Error,
178    router::Router,
179    stream::{Stream, StreamParts, StreamState},
180};
181
182// ── Tuning Constants ─────────────────────────────────────────────────────────
183
/// Maximum event loop iterations before forcefully closing a session.
/// Prevents infinite loops from consuming the single-threaded worker.
/// Also caps the write loop in `Mux::drive_frontend_shutdown_io`.
const MAX_LOOP_ITERATIONS: i32 = 10_000;
// ─────────────────────────────────────────────────────────────────────────────

/// Generic HTTP representation: a Kawa stream that uses a Sōzu pool
/// [`Checkout`] as its buffer.
type GenericHttpStream = kawa::Kawa<Checkout>;
/// Stream identifier within a connection (presumably the protocol-level H2
/// stream id — confirm against the H2 module).
type StreamId = u32;
/// Index of a stream slot in [`Context::streams`].
type GlobalStreamId = usize;
/// Mux session over a `SessionTcpStream` frontend on an [`HttpListener`]
/// (clear-text variant).
pub type MuxClear = Mux<SessionTcpStream, HttpListener>;
/// Mux session over a rustls frontend (`FrontRustls`) on an [`HttpsListener`].
pub type MuxTls = Mux<FrontRustls, HttpsListener>;
195
/// Which side of the proxy a connection sits on.
///
/// * `Client(cluster_id, backend, status)` — a backend-facing connection. Its
///   bytes are attributed to the `backend_*` session metrics. It carries the
///   cluster id, the shared [`Backend`] handle and the [`BackendStatus`].
/// * `Server` — the frontend-facing connection.
pub enum Position {
    Client(String, Rc<RefCell<Backend>>, BackendStatus),
    Server,
}
200
201impl Debug for Position {
202    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
203        match self {
204            Self::Client(cluster_id, _, status) => f
205                .debug_tuple("Client")
206                .field(cluster_id)
207                .field(status)
208                .finish(),
209            Self::Server => write!(f, "Server"),
210        }
211    }
212}
213
214impl Position {
215    fn is_server(&self) -> bool {
216        match self {
217            Position::Client(..) => false,
218            Position::Server => true,
219        }
220    }
221    fn is_client(&self) -> bool {
222        !self.is_server()
223    }
224
225    /// Increment the global `count!()` counter for bytes read on this side.
226    pub fn count_bytes_in_counter(&self, size: usize) {
227        match self {
228            Position::Client(..) => count!(names::backend::BACK_BYTES_IN, size as i64),
229            Position::Server => count!(names::backend::BYTES_IN, size as i64),
230        }
231    }
232
233    /// Increment the global `count!()` counter for bytes written on this side.
234    pub fn count_bytes_out_counter(&self, size: usize) {
235        match self {
236            Position::Client(..) => count!(names::backend::BACK_BYTES_OUT, size as i64),
237            Position::Server => count!(names::backend::BYTES_OUT, size as i64),
238        }
239    }
240
241    /// Attribute `size` bytes read to the appropriate `SessionMetrics` field.
242    pub fn count_bytes_in(&self, metrics: &mut SessionMetrics, size: usize) {
243        match self {
244            Position::Client(..) => metrics.backend_bin += size,
245            Position::Server => metrics.bin += size,
246        }
247    }
248
249    /// Attribute `size` bytes written to the appropriate `SessionMetrics` field.
250    pub fn count_bytes_out(&self, metrics: &mut SessionMetrics, size: usize) {
251        match self {
252            Position::Client(..) => metrics.backend_bout += size,
253            Position::Server => metrics.bout += size,
254        }
255    }
256}
257
/// Connection state attached to a backend-facing [`Position::Client`].
#[derive(Debug)]
pub enum BackendStatus {
    /// Connection attempt in progress. The `Instant` is presumably when the
    /// attempt started — confirm at the construction sites.
    Connecting(Instant),
    /// Connection established.
    Connected,
    /// Presumably an idle connection kept open for reuse — confirm.
    KeepAlive,
    /// Connection being torn down.
    Disconnecting,
}
265
/// Outcome of a `readable`/`writable` pass on a [`Connection`].
///
/// In the shutdown path, `Continue` keeps the session running. Both
/// `CloseSession` and `Upgrade` make the caller report that the session is
/// to be closed.
#[derive(Debug, Clone, Copy)]
pub enum MuxResult {
    /// Keep driving the session.
    Continue,
    /// Presumably the connection must switch protocol — confirm at the callers.
    Upgrade,
    /// Tear the whole session down.
    CloseSession,
}
272
/// The opposite side of a stream as seen by a connection. For backend
/// connections this is the frontend ([`EndpointServer`], a single
/// connection). For the frontend it is the set of backends
/// ([`EndpointClient`], keyed by token).
pub trait Endpoint: Debug {
    /// Readiness of the connection selected by `token` (see [`Self::socket`]
    /// for how implementations use `token`).
    fn readiness(&self, token: Token) -> &Readiness;
    /// Mutable readiness of the connection selected by `token`.
    fn readiness_mut(&mut self, token: Token) -> &mut Readiness;
    /// Returns the underlying TCP socket for the peer side of a stream.
    ///
    /// Access-log emission uses this to capture TCP_INFO RTT for the side the
    /// caller does NOT own directly:
    /// - a frontend connection (Position::Server) reads the backend socket
    ///   through this method;
    /// - a backend connection (Position::Client) reads the frontend socket
    ///   the same way.
    ///
    /// `token` is ignored by [`EndpointServer`], which has a single frontend
    /// connection. [`EndpointClient`] uses it as a key, since it keys backends
    /// by token. Returns `None` when the token doesn't resolve, mirroring the
    /// existing fallback paths in `readiness`/`readiness_mut`.
    fn socket(&self, token: Token) -> Option<&TcpStream>;
    /// Detach `stream` from this endpoint.
    ///
    /// Called on a client, it means the stream has PROPERLY finished: the
    /// server has completed serving the response and informs the endpoint
    /// that this stream won't be used anymore.
    ///
    /// Called on a server, it means the stream was BROKEN: the client was
    /// most likely disconnected or encountered an error. The server decides
    /// whether the stream can be retried or an error should be sent. It
    /// should be GUARANTEED that all bytes from the backend were read.
    /// However, it is almost certain that not all of them have been sent to
    /// the client yet.
    fn end_stream<L: ListenerHandler + L7ListenerHandler>(
        &mut self,
        token: Token,
        stream: GlobalStreamId,
        context: &mut Context<L>,
    );
    /// Attach `stream` to this endpoint.
    ///
    /// Called on a client, it means the stream should be attached to this
    /// endpoint. The stream might be recovering from a disconnection. In any
    /// case, its response MUST be empty at this point.
    ///
    /// Called on an H2 server, it means the stream is a server push and its
    /// request MUST be empty.
    ///
    /// Returns `false` if the stream could not be started (e.g. the maximum
    /// number of concurrent streams was reached).
    fn start_stream<L: ListenerHandler + L7ListenerHandler>(
        &mut self,
        token: Token,
        stream: GlobalStreamId,
        context: &mut Context<L>,
    ) -> bool;
}
310
311/// Shared logic for half-close accounting: clear `bit` from `readiness.event` on
312/// socket errors/would-block, and return `true` (yield) when no bytes were
313/// transferred so the caller can park the half. Rationale for clearing only
314/// `bit` (not both halves) on `Closed`: the opposite half may still need one
315/// last pass to flush queued frames or the TLS close_notify.
316fn update_readiness(
317    size: usize,
318    status: SocketResult,
319    readiness: &mut Readiness,
320    bit: Ready,
321) -> bool {
322    trace!(
323        "{}   size={}, status={:?}",
324        log_module_context!(),
325        size,
326        status
327    );
328    match status {
329        SocketResult::Continue => {}
330        SocketResult::Closed | SocketResult::Error | SocketResult::WouldBlock => {
331            readiness.event.remove(bit);
332        }
333    }
334    if size > 0 {
335        false
336    } else {
337        readiness.event.remove(bit);
338        true
339    }
340}
341
342fn update_readiness_after_read(
343    size: usize,
344    status: SocketResult,
345    readiness: &mut Readiness,
346) -> bool {
347    update_readiness(size, status, readiness, Ready::READABLE)
348}
349
350fn update_readiness_after_write(
351    size: usize,
352    status: SocketResult,
353    readiness: &mut Readiness,
354) -> bool {
355    update_readiness(size, status, readiness, Ready::WRITABLE)
356}
/// Per-session state shared by every stream of one [`Mux`] session.
///
/// Holds:
/// - the stream slots and the indexes used to find them (pending backend
///   links, and backend token -> linked streams);
/// - the buffer pool and listener handles;
/// - the connection-scoped settings and TLS metadata that
///   [`Self::create_stream`] copies onto each per-stream [`HttpContext`].
pub struct Context<L: ListenerHandler + L7ListenerHandler> {
    /// Stream slots indexed by [`GlobalStreamId`]. Slots in
    /// `StreamState::Recycle` stay in the Vec and are reused by
    /// [`Self::create_stream`]. Trailing recycled slots can be popped by
    /// [`Self::shrink_trailing_recycle`].
    pub streams: Vec<Stream>,
    /// Streams whose state is `StreamState::Link` and that need a backend
    /// connection. Replaces the O(n) scan of `streams` in the ready loop.
    pub pending_links: VecDeque<GlobalStreamId>,
    /// Reverse index: backend token -> global stream IDs currently in
    /// `StreamState::Linked(token)`. Eliminates O(n) scans of `streams`
    /// when handling backend connect/disconnect/timeout/close events.
    /// Updated by [`Self::link_stream`], [`Self::unlink_stream`] and
    /// [`remove_backend_stream`].
    pub backend_streams: HashMap<Token, Vec<GlobalStreamId>>,
    /// Buffer pool handed to [`Stream::new`] when a new stream slot is
    /// pushed. Stored as a `Weak`, so the session does not keep the pool
    /// alive.
    pub pool: Weak<RefCell<Pool>>,
    /// Listener this session belongs to. [`Self::create_stream`] borrows it
    /// for the protocol, the sticky name and the Sōzu id header.
    pub listener: Rc<RefCell<L>>,
    /// Connection/session ULID — mirrors `Mux.session_ulid`. Stored here so
    /// per-stream `HttpContext` construction in [`Self::create_stream`] can
    /// stamp the session slot of the log-context bracket without reaching
    /// back into the parent [`Mux`].
    pub session_ulid: Ulid,
    /// Optional session address, passed to every per-stream
    /// `HttpContext::new`.
    pub session_address: Option<SocketAddr>,
    /// Public address passed to every per-stream `HttpContext::new`
    /// (presumably the listener's public address — confirm at the
    /// construction site).
    pub public_address: SocketAddr,
    /// Debug event history. The session's `ready` loop pushes timestamps and
    /// loop markers into it.
    pub debug: DebugHistory,
    /// Shrink threshold ratio for recycled stream slots.
    /// Vec is shrunk when total_slots > active_streams * ratio.
    pub h2_stream_shrink_ratio: usize,
    /// TLS SNI value negotiated at handshake, propagated to every
    /// per-stream [`HttpContext`] so the routing layer can enforce
    /// the SNI ↔ `:authority` binding on every H2 stream (and the
    /// single H1 request). `None` for plaintext listeners or when
    /// the client omitted the SNI extension. Stored pre-lowercased
    /// and without a port for cheap exact-match comparison.
    pub tls_server_name: Option<String>,
    /// Snapshot of the SAN set of the certificate Sōzu actually served at
    /// the TLS handshake. Captured once in `https.rs::upgrade_handshake`
    /// from the resolver and frozen for the connection lifetime so that H2
    /// stream coalescing (RFC 7540 §9.1.1 / RFC 9113 §9.1.1) accepts any
    /// `:authority` covered by the certificate, with RFC 6125 §6.4.3
    /// wildcard handling.
    ///
    /// - `None` for plaintext listeners or when SNI was absent.
    /// - `Some(empty)` when the default cert was served; every `:authority`
    ///   is then rejected.
    ///
    /// Wrapped in an `Arc` so the snapshot is shared across every per-stream
    /// `HttpContext` without re-allocation.
    pub tls_cert_names: Option<Arc<Vec<String>>>,
    /// Whether the routing layer must reject any request whose authority
    /// host does not exact-match `tls_server_name` (CWE-346 / CWE-444).
    /// Mirrors `HttpsListenerConfig::strict_sni_binding`; captured once
    /// at `Context::new` so routing decisions on each stream avoid a
    /// per-stream `listener.borrow()`.
    pub strict_sni_binding: bool,
    /// Whether the request-side block walk must strip any client-supplied
    /// `X-Real-IP` header before forwarding (anti-spoofing). Mirrors
    /// `HttpListenerConfig::elide_x_real_ip` /
    /// `HttpsListenerConfig::elide_x_real_ip`; captured once at
    /// `Context::new` so per-stream `HttpContext`s do not need to call
    /// `listener.borrow()` again. Independent of `send_x_real_ip`.
    pub elide_x_real_ip: bool,
    /// Whether `on_request_headers` injects a proxy-generated `X-Real-IP`
    /// header carrying the connection peer IP (post-PROXY-v2 unwrap).
    /// Mirrors `HttpListenerConfig::send_x_real_ip` /
    /// `HttpsListenerConfig::send_x_real_ip`; captured once at
    /// `Context::new`. Independent of `elide_x_real_ip`.
    pub send_x_real_ip: bool,
    /// Negotiated TLS protocol version short-form (e.g. `"TLSv1.3"`).
    /// Captured once at handshake completion in `https.rs` and propagated
    /// to every per-stream [`HttpContext`] so the access log can record it
    /// without reaching back into the rustls session per request. `None`
    /// for plaintext listeners.
    pub tls_version: Option<&'static str>,
    /// Negotiated TLS cipher suite short-form (e.g.
    /// `"TLS_AES_128_GCM_SHA256"`). Captured once at handshake completion
    /// and propagated to every per-stream [`HttpContext`]. `None` for
    /// plaintext listeners.
    pub tls_cipher: Option<&'static str>,
    /// Negotiated ALPN protocol short-form (e.g. `"h2"`, `"http/1.1"`).
    /// Captured once at handshake completion and propagated to every
    /// per-stream [`HttpContext`]. `None` for plaintext listeners or when
    /// no ALPN was negotiated.
    pub tls_alpn: Option<&'static str>,
}
432
433impl<L: ListenerHandler + L7ListenerHandler> Context<L> {
434    pub fn new(
435        session_ulid: Ulid,
436        pool: Weak<RefCell<Pool>>,
437        listener: Rc<RefCell<L>>,
438        session_address: Option<SocketAddr>,
439        public_address: SocketAddr,
440    ) -> Self {
441        let h2_stream_shrink_ratio = listener
442            .borrow()
443            .get_h2_connection_config()
444            .stream_shrink_ratio as usize;
445        let strict_sni_binding = listener.borrow().get_strict_sni_binding();
446        let elide_x_real_ip = listener.borrow().get_elide_x_real_ip();
447        let send_x_real_ip = listener.borrow().get_send_x_real_ip();
448        Self {
449            streams: Vec::new(),
450            pending_links: VecDeque::new(),
451            backend_streams: HashMap::new(),
452            pool,
453            listener,
454            session_ulid,
455            session_address,
456            public_address,
457            debug: DebugHistory::new(),
458            h2_stream_shrink_ratio,
459            tls_server_name: None,
460            tls_cert_names: None,
461            strict_sni_binding,
462            elide_x_real_ip,
463            send_x_real_ip,
464            tls_version: None,
465            tls_cipher: None,
466            tls_alpn: None,
467        }
468    }
469
470    pub fn active_len(&self) -> usize {
471        self.streams
472            .iter()
473            .filter(|s| !matches!(s.state, StreamState::Recycle))
474            .count()
475    }
476
477    /// Shared accessor for the [`HttpContext`] owned by a stream.
478    ///
479    /// Prefer this over `&self.streams[stream_id].context` at call sites
480    /// that only need read access — it keeps the `Stream`/`HttpContext`
481    /// relationship encapsulated and reads the same regardless of whether
482    /// the caller is inside `Router::connect`, the H2 mux, or a free
483    /// helper. Panics on an out-of-bounds `stream_id`, which is the same
484    /// behaviour as the raw `streams[sid]` indexing it replaces.
485    pub fn http_context(&self, stream_id: GlobalStreamId) -> &HttpContext {
486        &self.streams[stream_id].context
487    }
488
489    /// Mutable sibling of [`Self::http_context`]. Use when routing
490    /// decisions need to stamp `cluster_id` / `backend_id` on the stream's
491    /// [`HttpContext`] (e.g. `Router::connect` at the fill-cluster /
492    /// fill-backend points).
493    pub fn http_context_mut(&mut self, stream_id: GlobalStreamId) -> &mut HttpContext {
494        &mut self.streams[stream_id].context
495    }
496
497    /// Register a stream as linked to a backend token in the reverse index.
498    pub fn link_stream(&mut self, stream_id: GlobalStreamId, token: Token) {
499        self.streams[stream_id].state = StreamState::Linked(token);
500        self.backend_streams
501            .entry(token)
502            .or_default()
503            .push(stream_id);
504    }
505
506    /// Remove a stream from the backend reverse index if it is currently
507    /// `Linked`. Returns the backend token if one was removed.
508    pub fn unlink_stream(&mut self, stream_id: GlobalStreamId) -> Option<Token> {
509        if let StreamState::Linked(token) = self.streams[stream_id].state {
510            remove_backend_stream(&mut self.backend_streams, token, stream_id);
511            Some(token)
512        } else {
513            None
514        }
515    }
516
517    pub fn create_stream(&mut self, request_id: Ulid, window: u32) -> Option<GlobalStreamId> {
518        let http_context = {
519            let listener = self.listener.borrow();
520            let mut http_context = HttpContext::new(
521                self.session_ulid,
522                request_id,
523                listener.protocol(),
524                self.public_address,
525                self.session_address,
526                listener.get_sticky_name().to_string(),
527                listener.get_sozu_id_header().to_string(),
528                self.elide_x_real_ip,
529                self.send_x_real_ip,
530            );
531            // Propagate the connection-scoped TLS SNI onto every per-stream
532            // HttpContext so `route_from_request` can enforce the SNI ↔
533            // `:authority` binding for each H2 stream independently.
534            http_context.tls_server_name = self.tls_server_name.clone();
535            // Mirror the frozen-at-handshake SAN snapshot. `Arc` clone is a
536            // refcount bump, not a deep copy — every per-stream
537            // `HttpContext` shares the same `Vec<String>`.
538            http_context.tls_cert_names = self.tls_cert_names.clone();
539            // Mirror the listener's strict_sni_binding flag onto each
540            // HttpContext so the routing layer can honor operator opt-outs
541            // without reaching back into the listener on every request.
542            http_context.strict_sni_binding = self.strict_sni_binding;
543            // Propagate the connection-scoped TLS metadata onto every
544            // per-stream HttpContext so the access log can record it without
545            // touching the rustls session on every request. These are
546            // `&'static str` borrows from the rustls label tables — copy is
547            // a pointer move.
548            http_context.tls_version = self.tls_version;
549            http_context.tls_cipher = self.tls_cipher;
550            http_context.tls_alpn = self.tls_alpn;
551            http_context
552        };
553        let recycle_slot = self
554            .streams
555            .iter()
556            .position(|s| s.state == StreamState::Recycle);
557        if let Some(stream_id) = recycle_slot {
558            let stream = &mut self.streams[stream_id];
559            trace!("{} Reuse stream: {}", log_module_context!(), stream_id);
560            stream.state = StreamState::Idle;
561            stream.attempts = 0;
562            stream.front_received_end_of_stream = false;
563            stream.back_received_end_of_stream = false;
564            stream.front_data_received = 0;
565            stream.back_data_received = 0;
566            stream.request_counted = false;
567            stream.window = i32::try_from(window).unwrap_or(i32::MAX);
568            stream.context = http_context;
569            stream.back.clear();
570            stream.back.storage.clear();
571            stream.front.clear();
572            stream.front.storage.clear();
573            stream.metrics.reset();
574            stream.metrics.start = Some(Instant::now());
575            // After recycling a slot, check if the Vec has excessive trailing
576            // Recycle entries (more than 2x active streams of total capacity).
577            let active = self.active_len();
578            let total = self.streams.len();
579            if total > 1 && active > 0 && total > active * self.h2_stream_shrink_ratio {
580                self.shrink_trailing_recycle();
581            }
582            return Some(stream_id);
583        }
584        self.streams
585            .push(Stream::new(self.pool.clone(), http_context, window)?);
586        Some(self.streams.len() - 1)
587    }
588
589    /// Remove consecutive `Recycle` entries from the end of the streams Vec.
590    ///
591    /// This prevents unbounded growth when H2 streams are created and recycled
592    /// over time, reclaiming memory from slots that are no longer needed.
593    pub fn shrink_trailing_recycle(&mut self) {
594        while self
595            .streams
596            .last()
597            .is_some_and(|s| s.state == StreamState::Recycle)
598        {
599            self.streams.pop();
600        }
601    }
602}
603
604/// Remove `stream_id` from the backend-token reverse index for `token`.
605/// Free function to allow split borrows when `context.streams` is already
606/// mutably borrowed (preventing a `Context::unlink_stream` call).
607pub(super) fn remove_backend_stream(
608    index: &mut HashMap<Token, Vec<GlobalStreamId>>,
609    token: Token,
610    stream_id: GlobalStreamId,
611) {
612    if let Some(ids) = index.get_mut(&token) {
613        ids.retain(|&id| id != stream_id);
614        if ids.is_empty() {
615            index.remove(&token);
616        }
617    }
618}
619
/// HTTP/1.1 / HTTP/2 multiplexing session for one frontend connection.
///
/// Owns the frontend [`Connection`], the [`Router`] holding the backend
/// connections, and the per-session [`Context`] holding all streams.
/// Implements `SessionState`.
pub struct Mux<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> {
    /// Frontend timeout configured for this session.
    pub configured_frontend_timeout: Duration,
    /// mio token of the frontend socket.
    pub frontend_token: Token,
    /// Frontend connection, dispatching to the H1 or H2 implementation.
    pub frontend: Connection<Front>,
    /// Backend connections of this session.
    pub router: Router,
    /// Per-session context: streams, backend-link indexes, listener and TLS
    /// metadata.
    pub context: Context<L>,
    /// Per-session correlation ID generated at construction time. Included in
    /// every log line emitted from this module so all events for a single
    /// frontend connection can be reassembled (independent of the ephemeral
    /// per-stream request id used by access logs).
    pub session_ulid: Ulid,
}
632
633impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Mux<Front, L> {
634    pub fn front_socket(&self) -> &TcpStream {
635        self.frontend.socket()
636    }
637}
638
639impl<Front: SocketHandler + std::fmt::Debug, L: ListenerHandler + L7ListenerHandler> Mux<Front, L> {
640    fn delay_close_for_frontend_flush(&mut self, reason: &'static str) -> bool {
641        let _ = self.frontend.initiate_close_notify();
642        // LIFECYCLE §9 invariant 16: consult per-stream back-buffers in
643        // addition to the connection-level pending-write predicate so
644        // shutdown does not close while any open H2 stream still has
645        // kawa bytes queued after a voluntary scheduler yield.
646        if self
647            .frontend
648            .has_pending_write_including_streams(&self.context)
649        {
650            let readiness = self.frontend.readiness_mut();
651            readiness.interest = Ready::WRITABLE | Ready::HUP | Ready::ERROR;
652            readiness.signal_pending_write();
653            debug!(
654                "{} Mux delaying close on {}: {:?}",
655                log_context!(self),
656                reason,
657                self.frontend
658            );
659            true
660        } else {
661            false
662        }
663    }
664
665    /// Drive the frontend I/O path during shutdown, when the server is polling
666    /// `shutting_down()` outside the normal epoll readiness loop.
667    ///
668    /// This is required for H2 graceful shutdown because a stream may still
669    /// need one last readable pass to observe the peer's END_STREAM or one last
670    /// writable pass to retire the stream, emit GOAWAY, or flush TLS records.
671    fn drive_frontend_shutdown_io(&mut self) -> SessionIsToBeClosed {
672        let force_h2_read = matches!(self.frontend, Connection::H2(_));
673        let force_h2_write = matches!(self.frontend, Connection::H2(_));
674        let readiness = self.frontend.readiness().clone();
675        if !force_h2_read
676            && !force_h2_write
677            && readiness.event.is_empty()
678            && !self.frontend.has_pending_write()
679        {
680            return false;
681        }
682
683        if force_h2_read || self.frontend.readiness().event.is_readable() {
684            self.frontend
685                .readiness_mut()
686                .interest
687                .insert(Ready::READABLE);
688            match self
689                .frontend
690                .readable(&mut self.context, EndpointClient(&mut self.router))
691            {
692                MuxResult::Continue => {}
693                MuxResult::CloseSession | MuxResult::Upgrade => return true,
694            }
695        }
696
697        if !force_h2_write
698            && !self.frontend.has_pending_write()
699            && !self.frontend.readiness().event.is_writable()
700        {
701            return false;
702        }
703
704        let mut iterations = 0;
705        loop {
706            self.frontend
707                .readiness_mut()
708                .interest
709                .insert(Ready::WRITABLE);
710            if force_h2_write {
711                self.frontend.readiness_mut().signal_pending_write();
712            }
713            match self
714                .frontend
715                .writable(&mut self.context, EndpointClient(&mut self.router))
716            {
717                MuxResult::Continue => {}
718                MuxResult::CloseSession | MuxResult::Upgrade => return true,
719            }
720
721            iterations += 1;
722            if iterations >= MAX_LOOP_ITERATIONS
723                || (!self.frontend.has_pending_write()
724                    && !self.frontend.readiness().event.is_writable())
725            {
726                break;
727            }
728        }
729        false
730    }
731}
732
733impl<Front: SocketHandler + std::fmt::Debug, L: ListenerHandler + L7ListenerHandler> SessionState
734    for Mux<Front, L>
735{
    /// Drives one readiness cycle of the mux session.
    ///
    /// Order of work:
    /// 1. On frontend HUP, close the session unless
    ///    `delay_close_for_frontend_flush` defers the close.
    /// 2. Start service timers on every open stream.
    /// 3. Inner I/O loop: frontend readable, then every backend (connect
    ///    completion, writable, readable, dead-socket teardown), then frontend
    ///    writable. This repeats until no interested readiness remains or the
    ///    `MAX_LOOP_ITERATIONS` budget (shared across the whole call) is spent.
    /// 4. Drain `pending_links`. Each stream still in `StreamState::Link` is
    ///    either connected through `self.router.connect` or answered with a
    ///    default HTTP response on error. If any stream was processed, go back
    ///    to step 3.
    /// 5. Stop service timers and, in debug builds, cross-check the
    ///    `backend_streams` index against the stream states.
    ///
    /// `session` and `proxy` are handed to `self.router.connect`. `proxy` is
    /// also used to deregister and remove dead backend sockets.
    ///
    /// Returns `SessionResult::Close` in two cases: a connection reports
    /// `CloseSession` and the close is not deferred, or the loop budget is
    /// exhausted while no frontend write is pending. Returns
    /// `SessionResult::Upgrade` when the frontend requests an upgrade, and
    /// `SessionResult::Continue` otherwise. Early Close/Upgrade returns skip
    /// step 5.
    fn ready(
        &mut self,
        session: Rc<RefCell<dyn ProxySession>>,
        proxy: Rc<RefCell<dyn L7Proxy>>,
        _metrics: &mut SessionMetrics,
    ) -> SessionResult {
        // Inner-loop iteration budget for the whole call. It is NOT reset when
        // the outer (pending-links) loop runs again.
        let mut counter = 0;

        if self.frontend.readiness().event.is_hup()
            && !self.delay_close_for_frontend_flush("frontend HUP")
        {
            debug!(
                "{} Mux closing on frontend HUP: {:?}",
                log_context!(self),
                self.frontend
            );
            return SessionResult::Close;
        }

        // Start service timers on all active streams after the HUP check.
        // This mirrors session-level service_start/service_stop in Http(s)Session::ready()
        // to measure only CPU processing time, excluding epoll wait between cycles.
        for stream in &mut self.context.streams {
            if stream.state.is_open() {
                stream.metrics.service_start();
            }
        }

        let start = Instant::now();
        self.context.debug.push(DebugEvent::ReadyTimestamp(
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_millis() as usize,
        ));
        trace!("{} {:?}", log_context!(self), start);
        // Outer loop: alternate between the I/O loop and pending-link
        // processing until a pass processes no pending link.
        loop {
            self.context.debug.push(DebugEvent::LoopStart);
            // Inner I/O loop: pump the frontend and every backend until no
            // interested readiness remains on either side.
            loop {
                self.context.debug.push(DebugEvent::LoopIteration(counter));
                // Phase 1: frontend readable.
                if self.frontend.readiness().filter_interest().is_readable() {
                    let res = {
                        let context = &mut self.context;
                        let res = self
                            .frontend
                            .readable(context, EndpointClient(&mut self.router));
                        context.debug.push(DebugEvent::SR(
                            self.frontend_token,
                            res,
                            self.frontend.readiness().clone(),
                        ));
                        res
                    };
                    match res {
                        MuxResult::Continue => {}
                        MuxResult::CloseSession => {
                            if !self.delay_close_for_frontend_flush("frontend readable") {
                                debug!(
                                    "{} Mux close from frontend readable: {:?}",
                                    log_context!(self),
                                    self.frontend
                                );
                                return SessionResult::Close;
                            }
                        }
                        MuxResult::Upgrade => return SessionResult::Upgrade,
                    }
                }

                // Phase 2: backends. The flag stays true only if no backend
                // has interested readiness left after its pass.
                let mut all_backends_readiness_are_empty = true;
                let mut dead_backends = Vec::new();
                let mut backend_close: Option<(&'static str, Token)> = None;
                for (token, client) in self.router.backends.iter_mut() {
                    let readiness = client.readiness_mut();
                    // Check the raw event for HUP/ERROR — not filter_interest(),
                    // because interest only contains READABLE|WRITABLE and would
                    // always mask out HUP (0b01000) and ERROR (0b00100).
                    let dead = readiness.event.is_hup() || readiness.event.is_error();
                    if dead {
                        trace!(
                            "{} Backend({:?}) -> {:?}",
                            log_context_lite!(self),
                            token,
                            readiness
                        );
                        // Never write to a dead socket; the readable pass below
                        // may still drain what is left.
                        readiness.event.remove(Ready::WRITABLE);
                    }

                    if client.readiness().filter_interest().is_writable() {
                        let position = client.position_mut();
                        match position {
                            // First writable event on a connecting backend is
                            // treated as a successful connect.
                            Position::Client(
                                cluster_id,
                                backend,
                                BackendStatus::Connecting(start),
                            ) => {
                                #[cfg(debug_assertions)]
                                self.context
                                    .debug
                                    .push(DebugEvent::CCS(*token, cluster_id.clone()));

                                let mut backend_borrow = backend.borrow_mut();
                                if backend_borrow.retry_policy.is_down() {
                                    info!(
                                        "{} backend server {} at {} is up",
                                        log_context_lite!(self),
                                        backend_borrow.backend_id,
                                        backend_borrow.address
                                    );
                                    incr!(
                                        "backend.up",
                                        Some(cluster_id),
                                        Some(&backend_borrow.backend_id)
                                    );
                                    gauge!(
                                        names::backend::AVAILABLE,
                                        1,
                                        Some(cluster_id),
                                        Some(&backend_borrow.backend_id)
                                    );
                                    push_event(Event {
                                        kind: EventKind::BackendUp as i32,
                                        backend_id: Some(backend_borrow.backend_id.to_owned()),
                                        address: Some(backend_borrow.address.into()),
                                        cluster_id: Some(cluster_id.to_owned()),
                                        metric_detail: None,
                                    });
                                }

                                //successful connection, reset failure counter
                                backend_borrow.failures = 0;
                                backend_borrow.set_connection_time(start.elapsed());
                                backend_borrow.retry_policy.succeed();

                                // Every stream already linked to this backend
                                // now counts as an active request on it.
                                if let Some(ids) = self.context.backend_streams.get(token) {
                                    for &stream_id in ids {
                                        self.context.streams[stream_id].metrics.backend_connected();
                                        backend_borrow.active_requests += 1;
                                    }
                                }
                                trace!(
                                    "{} connection success: {:#?}",
                                    log_context_lite!(self),
                                    backend_borrow
                                );
                                drop(backend_borrow);
                                *position = Position::Client(
                                    std::mem::take(cluster_id),
                                    backend.clone(),
                                    BackendStatus::Connected,
                                );
                                // Connection established: apply the configured
                                // backend timeout.
                                client
                                    .timeout_container()
                                    .set_duration(self.router.configured_backend_timeout);
                            }
                            Position::Client(..) => {}
                            Position::Server => {
                                error!(
                                    "{} backend connection cannot be in Server position",
                                    log_context_lite!(self)
                                );
                            }
                        }
                        let res = {
                            let context = &mut self.context;
                            let res = client.writable(context, EndpointServer(&mut self.frontend));
                            context.debug.push(DebugEvent::CW(
                                *token,
                                res,
                                client.readiness().clone(),
                            ));
                            res
                        };
                        match res {
                            MuxResult::Continue => {}
                            MuxResult::Upgrade => {
                                error!(
                                    "{} only frontend connections can trigger Upgrade",
                                    log_context_lite!(self)
                                );
                            }
                            MuxResult::CloseSession => {
                                // Stop visiting backends; the close is handled
                                // after dead backends have been removed.
                                backend_close = Some(("backend writable", *token));
                                break;
                            }
                        }
                        // Cross-readiness: backend wrote → wake frontend reader
                        let context = &mut self.context;
                        self.frontend.try_resume_reading(context);
                    }

                    if client.readiness().filter_interest().is_readable() {
                        let res = {
                            let context = &mut self.context;
                            let res = client.readable(context, EndpointServer(&mut self.frontend));
                            context.debug.push(DebugEvent::CR(
                                *token,
                                res,
                                client.readiness().clone(),
                            ));
                            res
                        };
                        match res {
                            MuxResult::Continue => {}
                            MuxResult::Upgrade => {
                                error!(
                                    "{} only frontend connections can trigger Upgrade (readable)",
                                    log_context_lite!(self)
                                );
                            }
                            MuxResult::CloseSession => {
                                backend_close = Some(("backend readable", *token));
                                break;
                            }
                        }
                    }

                    // Tear down a dead backend only once it has nothing left to
                    // read and `has_buffer_pressure` reports false; otherwise it
                    // is re-examined on a later pass.
                    if dead
                        && !client.readiness().filter_interest().is_readable()
                        && !client.has_buffer_pressure(&self.context)
                    {
                        self.context
                            .debug
                            .push(DebugEvent::CH(*token, client.readiness().clone()));
                        trace!("{} Closing {:#?}", log_context_lite!(self), client);
                        match client.position() {
                            // Died while connecting: record a connection failure
                            // and update the retry policy.
                            Position::Client(cluster_id, backend, BackendStatus::Connecting(_)) => {
                                let mut backend_borrow = backend.borrow_mut();
                                backend_borrow.failures += 1;

                                let already_unavailable = backend_borrow.retry_policy.is_down();
                                backend_borrow.retry_policy.fail();
                                incr!(
                                    "backend.connections.error",
                                    Some(cluster_id),
                                    Some(&backend_borrow.backend_id)
                                );
                                if !already_unavailable && backend_borrow.retry_policy.is_down() {
                                    error!(
                                        "{} backend server {} at {} is down",
                                        log_context_lite!(self),
                                        backend_borrow.backend_id,
                                        backend_borrow.address
                                    );
                                    incr!(
                                        "backend.down",
                                        Some(cluster_id),
                                        Some(&backend_borrow.backend_id)
                                    );
                                    gauge!(
                                        names::backend::AVAILABLE,
                                        0,
                                        Some(cluster_id),
                                        Some(&backend_borrow.backend_id)
                                    );
                                    push_event(Event {
                                        kind: EventKind::BackendDown as i32,
                                        backend_id: Some(backend_borrow.backend_id.to_owned()),
                                        address: Some(backend_borrow.address.into()),
                                        cluster_id: Some(cluster_id.to_owned()),
                                        metric_detail: None,
                                    });
                                }
                                trace!(
                                    "{} connection fail: {:#?}",
                                    log_context_lite!(self),
                                    backend_borrow
                                );
                            }
                            // Died after connecting: release the active-request
                            // slots taken for this backend's linked streams.
                            Position::Client(_, backend, _) => {
                                let mut backend_borrow = backend.borrow_mut();
                                let count = self
                                    .context
                                    .backend_streams
                                    .get(token)
                                    .map_or(0, |ids| ids.len());
                                backend_borrow.active_requests =
                                    backend_borrow.active_requests.saturating_sub(count);
                            }
                            Position::Server => {
                                error!(
                                    "{} dead backend cannot be in Server position",
                                    log_context_lite!(self)
                                );
                            }
                        }
                        client.close(&mut self.context, EndpointServer(&mut self.frontend));
                        dead_backends.push(*token);
                    }

                    if !client.readiness().filter_interest().is_empty() {
                        all_backends_readiness_are_empty = false;
                    }
                }
                // Remove dead backends from the map BEFORE handling
                // backend_close. client.close() already decremented
                // connections_per_backend / backend.connections gauges in
                // the loop above; if we return SessionResult::Close before
                // removing them, Mux::close() would decrement again
                // (double-decrement → gauge underflow).
                if !dead_backends.is_empty() {
                    for token in &dead_backends {
                        let proxy_borrow = proxy.borrow();
                        if let Some(mut client) = self.router.backends.remove(token) {
                            client.timeout_container().cancel();
                            let socket = client.socket_mut();
                            if let Err(e) = proxy_borrow.deregister_socket(socket) {
                                error!(
                                    "{} error deregistering back socket({:?}): {:?}",
                                    log_context!(self),
                                    socket,
                                    e
                                );
                            }
                            // invariant: write-only shutdown — Shutdown::Both on a TLS frontend
                            // discards the receive buffer and elicits TCP RST, truncating the
                            // already-queued response. Canonical write-up: `lib/src/https.rs:650-655`.
                            // Backend sockets follow the same discipline for symmetry.
                            if let Err(e) = socket.shutdown(Shutdown::Write) {
                                if e.kind() != ErrorKind::NotConnected {
                                    error!(
                                        "{} error shutting down back socket({:?}): {:?}",
                                        log_context!(self),
                                        socket,
                                        e
                                    );
                                }
                            }
                        } else {
                            error!("{} session {:?} has no backend!", log_context!(self), token);
                        }
                        if !proxy_borrow.remove_session(*token) {
                            error!(
                                "{} session {:?} was already removed!",
                                log_context!(self),
                                token
                            );
                        }
                    }
                    trace!("{} FRONTEND: {:#?}", log_context!(self), self.frontend);
                    trace!(
                        "{} BACKENDS: {:#?}",
                        log_context!(self),
                        self.router.backends
                    );
                }
                if let Some((reason, token)) = backend_close {
                    if !self.delay_close_for_frontend_flush(reason) {
                        debug!(
                            "{} Mux close from {} token={:?}: frontend={:?}",
                            log_context!(self),
                            reason,
                            token,
                            self.frontend
                        );
                        return SessionResult::Close;
                    }
                    // Close deferred: force another pass so the frontend
                    // flush can make progress.
                    all_backends_readiness_are_empty = false;
                }

                // Phase 3: frontend writable.
                if self.frontend.readiness().filter_interest().is_writable() {
                    let res = {
                        let context = &mut self.context;
                        let res = self
                            .frontend
                            .writable(context, EndpointClient(&mut self.router));
                        context.debug.push(DebugEvent::SW(
                            self.frontend_token,
                            res,
                            self.frontend.readiness().clone(),
                        ));
                        res
                    };
                    match res {
                        MuxResult::Continue => {}
                        MuxResult::CloseSession => {
                            if !self.delay_close_for_frontend_flush("frontend writable") {
                                debug!(
                                    "{} Mux close from frontend writable: {:?}",
                                    log_context!(self),
                                    self.frontend
                                );
                                return SessionResult::Close;
                            }
                        }
                        MuxResult::Upgrade => return SessionResult::Upgrade,
                    }
                    // Cross-readiness: frontend wrote → wake parked backends.
                    // If any backend resumes, invalidate the stale readiness
                    // flag so the inner loop continues instead of breaking.
                    let context = &mut self.context;
                    for (_token, backend) in self.router.backends.iter_mut() {
                        if backend.try_resume_reading(context) {
                            all_backends_readiness_are_empty = false;
                        }
                    }
                }

                // Nothing left to do on either side: leave the I/O loop.
                if self.frontend.readiness().filter_interest().is_empty()
                    && all_backends_readiness_are_empty
                {
                    break;
                }

                counter += 1;
                if counter >= MAX_LOOP_ITERATIONS {
                    incr!(names::http::INFINITE_LOOP_ERROR);
                    // With a frontend flush still pending, do not close.
                    // Instead, drop the WRITABLE event for this cycle, set the
                    // frontend timeout again and leave the I/O loop.
                    if self.frontend.has_pending_write() {
                        debug!(
                            "{} Mux loop budget exhausted while frontend flush pending: {:?}",
                            log_context!(self),
                            self.frontend
                        );
                        self.frontend.readiness_mut().event.remove(Ready::WRITABLE);
                        self.frontend.timeout_container().set(self.frontend_token);
                        break;
                    }
                    return SessionResult::Close;
                }
            }

            // Drain `pending_links`. Streams no longer present or no longer in
            // the Link state are skipped; the others are connected or answered.
            let context = &mut self.context;
            let answers_rc = context.listener.borrow().get_answers().clone();
            let mut dirty = false;
            while let Some(stream_id) = context.pending_links.pop_front() {
                let Some(stream) = context.streams.get(stream_id) else {
                    continue;
                };
                if stream.state != StreamState::Link {
                    continue;
                }
                // Before the first request triggers a stream Link, the frontend timeout is set
                // to a shorter request_timeout, here we switch to the longer nominal timeout
                self.frontend
                    .timeout_container()
                    .set_duration(self.configured_frontend_timeout);
                let front_readiness = self.frontend.readiness_mut();
                dirty = true;
                match self.router.connect(
                    stream_id,
                    context,
                    session.clone(),
                    proxy.clone(),
                    self.frontend_token,
                ) {
                    Ok(_) => {
                        let state = context.streams[stream_id].state;
                        context.debug.push(DebugEvent::CC(stream_id, state));
                    }
                    Err(error) => {
                        trace!("{} Connection error: {}", log_module_context!(), error);
                        let stream = &mut context.streams[stream_id];
                        let answers = answers_rc.borrow();
                        use BackendConnectionError as BE;
                        // Map each connection/routing error to a default answer
                        // on the stream.
                        match error {
                            BE::MaxConnectionRetries(_)
                            | BE::MaxSessionsMemory
                            | BE::MaxBuffers => {
                                warn!(
                                    "{} backend retry budget exhausted: {}",
                                    log_module_context!(stream.context),
                                    error
                                );
                                set_default_answer(stream, front_readiness, 503, &answers);
                            }
                            BE::Backend(BackendError::NoBackendForCluster(_)) => {
                                set_default_answer(stream, front_readiness, 503, &answers);
                            }
                            BE::RetrieveClusterError(RetrieveClusterError::RetrieveFrontend(
                                ref err,
                            )) => {
                                // RFC 9110 §15.5.1: a malformed authority is a
                                // 400. A syntactically valid authority that
                                // simply has no matching frontend stays on the
                                // historical 404 path.
                                let code = match err {
                                    FrontendFromRequestError::HostParse { .. }
                                    | FrontendFromRequestError::InvalidCharsAfterHost(_) => 400,
                                    FrontendFromRequestError::NoClusterFound(_) => 404,
                                };
                                set_default_answer(stream, front_readiness, code, &answers);
                            }
                            BE::RetrieveClusterError(RetrieveClusterError::UnauthorizedRoute) => {
                                set_default_answer(stream, front_readiness, 401, &answers);
                            }
                            BE::RetrieveClusterError(
                                RetrieveClusterError::SniAuthorityMismatch { .. },
                            ) => {
                                // RFC 9110 §15.5.20: 421 Misdirected Request is the
                                // semantically correct status for an authority that
                                // does not belong to this TLS connection. The
                                // http.sni_authority_mismatch metric emitted in
                                // `route_from_request` remains the durable signal;
                                // the 421 body here is what a client sees and may
                                // retry on a fresh TLS connection with a matching SNI.
                                set_default_answer(stream, front_readiness, 421, &answers);
                            }
                            BE::RetrieveClusterError(RetrieveClusterError::HttpsRedirect) => {
                                // Use the redirect status stashed by `Router::route_from_request`
                                // (#1009). Falls back to 301 for the legacy
                                // `cluster.https_redirect = true` path that does
                                // not set the field.
                                let code = stream.context.redirect_status.unwrap_or(301);
                                set_default_answer(stream, front_readiness, code, &answers);
                            }

                            BE::Backend(ref e) => {
                                error!("{} backend connection error: {}", log_module_context!(), e);
                                set_default_answer(stream, front_readiness, 503, &answers);
                            }
                            BE::RetrieveClusterError(ref other) => {
                                error!(
                                    "{} unexpected RetrieveClusterError variant: {:?}",
                                    log_module_context!(),
                                    other
                                );
                                set_default_answer(stream, front_readiness, 503, &answers);
                            }
                            // TCP specific error
                            BE::NotFound(ref msg) => {
                                error!(
                                    "{} NotFound is TCP-specific, not reachable in mux: {:?}",
                                    log_module_context!(),
                                    msg
                                );
                                set_default_answer(stream, front_readiness, 503, &answers);
                            }
                            // Per-(cluster, source-IP) connection limit reached.
                            // Emit HTTP 429 with the resolved `Retry-After`. The
                            // value is computed in `Router::connect` (where the
                            // SessionManager + cluster override are reachable)
                            // and stashed on the stream context just before the
                            // error is returned, so the answer engine can render
                            // (or elide) the header without re-deriving the
                            // resolution chain here.
                            BE::TooManyConnectionsPerIp { ref cluster_id } => {
                                debug!(
                                    "{} per-(cluster, source-IP) limit hit for cluster {:?}",
                                    log_module_context!(),
                                    cluster_id
                                );
                                let retry_after = stream.context.retry_after_seconds;
                                set_default_answer_with_retry_after(
                                    stream,
                                    front_readiness,
                                    429,
                                    &answers,
                                    retry_after,
                                );
                            }
                        }
                        context.debug.push(DebugEvent::CCF(stream_id, error));
                    }
                }
                // All routing error arms now set a default answer, transitioning
                // the stream out of Link state. No re-enqueue needed.
            }
            // No pending link was processed: the I/O loop has nothing new to
            // pump, so the cycle is over.
            if !dirty {
                break;
            }
        }

        // Stop service timers before yielding to epoll, so idle wait time is excluded
        // from the service_time metric. For Close/Upgrade returns, close() handles cleanup.
        for stream in &mut self.context.streams {
            if stream.state.is_open() {
                stream.metrics.service_stop();
            }
        }

        #[cfg(debug_assertions)]
        {
            // Verify backend_streams index matches actual stream states.
            let mut expected: HashMap<Token, Vec<GlobalStreamId>> = HashMap::new();
            for (id, stream) in self.context.streams.iter().enumerate() {
                if let StreamState::Linked(token) = stream.state {
                    expected.entry(token).or_default().push(id);
                }
            }
            assert_eq!(
                expected.len(),
                self.context.backend_streams.len(),
                "backend_streams index key count mismatch: expected={:?}, actual={:?}",
                expected,
                self.context.backend_streams
            );
            for (token, mut expected_ids) in expected {
                let mut actual_ids = self
                    .context
                    .backend_streams
                    .get(&token)
                    .cloned()
                    .unwrap_or_default();
                expected_ids.sort();
                actual_ids.sort();
                assert_eq!(
                    expected_ids, actual_ids,
                    "backend_streams index mismatch for token {token:?}",
                );
            }
        }

        SessionResult::Continue
    }
1340
1341    fn update_readiness(&mut self, token: Token, events: Ready) {
1342        trace!("{} EVENTS: {:?} on {:?}", log_context!(self), events, token);
1343        self.context.debug.push(DebugEvent::EV(token, events));
1344        if token == self.frontend_token {
1345            self.frontend.readiness_mut().event |= events;
1346        } else if let Some(c) = self.router.backends.get_mut(&token) {
1347            c.readiness_mut().event |= events;
1348        }
1349    }
1350
    /// Handles a timer expiry for `token`.
    ///
    /// `token` is either the frontend token or a key of `self.router.backends`.
    /// Any other token is ignored and `Continue` is returned.
    ///
    /// - Frontend expiry: marks the frontend timer as triggered and walks every
    ///   stream by state.
    ///   - An H1 `Idle` stream gets a 408.
    ///   - A `Link` stream gets a 503.
    ///   - A `Linked` stream gets a 504 if the backend response has not been
    ///     consumed, or is forcefully terminated if it is mid-response. If its
    ///     response is terminated/errored but not yet completed, the session is
    ///     kept alive.
    ///   - An `Unlinked` stream whose response is not completed keeps the
    ///     session alive.
    /// - Backend expiry: walks the streams indexed under `token` in
    ///   `backend_streams` and answers, terminates or waits on each one in the
    ///   same way, ending each stream on that backend.
    ///
    /// Return value:
    /// - If an answer was queued, the frontend is driven through a bounded
    ///   writable drain and that result is returned. `should_close` is not
    ///   consulted on this path.
    /// - Otherwise the session is closed, unless a pending frontend flush delays
    ///   the close.
    /// - If a stream still needs draining, the frontend timeout is re-armed and
    ///   `Continue` is returned.
    fn timeout(&mut self, token: Token, _metrics: &mut SessionMetrics) -> StateResult {
        trace!("{} MuxState::timeout({:?})", log_context!(self), token);
        let front_is_h2 = match self.frontend {
            Connection::H1(_) => false,
            Connection::H2(_) => true,
        };
        // Cloned up front so the answer templates can be borrowed while
        // `self.context.streams` is mutably indexed below.
        let answers_rc = self.context.listener.borrow().get_answers().clone();
        // `should_close`: no stream still needs to flush data to the client.
        // `should_write`: at least one default/terminating answer was queued.
        let mut should_close = true;
        let mut should_write = false;
        if self.frontend_token == token {
            trace!(
                "{} MuxState::timeout_frontend({:#?})",
                log_context!(self),
                self.frontend
            );
            self.frontend.timeout_container().triggered();
            let front_readiness = self.frontend.readiness_mut();
            // Index-based loop: each arm re-borrows `self.context.streams[stream_id]`
            // and some arms also call `self.context.unlink_stream`.
            for stream_id in 0..self.context.streams.len() {
                match self.context.streams[stream_id].state {
                    StreamState::Idle => {
                        // In h1 an Idle stream is always the first request, so we can send a 408
                        // In h2 an Idle stream doesn't necessarily hold a request yet,
                        // in most cases it was just reserved, so we can just ignore them.
                        if !front_is_h2 {
                            let answers = answers_rc.borrow();
                            let stream = &mut self.context.streams[stream_id];
                            stream.context.access_log_message = Some("client_timeout");
                            set_default_answer(stream, front_readiness, 408, &answers);
                            should_write = true;
                        }
                    }
                    StreamState::Link => {
                        // This is an unusual case, as we have both a complete request and no
                        // available backend yet. For now, we answer with 503.
                        // Not a timeout-driven outcome from the operator's
                        // perspective — leave access_log_message as None.
                        let answers = answers_rc.borrow();
                        let stream = &mut self.context.streams[stream_id];
                        set_default_answer(stream, front_readiness, 503, &answers);
                        should_write = true;
                    }
                    StreamState::Linked(_) => {
                        // The frontend timed out while a stream is linked to a backend.
                        // The backend timeout should handle this, but in case the backend
                        // is also stalled, send a 504 and terminate the stream.
                        if !self.context.streams[stream_id].back.consumed {
                            self.context.unlink_stream(stream_id);
                            let answers = answers_rc.borrow();
                            let stream = &mut self.context.streams[stream_id];
                            stream.context.access_log_message =
                                Some("client_timeout_during_response");
                            set_default_answer(stream, front_readiness, 504, &answers);
                            should_write = true;
                        } else if self.context.streams[stream_id].back.is_completed() {
                            // Response fully proxied, stream can be closed
                        } else if self.context.streams[stream_id].back.is_terminated()
                            || self.context.streams[stream_id].back.is_error()
                        {
                            // Response is terminated/error but not fully written to frontend.
                            // Keep the session alive briefly to flush remaining data.
                            should_close = false;
                        } else {
                            // Partial response in progress — forcefully terminate
                            self.context.unlink_stream(stream_id);
                            let stream = &mut self.context.streams[stream_id];
                            stream.context.access_log_message =
                                Some("client_timeout_during_response");
                            forcefully_terminate_answer(
                                stream,
                                front_readiness,
                                H2Error::InternalError,
                            );
                            should_write = true;
                        }
                        // end_stream is called in a second pass below to avoid
                        // borrow conflicts on context.streams.
                    }
                    StreamState::Unlinked => {
                        // A stream Unlinked already has a response and its backend closed.
                        // In case it hasn't finished proxying we wait. Otherwise it is a stream
                        // kept alive for a new request, which can be killed.
                        if !self.context.streams[stream_id].back.is_completed() {
                            should_close = false;
                        }
                    }
                    StreamState::Recycle => {
                        // A recycled stream is an h2 stream which doesn't hold a request anymore.
                        // We can ignore it.
                    }
                }
            }
            // Second pass: end streams that were linked to backends.
            // This is done separately to avoid borrow conflicts on context.streams.
            //
            // NOTE(review): this pass only collects streams whose state is
            // *still* `Linked` after the loop above. That set is the completed
            // and terminated/errored ones; the 504 / forcefully-terminated ones
            // went through `unlink_stream` first. If `unlink_stream` moves a
            // stream out of `Linked` (the debug assertion pairing `Linked` with
            // `backend_streams` suggests it does), those streams never get
            // `end_stream` here. The backend branch below does call
            // `end_stream` right after `unlink_stream`. Confirm this asymmetry
            // is intended.
            let linked_streams: Vec<(GlobalStreamId, Token)> = self
                .context
                .streams
                .iter()
                .enumerate()
                .filter_map(|(id, stream)| {
                    if let StreamState::Linked(back_token) = stream.state {
                        Some((id, back_token))
                    } else {
                        None
                    }
                })
                .collect();
            for (stream_id, back_token) in linked_streams {
                if let Some(backend) = self.router.backends.get_mut(&back_token) {
                    backend.end_stream(stream_id, &mut self.context);
                }
            }
        } else if let Some(backend) = self.router.backends.get_mut(&token) {
            trace!(
                "{} MuxState::timeout_backend({:#?})",
                log_context_lite!(self),
                backend
            );
            backend.timeout_container().triggered();
            let front_readiness = self.frontend.readiness_mut();
            // Snapshot the ids: `unlink_stream` / `end_stream` below mutate
            // `backend_streams` while we iterate.
            let linked_ids: Vec<GlobalStreamId> = self
                .context
                .backend_streams
                .get(&token)
                .map_or_else(Vec::new, |ids| ids.to_owned());
            for stream_id in linked_ids {
                // This stream is linked to the backend that timedout
                if self.context.streams[stream_id].back.is_terminated()
                    || self.context.streams[stream_id].back.is_error()
                {
                    trace!(
                        "{} Stream terminated or in error, do nothing, just wait a bit more",
                        log_module_context!()
                    );
                    // Nothing to do, simply wait for the remaining bytes to be proxied
                    if !self.context.streams[stream_id].back.is_completed() {
                        should_close = false;
                    }
                } else if !self.context.streams[stream_id].back.consumed {
                    // The response has not started yet
                    trace!(
                        "{} Stream still waiting for response, send 504",
                        log_module_context!()
                    );
                    self.context.unlink_stream(stream_id);
                    let answers = answers_rc.borrow();
                    let stream = &mut self.context.streams[stream_id];
                    stream.context.access_log_message = Some("backend_timeout");
                    set_default_answer(stream, front_readiness, 504, &answers);
                    should_write = true;
                } else {
                    trace!(
                        "{} Stream waiting for end of response, forcefully terminate it",
                        log_module_context!()
                    );
                    self.context.unlink_stream(stream_id);
                    let stream = &mut self.context.streams[stream_id];
                    stream.context.access_log_message = Some("backend_response_timeout");
                    forcefully_terminate_answer(stream, front_readiness, H2Error::InternalError);
                    should_write = true;
                }
                backend.end_stream(stream_id, &mut self.context);
            }
            // Re-arm the backend timeout if the session stays alive (draining streams).
            // Without this, the timeout is consumed and the session becomes immortal
            // until the zombie checker runs.
            if !should_close {
                backend.timeout_container().set(token);
            }
        } else {
            // Session received a timeout for an unknown token, ignore it
            return StateResult::Continue;
        }
        if should_write {
            // Drain as much pending data as possible before closing.
            // A single writable() call is insufficient for large responses —
            // the TLS buffer may need multiple flushes. Without this loop,
            // the session is killed with unflushed TLS data, causing the
            // client to receive a truncated TLS record ("decode error").
            //
            // The constant 16 is empirical: it papers over a missing
            // invariant-15 hop in the H2 mux state machine where the
            // writable readiness signal is not always re-armed after a
            // partial flush. Long-term plan: reach invariant-15 closure
            // and remove this loop. See `lib/src/protocol/mux/LIFECYCLE.md`.
            let mut result = StateResult::Continue;
            for _ in 0..16 {
                result = match self
                    .frontend
                    .writable(&mut self.context, EndpointClient(&mut self.router))
                {
                    MuxResult::Continue => StateResult::Continue,
                    MuxResult::Upgrade => StateResult::Upgrade,
                    MuxResult::CloseSession => StateResult::CloseSession,
                };
                // Stop early on a non-Continue result or once the frontend no
                // longer wants to write.
                if result != StateResult::Continue
                    || !self.frontend.readiness_mut().interest.is_writable()
                {
                    break;
                }
            }
            // Re-arm the frontend timeout so the session doesn't become immortal.
            // The writable call may have partially flushed the response — we need
            // the timeout to fire again if the flush stalls.
            if result == StateResult::Continue {
                self.frontend.timeout_container().set(self.frontend_token);
            }
            return result;
        }
        if should_close {
            // Give already-buffered frontend bytes one more timeout period to
            // flush instead of closing immediately.
            if self.delay_close_for_frontend_flush("timeout") {
                debug!(
                    "{} Mux timeout delaying close for frontend flush: token={:?}, frontend={:?}",
                    log_context!(self),
                    token,
                    self.frontend
                );
                self.frontend.timeout_container().set(self.frontend_token);
                return StateResult::Continue;
            }
            if front_is_h2 {
                // Diagnostics only: list the non-recycled streams being dropped.
                debug!(
                    "{} Mux timeout returning CloseSession: token={:?}, frontend={:?}",
                    log_context!(self),
                    token,
                    self.frontend
                );
                for (idx, stream) in self.context.streams.iter().enumerate() {
                    if stream.state != StreamState::Recycle {
                        debug!(
                            "{}   timeout stream[{}]: state={:?}, front_phase={:?}, back_phase={:?}, front_completed={}, back_completed={}",
                            log_context!(self),
                            idx,
                            stream.state,
                            stream.front.parsing_phase,
                            stream.back.parsing_phase,
                            stream.front.is_completed(),
                            stream.back.is_completed()
                        );
                    }
                }
            }
            StateResult::CloseSession
        } else {
            // Re-arm the frontend timeout. Without this, the timeout is consumed
            // by triggered() and the session stays alive indefinitely until the
            // zombie checker runs (default: 30 minutes).
            self.frontend.timeout_container().set(self.frontend_token);
            StateResult::Continue
        }
    }
1601
1602    fn cancel_timeouts(&mut self) {
1603        trace!("{} MuxState::cancel_timeouts", log_context!(self));
1604        self.frontend.timeout_container().cancel();
1605        for backend in self.router.backends.values_mut() {
1606            backend.timeout_container().cancel();
1607        }
1608    }
1609
1610    fn print_state(&self, _context: &str) {
1611        // The trait-required `context: &str` parameter (protocol tag like
1612        // "HTTPS"/"HTTP" passed by callers) predates the unified
1613        // `log_context!(self)` envelope. The canonical `MUX` tag lives
1614        // inside `log_context!`, so we ignore the parameter here and emit
1615        // the bracketed Session(...) block instead, mirroring the second
1616        // `error!` in this function.
1617        error!(
1618            "\
1619{} Session(Mux)
1620\tFrontend:
1621\t\ttoken: {:?}\treadiness: {:?}
1622\tBackend(s):",
1623            log_context!(self),
1624            self.frontend_token,
1625            self.frontend.readiness()
1626        );
1627        for (backend_token, backend) in &self.router.backends {
1628            error!(
1629                "{} \t\ttoken: {:?}\treadiness: {:?}",
1630                log_context!(self),
1631                backend_token,
1632                backend.readiness()
1633            )
1634        }
1635    }
1636
    /// Tears the whole session down. The steps run in this order:
    ///
    /// 1. Dump the debug history if it is "interesting", and log any streams
    ///    that are still open and started.
    /// 2. Spread the frontend's connection-level overhead bytes across those
    ///    streams. Emit their access logs ("session close") and mark them
    ///    `Recycle`.
    /// 3. Close the frontend connection.
    /// 4. For every backend connection:
    ///    - cancel its timer;
    ///    - deregister its socket and shut down its write side;
    ///    - remove it from the proxy's session map;
    ///    - roll back the backend's connection gauges and `active_requests`.
    /// 5. Clear the `backend_streams` index.
    ///
    /// Step 4 reads `backend_streams` for per-backend stream counts, which is
    /// why the index is only cleared at the end.
    fn close(&mut self, proxy: Rc<RefCell<dyn L7Proxy>>, _metrics: &mut SessionMetrics) {
        if self.context.debug.is_interesting() {
            warn!("{} {:?}", log_context!(self), self.context.debug.events);
        }
        debug!("{} MUX CLOSE", log_context!(self));
        trace!("{} FRONTEND: {:#?}", log_context!(self), self.frontend);
        trace!(
            "{} BACKENDS: {:#?}",
            log_context!(self),
            self.router.backends
        );

        // Log active streams at session teardown for timeout diagnosis
        let active_count = self
            .context
            .streams
            .iter()
            .filter(|s| s.state.is_open() && s.metrics.start.is_some())
            .count();
        if active_count > 0 {
            debug!(
                "{} Session close with {} active stream(s)",
                log_context!(self),
                active_count
            );
            for (idx, stream) in self
                .context
                .streams
                .iter()
                .enumerate()
                .filter(|(_, s)| s.state.is_open() && s.metrics.start.is_some())
            {
                let elapsed = stream.metrics.service_time();
                debug!(
                    "{}   active stream[{}]: state={:?} service_time={:?} method={:?} path={:?} status={:?}",
                    log_context!(self),
                    idx,
                    stream.state,
                    elapsed,
                    stream.context.method,
                    stream.context.path,
                    stream.context.status,
                );
            }
            incr!(names::h2::CLOSE_WITH_ACTIVE_STREAMS);
        }

        // Distribute H2 connection-level overhead (control frames) across in-flight
        // streams so that access log bytes_in/bytes_out reflect actual wire cost.
        // Integer division may lose up to (active_count - 1) bytes, which is acceptable.
        // `max(1)` only guards the division; with zero active streams the
        // overhead is simply not attributed to any stream.
        let active_count = active_count.max(1);
        let (total_overhead_in, total_overhead_out) = self.frontend.overhead_bytes();
        let share_in = total_overhead_in / active_count;
        let share_out = total_overhead_out / active_count;

        // Generate access logs for in-flight streams on session teardown.
        // Skip streams that already had their access log emitted (metrics.start is
        // set to None by metrics.reset() after generate_access_log in the happy path).
        // Frontend RTT is the same for every stream on this session — snapshot
        // it once outside the loop instead of paying one TCP_INFO syscall per
        // open stream.
        let client_rtt = socket_rtt(self.frontend.socket());
        for stream in &mut self.context.streams {
            if stream.state.is_open() && stream.metrics.start.is_some() {
                stream.metrics.bin += share_in;
                stream.metrics.bout += share_out;
                stream.metrics.service_stop();
                if stream.metrics.backend_stop.is_none() {
                    stream.metrics.backend_stop();
                }
                // Only mark as error if the stream had an actual protocol/processing failure
                // (kawa parse error, backend error). Normal timeouts, client disconnects,
                // and graceful connection closures are not errors.
                let is_error = stream.front.is_error() || stream.back.is_error();
                // Backend RTT is per stream: it comes from the backend socket
                // this stream is linked to, if that backend is still present.
                let server_rtt = stream.linked_token().and_then(|token| {
                    self.router
                        .backends
                        .get(&token)
                        .and_then(|c| socket_rtt(c.socket()))
                });
                stream.generate_access_log(
                    is_error,
                    Some("session close"),
                    self.context.listener.clone(),
                    client_rtt,
                    server_rtt,
                );
                stream.state = StreamState::Recycle;
            }
        }

        self.frontend
            .close(&mut self.context, EndpointClient(&mut self.router));

        for (token, client) in &mut self.router.backends {
            let proxy_borrow = proxy.borrow();
            client.timeout_container().cancel();
            let socket = client.socket_mut();
            if let Err(e) = proxy_borrow.deregister_socket(socket) {
                error!(
                    "{} error deregistering back socket({:?}): {:?}",
                    log_context_lite!(self),
                    socket,
                    e
                );
            }
            // invariant: write-only shutdown — Shutdown::Both on a TLS frontend
            // discards the receive buffer and elicits TCP RST, truncating the
            // already-queued response. Canonical write-up: `lib/src/https.rs:650-655`.
            // Backend sockets follow the same discipline for symmetry.
            if let Err(e) = socket.shutdown(Shutdown::Write) {
                // NotConnected is tolerated silently (e.g. the peer is already
                // gone); any other shutdown error is logged.
                if e.kind() != ErrorKind::NotConnected {
                    error!(
                        "{} error shutting down back socket({:?}): {:?}",
                        log_context_lite!(self),
                        socket,
                        e
                    );
                }
            }
            if !proxy_borrow.remove_session(*token) {
                error!(
                    "{} session {:?} was already removed!",
                    log_context_lite!(self),
                    token
                );
            }

            match client.position() {
                Position::Client(cluster_id, backend, _) => {
                    let mut backend_borrow = backend.borrow_mut();
                    backend_borrow.dec_connections();
                    gauge_add!(names::backend::CONNECTIONS, -1);
                    // Second `-1` site for `backend.pool.size` (the first is
                    // in `connection.rs::pre_close_client_bookkeeping`). This
                    // path runs during session teardown when the frontend
                    // session iterates the backends map directly without
                    // routing through `Connection::close`. Both `-1` sites
                    // mirror the single `+1` in router.rs::connect and the
                    // matching `backend.connections, -1` calls already
                    // present here, so symmetry follows from
                    // `backend.connections` correctness.
                    gauge_add!(names::backend::POOL_SIZE, -1);
                    gauge_add!(
                        names::backend::CONNECTIONS_PER_BACKEND,
                        -1,
                        Some(cluster_id),
                        Some(&backend_borrow.backend_id)
                    );
                    // Release the requests still counted against this backend:
                    // one per stream indexed under its token. Saturating to
                    // avoid underflow if the counter is already lower.
                    let count = self
                        .context
                        .backend_streams
                        .get(token)
                        .map_or(0, |ids| ids.len());
                    backend_borrow.active_requests =
                        backend_borrow.active_requests.saturating_sub(count);
                    trace!(
                        "{} connection (session) closed: {:#?}",
                        log_context_lite!(self),
                        backend_borrow
                    );
                }
                Position::Server => {
                    error!(
                        "{} close_backend called on Server position",
                        log_context_lite!(self)
                    );
                }
            }
        }
        // Clear the reverse index after all backends have decremented their
        // active_requests counters (which depend on the index for stream counts).
        self.context.backend_streams.clear();
    }
1811
1812    fn shutting_down(&mut self) -> SessionIsToBeClosed {
1813        // RFC 9113 §6.8: initiate graceful shutdown with double-GOAWAY pattern.
1814        // Only send the initial GOAWAY once. The final GOAWAY (with the real
1815        // last_stream_id) is handled by finalize_write() when all streams drain.
1816        // Calling graceful_goaway() again would send the final GOAWAY
1817        // prematurely and force-disconnect before in-flight streams complete.
1818        if !self.frontend.is_draining() {
1819            match self.frontend.graceful_goaway() {
1820                MuxResult::CloseSession => return true,
1821                MuxResult::Continue => {
1822                    // graceful_goaway() queued a GOAWAY frame. Flush it directly
1823                    // since the event loop uses edge-triggered epoll and won't
1824                    // deliver a new WRITABLE event for an already-writable socket.
1825                    self.frontend.flush_zero_buffer();
1826                }
1827                _ => {}
1828            }
1829        } else {
1830            trace!(
1831                "{} shutting_down: already draining, skipping duplicate GOAWAY",
1832                log_context!(self)
1833            );
1834            // shut_down_sessions() runs outside ready(), so retry flushing any
1835            // previously-buffered GOAWAY/TLS records on each pass.
1836            self.frontend.flush_zero_buffer();
1837        }
1838        if self.drive_frontend_shutdown_io() {
1839            return true;
1840        }
1841        // Forced-close deadline: once the H2 listener's
1842        // `h2_graceful_shutdown_deadline_seconds` budget has elapsed from
1843        // the moment `graceful_goaway` armed `drain.started_at`, stop
1844        // waiting for streams and tear the session down. `drive_frontend_
1845        // shutdown_io` above already had a chance to flush any pending
1846        // TLS/GOAWAY records; this branch accepts that some bytes may be
1847        // lost in exchange for honoring the operator-configured SLA.
1848        // Listeners that disable the knob (`= 0` → `None`) short-circuit
1849        // the check inside `graceful_shutdown_deadline_elapsed`.
1850        if self.frontend.graceful_shutdown_deadline_elapsed() {
1851            debug!(
1852                "{} Mux shutting_down: graceful-shutdown deadline elapsed, forcing close",
1853                log_context!(self)
1854            );
1855            return true;
1856        }
1857        if matches!(self.frontend, Connection::H2(_)) && self.frontend.is_draining() {
1858            for stream in &mut self.context.streams {
1859                if stream.front_received_end_of_stream {
1860                    continue;
1861                }
1862                if !matches!(stream.state, StreamState::Linked(_) | StreamState::Unlinked) {
1863                    continue;
1864                }
1865                if stream.front.consumed
1866                    && stream.front.storage.is_empty()
1867                    && stream.front.is_completed()
1868                {
1869                    stream.front_received_end_of_stream = true;
1870                    self.frontend
1871                        .readiness_mut()
1872                        .interest
1873                        .insert(Ready::WRITABLE);
1874                    self.frontend.readiness_mut().signal_pending_write();
1875                }
1876            }
1877        }
1878        let mut can_stop = true;
1879        for stream in &mut self.context.streams {
1880            match stream.state {
1881                StreamState::Linked(_) => {
1882                    can_stop = false;
1883                }
1884                StreamState::Unlinked => {
1885                    kawa::debug_kawa(&stream.front);
1886                    kawa::debug_kawa(&stream.back);
1887                    if stream.is_quiesced() {
1888                        continue;
1889                    }
1890                    stream.context.closing = true;
1891                    can_stop = false;
1892                }
1893                _ => {}
1894            }
1895        }
1896        if self.frontend.has_pending_write() {
1897            return false;
1898        }
1899        if can_stop {
1900            let active_h2_streams = self
1901                .context
1902                .streams
1903                .iter()
1904                .enumerate()
1905                .filter(|(_, s)| {
1906                    if s.state == StreamState::Recycle {
1907                        return false;
1908                    }
1909                    if s.state == StreamState::Unlinked && s.is_quiesced() {
1910                        return false;
1911                    }
1912                    true
1913                })
1914                .collect::<Vec<_>>();
1915            if matches!(self.frontend, Connection::H2(_)) && !active_h2_streams.is_empty() {
1916                debug!(
1917                    "{} Mux shutting_down returning true with active H2 streams: {:?}",
1918                    log_context!(self),
1919                    self.frontend
1920                );
1921                for (idx, stream) in active_h2_streams {
1922                    debug!(
1923                        "{}   shutdown stream[{}]: state={:?}, front_phase={:?}, back_phase={:?}, front_completed={}, back_completed={}",
1924                        log_context!(self),
1925                        idx,
1926                        stream.state,
1927                        stream.front.parsing_phase,
1928                        stream.back.parsing_phase,
1929                        stream.front.is_completed(),
1930                        stream.back.is_completed()
1931                    );
1932                }
1933            }
1934        }
1935        if can_stop {
1936            return true;
1937        }
1938
1939        false
1940    }
1941}
1942
#[cfg(test)]
mod tests {
    use super::*;

    /// A `SocketResult::Closed` read must clear only the READABLE bit from the
    /// event set, leaving WRITABLE and HUP in place, and must not ask the
    /// caller to yield.
    #[test]
    fn update_readiness_after_read_closed_keeps_writable() {
        let mut readiness = Readiness {
            interest: Ready::READABLE | Ready::WRITABLE | Ready::HUP,
            event: Ready::READABLE | Ready::WRITABLE | Ready::HUP,
        };

        let yielded = update_readiness_after_read(17, SocketResult::Closed, &mut readiness);
        assert!(!yielded);

        let event = &readiness.event;
        assert!(!event.is_readable());
        assert!(event.is_writable());
        assert!(event.is_hup());
    }
}