Skip to main content

sozu_lib/protocol/kawa_h1/
editor.rs

1//! H1 request/response header editor.
2//!
3//! Captures method/authority/path on parse, rewrites hop-by-hop and
4//! forwarding headers (`X-Forwarded-*`, `Forwarded`, `Connection`,
5//! WebSocket-upgrade signalling, optional `traceparent`), and surfaces the
6//! canonical `LogContext` used by the access-log envelope. Acts as the
7//! Kawa `ParserCallbacks` implementation for the H1 mux path.
8
9use std::{
10    io::Write as _,
11    net::{IpAddr, SocketAddr},
12    str::from_utf8,
13    sync::Arc,
14};
15
16use rusty_ulid::Ulid;
17use sozu_command_lib::logging::LogContext;
18
19use crate::metrics::names;
20use crate::{
21    Protocol, RetrieveClusterError,
22    pool::Checkout,
23    protocol::{
24        http::{GenericHttpStream, Method, parser::compare_no_case},
25        pipe::WebSocketContext,
26    },
27};
28
29#[cfg(feature = "opentelemetry")]
30fn parse_traceparent(val: &kawa::Store, buf: &[u8]) -> Option<([u8; 32], [u8; 16])> {
31    let val = val.data(buf);
32    let (version, val) = parse_hex::<2>(val)?;
33    if version.as_slice() != b"00" {
34        return None;
35    }
36    let val = skip_separator(val)?;
37    let (trace_id, val) = parse_hex::<32>(val)?;
38    let val = skip_separator(val)?;
39    let (parent_id, val) = parse_hex::<16>(val)?;
40    let val = skip_separator(val)?;
41    let (_, val) = parse_hex::<2>(val)?;
42    val.is_empty().then_some((trace_id, parent_id))
43}
44
45#[cfg(feature = "opentelemetry")]
/// Split `N` leading ASCII hex digits off the front of `buf`.
///
/// Returns the digits (as raw ASCII bytes, either case) together with the
/// remaining input, or `None` when `buf` holds fewer than `N` bytes or one
/// of the first `N` bytes is not an ASCII hex digit.
fn parse_hex<const N: usize>(buf: &[u8]) -> Option<([u8; N], &[u8])> {
    if buf.len() < N {
        return None;
    }
    let (head, rest) = buf.split_at(N);
    if !head.iter().all(u8::is_ascii_hexdigit) {
        return None;
    }
    let mut digits = [0u8; N];
    digits.copy_from_slice(head);
    Some((digits, rest))
}
52
53#[cfg(feature = "opentelemetry")]
/// Consume a single leading `-` from `buf`.
///
/// Returns the input that follows the dash, or `None` when `buf` is empty
/// or starts with any other byte.
fn skip_separator(buf: &[u8]) -> Option<&[u8]> {
    match buf {
        [b'-', rest @ ..] => Some(rest),
        _ => None,
    }
}
57
58#[cfg(feature = "opentelemetry")]
59fn random_id<const N: usize>() -> [u8; N] {
60    use rand::RngExt;
61    const CHARSET: &[u8] = b"0123456789abcdef";
62    let mut rng = rand::rng();
63    let mut buf = [0; N];
64    buf.fill_with(|| {
65        let n = rng.random_range(0..CHARSET.len());
66        CHARSET[n]
67    });
68    buf
69}
70
71#[cfg(feature = "opentelemetry")]
/// Assemble a version-`00` `traceparent` header value.
///
/// The result is the 55-byte ASCII string
/// `00-<trace_id>-<parent_id>-01`; the trailing flags field is always
/// written as `01`.
fn build_traceparent(trace_id: &[u8; 32], parent_id: &[u8; 16]) -> [u8; 55] {
    // 3 + 32 + 1 + 16 + 3 = 55 bytes, so the chain fills the array exactly.
    let fields = b"00-"
        .iter()
        .chain(trace_id)
        .chain(b"-")
        .chain(parent_id)
        .chain(b"-01");

    let mut header = [0u8; 55];
    for (slot, byte) in header.iter_mut().zip(fields) {
        *slot = *byte;
    }
    header
}
81
82/// Write the ";for=..;by=.." portion of a Forwarded header into `buf`
83/// without heap-allocating a `String`.
84///
85/// RFC 7239 §6 requires IPv6 literals to be enclosed in square brackets
86/// inside a quoted string (the `IP-literal` production is `"[" IPv6address "]"`).
87/// Emitting a bare IPv6 like `for="2001:db8::1:8080"` is ambiguous: the
88/// trailing `:1:8080` could parse as `::1` + port `8080` or as the literal
89/// `:1:8080` with no port. Matches HAProxy's behaviour
90/// (`_7239_print_ip6` in `src/http_ext.c`), which is the de-facto reference
91/// implementation of RFC 7239 in the proxy world. See sozu issue #1254.
92fn write_forwarded_for_by(buf: &mut Vec<u8>, peer_ip: IpAddr, peer_port: u16, public_ip: IpAddr) {
93    buf.extend_from_slice(b";for=\"");
94    write_ip_literal(buf, peer_ip);
95    buf.push(b':');
96    let mut port_buf = itoa::Buffer::new();
97    buf.extend_from_slice(port_buf.format(peer_port).as_bytes());
98    buf.extend_from_slice(b"\";by=");
99    match public_ip {
100        IpAddr::V4(_) => {
101            let _ = write!(buf, "{public_ip}");
102        }
103        IpAddr::V6(_) => {
104            buf.push(b'"');
105            write_ip_literal(buf, public_ip);
106            buf.push(b'"');
107        }
108    }
109}
110
/// Append `ip` to `buf` as an `IP-literal` (RFC 3986 §3.2.2 / RFC 7239 §6).
///
/// IPv4 addresses are written verbatim; IPv6 addresses are wrapped in
/// `[...]`. No quotes are added: the caller supplies whatever quoting the
/// surrounding syntax needs (RFC 7239 mandates quotes around IPv6).
fn write_ip_literal(buf: &mut Vec<u8>, ip: IpAddr) {
    // Writing into a `Vec<u8>` cannot fail, hence the discarded results.
    match ip {
        IpAddr::V4(v4) => {
            let _ = write!(buf, "{v4}");
        }
        IpAddr::V6(v6) => {
            let _ = write!(buf, "[{v6}]");
        }
    }
}
126
127/// Write ", proto=<proto>;for=..;by=.." (the suffix appended to an existing Forwarded value).
128fn write_forwarded_suffix(
129    buf: &mut Vec<u8>,
130    proto: &str,
131    peer_ip: IpAddr,
132    peer_port: u16,
133    public_ip: IpAddr,
134) {
135    buf.extend_from_slice(b", proto=");
136    buf.extend_from_slice(proto.as_bytes());
137    write_forwarded_for_by(buf, peer_ip, peer_port, public_ip);
138}
139
/// Per-request state shared between the session and the Kawa parser
/// callbacks.
///
/// The "Write only" fields are filled in by the callbacks while headers are
/// parsed; the "Read only" fields are inputs the callbacks consult when
/// rewriting headers.
#[derive(Debug)]
pub struct HttpContext {
    // ========== Write only
    /// set to false if Kawa finds a "Connection" header with a "close" value in the response
    pub keep_alive_backend: bool,
    /// set to false if Kawa finds a "Connection" header with a "close" value in the request
    pub keep_alive_frontend: bool,
    /// the value of the sticky session cookie in the request
    pub sticky_session_found: Option<String>,
    // ---------- Status Line
    /// the value of the method in the request line
    pub method: Option<Method>,
    /// the value of the authority of the request (in the request line or "Host" header)
    pub authority: Option<String>,
    /// the value of the path in the request line
    pub path: Option<String>,
    /// the value of the status code in the response line
    pub status: Option<u16>,
    /// the value of the reason in the response line
    pub reason: Option<String>,
    // ---------- Additional optional data
    /// the value of the `User-Agent` request header, when present and valid
    /// UTF-8
    pub user_agent: Option<String>,
    /// Value of the `x-request-id` header observed (if propagated from the
    /// client/upstream LB) or generated (from `self.id`). Universal correlation
    /// header — populated unconditionally by `on_request_headers` so the access
    /// log can record the exact value forwarded to the backend.
    pub x_request_id: Option<String>,
    /// Verbatim value of the client-supplied `X-Forwarded-For` header as
    /// observed before Sōzu appended its own hop. Captured here, not at
    /// request edit time, so the access log records the upstream-attested
    /// chain even when Sōzu also appends its own peer to the forwarded
    /// header. `None` if the request had no `X-Forwarded-For` header.
    pub xff_chain: Option<String>,

    /// OpenTelemetry trace context for the request; only present when the
    /// `opentelemetry` feature is enabled.
    /// NOTE(review): presumably stored by the `traceparent` handling in
    /// `on_request_headers` — confirm against the rest of the file.
    #[cfg(feature = "opentelemetry")]
    pub otel: Option<sozu_command::logging::OpenTelemetry>,

    // ========== Read only
    /// signals whether Kawa should write a "Connection" header with a "close" value (request and response)
    pub closing: bool,
    /// Connection/session ULID — stable across all requests multiplexed on this
    /// TCP or TLS connection. Used as the first slot in the legacy log-context
    /// bracket `[session req cluster backend]` and emitted into
    /// `ProtobufAccessLog.session_id`.
    pub session_id: Ulid,
    /// the value of the custom header, named "Sozu-Id", that Kawa should write (request and response)
    pub id: Ulid,
    /// identifier of the backend serving this request, if any
    /// NOTE(review): the writer is not visible in this part of the file —
    /// confirm who sets it.
    pub backend_id: Option<String>,
    /// identifier of the cluster this request was routed to, if any
    /// NOTE(review): the writer is not visible in this part of the file —
    /// confirm who sets it.
    pub cluster_id: Option<String>,
    /// the value of the protocol Kawa should write in the Forwarded headers of the request
    pub protocol: Protocol,
    /// the value of the public address Kawa should write in the Forwarded headers of the request
    pub public_address: SocketAddr,
    /// the value of the session address Kawa should write in the Forwarded headers of the request
    pub session_address: Option<SocketAddr>,
    /// the name of the cookie Kawa should read from the request to get the sticky session
    pub sticky_name: String,
    /// the sticky session that should be used
    /// used to create a "Set-Cookie" header in the response in case it differs from sticky_session_found
    pub sticky_session: Option<String>,
    /// the address of the backend server
    pub backend_address: Option<SocketAddr>,
    /// The TLS Server Name Indication (SNI) hostname negotiated at handshake.
    ///
    /// Populated for HTTPS listeners when the client sent an SNI extension (see
    /// `https.rs::upgrade_handshake`). Used by the routing layer to enforce the
    /// TLS trust boundary against the HTTP `:authority` / `Host` header — without
    /// this check, an attacker holding a valid certificate for tenant A could
    /// open TLS with SNI=A then send requests with `:authority=tenantB` and
    /// reach tenant B's backend (CWE-346 / CWE-444).
    ///
    /// `None` when the listener is plaintext HTTP or the client omitted SNI.
    /// Stored pre-lowercased and without a port for direct exact-match comparison.
    pub tls_server_name: Option<String>,
    /// Snapshot of the SAN set of the certificate Sōzu actually served at
    /// the TLS handshake. Captured once in `https.rs::upgrade_handshake`
    /// from the resolver and frozen for the connection lifetime so H2
    /// stream coalescing (RFC 7540 §9.1.1 / RFC 9113 §9.1.1) accepts any
    /// `:authority` covered by the certificate, with RFC 6125 §6.4.3
    /// wildcard handling. `None` for plaintext listeners or when SNI was
    /// absent (router falls back to the legacy exact-match predicate).
    /// `Some(empty)` when the default cert was served — every
    /// `:authority` is rejected. `Arc` so the snapshot is shared across
    /// every per-stream `HttpContext` without re-allocation.
    pub tls_cert_names: Option<Arc<Vec<String>>>,
    /// Whether the router must reject this request when `tls_server_name`
    /// does not exact-match its authority (CWE-346 / CWE-444). Mirrors
    /// `HttpsListenerConfig::strict_sni_binding`. Set from the mux
    /// `Context` at stream creation time (see `Context::create_stream`).
    /// Plaintext listeners still never hit the check because
    /// `tls_server_name` is `None`.
    pub strict_sni_binding: bool,
    /// When `true`, the request-side block walk in `on_request_headers`
    /// strips any client-supplied `X-Real-IP` header before forwarding
    /// (anti-spoofing). Mirrors `HttpListenerConfig::elide_x_real_ip` /
    /// `HttpsListenerConfig::elide_x_real_ip`. Set from the mux `Context`
    /// at stream creation (see `Context::create_stream`); listener-scoped
    /// and never reset across keep-alive requests. Independent of
    /// `send_x_real_ip`. The same flag is plumbed into
    /// `pkawa::handle_trailer` so trailer HEADERS frames cannot bypass
    /// the elision.
    pub elide_x_real_ip: bool,
    /// When `true`, `on_request_headers` injects a proxy-generated
    /// `X-Real-IP` header carrying `session_address.ip()` (post-PROXY-v2
    /// unwrap, i.e. the original client IP). Mirrors
    /// `HttpListenerConfig::send_x_real_ip` /
    /// `HttpsListenerConfig::send_x_real_ip`. Set from the mux `Context`
    /// at stream creation (see `Context::create_stream`); listener-scoped
    /// and never reset across keep-alive requests. Independent of
    /// `elide_x_real_ip`. When `session_address` is `None` (raw socket
    /// without a peer), no header is appended — identical to the
    /// existing X-Forwarded-For / Forwarded synthesis behaviour.
    pub send_x_real_ip: bool,
    /// Negotiated TLS protocol version as a short label (e.g. `"TLSv1.3"`).
    /// Captured from `rustls_version_label` at handshake completion and
    /// propagated from the mux `Context`. `None` for plaintext listeners.
    pub tls_version: Option<&'static str>,
    /// Negotiated TLS cipher suite as a short label (e.g.
    /// `"TLS_AES_128_GCM_SHA256"`). Captured from `rustls_ciphersuite_label`
    /// at handshake completion and propagated from the mux `Context`. `None`
    /// for plaintext listeners.
    pub tls_cipher: Option<&'static str>,
    /// Negotiated ALPN protocol (e.g. `"h2"`, `"http/1.1"`). Captured from
    /// rustls at handshake completion and propagated from the mux `Context`.
    /// `None` for plaintext listeners or when no ALPN was negotiated.
    pub tls_alpn: Option<&'static str>,
    /// Name of the correlation header Sozu injects into every request and
    /// response. Defaults to `"Sozu-Id"` via [`L7ListenerHandler::get_sozu_id_header`].
    /// Populated at stream creation from the listener config's `sozu_id_header`
    /// knob. Stored as an owned `String` so it survives a listener hot-reload
    /// that changes the value.
    pub sozu_id_header: String,
    /// Resolved `Location` URL stashed by the routing layer when a frontend
    /// triggers a permanent redirect (`RedirectPolicy::PERMANENT` or the
    /// legacy `cluster.https_redirect`). Read by the default-answer 301
    /// path so the response carries the correct target URL — including
    /// optional `cluster.https_redirect_port` and rewrite-template captures.
    /// `None` when the request is not redirecting.
    pub redirect_location: Option<String>,
    /// `WWW-Authenticate` realm stashed by the routing layer when a
    /// frontend rejects an unauthenticated request (`required_auth = true`
    /// without a valid `Authorization: Basic` header, or
    /// `RedirectPolicy::UNAUTHORIZED`). Read by the default-answer 401
    /// path so the response carries the cluster's configured
    /// `www_authenticate` value. `None` falls back to template default
    /// (header is elided when no realm is configured).
    pub www_authenticate: Option<String>,
    /// Original authority captured before a rewrite-host fired; emitted
    /// back to the backend as `X-Forwarded-Host` so the backend can
    /// reconstruct the public URL even though `:authority` / `Host` was
    /// rewritten on the wire. `None` when no host rewrite happened.
    pub original_authority: Option<String>,
    /// Per-frontend response-side header edits (set/replace/delete)
    /// stashed by the routing layer for the emission boundary in
    /// `mux/h1.rs::writable` and `mux/h2.rs::write_streams` to apply
    /// before `kawa.prepare(...)`. Empty when the frontend has no
    /// response-side header policy. An entry with an empty `val`
    /// deletes the header by name (HAProxy `del-header` parity); a
    /// non-empty `val` sets/replaces it.
    pub headers_response: Vec<HeaderEditSnapshot>,
    /// Resolved `Retry-After` value (seconds) for an HTTP 429 default
    /// answer. Computed in `Router::connect` when the per-(cluster,
    /// source-IP) connection limit is hit, by folding the cluster's
    /// `retry_after` override over the global default. `None` (or
    /// `Some(0)`) tells the answer engine to omit the `Retry-After`
    /// header entirely — `Retry-After: 0` invites an immediate retry
    /// that defeats the limit. Unused for any other status code.
    pub retry_after_seconds: Option<u32>,
    /// Frontend-supplied template body that overrides the listener /
    /// cluster default `http.301.redirection` for a single
    /// `RedirectPolicy::PERMANENT` request. Stashed by the routing layer
    /// from `RouteResult::redirect_template` and consumed by the 301
    /// branch of `mux::answers::set_default_answer_with_retry_after`,
    /// which compiles the body via `HttpAnswers::render_inline_301` and
    /// renders it with the same `(REDIRECT_LOCATION, ROUTE,
    /// REQUEST_ID)` variable schema as the persistent template chain.
    /// `None` falls back to the cluster / listener default. Unused for
    /// any other status code or for the legacy
    /// `cluster.https_redirect = true` path (which never sets it).
    pub frontend_redirect_template: Option<String>,
    /// Resolved redirect status code stashed by the routing layer when
    /// a frontend's `RedirectPolicy` is one of the redirect variants.
    /// 301 = `Permanent`, 302 = `Found`, 308 = `PermanentRedirect`.
    /// `None` falls back to 301 for the legacy
    /// `cluster.https_redirect = true` path. Closes #1009.
    pub redirect_status: Option<u16>,
    /// Stable, structured discriminator surfaced as the access-log
    /// `message` field when the session terminates on a timeout. Set by
    /// the timeout handlers in `kawa_h1::timeout` and `MuxState::timeout`
    /// **before** the default-answer or `forcefully_terminate_answer`
    /// path consumes it. The vocabulary is operator-visible API once
    /// shipped — see the access-log section of `doc/configure.md` for
    /// the full token list (`client_timeout`,
    /// `client_timeout_during_response`, `backend_timeout`,
    /// `backend_response_timeout`). `None` for non-timeout sessions, in
    /// which case the access log emits `message: None` as before.
    pub access_log_message: Option<&'static str>,
}
339
/// How `apply_response_header_edits` should interpret a per-edit value.
///
/// The implicit empty-`val` Append → delete encoding is still supported
/// (so legacy operator-supplied `[[...frontends.headers]]` entries work
/// unchanged); the explicit modes give typed policies finer control:
///
/// - `Append`: an entry with an empty `val` deletes the header (the
///   legacy delete shortcut); every other entry is appended before the
///   end-of-headers flag.
/// - `SetIfAbsent`: skip the insert when `kawa.blocks` already carries
///   a non-elided header with the same name (case-insensitive). HSTS
///   uses this by default to preserve a backend-supplied
///   `Strict-Transport-Security` (RFC 6797 §6.1 single-header
///   requirement).
/// - `Set`: delete every existing header with the matching name, then
///   insert the new entry. Use when the operator wants their typed
///   policy to override any backend-supplied value (the
///   `force_replace_backend = true` HSTS shape, for example).
///
/// NOTE(review): `apply_response_header_edits` is not defined in the
/// visible part of this file; the semantics above restate the existing
/// comments and should be confirmed against that function.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum HeaderEditMode {
    /// Append the header before the end-of-headers flag. Empty `val`
    /// is interpreted as a delete (legacy behaviour preserved). This is
    /// the `Default` variant.
    #[default]
    Append,
    /// Skip the insert if `kawa.blocks` already contains a non-elided
    /// header whose name matches `key` case-insensitively. Otherwise
    /// behave like `Append`.
    SetIfAbsent,
    /// Delete every existing header with the matching name, then
    /// insert the new value. Equivalent to two operator-defined edits
    /// (delete + append) but safer to express as one typed entry.
    Set,
}
372
/// Owned snapshot of a per-frontend header edit, captured at routing
/// time so the emission boundary can apply set/replace/delete without
/// touching the routing layer's `Rc<[HeaderEdit]>` slices.
///
/// `mode` chooses between the `Append`, `SetIfAbsent` and `Set`
/// semantics of [`HeaderEditMode`]. For backwards compatibility
/// `mode = Append` paired with an empty `val` is still treated as a
/// delete by `apply_response_header_edits` (HAProxy `del-header`
/// parity), so callers that rely on the empty-value encoding keep
/// working unchanged.
///
/// NOTE(review): this comment previously referred to an explicit
/// `HeaderEditMode::Delete`; the enum has no such variant, so deletion is
/// only expressible through the empty-`val` encoding.
#[derive(Debug, Clone)]
pub struct HeaderEditSnapshot {
    /// Header name, as raw bytes.
    pub key: Vec<u8>,
    /// Header value, as raw bytes; empty means "delete" under `Append`.
    pub val: Vec<u8>,
    /// How the edit is applied relative to headers already present.
    pub mode: HeaderEditMode,
}
388
389impl kawa::h1::ParserCallbacks<Checkout> for HttpContext {
390    fn on_headers(&mut self, stream: &mut GenericHttpStream) {
391        match stream.kind {
392            kawa::Kind::Request => self.on_request_headers(stream),
393            kawa::Kind::Response => self.on_response_headers(stream),
394        }
395    }
396}
397
398impl HttpContext {
399    /// Creates a new instance
400    #[allow(clippy::too_many_arguments)]
401    pub fn new(
402        session_id: Ulid,
403        request_id: Ulid,
404        protocol: Protocol,
405        public_address: SocketAddr,
406        session_address: Option<SocketAddr>,
407        sticky_name: String,
408        sozu_id_header: String,
409        elide_x_real_ip: bool,
410        send_x_real_ip: bool,
411    ) -> Self {
412        Self {
413            session_id,
414            id: request_id,
415            backend_id: None,
416            cluster_id: None,
417
418            closing: false,
419            keep_alive_backend: true,
420            keep_alive_frontend: true,
421            protocol,
422            public_address,
423            session_address,
424            sticky_name,
425            sticky_session: None,
426            sticky_session_found: None,
427
428            method: None,
429            authority: None,
430            path: None,
431            status: None,
432            reason: None,
433            user_agent: None,
434            x_request_id: None,
435            xff_chain: None,
436
437            #[cfg(feature = "opentelemetry")]
438            otel: Default::default(),
439
440            backend_address: None,
441            tls_server_name: None,
442            tls_cert_names: None,
443            strict_sni_binding: true,
444            elide_x_real_ip,
445            send_x_real_ip,
446            tls_version: None,
447            tls_cipher: None,
448            tls_alpn: None,
449            sozu_id_header,
450            redirect_location: None,
451            www_authenticate: None,
452            original_authority: None,
453            headers_response: Vec::new(),
454            retry_after_seconds: None,
455            frontend_redirect_template: None,
456            redirect_status: None,
457            access_log_message: None,
458        }
459    }
460
461    /// Callback for request:
462    ///
463    /// - edit headers (connection, forwarded, sticky cookie, sozu-id,
464    ///   x-request-id, x-real-ip)
465    /// - save information:
466    ///   - method
467    ///   - authority
468    ///   - path
469    ///   - front keep-alive
470    ///   - sticky cookie
471    ///   - user-agent
472    ///   - x-request-id (preserved if present, else derived from `self.id`)
473    fn on_request_headers(&mut self, request: &mut GenericHttpStream) {
474        let buf = request.storage.mut_buffer();
475
476        // Captures the request line
477        if let kawa::StatusLine::Request {
478            method,
479            authority,
480            path,
481            ..
482        } = &request.detached.status_line
483        {
484            self.method = method.data_opt(buf).map(Method::new);
485            self.authority = authority
486                .data_opt(buf)
487                .and_then(|data| from_utf8(data).ok())
488                .map(ToOwned::to_owned);
489            self.path = path
490                .data_opt(buf)
491                .and_then(|data| from_utf8(data).ok())
492                .map(ToOwned::to_owned);
493        }
494
495        // if self.method == Some(Method::Get) && request.body_size == kawa::BodySize::Empty {
496        //     request.parsing_phase = kawa::ParsingPhase::Terminated;
497        // }
498
499        let public_ip = self.public_address.ip();
500        let public_port = self.public_address.port();
501        let proto = match self.protocol {
502            Protocol::HTTP => "http",
503            Protocol::HTTPS => "https",
504            _ => unreachable!(),
505        };
506
507        // Find and remove the sticky_name cookie
508        // if found its value is stored in sticky_session_found
509        for cookie in &mut request.detached.jar {
510            let key = cookie.key.data(buf);
511            if key == self.sticky_name.as_bytes() {
512                let val = cookie.val.data(buf);
513                self.sticky_session_found = from_utf8(val).ok().map(ToOwned::to_owned);
514                cookie.elide();
515            }
516        }
517
518        // If found:
519        // - set Connection to "close" if closing is set
520        // - set keep_alive_frontend to false if Connection is "close"
521        // - update value of X-Forwarded-Proto
522        // - update value of X-Forwarded-Port
523        // - store X-Forwarded-For
524        // - store Forwarded
525        // - store User-Agent
526        let mut x_for = None;
527        let mut forwarded = None;
528        let mut has_x_port = false;
529        let mut has_x_proto = false;
530        let mut has_x_request_id = false;
531        let mut has_connection = false;
532        #[cfg(feature = "opentelemetry")]
533        let mut traceparent: Option<&mut kawa::Pair> = None;
534        #[cfg(feature = "opentelemetry")]
535        let mut tracestate: Option<&mut kawa::Pair> = None;
536        for block in &mut request.blocks {
537            match block {
538                kawa::Block::Header(header) if !header.is_elided() => {
539                    let key = header.key.data(buf);
540                    if compare_no_case(key, b"connection") {
541                        has_connection = true;
542                        if self.closing {
543                            header.val = kawa::Store::Static(b"close");
544                        } else {
545                            let val = header.val.data(buf);
546                            self.keep_alive_frontend &= !compare_no_case(val, b"close");
547                        }
548                    } else if compare_no_case(key, b"X-Forwarded-Proto") {
549                        has_x_proto = true;
550                        // header.val = kawa::Store::Static(proto.as_bytes());
551                        incr!(names::http::TRUSTING_X_PROTO);
552                        let val = header.val.data(buf);
553                        if !compare_no_case(val, proto.as_bytes()) {
554                            incr!(names::http::TRUSTING_X_PROTO_DIFF);
555                            debug!(
556                                "{} Trusting X-Forwarded-Proto for {:?} even though {:?} != {}",
557                                self.log_context(),
558                                self.authority,
559                                val,
560                                proto
561                            );
562                        }
563                    } else if compare_no_case(key, b"X-Forwarded-Port") {
564                        has_x_port = true;
565                        // header.val = kawa::Store::from_string(public_port.to_string());
566                        incr!(names::http::TRUSTING_X_PORT);
567                        let val = header.val.data(buf);
568                        let mut port_buf = itoa::Buffer::new();
569                        let expected = port_buf.format(public_port);
570                        if !compare_no_case(val, expected.as_bytes()) {
571                            incr!(names::http::TRUSTING_X_PORT_DIFF);
572                            debug!(
573                                "{} Trusting X-Forwarded-Port for {:?} even though {:?} != {}",
574                                self.log_context(),
575                                self.authority,
576                                val,
577                                expected
578                            );
579                        }
580                    } else if compare_no_case(key, b"X-Forwarded-For") {
581                        // Snapshot the upstream-attested chain before we
582                        // potentially append our own peer below — the access
583                        // log records the value the client/upstream LB
584                        // forwarded, not the rewritten value Sōzu emits.
585                        self.xff_chain = header
586                            .val
587                            .data_opt(buf)
588                            .and_then(|data| from_utf8(data).ok())
589                            .map(ToOwned::to_owned);
590                        x_for = Some(header);
591                    } else if compare_no_case(key, b"X-Real-IP") && self.elide_x_real_ip {
592                        // Anti-spoofing: a client cannot supply its own
593                        // `X-Real-IP` and have it reach the backend. The
594                        // proxy-injected value (when `send_x_real_ip` is
595                        // also set) is appended after this loop. H2 trailer
596                        // HEADERS frames bypass this callback; they are
597                        // covered by the matching elision in
598                        // `pkawa::handle_trailer`.
599                        header.elide();
600                    } else if compare_no_case(key, b"Forwarded") {
601                        forwarded = Some(header);
602                    } else if compare_no_case(key, b"User-Agent") {
603                        self.user_agent = header
604                            .val
605                            .data_opt(buf)
606                            .and_then(|data| from_utf8(data).ok())
607                            .map(ToOwned::to_owned);
608                    } else if compare_no_case(key, b"X-Request-Id") {
609                        // RFC: not standardized, but the de-facto correlation
610                        // header used by Envoy/HAProxy/most LBs. Preserve the
611                        // client-supplied value verbatim — overwriting it
612                        // breaks end-to-end request tracing.
613                        has_x_request_id = true;
614                        self.x_request_id = header
615                            .val
616                            .data_opt(buf)
617                            .and_then(|data| from_utf8(data).ok())
618                            .map(ToOwned::to_owned);
619                    } else {
620                        #[cfg(feature = "opentelemetry")]
621                        if compare_no_case(key, b"traceparent") {
622                            if let Some(hdr) = traceparent {
623                                hdr.elide();
624                            }
625                            traceparent = Some(header);
626                        } else if compare_no_case(key, b"tracestate") {
627                            if let Some(hdr) = tracestate {
628                                hdr.elide();
629                            }
630                            tracestate = Some(header);
631                        }
632                    }
633                }
634                _ => {}
635            }
636        }
637
638        #[cfg(feature = "opentelemetry")]
639        let (otel, has_traceparent) = {
640            let mut otel = sozu_command_lib::logging::OpenTelemetry::default();
641            let tp = traceparent
642                .as_ref()
643                .and_then(|hdr| parse_traceparent(&hdr.val, buf))
644                .map(|(trace_id, parent_id)| (trace_id, Some(parent_id)));
645            // Remove tracestate if no traceparent is present
646            if let (None, Some(tracestate)) = (tp, tracestate) {
647                tracestate.elide();
648            }
649            let (trace_id, parent_id) = tp.unwrap_or_else(|| (random_id(), None));
650            otel.trace_id = trace_id;
651            otel.parent_span_id = parent_id;
652            otel.span_id = random_id();
653            // Modify header if present
654            if let Some(id) = &mut traceparent {
655                let new_val = build_traceparent(&otel.trace_id, &otel.span_id);
656                id.val.modify(buf, &new_val);
657            }
658            (otel, traceparent.is_some())
659        };
660
661        // If session_address is set:
662        // - append its ip address to the list of "X-Forwarded-For" if it was found, creates it if not
663        // - append "proto=[PROTO];for=[PEER];by=[PUBLIC]" to the list of "Forwarded" if it was found, creates it if not
664        if let Some(peer_addr) = self.session_address {
665            let peer_ip = peer_addr.ip();
666            let peer_port = peer_addr.port();
667            let has_x_for = x_for.is_some();
668            let has_forwarded = forwarded.is_some();
669
670            // Buffer for building header values — ownership is transferred to Store
671            // via `take`, so each header gets its own allocation.
672            let mut hdr_buf = Vec::with_capacity(128);
673
674            if let Some(header) = x_for {
675                hdr_buf.extend_from_slice(header.val.data(buf));
676                let _ = write!(hdr_buf, ", {peer_ip}");
677                header.val = kawa::Store::from_vec(std::mem::take(&mut hdr_buf));
678            }
679            if let Some(header) = &mut forwarded {
680                hdr_buf.extend_from_slice(header.val.data(buf));
681                write_forwarded_suffix(&mut hdr_buf, proto, peer_ip, peer_port, public_ip);
682                header.val = kawa::Store::from_vec(std::mem::take(&mut hdr_buf));
683            }
684
685            if !has_x_for {
686                let _ = write!(hdr_buf, "{peer_ip}");
687                request.push_block(kawa::Block::Header(kawa::Pair {
688                    key: kawa::Store::Static(b"X-Forwarded-For"),
689                    val: kawa::Store::from_vec(std::mem::take(&mut hdr_buf)),
690                }));
691            }
692            if !has_forwarded {
693                hdr_buf.extend_from_slice(b"proto=");
694                hdr_buf.extend_from_slice(proto.as_bytes());
695                write_forwarded_for_by(&mut hdr_buf, peer_ip, peer_port, public_ip);
696                request.push_block(kawa::Block::Header(kawa::Pair {
697                    key: kawa::Store::Static(b"Forwarded"),
698                    val: kawa::Store::from_vec(std::mem::take(&mut hdr_buf)),
699                }));
700            }
701
702            // Inject a proxy-generated `X-Real-IP` header carrying the
703            // peer IP (post-PROXY-v2 unwrap, so the original client IP
704            // even when the upstream presented PROXY-v2). Folded into the
705            // `if let Some(peer_addr)` arm so missing peers (raw socket,
706            // no PROXY-v2) skip the injection silently — identical to the
707            // X-Forwarded-For / Forwarded synthesis behaviour above. Any
708            // client-supplied `X-Real-IP` was either elided in the block
709            // walk (if `elide_x_real_ip` is on) or passes through; this
710            // header is appended last so order in the resulting block
711            // list is deterministic for tests.
712            if self.send_x_real_ip {
713                let _ = write!(hdr_buf, "{peer_ip}");
714                request.push_block(kawa::Block::Header(kawa::Pair {
715                    key: kawa::Store::Static(b"X-Real-IP"),
716                    val: kawa::Store::from_vec(std::mem::take(&mut hdr_buf)),
717                }));
718            }
719        }
720
721        #[cfg(feature = "opentelemetry")]
722        {
723            if !has_traceparent {
724                let val = build_traceparent(&otel.trace_id, &otel.span_id);
725                request.push_block(kawa::Block::Header(kawa::Pair {
726                    key: kawa::Store::Static(b"traceparent"),
727                    val: kawa::Store::from_slice(&val),
728                }));
729            }
730            self.otel = Some(otel);
731        }
732
733        if !has_x_port {
734            let mut port_buf = itoa::Buffer::new();
735            let port_str = port_buf.format(public_port);
736            request.push_block(kawa::Block::Header(kawa::Pair {
737                key: kawa::Store::Static(b"X-Forwarded-Port"),
738                val: kawa::Store::from_slice(port_str.as_bytes()),
739            }));
740        }
741        if !has_x_proto {
742            request.push_block(kawa::Block::Header(kawa::Pair {
743                key: kawa::Store::Static(b"X-Forwarded-Proto"),
744                val: kawa::Store::Static(proto.as_bytes()),
745            }));
746        }
        // Create a "Connection" header in case it was not found and closing is set
748        if !has_connection && self.closing {
749            request.push_block(kawa::Block::Header(kawa::Pair {
750                key: kawa::Store::Static(b"Connection"),
751                val: kawa::Store::Static(b"close"),
752            }));
753        }
754        // Inject "X-Request-Id" derived from the request ULID when the client
755        // (or upstream LB) did not already supply one. When already present,
756        // the header is left untouched in the block list — preserving the
757        // client-supplied value end-to-end is the whole point of this header.
758        // Either way, `self.x_request_id` is populated so the access log
759        // records the exact value forwarded to the backend.
760        if has_x_request_id {
761            incr!(names::http::X_REQUEST_ID_PROPAGATED);
762        } else {
763            let value = self.id.to_string();
764            request.push_block(kawa::Block::Header(kawa::Pair {
765                key: kawa::Store::Static(b"X-Request-Id"),
766                val: kawa::Store::from_string(value.clone()),
767            }));
768            self.x_request_id = Some(value);
769            incr!(names::http::X_REQUEST_ID_GENERATED);
770        }
771
772        // Create a custom correlation header (defaults to "Sozu-Id", can be
773        // renamed via the `sozu_id_header` listener config knob).
774        request.push_block(kawa::Block::Header(kawa::Pair {
775            key: kawa::Store::from_string(self.sozu_id_header.clone()),
776            val: kawa::Store::from_string(self.id.to_string()),
777        }));
778    }
779
780    /// Callback for response:
781    ///
782    /// - edit headers (connection, set-cookie, sozu-id)
783    /// - save information:
784    ///   - status code
785    ///   - reason
786    ///   - back keep-alive
787    fn on_response_headers(&mut self, response: &mut GenericHttpStream) {
788        let buf = &mut response.storage.mut_buffer();
789
790        // Captures the response line
791        if let kawa::StatusLine::Response { code, reason, .. } = &response.detached.status_line {
792            self.status = Some(*code);
793            self.reason = reason
794                .data_opt(buf)
795                .and_then(|data| from_utf8(data).ok())
796                .map(ToOwned::to_owned);
797        }
798
799        if self.method == Some(Method::Head) {
800            response.parsing_phase = kawa::ParsingPhase::Terminated;
801        }
802
803        // If found:
804        // - set Connection to "close" if closing is set
805        // - set keep_alive_backend to false if Connection is "close"
806        for block in &mut response.blocks {
807            match block {
808                kawa::Block::Header(header) if !header.is_elided() => {
809                    let key = header.key.data(buf);
810                    if compare_no_case(key, b"connection") {
811                        if self.closing {
812                            header.val = kawa::Store::Static(b"close");
813                        } else {
814                            let val = header.val.data(buf);
815                            self.keep_alive_backend &= !compare_no_case(val, b"close");
816                        }
817                    }
818                }
819                _ => {}
820            }
821        }
822
823        // If the sticky_session is set and differs from the one found in the request
824        // create a "Set-Cookie" header to update the sticky_name value
825        if let Some(sticky_session) = &self.sticky_session {
826            if self.sticky_session != self.sticky_session_found {
827                let mut cookie_buf =
828                    Vec::with_capacity(self.sticky_name.len() + 1 + sticky_session.len() + 8);
829                cookie_buf.extend_from_slice(self.sticky_name.as_bytes());
830                cookie_buf.push(b'=');
831                cookie_buf.extend_from_slice(sticky_session.as_bytes());
832                cookie_buf.extend_from_slice(b"; Path=/");
833                response.push_block(kawa::Block::Header(kawa::Pair {
834                    key: kawa::Store::Static(b"Set-Cookie"),
835                    val: kawa::Store::from_vec(cookie_buf),
836                }));
837            }
838        }
839
840        // Create a custom correlation header (defaults to "Sozu-Id", can be
841        // renamed via the `sozu_id_header` listener config knob).
842        response.push_block(kawa::Block::Header(kawa::Pair {
843            key: kawa::Store::from_string(self.sozu_id_header.clone()),
844            val: kawa::Store::from_string(self.id.to_string()),
845        }));
846    }
847
848    pub fn reset(&mut self) {
849        self.keep_alive_backend = true;
850        self.keep_alive_frontend = true;
851        self.sticky_session_found = None;
852        self.method = None;
853        self.authority = None;
854        self.path = None;
855        self.status = None;
856        self.reason = None;
857        self.user_agent = None;
858        self.x_request_id = None;
859        self.xff_chain = None;
860        self.redirect_location = None;
861        self.www_authenticate = None;
862        self.original_authority = None;
863        self.headers_response.clear();
864        // Note: tls_server_name, tls_version, tls_cipher, tls_alpn,
865        // strict_sni_binding, elide_x_real_ip, send_x_real_ip are
866        // connection-scoped — set once at handshake completion and reused
867        // across every keep-alive request, so reset() intentionally leaves
868        // them in place.
869    }
870
871    pub fn extract_route(&self) -> Result<(&str, &str, &Method), RetrieveClusterError> {
872        let given_method = self.method.as_ref().ok_or(RetrieveClusterError::NoMethod)?;
873        let given_authority = self
874            .authority
875            .as_deref()
876            .ok_or(RetrieveClusterError::NoHost)?;
877        let given_path = self.path.as_deref().ok_or(RetrieveClusterError::NoPath)?;
878
879        Ok((given_authority, given_path, given_method))
880    }
881
882    pub fn get_route(&self) -> String {
883        if let Some(method) = &self.method {
884            if let Some(authority) = &self.authority {
885                if let Some(path) = &self.path {
886                    return format!("{method} {authority}{path}");
887                }
888                return format!("{method} {authority}");
889            }
890            return format!("{method}");
891        }
892        String::new()
893    }
894
895    pub fn websocket_context(&self) -> WebSocketContext {
896        WebSocketContext::Http {
897            method: self.method.clone(),
898            authority: self.authority.clone(),
899            path: self.path.clone(),
900            reason: self.reason.clone(),
901            status: self.status,
902        }
903    }
904
905    pub fn log_context(&self) -> LogContext<'_> {
906        LogContext {
907            session_id: self.session_id,
908            request_id: Some(self.id),
909            cluster_id: self.cluster_id.as_deref(),
910            backend_id: self.backend_id.as_deref(),
911        }
912    }
913}
914
915#[cfg(test)]
916mod tests {
917    use super::*;
918    use std::net::{IpAddr, Ipv4Addr, SocketAddr};
919
920    /// Helper to create a minimal HttpContext for testing.
921    fn make_context() -> HttpContext {
922        HttpContext::new(
923            Ulid::generate(),
924            Ulid::generate(),
925            Protocol::HTTP,
926            SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080),
927            Some(SocketAddr::new(
928                IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)),
929                54321,
930            )),
931            "SERVERID".to_owned(),
932            "Sozu-Id".to_owned(),
933            false,
934            false,
935        )
936    }
937
938    // ── sozu_id_header ──────────────────────────────────────────────────
939
940    #[test]
941    fn test_sozu_id_header_default_name_stored_on_context() {
942        // The make_context helper uses the documented default "Sozu-Id" to
943        // match the trait default on `L7ListenerHandler::get_sozu_id_header`.
944        let ctx = make_context();
945        assert_eq!(ctx.sozu_id_header, "Sozu-Id");
946    }
947
948    #[test]
949    fn test_sozu_id_header_custom_name_stored_on_context() {
950        // Operator-provided rename is carried verbatim onto the HttpContext.
951        let ctx = HttpContext::new(
952            Ulid::generate(),
953            Ulid::generate(),
954            Protocol::HTTP,
955            SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080),
956            None,
957            "SERVERID".to_owned(),
958            "X-Edge-Id".to_owned(),
959            false,
960            false,
961        );
962        assert_eq!(ctx.sozu_id_header, "X-Edge-Id");
963    }
964
965    // ── extract_route ──────────────────────────────────────────────────
966
967    #[test]
968    fn test_extract_route_all_present() {
969        let mut ctx = make_context();
970        ctx.method = Some(Method::Get);
971        ctx.authority = Some("example.com".to_owned());
972        ctx.path = Some("/index.html".to_owned());
973
974        let (authority, path, method) = ctx.extract_route().unwrap();
975        assert_eq!(authority, "example.com");
976        assert_eq!(path, "/index.html");
977        assert_eq!(method, &Method::Get);
978    }
979
980    #[test]
981    fn test_extract_route_no_method() {
982        let mut ctx = make_context();
983        ctx.authority = Some("example.com".to_owned());
984        ctx.path = Some("/".to_owned());
985
986        let err = ctx.extract_route().unwrap_err();
987        assert!(matches!(err, RetrieveClusterError::NoMethod));
988    }
989
990    #[test]
991    fn test_extract_route_no_host() {
992        let mut ctx = make_context();
993        ctx.method = Some(Method::Get);
994        ctx.path = Some("/".to_owned());
995
996        let err = ctx.extract_route().unwrap_err();
997        assert!(matches!(err, RetrieveClusterError::NoHost));
998    }
999
1000    #[test]
1001    fn test_extract_route_no_path() {
1002        let mut ctx = make_context();
1003        ctx.method = Some(Method::Get);
1004        ctx.authority = Some("example.com".to_owned());
1005
1006        let err = ctx.extract_route().unwrap_err();
1007        assert!(matches!(err, RetrieveClusterError::NoPath));
1008    }
1009
1010    // ── get_route ──────────────────────────────────────────────────────
1011
1012    #[test]
1013    fn test_get_route_all_present() {
1014        let mut ctx = make_context();
1015        ctx.method = Some(Method::Get);
1016        ctx.authority = Some("example.com".to_owned());
1017        ctx.path = Some("/api/v1".to_owned());
1018
1019        assert_eq!(ctx.get_route(), "GET example.com/api/v1");
1020    }
1021
1022    #[test]
1023    fn test_get_route_method_and_authority_only() {
1024        let mut ctx = make_context();
1025        ctx.method = Some(Method::Post);
1026        ctx.authority = Some("example.com".to_owned());
1027
1028        assert_eq!(ctx.get_route(), "POST example.com");
1029    }
1030
1031    #[test]
1032    fn test_get_route_method_only() {
1033        let mut ctx = make_context();
1034        ctx.method = Some(Method::Delete);
1035
1036        assert_eq!(ctx.get_route(), "DELETE");
1037    }
1038
1039    #[test]
1040    fn test_get_route_empty() {
1041        let ctx = make_context();
1042        assert_eq!(ctx.get_route(), "");
1043    }
1044
1045    // ── reset ──────────────────────────────────────────────────────────
1046
1047    #[test]
1048    fn test_reset_clears_request_response_state() {
1049        let mut ctx = make_context();
1050        ctx.keep_alive_backend = false;
1051        ctx.keep_alive_frontend = false;
1052        ctx.sticky_session_found = Some("abc123".to_owned());
1053        ctx.method = Some(Method::Post);
1054        ctx.authority = Some("example.com".to_owned());
1055        ctx.path = Some("/upload".to_owned());
1056        ctx.status = Some(200);
1057        ctx.reason = Some("OK".to_owned());
1058        ctx.user_agent = Some("curl/7.81".to_owned());
1059        ctx.x_request_id = Some("client-xrid-123".to_owned());
1060        ctx.xff_chain = Some("203.0.113.5, 198.51.100.10".to_owned());
1061        ctx.redirect_location = Some("https://example.com/".to_owned());
1062        ctx.www_authenticate = Some("Basic realm=\"sozu\"".to_owned());
1063        ctx.original_authority = Some("old.example.com".to_owned());
1064        ctx.headers_response.push(HeaderEditSnapshot {
1065            key: b"X-Cache".to_vec(),
1066            val: b"HIT".to_vec(),
1067            mode: HeaderEditMode::Append,
1068        });
1069
1070        ctx.reset();
1071
1072        assert!(ctx.keep_alive_backend);
1073        assert!(ctx.keep_alive_frontend);
1074        assert!(ctx.sticky_session_found.is_none());
1075        assert!(ctx.method.is_none());
1076        assert!(ctx.authority.is_none());
1077        assert!(ctx.path.is_none());
1078        assert!(ctx.status.is_none());
1079        assert!(ctx.reason.is_none());
1080        assert!(ctx.user_agent.is_none());
1081        assert!(ctx.x_request_id.is_none());
1082        assert!(ctx.xff_chain.is_none());
1083        // The four stash slots written by the routing layer must clear
1084        // between pipelined H1 requests; otherwise a future code path that
1085        // emits a 301 / 401 default-answer without re-routing would
1086        // inherit a stale Location / WWW-Authenticate from a prior request,
1087        // or the backend would receive a stale X-Forwarded-Host or a stale
1088        // response-side header edit.
1089        assert!(ctx.redirect_location.is_none());
1090        assert!(ctx.www_authenticate.is_none());
1091        assert!(ctx.original_authority.is_none());
1092        assert!(ctx.headers_response.is_empty());
1093    }
1094
1095    #[test]
1096    fn test_reset_preserves_tls_metadata() {
1097        // TLS metadata is connection-scoped (set once at handshake, reused
1098        // across every keep-alive request) — reset() must leave it intact
1099        // so the access log of the second request still carries it.
1100        let mut ctx = make_context();
1101        ctx.tls_server_name = Some("example.com".to_owned());
1102        ctx.tls_version = Some("TLSv1.3");
1103        ctx.tls_cipher = Some("TLS_AES_128_GCM_SHA256");
1104        ctx.tls_alpn = Some("h2");
1105        ctx.strict_sni_binding = false;
1106
1107        ctx.reset();
1108
1109        assert_eq!(ctx.tls_server_name.as_deref(), Some("example.com"));
1110        assert_eq!(ctx.tls_version, Some("TLSv1.3"));
1111        assert_eq!(ctx.tls_cipher, Some("TLS_AES_128_GCM_SHA256"));
1112        assert_eq!(ctx.tls_alpn, Some("h2"));
1113        assert!(!ctx.strict_sni_binding);
1114    }
1115
1116    #[test]
1117    fn test_reset_preserves_connection_state() {
1118        let mut ctx = make_context();
1119        ctx.closing = true;
1120        ctx.cluster_id = Some("cluster-1".to_owned());
1121        ctx.backend_id = Some("backend-1".to_owned());
1122        ctx.sticky_session = Some("session-abc".to_owned());
1123
1124        let original_id = ctx.id;
1125        let original_protocol = ctx.protocol;
1126        let original_public_address = ctx.public_address;
1127
1128        ctx.reset();
1129
1130        // Connection-level state is preserved
1131        assert!(ctx.closing);
1132        assert_eq!(ctx.cluster_id.as_deref(), Some("cluster-1"));
1133        assert_eq!(ctx.backend_id.as_deref(), Some("backend-1"));
1134        assert_eq!(ctx.sticky_session.as_deref(), Some("session-abc"));
1135        assert_eq!(ctx.id, original_id);
1136        assert_eq!(ctx.protocol, original_protocol);
1137        assert_eq!(ctx.public_address, original_public_address);
1138    }
1139
1140    // ── write_forwarded_for_by (RFC 7239 §6 IP-literal bracketing) ──────
1141    //
1142    // Locks in the IPv6 bracket+quote contract that matches HAProxy
1143    // (`_7239_print_ip6` in `src/http_ext.c`) and prevents regression to
1144    // the ambiguous `for="2001:db8::1:8080"` form flagged in issue #1254.
1145
1146    use std::net::Ipv6Addr;
1147
1148    fn render_for_by(peer: SocketAddr, public_ip: IpAddr) -> String {
1149        let mut buf = Vec::new();
1150        write_forwarded_for_by(&mut buf, peer.ip(), peer.port(), public_ip);
1151        String::from_utf8(buf).expect("forwarded fragment must be ASCII")
1152    }
1153
1154    fn render_suffix(proto: &str, peer: SocketAddr, public_ip: IpAddr) -> String {
1155        let mut buf = Vec::new();
1156        write_forwarded_suffix(&mut buf, proto, peer.ip(), peer.port(), public_ip);
1157        String::from_utf8(buf).expect("forwarded fragment must be ASCII")
1158    }
1159
1160    #[test]
1161    fn test_forwarded_ipv4_peer_ipv4_by_unchanged() {
1162        // IPv4 must keep the pre-fix wire format: unquoted `by=`, bare IPv4 in
1163        // `for=`. Sōzu emits `for=` quoted unconditionally (HAProxy emits it
1164        // quoted only when a port is attached; we always attach a port).
1165        let peer = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 7)), 54321);
1166        let public_ip = IpAddr::V4(Ipv4Addr::new(198, 51, 100, 1));
1167        assert_eq!(
1168            render_for_by(peer, public_ip),
1169            r#";for="203.0.113.7:54321";by=198.51.100.1"#,
1170        );
1171    }
1172
1173    #[test]
1174    fn test_forwarded_ipv6_peer_brackets_disambiguate_port() {
1175        // Regression for issue #1254: without brackets, `for="2001:db8::1:8080"`
1176        // is ambiguous (could parse as `::1` + port `8080` or `:1:8080` literal).
1177        let peer = SocketAddr::new(IpAddr::V6("2001:db8::1".parse::<Ipv6Addr>().unwrap()), 8080);
1178        let public_ip = IpAddr::V4(Ipv4Addr::new(198, 51, 100, 1));
1179        assert_eq!(
1180            render_for_by(peer, public_ip),
1181            r#";for="[2001:db8::1]:8080";by=198.51.100.1"#,
1182        );
1183    }
1184
1185    #[test]
1186    fn test_forwarded_ipv6_public_address_bracketed_and_quoted() {
1187        // `by=` carries no port but RFC 7239 §6 still requires the IP-literal
1188        // brackets for IPv6, and the value must be quoted because the literal
1189        // contains `:`.
1190        let peer = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 7)), 54321);
1191        let public_ip = IpAddr::V6("2001:db8::2".parse::<Ipv6Addr>().unwrap());
1192        assert_eq!(
1193            render_for_by(peer, public_ip),
1194            r#";for="203.0.113.7:54321";by="[2001:db8::2]""#,
1195        );
1196    }
1197
1198    #[test]
1199    fn test_forwarded_ipv6_peer_and_public_both_bracketed() {
1200        let peer = SocketAddr::new(IpAddr::V6("2001:db8::1".parse::<Ipv6Addr>().unwrap()), 8080);
1201        let public_ip = IpAddr::V6("2001:db8::2".parse::<Ipv6Addr>().unwrap());
1202        assert_eq!(
1203            render_for_by(peer, public_ip),
1204            r#";for="[2001:db8::1]:8080";by="[2001:db8::2]""#,
1205        );
1206    }
1207
1208    #[test]
1209    fn test_forwarded_suffix_prepends_proto_and_brackets_ipv6() {
1210        // The suffix form is what we append when an inbound `Forwarded`
1211        // header already exists. Same bracketing contract must hold.
1212        let peer = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 443);
1213        let public_ip = IpAddr::V6("fe80::1".parse::<Ipv6Addr>().unwrap());
1214        assert_eq!(
1215            render_suffix("https", peer, public_ip),
1216            r#", proto=https;for="[::1]:443";by="[fe80::1]""#,
1217        );
1218    }
1219
1220    // ── traceparent / opentelemetry helpers ─────────────────────────────
1221
1222    #[cfg(feature = "opentelemetry")]
1223    mod otel {
1224        use super::super::*;
1225
1226        #[test]
1227        fn test_parse_hex_valid() {
1228            let (val, rest) = parse_hex::<4>(b"abcd1234").unwrap();
1229            assert_eq!(&val, b"abcd");
1230            assert_eq!(rest, b"1234");
1231        }
1232
1233        #[test]
1234        fn test_parse_hex_exact_length() {
1235            let (val, rest) = parse_hex::<8>(b"01234567").unwrap();
1236            assert_eq!(&val, b"01234567");
1237            assert!(rest.is_empty());
1238        }
1239
1240        #[test]
1241        fn test_parse_hex_too_short() {
1242            assert!(parse_hex::<4>(b"ab").is_none());
1243        }
1244
1245        #[test]
1246        fn test_parse_hex_rejects_non_hex() {
1247            assert!(parse_hex::<4>(b"ghij").is_none());
1248        }
1249
1250        #[test]
1251        fn test_parse_hex_rejects_uppercase_is_ok() {
1252            // Uppercase hex digits are valid
1253            let (val, _) = parse_hex::<4>(b"ABCD").unwrap();
1254            assert_eq!(&val, b"ABCD");
1255        }
1256
1257        #[test]
1258        fn test_skip_separator_valid() {
1259            let rest = skip_separator(b"-hello").unwrap();
1260            assert_eq!(rest, b"hello");
1261        }
1262
1263        #[test]
1264        fn test_skip_separator_wrong_char() {
1265            assert!(skip_separator(b"+hello").is_none());
1266        }
1267
1268        #[test]
1269        fn test_skip_separator_empty() {
1270            assert!(skip_separator(b"").is_none());
1271        }
1272
1273        #[test]
1274        fn test_build_traceparent_format() {
1275            let trace_id: [u8; 32] = *b"4bf92f3577b34da6a3ce929d0e0e4736";
1276            let parent_id: [u8; 16] = *b"00f067aa0ba902b7";
1277
1278            let result = build_traceparent(&trace_id, &parent_id);
1279            assert_eq!(
1280                &result,
1281                b"00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"
1282            );
1283        }
1284
1285        #[test]
1286        fn test_build_traceparent_length() {
1287            let trace_id = [b'a'; 32];
1288            let parent_id = [b'b'; 16];
1289            let result = build_traceparent(&trace_id, &parent_id);
1290            // Format: "00-" (3) + trace_id (32) + "-" (1) + parent_id (16) + "-01" (3) = 55
1291            assert_eq!(result.len(), 55);
1292        }
1293
1294        #[test]
1295        fn test_parse_traceparent_valid() {
1296            let input = b"00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
1297            let store = kawa::Store::Static(input);
1298            let (trace_id, parent_id) = parse_traceparent(&store, input).unwrap();
1299            assert_eq!(&trace_id, b"4bf92f3577b34da6a3ce929d0e0e4736");
1300            assert_eq!(&parent_id, b"00f067aa0ba902b7");
1301        }
1302
1303        #[test]
1304        fn test_parse_traceparent_sampled_flag_zero() {
1305            let input = b"00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-00";
1306            let store = kawa::Store::Static(input);
1307            let result = parse_traceparent(&store, input);
1308            assert!(result.is_some());
1309        }
1310
1311        #[test]
1312        fn test_parse_traceparent_wrong_version() {
1313            let input = b"01-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
1314            let store = kawa::Store::Static(input);
1315            assert!(parse_traceparent(&store, input).is_none());
1316        }
1317
1318        #[test]
1319        fn test_parse_traceparent_too_short() {
1320            let input = b"00-4bf9";
1321            let store = kawa::Store::Static(input);
1322            assert!(parse_traceparent(&store, input).is_none());
1323        }
1324
1325        #[test]
1326        fn test_parse_traceparent_trailing_data() {
1327            // Extra characters after the trace-flags should cause rejection
1328            let input = b"00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01-extra";
1329            let store = kawa::Store::Static(input);
1330            assert!(parse_traceparent(&store, input).is_none());
1331        }
1332
1333        #[test]
1334        fn test_parse_traceparent_missing_separator() {
1335            let input = b"004bf92f3577b34da6a3ce929d0e0e473600f067aa0ba902b701";
1336            let store = kawa::Store::Static(input);
1337            assert!(parse_traceparent(&store, input).is_none());
1338        }
1339
1340        #[test]
1341        fn test_parse_build_roundtrip() {
1342            let trace_id: [u8; 32] = *b"4bf92f3577b34da6a3ce929d0e0e4736";
1343            let parent_id: [u8; 16] = *b"00f067aa0ba902b7";
1344
1345            // Build a traceparent from known IDs
1346            let built = build_traceparent(&trace_id, &parent_id);
1347
1348            // Verify that the built value matches the expected static string,
1349            // then parse that static string back to confirm roundtrip.
1350            let expected: &[u8] = b"00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
1351            assert_eq!(&built[..], expected);
1352
1353            let store = kawa::Store::Static(expected);
1354            let (parsed_trace_id, parsed_parent_id) = parse_traceparent(&store, expected).unwrap();
1355
1356            assert_eq!(parsed_trace_id, trace_id);
1357            assert_eq!(parsed_parent_id, parent_id);
1358        }
1359    }
1360}