sozu_lib/protocol/kawa_h1/editor.rs
1//! H1 request/response header editor.
2//!
3//! Captures method/authority/path on parse, rewrites hop-by-hop and
4//! forwarding headers (`X-Forwarded-*`, `Forwarded`, `Connection`,
5//! WebSocket-upgrade signalling, optional `traceparent`), and surfaces the
6//! canonical `LogContext` used by the access-log envelope. Acts as the
7//! Kawa `ParserCallbacks` implementation for the H1 mux path.
8
9use std::{
10 io::Write as _,
11 net::{IpAddr, SocketAddr},
12 str::from_utf8,
13 sync::Arc,
14};
15
16use rusty_ulid::Ulid;
17use sozu_command_lib::logging::LogContext;
18
19use crate::metrics::names;
20use crate::{
21 Protocol, RetrieveClusterError,
22 pool::Checkout,
23 protocol::{
24 http::{GenericHttpStream, Method, parser::compare_no_case},
25 pipe::WebSocketContext,
26 },
27};
28
/// Parse a W3C `traceparent` header value of the form
/// `00-<32 hex trace-id>-<16 hex parent-id>-<2 hex flags>`.
///
/// Returns `(trace_id, parent_id)` as raw hex bytes on success. Returns
/// `None` in these cases, and the caller then starts a fresh trace:
/// - the value does not have exactly the version-`00` shape;
/// - trailing bytes follow the flags;
/// - the trace-id or parent-id is all zeros (W3C Trace Context marks both
///   as invalid, so they must not be propagated).
///
/// The trace-flags field is validated as two hex digits but otherwise
/// discarded.
#[cfg(feature = "opentelemetry")]
fn parse_traceparent(val: &kawa::Store, buf: &[u8]) -> Option<([u8; 32], [u8; 16])> {
    /// `true` when every hex digit of the id is `0` (the spec's invalid id).
    fn is_all_zero(id: &[u8]) -> bool {
        id.iter().all(|&c| c == b'0')
    }

    let val = val.data(buf);
    let (version, val) = parse_hex::<2>(val)?;
    if version.as_slice() != b"00" {
        return None;
    }
    let val = skip_separator(val)?;
    let (trace_id, val) = parse_hex::<32>(val)?;
    let val = skip_separator(val)?;
    let (parent_id, val) = parse_hex::<16>(val)?;
    let val = skip_separator(val)?;
    let (_flags, val) = parse_hex::<2>(val)?;
    if !val.is_empty() {
        return None;
    }
    // An all-zero trace-id or parent-id is explicitly invalid. Reject it so
    // the caller regenerates ids instead of forwarding a traceparent that
    // downstream vendors are required to ignore.
    if is_all_zero(&trace_id[..]) || is_all_zero(&parent_id[..]) {
        return None;
    }
    Some((trace_id, parent_id))
}
44
/// Split exactly `N` ASCII hex digits off the front of `buf`.
///
/// Returns the digits as a fixed-size array together with the unconsumed
/// remainder. Returns `None` if `buf` is shorter than `N` or any of the
/// first `N` bytes is not a hex digit (either case is accepted).
#[cfg(feature = "opentelemetry")]
fn parse_hex<const N: usize>(buf: &[u8]) -> Option<([u8; N], &[u8])> {
    if buf.len() < N {
        return None;
    }
    let (head, rest) = buf.split_at(N);
    if !head.iter().all(u8::is_ascii_hexdigit) {
        return None;
    }
    // `head` is exactly `N` bytes long, so this conversion cannot fail.
    let digits: [u8; N] = head.try_into().ok()?;
    Some((digits, rest))
}
52
/// Consume a single leading `-` separator.
///
/// Returns the bytes after the dash, or `None` when `buf` is empty or
/// starts with any other byte.
#[cfg(feature = "opentelemetry")]
fn skip_separator(buf: &[u8]) -> Option<&[u8]> {
    match buf {
        [b'-', rest @ ..] => Some(rest),
        _ => None,
    }
}
57
/// Generate `N` random lowercase hex digits (`0-9a-f`) from the thread-local
/// RNG.
///
/// Used for fresh trace-ids and span-ids. No all-zero check is applied; that
/// outcome has probability 16^-N.
#[cfg(feature = "opentelemetry")]
fn random_id<const N: usize>() -> [u8; N] {
    use rand::RngExt;
    const HEX_DIGITS: &[u8] = b"0123456789abcdef";
    let mut rng = rand::rng();
    let mut id = [0; N];
    // Fill front to back, one independent draw per digit.
    for digit in id.iter_mut() {
        let index = rng.random_range(0..HEX_DIGITS.len());
        *digit = HEX_DIGITS[index];
    }
    id
}
70
/// Serialise a version-`00` W3C `traceparent` value:
/// `00-<trace_id>-<parent_id>-01`.
///
/// The trace-flags are always written as `01` (sampled); flags from an
/// incoming header are not carried over, because `parse_traceparent`
/// discards them.
#[cfg(feature = "opentelemetry")]
fn build_traceparent(trace_id: &[u8; 32], parent_id: &[u8; 16]) -> [u8; 55] {
    // Pre: both ids are hex, coming either from a parsed traceparent or from
    // our own `random_id`. Hex digits can never be CR/LF, so the header value
    // stays injection-free. The check is written inline rather than through
    // `is_crlf_free` so the release build under `--features opentelemetry`
    // still compiles (HARD RULE 2).
    debug_assert!(
        trace_id.iter().all(u8::is_ascii_hexdigit) && parent_id.iter().all(u8::is_ascii_hexdigit),
        "traceparent ids must be hex (CR/LF-free header value)"
    );
    let mut out = [0; 55];
    // Carve the output into its fixed-width segments and fill each in turn.
    let (version, rest) = out.split_at_mut(3);
    version.copy_from_slice(b"00-");
    let (trace, rest) = rest.split_at_mut(32);
    trace.copy_from_slice(trace_id);
    let (dash, rest) = rest.split_at_mut(1);
    dash[0] = b'-';
    let (parent, flags) = rest.split_at_mut(16);
    parent.copy_from_slice(parent_id);
    flags.copy_from_slice(b"-01");
    // Post: the separators sit at offsets 2, 35 and 52, which is where the
    // parser expects them.
    debug_assert!(
        out[2] == b'-' && out[35] == b'-' && out[52] == b'-',
        "traceparent must carry dash separators at fixed offsets"
    );
    out
}
96
/// Return `true` when `bytes` contains neither a carriage return nor a line
/// feed.
///
/// This is the anti-injection invariant for every header value Sōzu writes
/// to the wire. A raw CR or LF inside a value could split one header into
/// two, enabling request/response smuggling (CWE-93 / CWE-113).
///
/// This function must exist in every profile (HARD RULE 2). It is
/// referenced from `debug_assert!` arguments, and those arguments are still
/// type-checked when `debug_assertions` is off, because the macro expands to
/// `if cfg!(debug_assertions) { ... }`. Gating it behind
/// `#[cfg(debug_assertions)]` would therefore break the release build with
/// E0425. The `allow(dead_code)` keeps `-D warnings` quiet in any
/// configuration where no caller remains.
#[allow(dead_code)]
fn is_crlf_free(bytes: &[u8]) -> bool {
    bytes.iter().all(|&byte| !matches!(byte, b'\r' | b'\n'))
}
112
113/// Write the ";for=..;by=.." portion of a Forwarded header into `buf`
114/// without heap-allocating a `String`.
115///
116/// RFC 7239 §6 requires IPv6 literals to be enclosed in square brackets
117/// inside a quoted string (the `IP-literal` production is `"[" IPv6address "]"`).
118/// Emitting a bare IPv6 like `for="2001:db8::1:8080"` is ambiguous: the
119/// trailing `:1:8080` could parse as `::1` + port `8080` or as the literal
120/// `:1:8080` with no port. Matches HAProxy's behaviour
121/// (`_7239_print_ip6` in `src/http_ext.c`), which is the de-facto reference
122/// implementation of RFC 7239 in the proxy world. See sozu issue #1254.
123fn write_forwarded_for_by(buf: &mut Vec<u8>, peer_ip: IpAddr, peer_port: u16, public_ip: IpAddr) {
124 let before = buf.len();
125 buf.extend_from_slice(b";for=\"");
126 write_ip_literal(buf, peer_ip);
127 buf.push(b':');
128 let mut port_buf = itoa::Buffer::new();
129 buf.extend_from_slice(port_buf.format(peer_port).as_bytes());
130 buf.extend_from_slice(b"\";by=");
131 match public_ip {
132 IpAddr::V4(_) => {
133 let _ = write!(buf, "{public_ip}");
134 }
135 IpAddr::V6(_) => {
136 buf.push(b'"');
137 write_ip_literal(buf, public_ip);
138 buf.push(b'"');
139 }
140 }
141 // Post: the fragment grew the buffer and the whole appended span is
142 // CR/LF-free — it is spliced verbatim into a `Forwarded` header value,
143 // so a stray CR/LF would let an attacker-influenced peer address split
144 // the header (CWE-93). `peer_ip`/`public_ip` are `IpAddr` and the port
145 // is a `u16`, so no untrusted bytes reach here, but the guard pins the
146 // anti-injection contract against future edits to this fragment.
147 debug_assert!(
148 buf.len() > before,
149 "the Forwarded ;for=;by= fragment must emit bytes"
150 );
151 debug_assert!(
152 is_crlf_free(&buf[before..]),
153 "the Forwarded fragment must be CR/LF-free (anti-injection)"
154 );
155}
156
157/// Write an `IP-literal` per RFC 3986 §3.2.2 / RFC 7239 §6: IPv4 verbatim,
158/// IPv6 wrapped in `[...]`. Caller is responsible for any surrounding quotes
159/// required by the embedding syntax (RFC 7239 mandates them for IPv6).
160fn write_ip_literal(buf: &mut Vec<u8>, ip: IpAddr) {
161 let before = buf.len();
162 match ip {
163 IpAddr::V4(_) => {
164 let _ = write!(buf, "{ip}");
165 }
166 IpAddr::V6(_) => {
167 buf.push(b'[');
168 let _ = write!(buf, "{ip}");
169 buf.push(b']');
170 }
171 }
172 // Post: a non-empty literal was appended (every IpAddr renders to at
173 // least one byte) and the emitted bytes carry no CR/LF — an IpAddr can
174 // only render `[0-9a-fA-F:.]` plus the brackets we add, so this is a
175 // belt-and-suspenders guard on the value we splice into a header.
176 debug_assert!(buf.len() > before, "write_ip_literal must emit bytes");
177 debug_assert!(
178 is_crlf_free(&buf[before..]),
179 "an IP literal spliced into a header must be CR/LF-free"
180 );
181 // IPv6 is bracketed on both ends; IPv4 is bare (RFC 3986 §3.2.2).
182 debug_assert_eq!(
183 matches!(ip, IpAddr::V6(_)),
184 buf[before] == b'[' && buf[buf.len() - 1] == b']',
185 "IPv6 literals are bracketed, IPv4 literals are not"
186 );
187}
188
189/// Write ", proto=<proto>;for=..;by=.." (the suffix appended to an existing Forwarded value).
190fn write_forwarded_suffix(
191 buf: &mut Vec<u8>,
192 proto: &str,
193 peer_ip: IpAddr,
194 peer_port: u16,
195 public_ip: IpAddr,
196) {
197 // Pre: `proto` is the proxy-chosen scheme label ("http"/"https"), never
198 // attacker-controlled — assert it carries no CR/LF before it is spliced
199 // into the `Forwarded` value (anti-injection, CWE-93).
200 debug_assert!(
201 is_crlf_free(proto.as_bytes()),
202 "the proto label spliced into Forwarded must be CR/LF-free"
203 );
204 let before = buf.len();
205 buf.extend_from_slice(b", proto=");
206 buf.extend_from_slice(proto.as_bytes());
207 write_forwarded_for_by(buf, peer_ip, peer_port, public_ip);
208 // Post: the suffix grew the buffer, begins with the `, proto=`
209 // separator (so it appends cleanly to an existing value), and the whole
210 // appended span is CR/LF-free.
211 debug_assert!(
212 buf.len() > before + b", proto=".len(),
213 "the Forwarded suffix must emit the separator plus a value"
214 );
215 debug_assert!(
216 buf[before..].starts_with(b", proto="),
217 "the Forwarded suffix must start with the `, proto=` separator"
218 );
219 debug_assert!(
220 is_crlf_free(&buf[before..]),
221 "the Forwarded suffix must be CR/LF-free (anti-injection)"
222 );
223}
224
225/// This is the container used to store and use information about the session from within a Kawa parser callback
226#[derive(Debug)]
227pub struct HttpContext {
228 // ========== Write only
229 /// set to false if Kawa finds a "Connection" header with a "close" value in the response
230 pub keep_alive_backend: bool,
231 /// set to false if Kawa finds a "Connection" header with a "close" value in the request
232 pub keep_alive_frontend: bool,
233 /// the value of the sticky session cookie in the request
234 pub sticky_session_found: Option<String>,
235 // ---------- Status Line
236 /// the value of the method in the request line
237 pub method: Option<Method>,
238 /// the value of the authority of the request (in the request line of "Host" header)
239 pub authority: Option<String>,
240 /// the value of the path in the request line
241 pub path: Option<String>,
242 /// the value of the status code in the response line
243 pub status: Option<u16>,
244 /// the value of the reason in the response line
245 pub reason: Option<String>,
246 // ---------- Additional optional data
247 pub user_agent: Option<String>,
248 /// Value of the `x-request-id` header observed (if propagated from the
249 /// client/upstream LB) or generated (from `self.id`). Universal correlation
250 /// header — populated unconditionally by `on_request_headers` so the access
251 /// log can record the exact value forwarded to the backend.
252 pub x_request_id: Option<String>,
253 /// Verbatim value of the client-supplied `X-Forwarded-For` header as
254 /// observed before Sōzu appended its own hop. Captured here, not at
255 /// request edit time, so the access log records the upstream-attested
256 /// chain even when Sōzu also appends its own peer to the forwarded
257 /// header. `None` if the request had no `X-Forwarded-For` header.
258 pub xff_chain: Option<String>,
259
260 #[cfg(feature = "opentelemetry")]
261 pub otel: Option<sozu_command::logging::OpenTelemetry>,
262
263 // ========== Read only
264 /// signals wether Kawa should write a "Connection" header with a "close" value (request and response)
265 pub closing: bool,
266 /// Connection/session ULID — stable across all requests multiplexed on this
267 /// TCP or TLS connection. Used as the first slot in the legacy log-context
268 /// bracket `[session req cluster backend]` and emitted into
269 /// `ProtobufAccessLog.session_id`.
270 pub session_id: Ulid,
271 /// the value of the custom header, named "Sozu-Id", that Kawa should write (request and response)
272 pub id: Ulid,
273 pub backend_id: Option<String>,
274 pub cluster_id: Option<String>,
275 /// the value of the protocol Kawa should write in the Forwarded headers of the request
276 pub protocol: Protocol,
277 /// the value of the public address Kawa should write in the Forwarded headers of the request
278 pub public_address: SocketAddr,
279 /// the value of the session address Kawa should write in the Forwarded headers of the request
280 pub session_address: Option<SocketAddr>,
281 /// the name of the cookie Kawa should read from the request to get the sticky session
282 pub sticky_name: String,
283 /// the sticky session that should be used
284 /// used to create a "Set-Cookie" header in the response in case it differs from sticky_session_found
285 pub sticky_session: Option<String>,
286 /// the address of the backend server
287 pub backend_address: Option<SocketAddr>,
288 /// The TLS Server Name Indication (SNI) hostname negotiated at handshake.
289 ///
290 /// Populated for HTTPS listeners when the client sent an SNI extension (see
291 /// `https.rs::upgrade_handshake`). Used by the routing layer to enforce the
292 /// TLS trust boundary against the HTTP `:authority` / `Host` header — without
293 /// this check, an attacker holding a valid certificate for tenant A could
294 /// open TLS with SNI=A then send requests with `:authority=tenantB` and
295 /// reach tenant B's backend (CWE-346 / CWE-444).
296 ///
297 /// `None` when the listener is plaintext HTTP or the client omitted SNI.
298 /// Stored pre-lowercased and without a port for direct exact-match comparison.
299 pub tls_server_name: Option<String>,
300 /// Snapshot of the SAN set of the certificate Sōzu actually served at
301 /// the TLS handshake. Captured once in `https.rs::upgrade_handshake`
302 /// from the resolver and frozen for the connection lifetime so H2
303 /// stream coalescing (RFC 7540 §9.1.1 / RFC 9113 §9.1.1) accepts any
304 /// `:authority` covered by the certificate, with RFC 6125 §6.4.3
305 /// wildcard handling. `None` for plaintext listeners or when SNI was
306 /// absent (router falls back to the legacy exact-match predicate).
307 /// `Some(empty)` when the default cert was served — every
308 /// `:authority` is rejected. `Arc` so the snapshot is shared across
309 /// every per-stream `HttpContext` without re-allocation.
310 pub tls_cert_names: Option<Arc<Vec<String>>>,
311 /// Whether the router must reject this request when `tls_server_name`
312 /// does not exact-match its authority (CWE-346 / CWE-444). Mirrors
313 /// `HttpsListenerConfig::strict_sni_binding`. Set from the mux
314 /// `Context` at stream creation time (see `Context::create_stream`).
315 /// Plaintext listeners still never hit the check because
316 /// `tls_server_name` is `None`.
317 pub strict_sni_binding: bool,
318 /// When `true`, the request-side block walk in `on_request_headers`
319 /// strips any client-supplied `X-Real-IP` header before forwarding
320 /// (anti-spoofing). Mirrors `HttpListenerConfig::elide_x_real_ip` /
321 /// `HttpsListenerConfig::elide_x_real_ip`. Set from the mux `Context`
322 /// at stream creation (see `Context::create_stream`); listener-scoped
323 /// and never reset across keep-alive requests. Independent of
324 /// `send_x_real_ip`. The same flag is plumbed into
325 /// `pkawa::handle_trailer` so trailer HEADERS frames cannot bypass
326 /// the elision.
327 pub elide_x_real_ip: bool,
328 /// When `true`, `on_request_headers` injects a proxy-generated
329 /// `X-Real-IP` header carrying `session_address.ip()` (post-PROXY-v2
330 /// unwrap, i.e. the original client IP). Mirrors
331 /// `HttpListenerConfig::send_x_real_ip` /
332 /// `HttpsListenerConfig::send_x_real_ip`. Set from the mux `Context`
333 /// at stream creation (see `Context::create_stream`); listener-scoped
334 /// and never reset across keep-alive requests. Independent of
335 /// `elide_x_real_ip`. When `session_address` is `None` (raw socket
336 /// without a peer), no header is appended — identical to the
337 /// existing X-Forwarded-For / Forwarded synthesis behaviour.
338 pub send_x_real_ip: bool,
339 /// Negotiated TLS protocol version as a short label (e.g. `"TLSv1.3"`).
340 /// Captured from `rustls_version_label` at handshake completion and
341 /// propagated from the mux `Context`. `None` for plaintext listeners.
342 pub tls_version: Option<&'static str>,
343 /// Negotiated TLS cipher suite as a short label (e.g.
344 /// `"TLS_AES_128_GCM_SHA256"`). Captured from `rustls_ciphersuite_label`
345 /// at handshake completion and propagated from the mux `Context`. `None`
346 /// for plaintext listeners.
347 pub tls_cipher: Option<&'static str>,
348 /// Negotiated ALPN protocol (e.g. `"h2"`, `"http/1.1"`). Captured from
349 /// rustls at handshake completion and propagated from the mux `Context`.
350 /// `None` for plaintext listeners or when no ALPN was negotiated.
351 pub tls_alpn: Option<&'static str>,
352 /// Name of the correlation header Sozu injects into every request and
353 /// response. Defaults to `"Sozu-Id"` via [`L7ListenerHandler::get_sozu_id_header`].
354 /// Populated at stream creation from the listener config's `sozu_id_header`
355 /// knob. Stored as an owned `String` so it survives a listener hot-reload
356 /// that changes the value.
357 pub sozu_id_header: String,
358 /// Resolved `Location` URL stashed by the routing layer when a frontend
359 /// triggers a permanent redirect (`RedirectPolicy::PERMANENT` or the
360 /// legacy `cluster.https_redirect`). Read by the default-answer 301
361 /// path so the response carries the correct target URL — including
362 /// optional `cluster.https_redirect_port` and rewrite-template captures.
363 /// `None` when the request is not redirecting.
364 pub redirect_location: Option<String>,
365 /// `WWW-Authenticate` realm stashed by the routing layer when a
366 /// frontend rejects an unauthenticated request (`required_auth = true`
367 /// without a valid `Authorization: Basic` header, or
368 /// `RedirectPolicy::UNAUTHORIZED`). Read by the default-answer 401
369 /// path so the response carries the cluster's configured
370 /// `www_authenticate` value. `None` falls back to template default
371 /// (header is elided when no realm is configured).
372 pub www_authenticate: Option<String>,
373 /// Original authority captured before a rewrite-host fired; emitted
374 /// back to the backend as `X-Forwarded-Host` so the backend can
375 /// reconstruct the public URL even though `:authority` / `Host` was
376 /// rewritten on the wire. `None` when no host rewrite happened.
377 pub original_authority: Option<String>,
378 /// Per-frontend response-side header edits (set/replace/delete)
379 /// stashed by the routing layer for the emission boundary in
380 /// `mux/h1.rs::writable` and `mux/h2.rs::write_streams` to apply
381 /// before `kawa.prepare(...)`. Empty when the frontend has no
382 /// response-side header policy. An entry with an empty `val`
383 /// deletes the header by name (HAProxy `del-header` parity); a
384 /// non-empty `val` set/replaces.
385 pub headers_response: Vec<HeaderEditSnapshot>,
386 /// Resolved `Retry-After` value (seconds) for an HTTP 429 default
387 /// answer. Computed in `Router::connect` when the per-(cluster,
388 /// source-IP) connection limit is hit, by folding the cluster's
389 /// `retry_after` override over the global default. `None` (or
390 /// `Some(0)`) tells the answer engine to omit the `Retry-After`
391 /// header entirely — `Retry-After: 0` invites an immediate retry
392 /// that defeats the limit. Unused for any other status code.
393 pub retry_after_seconds: Option<u32>,
394 /// Frontend-supplied template body that overrides the listener /
395 /// cluster default `http.301.redirection` for a single
396 /// `RedirectPolicy::PERMANENT` request. Stashed by the routing layer
397 /// from `RouteResult::redirect_template` and consumed by the 301
398 /// branch of `mux::answers::set_default_answer_with_retry_after`,
399 /// which compiles the body via `HttpAnswers::render_inline_301` and
400 /// renders it with the same `(REDIRECT_LOCATION, ROUTE,
401 /// REQUEST_ID)` variable schema as the persistent template chain.
402 /// `None` falls back to the cluster / listener default. Unused for
403 /// any other status code or for the legacy
404 /// `cluster.https_redirect = true` path (which never sets it).
405 pub frontend_redirect_template: Option<String>,
406 /// Resolved redirect status code stashed by the routing layer when
407 /// a frontend's `RedirectPolicy` is one of the redirect variants.
408 /// 301 = `Permanent`, 302 = `Found`, 308 = `PermanentRedirect`.
409 /// `None` falls back to 301 for the legacy
410 /// `cluster.https_redirect = true` path. Closes #1009.
411 pub redirect_status: Option<u16>,
412 /// Stable, structured discriminator surfaced as the access-log
413 /// `message` field when the session terminates on a timeout. Set by
414 /// the timeout handlers in `kawa_h1::timeout` and `MuxState::timeout`
415 /// **before** the default-answer or `forcefully_terminate_answer`
416 /// path consumes it. The vocabulary is operator-visible API once
417 /// shipped — see the access-log section of `doc/configure.md` for
418 /// the full token list (`client_timeout`,
419 /// `client_timeout_during_response`, `backend_timeout`,
420 /// `backend_response_timeout`). `None` for non-timeout sessions, in
421 /// which case the access log emits `message: None` as before.
422 pub access_log_message: Option<&'static str>,
423}
424
/// How `apply_response_header_edits` should interpret a per-edit value.
///
/// The legacy encoding, where `Append` with an empty `val` means "delete
/// this header by name", is still honoured. This keeps operator-supplied
/// `[[...frontends.headers]]` entries working unchanged. The explicit modes
/// give typed policies finer control:
///
/// - `Append`: an empty `val` deletes the header by name (the legacy
///   shortcut). Any other entry is appended before the end-of-headers flag.
/// - `SetIfAbsent`: skip the insert when `kawa.blocks` already carries a
///   non-elided header with the same name (case-insensitive). HSTS uses
///   this by default to preserve a backend-supplied
///   `Strict-Transport-Security` (RFC 6797 §6.1 single-header requirement).
/// - `Set`: delete every existing header with the matching name, then
///   insert the new entry. Use this when the operator's typed policy must
///   override any backend-supplied value (for example, the
///   `force_replace_backend = true` HSTS shape).
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum HeaderEditMode {
    /// Append the header before the end-of-headers flag. An empty `val` is
    /// interpreted as a delete (legacy behaviour preserved).
    #[default]
    Append,
    /// Skip the insert if `kawa.blocks` already contains a non-elided header
    /// whose name matches `key` case-insensitively. Otherwise behave like
    /// `Append`.
    SetIfAbsent,
    /// Delete every existing header with the matching name, then insert the
    /// new value. Equivalent to two operator-defined edits (delete +
    /// append), but safer to express as one typed entry.
    Set,
}

/// Owned snapshot of a per-frontend header edit.
///
/// Captured at routing time so the emission boundary can apply
/// set/replace/delete without touching the routing layer's
/// `Rc<[HeaderEdit]>` slices.
///
/// `mode` selects the `Append` / `SetIfAbsent` / `Set` semantics. There is
/// no dedicated delete mode: a delete is expressed as
/// `mode = HeaderEditMode::Append` with an empty `val`, which
/// `apply_response_header_edits` treats as HAProxy `del-header`.
#[derive(Debug, Clone)]
pub struct HeaderEditSnapshot {
    /// Header name, matched case-insensitively.
    pub key: Vec<u8>,
    /// Header value. Empty together with `HeaderEditMode::Append` means
    /// "delete".
    pub val: Vec<u8>,
    /// How `key`/`val` are applied to the response headers.
    pub mode: HeaderEditMode,
}
473
474impl kawa::h1::ParserCallbacks<Checkout> for HttpContext {
475 fn on_headers(&mut self, stream: &mut GenericHttpStream) {
476 match stream.kind {
477 kawa::Kind::Request => self.on_request_headers(stream),
478 kawa::Kind::Response => self.on_response_headers(stream),
479 }
480 }
481}
482
483impl HttpContext {
484 /// Creates a new instance
485 #[allow(clippy::too_many_arguments)]
486 pub fn new(
487 session_id: Ulid,
488 request_id: Ulid,
489 protocol: Protocol,
490 public_address: SocketAddr,
491 session_address: Option<SocketAddr>,
492 sticky_name: String,
493 sozu_id_header: String,
494 elide_x_real_ip: bool,
495 send_x_real_ip: bool,
496 ) -> Self {
497 Self {
498 session_id,
499 id: request_id,
500 backend_id: None,
501 cluster_id: None,
502
503 closing: false,
504 keep_alive_backend: true,
505 keep_alive_frontend: true,
506 protocol,
507 public_address,
508 session_address,
509 sticky_name,
510 sticky_session: None,
511 sticky_session_found: None,
512
513 method: None,
514 authority: None,
515 path: None,
516 status: None,
517 reason: None,
518 user_agent: None,
519 x_request_id: None,
520 xff_chain: None,
521
522 #[cfg(feature = "opentelemetry")]
523 otel: Default::default(),
524
525 backend_address: None,
526 tls_server_name: None,
527 tls_cert_names: None,
528 strict_sni_binding: true,
529 elide_x_real_ip,
530 send_x_real_ip,
531 tls_version: None,
532 tls_cipher: None,
533 tls_alpn: None,
534 sozu_id_header,
535 redirect_location: None,
536 www_authenticate: None,
537 original_authority: None,
538 headers_response: Vec::new(),
539 retry_after_seconds: None,
540 frontend_redirect_template: None,
541 redirect_status: None,
542 access_log_message: None,
543 }
544 }
545
546 /// Callback for request:
547 ///
548 /// - edit headers (connection, forwarded, sticky cookie, sozu-id,
549 /// x-request-id, x-real-ip)
550 /// - save information:
551 /// - method
552 /// - authority
553 /// - path
554 /// - front keep-alive
555 /// - sticky cookie
556 /// - user-agent
557 /// - x-request-id (preserved if present, else derived from `self.id`)
558 fn on_request_headers(&mut self, request: &mut GenericHttpStream) {
559 // Editing never drops a block — it only elides in place (key set to
560 // Empty, length preserved) or pushes new headers. Snapshot the count
561 // so the postcondition can pin "blocks only grow" for the whole edit.
562 let blocks_at_entry = request.blocks.len();
563
564 let buf = request.storage.mut_buffer();
565
566 // Captures the request line
567 if let kawa::StatusLine::Request {
568 method,
569 authority,
570 path,
571 ..
572 } = &request.detached.status_line
573 {
574 self.method = method.data_opt(buf).map(Method::new);
575 self.authority = authority
576 .data_opt(buf)
577 .and_then(|data| from_utf8(data).ok())
578 .map(ToOwned::to_owned);
579 self.path = path
580 .data_opt(buf)
581 .and_then(|data| from_utf8(data).ok())
582 .map(ToOwned::to_owned);
583 }
584
585 // if self.method == Some(Method::Get) && request.body_size == kawa::BodySize::Empty {
586 // request.parsing_phase = kawa::ParsingPhase::Terminated;
587 // }
588
589 let public_ip = self.public_address.ip();
590 let public_port = self.public_address.port();
591 let proto = match self.protocol {
592 Protocol::HTTP => "http",
593 Protocol::HTTPS => "https",
594 _ => unreachable!(),
595 };
596
597 // `proto` is the proxy-resolved scheme label, sourced from the
598 // listener protocol (never from request bytes). The match above
599 // already rejects anything but HTTP/HTTPS; pin that it is one of the
600 // two CR/LF-free literals we splice into forwarding headers.
601 debug_assert!(
602 proto == "http" || proto == "https",
603 "proto must be the http/https scheme label"
604 );
605
606 // Find and remove the sticky_name cookie
607 // if found its value is stored in sticky_session_found
608 for cookie in &mut request.detached.jar {
609 let key = cookie.key.data(buf);
610 if key == self.sticky_name.as_bytes() {
611 let val = cookie.val.data(buf);
612 self.sticky_session_found = from_utf8(val).ok().map(ToOwned::to_owned);
613 cookie.elide();
614 // Post: the matched sticky cookie is gone from the forwarded
615 // jar so the backend never sees Sōzu's own session cookie.
616 debug_assert!(
617 cookie.is_elided(),
618 "the matched sticky cookie must be elided after capture"
619 );
620 }
621 }
622
623 // If found:
624 // - set Connection to "close" if closing is set
625 // - set keep_alive_frontend to false if Connection is "close"
626 // - update value of X-Forwarded-Proto
627 // - update value of X-Forwarded-Port
628 // - store X-Forwarded-For
629 // - store Forwarded
630 // - store User-Agent
631 let mut x_for = None;
632 let mut forwarded = None;
633 let mut has_x_port = false;
634 let mut has_x_proto = false;
635 let mut has_x_request_id = false;
636 let mut has_connection = false;
637 #[cfg(feature = "opentelemetry")]
638 let mut traceparent: Option<&mut kawa::Pair> = None;
639 #[cfg(feature = "opentelemetry")]
640 let mut tracestate: Option<&mut kawa::Pair> = None;
641 for block in &mut request.blocks {
642 match block {
643 kawa::Block::Header(header) if !header.is_elided() => {
644 let key = header.key.data(buf);
645 if compare_no_case(key, b"connection") {
646 has_connection = true;
647 if self.closing {
648 header.val = kawa::Store::Static(b"close");
649 } else {
650 let val = header.val.data(buf);
651 self.keep_alive_frontend &= !compare_no_case(val, b"close");
652 }
653 } else if compare_no_case(key, b"X-Forwarded-Proto") {
654 has_x_proto = true;
655 // header.val = kawa::Store::Static(proto.as_bytes());
656 incr!(names::http::TRUSTING_X_PROTO);
657 let val = header.val.data(buf);
658 if !compare_no_case(val, proto.as_bytes()) {
659 incr!(names::http::TRUSTING_X_PROTO_DIFF);
660 debug!(
661 "{} Trusting X-Forwarded-Proto for {:?} even though {:?} != {}",
662 self.log_context(),
663 self.authority,
664 val,
665 proto
666 );
667 }
668 } else if compare_no_case(key, b"X-Forwarded-Port") {
669 has_x_port = true;
670 // header.val = kawa::Store::from_string(public_port.to_string());
671 incr!(names::http::TRUSTING_X_PORT);
672 let val = header.val.data(buf);
673 let mut port_buf = itoa::Buffer::new();
674 let expected = port_buf.format(public_port);
675 if !compare_no_case(val, expected.as_bytes()) {
676 incr!(names::http::TRUSTING_X_PORT_DIFF);
677 debug!(
678 "{} Trusting X-Forwarded-Port for {:?} even though {:?} != {}",
679 self.log_context(),
680 self.authority,
681 val,
682 expected
683 );
684 }
685 } else if compare_no_case(key, b"X-Forwarded-For") {
686 // Snapshot the upstream-attested chain before we
687 // potentially append our own peer below — the access
688 // log records the value the client/upstream LB
689 // forwarded, not the rewritten value Sōzu emits.
690 self.xff_chain = header
691 .val
692 .data_opt(buf)
693 .and_then(|data| from_utf8(data).ok())
694 .map(ToOwned::to_owned);
695 x_for = Some(header);
696 } else if compare_no_case(key, b"X-Real-IP") && self.elide_x_real_ip {
697 // Anti-spoofing: a client cannot supply its own
698 // `X-Real-IP` and have it reach the backend. The
699 // proxy-injected value (when `send_x_real_ip` is
700 // also set) is appended after this loop. H2 trailer
701 // HEADERS frames bypass this callback; they are
702 // covered by the matching elision in
703 // `pkawa::handle_trailer`.
704 debug_assert!(
705 self.elide_x_real_ip,
706 "X-Real-IP is only elided when anti-spoofing is enabled"
707 );
708 header.elide();
709 // Post: the spoofable client value is stripped before
710 // forwarding (anti-spoofing invariant, CWE-348).
711 debug_assert!(
712 header.is_elided(),
713 "client X-Real-IP must be elided when elide_x_real_ip is set"
714 );
715 } else if compare_no_case(key, b"Forwarded") {
716 forwarded = Some(header);
717 } else if compare_no_case(key, b"User-Agent") {
718 self.user_agent = header
719 .val
720 .data_opt(buf)
721 .and_then(|data| from_utf8(data).ok())
722 .map(ToOwned::to_owned);
723 } else if compare_no_case(key, b"X-Request-Id") {
724 // RFC: not standardized, but the de-facto correlation
725 // header used by Envoy/HAProxy/most LBs. Preserve the
726 // client-supplied value verbatim — overwriting it
727 // breaks end-to-end request tracing.
728 has_x_request_id = true;
729 self.x_request_id = header
730 .val
731 .data_opt(buf)
732 .and_then(|data| from_utf8(data).ok())
733 .map(ToOwned::to_owned);
734 } else {
735 #[cfg(feature = "opentelemetry")]
736 if compare_no_case(key, b"traceparent") {
737 if let Some(hdr) = traceparent {
738 hdr.elide();
739 }
740 traceparent = Some(header);
741 } else if compare_no_case(key, b"tracestate") {
742 if let Some(hdr) = tracestate {
743 hdr.elide();
744 }
745 tracestate = Some(header);
746 }
747 }
748 }
749 _ => {}
750 }
751 }
752
753 #[cfg(feature = "opentelemetry")]
754 let (otel, has_traceparent) = {
755 let mut otel = sozu_command_lib::logging::OpenTelemetry::default();
756 let tp = traceparent
757 .as_ref()
758 .and_then(|hdr| parse_traceparent(&hdr.val, buf))
759 .map(|(trace_id, parent_id)| (trace_id, Some(parent_id)));
760 // Remove tracestate if no traceparent is present
761 if let (None, Some(tracestate)) = (tp, tracestate) {
762 tracestate.elide();
763 }
764 let (trace_id, parent_id) = tp.unwrap_or_else(|| (random_id(), None));
765 otel.trace_id = trace_id;
766 otel.parent_span_id = parent_id;
767 otel.span_id = random_id();
768 // Modify header if present
769 if let Some(id) = &mut traceparent {
770 let new_val = build_traceparent(&otel.trace_id, &otel.span_id);
771 id.val.modify(buf, &new_val);
772 }
773 (otel, traceparent.is_some())
774 };
775
776 // If session_address is set:
777 // - append its ip address to the list of "X-Forwarded-For" if it was found, creates it if not
778 // - append "proto=[PROTO];for=[PEER];by=[PUBLIC]" to the list of "Forwarded" if it was found, creates it if not
779 if let Some(peer_addr) = self.session_address {
780 let peer_ip = peer_addr.ip();
781 let peer_port = peer_addr.port();
782 let has_x_for = x_for.is_some();
783 let has_forwarded = forwarded.is_some();
784
785 // Buffer for building header values — ownership is transferred to Store
786 // via `take`, so each header gets its own allocation.
787 let mut hdr_buf = Vec::with_capacity(128);
788
789 if let Some(header) = x_for {
790 let prior_len = header.val.data(buf).len();
791 hdr_buf.extend_from_slice(header.val.data(buf));
792 let _ = write!(hdr_buf, ", {peer_ip}");
793 // Pre: we only ever extend the client-attested chain — the
794 // existing value stays a prefix and we appended the `, ip`
795 // separator, so the rewritten value is strictly longer and
796 // CR/LF-free (else the appended peer would split the header).
797 debug_assert!(
798 hdr_buf.len() > prior_len,
799 "X-Forwarded-For append must grow the value"
800 );
801 debug_assert!(
802 is_crlf_free(&hdr_buf[prior_len..]),
803 "the X-Forwarded-For hop we append must be CR/LF-free (anti-injection)"
804 );
805 header.val = kawa::Store::from_vec(std::mem::take(&mut hdr_buf));
806 }
807 if let Some(header) = &mut forwarded {
808 let prior_len = header.val.data(buf).len();
809 hdr_buf.extend_from_slice(header.val.data(buf));
810 write_forwarded_suffix(&mut hdr_buf, proto, peer_ip, peer_port, public_ip);
811 // Pre: same contract for the structured `Forwarded` chain —
812 // the existing value is preserved as a prefix and our suffix
813 // is CR/LF-free (`write_forwarded_suffix` asserts the suffix
814 // span too; this pins it relative to the prior value).
815 debug_assert!(
816 hdr_buf.len() > prior_len,
817 "Forwarded append must grow the value"
818 );
819 debug_assert!(
820 is_crlf_free(&hdr_buf[prior_len..]),
821 "the Forwarded element we append must be CR/LF-free (anti-injection)"
822 );
823 header.val = kawa::Store::from_vec(std::mem::take(&mut hdr_buf));
824 }
825
826 if !has_x_for {
827 let blocks_before = request.blocks.len();
828 let _ = write!(hdr_buf, "{peer_ip}");
829 debug_assert!(
830 is_crlf_free(&hdr_buf),
831 "a synthesised X-Forwarded-For value must be CR/LF-free"
832 );
833 request.push_block(kawa::Block::Header(kawa::Pair {
834 key: kawa::Store::Static(b"X-Forwarded-For"),
835 val: kawa::Store::from_vec(std::mem::take(&mut hdr_buf)),
836 }));
837 debug_assert_eq!(
838 request.blocks.len(),
839 blocks_before + 1,
840 "creating X-Forwarded-For must push exactly one block"
841 );
842 }
843 if !has_forwarded {
844 let blocks_before = request.blocks.len();
845 hdr_buf.extend_from_slice(b"proto=");
846 hdr_buf.extend_from_slice(proto.as_bytes());
847 write_forwarded_for_by(&mut hdr_buf, peer_ip, peer_port, public_ip);
848 debug_assert!(
849 is_crlf_free(&hdr_buf),
850 "a synthesised Forwarded value must be CR/LF-free"
851 );
852 request.push_block(kawa::Block::Header(kawa::Pair {
853 key: kawa::Store::Static(b"Forwarded"),
854 val: kawa::Store::from_vec(std::mem::take(&mut hdr_buf)),
855 }));
856 debug_assert_eq!(
857 request.blocks.len(),
858 blocks_before + 1,
859 "creating Forwarded must push exactly one block"
860 );
861 }
862
863 // Inject a proxy-generated `X-Real-IP` header carrying the
864 // peer IP (post-PROXY-v2 unwrap, so the original client IP
865 // even when the upstream presented PROXY-v2). Folded into the
866 // `if let Some(peer_addr)` arm so missing peers (raw socket,
867 // no PROXY-v2) skip the injection silently — identical to the
868 // X-Forwarded-For / Forwarded synthesis behaviour above. Any
869 // client-supplied `X-Real-IP` was either elided in the block
870 // walk (if `elide_x_real_ip` is on) or passes through; this
871 // header is appended last so order in the resulting block
872 // list is deterministic for tests.
873 if self.send_x_real_ip {
874 let blocks_before = request.blocks.len();
875 debug_assert!(
876 hdr_buf.is_empty(),
877 "the header scratch buffer was taken before reuse"
878 );
879 let _ = write!(hdr_buf, "{peer_ip}");
880 // The proxy-generated value is a rendered IpAddr — non-empty
881 // and CR/LF-free, so it cannot inject a second header.
882 debug_assert!(
883 !hdr_buf.is_empty() && is_crlf_free(&hdr_buf),
884 "the injected X-Real-IP value must be a CR/LF-free IP"
885 );
886 request.push_block(kawa::Block::Header(kawa::Pair {
887 key: kawa::Store::Static(b"X-Real-IP"),
888 val: kawa::Store::from_vec(std::mem::take(&mut hdr_buf)),
889 }));
890 debug_assert_eq!(
891 request.blocks.len(),
892 blocks_before + 1,
893 "injecting X-Real-IP must push exactly one block"
894 );
895 }
896 }
897
898 #[cfg(feature = "opentelemetry")]
899 {
900 if !has_traceparent {
901 let val = build_traceparent(&otel.trace_id, &otel.span_id);
902 request.push_block(kawa::Block::Header(kawa::Pair {
903 key: kawa::Store::Static(b"traceparent"),
904 val: kawa::Store::from_slice(&val),
905 }));
906 }
907 self.otel = Some(otel);
908 }
909
910 if !has_x_port {
911 let mut port_buf = itoa::Buffer::new();
912 let port_str = port_buf.format(public_port);
913 request.push_block(kawa::Block::Header(kawa::Pair {
914 key: kawa::Store::Static(b"X-Forwarded-Port"),
915 val: kawa::Store::from_slice(port_str.as_bytes()),
916 }));
917 }
918 if !has_x_proto {
919 request.push_block(kawa::Block::Header(kawa::Pair {
920 key: kawa::Store::Static(b"X-Forwarded-Proto"),
921 val: kawa::Store::Static(proto.as_bytes()),
922 }));
923 }
924 // Create a "Connection" header in case it was not found and closing it set
925 if !has_connection && self.closing {
926 request.push_block(kawa::Block::Header(kawa::Pair {
927 key: kawa::Store::Static(b"Connection"),
928 val: kawa::Store::Static(b"close"),
929 }));
930 }
931 // Inject "X-Request-Id" derived from the request ULID when the client
932 // (or upstream LB) did not already supply one. When already present,
933 // the header is left untouched in the block list — preserving the
934 // client-supplied value end-to-end is the whole point of this header.
935 // Either way, `self.x_request_id` is populated so the access log
936 // records the exact value forwarded to the backend.
937 if has_x_request_id {
938 incr!(names::http::X_REQUEST_ID_PROPAGATED);
939 } else {
940 let value = self.id.to_string();
941 // The generated id is a ULID rendering — Crockford base-32, so
942 // CR/LF-free by construction.
943 debug_assert!(
944 is_crlf_free(value.as_bytes()),
945 "the generated X-Request-Id must be CR/LF-free"
946 );
947 request.push_block(kawa::Block::Header(kawa::Pair {
948 key: kawa::Store::Static(b"X-Request-Id"),
949 val: kawa::Store::from_string(value.clone()),
950 }));
951 self.x_request_id = Some(value);
952 incr!(names::http::X_REQUEST_ID_GENERATED);
953 }
954 // Either branch leaves the forwarded value recorded for the access
955 // log so it matches exactly what the backend receives.
956 debug_assert!(
957 self.x_request_id.is_some(),
958 "on_request_headers must record the forwarded X-Request-Id"
959 );
960
961 // Create a custom correlation header (defaults to "Sozu-Id", can be
962 // renamed via the `sozu_id_header` listener config knob).
963 let blocks_before_sozu_id = request.blocks.len();
964 request.push_block(kawa::Block::Header(kawa::Pair {
965 key: kawa::Store::from_string(self.sozu_id_header.clone()),
966 val: kawa::Store::from_string(self.id.to_string()),
967 }));
968 debug_assert_eq!(
969 request.blocks.len(),
970 blocks_before_sozu_id + 1,
971 "the Sozu-Id correlation header must be pushed exactly once"
972 );
973
974 // Postcondition: the whole edit only ever added or elided headers —
975 // the block count is monotonically non-decreasing (a removed header
976 // is elided in place, never popped), so the backend never loses a
977 // header it should have seen.
978 debug_assert!(
979 request.blocks.len() >= blocks_at_entry,
980 "header editing must never drop a block from the request"
981 );
982 }
983
984 /// Callback for response:
985 ///
986 /// - edit headers (connection, set-cookie, sozu-id)
987 /// - save information:
988 /// - status code
989 /// - reason
990 /// - back keep-alive
991 fn on_response_headers(&mut self, response: &mut GenericHttpStream) {
992 // Like the request path, response editing only adds or elides — pin
993 // the entry count so the postcondition can assert "blocks only grow".
994 let blocks_at_entry = response.blocks.len();
995
996 let buf = &mut response.storage.mut_buffer();
997
998 // Captures the response line
999 if let kawa::StatusLine::Response { code, reason, .. } = &response.detached.status_line {
1000 self.status = Some(*code);
1001 self.reason = reason
1002 .data_opt(buf)
1003 .and_then(|data| from_utf8(data).ok())
1004 .map(ToOwned::to_owned);
1005 }
1006
1007 if self.method == Some(Method::Head) {
1008 response.parsing_phase = kawa::ParsingPhase::Terminated;
1009 }
1010
1011 // If found:
1012 // - set Connection to "close" if closing is set
1013 // - set keep_alive_backend to false if Connection is "close"
1014 for block in &mut response.blocks {
1015 match block {
1016 kawa::Block::Header(header) if !header.is_elided() => {
1017 let key = header.key.data(buf);
1018 if compare_no_case(key, b"connection") {
1019 if self.closing {
1020 header.val = kawa::Store::Static(b"close");
1021 } else {
1022 let val = header.val.data(buf);
1023 self.keep_alive_backend &= !compare_no_case(val, b"close");
1024 }
1025 }
1026 }
1027 _ => {}
1028 }
1029 }
1030
1031 // If the sticky_session is set and differs from the one found in the request
1032 // create a "Set-Cookie" header to update the sticky_name value
1033 if let Some(sticky_session) = &self.sticky_session {
1034 if self.sticky_session != self.sticky_session_found {
1035 let blocks_before = response.blocks.len();
1036 let mut cookie_buf =
1037 Vec::with_capacity(self.sticky_name.len() + 1 + sticky_session.len() + 8);
1038 cookie_buf.extend_from_slice(self.sticky_name.as_bytes());
1039 cookie_buf.push(b'=');
1040 cookie_buf.extend_from_slice(sticky_session.as_bytes());
1041 cookie_buf.extend_from_slice(b"; Path=/");
1042 // The cookie value is `name=session; Path=/`, built from the
1043 // proxy-controlled sticky name + session id — assert it is
1044 // well-formed (contains the `=` separator, ends with the
1045 // attribute suffix) and CR/LF-free so it cannot inject an
1046 // extra Set-Cookie / split the response (CWE-113).
1047 debug_assert!(
1048 cookie_buf.contains(&b'='),
1049 "the Set-Cookie value must carry the name=value separator"
1050 );
1051 debug_assert!(
1052 cookie_buf.ends_with(b"; Path=/"),
1053 "the Set-Cookie value must end with the Path attribute"
1054 );
1055 debug_assert!(
1056 is_crlf_free(&cookie_buf),
1057 "the synthesised Set-Cookie value must be CR/LF-free (anti-injection)"
1058 );
1059 response.push_block(kawa::Block::Header(kawa::Pair {
1060 key: kawa::Store::Static(b"Set-Cookie"),
1061 val: kawa::Store::from_vec(cookie_buf),
1062 }));
1063 debug_assert_eq!(
1064 response.blocks.len(),
1065 blocks_before + 1,
1066 "synthesising Set-Cookie must push exactly one block"
1067 );
1068 }
1069 }
1070
1071 // Create a custom correlation header (defaults to "Sozu-Id", can be
1072 // renamed via the `sozu_id_header` listener config knob).
1073 let blocks_before_sozu_id = response.blocks.len();
1074 response.push_block(kawa::Block::Header(kawa::Pair {
1075 key: kawa::Store::from_string(self.sozu_id_header.clone()),
1076 val: kawa::Store::from_string(self.id.to_string()),
1077 }));
1078 debug_assert_eq!(
1079 response.blocks.len(),
1080 blocks_before_sozu_id + 1,
1081 "the Sozu-Id correlation header must be pushed exactly once"
1082 );
1083
1084 // Postcondition: response editing only added or elided headers, so
1085 // the block count never decreased.
1086 debug_assert!(
1087 response.blocks.len() >= blocks_at_entry,
1088 "header editing must never drop a block from the response"
1089 );
1090 }
1091
1092 pub fn reset(&mut self) {
1093 // Snapshot the connection-scoped identity + TLS/listener fields that
1094 // reset() must NOT touch (set once at handshake, reused across every
1095 // keep-alive request). Cheap to copy — all `Copy`. Read only inside
1096 // the postcondition `debug_assert!`s below, so dead code in release.
1097 let session_id_before = self.session_id;
1098 let id_before = self.id;
1099 let strict_sni_before = self.strict_sni_binding;
1100 let elide_before = self.elide_x_real_ip;
1101 let send_before = self.send_x_real_ip;
1102 let tls_version_before = self.tls_version;
1103 let tls_cipher_before = self.tls_cipher;
1104 let tls_alpn_before = self.tls_alpn;
1105
1106 self.keep_alive_backend = true;
1107 self.keep_alive_frontend = true;
1108 self.sticky_session_found = None;
1109 self.method = None;
1110 self.authority = None;
1111 self.path = None;
1112 self.status = None;
1113 self.reason = None;
1114 self.user_agent = None;
1115 self.x_request_id = None;
1116 self.xff_chain = None;
1117 self.redirect_location = None;
1118 self.www_authenticate = None;
1119 self.original_authority = None;
1120 self.headers_response.clear();
1121 // Note: tls_server_name, tls_version, tls_cipher, tls_alpn,
1122 // strict_sni_binding, elide_x_real_ip, send_x_real_ip are
1123 // connection-scoped — set once at handshake completion and reused
1124 // across every keep-alive request, so reset() intentionally leaves
1125 // them in place.
1126
1127 // Post: request-scoped state is fully cleared (a stale value here
1128 // would leak across pipelined requests on the same connection).
1129 debug_assert!(
1130 self.method.is_none()
1131 && self.authority.is_none()
1132 && self.path.is_none()
1133 && self.status.is_none()
1134 && self.x_request_id.is_none()
1135 && self.headers_response.is_empty(),
1136 "reset() must clear all request-scoped state"
1137 );
1138 debug_assert!(
1139 self.keep_alive_backend && self.keep_alive_frontend,
1140 "reset() must restore keep-alive to its optimistic default"
1141 );
1142 // Post: connection-scoped identity + TLS/listener knobs are untouched.
1143 debug_assert_eq!(
1144 self.session_id, session_id_before,
1145 "reset() must preserve the connection session id"
1146 );
1147 debug_assert_eq!(self.id, id_before, "reset() must preserve the request id");
1148 debug_assert!(
1149 self.strict_sni_binding == strict_sni_before
1150 && self.elide_x_real_ip == elide_before
1151 && self.send_x_real_ip == send_before
1152 && self.tls_version == tls_version_before
1153 && self.tls_cipher == tls_cipher_before
1154 && self.tls_alpn == tls_alpn_before,
1155 "reset() must preserve connection-scoped TLS/listener knobs"
1156 );
1157 }
1158
1159 pub fn extract_route(&self) -> Result<(&str, &str, &Method), RetrieveClusterError> {
1160 let given_method = self.method.as_ref().ok_or(RetrieveClusterError::NoMethod)?;
1161 let given_authority = self
1162 .authority
1163 .as_deref()
1164 .ok_or(RetrieveClusterError::NoHost)?;
1165 let given_path = self.path.as_deref().ok_or(RetrieveClusterError::NoPath)?;
1166
1167 // Post: the triple is returned in (authority, path, method) order —
1168 // pin it against a future field-swap regression that would route to
1169 // the wrong cluster. (Both fields can be empty strings on the wire,
1170 // so we assert the mapping, not non-emptiness.)
1171 debug_assert!(
1172 std::ptr::eq(given_authority, self.authority.as_deref().unwrap())
1173 && std::ptr::eq(given_path, self.path.as_deref().unwrap()),
1174 "extract_route must return (authority, path, method) in order"
1175 );
1176 Ok((given_authority, given_path, given_method))
1177 }
1178
1179 pub fn get_route(&self) -> String {
1180 if let Some(method) = &self.method {
1181 if let Some(authority) = &self.authority {
1182 if let Some(path) = &self.path {
1183 return format!("{method} {authority}{path}");
1184 }
1185 return format!("{method} {authority}");
1186 }
1187 return format!("{method}");
1188 }
1189 String::new()
1190 }
1191
1192 pub fn websocket_context(&self) -> WebSocketContext {
1193 WebSocketContext::Http {
1194 method: self.method.clone(),
1195 authority: self.authority.clone(),
1196 path: self.path.clone(),
1197 reason: self.reason.clone(),
1198 status: self.status,
1199 }
1200 }
1201
1202 pub fn log_context(&self) -> LogContext<'_> {
1203 let ctx = LogContext {
1204 session_id: self.session_id,
1205 request_id: Some(self.id),
1206 cluster_id: self.cluster_id.as_deref(),
1207 backend_id: self.backend_id.as_deref(),
1208 };
1209 // The access-log bracket `[session req cluster backend]` is keyed on
1210 // session + request id; both must always be present so the log line
1211 // is correlatable. cluster/backend are legitimately absent before
1212 // routing, so they are not asserted here.
1213 debug_assert!(
1214 ctx.request_id.is_some(),
1215 "log_context must always carry the request id for correlation"
1216 );
1217 ctx
1218 }
1219}
1220
1221#[cfg(test)]
1222mod tests {
1223 use super::*;
1224 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
1225
1226 /// Helper to create a minimal HttpContext for testing.
1227 fn make_context() -> HttpContext {
1228 HttpContext::new(
1229 Ulid::generate(),
1230 Ulid::generate(),
1231 Protocol::HTTP,
1232 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080),
1233 Some(SocketAddr::new(
1234 IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)),
1235 54321,
1236 )),
1237 "SERVERID".to_owned(),
1238 "Sozu-Id".to_owned(),
1239 false,
1240 false,
1241 )
1242 }
1243
1244 // ── sozu_id_header ──────────────────────────────────────────────────
1245
1246 #[test]
1247 fn test_sozu_id_header_default_name_stored_on_context() {
1248 // The make_context helper uses the documented default "Sozu-Id" to
1249 // match the trait default on `L7ListenerHandler::get_sozu_id_header`.
1250 let ctx = make_context();
1251 assert_eq!(ctx.sozu_id_header, "Sozu-Id");
1252 }
1253
1254 #[test]
1255 fn test_sozu_id_header_custom_name_stored_on_context() {
1256 // Operator-provided rename is carried verbatim onto the HttpContext.
1257 let ctx = HttpContext::new(
1258 Ulid::generate(),
1259 Ulid::generate(),
1260 Protocol::HTTP,
1261 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080),
1262 None,
1263 "SERVERID".to_owned(),
1264 "X-Edge-Id".to_owned(),
1265 false,
1266 false,
1267 );
1268 assert_eq!(ctx.sozu_id_header, "X-Edge-Id");
1269 }
1270
1271 // ── extract_route ──────────────────────────────────────────────────
1272
1273 #[test]
1274 fn test_extract_route_all_present() {
1275 let mut ctx = make_context();
1276 ctx.method = Some(Method::Get);
1277 ctx.authority = Some("example.com".to_owned());
1278 ctx.path = Some("/index.html".to_owned());
1279
1280 let (authority, path, method) = ctx.extract_route().unwrap();
1281 assert_eq!(authority, "example.com");
1282 assert_eq!(path, "/index.html");
1283 assert_eq!(method, &Method::Get);
1284 }
1285
1286 #[test]
1287 fn test_extract_route_no_method() {
1288 let mut ctx = make_context();
1289 ctx.authority = Some("example.com".to_owned());
1290 ctx.path = Some("/".to_owned());
1291
1292 let err = ctx.extract_route().unwrap_err();
1293 assert!(matches!(err, RetrieveClusterError::NoMethod));
1294 }
1295
1296 #[test]
1297 fn test_extract_route_no_host() {
1298 let mut ctx = make_context();
1299 ctx.method = Some(Method::Get);
1300 ctx.path = Some("/".to_owned());
1301
1302 let err = ctx.extract_route().unwrap_err();
1303 assert!(matches!(err, RetrieveClusterError::NoHost));
1304 }
1305
1306 #[test]
1307 fn test_extract_route_no_path() {
1308 let mut ctx = make_context();
1309 ctx.method = Some(Method::Get);
1310 ctx.authority = Some("example.com".to_owned());
1311
1312 let err = ctx.extract_route().unwrap_err();
1313 assert!(matches!(err, RetrieveClusterError::NoPath));
1314 }
1315
1316 // ── get_route ──────────────────────────────────────────────────────
1317
1318 #[test]
1319 fn test_get_route_all_present() {
1320 let mut ctx = make_context();
1321 ctx.method = Some(Method::Get);
1322 ctx.authority = Some("example.com".to_owned());
1323 ctx.path = Some("/api/v1".to_owned());
1324
1325 assert_eq!(ctx.get_route(), "GET example.com/api/v1");
1326 }
1327
1328 #[test]
1329 fn test_get_route_method_and_authority_only() {
1330 let mut ctx = make_context();
1331 ctx.method = Some(Method::Post);
1332 ctx.authority = Some("example.com".to_owned());
1333
1334 assert_eq!(ctx.get_route(), "POST example.com");
1335 }
1336
1337 #[test]
1338 fn test_get_route_method_only() {
1339 let mut ctx = make_context();
1340 ctx.method = Some(Method::Delete);
1341
1342 assert_eq!(ctx.get_route(), "DELETE");
1343 }
1344
1345 #[test]
1346 fn test_get_route_empty() {
1347 let ctx = make_context();
1348 assert_eq!(ctx.get_route(), "");
1349 }
1350
1351 // ── reset ──────────────────────────────────────────────────────────
1352
1353 #[test]
1354 fn test_reset_clears_request_response_state() {
1355 let mut ctx = make_context();
1356 ctx.keep_alive_backend = false;
1357 ctx.keep_alive_frontend = false;
1358 ctx.sticky_session_found = Some("abc123".to_owned());
1359 ctx.method = Some(Method::Post);
1360 ctx.authority = Some("example.com".to_owned());
1361 ctx.path = Some("/upload".to_owned());
1362 ctx.status = Some(200);
1363 ctx.reason = Some("OK".to_owned());
1364 ctx.user_agent = Some("curl/7.81".to_owned());
1365 ctx.x_request_id = Some("client-xrid-123".to_owned());
1366 ctx.xff_chain = Some("203.0.113.5, 198.51.100.10".to_owned());
1367 ctx.redirect_location = Some("https://example.com/".to_owned());
1368 ctx.www_authenticate = Some("Basic realm=\"sozu\"".to_owned());
1369 ctx.original_authority = Some("old.example.com".to_owned());
1370 ctx.headers_response.push(HeaderEditSnapshot {
1371 key: b"X-Cache".to_vec(),
1372 val: b"HIT".to_vec(),
1373 mode: HeaderEditMode::Append,
1374 });
1375
1376 ctx.reset();
1377
1378 assert!(ctx.keep_alive_backend);
1379 assert!(ctx.keep_alive_frontend);
1380 assert!(ctx.sticky_session_found.is_none());
1381 assert!(ctx.method.is_none());
1382 assert!(ctx.authority.is_none());
1383 assert!(ctx.path.is_none());
1384 assert!(ctx.status.is_none());
1385 assert!(ctx.reason.is_none());
1386 assert!(ctx.user_agent.is_none());
1387 assert!(ctx.x_request_id.is_none());
1388 assert!(ctx.xff_chain.is_none());
1389 // The four stash slots written by the routing layer must clear
1390 // between pipelined H1 requests; otherwise a future code path that
1391 // emits a 301 / 401 default-answer without re-routing would
1392 // inherit a stale Location / WWW-Authenticate from a prior request,
1393 // or the backend would receive a stale X-Forwarded-Host or a stale
1394 // response-side header edit.
1395 assert!(ctx.redirect_location.is_none());
1396 assert!(ctx.www_authenticate.is_none());
1397 assert!(ctx.original_authority.is_none());
1398 assert!(ctx.headers_response.is_empty());
1399 }
1400
1401 #[test]
1402 fn test_reset_preserves_tls_metadata() {
1403 // TLS metadata is connection-scoped (set once at handshake, reused
1404 // across every keep-alive request) — reset() must leave it intact
1405 // so the access log of the second request still carries it.
1406 let mut ctx = make_context();
1407 ctx.tls_server_name = Some("example.com".to_owned());
1408 ctx.tls_version = Some("TLSv1.3");
1409 ctx.tls_cipher = Some("TLS_AES_128_GCM_SHA256");
1410 ctx.tls_alpn = Some("h2");
1411 ctx.strict_sni_binding = false;
1412
1413 ctx.reset();
1414
1415 assert_eq!(ctx.tls_server_name.as_deref(), Some("example.com"));
1416 assert_eq!(ctx.tls_version, Some("TLSv1.3"));
1417 assert_eq!(ctx.tls_cipher, Some("TLS_AES_128_GCM_SHA256"));
1418 assert_eq!(ctx.tls_alpn, Some("h2"));
1419 assert!(!ctx.strict_sni_binding);
1420 }
1421
1422 #[test]
1423 fn test_reset_preserves_connection_state() {
1424 let mut ctx = make_context();
1425 ctx.closing = true;
1426 ctx.cluster_id = Some("cluster-1".to_owned());
1427 ctx.backend_id = Some("backend-1".to_owned());
1428 ctx.sticky_session = Some("session-abc".to_owned());
1429
1430 let original_id = ctx.id;
1431 let original_protocol = ctx.protocol;
1432 let original_public_address = ctx.public_address;
1433
1434 ctx.reset();
1435
1436 // Connection-level state is preserved
1437 assert!(ctx.closing);
1438 assert_eq!(ctx.cluster_id.as_deref(), Some("cluster-1"));
1439 assert_eq!(ctx.backend_id.as_deref(), Some("backend-1"));
1440 assert_eq!(ctx.sticky_session.as_deref(), Some("session-abc"));
1441 assert_eq!(ctx.id, original_id);
1442 assert_eq!(ctx.protocol, original_protocol);
1443 assert_eq!(ctx.public_address, original_public_address);
1444 }
1445
1446 // ── write_forwarded_for_by (RFC 7239 §6 IP-literal bracketing) ──────
1447 //
1448 // Locks in the IPv6 bracket+quote contract that matches HAProxy
1449 // (`_7239_print_ip6` in `src/http_ext.c`) and prevents regression to
1450 // the ambiguous `for="2001:db8::1:8080"` form flagged in issue #1254.
1451
1452 use std::net::Ipv6Addr;
1453
1454 fn render_for_by(peer: SocketAddr, public_ip: IpAddr) -> String {
1455 let mut buf = Vec::new();
1456 write_forwarded_for_by(&mut buf, peer.ip(), peer.port(), public_ip);
1457 String::from_utf8(buf).expect("forwarded fragment must be ASCII")
1458 }
1459
1460 fn render_suffix(proto: &str, peer: SocketAddr, public_ip: IpAddr) -> String {
1461 let mut buf = Vec::new();
1462 write_forwarded_suffix(&mut buf, proto, peer.ip(), peer.port(), public_ip);
1463 String::from_utf8(buf).expect("forwarded fragment must be ASCII")
1464 }
1465
1466 #[test]
1467 fn test_forwarded_ipv4_peer_ipv4_by_unchanged() {
1468 // IPv4 must keep the pre-fix wire format: unquoted `by=`, bare IPv4 in
1469 // `for=`. Sōzu emits `for=` quoted unconditionally (HAProxy emits it
1470 // quoted only when a port is attached; we always attach a port).
1471 let peer = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 7)), 54321);
1472 let public_ip = IpAddr::V4(Ipv4Addr::new(198, 51, 100, 1));
1473 assert_eq!(
1474 render_for_by(peer, public_ip),
1475 r#";for="203.0.113.7:54321";by=198.51.100.1"#,
1476 );
1477 }
1478
1479 #[test]
1480 fn test_forwarded_ipv6_peer_brackets_disambiguate_port() {
1481 // Regression for issue #1254: without brackets, `for="2001:db8::1:8080"`
1482 // is ambiguous (could parse as `::1` + port `8080` or `:1:8080` literal).
1483 let peer = SocketAddr::new(IpAddr::V6("2001:db8::1".parse::<Ipv6Addr>().unwrap()), 8080);
1484 let public_ip = IpAddr::V4(Ipv4Addr::new(198, 51, 100, 1));
1485 assert_eq!(
1486 render_for_by(peer, public_ip),
1487 r#";for="[2001:db8::1]:8080";by=198.51.100.1"#,
1488 );
1489 }
1490
1491 #[test]
1492 fn test_forwarded_ipv6_public_address_bracketed_and_quoted() {
1493 // `by=` carries no port but RFC 7239 §6 still requires the IP-literal
1494 // brackets for IPv6, and the value must be quoted because the literal
1495 // contains `:`.
1496 let peer = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 7)), 54321);
1497 let public_ip = IpAddr::V6("2001:db8::2".parse::<Ipv6Addr>().unwrap());
1498 assert_eq!(
1499 render_for_by(peer, public_ip),
1500 r#";for="203.0.113.7:54321";by="[2001:db8::2]""#,
1501 );
1502 }
1503
1504 #[test]
1505 fn test_forwarded_ipv6_peer_and_public_both_bracketed() {
1506 let peer = SocketAddr::new(IpAddr::V6("2001:db8::1".parse::<Ipv6Addr>().unwrap()), 8080);
1507 let public_ip = IpAddr::V6("2001:db8::2".parse::<Ipv6Addr>().unwrap());
1508 assert_eq!(
1509 render_for_by(peer, public_ip),
1510 r#";for="[2001:db8::1]:8080";by="[2001:db8::2]""#,
1511 );
1512 }
1513
1514 #[test]
1515 fn test_forwarded_suffix_prepends_proto_and_brackets_ipv6() {
1516 // The suffix form is what we append when an inbound `Forwarded`
1517 // header already exists. Same bracketing contract must hold.
1518 let peer = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 443);
1519 let public_ip = IpAddr::V6("fe80::1".parse::<Ipv6Addr>().unwrap());
1520 assert_eq!(
1521 render_suffix("https", peer, public_ip),
1522 r#", proto=https;for="[::1]:443";by="[fe80::1]""#,
1523 );
1524 }
1525
1526 // ── traceparent / opentelemetry helpers ─────────────────────────────
1527
1528 #[cfg(feature = "opentelemetry")]
1529 mod otel {
1530 use super::super::*;
1531
1532 #[test]
1533 fn test_parse_hex_valid() {
1534 let (val, rest) = parse_hex::<4>(b"abcd1234").unwrap();
1535 assert_eq!(&val, b"abcd");
1536 assert_eq!(rest, b"1234");
1537 }
1538
1539 #[test]
1540 fn test_parse_hex_exact_length() {
1541 let (val, rest) = parse_hex::<8>(b"01234567").unwrap();
1542 assert_eq!(&val, b"01234567");
1543 assert!(rest.is_empty());
1544 }
1545
1546 #[test]
1547 fn test_parse_hex_too_short() {
1548 assert!(parse_hex::<4>(b"ab").is_none());
1549 }
1550
1551 #[test]
1552 fn test_parse_hex_rejects_non_hex() {
1553 assert!(parse_hex::<4>(b"ghij").is_none());
1554 }
1555
1556 #[test]
1557 fn test_parse_hex_rejects_uppercase_is_ok() {
1558 // Uppercase hex digits are valid
1559 let (val, _) = parse_hex::<4>(b"ABCD").unwrap();
1560 assert_eq!(&val, b"ABCD");
1561 }
1562
1563 #[test]
1564 fn test_skip_separator_valid() {
1565 let rest = skip_separator(b"-hello").unwrap();
1566 assert_eq!(rest, b"hello");
1567 }
1568
1569 #[test]
1570 fn test_skip_separator_wrong_char() {
1571 assert!(skip_separator(b"+hello").is_none());
1572 }
1573
1574 #[test]
1575 fn test_skip_separator_empty() {
1576 assert!(skip_separator(b"").is_none());
1577 }
1578
1579 #[test]
1580 fn test_build_traceparent_format() {
1581 let trace_id: [u8; 32] = *b"4bf92f3577b34da6a3ce929d0e0e4736";
1582 let parent_id: [u8; 16] = *b"00f067aa0ba902b7";
1583
1584 let result = build_traceparent(&trace_id, &parent_id);
1585 assert_eq!(
1586 &result,
1587 b"00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"
1588 );
1589 }
1590
1591 #[test]
1592 fn test_build_traceparent_length() {
1593 let trace_id = [b'a'; 32];
1594 let parent_id = [b'b'; 16];
1595 let result = build_traceparent(&trace_id, &parent_id);
1596 // Format: "00-" (3) + trace_id (32) + "-" (1) + parent_id (16) + "-01" (3) = 55
1597 assert_eq!(result.len(), 55);
1598 }
1599
1600 #[test]
1601 fn test_parse_traceparent_valid() {
1602 let input = b"00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
1603 let store = kawa::Store::Static(input);
1604 let (trace_id, parent_id) = parse_traceparent(&store, input).unwrap();
1605 assert_eq!(&trace_id, b"4bf92f3577b34da6a3ce929d0e0e4736");
1606 assert_eq!(&parent_id, b"00f067aa0ba902b7");
1607 }
1608
1609 #[test]
1610 fn test_parse_traceparent_sampled_flag_zero() {
1611 let input = b"00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-00";
1612 let store = kawa::Store::Static(input);
1613 let result = parse_traceparent(&store, input);
1614 assert!(result.is_some());
1615 }
1616
1617 #[test]
1618 fn test_parse_traceparent_wrong_version() {
1619 let input = b"01-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
1620 let store = kawa::Store::Static(input);
1621 assert!(parse_traceparent(&store, input).is_none());
1622 }
1623
1624 #[test]
1625 fn test_parse_traceparent_too_short() {
1626 let input = b"00-4bf9";
1627 let store = kawa::Store::Static(input);
1628 assert!(parse_traceparent(&store, input).is_none());
1629 }
1630
1631 #[test]
1632 fn test_parse_traceparent_trailing_data() {
1633 // Extra characters after the trace-flags should cause rejection
1634 let input = b"00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01-extra";
1635 let store = kawa::Store::Static(input);
1636 assert!(parse_traceparent(&store, input).is_none());
1637 }
1638
1639 #[test]
1640 fn test_parse_traceparent_missing_separator() {
1641 let input = b"004bf92f3577b34da6a3ce929d0e0e473600f067aa0ba902b701";
1642 let store = kawa::Store::Static(input);
1643 assert!(parse_traceparent(&store, input).is_none());
1644 }
1645
1646 #[test]
1647 fn test_parse_build_roundtrip() {
1648 let trace_id: [u8; 32] = *b"4bf92f3577b34da6a3ce929d0e0e4736";
1649 let parent_id: [u8; 16] = *b"00f067aa0ba902b7";
1650
1651 // Build a traceparent from known IDs
1652 let built = build_traceparent(&trace_id, &parent_id);
1653
1654 // Verify that the built value matches the expected static string,
1655 // then parse that static string back to confirm roundtrip.
1656 let expected: &[u8] = b"00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
1657 assert_eq!(&built[..], expected);
1658
1659 let store = kawa::Store::Static(expected);
1660 let (parsed_trace_id, parsed_parent_id) = parse_traceparent(&store, expected).unwrap();
1661
1662 assert_eq!(parsed_trace_id, trace_id);
1663 assert_eq!(parsed_parent_id, parent_id);
1664 }
1665 }
1666}