Skip to main content

sozu_lib/protocol/mux/
stream.rs

1//! Per-request stream state shared by the H1 and H2 mux paths.
2//!
3//! A [`Stream`] owns the front/back kawa buffers, HTTP context, and metrics
4//! for a single request/response pair. [`StreamParts`] splits it along the
5//! read/write axis so callers can borrow both sides of the pipe at the same
6//! time without fighting the borrow checker.
7
8use std::{
9    cell::RefCell,
10    fmt::Debug,
11    rc::{Rc, Weak},
12    time::Duration,
13};
14
15use mio::Token;
16use sozu_command::logging::ansi_palette;
17
18use super::{GenericHttpStream, Position};
19use crate::metrics::names;
20use crate::{
21    L7ListenerHandler, ListenerHandler, Protocol, SessionMetrics, pool::Pool,
22    protocol::http::editor::HttpContext,
23};
24
/// Builds the prefix placed in front of every log line emitted from the
/// stream module.
///
/// A stream has no direct peer reference, so every line carries the same
/// `MUX-STREAM` label. The label is wrapped in the first two values returned
/// by `ansi_palette()` (the opening sequence and the reset sequence), which
/// renders it bold bright-white — the same for every protocol — when the
/// logger supports ANSI.
macro_rules! log_module_context {
    () => {{
        // Only the opening and reset sequences are needed; the remaining
        // palette entries are ignored.
        let (open, reset, ..) = ansi_palette();
        format!("{}MUX-STREAM{}\t >>>", open, reset)
    }};
}
35
36#[derive(Debug, Clone, Copy, PartialEq, Eq)]
37pub enum StreamState {
38    Idle,
39    /// the Stream is asking for connection, this will trigger a call to connect
40    Link,
41    /// the Stream is linked to a Client (note that the client might not be connected)
42    Linked(Token),
43    /// the Stream was linked to a Client, but the connection closed, the client was removed
44    /// and this Stream could not be retried (it should be terminated)
45    Unlinked,
46    /// the Stream is unlinked and can be reused
47    Recycle,
48}
49
50impl StreamState {
51    pub fn is_open(&self) -> bool {
52        !matches!(self, StreamState::Idle | StreamState::Recycle)
53    }
54}
55
/// State of a single request/response pair handled by the mux.
///
/// Owns the two kawa buffers (`front` and `back`), the HTTP context and the
/// session metrics. `Debug` is implemented by hand so that the buffers are
/// printed as a short summary rather than in full.
pub struct Stream {
    /// Window value, initialised from the `window` argument of `Stream::new`
    /// (saturated at `i32::MAX`).
    /// NOTE(review): presumably the H2 flow-control window — confirm against
    /// the H2 connection code.
    pub window: i32,
    /// Starts at 0 in `Stream::new`.
    /// NOTE(review): presumably counts backend connection attempts — confirm
    /// against callers.
    pub attempts: u8,
    /// Link state of this stream, see [`StreamState`].
    pub state: StreamState,
    /// True when the frontend connection has received end_of_stream from the client.
    pub front_received_end_of_stream: bool,
    /// True when the backend connection has received end_of_stream from the backend server.
    pub back_received_end_of_stream: bool,
    /// Tracks total DATA payload bytes received on the frontend for content-length validation (RFC 9113 §8.1.1)
    pub front_data_received: usize,
    /// Tracks total DATA payload bytes received on the backend for content-length validation (RFC 9113 §8.1.1)
    pub back_data_received: usize,
    /// True when `gauge_add!(names::http::ACTIVE_REQUESTS, 1)` was emitted for this stream.
    /// Prevents underflow when `generate_access_log` is called for streams that never
    /// had their request fully parsed (idle timeouts, malformed requests).
    pub request_counted: bool,
    /// Kawa stream created with `kawa::Kind::Request` in `Stream::new`.
    pub front: GenericHttpStream,
    /// Kawa stream created with `kawa::Kind::Response` in `Stream::new`.
    pub back: GenericHttpStream,
    /// HTTP context of the request; `generate_access_log` reads the access-log
    /// fields (method, authority, path, status, ...) from it.
    pub context: HttpContext,
    /// Per-stream metrics; `generate_access_log` reads the timings and byte
    /// counters from it.
    pub metrics: SessionMetrics,
}
77
78struct KawaSummary<'a>(&'a GenericHttpStream);
79impl Debug for KawaSummary<'_> {
80    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
81        f.debug_struct("Kawa")
82            .field("kind", &self.0.kind)
83            .field("parsing_phase", &self.0.parsing_phase)
84            .field("body_size", &self.0.body_size)
85            .field("consumed", &self.0.consumed)
86            .field("expects", &self.0.expects)
87            .field("blocks", &self.0.blocks.len())
88            .field("out", &self.0.out.len())
89            .field("storage_start", &self.0.storage.start)
90            .field("storage_head", &self.0.storage.head)
91            .field("storage_end", &self.0.storage.end)
92            .finish()
93    }
94}
95impl Debug for Stream {
96    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
97        f.debug_struct("Stream")
98            .field("window", &self.window)
99            .field("attempts", &self.attempts)
100            .field("state", &self.state)
101            .field(
102                "front_received_end_of_stream",
103                &self.front_received_end_of_stream,
104            )
105            .field(
106                "back_received_end_of_stream",
107                &self.back_received_end_of_stream,
108            )
109            .field("front_data_received", &self.front_data_received)
110            .field("back_data_received", &self.back_data_received)
111            .field("request_counted", &self.request_counted)
112            .field("front", &KawaSummary(&self.front))
113            .field("back", &KawaSummary(&self.back))
114            .field("context", &self.context)
115            .field("metrics", &self.metrics)
116            .finish()
117    }
118}
119
/// Simultaneous mutable borrows of the pieces of a `Stream`: the read and
/// write buffers (which one is which depends on the position), the window,
/// the read-side bookkeeping, the context and the metrics.
///
/// Built by `Stream::split`. For `Position::Client` the read side is the
/// stream's `back` buffer and the write side its `front` buffer; for
/// `Position::Server` it is the other way around.
pub struct StreamParts<'a> {
    /// The stream's `window` field.
    pub window: &'a mut i32,
    /// Buffer on the read side of this connection.
    pub rbuffer: &'a mut GenericHttpStream,
    /// Buffer on the write side of this connection.
    pub wbuffer: &'a mut GenericHttpStream,
    /// Tracks whether end_of_stream has been received on the read side of this connection.
    pub received_end_of_stream: &'a mut bool,
    /// Tracks total DATA payload bytes received on the read side (for content-length validation).
    pub data_received: &'a mut usize,
    /// The stream's HTTP context.
    pub context: &'a mut HttpContext,
    /// The stream's metrics.
    pub metrics: &'a mut SessionMetrics,
}
133
134impl Stream {
135    pub fn new(pool: Weak<RefCell<Pool>>, context: HttpContext, window: u32) -> Option<Self> {
136        let (front_buffer, back_buffer) = match pool.upgrade() {
137            Some(pool) => {
138                let mut pool = pool.borrow_mut();
139                match (pool.checkout(), pool.checkout()) {
140                    (Some(front_buffer), Some(back_buffer)) => (front_buffer, back_buffer),
141                    _ => return None,
142                }
143            }
144            None => return None,
145        };
146        Some(Self {
147            state: StreamState::Idle,
148            attempts: 0,
149            window: i32::try_from(window).unwrap_or(i32::MAX),
150            front_received_end_of_stream: false,
151            back_received_end_of_stream: false,
152            front_data_received: 0,
153            back_data_received: 0,
154            request_counted: false,
155            front: GenericHttpStream::new(kawa::Kind::Request, kawa::Buffer::new(front_buffer)),
156            back: GenericHttpStream::new(kawa::Kind::Response, kawa::Buffer::new(back_buffer)),
157            context,
158            metrics: SessionMetrics::new(None),
159        })
160    }
161    /// Convenience accessor for the backend token when the stream is `Linked`.
162    /// Used by access-log emission sites to look up the backend socket on the
163    /// owning `Endpoint`/`Router` without re-pattern-matching `state` inline.
164    pub fn linked_token(&self) -> Option<Token> {
165        match self.state {
166            StreamState::Linked(token) => Some(token),
167            _ => None,
168        }
169    }
170
171    /// Returns true when both front and back kawa buffers are in a terminal
172    /// or initial state with no pending data. Used during shutdown to skip
173    /// streams that have already completed their work.
174    pub fn is_quiesced(&self) -> bool {
175        let front_done =
176            (self.front.is_initial() || self.front.is_completed() || self.front.is_terminated())
177                && self.front.storage.is_empty();
178        let back_done =
179            (self.back.is_initial() || self.back.is_completed() || self.back.is_terminated())
180                && self.back.storage.is_empty();
181        front_done && back_done
182    }
183
184    pub fn split(&mut self, position: &Position) -> StreamParts<'_> {
185        match position {
186            Position::Client(..) => StreamParts {
187                window: &mut self.window,
188                rbuffer: &mut self.back,
189                wbuffer: &mut self.front,
190                received_end_of_stream: &mut self.back_received_end_of_stream,
191                data_received: &mut self.back_data_received,
192                context: &mut self.context,
193                metrics: &mut self.metrics,
194            },
195            Position::Server => StreamParts {
196                window: &mut self.window,
197                rbuffer: &mut self.front,
198                wbuffer: &mut self.back,
199                received_end_of_stream: &mut self.front_received_end_of_stream,
200                data_received: &mut self.front_data_received,
201                context: &mut self.context,
202                metrics: &mut self.metrics,
203            },
204        }
205    }
206    /// Emit the access log for this stream.
207    ///
208    /// `client_rtt`/`server_rtt` are passed in by the caller because the
209    /// `Stream` does not own a socket reference — the frontend socket lives
210    /// on the parent `Mux`/connection and the backend socket lives on
211    /// `Router.backends.get(token)`. Each caller snapshots the two
212    /// `getsockopt(TCP_INFO)` values from the sockets it can reach, mirroring
213    /// the inline pattern used by the `kawa_h1`, `pipe`, and TCP-frontend
214    /// access-log sites.
215    pub fn generate_access_log<L>(
216        &mut self,
217        error: bool,
218        message: Option<&str>,
219        listener: Rc<RefCell<L>>,
220        client_rtt: Option<Duration>,
221        server_rtt: Option<Duration>,
222    ) where
223        L: ListenerHandler + L7ListenerHandler,
224    {
225        let context = &self.context;
226        // Fall back to the per-stream timeout discriminator
227        // (`access_log_message`) when the caller did not supply an explicit
228        // `message`. The discriminator is set by `MuxState::timeout` before
229        // `set_default_answer` / `forcefully_terminate_answer` so the
230        // access log can distinguish a timeout-driven 408/504 from a
231        // backend-error 504. Caller-supplied `message` (e.g. parsing
232        // errors) takes precedence when both are present.
233        let message = message.or(context.access_log_message);
234        if self.request_counted {
235            gauge_add!(names::http::ACTIVE_REQUESTS, -1);
236            self.request_counted = false;
237        }
238        if error {
239            // Labelled with `(cluster_id, backend_id)`; see the matching
240            // emission in `kawa_h1::log_request_error` for the cardinality
241            // contract (`metrics::filter_labels_for_detail`).
242            incr!(
243                "http.errors",
244                context.cluster_id.as_deref(),
245                context.backend_id.as_deref()
246            );
247        }
248        let protocol = match context.protocol {
249            Protocol::HTTP => "http",
250            Protocol::HTTPS => "https",
251            other => {
252                error!(
253                    "{} mux streams only handle HTTP or HTTPS protocols, got {:?}",
254                    log_module_context!(),
255                    other
256                );
257                "unknown"
258            }
259        };
260
261        // Save the HTTP status code of the backend response. Emits the bucket
262        // counter unconditionally, plus the per-code counter from
263        // `crate::metrics::http_status_code_metric_name` when the status is on
264        // the short-list shared with the H1 path (`save_http_status_metric`).
265        let bucket_key = if let Some(status) = context.status {
266            match status {
267                100..=199 => names::http::STATUS_1XX,
268                200..=299 => names::http::STATUS_2XX,
269                300..=399 => names::http::STATUS_3XX,
270                400..=499 => names::http::STATUS_4XX,
271                500..=599 => names::http::STATUS_5XX,
272                _ => names::http::STATUS_OTHER,
273            }
274        } else {
275            "http.status.none"
276        };
277        incr!(
278            bucket_key,
279            context.cluster_id.as_deref(),
280            context.backend_id.as_deref()
281        );
282
283        if let Some(status) = context.status {
284            if let Some(per_code) = crate::metrics::http_status_code_metric_name(status) {
285                incr!(
286                    per_code,
287                    context.cluster_id.as_deref(),
288                    context.backend_id.as_deref()
289                );
290            }
291        }
292
293        let endpoint = sozu_command::logging::EndpointRecord::Http {
294            method: context.method.as_deref(),
295            authority: context.authority.as_deref(),
296            path: context.path.as_deref(),
297            reason: context.reason.as_deref(),
298            status: context.status,
299        };
300
301        let listener = listener.borrow();
302        let tags = context.authority.as_deref().and_then(|host| {
303            let hostname = match host.split_once(':') {
304                None => host,
305                Some((hostname, _)) => hostname,
306            };
307            listener.get_tags(hostname)
308        });
309
310        log_access! {
311            error,
312            on_failure: { incr!(names::access_logs::UNSENT) },
313            message,
314            context: context.log_context(),
315            session_address: context.session_address,
316            backend_address: context.backend_address,
317            protocol,
318            endpoint,
319            tags,
320            client_rtt,
321            server_rtt,
322            service_time: self.metrics.service_time(),
323            response_time: self.metrics.backend_response_time(),
324            request_time: self.metrics.request_time(),
325            bytes_in: self.metrics.bin,
326            bytes_out: self.metrics.bout,
327            user_agent: context.user_agent.as_deref(),
328            x_request_id: context.x_request_id.as_deref(),
329            tls_version: context.tls_version,
330            tls_cipher: context.tls_cipher,
331            tls_sni: context.tls_server_name.as_deref(),
332            tls_alpn: context.tls_alpn,
333            xff_chain: context.xff_chain.as_deref(),
334            #[cfg(feature = "opentelemetry")]
335            otel: context.otel.as_ref(),
336            #[cfg(not(feature = "opentelemetry"))]
337            otel: None,
338        };
339        self.metrics.register_end_of_session(&context.log_context());
340    }
341}