Skip to main content

sozu_lib/protocol/mux/
stream.rs

//! Per-request stream state shared by the H1 and H2 mux paths.
//!
//! A [`Stream`] owns the front/back kawa buffers, HTTP context, and metrics
//! for a single request/response pair. [`StreamParts`] splits it along the
//! read/write axis so callers can borrow both sides of the pipe at the same
//! time without fighting the borrow checker.
7
8use std::{
9    cell::RefCell,
10    fmt::Debug,
11    rc::{Rc, Weak},
12    time::Duration,
13};
14
15use mio::Token;
16use sozu_command::logging::ansi_palette;
17
18use super::{GenericHttpStream, Position};
19use crate::metrics::names;
20use crate::{
21    L7ListenerHandler, ListenerHandler, Protocol, SessionMetrics, pool::Pool,
22    protocol::http::editor::HttpContext,
23};
24
/// Prefix placed at the start of every log line emitted by the stream module.
///
/// Streams carry no reference to their peer, so the label is always the fixed
/// `MUX-STREAM`. It is wrapped in the first ANSI slot returned by
/// `ansi_palette()` and closed by the second (reset) slot. Per the original
/// design this renders bold bright-white, uniform across every protocol, when
/// the logger supports ANSI.
macro_rules! log_module_context {
    () => {{
        let (open, reset, ..) = ansi_palette();
        format!("{open}MUX-STREAM{reset}\t >>>")
    }};
}
35
36#[derive(Debug, Clone, Copy, PartialEq, Eq)]
37pub enum StreamState {
38    Idle,
39    /// the Stream is asking for connection, this will trigger a call to connect
40    Link,
41    /// the Stream is linked to a Client (note that the client might not be connected)
42    Linked(Token),
43    /// the Stream was linked to a Client, but the connection closed, the client was removed
44    /// and this Stream could not be retried (it should be terminated)
45    Unlinked,
46    /// the Stream is unlinked and can be reused
47    Recycle,
48}
49
50impl StreamState {
51    pub fn is_open(&self) -> bool {
52        !matches!(self, StreamState::Idle | StreamState::Recycle)
53    }
54}
55
/// State of one request/response pair, shared by the H1 and H2 mux paths:
/// both kawa buffers, the HTTP context, the session metrics and the
/// bookkeeping flags/counters that drive the stream's lifecycle.
pub struct Stream {
    /// Window value for this stream, initialised in `Stream::new` from a `u32`
    /// saturated to `i32::MAX`. NOTE(review): presumably the stream-level
    /// flow-control window, kept signed so it can go negative when the peer
    /// shrinks its initial window (RFC 9113 §6.9.2) — confirm against the H2 path.
    pub window: i32,
    /// Counter that starts at 0 in `Stream::new`. NOTE(review): presumably the
    /// number of backend connection attempts made for this request — confirm
    /// against the retry logic.
    pub attempts: u8,
    /// Lifecycle state of this stream slot; see [`StreamState`].
    pub state: StreamState,
    /// True when the frontend connection has received end_of_stream from the client.
    pub front_received_end_of_stream: bool,
    /// True when the backend connection has received end_of_stream from the backend server.
    pub back_received_end_of_stream: bool,
    /// Tracks total DATA payload bytes received on the frontend for content-length validation (RFC 9113 §8.1.1)
    pub front_data_received: usize,
    /// Tracks total DATA payload bytes received on the backend for content-length validation (RFC 9113 §8.1.1)
    pub back_data_received: usize,
    /// True when `gauge_add!(names::http::ACTIVE_REQUESTS, 1)` was emitted for this stream.
    /// Prevents underflow when `generate_access_log` is called for streams that never
    /// had their request fully parsed (idle timeouts, malformed requests).
    pub request_counted: bool,
    /// Request kawa (`kawa::Kind::Request`). It is the read buffer when the
    /// stream is split for `Position::Server`.
    pub front: GenericHttpStream,
    /// Response kawa (`kawa::Kind::Response`). It is the read buffer when the
    /// stream is split for `Position::Client`.
    pub back: GenericHttpStream,
    /// Per-request HTTP context (method, authority, path, status,
    /// cluster/backend ids, ...). Used for metric labels and the access log.
    pub context: HttpContext,
    /// Per-stream session metrics; their timings and byte counters are
    /// reported in the access log.
    pub metrics: SessionMetrics,
}
77
78struct KawaSummary<'a>(&'a GenericHttpStream);
79impl Debug for KawaSummary<'_> {
80    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
81        f.debug_struct("Kawa")
82            .field("kind", &self.0.kind)
83            .field("parsing_phase", &self.0.parsing_phase)
84            .field("body_size", &self.0.body_size)
85            .field("consumed", &self.0.consumed)
86            .field("expects", &self.0.expects)
87            .field("blocks", &self.0.blocks.len())
88            .field("out", &self.0.out.len())
89            .field("storage_start", &self.0.storage.start)
90            .field("storage_head", &self.0.storage.head)
91            .field("storage_end", &self.0.storage.end)
92            .finish()
93    }
94}
95impl Debug for Stream {
96    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
97        f.debug_struct("Stream")
98            .field("window", &self.window)
99            .field("attempts", &self.attempts)
100            .field("state", &self.state)
101            .field(
102                "front_received_end_of_stream",
103                &self.front_received_end_of_stream,
104            )
105            .field(
106                "back_received_end_of_stream",
107                &self.back_received_end_of_stream,
108            )
109            .field("front_data_received", &self.front_data_received)
110            .field("back_data_received", &self.back_data_received)
111            .field("request_counted", &self.request_counted)
112            .field("front", &KawaSummary(&self.front))
113            .field("back", &KawaSummary(&self.back))
114            .field("context", &self.context)
115            .field("metrics", &self.metrics)
116            .finish()
117    }
118}
119
/// Mutable borrows of a [`Stream`]'s read and write buffers, held together
/// with its window, context and metrics at the same time.
///
/// Which buffer is the read side and which is the write side depends on the
/// position passed to `Stream::split`.
pub struct StreamParts<'a> {
    /// The stream's `window` field, whatever the position.
    pub window: &'a mut i32,
    /// Buffer on the read side of this connection: `back` for
    /// `Position::Client`, `front` for `Position::Server`.
    pub rbuffer: &'a mut GenericHttpStream,
    /// Buffer on the write side of this connection: the opposite of `rbuffer`.
    pub wbuffer: &'a mut GenericHttpStream,
    /// Tracks whether end_of_stream has been received on the read side of this connection.
    pub received_end_of_stream: &'a mut bool,
    /// Tracks total DATA payload bytes received on the read side (for content-length validation).
    pub data_received: &'a mut usize,
    /// The stream's HTTP context.
    pub context: &'a mut HttpContext,
    /// The stream's session metrics.
    pub metrics: &'a mut SessionMetrics,
}
133
134impl Stream {
135    pub fn new(pool: Weak<RefCell<Pool>>, context: HttpContext, window: u32) -> Option<Self> {
136        let (front_buffer, back_buffer) = match pool.upgrade() {
137            Some(pool) => {
138                let mut pool = pool.borrow_mut();
139                match (pool.checkout(), pool.checkout()) {
140                    (Some(front_buffer), Some(back_buffer)) => (front_buffer, back_buffer),
141                    _ => return None,
142                }
143            }
144            None => return None,
145        };
146        let stream = Self {
147            state: StreamState::Idle,
148            attempts: 0,
149            window: i32::try_from(window).unwrap_or(i32::MAX),
150            front_received_end_of_stream: false,
151            back_received_end_of_stream: false,
152            front_data_received: 0,
153            back_data_received: 0,
154            request_counted: false,
155            front: GenericHttpStream::new(kawa::Kind::Request, kawa::Buffer::new(front_buffer)),
156            back: GenericHttpStream::new(kawa::Kind::Response, kawa::Buffer::new(back_buffer)),
157            context,
158            metrics: SessionMetrics::new(None),
159        };
160        // Post: a freshly checked-out stream is a clean, closed slot — no
161        // request has been counted yet (so `generate_access_log` won't
162        // gauge-underflow `http.active_requests`) and no DATA has been seen on
163        // either half (the content-length reconciliation counters start at 0).
164        debug_assert_eq!(stream.state, StreamState::Idle, "new stream must be Idle");
165        debug_assert!(
166            !stream.state.is_open(),
167            "an Idle stream slot must not report as open"
168        );
169        debug_assert!(
170            !stream.request_counted,
171            "new stream must not have a counted request (gauge-underflow guard)"
172        );
173        debug_assert_eq!(
174            (stream.front_data_received, stream.back_data_received),
175            (0, 0),
176            "new stream DATA counters must start at 0"
177        );
178        #[cfg(debug_assertions)]
179        stream.check_invariants();
180        Some(stream)
181    }
182
183    /// Cross-field invariant sweep for the per-request stream state machine.
184    ///
185    /// Encodes the relationships that must hold for ANY `Stream` regardless of
186    /// the mux path (H1 or H2) that drives it:
187    /// - `state.is_open()` agrees with the `Idle`/`Recycle` discriminants
188    ///   (the open/closed split is the load-bearing predicate for shutdown and
189    ///   slot reuse).
190    /// - a `Recycle` slot is fully reset — no counted request can be left
191    ///   pending on a slot advertised as reusable, or `create_stream` would
192    ///   resurrect a stale `http.active_requests` charge.
193    /// - a `Linked` stream names a backend token; the `Linked(token)`
194    ///   discriminant and `linked_token()` must agree (the access-log and
195    ///   reverse-index lookups both depend on this equivalence).
196    ///
197    /// Compiled only with `debug_assertions`; the optimizer drops every call
198    /// in release. Network input never reaches a hard `assert!` here — these
199    /// fire only on our own logic bugs.
200    #[cfg(debug_assertions)]
201    pub(super) fn check_invariants(&self) {
202        debug_assert_eq!(
203            self.state.is_open(),
204            !matches!(self.state, StreamState::Idle | StreamState::Recycle),
205            "is_open() must agree with the Idle/Recycle discriminants"
206        );
207        if self.state == StreamState::Recycle {
208            debug_assert!(
209                !self.request_counted,
210                "a Recycle slot must not carry a counted request (active-requests leak)"
211            );
212        }
213        // `linked_token()` is the canonical accessor for the backend token; it
214        // must return Some iff the slot is `Linked`, since the reverse index
215        // and the access-log RTT lookup both branch on it.
216        debug_assert_eq!(
217            self.linked_token().is_some(),
218            matches!(self.state, StreamState::Linked(_)),
219            "linked_token() must be Some iff the stream is Linked"
220        );
221    }
222    /// Convenience accessor for the backend token when the stream is `Linked`.
223    /// Used by access-log emission sites to look up the backend socket on the
224    /// owning `Endpoint`/`Router` without re-pattern-matching `state` inline.
225    pub fn linked_token(&self) -> Option<Token> {
226        match self.state {
227            StreamState::Linked(token) => Some(token),
228            _ => None,
229        }
230    }
231
232    /// Returns true when both front and back kawa buffers are in a terminal
233    /// or initial state with no pending data. Used during shutdown to skip
234    /// streams that have already completed their work.
235    pub fn is_quiesced(&self) -> bool {
236        let front_done =
237            (self.front.is_initial() || self.front.is_completed() || self.front.is_terminated())
238                && self.front.storage.is_empty();
239        let back_done =
240            (self.back.is_initial() || self.back.is_completed() || self.back.is_terminated())
241                && self.back.storage.is_empty();
242        front_done && back_done
243    }
244
245    pub fn split(&mut self, position: &Position) -> StreamParts<'_> {
246        // Pre: the front buffer always parses requests and the back buffer
247        // always parses responses. `split` only re-labels them as read/write
248        // for the caller's position — it must never swap their kawa kinds.
249        debug_assert_eq!(
250            self.front.kind,
251            kawa::Kind::Request,
252            "front buffer must hold a Request kawa"
253        );
254        debug_assert_eq!(
255            self.back.kind,
256            kawa::Kind::Response,
257            "back buffer must hold a Response kawa"
258        );
259        match position {
260            Position::Client(..) => StreamParts {
261                window: &mut self.window,
262                rbuffer: &mut self.back,
263                wbuffer: &mut self.front,
264                received_end_of_stream: &mut self.back_received_end_of_stream,
265                data_received: &mut self.back_data_received,
266                context: &mut self.context,
267                metrics: &mut self.metrics,
268            },
269            Position::Server => StreamParts {
270                window: &mut self.window,
271                rbuffer: &mut self.front,
272                wbuffer: &mut self.back,
273                received_end_of_stream: &mut self.front_received_end_of_stream,
274                data_received: &mut self.front_data_received,
275                context: &mut self.context,
276                metrics: &mut self.metrics,
277            },
278        }
279    }
280    /// Emit the access log for this stream.
281    ///
282    /// `client_rtt`/`server_rtt` are passed in by the caller because the
283    /// `Stream` does not own a socket reference — the frontend socket lives
284    /// on the parent `Mux`/connection and the backend socket lives on
285    /// `Router.backends.get(token)`. Each caller snapshots the two
286    /// `getsockopt(TCP_INFO)` values from the sockets it can reach, mirroring
287    /// the inline pattern used by the `kawa_h1`, `pipe`, and TCP-frontend
288    /// access-log sites.
289    pub fn generate_access_log<L>(
290        &mut self,
291        error: bool,
292        message: Option<&str>,
293        listener: Rc<RefCell<L>>,
294        client_rtt: Option<Duration>,
295        server_rtt: Option<Duration>,
296    ) where
297        L: ListenerHandler + L7ListenerHandler,
298    {
299        let context = &self.context;
300        // Fall back to the per-stream timeout discriminator
301        // (`access_log_message`) when the caller did not supply an explicit
302        // `message`. The discriminator is set by `MuxState::timeout` before
303        // `set_default_answer` / `forcefully_terminate_answer` so the
304        // access log can distinguish a timeout-driven 408/504 from a
305        // backend-error 504. Caller-supplied `message` (e.g. parsing
306        // errors) takes precedence when both are present.
307        let message = message.or(context.access_log_message);
308        // Pair the `http.active_requests` gauge `-1` with `request_counted`:
309        // it must transition true -> false exactly once so a re-entry (H1
310        // keep-alive, double access-log on the same stream) cannot
311        // double-decrement the gauge into underflow. `request_counted` is set
312        // true at the matching `gauge_add!(.., 1)` in the H1/H2 readable paths.
313        let was_counted = self.request_counted;
314        if self.request_counted {
315            gauge_add!(names::http::ACTIVE_REQUESTS, -1);
316            self.request_counted = false;
317        }
318        debug_assert!(
319            !self.request_counted,
320            "generate_access_log must leave request_counted false (gauge-underflow guard)"
321        );
322        // The flag may only move true->false here (one `-1`); it must never be
323        // observed flipping back on within this call.
324        debug_assert!(
325            was_counted >= self.request_counted,
326            "request_counted must only clear here, never spontaneously set"
327        );
328        if error {
329            // Labelled with `(cluster_id, backend_id)`; see the matching
330            // emission in `kawa_h1::log_request_error` for the cardinality
331            // contract (`metrics::filter_labels_for_detail`).
332            incr!(
333                "http.errors",
334                context.cluster_id.as_deref(),
335                context.backend_id.as_deref()
336            );
337        }
338        let protocol = match context.protocol {
339            Protocol::HTTP => "http",
340            Protocol::HTTPS => "https",
341            other => {
342                error!(
343                    "{} mux streams only handle HTTP or HTTPS protocols, got {:?}",
344                    log_module_context!(),
345                    other
346                );
347                "unknown"
348            }
349        };
350
351        // Save the HTTP status code of the backend response. Emits the bucket
352        // counter unconditionally, plus the per-code counter from
353        // `crate::metrics::http_status_code_metric_name` when the status is on
354        // the short-list shared with the H1 path (`save_http_status_metric`).
355        let bucket_key = if let Some(status) = context.status {
356            match status {
357                100..=199 => names::http::STATUS_1XX,
358                200..=299 => names::http::STATUS_2XX,
359                300..=399 => names::http::STATUS_3XX,
360                400..=499 => names::http::STATUS_4XX,
361                500..=599 => names::http::STATUS_5XX,
362                _ => names::http::STATUS_OTHER,
363            }
364        } else {
365            "http.status.none"
366        };
367        incr!(
368            bucket_key,
369            context.cluster_id.as_deref(),
370            context.backend_id.as_deref()
371        );
372
373        if let Some(status) = context.status {
374            if let Some(per_code) = crate::metrics::http_status_code_metric_name(status) {
375                incr!(
376                    per_code,
377                    context.cluster_id.as_deref(),
378                    context.backend_id.as_deref()
379                );
380            }
381        }
382
383        let endpoint = sozu_command::logging::EndpointRecord::Http {
384            method: context.method.as_deref(),
385            authority: context.authority.as_deref(),
386            path: context.path.as_deref(),
387            reason: context.reason.as_deref(),
388            status: context.status,
389        };
390
391        let listener = listener.borrow();
392        let tags = context.authority.as_deref().and_then(|host| {
393            let hostname = match host.split_once(':') {
394                None => host,
395                Some((hostname, _)) => hostname,
396            };
397            listener.get_tags(hostname)
398        });
399
400        log_access! {
401            error,
402            on_failure: { incr!(names::access_logs::UNSENT) },
403            message,
404            context: context.log_context(),
405            session_address: context.session_address,
406            backend_address: context.backend_address,
407            protocol,
408            endpoint,
409            tags,
410            client_rtt,
411            server_rtt,
412            service_time: self.metrics.service_time(),
413            response_time: self.metrics.backend_response_time(),
414            request_time: self.metrics.request_time(),
415            start_time_ns: self.metrics.start_wall_ns(),
416            bytes_in: self.metrics.bin,
417            bytes_out: self.metrics.bout,
418            user_agent: context.user_agent.as_deref(),
419            x_request_id: context.x_request_id.as_deref(),
420            tls_version: context.tls_version,
421            tls_cipher: context.tls_cipher,
422            tls_sni: context.tls_server_name.as_deref(),
423            tls_alpn: context.tls_alpn,
424            xff_chain: context.xff_chain.as_deref(),
425            #[cfg(feature = "opentelemetry")]
426            otel: context.otel.as_ref(),
427            #[cfg(not(feature = "opentelemetry"))]
428            otel: None,
429        };
430        self.metrics.register_end_of_session(&context.log_context());
431    }
432}