Skip to main content

sozu_lib/protocol/mux/
mod.rs

//! HTTP/1.1 and HTTP/2 multiplexing layer.
//!
//! This module unifies HTTP/1.1 and HTTP/2 behind a single [`Mux`] session
//! state machine that integrates with sozu's mio event loop. The key types:
//!
//! - [`Mux`]: The top-level session state, generic over socket (`TcpStream` or
//!   `FrontRustls`) and listener. Implements `SessionState`.
//! - [`Connection`]: Enum dispatching to [`ConnectionH1`] or [`ConnectionH2`]
//!   for protocol-specific readable/writable logic.
//! - [`Stream`]: Per-request state with front/back kawa buffers, metrics, and
//!   lifecycle tracking. Shared between H1 and H2 paths.
//! - [`Context`]: Per-session context (cluster, backends, routing, timeouts).
//!
//! The H2 implementation handles RFC 9113 framing, HPACK (RFC 7541), flow
//! control, flood detection (CVE-2023-44487, CVE-2019-9512/9514/9515/9518,
//! CVE-2024-27316), and graceful shutdown (double-GOAWAY per RFC 9113 §6.8).
17
18use std::{
19    cell::RefCell,
20    collections::{HashMap, VecDeque},
21    fmt::Debug,
22    io::ErrorKind,
23    net::{Shutdown, SocketAddr},
24    rc::{Rc, Weak},
25    sync::Arc,
26    time::{Duration, Instant},
27};
28
29use mio::{Token, net::TcpStream};
30use rusty_ulid::Ulid;
31use sozu_command::{
32    logging::ansi_palette,
33    proto::command::{Event, EventKind},
34    ready::Ready,
35};
36
/// Builds the prefix put in front of every [`Mux`] log line: protocol label
/// followed by a session descriptor, in the same shape as the RUSTLS
/// log-context convention:
/// `[<ulid> - - -]\tMUX\tSession(...)\t >>>`.
///
/// The ANSI escape fragments come from [`ansi_palette`]; when colored output
/// is enabled the `MUX` label is rendered bold bright-white (uniform across
/// every protocol) and the session detail block in light grey.
///
/// `$self` must expose the fields read below. The session block contains:
/// - `frontend` — mio token of the frontend socket
/// - `peer` — peer address (`None` when `peer_addr()` fails, e.g. the socket
///   is gone)
/// - `streams` — number of streams currently held by the [`Context`]
/// - `backends` — number of backend connections in the [`Router`]
/// - `pending_links` — streams waiting to be linked to a backend
/// - `readiness` — frontend mio readiness snapshot
macro_rules! log_context {
    ($self:expr) => {{
        // The palette fragments are picked up by name directly from the
        // format string; only the session-derived values are passed
        // explicitly.
        let (open, reset, grey, gray, white) = ansi_palette();
        format!(
            "[{ulid} - - -]\t{open}MUX{reset}\t{grey}Session{reset}({gray}frontend{reset}={white}{frontend}{reset}, {gray}peer{reset}={white}{peer:?}{reset}, {gray}streams{reset}={white}{streams}{reset}, {gray}backends{reset}={white}{backends}{reset}, {gray}pending_links{reset}={white}{pending_links}{reset}, {gray}readiness{reset}={white}{readiness}{reset})\t >>>",
            ulid = $self.session_ulid,
            frontend = $self.frontend_token.0,
            peer = $self.frontend.socket().peer_addr().ok(),
            streams = $self.context.streams.len(),
            backends = $self.router.backends.len(),
            pending_links = $self.context.pending_links.len(),
            readiness = $self.frontend.readiness(),
        )
    }};
}
71
/// Reduced form of [`log_context!`] without the
/// `streams`/`backends`/`pending_links` counters.
///
/// Intended for call sites where `self.router.backends` or
/// `self.context.streams` cannot be read because the borrow checker forbids
/// it (for instance inside a method already holding a mutable borrow on one
/// of them). The session ULID and the frontend snapshot are still emitted, so
/// the line can be correlated with the rest of the session.
macro_rules! log_context_lite {
    ($self:expr) => {{
        // Palette fragments are captured by name from the format string.
        let (open, reset, grey, gray, white) = ansi_palette();
        format!(
            "[{ulid} - - -]\t{open}MUX{reset}\t{grey}Session{reset}({gray}frontend{reset}={white}{frontend}{reset}, {gray}peer{reset}={white}{peer:?}{reset}, {gray}readiness{reset}={white}{readiness}{reset})\t >>>",
            ulid = $self.session_ulid,
            frontend = $self.frontend_token.0,
            peer = $self.frontend.socket().peer_addr().ok(),
            readiness = $self.frontend.readiness(),
        )
    }};
}
95
/// Log prefix for free functions and routing blocks that have no [`Mux`] in
/// scope. Colors follow [`ansi_palette`].
///
/// The macro has two arms:
/// * `log_module_context!()` — no argument, emits the legacy `MUX\t >>>`
///   prefix. Meant for sites without an `HttpContext` at hand (e.g. the
///   generic `trace!` that fires before the variant-specific match).
/// * `log_module_context!($http_context)` — rich form; the argument must be
///   usable as `&HttpContext`. It prints the context's own
///   `log_context()` bracket (the same `[session req cluster backend]`
///   bracket used by RUSTLS/PIPE/TCP) followed by a `Session(...)` block with
///   the session address, method and authority, so MUX lines emitted from
///   variant match arms remain filterable by session or request ULID.
///   Mirrors `router.rs:log_module_context!($http_context)`.
macro_rules! log_module_context {
    () => {{
        // Only the label color and the reset sequence are needed here.
        let (open, reset, _, _, _) = ansi_palette();
        format!("{open}MUX{reset}\t >>>")
    }};
    ($http_context:expr) => {{
        let (open, reset, grey, gray, white) = ansi_palette();
        // The explicit type pins the argument to `&HttpContext` and produces
        // a clear error when something else is passed.
        let http_ctx: &HttpContext = &$http_context;
        let ctx = http_ctx.log_context();
        format!(
            "{gray}{ctx}{reset}\t{open}MUX{reset}\t{grey}Session{reset}({gray}frontend{reset}={white}{frontend:?}{reset}, {gray}method{reset}={white}{method:?}{reset}, {gray}authority{reset}={white}{authority:?}{reset})\t >>>",
            frontend = http_ctx.session_address,
            method = http_ctx.method,
            authority = http_ctx.authority,
        )
    }};
}
132
133pub mod answers;
134pub mod auth;
135pub mod connection;
136mod converter;
137pub mod debug;
138mod h1;
139mod h2;
140pub mod parser;
141mod pkawa;
142pub mod router;
143pub(crate) mod serializer;
144mod shared;
145pub mod stream;
146
147use crate::metrics::names;
148use crate::{
149    BackendConnectionError, FrontendFromRequestError, L7ListenerHandler, L7Proxy, ListenerHandler,
150    ProxySession, Readiness, RetrieveClusterError, SessionIsToBeClosed, SessionMetrics,
151    SessionResult, StateResult,
152    backends::{Backend, BackendError},
153    http::HttpListener,
154    https::HttpsListener,
155    pool::{Checkout, Pool},
156    protocol::{SessionState, http::editor::HttpContext},
157    retry::RetryPolicy,
158    server::push_event,
159    socket::{FrontRustls, SessionTcpStream, SocketHandler, SocketResult, stats::socket_rtt},
160};
161
162pub(crate) use crate::protocol::mux::answers::{
163    forcefully_terminate_answer, set_default_answer, set_default_answer_with_retry_after,
164};
165use crate::protocol::mux::connection::{EndpointClient, EndpointServer};
166pub use crate::protocol::mux::{
167    answers::terminate_default_answer,
168    connection::Connection,
169    debug::{DebugEvent, DebugHistory},
170    h1::ConnectionH1,
171    h2::ConnectionH2,
172    h2::H2ByteAccounting,
173    h2::H2ConnectionConfig,
174    h2::H2DrainState,
175    h2::H2FloodConfig,
176    h2::H2FlowControl,
177    parser::H2Error,
178    router::Router,
179    stream::{Stream, StreamParts, StreamState},
180};
181
// ── Tuning Constants ─────────────────────────────────────────────────────────

/// Hard cap on the number of passes a session loop may perform before it is
/// cut short. Guards the single-threaded worker against a session that never
/// stops reporting work (an effectively infinite loop).
const MAX_LOOP_ITERATIONS: i32 = 10_000;
// ─────────────────────────────────────────────────────────────────────────────
188
189/// Generic Http representation using the Kawa crate using the Checkout of Sozu as buffer
190type GenericHttpStream = kawa::Kawa<Checkout>;
191type StreamId = u32;
192type GlobalStreamId = usize;
193pub type MuxClear = Mux<SessionTcpStream, HttpListener>;
194pub type MuxTls = Mux<FrontRustls, HttpsListener>;
195
196pub enum Position {
197    Client(String, Rc<RefCell<Backend>>, BackendStatus),
198    Server,
199}
200
201impl Debug for Position {
202    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
203        match self {
204            Self::Client(cluster_id, _, status) => f
205                .debug_tuple("Client")
206                .field(cluster_id)
207                .field(status)
208                .finish(),
209            Self::Server => write!(f, "Server"),
210        }
211    }
212}
213
214impl Position {
215    fn is_server(&self) -> bool {
216        match self {
217            Position::Client(..) => false,
218            Position::Server => true,
219        }
220    }
221    fn is_client(&self) -> bool {
222        !self.is_server()
223    }
224
225    /// Increment the global `count!()` counter for bytes read on this side.
226    pub fn count_bytes_in_counter(&self, size: usize) {
227        match self {
228            Position::Client(..) => count!(names::backend::BACK_BYTES_IN, size as i64),
229            Position::Server => count!(names::backend::BYTES_IN, size as i64),
230        }
231    }
232
233    /// Increment the global `count!()` counter for bytes written on this side.
234    pub fn count_bytes_out_counter(&self, size: usize) {
235        match self {
236            Position::Client(..) => count!(names::backend::BACK_BYTES_OUT, size as i64),
237            Position::Server => count!(names::backend::BYTES_OUT, size as i64),
238        }
239    }
240
241    /// Attribute `size` bytes read to the appropriate `SessionMetrics` field.
242    pub fn count_bytes_in(&self, metrics: &mut SessionMetrics, size: usize) {
243        match self {
244            Position::Client(..) => metrics.backend_bin += size,
245            Position::Server => metrics.bin += size,
246        }
247    }
248
249    /// Attribute `size` bytes written to the appropriate `SessionMetrics` field.
250    pub fn count_bytes_out(&self, metrics: &mut SessionMetrics, size: usize) {
251        match self {
252            Position::Client(..) => metrics.backend_bout += size,
253            Position::Server => metrics.bout += size,
254        }
255    }
256}
257
/// Connection status carried by [`Position::Client`].
#[derive(Debug)]
pub enum BackendStatus {
    /// Connection attempt in progress.
    /// NOTE(review): the `Instant` is presumably when the attempt started —
    /// confirm against the code that builds this variant.
    Connecting(Instant),
    /// Connection established.
    Connected,
    /// NOTE(review): presumably an established connection kept open for
    /// reuse — confirm.
    KeepAlive,
    /// Connection being torn down.
    Disconnecting,
}
265
/// Outcome of one `readable`/`writable` pass on a [`Connection`].
#[derive(Clone, Copy, Debug)]
pub enum MuxResult {
    /// Nothing special happened; keep driving the session.
    Continue,
    /// The connection asks for a protocol upgrade.
    Upgrade,
    /// The session must be closed.
    CloseSession,
}
272
273pub trait Endpoint: Debug {
274    fn readiness(&self, token: Token) -> &Readiness;
275    fn readiness_mut(&mut self, token: Token) -> &mut Readiness;
276    /// Returns the underlying TCP socket for the peer side of a stream.
277    ///
278    /// Used by access-log emission to capture TCP_INFO RTT for the side the
279    /// caller does NOT own directly: a frontend connection (Position::Server)
280    /// reads the backend socket through this method, and a backend connection
281    /// (Position::Client) reads the frontend socket the same way. `token` is
282    /// ignored by [`super::connection::EndpointServer`] (which has a single
283    /// frontend connection) and used as a key by
284    /// [`super::connection::EndpointClient`] (which keys backends by token).
285    /// Returns `None` when the token doesn't resolve, mirroring the existing
286    /// fallback paths in `readiness`/`readiness_mut`.
287    fn socket(&self, token: Token) -> Option<&TcpStream>;
288    /// If end_stream is called on a client it means the stream has PROPERLY finished,
289    /// the server has completed serving the response and informs the endpoint that this stream won't be used anymore.
290    /// If end_stream is called on a server it means the stream was BROKEN, the client was most likely disconnected or encountered an error
291    /// it is for the server to decide if the stream can be retried or an error should be sent. It should be GUARANTEED that all bytes from
292    /// the backend were read. However it is almost certain that all bytes were not already sent to the client.
293    fn end_stream<L: ListenerHandler + L7ListenerHandler>(
294        &mut self,
295        token: Token,
296        stream: GlobalStreamId,
297        context: &mut Context<L>,
298    );
299    /// If start_stream is called on a client it means the stream should be attached to this endpoint,
300    /// the stream might be recovering from a disconnection, in any case at this point its response MUST be empty.
301    /// If the start_stream is called on a H2 server it means the stream is a server push and its request MUST be empty.
302    /// Returns false if the stream could not be started (e.g. max concurrent streams reached).
303    fn start_stream<L: ListenerHandler + L7ListenerHandler>(
304        &mut self,
305        token: Token,
306        stream: GlobalStreamId,
307        context: &mut Context<L>,
308    ) -> bool;
309}
310
311/// Shared logic for half-close accounting: clear `bit` from `readiness.event` on
312/// socket errors/would-block, and return `true` (yield) when no bytes were
313/// transferred so the caller can park the half. Rationale for clearing only
314/// `bit` (not both halves) on `Closed`: the opposite half may still need one
315/// last pass to flush queued frames or the TLS close_notify.
316fn update_readiness(
317    size: usize,
318    status: SocketResult,
319    readiness: &mut Readiness,
320    bit: Ready,
321) -> bool {
322    trace!(
323        "{}   size={}, status={:?}",
324        log_module_context!(),
325        size,
326        status
327    );
328    match status {
329        SocketResult::Continue => {}
330        SocketResult::Closed | SocketResult::Error | SocketResult::WouldBlock => {
331            readiness.event.remove(bit);
332        }
333    }
334    if size > 0 {
335        false
336    } else {
337        readiness.event.remove(bit);
338        true
339    }
340}
341
342fn update_readiness_after_read(
343    size: usize,
344    status: SocketResult,
345    readiness: &mut Readiness,
346) -> bool {
347    update_readiness(size, status, readiness, Ready::READABLE)
348}
349
350fn update_readiness_after_write(
351    size: usize,
352    status: SocketResult,
353    readiness: &mut Readiness,
354) -> bool {
355    update_readiness(size, status, readiness, Ready::WRITABLE)
356}
357pub struct Context<L: ListenerHandler + L7ListenerHandler> {
358    pub streams: Vec<Stream>,
359    /// Streams whose state is `StreamState::Link` and need backend connection.
360    /// Replaces the O(n) scan of `streams` in the ready loop.
361    pub pending_links: VecDeque<GlobalStreamId>,
362    /// Reverse index: backend token -> global stream IDs currently in
363    /// `StreamState::Linked(token)`. Eliminates O(n) scans of `streams`
364    /// when handling backend connect/disconnect/timeout/close events.
365    pub backend_streams: HashMap<Token, Vec<GlobalStreamId>>,
366    pub pool: Weak<RefCell<Pool>>,
367    pub listener: Rc<RefCell<L>>,
368    /// Connection/session ULID — mirrors `Mux.session_ulid`. Stored here so
369    /// per-stream `HttpContext` construction in [`Self::create_stream`] can
370    /// stamp the session slot of the log-context bracket without reaching
371    /// back into the parent [`Mux`].
372    pub session_ulid: Ulid,
373    pub session_address: Option<SocketAddr>,
374    pub public_address: SocketAddr,
375    pub debug: DebugHistory,
376    /// Shrink threshold ratio for recycled stream slots.
377    /// Vec is shrunk when total_slots > active_streams * ratio.
378    pub h2_stream_shrink_ratio: usize,
379    /// TLS SNI value negotiated at handshake, propagated to every
380    /// per-stream [`HttpContext`] so the routing layer can enforce
381    /// the SNI ↔ `:authority` binding on every H2 stream (and the
382    /// single H1 request). `None` for plaintext listeners or when
383    /// the client omitted the SNI extension. Stored pre-lowercased
384    /// and without a port for cheap exact-match comparison.
385    pub tls_server_name: Option<String>,
386    /// Snapshot of the SAN set of the certificate Sōzu actually served at
387    /// the TLS handshake. Captured once in `https.rs::upgrade_handshake`
388    /// from the resolver and frozen for the connection lifetime so H2
389    /// stream coalescing (RFC 7540 §9.1.1 / RFC 9113 §9.1.1) accepts any
390    /// `:authority` covered by the certificate, with RFC 6125 §6.4.3
391    /// wildcard handling. `None` for plaintext listeners or when SNI was
392    /// absent. `Some(empty)` when the default cert was served — every
393    /// `:authority` is rejected. `Arc` so the snapshot is shared across
394    /// every per-stream `HttpContext` without re-allocation.
395    pub tls_cert_names: Option<Arc<Vec<String>>>,
396    /// Whether the routing layer must reject any request whose authority
397    /// host does not exact-match `tls_server_name` (CWE-346 / CWE-444).
398    /// Mirrors `HttpsListenerConfig::strict_sni_binding`; captured once
399    /// at `Context::new` so routing decisions on each stream avoid a
400    /// per-stream `listener.borrow()`.
401    pub strict_sni_binding: bool,
402    /// Whether the request-side block walk must strip any client-supplied
403    /// `X-Real-IP` header before forwarding (anti-spoofing). Mirrors
404    /// `HttpListenerConfig::elide_x_real_ip` /
405    /// `HttpsListenerConfig::elide_x_real_ip`; captured once at
406    /// `Context::new` so per-stream `HttpContext`s do not need to call
407    /// `listener.borrow()` again. Independent of `send_x_real_ip`.
408    pub elide_x_real_ip: bool,
409    /// Whether `on_request_headers` injects a proxy-generated `X-Real-IP`
410    /// header carrying the connection peer IP (post-PROXY-v2 unwrap).
411    /// Mirrors `HttpListenerConfig::send_x_real_ip` /
412    /// `HttpsListenerConfig::send_x_real_ip`; captured once at
413    /// `Context::new`. Independent of `elide_x_real_ip`.
414    pub send_x_real_ip: bool,
415    /// Negotiated TLS protocol version short-form (e.g. `"TLSv1.3"`).
416    /// Captured once at handshake completion in `https.rs` and propagated
417    /// to every per-stream [`HttpContext`] so the access log can record it
418    /// without reaching back into the rustls session per request. `None`
419    /// for plaintext listeners.
420    pub tls_version: Option<&'static str>,
421    /// Negotiated TLS cipher suite short-form (e.g.
422    /// `"TLS_AES_128_GCM_SHA256"`). Captured once at handshake completion
423    /// and propagated to every per-stream [`HttpContext`]. `None` for
424    /// plaintext listeners.
425    pub tls_cipher: Option<&'static str>,
426    /// Negotiated ALPN protocol short-form (e.g. `"h2"`, `"http/1.1"`).
427    /// Captured once at handshake completion and propagated to every
428    /// per-stream [`HttpContext`]. `None` for plaintext listeners or when
429    /// no ALPN was negotiated.
430    pub tls_alpn: Option<&'static str>,
431}
432
433impl<L: ListenerHandler + L7ListenerHandler> Context<L> {
434    pub fn new(
435        session_ulid: Ulid,
436        pool: Weak<RefCell<Pool>>,
437        listener: Rc<RefCell<L>>,
438        session_address: Option<SocketAddr>,
439        public_address: SocketAddr,
440    ) -> Self {
441        let h2_stream_shrink_ratio = listener
442            .borrow()
443            .get_h2_connection_config()
444            .stream_shrink_ratio as usize;
445        let strict_sni_binding = listener.borrow().get_strict_sni_binding();
446        let elide_x_real_ip = listener.borrow().get_elide_x_real_ip();
447        let send_x_real_ip = listener.borrow().get_send_x_real_ip();
448        Self {
449            streams: Vec::new(),
450            pending_links: VecDeque::new(),
451            backend_streams: HashMap::new(),
452            pool,
453            listener,
454            session_ulid,
455            session_address,
456            public_address,
457            debug: DebugHistory::new(),
458            h2_stream_shrink_ratio,
459            tls_server_name: None,
460            tls_cert_names: None,
461            strict_sni_binding,
462            elide_x_real_ip,
463            send_x_real_ip,
464            tls_version: None,
465            tls_cipher: None,
466            tls_alpn: None,
467        }
468    }
469
470    pub fn active_len(&self) -> usize {
471        self.streams
472            .iter()
473            .filter(|s| !matches!(s.state, StreamState::Recycle))
474            .count()
475    }
476
477    /// Shared accessor for the [`HttpContext`] owned by a stream.
478    ///
479    /// Prefer this over `&self.streams[stream_id].context` at call sites
480    /// that only need read access — it keeps the `Stream`/`HttpContext`
481    /// relationship encapsulated and reads the same regardless of whether
482    /// the caller is inside `Router::connect`, the H2 mux, or a free
483    /// helper. Panics on an out-of-bounds `stream_id`, which is the same
484    /// behaviour as the raw `streams[sid]` indexing it replaces.
485    pub fn http_context(&self, stream_id: GlobalStreamId) -> &HttpContext {
486        &self.streams[stream_id].context
487    }
488
489    /// Mutable sibling of [`Self::http_context`]. Use when routing
490    /// decisions need to stamp `cluster_id` / `backend_id` on the stream's
491    /// [`HttpContext`] (e.g. `Router::connect` at the fill-cluster /
492    /// fill-backend points).
493    pub fn http_context_mut(&mut self, stream_id: GlobalStreamId) -> &mut HttpContext {
494        &mut self.streams[stream_id].context
495    }
496
497    /// Register a stream as linked to a backend token in the reverse index.
498    pub fn link_stream(&mut self, stream_id: GlobalStreamId, token: Token) {
499        self.streams[stream_id].state = StreamState::Linked(token);
500        self.backend_streams
501            .entry(token)
502            .or_default()
503            .push(stream_id);
504    }
505
506    /// Remove a stream from the backend reverse index if it is currently
507    /// `Linked`. Returns the backend token if one was removed.
508    pub fn unlink_stream(&mut self, stream_id: GlobalStreamId) -> Option<Token> {
509        if let StreamState::Linked(token) = self.streams[stream_id].state {
510            remove_backend_stream(&mut self.backend_streams, token, stream_id);
511            Some(token)
512        } else {
513            None
514        }
515    }
516
517    pub fn create_stream(&mut self, request_id: Ulid, window: u32) -> Option<GlobalStreamId> {
518        let http_context = {
519            let listener = self.listener.borrow();
520            let mut http_context = HttpContext::new(
521                self.session_ulid,
522                request_id,
523                listener.protocol(),
524                self.public_address,
525                self.session_address,
526                listener.get_sticky_name().to_string(),
527                listener.get_sozu_id_header().to_string(),
528                self.elide_x_real_ip,
529                self.send_x_real_ip,
530            );
531            // Propagate the connection-scoped TLS SNI onto every per-stream
532            // HttpContext so `route_from_request` can enforce the SNI ↔
533            // `:authority` binding for each H2 stream independently.
534            http_context.tls_server_name = self.tls_server_name.clone();
535            // Mirror the frozen-at-handshake SAN snapshot. `Arc` clone is a
536            // refcount bump, not a deep copy — every per-stream
537            // `HttpContext` shares the same `Vec<String>`.
538            http_context.tls_cert_names = self.tls_cert_names.clone();
539            // Mirror the listener's strict_sni_binding flag onto each
540            // HttpContext so the routing layer can honor operator opt-outs
541            // without reaching back into the listener on every request.
542            http_context.strict_sni_binding = self.strict_sni_binding;
543            // Propagate the connection-scoped TLS metadata onto every
544            // per-stream HttpContext so the access log can record it without
545            // touching the rustls session on every request. These are
546            // `&'static str` borrows from the rustls label tables — copy is
547            // a pointer move.
548            http_context.tls_version = self.tls_version;
549            http_context.tls_cipher = self.tls_cipher;
550            http_context.tls_alpn = self.tls_alpn;
551            http_context
552        };
553        let recycle_slot = self
554            .streams
555            .iter()
556            .position(|s| s.state == StreamState::Recycle);
557        if let Some(stream_id) = recycle_slot {
558            let stream = &mut self.streams[stream_id];
559            trace!("{} Reuse stream: {}", log_module_context!(), stream_id);
560            stream.state = StreamState::Idle;
561            stream.attempts = 0;
562            stream.front_received_end_of_stream = false;
563            stream.back_received_end_of_stream = false;
564            stream.front_data_received = 0;
565            stream.back_data_received = 0;
566            stream.request_counted = false;
567            stream.window = i32::try_from(window).unwrap_or(i32::MAX);
568            stream.context = http_context;
569            stream.back.clear();
570            stream.back.storage.clear();
571            stream.front.clear();
572            stream.front.storage.clear();
573            stream.metrics.reset();
574            stream.metrics.mark_request_start();
575            // After recycling a slot, check if the Vec has excessive trailing
576            // Recycle entries (more than 2x active streams of total capacity).
577            let active = self.active_len();
578            let total = self.streams.len();
579            if total > 1 && active > 0 && total > active * self.h2_stream_shrink_ratio {
580                self.shrink_trailing_recycle();
581            }
582            return Some(stream_id);
583        }
584        self.streams
585            .push(Stream::new(self.pool.clone(), http_context, window)?);
586        Some(self.streams.len() - 1)
587    }
588
589    /// Remove consecutive `Recycle` entries from the end of the streams Vec.
590    ///
591    /// This prevents unbounded growth when H2 streams are created and recycled
592    /// over time, reclaiming memory from slots that are no longer needed.
593    pub fn shrink_trailing_recycle(&mut self) {
594        while self
595            .streams
596            .last()
597            .is_some_and(|s| s.state == StreamState::Recycle)
598        {
599            self.streams.pop();
600        }
601    }
602}
603
604/// Remove `stream_id` from the backend-token reverse index for `token`.
605/// Free function to allow split borrows when `context.streams` is already
606/// mutably borrowed (preventing a `Context::unlink_stream` call).
607pub(super) fn remove_backend_stream(
608    index: &mut HashMap<Token, Vec<GlobalStreamId>>,
609    token: Token,
610    stream_id: GlobalStreamId,
611) {
612    if let Some(ids) = index.get_mut(&token) {
613        ids.retain(|&id| id != stream_id);
614        if ids.is_empty() {
615            index.remove(&token);
616        }
617    }
618}
619
620pub struct Mux<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> {
621    pub configured_frontend_timeout: Duration,
622    pub frontend_token: Token,
623    pub frontend: Connection<Front>,
624    pub router: Router,
625    pub context: Context<L>,
626    /// Per-session correlation ID generated at construction time. Included in
627    /// every log line emitted from this module so all events for a single
628    /// frontend connection can be reassembled (independent of the ephemeral
629    /// per-stream request id used by access logs).
630    pub session_ulid: Ulid,
631}
632
633impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Mux<Front, L> {
634    pub fn front_socket(&self) -> &TcpStream {
635        self.frontend.socket()
636    }
637}
638
639impl<Front: SocketHandler + std::fmt::Debug, L: ListenerHandler + L7ListenerHandler> Mux<Front, L> {
640    fn delay_close_for_frontend_flush(&mut self, reason: &'static str) -> bool {
641        let _ = self.frontend.initiate_close_notify();
642        // LIFECYCLE §9 invariant 16: consult per-stream back-buffers in
643        // addition to the connection-level pending-write predicate so
644        // shutdown does not close while any open H2 stream still has
645        // kawa bytes queued after a voluntary scheduler yield.
646        if self
647            .frontend
648            .has_pending_write_including_streams(&self.context)
649        {
650            let readiness = self.frontend.readiness_mut();
651            readiness.interest = Ready::WRITABLE | Ready::HUP | Ready::ERROR;
652            readiness.signal_pending_write();
653            debug!(
654                "{} Mux delaying close on {}: {:?}",
655                log_context!(self),
656                reason,
657                self.frontend
658            );
659            true
660        } else {
661            false
662        }
663    }
664
665    /// Drive the frontend I/O path during shutdown, when the server is polling
666    /// `shutting_down()` outside the normal epoll readiness loop.
667    ///
668    /// This is required for H2 graceful shutdown because a stream may still
669    /// need one last readable pass to observe the peer's END_STREAM or one last
670    /// writable pass to retire the stream, emit GOAWAY, or flush TLS records.
671    fn drive_frontend_shutdown_io(&mut self) -> SessionIsToBeClosed {
672        // An H2 frontend is always given a read pass and a write pass here,
673        // whatever its readiness reports; any other frontend is driven only
674        // by its recorded events and by data still waiting to be written.
675        let is_h2 = matches!(self.frontend, Connection::H2(_));
676
677        // No event recorded and nothing queued: there is no I/O to drive.
678        if !is_h2
679            && self.frontend.readiness().event.is_empty()
680            && !self.frontend.has_pending_write()
681        {
682            return false;
683        }
684
685        // Read pass. Anything other than `Continue` means the session must close.
686        if is_h2 || self.frontend.readiness().event.is_readable() {
687            let readiness = self.frontend.readiness_mut();
688            readiness.interest.insert(Ready::READABLE);
689            let outcome = self
690                .frontend
691                .readable(&mut self.context, EndpointClient(&mut self.router));
692            if let MuxResult::CloseSession | MuxResult::Upgrade = outcome {
693                return true;
694            }
695        }
696
697        // The write pass is skipped when it would have nothing to do.
698        let write_wanted = is_h2
699            || self.frontend.has_pending_write()
700            || self.frontend.readiness().event.is_writable();
701        if !write_wanted {
702            return false;
703        }
704
705        // Write pass: call `writable` repeatedly, re-arming WRITABLE interest
706        // before every call, for at most MAX_LOOP_ITERATIONS rounds.
707        let mut passes = 0;
708        loop {
709            let readiness = self.frontend.readiness_mut();
710            readiness.interest.insert(Ready::WRITABLE);
711            if is_h2 {
712                readiness.signal_pending_write();
713            }
714            let outcome = self
715                .frontend
716                .writable(&mut self.context, EndpointClient(&mut self.router));
717            if let MuxResult::CloseSession | MuxResult::Upgrade = outcome {
718                return true;
719            }
720
721            // Leave once the budget is spent or nothing is left to write.
722            passes += 1;
723            if passes >= MAX_LOOP_ITERATIONS
724                || !(self.frontend.has_pending_write()
725                    || self.frontend.readiness().event.is_writable())
726            {
727                break false;
728            }
729        }
730    }
731}
732
733impl<Front: SocketHandler + std::fmt::Debug, L: ListenerHandler + L7ListenerHandler> SessionState
734    for Mux<Front, L>
735{
736    fn ready(
737        &mut self,
738        session: Rc<RefCell<dyn ProxySession>>,
739        proxy: Rc<RefCell<dyn L7Proxy>>,
740        _metrics: &mut SessionMetrics,
741    ) -> SessionResult {
           // Drive the session for one call from the event loop.
           //
           // Inner loop, per iteration: frontend readable -> every backend
           // (writable, readable, dead-backend teardown) -> frontend writable.
           // It ends once neither the frontend nor any backend has filtered
           // readiness left, or when `counter` reaches MAX_LOOP_ITERATIONS.
           // Outer loop: after each inner loop, streams queued in
           // `context.pending_links` are connected through the router; the
           // whole cycle restarts as long as at least one queued stream was
           // still in `StreamState::Link` (tracked by `dirty`).
           //
           // Returns `Close` when an endpoint requests it and
           // `delay_close_for_frontend_flush` does not postpone it, or when the
           // loop budget runs out with no frontend write pending; `Upgrade` when
           // the frontend requests it; `Continue` otherwise.
           // `session` and `proxy` are handed to `router.connect`; `proxy` is
           // also used to deregister dead backends. `_metrics` is not used.
           //
           // `counter` is shared by all inner-loop iterations of this call and
           // is never reset between outer-loop passes.
742        let mut counter = 0;
743
           // A hung-up frontend closes the session right away unless
           // `delay_close_for_frontend_flush` returns true.
744        if self.frontend.readiness().event.is_hup()
745            && !self.delay_close_for_frontend_flush("frontend HUP")
746        {
747            debug!(
748                "{} Mux closing on frontend HUP: {:?}",
749                log_context!(self),
750                self.frontend
751            );
752            return SessionResult::Close;
753        }
754
755        // Start service timers on all active streams after the HUP check.
756        // This mirrors session-level service_start/service_stop in Http(s)Session::ready()
757        // to measure only CPU processing time, excluding epoll wait between cycles.
758        for stream in &mut self.context.streams {
759            if stream.state.is_open() {
760                stream.metrics.service_start();
761            }
762        }
763
764        let start = Instant::now();
765        self.context.debug.push(DebugEvent::ReadyTimestamp(
766            std::time::SystemTime::now()
767                .duration_since(std::time::UNIX_EPOCH)
768                .unwrap_or_default()
769                .as_millis() as usize,
770        ));
771        trace!("{} {:?}", log_context!(self), start);
772        loop {
773            self.context.debug.push(DebugEvent::LoopStart);
774            loop {
775                self.context.debug.push(DebugEvent::LoopIteration(counter));
776                if self.frontend.readiness().filter_interest().is_readable() {
777                    let res = {
778                        let context = &mut self.context;
779                        let res = self
780                            .frontend
781                            .readable(context, EndpointClient(&mut self.router));
782                        context.debug.push(DebugEvent::SR(
783                            self.frontend_token,
784                            res,
785                            self.frontend.readiness().clone(),
786                        ));
787                        res
788                    };
789                    match res {
790                        MuxResult::Continue => {}
791                        MuxResult::CloseSession => {
792                            if !self.delay_close_for_frontend_flush("frontend readable") {
793                                debug!(
794                                    "{} Mux close from frontend readable: {:?}",
795                                    log_context!(self),
796                                    self.frontend
797                                );
798                                return SessionResult::Close;
799                            }
800                        }
801                        MuxResult::Upgrade => return SessionResult::Upgrade,
802                    }
803                }
804
                   // Bookkeeping for this sweep over the backends:
                   // - `all_backends_readiness_are_empty`: cleared as soon as one
                   //   backend still has filtered readiness (or work was forced);
                   // - `dead_backends`: tokens closed in the sweep, removed from
                   //   the router map once the sweep is over;
                   // - `backend_close`: which call and which backend returned
                   //   `CloseSession`, if any.
805                let mut all_backends_readiness_are_empty = true;
806                let mut dead_backends = Vec::new();
807                let mut backend_close: Option<(&'static str, Token)> = None;
808                for (token, client) in self.router.backends.iter_mut() {
809                    let readiness = client.readiness_mut();
810                    // Check the raw event for HUP/ERROR — not filter_interest(),
811                    // because interest only contains READABLE|WRITABLE and would
812                    // always mask out HUP (0b01000) and ERROR (0b00100).
813                    let dead = readiness.event.is_hup() || readiness.event.is_error();
814                    if dead {
815                        trace!(
816                            "{} Backend({:?}) -> {:?}",
817                            log_context_lite!(self),
818                            token,
819                            readiness
820                        );
                           // NOTE(review): presumably this keeps `filter_interest()`
                           // from reporting the dead backend as writable in the
                           // check just below — confirm against `Readiness`.
821                        readiness.event.remove(Ready::WRITABLE);
822                    }
823
824                    if client.readiness().filter_interest().is_writable() {
825                        let position = client.position_mut();
826                        match position {
                               // Writable while still `Connecting`: the connect is
                               // treated as successful. The backend is reported up
                               // if its retry policy was down, its failure state is
                               // reset, and the status becomes `Connected`.
827                            Position::Client(
828                                cluster_id,
829                                backend,
830                                BackendStatus::Connecting(start),
831                            ) => {
832                                #[cfg(debug_assertions)]
833                                self.context
834                                    .debug
835                                    .push(DebugEvent::CCS(*token, cluster_id.clone()));
836
837                                let mut backend_borrow = backend.borrow_mut();
838                                if backend_borrow.retry_policy.is_down() {
839                                    info!(
840                                        "{} backend server {} at {} is up",
841                                        log_context_lite!(self),
842                                        backend_borrow.backend_id,
843                                        backend_borrow.address
844                                    );
845                                    incr!(
846                                        "backend.up",
847                                        Some(cluster_id),
848                                        Some(&backend_borrow.backend_id)
849                                    );
850                                    gauge!(
851                                        names::backend::AVAILABLE,
852                                        1,
853                                        Some(cluster_id),
854                                        Some(&backend_borrow.backend_id)
855                                    );
856                                    push_event(Event {
857                                        kind: EventKind::BackendUp as i32,
858                                        backend_id: Some(backend_borrow.backend_id.to_owned()),
859                                        address: Some(backend_borrow.address.into()),
860                                        cluster_id: Some(cluster_id.to_owned()),
861                                        metric_detail: None,
862                                    });
863                                }
864
865                                //successful connection, reset failure counter
866                                backend_borrow.failures = 0;
867                                backend_borrow.set_connection_time(start.elapsed());
868                                backend_borrow.retry_policy.succeed();
869
870                                if let Some(ids) = self.context.backend_streams.get(token) {
871                                    for &stream_id in ids {
872                                        self.context.streams[stream_id].metrics.backend_connected();
873                                        backend_borrow.active_requests += 1;
874                                    }
875                                }
876                                trace!(
877                                    "{} connection success: {:#?}",
878                                    log_context_lite!(self),
879                                    backend_borrow
880                                );
881                                drop(backend_borrow);
882                                *position = Position::Client(
883                                    std::mem::take(cluster_id),
884                                    backend.clone(),
885                                    BackendStatus::Connected,
886                                );
887                                client
888                                    .timeout_container()
889                                    .set_duration(self.router.configured_backend_timeout);
890                            }
891                            Position::Client(..) => {}
892                            Position::Server => {
893                                error!(
894                                    "{} backend connection cannot be in Server position",
895                                    log_context_lite!(self)
896                                );
897                            }
898                        }
899                        let res = {
900                            let context = &mut self.context;
901                            let res = client.writable(context, EndpointServer(&mut self.frontend));
902                            context.debug.push(DebugEvent::CW(
903                                *token,
904                                res,
905                                client.readiness().clone(),
906                            ));
907                            res
908                        };
909                        match res {
910                            MuxResult::Continue => {}
911                            MuxResult::Upgrade => {
912                                error!(
913                                    "{} only frontend connections can trigger Upgrade",
914                                    log_context_lite!(self)
915                                );
916                            }
917                            MuxResult::CloseSession => {
                                   // Record who asked and leave the sweep; the close
                                   // itself is decided after the dead backends have
                                   // been removed from the router.
918                                backend_close = Some(("backend writable", *token));
919                                break;
920                            }
921                        }
922                        // Cross-readiness: backend wrote → wake frontend reader
923                        let context = &mut self.context;
924                        self.frontend.try_resume_reading(context);
925                    }
926
927                    if client.readiness().filter_interest().is_readable() {
928                        let res = {
929                            let context = &mut self.context;
930                            let res = client.readable(context, EndpointServer(&mut self.frontend));
931                            context.debug.push(DebugEvent::CR(
932                                *token,
933                                res,
934                                client.readiness().clone(),
935                            ));
936                            res
937                        };
938                        match res {
939                            MuxResult::Continue => {}
940                            MuxResult::Upgrade => {
941                                error!(
942                                    "{} only frontend connections can trigger Upgrade (readable)",
943                                    log_context_lite!(self)
944                                );
945                            }
946                            MuxResult::CloseSession => {
947                                backend_close = Some(("backend readable", *token));
948                                break;
949                            }
950                        }
951                    }
952
                       // A dead backend is torn down only once nothing more is
                       // readable from it and `has_buffer_pressure` is false.
953                    if dead
954                        && !client.readiness().filter_interest().is_readable()
955                        && !client.has_buffer_pressure(&self.context)
956                    {
957                        self.context
958                            .debug
959                            .push(DebugEvent::CH(*token, client.readiness().clone()));
960                        trace!("{} Closing {:#?}", log_context_lite!(self), client);
961                        match client.position() {
962                            Position::Client(cluster_id, backend, BackendStatus::Connecting(_)) => {
963                                let mut backend_borrow = backend.borrow_mut();
964                                backend_borrow.failures += 1;
965
966                                let already_unavailable = backend_borrow.retry_policy.is_down();
967                                backend_borrow.retry_policy.fail();
968                                incr!(
969                                    "backend.connections.error",
970                                    Some(cluster_id),
971                                    Some(&backend_borrow.backend_id)
972                                );
973                                if !already_unavailable && backend_borrow.retry_policy.is_down() {
974                                    error!(
975                                        "{} backend server {} at {} is down",
976                                        log_context_lite!(self),
977                                        backend_borrow.backend_id,
978                                        backend_borrow.address
979                                    );
980                                    incr!(
981                                        "backend.down",
982                                        Some(cluster_id),
983                                        Some(&backend_borrow.backend_id)
984                                    );
985                                    gauge!(
986                                        names::backend::AVAILABLE,
987                                        0,
988                                        Some(cluster_id),
989                                        Some(&backend_borrow.backend_id)
990                                    );
991                                    push_event(Event {
992                                        kind: EventKind::BackendDown as i32,
993                                        backend_id: Some(backend_borrow.backend_id.to_owned()),
994                                        address: Some(backend_borrow.address.into()),
995                                        cluster_id: Some(cluster_id.to_owned()),
996                                        metric_detail: None,
997                                    });
998                                }
999                                trace!(
1000                                    "{} connection fail: {:#?}",
1001                                    log_context_lite!(self),
1002                                    backend_borrow
1003                                );
1004                            }
                                // Any other client status: subtract the streams
                                // indexed under this token from `active_requests`
                                // (the counterpart of the `+= 1` done per stream
                                // when the connection was established).
1005                            Position::Client(_, backend, _) => {
1006                                let mut backend_borrow = backend.borrow_mut();
1007                                let count = self
1008                                    .context
1009                                    .backend_streams
1010                                    .get(token)
1011                                    .map_or(0, |ids| ids.len());
1012                                backend_borrow.active_requests =
1013                                    backend_borrow.active_requests.saturating_sub(count);
1014                            }
1015                            Position::Server => {
1016                                error!(
1017                                    "{} dead backend cannot be in Server position",
1018                                    log_context_lite!(self)
1019                                );
1020                            }
1021                        }
1022                        client.close(&mut self.context, EndpointServer(&mut self.frontend));
1023                        dead_backends.push(*token);
1024                    }
1025
1026                    if !client.readiness().filter_interest().is_empty() {
1027                        all_backends_readiness_are_empty = false;
1028                    }
1029                }
1030                // Remove dead backends from the map BEFORE handling
1031                // backend_close. client.close() already decremented
1032                // connections_per_backend / backend.connections gauges in
1033                // the loop above; if we return SessionResult::Close before
1034                // removing them, Mux::close() would decrement again
1035                // (double-decrement → gauge underflow).
1036                if !dead_backends.is_empty() {
1037                    for token in &dead_backends {
1038                        let proxy_borrow = proxy.borrow();
1039                        if let Some(mut client) = self.router.backends.remove(token) {
1040                            client.timeout_container().cancel();
1041                            let socket = client.socket_mut();
1042                            if let Err(e) = proxy_borrow.deregister_socket(socket) {
1043                                error!(
1044                                    "{} error deregistering back socket({:?}): {:?}",
1045                                    log_context!(self),
1046                                    socket,
1047                                    e
1048                                );
1049                            }
1050                            // invariant: write-only shutdown — Shutdown::Both on a TLS frontend
1051                            // discards the receive buffer and elicits TCP RST, truncating the
1052                            // already-queued response. Canonical write-up: `lib/src/https.rs:650-655`.
1053                            // Backend sockets follow the same discipline for symmetry.
1054                            if let Err(e) = socket.shutdown(Shutdown::Write) {
1055                                if e.kind() != ErrorKind::NotConnected {
1056                                    error!(
1057                                        "{} error shutting down back socket({:?}): {:?}",
1058                                        log_context!(self),
1059                                        socket,
1060                                        e
1061                                    );
1062                                }
1063                            }
1064                        } else {
1065                            error!("{} session {:?} has no backend!", log_context!(self), token);
1066                        }
1067                        if !proxy_borrow.remove_session(*token) {
1068                            error!(
1069                                "{} session {:?} was already removed!",
1070                                log_context!(self),
1071                                token
1072                            );
1073                        }
1074                    }
1075                    trace!("{} FRONTEND: {:#?}", log_context!(self), self.frontend);
1076                    trace!(
1077                        "{} BACKENDS: {:#?}",
1078                        log_context!(self),
1079                        self.router.backends
1080                    );
1081                }
1082                if let Some((reason, token)) = backend_close {
1083                    if !self.delay_close_for_frontend_flush(reason) {
1084                        debug!(
1085                            "{} Mux close from {} token={:?}: frontend={:?}",
1086                            log_context!(self),
1087                            reason,
1088                            token,
1089                            self.frontend
1090                        );
1091                        return SessionResult::Close;
1092                    }
                        // The close was postponed: clear the flag so the exit
                        // test at the bottom of the inner loop does not break.
1093                    all_backends_readiness_are_empty = false;
1094                }
1095
1096                if self.frontend.readiness().filter_interest().is_writable() {
1097                    let res = {
1098                        let context = &mut self.context;
1099                        let res = self
1100                            .frontend
1101                            .writable(context, EndpointClient(&mut self.router));
1102                        context.debug.push(DebugEvent::SW(
1103                            self.frontend_token,
1104                            res,
1105                            self.frontend.readiness().clone(),
1106                        ));
1107                        res
1108                    };
1109                    match res {
1110                        MuxResult::Continue => {}
1111                        MuxResult::CloseSession => {
1112                            if !self.delay_close_for_frontend_flush("frontend writable") {
1113                                debug!(
1114                                    "{} Mux close from frontend writable: {:?}",
1115                                    log_context!(self),
1116                                    self.frontend
1117                                );
1118                                return SessionResult::Close;
1119                            }
1120                        }
1121                        MuxResult::Upgrade => return SessionResult::Upgrade,
1122                    }
1123                    // Cross-readiness: frontend wrote → wake parked backends.
1124                    // If any backend resumes, invalidate the stale readiness
1125                    // flag so the inner loop continues instead of breaking.
1126                    let context = &mut self.context;
1127                    for (_token, backend) in self.router.backends.iter_mut() {
1128                        if backend.try_resume_reading(context) {
1129                            all_backends_readiness_are_empty = false;
1130                        }
1131                    }
1132                }
1133
1134                if self.frontend.readiness().filter_interest().is_empty()
1135                    && all_backends_readiness_are_empty
1136                {
1137                    break;
1138                }
1139
1140                counter += 1;
1141                if counter >= MAX_LOOP_ITERATIONS {
1142                    incr!(names::http::INFINITE_LOOP_ERROR);
                        // Budget exhausted. With a frontend write still pending the
                        // session is kept: the WRITABLE event is cleared,
                        // `timeout_container().set(..)` is called for the frontend
                        // token and only the inner loop is left. Otherwise the
                        // session is closed.
1143                    if self.frontend.has_pending_write() {
1144                        debug!(
1145                            "{} Mux loop budget exhausted while frontend flush pending: {:?}",
1146                            log_context!(self),
1147                            self.frontend
1148                        );
1149                        self.frontend.readiness_mut().event.remove(Ready::WRITABLE);
1150                        self.frontend.timeout_container().set(self.frontend_token);
1151                        break;
1152                    }
1153                    return SessionResult::Close;
1154                }
1155            }
1156
                // Linking phase: connect every queued stream that is still in
                // `StreamState::Link`. A failed connect is turned into a default
                // answer written for that stream; the status code depends on the
                // error variant.
1157            let context = &mut self.context;
1158            let answers_rc = context.listener.borrow().get_answers().clone();
1159            let mut dirty = false;
1160            while let Some(stream_id) = context.pending_links.pop_front() {
1161                let Some(stream) = context.streams.get(stream_id) else {
1162                    continue;
1163                };
1164                if stream.state != StreamState::Link {
1165                    continue;
1166                }
1167                // Before the first request triggers a stream Link, the frontend timeout is set
1168                // to a shorter request_timeout, here we switch to the longer nominal timeout
1169                self.frontend
1170                    .timeout_container()
1171                    .set_duration(self.configured_frontend_timeout);
1172                let front_readiness = self.frontend.readiness_mut();
1173                dirty = true;
1174                match self.router.connect(
1175                    stream_id,
1176                    context,
1177                    session.clone(),
1178                    proxy.clone(),
1179                    self.frontend_token,
1180                ) {
1181                    Ok(_) => {
1182                        let state = context.streams[stream_id].state;
1183                        context.debug.push(DebugEvent::CC(stream_id, state));
1184                    }
1185                    Err(error) => {
1186                        trace!("{} Connection error: {}", log_module_context!(), error);
1187                        let stream = &mut context.streams[stream_id];
1188                        let answers = answers_rc.borrow();
1189                        use BackendConnectionError as BE;
1190                        match error {
1191                            BE::MaxConnectionRetries(_)
1192                            | BE::MaxSessionsMemory
1193                            | BE::MaxBuffers => {
1194                                warn!(
1195                                    "{} backend retry budget exhausted: {}",
1196                                    log_module_context!(stream.context),
1197                                    error
1198                                );
1199                                set_default_answer(stream, front_readiness, 503, &answers);
1200                            }
1201                            BE::Backend(BackendError::NoBackendForCluster(_)) => {
1202                                set_default_answer(stream, front_readiness, 503, &answers);
1203                            }
1204                            BE::RetrieveClusterError(RetrieveClusterError::RetrieveFrontend(
1205                                ref err,
1206                            )) => {
1207                                // RFC 9110 §15.5.1: a malformed authority is a
1208                                // 400. A syntactically valid authority that
1209                                // simply has no matching frontend stays on the
1210                                // historical 404 path.
1211                                let code = match err {
1212                                    FrontendFromRequestError::HostParse { .. }
1213                                    | FrontendFromRequestError::InvalidCharsAfterHost(_) => 400,
1214                                    FrontendFromRequestError::NoClusterFound(_) => 404,
1215                                };
1216                                set_default_answer(stream, front_readiness, code, &answers);
1217                            }
1218                            BE::RetrieveClusterError(RetrieveClusterError::UnauthorizedRoute) => {
1219                                set_default_answer(stream, front_readiness, 401, &answers);
1220                            }
1221                            BE::RetrieveClusterError(
1222                                RetrieveClusterError::SniAuthorityMismatch { .. },
1223                            ) => {
1224                                // RFC 9110 §15.5.20: 421 Misdirected Request is the
1225                                // semantically correct status for an authority that
1226                                // does not belong to this TLS connection. The
1227                                // http.sni_authority_mismatch metric emitted in
1228                                // `route_from_request` remains the durable signal;
1229                                // the 421 body here is what a client sees and may
1230                                // retry on a fresh TLS connection with a matching SNI.
1231                                set_default_answer(stream, front_readiness, 421, &answers);
1232                            }
1233                            BE::RetrieveClusterError(RetrieveClusterError::HttpsRedirect) => {
1234                                // Use the redirect status stashed by `Router::route_from_request`
1235                                // (#1009). Falls back to 301 for the legacy
1236                                // `cluster.https_redirect = true` path that does
1237                                // not set the field.
1238                                let code = stream.context.redirect_status.unwrap_or(301);
1239                                set_default_answer(stream, front_readiness, code, &answers);
1240                            }
1241
1242                            BE::Backend(ref e) => {
1243                                error!("{} backend connection error: {}", log_module_context!(), e);
1244                                set_default_answer(stream, front_readiness, 503, &answers);
1245                            }
1246                            BE::RetrieveClusterError(ref other) => {
1247                                error!(
1248                                    "{} unexpected RetrieveClusterError variant: {:?}",
1249                                    log_module_context!(),
1250                                    other
1251                                );
1252                                set_default_answer(stream, front_readiness, 503, &answers);
1253                            }
1254                            // TCP specific error
1255                            BE::NotFound(ref msg) => {
1256                                error!(
1257                                    "{} NotFound is TCP-specific, not reachable in mux: {:?}",
1258                                    log_module_context!(),
1259                                    msg
1260                                );
1261                                set_default_answer(stream, front_readiness, 503, &answers);
1262                            }
1263                            // Per-(cluster, source-IP) connection limit reached.
1264                            // Emit HTTP 429 with the resolved `Retry-After`. The
1265                            // value is computed in `Router::connect` (where the
1266                            // SessionManager + cluster override are reachable)
1267                            // and stashed on the stream context just before the
1268                            // error is returned, so the answer engine can render
1269                            // (or elide) the header without re-deriving the
1270                            // resolution chain here.
1271                            BE::TooManyConnectionsPerIp { ref cluster_id } => {
1272                                debug!(
1273                                    "{} per-(cluster, source-IP) limit hit for cluster {:?}",
1274                                    log_module_context!(),
1275                                    cluster_id
1276                                );
1277                                let retry_after = stream.context.retry_after_seconds;
1278                                set_default_answer_with_retry_after(
1279                                    stream,
1280                                    front_readiness,
1281                                    429,
1282                                    &answers,
1283                                    retry_after,
1284                                );
1285                            }
1286                        }
1287                        context.debug.push(DebugEvent::CCF(stream_id, error));
1288                    }
1289                }
1290                // All routing error arms now set a default answer, transitioning
1291                // the stream out of Link state. No re-enqueue needed.
1292            }
                // No queued stream was in `Link` state during this pass, so there
                // is nothing new to drive: leave the outer loop.
1293            if !dirty {
1294                break;
1295            }
1296        }
1297
1298        // Stop service timers before yielding to epoll, so idle wait time is excluded
1299        // from the service_time metric. For Close/Upgrade returns, close() handles cleanup.
1300        for stream in &mut self.context.streams {
1301            if stream.state.is_open() {
1302                stream.metrics.service_stop();
1303            }
1304        }
1305
1306        #[cfg(debug_assertions)]
1307        {
1308            // Verify backend_streams index matches actual stream states.
1309            let mut expected: HashMap<Token, Vec<GlobalStreamId>> = HashMap::new();
1310            for (id, stream) in self.context.streams.iter().enumerate() {
1311                if let StreamState::Linked(token) = stream.state {
1312                    expected.entry(token).or_default().push(id);
1313                }
1314            }
1315            assert_eq!(
1316                expected.len(),
1317                self.context.backend_streams.len(),
1318                "backend_streams index key count mismatch: expected={:?}, actual={:?}",
1319                expected,
1320                self.context.backend_streams
1321            );
1322            for (token, mut expected_ids) in expected {
1323                let mut actual_ids = self
1324                    .context
1325                    .backend_streams
1326                    .get(&token)
1327                    .cloned()
1328                    .unwrap_or_default();
1329                expected_ids.sort();
1330                actual_ids.sort();
1331                assert_eq!(
1332                    expected_ids, actual_ids,
1333                    "backend_streams index mismatch for token {token:?}",
1334                );
1335            }
1336        }
1337
1338        SessionResult::Continue
1339    }
1340
1341    fn update_readiness(&mut self, token: Token, events: Ready) {
1342        trace!("{} EVENTS: {:?} on {:?}", log_context!(self), events, token);
1343        self.context.debug.push(DebugEvent::EV(token, events));
1344        if token == self.frontend_token {
1345            self.frontend.readiness_mut().event |= events;
1346        } else if let Some(c) = self.router.backends.get_mut(&token) {
1347            c.readiness_mut().event |= events;
1348        }
1349    }
1350
1351    fn timeout(&mut self, token: Token, _metrics: &mut SessionMetrics) -> StateResult {
1352        trace!("{} MuxState::timeout({:?})", log_context!(self), token);
1353        let front_is_h2 = match self.frontend {
1354            Connection::H1(_) => false,
1355            Connection::H2(_) => true,
1356        };
1357        let answers_rc = self.context.listener.borrow().get_answers().clone();
1358        let mut should_close = true;
1359        let mut should_write = false;
1360        if self.frontend_token == token {
1361            trace!(
1362                "{} MuxState::timeout_frontend({:#?})",
1363                log_context!(self),
1364                self.frontend
1365            );
1366            self.frontend.timeout_container().triggered();
1367            // The per-stream reaper (bidirectional-idle + outbound
1368            // flow-control-stall guards) normally runs only from `readable()`,
1369            // but a fully-silent peer never triggers a read event. Run it on the
1370            // connection-timeout path too so a window-stalled stream — a buffered
1371            // response the peer refuses to drain by holding its receive window
1372            // shut — is reaped and its MAX_CONCURRENT_STREAMS slot freed, instead
1373            // of lingering until the 30-minute zombie checker. The reaper queues
1374            // an `RST_STREAM(CANCEL)`; because `has_pending_write()` does NOT
1375            // observe `pending_rst_streams` (it gates connection close, so a
1376            // queued RST must not read as "keep open"), set `should_write` via
1377            // the dedicated `has_pending_control_write()` probe so the reset is
1378            // actually flushed to the peer before the connection closes — without
1379            // it, a fully-silent peer's stalled stream is freed but the peer sees
1380            // only EOF, never the RST(CANCEL).
1381            if let Connection::H2(h2) = &mut self.frontend {
1382                h2.cancel_timed_out_streams(
1383                    &mut self.context,
1384                    &mut EndpointClient(&mut self.router),
1385                );
1386                if h2.has_pending_control_write() {
1387                    should_write = true;
1388                }
1389            }
1390            if self.frontend.has_pending_write() {
1391                should_write = true;
1392            }
1393            let front_readiness = self.frontend.readiness_mut();
1394            for stream_id in 0..self.context.streams.len() {
1395                match self.context.streams[stream_id].state {
1396                    StreamState::Idle => {
1397                        // In h1 an Idle stream is always the first request, so we can send a 408
1398                        // In h2 an Idle stream doesn't necessarily hold a request yet,
1399                        // in most cases it was just reserved, so we can just ignore them.
1400                        if !front_is_h2 {
1401                            let answers = answers_rc.borrow();
1402                            let stream = &mut self.context.streams[stream_id];
1403                            stream.context.access_log_message = Some("client_timeout");
1404                            set_default_answer(stream, front_readiness, 408, &answers);
1405                            should_write = true;
1406                        }
1407                    }
1408                    StreamState::Link => {
1409                        // This is an unusual case, as we have both a complete request and no
1410                        // available backend yet. For now, we answer with 503.
1411                        // Not a timeout-driven outcome from the operator's
1412                        // perspective — leave access_log_message as None.
1413                        let answers = answers_rc.borrow();
1414                        let stream = &mut self.context.streams[stream_id];
1415                        set_default_answer(stream, front_readiness, 503, &answers);
1416                        should_write = true;
1417                    }
1418                    StreamState::Linked(_) => {
1419                        // The frontend timed out while a stream is linked to a backend.
1420                        // The backend timeout should handle this, but in case the backend
1421                        // is also stalled, send a 504 and terminate the stream.
1422                        if !self.context.streams[stream_id].back.consumed {
1423                            self.context.unlink_stream(stream_id);
1424                            let answers = answers_rc.borrow();
1425                            let stream = &mut self.context.streams[stream_id];
1426                            stream.context.access_log_message =
1427                                Some("client_timeout_during_response");
1428                            set_default_answer(stream, front_readiness, 504, &answers);
1429                            should_write = true;
1430                        } else if self.context.streams[stream_id].back.is_completed() {
1431                            // Response fully proxied, stream can be closed
1432                        } else if self.context.streams[stream_id].back.is_terminated()
1433                            || self.context.streams[stream_id].back.is_error()
1434                        {
1435                            // Response is terminated/error but not fully written to frontend.
1436                            // Keep the session alive briefly to flush remaining data.
1437                            should_close = false;
1438                        } else {
1439                            // Partial response in progress — forcefully terminate
1440                            self.context.unlink_stream(stream_id);
1441                            let stream = &mut self.context.streams[stream_id];
1442                            stream.context.access_log_message =
1443                                Some("client_timeout_during_response");
1444                            forcefully_terminate_answer(
1445                                stream,
1446                                front_readiness,
1447                                H2Error::InternalError,
1448                            );
1449                            should_write = true;
1450                        }
1451                        // end_stream is called in a second pass below to avoid
1452                        // borrow conflicts on context.streams.
1453                    }
1454                    StreamState::Unlinked => {
1455                        // A stream Unlinked already has a response and its backend closed.
1456                        // In case it hasn't finished proxying we wait. Otherwise it is a stream
1457                        // kept alive for a new request, which can be killed.
1458                        if !self.context.streams[stream_id].back.is_completed() {
1459                            should_close = false;
1460                        }
1461                    }
1462                    StreamState::Recycle => {
1463                        // A recycled stream is an h2 stream which doesn't hold a request anymore.
1464                        // We can ignore it.
1465                    }
1466                }
1467            }
1468            // Second pass: end streams that were linked to backends.
1469            // This is done separately to avoid borrow conflicts on context.streams.
1470            let linked_streams: Vec<(GlobalStreamId, Token)> = self
1471                .context
1472                .streams
1473                .iter()
1474                .enumerate()
1475                .filter_map(|(id, stream)| {
1476                    if let StreamState::Linked(back_token) = stream.state {
1477                        Some((id, back_token))
1478                    } else {
1479                        None
1480                    }
1481                })
1482                .collect();
1483            for (stream_id, back_token) in linked_streams {
1484                if let Some(backend) = self.router.backends.get_mut(&back_token) {
1485                    backend.end_stream(stream_id, &mut self.context);
1486                }
1487            }
1488        } else if let Some(backend) = self.router.backends.get_mut(&token) {
1489            trace!(
1490                "{} MuxState::timeout_backend({:#?})",
1491                log_context_lite!(self),
1492                backend
1493            );
1494            backend.timeout_container().triggered();
1495            let front_readiness = self.frontend.readiness_mut();
1496            let linked_ids: Vec<GlobalStreamId> = self
1497                .context
1498                .backend_streams
1499                .get(&token)
1500                .map_or_else(Vec::new, |ids| ids.to_owned());
1501            for stream_id in linked_ids {
1502                // This stream is linked to the backend that timedout
1503                if self.context.streams[stream_id].back.is_terminated()
1504                    || self.context.streams[stream_id].back.is_error()
1505                {
1506                    trace!(
1507                        "{} Stream terminated or in error, do nothing, just wait a bit more",
1508                        log_module_context!()
1509                    );
1510                    // Nothing to do, simply wait for the remaining bytes to be proxied
1511                    if !self.context.streams[stream_id].back.is_completed() {
1512                        should_close = false;
1513                    }
1514                } else if !self.context.streams[stream_id].back.consumed {
1515                    // The response has not started yet
1516                    trace!(
1517                        "{} Stream still waiting for response, send 504",
1518                        log_module_context!()
1519                    );
1520                    self.context.unlink_stream(stream_id);
1521                    let answers = answers_rc.borrow();
1522                    let stream = &mut self.context.streams[stream_id];
1523                    stream.context.access_log_message = Some("backend_timeout");
1524                    set_default_answer(stream, front_readiness, 504, &answers);
1525                    should_write = true;
1526                } else {
1527                    trace!(
1528                        "{} Stream waiting for end of response, forcefully terminate it",
1529                        log_module_context!()
1530                    );
1531                    self.context.unlink_stream(stream_id);
1532                    let stream = &mut self.context.streams[stream_id];
1533                    stream.context.access_log_message = Some("backend_response_timeout");
1534                    forcefully_terminate_answer(stream, front_readiness, H2Error::InternalError);
1535                    should_write = true;
1536                }
1537                backend.end_stream(stream_id, &mut self.context);
1538            }
1539            // Re-arm the backend timeout if the session stays alive (draining streams).
1540            // Without this, the timeout is consumed and the session becomes immortal
1541            // until the zombie checker runs.
1542            if !should_close {
1543                backend.timeout_container().set(token);
1544            }
1545        } else {
1546            // Session received a timeout for an unknown token, ignore it
1547            return StateResult::Continue;
1548        }
1549        if should_write {
1550            // Drain as much pending data as possible before closing.
1551            // A single writable() call is insufficient for large responses —
1552            // the TLS buffer may need multiple flushes. Without this loop,
1553            // the session is killed with unflushed TLS data, causing the
1554            // client to receive a truncated TLS record ("decode error").
1555            //
1556            // The constant 16 is empirical: it papers over a missing
1557            // invariant-15 hop in the H2 mux state machine where the
1558            // writable readiness signal is not always re-armed after a
1559            // partial flush. Long-term plan: reach invariant-15 closure
1560            // and remove this loop. See `lib/src/protocol/mux/LIFECYCLE.md`.
1561            let mut result = StateResult::Continue;
1562            for _ in 0..16 {
1563                result = match self
1564                    .frontend
1565                    .writable(&mut self.context, EndpointClient(&mut self.router))
1566                {
1567                    MuxResult::Continue => StateResult::Continue,
1568                    MuxResult::Upgrade => StateResult::Upgrade,
1569                    MuxResult::CloseSession => StateResult::CloseSession,
1570                };
1571                if result != StateResult::Continue
1572                    || !self.frontend.readiness_mut().interest.is_writable()
1573                {
1574                    break;
1575                }
1576            }
1577            // Re-arm the frontend timeout so the session doesn't become immortal.
1578            // The writable call may have partially flushed the response — we need
1579            // the timeout to fire again if the flush stalls.
1580            if result == StateResult::Continue {
1581                self.frontend.timeout_container().set(self.frontend_token);
1582            }
1583            return result;
1584        }
1585        if should_close {
1586            if self.delay_close_for_frontend_flush("timeout") {
1587                debug!(
1588                    "{} Mux timeout delaying close for frontend flush: token={:?}, frontend={:?}",
1589                    log_context!(self),
1590                    token,
1591                    self.frontend
1592                );
1593                self.frontend.timeout_container().set(self.frontend_token);
1594                return StateResult::Continue;
1595            }
1596            if front_is_h2 {
1597                debug!(
1598                    "{} Mux timeout returning CloseSession: token={:?}, frontend={:?}",
1599                    log_context!(self),
1600                    token,
1601                    self.frontend
1602                );
1603                for (idx, stream) in self.context.streams.iter().enumerate() {
1604                    if stream.state != StreamState::Recycle {
1605                        debug!(
1606                            "{}   timeout stream[{}]: state={:?}, front_phase={:?}, back_phase={:?}, front_completed={}, back_completed={}",
1607                            log_context!(self),
1608                            idx,
1609                            stream.state,
1610                            stream.front.parsing_phase,
1611                            stream.back.parsing_phase,
1612                            stream.front.is_completed(),
1613                            stream.back.is_completed()
1614                        );
1615                    }
1616                }
1617            }
1618            StateResult::CloseSession
1619        } else {
1620            // Re-arm the frontend timeout. Without this, the timeout is consumed
1621            // by triggered() and the session stays alive indefinitely until the
1622            // zombie checker runs (default: 30 minutes).
1623            self.frontend.timeout_container().set(self.frontend_token);
1624            StateResult::Continue
1625        }
1626    }
1627
1628    fn cancel_timeouts(&mut self) {
1629        trace!("{} MuxState::cancel_timeouts", log_context!(self));
1630        self.frontend.timeout_container().cancel();
1631        for backend in self.router.backends.values_mut() {
1632            backend.timeout_container().cancel();
1633        }
1634    }
1635
1636    fn print_state(&self, _context: &str) {
1637        // The trait-required `context: &str` parameter (protocol tag like
1638        // "HTTPS"/"HTTP" passed by callers) predates the unified
1639        // `log_context!(self)` envelope. The canonical `MUX` tag lives
1640        // inside `log_context!`, so we ignore the parameter here and emit
1641        // the bracketed Session(...) block instead, mirroring the second
1642        // `error!` in this function.
1643        error!(
1644            "\
1645{} Session(Mux)
1646\tFrontend:
1647\t\ttoken: {:?}\treadiness: {:?}
1648\tBackend(s):",
1649            log_context!(self),
1650            self.frontend_token,
1651            self.frontend.readiness()
1652        );
1653        for (backend_token, backend) in &self.router.backends {
1654            error!(
1655                "{} \t\ttoken: {:?}\treadiness: {:?}",
1656                log_context!(self),
1657                backend_token,
1658                backend.readiness()
1659            )
1660        }
1661    }
1662
1663    fn close(&mut self, proxy: Rc<RefCell<dyn L7Proxy>>, _metrics: &mut SessionMetrics) {
1664        if self.context.debug.is_interesting() {
1665            warn!("{} {:?}", log_context!(self), self.context.debug.events);
1666        }
1667        debug!("{} MUX CLOSE", log_context!(self));
1668        trace!("{} FRONTEND: {:#?}", log_context!(self), self.frontend);
1669        trace!(
1670            "{} BACKENDS: {:#?}",
1671            log_context!(self),
1672            self.router.backends
1673        );
1674
1675        // Log active streams at session teardown for timeout diagnosis
1676        let active_count = self
1677            .context
1678            .streams
1679            .iter()
1680            .filter(|s| s.state.is_open() && s.metrics.start.is_some())
1681            .count();
1682        if active_count > 0 {
1683            debug!(
1684                "{} Session close with {} active stream(s)",
1685                log_context!(self),
1686                active_count
1687            );
1688            for (idx, stream) in self
1689                .context
1690                .streams
1691                .iter()
1692                .enumerate()
1693                .filter(|(_, s)| s.state.is_open() && s.metrics.start.is_some())
1694            {
1695                let elapsed = stream.metrics.service_time();
1696                debug!(
1697                    "{}   active stream[{}]: state={:?} service_time={:?} method={:?} path={:?} status={:?}",
1698                    log_context!(self),
1699                    idx,
1700                    stream.state,
1701                    elapsed,
1702                    stream.context.method,
1703                    stream.context.path,
1704                    stream.context.status,
1705                );
1706            }
1707            incr!(names::h2::CLOSE_WITH_ACTIVE_STREAMS);
1708        }
1709
1710        // Distribute H2 connection-level overhead (control frames) across in-flight
1711        // streams so that access log bytes_in/bytes_out reflect actual wire cost.
1712        // Integer division may lose up to (active_count - 1) bytes, which is acceptable.
1713        let active_count = active_count.max(1);
1714        let (total_overhead_in, total_overhead_out) = self.frontend.overhead_bytes();
1715        let share_in = total_overhead_in / active_count;
1716        let share_out = total_overhead_out / active_count;
1717
1718        // Generate access logs for in-flight streams on session teardown.
1719        // Skip streams that already had their access log emitted (metrics.start is
1720        // set to None by metrics.reset() after generate_access_log in the happy path).
1721        // Frontend RTT is the same for every stream on this session — snapshot
1722        // it once outside the loop instead of paying one TCP_INFO syscall per
1723        // open stream.
1724        let client_rtt = socket_rtt(self.frontend.socket());
1725        for stream in &mut self.context.streams {
1726            if stream.state.is_open() && stream.metrics.start.is_some() {
1727                stream.metrics.bin += share_in;
1728                stream.metrics.bout += share_out;
1729                stream.metrics.service_stop();
1730                if stream.metrics.backend_stop.is_none() {
1731                    stream.metrics.backend_stop();
1732                }
1733                // Only mark as error if the stream had an actual protocol/processing failure
1734                // (kawa parse error, backend error). Normal timeouts, client disconnects,
1735                // and graceful connection closures are not errors.
1736                let is_error = stream.front.is_error() || stream.back.is_error();
1737                let server_rtt = stream.linked_token().and_then(|token| {
1738                    self.router
1739                        .backends
1740                        .get(&token)
1741                        .and_then(|c| socket_rtt(c.socket()))
1742                });
1743                stream.generate_access_log(
1744                    is_error,
1745                    Some("session close"),
1746                    self.context.listener.clone(),
1747                    client_rtt,
1748                    server_rtt,
1749                );
1750                stream.state = StreamState::Recycle;
1751            }
1752        }
1753
1754        self.frontend
1755            .close(&mut self.context, EndpointClient(&mut self.router));
1756
1757        for (token, client) in &mut self.router.backends {
1758            let proxy_borrow = proxy.borrow();
1759            client.timeout_container().cancel();
1760            let socket = client.socket_mut();
1761            if let Err(e) = proxy_borrow.deregister_socket(socket) {
1762                error!(
1763                    "{} error deregistering back socket({:?}): {:?}",
1764                    log_context_lite!(self),
1765                    socket,
1766                    e
1767                );
1768            }
1769            // invariant: write-only shutdown — Shutdown::Both on a TLS frontend
1770            // discards the receive buffer and elicits TCP RST, truncating the
1771            // already-queued response. Canonical write-up: `lib/src/https.rs:650-655`.
1772            // Backend sockets follow the same discipline for symmetry.
1773            if let Err(e) = socket.shutdown(Shutdown::Write) {
1774                if e.kind() != ErrorKind::NotConnected {
1775                    error!(
1776                        "{} error shutting down back socket({:?}): {:?}",
1777                        log_context_lite!(self),
1778                        socket,
1779                        e
1780                    );
1781                }
1782            }
1783            if !proxy_borrow.remove_session(*token) {
1784                error!(
1785                    "{} session {:?} was already removed!",
1786                    log_context_lite!(self),
1787                    token
1788                );
1789            }
1790
1791            match client.position() {
1792                Position::Client(cluster_id, backend, _) => {
1793                    let mut backend_borrow = backend.borrow_mut();
1794                    backend_borrow.dec_connections();
1795                    gauge_add!(names::backend::CONNECTIONS, -1);
1796                    // Second `-1` site for `backend.pool.size` (the first is
1797                    // in `connection.rs::pre_close_client_bookkeeping`). This
1798                    // path runs during session teardown when the frontend
1799                    // session iterates the backends map directly without
1800                    // routing through `Connection::close`. Both `-1` sites
1801                    // mirror the single `+1` in router.rs::connect and the
1802                    // matching `backend.connections, -1` calls already
1803                    // present here, so symmetry follows from
1804                    // `backend.connections` correctness.
1805                    gauge_add!(names::backend::POOL_SIZE, -1);
1806                    gauge_add!(
1807                        names::backend::CONNECTIONS_PER_BACKEND,
1808                        -1,
1809                        Some(cluster_id),
1810                        Some(&backend_borrow.backend_id)
1811                    );
1812                    let count = self
1813                        .context
1814                        .backend_streams
1815                        .get(token)
1816                        .map_or(0, |ids| ids.len());
1817                    backend_borrow.active_requests =
1818                        backend_borrow.active_requests.saturating_sub(count);
1819                    trace!(
1820                        "{} connection (session) closed: {:#?}",
1821                        log_context_lite!(self),
1822                        backend_borrow
1823                    );
1824                }
1825                Position::Server => {
1826                    error!(
1827                        "{} close_backend called on Server position",
1828                        log_context_lite!(self)
1829                    );
1830                }
1831            }
1832        }
1833        // Clear the reverse index after all backends have decremented their
1834        // active_requests counters (which depend on the index for stream counts).
1835        self.context.backend_streams.clear();
1836    }
1837
1838    fn shutting_down(&mut self) -> SessionIsToBeClosed {
1839        // RFC 9113 §6.8: initiate graceful shutdown with double-GOAWAY pattern.
1840        // Only send the initial GOAWAY once. The final GOAWAY (with the real
1841        // last_stream_id) is handled by finalize_write() when all streams drain.
1842        // Calling graceful_goaway() again would send the final GOAWAY
1843        // prematurely and force-disconnect before in-flight streams complete.
1844        if !self.frontend.is_draining() {
1845            match self.frontend.graceful_goaway() {
1846                MuxResult::CloseSession => return true,
1847                MuxResult::Continue => {
1848                    // graceful_goaway() queued a GOAWAY frame. Flush it directly
1849                    // since the event loop uses edge-triggered epoll and won't
1850                    // deliver a new WRITABLE event for an already-writable socket.
1851                    self.frontend.flush_zero_buffer();
1852                }
1853                _ => {}
1854            }
1855        } else {
1856            trace!(
1857                "{} shutting_down: already draining, skipping duplicate GOAWAY",
1858                log_context!(self)
1859            );
1860            // shut_down_sessions() runs outside ready(), so retry flushing any
1861            // previously-buffered GOAWAY/TLS records on each pass.
1862            self.frontend.flush_zero_buffer();
1863        }
1864        if self.drive_frontend_shutdown_io() {
1865            return true;
1866        }
1867        // Forced-close deadline: once the H2 listener's
1868        // `h2_graceful_shutdown_deadline_seconds` budget has elapsed from
1869        // the moment `graceful_goaway` armed `drain.started_at`, stop
1870        // waiting for streams and tear the session down. `drive_frontend_
1871        // shutdown_io` above already had a chance to flush any pending
1872        // TLS/GOAWAY records; this branch accepts that some bytes may be
1873        // lost in exchange for honoring the operator-configured SLA.
1874        // Listeners that disable the knob (`= 0` → `None`) short-circuit
1875        // the check inside `graceful_shutdown_deadline_elapsed`.
1876        if self.frontend.graceful_shutdown_deadline_elapsed() {
1877            debug!(
1878                "{} Mux shutting_down: graceful-shutdown deadline elapsed, forcing close",
1879                log_context!(self)
1880            );
1881            return true;
1882        }
1883        if matches!(self.frontend, Connection::H2(_)) && self.frontend.is_draining() {
1884            for stream in &mut self.context.streams {
1885                if stream.front_received_end_of_stream {
1886                    continue;
1887                }
1888                if !matches!(stream.state, StreamState::Linked(_) | StreamState::Unlinked) {
1889                    continue;
1890                }
1891                if stream.front.consumed
1892                    && stream.front.storage.is_empty()
1893                    && stream.front.is_completed()
1894                {
1895                    stream.front_received_end_of_stream = true;
1896                    self.frontend
1897                        .readiness_mut()
1898                        .interest
1899                        .insert(Ready::WRITABLE);
1900                    self.frontend.readiness_mut().signal_pending_write();
1901                }
1902            }
1903        }
1904        let mut can_stop = true;
1905        for stream in &mut self.context.streams {
1906            match stream.state {
1907                StreamState::Linked(_) => {
1908                    can_stop = false;
1909                }
1910                StreamState::Unlinked => {
1911                    kawa::debug_kawa(&stream.front);
1912                    kawa::debug_kawa(&stream.back);
1913                    if stream.is_quiesced() {
1914                        continue;
1915                    }
1916                    stream.context.closing = true;
1917                    can_stop = false;
1918                }
1919                _ => {}
1920            }
1921        }
1922        if self.frontend.has_pending_write() {
1923            return false;
1924        }
1925        if can_stop {
1926            let active_h2_streams = self
1927                .context
1928                .streams
1929                .iter()
1930                .enumerate()
1931                .filter(|(_, s)| {
1932                    if s.state == StreamState::Recycle {
1933                        return false;
1934                    }
1935                    if s.state == StreamState::Unlinked && s.is_quiesced() {
1936                        return false;
1937                    }
1938                    true
1939                })
1940                .collect::<Vec<_>>();
1941            if matches!(self.frontend, Connection::H2(_)) && !active_h2_streams.is_empty() {
1942                debug!(
1943                    "{} Mux shutting_down returning true with active H2 streams: {:?}",
1944                    log_context!(self),
1945                    self.frontend
1946                );
1947                for (idx, stream) in active_h2_streams {
1948                    debug!(
1949                        "{}   shutdown stream[{}]: state={:?}, front_phase={:?}, back_phase={:?}, front_completed={}, back_completed={}",
1950                        log_context!(self),
1951                        idx,
1952                        stream.state,
1953                        stream.front.parsing_phase,
1954                        stream.back.parsing_phase,
1955                        stream.front.is_completed(),
1956                        stream.back.is_completed()
1957                    );
1958                }
1959            }
1960        }
1961        if can_stop {
1962            return true;
1963        }
1964
1965        false
1966    }
1967}
1968
#[cfg(test)]
mod tests {
    use super::*;

    /// A `SocketResult::Closed` read outcome must only retire the readable
    /// event: the writable and HUP events stay set, and the helper reports
    /// that no yield is needed.
    #[test]
    fn update_readiness_after_read_closed_keeps_writable() {
        // Start with every flag of interest raised on both sides of the
        // readiness pair, so any flag cleared afterwards is attributable to
        // the call under test.
        let every_flag = || Ready::READABLE | Ready::WRITABLE | Ready::HUP;
        let mut state = Readiness {
            interest: every_flag(),
            event: every_flag(),
        };

        // NOTE(review): 17 looks like an arbitrary non-zero first argument
        // (presumably a byte count) — confirm against the helper's signature.
        let yielded = update_readiness_after_read(17, SocketResult::Closed, &mut state);
        assert!(!yielded);

        // Only the readable bit is expected to be dropped from the events.
        let event = &state.event;
        assert!(!event.is_readable());
        assert!(event.is_writable());
        assert!(event.is_hup());
    }
}