sozu_lib/protocol/kawa_h1/editor.rs
1//! H1 request/response header editor.
2//!
3//! Captures method/authority/path on parse, rewrites hop-by-hop and
4//! forwarding headers (`X-Forwarded-*`, `Forwarded`, `Connection`,
5//! WebSocket-upgrade signalling, optional `traceparent`), and surfaces the
6//! canonical `LogContext` used by the access-log envelope. Acts as the
7//! Kawa `ParserCallbacks` implementation for the H1 mux path.
8
9use std::{
10 io::Write as _,
11 net::{IpAddr, SocketAddr},
12 str::from_utf8,
13 sync::Arc,
14};
15
16use rusty_ulid::Ulid;
17use sozu_command_lib::logging::LogContext;
18
19use crate::metrics::names;
20use crate::{
21 Protocol, RetrieveClusterError,
22 pool::Checkout,
23 protocol::{
24 http::{GenericHttpStream, Method, parser::compare_no_case},
25 pipe::WebSocketContext,
26 },
27};
28
29#[cfg(feature = "opentelemetry")]
30fn parse_traceparent(val: &kawa::Store, buf: &[u8]) -> Option<([u8; 32], [u8; 16])> {
31 let val = val.data(buf);
32 let (version, val) = parse_hex::<2>(val)?;
33 if version.as_slice() != b"00" {
34 return None;
35 }
36 let val = skip_separator(val)?;
37 let (trace_id, val) = parse_hex::<32>(val)?;
38 let val = skip_separator(val)?;
39 let (parent_id, val) = parse_hex::<16>(val)?;
40 let val = skip_separator(val)?;
41 let (_, val) = parse_hex::<2>(val)?;
42 val.is_empty().then_some((trace_id, parent_id))
43}
44
45#[cfg(feature = "opentelemetry")]
/// Split exactly `N` lowercase hexadecimal ASCII characters off the front
/// of `buf`.
///
/// Returns the `N` characters (still hex-encoded, not decoded) together
/// with the remaining bytes, or `None` when `buf` is shorter than `N` or
/// one of the first `N` bytes is not in `0-9a-f`.
///
/// Only lowercase digits are accepted: the W3C Trace Context grammar
/// defines every `traceparent` field with `HEXDIGLC`, and the parsed
/// trace-id is re-emitted verbatim, so accepting uppercase would forward
/// an invalid header to the backend.
fn parse_hex<const N: usize>(buf: &[u8]) -> Option<([u8; N], &[u8])> {
    if buf.len() < N {
        return None;
    }
    let (head, tail) = buf.split_at(N);
    // `head` is exactly N bytes long, so the conversion cannot fail; `?`
    // keeps the function panic-free regardless.
    let val: [u8; N] = head.try_into().ok()?;
    val.iter()
        .all(|c| matches!(c, b'0'..=b'9' | b'a'..=b'f'))
        .then_some((val, tail))
}
52
53#[cfg(feature = "opentelemetry")]
/// Consume a single leading `-` from `buf`.
///
/// Returns the bytes after the dash, or `None` when `buf` is empty or
/// starts with any other byte.
fn skip_separator(buf: &[u8]) -> Option<&[u8]> {
    match buf {
        [b'-', rest @ ..] => Some(rest),
        _ => None,
    }
}
57
58#[cfg(feature = "opentelemetry")]
59fn random_id<const N: usize>() -> [u8; N] {
60 use rand::RngExt;
61 const CHARSET: &[u8] = b"0123456789abcdef";
62 let mut rng = rand::rng();
63 let mut buf = [0; N];
64 buf.fill_with(|| {
65 let n = rng.random_range(0..CHARSET.len());
66 CHARSET[n]
67 });
68 buf
69}
70
71#[cfg(feature = "opentelemetry")]
/// Assemble a version-`00` `traceparent` header value:
/// `00-<trace_id>-<parent_id>-01`.
///
/// Both identifiers are copied verbatim (they are expected to already be
/// ASCII hex); the flags field is always written as `01`.
fn build_traceparent(trace_id: &[u8; 32], parent_id: &[u8; 16]) -> [u8; 55] {
    // 3 + 32 + 1 + 16 + 3 = 55 bytes, matching the output array exactly.
    let pieces = b"00-"
        .iter()
        .chain(trace_id)
        .chain(b"-")
        .chain(parent_id)
        .chain(b"-01");
    let mut out = [0u8; 55];
    for (slot, byte) in out.iter_mut().zip(pieces) {
        *slot = *byte;
    }
    out
}
81
82/// Write the ";for=..;by=.." portion of a Forwarded header into `buf`
83/// without heap-allocating a `String`.
84///
85/// RFC 7239 §6 requires IPv6 literals to be enclosed in square brackets
86/// inside a quoted string (the `IP-literal` production is `"[" IPv6address "]"`).
87/// Emitting a bare IPv6 like `for="2001:db8::1:8080"` is ambiguous: the
88/// trailing `:1:8080` could parse as `::1` + port `8080` or as the literal
89/// `:1:8080` with no port. Matches HAProxy's behaviour
90/// (`_7239_print_ip6` in `src/http_ext.c`), which is the de-facto reference
91/// implementation of RFC 7239 in the proxy world. See sozu issue #1254.
92fn write_forwarded_for_by(buf: &mut Vec<u8>, peer_ip: IpAddr, peer_port: u16, public_ip: IpAddr) {
93 buf.extend_from_slice(b";for=\"");
94 write_ip_literal(buf, peer_ip);
95 buf.push(b':');
96 let mut port_buf = itoa::Buffer::new();
97 buf.extend_from_slice(port_buf.format(peer_port).as_bytes());
98 buf.extend_from_slice(b"\";by=");
99 match public_ip {
100 IpAddr::V4(_) => {
101 let _ = write!(buf, "{public_ip}");
102 }
103 IpAddr::V6(_) => {
104 buf.push(b'"');
105 write_ip_literal(buf, public_ip);
106 buf.push(b'"');
107 }
108 }
109}
110
/// Append `ip` to `buf` as an `IP-literal` (RFC 3986 §3.2.2 / RFC 7239 §6).
///
/// IPv4 addresses are written as-is; IPv6 addresses are enclosed in
/// `[` `]`. No quotes are added here: the caller adds them when the
/// surrounding syntax needs them (RFC 7239 requires quoting for IPv6).
fn write_ip_literal(buf: &mut Vec<u8>, ip: IpAddr) {
    let bracketed = ip.is_ipv6();
    if bracketed {
        buf.push(b'[');
    }
    let _ = write!(buf, "{ip}");
    if bracketed {
        buf.push(b']');
    }
}
126
127/// Write ", proto=<proto>;for=..;by=.." (the suffix appended to an existing Forwarded value).
128fn write_forwarded_suffix(
129 buf: &mut Vec<u8>,
130 proto: &str,
131 peer_ip: IpAddr,
132 peer_port: u16,
133 public_ip: IpAddr,
134) {
135 buf.extend_from_slice(b", proto=");
136 buf.extend_from_slice(proto.as_bytes());
137 write_forwarded_for_by(buf, peer_ip, peer_port, public_ip);
138}
139
/// Container for the per-request session information that must be readable
/// and writable from within a Kawa parser callback.
///
/// Fields under "Write only" are filled in by the callbacks in this file;
/// fields under "Read only" are inputs the callbacks consult when editing
/// headers.
#[derive(Debug)]
pub struct HttpContext {
    // ========== Write only
    /// set to false if Kawa finds a "Connection" header with a "close" value in the response
    pub keep_alive_backend: bool,
    /// set to false if Kawa finds a "Connection" header with a "close" value in the request
    pub keep_alive_frontend: bool,
    /// the value of the sticky session cookie in the request
    pub sticky_session_found: Option<String>,
    // ---------- Status Line
    /// the value of the method in the request line
    pub method: Option<Method>,
    /// the value of the authority of the request (in the request line or "Host" header)
    pub authority: Option<String>,
    /// the value of the path in the request line
    pub path: Option<String>,
    /// the value of the status code in the response line
    pub status: Option<u16>,
    /// the value of the reason in the response line
    pub reason: Option<String>,
    // ---------- Additional optional data
    /// the value of the `User-Agent` request header, captured by
    /// `on_request_headers` when the header is present and valid UTF-8
    pub user_agent: Option<String>,
    /// Value of the `x-request-id` header observed (if propagated from the
    /// client/upstream LB) or generated (from `self.id`). Universal correlation
    /// header — populated unconditionally by `on_request_headers` so the access
    /// log can record the exact value forwarded to the backend.
    pub x_request_id: Option<String>,
    /// Verbatim value of the client-supplied `X-Forwarded-For` header as
    /// observed before Sōzu appended its own hop. Captured here, not at
    /// request edit time, so the access log records the upstream-attested
    /// chain even when Sōzu also appends its own peer to the forwarded
    /// header. `None` if the request had no `X-Forwarded-For` header.
    pub xff_chain: Option<String>,

    /// OpenTelemetry trace/span identifiers computed by
    /// `on_request_headers` from the incoming `traceparent` header (or
    /// freshly generated when it is absent or unparsable).
    #[cfg(feature = "opentelemetry")]
    pub otel: Option<sozu_command::logging::OpenTelemetry>,

    // ========== Read only
    /// signals whether Kawa should write a "Connection" header with a "close" value (request and response)
    pub closing: bool,
    /// Connection/session ULID — stable across all requests multiplexed on this
    /// TCP or TLS connection. Used as the first slot in the legacy log-context
    /// bracket `[session req cluster backend]` and emitted into
    /// `ProtobufAccessLog.session_id`.
    pub session_id: Ulid,
    /// Request ULID. Written as the value of the custom correlation header
    /// (named by `sozu_id_header`, "Sozu-Id" by default) that Kawa should
    /// write (request and response), and used as the generated
    /// `X-Request-Id` when the client did not supply one.
    pub id: Ulid,
    /// presumably the identifier of the backend serving this request —
    /// not set by the callbacks visible here; verify against the caller
    pub backend_id: Option<String>,
    /// presumably the identifier of the cluster this request was routed
    /// to — not set by the callbacks visible here; verify against the caller
    pub cluster_id: Option<String>,
    /// the value of the protocol Kawa should write in the Forwarded headers of the request
    pub protocol: Protocol,
    /// the value of the public address Kawa should write in the Forwarded headers of the request
    pub public_address: SocketAddr,
    /// the value of the session address Kawa should write in the Forwarded headers of the request
    pub session_address: Option<SocketAddr>,
    /// the name of the cookie Kawa should read from the request to get the sticky session
    pub sticky_name: String,
    /// the sticky session that should be used
    /// used to create a "Set-Cookie" header in the response in case it differs from sticky_session_found
    pub sticky_session: Option<String>,
    /// the address of the backend server
    pub backend_address: Option<SocketAddr>,
    /// The TLS Server Name Indication (SNI) hostname negotiated at handshake.
    ///
    /// Populated for HTTPS listeners when the client sent an SNI extension (see
    /// `https.rs::upgrade_handshake`). Used by the routing layer to enforce the
    /// TLS trust boundary against the HTTP `:authority` / `Host` header — without
    /// this check, an attacker holding a valid certificate for tenant A could
    /// open TLS with SNI=A then send requests with `:authority=tenantB` and
    /// reach tenant B's backend (CWE-346 / CWE-444).
    ///
    /// `None` when the listener is plaintext HTTP or the client omitted SNI.
    /// Stored pre-lowercased and without a port for direct exact-match comparison.
    pub tls_server_name: Option<String>,
    /// Snapshot of the SAN set of the certificate Sōzu actually served at
    /// the TLS handshake. Captured once in `https.rs::upgrade_handshake`
    /// from the resolver and frozen for the connection lifetime so H2
    /// stream coalescing (RFC 7540 §9.1.1 / RFC 9113 §9.1.1) accepts any
    /// `:authority` covered by the certificate, with RFC 6125 §6.4.3
    /// wildcard handling. `None` for plaintext listeners or when SNI was
    /// absent (router falls back to the legacy exact-match predicate).
    /// `Some(empty)` when the default cert was served — every
    /// `:authority` is rejected. `Arc` so the snapshot is shared across
    /// every per-stream `HttpContext` without re-allocation.
    pub tls_cert_names: Option<Arc<Vec<String>>>,
    /// Whether the router must reject this request when `tls_server_name`
    /// does not exact-match its authority (CWE-346 / CWE-444). Mirrors
    /// `HttpsListenerConfig::strict_sni_binding`. Set from the mux
    /// `Context` at stream creation time (see `Context::create_stream`).
    /// Plaintext listeners still never hit the check because
    /// `tls_server_name` is `None`.
    pub strict_sni_binding: bool,
    /// When `true`, the request-side block walk in `on_request_headers`
    /// strips any client-supplied `X-Real-IP` header before forwarding
    /// (anti-spoofing). Mirrors `HttpListenerConfig::elide_x_real_ip` /
    /// `HttpsListenerConfig::elide_x_real_ip`. Set from the mux `Context`
    /// at stream creation (see `Context::create_stream`); listener-scoped
    /// and never reset across keep-alive requests. Independent of
    /// `send_x_real_ip`. The same flag is plumbed into
    /// `pkawa::handle_trailer` so trailer HEADERS frames cannot bypass
    /// the elision.
    pub elide_x_real_ip: bool,
    /// When `true`, `on_request_headers` injects a proxy-generated
    /// `X-Real-IP` header carrying `session_address.ip()` (post-PROXY-v2
    /// unwrap, i.e. the original client IP). Mirrors
    /// `HttpListenerConfig::send_x_real_ip` /
    /// `HttpsListenerConfig::send_x_real_ip`. Set from the mux `Context`
    /// at stream creation (see `Context::create_stream`); listener-scoped
    /// and never reset across keep-alive requests. Independent of
    /// `elide_x_real_ip`. When `session_address` is `None` (raw socket
    /// without a peer), no header is appended — identical to the
    /// existing X-Forwarded-For / Forwarded synthesis behaviour.
    pub send_x_real_ip: bool,
    /// Negotiated TLS protocol version as a short label (e.g. `"TLSv1.3"`).
    /// Captured from `rustls_version_label` at handshake completion and
    /// propagated from the mux `Context`. `None` for plaintext listeners.
    pub tls_version: Option<&'static str>,
    /// Negotiated TLS cipher suite as a short label (e.g.
    /// `"TLS_AES_128_GCM_SHA256"`). Captured from `rustls_ciphersuite_label`
    /// at handshake completion and propagated from the mux `Context`. `None`
    /// for plaintext listeners.
    pub tls_cipher: Option<&'static str>,
    /// Negotiated ALPN protocol (e.g. `"h2"`, `"http/1.1"`). Captured from
    /// rustls at handshake completion and propagated from the mux `Context`.
    /// `None` for plaintext listeners or when no ALPN was negotiated.
    pub tls_alpn: Option<&'static str>,
    /// Name of the correlation header Sozu injects into every request and
    /// response. Defaults to `"Sozu-Id"` via [`L7ListenerHandler::get_sozu_id_header`].
    /// Populated at stream creation from the listener config's `sozu_id_header`
    /// knob. Stored as an owned `String` so it survives a listener hot-reload
    /// that changes the value.
    pub sozu_id_header: String,
    /// Resolved `Location` URL stashed by the routing layer when a frontend
    /// triggers a permanent redirect (`RedirectPolicy::PERMANENT` or the
    /// legacy `cluster.https_redirect`). Read by the default-answer 301
    /// path so the response carries the correct target URL — including
    /// optional `cluster.https_redirect_port` and rewrite-template captures.
    /// `None` when the request is not redirecting.
    pub redirect_location: Option<String>,
    /// `WWW-Authenticate` realm stashed by the routing layer when a
    /// frontend rejects an unauthenticated request (`required_auth = true`
    /// without a valid `Authorization: Basic` header, or
    /// `RedirectPolicy::UNAUTHORIZED`). Read by the default-answer 401
    /// path so the response carries the cluster's configured
    /// `www_authenticate` value. `None` falls back to template default
    /// (header is elided when no realm is configured).
    pub www_authenticate: Option<String>,
    /// Original authority captured before a rewrite-host fired; emitted
    /// back to the backend as `X-Forwarded-Host` so the backend can
    /// reconstruct the public URL even though `:authority` / `Host` was
    /// rewritten on the wire. `None` when no host rewrite happened.
    pub original_authority: Option<String>,
    /// Per-frontend response-side header edits (set/replace/delete)
    /// stashed by the routing layer for the emission boundary in
    /// `mux/h1.rs::writable` and `mux/h2.rs::write_streams` to apply
    /// before `kawa.prepare(...)`. Empty when the frontend has no
    /// response-side header policy. An entry with an empty `val`
    /// deletes the header by name (HAProxy `del-header` parity); a
    /// non-empty `val` set/replaces.
    pub headers_response: Vec<HeaderEditSnapshot>,
    /// Resolved `Retry-After` value (seconds) for an HTTP 429 default
    /// answer. Computed in `Router::connect` when the per-(cluster,
    /// source-IP) connection limit is hit, by folding the cluster's
    /// `retry_after` override over the global default. `None` (or
    /// `Some(0)`) tells the answer engine to omit the `Retry-After`
    /// header entirely — `Retry-After: 0` invites an immediate retry
    /// that defeats the limit. Unused for any other status code.
    pub retry_after_seconds: Option<u32>,
    /// Frontend-supplied template body that overrides the listener /
    /// cluster default `http.301.redirection` for a single
    /// `RedirectPolicy::PERMANENT` request. Stashed by the routing layer
    /// from `RouteResult::redirect_template` and consumed by the 301
    /// branch of `mux::answers::set_default_answer_with_retry_after`,
    /// which compiles the body via `HttpAnswers::render_inline_301` and
    /// renders it with the same `(REDIRECT_LOCATION, ROUTE,
    /// REQUEST_ID)` variable schema as the persistent template chain.
    /// `None` falls back to the cluster / listener default. Unused for
    /// any other status code or for the legacy
    /// `cluster.https_redirect = true` path (which never sets it).
    pub frontend_redirect_template: Option<String>,
    /// Resolved redirect status code stashed by the routing layer when
    /// a frontend's `RedirectPolicy` is one of the redirect variants.
    /// 301 = `Permanent`, 302 = `Found`, 308 = `PermanentRedirect`.
    /// `None` falls back to 301 for the legacy
    /// `cluster.https_redirect = true` path. Closes #1009.
    pub redirect_status: Option<u16>,
    /// Stable, structured discriminator surfaced as the access-log
    /// `message` field when the session terminates on a timeout. Set by
    /// the timeout handlers in `kawa_h1::timeout` and `MuxState::timeout`
    /// **before** the default-answer or `forcefully_terminate_answer`
    /// path consumes it. The vocabulary is operator-visible API once
    /// shipped — see the access-log section of `doc/configure.md` for
    /// the full token list (`client_timeout`,
    /// `client_timeout_during_response`, `backend_timeout`,
    /// `backend_response_timeout`). `None` for non-timeout sessions, in
    /// which case the access log emits `message: None` as before.
    pub access_log_message: Option<&'static str>,
}
339
/// How `apply_response_header_edits` should interpret a per-edit value.
///
/// The implicit empty-`val` Append → delete encoding is still supported
/// (so legacy operator-supplied `[[...frontends.headers]]` entries work
/// unchanged); the explicit modes give typed policies finer control:
///
/// - `Append`: an empty `val` is the legacy delete shortcut; every other
///   entry is appended before the end-of-headers flag.
///   NOTE(review): `apply_response_header_edits` is not visible from this
///   definition — confirm the empty-`val` handling against it.
/// - `SetIfAbsent`: skip the insert when `kawa.blocks` already carries
///   a non-elided header with the same name (case-insensitive). HSTS
///   uses this by default to preserve a backend-supplied
///   `Strict-Transport-Security` (RFC 6797 §6.1 single-header
///   requirement).
/// - `Set`: delete every existing header with the matching name, then
///   insert the new entry. Use when the operator wants their typed
///   policy to override any backend-supplied value (the
///   `force_replace_backend = true` HSTS shape, for example).
///
/// `Append` is the `Default` variant.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum HeaderEditMode {
    /// Append the header before the end-of-headers flag. Empty `val`
    /// is interpreted as a delete (legacy behaviour preserved).
    #[default]
    Append,
    /// Skip the insert if `kawa.blocks` already contains a non-elided
    /// header whose name matches `key` case-insensitively. Otherwise
    /// behave like `Append`.
    SetIfAbsent,
    /// Delete every existing header with the matching name, then
    /// insert the new value. Equivalent to two operator-defined edits
    /// (delete + append) but safer to express as one typed entry.
    Set,
}
372
/// Owned snapshot of a per-frontend header edit, captured at routing
/// time so the emission boundary can apply set/replace/delete without
/// touching the routing layer's `Rc<[HeaderEdit]>` slices.
///
/// `mode` selects one of the [`HeaderEditMode`] variants (`Append`,
/// `SetIfAbsent`, `Set`). `HeaderEditMode` has no dedicated delete
/// variant: for backwards compatibility `mode = Append` paired with an
/// empty `val` is treated as a delete by `apply_response_header_edits`
/// (HAProxy `del-header` parity), so existing callers that encode a
/// delete this way keep working unchanged.
#[derive(Debug, Clone)]
pub struct HeaderEditSnapshot {
    /// Header name the edit applies to.
    pub key: Vec<u8>,
    /// Header value to write; empty means "delete" under `Append` (see above).
    pub val: Vec<u8>,
    /// How the edit is applied relative to headers already present.
    pub mode: HeaderEditMode,
}
388
389impl kawa::h1::ParserCallbacks<Checkout> for HttpContext {
390 fn on_headers(&mut self, stream: &mut GenericHttpStream) {
391 match stream.kind {
392 kawa::Kind::Request => self.on_request_headers(stream),
393 kawa::Kind::Response => self.on_response_headers(stream),
394 }
395 }
396}
397
398impl HttpContext {
399 /// Creates a new instance
400 #[allow(clippy::too_many_arguments)]
401 pub fn new(
402 session_id: Ulid,
403 request_id: Ulid,
404 protocol: Protocol,
405 public_address: SocketAddr,
406 session_address: Option<SocketAddr>,
407 sticky_name: String,
408 sozu_id_header: String,
409 elide_x_real_ip: bool,
410 send_x_real_ip: bool,
411 ) -> Self {
412 Self {
413 session_id,
414 id: request_id,
415 backend_id: None,
416 cluster_id: None,
417
418 closing: false,
419 keep_alive_backend: true,
420 keep_alive_frontend: true,
421 protocol,
422 public_address,
423 session_address,
424 sticky_name,
425 sticky_session: None,
426 sticky_session_found: None,
427
428 method: None,
429 authority: None,
430 path: None,
431 status: None,
432 reason: None,
433 user_agent: None,
434 x_request_id: None,
435 xff_chain: None,
436
437 #[cfg(feature = "opentelemetry")]
438 otel: Default::default(),
439
440 backend_address: None,
441 tls_server_name: None,
442 tls_cert_names: None,
443 strict_sni_binding: true,
444 elide_x_real_ip,
445 send_x_real_ip,
446 tls_version: None,
447 tls_cipher: None,
448 tls_alpn: None,
449 sozu_id_header,
450 redirect_location: None,
451 www_authenticate: None,
452 original_authority: None,
453 headers_response: Vec::new(),
454 retry_after_seconds: None,
455 frontend_redirect_template: None,
456 redirect_status: None,
457 access_log_message: None,
458 }
459 }
460
/// Callback for request:
///
/// - edit headers (connection, forwarded, sticky cookie, sozu-id,
///   x-request-id, x-real-ip)
/// - save information:
///   - method
///   - authority
///   - path
///   - front keep-alive
///   - sticky cookie
///   - user-agent
///   - x-request-id (preserved if present, else derived from `self.id`)
///   - x-forwarded-for chain as received
///
/// The work happens in three passes: capture the request line and the
/// sticky cookie, walk the existing header blocks once to inspect/edit
/// them, then append every header that was found missing.
///
/// Panics (`unreachable!`) if `self.protocol` is neither `HTTP` nor
/// `HTTPS`.
fn on_request_headers(&mut self, request: &mut GenericHttpStream) {
    let buf = request.storage.mut_buffer();

    // Captures the request line
    if let kawa::StatusLine::Request {
        method,
        authority,
        path,
        ..
    } = &request.detached.status_line
    {
        self.method = method.data_opt(buf).map(Method::new);
        // Authority and path are only kept when they are valid UTF-8.
        self.authority = authority
            .data_opt(buf)
            .and_then(|data| from_utf8(data).ok())
            .map(ToOwned::to_owned);
        self.path = path
            .data_opt(buf)
            .and_then(|data| from_utf8(data).ok())
            .map(ToOwned::to_owned);
    }

    // if self.method == Some(Method::Get) && request.body_size == kawa::BodySize::Empty {
    //     request.parsing_phase = kawa::ParsingPhase::Terminated;
    // }

    let public_ip = self.public_address.ip();
    let public_port = self.public_address.port();
    // Only HTTP and HTTPS are expected here; any other protocol panics.
    let proto = match self.protocol {
        Protocol::HTTP => "http",
        Protocol::HTTPS => "https",
        _ => unreachable!(),
    };

    // Find and remove the sticky_name cookie
    // if found its value is stored in sticky_session_found
    for cookie in &mut request.detached.jar {
        let key = cookie.key.data(buf);
        if key == self.sticky_name.as_bytes() {
            let val = cookie.val.data(buf);
            self.sticky_session_found = from_utf8(val).ok().map(ToOwned::to_owned);
            cookie.elide();
        }
    }

    // If found:
    // - set Connection to "close" if closing is set
    // - set keep_alive_frontend to false if Connection is "close"
    // - compare X-Forwarded-Proto with the listener protocol (value is kept)
    // - compare X-Forwarded-Port with the listener port (value is kept)
    // - store X-Forwarded-For
    // - store Forwarded
    // - store User-Agent
    // - store X-Request-Id
    // - elide X-Real-IP when elide_x_real_ip is set
    let mut x_for = None;
    let mut forwarded = None;
    let mut has_x_port = false;
    let mut has_x_proto = false;
    let mut has_x_request_id = false;
    let mut has_connection = false;
    #[cfg(feature = "opentelemetry")]
    let mut traceparent: Option<&mut kawa::Pair> = None;
    #[cfg(feature = "opentelemetry")]
    let mut tracestate: Option<&mut kawa::Pair> = None;
    for block in &mut request.blocks {
        match block {
            kawa::Block::Header(header) if !header.is_elided() => {
                let key = header.key.data(buf);
                if compare_no_case(key, b"connection") {
                    has_connection = true;
                    if self.closing {
                        header.val = kawa::Store::Static(b"close");
                    } else {
                        let val = header.val.data(buf);
                        self.keep_alive_frontend &= !compare_no_case(val, b"close");
                    }
                } else if compare_no_case(key, b"X-Forwarded-Proto") {
                    has_x_proto = true;
                    // header.val = kawa::Store::Static(proto.as_bytes());
                    // The client-supplied value is trusted and left in
                    // place; a mismatch is only counted and logged.
                    incr!(names::http::TRUSTING_X_PROTO);
                    let val = header.val.data(buf);
                    if !compare_no_case(val, proto.as_bytes()) {
                        incr!(names::http::TRUSTING_X_PROTO_DIFF);
                        debug!(
                            "{} Trusting X-Forwarded-Proto for {:?} even though {:?} != {}",
                            self.log_context(),
                            self.authority,
                            val,
                            proto
                        );
                    }
                } else if compare_no_case(key, b"X-Forwarded-Port") {
                    has_x_port = true;
                    // header.val = kawa::Store::from_string(public_port.to_string());
                    // Same policy as X-Forwarded-Proto: keep the value,
                    // count and log a mismatch.
                    incr!(names::http::TRUSTING_X_PORT);
                    let val = header.val.data(buf);
                    let mut port_buf = itoa::Buffer::new();
                    let expected = port_buf.format(public_port);
                    if !compare_no_case(val, expected.as_bytes()) {
                        incr!(names::http::TRUSTING_X_PORT_DIFF);
                        debug!(
                            "{} Trusting X-Forwarded-Port for {:?} even though {:?} != {}",
                            self.log_context(),
                            self.authority,
                            val,
                            expected
                        );
                    }
                } else if compare_no_case(key, b"X-Forwarded-For") {
                    // Snapshot the upstream-attested chain before we
                    // potentially append our own peer below — the access
                    // log records the value the client/upstream LB
                    // forwarded, not the rewritten value Sōzu emits.
                    // If the header occurs several times, the last
                    // occurrence overwrites both the snapshot and `x_for`.
                    self.xff_chain = header
                        .val
                        .data_opt(buf)
                        .and_then(|data| from_utf8(data).ok())
                        .map(ToOwned::to_owned);
                    x_for = Some(header);
                } else if compare_no_case(key, b"X-Real-IP") && self.elide_x_real_ip {
                    // Anti-spoofing: a client cannot supply its own
                    // `X-Real-IP` and have it reach the backend. The
                    // proxy-injected value (when `send_x_real_ip` is
                    // also set) is appended after this loop. H2 trailer
                    // HEADERS frames bypass this callback; they are
                    // covered by the matching elision in
                    // `pkawa::handle_trailer`.
                    header.elide();
                } else if compare_no_case(key, b"Forwarded") {
                    forwarded = Some(header);
                } else if compare_no_case(key, b"User-Agent") {
                    self.user_agent = header
                        .val
                        .data_opt(buf)
                        .and_then(|data| from_utf8(data).ok())
                        .map(ToOwned::to_owned);
                } else if compare_no_case(key, b"X-Request-Id") {
                    // RFC: not standardized, but the de-facto correlation
                    // header used by Envoy/HAProxy/most LBs. Preserve the
                    // client-supplied value verbatim — overwriting it
                    // breaks end-to-end request tracing.
                    has_x_request_id = true;
                    self.x_request_id = header
                        .val
                        .data_opt(buf)
                        .and_then(|data| from_utf8(data).ok())
                        .map(ToOwned::to_owned);
                } else {
                    // Keep only the last traceparent / tracestate header:
                    // an earlier occurrence is elided when a later one
                    // is found.
                    #[cfg(feature = "opentelemetry")]
                    if compare_no_case(key, b"traceparent") {
                        if let Some(hdr) = traceparent {
                            hdr.elide();
                        }
                        traceparent = Some(header);
                    } else if compare_no_case(key, b"tracestate") {
                        if let Some(hdr) = tracestate {
                            hdr.elide();
                        }
                        tracestate = Some(header);
                    }
                }
            }
            _ => {}
        }
    }

    #[cfg(feature = "opentelemetry")]
    let (otel, has_traceparent) = {
        let mut otel = sozu_command_lib::logging::OpenTelemetry::default();
        let tp = traceparent
            .as_ref()
            .and_then(|hdr| parse_traceparent(&hdr.val, buf))
            .map(|(trace_id, parent_id)| (trace_id, Some(parent_id)));
        // Remove tracestate if no traceparent is present
        // (this also covers a traceparent that failed to parse)
        if let (None, Some(tracestate)) = (tp, tracestate) {
            tracestate.elide();
        }
        // Continue the incoming trace when one was parsed, otherwise
        // start a new trace with a random id and no parent span.
        let (trace_id, parent_id) = tp.unwrap_or_else(|| (random_id(), None));
        otel.trace_id = trace_id;
        otel.parent_span_id = parent_id;
        otel.span_id = random_id();
        // Modify header if present
        // (the forwarded parent id becomes this hop's span id)
        if let Some(id) = &mut traceparent {
            let new_val = build_traceparent(&otel.trace_id, &otel.span_id);
            id.val.modify(buf, &new_val);
        }
        (otel, traceparent.is_some())
    };

    // If session_address is set:
    // - append its ip address to the list of "X-Forwarded-For" if it was found, creates it if not
    // - append "proto=[PROTO];for=[PEER];by=[PUBLIC]" to the list of "Forwarded" if it was found, creates it if not
    if let Some(peer_addr) = self.session_address {
        let peer_ip = peer_addr.ip();
        let peer_port = peer_addr.port();
        let has_x_for = x_for.is_some();
        let has_forwarded = forwarded.is_some();

        // Buffer for building header values — ownership is transferred to Store
        // via `take`, so each header gets its own allocation.
        let mut hdr_buf = Vec::with_capacity(128);

        if let Some(header) = x_for {
            hdr_buf.extend_from_slice(header.val.data(buf));
            let _ = write!(hdr_buf, ", {peer_ip}");
            header.val = kawa::Store::from_vec(std::mem::take(&mut hdr_buf));
        }
        if let Some(header) = &mut forwarded {
            hdr_buf.extend_from_slice(header.val.data(buf));
            write_forwarded_suffix(&mut hdr_buf, proto, peer_ip, peer_port, public_ip);
            header.val = kawa::Store::from_vec(std::mem::take(&mut hdr_buf));
        }

        if !has_x_for {
            let _ = write!(hdr_buf, "{peer_ip}");
            request.push_block(kawa::Block::Header(kawa::Pair {
                key: kawa::Store::Static(b"X-Forwarded-For"),
                val: kawa::Store::from_vec(std::mem::take(&mut hdr_buf)),
            }));
        }
        if !has_forwarded {
            hdr_buf.extend_from_slice(b"proto=");
            hdr_buf.extend_from_slice(proto.as_bytes());
            write_forwarded_for_by(&mut hdr_buf, peer_ip, peer_port, public_ip);
            request.push_block(kawa::Block::Header(kawa::Pair {
                key: kawa::Store::Static(b"Forwarded"),
                val: kawa::Store::from_vec(std::mem::take(&mut hdr_buf)),
            }));
        }

        // Inject a proxy-generated `X-Real-IP` header carrying the
        // peer IP (post-PROXY-v2 unwrap, so the original client IP
        // even when the upstream presented PROXY-v2). Folded into the
        // `if let Some(peer_addr)` arm so missing peers (raw socket,
        // no PROXY-v2) skip the injection silently — identical to the
        // X-Forwarded-For / Forwarded synthesis behaviour above. Any
        // client-supplied `X-Real-IP` was either elided in the block
        // walk (if `elide_x_real_ip` is on) or passes through; this
        // header is appended last so order in the resulting block
        // list is deterministic for tests.
        if self.send_x_real_ip {
            let _ = write!(hdr_buf, "{peer_ip}");
            request.push_block(kawa::Block::Header(kawa::Pair {
                key: kawa::Store::Static(b"X-Real-IP"),
                val: kawa::Store::from_vec(std::mem::take(&mut hdr_buf)),
            }));
        }
    }

    #[cfg(feature = "opentelemetry")]
    {
        // No traceparent header came in: add one for this hop.
        if !has_traceparent {
            let val = build_traceparent(&otel.trace_id, &otel.span_id);
            request.push_block(kawa::Block::Header(kawa::Pair {
                key: kawa::Store::Static(b"traceparent"),
                val: kawa::Store::from_slice(&val),
            }));
        }
        self.otel = Some(otel);
    }

    // Synthesize X-Forwarded-Port / X-Forwarded-Proto from the listener
    // when the client did not send them.
    if !has_x_port {
        let mut port_buf = itoa::Buffer::new();
        let port_str = port_buf.format(public_port);
        request.push_block(kawa::Block::Header(kawa::Pair {
            key: kawa::Store::Static(b"X-Forwarded-Port"),
            val: kawa::Store::from_slice(port_str.as_bytes()),
        }));
    }
    if !has_x_proto {
        request.push_block(kawa::Block::Header(kawa::Pair {
            key: kawa::Store::Static(b"X-Forwarded-Proto"),
            val: kawa::Store::Static(proto.as_bytes()),
        }));
    }
    // Create a "Connection" header in case it was not found and closing is set
    if !has_connection && self.closing {
        request.push_block(kawa::Block::Header(kawa::Pair {
            key: kawa::Store::Static(b"Connection"),
            val: kawa::Store::Static(b"close"),
        }));
    }
    // Inject "X-Request-Id" derived from the request ULID when the client
    // (or upstream LB) did not already supply one. When already present,
    // the header is left untouched in the block list — preserving the
    // client-supplied value end-to-end is the whole point of this header.
    // Either way, `self.x_request_id` is populated so the access log
    // records the exact value forwarded to the backend.
    if has_x_request_id {
        incr!(names::http::X_REQUEST_ID_PROPAGATED);
    } else {
        let value = self.id.to_string();
        request.push_block(kawa::Block::Header(kawa::Pair {
            key: kawa::Store::Static(b"X-Request-Id"),
            val: kawa::Store::from_string(value.clone()),
        }));
        self.x_request_id = Some(value);
        incr!(names::http::X_REQUEST_ID_GENERATED);
    }

    // Create a custom correlation header (defaults to "Sozu-Id", can be
    // renamed via the `sozu_id_header` listener config knob).
    request.push_block(kawa::Block::Header(kawa::Pair {
        key: kawa::Store::from_string(self.sozu_id_header.clone()),
        val: kawa::Store::from_string(self.id.to_string()),
    }));
}
779
780 /// Callback for response:
781 ///
782 /// - edit headers (connection, set-cookie, sozu-id)
783 /// - save information:
784 /// - status code
785 /// - reason
786 /// - back keep-alive
787 fn on_response_headers(&mut self, response: &mut GenericHttpStream) {
788 let buf = &mut response.storage.mut_buffer();
789
790 // Captures the response line
791 if let kawa::StatusLine::Response { code, reason, .. } = &response.detached.status_line {
792 self.status = Some(*code);
793 self.reason = reason
794 .data_opt(buf)
795 .and_then(|data| from_utf8(data).ok())
796 .map(ToOwned::to_owned);
797 }
798
799 if self.method == Some(Method::Head) {
800 response.parsing_phase = kawa::ParsingPhase::Terminated;
801 }
802
803 // If found:
804 // - set Connection to "close" if closing is set
805 // - set keep_alive_backend to false if Connection is "close"
806 for block in &mut response.blocks {
807 match block {
808 kawa::Block::Header(header) if !header.is_elided() => {
809 let key = header.key.data(buf);
810 if compare_no_case(key, b"connection") {
811 if self.closing {
812 header.val = kawa::Store::Static(b"close");
813 } else {
814 let val = header.val.data(buf);
815 self.keep_alive_backend &= !compare_no_case(val, b"close");
816 }
817 }
818 }
819 _ => {}
820 }
821 }
822
823 // If the sticky_session is set and differs from the one found in the request
824 // create a "Set-Cookie" header to update the sticky_name value
825 if let Some(sticky_session) = &self.sticky_session {
826 if self.sticky_session != self.sticky_session_found {
827 let mut cookie_buf =
828 Vec::with_capacity(self.sticky_name.len() + 1 + sticky_session.len() + 8);
829 cookie_buf.extend_from_slice(self.sticky_name.as_bytes());
830 cookie_buf.push(b'=');
831 cookie_buf.extend_from_slice(sticky_session.as_bytes());
832 cookie_buf.extend_from_slice(b"; Path=/");
833 response.push_block(kawa::Block::Header(kawa::Pair {
834 key: kawa::Store::Static(b"Set-Cookie"),
835 val: kawa::Store::from_vec(cookie_buf),
836 }));
837 }
838 }
839
840 // Create a custom correlation header (defaults to "Sozu-Id", can be
841 // renamed via the `sozu_id_header` listener config knob).
842 response.push_block(kawa::Block::Header(kawa::Pair {
843 key: kawa::Store::from_string(self.sozu_id_header.clone()),
844 val: kawa::Store::from_string(self.id.to_string()),
845 }));
846 }
847
848 pub fn reset(&mut self) {
849 self.keep_alive_backend = true;
850 self.keep_alive_frontend = true;
851 self.sticky_session_found = None;
852 self.method = None;
853 self.authority = None;
854 self.path = None;
855 self.status = None;
856 self.reason = None;
857 self.user_agent = None;
858 self.x_request_id = None;
859 self.xff_chain = None;
860 self.redirect_location = None;
861 self.www_authenticate = None;
862 self.original_authority = None;
863 self.headers_response.clear();
864 // Note: tls_server_name, tls_version, tls_cipher, tls_alpn,
865 // strict_sni_binding, elide_x_real_ip, send_x_real_ip are
866 // connection-scoped — set once at handshake completion and reused
867 // across every keep-alive request, so reset() intentionally leaves
868 // them in place.
869 }
870
871 pub fn extract_route(&self) -> Result<(&str, &str, &Method), RetrieveClusterError> {
872 let given_method = self.method.as_ref().ok_or(RetrieveClusterError::NoMethod)?;
873 let given_authority = self
874 .authority
875 .as_deref()
876 .ok_or(RetrieveClusterError::NoHost)?;
877 let given_path = self.path.as_deref().ok_or(RetrieveClusterError::NoPath)?;
878
879 Ok((given_authority, given_path, given_method))
880 }
881
882 pub fn get_route(&self) -> String {
883 if let Some(method) = &self.method {
884 if let Some(authority) = &self.authority {
885 if let Some(path) = &self.path {
886 return format!("{method} {authority}{path}");
887 }
888 return format!("{method} {authority}");
889 }
890 return format!("{method}");
891 }
892 String::new()
893 }
894
895 pub fn websocket_context(&self) -> WebSocketContext {
896 WebSocketContext::Http {
897 method: self.method.clone(),
898 authority: self.authority.clone(),
899 path: self.path.clone(),
900 reason: self.reason.clone(),
901 status: self.status,
902 }
903 }
904
905 pub fn log_context(&self) -> LogContext<'_> {
906 LogContext {
907 session_id: self.session_id,
908 request_id: Some(self.id),
909 cluster_id: self.cluster_id.as_deref(),
910 backend_id: self.backend_id.as_deref(),
911 }
912 }
913}
914
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};

    // ── shared fixtures ────────────────────────────────────────────────

    /// Wraps four octets into an IPv4 `IpAddr`.
    fn ipv4(octets: [u8; 4]) -> IpAddr {
        IpAddr::V4(Ipv4Addr::from(octets))
    }

    /// Parses an IPv6 literal into an `IpAddr`; panics on a malformed literal.
    fn ipv6(literal: &str) -> IpAddr {
        IpAddr::V6(literal.parse::<Ipv6Addr>().unwrap())
    }

    /// Builds an `HttpContext` with the settings every test shares; only the
    /// optional socket address and the correlation header name vary.
    fn context_with(optional_addr: Option<SocketAddr>, header_name: &str) -> HttpContext {
        HttpContext::new(
            Ulid::generate(),
            Ulid::generate(),
            Protocol::HTTP,
            SocketAddr::new(ipv4([127, 0, 0, 1]), 8080),
            optional_addr,
            "SERVERID".to_owned(),
            header_name.to_owned(),
            false,
            false,
        )
    }

    /// Helper to create a minimal HttpContext for testing.
    fn make_context() -> HttpContext {
        context_with(Some(SocketAddr::new(ipv4([10, 0, 0, 1]), 54321)), "Sozu-Id")
    }

    // ── sozu_id_header ──────────────────────────────────────────────────

    #[test]
    fn test_sozu_id_header_default_name_stored_on_context() {
        // `make_context` passes the documented default "Sozu-Id", matching
        // the trait default on `L7ListenerHandler::get_sozu_id_header`.
        assert_eq!(make_context().sozu_id_header, "Sozu-Id");
    }

    #[test]
    fn test_sozu_id_header_custom_name_stored_on_context() {
        // An operator-provided rename must land verbatim on the HttpContext.
        let context = context_with(None, "X-Edge-Id");
        assert_eq!(context.sozu_id_header, "X-Edge-Id");
    }

    // ── extract_route ──────────────────────────────────────────────────

    #[test]
    fn test_extract_route_all_present() {
        let mut context = make_context();
        context.method = Some(Method::Get);
        context.authority = Some("example.com".to_owned());
        context.path = Some("/index.html".to_owned());

        let (authority, path, method) = context.extract_route().unwrap();
        assert_eq!(authority, "example.com");
        assert_eq!(path, "/index.html");
        assert_eq!(method, &Method::Get);
    }

    #[test]
    fn test_extract_route_no_method() {
        let mut context = make_context();
        context.authority = Some("example.com".to_owned());
        context.path = Some("/".to_owned());

        assert!(matches!(
            context.extract_route(),
            Err(RetrieveClusterError::NoMethod)
        ));
    }

    #[test]
    fn test_extract_route_no_host() {
        let mut context = make_context();
        context.method = Some(Method::Get);
        context.path = Some("/".to_owned());

        assert!(matches!(
            context.extract_route(),
            Err(RetrieveClusterError::NoHost)
        ));
    }

    #[test]
    fn test_extract_route_no_path() {
        let mut context = make_context();
        context.method = Some(Method::Get);
        context.authority = Some("example.com".to_owned());

        assert!(matches!(
            context.extract_route(),
            Err(RetrieveClusterError::NoPath)
        ));
    }

    // ── get_route ──────────────────────────────────────────────────────

    #[test]
    fn test_get_route_all_present() {
        let mut context = make_context();
        context.method = Some(Method::Get);
        context.authority = Some("example.com".to_owned());
        context.path = Some("/api/v1".to_owned());

        assert_eq!(context.get_route(), "GET example.com/api/v1");
    }

    #[test]
    fn test_get_route_method_and_authority_only() {
        let mut context = make_context();
        context.method = Some(Method::Post);
        context.authority = Some("example.com".to_owned());

        assert_eq!(context.get_route(), "POST example.com");
    }

    #[test]
    fn test_get_route_method_only() {
        let mut context = make_context();
        context.method = Some(Method::Delete);

        assert_eq!(context.get_route(), "DELETE");
    }

    #[test]
    fn test_get_route_empty() {
        assert_eq!(make_context().get_route(), "");
    }

    // ── reset ──────────────────────────────────────────────────────────

    #[test]
    fn test_reset_clears_request_response_state() {
        let mut context = make_context();
        context.keep_alive_backend = false;
        context.keep_alive_frontend = false;
        context.sticky_session_found = Some("abc123".to_owned());
        context.method = Some(Method::Post);
        context.authority = Some("example.com".to_owned());
        context.path = Some("/upload".to_owned());
        context.status = Some(200);
        context.reason = Some("OK".to_owned());
        context.user_agent = Some("curl/7.81".to_owned());
        context.x_request_id = Some("client-xrid-123".to_owned());
        context.xff_chain = Some("203.0.113.5, 198.51.100.10".to_owned());
        context.redirect_location = Some("https://example.com/".to_owned());
        context.www_authenticate = Some("Basic realm=\"sozu\"".to_owned());
        context.original_authority = Some("old.example.com".to_owned());
        context.headers_response.push(HeaderEditSnapshot {
            key: b"X-Cache".to_vec(),
            val: b"HIT".to_vec(),
            mode: HeaderEditMode::Append,
        });

        context.reset();

        // Keep-alive flags return to their optimistic default.
        assert!(context.keep_alive_backend);
        assert!(context.keep_alive_frontend);

        // Captured request/response values are gone.
        assert!(context.sticky_session_found.is_none());
        assert!(context.method.is_none());
        assert!(context.authority.is_none());
        assert!(context.path.is_none());
        assert!(context.status.is_none());
        assert!(context.reason.is_none());
        assert!(context.user_agent.is_none());
        assert!(context.x_request_id.is_none());
        assert!(context.xff_chain.is_none());

        // The four stash slots written by the routing layer must clear
        // between pipelined H1 requests; otherwise a future code path that
        // emits a 301 / 401 default-answer without re-routing would
        // inherit a stale Location / WWW-Authenticate from a prior request,
        // or the backend would receive a stale X-Forwarded-Host or a stale
        // response-side header edit.
        assert!(context.redirect_location.is_none());
        assert!(context.www_authenticate.is_none());
        assert!(context.original_authority.is_none());
        assert!(context.headers_response.is_empty());
    }

    #[test]
    fn test_reset_preserves_tls_metadata() {
        // TLS metadata is connection-scoped (set once at handshake, reused
        // across every keep-alive request) — reset() must leave it intact
        // so the access log of the second request still carries it.
        let mut context = make_context();
        context.tls_server_name = Some("example.com".to_owned());
        context.tls_version = Some("TLSv1.3");
        context.tls_cipher = Some("TLS_AES_128_GCM_SHA256");
        context.tls_alpn = Some("h2");
        context.strict_sni_binding = false;

        context.reset();

        assert_eq!(context.tls_server_name.as_deref(), Some("example.com"));
        assert_eq!(context.tls_version, Some("TLSv1.3"));
        assert_eq!(context.tls_cipher, Some("TLS_AES_128_GCM_SHA256"));
        assert_eq!(context.tls_alpn, Some("h2"));
        assert!(!context.strict_sni_binding);
    }

    #[test]
    fn test_reset_preserves_connection_state() {
        let mut context = make_context();
        context.closing = true;
        context.cluster_id = Some("cluster-1".to_owned());
        context.backend_id = Some("backend-1".to_owned());
        context.sticky_session = Some("session-abc".to_owned());

        let id_before = context.id;
        let protocol_before = context.protocol;
        let public_address_before = context.public_address;

        context.reset();

        // Connection-level state must survive the reset untouched.
        assert!(context.closing);
        assert_eq!(context.cluster_id.as_deref(), Some("cluster-1"));
        assert_eq!(context.backend_id.as_deref(), Some("backend-1"));
        assert_eq!(context.sticky_session.as_deref(), Some("session-abc"));
        assert_eq!(context.id, id_before);
        assert_eq!(context.protocol, protocol_before);
        assert_eq!(context.public_address, public_address_before);
    }

    // ── write_forwarded_for_by (RFC 7239 §6 IP-literal bracketing) ──────
    //
    // Locks in the IPv6 bracket+quote contract that matches HAProxy
    // (`_7239_print_ip6` in `src/http_ext.c`) and prevents regression to
    // the ambiguous `for="2001:db8::1:8080"` form flagged in issue #1254.

    /// Renders the `;for=…;by=…` fragment for `peer` and `public_ip`.
    fn render_for_by(peer: SocketAddr, public_ip: IpAddr) -> String {
        let mut rendered = Vec::new();
        write_forwarded_for_by(&mut rendered, peer.ip(), peer.port(), public_ip);
        String::from_utf8(rendered).expect("forwarded fragment must be ASCII")
    }

    /// Renders the fragment appended to an already present `Forwarded` header.
    fn render_suffix(proto: &str, peer: SocketAddr, public_ip: IpAddr) -> String {
        let mut rendered = Vec::new();
        write_forwarded_suffix(&mut rendered, proto, peer.ip(), peer.port(), public_ip);
        String::from_utf8(rendered).expect("forwarded fragment must be ASCII")
    }

    #[test]
    fn test_forwarded_ipv4_peer_ipv4_by_unchanged() {
        // IPv4 must keep the pre-fix wire format: unquoted `by=`, bare IPv4 in
        // `for=`. Sōzu emits `for=` quoted unconditionally (HAProxy emits it
        // quoted only when a port is attached; we always attach a port).
        let client = SocketAddr::new(ipv4([203, 0, 113, 7]), 54321);
        let listener_ip = ipv4([198, 51, 100, 1]);

        let rendered = render_for_by(client, listener_ip);
        assert_eq!(rendered, r#";for="203.0.113.7:54321";by=198.51.100.1"#);
    }

    #[test]
    fn test_forwarded_ipv6_peer_brackets_disambiguate_port() {
        // Regression for issue #1254: without brackets, `for="2001:db8::1:8080"`
        // is ambiguous (could parse as `::1` + port `8080` or `:1:8080` literal).
        let client = SocketAddr::new(ipv6("2001:db8::1"), 8080);
        let listener_ip = ipv4([198, 51, 100, 1]);

        let rendered = render_for_by(client, listener_ip);
        assert_eq!(rendered, r#";for="[2001:db8::1]:8080";by=198.51.100.1"#);
    }

    #[test]
    fn test_forwarded_ipv6_public_address_bracketed_and_quoted() {
        // `by=` carries no port but RFC 7239 §6 still requires the IP-literal
        // brackets for IPv6, and the value must be quoted because the literal
        // contains `:`.
        let client = SocketAddr::new(ipv4([203, 0, 113, 7]), 54321);
        let listener_ip = ipv6("2001:db8::2");

        let rendered = render_for_by(client, listener_ip);
        assert_eq!(rendered, r#";for="203.0.113.7:54321";by="[2001:db8::2]""#);
    }

    #[test]
    fn test_forwarded_ipv6_peer_and_public_both_bracketed() {
        let client = SocketAddr::new(ipv6("2001:db8::1"), 8080);
        let listener_ip = ipv6("2001:db8::2");

        let rendered = render_for_by(client, listener_ip);
        assert_eq!(rendered, r#";for="[2001:db8::1]:8080";by="[2001:db8::2]""#);
    }

    #[test]
    fn test_forwarded_suffix_prepends_proto_and_brackets_ipv6() {
        // The suffix form is what we append when an inbound `Forwarded`
        // header already exists. Same bracketing contract must hold.
        let client = SocketAddr::new(ipv6("::1"), 443);
        let listener_ip = ipv6("fe80::1");

        let rendered = render_suffix("https", client, listener_ip);
        assert_eq!(rendered, r#", proto=https;for="[::1]:443";by="[fe80::1]""#);
    }

    // ── traceparent / opentelemetry helpers ─────────────────────────────

    #[cfg(feature = "opentelemetry")]
    mod otel {
        use super::super::*;

        /// Reference header value shared by several tests.
        const SAMPLE: &[u8] = b"00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
        /// Trace id embedded in `SAMPLE`.
        const SAMPLE_TRACE_ID: [u8; 32] = *b"4bf92f3577b34da6a3ce929d0e0e4736";
        /// Parent id embedded in `SAMPLE`.
        const SAMPLE_PARENT_ID: [u8; 16] = *b"00f067aa0ba902b7";

        /// Runs `parse_traceparent` on a static value that is both the
        /// store content and the backing buffer.
        fn parse(input: &'static [u8]) -> Option<([u8; 32], [u8; 16])> {
            let store = kawa::Store::Static(input);
            parse_traceparent(&store, input)
        }

        #[test]
        fn test_parse_hex_valid() {
            let (digits, remainder) = parse_hex::<4>(b"abcd1234").unwrap();
            assert_eq!(&digits, b"abcd");
            assert_eq!(remainder, b"1234");
        }

        #[test]
        fn test_parse_hex_exact_length() {
            let (digits, remainder) = parse_hex::<8>(b"01234567").unwrap();
            assert_eq!(&digits, b"01234567");
            assert!(remainder.is_empty());
        }

        #[test]
        fn test_parse_hex_too_short() {
            assert!(parse_hex::<4>(b"ab").is_none());
        }

        #[test]
        fn test_parse_hex_rejects_non_hex() {
            assert!(parse_hex::<4>(b"ghij").is_none());
        }

        #[test]
        fn test_parse_hex_rejects_uppercase_is_ok() {
            // Despite the test name, uppercase hex digits are accepted.
            let (digits, _) = parse_hex::<4>(b"ABCD").unwrap();
            assert_eq!(&digits, b"ABCD");
        }

        #[test]
        fn test_skip_separator_valid() {
            let remainder = skip_separator(b"-hello").unwrap();
            assert_eq!(remainder, b"hello");
        }

        #[test]
        fn test_skip_separator_wrong_char() {
            assert!(skip_separator(b"+hello").is_none());
        }

        #[test]
        fn test_skip_separator_empty() {
            assert!(skip_separator(b"").is_none());
        }

        #[test]
        fn test_build_traceparent_format() {
            let header = build_traceparent(&SAMPLE_TRACE_ID, &SAMPLE_PARENT_ID);
            assert_eq!(
                &header,
                b"00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"
            );
        }

        #[test]
        fn test_build_traceparent_length() {
            let header = build_traceparent(&[b'a'; 32], &[b'b'; 16]);
            // Format: "00-" (3) + trace_id (32) + "-" (1) + parent_id (16) + "-01" (3) = 55
            assert_eq!(header.len(), 55);
        }

        #[test]
        fn test_parse_traceparent_valid() {
            let (trace_id, parent_id) = parse(SAMPLE).unwrap();
            assert_eq!(&trace_id, b"4bf92f3577b34da6a3ce929d0e0e4736");
            assert_eq!(&parent_id, b"00f067aa0ba902b7");
        }

        #[test]
        fn test_parse_traceparent_sampled_flag_zero() {
            let parsed = parse(b"00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-00");
            assert!(parsed.is_some());
        }

        #[test]
        fn test_parse_traceparent_wrong_version() {
            let parsed = parse(b"01-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01");
            assert!(parsed.is_none());
        }

        #[test]
        fn test_parse_traceparent_too_short() {
            assert!(parse(b"00-4bf9").is_none());
        }

        #[test]
        fn test_parse_traceparent_trailing_data() {
            // Extra characters after the trace-flags should cause rejection
            let parsed =
                parse(b"00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01-extra");
            assert!(parsed.is_none());
        }

        #[test]
        fn test_parse_traceparent_missing_separator() {
            let parsed = parse(b"004bf92f3577b34da6a3ce929d0e0e473600f067aa0ba902b701");
            assert!(parsed.is_none());
        }

        #[test]
        fn test_parse_build_roundtrip() {
            // Build a traceparent from known IDs and check it against the
            // expected static string.
            let built = build_traceparent(&SAMPLE_TRACE_ID, &SAMPLE_PARENT_ID);
            assert_eq!(&built[..], SAMPLE);

            // Parse that static string back to confirm the roundtrip.
            let (parsed_trace_id, parsed_parent_id) = parse(SAMPLE).unwrap();
            assert_eq!(parsed_trace_id, SAMPLE_TRACE_ID);
            assert_eq!(parsed_parent_id, SAMPLE_PARENT_ID);
        }
    }
}