1use std::{
9 cell::RefCell,
10 fmt::Debug,
11 rc::{Rc, Weak},
12 time::Duration,
13};
14
15use mio::Token;
16use sozu_command::logging::ansi_palette;
17
18use super::{GenericHttpStream, Position};
19use crate::metrics::names;
20use crate::{
21 L7ListenerHandler, ListenerHandler, Protocol, SessionMetrics, pool::Pool,
22 protocol::http::editor::HttpContext,
23};
24
25macro_rules! log_module_context {
30 () => {{
31 let (open, reset, _, _, _) = ansi_palette();
32 format!("{open}MUX-STREAM{reset}\t >>>", open = open, reset = reset)
33 }};
34}
35
36#[derive(Debug, Clone, Copy, PartialEq, Eq)]
37pub enum StreamState {
38 Idle,
39 Link,
41 Linked(Token),
43 Unlinked,
46 Recycle,
48}
49
50impl StreamState {
51 pub fn is_open(&self) -> bool {
52 !matches!(self, StreamState::Idle | StreamState::Recycle)
53 }
54}
55
56pub struct Stream {
57 pub window: i32,
58 pub attempts: u8,
59 pub state: StreamState,
60 pub front_received_end_of_stream: bool,
62 pub back_received_end_of_stream: bool,
64 pub front_data_received: usize,
66 pub back_data_received: usize,
68 pub request_counted: bool,
72 pub front: GenericHttpStream,
73 pub back: GenericHttpStream,
74 pub context: HttpContext,
75 pub metrics: SessionMetrics,
76}
77
78struct KawaSummary<'a>(&'a GenericHttpStream);
79impl Debug for KawaSummary<'_> {
80 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
81 f.debug_struct("Kawa")
82 .field("kind", &self.0.kind)
83 .field("parsing_phase", &self.0.parsing_phase)
84 .field("body_size", &self.0.body_size)
85 .field("consumed", &self.0.consumed)
86 .field("expects", &self.0.expects)
87 .field("blocks", &self.0.blocks.len())
88 .field("out", &self.0.out.len())
89 .field("storage_start", &self.0.storage.start)
90 .field("storage_head", &self.0.storage.head)
91 .field("storage_end", &self.0.storage.end)
92 .finish()
93 }
94}
95impl Debug for Stream {
96 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
97 f.debug_struct("Stream")
98 .field("window", &self.window)
99 .field("attempts", &self.attempts)
100 .field("state", &self.state)
101 .field(
102 "front_received_end_of_stream",
103 &self.front_received_end_of_stream,
104 )
105 .field(
106 "back_received_end_of_stream",
107 &self.back_received_end_of_stream,
108 )
109 .field("front_data_received", &self.front_data_received)
110 .field("back_data_received", &self.back_data_received)
111 .field("request_counted", &self.request_counted)
112 .field("front", &KawaSummary(&self.front))
113 .field("back", &KawaSummary(&self.back))
114 .field("context", &self.context)
115 .field("metrics", &self.metrics)
116 .finish()
117 }
118}
119
120pub struct StreamParts<'a> {
123 pub window: &'a mut i32,
124 pub rbuffer: &'a mut GenericHttpStream,
125 pub wbuffer: &'a mut GenericHttpStream,
126 pub received_end_of_stream: &'a mut bool,
128 pub data_received: &'a mut usize,
130 pub context: &'a mut HttpContext,
131 pub metrics: &'a mut SessionMetrics,
132}
133
134impl Stream {
135 pub fn new(pool: Weak<RefCell<Pool>>, context: HttpContext, window: u32) -> Option<Self> {
136 let (front_buffer, back_buffer) = match pool.upgrade() {
137 Some(pool) => {
138 let mut pool = pool.borrow_mut();
139 match (pool.checkout(), pool.checkout()) {
140 (Some(front_buffer), Some(back_buffer)) => (front_buffer, back_buffer),
141 _ => return None,
142 }
143 }
144 None => return None,
145 };
146 let stream = Self {
147 state: StreamState::Idle,
148 attempts: 0,
149 window: i32::try_from(window).unwrap_or(i32::MAX),
150 front_received_end_of_stream: false,
151 back_received_end_of_stream: false,
152 front_data_received: 0,
153 back_data_received: 0,
154 request_counted: false,
155 front: GenericHttpStream::new(kawa::Kind::Request, kawa::Buffer::new(front_buffer)),
156 back: GenericHttpStream::new(kawa::Kind::Response, kawa::Buffer::new(back_buffer)),
157 context,
158 metrics: SessionMetrics::new(None),
159 };
160 debug_assert_eq!(stream.state, StreamState::Idle, "new stream must be Idle");
165 debug_assert!(
166 !stream.state.is_open(),
167 "an Idle stream slot must not report as open"
168 );
169 debug_assert!(
170 !stream.request_counted,
171 "new stream must not have a counted request (gauge-underflow guard)"
172 );
173 debug_assert_eq!(
174 (stream.front_data_received, stream.back_data_received),
175 (0, 0),
176 "new stream DATA counters must start at 0"
177 );
178 #[cfg(debug_assertions)]
179 stream.check_invariants();
180 Some(stream)
181 }
182
183 #[cfg(debug_assertions)]
201 pub(super) fn check_invariants(&self) {
202 debug_assert_eq!(
203 self.state.is_open(),
204 !matches!(self.state, StreamState::Idle | StreamState::Recycle),
205 "is_open() must agree with the Idle/Recycle discriminants"
206 );
207 if self.state == StreamState::Recycle {
208 debug_assert!(
209 !self.request_counted,
210 "a Recycle slot must not carry a counted request (active-requests leak)"
211 );
212 }
213 debug_assert_eq!(
217 self.linked_token().is_some(),
218 matches!(self.state, StreamState::Linked(_)),
219 "linked_token() must be Some iff the stream is Linked"
220 );
221 }
222 pub fn linked_token(&self) -> Option<Token> {
226 match self.state {
227 StreamState::Linked(token) => Some(token),
228 _ => None,
229 }
230 }
231
232 pub fn is_quiesced(&self) -> bool {
236 let front_done =
237 (self.front.is_initial() || self.front.is_completed() || self.front.is_terminated())
238 && self.front.storage.is_empty();
239 let back_done =
240 (self.back.is_initial() || self.back.is_completed() || self.back.is_terminated())
241 && self.back.storage.is_empty();
242 front_done && back_done
243 }
244
245 pub fn split(&mut self, position: &Position) -> StreamParts<'_> {
246 debug_assert_eq!(
250 self.front.kind,
251 kawa::Kind::Request,
252 "front buffer must hold a Request kawa"
253 );
254 debug_assert_eq!(
255 self.back.kind,
256 kawa::Kind::Response,
257 "back buffer must hold a Response kawa"
258 );
259 match position {
260 Position::Client(..) => StreamParts {
261 window: &mut self.window,
262 rbuffer: &mut self.back,
263 wbuffer: &mut self.front,
264 received_end_of_stream: &mut self.back_received_end_of_stream,
265 data_received: &mut self.back_data_received,
266 context: &mut self.context,
267 metrics: &mut self.metrics,
268 },
269 Position::Server => StreamParts {
270 window: &mut self.window,
271 rbuffer: &mut self.front,
272 wbuffer: &mut self.back,
273 received_end_of_stream: &mut self.front_received_end_of_stream,
274 data_received: &mut self.front_data_received,
275 context: &mut self.context,
276 metrics: &mut self.metrics,
277 },
278 }
279 }
280 pub fn generate_access_log<L>(
290 &mut self,
291 error: bool,
292 message: Option<&str>,
293 listener: Rc<RefCell<L>>,
294 client_rtt: Option<Duration>,
295 server_rtt: Option<Duration>,
296 ) where
297 L: ListenerHandler + L7ListenerHandler,
298 {
299 let context = &self.context;
300 let message = message.or(context.access_log_message);
308 let was_counted = self.request_counted;
314 if self.request_counted {
315 gauge_add!(names::http::ACTIVE_REQUESTS, -1);
316 self.request_counted = false;
317 }
318 debug_assert!(
319 !self.request_counted,
320 "generate_access_log must leave request_counted false (gauge-underflow guard)"
321 );
322 debug_assert!(
325 was_counted >= self.request_counted,
326 "request_counted must only clear here, never spontaneously set"
327 );
328 if error {
329 incr!(
333 "http.errors",
334 context.cluster_id.as_deref(),
335 context.backend_id.as_deref()
336 );
337 }
338 let protocol = match context.protocol {
339 Protocol::HTTP => "http",
340 Protocol::HTTPS => "https",
341 other => {
342 error!(
343 "{} mux streams only handle HTTP or HTTPS protocols, got {:?}",
344 log_module_context!(),
345 other
346 );
347 "unknown"
348 }
349 };
350
351 let bucket_key = if let Some(status) = context.status {
356 match status {
357 100..=199 => names::http::STATUS_1XX,
358 200..=299 => names::http::STATUS_2XX,
359 300..=399 => names::http::STATUS_3XX,
360 400..=499 => names::http::STATUS_4XX,
361 500..=599 => names::http::STATUS_5XX,
362 _ => names::http::STATUS_OTHER,
363 }
364 } else {
365 "http.status.none"
366 };
367 incr!(
368 bucket_key,
369 context.cluster_id.as_deref(),
370 context.backend_id.as_deref()
371 );
372
373 if let Some(status) = context.status {
374 if let Some(per_code) = crate::metrics::http_status_code_metric_name(status) {
375 incr!(
376 per_code,
377 context.cluster_id.as_deref(),
378 context.backend_id.as_deref()
379 );
380 }
381 }
382
383 let endpoint = sozu_command::logging::EndpointRecord::Http {
384 method: context.method.as_deref(),
385 authority: context.authority.as_deref(),
386 path: context.path.as_deref(),
387 reason: context.reason.as_deref(),
388 status: context.status,
389 };
390
391 let listener = listener.borrow();
392 let tags = context.authority.as_deref().and_then(|host| {
393 let hostname = match host.split_once(':') {
394 None => host,
395 Some((hostname, _)) => hostname,
396 };
397 listener.get_tags(hostname)
398 });
399
400 log_access! {
401 error,
402 on_failure: { incr!(names::access_logs::UNSENT) },
403 message,
404 context: context.log_context(),
405 session_address: context.session_address,
406 backend_address: context.backend_address,
407 protocol,
408 endpoint,
409 tags,
410 client_rtt,
411 server_rtt,
412 service_time: self.metrics.service_time(),
413 response_time: self.metrics.backend_response_time(),
414 request_time: self.metrics.request_time(),
415 start_time_ns: self.metrics.start_wall_ns(),
416 bytes_in: self.metrics.bin,
417 bytes_out: self.metrics.bout,
418 user_agent: context.user_agent.as_deref(),
419 x_request_id: context.x_request_id.as_deref(),
420 tls_version: context.tls_version,
421 tls_cipher: context.tls_cipher,
422 tls_sni: context.tls_server_name.as_deref(),
423 tls_alpn: context.tls_alpn,
424 xff_chain: context.xff_chain.as_deref(),
425 #[cfg(feature = "opentelemetry")]
426 otel: context.otel.as_ref(),
427 #[cfg(not(feature = "opentelemetry"))]
428 otel: None,
429 };
430 self.metrics.register_end_of_session(&context.log_context());
431 }
432}