Skip to main content

sozu_lib/
http.rs

1use std::{
2    cell::RefCell,
3    collections::{BTreeMap, HashMap, hash_map::Entry},
4    io::ErrorKind,
5    net::{Shutdown, SocketAddr},
6    os::unix::io::AsRawFd,
7    rc::{Rc, Weak},
8    str::from_utf8_unchecked,
9    time::{Duration, Instant},
10};
11
12use mio::{
13    Interest, Registry, Token,
14    net::{TcpListener as MioTcpListener, TcpStream},
15    unix::SourceFd,
16};
17use rusty_ulid::Ulid;
18use sozu_command::{
19    logging::CachedTags,
20    proto::command::{
21        Cluster, HttpListenerConfig, ListenerType, RemoveListener, RequestHttpFrontend,
22        UpdateHttpListenerConfig, WorkerRequest, WorkerResponse, request::RequestType,
23    },
24    ready::Ready,
25    response::HttpFrontend,
26    state::{ClusterId, validate_h2_flood_knobs_http, validate_sozu_id_header},
27};
28
29use crate::metrics::names;
30use crate::{
31    AcceptError, FrontendFromRequestError, L7ListenerHandler, L7Proxy, ListenerError,
32    ListenerHandler, Protocol, ProxyConfiguration, ProxyError, ProxySession, SessionIsToBeClosed,
33    SessionMetrics, SessionResult, StateMachineBuilder, StateResult,
34    backends::BackendMap,
35    pool::Pool,
36    protocol::{
37        Pipe, SessionState,
38        http::{
39            answers::HttpAnswers,
40            parser::{Method, hostname_and_port},
41        },
42        mux::{self, Mux, MuxClear},
43        proxy_protocol::expect::ExpectProxyProtocol,
44    },
45    router::{RouteResult, Router},
46    server::{ListenToken, SessionManager},
47    socket::server_bind,
48    timer::TimeoutContainer,
49};
50
/// Status flag for a session.
///
/// NOTE(review): this type is not referenced in the visible part of this
/// file; presumably `DefaultAnswer` marks a session that is replying with a
/// proxy-generated default answer rather than proxying — confirm against the
/// users of this type.
///
/// The type is a plain two-variant tag, so it derives `Debug`, `Clone` and
/// `Copy` in addition to equality: that makes it usable in `assert_eq!`,
/// `{:?}` log lines and by-value passing without extra ceremony.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SessionStatus {
    Normal,
    DefaultAnswer,
}
56
// Declares the `HttpStateMachine` enum that wraps the per-stage protocol
// state of a cleartext HTTP session.
//
// NOTE(review): `StateMarker`, the `HttpStateMachine::FailedUpgrade` variant
// and the `marker()` / `take()` / `failed()` helpers used further down in this
// file are not defined here — presumably they are emitted by
// `StateMachineBuilder!`; confirm against the macro definition.
StateMachineBuilder! {
    /// The various Stages of an HTTP connection:
    ///
    /// 1. optional (ExpectProxyProtocol)
    /// 2. HTTP (via Mux in H1 mode)
    /// 3. WebSocket (passthrough)
    enum HttpStateMachine impl SessionState {
        Expect(ExpectProxyProtocol<TcpStream>),
        Mux(MuxClear),
        WebSocket(Pipe<crate::socket::SessionTcpStream, HttpListener>),
    }
}
69
/// Log-line prefix for code paths in this file that run without a session in
/// scope. In colored mode the `HTTP` label is rendered bold bright-white.
///
/// Intended for [`HttpProxy`] / [`HttpListener`] callbacks (notify,
/// add_cluster, add_*_frontend, accept, soft_stop, hard_stop, etc.): they own
/// a token map keyed by listener and have no `frontend_token` of their own.
macro_rules! log_module_context {
    () => {{
        // Only the first two palette entries (label opener, reset) are needed.
        let palette = sozu_command::logging::ansi_palette();
        format!("{bold}HTTP{off}\t >>>", bold = palette.0, off = palette.1)
    }};
}
81
/// Log-line prefix for code paths that have an [`HttpSession`] in scope.
///
/// Builds the canonical `\tHTTP\tSession(...)\t >>>` envelope around the
/// session's `frontend_token` (same bracket convention as `MUX-*`, `RUSTLS`,
/// `PIPE`), so operators can grep the token id to correlate every log line
/// belonging to one H1 connection.
macro_rules! log_context {
    ($self:expr) => {{
        // Palette entries, in order: label opener, reset, and three shades
        // used for the `Session`, `frontend` and token-value parts.
        let palette = sozu_command::logging::ansi_palette();
        format!(
            "{bold}HTTP{off}\t{label}Session{off}({key}frontend{off}={value}{token}{off})\t >>>",
            bold = palette.0,
            off = palette.1,
            label = palette.2,
            key = palette.3,
            value = palette.4,
            token = $self.frontend_token.0,
        )
    }};
}
101
/// HTTP Session to insert in the SessionManager
///
/// 1 session <=> 1 HTTP connection (client to sozu)
pub struct HttpSession {
    /// Backend timeout handed to `mux::Router::new` whenever a Mux state is built.
    configured_backend_timeout: Duration,
    /// Connect timeout handed to `mux::Router::new` whenever a Mux state is built.
    configured_connect_timeout: Duration,
    /// Frontend timeout stored in the `Mux` state.
    configured_frontend_timeout: Duration,
    /// Token identifying this session's frontend; used in log prefixes and
    /// to remove the session from the proxy on close.
    frontend_token: Token,
    /// Instant of the most recent readiness notification (see `update_readiness`).
    last_event: Instant,
    /// Shared handle on the listener; cloned into the Mux context and the
    /// WebSocket pipe.
    listener: Rc<RefCell<HttpListener>>,
    /// Per-session timing metrics.
    metrics: SessionMetrics,
    /// Weak handle on the buffer pool, passed to `mux::Context::new`.
    pool: Weak<RefCell<Pool>>,
    /// Shared handle on the proxy; used on close to deregister the frontend
    /// socket and remove the session.
    proxy: Rc<RefCell<HttpProxy>>,
    /// Current protocol stage of the connection.
    state: HttpStateMachine,
    /// Idempotency latch: set once `close()` has run so later calls are no-ops.
    has_been_closed: bool,
}
118
119impl HttpSession {
120    #[allow(clippy::too_many_arguments)]
121    pub fn new(
122        configured_backend_timeout: Duration,
123        configured_connect_timeout: Duration,
124        configured_frontend_timeout: Duration,
125        configured_request_timeout: Duration,
126        expect_proxy: bool,
127        listener: Rc<RefCell<HttpListener>>,
128        pool: Weak<RefCell<Pool>>,
129        proxy: Rc<RefCell<HttpProxy>>,
130        public_address: SocketAddr,
131        sock: TcpStream,
132        token: Token,
133        wait_time: Duration,
134    ) -> Result<Self, AcceptError> {
135        let request_id = Ulid::generate();
136        let container_frontend_timeout = TimeoutContainer::new(configured_request_timeout, token);
137
138        let state = if expect_proxy {
139            trace!("{} starting in expect proxy state", log_module_context!());
140            gauge_add!(names::protocol::PROXY_EXPECT, 1);
141
142            HttpStateMachine::Expect(ExpectProxyProtocol::new(
143                container_frontend_timeout,
144                sock,
145                token,
146                request_id,
147            ))
148        } else {
149            gauge_add!(names::protocol::HTTP, 1);
150            let session_address = sock.peer_addr().ok();
151            let session_ulid = rusty_ulid::Ulid::generate();
152            let sock = crate::socket::SessionTcpStream::new(sock, session_ulid, session_address);
153
154            let frontend =
155                mux::Connection::new_h1_server(session_ulid, sock, container_frontend_timeout);
156            let router = mux::Router::new(configured_backend_timeout, configured_connect_timeout);
157            let mut context = mux::Context::new(
158                session_ulid,
159                pool.clone(),
160                listener.clone(),
161                session_address,
162                public_address,
163            );
164            context
165                .create_stream(request_id, 1 << 16)
166                .ok_or(AcceptError::BufferCapacityReached)?;
167            HttpStateMachine::Mux(Mux {
168                configured_frontend_timeout,
169                frontend_token: token,
170                frontend,
171                router,
172                context,
173                session_ulid,
174            })
175        };
176
177        // Invariant: `create_session` allocates `token` from the session slab
178        // and registers the frontend socket under it before constructing us.
179        // A session whose frontend token aliased the listen-socket token (0)
180        // would route listener readiness into a session — a state-machine bug.
181        // (`Token(0)` is reserved in real workers; the test harness in this
182        // file builds listeners directly without going through `new`.)
183        debug_assert_eq!(
184            state.marker() as u8,
185            if expect_proxy {
186                StateMarker::Expect as u8
187            } else {
188                StateMarker::Mux as u8
189            },
190            "constructed state must match the expect_proxy branch"
191        );
192        // The freshly-built state can never be a FailedUpgrade: that marker
193        // only appears after a real upgrade attempt drains the state via
194        // `take()`. Catch a future refactor that constructs one by accident.
195        debug_assert!(
196            !state.failed(),
197            "a newly created session must not start in FailedUpgrade"
198        );
199
200        let metrics = SessionMetrics::new(Some(wait_time));
201        let session = HttpSession {
202            configured_backend_timeout,
203            configured_connect_timeout,
204            configured_frontend_timeout,
205            frontend_token: token,
206            has_been_closed: false,
207            last_event: Instant::now(),
208            listener,
209            metrics,
210            pool,
211            proxy,
212            state,
213        };
214        debug_assert_eq!(
215            session.frontend_token, token,
216            "frontend token must be the slab token used for registration"
217        );
218        #[cfg(debug_assertions)]
219        session.check_invariants();
220        Ok(session)
221    }
222
223    /// Full cross-field invariant sweep for the session state machine, used as
224    /// a run-to-completion postcondition. Compiled out in release.
225    ///
226    /// Strong invariants asserted here:
227    /// - the frontend token is always present (slab key is never sentinel);
228    /// - once `close()` has run the state has been drained / cancelled, so the
229    ///   session is terminal and must not be re-driven (callers gate on the
230    ///   returned `SessionIsToBeClosed`);
231    /// - the live state marker is one of the three legal H1 stages — the
232    ///   `FailedUpgrade` marker is only transient (between `take()` and the
233    ///   `close()` that immediately follows a failed upgrade), so it is the
234    ///   single tolerated exception on a still-open session.
235    #[cfg(debug_assertions)]
236    fn check_invariants(&self) {
237        let marker = self.state.marker();
238        debug_assert!(
239            matches!(
240                marker,
241                StateMarker::Expect | StateMarker::Mux | StateMarker::WebSocket
242            ),
243            "session marker must be a legal H1 stage (Expect/Mux/WebSocket), got {marker:?}"
244        );
245        // A failed state is only ever observed transiently between a failed
246        // upgrade and the close() that reaps it; if it survives to a postcondition
247        // sweep on a not-yet-closed session, a close path was skipped.
248        debug_assert!(
249            !self.state.failed() || self.has_been_closed,
250            "FailedUpgrade state must be reaped by close(), never left live"
251        );
252    }
253
254    pub fn upgrade(&mut self) -> SessionIsToBeClosed {
255        debug!("{} upgrade", log_context!(self));
256        // Record the stage we are leaving so we can assert the transition is
257        // legal (no stage-skipping) after the handoff resolves. `take()`
258        // installs a `FailedUpgrade(marker)` placeholder carrying this same
259        // marker, so the session is never left in a half-moved state if an
260        // upgrade_* helper bails out.
261        let from_marker = self.state.marker();
262        let new_state = match self.state.take() {
263            HttpStateMachine::Mux(mux) => self.upgrade_mux(mux),
264            HttpStateMachine::Expect(expect) => self.upgrade_expect(expect),
265            HttpStateMachine::WebSocket(ws) => self.upgrade_websocket(ws),
266            HttpStateMachine::FailedUpgrade(_) => {
267                // Reaching this arm means a prior upgrade already returned
268                // `None` and the session should have been closed. Fall back
269                // to closing cleanly instead of panicking the worker.
270                error!(
271                    "{} upgrade called on FailedUpgrade state; closing session",
272                    log_context!(self)
273                );
274                None
275            }
276        };
277
278        match new_state {
279            Some(state) => {
280                // Legal transitions only: Expect→Mux, Mux→WebSocket, or the
281                // WebSocket self-loop (upgrade_websocket is a no-op guard). Any
282                // other pair means a stage was skipped or reversed.
283                debug_assert!(
284                    matches!(
285                        (from_marker, state.marker()),
286                        (StateMarker::Expect, StateMarker::Mux)
287                            | (StateMarker::Mux, StateMarker::WebSocket)
288                            | (StateMarker::WebSocket, StateMarker::WebSocket)
289                    ),
290                    "illegal protocol-upgrade transition {from_marker:?} -> {:?}",
291                    state.marker()
292                );
293                debug_assert!(
294                    !state.failed(),
295                    "a successful upgrade must not install a FailedUpgrade state"
296                );
297                self.state = state;
298                #[cfg(debug_assertions)]
299                self.check_invariants();
300                false
301            }
302            // The state stays FailedUpgrade, but the Session should be closed right after
303            None => {
304                // On failure `take()` left a FailedUpgrade placeholder behind;
305                // the caller (`ready`) must close the session on the returned
306                // `true`, so we do not run the still-open invariant sweep here.
307                debug_assert!(
308                    self.state.failed(),
309                    "a failed upgrade must leave the session in FailedUpgrade"
310                );
311                true
312            }
313        }
314    }
315
316    fn upgrade_expect(
317        &mut self,
318        expect: ExpectProxyProtocol<TcpStream>,
319    ) -> Option<HttpStateMachine> {
320        debug!("{} switching to HTTP", log_context!(self));
321        match expect
322            .addresses
323            .as_ref()
324            .map(|add| (add.destination(), add.source()))
325        {
326            Some((Some(public_address), Some(session_address))) => {
327                let session_ulid = rusty_ulid::Ulid::generate();
328                let frontend = mux::Connection::new_h1_server(
329                    session_ulid,
330                    crate::socket::SessionTcpStream::new(
331                        expect.frontend,
332                        session_ulid,
333                        Some(session_address),
334                    ),
335                    expect.container_frontend_timeout,
336                );
337                let router = mux::Router::new(
338                    self.configured_backend_timeout,
339                    self.configured_connect_timeout,
340                );
341                let mut context = mux::Context::new(
342                    session_ulid,
343                    self.pool.clone(),
344                    self.listener.clone(),
345                    Some(session_address),
346                    public_address,
347                );
348                if context.create_stream(expect.request_id, 1 << 16).is_none() {
349                    error!(
350                        "{} expect upgrade failed: could not create stream",
351                        log_context!(self)
352                    );
353                    return None;
354                }
355                let mut mux = Mux {
356                    configured_frontend_timeout: self.configured_frontend_timeout,
357                    frontend_token: self.frontend_token,
358                    frontend,
359                    router,
360                    context,
361                    session_ulid,
362                };
363                mux.frontend.readiness_mut().event = expect.frontend_readiness.event;
364
365                // The Expect→Mux handoff must carry the session's frontend
366                // token across unchanged: the slab slot and epoll registration
367                // are keyed by it, so a mismatch would misroute readiness.
368                debug_assert_eq!(
369                    mux.frontend_token, self.frontend_token,
370                    "expect upgrade must preserve the frontend token"
371                );
372                // Fresh Mux: exactly one stream was just created above, and no
373                // backend is connected yet (back token is set iff linked).
374                debug_assert_eq!(
375                    mux.context.streams.len(),
376                    1,
377                    "a freshly upgraded Mux owns exactly the request stream"
378                );
379
380                // Gauge pairing: this connection leaves PROXY_EXPECT and enters
381                // HTTP. The two adjustments must stay together so the live-state
382                // gauges sum to the session count (no double-count, no leak).
383                gauge_add!(names::protocol::PROXY_EXPECT, -1);
384                gauge_add!(names::protocol::HTTP, 1);
385                Some(HttpStateMachine::Mux(mux))
386            }
387            _ => {
388                debug!(
389                    "{} expect upgrade failed: bad header {:?}",
390                    log_context!(self),
391                    expect.addresses
392                );
393                None
394            }
395        }
396    }
397
    /// Turns an H1 `Mux` stage into a WebSocket passthrough `Pipe`.
    ///
    /// Steps: pop the (single) stream, take the frontend connection apart,
    /// remove the backend the stream is linked to from the router, then build
    /// a `Pipe` from the two sockets, their buffers and their timeouts.
    ///
    /// Returns `None` — leaving the caller to close the session — when there
    /// is no stream, when either side is not an H1 connection, when the
    /// stream is not linked to a backend, or when that backend is missing or
    /// no longer connected.
    fn upgrade_mux(&mut self, mut mux: MuxClear) -> Option<HttpStateMachine> {
        debug!("{} mux switching to ws", log_context!(self));
        let Some(stream) = mux.context.streams.pop() else {
            error!(
                "{} upgrade_mux: no stream attached to the mux session, closing",
                log_context!(self)
            );
            return None;
        };
        // http.active_requests was already decremented by generate_access_log()
        // in h1.rs before MuxResult::Upgrade was returned to us.

        // Take the frontend connection apart; only an H1 frontend can upgrade.
        let (frontend_readiness, frontend_socket, mut container_frontend_timeout) =
            match mux.frontend {
                mux::Connection::H1(mux::ConnectionH1 {
                    readiness,
                    socket,
                    timeout_container,
                    ..
                }) => (readiness, socket, timeout_container),
                mux::Connection::H2(_) => {
                    error!(
                        "{} only h1<->h1 connections can upgrade to websocket",
                        log_context!(self)
                    );
                    return None;
                }
            };

        let mux::StreamState::Linked(back_token) = stream.state else {
            error!(
                "{} upgrading stream should be linked to a backend",
                log_context!(self)
            );
            return None;
        };
        // Invariant (back token set iff backend connected): a Linked stream
        // names a backend token that must exist in the router's backend map.
        // Debug builds assert that membership here; the `remove` below is the
        // release-mode guard and closes the session gracefully if the token
        // is absent — which would be a book-keeping desync between stream
        // state and the backend map.
        debug_assert!(
            mux.router.backends.contains_key(&back_token),
            "a Linked stream's back token must index a connected backend"
        );
        // Snapshot the count so the post-removal assertion can check it
        // dropped by exactly one.
        let backends_before = mux.router.backends.len();
        let Some(backend) = mux.router.backends.remove(&back_token) else {
            error!(
                "{} upgrade_mux: backend for token {:?} is missing (already disconnected?), closing",
                log_context!(self),
                back_token
            );
            return None;
        };
        // Only an H1 backend in `Client(.., Connected)` position can be piped.
        let (cluster_id, backend, backend_readiness, backend_socket, mut container_backend_timeout) =
            match backend {
                mux::Connection::H1(mux::ConnectionH1 {
                    position:
                        mux::Position::Client(cluster_id, backend, mux::BackendStatus::Connected),
                    readiness,
                    socket,
                    timeout_container,
                    ..
                }) => (cluster_id, backend, readiness, socket, timeout_container),
                mux::Connection::H1(_) => {
                    error!(
                        "{} the backend disconnected just after upgrade, abort",
                        log_context!(self)
                    );
                    return None;
                }
                mux::Connection::H2(_) => {
                    error!(
                        "{} only h1<->h1 connections can upgrade to websocket",
                        log_context!(self)
                    );
                    return None;
                }
            };

        // Post-removal book-keeping: the backend is gone from the map and the
        // count dropped by exactly one (the `remove` matched a present key).
        debug_assert!(
            !mux.router.backends.contains_key(&back_token),
            "the upgraded backend must be evicted from the router map"
        );
        debug_assert_eq!(
            mux.router.backends.len(),
            backends_before - 1,
            "removing the backend must drop the backend count by exactly one"
        );

        let ws_context = stream.context.websocket_context();

        // Both timeouts are reset before being handed to the pipe.
        container_frontend_timeout.reset();
        container_backend_timeout.reset();

        let backend_id = backend.borrow().backend_id.clone();
        // `Pipe::backend_socket` is typed `Option<TcpStream>` (raw, pre-mux).
        // The mux wraps every backend TCP socket in `SessionTcpStream` so
        // SOCKET-layer errors carry the session ULID; unwrap back to the
        // plain `TcpStream` here to feed Pipe's legacy shape.
        let backend_socket = backend_socket.stream;
        let mut pipe = Pipe::new(
            stream.back.storage.buffer,
            Some(backend_id),
            Some(backend_socket),
            Some(backend),
            Some(container_backend_timeout),
            Some(container_frontend_timeout),
            Some(cluster_id),
            stream.front.storage.buffer,
            self.frontend_token,
            frontend_socket,
            self.listener.clone(),
            Protocol::HTTP,
            stream.context.session_id,
            stream.context.id,
            stream.context.session_address,
            ws_context,
        );

        // Carry over the readiness events already observed on both sockets.
        pipe.frontend_readiness.event = frontend_readiness.event;
        pipe.backend_readiness.event = backend_readiness.event;
        // The WebSocket pipe inherits the live backend connection, so its back
        // token must be set (back token present iff a backend is connected).
        pipe.set_back_token(back_token);
        debug_assert_eq!(
            pipe.back_token(),
            vec![back_token],
            "websocket pipe must carry exactly the upgraded backend token"
        );

        // http.active_requests was already decremented by generate_access_log()
        // in h1.rs when the 101 response was written (before MuxResult::Upgrade).
        //
        // Gauge pairing (Mux→WebSocket): leave HTTP, enter WS, and start
        // accounting one active websocket request. These three must move
        // together so the protocol gauges stay consistent with the session set.
        gauge_add!(names::protocol::HTTP, -1);
        gauge_add!(names::protocol::WS, 1);
        gauge_add!(names::websocket::ACTIVE_REQUESTS, 1);
        Some(HttpStateMachine::WebSocket(pipe))
    }
542
543    fn upgrade_websocket(
544        &self,
545        ws: Pipe<crate::socket::SessionTcpStream, HttpListener>,
546    ) -> Option<HttpStateMachine> {
547        // what do we do here?
548        error!(
549            "{} upgrade called on WS, this should not happen",
550            log_context!(self)
551        );
552        Some(HttpStateMachine::WebSocket(ws))
553    }
554}
555
556impl ProxySession for HttpSession {
557    fn close(&mut self) {
558        if self.has_been_closed {
559            return;
560        }
561        // Reaching here means we are about to do the one-and-only teardown.
562        debug_assert!(
563            !self.has_been_closed,
564            "close past the guard must run on a not-yet-closed session"
565        );
566
567        trace!("{} closing HTTP session", log_context!(self));
568        self.metrics.service_stop();
569
570        // Restore gauges
571        match self.state.marker() {
572            StateMarker::Expect => gauge_add!(names::protocol::PROXY_EXPECT, -1),
573            StateMarker::Mux => gauge_add!(names::protocol::HTTP, -1),
574            StateMarker::WebSocket => {
575                gauge_add!(names::protocol::WS, -1);
576                gauge_add!(names::websocket::ACTIVE_REQUESTS, -1);
577            }
578        }
579
580        if self.state.failed() {
581            match self.state.marker() {
582                StateMarker::Expect => incr!(names::http::UPGRADE_EXPECT_FAILED),
583                StateMarker::Mux => incr!(names::http::UPGRADE_MUX_FAILED),
584                StateMarker::WebSocket => incr!(names::http::UPGRADE_WS_FAILED),
585            }
586            // FailedUpgrade means the socket was consumed by a failed upgrade
587            // attempt, so we can only close the state (no-op) and remove the
588            // session — cancel_timeouts / front_socket are unreachable.
589            self.state.close(self.proxy.clone(), &mut self.metrics);
590            self.proxy.borrow().remove_session(self.frontend_token);
591            self.has_been_closed = true;
592            debug_assert!(
593                self.has_been_closed,
594                "failed-upgrade close path must mark the session closed"
595            );
596            return;
597        }
598
599        self.state.cancel_timeouts();
600        // defer backend closing to the state
601        self.state.close(self.proxy.clone(), &mut self.metrics);
602
603        let front_socket = self.state.front_socket();
604        // invariant: write-only shutdown — Shutdown::Both on a TLS frontend
605        // discards the receive buffer and elicits TCP RST, truncating the
606        // already-queued response. Canonical write-up: `lib/src/https.rs:650-655`.
607        // Backend sockets follow the same discipline for symmetry.
608        if let Err(e) = front_socket.shutdown(Shutdown::Write) {
609            // error 107 NotConnected can happen when was never fully connected, or was already disconnected due to error
610            if e.kind() != ErrorKind::NotConnected {
611                error!(
612                    "{} error shutting down front socket({:?}): {:?}",
613                    log_context!(self),
614                    front_socket,
615                    e
616                )
617            }
618        }
619
620        // deregister the frontend and remove it
621        let proxy = self.proxy.borrow();
622        let fd = front_socket.as_raw_fd();
623        if let Err(e) = proxy.registry.deregister(&mut SourceFd(&fd)) {
624            error!(
625                "{} error deregistering front socket({:?}) while closing HTTP session: {:?}",
626                log_context!(self),
627                fd,
628                e
629            );
630        }
631        proxy.remove_session(self.frontend_token);
632
633        self.has_been_closed = true;
634        debug_assert!(
635            self.has_been_closed,
636            "close must leave the session marked closed (idempotency latch)"
637        );
638    }
639
640    fn timeout(&mut self, token: Token) -> SessionIsToBeClosed {
641        let state_result = self.state.timeout(token, &mut self.metrics);
642        state_result == StateResult::CloseSession
643    }
644
    /// Protocol served by this session: always [`Protocol::HTTP`].
    fn protocol(&self) -> Protocol {
        Protocol::HTTP
    }
648
    /// Records a readiness notification for `token`: refreshes `last_event`,
    /// marks the start of a wait period in the metrics, and forwards the
    /// events to the current state.
    fn update_readiness(&mut self, token: Token, events: Ready) {
        trace!(
            "{} token {:?} got event {}",
            log_context!(self),
            token,
            super::ready_to_string(events)
        );
        // Timestamp read back through `last_event()`.
        self.last_event = Instant::now();
        self.metrics.wait_start();
        self.state.update_readiness(token, events);
    }
660
661    fn ready(&mut self, session: Rc<RefCell<dyn ProxySession>>) -> SessionIsToBeClosed {
662        self.metrics.service_start();
663
664        let session_result =
665            self.state
666                .ready(session.clone(), self.proxy.clone(), &mut self.metrics);
667
668        let to_be_closed = match session_result {
669            SessionResult::Close => true,
670            SessionResult::Continue => false,
671            SessionResult::Upgrade => match self.upgrade() {
672                false => self.ready(session),
673                true => true,
674            },
675        };
676
677        self.metrics.service_stop();
678        // Run-to-completion postcondition: a session that is being kept alive
679        // must still satisfy its cross-field invariants. When `to_be_closed`
680        // is true the state may be a transient FailedUpgrade or already torn
681        // down, so the sweep only applies to the still-open path.
682        #[cfg(debug_assertions)]
683        if !to_be_closed {
684            self.check_invariants();
685        }
686        to_be_closed
687    }
688
    /// Delegates the shutdown notification to the current state and returns
    /// its verdict on whether the session is to be closed.
    fn shutting_down(&mut self) -> SessionIsToBeClosed {
        self.state.shutting_down()
    }
692
    /// Instant of the last readiness notification (set at construction and
    /// refreshed by `update_readiness`).
    fn last_event(&self) -> Instant {
        self.last_event
    }
696
    /// Dumps the session for diagnostics: asks the state to print itself
    /// under the `"HTTP"` label, then logs the session metrics at error level.
    fn print_session(&self) {
        self.state.print_state("HTTP");
        error!("{} Metrics: {:?}", log_context!(self), self.metrics);
    }
701
    /// Token identifying this session's frontend.
    fn frontend_token(&self) -> Token {
        self.frontend_token
    }
705}
706
/// Owned hostname, stored as a plain `String`.
pub type Hostname = String;
708
/// Cleartext HTTP/1.x listener.
///
/// # HTTP/2 over cleartext (h2c) is NOT supported
///
/// RFC 7540 §3.2 specified an `Upgrade: h2c` mechanism to negotiate HTTP/2
/// over a cleartext TCP connection, with a companion prior-knowledge
/// variant in §3.4. Both paths are intentionally absent from this listener:
///
/// - No `Upgrade: h2c` handler: the HTTP/1.1 state machine forwards
///   `Upgrade` headers to the backend but never responds `101 Switching
///   Protocols` with an HTTP/2 connection preface.
/// - No prior-knowledge detection: the listener does not sniff the
///   24-byte `PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n` magic string; a client
///   that opens a TCP connection and immediately sends the preface will
///   be interpreted as a malformed HTTP/1 request and rejected with 400.
///
/// RFC 9113 (the current HTTP/2 RFC, obsoleting 7540) formally deprecates
/// the `Upgrade: h2c` mechanism. Clients that want HTTP/2 MUST use the
/// TLS ALPN path (`HttpsListener`, selector `h2`) instead. This is
/// consistent with the industry consensus (nginx, envoy, cloudflare) and
/// removes an entire class of cleartext-preface smuggling primitives.
pub struct HttpListener {
    // NOTE(review): presumably whether the listener is currently accepting
    // connections — confirm against activate/deactivate code.
    active: bool,
    // Address returned by `get_addr`; also the fallback public address when
    // the configuration does not provide one.
    address: SocketAddr,
    // Shared HTTP answers (usage not visible in this part of the file).
    answers: Rc<RefCell<HttpAnswers>>,
    // Listener configuration: source of the sticky name, the Sozu-Id header
    // name, the connect timeout and the optional public address.
    config: HttpListenerConfig,
    // Router holding this listener's frontends (usage not visible here).
    fronts: Router,
    // The mio listening socket, when there is one.
    listener: Option<MioTcpListener>,
    // Cached tags, looked up by key through `get_tags` / `set_tags`.
    tags: BTreeMap<String, CachedTags>,
    // Token associated with this listener (usage not visible here).
    token: Token,
}
740
741impl ListenerHandler for HttpListener {
742    fn get_addr(&self) -> &SocketAddr {
743        &self.address
744    }
745
746    fn get_tags(&self, key: &str) -> Option<&CachedTags> {
747        self.tags.get(key)
748    }
749
750    fn set_tags(&mut self, key: String, tags: Option<BTreeMap<String, String>>) {
751        match tags {
752            Some(tags) => self.tags.insert(key, CachedTags::new(tags)),
753            None => self.tags.remove(&key),
754        };
755    }
756
757    fn protocol(&self) -> Protocol {
758        Protocol::HTTP
759    }
760
761    fn public_address(&self) -> SocketAddr {
762        self.config
763            .public_address
764            .map(|addr| addr.into())
765            .unwrap_or(self.address)
766    }
767}
768
769impl L7ListenerHandler for HttpListener {
770    fn get_sticky_name(&self) -> &str {
771        &self.config.sticky_name
772    }
773
774    fn get_sozu_id_header(&self) -> &str {
775        self.config
776            .sozu_id_header
777            .as_deref()
778            .filter(|s| !s.is_empty())
779            .unwrap_or("Sozu-Id")
780    }
781
782    fn get_connect_timeout(&self) -> u32 {
783        self.config.connect_timeout
784    }
785
786    // redundant, already called once in extract_route
787    fn frontend_from_request(
788        &self,
789        host: &str,
790        uri: &str,
791        method: &Method,
792    ) -> Result<RouteResult, FrontendFromRequestError> {
793        let start = Instant::now();
794        let (remaining_input, (hostname, _)) = match hostname_and_port(host.as_bytes()) {
795            Ok(tuple) => tuple,
796            Err(parse_error) => {
797                // parse_error contains a slice of given_host, which should NOT escape this scope
798                return Err(FrontendFromRequestError::HostParse {
799                    host: host.to_owned(),
800                    error: parse_error.to_string(),
801                });
802            }
803        };
804        if remaining_input != &b""[..] {
805            return Err(FrontendFromRequestError::InvalidCharsAfterHost(
806                host.to_owned(),
807            ));
808        }
809
810        /*if port == Some(&b"80"[..]) {
811        // it is alright to call from_utf8_unchecked,
812        // we already verified that there are only ascii
813        // chars in there
814          unsafe { from_utf8_unchecked(hostname) }
815        } else {
816          host
817        }
818        */
819        // SAFETY: `hostname` was just produced by `hostname_and_port` (see
820        // `lib/src/protocol/kawa_h1/parser.rs:133`), which only accepts
821        // bytes matching `is_hostname_char` (alphanumeric, `-`, `.`, plus
822        // `_` under the tolerant-http1-parser feature). All accepted
823        // bytes are ASCII (≤ 0x7F), so the slice is valid single-byte UTF-8.
824        let host = unsafe { from_utf8_unchecked(hostname) };
825
826        let route = self.fronts.lookup(host, uri, method).map_err(|e| {
827            incr!(names::http::FAILED_BACKEND_MATCHING);
828            FrontendFromRequestError::NoClusterFound(e)
829        })?;
830
831        let now = Instant::now();
832
833        if let Some(cluster) = route.cluster_id.as_deref() {
834            time!(
835                names::event_loop::FRONTEND_MATCHING_TIME,
836                cluster,
837                (now - start).as_millis()
838            );
839        }
840
841        Ok(route)
842    }
843
844    fn get_answers(&self) -> &Rc<RefCell<HttpAnswers>> {
845        &self.answers
846    }
847
848    fn get_h2_flood_config(&self) -> crate::protocol::mux::H2FloodConfig {
849        let defaults = crate::protocol::mux::H2FloodConfig::default();
850        crate::protocol::mux::H2FloodConfig {
851            max_rst_stream_per_window: self
852                .config
853                .h2_max_rst_stream_per_window
854                .unwrap_or(defaults.max_rst_stream_per_window),
855            max_ping_per_window: self
856                .config
857                .h2_max_ping_per_window
858                .unwrap_or(defaults.max_ping_per_window),
859            max_settings_per_window: self
860                .config
861                .h2_max_settings_per_window
862                .unwrap_or(defaults.max_settings_per_window),
863            max_empty_data_per_window: self
864                .config
865                .h2_max_empty_data_per_window
866                .unwrap_or(defaults.max_empty_data_per_window),
867            max_window_update_stream0_per_window: self
868                .config
869                .h2_max_window_update_stream0_per_window
870                .unwrap_or(defaults.max_window_update_stream0_per_window),
871            max_continuation_frames: self
872                .config
873                .h2_max_continuation_frames
874                .unwrap_or(defaults.max_continuation_frames),
875            max_glitch_count: self
876                .config
877                .h2_max_glitch_count
878                .unwrap_or(defaults.max_glitch_count),
879            max_rst_stream_lifetime: self
880                .config
881                .h2_max_rst_stream_lifetime
882                .unwrap_or(defaults.max_rst_stream_lifetime),
883            max_rst_stream_abusive_lifetime: self
884                .config
885                .h2_max_rst_stream_abusive_lifetime
886                .unwrap_or(defaults.max_rst_stream_abusive_lifetime),
887            max_rst_stream_emitted_lifetime: self
888                .config
889                .h2_max_rst_stream_emitted_lifetime
890                .unwrap_or(defaults.max_rst_stream_emitted_lifetime),
891            max_header_list_size: self
892                .config
893                .h2_max_header_list_size
894                .unwrap_or(defaults.max_header_list_size),
895            max_header_table_size: self
896                .config
897                .h2_max_header_table_size
898                .unwrap_or(defaults.max_header_table_size),
899            max_header_fields: self
900                .config
901                .h2_max_header_fields
902                .unwrap_or(defaults.max_header_fields),
903        }
904    }
905
906    fn get_h2_connection_config(&self) -> crate::protocol::mux::H2ConnectionConfig {
907        crate::protocol::mux::H2ConnectionConfig::from_optional(
908            self.config.h2_initial_connection_window,
909            self.config.h2_max_concurrent_streams,
910            self.config.h2_stream_shrink_ratio,
911        )
912    }
913
914    fn get_h2_stream_idle_timeout(&self) -> std::time::Duration {
915        // Inherit `back_timeout` when the knob is unset so listeners tuned for
916        // long-running backends do not cancel streams at the 30 s security
917        // floor. The `max(30, …)` keeps the baseline slow-multiplex mitigation
918        // when `back_timeout` is shorter than 30 s. Explicit values (including
919        // ones below 30 s) win — operators under a slow-multiplex attack can
920        // lower the per-stream deadline to cap buffer pinning.
921        let seconds = self
922            .config
923            .h2_stream_idle_timeout_seconds
924            .map(|s| u64::from(s.max(1)))
925            .unwrap_or_else(|| u64::from(self.config.back_timeout).max(30));
926        std::time::Duration::from_secs(seconds)
927    }
928
929    fn get_h2_graceful_shutdown_deadline(&self) -> Option<std::time::Duration> {
930        match self.config.h2_graceful_shutdown_deadline_seconds {
931            None => Some(std::time::Duration::from_secs(5)),
932            Some(0) => None,
933            Some(s) => Some(std::time::Duration::from_secs(u64::from(s))),
934        }
935    }
936
937    fn get_elide_x_real_ip(&self) -> bool {
938        self.config.elide_x_real_ip.unwrap_or(false)
939    }
940
941    fn get_send_x_real_ip(&self) -> bool {
942        self.config.send_x_real_ip.unwrap_or(false)
943    }
944}
945
pub struct HttpProxy {
    /// Shared handle to the backend map.
    backends: Rc<RefCell<BackendMap>>,
    /// Clusters registered through `add_cluster`, keyed by cluster id.
    clusters: HashMap<ClusterId, Cluster>,
    /// HTTP listeners, keyed by the token they were added under.
    listeners: HashMap<Token, Rc<RefCell<HttpListener>>>,
    /// Shared handle to the `Pool`.
    pool: Rc<RefCell<Pool>>,
    /// mio registry used to register and deregister listening sockets.
    registry: Registry,
    /// Shared handle to the session manager.
    sessions: Rc<RefCell<SessionManager>>,
}
954
955impl HttpProxy {
956    pub fn new(
957        registry: Registry,
958        sessions: Rc<RefCell<SessionManager>>,
959        pool: Rc<RefCell<Pool>>,
960        backends: Rc<RefCell<BackendMap>>,
961    ) -> HttpProxy {
962        HttpProxy {
963            backends,
964            clusters: HashMap::new(),
965            listeners: HashMap::new(),
966            pool,
967            registry,
968            sessions,
969        }
970    }
971
972    pub fn add_listener(
973        &mut self,
974        config: HttpListenerConfig,
975        token: Token,
976    ) -> Result<Token, ProxyError> {
977        match self.listeners.entry(token) {
978            Entry::Vacant(entry) => {
979                let http_listener =
980                    HttpListener::new(config, token).map_err(ProxyError::AddListener)?;
981                entry.insert(Rc::new(RefCell::new(http_listener)));
982                Ok(token)
983            }
984            _ => Err(ProxyError::ListenerAlreadyPresent),
985        }
986    }
987
988    pub fn get_listener(&self, token: &Token) -> Option<Rc<RefCell<HttpListener>>> {
989        self.listeners.get(token).cloned()
990    }
991
992    pub fn remove_listener(&mut self, remove: RemoveListener) -> Result<(), ProxyError> {
993        let len = self.listeners.len();
994        let remove_address = remove.address.into();
995        self.listeners
996            .retain(|_, l| l.borrow().address != remove_address);
997
998        if !self.listeners.len() < len {
999            info!(
1000                "{} no HTTP listener to remove at address {:?}",
1001                log_module_context!(),
1002                remove_address
1003            );
1004        }
1005        Ok(())
1006    }
1007
1008    pub fn activate_listener(
1009        &self,
1010        addr: &SocketAddr,
1011        tcp_listener: Option<MioTcpListener>,
1012    ) -> Result<Token, ProxyError> {
1013        let listener = self
1014            .listeners
1015            .values()
1016            .find(|listener| listener.borrow().address == *addr)
1017            .ok_or(ProxyError::NoListenerFound(addr.to_owned()))?;
1018
1019        listener
1020            .borrow_mut()
1021            .activate(&self.registry, tcp_listener)
1022            .map_err(|listener_error| ProxyError::ListenerActivation {
1023                address: *addr,
1024                listener_error,
1025            })
1026    }
1027
1028    pub fn give_back_listeners(&mut self) -> Vec<(SocketAddr, MioTcpListener)> {
1029        self.listeners
1030            .iter()
1031            .filter_map(|(_, listener)| {
1032                let mut owned = listener.borrow_mut();
1033                if let Some(listener) = owned.listener.take() {
1034                    // Reset `active` so a subsequent `activate()` re-binds
1035                    // instead of short-circuiting on the stale flag.
1036                    owned.active = false;
1037                    return Some((owned.address, listener));
1038                }
1039
1040                None
1041            })
1042            .collect()
1043    }
1044
1045    pub fn give_back_listener(
1046        &mut self,
1047        address: SocketAddr,
1048    ) -> Result<(Token, MioTcpListener), ProxyError> {
1049        let listener = self
1050            .listeners
1051            .values()
1052            .find(|listener| listener.borrow().address == address)
1053            .ok_or(ProxyError::NoListenerFound(address))?;
1054
1055        let mut owned = listener.borrow_mut();
1056
1057        let taken_listener = owned
1058            .listener
1059            .take()
1060            .ok_or(ProxyError::UnactivatedListener)?;
1061
1062        // Reset `active` so a subsequent `activate()` re-binds instead of
1063        // short-circuiting on the stale flag.
1064        owned.active = false;
1065
1066        Ok((owned.token, taken_listener))
1067    }
1068
1069    /// Apply a partial-update patch to the identified HTTP listener.
1070    pub fn update_listener(&mut self, patch: UpdateHttpListenerConfig) -> Result<(), ProxyError> {
1071        let address: std::net::SocketAddr = patch.address.into();
1072        let listener = self
1073            .listeners
1074            .values()
1075            .find(|l| l.borrow().address == address)
1076            .ok_or(ProxyError::NoListenerFound(address))?;
1077        listener
1078            .borrow_mut()
1079            .update_config(&patch)
1080            .map_err(|listener_error| ProxyError::ListenerActivation {
1081                address,
1082                listener_error,
1083            })
1084    }
1085
1086    pub fn add_cluster(&mut self, mut cluster: Cluster) -> Result<(), ProxyError> {
1087        // Reconcile the legacy single-status `answer_503` field with the
1088        // new map. The new map wins on collision.
1089        let mut overrides = cluster.answers.clone();
1090        if let Some(answer_503) = cluster.answer_503.take() {
1091            overrides.entry("503".to_owned()).or_insert(answer_503);
1092        }
1093        if !overrides.is_empty() {
1094            for listener in self.listeners.values() {
1095                listener
1096                    .borrow()
1097                    .answers
1098                    .borrow_mut()
1099                    .add_cluster_answers(&cluster.cluster_id, &overrides)
1100                    .map_err(|(name, error)| {
1101                        ProxyError::AddCluster(ListenerError::TemplateParse(name, error))
1102                    })?;
1103            }
1104        }
1105        self.clusters.insert(cluster.cluster_id.clone(), cluster);
1106        Ok(())
1107    }
1108
1109    pub fn remove_cluster(&mut self, cluster_id: &str) -> Result<(), ProxyError> {
1110        self.clusters.remove(cluster_id);
1111
1112        for listener in self.listeners.values() {
1113            listener
1114                .borrow()
1115                .answers
1116                .borrow_mut()
1117                .remove_cluster_answers(cluster_id);
1118        }
1119        Ok(())
1120    }
1121
1122    pub fn add_http_frontend(&mut self, front: RequestHttpFrontend) -> Result<(), ProxyError> {
1123        // RFC 6797 §7.2: `Strict-Transport-Security` MUST NOT be sent over
1124        // plaintext HTTP. Reject any AddHttpFrontend that ships an enabled
1125        // HSTS policy before it ever touches the routing trie. This is
1126        // defense in depth on top of the TOML config-load reject in
1127        // `command/src/config.rs`; sites that build a `RequestHttpFrontend`
1128        // outside the TOML path (`sozu frontend http add`, programmatic
1129        // IPC senders) are caught here.
1130        // Reject ANY hsts field on a plain-HTTP frontend, not just
1131        // `enabled = true`. There is no listener-default HSTS to inherit
1132        // on an HTTP listener (the field doesn't exist on
1133        // `HttpListenerConfig`), so the explicit-disable signal
1134        // (`enabled = false`) has nothing to suppress on this surface.
1135        // Carrying any `hsts` field here is a misconfiguration rather
1136        // than a deliberate choice.
1137        if front.hsts.is_some() {
1138            incr!(names::http::HSTS_SUPPRESSED_PLAINTEXT);
1139            return Err(ProxyError::HstsOnPlainHttp(front.address.into()));
1140        }
1141
1142        let front = front.clone().to_frontend().map_err(|request_error| {
1143            ProxyError::WrongInputFrontend {
1144                front: Box::new(front),
1145                error: request_error.to_string(),
1146            }
1147        })?;
1148
1149        let mut listener = self
1150            .listeners
1151            .values()
1152            .find(|l| l.borrow().address == front.address)
1153            .ok_or(ProxyError::NoListenerFound(front.address))?
1154            .borrow_mut();
1155
1156        let hostname = front.hostname.to_owned();
1157        let tags = front.tags.to_owned();
1158
1159        listener
1160            .add_http_front(front)
1161            .map_err(ProxyError::AddFrontend)?;
1162        listener.set_tags(hostname, tags);
1163        Ok(())
1164    }
1165
1166    pub fn remove_http_frontend(&mut self, front: RequestHttpFrontend) -> Result<(), ProxyError> {
1167        let front = front.clone().to_frontend().map_err(|request_error| {
1168            ProxyError::WrongInputFrontend {
1169                front: Box::new(front),
1170                error: request_error.to_string(),
1171            }
1172        })?;
1173
1174        let mut listener = self
1175            .listeners
1176            .values()
1177            .find(|l| l.borrow().address == front.address)
1178            .ok_or(ProxyError::NoListenerFound(front.address))?
1179            .borrow_mut();
1180
1181        let hostname = front.hostname.to_owned();
1182
1183        listener
1184            .remove_http_front(front)
1185            .map_err(ProxyError::RemoveFrontend)?;
1186
1187        if !listener.fronts.has_hostname(&hostname) {
1188            listener.set_tags(hostname, None);
1189        }
1190        Ok(())
1191    }
1192
1193    pub fn soft_stop(&mut self) -> Result<(), ProxyError> {
1194        let listeners: HashMap<_, _> = self.listeners.drain().collect();
1195        let mut socket_errors = vec![];
1196        for (_, l) in listeners.iter() {
1197            if let Some(mut sock) = l.borrow_mut().listener.take() {
1198                debug!("{} deregistering socket {:?}", log_module_context!(), sock);
1199                if let Err(e) = self.registry.deregister(&mut sock) {
1200                    let error = format!("socket {sock:?}: {e:?}");
1201                    socket_errors.push(error);
1202                }
1203            }
1204        }
1205
1206        if !socket_errors.is_empty() {
1207            return Err(ProxyError::SoftStop {
1208                proxy_protocol: "HTTP".to_string(),
1209                error: format!("Error deregistering listen sockets: {socket_errors:?}"),
1210            });
1211        }
1212
1213        Ok(())
1214    }
1215
1216    pub fn hard_stop(&mut self) -> Result<(), ProxyError> {
1217        let mut listeners: HashMap<_, _> = self.listeners.drain().collect();
1218        let mut socket_errors = vec![];
1219        for (_, l) in listeners.drain() {
1220            if let Some(mut sock) = l.borrow_mut().listener.take() {
1221                debug!("{} deregistering socket {:?}", log_module_context!(), sock);
1222                if let Err(e) = self.registry.deregister(&mut sock) {
1223                    let error = format!("socket {sock:?}: {e:?}");
1224                    socket_errors.push(error);
1225                }
1226            }
1227        }
1228
1229        if !socket_errors.is_empty() {
1230            return Err(ProxyError::HardStop {
1231                proxy_protocol: "HTTP".to_string(),
1232                error: format!("Error deregistering listen sockets: {socket_errors:?}"),
1233            });
1234        }
1235
1236        Ok(())
1237    }
1238}
1239
1240impl HttpListener {
1241    pub fn new(config: HttpListenerConfig, token: Token) -> Result<HttpListener, ListenerError> {
1242        Ok(HttpListener {
1243            active: false,
1244            address: config.address.into(),
1245            answers: Rc::new(RefCell::new({
1246                // Reconcile the legacy `http_answers` per-status fields
1247                // with the new template map: the new map wins on
1248                // collision, the legacy fields fill in any status the
1249                // operator hasn't yet migrated.
1250                let mut answers_map = config.answers.clone();
1251                if let Some(ref legacy) = config.http_answers {
1252                    crate::protocol::http::answers::merge_legacy_into_map(&mut answers_map, legacy);
1253                }
1254                HttpAnswers::new(&answers_map)
1255                    .map_err(|(name, error)| ListenerError::TemplateParse(name, error))?
1256            })),
1257            config,
1258            fronts: Router::new(),
1259            listener: None,
1260            tags: BTreeMap::new(),
1261            token,
1262        })
1263    }
1264
1265    pub fn activate(
1266        &mut self,
1267        registry: &Registry,
1268        tcp_listener: Option<MioTcpListener>,
1269    ) -> Result<Token, ListenerError> {
1270        if self.active {
1271            return Ok(self.token);
1272        }
1273        let address: SocketAddr = self.config.address.into();
1274
1275        let mut listener = match tcp_listener {
1276            Some(tcp_listener) => tcp_listener,
1277            None => {
1278                server_bind(address).map_err(|server_bind_error| ListenerError::Activation {
1279                    address,
1280                    error: server_bind_error.to_string(),
1281                })?
1282            }
1283        };
1284
1285        registry
1286            .register(&mut listener, self.token, Interest::READABLE)
1287            .map_err(ListenerError::SocketRegistration)?;
1288
1289        self.listener = Some(listener);
1290        self.active = true;
1291        Ok(self.token)
1292    }
1293
1294    /// Apply a partial-update patch to this listener's live configuration.
1295    ///
1296    /// Fields absent in the patch (i.e. `None`) are preserved unchanged.
1297    /// If `http_answers` is present only the listener-default templates are
1298    /// replaced; per-cluster overrides in `cluster_custom_answers` are kept.
1299    pub fn update_config(&mut self, patch: &UpdateHttpListenerConfig) -> Result<(), ListenerError> {
1300        // Defense-in-depth validation: main-process ConfigState::dispatch
1301        // validates before scatter, but a raw protobuf client or state replay
1302        // may reach the worker without that check. `StateError` lifts into
1303        // `ListenerError` via `From` so `?` suffices.
1304        validate_h2_flood_knobs_http(patch)?;
1305        if let Some(ref hdr) = patch.sozu_id_header {
1306            validate_sozu_id_header(hdr)?;
1307        }
1308
1309        if let Some(v) = patch.public_address {
1310            self.config.public_address = Some(v);
1311        }
1312        if let Some(v) = patch.expect_proxy {
1313            self.config.expect_proxy = v;
1314        }
1315        if let Some(ref v) = patch.sticky_name {
1316            self.config.sticky_name = v.to_owned();
1317        }
1318        if let Some(v) = patch.front_timeout {
1319            self.config.front_timeout = v;
1320        }
1321        if let Some(v) = patch.back_timeout {
1322            self.config.back_timeout = v;
1323        }
1324        if let Some(v) = patch.connect_timeout {
1325            self.config.connect_timeout = v;
1326        }
1327        if let Some(v) = patch.request_timeout {
1328            self.config.request_timeout = v;
1329        }
1330        if let Some(ref v) = patch.sozu_id_header {
1331            self.config.sozu_id_header = Some(v.to_owned());
1332        }
1333        if let Some(v) = patch.elide_x_real_ip {
1334            self.config.elide_x_real_ip = Some(v);
1335        }
1336        if let Some(v) = patch.send_x_real_ip {
1337            self.config.send_x_real_ip = Some(v);
1338        }
1339
1340        // H2 flood knobs
1341        if let Some(v) = patch.h2_max_rst_stream_per_window {
1342            self.config.h2_max_rst_stream_per_window = Some(v);
1343        }
1344        if let Some(v) = patch.h2_max_ping_per_window {
1345            self.config.h2_max_ping_per_window = Some(v);
1346        }
1347        if let Some(v) = patch.h2_max_settings_per_window {
1348            self.config.h2_max_settings_per_window = Some(v);
1349        }
1350        if let Some(v) = patch.h2_max_empty_data_per_window {
1351            self.config.h2_max_empty_data_per_window = Some(v);
1352        }
1353        if let Some(v) = patch.h2_max_continuation_frames {
1354            self.config.h2_max_continuation_frames = Some(v);
1355        }
1356        if let Some(v) = patch.h2_max_glitch_count {
1357            self.config.h2_max_glitch_count = Some(v);
1358        }
1359        if let Some(v) = patch.h2_initial_connection_window {
1360            self.config.h2_initial_connection_window = Some(v);
1361        }
1362        if let Some(v) = patch.h2_max_concurrent_streams {
1363            self.config.h2_max_concurrent_streams = Some(v);
1364        }
1365        if let Some(v) = patch.h2_stream_shrink_ratio {
1366            self.config.h2_stream_shrink_ratio = Some(v);
1367        }
1368        if let Some(v) = patch.h2_max_rst_stream_lifetime {
1369            self.config.h2_max_rst_stream_lifetime = Some(v);
1370        }
1371        if let Some(v) = patch.h2_max_rst_stream_abusive_lifetime {
1372            self.config.h2_max_rst_stream_abusive_lifetime = Some(v);
1373        }
1374        if let Some(v) = patch.h2_max_rst_stream_emitted_lifetime {
1375            self.config.h2_max_rst_stream_emitted_lifetime = Some(v);
1376        }
1377        if let Some(v) = patch.h2_max_header_list_size {
1378            self.config.h2_max_header_list_size = Some(v);
1379        }
1380        if let Some(v) = patch.h2_max_header_table_size {
1381            self.config.h2_max_header_table_size = Some(v);
1382        }
1383        if let Some(v) = patch.h2_max_header_fields {
1384            self.config.h2_max_header_fields = Some(v);
1385        }
1386        if let Some(v) = patch.h2_stream_idle_timeout_seconds {
1387            self.config.h2_stream_idle_timeout_seconds = Some(v);
1388        }
1389        if let Some(v) = patch.h2_graceful_shutdown_deadline_seconds {
1390            self.config.h2_graceful_shutdown_deadline_seconds = Some(v);
1391        }
1392        if let Some(v) = patch.h2_max_window_update_stream0_per_window {
1393            self.config.h2_max_window_update_stream0_per_window = Some(v);
1394        }
1395
1396        // HTTP answers: merge legacy `http_answers` and the new `answers`
1397        // map on top of the existing config, then rebuild the listener-level
1398        // template registry. Per-cluster overrides in
1399        // `HttpAnswers::cluster_answers` are preserved across the rebuild.
1400        let answers_changed = patch.http_answers.is_some() || !patch.answers.is_empty();
1401        if answers_changed {
1402            if let Some(ref new_answers) = patch.http_answers {
1403                crate::sozu_command::state::merge_custom_http_answers(
1404                    &mut self.config.http_answers,
1405                    new_answers,
1406                );
1407            }
1408            for (code, body) in &patch.answers {
1409                if !body.is_empty() {
1410                    self.config.answers.insert(code.clone(), body.clone());
1411                }
1412            }
1413
1414            let mut answers_map = self.config.answers.clone();
1415            if let Some(ref legacy) = self.config.http_answers {
1416                crate::protocol::http::answers::merge_legacy_into_map(&mut answers_map, legacy);
1417            }
1418            // Rebuild the listener-level templates and migrate the existing
1419            // per-cluster overrides over to the new `HttpAnswers`.
1420            let mut new_answers = HttpAnswers::new(&answers_map)
1421                .map_err(|(name, error)| ListenerError::TemplateParse(name, error))?;
1422            let preserved = std::mem::take(&mut self.answers.borrow_mut().cluster_answers);
1423            new_answers.cluster_answers = preserved;
1424            *self.answers.borrow_mut() = new_answers;
1425        }
1426
1427        Ok(())
1428    }
1429
1430    pub fn add_http_front(&mut self, http_front: HttpFrontend) -> Result<(), ListenerError> {
1431        self.fronts
1432            .add_http_front(&http_front)
1433            .map_err(ListenerError::AddFrontend)
1434    }
1435
1436    pub fn remove_http_front(&mut self, http_front: HttpFrontend) -> Result<(), ListenerError> {
1437        debug!(
1438            "{} removing http_front {:?}",
1439            log_module_context!(),
1440            http_front
1441        );
1442        self.fronts
1443            .remove_http_front(&http_front)
1444            .map_err(ListenerError::RemoveFrontend)
1445    }
1446
1447    fn accept(&mut self) -> Result<TcpStream, AcceptError> {
1448        if let Some(ref sock) = self.listener {
1449            sock.accept()
1450                .map_err(|e| match e.kind() {
1451                    ErrorKind::WouldBlock => AcceptError::WouldBlock,
1452                    _ => {
1453                        error!("{} accept() IO error: {:?}", log_module_context!(), e);
1454                        AcceptError::IoError
1455                    }
1456                })
1457                .map(|(sock, _)| sock)
1458        } else {
1459            error!(
1460                "{} cannot accept connections, no listening socket available",
1461                log_module_context!()
1462            );
1463            Err(AcceptError::IoError)
1464        }
1465    }
1466}
1467
1468impl ProxyConfiguration for HttpProxy {
1469    fn notify(&mut self, request: WorkerRequest) -> WorkerResponse {
1470        let request_id = request.id.clone();
1471
1472        let result = match request.content.request_type {
1473            Some(RequestType::AddCluster(cluster)) => {
1474                debug!(
1475                    "{} {} add cluster {:?}",
1476                    log_module_context!(),
1477                    request.id,
1478                    cluster
1479                );
1480                self.add_cluster(cluster)
1481            }
1482            Some(RequestType::RemoveCluster(cluster_id)) => {
1483                debug!(
1484                    "{} {} remove cluster {:?}",
1485                    log_module_context!(),
1486                    request_id,
1487                    cluster_id
1488                );
1489                self.remove_cluster(&cluster_id)
1490            }
1491            Some(RequestType::AddHttpFrontend(front)) => {
1492                debug!(
1493                    "{} {} add front {:?}",
1494                    log_module_context!(),
1495                    request_id,
1496                    front
1497                );
1498                self.add_http_frontend(front)
1499            }
1500            Some(RequestType::RemoveHttpFrontend(front)) => {
1501                debug!(
1502                    "{} {} remove front {:?}",
1503                    log_module_context!(),
1504                    request_id,
1505                    front
1506                );
1507                self.remove_http_frontend(front)
1508            }
1509            Some(RequestType::RemoveListener(remove)) => {
1510                debug!(
1511                    "{} removing HTTP listener at address {:?}",
1512                    log_module_context!(),
1513                    remove.address
1514                );
1515                self.remove_listener(remove)
1516            }
1517            Some(RequestType::SoftStop(_)) => {
1518                debug!(
1519                    "{} {} processing soft shutdown",
1520                    log_module_context!(),
1521                    request_id
1522                );
1523                match self.soft_stop() {
1524                    Ok(()) => {
1525                        info!(
1526                            "{} {} soft stop successful",
1527                            log_module_context!(),
1528                            request_id
1529                        );
1530                        return WorkerResponse::processing(request.id);
1531                    }
1532                    Err(e) => Err(e),
1533                }
1534            }
1535            Some(RequestType::HardStop(_)) => {
1536                debug!(
1537                    "{} {} processing hard shutdown",
1538                    log_module_context!(),
1539                    request_id
1540                );
1541                match self.hard_stop() {
1542                    Ok(()) => {
1543                        info!(
1544                            "{} {} hard stop successful",
1545                            log_module_context!(),
1546                            request_id
1547                        );
1548                        return WorkerResponse::processing(request.id);
1549                    }
1550                    Err(e) => Err(e),
1551                }
1552            }
1553            Some(RequestType::Status(_)) => {
1554                debug!("{} {} status", log_module_context!(), request_id);
1555                Ok(())
1556            }
1557            other_command => {
1558                debug!(
1559                    "{} {} unsupported message for HTTP proxy, ignoring: {:?}",
1560                    log_module_context!(),
1561                    request.id,
1562                    other_command
1563                );
1564                Err(ProxyError::UnsupportedMessage)
1565            }
1566        };
1567
1568        match result {
1569            Ok(()) => {
1570                debug!("{} {} successful", log_module_context!(), request_id);
1571                WorkerResponse::ok(request_id)
1572            }
1573            Err(proxy_error) => {
1574                debug!(
1575                    "{} {} unsuccessful: {}",
1576                    log_module_context!(),
1577                    request_id,
1578                    proxy_error
1579                );
1580                WorkerResponse::error(request_id, proxy_error)
1581            }
1582        }
1583    }
1584
1585    fn accept(&mut self, token: ListenToken) -> Result<TcpStream, AcceptError> {
1586        if let Some(listener) = self.listeners.get(&Token(token.0)) {
1587            listener.borrow_mut().accept()
1588        } else {
1589            Err(AcceptError::IoError)
1590        }
1591    }
1592
1593    fn create_session(
1594        &mut self,
1595        mut frontend_sock: TcpStream,
1596        listener_token: ListenToken,
1597        wait_time: Duration,
1598        proxy: Rc<RefCell<Self>>,
1599    ) -> Result<(), AcceptError> {
1600        let listener = self
1601            .listeners
1602            .get(&Token(listener_token.0))
1603            .cloned()
1604            .ok_or(AcceptError::IoError)?;
1605
1606        if let Err(e) = frontend_sock.set_nodelay(true) {
1607            error!(
1608                "{} error setting nodelay on front socket({:?}): {:?}",
1609                log_module_context!(),
1610                frontend_sock,
1611                e
1612            );
1613        }
1614        let mut session_manager = self.sessions.borrow_mut();
1615        let slab_len_before = session_manager.slab.len();
1616        let session_entry = session_manager.slab.vacant_entry();
1617        let session_token = Token(session_entry.key());
1618        // The token handed to the new session, used for epoll registration and
1619        // every `remove_session(frontend_token)` call, MUST be the very slab
1620        // key this entry will occupy. A drift here would deregister/free the
1621        // wrong slot on close.
1622        debug_assert_eq!(
1623            session_token.0,
1624            session_entry.key(),
1625            "session token must equal the slab vacant-entry key"
1626        );
1627        let owned = listener.borrow();
1628
1629        if let Err(register_error) = self.registry.register(
1630            &mut frontend_sock,
1631            session_token,
1632            Interest::READABLE | Interest::WRITABLE,
1633        ) {
1634            error!(
1635                "{} error registering listen socket({:?}): {:?}",
1636                log_module_context!(),
1637                frontend_sock,
1638                register_error
1639            );
1640            return Err(AcceptError::RegisterError);
1641        }
1642
1643        let public_address: SocketAddr = match owned.config.public_address {
1644            Some(pub_addr) => pub_addr.into(),
1645            None => owned.config.address.into(),
1646        };
1647
1648        let session = HttpSession::new(
1649            Duration::from_secs(owned.config.back_timeout as u64),
1650            Duration::from_secs(owned.config.connect_timeout as u64),
1651            Duration::from_secs(owned.config.front_timeout as u64),
1652            Duration::from_secs(owned.config.request_timeout as u64),
1653            owned.config.expect_proxy,
1654            listener.clone(),
1655            Rc::downgrade(&self.pool),
1656            proxy,
1657            public_address,
1658            frontend_sock,
1659            session_token,
1660            wait_time,
1661        )?;
1662
1663        // The session's frontend token must be exactly the slab key we are
1664        // about to fill — the registration above and all later token lookups
1665        // depend on it.
1666        debug_assert_eq!(
1667            session.frontend_token, session_token,
1668            "session must own the frontend token it was created with"
1669        );
1670
1671        let session = Rc::new(RefCell::new(session));
1672        session_entry.insert(session);
1673        // Inserting into the previously-vacant entry grows the live session
1674        // count by exactly one (gauge-like ±1 pairing against close()'s
1675        // try_remove). `len()` is the count of occupied slots.
1676        debug_assert_eq!(
1677            session_manager.slab.len(),
1678            slab_len_before + 1,
1679            "creating a session must occupy exactly one new slab slot"
1680        );
1681
1682        Ok(())
1683    }
1684}
1685
1686impl L7Proxy for HttpProxy {
1687    fn kind(&self) -> ListenerType {
1688        ListenerType::Http
1689    }
1690
1691    fn register_socket(
1692        &self,
1693        source: &mut TcpStream,
1694        token: Token,
1695        interest: Interest,
1696    ) -> Result<(), std::io::Error> {
1697        self.registry.register(source, token, interest)
1698    }
1699
1700    fn deregister_socket(&self, tcp_stream: &mut TcpStream) -> Result<(), std::io::Error> {
1701        self.registry.deregister(tcp_stream)
1702    }
1703
1704    fn add_session(&self, session: Rc<RefCell<dyn ProxySession>>) -> Token {
1705        let mut session_manager = self.sessions.borrow_mut();
1706        let len_before = session_manager.slab.len();
1707        let entry = session_manager.slab.vacant_entry();
1708        let token = Token(entry.key());
1709        let _entry = entry.insert(session);
1710        // The returned token is the slab key callers use to later remove this
1711        // session, and the insert occupied exactly one new slot.
1712        debug_assert_eq!(
1713            session_manager.slab.len(),
1714            len_before + 1,
1715            "add_session must occupy exactly one new slab slot"
1716        );
1717        debug_assert!(
1718            session_manager.slab.contains(token.0),
1719            "the returned token must index the freshly inserted session"
1720        );
1721        token
1722    }
1723
1724    fn remove_session(&self, token: Token) -> bool {
1725        let mut sessions = self.sessions.borrow_mut();
1726        let was_present = sessions.slab.contains(token.0);
1727        let len_before = sessions.slab.len();
1728        // Drain the session's `(cluster, ip)` accounting before the slab
1729        // slot is freed — once the slot is reused for a new session the
1730        // token would otherwise alias an unrelated set of entries. No-op
1731        // when the session never tracked anything (feature disabled, or
1732        // no request reached `Router::connect`).
1733        sessions.untrack_all_cluster_ip(token);
1734        let removed = sessions.slab.try_remove(token.0).is_some();
1735        // Removal must report exactly whether the slot was occupied, and the
1736        // live count drops by one iff something was actually removed (±1
1737        // pairing against add_session / create_session). The slot is gone.
1738        debug_assert_eq!(
1739            removed, was_present,
1740            "try_remove reports presence iff the slot was occupied"
1741        );
1742        debug_assert_eq!(
1743            sessions.slab.len(),
1744            len_before - removed as usize,
1745            "slab len drops by exactly one iff a session was removed"
1746        );
1747        debug_assert!(
1748            !sessions.slab.contains(token.0),
1749            "the slot must be free after remove_session"
1750        );
1751        removed
1752    }
1753
1754    fn backends(&self) -> Rc<RefCell<BackendMap>> {
1755        self.backends.clone()
1756    }
1757
1758    fn clusters(&self) -> &HashMap<ClusterId, Cluster> {
1759        &self.clusters
1760    }
1761
1762    fn sessions(&self) -> Rc<RefCell<SessionManager>> {
1763        self.sessions.clone()
1764    }
1765}
1766
1767pub mod testing {
1768    use crate::testing::*;
1769
1770    /// this function is not used, but is available for example and testing purposes
1771    pub fn start_http_worker(
1772        config: HttpListenerConfig,
1773        channel: ProxyChannel,
1774        max_buffers: usize,
1775        buffer_size: usize,
1776    ) -> anyhow::Result<()> {
1777        let address = config.address.into();
1778
1779        let ServerParts {
1780            event_loop,
1781            registry,
1782            sessions,
1783            pool,
1784            backends,
1785            client_scm_socket: _,
1786            server_scm_socket,
1787            server_config,
1788        } = prebuild_server(max_buffers, buffer_size, true)?;
1789
1790        let token = {
1791            let mut sessions = sessions.borrow_mut();
1792            let entry = sessions.slab.vacant_entry();
1793            let key = entry.key();
1794            let _ = entry.insert(Rc::new(RefCell::new(ListenSession {
1795                protocol: Protocol::HTTPListen,
1796            })));
1797            Token(key)
1798        };
1799
1800        let mut proxy = HttpProxy::new(registry, sessions.clone(), pool.clone(), backends.clone());
1801        proxy
1802            .add_listener(config, token)
1803            .with_context(|| "Failed at creating adding the listener")?;
1804        proxy
1805            .activate_listener(&address, None)
1806            .with_context(|| "Failed at creating activating the listener")?;
1807
1808        let mut server = Server::new(
1809            event_loop,
1810            channel,
1811            server_scm_socket,
1812            sessions,
1813            pool,
1814            backends,
1815            Some(proxy),
1816            None,
1817            None,
1818            server_config,
1819            None,
1820            false,
1821        )
1822        .with_context(|| "Failed at creating server")?;
1823
1824        debug!("{} starting event loop", log_module_context!());
1825        server.run();
1826        debug!("{} ending event loop", log_module_context!());
1827        Ok(())
1828    }
1829}
1830
1831#[cfg(test)]
1832mod tests {
1833    extern crate tiny_http;
1834
1835    use std::{
1836        io::{Read, Write},
1837        net::TcpStream,
1838        str,
1839        sync::{Arc, Barrier},
1840        thread,
1841        time::Duration,
1842    };
1843
1844    use sozu_command::proto::command::SocketAddress;
1845
1846    use super::{testing::start_http_worker, *};
1847    use crate::sozu_command::{
1848        channel::Channel,
1849        config::ListenerBuilder,
1850        proto::command::{
1851            LoadBalancingParams, PathRule, RulePosition, SoftStop, WorkerRequest,
1852            request::RequestType,
1853        },
1854        response::{Backend, HttpFrontend},
1855    };
1856
1857    /*
1858    #[test]
1859    #[cfg(target_pointer_width = "64")]
1860    fn size_test() {
1861      assert_size!(ExpectProxyProtocol<mio::net::TcpStream>, 520);
1862      assert_size!(Http<mio::net::TcpStream>, 1232);
1863      assert_size!(Pipe<mio::net::TcpStream>, 272);
1864      assert_size!(State, 1240);
1865      // fails depending on the platform?
1866      assert_size!(Session, 1592);
1867    }
1868    */
1869
1870    #[test]
1871    fn round_trip() {
1872        setup_test_logger!();
1873        let front_port = crate::testing::provide_port();
1874        let backend_server = Arc::new(
1875            tiny_http::Server::http("127.0.0.1:0").expect("could not create tiny_http server"),
1876        );
1877        let backend_port = backend_server
1878            .server_addr()
1879            .to_ip()
1880            .expect("tiny_http server should bind to IP address")
1881            .port();
1882
1883        let barrier = Arc::new(Barrier::new(2));
1884
1885        let config = ListenerBuilder::new_http(SocketAddress::new_v4(127, 0, 0, 1, front_port))
1886            .to_http(None)
1887            .expect("could not create listener config");
1888
1889        let (mut command, channel) =
1890            Channel::generate(1000, 10000).expect("should create a channel");
1891
1892        thread::scope(|s| {
1893            let backend_handle = backend_server.clone();
1894            let barrier_clone = barrier.to_owned();
1895            s.spawn(move || {
1896                setup_test_logger!();
1897                start_server(&backend_handle, barrier_clone);
1898            });
1899            barrier.wait();
1900
1901            s.spawn(move || {
1902                setup_test_logger!();
1903                start_http_worker(config, channel, 10, 16384)
1904                    .expect("could not start the http server");
1905            });
1906
1907            let front = RequestHttpFrontend {
1908                cluster_id: Some("cluster_1".to_owned()),
1909                address: SocketAddress::new_v4(127, 0, 0, 1, front_port),
1910                hostname: "localhost".to_owned(),
1911                path: PathRule::prefix("/".to_owned()),
1912                ..Default::default()
1913            };
1914            command
1915                .write_message(&WorkerRequest {
1916                    id: "ID_ABCD".to_owned(),
1917                    content: RequestType::AddHttpFrontend(front).into(),
1918                })
1919                .expect("could not send AddHttpFrontend");
1920            let backend = Backend {
1921                cluster_id: "cluster_1".to_owned(),
1922                backend_id: "cluster_1-0".to_owned(),
1923                address: SocketAddress::new_v4(127, 0, 0, 1, backend_port).into(),
1924                load_balancing_parameters: Some(LoadBalancingParams::default()),
1925                sticky_id: None,
1926                backup: None,
1927            };
1928            command
1929                .write_message(&WorkerRequest {
1930                    id: "ID_EFGH".to_owned(),
1931                    content: RequestType::AddBackend(backend.to_add_backend()).into(),
1932                })
1933                .expect("could not send AddBackend");
1934
1935            println!("test received: {:?}", command.read_message());
1936            println!("test received: {:?}", command.read_message());
1937
1938            let mut client =
1939                TcpStream::connect(("127.0.0.1", front_port)).expect("could not connect to sozu");
1940
1941            client
1942                .set_read_timeout(Some(Duration::new(1, 0)))
1943                .expect("could not set read timeout");
1944            let request = format!(
1945                "GET / HTTP/1.1\r\nHost: localhost:{front_port}\r\nConnection: Close\r\n\r\n"
1946            );
1947            let w = client.write(request.as_bytes());
1948            println!("http client write: {w:?}");
1949
1950            barrier.wait();
1951            let mut buffer = [0; 4096];
1952            let mut index = 0;
1953
1954            // tiny_http responds with exactly 191 bytes for a "hello world" body
1955            // (headers + body). This is deterministic for a given tiny_http version.
1956            let expected_len = 191;
1957
1958            loop {
1959                assert!(index <= expected_len);
1960                if index == expected_len {
1961                    break;
1962                }
1963
1964                let r = client.read(&mut buffer[index..]);
1965                println!("http client read: {r:?}");
1966                match r {
1967                    Err(e) => panic!("client request should not fail. Error: {e:?}"),
1968                    Ok(sz) => {
1969                        index += sz;
1970                    }
1971                }
1972            }
1973            println!(
1974                "Response: {}",
1975                str::from_utf8(&buffer[..index]).expect("could not make string from buffer")
1976            );
1977
1978            // Gracefully stop the sozu worker so the scoped thread can join
1979            command
1980                .write_message(&WorkerRequest {
1981                    id: "ID_STOP".to_owned(),
1982                    content: RequestType::SoftStop(SoftStop {}).into(),
1983                })
1984                .expect("could not send SoftStop");
1985            // Unblock the backend server so its thread can exit
1986            backend_server.unblock();
1987        });
1988    }
1989
1990    #[test]
1991    fn keep_alive() {
1992        setup_test_logger!();
1993        let front_port = crate::testing::provide_port();
1994        let backend_server = Arc::new(
1995            tiny_http::Server::http("127.0.0.1:0").expect("could not create tiny_http server"),
1996        );
1997        let backend_port = backend_server
1998            .server_addr()
1999            .to_ip()
2000            .expect("tiny_http server should bind to IP address")
2001            .port();
2002
2003        let barrier = Arc::new(Barrier::new(2));
2004
2005        let config = ListenerBuilder::new_http(SocketAddress::new_v4(127, 0, 0, 1, front_port))
2006            .to_http(None)
2007            .expect("could not create listener config");
2008
2009        let (mut command, channel) =
2010            Channel::generate(1000, 10000).expect("should create a channel");
2011
2012        thread::scope(|s| {
2013            let backend_handle = backend_server.clone();
2014            let barrier_clone = barrier.to_owned();
2015            s.spawn(move || {
2016                setup_test_logger!();
2017                start_server(&backend_handle, barrier_clone);
2018            });
2019            barrier.wait();
2020
2021            s.spawn(move || {
2022                setup_test_logger!();
2023                start_http_worker(config, channel, 10, 16384)
2024                    .expect("could not start the http server");
2025            });
2026
2027            let front = RequestHttpFrontend {
2028                address: SocketAddress::new_v4(127, 0, 0, 1, front_port),
2029                hostname: "localhost".to_owned(),
2030                path: PathRule::prefix("/".to_owned()),
2031                cluster_id: Some("cluster_1".to_owned()),
2032                ..Default::default()
2033            };
2034            command
2035                .write_message(&WorkerRequest {
2036                    id: "ID_ABCD".to_owned(),
2037                    content: RequestType::AddHttpFrontend(front).into(),
2038                })
2039                .expect("could not send AddHttpFrontend");
2040            let backend = Backend {
2041                address: SocketAddress::new_v4(127, 0, 0, 1, backend_port).into(),
2042                backend_id: "cluster_1-0".to_owned(),
2043                backup: None,
2044                cluster_id: "cluster_1".to_owned(),
2045                load_balancing_parameters: Some(LoadBalancingParams::default()),
2046                sticky_id: None,
2047            };
2048            command
2049                .write_message(&WorkerRequest {
2050                    id: "ID_EFGH".to_owned(),
2051                    content: RequestType::AddBackend(backend.to_add_backend()).into(),
2052                })
2053                .expect("could not send AddBackend");
2054
2055            println!("test received: {:?}", command.read_message());
2056            println!("test received: {:?}", command.read_message());
2057
2058            let mut client =
2059                TcpStream::connect(("127.0.0.1", front_port)).expect("could not connect to sozu");
2060            client
2061                .set_read_timeout(Some(Duration::new(5, 0)))
2062                .expect("could not set read timeout");
2063
2064            // tiny_http responds with exactly 191 bytes for a "hello world" body
2065            // (headers + body). This is deterministic for a given tiny_http version.
2066            let expected_len = 191;
2067
2068            let request = format!("GET / HTTP/1.1\r\nHost: localhost:{front_port}\r\n\r\n");
2069            let w = client
2070                .write(request.as_bytes())
2071                .expect("could not write first request");
2072            println!("http client write: {w:?}");
2073            barrier.wait();
2074
2075            let mut buffer = [0; 4096];
2076            let mut index = 0;
2077
2078            loop {
2079                assert!(index <= expected_len);
2080                if index == expected_len {
2081                    break;
2082                }
2083
2084                let r = client.read(&mut buffer[index..]);
2085                println!("http client read: {r:?}");
2086                match r {
2087                    Err(e) => panic!("client request should not fail. Error: {e:?}"),
2088                    Ok(sz) => {
2089                        index += sz;
2090                    }
2091                }
2092            }
2093
2094            println!(
2095                "Response: {}",
2096                str::from_utf8(&buffer[..index]).expect("could not make string from buffer")
2097            );
2098
2099            println!("first request ended, will send second one");
2100            let request2 = format!("GET / HTTP/1.1\r\nHost: localhost:{front_port}\r\n\r\n");
2101            let w2 = client.write(request2.as_bytes());
2102            println!("http client write: {w2:?}");
2103            barrier.wait();
2104
2105            let mut buffer2 = [0; 4096];
2106            let mut index = 0;
2107
2108            loop {
2109                assert!(index <= expected_len);
2110                if index == expected_len {
2111                    break;
2112                }
2113
2114                let r2 = client.read(&mut buffer2[index..]);
2115                println!("http client read: {r2:?}");
2116                match r2 {
2117                    Err(e) => panic!("client request should not fail. Error: {e:?}"),
2118                    Ok(sz) => {
2119                        index += sz;
2120                    }
2121                }
2122            }
2123            println!(
2124                "Response: {}",
2125                str::from_utf8(&buffer2[..index]).expect("could not make string from buffer")
2126            );
2127
2128            // Gracefully stop the sozu worker so the scoped thread can join
2129            command
2130                .write_message(&WorkerRequest {
2131                    id: "ID_STOP".to_owned(),
2132                    content: RequestType::SoftStop(SoftStop {}).into(),
2133                })
2134                .expect("could not send SoftStop");
2135            // Unblock the backend server so its thread can exit
2136            backend_server.unblock();
2137        });
2138    }
2139
2140    use self::tiny_http::Response;
2141
2142    fn start_server(server: &tiny_http::Server, barrier: Arc<Barrier>) {
2143        let addr = server.server_addr();
2144        info!("starting web server on {:?}", addr);
2145        barrier.wait();
2146
2147        for request in server.incoming_requests() {
2148            info!(
2149                "backend web server got request -> method: {:?}, url: {:?}, headers: {:?}",
2150                request.method(),
2151                request.url(),
2152                request.headers()
2153            );
2154
2155            let response = Response::from_string("hello world");
2156            request
2157                .respond(response)
2158                .expect("could not respond to request");
2159            info!("backend web server sent response");
2160            barrier.wait();
2161            info!("server session stopped");
2162        }
2163
2164        println!("server on {addr:?} closed");
2165    }
2166
2167    #[test]
2168    fn frontend_from_request_test() {
2169        let cluster_id1 = "cluster_1".to_owned();
2170        let cluster_id2 = "cluster_2".to_owned();
2171        let cluster_id3 = "cluster_3".to_owned();
2172        let uri1 = "/".to_owned();
2173        let uri2 = "/yolo".to_owned();
2174        let uri3 = "/yolo/swag".to_owned();
2175
2176        let mut fronts = Router::new();
2177        fronts
2178            .add_http_front(&HttpFrontend {
2179                address: "0.0.0.0:80".parse().unwrap(),
2180                hostname: "lolcatho.st".to_owned(),
2181                method: None,
2182                path: PathRule::prefix(uri1),
2183                position: RulePosition::Tree,
2184                cluster_id: Some(cluster_id1),
2185                tags: None,
2186                redirect: None,
2187                redirect_scheme: None,
2188                redirect_template: None,
2189                rewrite_host: None,
2190                rewrite_path: None,
2191                rewrite_port: None,
2192                required_auth: None,
2193                headers: Vec::new(),
2194                hsts: None,
2195            })
2196            .expect("Could not add http frontend");
2197        fronts
2198            .add_http_front(&HttpFrontend {
2199                address: "0.0.0.0:80".parse().unwrap(),
2200                hostname: "lolcatho.st".to_owned(),
2201                method: None,
2202                path: PathRule::prefix(uri2),
2203                position: RulePosition::Tree,
2204                cluster_id: Some(cluster_id2),
2205                tags: None,
2206                redirect: None,
2207                redirect_scheme: None,
2208                redirect_template: None,
2209                rewrite_host: None,
2210                rewrite_path: None,
2211                rewrite_port: None,
2212                required_auth: None,
2213                headers: Vec::new(),
2214                hsts: None,
2215            })
2216            .expect("Could not add http frontend");
2217        fronts
2218            .add_http_front(&HttpFrontend {
2219                address: "0.0.0.0:80".parse().unwrap(),
2220                hostname: "lolcatho.st".to_owned(),
2221                method: None,
2222                path: PathRule::prefix(uri3),
2223                position: RulePosition::Tree,
2224                cluster_id: Some(cluster_id3),
2225                tags: None,
2226                redirect: None,
2227                redirect_scheme: None,
2228                redirect_template: None,
2229                rewrite_host: None,
2230                rewrite_path: None,
2231                rewrite_port: None,
2232                required_auth: None,
2233                headers: Vec::new(),
2234                hsts: None,
2235            })
2236            .expect("Could not add http frontend");
2237        fronts
2238            .add_http_front(&HttpFrontend {
2239                address: "0.0.0.0:80".parse().unwrap(),
2240                hostname: "other.domain".to_owned(),
2241                method: None,
2242                path: PathRule::prefix("/test".to_owned()),
2243                position: RulePosition::Tree,
2244                cluster_id: Some("cluster_1".to_owned()),
2245                tags: None,
2246                redirect: None,
2247                redirect_scheme: None,
2248                redirect_template: None,
2249                rewrite_host: None,
2250                rewrite_path: None,
2251                rewrite_port: None,
2252                required_auth: None,
2253                headers: Vec::new(),
2254                hsts: None,
2255            })
2256            .expect("Could not add http frontend");
2257
2258        let address = SocketAddress::new_v4(127, 0, 0, 1, 1030);
2259
2260        let default_config = ListenerBuilder::new_http(address)
2261            .to_http(None)
2262            .expect("Could not create default HTTP listener config");
2263
2264        let listener = HttpListener {
2265            listener: None,
2266            address: address.into(),
2267            fronts,
2268            answers: Rc::new(RefCell::new(HttpAnswers::new(&BTreeMap::new()).unwrap())),
2269            config: default_config,
2270            token: Token(0),
2271            active: true,
2272            tags: BTreeMap::new(),
2273        };
2274
2275        let frontend1 = listener.frontend_from_request("lolcatho.st", "/", &Method::Get);
2276        let frontend2 = listener.frontend_from_request("lolcatho.st", "/test", &Method::Get);
2277        let frontend3 = listener.frontend_from_request("lolcatho.st", "/yolo/test", &Method::Get);
2278        let frontend4 = listener.frontend_from_request("lolcatho.st", "/yolo/swag", &Method::Get);
2279        let frontend5 = listener.frontend_from_request("domain", "/", &Method::Get);
2280        assert_eq!(
2281            frontend1
2282                .expect("should find frontend")
2283                .cluster_id
2284                .as_deref(),
2285            Some("cluster_1")
2286        );
2287        assert_eq!(
2288            frontend2
2289                .expect("should find frontend")
2290                .cluster_id
2291                .as_deref(),
2292            Some("cluster_1")
2293        );
2294        assert_eq!(
2295            frontend3
2296                .expect("should find frontend")
2297                .cluster_id
2298                .as_deref(),
2299            Some("cluster_2")
2300        );
2301        assert_eq!(
2302            frontend4
2303                .expect("should find frontend")
2304                .cluster_id
2305                .as_deref(),
2306            Some("cluster_3")
2307        );
2308        assert!(frontend5.is_err());
2309    }
2310
2311    #[test]
2312    fn h2_stream_idle_timeout_inherits_back_timeout() {
2313        let address = SocketAddress::new_v4(127, 0, 0, 1, 1040);
2314        let build = |back_timeout: u32, explicit: Option<u32>| -> HttpListener {
2315            let mut cfg = ListenerBuilder::new_http(address)
2316                .to_http(None)
2317                .expect("default HTTP listener config");
2318            cfg.back_timeout = back_timeout;
2319            cfg.h2_stream_idle_timeout_seconds = explicit;
2320            HttpListener::new(cfg, Token(0)).expect("build listener")
2321        };
2322
2323        // Knob unset: inherit back_timeout when it exceeds the 30s floor.
2324        assert_eq!(
2325            build(180, None).get_h2_stream_idle_timeout(),
2326            Duration::from_secs(180)
2327        );
2328
2329        // Knob unset, back_timeout below floor: stay at 30s to preserve the
2330        // slow-multiplex Slowloris mitigation.
2331        assert_eq!(
2332            build(5, None).get_h2_stream_idle_timeout(),
2333            Duration::from_secs(30)
2334        );
2335
2336        // Explicit values win in both directions — including below the floor,
2337        // so operators under attack can tighten the deadline.
2338        assert_eq!(
2339            build(180, Some(10)).get_h2_stream_idle_timeout(),
2340            Duration::from_secs(10)
2341        );
2342        assert_eq!(
2343            build(5, Some(600)).get_h2_stream_idle_timeout(),
2344            Duration::from_secs(600)
2345        );
2346
2347        // `Some(0)` is clamped to 1s to keep the deadline non-degenerate.
2348        assert_eq!(
2349            build(180, Some(0)).get_h2_stream_idle_timeout(),
2350            Duration::from_secs(1)
2351        );
2352    }
2353}