Skip to main content

sozu_lib/protocol/mux/
router.rs

1//! Backend routing and connection reuse for the mux layer.
2//!
3//! [`Router`] owns the map of token -> backend [`Connection`] and centralises
4//! the logic for picking (or opening) the right backend for an incoming
5//! request. The H2 reuse strategy prefers the least-loaded non-draining
6//! connection of the target cluster; H1 falls back to keep-alive reuse.
7
8use std::{cell::RefCell, collections::HashMap, rc::Rc, time::Duration};
9
10use mio::{Interest, Token, net::TcpStream};
11use sozu_command::{
12    logging::ansi_palette,
13    proto::command::{ListenerType, RedirectPolicy, RedirectScheme},
14};
15
16#[cfg(debug_assertions)]
17use super::DebugEvent;
18use super::{BackendStatus, Connection, Context, GlobalStreamId, Position, StreamState};
19use crate::{
20    BackendConnectionError, L7ListenerHandler, L7Proxy, ListenerHandler, ProxySession, Readiness,
21    RetrieveClusterError,
22    backends::{Backend, BackendError},
23    protocol::http::editor::{HeaderEditMode, HeaderEditSnapshot, HttpContext},
24    router::{HeaderEdit, RouteResult},
25    server::CONN_RETRIES,
26    socket::SessionTcpStream,
27    timer::TimeoutContainer,
28};
29
30use crate::metrics::names;
31
/// Builds the prefix placed at the start of every log line this module emits.
///
/// The macro has two forms:
/// * `log_module_context!()` takes no argument and yields the legacy
///   `MUX-ROUTER\t >>>` prefix. It exists for call sites that have no
///   `HttpContext` at hand. Nothing in this module uses it today (each call
///   site can reach an `HttpContext`, either through
///   [`Context::http_context`] or through a `&mut HttpContext` parameter);
///   it is kept so the macro name stays stable for future sessionless
///   callers.
/// * `log_module_context!($http_context)` is the rich form. The argument must
///   be a `&HttpContext` or something that coerces to one. The output starts
///   with the `[session req cluster backend]` bracket also used by
///   RUSTLS/PIPE/TCP, followed by a
///   `Session(frontend=..., method=..., authority=...)` block, which lets
///   router lines be filtered by session ULID or request ULID. The cluster id
///   already sits in the third slot of the bracket, so it is not repeated
///   inside `Session(...)`.
macro_rules! log_module_context {
    () => {{
        // Only the first two palette entries are needed for the bare prefix.
        let (open, reset, ..) = ansi_palette();
        format!("{open}MUX-ROUTER{reset}\t >>>", open = open, reset = reset)
    }};
    ($http_context:expr) => {{
        // Palette layout: (open, reset, grey, gray, white).
        let palette = ansi_palette();
        // The annotated binding forces the coercion to `&HttpContext`, so
        // `&mut HttpContext` and `&&HttpContext` arguments are accepted too.
        let http_ctx: &HttpContext = &$http_context;
        format!(
            "{gray}{ctx}{reset}\t{open}MUX-ROUTER{reset}\t{grey}Session{reset}({gray}frontend{reset}={white}{frontend:?}{reset}, {gray}method{reset}={white}{method:?}{reset}, {gray}authority{reset}={white}{authority:?}{reset})\t >>>",
            open = palette.0,
            reset = palette.1,
            grey = palette.2,
            gray = palette.3,
            white = palette.4,
            ctx = http_ctx.log_context(),
            frontend = http_ctx.session_address,
            method = http_ctx.method,
            authority = http_ctx.authority,
        )
    }};
}
71
72#[derive(Debug)]
73pub struct Router {
74    pub backends: HashMap<Token, Connection<SessionTcpStream>>,
75    pub configured_backend_timeout: Duration,
76    pub configured_connect_timeout: Duration,
77    /// Fallback readiness used when a backend token is missing from the map.
78    /// This prevents panicking in the Endpoint trait methods that return references.
79    pub(super) fallback_readiness: Readiness,
80}
81
82impl Router {
83    pub fn new(configured_backend_timeout: Duration, configured_connect_timeout: Duration) -> Self {
84        Self {
85            backends: HashMap::new(),
86            configured_backend_timeout,
87            configured_connect_timeout,
88            fallback_readiness: Readiness::new(),
89        }
90    }
91
92    pub(super) fn connect<L: ListenerHandler + L7ListenerHandler>(
93        &mut self,
94        stream_id: GlobalStreamId,
95        context: &mut Context<L>,
96        session: Rc<RefCell<dyn ProxySession>>,
97        proxy: Rc<RefCell<dyn L7Proxy>>,
98        // Frontend session token, threaded in from `Mux::ready` so the
99        // per-(cluster, source-IP) accounting can key on it without
100        // re-borrowing `session` — the outer event-loop call chain
101        // already holds a mutable borrow of that cell.
102        frontend_token: Token,
103    ) -> Result<(), BackendConnectionError> {
104        let stream = &mut context.streams[stream_id];
105        // when reused, a stream should be detached from its old connection, if not we could end
106        // with concurrent connections on a single endpoint
107        if !matches!(stream.state, StreamState::Link) {
108            error!(
109                "{} stream {} expected to be in Link state, got {:?}",
110                log_module_context!(stream.context),
111                stream_id,
112                stream.state
113            );
114            return Err(BackendConnectionError::MaxSessionsMemory);
115        }
116        #[cfg(debug_assertions)]
117        context
118            .debug
119            .push(DebugEvent::Str(stream.context.get_route()));
120        if stream.attempts >= CONN_RETRIES {
121            incr!(
122                "backend.connect.retries_exhausted",
123                stream.context.cluster_id.as_deref(),
124                stream.context.backend_id.as_deref()
125            );
126            return Err(BackendConnectionError::MaxConnectionRetries(
127                stream.context.cluster_id.clone(),
128            ));
129        }
130        stream.attempts += 1;
131
132        // Borrow front mutably (so route_from_request can rewrite the request
133        // line authority/path and inject request-side header edits before we
134        // forward to the backend) plus context mutably (so it can stash
135        // redirect_location / www_authenticate / original_authority /
136        // headers_response). We split-borrow manually to keep the rest of
137        // `connect` working with `stream_context` aliasing `stream.context`.
138        let (front_ref, stream_context_ref) = {
139            let stream_split = &mut *stream;
140            (&mut stream_split.front, &mut stream_split.context)
141        };
142        let cluster_id = self
143            .route_from_request(stream_context_ref, front_ref, &context.listener, &proxy)
144            .map_err(BackendConnectionError::RetrieveClusterError)?;
145        let stream_context = &mut stream.context;
146        stream_context.cluster_id = Some(cluster_id.to_owned());
147
148        let (
149            frontend_should_stick,
150            frontend_should_redirect_https,
151            h2,
152            cluster_max_connections_per_ip,
153            cluster_retry_after,
154        ) = proxy
155            .borrow()
156            .clusters()
157            .get(&cluster_id)
158            .map(|cluster| {
159                (
160                    cluster.sticky_session,
161                    cluster.https_redirect,
162                    cluster.http2.unwrap_or(false),
163                    cluster.max_connections_per_ip,
164                    cluster.retry_after,
165                )
166            })
167            .unwrap_or((false, false, false, None, None));
168
169        // ── Legacy `cluster.https_redirect` short-circuit ──
170        //
171        // Resolve the legacy HTTP→HTTPS redirect BEFORE per-(cluster,
172        // source-IP) accounting so a redirect-only request never
173        // consumes an IP slot. Otherwise a same-IP client iterating an
174        // HTTP→HTTPS hop could trip 429 ahead of the 301 even though no
175        // backend would have been opened. A duplicate guard that lived
176        // here previously (rebase artefact — two identical
177        // `if frontend_should_redirect_https && …` blocks back-to-back)
178        // is folded into this single early-return.
179        // Frontend-scoped `RedirectPolicy::PERMANENT` already returns
180        // from `route_from_request` with the same error, so this only
181        // handles the legacy cluster-level path that doesn't surface
182        // from `route_from_request`.
183        if frontend_should_redirect_https && matches!(proxy.borrow().kind(), ListenerType::Http) {
184            return Err(BackendConnectionError::RetrieveClusterError(
185                RetrieveClusterError::HttpsRedirect,
186            ));
187        }
188
189        // Per-(cluster, source-IP) connection limit gate. Runs AFTER cluster
190        // resolution AND legacy redirect emission (so a 401/421/redirect
191        // frontend never trips the limit) and BEFORE any backend selection
192        // (so a rejection consumes neither a backend pool slot nor a retry
193        // budget). The check uses the source IP from the per-stream
194        // `HttpContext.session_address`, which is the proxy-protocol-aware
195        // client address when present, falling back to `peer_addr`. The
196        // limit governs distinct **frontend connections** per
197        // `(cluster, ip)`: an H2 session multiplexing N streams to the same
198        // cluster from the same IP still consumes a single slot.
199        let session_ip = stream_context.session_address.map(|sa| sa.ip());
200        if let Some(ip) = session_ip {
201            // The frontend session is mutably borrowed up the call stack
202            // (`HttpSession::ready` -> `state.ready` -> `Mux::ready` ->
203            // here), so we cannot reach `session.borrow().frontend_token()`.
204            // The token is threaded in by the caller instead.
205            let sessions_rc = proxy.borrow().sessions();
206            let at_limit = sessions_rc.borrow().cluster_ip_at_limit(
207                frontend_token,
208                &cluster_id,
209                &ip,
210                cluster_max_connections_per_ip,
211            );
212            if at_limit {
213                let retry_after = sessions_rc
214                    .borrow()
215                    .effective_retry_after(cluster_retry_after);
216                // Stash the resolved retry value on the stream so the
217                // mux's BackendConnectionError → 429 mapping can render
218                // (or elide) the `Retry-After` header without
219                // re-deriving the override chain.
220                stream_context.retry_after_seconds = Some(retry_after).filter(|v| *v > 0);
221                return Err(BackendConnectionError::TooManyConnectionsPerIp {
222                    cluster_id: cluster_id.to_owned(),
223                });
224            }
225            // Idempotent track — H2 streams to the same `(cluster, ip)`
226            // share a single slot in the per-token set. Decrement happens
227            // wholesale on session close via `untrack_all_cluster_ip`.
228            sessions_rc
229                .borrow_mut()
230                .track_cluster_ip(frontend_token, cluster_id.clone(), ip);
231        }
232
233        /*
234        H2 connecting strategy (least-loaded):
235        - look at every backend connection
236        - among connected backends for this cluster, pick the one with the fewest active streams
237        - fall back to a connecting backend if no connected one exists
238        - if no backend is to reuse, ask the router for a socket to the "next in line" backend
239
240        H1 strategy: reuse the first KeepAlive backend for this cluster.
241         */
242
243        let mut reuse_token = None;
244        let mut best_h2_stream_count = usize::MAX;
245        for (token, backend) in &self.backends {
246            match (h2, backend.position()) {
247                (_, Position::Server) => {
248                    error!(
249                        "{} Backend connection unexpectedly behaves like a server",
250                        log_module_context!(stream_context)
251                    );
252                    continue;
253                }
254                (_, Position::Client(_, _, BackendStatus::Disconnecting)) => {}
255
256                (true, Position::Client(other_cluster_id, _, BackendStatus::Connected)) => {
257                    if *other_cluster_id == cluster_id && !backend.is_draining() {
258                        // Pick the non-draining H2 connection with the fewest active streams
259                        let Connection::H2(h2c) = backend else {
260                            continue;
261                        };
262                        let stream_count = h2c.streams.len();
263                        if stream_count
264                            >= h2c.peer_settings.settings_max_concurrent_streams as usize
265                        {
266                            continue;
267                        }
268                        if stream_count < best_h2_stream_count {
269                            best_h2_stream_count = stream_count;
270                            reuse_token = Some(*token);
271                        }
272                    }
273                }
274                (true, Position::Client(other_cluster_id, _, BackendStatus::Connecting(_))) => {
275                    // Only use a connecting backend if no connected one was found
276                    if *other_cluster_id == cluster_id
277                        && best_h2_stream_count == usize::MAX
278                        && matches!(backend, Connection::H2(_))
279                    {
280                        reuse_token = Some(*token)
281                    }
282                }
283                (true, Position::Client(other_cluster_id, _, BackendStatus::KeepAlive)) => {
284                    if *other_cluster_id == cluster_id && matches!(backend, Connection::H2(_)) {
285                        error!(
286                            "{} ConnectionH2 unexpectedly behaves like H1 with KeepAlive",
287                            log_module_context!(stream_context)
288                        );
289                    }
290                }
291
292                (false, Position::Client(old_cluster_id, _, BackendStatus::KeepAlive)) => {
293                    if *old_cluster_id == cluster_id {
294                        reuse_token = Some(*token);
295                        break;
296                    }
297                }
298                // can't bundle H1 streams together
299                (false, Position::Client(_, _, BackendStatus::Connected))
300                | (false, Position::Client(_, _, BackendStatus::Connecting(_))) => {}
301            }
302        }
303        trace!(
304            "{} connect: (stick={}, h2={}) -> (reuse={:?})",
305            log_module_context!(stream_context),
306            frontend_should_stick,
307            h2,
308            reuse_token
309        );
310
311        if let Some(token) = reuse_token {
312            // Pool reuse: an existing backend connection (H2 multiplex slot or
313            // H1 keep-alive socket) is being reattached to this stream. Pair
314            // with `backend.pool.miss` below — together they describe the
315            // pool's hit/miss ratio. Counted before any commit so the metric
316            // is consistent with the trace log.
317            incr!(names::backend::POOL_HIT);
318            trace!(
319                "{} reused backend: {:#?}",
320                log_module_context!(stream_context),
321                self.backends.get(&token)
322            );
323            // Link backend to stream for the reused connection path. We check
324            // that the backend can accept a new stream before committing any
325            // per-stream state.
326            let Some(backend_conn) = self.backends.get_mut(&token) else {
327                error!(
328                    "{} reused backend token {:?} missing from backends map",
329                    log_module_context!(stream_context),
330                    token
331                );
332                return Err(BackendConnectionError::MaxSessionsMemory);
333            };
334            if !backend_conn.start_stream(stream_id, context) {
335                // Use `context.http_context(stream_id)` instead of reusing
336                // `stream_context`: `start_stream` above takes `&mut
337                // context`, which reborrows the slab mutably and ends any
338                // outstanding `stream_context` reference. A fresh shared
339                // borrow via the accessor is borrow-check clean.
340                error!(
341                    "{} Backend rejected stream start (max concurrent streams reached)",
342                    log_module_context!(context.http_context(stream_id))
343                );
344                return Err(BackendConnectionError::MaxSessionsMemory);
345            }
346            // For reused backends: set context fields and metrics lifecycle
347            if let Some(backend_conn) = self.backends.get(&token) {
348                if let Position::Client(_, backend_ref, _) = backend_conn.position() {
349                    let backend = backend_ref.borrow();
350                    let stream = &mut context.streams[stream_id];
351                    stream.context.backend_id = Some(backend.backend_id.to_owned());
352                    stream.context.backend_address = Some(backend.address);
353                    stream.metrics.backend_id = Some(backend.backend_id.to_owned());
354                    stream.metrics.backend_start();
355                    stream.metrics.backend_connected();
356                }
357            }
358            context.link_stream(stream_id, token);
359            return Ok(());
360        }
361
362        // New-backend path: fall through.
363        //
364        // Pool miss: no reusable connection was found (no live H2 multiplex
365        // slot for this cluster, no H1 keep-alive socket). A fresh TCP dial
366        // and full backend handshake will follow. Pair with `backend.pool.hit`
367        // above. The metric is incremented BEFORE `backend_from_request` so
368        // the count includes attempts that fail at backend selection
369        // (BackendError::NoBackendForCluster, etc.) — every miss is a slot
370        // we did not save. The dial itself may still fail
371        // (BackendConnectionError::*), in which case `backend.pool.size` is
372        // never bumped (see the gauge below) but the miss is already counted.
373        incr!(names::backend::POOL_MISS);
374        let token = {
375            //
376            // SECURITY (CWE-400): defer every stateful side-effect
377            // (backend.connections / connections_per_backend gauges, slab
378            // add_session, mio register_socket, self.backends.insert,
379            // stream.metrics.backend_start) until AFTER `new_h2_client` AND
380            // `start_stream` have both succeeded. If either fails we must
381            // return Err without leaking a slab entry, an epoll registration,
382            // a gauge counter, or a router-map entry.
383            //
384            // The TcpStream lives on the stack here and is moved into the
385            // Connection by `new_h2_client`/`new_h1_client`; on failure the
386            // Connection (or the raw TcpStream, for the pool-exhaustion
387            // branch that drops inside `new_h2_client`) is dropped, closing
388            // the fd. No token is ever allocated, so there is nothing to
389            // roll back.
390            let (socket, backend) = self.backend_from_request(
391                &cluster_id,
392                frontend_should_stick,
393                stream_context,
394                proxy.clone(),
395                &context.listener,
396            )?;
397
398            if let Err(e) = socket.set_nodelay(true) {
399                error!(
400                    "{} error setting nodelay on back socket({:?}): {:?}",
401                    log_module_context!(context.http_context(stream_id)),
402                    socket,
403                    e
404                );
405            }
406
407            // Cache the backend's configured address so SOCKET log lines
408            // fired on ECONNREFUSED (or any failed async `connect()`) can
409            // still render `peer=<backend>` — `getpeername(2)` returns
410            // ENOTCONN in that state, so the live lookup path would show
411            // `peer=None` exactly when the operator needs the backend id.
412            let backend_peer = Some(backend.borrow().address);
413            let socket = SessionTcpStream::new(socket, context.session_ulid, backend_peer);
414
415            // Build an un-armed timeout: we can't call `TimeoutContainer::new`
416            // yet because that requires the slab token, and we only allocate
417            // the token on the happy path. `.set(token)` below arms it.
418            let timeout_container = TimeoutContainer::new_empty(self.configured_connect_timeout);
419            let flood_config = context.listener.borrow().get_h2_flood_config();
420            let connection_config = context.listener.borrow().get_h2_connection_config();
421            let stream_idle_timeout = context.listener.borrow().get_h2_stream_idle_timeout();
422            let graceful_shutdown_deadline = context
423                .listener
424                .borrow()
425                .get_h2_graceful_shutdown_deadline();
426            let backend_id_for_gauge = backend.borrow().backend_id.to_owned();
427            let mut connection = if h2 {
428                match Connection::new_h2_client(
429                    context.session_ulid,
430                    socket,
431                    cluster_id.to_owned(),
432                    backend,
433                    context.pool.clone(),
434                    timeout_container,
435                    flood_config,
436                    connection_config,
437                    stream_idle_timeout,
438                    graceful_shutdown_deadline,
439                ) {
440                    Some(connection) => connection,
441                    // pool exhaustion: socket already dropped by new_h2_client,
442                    // no side-effects were committed.
443                    None => return Err(BackendConnectionError::MaxBuffers),
444                }
445            } else {
446                Connection::new_h1_client(
447                    context.session_ulid,
448                    socket,
449                    cluster_id.to_owned(),
450                    backend,
451                    timeout_container,
452                )
453            };
454
455            // Check the backend can accept a new stream BEFORE committing any
456            // registry state. `start_stream` increments `active_requests` via
457            // `pre_start_stream_client_bookkeeping` and undoes it itself on
458            // failure (see `Connection::start_stream`), so dropping the
459            // connection on a false return leaves backend accounting clean.
460            if !connection.start_stream(stream_id, context) {
461                error!(
462                    "{} Backend rejected stream start (max concurrent streams reached)",
463                    log_module_context!(context.http_context(stream_id))
464                );
465                // `connection` (socket + timeout_container) drops here.
466                return Err(BackendConnectionError::MaxSessionsMemory);
467            }
468
469            // --- Happy path: commit side-effects in one atomic-ish block ---
470            let stream = &mut context.streams[stream_id];
471            stream.metrics.backend_start();
472            stream.metrics.backend_id = stream.context.backend_id.to_owned();
473            gauge_add!(names::backend::CONNECTIONS, 1);
474            // `backend.pool.size` mirrors `backend.connections` exactly: one
475            // entry per `Router::backends` token. The `-1` partner lives in
476            // `connection.rs::pre_close_client_bookkeeping` (graceful close)
477            // and `mod.rs::close_backend` (session teardown). Symmetric
478            // pairing with both decrement sites is the only defence against
479            // the gauge underflow class of bug fixed by a650ad69 / d2f01ed4.
480            gauge_add!(names::backend::POOL_SIZE, 1);
481            gauge_add!(
482                names::backend::CONNECTIONS_PER_BACKEND,
483                1,
484                Some(&cluster_id),
485                Some(&backend_id_for_gauge)
486            );
487
488            let token = proxy.borrow().add_session(session);
489
490            {
491                let socket_ref = connection.socket_mut();
492                if let Err(e) = proxy.borrow().register_socket(
493                    socket_ref,
494                    token,
495                    Interest::READABLE | Interest::WRITABLE,
496                ) {
497                    // SECURITY (CWE-400): treat mio registration failure as a
498                    // hard connect failure. Without this rollback the gauges
499                    // (`backend.connections`, `backend.pool.size`,
500                    // `connections_per_backend`), the slab session, and the
501                    // already-incremented `Backend.active_requests` counter
502                    // (bumped in `Connection::start_stream` ->
503                    // `pre_start_stream_client_bookkeeping`) all leak until
504                    // the connect timeout fires. Under fd pressure
505                    // (EMFILE/ENFILE) this can occur in tight bursts and
506                    // poison capacity dashboards.
507                    error!(
508                        "{} error registering back socket: {:?} — rolling back",
509                        log_module_context!(context.http_context(stream_id)),
510                        e
511                    );
512                    // Undo the gauge increments committed above.
513                    gauge_add!(names::backend::CONNECTIONS, -1);
514                    gauge_add!(names::backend::POOL_SIZE, -1);
515                    gauge_add!(
516                        names::backend::CONNECTIONS_PER_BACKEND,
517                        -1,
518                        Some(&cluster_id),
519                        Some(&backend_id_for_gauge)
520                    );
521                    // Drop the slab session and the connection. The connection
522                    // is local to this scope; dropping it here also closes the
523                    // underlying TcpStream and releases the
524                    // `Backend.active_requests` increment via the regular
525                    // session drop path (`pre_close_client_bookkeeping`).
526                    proxy.borrow().remove_session(token);
527                    return Err(BackendConnectionError::MaxSessionsMemory);
528                }
529            }
530
531            // Arm the connect timeout now that we own a real token.
532            connection.timeout_container().set(token);
533
534            self.backends.insert(token, connection);
535            token
536        };
537
538        context.link_stream(stream_id, token);
539        Ok(())
540    }
541
542    fn route_from_request<L: ListenerHandler + L7ListenerHandler>(
543        &mut self,
544        context: &mut HttpContext,
545        front: &mut super::GenericHttpStream,
546        listener: &Rc<RefCell<L>>,
547        proxy: &Rc<RefCell<dyn L7Proxy>>,
548    ) -> Result<String, RetrieveClusterError> {
549        let (host, uri, method) = match context.extract_route() {
550            Ok(tuple) => tuple,
551            Err(cluster_error) => {
552                // we are past kawa parsing if it succeeded this can't fail
553                // if the request was malformed it was caught by kawa and we sent a 400
554                error!(
555                    "{} Malformed request in connect (should be caught at parsing) {:?}: {}",
556                    log_module_context!(context),
557                    context,
558                    cluster_error
559                );
560                return Err(cluster_error);
561            }
562        };
563        // Snapshot the pre-rewrite authority into an owned string so we
564        // can later stash it on `context.original_authority` without
565        // mutably aliasing the immutable borrow that `host: &str` still
566        // holds on `context`.
567        let captured_authority = host.to_owned();
568
569        // ── TLS cert SAN ↔ HTTP :authority binding ────────────────────────
570        // Reject any request whose `:authority` is not covered by a SAN of
571        // the certificate Sōzu actually served at the TLS handshake, with
572        // RFC 6125 §6.4.3 wildcard handling. Without this binding, an
573        // attacker holding a valid certificate for tenant A could open TLS
574        // with SNI=A then send an H2 stream with `:authority=tenantB.…` and
575        // reach tenant B's backend, crossing the TLS trust boundary
576        // (CWE-346 / CWE-444). The H2 spec explicitly allows browsers to
577        // coalesce streams onto a connection whenever the server is
578        // authoritative for the new origin (RFC 7540 §9.1.1 / RFC 9113
579        // §9.1.1), where "authoritative" means "covered by a SAN of the
580        // served cert"; rejecting coalesced streams as 421 caused the
581        // user-visible bug this predicate fixes (RFC 9110 §15.5.20).
582        //
583        // Plaintext listeners bypass the check (SNI is always `None`).
584        // Connections where SNI was sent but no cert matched (rustls served
585        // the default cert) carry `Some(empty)` SAN snapshot, so every
586        // authority is rejected — Sōzu is not authoritative for any name.
587        // Connections with no SNI fall back to the legacy exact-SNI match
588        // predicate (`authority_matches_sni`) for parity with pre-fix
589        // behaviour on the pathological "no SNI" case.
590        // Operators may opt out per-listener via
591        // `HttpsListenerConfig::strict_sni_binding = false`.
592        if let Some(sni) = context
593            .tls_server_name
594            .as_deref()
595            .filter(|_| context.strict_sni_binding)
596        {
597            let matched: Option<&str> = match context.tls_cert_names.as_deref() {
598                Some(cert_names) => authority_matched_cert_name(host, cert_names),
599                None => {
600                    if authority_matches_sni(host, sni) {
601                        Some(sni)
602                    } else {
603                        None
604                    }
605                }
606            };
607            match matched {
608                Some(matched_name) => {
609                    // Real coalescing = matched SAN differs from the SNI's
610                    // value after the matcher's port-strip + ASCII case
611                    // folding. Same-name requests are the common
612                    // non-coalesced path; do not pollute the counter or
613                    // logs with them. The ALPN=`h2` gate is a defensive
614                    // guard, not load-bearing under current invariants —
615                    // every request reaching `route_from_request` on an
616                    // HTTPS listener with `tls_cert_names` populated has
617                    // already gone through the H2 mux (ALPN=h2 by
618                    // construction). Kept explicit so a future routing
619                    // refactor that funnels H1 keep-alive through the
620                    // same predicate doesn't silently double-count
621                    // sequential `Host:` reuse as "coalescing".
622                    if !authority_matches_sni(host, sni) && context.tls_alpn == Some("h2") {
623                        incr!(names::h2::COALESCING_ACCEPTED);
624                        debug!(
625                            "{} accepted coalesced authority {:?} (SNI {:?}, matched SAN {:?})",
626                            log_module_context!(context),
627                            host,
628                            sni,
629                            matched_name,
630                        );
631                    }
632                }
633                None => {
634                    incr!(names::http::SNI_AUTHORITY_MISMATCH);
635                    warn!(
636                        "{} rejecting request: TLS cert SANs do not cover :authority {:?} (SNI {:?})",
637                        log_module_context!(context),
638                        host,
639                        sni,
640                    );
641                    return Err(RetrieveClusterError::SniAuthorityMismatch {
642                        sni: sni.to_owned(),
643                        authority: host.to_owned(),
644                    });
645                }
646            }
647        }
648
649        let route_result = listener.borrow().frontend_from_request(host, uri, method);
650
651        let route = match route_result {
652            Ok(route) => route,
653            Err(frontend_error) => {
654                trace!("{} {}", log_module_context!(context), frontend_error);
655                return Err(RetrieveClusterError::RetrieveFrontend(frontend_error));
656            }
657        };
658
659        // Stash the pre-rewrite authority unconditionally so log lines,
660        // access logs, and audit records that fire on ANY downstream
661        // path (denial, redirect, basic-auth 401, backend-connect
662        // failure, successful forward) carry the value the client
663        // actually sent. Capturing inside the rewrite helper alone would
664        // lose it on every branch where the rewrite is not applied.
665        context.original_authority = Some(captured_authority);
666
667        // ── Resolve the routing decision ──────────────────────────────────
668        // Snapshot the policy fields we need before consuming `route`, then
669        // map each policy outcome to either an early-error variant (which
670        // the caller turns into a default answer) or a cluster_id (which
671        // proceeds to backend connect).
672        let RouteResult {
673            cluster_id,
674            redirect,
675            redirect_scheme,
676            redirect_template,
677            rewritten_host,
678            rewritten_path,
679            rewritten_port,
680            headers_request,
681            headers_response,
682            required_auth: frontend_required_auth,
683            ..
684        } = route;
685
686        // ── HSTS (RFC 6797) snapshot hoist for HTTPS ──────────────────────
687        // The response snapshot is built in two passes so HSTS reaches
688        // every HTTPS response code (RFC 6797 §8.1 — including
689        // proxy-generated 3xx / 401 / 5xx default answers) WITHOUT
690        // changing the pre-PR scope of operator-defined `Append`
691        // response headers (which only apply on the regular forward
692        // path).
693        //
694        // Pass 1 (here, before any early return): for HTTPS only, copy
695        // ONLY the HSTS-class typed edits (`SetIfAbsent | Set`). These
696        // need to land on default answers — `set_default_answer_with_retry_after`
697        // bypasses the post-forward copy below.
698        //
699        // Pass 2 (post-forward, end of function): copy EVERY edit
700        // (including operator `Append` headers). Runs only on the
701        // regular forward path because the early returns short-circuit
702        // before reaching it.
703        //
704        // Plain-HTTP listeners are skipped here per RFC 6797 §7.2 (no
705        // STS over plaintext) — defense in depth on top of the
706        // TOML-time `ConfigError::HstsOnPlainHttp` and the worker IPC
707        // `ProxyError::HstsOnPlainHttp` rejects.
708        if matches!(context.protocol, crate::Protocol::HTTPS) {
709            snapshot_response_edits(&mut context.headers_response, &headers_response, |e| {
710                matches!(e.mode, HeaderEditMode::SetIfAbsent | HeaderEditMode::Set)
711            });
712        }
713
714        // Look up cluster-side policy knobs once. The values we need are:
715        //  - `https_redirect` (legacy) and `https_redirect_port` for the 301 location URL
716        //  - `authorized_hashes` and `www_authenticate` for the 401 path
717        let (legacy_https_redirect, https_redirect_port, authorized_hashes, www_authenticate) =
718            match cluster_id.as_deref() {
719                Some(id) => proxy
720                    .borrow()
721                    .clusters()
722                    .get(id)
723                    .map(|c| {
724                        (
725                            c.https_redirect,
726                            c.https_redirect_port,
727                            c.authorized_hashes.clone(),
728                            c.www_authenticate.clone(),
729                        )
730                    })
731                    .unwrap_or((false, None, Vec::new(), None)),
732                None => (false, None, Vec::new(), None),
733            };
734
735        // ── 1. Explicit redirect policies (PERMANENT / FOUND / PERMANENT_REDIRECT) ──
736        // Resolved BEFORE the clusterless-deny branch so a frontend that
737        // declares `redirect = permanent | found | permanent_redirect`
738        // emits the matching 3xx even when no cluster is bound. This is
739        // the canonical "moved" shape from the original proposal in
740        // #1161 and is the only way to express "this hostname has moved"
741        // without standing up a dummy cluster. The block does not read
742        // `cluster_id`; per-cluster values (`https_redirect_port`,
743        // `www_authenticate`, …) default to safe sentinels at the cluster
744        // lookup above when `cluster_id` is `None`, so the reorder is
745        // data-flow-safe.
746        //
747        // Status code mapping (closes #1009):
748        //   Permanent          → 301 (RFC 9110 §15.4.2)
749        //   Found              → 302 (RFC 9110 §15.4.3) — UA may rewrite POST→GET
750        //   PermanentRedirect  → 308 (RFC 9110 §15.4.9) — method MUST be preserved
751        let redirect_status = match redirect {
752            RedirectPolicy::Permanent => Some(301u16),
753            RedirectPolicy::Found => Some(302u16),
754            RedirectPolicy::PermanentRedirect => Some(308u16),
755            // Forward / Unauthorized are handled by other branches
756            // below; keeping them named here forces an exhaustive
757            // match so a future RedirectPolicy variant doesn't
758            // silently fall through to `None`.
759            RedirectPolicy::Forward | RedirectPolicy::Unauthorized => None,
760        };
761        if let Some(status_code) = redirect_status {
762            let scheme = resolve_redirect_scheme(redirect_scheme, context);
763            let port = rewritten_port.map(|p| p as u32).or(https_redirect_port);
764            // Feed the rewritten host AND path into the `Location` URL
765            // when the frontend's RewriteParts populated them. Without
766            // this, a `redirect = permanent` frontend with
767            // `rewrite_host = "new.example.com"` would serve clients
768            // back to the original `Host:` header, defeating the
769            // documented `old → new` shape.
770            // The host_override path also keeps `:port` stripping
771            // intact: `build_redirect_location` removes any `:port` on
772            // the override before reapplying `port_suffix`.
773            context.redirect_location = Some(build_redirect_location(
774                scheme,
775                context,
776                port,
777                rewritten_host.as_deref(),
778                rewritten_path.as_deref(),
779            ));
780            // Stash the frontend's `redirect_template` (when set) so the
781            // 3xx default-answer path can render it via
782            // `HttpAnswers::render_inline_redirect` instead of the
783            // listener / cluster default. Without this stash the field
784            // flows into `RouteResult` only to be dropped by the
785            // wildcard destructure below, so the operator-supplied
786            // template has no observable effect on the rendered
787            // redirect.
788            context.frontend_redirect_template = redirect_template;
789            // Stash the resolved status so the answer engine picks the
790            // matching default template (`http.301.redirection` /
791            // `http.302.redirection` / `http.308.redirection`).
792            context.redirect_status = Some(status_code);
793            return Err(RetrieveClusterError::HttpsRedirect);
794        }
795
796        // ── 2. Explicit `RedirectPolicy::UNAUTHORIZED` or clusterless deny ─
797        // Reached when the frontend either explicitly asks for 401 or has
798        // no backing cluster and no `Permanent` redirect to honour. The
799        // `Forward + cluster_id == None` combination collapses here so
800        // legacy clusterless frontends still emit 401 by default.
801        if matches!(redirect, RedirectPolicy::Unauthorized) || cluster_id.is_none() {
802            context.www_authenticate = www_authenticate.clone();
803            trace!("{} RouteResult::deny", log_module_context!(context));
804            return Err(RetrieveClusterError::UnauthorizedRoute);
805        }
806
807        let Some(cluster_id) = cluster_id else {
808            // Guarded by the clusterless-deny branch immediately above;
809            // the `is_none()` arm has already returned `UnauthorizedRoute`
810            // by the time control reaches here.
811            unreachable!("cluster_id was checked Some above")
812        };
813
814        // ── 3. Legacy `cluster.https_redirect` (HTTP-only listeners) ───────
815        // The caller (`Router::connect`) emits the actual 301 only on
816        // `ListenerType::Http`; gate the URL stash on the same predicate
817        // so an HTTPS listener never carries a stale `redirect_location`
818        // into a downstream default-answer path.
819        if legacy_https_redirect && matches!(proxy.borrow().kind(), ListenerType::Http) {
820            let port = https_redirect_port;
821            context.redirect_location =
822                Some(build_redirect_location("https", context, port, None, None));
823        }
824
825        // ── 4. Basic auth check (only when `required_auth` was set) ────────
826        // The check iterates the full hash list in constant time (see
827        // `crate::protocol::mux::auth::check_basic`) so the time spent
828        // does not leak which hash matched, or whether any did at all.
829        // On failure, stash the cluster's `www_authenticate` realm so the
830        // 401 default-answer can render the matching `WWW-Authenticate`
831        // header. An empty realm causes the template engine to elide the
832        // header entirely (`or_elide_header = true`).
833        if frontend_required_auth
834            && !crate::protocol::mux::auth::check_basic(front, &authorized_hashes)
835        {
836            context.www_authenticate = www_authenticate.clone();
837            trace!(
838                "{} basic-auth check failed; emitting 401",
839                log_module_context!(context)
840            );
841            return Err(RetrieveClusterError::UnauthorizedRoute);
842        }
843
844        // ── 5. Request-side mutations on the front kawa ────────────────────
845        // From here on the route is a Forward — apply the frontend's
846        // rewrite + header policy to the request kawa so the backend
847        // wire carries the operator-configured shape.
848        apply_request_rewrites_and_headers(
849            front,
850            context,
851            rewritten_host.as_deref(),
852            rewritten_path.as_deref(),
853            &headers_request,
854        );
855
856        // Pass 2 of the response-snapshot copy (see the HSTS hoist
857        // above). Runs unconditionally on the regular forward path
858        // (the early returns above bypass this site, which keeps the
859        // default-answer scope as HSTS-only). Copies EVERY edit so
860        // operator-defined `Append` response headers reach
861        // backend-served responses on both HTTP and HTTPS listeners,
862        // preserving their pre-PR scope.
863        snapshot_response_edits(&mut context.headers_response, &headers_response, |_| true);
864
865        Ok(cluster_id)
866    }
867
868    pub fn backend_from_request<L: ListenerHandler + L7ListenerHandler>(
869        &mut self,
870        cluster_id: &str,
871        frontend_should_stick: bool,
872        context: &mut HttpContext,
873        proxy: Rc<RefCell<dyn L7Proxy>>,
874        listener: &Rc<RefCell<L>>,
875    ) -> Result<(TcpStream, Rc<RefCell<Backend>>), BackendConnectionError> {
876        let (backend, conn) = self
877            .get_backend_for_sticky_session(
878                cluster_id,
879                frontend_should_stick,
880                context.sticky_session_found.as_deref(),
881                proxy,
882            )
883            .map_err(|backend_error| {
884                trace!("{} {}", log_module_context!(context), backend_error);
885                BackendConnectionError::Backend(backend_error)
886            })?;
887
888        if frontend_should_stick {
889            // update sticky name in case it changed I guess?
890            context.sticky_name = listener.borrow().get_sticky_name().to_string();
891
892            context.sticky_session = Some(
893                backend
894                    .borrow()
895                    .sticky_id
896                    .clone()
897                    .unwrap_or_else(|| backend.borrow().backend_id.to_owned()),
898            );
899        }
900
901        context.backend_id = Some(backend.borrow().backend_id.to_owned());
902        context.backend_address = Some(backend.borrow().address);
903
904        Ok((conn, backend))
905    }
906
907    fn get_backend_for_sticky_session(
908        &self,
909        cluster_id: &str,
910        frontend_should_stick: bool,
911        sticky_session: Option<&str>,
912        proxy: Rc<RefCell<dyn L7Proxy>>,
913    ) -> Result<(Rc<RefCell<Backend>>, TcpStream), BackendError> {
914        match (frontend_should_stick, sticky_session) {
915            (true, Some(sticky_session)) => proxy
916                .borrow()
917                .backends()
918                .borrow_mut()
919                .backend_from_sticky_session(cluster_id, sticky_session),
920            _ => proxy
921                .borrow()
922                .backends()
923                .borrow_mut()
924                .backend_from_cluster_id(cluster_id),
925        }
926    }
927}
928
929/// Apply the frontend's request-side rewrite + header policy to the
930/// request kawa. Mutations land before backend connect so the backend
931/// wire carries the rewritten shape:
932///
933/// 1. If `rewritten_host` is set, replace the request-line authority
934///    with the rewritten value, replace any existing `Host` request
935///    header (so H1 backends see the same value the H2 `:authority`
936///    would carry), and inject `X-Forwarded-Host` carrying the
937///    pre-rewrite authority. The X-Forwarded-Host injection ONLY fires
938///    when `rewritten_host` is set — without a rewrite there is no host
939///    swap to disclose, and HAProxy's `option forwardfor` style
940///    headers (`X-Forwarded-For`, `X-Forwarded-Proto`) still flow from
941///    the kawa parser. The pre-rewrite authority itself is captured by
942///    the caller (`route_from_request`) into `context.original_authority`
943///    on every routed request so it survives every downstream code path
944///    (audit, deny, redirect, basic-auth 401, backend-connect failure).
945///    Dedup rule: the synthetic Host AND any pre-existing Host header
946///    are dropped in the retain pass below before the rewritten Host is
947///    appended, so the wire never carries two `Host:` headers.
948/// 2. If `rewritten_path` is set, replace both the abstract path
949///    (consumed by H2 `:path`) and the request-line URI (consumed by
950///    the H1 converter) so cardinality H1↔H1, H1↔H2, H2↔H1, H2↔H2 all
951///    propagate the rewritten target.
952/// 3. For every `headers_request` edit:
953///    - empty `val` → remove every existing header with the matching
954///      name from `kawa.blocks` (HAProxy `del-header` parity);
955///    - non-empty `val` → append the header before the `end_header`
956///      flag block. Set/replace semantics: callers that want to replace
957///      a header pass two edits (one delete with empty val, one set
958///      with the new value).
959fn apply_request_rewrites_and_headers(
960    kawa: &mut super::GenericHttpStream,
961    context: &mut HttpContext,
962    rewritten_host: Option<&str>,
963    rewritten_path: Option<&str>,
964    headers_request: &[HeaderEdit],
965) {
966    use kawa::{Block, Pair, Store};
967
968    if rewritten_host.is_none() && rewritten_path.is_none() && headers_request.is_empty() {
969        return;
970    }
971
972    // `route_from_request` already captured the pre-rewrite authority
973    // into `context.original_authority`. Re-borrow it here for the
974    // optional X-Forwarded-Host injection rather than re-parsing the
975    // kawa Store. Cloning a short header value (typically `host:port`)
976    // is cheaper than another UTF-8 decode of the request-line slice.
977    let original_authority: Option<String> = if rewritten_host.is_some() {
978        context.original_authority.clone()
979    } else {
980        None
981    };
982
983    // ── status-line authority / path rewrites ─────────────────────────
984    // The kawa request status line carries both `path` and `uri` —
985    // `path` is the abstract path (consumed by the H2 converter to
986    // emit `:path`) while `uri` is the request-line URI (consumed by
987    // the H1 converter at `kawa::protocol::h1::converter`). Both must
988    // be mutated so an H1 frontend forwarding to an H1 backend AND an
989    // H2 frontend forwarding to an H1 backend (or vice versa) see the
990    // rewritten target on the wire.
991    if rewritten_host.is_some() || rewritten_path.is_some() {
992        if let kawa::StatusLine::Request {
993            authority,
994            path,
995            uri,
996            ..
997        } = &mut kawa.detached.status_line
998        {
999            if let Some(new_host) = rewritten_host {
1000                *authority = Store::from_string(new_host.to_owned());
1001            }
1002            if let Some(new_path) = rewritten_path {
1003                *path = Store::from_string(new_path.to_owned());
1004                *uri = Store::from_string(new_path.to_owned());
1005            }
1006        }
1007    }
1008
1009    // ── single-pass split: deletes vs. sets ───────────────────────────
1010    // Walk `headers_request` once and separate each edit into either the
1011    // delete list (empty val) or the insert list (non-empty val). Two
1012    // passes was wasteful when an operator stacks many `--header` flags;
1013    // one pass keeps the allocation profile flat.
1014    let host_lower = b"host";
1015    let xfh_lower = b"x-forwarded-host";
1016    let rewriting_host = rewritten_host.is_some();
1017    let mut keys_to_drop: Vec<Vec<u8>> = Vec::with_capacity(headers_request.len() + 2);
1018    let mut to_insert: Vec<Block> = Vec::with_capacity(headers_request.len() + 2);
1019    // Track whether any operator-supplied edit names Host or
1020    // X-Forwarded-Host so we always dedup the existing kawa Host header
1021    // before inserting the operator's value. Without this, an operator
1022    // who sets `--header request=Host=evil` on a frontend WITHOUT
1023    // `--rewrite-host` lands TWO `Host:` headers on the backend wire —
1024    // a request-smuggling primitive on backends that pick last-Host
1025    // (CWE-444 cousin).
1026    let mut operator_overrides_host = false;
1027    let mut operator_overrides_xfh = false;
1028    for edit in headers_request {
1029        let key_is_host = edit.key.eq_ignore_ascii_case(host_lower);
1030        let key_is_xfh = edit.key.eq_ignore_ascii_case(xfh_lower);
1031        operator_overrides_host |= key_is_host;
1032        operator_overrides_xfh |= key_is_xfh;
1033        if edit.val.is_empty() {
1034            keys_to_drop.push(edit.key.iter().map(u8::to_ascii_lowercase).collect());
1035        } else {
1036            to_insert.push(Block::Header(Pair {
1037                key: Store::from_slice(&edit.key),
1038                val: Store::from_slice(&edit.val),
1039            }));
1040        }
1041    }
1042    if rewriting_host || operator_overrides_host {
1043        keys_to_drop.push(host_lower.to_vec());
1044    }
1045    if rewriting_host || operator_overrides_xfh {
1046        keys_to_drop.push(xfh_lower.to_vec());
1047    }
1048
1049    // ── delete pass on existing blocks ────────────────────────────────
1050    let buf_ptr = kawa.storage.buffer();
1051    if !keys_to_drop.is_empty() {
1052        // Read `key.data(buf_ptr)` only on non-elided headers — kawa's
1053        // earlier passes (HPACK decoder, H1 header parser) tag suppressed
1054        // headers with `Store::Empty` rather than removing them, and
1055        // calling `.data()` on `Store::Empty` panics in
1056        // `kawa-0.6.8/src/storage/repr.rs`. Pinning the guard explicitly
1057        // until kawa changes its policy.
1058        let buf = buf_ptr;
1059        kawa.blocks.retain(|block| {
1060            if let Block::Header(Pair { key, val: _ }) = block {
1061                if matches!(key, Store::Empty) {
1062                    return true;
1063                }
1064                let key_bytes = key.data(buf);
1065                // Both `keys_to_drop` and `key_lower` are pre-lowercased,
1066                // so a byte-equality compare is sufficient — a second
1067                // ASCII-fold pass via `compare_no_case` would just burn
1068                // cycles re-folding bytes that are already canonical.
1069                let key_lower: Vec<u8> = key_bytes.iter().map(u8::to_ascii_lowercase).collect();
1070                !keys_to_drop
1071                    .iter()
1072                    .any(|k| k.as_slice() == key_lower.as_slice())
1073            } else {
1074                true
1075            }
1076        });
1077    }
1078
1079    // ── insertion before the end-of-headers flag ──────────────────────
1080    // Every header we add (rewritten Host, X-Forwarded-Host,
1081    // operator-supplied set/append edits) must land before
1082    // `Block::Flags { end_header: true }` so the converter emits them
1083    // as part of the request header block. Synthetic Host/X-Forwarded-Host
1084    // are prepended (they describe the rewrite, not an operator policy).
1085    let end_header_idx = super::shared::end_of_headers_index(kawa);
1086
1087    if rewriting_host {
1088        let mut synth: Vec<Block> = Vec::with_capacity(2);
1089        if let Some(new_host) = rewritten_host {
1090            synth.push(Block::Header(Pair {
1091                key: Store::Static(b"Host"),
1092                val: Store::from_string(new_host.to_owned()),
1093            }));
1094        }
1095        if let Some(orig) = original_authority.as_deref() {
1096            synth.push(Block::Header(Pair {
1097                key: Store::Static(b"X-Forwarded-Host"),
1098                val: Store::from_string(orig.to_owned()),
1099            }));
1100        }
1101        synth.append(&mut to_insert);
1102        to_insert = synth;
1103    }
1104    if !to_insert.is_empty() {
1105        let insert_at = end_header_idx.unwrap_or(kawa.blocks.len());
1106        for (offset, block) in to_insert.into_iter().enumerate() {
1107            kawa.blocks.insert(insert_at + offset, block);
1108        }
1109    }
1110}
1111
1112/// Copy a per-frontend response-edit slice into the per-stream
1113/// `HttpContext.headers_response` snapshot, applying `filter` to each
1114/// edit. The snapshot is cleared before the copy so a second pass on
1115/// the same context (the HSTS hoist + post-forward pattern in
1116/// `route_from_request`) overrides any earlier partial copy.
1117fn snapshot_response_edits<F>(target: &mut Vec<HeaderEditSnapshot>, src: &[HeaderEdit], filter: F)
1118where
1119    F: Fn(&HeaderEdit) -> bool,
1120{
1121    target.clear();
1122    for edit in src.iter().filter(|e| filter(e)) {
1123        target.push(HeaderEditSnapshot {
1124            key: edit.key.to_vec(),
1125            val: edit.val.to_vec(),
1126            mode: edit.mode,
1127        });
1128    }
1129}
1130
1131/// Resolve the protocol scheme to use when emitting a redirect's `Location`
1132/// header. Maps the proto enum onto `"http"` / `"https"`, with `USE_SAME`
1133/// preserving the request's scheme (HTTPS for TLS listeners, HTTP otherwise).
1134fn resolve_redirect_scheme(scheme: RedirectScheme, context: &HttpContext) -> &'static str {
1135    match scheme {
1136        RedirectScheme::UseHttps => "https",
1137        RedirectScheme::UseHttp => "http",
1138        RedirectScheme::UseSame => {
1139            if context.tls_server_name.is_some() {
1140                "https"
1141            } else {
1142                "http"
1143            }
1144        }
1145    }
1146}
1147
1148/// Build the `Location` URL for a redirect response. Defaults the port
1149/// suffix only when the operator provided one or when scheme defaults
1150/// would mismatch (port 80 on https / 443 on http stays implicit).
1151///
1152/// `host_override` and `path_override` carry the frontend's
1153/// `RewriteParts::run` output for `RedirectPolicy::PERMANENT` flows so
1154/// the 301 `Location` reflects `rewrite_host` / `rewrite_path` instead
1155/// of the original `:authority` / `:path`. The legacy
1156/// `cluster.https_redirect` path passes `None` for both — it has no
1157/// per-frontend rewrite knobs.
1158fn build_redirect_location(
1159    scheme: &str,
1160    context: &HttpContext,
1161    port: Option<u32>,
1162    host_override: Option<&str>,
1163    path_override: Option<&str>,
1164) -> String {
1165    let authority = host_override
1166        .or(context.authority.as_deref())
1167        .unwrap_or_default();
1168    let path = path_override.or(context.path.as_deref()).unwrap_or("/");
1169    // Strip an existing `:port` from the authority — operators typically
1170    // configure `https_redirect_port` precisely because the listener's
1171    // port differs from the redirect target. Bracketed IPv6 literals
1172    // like `[::1]` survive intact: `rsplit_once(':')` only triggers when
1173    // the suffix after the final `:` is entirely ASCII digits.
1174    let host_only = match authority.rsplit_once(':') {
1175        Some((host, port_part))
1176            if !port_part.is_empty() && port_part.bytes().all(|b| b.is_ascii_digit()) =>
1177        {
1178            host
1179        }
1180        _ => authority,
1181    };
1182    let port_suffix = match port {
1183        Some(80) if scheme == "http" => String::new(),
1184        Some(443) if scheme == "https" => String::new(),
1185        Some(p) => format!(":{p}"),
1186        None => String::new(),
1187    };
1188    format!("{scheme}://{host_only}{port_suffix}{path}")
1189}
1190
1191/// Exact-match test between an HTTP `:authority` / `Host` value and a TLS SNI.
1192///
1193/// Matching rules:
1194///   * The authority is stripped of its optional `:port` suffix. RFC 6066 §3
1195///     forbids a port in the SNI extension, so the SNI is compared against
1196///     the host component only.
1197///   * The comparison is case-insensitive (RFC 9110 §4.2.3 — hosts are
1198///     case-insensitive). The SNI is assumed to be already lowercased by
1199///     the caller (see `https.rs::upgrade_handshake`); only the authority
1200///     side needs on-the-fly `to_ascii_lowercase`.
1201///   * No wildcard logic: if the operator serves a wildcard certificate,
1202///     the SNI negotiated by the client is still the specific name that
1203///     client sent, and the request `:authority` must equal that specific
1204///     name exactly. This is the tightest possible TLS trust boundary.
1205///
1206/// The `:port` suffix is only stripped when the suffix is non-empty and
1207/// entirely ASCII digits. This keeps bracketed IPv6 literals like `[::1]`
1208/// intact: `rsplit_once(':')` would otherwise mis-split them.
1209pub(crate) fn authority_matches_sni(authority: &str, sni_lowercased: &str) -> bool {
1210    let host = strip_authority_port(authority);
1211    if host.len() != sni_lowercased.len() {
1212        return false;
1213    }
1214    host.as_bytes()
1215        .iter()
1216        .zip(sni_lowercased.as_bytes())
1217        .all(|(a, b)| a.to_ascii_lowercase() == *b)
1218}
1219
/// Return `authority` without its optional `:port` suffix.
///
/// The text after the last `:` is treated as a port only when it is
/// non-empty and made of ASCII digits exclusively; otherwise the input is
/// returned untouched. That rule keeps bracketed IPv6 literals whole:
/// `[::1]` is unchanged, while `[::1]:8443` becomes `[::1]`.
fn strip_authority_port(authority: &str) -> &str {
    let Some((host, tail)) = authority.rsplit_once(':') else {
        return authority;
    };
    let tail_is_port = !tail.is_empty() && tail.bytes().all(|b| b.is_ascii_digit());
    if tail_is_port { host } else { authority }
}
1230
1231/// RFC 6125 §6.4.3 wildcard-aware match of `:authority` against a SAN set
1232/// snapshot taken at TLS handshake.
1233///
1234/// Returns the matched SAN entry on success so the caller can log it.
1235///
1236/// Matching rules:
1237///   * Port suffix on the authority is stripped (same logic as
1238///     [`authority_matches_sni`], IPv6-bracket safe).
1239///   * Compare is ASCII case-insensitive (`:authority` is ASCII per
1240///     RFC 9113 §8.3.1; SAN entries are stored pre-lowercased by
1241///     `https.rs::upgrade_handshake`).
1242///   * `*.suffix` matches exactly one DNS label at the leftmost position
1243///     and only when that label is non-empty: it does NOT match the apex,
1244///     does NOT cross dots, and embedded wildcards (`foo.*.example.com`,
1245///     `*foo.example.com`) are forbidden.
1246///   * Empty `names` ⇒ `None` (default-cert path — Sōzu is not
1247///     authoritative for any name).
1248pub(crate) fn authority_matched_cert_name<'a>(
1249    authority: &str,
1250    names: &'a [String],
1251) -> Option<&'a str> {
1252    let mut host = strip_authority_port(authority);
1253    // RFC 1034 §3.1 absolute-form: `example.com.` and `example.com` name
1254    // the same host. The SAN snapshot already strips trailing dots at
1255    // `https.rs::upgrade_handshake`, and the SNI side strips them at the
1256    // same site; strip on the authority side so a client emitting
1257    // absolute-form `:authority` (or H1 `Host`) does not get a false 421.
1258    // Only one trailing dot is removed because RFC 1034 forbids multiple
1259    // trailing dots on a domain literal.
1260    if let Some(trimmed) = host.strip_suffix('.') {
1261        host = trimmed;
1262    }
1263    if host.is_empty() {
1264        return None;
1265    }
1266    for entry in names {
1267        if let Some(suffix) = entry.strip_prefix("*.") {
1268            // RFC 6125 §6.4.3: the wildcard label is the *entire* left-most
1269            // label. Embedded wildcards (`f*.example.com`, `*f.example.com`)
1270            // are rejected because we reach this branch only when the entry
1271            // starts with the exact two bytes `*.`. We still must reject
1272            // wildcards anywhere else in the entry by requiring no further
1273            // `*` in `suffix`.
1274            if suffix.contains('*') {
1275                continue;
1276            }
1277            // Authority has the form `<left-most-label>.<rest>`; the
1278            // wildcard substitutes for exactly that left-most label, which
1279            // must be non-empty and contain no dot.
1280            let Some((leftmost, rest)) = host.split_once('.') else {
1281                continue;
1282            };
1283            if leftmost.is_empty() {
1284                continue;
1285            }
1286            if rest.eq_ignore_ascii_case(suffix) {
1287                return Some(entry);
1288            }
1289            continue;
1290        }
1291        if entry.contains('*') {
1292            // Internal wildcards (`foo.*.example.com`) are not RFC 6125-
1293            // valid. Skip rather than mis-match.
1294            continue;
1295        }
1296        if host.eq_ignore_ascii_case(entry) {
1297            return Some(entry);
1298        }
1299    }
1300    None
1301}
1302
#[cfg(test)]
mod tests {
    use super::authority_matches_sni;

    /// SNI value shared by the cases that compare against the plain host.
    const SNI: &str = "example.com";

    /// SNI value for the bracketed IPv6 loopback literal.
    const LOOPBACK_V6: &str = "[::1]";

    #[test]
    fn match_exact() {
        assert!(authority_matches_sni(SNI, SNI));
    }

    #[test]
    fn match_different_case() {
        let mixed_case = "Example.COM";
        assert!(authority_matches_sni(mixed_case, SNI));
    }

    #[test]
    fn match_authority_with_port() {
        let with_port = "example.com:8443";
        assert!(authority_matches_sni(with_port, SNI));
    }

    #[test]
    fn reject_different_host() {
        let (authority, sni) = ("tenant-b.example.com", "tenant-a.example.com");
        assert!(!authority_matches_sni(authority, sni));
    }

    #[test]
    fn reject_substring_attack() {
        // An authority that is a strict prefix of the SNI, or that has the
        // SNI as a strict prefix, must not be accepted.
        for authority in ["example.co", "example.commons"] {
            assert!(!authority_matches_sni(authority, SNI));
        }
    }

    #[test]
    fn reject_wildcard_not_expanded() {
        // Wildcard cert selection happens at the cert-resolver layer; the
        // SNI seen here is the concrete name the client sent, so a literal
        // `*.example.com` must not be treated as covering `foo.example.com`.
        let literal_wildcard = "*.example.com";
        assert!(!authority_matches_sni("foo.example.com", literal_wildcard));
    }

    #[test]
    fn ipv6_bracketed_literal_with_port() {
        // Only the trailing `:8443` is a port (all digits), so after
        // stripping it `[::1]:8443` must still match the SNI `[::1]`.
        let with_port = "[::1]:8443";
        assert!(authority_matches_sni(with_port, LOOPBACK_V6));
    }

    #[test]
    fn ipv6_bracketed_without_port() {
        // The colons inside the brackets are not a port separator: the tail
        // after the last `:` is `1]`, which is not all digits, so nothing is
        // stripped and the whole literal is compared.
        assert!(authority_matches_sni(LOOPBACK_V6, LOOPBACK_V6));
    }
}
1361
#[cfg(test)]
mod authority_matched_cert_name_tests {
    use super::authority_matched_cert_name;

    /// Build an owned SAN snapshot from string literals.
    fn sans(entries: &[&str]) -> Vec<String> {
        entries.iter().map(|entry| (*entry).to_owned()).collect()
    }

    #[test]
    fn cert_name_match_exact_single_san() {
        let names = sans(&["example.com"]);
        assert_eq!(
            authority_matched_cert_name("example.com", &names),
            Some("example.com"),
        );
    }

    #[test]
    fn cert_name_match_wildcard_left_most() {
        let names = sans(&["*.cleverapps.io"]);
        assert_eq!(
            authority_matched_cert_name("staging-3.cleverapps.io", &names),
            Some("*.cleverapps.io"),
        );
    }

    #[test]
    fn cert_name_match_wildcard_mixed_case_with_port() {
        // Port stripping and ASCII case folding both apply on the wildcard
        // branch, not only on the exact-match branch.
        let names = sans(&["*.example.com"]);
        assert_eq!(
            authority_matched_cert_name("FOO.Example.COM:443", &names),
            Some("*.example.com"),
        );
    }

    #[test]
    fn cert_name_reject_wildcard_apex() {
        // RFC 6125 §6.4.3: `*.example.com` does NOT cover the apex
        // `example.com` — the wildcard label must consume exactly one
        // non-empty label.
        let names = sans(&["*.example.com"]);
        assert_eq!(authority_matched_cert_name("example.com", &names), None);
    }

    #[test]
    fn cert_name_reject_wildcard_empty_left_most_label() {
        // `.example.com` has an empty left-most label; the wildcard must
        // stand for a non-empty label, so this is a miss.
        let names = sans(&["*.example.com"]);
        assert_eq!(authority_matched_cert_name(".example.com", &names), None);
    }

    #[test]
    fn cert_name_reject_wildcard_two_labels() {
        // `*.example.com` cannot cross dots: `a.b.example.com` has two
        // labels before `example.com` and must be rejected.
        let names = sans(&["*.example.com"]);
        assert_eq!(authority_matched_cert_name("a.b.example.com", &names), None);
    }

    #[test]
    fn cert_name_reject_wildcard_not_left_most() {
        // Embedded wildcards (`foo.*.example.com`) are not RFC 6125-valid
        // and must be skipped, not mis-matched.
        let names = sans(&["foo.*.example.com"]);
        assert_eq!(
            authority_matched_cert_name("foo.bar.example.com", &names),
            None,
        );
    }

    #[test]
    fn cert_name_reject_partial_label_wildcard() {
        // A `*` that shares its label with other characters is not a
        // whole-label wildcard and must never match.
        let names = sans(&["f*.example.com", "*o.example.com"]);
        assert_eq!(authority_matched_cert_name("foo.example.com", &names), None);
    }

    #[test]
    fn cert_name_reject_second_wildcard_in_suffix() {
        // Only one wildcard, in the left-most label, is allowed.
        let names = sans(&["*.*.example.com"]);
        assert_eq!(authority_matched_cert_name("a.b.example.com", &names), None);
    }

    #[test]
    fn cert_name_match_case_insensitive() {
        // ASCII case folding only — `:authority` is ASCII per RFC 9113
        // §8.3.1. The entry is returned verbatim, whatever its case.
        let names = sans(&["EXAMPLE.com"]);
        assert_eq!(
            authority_matched_cert_name("Example.COM", &names),
            Some("EXAMPLE.com"),
        );
    }

    #[test]
    fn cert_name_match_with_port() {
        // The port suffix on `:authority` must be stripped before the
        // SAN compare.
        let names = sans(&["example.com"]);
        assert_eq!(
            authority_matched_cert_name("example.com:8443", &names),
            Some("example.com"),
        );
    }

    #[test]
    fn cert_name_match_absolute_form_trailing_dot() {
        // RFC 1034 §3.1: an absolute-form domain literal carries one
        // trailing dot (`example.com.`) and resolves to the same host as
        // the relative form. The SAN snapshot stores the relative form, so
        // the matcher must strip the dot on the authority side too —
        // otherwise a client emitting an absolute-form `:authority` gets a
        // false 421.
        let names = sans(&["example.com"]);
        assert_eq!(
            authority_matched_cert_name("example.com.", &names),
            Some("example.com"),
        );
        // And with both port and trailing dot.
        assert_eq!(
            authority_matched_cert_name("example.com.:8443", &names),
            Some("example.com"),
        );
        // The wildcard branch must also accept the absolute form.
        let wildcard = sans(&["*.example.com"]);
        assert_eq!(
            authority_matched_cert_name("foo.example.com.", &wildcard),
            Some("*.example.com"),
        );
    }

    #[test]
    fn cert_name_match_idn_a_label() {
        // IDNA A-labels (xn--…) are ASCII and compare byte-for-byte once
        // the snapshot is lowercased.
        let names = sans(&["xn--bcher-kva.example.com"]);
        assert_eq!(
            authority_matched_cert_name("xn--bcher-kva.example.com", &names),
            Some("xn--bcher-kva.example.com"),
        );
    }

    #[test]
    fn cert_name_reject_empty_names() {
        // Empty snapshot = default cert served = Sōzu is not
        // authoritative for any name; every authority must miss.
        assert_eq!(authority_matched_cert_name("example.com", &[]), None);
    }

    #[test]
    fn cert_name_reject_empty_host() {
        // An authority that is empty, or reduces to nothing once the
        // trailing dot is removed, can never match.
        let names = sans(&["example.com", "*.example.com"]);
        assert_eq!(authority_matched_cert_name("", &names), None);
        assert_eq!(authority_matched_cert_name(".", &names), None);
    }

    #[test]
    fn cert_name_match_multi_san_one_hit() {
        let names = sans(&["foo.com", "*.example.org"]);
        assert_eq!(
            authority_matched_cert_name("bar.example.org", &names),
            Some("*.example.org"),
        );
    }

    #[test]
    fn cert_name_match_multi_san_first_hit_wins() {
        // When several entries cover the authority, the one that comes
        // first in the snapshot is the one reported.
        let names = sans(&["*.example.com", "foo.example.com"]);
        assert_eq!(
            authority_matched_cert_name("foo.example.com", &names),
            Some("*.example.com"),
        );
    }

    #[test]
    fn cert_name_reject_substring_attack() {
        // `*.example.com` must not match `example.commons` — the suffix
        // after the first label is `commons`, not `example.com`.
        let names = sans(&["*.example.com"]);
        assert_eq!(authority_matched_cert_name("example.commons", &names), None);
    }

    #[test]
    fn cert_name_ipv6_bracketed_literal_with_port() {
        // The `:` characters inside the brackets must not be mistaken for
        // a port separator: only the trailing `:8443` is stripped, and
        // `[::1]` compares equal to `[::1]`.
        let names = sans(&["[::1]"]);
        assert_eq!(
            authority_matched_cert_name("[::1]:8443", &names),
            Some("[::1]"),
        );
    }
}