1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
//! Per-request stream state shared by the H1 and H2 mux paths.
//!
//! A [`Stream`] owns the front/back kawa buffers, HTTP context, and metrics
//! for a single request/response pair. [`StreamParts`] splits it along the
//! read/write axis so callers can borrow both sides of the pipe at the same
//! time without fighting the borrow checker.
use std::{
cell::RefCell,
fmt::Debug,
rc::{Rc, Weak},
time::Duration,
};
use mio::Token;
use sozu_command::logging::ansi_palette;
use super::{GenericHttpStream, Position};
use crate::metrics::names;
use crate::{
L7ListenerHandler, ListenerHandler, Protocol, SessionMetrics, pool::Pool,
protocol::http::editor::HttpContext,
};
/// Builds the log-line prefix shared by every message emitted from the stream
/// module.
///
/// A stream holds no direct peer reference, so the prefix is the fixed
/// `MUX-STREAM` label. It is wrapped in the first two entries returned by
/// `ansi_palette()` (the remaining palette entries are ignored), which color
/// it bold bright-white — uniform across every protocol — when the logger
/// supports ANSI.
macro_rules! log_module_context {
    () => {{
        let (open, reset, ..) = ansi_palette();
        format!("{}MUX-STREAM{}\t >>>", open, reset)
    }};
}
/// Link state of a `Stream` towards its backend.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamState {
    /// State a freshly created Stream starts in; not considered open.
    Idle,
    /// The Stream is asking for a connection; this will trigger a call to connect.
    Link,
    /// The Stream is linked to a Client (note that the client might not be connected).
    Linked(Token),
    /// The Stream was linked to a Client, but the connection closed, the client was
    /// removed and this Stream could not be retried (it should be terminated).
    Unlinked,
    /// The Stream is unlinked and can be reused; not considered open.
    Recycle,
}

impl StreamState {
    /// Returns `true` for every state except [`StreamState::Idle`] and
    /// [`StreamState::Recycle`].
    pub fn is_open(&self) -> bool {
        match self {
            StreamState::Idle | StreamState::Recycle => false,
            StreamState::Link | StreamState::Linked(_) | StreamState::Unlinked => true,
        }
    }
}
/// State of a single request/response pair: the two kawa buffers, the HTTP
/// context, the session metrics and a few bookkeeping flags and counters.
pub struct Stream {
/// Signed window counter, seeded from the `window` argument of `Stream::new`
/// (clamped to `i32::MAX`).
/// NOTE(review): presumably the flow-control window of the stream — confirm
/// against the code that updates it.
pub window: i32,
/// Attempt counter, starts at 0.
/// NOTE(review): presumably counts backend connection attempts — confirm
/// against the code that increments it.
pub attempts: u8,
/// Current link state of the stream, starts as `StreamState::Idle`.
pub state: StreamState,
/// True when the frontend connection has received end_of_stream from the client.
pub front_received_end_of_stream: bool,
/// True when the backend connection has received end_of_stream from the backend server.
pub back_received_end_of_stream: bool,
/// Tracks total DATA payload bytes received on the frontend for content-length validation (RFC 9113 §8.1.1)
pub front_data_received: usize,
/// Tracks total DATA payload bytes received on the backend for content-length validation (RFC 9113 §8.1.1)
pub back_data_received: usize,
/// True when `gauge_add!(names::http::ACTIVE_REQUESTS, 1)` was emitted for this stream.
/// Prevents underflow when `generate_access_log` is called for streams that never
/// had their request fully parsed (idle timeouts, malformed requests).
pub request_counted: bool,
/// Kawa stream created with `kawa::Kind::Request`.
pub front: GenericHttpStream,
/// Kawa stream created with `kawa::Kind::Response`.
pub back: GenericHttpStream,
/// HTTP context handed to `Stream::new`; read when the access log is emitted.
pub context: HttpContext,
/// Session metrics of this stream; provide the timings and byte counts
/// written to the access log.
pub metrics: SessionMetrics,
}
/// Borrowing wrapper whose `Debug` output is a compact summary of a kawa
/// stream: scalar parser fields are printed as-is, while `blocks` and `out`
/// are reduced to their lengths and `storage` to its three cursors.
struct KawaSummary<'a>(&'a GenericHttpStream);

impl Debug for KawaSummary<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let kawa = self.0;
        let storage = &kawa.storage;
        // Only the sizes of the collections are reported, not their contents.
        let block_count = kawa.blocks.len();
        let out_count = kawa.out.len();

        let mut summary = f.debug_struct("Kawa");
        summary.field("kind", &kawa.kind);
        summary.field("parsing_phase", &kawa.parsing_phase);
        summary.field("body_size", &kawa.body_size);
        summary.field("consumed", &kawa.consumed);
        summary.field("expects", &kawa.expects);
        summary.field("blocks", &block_count);
        summary.field("out", &out_count);
        summary.field("storage_start", &storage.start);
        summary.field("storage_head", &storage.head);
        summary.field("storage_end", &storage.end);
        summary.finish()
    }
}
/// Hand-written `Debug` so that the two kawa buffers are printed through
/// `KawaSummary` instead of in full.
impl Debug for Stream {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Exhaustive destructuring: adding a field to `Stream` without
        // deciding how to print it becomes a compile error.
        let Self {
            window,
            attempts,
            state,
            front_received_end_of_stream,
            back_received_end_of_stream,
            front_data_received,
            back_data_received,
            request_counted,
            front,
            back,
            context,
            metrics,
        } = self;

        let mut out = f.debug_struct("Stream");
        out.field("window", window);
        out.field("attempts", attempts);
        out.field("state", state);
        out.field("front_received_end_of_stream", front_received_end_of_stream);
        out.field("back_received_end_of_stream", back_received_end_of_stream);
        out.field("front_data_received", front_data_received);
        out.field("back_data_received", back_data_received);
        out.field("request_counted", request_counted);
        out.field("front", &KawaSummary(front));
        out.field("back", &KawaSummary(back));
        out.field("context", context);
        out.field("metrics", metrics);
        out.finish()
    }
}
/// This struct allows to mutably borrow the read and write buffers (dependent on the position)
/// as well as the context and metrics of a Stream at the same time.
///
/// It is produced by `Stream::split`: for `Position::Client` the read side is the
/// stream's `back` and the write side its `front`; for `Position::Server` the
/// roles are swapped.
pub struct StreamParts<'a> {
/// Borrow of the stream's `window` counter.
pub window: &'a mut i32,
/// Buffer on the read side of this connection.
pub rbuffer: &'a mut GenericHttpStream,
/// Buffer on the write side of this connection.
pub wbuffer: &'a mut GenericHttpStream,
/// Tracks whether end_of_stream has been received on the read side of this connection.
pub received_end_of_stream: &'a mut bool,
/// Tracks total DATA payload bytes received on the read side (for content-length validation).
pub data_received: &'a mut usize,
/// Borrow of the stream's HTTP context.
pub context: &'a mut HttpContext,
/// Borrow of the stream's session metrics.
pub metrics: &'a mut SessionMetrics,
}
impl Stream {
/// Creates an idle stream backed by two buffers checked out from `pool`.
///
/// `window` is converted to `i32` and clamped to `i32::MAX` when it does not
/// fit. All flags and counters start cleared.
///
/// Returns `None` when the pool is already dropped or when it cannot hand
/// out both buffers.
pub fn new(pool: Weak<RefCell<Pool>>, context: HttpContext, window: u32) -> Option<Self> {
    let buffers = {
        let shared_pool = pool.upgrade()?;
        let mut guard = shared_pool.borrow_mut();
        // Both checkouts are always attempted, front first, even when the
        // first one comes back empty.
        let front_buffer = guard.checkout();
        let back_buffer = guard.checkout();
        front_buffer.zip(back_buffer)
    };
    let (front_buffer, back_buffer) = buffers?;

    let front = GenericHttpStream::new(kawa::Kind::Request, kawa::Buffer::new(front_buffer));
    let back = GenericHttpStream::new(kawa::Kind::Response, kawa::Buffer::new(back_buffer));
    let metrics = SessionMetrics::new(None);

    Some(Self {
        window: i32::try_from(window).unwrap_or(i32::MAX),
        attempts: 0,
        state: StreamState::Idle,
        front_received_end_of_stream: false,
        back_received_end_of_stream: false,
        front_data_received: 0,
        back_data_received: 0,
        request_counted: false,
        front,
        back,
        context,
        metrics,
    })
}
/// Returns the backend token carried by `StreamState::Linked`, or `None` in
/// any other state.
///
/// Lets access-log emission sites look up the backend socket on the owning
/// `Endpoint`/`Router` without pattern-matching on `state` themselves.
pub fn linked_token(&self) -> Option<Token> {
    if let StreamState::Linked(token) = self.state {
        Some(token)
    } else {
        None
    }
}
/// Returns true when both front and back kawa buffers are in a terminal
/// or initial state with no pending data. Used during shutdown to skip
/// streams that have already completed their work.
pub fn is_quiesced(&self) -> bool {
let front_done =
(self.front.is_initial() || self.front.is_completed() || self.front.is_terminated())
&& self.front.storage.is_empty();
let back_done =
(self.back.is_initial() || self.back.is_completed() || self.back.is_terminated())
&& self.back.storage.is_empty();
front_done && back_done
}
pub fn split(&mut self, position: &Position) -> StreamParts<'_> {
match position {
Position::Client(..) => StreamParts {
window: &mut self.window,
rbuffer: &mut self.back,
wbuffer: &mut self.front,
received_end_of_stream: &mut self.back_received_end_of_stream,
data_received: &mut self.back_data_received,
context: &mut self.context,
metrics: &mut self.metrics,
},
Position::Server => StreamParts {
window: &mut self.window,
rbuffer: &mut self.front,
wbuffer: &mut self.back,
received_end_of_stream: &mut self.front_received_end_of_stream,
data_received: &mut self.front_data_received,
context: &mut self.context,
metrics: &mut self.metrics,
},
}
}
/// Emit the access log for this stream and update the related metrics.
///
/// In order, this:
/// 1. releases the `ACTIVE_REQUESTS` gauge if this stream had been counted,
/// 2. bumps `http.errors` when `error` is set,
/// 3. bumps the status bucket counter and, when one exists, the per-code
///    counter,
/// 4. writes the access-log record,
/// 5. registers the end of the session on the stream metrics.
///
/// `client_rtt`/`server_rtt` are passed in by the caller because the
/// `Stream` does not own a socket reference — the frontend socket lives
/// on the parent `Mux`/connection and the backend socket lives on
/// `Router.backends.get(token)`. Each caller snapshots the two
/// `getsockopt(TCP_INFO)` values from the sockets it can reach, mirroring
/// the inline pattern used by the `kawa_h1`, `pipe`, and TCP-frontend
/// access-log sites.
pub fn generate_access_log<L>(
    &mut self,
    error: bool,
    message: Option<&str>,
    listener: Rc<RefCell<L>>,
    client_rtt: Option<Duration>,
    server_rtt: Option<Duration>,
) where
    L: ListenerHandler + L7ListenerHandler,
{
    let context = &self.context;

    // A caller-supplied `message` (e.g. parsing errors) wins. Otherwise fall
    // back to the per-stream timeout discriminator (`access_log_message`),
    // which `MuxState::timeout` sets before `set_default_answer` /
    // `forcefully_terminate_answer` so the access log can distinguish a
    // timeout-driven 408/504 from a backend-error 504.
    let message = message.or(context.access_log_message);

    // Clear the flag and release the gauge only if it had been taken.
    if std::mem::take(&mut self.request_counted) {
        gauge_add!(names::http::ACTIVE_REQUESTS, -1);
    }

    if error {
        // Labelled with `(cluster_id, backend_id)`; see the matching
        // emission in `kawa_h1::log_request_error` for the cardinality
        // contract (`metrics::filter_labels_for_detail`).
        incr!(
            "http.errors",
            context.cluster_id.as_deref(),
            context.backend_id.as_deref()
        );
    }

    let protocol = match context.protocol {
        Protocol::HTTP => "http",
        Protocol::HTTPS => "https",
        other => {
            error!(
                "{} mux streams only handle HTTP or HTTPS protocols, got {:?}",
                log_module_context!(),
                other
            );
            "unknown"
        }
    };

    // Record the HTTP status code of the backend response. The bucket
    // counter is always emitted; the per-code counter from
    // `crate::metrics::http_status_code_metric_name` only when the status is
    // on the short-list shared with the H1 path (`save_http_status_metric`).
    let bucket_key = match context.status {
        Some(100..=199) => names::http::STATUS_1XX,
        Some(200..=299) => names::http::STATUS_2XX,
        Some(300..=399) => names::http::STATUS_3XX,
        Some(400..=499) => names::http::STATUS_4XX,
        Some(500..=599) => names::http::STATUS_5XX,
        Some(_) => names::http::STATUS_OTHER,
        None => "http.status.none",
    };
    incr!(
        bucket_key,
        context.cluster_id.as_deref(),
        context.backend_id.as_deref()
    );
    if let Some(per_code) = context
        .status
        .and_then(crate::metrics::http_status_code_metric_name)
    {
        incr!(
            per_code,
            context.cluster_id.as_deref(),
            context.backend_id.as_deref()
        );
    }

    let endpoint = sozu_command::logging::EndpointRecord::Http {
        method: context.method.as_deref(),
        authority: context.authority.as_deref(),
        path: context.path.as_deref(),
        reason: context.reason.as_deref(),
        status: context.status,
    };

    let listener = listener.borrow();
    // Tags are looked up by the part of the authority before the first ':'.
    let tags = context.authority.as_deref().and_then(|authority| {
        let hostname = authority
            .split_once(':')
            .map_or(authority, |(hostname, _)| hostname);
        listener.get_tags(hostname)
    });

    log_access! {
        error,
        on_failure: { incr!(names::access_logs::UNSENT) },
        message,
        context: context.log_context(),
        session_address: context.session_address,
        backend_address: context.backend_address,
        protocol,
        endpoint,
        tags,
        client_rtt,
        server_rtt,
        service_time: self.metrics.service_time(),
        response_time: self.metrics.backend_response_time(),
        request_time: self.metrics.request_time(),
        bytes_in: self.metrics.bin,
        bytes_out: self.metrics.bout,
        user_agent: context.user_agent.as_deref(),
        x_request_id: context.x_request_id.as_deref(),
        tls_version: context.tls_version,
        tls_cipher: context.tls_cipher,
        tls_sni: context.tls_server_name.as_deref(),
        tls_alpn: context.tls_alpn,
        xff_chain: context.xff_chain.as_deref(),
        #[cfg(feature = "opentelemetry")]
        otel: context.otel.as_ref(),
        #[cfg(not(feature = "opentelemetry"))]
        otel: None,
    };

    self.metrics.register_end_of_session(&context.log_context());
}
}