1use std::{
9 cell::RefCell,
10 fmt::Debug,
11 rc::{Rc, Weak},
12 time::Duration,
13};
14
15use mio::Token;
16use sozu_command::logging::ansi_palette;
17
18use super::{GenericHttpStream, Position};
19use crate::metrics::names;
20use crate::{
21 L7ListenerHandler, ListenerHandler, Protocol, SessionMetrics, pool::Pool,
22 protocol::http::editor::HttpContext,
23};
24
25macro_rules! log_module_context {
30 () => {{
31 let (open, reset, _, _, _) = ansi_palette();
32 format!("{open}MUX-STREAM{reset}\t >>>", open = open, reset = reset)
33 }};
34}
35
36#[derive(Debug, Clone, Copy, PartialEq, Eq)]
37pub enum StreamState {
38 Idle,
39 Link,
41 Linked(Token),
43 Unlinked,
46 Recycle,
48}
49
50impl StreamState {
51 pub fn is_open(&self) -> bool {
52 !matches!(self, StreamState::Idle | StreamState::Recycle)
53 }
54}
55
56pub struct Stream {
57 pub window: i32,
58 pub attempts: u8,
59 pub state: StreamState,
60 pub front_received_end_of_stream: bool,
62 pub back_received_end_of_stream: bool,
64 pub front_data_received: usize,
66 pub back_data_received: usize,
68 pub request_counted: bool,
72 pub front: GenericHttpStream,
73 pub back: GenericHttpStream,
74 pub context: HttpContext,
75 pub metrics: SessionMetrics,
76}
77
78struct KawaSummary<'a>(&'a GenericHttpStream);
79impl Debug for KawaSummary<'_> {
80 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
81 f.debug_struct("Kawa")
82 .field("kind", &self.0.kind)
83 .field("parsing_phase", &self.0.parsing_phase)
84 .field("body_size", &self.0.body_size)
85 .field("consumed", &self.0.consumed)
86 .field("expects", &self.0.expects)
87 .field("blocks", &self.0.blocks.len())
88 .field("out", &self.0.out.len())
89 .field("storage_start", &self.0.storage.start)
90 .field("storage_head", &self.0.storage.head)
91 .field("storage_end", &self.0.storage.end)
92 .finish()
93 }
94}
95impl Debug for Stream {
96 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
97 f.debug_struct("Stream")
98 .field("window", &self.window)
99 .field("attempts", &self.attempts)
100 .field("state", &self.state)
101 .field(
102 "front_received_end_of_stream",
103 &self.front_received_end_of_stream,
104 )
105 .field(
106 "back_received_end_of_stream",
107 &self.back_received_end_of_stream,
108 )
109 .field("front_data_received", &self.front_data_received)
110 .field("back_data_received", &self.back_data_received)
111 .field("request_counted", &self.request_counted)
112 .field("front", &KawaSummary(&self.front))
113 .field("back", &KawaSummary(&self.back))
114 .field("context", &self.context)
115 .field("metrics", &self.metrics)
116 .finish()
117 }
118}
119
120pub struct StreamParts<'a> {
123 pub window: &'a mut i32,
124 pub rbuffer: &'a mut GenericHttpStream,
125 pub wbuffer: &'a mut GenericHttpStream,
126 pub received_end_of_stream: &'a mut bool,
128 pub data_received: &'a mut usize,
130 pub context: &'a mut HttpContext,
131 pub metrics: &'a mut SessionMetrics,
132}
133
134impl Stream {
135 pub fn new(pool: Weak<RefCell<Pool>>, context: HttpContext, window: u32) -> Option<Self> {
136 let (front_buffer, back_buffer) = match pool.upgrade() {
137 Some(pool) => {
138 let mut pool = pool.borrow_mut();
139 match (pool.checkout(), pool.checkout()) {
140 (Some(front_buffer), Some(back_buffer)) => (front_buffer, back_buffer),
141 _ => return None,
142 }
143 }
144 None => return None,
145 };
146 Some(Self {
147 state: StreamState::Idle,
148 attempts: 0,
149 window: i32::try_from(window).unwrap_or(i32::MAX),
150 front_received_end_of_stream: false,
151 back_received_end_of_stream: false,
152 front_data_received: 0,
153 back_data_received: 0,
154 request_counted: false,
155 front: GenericHttpStream::new(kawa::Kind::Request, kawa::Buffer::new(front_buffer)),
156 back: GenericHttpStream::new(kawa::Kind::Response, kawa::Buffer::new(back_buffer)),
157 context,
158 metrics: SessionMetrics::new(None),
159 })
160 }
161 pub fn linked_token(&self) -> Option<Token> {
165 match self.state {
166 StreamState::Linked(token) => Some(token),
167 _ => None,
168 }
169 }
170
171 pub fn is_quiesced(&self) -> bool {
175 let front_done =
176 (self.front.is_initial() || self.front.is_completed() || self.front.is_terminated())
177 && self.front.storage.is_empty();
178 let back_done =
179 (self.back.is_initial() || self.back.is_completed() || self.back.is_terminated())
180 && self.back.storage.is_empty();
181 front_done && back_done
182 }
183
184 pub fn split(&mut self, position: &Position) -> StreamParts<'_> {
185 match position {
186 Position::Client(..) => StreamParts {
187 window: &mut self.window,
188 rbuffer: &mut self.back,
189 wbuffer: &mut self.front,
190 received_end_of_stream: &mut self.back_received_end_of_stream,
191 data_received: &mut self.back_data_received,
192 context: &mut self.context,
193 metrics: &mut self.metrics,
194 },
195 Position::Server => StreamParts {
196 window: &mut self.window,
197 rbuffer: &mut self.front,
198 wbuffer: &mut self.back,
199 received_end_of_stream: &mut self.front_received_end_of_stream,
200 data_received: &mut self.front_data_received,
201 context: &mut self.context,
202 metrics: &mut self.metrics,
203 },
204 }
205 }
206 pub fn generate_access_log<L>(
216 &mut self,
217 error: bool,
218 message: Option<&str>,
219 listener: Rc<RefCell<L>>,
220 client_rtt: Option<Duration>,
221 server_rtt: Option<Duration>,
222 ) where
223 L: ListenerHandler + L7ListenerHandler,
224 {
225 let context = &self.context;
226 let message = message.or(context.access_log_message);
234 if self.request_counted {
235 gauge_add!(names::http::ACTIVE_REQUESTS, -1);
236 self.request_counted = false;
237 }
238 if error {
239 incr!(
243 "http.errors",
244 context.cluster_id.as_deref(),
245 context.backend_id.as_deref()
246 );
247 }
248 let protocol = match context.protocol {
249 Protocol::HTTP => "http",
250 Protocol::HTTPS => "https",
251 other => {
252 error!(
253 "{} mux streams only handle HTTP or HTTPS protocols, got {:?}",
254 log_module_context!(),
255 other
256 );
257 "unknown"
258 }
259 };
260
261 let bucket_key = if let Some(status) = context.status {
266 match status {
267 100..=199 => names::http::STATUS_1XX,
268 200..=299 => names::http::STATUS_2XX,
269 300..=399 => names::http::STATUS_3XX,
270 400..=499 => names::http::STATUS_4XX,
271 500..=599 => names::http::STATUS_5XX,
272 _ => names::http::STATUS_OTHER,
273 }
274 } else {
275 "http.status.none"
276 };
277 incr!(
278 bucket_key,
279 context.cluster_id.as_deref(),
280 context.backend_id.as_deref()
281 );
282
283 if let Some(status) = context.status {
284 if let Some(per_code) = crate::metrics::http_status_code_metric_name(status) {
285 incr!(
286 per_code,
287 context.cluster_id.as_deref(),
288 context.backend_id.as_deref()
289 );
290 }
291 }
292
293 let endpoint = sozu_command::logging::EndpointRecord::Http {
294 method: context.method.as_deref(),
295 authority: context.authority.as_deref(),
296 path: context.path.as_deref(),
297 reason: context.reason.as_deref(),
298 status: context.status,
299 };
300
301 let listener = listener.borrow();
302 let tags = context.authority.as_deref().and_then(|host| {
303 let hostname = match host.split_once(':') {
304 None => host,
305 Some((hostname, _)) => hostname,
306 };
307 listener.get_tags(hostname)
308 });
309
310 log_access! {
311 error,
312 on_failure: { incr!(names::access_logs::UNSENT) },
313 message,
314 context: context.log_context(),
315 session_address: context.session_address,
316 backend_address: context.backend_address,
317 protocol,
318 endpoint,
319 tags,
320 client_rtt,
321 server_rtt,
322 service_time: self.metrics.service_time(),
323 response_time: self.metrics.backend_response_time(),
324 request_time: self.metrics.request_time(),
325 bytes_in: self.metrics.bin,
326 bytes_out: self.metrics.bout,
327 user_agent: context.user_agent.as_deref(),
328 x_request_id: context.x_request_id.as_deref(),
329 tls_version: context.tls_version,
330 tls_cipher: context.tls_cipher,
331 tls_sni: context.tls_server_name.as_deref(),
332 tls_alpn: context.tls_alpn,
333 xff_chain: context.xff_chain.as_deref(),
334 #[cfg(feature = "opentelemetry")]
335 otel: context.otel.as_ref(),
336 #[cfg(not(feature = "opentelemetry"))]
337 otel: None,
338 };
339 self.metrics.register_end_of_session(&context.log_context());
340 }
341}