Skip to main content

sozu_lib/protocol/kawa_h1/
editor.rs

1//! H1 request/response header editor.
2//!
3//! Captures method/authority/path on parse, rewrites hop-by-hop and
4//! forwarding headers (`X-Forwarded-*`, `Forwarded`, `Connection`,
5//! WebSocket-upgrade signalling, optional `traceparent`), and surfaces the
6//! canonical `LogContext` used by the access-log envelope. Acts as the
7//! Kawa `ParserCallbacks` implementation for the H1 mux path.
8
9use std::{
10    io::Write as _,
11    net::{IpAddr, SocketAddr},
12    str::from_utf8,
13    sync::Arc,
14};
15
16use rusty_ulid::Ulid;
17use sozu_command_lib::logging::LogContext;
18
19use crate::metrics::names;
20use crate::{
21    Protocol, RetrieveClusterError,
22    pool::Checkout,
23    protocol::{
24        http::{GenericHttpStream, Method, parser::compare_no_case},
25        pipe::WebSocketContext,
26    },
27};
28
29#[cfg(feature = "opentelemetry")]
30fn parse_traceparent(val: &kawa::Store, buf: &[u8]) -> Option<([u8; 32], [u8; 16])> {
31    let val = val.data(buf);
32    let (version, val) = parse_hex::<2>(val)?;
33    if version.as_slice() != b"00" {
34        return None;
35    }
36    let val = skip_separator(val)?;
37    let (trace_id, val) = parse_hex::<32>(val)?;
38    let val = skip_separator(val)?;
39    let (parent_id, val) = parse_hex::<16>(val)?;
40    let val = skip_separator(val)?;
41    let (_, val) = parse_hex::<2>(val)?;
42    val.is_empty().then_some((trace_id, parent_id))
43}
44
45#[cfg(feature = "opentelemetry")]
/// Split the first `N` bytes off `buf` if they are all ASCII hex digits.
///
/// Returns a copy of the `N`-byte prefix together with the remaining tail.
/// Returns `None` when `buf` is shorter than `N` bytes or when the prefix
/// contains a byte that is not an ASCII hex digit (either case is accepted).
fn parse_hex<const N: usize>(buf: &[u8]) -> Option<([u8; N], &[u8])> {
    // `split_first_chunk` does the length check and the split in one step,
    // so there is no infallible-but-unchecked `unwrap` and no re-indexing.
    let (head, tail) = buf.split_first_chunk::<N>()?;
    head.iter()
        .all(u8::is_ascii_hexdigit)
        .then_some((*head, tail))
}
52
53#[cfg(feature = "opentelemetry")]
/// Consume the single `-` that separates `traceparent` fields.
///
/// Returns the bytes following the dash, or `None` when `buf` is empty or
/// does not start with `-`.
fn skip_separator(buf: &[u8]) -> Option<&[u8]> {
    buf.strip_prefix(b"-")
}
57
58#[cfg(feature = "opentelemetry")]
59fn random_id<const N: usize>() -> [u8; N] {
60    use rand::RngExt;
61    const CHARSET: &[u8] = b"0123456789abcdef";
62    let mut rng = rand::rng();
63    let mut buf = [0; N];
64    buf.fill_with(|| {
65        let n = rng.random_range(0..CHARSET.len());
66        CHARSET[n]
67    });
68    buf
69}
70
71#[cfg(feature = "opentelemetry")]
/// Serialise a version-`00` `traceparent` header value:
/// `00-<trace_id>-<parent_id>-01` (55 bytes).
///
/// `trace_id` and `parent_id` are expected to be ASCII hex already; this is
/// checked in debug builds only. The trailing trace-flags field is always
/// emitted as `01`, whatever flags an incoming header carried.
fn build_traceparent(trace_id: &[u8; 32], parent_id: &[u8; 16]) -> [u8; 55] {
    // Pre: both ids are hex (they come from a parsed traceparent or from the
    // hex `random_id`). Hex digits cannot contain CR/LF, so this also guards
    // the anti-injection property of the header value. The check is written
    // inline rather than through `is_crlf_free` so the release build under
    // `--features opentelemetry` still compiles (HARD RULE 2).
    debug_assert!(
        trace_id.iter().chain(parent_id).all(u8::is_ascii_hexdigit),
        "traceparent ids must be hex (CR/LF-free header value)"
    );
    // Start from an all-dash buffer: the three separators are then already
    // in place and only the four fields need to be copied in.
    let mut header = [b'-'; 55];
    header[..2].copy_from_slice(b"00");
    header[3..35].copy_from_slice(trace_id);
    header[36..52].copy_from_slice(parent_id);
    header[53..].copy_from_slice(b"01");
    // Post: the dash separators sit at the fixed offsets the parser expects.
    debug_assert!(
        [2usize, 35, 52].iter().all(|&at| header[at] == b'-'),
        "traceparent must carry dash separators at fixed offsets"
    );
    header
}
96
/// Anti-injection predicate: returns `true` when `bytes` holds neither a
/// CR (`\r`) nor an LF (`\n`).
///
/// Every header value Sōzu serialises onto the wire must satisfy this. A
/// raw CR/LF inside a value could split one header into two
/// (request/response smuggling, CWE-93/CWE-113).
///
/// Compiled in EVERY profile (HARD RULE 2): the callers are plain
/// `debug_assert!` invocations, whose arguments are still type-checked when
/// `debug_assertions` is OFF, so gating this function behind
/// `#[cfg(debug_assertions)]` would break the release build with E0425. In
/// release it is only reachable from optimised-out `debug_assert!` bodies,
/// hence `#[allow(dead_code)]` to keep `-D warnings` clean.
#[allow(dead_code)]
fn is_crlf_free(bytes: &[u8]) -> bool {
    bytes.iter().all(|byte| !matches!(byte, b'\r' | b'\n'))
}
112
113/// Write the ";for=..;by=.." portion of a Forwarded header into `buf`
114/// without heap-allocating a `String`.
115///
116/// RFC 7239 §6 requires IPv6 literals to be enclosed in square brackets
117/// inside a quoted string (the `IP-literal` production is `"[" IPv6address "]"`).
118/// Emitting a bare IPv6 like `for="2001:db8::1:8080"` is ambiguous: the
119/// trailing `:1:8080` could parse as `::1` + port `8080` or as the literal
120/// `:1:8080` with no port. Matches HAProxy's behaviour
121/// (`_7239_print_ip6` in `src/http_ext.c`), which is the de-facto reference
122/// implementation of RFC 7239 in the proxy world. See sozu issue #1254.
123fn write_forwarded_for_by(buf: &mut Vec<u8>, peer_ip: IpAddr, peer_port: u16, public_ip: IpAddr) {
124    let before = buf.len();
125    buf.extend_from_slice(b";for=\"");
126    write_ip_literal(buf, peer_ip);
127    buf.push(b':');
128    let mut port_buf = itoa::Buffer::new();
129    buf.extend_from_slice(port_buf.format(peer_port).as_bytes());
130    buf.extend_from_slice(b"\";by=");
131    match public_ip {
132        IpAddr::V4(_) => {
133            let _ = write!(buf, "{public_ip}");
134        }
135        IpAddr::V6(_) => {
136            buf.push(b'"');
137            write_ip_literal(buf, public_ip);
138            buf.push(b'"');
139        }
140    }
141    // Post: the fragment grew the buffer and the whole appended span is
142    // CR/LF-free — it is spliced verbatim into a `Forwarded` header value,
143    // so a stray CR/LF would let an attacker-influenced peer address split
144    // the header (CWE-93). `peer_ip`/`public_ip` are `IpAddr` and the port
145    // is a `u16`, so no untrusted bytes reach here, but the guard pins the
146    // anti-injection contract against future edits to this fragment.
147    debug_assert!(
148        buf.len() > before,
149        "the Forwarded ;for=;by= fragment must emit bytes"
150    );
151    debug_assert!(
152        is_crlf_free(&buf[before..]),
153        "the Forwarded fragment must be CR/LF-free (anti-injection)"
154    );
155}
156
157/// Write an `IP-literal` per RFC 3986 §3.2.2 / RFC 7239 §6: IPv4 verbatim,
158/// IPv6 wrapped in `[...]`. Caller is responsible for any surrounding quotes
159/// required by the embedding syntax (RFC 7239 mandates them for IPv6).
160fn write_ip_literal(buf: &mut Vec<u8>, ip: IpAddr) {
161    let before = buf.len();
162    match ip {
163        IpAddr::V4(_) => {
164            let _ = write!(buf, "{ip}");
165        }
166        IpAddr::V6(_) => {
167            buf.push(b'[');
168            let _ = write!(buf, "{ip}");
169            buf.push(b']');
170        }
171    }
172    // Post: a non-empty literal was appended (every IpAddr renders to at
173    // least one byte) and the emitted bytes carry no CR/LF — an IpAddr can
174    // only render `[0-9a-fA-F:.]` plus the brackets we add, so this is a
175    // belt-and-suspenders guard on the value we splice into a header.
176    debug_assert!(buf.len() > before, "write_ip_literal must emit bytes");
177    debug_assert!(
178        is_crlf_free(&buf[before..]),
179        "an IP literal spliced into a header must be CR/LF-free"
180    );
181    // IPv6 is bracketed on both ends; IPv4 is bare (RFC 3986 §3.2.2).
182    debug_assert_eq!(
183        matches!(ip, IpAddr::V6(_)),
184        buf[before] == b'[' && buf[buf.len() - 1] == b']',
185        "IPv6 literals are bracketed, IPv4 literals are not"
186    );
187}
188
189/// Write ", proto=<proto>;for=..;by=.." (the suffix appended to an existing Forwarded value).
190fn write_forwarded_suffix(
191    buf: &mut Vec<u8>,
192    proto: &str,
193    peer_ip: IpAddr,
194    peer_port: u16,
195    public_ip: IpAddr,
196) {
197    // Pre: `proto` is the proxy-chosen scheme label ("http"/"https"), never
198    // attacker-controlled — assert it carries no CR/LF before it is spliced
199    // into the `Forwarded` value (anti-injection, CWE-93).
200    debug_assert!(
201        is_crlf_free(proto.as_bytes()),
202        "the proto label spliced into Forwarded must be CR/LF-free"
203    );
204    let before = buf.len();
205    buf.extend_from_slice(b", proto=");
206    buf.extend_from_slice(proto.as_bytes());
207    write_forwarded_for_by(buf, peer_ip, peer_port, public_ip);
208    // Post: the suffix grew the buffer, begins with the `, proto=`
209    // separator (so it appends cleanly to an existing value), and the whole
210    // appended span is CR/LF-free.
211    debug_assert!(
212        buf.len() > before + b", proto=".len(),
213        "the Forwarded suffix must emit the separator plus a value"
214    );
215    debug_assert!(
216        buf[before..].starts_with(b", proto="),
217        "the Forwarded suffix must start with the `, proto=` separator"
218    );
219    debug_assert!(
220        is_crlf_free(&buf[before..]),
221        "the Forwarded suffix must be CR/LF-free (anti-injection)"
222    );
223}
224
/// Per-request container used to store and read information about the
/// session from within a Kawa parser callback.
///
/// Fields are grouped by how the parser callbacks treat them: the
/// "Write only" group is filled in by the callbacks while headers are
/// parsed, the "Read only" group is supplied by the surrounding session
/// and consulted when headers are edited.
#[derive(Debug)]
pub struct HttpContext {
    // ========== Write only
    /// set to false if Kawa finds a "Connection" header with a "close" value in the response
    pub keep_alive_backend: bool,
    /// set to false if Kawa finds a "Connection" header with a "close" value in the request
    pub keep_alive_frontend: bool,
    /// the value of the sticky session cookie in the request
    pub sticky_session_found: Option<String>,
    // ---------- Status Line
    /// the value of the method in the request line
    pub method: Option<Method>,
    /// the value of the authority of the request (in the request line or "Host" header)
    pub authority: Option<String>,
    /// the value of the path in the request line
    pub path: Option<String>,
    /// the value of the status code in the response line
    pub status: Option<u16>,
    /// the value of the reason in the response line
    pub reason: Option<String>,
    // ---------- Additional optional data
    /// the value of the `User-Agent` request header, when one was captured
    pub user_agent: Option<String>,
    /// Value of the `x-request-id` header observed (if propagated from the
    /// client/upstream LB) or generated (from `self.id`). Universal correlation
    /// header — populated unconditionally by `on_request_headers` so the access
    /// log can record the exact value forwarded to the backend.
    pub x_request_id: Option<String>,
    /// Verbatim value of the client-supplied `X-Forwarded-For` header as
    /// observed before Sōzu appended its own hop. Captured here, not at
    /// request edit time, so the access log records the upstream-attested
    /// chain even when Sōzu also appends its own peer to the forwarded
    /// header. `None` if the request had no `X-Forwarded-For` header.
    pub xff_chain: Option<String>,

    /// OpenTelemetry data for this request; only present when the
    /// `opentelemetry` feature is enabled.
    // NOTE(review): presumably filled from the `traceparent` handling in
    // `on_request_headers` — confirm against that method.
    #[cfg(feature = "opentelemetry")]
    pub otel: Option<sozu_command::logging::OpenTelemetry>,

    // ========== Read only
    /// signals whether Kawa should write a "Connection" header with a "close" value (request and response)
    pub closing: bool,
    /// Connection/session ULID — stable across all requests multiplexed on this
    /// TCP or TLS connection. Used as the first slot in the legacy log-context
    /// bracket `[session req cluster backend]` and emitted into
    /// `ProtobufAccessLog.session_id`.
    pub session_id: Ulid,
    /// the value of the correlation header (named by `sozu_id_header`,
    /// "Sozu-Id" by default) that Kawa should write (request and response)
    pub id: Ulid,
    /// identifier of the backend serving this request, once known
    // NOTE(review): presumably the `backend` slot of the log-context
    // bracket described on `session_id` — confirm.
    pub backend_id: Option<String>,
    /// identifier of the cluster this request was routed to, once known
    // NOTE(review): presumably the `cluster` slot of the log-context
    // bracket described on `session_id` — confirm.
    pub cluster_id: Option<String>,
    /// the value of the protocol Kawa should write in the Forwarded headers of the request
    pub protocol: Protocol,
    /// the value of the public address Kawa should write in the Forwarded headers of the request
    pub public_address: SocketAddr,
    /// the value of the session address Kawa should write in the Forwarded headers of the request
    pub session_address: Option<SocketAddr>,
    /// the name of the cookie Kawa should read from the request to get the sticky session
    pub sticky_name: String,
    /// the sticky session that should be used
    /// used to create a "Set-Cookie" header in the response in case it differs from sticky_session_found
    pub sticky_session: Option<String>,
    /// the address of the backend server
    pub backend_address: Option<SocketAddr>,
    /// The TLS Server Name Indication (SNI) hostname negotiated at handshake.
    ///
    /// Populated for HTTPS listeners when the client sent an SNI extension (see
    /// `https.rs::upgrade_handshake`). Used by the routing layer to enforce the
    /// TLS trust boundary against the HTTP `:authority` / `Host` header — without
    /// this check, an attacker holding a valid certificate for tenant A could
    /// open TLS with SNI=A then send requests with `:authority=tenantB` and
    /// reach tenant B's backend (CWE-346 / CWE-444).
    ///
    /// `None` when the listener is plaintext HTTP or the client omitted SNI.
    /// Stored pre-lowercased and without a port for direct exact-match comparison.
    pub tls_server_name: Option<String>,
    /// Snapshot of the SAN set of the certificate Sōzu actually served at
    /// the TLS handshake. Captured once in `https.rs::upgrade_handshake`
    /// from the resolver and frozen for the connection lifetime so H2
    /// stream coalescing (RFC 7540 §9.1.1 / RFC 9113 §9.1.1) accepts any
    /// `:authority` covered by the certificate, with RFC 6125 §6.4.3
    /// wildcard handling. `None` for plaintext listeners or when SNI was
    /// absent (router falls back to the legacy exact-match predicate).
    /// `Some(empty)` when the default cert was served — every
    /// `:authority` is rejected. `Arc` so the snapshot is shared across
    /// every per-stream `HttpContext` without re-allocation.
    pub tls_cert_names: Option<Arc<Vec<String>>>,
    /// Whether the router must reject this request when `tls_server_name`
    /// does not exact-match its authority (CWE-346 / CWE-444). Mirrors
    /// `HttpsListenerConfig::strict_sni_binding`. Set from the mux
    /// `Context` at stream creation time (see `Context::create_stream`).
    /// Plaintext listeners still never hit the check because
    /// `tls_server_name` is `None`.
    pub strict_sni_binding: bool,
    /// When `true`, the request-side block walk in `on_request_headers`
    /// strips any client-supplied `X-Real-IP` header before forwarding
    /// (anti-spoofing). Mirrors `HttpListenerConfig::elide_x_real_ip` /
    /// `HttpsListenerConfig::elide_x_real_ip`. Set from the mux `Context`
    /// at stream creation (see `Context::create_stream`); listener-scoped
    /// and never reset across keep-alive requests. Independent of
    /// `send_x_real_ip`. The same flag is plumbed into
    /// `pkawa::handle_trailer` so trailer HEADERS frames cannot bypass
    /// the elision.
    pub elide_x_real_ip: bool,
    /// When `true`, `on_request_headers` injects a proxy-generated
    /// `X-Real-IP` header carrying `session_address.ip()` (post-PROXY-v2
    /// unwrap, i.e. the original client IP). Mirrors
    /// `HttpListenerConfig::send_x_real_ip` /
    /// `HttpsListenerConfig::send_x_real_ip`. Set from the mux `Context`
    /// at stream creation (see `Context::create_stream`); listener-scoped
    /// and never reset across keep-alive requests. Independent of
    /// `elide_x_real_ip`. When `session_address` is `None` (raw socket
    /// without a peer), no header is appended — identical to the
    /// existing X-Forwarded-For / Forwarded synthesis behaviour.
    pub send_x_real_ip: bool,
    /// Negotiated TLS protocol version as a short label (e.g. `"TLSv1.3"`).
    /// Captured from `rustls_version_label` at handshake completion and
    /// propagated from the mux `Context`. `None` for plaintext listeners.
    pub tls_version: Option<&'static str>,
    /// Negotiated TLS cipher suite as a short label (e.g.
    /// `"TLS_AES_128_GCM_SHA256"`). Captured from `rustls_ciphersuite_label`
    /// at handshake completion and propagated from the mux `Context`. `None`
    /// for plaintext listeners.
    pub tls_cipher: Option<&'static str>,
    /// Negotiated ALPN protocol (e.g. `"h2"`, `"http/1.1"`). Captured from
    /// rustls at handshake completion and propagated from the mux `Context`.
    /// `None` for plaintext listeners or when no ALPN was negotiated.
    pub tls_alpn: Option<&'static str>,
    /// Name of the correlation header Sozu injects into every request and
    /// response. Defaults to `"Sozu-Id"` via [`L7ListenerHandler::get_sozu_id_header`].
    /// Populated at stream creation from the listener config's `sozu_id_header`
    /// knob. Stored as an owned `String` so it survives a listener hot-reload
    /// that changes the value.
    pub sozu_id_header: String,
    /// Resolved `Location` URL stashed by the routing layer when a frontend
    /// triggers a permanent redirect (`RedirectPolicy::PERMANENT` or the
    /// legacy `cluster.https_redirect`). Read by the default-answer 301
    /// path so the response carries the correct target URL — including
    /// optional `cluster.https_redirect_port` and rewrite-template captures.
    /// `None` when the request is not redirecting.
    pub redirect_location: Option<String>,
    /// `WWW-Authenticate` realm stashed by the routing layer when a
    /// frontend rejects an unauthenticated request (`required_auth = true`
    /// without a valid `Authorization: Basic` header, or
    /// `RedirectPolicy::UNAUTHORIZED`). Read by the default-answer 401
    /// path so the response carries the cluster's configured
    /// `www_authenticate` value. `None` falls back to template default
    /// (header is elided when no realm is configured).
    pub www_authenticate: Option<String>,
    /// Original authority captured before a rewrite-host fired; emitted
    /// back to the backend as `X-Forwarded-Host` so the backend can
    /// reconstruct the public URL even though `:authority` / `Host` was
    /// rewritten on the wire. `None` when no host rewrite happened.
    pub original_authority: Option<String>,
    /// Per-frontend response-side header edits (set/replace/delete)
    /// stashed by the routing layer for the emission boundary in
    /// `mux/h1.rs::writable` and `mux/h2.rs::write_streams` to apply
    /// before `kawa.prepare(...)`. Empty when the frontend has no
    /// response-side header policy. An entry with an empty `val`
    /// deletes the header by name (HAProxy `del-header` parity); a
    /// non-empty `val` set/replaces.
    pub headers_response: Vec<HeaderEditSnapshot>,
    /// Resolved `Retry-After` value (seconds) for an HTTP 429 default
    /// answer. Computed in `Router::connect` when the per-(cluster,
    /// source-IP) connection limit is hit, by folding the cluster's
    /// `retry_after` override over the global default. `None` (or
    /// `Some(0)`) tells the answer engine to omit the `Retry-After`
    /// header entirely — `Retry-After: 0` invites an immediate retry
    /// that defeats the limit. Unused for any other status code.
    pub retry_after_seconds: Option<u32>,
    /// Frontend-supplied template body that overrides the listener /
    /// cluster default `http.301.redirection` for a single
    /// `RedirectPolicy::PERMANENT` request. Stashed by the routing layer
    /// from `RouteResult::redirect_template` and consumed by the 301
    /// branch of `mux::answers::set_default_answer_with_retry_after`,
    /// which compiles the body via `HttpAnswers::render_inline_301` and
    /// renders it with the same `(REDIRECT_LOCATION, ROUTE,
    /// REQUEST_ID)` variable schema as the persistent template chain.
    /// `None` falls back to the cluster / listener default. Unused for
    /// any other status code or for the legacy
    /// `cluster.https_redirect = true` path (which never sets it).
    pub frontend_redirect_template: Option<String>,
    /// Resolved redirect status code stashed by the routing layer when
    /// a frontend's `RedirectPolicy` is one of the redirect variants.
    /// 301 = `Permanent`, 302 = `Found`, 308 = `PermanentRedirect`.
    /// `None` falls back to 301 for the legacy
    /// `cluster.https_redirect = true` path. Closes #1009.
    pub redirect_status: Option<u16>,
    /// Stable, structured discriminator surfaced as the access-log
    /// `message` field when the session terminates on a timeout. Set by
    /// the timeout handlers in `kawa_h1::timeout` and `MuxState::timeout`
    /// **before** the default-answer or `forcefully_terminate_answer`
    /// path consumes it. The vocabulary is operator-visible API once
    /// shipped — see the access-log section of `doc/configure.md` for
    /// the full token list (`client_timeout`,
    /// `client_timeout_during_response`, `backend_timeout`,
    /// `backend_response_timeout`). `None` for non-timeout sessions, in
    /// which case the access log emits `message: None` as before.
    pub access_log_message: Option<&'static str>,
}
424
/// How `apply_response_header_edits` should interpret a per-edit value.
///
/// The implicit empty-`val` Append → delete encoding is still supported
/// (so legacy operator-supplied `[[...frontends.headers]]` entries work
/// unchanged); the explicit modes give typed policies finer control:
///
/// - `Append`: an entry with an empty `val` deletes the header (the
///   legacy shortcut); every other entry is appended before the
///   end-of-headers flag.
/// - `SetIfAbsent`: skip the insert when `kawa.blocks` already carries
///   a non-elided header with the same name (case-insensitive). HSTS
///   uses this by default to preserve a backend-supplied
///   `Strict-Transport-Security` (RFC 6797 §6.1 single-header
///   requirement).
/// - `Set`: delete every existing header with the matching name, then
///   insert the new entry. Use when the operator wants their typed
///   policy to override any backend-supplied value (the
///   `force_replace_backend = true` HSTS shape, for example).
///
/// The default mode is `Append`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum HeaderEditMode {
    /// Append the header before the end-of-headers flag. Empty `val`
    /// is interpreted as a delete (legacy behaviour preserved).
    #[default]
    Append,
    /// Skip the insert if `kawa.blocks` already contains a non-elided
    /// header whose name matches `key` case-insensitively. Otherwise
    /// behave like `Append`.
    SetIfAbsent,
    /// Delete every existing header with the matching name, then
    /// insert the new value. Equivalent to two operator-defined edits
    /// (delete + append) but safer to express as one typed entry.
    Set,
}
457
/// Owned snapshot of a per-frontend header edit, captured at routing
/// time so the emission boundary can apply set/replace/delete without
/// touching the routing layer's `Rc<[HeaderEdit]>` slices.
///
/// `mode` selects between the `Append`, `SetIfAbsent` and `Set`
/// semantics of [`HeaderEditMode`]. There is no dedicated delete mode:
/// `mode = Append` paired with an empty `val` is treated as a delete by
/// `apply_response_header_edits` (HAProxy `del-header` parity), so
/// legacy entries that rely on that encoding keep working unchanged.
#[derive(Debug, Clone)]
pub struct HeaderEditSnapshot {
    /// Header name the edit targets, as raw bytes.
    pub key: Vec<u8>,
    /// Header value to emit, as raw bytes; empty means "delete" under
    /// `HeaderEditMode::Append`.
    pub val: Vec<u8>,
    /// How `key`/`val` are applied against the headers already present.
    pub mode: HeaderEditMode,
}
473
474impl kawa::h1::ParserCallbacks<Checkout> for HttpContext {
475    fn on_headers(&mut self, stream: &mut GenericHttpStream) {
476        match stream.kind {
477            kawa::Kind::Request => self.on_request_headers(stream),
478            kawa::Kind::Response => self.on_response_headers(stream),
479        }
480    }
481}
482
483impl HttpContext {
484    /// Creates a new instance
485    #[allow(clippy::too_many_arguments)]
486    pub fn new(
487        session_id: Ulid,
488        request_id: Ulid,
489        protocol: Protocol,
490        public_address: SocketAddr,
491        session_address: Option<SocketAddr>,
492        sticky_name: String,
493        sozu_id_header: String,
494        elide_x_real_ip: bool,
495        send_x_real_ip: bool,
496    ) -> Self {
497        Self {
498            session_id,
499            id: request_id,
500            backend_id: None,
501            cluster_id: None,
502
503            closing: false,
504            keep_alive_backend: true,
505            keep_alive_frontend: true,
506            protocol,
507            public_address,
508            session_address,
509            sticky_name,
510            sticky_session: None,
511            sticky_session_found: None,
512
513            method: None,
514            authority: None,
515            path: None,
516            status: None,
517            reason: None,
518            user_agent: None,
519            x_request_id: None,
520            xff_chain: None,
521
522            #[cfg(feature = "opentelemetry")]
523            otel: Default::default(),
524
525            backend_address: None,
526            tls_server_name: None,
527            tls_cert_names: None,
528            strict_sni_binding: true,
529            elide_x_real_ip,
530            send_x_real_ip,
531            tls_version: None,
532            tls_cipher: None,
533            tls_alpn: None,
534            sozu_id_header,
535            redirect_location: None,
536            www_authenticate: None,
537            original_authority: None,
538            headers_response: Vec::new(),
539            retry_after_seconds: None,
540            frontend_redirect_template: None,
541            redirect_status: None,
542            access_log_message: None,
543        }
544    }
545
546    /// Callback for request:
547    ///
548    /// - edit headers (connection, forwarded, sticky cookie, sozu-id,
549    ///   x-request-id, x-real-ip)
550    /// - save information:
551    ///   - method
552    ///   - authority
553    ///   - path
554    ///   - front keep-alive
555    ///   - sticky cookie
556    ///   - user-agent
557    ///   - x-request-id (preserved if present, else derived from `self.id`)
558    fn on_request_headers(&mut self, request: &mut GenericHttpStream) {
559        // Editing never drops a block — it only elides in place (key set to
560        // Empty, length preserved) or pushes new headers. Snapshot the count
561        // so the postcondition can pin "blocks only grow" for the whole edit.
562        let blocks_at_entry = request.blocks.len();
563
564        let buf = request.storage.mut_buffer();
565
566        // Captures the request line
567        if let kawa::StatusLine::Request {
568            method,
569            authority,
570            path,
571            ..
572        } = &request.detached.status_line
573        {
574            self.method = method.data_opt(buf).map(Method::new);
575            self.authority = authority
576                .data_opt(buf)
577                .and_then(|data| from_utf8(data).ok())
578                .map(ToOwned::to_owned);
579            self.path = path
580                .data_opt(buf)
581                .and_then(|data| from_utf8(data).ok())
582                .map(ToOwned::to_owned);
583        }
584
585        // if self.method == Some(Method::Get) && request.body_size == kawa::BodySize::Empty {
586        //     request.parsing_phase = kawa::ParsingPhase::Terminated;
587        // }
588
589        let public_ip = self.public_address.ip();
590        let public_port = self.public_address.port();
591        let proto = match self.protocol {
592            Protocol::HTTP => "http",
593            Protocol::HTTPS => "https",
594            _ => unreachable!(),
595        };
596
597        // `proto` is the proxy-resolved scheme label, sourced from the
598        // listener protocol (never from request bytes). The match above
599        // already rejects anything but HTTP/HTTPS; pin that it is one of the
600        // two CR/LF-free literals we splice into forwarding headers.
601        debug_assert!(
602            proto == "http" || proto == "https",
603            "proto must be the http/https scheme label"
604        );
605
606        // Find and remove the sticky_name cookie
607        // if found its value is stored in sticky_session_found
608        for cookie in &mut request.detached.jar {
609            let key = cookie.key.data(buf);
610            if key == self.sticky_name.as_bytes() {
611                let val = cookie.val.data(buf);
612                self.sticky_session_found = from_utf8(val).ok().map(ToOwned::to_owned);
613                cookie.elide();
614                // Post: the matched sticky cookie is gone from the forwarded
615                // jar so the backend never sees Sōzu's own session cookie.
616                debug_assert!(
617                    cookie.is_elided(),
618                    "the matched sticky cookie must be elided after capture"
619                );
620            }
621        }
622
623        // If found:
624        // - set Connection to "close" if closing is set
625        // - set keep_alive_frontend to false if Connection is "close"
626        // - update value of X-Forwarded-Proto
627        // - update value of X-Forwarded-Port
628        // - store X-Forwarded-For
629        // - store Forwarded
630        // - store User-Agent
631        let mut x_for = None;
632        let mut forwarded = None;
633        let mut has_x_port = false;
634        let mut has_x_proto = false;
635        let mut has_x_request_id = false;
636        let mut has_connection = false;
637        #[cfg(feature = "opentelemetry")]
638        let mut traceparent: Option<&mut kawa::Pair> = None;
639        #[cfg(feature = "opentelemetry")]
640        let mut tracestate: Option<&mut kawa::Pair> = None;
641        for block in &mut request.blocks {
642            match block {
643                kawa::Block::Header(header) if !header.is_elided() => {
644                    let key = header.key.data(buf);
645                    if compare_no_case(key, b"connection") {
646                        has_connection = true;
647                        if self.closing {
648                            header.val = kawa::Store::Static(b"close");
649                        } else {
650                            let val = header.val.data(buf);
651                            self.keep_alive_frontend &= !compare_no_case(val, b"close");
652                        }
653                    } else if compare_no_case(key, b"X-Forwarded-Proto") {
654                        has_x_proto = true;
655                        // header.val = kawa::Store::Static(proto.as_bytes());
656                        incr!(names::http::TRUSTING_X_PROTO);
657                        let val = header.val.data(buf);
658                        if !compare_no_case(val, proto.as_bytes()) {
659                            incr!(names::http::TRUSTING_X_PROTO_DIFF);
660                            debug!(
661                                "{} Trusting X-Forwarded-Proto for {:?} even though {:?} != {}",
662                                self.log_context(),
663                                self.authority,
664                                val,
665                                proto
666                            );
667                        }
668                    } else if compare_no_case(key, b"X-Forwarded-Port") {
669                        has_x_port = true;
670                        // header.val = kawa::Store::from_string(public_port.to_string());
671                        incr!(names::http::TRUSTING_X_PORT);
672                        let val = header.val.data(buf);
673                        let mut port_buf = itoa::Buffer::new();
674                        let expected = port_buf.format(public_port);
675                        if !compare_no_case(val, expected.as_bytes()) {
676                            incr!(names::http::TRUSTING_X_PORT_DIFF);
677                            debug!(
678                                "{} Trusting X-Forwarded-Port for {:?} even though {:?} != {}",
679                                self.log_context(),
680                                self.authority,
681                                val,
682                                expected
683                            );
684                        }
685                    } else if compare_no_case(key, b"X-Forwarded-For") {
686                        // Snapshot the upstream-attested chain before we
687                        // potentially append our own peer below — the access
688                        // log records the value the client/upstream LB
689                        // forwarded, not the rewritten value Sōzu emits.
690                        self.xff_chain = header
691                            .val
692                            .data_opt(buf)
693                            .and_then(|data| from_utf8(data).ok())
694                            .map(ToOwned::to_owned);
695                        x_for = Some(header);
696                    } else if compare_no_case(key, b"X-Real-IP") && self.elide_x_real_ip {
697                        // Anti-spoofing: a client cannot supply its own
698                        // `X-Real-IP` and have it reach the backend. The
699                        // proxy-injected value (when `send_x_real_ip` is
700                        // also set) is appended after this loop. H2 trailer
701                        // HEADERS frames bypass this callback; they are
702                        // covered by the matching elision in
703                        // `pkawa::handle_trailer`.
704                        debug_assert!(
705                            self.elide_x_real_ip,
706                            "X-Real-IP is only elided when anti-spoofing is enabled"
707                        );
708                        header.elide();
709                        // Post: the spoofable client value is stripped before
710                        // forwarding (anti-spoofing invariant, CWE-348).
711                        debug_assert!(
712                            header.is_elided(),
713                            "client X-Real-IP must be elided when elide_x_real_ip is set"
714                        );
715                    } else if compare_no_case(key, b"Forwarded") {
716                        forwarded = Some(header);
717                    } else if compare_no_case(key, b"User-Agent") {
718                        self.user_agent = header
719                            .val
720                            .data_opt(buf)
721                            .and_then(|data| from_utf8(data).ok())
722                            .map(ToOwned::to_owned);
723                    } else if compare_no_case(key, b"X-Request-Id") {
724                        // RFC: not standardized, but the de-facto correlation
725                        // header used by Envoy/HAProxy/most LBs. Preserve the
726                        // client-supplied value verbatim — overwriting it
727                        // breaks end-to-end request tracing.
728                        has_x_request_id = true;
729                        self.x_request_id = header
730                            .val
731                            .data_opt(buf)
732                            .and_then(|data| from_utf8(data).ok())
733                            .map(ToOwned::to_owned);
734                    } else {
735                        #[cfg(feature = "opentelemetry")]
736                        if compare_no_case(key, b"traceparent") {
737                            if let Some(hdr) = traceparent {
738                                hdr.elide();
739                            }
740                            traceparent = Some(header);
741                        } else if compare_no_case(key, b"tracestate") {
742                            if let Some(hdr) = tracestate {
743                                hdr.elide();
744                            }
745                            tracestate = Some(header);
746                        }
747                    }
748                }
749                _ => {}
750            }
751        }
752
753        #[cfg(feature = "opentelemetry")]
754        let (otel, has_traceparent) = {
755            let mut otel = sozu_command_lib::logging::OpenTelemetry::default();
756            let tp = traceparent
757                .as_ref()
758                .and_then(|hdr| parse_traceparent(&hdr.val, buf))
759                .map(|(trace_id, parent_id)| (trace_id, Some(parent_id)));
760            // Remove tracestate if no traceparent is present
761            if let (None, Some(tracestate)) = (tp, tracestate) {
762                tracestate.elide();
763            }
764            let (trace_id, parent_id) = tp.unwrap_or_else(|| (random_id(), None));
765            otel.trace_id = trace_id;
766            otel.parent_span_id = parent_id;
767            otel.span_id = random_id();
768            // Modify header if present
769            if let Some(id) = &mut traceparent {
770                let new_val = build_traceparent(&otel.trace_id, &otel.span_id);
771                id.val.modify(buf, &new_val);
772            }
773            (otel, traceparent.is_some())
774        };
775
776        // If session_address is set:
777        // - append its ip address to the list of "X-Forwarded-For" if it was found, creates it if not
778        // - append "proto=[PROTO];for=[PEER];by=[PUBLIC]" to the list of "Forwarded" if it was found, creates it if not
779        if let Some(peer_addr) = self.session_address {
780            let peer_ip = peer_addr.ip();
781            let peer_port = peer_addr.port();
782            let has_x_for = x_for.is_some();
783            let has_forwarded = forwarded.is_some();
784
785            // Buffer for building header values — ownership is transferred to Store
786            // via `take`, so each header gets its own allocation.
787            let mut hdr_buf = Vec::with_capacity(128);
788
789            if let Some(header) = x_for {
790                let prior_len = header.val.data(buf).len();
791                hdr_buf.extend_from_slice(header.val.data(buf));
792                let _ = write!(hdr_buf, ", {peer_ip}");
793                // Pre: we only ever extend the client-attested chain — the
794                // existing value stays a prefix and we appended the `, ip`
795                // separator, so the rewritten value is strictly longer and
796                // CR/LF-free (else the appended peer would split the header).
797                debug_assert!(
798                    hdr_buf.len() > prior_len,
799                    "X-Forwarded-For append must grow the value"
800                );
801                debug_assert!(
802                    is_crlf_free(&hdr_buf[prior_len..]),
803                    "the X-Forwarded-For hop we append must be CR/LF-free (anti-injection)"
804                );
805                header.val = kawa::Store::from_vec(std::mem::take(&mut hdr_buf));
806            }
807            if let Some(header) = &mut forwarded {
808                let prior_len = header.val.data(buf).len();
809                hdr_buf.extend_from_slice(header.val.data(buf));
810                write_forwarded_suffix(&mut hdr_buf, proto, peer_ip, peer_port, public_ip);
811                // Pre: same contract for the structured `Forwarded` chain —
812                // the existing value is preserved as a prefix and our suffix
813                // is CR/LF-free (`write_forwarded_suffix` asserts the suffix
814                // span too; this pins it relative to the prior value).
815                debug_assert!(
816                    hdr_buf.len() > prior_len,
817                    "Forwarded append must grow the value"
818                );
819                debug_assert!(
820                    is_crlf_free(&hdr_buf[prior_len..]),
821                    "the Forwarded element we append must be CR/LF-free (anti-injection)"
822                );
823                header.val = kawa::Store::from_vec(std::mem::take(&mut hdr_buf));
824            }
825
826            if !has_x_for {
827                let blocks_before = request.blocks.len();
828                let _ = write!(hdr_buf, "{peer_ip}");
829                debug_assert!(
830                    is_crlf_free(&hdr_buf),
831                    "a synthesised X-Forwarded-For value must be CR/LF-free"
832                );
833                request.push_block(kawa::Block::Header(kawa::Pair {
834                    key: kawa::Store::Static(b"X-Forwarded-For"),
835                    val: kawa::Store::from_vec(std::mem::take(&mut hdr_buf)),
836                }));
837                debug_assert_eq!(
838                    request.blocks.len(),
839                    blocks_before + 1,
840                    "creating X-Forwarded-For must push exactly one block"
841                );
842            }
843            if !has_forwarded {
844                let blocks_before = request.blocks.len();
845                hdr_buf.extend_from_slice(b"proto=");
846                hdr_buf.extend_from_slice(proto.as_bytes());
847                write_forwarded_for_by(&mut hdr_buf, peer_ip, peer_port, public_ip);
848                debug_assert!(
849                    is_crlf_free(&hdr_buf),
850                    "a synthesised Forwarded value must be CR/LF-free"
851                );
852                request.push_block(kawa::Block::Header(kawa::Pair {
853                    key: kawa::Store::Static(b"Forwarded"),
854                    val: kawa::Store::from_vec(std::mem::take(&mut hdr_buf)),
855                }));
856                debug_assert_eq!(
857                    request.blocks.len(),
858                    blocks_before + 1,
859                    "creating Forwarded must push exactly one block"
860                );
861            }
862
863            // Inject a proxy-generated `X-Real-IP` header carrying the
864            // peer IP (post-PROXY-v2 unwrap, so the original client IP
865            // even when the upstream presented PROXY-v2). Folded into the
866            // `if let Some(peer_addr)` arm so missing peers (raw socket,
867            // no PROXY-v2) skip the injection silently — identical to the
868            // X-Forwarded-For / Forwarded synthesis behaviour above. Any
869            // client-supplied `X-Real-IP` was either elided in the block
870            // walk (if `elide_x_real_ip` is on) or passes through; this
871            // header is appended last so order in the resulting block
872            // list is deterministic for tests.
873            if self.send_x_real_ip {
874                let blocks_before = request.blocks.len();
875                debug_assert!(
876                    hdr_buf.is_empty(),
877                    "the header scratch buffer was taken before reuse"
878                );
879                let _ = write!(hdr_buf, "{peer_ip}");
880                // The proxy-generated value is a rendered IpAddr — non-empty
881                // and CR/LF-free, so it cannot inject a second header.
882                debug_assert!(
883                    !hdr_buf.is_empty() && is_crlf_free(&hdr_buf),
884                    "the injected X-Real-IP value must be a CR/LF-free IP"
885                );
886                request.push_block(kawa::Block::Header(kawa::Pair {
887                    key: kawa::Store::Static(b"X-Real-IP"),
888                    val: kawa::Store::from_vec(std::mem::take(&mut hdr_buf)),
889                }));
890                debug_assert_eq!(
891                    request.blocks.len(),
892                    blocks_before + 1,
893                    "injecting X-Real-IP must push exactly one block"
894                );
895            }
896        }
897
898        #[cfg(feature = "opentelemetry")]
899        {
900            if !has_traceparent {
901                let val = build_traceparent(&otel.trace_id, &otel.span_id);
902                request.push_block(kawa::Block::Header(kawa::Pair {
903                    key: kawa::Store::Static(b"traceparent"),
904                    val: kawa::Store::from_slice(&val),
905                }));
906            }
907            self.otel = Some(otel);
908        }
909
910        if !has_x_port {
911            let mut port_buf = itoa::Buffer::new();
912            let port_str = port_buf.format(public_port);
913            request.push_block(kawa::Block::Header(kawa::Pair {
914                key: kawa::Store::Static(b"X-Forwarded-Port"),
915                val: kawa::Store::from_slice(port_str.as_bytes()),
916            }));
917        }
918        if !has_x_proto {
919            request.push_block(kawa::Block::Header(kawa::Pair {
920                key: kawa::Store::Static(b"X-Forwarded-Proto"),
921                val: kawa::Store::Static(proto.as_bytes()),
922            }));
923        }
924        // Create a "Connection" header in case it was not found and closing is set
925        if !has_connection && self.closing {
926            request.push_block(kawa::Block::Header(kawa::Pair {
927                key: kawa::Store::Static(b"Connection"),
928                val: kawa::Store::Static(b"close"),
929            }));
930        }
931        // Inject "X-Request-Id" derived from the request ULID when the client
932        // (or upstream LB) did not already supply one. When already present,
933        // the header is left untouched in the block list — preserving the
934        // client-supplied value end-to-end is the whole point of this header.
935        // Either way, `self.x_request_id` is populated so the access log
936        // records the exact value forwarded to the backend.
937        if has_x_request_id {
938            incr!(names::http::X_REQUEST_ID_PROPAGATED);
939        } else {
940            let value = self.id.to_string();
941            // The generated id is a ULID rendering — Crockford base-32, so
942            // CR/LF-free by construction.
943            debug_assert!(
944                is_crlf_free(value.as_bytes()),
945                "the generated X-Request-Id must be CR/LF-free"
946            );
947            request.push_block(kawa::Block::Header(kawa::Pair {
948                key: kawa::Store::Static(b"X-Request-Id"),
949                val: kawa::Store::from_string(value.clone()),
950            }));
951            self.x_request_id = Some(value);
952            incr!(names::http::X_REQUEST_ID_GENERATED);
953        }
954        // Either branch leaves the forwarded value recorded for the access
955        // log so it matches exactly what the backend receives.
956        debug_assert!(
957            self.x_request_id.is_some(),
958            "on_request_headers must record the forwarded X-Request-Id"
959        );
960
961        // Create a custom correlation header (defaults to "Sozu-Id", can be
962        // renamed via the `sozu_id_header` listener config knob).
963        let blocks_before_sozu_id = request.blocks.len();
964        request.push_block(kawa::Block::Header(kawa::Pair {
965            key: kawa::Store::from_string(self.sozu_id_header.clone()),
966            val: kawa::Store::from_string(self.id.to_string()),
967        }));
968        debug_assert_eq!(
969            request.blocks.len(),
970            blocks_before_sozu_id + 1,
971            "the Sozu-Id correlation header must be pushed exactly once"
972        );
973
974        // Postcondition: the whole edit only ever added or elided headers —
975        // the block count is monotonically non-decreasing (a removed header
976        // is elided in place, never popped), so the backend never loses a
977        // header it should have seen.
978        debug_assert!(
979            request.blocks.len() >= blocks_at_entry,
980            "header editing must never drop a block from the request"
981        );
982    }
983
984    /// Callback for response:
985    ///
986    /// - edit headers (connection, set-cookie, sozu-id)
987    /// - save information:
988    ///   - status code
989    ///   - reason
990    ///   - back keep-alive
991    fn on_response_headers(&mut self, response: &mut GenericHttpStream) {
992        // Like the request path, response editing only adds or elides — pin
993        // the entry count so the postcondition can assert "blocks only grow".
994        let blocks_at_entry = response.blocks.len();
995
996        let buf = &mut response.storage.mut_buffer();
997
998        // Captures the response line
999        if let kawa::StatusLine::Response { code, reason, .. } = &response.detached.status_line {
1000            self.status = Some(*code);
1001            self.reason = reason
1002                .data_opt(buf)
1003                .and_then(|data| from_utf8(data).ok())
1004                .map(ToOwned::to_owned);
1005        }
1006
1007        if self.method == Some(Method::Head) {
1008            response.parsing_phase = kawa::ParsingPhase::Terminated;
1009        }
1010
1011        // If found:
1012        // - set Connection to "close" if closing is set
1013        // - set keep_alive_backend to false if Connection is "close"
1014        for block in &mut response.blocks {
1015            match block {
1016                kawa::Block::Header(header) if !header.is_elided() => {
1017                    let key = header.key.data(buf);
1018                    if compare_no_case(key, b"connection") {
1019                        if self.closing {
1020                            header.val = kawa::Store::Static(b"close");
1021                        } else {
1022                            let val = header.val.data(buf);
1023                            self.keep_alive_backend &= !compare_no_case(val, b"close");
1024                        }
1025                    }
1026                }
1027                _ => {}
1028            }
1029        }
1030
1031        // If the sticky_session is set and differs from the one found in the request
1032        // create a "Set-Cookie" header to update the sticky_name value
1033        if let Some(sticky_session) = &self.sticky_session {
1034            if self.sticky_session != self.sticky_session_found {
1035                let blocks_before = response.blocks.len();
1036                let mut cookie_buf =
1037                    Vec::with_capacity(self.sticky_name.len() + 1 + sticky_session.len() + 8);
1038                cookie_buf.extend_from_slice(self.sticky_name.as_bytes());
1039                cookie_buf.push(b'=');
1040                cookie_buf.extend_from_slice(sticky_session.as_bytes());
1041                cookie_buf.extend_from_slice(b"; Path=/");
1042                // The cookie value is `name=session; Path=/`, built from the
1043                // proxy-controlled sticky name + session id — assert it is
1044                // well-formed (contains the `=` separator, ends with the
1045                // attribute suffix) and CR/LF-free so it cannot inject an
1046                // extra Set-Cookie / split the response (CWE-113).
1047                debug_assert!(
1048                    cookie_buf.contains(&b'='),
1049                    "the Set-Cookie value must carry the name=value separator"
1050                );
1051                debug_assert!(
1052                    cookie_buf.ends_with(b"; Path=/"),
1053                    "the Set-Cookie value must end with the Path attribute"
1054                );
1055                debug_assert!(
1056                    is_crlf_free(&cookie_buf),
1057                    "the synthesised Set-Cookie value must be CR/LF-free (anti-injection)"
1058                );
1059                response.push_block(kawa::Block::Header(kawa::Pair {
1060                    key: kawa::Store::Static(b"Set-Cookie"),
1061                    val: kawa::Store::from_vec(cookie_buf),
1062                }));
1063                debug_assert_eq!(
1064                    response.blocks.len(),
1065                    blocks_before + 1,
1066                    "synthesising Set-Cookie must push exactly one block"
1067                );
1068            }
1069        }
1070
1071        // Create a custom correlation header (defaults to "Sozu-Id", can be
1072        // renamed via the `sozu_id_header` listener config knob).
1073        let blocks_before_sozu_id = response.blocks.len();
1074        response.push_block(kawa::Block::Header(kawa::Pair {
1075            key: kawa::Store::from_string(self.sozu_id_header.clone()),
1076            val: kawa::Store::from_string(self.id.to_string()),
1077        }));
1078        debug_assert_eq!(
1079            response.blocks.len(),
1080            blocks_before_sozu_id + 1,
1081            "the Sozu-Id correlation header must be pushed exactly once"
1082        );
1083
1084        // Postcondition: response editing only added or elided headers, so
1085        // the block count never decreased.
1086        debug_assert!(
1087            response.blocks.len() >= blocks_at_entry,
1088            "header editing must never drop a block from the response"
1089        );
1090    }
1091
1092    pub fn reset(&mut self) {
1093        // Snapshot the connection-scoped identity + TLS/listener fields that
1094        // reset() must NOT touch (set once at handshake, reused across every
1095        // keep-alive request). Cheap to copy — all `Copy`. Read only inside
1096        // the postcondition `debug_assert!`s below, so dead code in release.
1097        let session_id_before = self.session_id;
1098        let id_before = self.id;
1099        let strict_sni_before = self.strict_sni_binding;
1100        let elide_before = self.elide_x_real_ip;
1101        let send_before = self.send_x_real_ip;
1102        let tls_version_before = self.tls_version;
1103        let tls_cipher_before = self.tls_cipher;
1104        let tls_alpn_before = self.tls_alpn;
1105
1106        self.keep_alive_backend = true;
1107        self.keep_alive_frontend = true;
1108        self.sticky_session_found = None;
1109        self.method = None;
1110        self.authority = None;
1111        self.path = None;
1112        self.status = None;
1113        self.reason = None;
1114        self.user_agent = None;
1115        self.x_request_id = None;
1116        self.xff_chain = None;
1117        self.redirect_location = None;
1118        self.www_authenticate = None;
1119        self.original_authority = None;
1120        self.headers_response.clear();
1121        // Note: tls_server_name, tls_version, tls_cipher, tls_alpn,
1122        // strict_sni_binding, elide_x_real_ip, send_x_real_ip are
1123        // connection-scoped — set once at handshake completion and reused
1124        // across every keep-alive request, so reset() intentionally leaves
1125        // them in place.
1126
1127        // Post: request-scoped state is fully cleared (a stale value here
1128        // would leak across pipelined requests on the same connection).
1129        debug_assert!(
1130            self.method.is_none()
1131                && self.authority.is_none()
1132                && self.path.is_none()
1133                && self.status.is_none()
1134                && self.x_request_id.is_none()
1135                && self.headers_response.is_empty(),
1136            "reset() must clear all request-scoped state"
1137        );
1138        debug_assert!(
1139            self.keep_alive_backend && self.keep_alive_frontend,
1140            "reset() must restore keep-alive to its optimistic default"
1141        );
1142        // Post: connection-scoped identity + TLS/listener knobs are untouched.
1143        debug_assert_eq!(
1144            self.session_id, session_id_before,
1145            "reset() must preserve the connection session id"
1146        );
1147        debug_assert_eq!(self.id, id_before, "reset() must preserve the request id");
1148        debug_assert!(
1149            self.strict_sni_binding == strict_sni_before
1150                && self.elide_x_real_ip == elide_before
1151                && self.send_x_real_ip == send_before
1152                && self.tls_version == tls_version_before
1153                && self.tls_cipher == tls_cipher_before
1154                && self.tls_alpn == tls_alpn_before,
1155            "reset() must preserve connection-scoped TLS/listener knobs"
1156        );
1157    }
1158
1159    pub fn extract_route(&self) -> Result<(&str, &str, &Method), RetrieveClusterError> {
1160        let given_method = self.method.as_ref().ok_or(RetrieveClusterError::NoMethod)?;
1161        let given_authority = self
1162            .authority
1163            .as_deref()
1164            .ok_or(RetrieveClusterError::NoHost)?;
1165        let given_path = self.path.as_deref().ok_or(RetrieveClusterError::NoPath)?;
1166
1167        // Post: the triple is returned in (authority, path, method) order —
1168        // pin it against a future field-swap regression that would route to
1169        // the wrong cluster. (Both fields can be empty strings on the wire,
1170        // so we assert the mapping, not non-emptiness.)
1171        debug_assert!(
1172            std::ptr::eq(given_authority, self.authority.as_deref().unwrap())
1173                && std::ptr::eq(given_path, self.path.as_deref().unwrap()),
1174            "extract_route must return (authority, path, method) in order"
1175        );
1176        Ok((given_authority, given_path, given_method))
1177    }
1178
1179    pub fn get_route(&self) -> String {
1180        if let Some(method) = &self.method {
1181            if let Some(authority) = &self.authority {
1182                if let Some(path) = &self.path {
1183                    return format!("{method} {authority}{path}");
1184                }
1185                return format!("{method} {authority}");
1186            }
1187            return format!("{method}");
1188        }
1189        String::new()
1190    }
1191
1192    pub fn websocket_context(&self) -> WebSocketContext {
1193        WebSocketContext::Http {
1194            method: self.method.clone(),
1195            authority: self.authority.clone(),
1196            path: self.path.clone(),
1197            reason: self.reason.clone(),
1198            status: self.status,
1199        }
1200    }
1201
1202    pub fn log_context(&self) -> LogContext<'_> {
1203        let ctx = LogContext {
1204            session_id: self.session_id,
1205            request_id: Some(self.id),
1206            cluster_id: self.cluster_id.as_deref(),
1207            backend_id: self.backend_id.as_deref(),
1208        };
1209        // The access-log bracket `[session req cluster backend]` is keyed on
1210        // session + request id; both must always be present so the log line
1211        // is correlatable. cluster/backend are legitimately absent before
1212        // routing, so they are not asserted here.
1213        debug_assert!(
1214            ctx.request_id.is_some(),
1215            "log_context must always carry the request id for correlation"
1216        );
1217        ctx
1218    }
1219}
1220
1221#[cfg(test)]
1222mod tests {
1223    use super::*;
1224    use std::net::{IpAddr, Ipv4Addr, SocketAddr};
1225
1226    /// Helper to create a minimal HttpContext for testing.
1227    fn make_context() -> HttpContext {
1228        HttpContext::new(
1229            Ulid::generate(),
1230            Ulid::generate(),
1231            Protocol::HTTP,
1232            SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080),
1233            Some(SocketAddr::new(
1234                IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)),
1235                54321,
1236            )),
1237            "SERVERID".to_owned(),
1238            "Sozu-Id".to_owned(),
1239            false,
1240            false,
1241        )
1242    }
1243
1244    // ── sozu_id_header ──────────────────────────────────────────────────
1245
1246    #[test]
1247    fn test_sozu_id_header_default_name_stored_on_context() {
1248        // The make_context helper uses the documented default "Sozu-Id" to
1249        // match the trait default on `L7ListenerHandler::get_sozu_id_header`.
1250        let ctx = make_context();
1251        assert_eq!(ctx.sozu_id_header, "Sozu-Id");
1252    }
1253
1254    #[test]
1255    fn test_sozu_id_header_custom_name_stored_on_context() {
1256        // Operator-provided rename is carried verbatim onto the HttpContext.
1257        let ctx = HttpContext::new(
1258            Ulid::generate(),
1259            Ulid::generate(),
1260            Protocol::HTTP,
1261            SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080),
1262            None,
1263            "SERVERID".to_owned(),
1264            "X-Edge-Id".to_owned(),
1265            false,
1266            false,
1267        );
1268        assert_eq!(ctx.sozu_id_header, "X-Edge-Id");
1269    }
1270
1271    // ── extract_route ──────────────────────────────────────────────────
1272
1273    #[test]
1274    fn test_extract_route_all_present() {
1275        let mut ctx = make_context();
1276        ctx.method = Some(Method::Get);
1277        ctx.authority = Some("example.com".to_owned());
1278        ctx.path = Some("/index.html".to_owned());
1279
1280        let (authority, path, method) = ctx.extract_route().unwrap();
1281        assert_eq!(authority, "example.com");
1282        assert_eq!(path, "/index.html");
1283        assert_eq!(method, &Method::Get);
1284    }
1285
1286    #[test]
1287    fn test_extract_route_no_method() {
1288        let mut ctx = make_context();
1289        ctx.authority = Some("example.com".to_owned());
1290        ctx.path = Some("/".to_owned());
1291
1292        let err = ctx.extract_route().unwrap_err();
1293        assert!(matches!(err, RetrieveClusterError::NoMethod));
1294    }
1295
1296    #[test]
1297    fn test_extract_route_no_host() {
1298        let mut ctx = make_context();
1299        ctx.method = Some(Method::Get);
1300        ctx.path = Some("/".to_owned());
1301
1302        let err = ctx.extract_route().unwrap_err();
1303        assert!(matches!(err, RetrieveClusterError::NoHost));
1304    }
1305
1306    #[test]
1307    fn test_extract_route_no_path() {
1308        let mut ctx = make_context();
1309        ctx.method = Some(Method::Get);
1310        ctx.authority = Some("example.com".to_owned());
1311
1312        let err = ctx.extract_route().unwrap_err();
1313        assert!(matches!(err, RetrieveClusterError::NoPath));
1314    }
1315
1316    // ── get_route ──────────────────────────────────────────────────────
1317
1318    #[test]
1319    fn test_get_route_all_present() {
1320        let mut ctx = make_context();
1321        ctx.method = Some(Method::Get);
1322        ctx.authority = Some("example.com".to_owned());
1323        ctx.path = Some("/api/v1".to_owned());
1324
1325        assert_eq!(ctx.get_route(), "GET example.com/api/v1");
1326    }
1327
1328    #[test]
1329    fn test_get_route_method_and_authority_only() {
1330        let mut ctx = make_context();
1331        ctx.method = Some(Method::Post);
1332        ctx.authority = Some("example.com".to_owned());
1333
1334        assert_eq!(ctx.get_route(), "POST example.com");
1335    }
1336
1337    #[test]
1338    fn test_get_route_method_only() {
1339        let mut ctx = make_context();
1340        ctx.method = Some(Method::Delete);
1341
1342        assert_eq!(ctx.get_route(), "DELETE");
1343    }
1344
1345    #[test]
1346    fn test_get_route_empty() {
1347        let ctx = make_context();
1348        assert_eq!(ctx.get_route(), "");
1349    }
1350
1351    // ── reset ──────────────────────────────────────────────────────────
1352
1353    #[test]
1354    fn test_reset_clears_request_response_state() {
1355        let mut ctx = make_context();
1356        ctx.keep_alive_backend = false;
1357        ctx.keep_alive_frontend = false;
1358        ctx.sticky_session_found = Some("abc123".to_owned());
1359        ctx.method = Some(Method::Post);
1360        ctx.authority = Some("example.com".to_owned());
1361        ctx.path = Some("/upload".to_owned());
1362        ctx.status = Some(200);
1363        ctx.reason = Some("OK".to_owned());
1364        ctx.user_agent = Some("curl/7.81".to_owned());
1365        ctx.x_request_id = Some("client-xrid-123".to_owned());
1366        ctx.xff_chain = Some("203.0.113.5, 198.51.100.10".to_owned());
1367        ctx.redirect_location = Some("https://example.com/".to_owned());
1368        ctx.www_authenticate = Some("Basic realm=\"sozu\"".to_owned());
1369        ctx.original_authority = Some("old.example.com".to_owned());
1370        ctx.headers_response.push(HeaderEditSnapshot {
1371            key: b"X-Cache".to_vec(),
1372            val: b"HIT".to_vec(),
1373            mode: HeaderEditMode::Append,
1374        });
1375
1376        ctx.reset();
1377
1378        assert!(ctx.keep_alive_backend);
1379        assert!(ctx.keep_alive_frontend);
1380        assert!(ctx.sticky_session_found.is_none());
1381        assert!(ctx.method.is_none());
1382        assert!(ctx.authority.is_none());
1383        assert!(ctx.path.is_none());
1384        assert!(ctx.status.is_none());
1385        assert!(ctx.reason.is_none());
1386        assert!(ctx.user_agent.is_none());
1387        assert!(ctx.x_request_id.is_none());
1388        assert!(ctx.xff_chain.is_none());
1389        // The four stash slots written by the routing layer must clear
1390        // between pipelined H1 requests; otherwise a future code path that
1391        // emits a 301 / 401 default-answer without re-routing would
1392        // inherit a stale Location / WWW-Authenticate from a prior request,
1393        // or the backend would receive a stale X-Forwarded-Host or a stale
1394        // response-side header edit.
1395        assert!(ctx.redirect_location.is_none());
1396        assert!(ctx.www_authenticate.is_none());
1397        assert!(ctx.original_authority.is_none());
1398        assert!(ctx.headers_response.is_empty());
1399    }
1400
1401    #[test]
1402    fn test_reset_preserves_tls_metadata() {
1403        // TLS metadata is connection-scoped (set once at handshake, reused
1404        // across every keep-alive request) — reset() must leave it intact
1405        // so the access log of the second request still carries it.
1406        let mut ctx = make_context();
1407        ctx.tls_server_name = Some("example.com".to_owned());
1408        ctx.tls_version = Some("TLSv1.3");
1409        ctx.tls_cipher = Some("TLS_AES_128_GCM_SHA256");
1410        ctx.tls_alpn = Some("h2");
1411        ctx.strict_sni_binding = false;
1412
1413        ctx.reset();
1414
1415        assert_eq!(ctx.tls_server_name.as_deref(), Some("example.com"));
1416        assert_eq!(ctx.tls_version, Some("TLSv1.3"));
1417        assert_eq!(ctx.tls_cipher, Some("TLS_AES_128_GCM_SHA256"));
1418        assert_eq!(ctx.tls_alpn, Some("h2"));
1419        assert!(!ctx.strict_sni_binding);
1420    }
1421
1422    #[test]
1423    fn test_reset_preserves_connection_state() {
1424        let mut ctx = make_context();
1425        ctx.closing = true;
1426        ctx.cluster_id = Some("cluster-1".to_owned());
1427        ctx.backend_id = Some("backend-1".to_owned());
1428        ctx.sticky_session = Some("session-abc".to_owned());
1429
1430        let original_id = ctx.id;
1431        let original_protocol = ctx.protocol;
1432        let original_public_address = ctx.public_address;
1433
1434        ctx.reset();
1435
1436        // Connection-level state is preserved
1437        assert!(ctx.closing);
1438        assert_eq!(ctx.cluster_id.as_deref(), Some("cluster-1"));
1439        assert_eq!(ctx.backend_id.as_deref(), Some("backend-1"));
1440        assert_eq!(ctx.sticky_session.as_deref(), Some("session-abc"));
1441        assert_eq!(ctx.id, original_id);
1442        assert_eq!(ctx.protocol, original_protocol);
1443        assert_eq!(ctx.public_address, original_public_address);
1444    }
1445
1446    // ── write_forwarded_for_by (RFC 7239 §6 IP-literal bracketing) ──────
1447    //
1448    // Locks in the IPv6 bracket+quote contract that matches HAProxy
1449    // (`_7239_print_ip6` in `src/http_ext.c`) and prevents regression to
1450    // the ambiguous `for="2001:db8::1:8080"` form flagged in issue #1254.
1451
1452    use std::net::Ipv6Addr;
1453
1454    fn render_for_by(peer: SocketAddr, public_ip: IpAddr) -> String {
1455        let mut buf = Vec::new();
1456        write_forwarded_for_by(&mut buf, peer.ip(), peer.port(), public_ip);
1457        String::from_utf8(buf).expect("forwarded fragment must be ASCII")
1458    }
1459
1460    fn render_suffix(proto: &str, peer: SocketAddr, public_ip: IpAddr) -> String {
1461        let mut buf = Vec::new();
1462        write_forwarded_suffix(&mut buf, proto, peer.ip(), peer.port(), public_ip);
1463        String::from_utf8(buf).expect("forwarded fragment must be ASCII")
1464    }
1465
1466    #[test]
1467    fn test_forwarded_ipv4_peer_ipv4_by_unchanged() {
1468        // IPv4 must keep the pre-fix wire format: unquoted `by=`, bare IPv4 in
1469        // `for=`. Sōzu emits `for=` quoted unconditionally (HAProxy emits it
1470        // quoted only when a port is attached; we always attach a port).
1471        let peer = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 7)), 54321);
1472        let public_ip = IpAddr::V4(Ipv4Addr::new(198, 51, 100, 1));
1473        assert_eq!(
1474            render_for_by(peer, public_ip),
1475            r#";for="203.0.113.7:54321";by=198.51.100.1"#,
1476        );
1477    }
1478
1479    #[test]
1480    fn test_forwarded_ipv6_peer_brackets_disambiguate_port() {
1481        // Regression for issue #1254: without brackets, `for="2001:db8::1:8080"`
1482        // is ambiguous (could parse as `::1` + port `8080` or `:1:8080` literal).
1483        let peer = SocketAddr::new(IpAddr::V6("2001:db8::1".parse::<Ipv6Addr>().unwrap()), 8080);
1484        let public_ip = IpAddr::V4(Ipv4Addr::new(198, 51, 100, 1));
1485        assert_eq!(
1486            render_for_by(peer, public_ip),
1487            r#";for="[2001:db8::1]:8080";by=198.51.100.1"#,
1488        );
1489    }
1490
1491    #[test]
1492    fn test_forwarded_ipv6_public_address_bracketed_and_quoted() {
1493        // `by=` carries no port but RFC 7239 §6 still requires the IP-literal
1494        // brackets for IPv6, and the value must be quoted because the literal
1495        // contains `:`.
1496        let peer = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 7)), 54321);
1497        let public_ip = IpAddr::V6("2001:db8::2".parse::<Ipv6Addr>().unwrap());
1498        assert_eq!(
1499            render_for_by(peer, public_ip),
1500            r#";for="203.0.113.7:54321";by="[2001:db8::2]""#,
1501        );
1502    }
1503
1504    #[test]
1505    fn test_forwarded_ipv6_peer_and_public_both_bracketed() {
1506        let peer = SocketAddr::new(IpAddr::V6("2001:db8::1".parse::<Ipv6Addr>().unwrap()), 8080);
1507        let public_ip = IpAddr::V6("2001:db8::2".parse::<Ipv6Addr>().unwrap());
1508        assert_eq!(
1509            render_for_by(peer, public_ip),
1510            r#";for="[2001:db8::1]:8080";by="[2001:db8::2]""#,
1511        );
1512    }
1513
1514    #[test]
1515    fn test_forwarded_suffix_prepends_proto_and_brackets_ipv6() {
1516        // The suffix form is what we append when an inbound `Forwarded`
1517        // header already exists. Same bracketing contract must hold.
1518        let peer = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 443);
1519        let public_ip = IpAddr::V6("fe80::1".parse::<Ipv6Addr>().unwrap());
1520        assert_eq!(
1521            render_suffix("https", peer, public_ip),
1522            r#", proto=https;for="[::1]:443";by="[fe80::1]""#,
1523        );
1524    }
1525
1526    // ── traceparent / opentelemetry helpers ─────────────────────────────
1527
1528    #[cfg(feature = "opentelemetry")]
1529    mod otel {
1530        use super::super::*;
1531
1532        #[test]
1533        fn test_parse_hex_valid() {
1534            let (val, rest) = parse_hex::<4>(b"abcd1234").unwrap();
1535            assert_eq!(&val, b"abcd");
1536            assert_eq!(rest, b"1234");
1537        }
1538
1539        #[test]
1540        fn test_parse_hex_exact_length() {
1541            let (val, rest) = parse_hex::<8>(b"01234567").unwrap();
1542            assert_eq!(&val, b"01234567");
1543            assert!(rest.is_empty());
1544        }
1545
1546        #[test]
1547        fn test_parse_hex_too_short() {
1548            assert!(parse_hex::<4>(b"ab").is_none());
1549        }
1550
1551        #[test]
1552        fn test_parse_hex_rejects_non_hex() {
1553            assert!(parse_hex::<4>(b"ghij").is_none());
1554        }
1555
1556        #[test]
1557        fn test_parse_hex_rejects_uppercase_is_ok() {
1558            // Uppercase hex digits are valid
1559            let (val, _) = parse_hex::<4>(b"ABCD").unwrap();
1560            assert_eq!(&val, b"ABCD");
1561        }
1562
1563        #[test]
1564        fn test_skip_separator_valid() {
1565            let rest = skip_separator(b"-hello").unwrap();
1566            assert_eq!(rest, b"hello");
1567        }
1568
1569        #[test]
1570        fn test_skip_separator_wrong_char() {
1571            assert!(skip_separator(b"+hello").is_none());
1572        }
1573
1574        #[test]
1575        fn test_skip_separator_empty() {
1576            assert!(skip_separator(b"").is_none());
1577        }
1578
1579        #[test]
1580        fn test_build_traceparent_format() {
1581            let trace_id: [u8; 32] = *b"4bf92f3577b34da6a3ce929d0e0e4736";
1582            let parent_id: [u8; 16] = *b"00f067aa0ba902b7";
1583
1584            let result = build_traceparent(&trace_id, &parent_id);
1585            assert_eq!(
1586                &result,
1587                b"00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"
1588            );
1589        }
1590
1591        #[test]
1592        fn test_build_traceparent_length() {
1593            let trace_id = [b'a'; 32];
1594            let parent_id = [b'b'; 16];
1595            let result = build_traceparent(&trace_id, &parent_id);
1596            // Format: "00-" (3) + trace_id (32) + "-" (1) + parent_id (16) + "-01" (3) = 55
1597            assert_eq!(result.len(), 55);
1598        }
1599
1600        #[test]
1601        fn test_parse_traceparent_valid() {
1602            let input = b"00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
1603            let store = kawa::Store::Static(input);
1604            let (trace_id, parent_id) = parse_traceparent(&store, input).unwrap();
1605            assert_eq!(&trace_id, b"4bf92f3577b34da6a3ce929d0e0e4736");
1606            assert_eq!(&parent_id, b"00f067aa0ba902b7");
1607        }
1608
1609        #[test]
1610        fn test_parse_traceparent_sampled_flag_zero() {
1611            let input = b"00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-00";
1612            let store = kawa::Store::Static(input);
1613            let result = parse_traceparent(&store, input);
1614            assert!(result.is_some());
1615        }
1616
1617        #[test]
1618        fn test_parse_traceparent_wrong_version() {
1619            let input = b"01-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
1620            let store = kawa::Store::Static(input);
1621            assert!(parse_traceparent(&store, input).is_none());
1622        }
1623
1624        #[test]
1625        fn test_parse_traceparent_too_short() {
1626            let input = b"00-4bf9";
1627            let store = kawa::Store::Static(input);
1628            assert!(parse_traceparent(&store, input).is_none());
1629        }
1630
1631        #[test]
1632        fn test_parse_traceparent_trailing_data() {
1633            // Extra characters after the trace-flags should cause rejection
1634            let input = b"00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01-extra";
1635            let store = kawa::Store::Static(input);
1636            assert!(parse_traceparent(&store, input).is_none());
1637        }
1638
1639        #[test]
1640        fn test_parse_traceparent_missing_separator() {
1641            let input = b"004bf92f3577b34da6a3ce929d0e0e473600f067aa0ba902b701";
1642            let store = kawa::Store::Static(input);
1643            assert!(parse_traceparent(&store, input).is_none());
1644        }
1645
1646        #[test]
1647        fn test_parse_build_roundtrip() {
1648            let trace_id: [u8; 32] = *b"4bf92f3577b34da6a3ce929d0e0e4736";
1649            let parent_id: [u8; 16] = *b"00f067aa0ba902b7";
1650
1651            // Build a traceparent from known IDs
1652            let built = build_traceparent(&trace_id, &parent_id);
1653
1654            // Verify that the built value matches the expected static string,
1655            // then parse that static string back to confirm roundtrip.
1656            let expected: &[u8] = b"00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
1657            assert_eq!(&built[..], expected);
1658
1659            let store = kawa::Store::Static(expected);
1660            let (parsed_trace_id, parsed_parent_id) = parse_traceparent(&store, expected).unwrap();
1661
1662            assert_eq!(parsed_trace_id, trace_id);
1663            assert_eq!(parsed_parent_id, parent_id);
1664        }
1665    }
1666}