sozu_lib/protocol/mux/mod.rs
1//! HTTP/1.1 and HTTP/2 multiplexing layer.
2//!
3//! This module unifies HTTP/1.1 and HTTP/2 behind a single [`Mux`] session
4//! state machine that integrates with sozu's mio event loop. The key types:
5//!
6//! - [`Mux`]: The top-level session state, generic over socket (`TcpStream` or
7//! `FrontRustls`) and listener. Implements `SessionState`.
8//! - [`Connection`]: Enum dispatching to [`ConnectionH1`] or [`ConnectionH2`]
9//! for protocol-specific readable/writable logic.
10//! - [`Stream`]: Per-request state with front/back kawa buffers, metrics, and
11//! lifecycle tracking. Shared between H1 and H2 paths.
12//! - [`Context`]: Per-session context (cluster, backends, routing, timeouts).
13//!
14//! The H2 implementation handles RFC 9113 framing, HPACK (RFC 7541), flow
15//! control, flood detection (CVE-2023-44487, CVE-2019-9512/9514/9515/9518,
16//! CVE-2024-27316), and graceful shutdown (double-GOAWAY per RFC 9113 §6.8).
17
18use std::{
19 cell::RefCell,
20 collections::{HashMap, VecDeque},
21 fmt::Debug,
22 io::ErrorKind,
23 net::{Shutdown, SocketAddr},
24 rc::{Rc, Weak},
25 sync::Arc,
26 time::{Duration, Instant},
27};
28
29use mio::{Token, net::TcpStream};
30use rusty_ulid::Ulid;
31use sozu_command::{
32 logging::ansi_palette,
33 proto::command::{Event, EventKind},
34 ready::Ready,
35};
36
/// Session-scoped prefix placed in front of every [`Mux`] log line: the
/// protocol label followed by a one-line session descriptor. Follows the
/// RUSTLS log-context convention:
/// `[<ulid> - - -]\tMUX\tSession(...)\t >>>`. When colored output is enabled
/// (via [`ansi_palette`]) the label is wrapped in bold bright-white ANSI
/// (uniform across every protocol) and the session detail block is rendered
/// in light grey.
///
/// The session block reports:
/// - `frontend` — mio token of the frontend socket
/// - `peer` — peer address (or `None` if the socket is gone)
/// - `streams` — number of streams currently held by the [`Context`]
/// - `backends` — number of backend connections in the [`Router`]
/// - `pending_links` — streams waiting to be linked to a backend
/// - `readiness` — frontend mio readiness snapshot
///
/// `$self` must expose `session_ulid`, `frontend_token`, `frontend`,
/// `context` and `router`; use [`log_context_lite!`] when `context` or
/// `router` cannot be read at the call site.
macro_rules! log_context {
    ($self:expr) => {{
        // Palette slots, named after the part of the line they decorate.
        let (label_on, ansi_off, block_tint, key_tint, value_tint) = ansi_palette();
        format!(
            "[{ulid} - - -]\t{open}MUX{reset}\t{grey}Session{reset}({gray}frontend{reset}={white}{frontend}{reset}, {gray}peer{reset}={white}{peer:?}{reset}, {gray}streams{reset}={white}{streams}{reset}, {gray}backends{reset}={white}{backends}{reset}, {gray}pending_links{reset}={white}{pending_links}{reset}, {gray}readiness{reset}={white}{readiness}{reset})\t >>>",
            open = label_on,
            reset = ansi_off,
            grey = block_tint,
            gray = key_tint,
            white = value_tint,
            ulid = $self.session_ulid,
            frontend = $self.frontend_token.0,
            peer = $self.frontend.socket().peer_addr().ok(),
            streams = $self.context.streams.len(),
            backends = $self.router.backends.len(),
            pending_links = $self.context.pending_links.len(),
            readiness = $self.frontend.readiness(),
        )
    }};
}
71
/// Reduced form of [`log_context!`] that leaves out the
/// `streams`/`backends`/`pending_links` counts. Meant for call sites where
/// the borrow checker forbids reading `self.router.backends` or
/// `self.context.streams` (e.g. inside a method that already holds a mutable
/// borrow on one of them). The ULID and the frontend snapshot are still
/// enough to correlate the line with the rest of the session.
macro_rules! log_context_lite {
    ($self:expr) => {{
        // Palette slots, named after the part of the line they decorate.
        let (label_on, ansi_off, block_tint, key_tint, value_tint) = ansi_palette();
        format!(
            "[{ulid} - - -]\t{open}MUX{reset}\t{grey}Session{reset}({gray}frontend{reset}={white}{frontend}{reset}, {gray}peer{reset}={white}{peer:?}{reset}, {gray}readiness{reset}={white}{readiness}{reset})\t >>>",
            open = label_on,
            reset = ansi_off,
            grey = block_tint,
            gray = key_tint,
            white = value_tint,
            ulid = $self.session_ulid,
            frontend = $self.frontend_token.0,
            peer = $self.frontend.socket().peer_addr().ok(),
            readiness = $self.frontend.readiness(),
        )
    }};
}
95
/// Log prefix for free functions and routing blocks that have no [`Mux`]
/// in scope. Honours the colored flag.
///
/// Two arms:
/// * `log_module_context!()` — zero-arg, legacy `MUX\t >>>` output. Kept
///   for sites without an `HttpContext` in scope (e.g. the generic
///   `trace!` that fires before the variant-specific match).
/// * `log_module_context!($http_context)` — rich form. `$http_context`
///   must be `&HttpContext`. Produces the same
///   `[session req cluster backend]` bracket as RUSTLS/PIPE/TCP followed
///   by a `Session(...)` block, so MUX lines emitted from variant match
///   arms stay filterable by session ULID or request ULID. Mirrors
///   `router.rs:log_module_context!($http_context)` (see there).
macro_rules! log_module_context {
    () => {{
        // Only the label on/off escapes are needed for the bare prefix.
        let (label_on, ansi_off, _, _, _) = ansi_palette();
        format!("{open}MUX{reset}\t >>>", open = label_on, reset = ansi_off)
    }};
    ($http_context:expr) => {{
        let (label_on, ansi_off, block_tint, key_tint, value_tint) = ansi_palette();
        // The annotation pins the argument type so a wrong argument fails here
        // rather than deep inside `format!`.
        let request: &HttpContext = &$http_context;
        let bracket = request.log_context();
        format!(
            "{gray}{ctx}{reset}\t{open}MUX{reset}\t{grey}Session{reset}({gray}frontend{reset}={white}{frontend:?}{reset}, {gray}method{reset}={white}{method:?}{reset}, {gray}authority{reset}={white}{authority:?}{reset})\t >>>",
            open = label_on,
            reset = ansi_off,
            grey = block_tint,
            gray = key_tint,
            white = value_tint,
            ctx = bracket,
            frontend = request.session_address,
            method = request.method,
            authority = request.authority,
        )
    }};
}
132
133pub mod answers;
134pub mod auth;
135pub mod connection;
136mod converter;
137pub mod debug;
138mod h1;
139mod h2;
140pub mod parser;
141mod pkawa;
142pub mod router;
143pub(crate) mod serializer;
144mod shared;
145pub mod stream;
146
147use crate::metrics::names;
148use crate::{
149 BackendConnectionError, FrontendFromRequestError, L7ListenerHandler, L7Proxy, ListenerHandler,
150 ProxySession, Readiness, RetrieveClusterError, SessionIsToBeClosed, SessionMetrics,
151 SessionResult, StateResult,
152 backends::{Backend, BackendError},
153 http::HttpListener,
154 https::HttpsListener,
155 pool::{Checkout, Pool},
156 protocol::{SessionState, http::editor::HttpContext},
157 retry::RetryPolicy,
158 server::push_event,
159 socket::{FrontRustls, SessionTcpStream, SocketHandler, SocketResult, stats::socket_rtt},
160};
161
162pub(crate) use crate::protocol::mux::answers::{
163 forcefully_terminate_answer, set_default_answer, set_default_answer_with_retry_after,
164};
165use crate::protocol::mux::connection::{EndpointClient, EndpointServer};
166pub use crate::protocol::mux::{
167 answers::terminate_default_answer,
168 connection::Connection,
169 debug::{DebugEvent, DebugHistory},
170 h1::ConnectionH1,
171 h2::ConnectionH2,
172 h2::H2ByteAccounting,
173 h2::H2ConnectionConfig,
174 h2::H2DrainState,
175 h2::H2FloodConfig,
176 h2::H2FlowControl,
177 parser::H2Error,
178 router::Router,
179 stream::{Stream, StreamParts, StreamState},
180};
181
182// ── Tuning Constants ─────────────────────────────────────────────────────────
183
/// Maximum event loop iterations before forcefully closing a session.
/// Prevents infinite loops from consuming the single-threaded worker.
///
/// Also bounds the write-flush loop in `Mux::drive_frontend_shutdown_io`,
/// which simply stops looping (without requesting a close) once this many
/// passes have run.
const MAX_LOOP_ITERATIONS: i32 = 10_000;
187// ─────────────────────────────────────────────────────────────────────────────
188
/// Generic HTTP representation: a Kawa buffer that uses a Sōzu pool
/// [`Checkout`] as its backing storage.
type GenericHttpStream = kawa::Kawa<Checkout>;
/// NOTE(review): presumably the protocol-level (HTTP/2) stream identifier —
/// confirm against the H2 code, which is not visible from this module root.
type StreamId = u32;
/// Index of a [`Stream`] inside [`Context::streams`]; this is the id stored
/// in `pending_links` and in the `backend_streams` reverse index.
type GlobalStreamId = usize;
/// [`Mux`] specialised for a `SessionTcpStream` frontend and an [`HttpListener`].
pub type MuxClear = Mux<SessionTcpStream, HttpListener>;
/// [`Mux`] specialised for a [`FrontRustls`] frontend and an [`HttpsListener`].
pub type MuxTls = Mux<FrontRustls, HttpsListener>;
195
/// Which side of the proxy a connection sits on.
///
/// Drives the byte-accounting helpers below: `Client` traffic is attributed
/// to the backend counters, `Server` traffic to the frontend counters.
pub enum Position {
    /// Backend-facing connection: the cluster id, a shared handle to the
    /// [`Backend`] it talks to, and the current connection status.
    Client(String, Rc<RefCell<Backend>>, BackendStatus),
    /// Frontend-facing connection (the proxy acts as the server).
    Server,
}
200
201impl Debug for Position {
202 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
203 match self {
204 Self::Client(cluster_id, _, status) => f
205 .debug_tuple("Client")
206 .field(cluster_id)
207 .field(status)
208 .finish(),
209 Self::Server => write!(f, "Server"),
210 }
211 }
212}
213
214impl Position {
215 fn is_server(&self) -> bool {
216 match self {
217 Position::Client(..) => false,
218 Position::Server => true,
219 }
220 }
221 fn is_client(&self) -> bool {
222 !self.is_server()
223 }
224
225 /// Increment the global `count!()` counter for bytes read on this side.
226 pub fn count_bytes_in_counter(&self, size: usize) {
227 match self {
228 Position::Client(..) => count!(names::backend::BACK_BYTES_IN, size as i64),
229 Position::Server => count!(names::backend::BYTES_IN, size as i64),
230 }
231 }
232
233 /// Increment the global `count!()` counter for bytes written on this side.
234 pub fn count_bytes_out_counter(&self, size: usize) {
235 match self {
236 Position::Client(..) => count!(names::backend::BACK_BYTES_OUT, size as i64),
237 Position::Server => count!(names::backend::BYTES_OUT, size as i64),
238 }
239 }
240
241 /// Attribute `size` bytes read to the appropriate `SessionMetrics` field.
242 pub fn count_bytes_in(&self, metrics: &mut SessionMetrics, size: usize) {
243 match self {
244 Position::Client(..) => metrics.backend_bin += size,
245 Position::Server => metrics.bin += size,
246 }
247 }
248
249 /// Attribute `size` bytes written to the appropriate `SessionMetrics` field.
250 pub fn count_bytes_out(&self, metrics: &mut SessionMetrics, size: usize) {
251 match self {
252 Position::Client(..) => metrics.backend_bout += size,
253 Position::Server => metrics.bout += size,
254 }
255 }
256}
257
/// Lifecycle status of a backend connection, carried inside
/// [`Position::Client`].
#[derive(Debug)]
pub enum BackendStatus {
    /// Connection attempt in flight. The `Instant` is the start time: the
    /// ready loop calls `elapsed()` on it to record the backend connection
    /// time once the socket becomes writable.
    Connecting(Instant),
    /// Connection established; set by the ready loop after a successful connect.
    Connected,
    /// NOTE(review): presumably an idle connection kept open for reuse — confirm.
    KeepAlive,
    /// NOTE(review): presumably a connection being torn down — confirm.
    Disconnecting,
}
265
/// Outcome of a single `readable`/`writable` pass on a connection.
///
/// Derives `PartialEq`/`Eq` so results can be compared directly (`==`,
/// `assert_eq!`) instead of requiring a `match`/`matches!` at every site.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MuxResult {
    /// Keep driving the session.
    Continue,
    /// The session must be upgraded; the ready loop maps this to
    /// `SessionResult::Upgrade`. Only frontend connections may return it.
    Upgrade,
    /// The session should be closed (possibly after pending frontend bytes
    /// have been flushed).
    CloseSession,
}
272
/// View of "the other side" of a stream, handed to a connection while it
/// processes I/O: a frontend connection is given an endpoint over the
/// backends, and a backend connection an endpoint over the frontend.
pub trait Endpoint: Debug {
    /// Readiness of the peer connection designated by `token`.
    fn readiness(&self, token: Token) -> &Readiness;
    /// Mutable counterpart of [`Self::readiness`].
    fn readiness_mut(&mut self, token: Token) -> &mut Readiness;
    /// Returns the underlying TCP socket for the peer side of a stream.
    ///
    /// Used by access-log emission to capture TCP_INFO RTT for the side the
    /// caller does NOT own directly: a frontend connection (Position::Server)
    /// reads the backend socket through this method, and a backend connection
    /// (Position::Client) reads the frontend socket the same way. `token` is
    /// ignored by [`connection::EndpointServer`] (which has a single
    /// frontend connection) and used as a key by
    /// [`connection::EndpointClient`] (which keys backends by token).
    /// Returns `None` when the token doesn't resolve, mirroring the existing
    /// fallback paths in `readiness`/`readiness_mut`.
    fn socket(&self, token: Token) -> Option<&TcpStream>;
    /// If end_stream is called on a client it means the stream has PROPERLY finished:
    /// the server has completed serving the response and informs the endpoint that this stream won't be used anymore.
    /// If end_stream is called on a server it means the stream was BROKEN: the client was most likely disconnected or encountered an error,
    /// and it is for the server to decide whether the stream can be retried or an error should be sent. It should be GUARANTEED that all bytes from
    /// the backend were read. However it is almost certain that not all bytes were already sent to the client.
    fn end_stream<L: ListenerHandler + L7ListenerHandler>(
        &mut self,
        token: Token,
        stream: GlobalStreamId,
        context: &mut Context<L>,
    );
    /// If start_stream is called on a client it means the stream should be attached to this endpoint;
    /// the stream might be recovering from a disconnection, but in any case at this point its response MUST be empty.
    /// If start_stream is called on an H2 server it means the stream is a server push and its request MUST be empty.
    /// Returns false if the stream could not be started (e.g. max concurrent streams reached).
    fn start_stream<L: ListenerHandler + L7ListenerHandler>(
        &mut self,
        token: Token,
        stream: GlobalStreamId,
        context: &mut Context<L>,
    ) -> bool;
}
310
311/// Shared logic for half-close accounting: clear `bit` from `readiness.event` on
312/// socket errors/would-block, and return `true` (yield) when no bytes were
313/// transferred so the caller can park the half. Rationale for clearing only
314/// `bit` (not both halves) on `Closed`: the opposite half may still need one
315/// last pass to flush queued frames or the TLS close_notify.
316fn update_readiness(
317 size: usize,
318 status: SocketResult,
319 readiness: &mut Readiness,
320 bit: Ready,
321) -> bool {
322 trace!(
323 "{} size={}, status={:?}",
324 log_module_context!(),
325 size,
326 status
327 );
328 match status {
329 SocketResult::Continue => {}
330 SocketResult::Closed | SocketResult::Error | SocketResult::WouldBlock => {
331 readiness.event.remove(bit);
332 }
333 }
334 if size > 0 {
335 false
336 } else {
337 readiness.event.remove(bit);
338 true
339 }
340}
341
/// Read-side wrapper around [`update_readiness`]: applies it to
/// `Ready::READABLE`. Returns `true` when no bytes were read (`size == 0`).
fn update_readiness_after_read(
    size: usize,
    status: SocketResult,
    readiness: &mut Readiness,
) -> bool {
    update_readiness(size, status, readiness, Ready::READABLE)
}
349
/// Write-side wrapper around [`update_readiness`]: applies it to
/// `Ready::WRITABLE`. Returns `true` when no bytes were written (`size == 0`).
fn update_readiness_after_write(
    size: usize,
    status: SocketResult,
    readiness: &mut Readiness,
) -> bool {
    update_readiness(size, status, readiness, Ready::WRITABLE)
}
/// Per-session state shared by the frontend connection and every backend
/// connection: the stream table, the link bookkeeping, and the
/// connection-scoped settings copied onto each per-stream [`HttpContext`].
pub struct Context<L: ListenerHandler + L7ListenerHandler> {
    /// Stream table, indexed by [`GlobalStreamId`]. Slots whose state is
    /// `StreamState::Recycle` are reused by [`Self::create_stream`].
    pub streams: Vec<Stream>,
    /// Streams whose state is `StreamState::Link` and need backend connection.
    /// Replaces the O(n) scan of `streams` in the ready loop.
    pub pending_links: VecDeque<GlobalStreamId>,
    /// Reverse index: backend token -> global stream IDs currently in
    /// `StreamState::Linked(token)`. Eliminates O(n) scans of `streams`
    /// when handling backend connect/disconnect/timeout/close events.
    pub backend_streams: HashMap<Token, Vec<GlobalStreamId>>,
    /// Weak handle to the buffer pool, handed to `Stream::new` when a new
    /// stream slot has to be allocated.
    pub pool: Weak<RefCell<Pool>>,
    /// Shared handle to the listener this session was accepted on.
    pub listener: Rc<RefCell<L>>,
    /// Connection/session ULID — mirrors `Mux.session_ulid`. Stored here so
    /// per-stream `HttpContext` construction in [`Self::create_stream`] can
    /// stamp the session slot of the log-context bracket without reaching
    /// back into the parent [`Mux`].
    pub session_ulid: Ulid,
    /// Session (peer) address passed to every per-stream [`HttpContext`];
    /// `None` when it is not known.
    pub session_address: Option<SocketAddr>,
    /// Public address passed to every per-stream [`HttpContext`].
    pub public_address: SocketAddr,
    /// Event history recorded by the ready loop for debugging.
    pub debug: DebugHistory,
    /// Shrink threshold ratio for recycled stream slots.
    /// Vec is shrunk when total_slots > active_streams * ratio.
    pub h2_stream_shrink_ratio: usize,
    /// TLS SNI value negotiated at handshake, propagated to every
    /// per-stream [`HttpContext`] so the routing layer can enforce
    /// the SNI ↔ `:authority` binding on every H2 stream (and the
    /// single H1 request). `None` for plaintext listeners or when
    /// the client omitted the SNI extension. Stored pre-lowercased
    /// and without a port for cheap exact-match comparison.
    pub tls_server_name: Option<String>,
    /// Snapshot of the SAN set of the certificate Sōzu actually served at
    /// the TLS handshake. Captured once in `https.rs::upgrade_handshake`
    /// from the resolver and frozen for the connection lifetime so H2
    /// stream coalescing (RFC 7540 §9.1.1 / RFC 9113 §9.1.1) accepts any
    /// `:authority` covered by the certificate, with RFC 6125 §6.4.3
    /// wildcard handling. `None` for plaintext listeners or when SNI was
    /// absent. `Some(empty)` when the default cert was served — every
    /// `:authority` is rejected. `Arc` so the snapshot is shared across
    /// every per-stream `HttpContext` without re-allocation.
    pub tls_cert_names: Option<Arc<Vec<String>>>,
    /// Whether the routing layer must reject any request whose authority
    /// host does not exact-match `tls_server_name` (CWE-346 / CWE-444).
    /// Mirrors `HttpsListenerConfig::strict_sni_binding`; captured once
    /// at `Context::new` so routing decisions on each stream avoid a
    /// per-stream `listener.borrow()`.
    pub strict_sni_binding: bool,
    /// Whether the request-side block walk must strip any client-supplied
    /// `X-Real-IP` header before forwarding (anti-spoofing). Mirrors
    /// `HttpListenerConfig::elide_x_real_ip` /
    /// `HttpsListenerConfig::elide_x_real_ip`; captured once at
    /// `Context::new` so per-stream `HttpContext`s do not need to call
    /// `listener.borrow()` again. Independent of `send_x_real_ip`.
    pub elide_x_real_ip: bool,
    /// Whether `on_request_headers` injects a proxy-generated `X-Real-IP`
    /// header carrying the connection peer IP (post-PROXY-v2 unwrap).
    /// Mirrors `HttpListenerConfig::send_x_real_ip` /
    /// `HttpsListenerConfig::send_x_real_ip`; captured once at
    /// `Context::new`. Independent of `elide_x_real_ip`.
    pub send_x_real_ip: bool,
    /// Negotiated TLS protocol version short-form (e.g. `"TLSv1.3"`).
    /// Captured once at handshake completion in `https.rs` and propagated
    /// to every per-stream [`HttpContext`] so the access log can record it
    /// without reaching back into the rustls session per request. `None`
    /// for plaintext listeners.
    pub tls_version: Option<&'static str>,
    /// Negotiated TLS cipher suite short-form (e.g.
    /// `"TLS_AES_128_GCM_SHA256"`). Captured once at handshake completion
    /// and propagated to every per-stream [`HttpContext`]. `None` for
    /// plaintext listeners.
    pub tls_cipher: Option<&'static str>,
    /// Negotiated ALPN protocol short-form (e.g. `"h2"`, `"http/1.1"`).
    /// Captured once at handshake completion and propagated to every
    /// per-stream [`HttpContext`]. `None` for plaintext listeners or when
    /// no ALPN was negotiated.
    pub tls_alpn: Option<&'static str>,
}
432
433impl<L: ListenerHandler + L7ListenerHandler> Context<L> {
434 pub fn new(
435 session_ulid: Ulid,
436 pool: Weak<RefCell<Pool>>,
437 listener: Rc<RefCell<L>>,
438 session_address: Option<SocketAddr>,
439 public_address: SocketAddr,
440 ) -> Self {
441 let h2_stream_shrink_ratio = listener
442 .borrow()
443 .get_h2_connection_config()
444 .stream_shrink_ratio as usize;
445 let strict_sni_binding = listener.borrow().get_strict_sni_binding();
446 let elide_x_real_ip = listener.borrow().get_elide_x_real_ip();
447 let send_x_real_ip = listener.borrow().get_send_x_real_ip();
448 Self {
449 streams: Vec::new(),
450 pending_links: VecDeque::new(),
451 backend_streams: HashMap::new(),
452 pool,
453 listener,
454 session_ulid,
455 session_address,
456 public_address,
457 debug: DebugHistory::new(),
458 h2_stream_shrink_ratio,
459 tls_server_name: None,
460 tls_cert_names: None,
461 strict_sni_binding,
462 elide_x_real_ip,
463 send_x_real_ip,
464 tls_version: None,
465 tls_cipher: None,
466 tls_alpn: None,
467 }
468 }
469
470 pub fn active_len(&self) -> usize {
471 self.streams
472 .iter()
473 .filter(|s| !matches!(s.state, StreamState::Recycle))
474 .count()
475 }
476
477 /// Shared accessor for the [`HttpContext`] owned by a stream.
478 ///
479 /// Prefer this over `&self.streams[stream_id].context` at call sites
480 /// that only need read access — it keeps the `Stream`/`HttpContext`
481 /// relationship encapsulated and reads the same regardless of whether
482 /// the caller is inside `Router::connect`, the H2 mux, or a free
483 /// helper. Panics on an out-of-bounds `stream_id`, which is the same
484 /// behaviour as the raw `streams[sid]` indexing it replaces.
485 pub fn http_context(&self, stream_id: GlobalStreamId) -> &HttpContext {
486 &self.streams[stream_id].context
487 }
488
489 /// Mutable sibling of [`Self::http_context`]. Use when routing
490 /// decisions need to stamp `cluster_id` / `backend_id` on the stream's
491 /// [`HttpContext`] (e.g. `Router::connect` at the fill-cluster /
492 /// fill-backend points).
493 pub fn http_context_mut(&mut self, stream_id: GlobalStreamId) -> &mut HttpContext {
494 &mut self.streams[stream_id].context
495 }
496
497 /// Register a stream as linked to a backend token in the reverse index.
498 pub fn link_stream(&mut self, stream_id: GlobalStreamId, token: Token) {
499 self.streams[stream_id].state = StreamState::Linked(token);
500 self.backend_streams
501 .entry(token)
502 .or_default()
503 .push(stream_id);
504 }
505
506 /// Remove a stream from the backend reverse index if it is currently
507 /// `Linked`. Returns the backend token if one was removed.
508 pub fn unlink_stream(&mut self, stream_id: GlobalStreamId) -> Option<Token> {
509 if let StreamState::Linked(token) = self.streams[stream_id].state {
510 remove_backend_stream(&mut self.backend_streams, token, stream_id);
511 Some(token)
512 } else {
513 None
514 }
515 }
516
517 pub fn create_stream(&mut self, request_id: Ulid, window: u32) -> Option<GlobalStreamId> {
518 let http_context = {
519 let listener = self.listener.borrow();
520 let mut http_context = HttpContext::new(
521 self.session_ulid,
522 request_id,
523 listener.protocol(),
524 self.public_address,
525 self.session_address,
526 listener.get_sticky_name().to_string(),
527 listener.get_sozu_id_header().to_string(),
528 self.elide_x_real_ip,
529 self.send_x_real_ip,
530 );
531 // Propagate the connection-scoped TLS SNI onto every per-stream
532 // HttpContext so `route_from_request` can enforce the SNI ↔
533 // `:authority` binding for each H2 stream independently.
534 http_context.tls_server_name = self.tls_server_name.clone();
535 // Mirror the frozen-at-handshake SAN snapshot. `Arc` clone is a
536 // refcount bump, not a deep copy — every per-stream
537 // `HttpContext` shares the same `Vec<String>`.
538 http_context.tls_cert_names = self.tls_cert_names.clone();
539 // Mirror the listener's strict_sni_binding flag onto each
540 // HttpContext so the routing layer can honor operator opt-outs
541 // without reaching back into the listener on every request.
542 http_context.strict_sni_binding = self.strict_sni_binding;
543 // Propagate the connection-scoped TLS metadata onto every
544 // per-stream HttpContext so the access log can record it without
545 // touching the rustls session on every request. These are
546 // `&'static str` borrows from the rustls label tables — copy is
547 // a pointer move.
548 http_context.tls_version = self.tls_version;
549 http_context.tls_cipher = self.tls_cipher;
550 http_context.tls_alpn = self.tls_alpn;
551 http_context
552 };
553 let recycle_slot = self
554 .streams
555 .iter()
556 .position(|s| s.state == StreamState::Recycle);
557 if let Some(stream_id) = recycle_slot {
558 let stream = &mut self.streams[stream_id];
559 trace!("{} Reuse stream: {}", log_module_context!(), stream_id);
560 stream.state = StreamState::Idle;
561 stream.attempts = 0;
562 stream.front_received_end_of_stream = false;
563 stream.back_received_end_of_stream = false;
564 stream.front_data_received = 0;
565 stream.back_data_received = 0;
566 stream.request_counted = false;
567 stream.window = i32::try_from(window).unwrap_or(i32::MAX);
568 stream.context = http_context;
569 stream.back.clear();
570 stream.back.storage.clear();
571 stream.front.clear();
572 stream.front.storage.clear();
573 stream.metrics.reset();
574 stream.metrics.mark_request_start();
575 // After recycling a slot, check if the Vec has excessive trailing
576 // Recycle entries (more than 2x active streams of total capacity).
577 let active = self.active_len();
578 let total = self.streams.len();
579 if total > 1 && active > 0 && total > active * self.h2_stream_shrink_ratio {
580 self.shrink_trailing_recycle();
581 }
582 return Some(stream_id);
583 }
584 self.streams
585 .push(Stream::new(self.pool.clone(), http_context, window)?);
586 Some(self.streams.len() - 1)
587 }
588
589 /// Remove consecutive `Recycle` entries from the end of the streams Vec.
590 ///
591 /// This prevents unbounded growth when H2 streams are created and recycled
592 /// over time, reclaiming memory from slots that are no longer needed.
593 pub fn shrink_trailing_recycle(&mut self) {
594 while self
595 .streams
596 .last()
597 .is_some_and(|s| s.state == StreamState::Recycle)
598 {
599 self.streams.pop();
600 }
601 }
602}
603
604/// Remove `stream_id` from the backend-token reverse index for `token`.
605/// Free function to allow split borrows when `context.streams` is already
606/// mutably borrowed (preventing a `Context::unlink_stream` call).
607pub(super) fn remove_backend_stream(
608 index: &mut HashMap<Token, Vec<GlobalStreamId>>,
609 token: Token,
610 stream_id: GlobalStreamId,
611) {
612 if let Some(ids) = index.get_mut(&token) {
613 ids.retain(|&id| id != stream_id);
614 if ids.is_empty() {
615 index.remove(&token);
616 }
617 }
618}
619
/// Top-level session state for the multiplexing layer: one frontend
/// connection, the router holding the backend connections, and the shared
/// per-session [`Context`].
pub struct Mux<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> {
    /// NOTE(review): presumably the frontend timeout taken from the listener
    /// configuration — confirm at the construction site.
    pub configured_frontend_timeout: Duration,
    /// mio token of the frontend socket.
    pub frontend_token: Token,
    /// Frontend connection (H1 or H2) over the `Front` socket type.
    pub frontend: Connection<Front>,
    /// Owner of the backend connections (`router.backends`).
    pub router: Router,
    /// Streams, link bookkeeping and connection-scoped settings.
    pub context: Context<L>,
    /// Per-session correlation ID generated at construction time. Included in
    /// every log line emitted from this module so all events for a single
    /// frontend connection can be reassembled (independent of the ephemeral
    /// per-stream request id used by access logs).
    pub session_ulid: Ulid,
}
632
impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Mux<Front, L> {
    /// Returns the TCP socket of the frontend connection (delegates to
    /// `Connection::socket`).
    pub fn front_socket(&self) -> &TcpStream {
        self.frontend.socket()
    }
}
638
639impl<Front: SocketHandler + std::fmt::Debug, L: ListenerHandler + L7ListenerHandler> Mux<Front, L> {
640 fn delay_close_for_frontend_flush(&mut self, reason: &'static str) -> bool {
641 let _ = self.frontend.initiate_close_notify();
642 // LIFECYCLE §9 invariant 16: consult per-stream back-buffers in
643 // addition to the connection-level pending-write predicate so
644 // shutdown does not close while any open H2 stream still has
645 // kawa bytes queued after a voluntary scheduler yield.
646 if self
647 .frontend
648 .has_pending_write_including_streams(&self.context)
649 {
650 let readiness = self.frontend.readiness_mut();
651 readiness.interest = Ready::WRITABLE | Ready::HUP | Ready::ERROR;
652 readiness.signal_pending_write();
653 debug!(
654 "{} Mux delaying close on {}: {:?}",
655 log_context!(self),
656 reason,
657 self.frontend
658 );
659 true
660 } else {
661 false
662 }
663 }
664
665 /// Drive the frontend I/O path during shutdown, when the server is polling
666 /// `shutting_down()` outside the normal epoll readiness loop.
667 ///
668 /// This is required for H2 graceful shutdown because a stream may still
669 /// need one last readable pass to observe the peer's END_STREAM or one last
670 /// writable pass to retire the stream, emit GOAWAY, or flush TLS records.
671 fn drive_frontend_shutdown_io(&mut self) -> SessionIsToBeClosed {
672 let force_h2_read = matches!(self.frontend, Connection::H2(_));
673 let force_h2_write = matches!(self.frontend, Connection::H2(_));
674 let readiness = self.frontend.readiness().clone();
675 if !force_h2_read
676 && !force_h2_write
677 && readiness.event.is_empty()
678 && !self.frontend.has_pending_write()
679 {
680 return false;
681 }
682
683 if force_h2_read || self.frontend.readiness().event.is_readable() {
684 self.frontend
685 .readiness_mut()
686 .interest
687 .insert(Ready::READABLE);
688 match self
689 .frontend
690 .readable(&mut self.context, EndpointClient(&mut self.router))
691 {
692 MuxResult::Continue => {}
693 MuxResult::CloseSession | MuxResult::Upgrade => return true,
694 }
695 }
696
697 if !force_h2_write
698 && !self.frontend.has_pending_write()
699 && !self.frontend.readiness().event.is_writable()
700 {
701 return false;
702 }
703
704 let mut iterations = 0;
705 loop {
706 self.frontend
707 .readiness_mut()
708 .interest
709 .insert(Ready::WRITABLE);
710 if force_h2_write {
711 self.frontend.readiness_mut().signal_pending_write();
712 }
713 match self
714 .frontend
715 .writable(&mut self.context, EndpointClient(&mut self.router))
716 {
717 MuxResult::Continue => {}
718 MuxResult::CloseSession | MuxResult::Upgrade => return true,
719 }
720
721 iterations += 1;
722 if iterations >= MAX_LOOP_ITERATIONS
723 || (!self.frontend.has_pending_write()
724 && !self.frontend.readiness().event.is_writable())
725 {
726 break;
727 }
728 }
729 false
730 }
731}
732
733impl<Front: SocketHandler + std::fmt::Debug, L: ListenerHandler + L7ListenerHandler> SessionState
734 for Mux<Front, L>
735{
736 fn ready(
737 &mut self,
738 session: Rc<RefCell<dyn ProxySession>>,
739 proxy: Rc<RefCell<dyn L7Proxy>>,
740 _metrics: &mut SessionMetrics,
741 ) -> SessionResult {
742 let mut counter = 0;
743
744 if self.frontend.readiness().event.is_hup()
745 && !self.delay_close_for_frontend_flush("frontend HUP")
746 {
747 debug!(
748 "{} Mux closing on frontend HUP: {:?}",
749 log_context!(self),
750 self.frontend
751 );
752 return SessionResult::Close;
753 }
754
755 // Start service timers on all active streams after the HUP check.
756 // This mirrors session-level service_start/service_stop in Http(s)Session::ready()
757 // to measure only CPU processing time, excluding epoll wait between cycles.
758 for stream in &mut self.context.streams {
759 if stream.state.is_open() {
760 stream.metrics.service_start();
761 }
762 }
763
764 let start = Instant::now();
765 self.context.debug.push(DebugEvent::ReadyTimestamp(
766 std::time::SystemTime::now()
767 .duration_since(std::time::UNIX_EPOCH)
768 .unwrap_or_default()
769 .as_millis() as usize,
770 ));
771 trace!("{} {:?}", log_context!(self), start);
772 loop {
773 self.context.debug.push(DebugEvent::LoopStart);
774 loop {
775 self.context.debug.push(DebugEvent::LoopIteration(counter));
776 if self.frontend.readiness().filter_interest().is_readable() {
777 let res = {
778 let context = &mut self.context;
779 let res = self
780 .frontend
781 .readable(context, EndpointClient(&mut self.router));
782 context.debug.push(DebugEvent::SR(
783 self.frontend_token,
784 res,
785 self.frontend.readiness().clone(),
786 ));
787 res
788 };
789 match res {
790 MuxResult::Continue => {}
791 MuxResult::CloseSession => {
792 if !self.delay_close_for_frontend_flush("frontend readable") {
793 debug!(
794 "{} Mux close from frontend readable: {:?}",
795 log_context!(self),
796 self.frontend
797 );
798 return SessionResult::Close;
799 }
800 }
801 MuxResult::Upgrade => return SessionResult::Upgrade,
802 }
803 }
804
805 let mut all_backends_readiness_are_empty = true;
806 let mut dead_backends = Vec::new();
807 let mut backend_close: Option<(&'static str, Token)> = None;
808 for (token, client) in self.router.backends.iter_mut() {
809 let readiness = client.readiness_mut();
810 // Check the raw event for HUP/ERROR — not filter_interest(),
811 // because interest only contains READABLE|WRITABLE and would
812 // always mask out HUP (0b01000) and ERROR (0b00100).
813 let dead = readiness.event.is_hup() || readiness.event.is_error();
814 if dead {
815 trace!(
816 "{} Backend({:?}) -> {:?}",
817 log_context_lite!(self),
818 token,
819 readiness
820 );
821 readiness.event.remove(Ready::WRITABLE);
822 }
823
824 if client.readiness().filter_interest().is_writable() {
825 let position = client.position_mut();
826 match position {
827 Position::Client(
828 cluster_id,
829 backend,
830 BackendStatus::Connecting(start),
831 ) => {
832 #[cfg(debug_assertions)]
833 self.context
834 .debug
835 .push(DebugEvent::CCS(*token, cluster_id.clone()));
836
837 let mut backend_borrow = backend.borrow_mut();
838 if backend_borrow.retry_policy.is_down() {
839 info!(
840 "{} backend server {} at {} is up",
841 log_context_lite!(self),
842 backend_borrow.backend_id,
843 backend_borrow.address
844 );
845 incr!(
846 "backend.up",
847 Some(cluster_id),
848 Some(&backend_borrow.backend_id)
849 );
850 gauge!(
851 names::backend::AVAILABLE,
852 1,
853 Some(cluster_id),
854 Some(&backend_borrow.backend_id)
855 );
856 push_event(Event {
857 kind: EventKind::BackendUp as i32,
858 backend_id: Some(backend_borrow.backend_id.to_owned()),
859 address: Some(backend_borrow.address.into()),
860 cluster_id: Some(cluster_id.to_owned()),
861 metric_detail: None,
862 });
863 }
864
865 //successful connection, reset failure counter
866 backend_borrow.failures = 0;
867 backend_borrow.set_connection_time(start.elapsed());
868 backend_borrow.retry_policy.succeed();
869
870 if let Some(ids) = self.context.backend_streams.get(token) {
871 for &stream_id in ids {
872 self.context.streams[stream_id].metrics.backend_connected();
873 backend_borrow.active_requests += 1;
874 }
875 }
876 trace!(
877 "{} connection success: {:#?}",
878 log_context_lite!(self),
879 backend_borrow
880 );
881 drop(backend_borrow);
882 *position = Position::Client(
883 std::mem::take(cluster_id),
884 backend.clone(),
885 BackendStatus::Connected,
886 );
887 client
888 .timeout_container()
889 .set_duration(self.router.configured_backend_timeout);
890 }
891 Position::Client(..) => {}
892 Position::Server => {
893 error!(
894 "{} backend connection cannot be in Server position",
895 log_context_lite!(self)
896 );
897 }
898 }
899 let res = {
900 let context = &mut self.context;
901 let res = client.writable(context, EndpointServer(&mut self.frontend));
902 context.debug.push(DebugEvent::CW(
903 *token,
904 res,
905 client.readiness().clone(),
906 ));
907 res
908 };
909 match res {
910 MuxResult::Continue => {}
911 MuxResult::Upgrade => {
912 error!(
913 "{} only frontend connections can trigger Upgrade",
914 log_context_lite!(self)
915 );
916 }
917 MuxResult::CloseSession => {
918 backend_close = Some(("backend writable", *token));
919 break;
920 }
921 }
922 // Cross-readiness: backend wrote → wake frontend reader
923 let context = &mut self.context;
924 self.frontend.try_resume_reading(context);
925 }
926
927 if client.readiness().filter_interest().is_readable() {
928 let res = {
929 let context = &mut self.context;
930 let res = client.readable(context, EndpointServer(&mut self.frontend));
931 context.debug.push(DebugEvent::CR(
932 *token,
933 res,
934 client.readiness().clone(),
935 ));
936 res
937 };
938 match res {
939 MuxResult::Continue => {}
940 MuxResult::Upgrade => {
941 error!(
942 "{} only frontend connections can trigger Upgrade (readable)",
943 log_context_lite!(self)
944 );
945 }
946 MuxResult::CloseSession => {
947 backend_close = Some(("backend readable", *token));
948 break;
949 }
950 }
951 }
952
953 if dead
954 && !client.readiness().filter_interest().is_readable()
955 && !client.has_buffer_pressure(&self.context)
956 {
957 self.context
958 .debug
959 .push(DebugEvent::CH(*token, client.readiness().clone()));
960 trace!("{} Closing {:#?}", log_context_lite!(self), client);
961 match client.position() {
962 Position::Client(cluster_id, backend, BackendStatus::Connecting(_)) => {
963 let mut backend_borrow = backend.borrow_mut();
964 backend_borrow.failures += 1;
965
966 let already_unavailable = backend_borrow.retry_policy.is_down();
967 backend_borrow.retry_policy.fail();
968 incr!(
969 "backend.connections.error",
970 Some(cluster_id),
971 Some(&backend_borrow.backend_id)
972 );
973 if !already_unavailable && backend_borrow.retry_policy.is_down() {
974 error!(
975 "{} backend server {} at {} is down",
976 log_context_lite!(self),
977 backend_borrow.backend_id,
978 backend_borrow.address
979 );
980 incr!(
981 "backend.down",
982 Some(cluster_id),
983 Some(&backend_borrow.backend_id)
984 );
985 gauge!(
986 names::backend::AVAILABLE,
987 0,
988 Some(cluster_id),
989 Some(&backend_borrow.backend_id)
990 );
991 push_event(Event {
992 kind: EventKind::BackendDown as i32,
993 backend_id: Some(backend_borrow.backend_id.to_owned()),
994 address: Some(backend_borrow.address.into()),
995 cluster_id: Some(cluster_id.to_owned()),
996 metric_detail: None,
997 });
998 }
999 trace!(
1000 "{} connection fail: {:#?}",
1001 log_context_lite!(self),
1002 backend_borrow
1003 );
1004 }
1005 Position::Client(_, backend, _) => {
1006 let mut backend_borrow = backend.borrow_mut();
1007 let count = self
1008 .context
1009 .backend_streams
1010 .get(token)
1011 .map_or(0, |ids| ids.len());
1012 backend_borrow.active_requests =
1013 backend_borrow.active_requests.saturating_sub(count);
1014 }
1015 Position::Server => {
1016 error!(
1017 "{} dead backend cannot be in Server position",
1018 log_context_lite!(self)
1019 );
1020 }
1021 }
1022 client.close(&mut self.context, EndpointServer(&mut self.frontend));
1023 dead_backends.push(*token);
1024 }
1025
1026 if !client.readiness().filter_interest().is_empty() {
1027 all_backends_readiness_are_empty = false;
1028 }
1029 }
1030 // Remove dead backends from the map BEFORE handling
1031 // backend_close. client.close() already decremented
1032 // connections_per_backend / backend.connections gauges in
1033 // the loop above; if we return SessionResult::Close before
1034 // removing them, Mux::close() would decrement again
1035 // (double-decrement → gauge underflow).
1036 if !dead_backends.is_empty() {
1037 for token in &dead_backends {
1038 let proxy_borrow = proxy.borrow();
1039 if let Some(mut client) = self.router.backends.remove(token) {
1040 client.timeout_container().cancel();
1041 let socket = client.socket_mut();
1042 if let Err(e) = proxy_borrow.deregister_socket(socket) {
1043 error!(
1044 "{} error deregistering back socket({:?}): {:?}",
1045 log_context!(self),
1046 socket,
1047 e
1048 );
1049 }
1050 // invariant: write-only shutdown — Shutdown::Both on a TLS frontend
1051 // discards the receive buffer and elicits TCP RST, truncating the
1052 // already-queued response. Canonical write-up: `lib/src/https.rs:650-655`.
1053 // Backend sockets follow the same discipline for symmetry.
1054 if let Err(e) = socket.shutdown(Shutdown::Write) {
1055 if e.kind() != ErrorKind::NotConnected {
1056 error!(
1057 "{} error shutting down back socket({:?}): {:?}",
1058 log_context!(self),
1059 socket,
1060 e
1061 );
1062 }
1063 }
1064 } else {
1065 error!("{} session {:?} has no backend!", log_context!(self), token);
1066 }
1067 if !proxy_borrow.remove_session(*token) {
1068 error!(
1069 "{} session {:?} was already removed!",
1070 log_context!(self),
1071 token
1072 );
1073 }
1074 }
1075 trace!("{} FRONTEND: {:#?}", log_context!(self), self.frontend);
1076 trace!(
1077 "{} BACKENDS: {:#?}",
1078 log_context!(self),
1079 self.router.backends
1080 );
1081 }
1082 if let Some((reason, token)) = backend_close {
1083 if !self.delay_close_for_frontend_flush(reason) {
1084 debug!(
1085 "{} Mux close from {} token={:?}: frontend={:?}",
1086 log_context!(self),
1087 reason,
1088 token,
1089 self.frontend
1090 );
1091 return SessionResult::Close;
1092 }
1093 all_backends_readiness_are_empty = false;
1094 }
1095
1096 if self.frontend.readiness().filter_interest().is_writable() {
1097 let res = {
1098 let context = &mut self.context;
1099 let res = self
1100 .frontend
1101 .writable(context, EndpointClient(&mut self.router));
1102 context.debug.push(DebugEvent::SW(
1103 self.frontend_token,
1104 res,
1105 self.frontend.readiness().clone(),
1106 ));
1107 res
1108 };
1109 match res {
1110 MuxResult::Continue => {}
1111 MuxResult::CloseSession => {
1112 if !self.delay_close_for_frontend_flush("frontend writable") {
1113 debug!(
1114 "{} Mux close from frontend writable: {:?}",
1115 log_context!(self),
1116 self.frontend
1117 );
1118 return SessionResult::Close;
1119 }
1120 }
1121 MuxResult::Upgrade => return SessionResult::Upgrade,
1122 }
1123 // Cross-readiness: frontend wrote → wake parked backends.
1124 // If any backend resumes, invalidate the stale readiness
1125 // flag so the inner loop continues instead of breaking.
1126 let context = &mut self.context;
1127 for (_token, backend) in self.router.backends.iter_mut() {
1128 if backend.try_resume_reading(context) {
1129 all_backends_readiness_are_empty = false;
1130 }
1131 }
1132 }
1133
1134 if self.frontend.readiness().filter_interest().is_empty()
1135 && all_backends_readiness_are_empty
1136 {
1137 break;
1138 }
1139
1140 counter += 1;
1141 if counter >= MAX_LOOP_ITERATIONS {
1142 incr!(names::http::INFINITE_LOOP_ERROR);
1143 if self.frontend.has_pending_write() {
1144 debug!(
1145 "{} Mux loop budget exhausted while frontend flush pending: {:?}",
1146 log_context!(self),
1147 self.frontend
1148 );
1149 self.frontend.readiness_mut().event.remove(Ready::WRITABLE);
1150 self.frontend.timeout_container().set(self.frontend_token);
1151 break;
1152 }
1153 return SessionResult::Close;
1154 }
1155 }
1156
1157 let context = &mut self.context;
1158 let answers_rc = context.listener.borrow().get_answers().clone();
1159 let mut dirty = false;
1160 while let Some(stream_id) = context.pending_links.pop_front() {
1161 let Some(stream) = context.streams.get(stream_id) else {
1162 continue;
1163 };
1164 if stream.state != StreamState::Link {
1165 continue;
1166 }
1167 // Before the first request triggers a stream Link, the frontend timeout is set
1168 // to a shorter request_timeout, here we switch to the longer nominal timeout
1169 self.frontend
1170 .timeout_container()
1171 .set_duration(self.configured_frontend_timeout);
1172 let front_readiness = self.frontend.readiness_mut();
1173 dirty = true;
1174 match self.router.connect(
1175 stream_id,
1176 context,
1177 session.clone(),
1178 proxy.clone(),
1179 self.frontend_token,
1180 ) {
1181 Ok(_) => {
1182 let state = context.streams[stream_id].state;
1183 context.debug.push(DebugEvent::CC(stream_id, state));
1184 }
1185 Err(error) => {
1186 trace!("{} Connection error: {}", log_module_context!(), error);
1187 let stream = &mut context.streams[stream_id];
1188 let answers = answers_rc.borrow();
1189 use BackendConnectionError as BE;
1190 match error {
1191 BE::MaxConnectionRetries(_)
1192 | BE::MaxSessionsMemory
1193 | BE::MaxBuffers => {
1194 warn!(
1195 "{} backend retry budget exhausted: {}",
1196 log_module_context!(stream.context),
1197 error
1198 );
1199 set_default_answer(stream, front_readiness, 503, &answers);
1200 }
1201 BE::Backend(BackendError::NoBackendForCluster(_)) => {
1202 set_default_answer(stream, front_readiness, 503, &answers);
1203 }
1204 BE::RetrieveClusterError(RetrieveClusterError::RetrieveFrontend(
1205 ref err,
1206 )) => {
1207 // RFC 9110 §15.5.1: a malformed authority is a
1208 // 400. A syntactically valid authority that
1209 // simply has no matching frontend stays on the
1210 // historical 404 path.
1211 let code = match err {
1212 FrontendFromRequestError::HostParse { .. }
1213 | FrontendFromRequestError::InvalidCharsAfterHost(_) => 400,
1214 FrontendFromRequestError::NoClusterFound(_) => 404,
1215 };
1216 set_default_answer(stream, front_readiness, code, &answers);
1217 }
1218 BE::RetrieveClusterError(RetrieveClusterError::UnauthorizedRoute) => {
1219 set_default_answer(stream, front_readiness, 401, &answers);
1220 }
1221 BE::RetrieveClusterError(
1222 RetrieveClusterError::SniAuthorityMismatch { .. },
1223 ) => {
1224 // RFC 9110 §15.5.20: 421 Misdirected Request is the
1225 // semantically correct status for an authority that
1226 // does not belong to this TLS connection. The
1227 // http.sni_authority_mismatch metric emitted in
1228 // `route_from_request` remains the durable signal;
1229 // the 421 body here is what a client sees and may
1230 // retry on a fresh TLS connection with a matching SNI.
1231 set_default_answer(stream, front_readiness, 421, &answers);
1232 }
1233 BE::RetrieveClusterError(RetrieveClusterError::HttpsRedirect) => {
1234 // Use the redirect status stashed by `Router::route_from_request`
1235 // (#1009). Falls back to 301 for the legacy
1236 // `cluster.https_redirect = true` path that does
1237 // not set the field.
1238 let code = stream.context.redirect_status.unwrap_or(301);
1239 set_default_answer(stream, front_readiness, code, &answers);
1240 }
1241
1242 BE::Backend(ref e) => {
1243 error!("{} backend connection error: {}", log_module_context!(), e);
1244 set_default_answer(stream, front_readiness, 503, &answers);
1245 }
1246 BE::RetrieveClusterError(ref other) => {
1247 error!(
1248 "{} unexpected RetrieveClusterError variant: {:?}",
1249 log_module_context!(),
1250 other
1251 );
1252 set_default_answer(stream, front_readiness, 503, &answers);
1253 }
1254 // TCP specific error
1255 BE::NotFound(ref msg) => {
1256 error!(
1257 "{} NotFound is TCP-specific, not reachable in mux: {:?}",
1258 log_module_context!(),
1259 msg
1260 );
1261 set_default_answer(stream, front_readiness, 503, &answers);
1262 }
1263 // Per-(cluster, source-IP) connection limit reached.
1264 // Emit HTTP 429 with the resolved `Retry-After`. The
1265 // value is computed in `Router::connect` (where the
1266 // SessionManager + cluster override are reachable)
1267 // and stashed on the stream context just before the
1268 // error is returned, so the answer engine can render
1269 // (or elide) the header without re-deriving the
1270 // resolution chain here.
1271 BE::TooManyConnectionsPerIp { ref cluster_id } => {
1272 debug!(
1273 "{} per-(cluster, source-IP) limit hit for cluster {:?}",
1274 log_module_context!(),
1275 cluster_id
1276 );
1277 let retry_after = stream.context.retry_after_seconds;
1278 set_default_answer_with_retry_after(
1279 stream,
1280 front_readiness,
1281 429,
1282 &answers,
1283 retry_after,
1284 );
1285 }
1286 }
1287 context.debug.push(DebugEvent::CCF(stream_id, error));
1288 }
1289 }
1290 // All routing error arms now set a default answer, transitioning
1291 // the stream out of Link state. No re-enqueue needed.
1292 }
1293 if !dirty {
1294 break;
1295 }
1296 }
1297
1298 // Stop service timers before yielding to epoll, so idle wait time is excluded
1299 // from the service_time metric. For Close/Upgrade returns, close() handles cleanup.
1300 for stream in &mut self.context.streams {
1301 if stream.state.is_open() {
1302 stream.metrics.service_stop();
1303 }
1304 }
1305
1306 #[cfg(debug_assertions)]
1307 {
1308 // Verify backend_streams index matches actual stream states.
1309 let mut expected: HashMap<Token, Vec<GlobalStreamId>> = HashMap::new();
1310 for (id, stream) in self.context.streams.iter().enumerate() {
1311 if let StreamState::Linked(token) = stream.state {
1312 expected.entry(token).or_default().push(id);
1313 }
1314 }
1315 assert_eq!(
1316 expected.len(),
1317 self.context.backend_streams.len(),
1318 "backend_streams index key count mismatch: expected={:?}, actual={:?}",
1319 expected,
1320 self.context.backend_streams
1321 );
1322 for (token, mut expected_ids) in expected {
1323 let mut actual_ids = self
1324 .context
1325 .backend_streams
1326 .get(&token)
1327 .cloned()
1328 .unwrap_or_default();
1329 expected_ids.sort();
1330 actual_ids.sort();
1331 assert_eq!(
1332 expected_ids, actual_ids,
1333 "backend_streams index mismatch for token {token:?}",
1334 );
1335 }
1336 }
1337
1338 SessionResult::Continue
1339 }
1340
1341 fn update_readiness(&mut self, token: Token, events: Ready) {
1342 trace!("{} EVENTS: {:?} on {:?}", log_context!(self), events, token);
1343 self.context.debug.push(DebugEvent::EV(token, events));
1344 if token == self.frontend_token {
1345 self.frontend.readiness_mut().event |= events;
1346 } else if let Some(c) = self.router.backends.get_mut(&token) {
1347 c.readiness_mut().event |= events;
1348 }
1349 }
1350
1351 fn timeout(&mut self, token: Token, _metrics: &mut SessionMetrics) -> StateResult {
1352 trace!("{} MuxState::timeout({:?})", log_context!(self), token);
1353 let front_is_h2 = match self.frontend {
1354 Connection::H1(_) => false,
1355 Connection::H2(_) => true,
1356 };
1357 let answers_rc = self.context.listener.borrow().get_answers().clone();
1358 let mut should_close = true;
1359 let mut should_write = false;
1360 if self.frontend_token == token {
1361 trace!(
1362 "{} MuxState::timeout_frontend({:#?})",
1363 log_context!(self),
1364 self.frontend
1365 );
1366 self.frontend.timeout_container().triggered();
1367 // The per-stream reaper (bidirectional-idle + outbound
1368 // flow-control-stall guards) normally runs only from `readable()`,
1369 // but a fully-silent peer never triggers a read event. Run it on the
1370 // connection-timeout path too so a window-stalled stream — a buffered
1371 // response the peer refuses to drain by holding its receive window
1372 // shut — is reaped and its MAX_CONCURRENT_STREAMS slot freed, instead
1373 // of lingering until the 30-minute zombie checker. The reaper queues
1374 // an `RST_STREAM(CANCEL)`; because `has_pending_write()` does NOT
1375 // observe `pending_rst_streams` (it gates connection close, so a
1376 // queued RST must not read as "keep open"), set `should_write` via
1377 // the dedicated `has_pending_control_write()` probe so the reset is
1378 // actually flushed to the peer before the connection closes — without
1379 // it, a fully-silent peer's stalled stream is freed but the peer sees
1380 // only EOF, never the RST(CANCEL).
1381 if let Connection::H2(h2) = &mut self.frontend {
1382 h2.cancel_timed_out_streams(
1383 &mut self.context,
1384 &mut EndpointClient(&mut self.router),
1385 );
1386 if h2.has_pending_control_write() {
1387 should_write = true;
1388 }
1389 }
1390 if self.frontend.has_pending_write() {
1391 should_write = true;
1392 }
1393 let front_readiness = self.frontend.readiness_mut();
1394 for stream_id in 0..self.context.streams.len() {
1395 match self.context.streams[stream_id].state {
1396 StreamState::Idle => {
1397 // In h1 an Idle stream is always the first request, so we can send a 408
1398 // In h2 an Idle stream doesn't necessarily hold a request yet,
1399 // in most cases it was just reserved, so we can just ignore them.
1400 if !front_is_h2 {
1401 let answers = answers_rc.borrow();
1402 let stream = &mut self.context.streams[stream_id];
1403 stream.context.access_log_message = Some("client_timeout");
1404 set_default_answer(stream, front_readiness, 408, &answers);
1405 should_write = true;
1406 }
1407 }
1408 StreamState::Link => {
1409 // This is an unusual case, as we have both a complete request and no
1410 // available backend yet. For now, we answer with 503.
1411 // Not a timeout-driven outcome from the operator's
1412 // perspective — leave access_log_message as None.
1413 let answers = answers_rc.borrow();
1414 let stream = &mut self.context.streams[stream_id];
1415 set_default_answer(stream, front_readiness, 503, &answers);
1416 should_write = true;
1417 }
1418 StreamState::Linked(_) => {
1419 // The frontend timed out while a stream is linked to a backend.
1420 // The backend timeout should handle this, but in case the backend
1421 // is also stalled, send a 504 and terminate the stream.
1422 if !self.context.streams[stream_id].back.consumed {
1423 self.context.unlink_stream(stream_id);
1424 let answers = answers_rc.borrow();
1425 let stream = &mut self.context.streams[stream_id];
1426 stream.context.access_log_message =
1427 Some("client_timeout_during_response");
1428 set_default_answer(stream, front_readiness, 504, &answers);
1429 should_write = true;
1430 } else if self.context.streams[stream_id].back.is_completed() {
1431 // Response fully proxied, stream can be closed
1432 } else if self.context.streams[stream_id].back.is_terminated()
1433 || self.context.streams[stream_id].back.is_error()
1434 {
1435 // Response is terminated/error but not fully written to frontend.
1436 // Keep the session alive briefly to flush remaining data.
1437 should_close = false;
1438 } else {
1439 // Partial response in progress — forcefully terminate
1440 self.context.unlink_stream(stream_id);
1441 let stream = &mut self.context.streams[stream_id];
1442 stream.context.access_log_message =
1443 Some("client_timeout_during_response");
1444 forcefully_terminate_answer(
1445 stream,
1446 front_readiness,
1447 H2Error::InternalError,
1448 );
1449 should_write = true;
1450 }
1451 // end_stream is called in a second pass below to avoid
1452 // borrow conflicts on context.streams.
1453 }
1454 StreamState::Unlinked => {
1455 // A stream Unlinked already has a response and its backend closed.
1456 // In case it hasn't finished proxying we wait. Otherwise it is a stream
1457 // kept alive for a new request, which can be killed.
1458 if !self.context.streams[stream_id].back.is_completed() {
1459 should_close = false;
1460 }
1461 }
1462 StreamState::Recycle => {
1463 // A recycled stream is an h2 stream which doesn't hold a request anymore.
1464 // We can ignore it.
1465 }
1466 }
1467 }
1468 // Second pass: end streams that were linked to backends.
1469 // This is done separately to avoid borrow conflicts on context.streams.
1470 let linked_streams: Vec<(GlobalStreamId, Token)> = self
1471 .context
1472 .streams
1473 .iter()
1474 .enumerate()
1475 .filter_map(|(id, stream)| {
1476 if let StreamState::Linked(back_token) = stream.state {
1477 Some((id, back_token))
1478 } else {
1479 None
1480 }
1481 })
1482 .collect();
1483 for (stream_id, back_token) in linked_streams {
1484 if let Some(backend) = self.router.backends.get_mut(&back_token) {
1485 backend.end_stream(stream_id, &mut self.context);
1486 }
1487 }
1488 } else if let Some(backend) = self.router.backends.get_mut(&token) {
1489 trace!(
1490 "{} MuxState::timeout_backend({:#?})",
1491 log_context_lite!(self),
1492 backend
1493 );
1494 backend.timeout_container().triggered();
1495 let front_readiness = self.frontend.readiness_mut();
1496 let linked_ids: Vec<GlobalStreamId> = self
1497 .context
1498 .backend_streams
1499 .get(&token)
1500 .map_or_else(Vec::new, |ids| ids.to_owned());
1501 for stream_id in linked_ids {
1502 // This stream is linked to the backend that timedout
1503 if self.context.streams[stream_id].back.is_terminated()
1504 || self.context.streams[stream_id].back.is_error()
1505 {
1506 trace!(
1507 "{} Stream terminated or in error, do nothing, just wait a bit more",
1508 log_module_context!()
1509 );
1510 // Nothing to do, simply wait for the remaining bytes to be proxied
1511 if !self.context.streams[stream_id].back.is_completed() {
1512 should_close = false;
1513 }
1514 } else if !self.context.streams[stream_id].back.consumed {
1515 // The response has not started yet
1516 trace!(
1517 "{} Stream still waiting for response, send 504",
1518 log_module_context!()
1519 );
1520 self.context.unlink_stream(stream_id);
1521 let answers = answers_rc.borrow();
1522 let stream = &mut self.context.streams[stream_id];
1523 stream.context.access_log_message = Some("backend_timeout");
1524 set_default_answer(stream, front_readiness, 504, &answers);
1525 should_write = true;
1526 } else {
1527 trace!(
1528 "{} Stream waiting for end of response, forcefully terminate it",
1529 log_module_context!()
1530 );
1531 self.context.unlink_stream(stream_id);
1532 let stream = &mut self.context.streams[stream_id];
1533 stream.context.access_log_message = Some("backend_response_timeout");
1534 forcefully_terminate_answer(stream, front_readiness, H2Error::InternalError);
1535 should_write = true;
1536 }
1537 backend.end_stream(stream_id, &mut self.context);
1538 }
1539 // Re-arm the backend timeout if the session stays alive (draining streams).
1540 // Without this, the timeout is consumed and the session becomes immortal
1541 // until the zombie checker runs.
1542 if !should_close {
1543 backend.timeout_container().set(token);
1544 }
1545 } else {
1546 // Session received a timeout for an unknown token, ignore it
1547 return StateResult::Continue;
1548 }
1549 if should_write {
1550 // Drain as much pending data as possible before closing.
1551 // A single writable() call is insufficient for large responses —
1552 // the TLS buffer may need multiple flushes. Without this loop,
1553 // the session is killed with unflushed TLS data, causing the
1554 // client to receive a truncated TLS record ("decode error").
1555 //
1556 // The constant 16 is empirical: it papers over a missing
1557 // invariant-15 hop in the H2 mux state machine where the
1558 // writable readiness signal is not always re-armed after a
1559 // partial flush. Long-term plan: reach invariant-15 closure
1560 // and remove this loop. See `lib/src/protocol/mux/LIFECYCLE.md`.
1561 let mut result = StateResult::Continue;
1562 for _ in 0..16 {
1563 result = match self
1564 .frontend
1565 .writable(&mut self.context, EndpointClient(&mut self.router))
1566 {
1567 MuxResult::Continue => StateResult::Continue,
1568 MuxResult::Upgrade => StateResult::Upgrade,
1569 MuxResult::CloseSession => StateResult::CloseSession,
1570 };
1571 if result != StateResult::Continue
1572 || !self.frontend.readiness_mut().interest.is_writable()
1573 {
1574 break;
1575 }
1576 }
1577 // Re-arm the frontend timeout so the session doesn't become immortal.
1578 // The writable call may have partially flushed the response — we need
1579 // the timeout to fire again if the flush stalls.
1580 if result == StateResult::Continue {
1581 self.frontend.timeout_container().set(self.frontend_token);
1582 }
1583 return result;
1584 }
1585 if should_close {
1586 if self.delay_close_for_frontend_flush("timeout") {
1587 debug!(
1588 "{} Mux timeout delaying close for frontend flush: token={:?}, frontend={:?}",
1589 log_context!(self),
1590 token,
1591 self.frontend
1592 );
1593 self.frontend.timeout_container().set(self.frontend_token);
1594 return StateResult::Continue;
1595 }
1596 if front_is_h2 {
1597 debug!(
1598 "{} Mux timeout returning CloseSession: token={:?}, frontend={:?}",
1599 log_context!(self),
1600 token,
1601 self.frontend
1602 );
1603 for (idx, stream) in self.context.streams.iter().enumerate() {
1604 if stream.state != StreamState::Recycle {
1605 debug!(
1606 "{} timeout stream[{}]: state={:?}, front_phase={:?}, back_phase={:?}, front_completed={}, back_completed={}",
1607 log_context!(self),
1608 idx,
1609 stream.state,
1610 stream.front.parsing_phase,
1611 stream.back.parsing_phase,
1612 stream.front.is_completed(),
1613 stream.back.is_completed()
1614 );
1615 }
1616 }
1617 }
1618 StateResult::CloseSession
1619 } else {
1620 // Re-arm the frontend timeout. Without this, the timeout is consumed
1621 // by triggered() and the session stays alive indefinitely until the
1622 // zombie checker runs (default: 30 minutes).
1623 self.frontend.timeout_container().set(self.frontend_token);
1624 StateResult::Continue
1625 }
1626 }
1627
1628 fn cancel_timeouts(&mut self) {
1629 trace!("{} MuxState::cancel_timeouts", log_context!(self));
1630 self.frontend.timeout_container().cancel();
1631 for backend in self.router.backends.values_mut() {
1632 backend.timeout_container().cancel();
1633 }
1634 }
1635
1636 fn print_state(&self, _context: &str) {
1637 // The trait-required `context: &str` parameter (protocol tag like
1638 // "HTTPS"/"HTTP" passed by callers) predates the unified
1639 // `log_context!(self)` envelope. The canonical `MUX` tag lives
1640 // inside `log_context!`, so we ignore the parameter here and emit
1641 // the bracketed Session(...) block instead, mirroring the second
1642 // `error!` in this function.
1643 error!(
1644 "\
1645{} Session(Mux)
1646\tFrontend:
1647\t\ttoken: {:?}\treadiness: {:?}
1648\tBackend(s):",
1649 log_context!(self),
1650 self.frontend_token,
1651 self.frontend.readiness()
1652 );
1653 for (backend_token, backend) in &self.router.backends {
1654 error!(
1655 "{} \t\ttoken: {:?}\treadiness: {:?}",
1656 log_context!(self),
1657 backend_token,
1658 backend.readiness()
1659 )
1660 }
1661 }
1662
1663 fn close(&mut self, proxy: Rc<RefCell<dyn L7Proxy>>, _metrics: &mut SessionMetrics) {
1664 if self.context.debug.is_interesting() {
1665 warn!("{} {:?}", log_context!(self), self.context.debug.events);
1666 }
1667 debug!("{} MUX CLOSE", log_context!(self));
1668 trace!("{} FRONTEND: {:#?}", log_context!(self), self.frontend);
1669 trace!(
1670 "{} BACKENDS: {:#?}",
1671 log_context!(self),
1672 self.router.backends
1673 );
1674
1675 // Log active streams at session teardown for timeout diagnosis
1676 let active_count = self
1677 .context
1678 .streams
1679 .iter()
1680 .filter(|s| s.state.is_open() && s.metrics.start.is_some())
1681 .count();
1682 if active_count > 0 {
1683 debug!(
1684 "{} Session close with {} active stream(s)",
1685 log_context!(self),
1686 active_count
1687 );
1688 for (idx, stream) in self
1689 .context
1690 .streams
1691 .iter()
1692 .enumerate()
1693 .filter(|(_, s)| s.state.is_open() && s.metrics.start.is_some())
1694 {
1695 let elapsed = stream.metrics.service_time();
1696 debug!(
1697 "{} active stream[{}]: state={:?} service_time={:?} method={:?} path={:?} status={:?}",
1698 log_context!(self),
1699 idx,
1700 stream.state,
1701 elapsed,
1702 stream.context.method,
1703 stream.context.path,
1704 stream.context.status,
1705 );
1706 }
1707 incr!(names::h2::CLOSE_WITH_ACTIVE_STREAMS);
1708 }
1709
1710 // Distribute H2 connection-level overhead (control frames) across in-flight
1711 // streams so that access log bytes_in/bytes_out reflect actual wire cost.
1712 // Integer division may lose up to (active_count - 1) bytes, which is acceptable.
1713 let active_count = active_count.max(1);
1714 let (total_overhead_in, total_overhead_out) = self.frontend.overhead_bytes();
1715 let share_in = total_overhead_in / active_count;
1716 let share_out = total_overhead_out / active_count;
1717
1718 // Generate access logs for in-flight streams on session teardown.
1719 // Skip streams that already had their access log emitted (metrics.start is
1720 // set to None by metrics.reset() after generate_access_log in the happy path).
1721 // Frontend RTT is the same for every stream on this session — snapshot
1722 // it once outside the loop instead of paying one TCP_INFO syscall per
1723 // open stream.
1724 let client_rtt = socket_rtt(self.frontend.socket());
1725 for stream in &mut self.context.streams {
1726 if stream.state.is_open() && stream.metrics.start.is_some() {
1727 stream.metrics.bin += share_in;
1728 stream.metrics.bout += share_out;
1729 stream.metrics.service_stop();
1730 if stream.metrics.backend_stop.is_none() {
1731 stream.metrics.backend_stop();
1732 }
1733 // Only mark as error if the stream had an actual protocol/processing failure
1734 // (kawa parse error, backend error). Normal timeouts, client disconnects,
1735 // and graceful connection closures are not errors.
1736 let is_error = stream.front.is_error() || stream.back.is_error();
1737 let server_rtt = stream.linked_token().and_then(|token| {
1738 self.router
1739 .backends
1740 .get(&token)
1741 .and_then(|c| socket_rtt(c.socket()))
1742 });
1743 stream.generate_access_log(
1744 is_error,
1745 Some("session close"),
1746 self.context.listener.clone(),
1747 client_rtt,
1748 server_rtt,
1749 );
1750 stream.state = StreamState::Recycle;
1751 }
1752 }
1753
1754 self.frontend
1755 .close(&mut self.context, EndpointClient(&mut self.router));
1756
1757 for (token, client) in &mut self.router.backends {
1758 let proxy_borrow = proxy.borrow();
1759 client.timeout_container().cancel();
1760 let socket = client.socket_mut();
1761 if let Err(e) = proxy_borrow.deregister_socket(socket) {
1762 error!(
1763 "{} error deregistering back socket({:?}): {:?}",
1764 log_context_lite!(self),
1765 socket,
1766 e
1767 );
1768 }
1769 // invariant: write-only shutdown — Shutdown::Both on a TLS frontend
1770 // discards the receive buffer and elicits TCP RST, truncating the
1771 // already-queued response. Canonical write-up: `lib/src/https.rs:650-655`.
1772 // Backend sockets follow the same discipline for symmetry.
1773 if let Err(e) = socket.shutdown(Shutdown::Write) {
1774 if e.kind() != ErrorKind::NotConnected {
1775 error!(
1776 "{} error shutting down back socket({:?}): {:?}",
1777 log_context_lite!(self),
1778 socket,
1779 e
1780 );
1781 }
1782 }
1783 if !proxy_borrow.remove_session(*token) {
1784 error!(
1785 "{} session {:?} was already removed!",
1786 log_context_lite!(self),
1787 token
1788 );
1789 }
1790
1791 match client.position() {
1792 Position::Client(cluster_id, backend, _) => {
1793 let mut backend_borrow = backend.borrow_mut();
1794 backend_borrow.dec_connections();
1795 gauge_add!(names::backend::CONNECTIONS, -1);
1796 // Second `-1` site for `backend.pool.size` (the first is
1797 // in `connection.rs::pre_close_client_bookkeeping`). This
1798 // path runs during session teardown when the frontend
1799 // session iterates the backends map directly without
1800 // routing through `Connection::close`. Both `-1` sites
1801 // mirror the single `+1` in router.rs::connect and the
1802 // matching `backend.connections, -1` calls already
1803 // present here, so symmetry follows from
1804 // `backend.connections` correctness.
1805 gauge_add!(names::backend::POOL_SIZE, -1);
1806 gauge_add!(
1807 names::backend::CONNECTIONS_PER_BACKEND,
1808 -1,
1809 Some(cluster_id),
1810 Some(&backend_borrow.backend_id)
1811 );
1812 let count = self
1813 .context
1814 .backend_streams
1815 .get(token)
1816 .map_or(0, |ids| ids.len());
1817 backend_borrow.active_requests =
1818 backend_borrow.active_requests.saturating_sub(count);
1819 trace!(
1820 "{} connection (session) closed: {:#?}",
1821 log_context_lite!(self),
1822 backend_borrow
1823 );
1824 }
1825 Position::Server => {
1826 error!(
1827 "{} close_backend called on Server position",
1828 log_context_lite!(self)
1829 );
1830 }
1831 }
1832 }
1833 // Clear the reverse index after all backends have decremented their
1834 // active_requests counters (which depend on the index for stream counts).
1835 self.context.backend_streams.clear();
1836 }
1837
1838 fn shutting_down(&mut self) -> SessionIsToBeClosed {
1839 // RFC 9113 §6.8: initiate graceful shutdown with double-GOAWAY pattern.
1840 // Only send the initial GOAWAY once. The final GOAWAY (with the real
1841 // last_stream_id) is handled by finalize_write() when all streams drain.
1842 // Calling graceful_goaway() again would send the final GOAWAY
1843 // prematurely and force-disconnect before in-flight streams complete.
1844 if !self.frontend.is_draining() {
1845 match self.frontend.graceful_goaway() {
1846 MuxResult::CloseSession => return true,
1847 MuxResult::Continue => {
1848 // graceful_goaway() queued a GOAWAY frame. Flush it directly
1849 // since the event loop uses edge-triggered epoll and won't
1850 // deliver a new WRITABLE event for an already-writable socket.
1851 self.frontend.flush_zero_buffer();
1852 }
1853 _ => {}
1854 }
1855 } else {
1856 trace!(
1857 "{} shutting_down: already draining, skipping duplicate GOAWAY",
1858 log_context!(self)
1859 );
1860 // shut_down_sessions() runs outside ready(), so retry flushing any
1861 // previously-buffered GOAWAY/TLS records on each pass.
1862 self.frontend.flush_zero_buffer();
1863 }
1864 if self.drive_frontend_shutdown_io() {
1865 return true;
1866 }
1867 // Forced-close deadline: once the H2 listener's
1868 // `h2_graceful_shutdown_deadline_seconds` budget has elapsed from
1869 // the moment `graceful_goaway` armed `drain.started_at`, stop
1870 // waiting for streams and tear the session down. `drive_frontend_
1871 // shutdown_io` above already had a chance to flush any pending
1872 // TLS/GOAWAY records; this branch accepts that some bytes may be
1873 // lost in exchange for honoring the operator-configured SLA.
1874 // Listeners that disable the knob (`= 0` → `None`) short-circuit
1875 // the check inside `graceful_shutdown_deadline_elapsed`.
1876 if self.frontend.graceful_shutdown_deadline_elapsed() {
1877 debug!(
1878 "{} Mux shutting_down: graceful-shutdown deadline elapsed, forcing close",
1879 log_context!(self)
1880 );
1881 return true;
1882 }
1883 if matches!(self.frontend, Connection::H2(_)) && self.frontend.is_draining() {
1884 for stream in &mut self.context.streams {
1885 if stream.front_received_end_of_stream {
1886 continue;
1887 }
1888 if !matches!(stream.state, StreamState::Linked(_) | StreamState::Unlinked) {
1889 continue;
1890 }
1891 if stream.front.consumed
1892 && stream.front.storage.is_empty()
1893 && stream.front.is_completed()
1894 {
1895 stream.front_received_end_of_stream = true;
1896 self.frontend
1897 .readiness_mut()
1898 .interest
1899 .insert(Ready::WRITABLE);
1900 self.frontend.readiness_mut().signal_pending_write();
1901 }
1902 }
1903 }
1904 let mut can_stop = true;
1905 for stream in &mut self.context.streams {
1906 match stream.state {
1907 StreamState::Linked(_) => {
1908 can_stop = false;
1909 }
1910 StreamState::Unlinked => {
1911 kawa::debug_kawa(&stream.front);
1912 kawa::debug_kawa(&stream.back);
1913 if stream.is_quiesced() {
1914 continue;
1915 }
1916 stream.context.closing = true;
1917 can_stop = false;
1918 }
1919 _ => {}
1920 }
1921 }
1922 if self.frontend.has_pending_write() {
1923 return false;
1924 }
1925 if can_stop {
1926 let active_h2_streams = self
1927 .context
1928 .streams
1929 .iter()
1930 .enumerate()
1931 .filter(|(_, s)| {
1932 if s.state == StreamState::Recycle {
1933 return false;
1934 }
1935 if s.state == StreamState::Unlinked && s.is_quiesced() {
1936 return false;
1937 }
1938 true
1939 })
1940 .collect::<Vec<_>>();
1941 if matches!(self.frontend, Connection::H2(_)) && !active_h2_streams.is_empty() {
1942 debug!(
1943 "{} Mux shutting_down returning true with active H2 streams: {:?}",
1944 log_context!(self),
1945 self.frontend
1946 );
1947 for (idx, stream) in active_h2_streams {
1948 debug!(
1949 "{} shutdown stream[{}]: state={:?}, front_phase={:?}, back_phase={:?}, front_completed={}, back_completed={}",
1950 log_context!(self),
1951 idx,
1952 stream.state,
1953 stream.front.parsing_phase,
1954 stream.back.parsing_phase,
1955 stream.front.is_completed(),
1956 stream.back.is_completed()
1957 );
1958 }
1959 }
1960 }
1961 if can_stop {
1962 return true;
1963 }
1964
1965 false
1966 }
1967}
1968
#[cfg(test)]
mod tests {
    use super::*;

    /// After a read that reports `SocketResult::Closed`, the helper must not
    /// request a yield, must clear the READABLE event, and must leave the
    /// WRITABLE and HUP events in place.
    #[test]
    fn update_readiness_after_read_closed_keeps_writable() {
        // Start with every relevant bit set, both as event and as interest.
        let mut state = Readiness {
            event: Ready::READABLE | Ready::WRITABLE | Ready::HUP,
            interest: Ready::READABLE | Ready::WRITABLE | Ready::HUP,
        };

        let yield_requested = update_readiness_after_read(17, SocketResult::Closed, &mut state);

        assert!(!yield_requested);

        let events = &state.event;
        assert!(!events.is_readable());
        assert!(events.is_writable());
        assert!(events.is_hup());
    }
}