Skip to main content

sozu_lib/
http.rs

1use std::{
2    cell::RefCell,
3    collections::{BTreeMap, HashMap, hash_map::Entry},
4    io::ErrorKind,
5    net::{Shutdown, SocketAddr},
6    os::unix::io::AsRawFd,
7    rc::{Rc, Weak},
8    str::from_utf8_unchecked,
9    time::{Duration, Instant},
10};
11
12use mio::{
13    Interest, Registry, Token,
14    net::{TcpListener as MioTcpListener, TcpStream},
15    unix::SourceFd,
16};
17use rusty_ulid::Ulid;
18use sozu_command::{
19    logging::CachedTags,
20    proto::command::{
21        Cluster, HttpListenerConfig, ListenerType, RemoveListener, RequestHttpFrontend,
22        UpdateHttpListenerConfig, WorkerRequest, WorkerResponse, request::RequestType,
23    },
24    ready::Ready,
25    response::HttpFrontend,
26    state::{ClusterId, validate_h2_flood_knobs_http, validate_sozu_id_header},
27};
28
29use crate::metrics::names;
30use crate::{
31    AcceptError, FrontendFromRequestError, L7ListenerHandler, L7Proxy, ListenerError,
32    ListenerHandler, Protocol, ProxyConfiguration, ProxyError, ProxySession, SessionIsToBeClosed,
33    SessionMetrics, SessionResult, StateMachineBuilder, StateResult,
34    backends::BackendMap,
35    pool::Pool,
36    protocol::{
37        Pipe, SessionState,
38        http::{
39            answers::HttpAnswers,
40            parser::{Method, hostname_and_port},
41        },
42        mux::{self, Mux, MuxClear},
43        proxy_protocol::expect::ExpectProxyProtocol,
44    },
45    router::{RouteResult, Router},
46    server::{ListenToken, SessionManager},
47    socket::server_bind,
48    timer::TimeoutContainer,
49};
50
/// Coarse status of a session.
///
/// NOTE(review): not referenced in the visible part of this file; presumably
/// `DefaultAnswer` marks a session that replies with a proxy-generated answer
/// instead of forwarding to a backend — confirm against the callers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SessionStatus {
    /// Regular operation.
    Normal,
    /// The session is serving a default answer.
    DefaultAnswer,
}
56
StateMachineBuilder! {
    /// The various stages of a cleartext HTTP connection:
    ///
    /// 1. `Expect` (optional): waiting for a PROXY protocol header
    ///    (`ExpectProxyProtocol`)
    /// 2. `Mux`: HTTP, handled by the mux in H1 mode
    /// 3. `WebSocket`: raw passthrough between frontend and backend (`Pipe`)
    ///
    /// NOTE(review): the rest of this file also matches on a `FailedUpgrade`
    /// variant and on `StateMarker`, neither of which is declared here —
    /// presumably both are generated by `StateMachineBuilder!`; confirm in the
    /// macro definition.
    enum HttpStateMachine impl SessionState {
        Expect(ExpectProxyProtocol<TcpStream>),
        Mux(MuxClear),
        WebSocket(Pipe<crate::socket::SessionTcpStream, HttpListener>),
    }
}
69
/// Log prefix for lines emitted from this file when no session is in scope.
///
/// Expands to a `String` of the form `<open>HTTP<reset>\t >>>`, where `open`
/// and `reset` are the first two values returned by
/// `sozu_command::logging::ansi_palette()`. Meant for [`HttpProxy`] /
/// [`HttpListener`] code paths, which have no `frontend_token` of their own.
macro_rules! log_module_context {
    () => {{
        let palette = sozu_command::logging::ansi_palette();
        format!(
            "{open}HTTP{reset}\t >>>",
            open = palette.0,
            reset = palette.1
        )
    }};
}
81
/// Log prefix for lines emitted with an [`HttpSession`] in scope.
///
/// Expands to a `String` of the form
/// `HTTP\tSession(frontend=<token>)\t >>>`, decorated with the five values
/// returned by `sozu_command::logging::ansi_palette()`. `<token>` is the
/// numeric id of `$self.frontend_token`, so all lines of one connection can
/// be correlated by grepping for it.
macro_rules! log_context {
    ($self:expr) => {{
        let palette = sozu_command::logging::ansi_palette();
        format!(
            "{open}HTTP{reset}\t{grey}Session{reset}({gray}frontend{reset}={white}{frontend}{reset})\t >>>",
            frontend = $self.frontend_token.0,
            open = palette.0,
            reset = palette.1,
            grey = palette.2,
            gray = palette.3,
            white = palette.4,
        )
    }};
}
101
/// HTTP Session to insert in the SessionManager
///
/// 1 session <=> 1 HTTP connection (client to sozu)
pub struct HttpSession {
    // Handed to `mux::Router::new` whenever a `Mux` state is built.
    configured_backend_timeout: Duration,
    // Handed to `mux::Router::new` whenever a `Mux` state is built.
    configured_connect_timeout: Duration,
    // Copied into the `Mux` state.
    configured_frontend_timeout: Duration,
    // Token of the frontend socket; identifies the session in log lines and
    // when removing it from the proxy.
    frontend_token: Token,
    // Refreshed on every `update_readiness` call.
    last_event: Instant,
    // Listener that accepted this connection; cloned into the mux context
    // and into the websocket pipe.
    listener: Rc<RefCell<HttpListener>>,
    metrics: SessionMetrics,
    // Cloned into the mux context.
    pool: Weak<RefCell<Pool>>,
    // Owning proxy; used to deregister the socket and remove the session.
    proxy: Rc<RefCell<HttpProxy>>,
    // Current protocol state (Expect / Mux / WebSocket).
    state: HttpStateMachine,
    // Guards `close()` against running twice.
    has_been_closed: bool,
}
118
119impl HttpSession {
120    #[allow(clippy::too_many_arguments)]
121    pub fn new(
122        configured_backend_timeout: Duration,
123        configured_connect_timeout: Duration,
124        configured_frontend_timeout: Duration,
125        configured_request_timeout: Duration,
126        expect_proxy: bool,
127        listener: Rc<RefCell<HttpListener>>,
128        pool: Weak<RefCell<Pool>>,
129        proxy: Rc<RefCell<HttpProxy>>,
130        public_address: SocketAddr,
131        sock: TcpStream,
132        token: Token,
133        wait_time: Duration,
134    ) -> Result<Self, AcceptError> {
135        let request_id = Ulid::generate();
136        let container_frontend_timeout = TimeoutContainer::new(configured_request_timeout, token);
137
138        let state = if expect_proxy {
139            trace!("{} starting in expect proxy state", log_module_context!());
140            gauge_add!(names::protocol::PROXY_EXPECT, 1);
141
142            HttpStateMachine::Expect(ExpectProxyProtocol::new(
143                container_frontend_timeout,
144                sock,
145                token,
146                request_id,
147            ))
148        } else {
149            gauge_add!(names::protocol::HTTP, 1);
150            let session_address = sock.peer_addr().ok();
151            let session_ulid = rusty_ulid::Ulid::generate();
152            let sock = crate::socket::SessionTcpStream::new(sock, session_ulid, session_address);
153
154            let frontend =
155                mux::Connection::new_h1_server(session_ulid, sock, container_frontend_timeout);
156            let router = mux::Router::new(configured_backend_timeout, configured_connect_timeout);
157            let mut context = mux::Context::new(
158                session_ulid,
159                pool.clone(),
160                listener.clone(),
161                session_address,
162                public_address,
163            );
164            context
165                .create_stream(request_id, 1 << 16)
166                .ok_or(AcceptError::BufferCapacityReached)?;
167            HttpStateMachine::Mux(Mux {
168                configured_frontend_timeout,
169                frontend_token: token,
170                frontend,
171                router,
172                context,
173                session_ulid,
174            })
175        };
176
177        let metrics = SessionMetrics::new(Some(wait_time));
178        Ok(HttpSession {
179            configured_backend_timeout,
180            configured_connect_timeout,
181            configured_frontend_timeout,
182            frontend_token: token,
183            has_been_closed: false,
184            last_event: Instant::now(),
185            listener,
186            metrics,
187            pool,
188            proxy,
189            state,
190        })
191    }
192
193    pub fn upgrade(&mut self) -> SessionIsToBeClosed {
194        debug!("{} upgrade", log_context!(self));
195        let new_state = match self.state.take() {
196            HttpStateMachine::Mux(mux) => self.upgrade_mux(mux),
197            HttpStateMachine::Expect(expect) => self.upgrade_expect(expect),
198            HttpStateMachine::WebSocket(ws) => self.upgrade_websocket(ws),
199            HttpStateMachine::FailedUpgrade(_) => {
200                // Reaching this arm means a prior upgrade already returned
201                // `None` and the session should have been closed. Fall back
202                // to closing cleanly instead of panicking the worker.
203                error!(
204                    "{} upgrade called on FailedUpgrade state; closing session",
205                    log_context!(self)
206                );
207                None
208            }
209        };
210
211        match new_state {
212            Some(state) => {
213                self.state = state;
214                false
215            }
216            // The state stays FailedUpgrade, but the Session should be closed right after
217            None => true,
218        }
219    }
220
221    fn upgrade_expect(
222        &mut self,
223        expect: ExpectProxyProtocol<TcpStream>,
224    ) -> Option<HttpStateMachine> {
225        debug!("{} switching to HTTP", log_context!(self));
226        match expect
227            .addresses
228            .as_ref()
229            .map(|add| (add.destination(), add.source()))
230        {
231            Some((Some(public_address), Some(session_address))) => {
232                let session_ulid = rusty_ulid::Ulid::generate();
233                let frontend = mux::Connection::new_h1_server(
234                    session_ulid,
235                    crate::socket::SessionTcpStream::new(
236                        expect.frontend,
237                        session_ulid,
238                        Some(session_address),
239                    ),
240                    expect.container_frontend_timeout,
241                );
242                let router = mux::Router::new(
243                    self.configured_backend_timeout,
244                    self.configured_connect_timeout,
245                );
246                let mut context = mux::Context::new(
247                    session_ulid,
248                    self.pool.clone(),
249                    self.listener.clone(),
250                    Some(session_address),
251                    public_address,
252                );
253                if context.create_stream(expect.request_id, 1 << 16).is_none() {
254                    error!(
255                        "{} expect upgrade failed: could not create stream",
256                        log_context!(self)
257                    );
258                    return None;
259                }
260                let mut mux = Mux {
261                    configured_frontend_timeout: self.configured_frontend_timeout,
262                    frontend_token: self.frontend_token,
263                    frontend,
264                    router,
265                    context,
266                    session_ulid,
267                };
268                mux.frontend.readiness_mut().event = expect.frontend_readiness.event;
269
270                gauge_add!(names::protocol::PROXY_EXPECT, -1);
271                gauge_add!(names::protocol::HTTP, 1);
272                Some(HttpStateMachine::Mux(mux))
273            }
274            _ => {
275                debug!(
276                    "{} expect upgrade failed: bad header {:?}",
277                    log_context!(self),
278                    expect.addresses
279                );
280                None
281            }
282        }
283    }
284
    /// Turns an H1 `Mux` state into a `WebSocket` passthrough (`Pipe`).
    ///
    /// Takes the last stream of the mux context, the H1 frontend connection
    /// and the connected H1 backend the stream is linked to, and hands their
    /// sockets, buffers and timeout containers to a new `Pipe`.
    ///
    /// Returns `None` (the session is then closed by the caller) when:
    /// - no stream is attached to the mux context,
    /// - the frontend or the backend connection is H2,
    /// - the stream is not linked to a backend,
    /// - the linked backend is missing from the router,
    /// - the backend is not in the `Client(.., Connected)` position.
    fn upgrade_mux(&mut self, mut mux: MuxClear) -> Option<HttpStateMachine> {
        debug!("{} mux switching to ws", log_context!(self));
        let Some(stream) = mux.context.streams.pop() else {
            error!(
                "{} upgrade_mux: no stream attached to the mux session, closing",
                log_context!(self)
            );
            return None;
        };
        // http.active_requests was already decremented by generate_access_log()
        // in h1.rs before MuxResult::Upgrade was returned to us.

        // Dismantle the frontend connection: only an H1 frontend can upgrade.
        let (frontend_readiness, frontend_socket, mut container_frontend_timeout) =
            match mux.frontend {
                mux::Connection::H1(mux::ConnectionH1 {
                    readiness,
                    socket,
                    timeout_container,
                    ..
                }) => (readiness, socket, timeout_container),
                mux::Connection::H2(_) => {
                    error!(
                        "{} only h1<->h1 connections can upgrade to websocket",
                        log_context!(self)
                    );
                    return None;
                }
            };

        // The upgrading stream must already be linked to a backend connection.
        let mux::StreamState::Linked(back_token) = stream.state else {
            error!(
                "{} upgrading stream should be linked to a backend",
                log_context!(self)
            );
            return None;
        };
        // Take the backend connection out of the router: the pipe owns it now.
        let Some(backend) = mux.router.backends.remove(&back_token) else {
            error!(
                "{} upgrade_mux: backend for token {:?} is missing (already disconnected?), closing",
                log_context!(self),
                back_token
            );
            return None;
        };
        // Dismantle the backend connection: it must be H1 and still connected.
        let (cluster_id, backend, backend_readiness, backend_socket, mut container_backend_timeout) =
            match backend {
                mux::Connection::H1(mux::ConnectionH1 {
                    position:
                        mux::Position::Client(cluster_id, backend, mux::BackendStatus::Connected),
                    readiness,
                    socket,
                    timeout_container,
                    ..
                }) => (cluster_id, backend, readiness, socket, timeout_container),
                mux::Connection::H1(_) => {
                    error!(
                        "{} the backend disconnected just after upgrade, abort",
                        log_context!(self)
                    );
                    return None;
                }
                mux::Connection::H2(_) => {
                    error!(
                        "{} only h1<->h1 connections can upgrade to websocket",
                        log_context!(self)
                    );
                    return None;
                }
            };

        let ws_context = stream.context.websocket_context();

        // Restart both timers before handing them to the pipe.
        container_frontend_timeout.reset();
        container_backend_timeout.reset();

        let backend_id = backend.borrow().backend_id.clone();
        // `Pipe::backend_socket` is typed `Option<TcpStream>` (raw, pre-mux).
        // The mux wraps every backend TCP socket in `SessionTcpStream` so
        // SOCKET-layer errors carry the session ULID; unwrap back to the
        // plain `TcpStream` here to feed Pipe's legacy shape.
        let backend_socket = backend_socket.stream;
        let mut pipe = Pipe::new(
            stream.back.storage.buffer,
            Some(backend_id),
            Some(backend_socket),
            Some(backend),
            Some(container_backend_timeout),
            Some(container_frontend_timeout),
            Some(cluster_id),
            stream.front.storage.buffer,
            self.frontend_token,
            frontend_socket,
            self.listener.clone(),
            Protocol::HTTP,
            stream.context.session_id,
            stream.context.id,
            stream.context.session_address,
            ws_context,
        );

        // Carry over the readiness events already observed on both sockets.
        pipe.frontend_readiness.event = frontend_readiness.event;
        pipe.backend_readiness.event = backend_readiness.event;
        pipe.set_back_token(back_token);

        // http.active_requests was already decremented by generate_access_log()
        // in h1.rs when the 101 response was written (before MuxResult::Upgrade).
        // The session moves from the HTTP population to the WebSocket one.
        gauge_add!(names::protocol::HTTP, -1);
        gauge_add!(names::protocol::WS, 1);
        gauge_add!(names::websocket::ACTIVE_REQUESTS, 1);
        Some(HttpStateMachine::WebSocket(pipe))
    }
396
397    fn upgrade_websocket(
398        &self,
399        ws: Pipe<crate::socket::SessionTcpStream, HttpListener>,
400    ) -> Option<HttpStateMachine> {
401        // what do we do here?
402        error!(
403            "{} upgrade called on WS, this should not happen",
404            log_context!(self)
405        );
406        Some(HttpStateMachine::WebSocket(ws))
407    }
408}
409
410impl ProxySession for HttpSession {
411    fn close(&mut self) {
412        if self.has_been_closed {
413            return;
414        }
415
416        trace!("{} closing HTTP session", log_context!(self));
417        self.metrics.service_stop();
418
419        // Restore gauges
420        match self.state.marker() {
421            StateMarker::Expect => gauge_add!(names::protocol::PROXY_EXPECT, -1),
422            StateMarker::Mux => gauge_add!(names::protocol::HTTP, -1),
423            StateMarker::WebSocket => {
424                gauge_add!(names::protocol::WS, -1);
425                gauge_add!(names::websocket::ACTIVE_REQUESTS, -1);
426            }
427        }
428
429        if self.state.failed() {
430            match self.state.marker() {
431                StateMarker::Expect => incr!(names::http::UPGRADE_EXPECT_FAILED),
432                StateMarker::Mux => incr!(names::http::UPGRADE_MUX_FAILED),
433                StateMarker::WebSocket => incr!(names::http::UPGRADE_WS_FAILED),
434            }
435            // FailedUpgrade means the socket was consumed by a failed upgrade
436            // attempt, so we can only close the state (no-op) and remove the
437            // session — cancel_timeouts / front_socket are unreachable.
438            self.state.close(self.proxy.clone(), &mut self.metrics);
439            self.proxy.borrow().remove_session(self.frontend_token);
440            self.has_been_closed = true;
441            return;
442        }
443
444        self.state.cancel_timeouts();
445        // defer backend closing to the state
446        self.state.close(self.proxy.clone(), &mut self.metrics);
447
448        let front_socket = self.state.front_socket();
449        // invariant: write-only shutdown — Shutdown::Both on a TLS frontend
450        // discards the receive buffer and elicits TCP RST, truncating the
451        // already-queued response. Canonical write-up: `lib/src/https.rs:650-655`.
452        // Backend sockets follow the same discipline for symmetry.
453        if let Err(e) = front_socket.shutdown(Shutdown::Write) {
454            // error 107 NotConnected can happen when was never fully connected, or was already disconnected due to error
455            if e.kind() != ErrorKind::NotConnected {
456                error!(
457                    "{} error shutting down front socket({:?}): {:?}",
458                    log_context!(self),
459                    front_socket,
460                    e
461                )
462            }
463        }
464
465        // deregister the frontend and remove it
466        let proxy = self.proxy.borrow();
467        let fd = front_socket.as_raw_fd();
468        if let Err(e) = proxy.registry.deregister(&mut SourceFd(&fd)) {
469            error!(
470                "{} error deregistering front socket({:?}) while closing HTTP session: {:?}",
471                log_context!(self),
472                fd,
473                e
474            );
475        }
476        proxy.remove_session(self.frontend_token);
477
478        self.has_been_closed = true;
479    }
480
481    fn timeout(&mut self, token: Token) -> SessionIsToBeClosed {
482        let state_result = self.state.timeout(token, &mut self.metrics);
483        state_result == StateResult::CloseSession
484    }
485
    /// Sessions of this type always report [`Protocol::HTTP`].
    fn protocol(&self) -> Protocol {
        Protocol::HTTP
    }
489
    /// Records readiness `events` received for `token`.
    ///
    /// Refreshes `last_event`, starts the wait timer of the metrics and
    /// forwards the events to the current state.
    fn update_readiness(&mut self, token: Token, events: Ready) {
        trace!(
            "{} token {:?} got event {}",
            log_context!(self),
            token,
            super::ready_to_string(events)
        );
        self.last_event = Instant::now();
        self.metrics.wait_start();
        self.state.update_readiness(token, events);
    }
501
502    fn ready(&mut self, session: Rc<RefCell<dyn ProxySession>>) -> SessionIsToBeClosed {
503        self.metrics.service_start();
504
505        let session_result =
506            self.state
507                .ready(session.clone(), self.proxy.clone(), &mut self.metrics);
508
509        let to_be_closed = match session_result {
510            SessionResult::Close => true,
511            SessionResult::Continue => false,
512            SessionResult::Upgrade => match self.upgrade() {
513                false => self.ready(session),
514                true => true,
515            },
516        };
517
518        self.metrics.service_stop();
519        to_be_closed
520    }
521
    /// Forwards the shutdown notification to the current state and returns
    /// its verdict on whether the session must be closed.
    fn shutting_down(&mut self) -> SessionIsToBeClosed {
        self.state.shutting_down()
    }
525
    /// Instant stored by the most recent `update_readiness` call (or the
    /// session creation time if none happened yet).
    fn last_event(&self) -> Instant {
        self.last_event
    }
529
    /// Dumps the session for diagnostics: asks the state to print itself
    /// under the `"HTTP"` label, then logs the metrics at error level.
    fn print_session(&self) {
        self.state.print_state("HTTP");
        error!("{} Metrics: {:?}", log_context!(self), self.metrics);
    }
534
    /// Token of the frontend socket this session was created with.
    fn frontend_token(&self) -> Token {
        self.frontend_token
    }
538}
539
/// A hostname, stored as an owned string.
pub type Hostname = String;
541
/// Cleartext HTTP/1.x listener.
///
/// # HTTP/2 over cleartext (h2c) is NOT supported
///
/// RFC 7540 §3.2 specified an `Upgrade: h2c` mechanism to negotiate HTTP/2
/// over a cleartext TCP connection, with a companion prior-knowledge
/// variant in §3.4. Both paths are intentionally absent from this listener:
///
/// - No `Upgrade: h2c` handler: the HTTP/1.1 state machine forwards
///   `Upgrade` headers to the backend but never responds `101 Switching
///   Protocols` with an HTTP/2 connection preface.
/// - No prior-knowledge detection: the listener does not sniff the
///   24-byte `PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n` magic string; a client
///   that opens a TCP connection and immediately sends the preface will
///   be interpreted as a malformed HTTP/1 request and rejected with 400.
///
/// RFC 9113 (the current HTTP/2 RFC, obsoleting 7540) formally deprecates
/// the `Upgrade: h2c` mechanism. Clients that want HTTP/2 MUST use the
/// TLS ALPN path (`HttpsListener`, selector `h2`) instead. This is
/// consistent with the industry consensus (nginx, envoy, cloudflare) and
/// removes an entire class of cleartext-preface smuggling primitives.
pub struct HttpListener {
    // Reset to `false` when the socket is given back, so that a later
    // `activate()` binds again.
    active: bool,
    // Address the listener is looked up by (activation, removal).
    address: SocketAddr,
    // Shared answers handed out by `get_answers`.
    answers: Rc<RefCell<HttpAnswers>>,
    // Listener configuration; source of every `get_*` knob below.
    config: HttpListenerConfig,
    // Router used to resolve (host, uri, method) to a route.
    fronts: Router,
    // Listening socket, `None` once it has been given back.
    listener: Option<MioTcpListener>,
    // Cached tags, indexed by the key passed to `get_tags` / `set_tags`.
    tags: BTreeMap<String, CachedTags>,
    token: Token,
}
573
574impl ListenerHandler for HttpListener {
575    fn get_addr(&self) -> &SocketAddr {
576        &self.address
577    }
578
579    fn get_tags(&self, key: &str) -> Option<&CachedTags> {
580        self.tags.get(key)
581    }
582
583    fn set_tags(&mut self, key: String, tags: Option<BTreeMap<String, String>>) {
584        match tags {
585            Some(tags) => self.tags.insert(key, CachedTags::new(tags)),
586            None => self.tags.remove(&key),
587        };
588    }
589
590    fn protocol(&self) -> Protocol {
591        Protocol::HTTP
592    }
593
594    fn public_address(&self) -> SocketAddr {
595        self.config
596            .public_address
597            .map(|addr| addr.into())
598            .unwrap_or(self.address)
599    }
600}
601
602impl L7ListenerHandler for HttpListener {
    /// The `sticky_name` value from the listener configuration.
    fn get_sticky_name(&self) -> &str {
        &self.config.sticky_name
    }
606
607    fn get_sozu_id_header(&self) -> &str {
608        self.config
609            .sozu_id_header
610            .as_deref()
611            .filter(|s| !s.is_empty())
612            .unwrap_or("Sozu-Id")
613    }
614
    /// The `connect_timeout` value from the listener configuration.
    fn get_connect_timeout(&self) -> u32 {
        self.config.connect_timeout
    }
618
619    // redundant, already called once in extract_route
620    fn frontend_from_request(
621        &self,
622        host: &str,
623        uri: &str,
624        method: &Method,
625    ) -> Result<RouteResult, FrontendFromRequestError> {
626        let start = Instant::now();
627        let (remaining_input, (hostname, _)) = match hostname_and_port(host.as_bytes()) {
628            Ok(tuple) => tuple,
629            Err(parse_error) => {
630                // parse_error contains a slice of given_host, which should NOT escape this scope
631                return Err(FrontendFromRequestError::HostParse {
632                    host: host.to_owned(),
633                    error: parse_error.to_string(),
634                });
635            }
636        };
637        if remaining_input != &b""[..] {
638            return Err(FrontendFromRequestError::InvalidCharsAfterHost(
639                host.to_owned(),
640            ));
641        }
642
643        /*if port == Some(&b"80"[..]) {
644        // it is alright to call from_utf8_unchecked,
645        // we already verified that there are only ascii
646        // chars in there
647          unsafe { from_utf8_unchecked(hostname) }
648        } else {
649          host
650        }
651        */
652        // SAFETY: `hostname` was just produced by `hostname_and_port` (see
653        // `lib/src/protocol/kawa_h1/parser.rs:133`), which only accepts
654        // bytes matching `is_hostname_char` (alphanumeric, `-`, `.`, plus
655        // `_` under the tolerant-http1-parser feature). All accepted
656        // bytes are ASCII (≤ 0x7F), so the slice is valid single-byte UTF-8.
657        let host = unsafe { from_utf8_unchecked(hostname) };
658
659        let route = self.fronts.lookup(host, uri, method).map_err(|e| {
660            incr!(names::http::FAILED_BACKEND_MATCHING);
661            FrontendFromRequestError::NoClusterFound(e)
662        })?;
663
664        let now = Instant::now();
665
666        if let Some(cluster) = route.cluster_id.as_deref() {
667            time!(
668                names::event_loop::FRONTEND_MATCHING_TIME,
669                cluster,
670                (now - start).as_millis()
671            );
672        }
673
674        Ok(route)
675    }
676
    /// Shared handle to the answers of this listener.
    fn get_answers(&self) -> &Rc<RefCell<HttpAnswers>> {
        &self.answers
    }
680
681    fn get_h2_flood_config(&self) -> crate::protocol::mux::H2FloodConfig {
682        let defaults = crate::protocol::mux::H2FloodConfig::default();
683        crate::protocol::mux::H2FloodConfig {
684            max_rst_stream_per_window: self
685                .config
686                .h2_max_rst_stream_per_window
687                .unwrap_or(defaults.max_rst_stream_per_window),
688            max_ping_per_window: self
689                .config
690                .h2_max_ping_per_window
691                .unwrap_or(defaults.max_ping_per_window),
692            max_settings_per_window: self
693                .config
694                .h2_max_settings_per_window
695                .unwrap_or(defaults.max_settings_per_window),
696            max_empty_data_per_window: self
697                .config
698                .h2_max_empty_data_per_window
699                .unwrap_or(defaults.max_empty_data_per_window),
700            max_window_update_stream0_per_window: self
701                .config
702                .h2_max_window_update_stream0_per_window
703                .unwrap_or(defaults.max_window_update_stream0_per_window),
704            max_continuation_frames: self
705                .config
706                .h2_max_continuation_frames
707                .unwrap_or(defaults.max_continuation_frames),
708            max_glitch_count: self
709                .config
710                .h2_max_glitch_count
711                .unwrap_or(defaults.max_glitch_count),
712            max_rst_stream_lifetime: self
713                .config
714                .h2_max_rst_stream_lifetime
715                .unwrap_or(defaults.max_rst_stream_lifetime),
716            max_rst_stream_abusive_lifetime: self
717                .config
718                .h2_max_rst_stream_abusive_lifetime
719                .unwrap_or(defaults.max_rst_stream_abusive_lifetime),
720            max_rst_stream_emitted_lifetime: self
721                .config
722                .h2_max_rst_stream_emitted_lifetime
723                .unwrap_or(defaults.max_rst_stream_emitted_lifetime),
724            max_header_list_size: self
725                .config
726                .h2_max_header_list_size
727                .unwrap_or(defaults.max_header_list_size),
728            max_header_table_size: self
729                .config
730                .h2_max_header_table_size
731                .unwrap_or(defaults.max_header_table_size),
732        }
733    }
734
    /// Builds the H2 connection settings from the three optional knobs of the
    /// listener configuration (initial connection window, max concurrent
    /// streams, stream shrink ratio); handling of unset values is left to
    /// `H2ConnectionConfig::from_optional`.
    fn get_h2_connection_config(&self) -> crate::protocol::mux::H2ConnectionConfig {
        crate::protocol::mux::H2ConnectionConfig::from_optional(
            self.config.h2_initial_connection_window,
            self.config.h2_max_concurrent_streams,
            self.config.h2_stream_shrink_ratio,
        )
    }
742
743    fn get_h2_stream_idle_timeout(&self) -> std::time::Duration {
744        // Inherit `back_timeout` when the knob is unset so listeners tuned for
745        // long-running backends do not cancel streams at the 30 s security
746        // floor. The `max(30, …)` keeps the baseline slow-multiplex mitigation
747        // when `back_timeout` is shorter than 30 s. Explicit values (including
748        // ones below 30 s) win — operators under a slow-multiplex attack can
749        // lower the per-stream deadline to cap buffer pinning.
750        let seconds = self
751            .config
752            .h2_stream_idle_timeout_seconds
753            .map(|s| u64::from(s.max(1)))
754            .unwrap_or_else(|| u64::from(self.config.back_timeout).max(30));
755        std::time::Duration::from_secs(seconds)
756    }
757
758    fn get_h2_graceful_shutdown_deadline(&self) -> Option<std::time::Duration> {
759        match self.config.h2_graceful_shutdown_deadline_seconds {
760            None => Some(std::time::Duration::from_secs(5)),
761            Some(0) => None,
762            Some(s) => Some(std::time::Duration::from_secs(u64::from(s))),
763        }
764    }
765
766    fn get_elide_x_real_ip(&self) -> bool {
767        self.config.elide_x_real_ip.unwrap_or(false)
768    }
769
770    fn get_send_x_real_ip(&self) -> bool {
771        self.config.send_x_real_ip.unwrap_or(false)
772    }
773}
774
/// Cleartext HTTP proxy: owns the HTTP listeners, indexed by token, together
/// with the shared handles passed to [`HttpProxy::new`].
pub struct HttpProxy {
    backends: Rc<RefCell<BackendMap>>,
    clusters: HashMap<ClusterId, Cluster>,
    // Listeners indexed by their token.
    listeners: HashMap<Token, Rc<RefCell<HttpListener>>>,
    pool: Rc<RefCell<Pool>>,
    // mio registry used to activate listeners and to deregister the frontend
    // sockets of closing sessions.
    registry: Registry,
    sessions: Rc<RefCell<SessionManager>>,
}
783
784impl HttpProxy {
785    pub fn new(
786        registry: Registry,
787        sessions: Rc<RefCell<SessionManager>>,
788        pool: Rc<RefCell<Pool>>,
789        backends: Rc<RefCell<BackendMap>>,
790    ) -> HttpProxy {
791        HttpProxy {
792            backends,
793            clusters: HashMap::new(),
794            listeners: HashMap::new(),
795            pool,
796            registry,
797            sessions,
798        }
799    }
800
801    pub fn add_listener(
802        &mut self,
803        config: HttpListenerConfig,
804        token: Token,
805    ) -> Result<Token, ProxyError> {
806        match self.listeners.entry(token) {
807            Entry::Vacant(entry) => {
808                let http_listener =
809                    HttpListener::new(config, token).map_err(ProxyError::AddListener)?;
810                entry.insert(Rc::new(RefCell::new(http_listener)));
811                Ok(token)
812            }
813            _ => Err(ProxyError::ListenerAlreadyPresent),
814        }
815    }
816
817    pub fn get_listener(&self, token: &Token) -> Option<Rc<RefCell<HttpListener>>> {
818        self.listeners.get(token).cloned()
819    }
820
821    pub fn remove_listener(&mut self, remove: RemoveListener) -> Result<(), ProxyError> {
822        let len = self.listeners.len();
823        let remove_address = remove.address.into();
824        self.listeners
825            .retain(|_, l| l.borrow().address != remove_address);
826
827        if !self.listeners.len() < len {
828            info!(
829                "{} no HTTP listener to remove at address {:?}",
830                log_module_context!(),
831                remove_address
832            );
833        }
834        Ok(())
835    }
836
837    pub fn activate_listener(
838        &self,
839        addr: &SocketAddr,
840        tcp_listener: Option<MioTcpListener>,
841    ) -> Result<Token, ProxyError> {
842        let listener = self
843            .listeners
844            .values()
845            .find(|listener| listener.borrow().address == *addr)
846            .ok_or(ProxyError::NoListenerFound(addr.to_owned()))?;
847
848        listener
849            .borrow_mut()
850            .activate(&self.registry, tcp_listener)
851            .map_err(|listener_error| ProxyError::ListenerActivation {
852                address: *addr,
853                listener_error,
854            })
855    }
856
857    pub fn give_back_listeners(&mut self) -> Vec<(SocketAddr, MioTcpListener)> {
858        self.listeners
859            .iter()
860            .filter_map(|(_, listener)| {
861                let mut owned = listener.borrow_mut();
862                if let Some(listener) = owned.listener.take() {
863                    // Reset `active` so a subsequent `activate()` re-binds
864                    // instead of short-circuiting on the stale flag.
865                    owned.active = false;
866                    return Some((owned.address, listener));
867                }
868
869                None
870            })
871            .collect()
872    }
873
874    pub fn give_back_listener(
875        &mut self,
876        address: SocketAddr,
877    ) -> Result<(Token, MioTcpListener), ProxyError> {
878        let listener = self
879            .listeners
880            .values()
881            .find(|listener| listener.borrow().address == address)
882            .ok_or(ProxyError::NoListenerFound(address))?;
883
884        let mut owned = listener.borrow_mut();
885
886        let taken_listener = owned
887            .listener
888            .take()
889            .ok_or(ProxyError::UnactivatedListener)?;
890
891        // Reset `active` so a subsequent `activate()` re-binds instead of
892        // short-circuiting on the stale flag.
893        owned.active = false;
894
895        Ok((owned.token, taken_listener))
896    }
897
898    /// Apply a partial-update patch to the identified HTTP listener.
899    pub fn update_listener(&mut self, patch: UpdateHttpListenerConfig) -> Result<(), ProxyError> {
900        let address: std::net::SocketAddr = patch.address.into();
901        let listener = self
902            .listeners
903            .values()
904            .find(|l| l.borrow().address == address)
905            .ok_or(ProxyError::NoListenerFound(address))?;
906        listener
907            .borrow_mut()
908            .update_config(&patch)
909            .map_err(|listener_error| ProxyError::ListenerActivation {
910                address,
911                listener_error,
912            })
913    }
914
915    pub fn add_cluster(&mut self, mut cluster: Cluster) -> Result<(), ProxyError> {
916        // Reconcile the legacy single-status `answer_503` field with the
917        // new map. The new map wins on collision.
918        let mut overrides = cluster.answers.clone();
919        if let Some(answer_503) = cluster.answer_503.take() {
920            overrides.entry("503".to_owned()).or_insert(answer_503);
921        }
922        if !overrides.is_empty() {
923            for listener in self.listeners.values() {
924                listener
925                    .borrow()
926                    .answers
927                    .borrow_mut()
928                    .add_cluster_answers(&cluster.cluster_id, &overrides)
929                    .map_err(|(name, error)| {
930                        ProxyError::AddCluster(ListenerError::TemplateParse(name, error))
931                    })?;
932            }
933        }
934        self.clusters.insert(cluster.cluster_id.clone(), cluster);
935        Ok(())
936    }
937
938    pub fn remove_cluster(&mut self, cluster_id: &str) -> Result<(), ProxyError> {
939        self.clusters.remove(cluster_id);
940
941        for listener in self.listeners.values() {
942            listener
943                .borrow()
944                .answers
945                .borrow_mut()
946                .remove_cluster_answers(cluster_id);
947        }
948        Ok(())
949    }
950
951    pub fn add_http_frontend(&mut self, front: RequestHttpFrontend) -> Result<(), ProxyError> {
952        // RFC 6797 §7.2: `Strict-Transport-Security` MUST NOT be sent over
953        // plaintext HTTP. Reject any AddHttpFrontend that ships an enabled
954        // HSTS policy before it ever touches the routing trie. This is
955        // defense in depth on top of the TOML config-load reject in
956        // `command/src/config.rs`; sites that build a `RequestHttpFrontend`
957        // outside the TOML path (`sozu frontend http add`, programmatic
958        // IPC senders) are caught here.
959        // Reject ANY hsts field on a plain-HTTP frontend, not just
960        // `enabled = true`. There is no listener-default HSTS to inherit
961        // on an HTTP listener (the field doesn't exist on
962        // `HttpListenerConfig`), so the explicit-disable signal
963        // (`enabled = false`) has nothing to suppress on this surface.
964        // Carrying any `hsts` field here is a misconfiguration rather
965        // than a deliberate choice.
966        if front.hsts.is_some() {
967            incr!(names::http::HSTS_SUPPRESSED_PLAINTEXT);
968            return Err(ProxyError::HstsOnPlainHttp(front.address.into()));
969        }
970
971        let front = front.clone().to_frontend().map_err(|request_error| {
972            ProxyError::WrongInputFrontend {
973                front: Box::new(front),
974                error: request_error.to_string(),
975            }
976        })?;
977
978        let mut listener = self
979            .listeners
980            .values()
981            .find(|l| l.borrow().address == front.address)
982            .ok_or(ProxyError::NoListenerFound(front.address))?
983            .borrow_mut();
984
985        let hostname = front.hostname.to_owned();
986        let tags = front.tags.to_owned();
987
988        listener
989            .add_http_front(front)
990            .map_err(ProxyError::AddFrontend)?;
991        listener.set_tags(hostname, tags);
992        Ok(())
993    }
994
995    pub fn remove_http_frontend(&mut self, front: RequestHttpFrontend) -> Result<(), ProxyError> {
996        let front = front.clone().to_frontend().map_err(|request_error| {
997            ProxyError::WrongInputFrontend {
998                front: Box::new(front),
999                error: request_error.to_string(),
1000            }
1001        })?;
1002
1003        let mut listener = self
1004            .listeners
1005            .values()
1006            .find(|l| l.borrow().address == front.address)
1007            .ok_or(ProxyError::NoListenerFound(front.address))?
1008            .borrow_mut();
1009
1010        let hostname = front.hostname.to_owned();
1011
1012        listener
1013            .remove_http_front(front)
1014            .map_err(ProxyError::RemoveFrontend)?;
1015
1016        if !listener.fronts.has_hostname(&hostname) {
1017            listener.set_tags(hostname, None);
1018        }
1019        Ok(())
1020    }
1021
1022    pub fn soft_stop(&mut self) -> Result<(), ProxyError> {
1023        let listeners: HashMap<_, _> = self.listeners.drain().collect();
1024        let mut socket_errors = vec![];
1025        for (_, l) in listeners.iter() {
1026            if let Some(mut sock) = l.borrow_mut().listener.take() {
1027                debug!("{} deregistering socket {:?}", log_module_context!(), sock);
1028                if let Err(e) = self.registry.deregister(&mut sock) {
1029                    let error = format!("socket {sock:?}: {e:?}");
1030                    socket_errors.push(error);
1031                }
1032            }
1033        }
1034
1035        if !socket_errors.is_empty() {
1036            return Err(ProxyError::SoftStop {
1037                proxy_protocol: "HTTP".to_string(),
1038                error: format!("Error deregistering listen sockets: {socket_errors:?}"),
1039            });
1040        }
1041
1042        Ok(())
1043    }
1044
1045    pub fn hard_stop(&mut self) -> Result<(), ProxyError> {
1046        let mut listeners: HashMap<_, _> = self.listeners.drain().collect();
1047        let mut socket_errors = vec![];
1048        for (_, l) in listeners.drain() {
1049            if let Some(mut sock) = l.borrow_mut().listener.take() {
1050                debug!("{} deregistering socket {:?}", log_module_context!(), sock);
1051                if let Err(e) = self.registry.deregister(&mut sock) {
1052                    let error = format!("socket {sock:?}: {e:?}");
1053                    socket_errors.push(error);
1054                }
1055            }
1056        }
1057
1058        if !socket_errors.is_empty() {
1059            return Err(ProxyError::HardStop {
1060                proxy_protocol: "HTTP".to_string(),
1061                error: format!("Error deregistering listen sockets: {socket_errors:?}"),
1062            });
1063        }
1064
1065        Ok(())
1066    }
1067}
1068
1069impl HttpListener {
1070    pub fn new(config: HttpListenerConfig, token: Token) -> Result<HttpListener, ListenerError> {
1071        Ok(HttpListener {
1072            active: false,
1073            address: config.address.into(),
1074            answers: Rc::new(RefCell::new({
1075                // Reconcile the legacy `http_answers` per-status fields
1076                // with the new template map: the new map wins on
1077                // collision, the legacy fields fill in any status the
1078                // operator hasn't yet migrated.
1079                let mut answers_map = config.answers.clone();
1080                if let Some(ref legacy) = config.http_answers {
1081                    crate::protocol::http::answers::merge_legacy_into_map(&mut answers_map, legacy);
1082                }
1083                HttpAnswers::new(&answers_map)
1084                    .map_err(|(name, error)| ListenerError::TemplateParse(name, error))?
1085            })),
1086            config,
1087            fronts: Router::new(),
1088            listener: None,
1089            tags: BTreeMap::new(),
1090            token,
1091        })
1092    }
1093
1094    pub fn activate(
1095        &mut self,
1096        registry: &Registry,
1097        tcp_listener: Option<MioTcpListener>,
1098    ) -> Result<Token, ListenerError> {
1099        if self.active {
1100            return Ok(self.token);
1101        }
1102        let address: SocketAddr = self.config.address.into();
1103
1104        let mut listener = match tcp_listener {
1105            Some(tcp_listener) => tcp_listener,
1106            None => {
1107                server_bind(address).map_err(|server_bind_error| ListenerError::Activation {
1108                    address,
1109                    error: server_bind_error.to_string(),
1110                })?
1111            }
1112        };
1113
1114        registry
1115            .register(&mut listener, self.token, Interest::READABLE)
1116            .map_err(ListenerError::SocketRegistration)?;
1117
1118        self.listener = Some(listener);
1119        self.active = true;
1120        Ok(self.token)
1121    }
1122
1123    /// Apply a partial-update patch to this listener's live configuration.
1124    ///
1125    /// Fields absent in the patch (i.e. `None`) are preserved unchanged.
1126    /// If `http_answers` is present only the listener-default templates are
1127    /// replaced; per-cluster overrides in `cluster_custom_answers` are kept.
1128    pub fn update_config(&mut self, patch: &UpdateHttpListenerConfig) -> Result<(), ListenerError> {
1129        // Defense-in-depth validation: main-process ConfigState::dispatch
1130        // validates before scatter, but a raw protobuf client or state replay
1131        // may reach the worker without that check. `StateError` lifts into
1132        // `ListenerError` via `From` so `?` suffices.
1133        validate_h2_flood_knobs_http(patch)?;
1134        if let Some(ref hdr) = patch.sozu_id_header {
1135            validate_sozu_id_header(hdr)?;
1136        }
1137
1138        if let Some(v) = patch.public_address {
1139            self.config.public_address = Some(v);
1140        }
1141        if let Some(v) = patch.expect_proxy {
1142            self.config.expect_proxy = v;
1143        }
1144        if let Some(ref v) = patch.sticky_name {
1145            self.config.sticky_name = v.to_owned();
1146        }
1147        if let Some(v) = patch.front_timeout {
1148            self.config.front_timeout = v;
1149        }
1150        if let Some(v) = patch.back_timeout {
1151            self.config.back_timeout = v;
1152        }
1153        if let Some(v) = patch.connect_timeout {
1154            self.config.connect_timeout = v;
1155        }
1156        if let Some(v) = patch.request_timeout {
1157            self.config.request_timeout = v;
1158        }
1159        if let Some(ref v) = patch.sozu_id_header {
1160            self.config.sozu_id_header = Some(v.to_owned());
1161        }
1162        if let Some(v) = patch.elide_x_real_ip {
1163            self.config.elide_x_real_ip = Some(v);
1164        }
1165        if let Some(v) = patch.send_x_real_ip {
1166            self.config.send_x_real_ip = Some(v);
1167        }
1168
1169        // H2 flood knobs
1170        if let Some(v) = patch.h2_max_rst_stream_per_window {
1171            self.config.h2_max_rst_stream_per_window = Some(v);
1172        }
1173        if let Some(v) = patch.h2_max_ping_per_window {
1174            self.config.h2_max_ping_per_window = Some(v);
1175        }
1176        if let Some(v) = patch.h2_max_settings_per_window {
1177            self.config.h2_max_settings_per_window = Some(v);
1178        }
1179        if let Some(v) = patch.h2_max_empty_data_per_window {
1180            self.config.h2_max_empty_data_per_window = Some(v);
1181        }
1182        if let Some(v) = patch.h2_max_continuation_frames {
1183            self.config.h2_max_continuation_frames = Some(v);
1184        }
1185        if let Some(v) = patch.h2_max_glitch_count {
1186            self.config.h2_max_glitch_count = Some(v);
1187        }
1188        if let Some(v) = patch.h2_initial_connection_window {
1189            self.config.h2_initial_connection_window = Some(v);
1190        }
1191        if let Some(v) = patch.h2_max_concurrent_streams {
1192            self.config.h2_max_concurrent_streams = Some(v);
1193        }
1194        if let Some(v) = patch.h2_stream_shrink_ratio {
1195            self.config.h2_stream_shrink_ratio = Some(v);
1196        }
1197        if let Some(v) = patch.h2_max_rst_stream_lifetime {
1198            self.config.h2_max_rst_stream_lifetime = Some(v);
1199        }
1200        if let Some(v) = patch.h2_max_rst_stream_abusive_lifetime {
1201            self.config.h2_max_rst_stream_abusive_lifetime = Some(v);
1202        }
1203        if let Some(v) = patch.h2_max_rst_stream_emitted_lifetime {
1204            self.config.h2_max_rst_stream_emitted_lifetime = Some(v);
1205        }
1206        if let Some(v) = patch.h2_max_header_list_size {
1207            self.config.h2_max_header_list_size = Some(v);
1208        }
1209        if let Some(v) = patch.h2_max_header_table_size {
1210            self.config.h2_max_header_table_size = Some(v);
1211        }
1212        if let Some(v) = patch.h2_stream_idle_timeout_seconds {
1213            self.config.h2_stream_idle_timeout_seconds = Some(v);
1214        }
1215        if let Some(v) = patch.h2_graceful_shutdown_deadline_seconds {
1216            self.config.h2_graceful_shutdown_deadline_seconds = Some(v);
1217        }
1218        if let Some(v) = patch.h2_max_window_update_stream0_per_window {
1219            self.config.h2_max_window_update_stream0_per_window = Some(v);
1220        }
1221
1222        // HTTP answers: merge legacy `http_answers` and the new `answers`
1223        // map on top of the existing config, then rebuild the listener-level
1224        // template registry. Per-cluster overrides in
1225        // `HttpAnswers::cluster_answers` are preserved across the rebuild.
1226        let answers_changed = patch.http_answers.is_some() || !patch.answers.is_empty();
1227        if answers_changed {
1228            if let Some(ref new_answers) = patch.http_answers {
1229                crate::sozu_command::state::merge_custom_http_answers(
1230                    &mut self.config.http_answers,
1231                    new_answers,
1232                );
1233            }
1234            for (code, body) in &patch.answers {
1235                if !body.is_empty() {
1236                    self.config.answers.insert(code.clone(), body.clone());
1237                }
1238            }
1239
1240            let mut answers_map = self.config.answers.clone();
1241            if let Some(ref legacy) = self.config.http_answers {
1242                crate::protocol::http::answers::merge_legacy_into_map(&mut answers_map, legacy);
1243            }
1244            // Rebuild the listener-level templates and migrate the existing
1245            // per-cluster overrides over to the new `HttpAnswers`.
1246            let mut new_answers = HttpAnswers::new(&answers_map)
1247                .map_err(|(name, error)| ListenerError::TemplateParse(name, error))?;
1248            let preserved = std::mem::take(&mut self.answers.borrow_mut().cluster_answers);
1249            new_answers.cluster_answers = preserved;
1250            *self.answers.borrow_mut() = new_answers;
1251        }
1252
1253        Ok(())
1254    }
1255
1256    pub fn add_http_front(&mut self, http_front: HttpFrontend) -> Result<(), ListenerError> {
1257        self.fronts
1258            .add_http_front(&http_front)
1259            .map_err(ListenerError::AddFrontend)
1260    }
1261
1262    pub fn remove_http_front(&mut self, http_front: HttpFrontend) -> Result<(), ListenerError> {
1263        debug!(
1264            "{} removing http_front {:?}",
1265            log_module_context!(),
1266            http_front
1267        );
1268        self.fronts
1269            .remove_http_front(&http_front)
1270            .map_err(ListenerError::RemoveFrontend)
1271    }
1272
1273    fn accept(&mut self) -> Result<TcpStream, AcceptError> {
1274        if let Some(ref sock) = self.listener {
1275            sock.accept()
1276                .map_err(|e| match e.kind() {
1277                    ErrorKind::WouldBlock => AcceptError::WouldBlock,
1278                    _ => {
1279                        error!("{} accept() IO error: {:?}", log_module_context!(), e);
1280                        AcceptError::IoError
1281                    }
1282                })
1283                .map(|(sock, _)| sock)
1284        } else {
1285            error!(
1286                "{} cannot accept connections, no listening socket available",
1287                log_module_context!()
1288            );
1289            Err(AcceptError::IoError)
1290        }
1291    }
1292}
1293
1294impl ProxyConfiguration for HttpProxy {
1295    fn notify(&mut self, request: WorkerRequest) -> WorkerResponse {
1296        let request_id = request.id.clone();
1297
1298        let result = match request.content.request_type {
1299            Some(RequestType::AddCluster(cluster)) => {
1300                debug!(
1301                    "{} {} add cluster {:?}",
1302                    log_module_context!(),
1303                    request.id,
1304                    cluster
1305                );
1306                self.add_cluster(cluster)
1307            }
1308            Some(RequestType::RemoveCluster(cluster_id)) => {
1309                debug!(
1310                    "{} {} remove cluster {:?}",
1311                    log_module_context!(),
1312                    request_id,
1313                    cluster_id
1314                );
1315                self.remove_cluster(&cluster_id)
1316            }
1317            Some(RequestType::AddHttpFrontend(front)) => {
1318                debug!(
1319                    "{} {} add front {:?}",
1320                    log_module_context!(),
1321                    request_id,
1322                    front
1323                );
1324                self.add_http_frontend(front)
1325            }
1326            Some(RequestType::RemoveHttpFrontend(front)) => {
1327                debug!(
1328                    "{} {} remove front {:?}",
1329                    log_module_context!(),
1330                    request_id,
1331                    front
1332                );
1333                self.remove_http_frontend(front)
1334            }
1335            Some(RequestType::RemoveListener(remove)) => {
1336                debug!(
1337                    "{} removing HTTP listener at address {:?}",
1338                    log_module_context!(),
1339                    remove.address
1340                );
1341                self.remove_listener(remove)
1342            }
1343            Some(RequestType::SoftStop(_)) => {
1344                debug!(
1345                    "{} {} processing soft shutdown",
1346                    log_module_context!(),
1347                    request_id
1348                );
1349                match self.soft_stop() {
1350                    Ok(()) => {
1351                        info!(
1352                            "{} {} soft stop successful",
1353                            log_module_context!(),
1354                            request_id
1355                        );
1356                        return WorkerResponse::processing(request.id);
1357                    }
1358                    Err(e) => Err(e),
1359                }
1360            }
1361            Some(RequestType::HardStop(_)) => {
1362                debug!(
1363                    "{} {} processing hard shutdown",
1364                    log_module_context!(),
1365                    request_id
1366                );
1367                match self.hard_stop() {
1368                    Ok(()) => {
1369                        info!(
1370                            "{} {} hard stop successful",
1371                            log_module_context!(),
1372                            request_id
1373                        );
1374                        return WorkerResponse::processing(request.id);
1375                    }
1376                    Err(e) => Err(e),
1377                }
1378            }
1379            Some(RequestType::Status(_)) => {
1380                debug!("{} {} status", log_module_context!(), request_id);
1381                Ok(())
1382            }
1383            other_command => {
1384                debug!(
1385                    "{} {} unsupported message for HTTP proxy, ignoring: {:?}",
1386                    log_module_context!(),
1387                    request.id,
1388                    other_command
1389                );
1390                Err(ProxyError::UnsupportedMessage)
1391            }
1392        };
1393
1394        match result {
1395            Ok(()) => {
1396                debug!("{} {} successful", log_module_context!(), request_id);
1397                WorkerResponse::ok(request_id)
1398            }
1399            Err(proxy_error) => {
1400                debug!(
1401                    "{} {} unsuccessful: {}",
1402                    log_module_context!(),
1403                    request_id,
1404                    proxy_error
1405                );
1406                WorkerResponse::error(request_id, proxy_error)
1407            }
1408        }
1409    }
1410
1411    fn accept(&mut self, token: ListenToken) -> Result<TcpStream, AcceptError> {
1412        if let Some(listener) = self.listeners.get(&Token(token.0)) {
1413            listener.borrow_mut().accept()
1414        } else {
1415            Err(AcceptError::IoError)
1416        }
1417    }
1418
1419    fn create_session(
1420        &mut self,
1421        mut frontend_sock: TcpStream,
1422        listener_token: ListenToken,
1423        wait_time: Duration,
1424        proxy: Rc<RefCell<Self>>,
1425    ) -> Result<(), AcceptError> {
1426        let listener = self
1427            .listeners
1428            .get(&Token(listener_token.0))
1429            .cloned()
1430            .ok_or(AcceptError::IoError)?;
1431
1432        if let Err(e) = frontend_sock.set_nodelay(true) {
1433            error!(
1434                "{} error setting nodelay on front socket({:?}): {:?}",
1435                log_module_context!(),
1436                frontend_sock,
1437                e
1438            );
1439        }
1440        let mut session_manager = self.sessions.borrow_mut();
1441        let session_entry = session_manager.slab.vacant_entry();
1442        let session_token = Token(session_entry.key());
1443        let owned = listener.borrow();
1444
1445        if let Err(register_error) = self.registry.register(
1446            &mut frontend_sock,
1447            session_token,
1448            Interest::READABLE | Interest::WRITABLE,
1449        ) {
1450            error!(
1451                "{} error registering listen socket({:?}): {:?}",
1452                log_module_context!(),
1453                frontend_sock,
1454                register_error
1455            );
1456            return Err(AcceptError::RegisterError);
1457        }
1458
1459        let public_address: SocketAddr = match owned.config.public_address {
1460            Some(pub_addr) => pub_addr.into(),
1461            None => owned.config.address.into(),
1462        };
1463
1464        let session = HttpSession::new(
1465            Duration::from_secs(owned.config.back_timeout as u64),
1466            Duration::from_secs(owned.config.connect_timeout as u64),
1467            Duration::from_secs(owned.config.front_timeout as u64),
1468            Duration::from_secs(owned.config.request_timeout as u64),
1469            owned.config.expect_proxy,
1470            listener.clone(),
1471            Rc::downgrade(&self.pool),
1472            proxy,
1473            public_address,
1474            frontend_sock,
1475            session_token,
1476            wait_time,
1477        )?;
1478
1479        let session = Rc::new(RefCell::new(session));
1480        session_entry.insert(session);
1481
1482        Ok(())
1483    }
1484}
1485
1486impl L7Proxy for HttpProxy {
1487    fn kind(&self) -> ListenerType {
1488        ListenerType::Http
1489    }
1490
1491    fn register_socket(
1492        &self,
1493        source: &mut TcpStream,
1494        token: Token,
1495        interest: Interest,
1496    ) -> Result<(), std::io::Error> {
1497        self.registry.register(source, token, interest)
1498    }
1499
1500    fn deregister_socket(&self, tcp_stream: &mut TcpStream) -> Result<(), std::io::Error> {
1501        self.registry.deregister(tcp_stream)
1502    }
1503
1504    fn add_session(&self, session: Rc<RefCell<dyn ProxySession>>) -> Token {
1505        let mut session_manager = self.sessions.borrow_mut();
1506        let entry = session_manager.slab.vacant_entry();
1507        let token = Token(entry.key());
1508        let _entry = entry.insert(session);
1509        token
1510    }
1511
1512    fn remove_session(&self, token: Token) -> bool {
1513        let mut sessions = self.sessions.borrow_mut();
1514        // Drain the session's `(cluster, ip)` accounting before the slab
1515        // slot is freed — once the slot is reused for a new session the
1516        // token would otherwise alias an unrelated set of entries. No-op
1517        // when the session never tracked anything (feature disabled, or
1518        // no request reached `Router::connect`).
1519        sessions.untrack_all_cluster_ip(token);
1520        sessions.slab.try_remove(token.0).is_some()
1521    }
1522
1523    fn backends(&self) -> Rc<RefCell<BackendMap>> {
1524        self.backends.clone()
1525    }
1526
1527    fn clusters(&self) -> &HashMap<ClusterId, Cluster> {
1528        &self.clusters
1529    }
1530
1531    fn sessions(&self) -> Rc<RefCell<SessionManager>> {
1532        self.sessions.clone()
1533    }
1534}
1535
1536pub mod testing {
1537    use crate::testing::*;
1538
1539    /// this function is not used, but is available for example and testing purposes
1540    pub fn start_http_worker(
1541        config: HttpListenerConfig,
1542        channel: ProxyChannel,
1543        max_buffers: usize,
1544        buffer_size: usize,
1545    ) -> anyhow::Result<()> {
1546        let address = config.address.into();
1547
1548        let ServerParts {
1549            event_loop,
1550            registry,
1551            sessions,
1552            pool,
1553            backends,
1554            client_scm_socket: _,
1555            server_scm_socket,
1556            server_config,
1557        } = prebuild_server(max_buffers, buffer_size, true)?;
1558
1559        let token = {
1560            let mut sessions = sessions.borrow_mut();
1561            let entry = sessions.slab.vacant_entry();
1562            let key = entry.key();
1563            let _ = entry.insert(Rc::new(RefCell::new(ListenSession {
1564                protocol: Protocol::HTTPListen,
1565            })));
1566            Token(key)
1567        };
1568
1569        let mut proxy = HttpProxy::new(registry, sessions.clone(), pool.clone(), backends.clone());
1570        proxy
1571            .add_listener(config, token)
1572            .with_context(|| "Failed at creating adding the listener")?;
1573        proxy
1574            .activate_listener(&address, None)
1575            .with_context(|| "Failed at creating activating the listener")?;
1576
1577        let mut server = Server::new(
1578            event_loop,
1579            channel,
1580            server_scm_socket,
1581            sessions,
1582            pool,
1583            backends,
1584            Some(proxy),
1585            None,
1586            None,
1587            server_config,
1588            None,
1589            false,
1590        )
1591        .with_context(|| "Failed at creating server")?;
1592
1593        debug!("{} starting event loop", log_module_context!());
1594        server.run();
1595        debug!("{} ending event loop", log_module_context!());
1596        Ok(())
1597    }
1598}
1599
1600#[cfg(test)]
1601mod tests {
1602    extern crate tiny_http;
1603
1604    use std::{
1605        io::{Read, Write},
1606        net::TcpStream,
1607        str,
1608        sync::{Arc, Barrier},
1609        thread,
1610        time::Duration,
1611    };
1612
1613    use sozu_command::proto::command::SocketAddress;
1614
1615    use super::{testing::start_http_worker, *};
1616    use crate::sozu_command::{
1617        channel::Channel,
1618        config::ListenerBuilder,
1619        proto::command::{
1620            LoadBalancingParams, PathRule, RulePosition, SoftStop, WorkerRequest,
1621            request::RequestType,
1622        },
1623        response::{Backend, HttpFrontend},
1624    };
1625
1626    /*
1627    #[test]
1628    #[cfg(target_pointer_width = "64")]
1629    fn size_test() {
1630      assert_size!(ExpectProxyProtocol<mio::net::TcpStream>, 520);
1631      assert_size!(Http<mio::net::TcpStream>, 1232);
1632      assert_size!(Pipe<mio::net::TcpStream>, 272);
1633      assert_size!(State, 1240);
1634      // fails depending on the platform?
1635      assert_size!(Session, 1592);
1636    }
1637    */
1638
1639    #[test]
1640    fn round_trip() {
1641        setup_test_logger!();
1642        let front_port = crate::testing::provide_port();
1643        let backend_server = Arc::new(
1644            tiny_http::Server::http("127.0.0.1:0").expect("could not create tiny_http server"),
1645        );
1646        let backend_port = backend_server
1647            .server_addr()
1648            .to_ip()
1649            .expect("tiny_http server should bind to IP address")
1650            .port();
1651
1652        let barrier = Arc::new(Barrier::new(2));
1653
1654        let config = ListenerBuilder::new_http(SocketAddress::new_v4(127, 0, 0, 1, front_port))
1655            .to_http(None)
1656            .expect("could not create listener config");
1657
1658        let (mut command, channel) =
1659            Channel::generate(1000, 10000).expect("should create a channel");
1660
1661        thread::scope(|s| {
1662            let backend_handle = backend_server.clone();
1663            let barrier_clone = barrier.to_owned();
1664            s.spawn(move || {
1665                setup_test_logger!();
1666                start_server(&backend_handle, barrier_clone);
1667            });
1668            barrier.wait();
1669
1670            s.spawn(move || {
1671                setup_test_logger!();
1672                start_http_worker(config, channel, 10, 16384)
1673                    .expect("could not start the http server");
1674            });
1675
1676            let front = RequestHttpFrontend {
1677                cluster_id: Some("cluster_1".to_owned()),
1678                address: SocketAddress::new_v4(127, 0, 0, 1, front_port),
1679                hostname: "localhost".to_owned(),
1680                path: PathRule::prefix("/".to_owned()),
1681                ..Default::default()
1682            };
1683            command
1684                .write_message(&WorkerRequest {
1685                    id: "ID_ABCD".to_owned(),
1686                    content: RequestType::AddHttpFrontend(front).into(),
1687                })
1688                .expect("could not send AddHttpFrontend");
1689            let backend = Backend {
1690                cluster_id: "cluster_1".to_owned(),
1691                backend_id: "cluster_1-0".to_owned(),
1692                address: SocketAddress::new_v4(127, 0, 0, 1, backend_port).into(),
1693                load_balancing_parameters: Some(LoadBalancingParams::default()),
1694                sticky_id: None,
1695                backup: None,
1696            };
1697            command
1698                .write_message(&WorkerRequest {
1699                    id: "ID_EFGH".to_owned(),
1700                    content: RequestType::AddBackend(backend.to_add_backend()).into(),
1701                })
1702                .expect("could not send AddBackend");
1703
1704            println!("test received: {:?}", command.read_message());
1705            println!("test received: {:?}", command.read_message());
1706
1707            let mut client =
1708                TcpStream::connect(("127.0.0.1", front_port)).expect("could not connect to sozu");
1709
1710            client
1711                .set_read_timeout(Some(Duration::new(1, 0)))
1712                .expect("could not set read timeout");
1713            let request = format!(
1714                "GET / HTTP/1.1\r\nHost: localhost:{front_port}\r\nConnection: Close\r\n\r\n"
1715            );
1716            let w = client.write(request.as_bytes());
1717            println!("http client write: {w:?}");
1718
1719            barrier.wait();
1720            let mut buffer = [0; 4096];
1721            let mut index = 0;
1722
1723            // tiny_http responds with exactly 191 bytes for a "hello world" body
1724            // (headers + body). This is deterministic for a given tiny_http version.
1725            let expected_len = 191;
1726
1727            loop {
1728                assert!(index <= expected_len);
1729                if index == expected_len {
1730                    break;
1731                }
1732
1733                let r = client.read(&mut buffer[index..]);
1734                println!("http client read: {r:?}");
1735                match r {
1736                    Err(e) => panic!("client request should not fail. Error: {e:?}"),
1737                    Ok(sz) => {
1738                        index += sz;
1739                    }
1740                }
1741            }
1742            println!(
1743                "Response: {}",
1744                str::from_utf8(&buffer[..index]).expect("could not make string from buffer")
1745            );
1746
1747            // Gracefully stop the sozu worker so the scoped thread can join
1748            command
1749                .write_message(&WorkerRequest {
1750                    id: "ID_STOP".to_owned(),
1751                    content: RequestType::SoftStop(SoftStop {}).into(),
1752                })
1753                .expect("could not send SoftStop");
1754            // Unblock the backend server so its thread can exit
1755            backend_server.unblock();
1756        });
1757    }
1758
1759    #[test]
1760    fn keep_alive() {
1761        setup_test_logger!();
1762        let front_port = crate::testing::provide_port();
1763        let backend_server = Arc::new(
1764            tiny_http::Server::http("127.0.0.1:0").expect("could not create tiny_http server"),
1765        );
1766        let backend_port = backend_server
1767            .server_addr()
1768            .to_ip()
1769            .expect("tiny_http server should bind to IP address")
1770            .port();
1771
1772        let barrier = Arc::new(Barrier::new(2));
1773
1774        let config = ListenerBuilder::new_http(SocketAddress::new_v4(127, 0, 0, 1, front_port))
1775            .to_http(None)
1776            .expect("could not create listener config");
1777
1778        let (mut command, channel) =
1779            Channel::generate(1000, 10000).expect("should create a channel");
1780
1781        thread::scope(|s| {
1782            let backend_handle = backend_server.clone();
1783            let barrier_clone = barrier.to_owned();
1784            s.spawn(move || {
1785                setup_test_logger!();
1786                start_server(&backend_handle, barrier_clone);
1787            });
1788            barrier.wait();
1789
1790            s.spawn(move || {
1791                setup_test_logger!();
1792                start_http_worker(config, channel, 10, 16384)
1793                    .expect("could not start the http server");
1794            });
1795
1796            let front = RequestHttpFrontend {
1797                address: SocketAddress::new_v4(127, 0, 0, 1, front_port),
1798                hostname: "localhost".to_owned(),
1799                path: PathRule::prefix("/".to_owned()),
1800                cluster_id: Some("cluster_1".to_owned()),
1801                ..Default::default()
1802            };
1803            command
1804                .write_message(&WorkerRequest {
1805                    id: "ID_ABCD".to_owned(),
1806                    content: RequestType::AddHttpFrontend(front).into(),
1807                })
1808                .expect("could not send AddHttpFrontend");
1809            let backend = Backend {
1810                address: SocketAddress::new_v4(127, 0, 0, 1, backend_port).into(),
1811                backend_id: "cluster_1-0".to_owned(),
1812                backup: None,
1813                cluster_id: "cluster_1".to_owned(),
1814                load_balancing_parameters: Some(LoadBalancingParams::default()),
1815                sticky_id: None,
1816            };
1817            command
1818                .write_message(&WorkerRequest {
1819                    id: "ID_EFGH".to_owned(),
1820                    content: RequestType::AddBackend(backend.to_add_backend()).into(),
1821                })
1822                .expect("could not send AddBackend");
1823
1824            println!("test received: {:?}", command.read_message());
1825            println!("test received: {:?}", command.read_message());
1826
1827            let mut client =
1828                TcpStream::connect(("127.0.0.1", front_port)).expect("could not connect to sozu");
1829            client
1830                .set_read_timeout(Some(Duration::new(5, 0)))
1831                .expect("could not set read timeout");
1832
1833            // tiny_http responds with exactly 191 bytes for a "hello world" body
1834            // (headers + body). This is deterministic for a given tiny_http version.
1835            let expected_len = 191;
1836
1837            let request = format!("GET / HTTP/1.1\r\nHost: localhost:{front_port}\r\n\r\n");
1838            let w = client
1839                .write(request.as_bytes())
1840                .expect("could not write first request");
1841            println!("http client write: {w:?}");
1842            barrier.wait();
1843
1844            let mut buffer = [0; 4096];
1845            let mut index = 0;
1846
1847            loop {
1848                assert!(index <= expected_len);
1849                if index == expected_len {
1850                    break;
1851                }
1852
1853                let r = client.read(&mut buffer[index..]);
1854                println!("http client read: {r:?}");
1855                match r {
1856                    Err(e) => panic!("client request should not fail. Error: {e:?}"),
1857                    Ok(sz) => {
1858                        index += sz;
1859                    }
1860                }
1861            }
1862
1863            println!(
1864                "Response: {}",
1865                str::from_utf8(&buffer[..index]).expect("could not make string from buffer")
1866            );
1867
1868            println!("first request ended, will send second one");
1869            let request2 = format!("GET / HTTP/1.1\r\nHost: localhost:{front_port}\r\n\r\n");
1870            let w2 = client.write(request2.as_bytes());
1871            println!("http client write: {w2:?}");
1872            barrier.wait();
1873
1874            let mut buffer2 = [0; 4096];
1875            let mut index = 0;
1876
1877            loop {
1878                assert!(index <= expected_len);
1879                if index == expected_len {
1880                    break;
1881                }
1882
1883                let r2 = client.read(&mut buffer2[index..]);
1884                println!("http client read: {r2:?}");
1885                match r2 {
1886                    Err(e) => panic!("client request should not fail. Error: {e:?}"),
1887                    Ok(sz) => {
1888                        index += sz;
1889                    }
1890                }
1891            }
1892            println!(
1893                "Response: {}",
1894                str::from_utf8(&buffer2[..index]).expect("could not make string from buffer")
1895            );
1896
1897            // Gracefully stop the sozu worker so the scoped thread can join
1898            command
1899                .write_message(&WorkerRequest {
1900                    id: "ID_STOP".to_owned(),
1901                    content: RequestType::SoftStop(SoftStop {}).into(),
1902                })
1903                .expect("could not send SoftStop");
1904            // Unblock the backend server so its thread can exit
1905            backend_server.unblock();
1906        });
1907    }
1908
1909    use self::tiny_http::Response;
1910
1911    fn start_server(server: &tiny_http::Server, barrier: Arc<Barrier>) {
1912        let addr = server.server_addr();
1913        info!("starting web server on {:?}", addr);
1914        barrier.wait();
1915
1916        for request in server.incoming_requests() {
1917            info!(
1918                "backend web server got request -> method: {:?}, url: {:?}, headers: {:?}",
1919                request.method(),
1920                request.url(),
1921                request.headers()
1922            );
1923
1924            let response = Response::from_string("hello world");
1925            request
1926                .respond(response)
1927                .expect("could not respond to request");
1928            info!("backend web server sent response");
1929            barrier.wait();
1930            info!("server session stopped");
1931        }
1932
1933        println!("server on {addr:?} closed");
1934    }
1935
1936    #[test]
1937    fn frontend_from_request_test() {
1938        let cluster_id1 = "cluster_1".to_owned();
1939        let cluster_id2 = "cluster_2".to_owned();
1940        let cluster_id3 = "cluster_3".to_owned();
1941        let uri1 = "/".to_owned();
1942        let uri2 = "/yolo".to_owned();
1943        let uri3 = "/yolo/swag".to_owned();
1944
1945        let mut fronts = Router::new();
1946        fronts
1947            .add_http_front(&HttpFrontend {
1948                address: "0.0.0.0:80".parse().unwrap(),
1949                hostname: "lolcatho.st".to_owned(),
1950                method: None,
1951                path: PathRule::prefix(uri1),
1952                position: RulePosition::Tree,
1953                cluster_id: Some(cluster_id1),
1954                tags: None,
1955                redirect: None,
1956                redirect_scheme: None,
1957                redirect_template: None,
1958                rewrite_host: None,
1959                rewrite_path: None,
1960                rewrite_port: None,
1961                required_auth: None,
1962                headers: Vec::new(),
1963                hsts: None,
1964            })
1965            .expect("Could not add http frontend");
1966        fronts
1967            .add_http_front(&HttpFrontend {
1968                address: "0.0.0.0:80".parse().unwrap(),
1969                hostname: "lolcatho.st".to_owned(),
1970                method: None,
1971                path: PathRule::prefix(uri2),
1972                position: RulePosition::Tree,
1973                cluster_id: Some(cluster_id2),
1974                tags: None,
1975                redirect: None,
1976                redirect_scheme: None,
1977                redirect_template: None,
1978                rewrite_host: None,
1979                rewrite_path: None,
1980                rewrite_port: None,
1981                required_auth: None,
1982                headers: Vec::new(),
1983                hsts: None,
1984            })
1985            .expect("Could not add http frontend");
1986        fronts
1987            .add_http_front(&HttpFrontend {
1988                address: "0.0.0.0:80".parse().unwrap(),
1989                hostname: "lolcatho.st".to_owned(),
1990                method: None,
1991                path: PathRule::prefix(uri3),
1992                position: RulePosition::Tree,
1993                cluster_id: Some(cluster_id3),
1994                tags: None,
1995                redirect: None,
1996                redirect_scheme: None,
1997                redirect_template: None,
1998                rewrite_host: None,
1999                rewrite_path: None,
2000                rewrite_port: None,
2001                required_auth: None,
2002                headers: Vec::new(),
2003                hsts: None,
2004            })
2005            .expect("Could not add http frontend");
2006        fronts
2007            .add_http_front(&HttpFrontend {
2008                address: "0.0.0.0:80".parse().unwrap(),
2009                hostname: "other.domain".to_owned(),
2010                method: None,
2011                path: PathRule::prefix("/test".to_owned()),
2012                position: RulePosition::Tree,
2013                cluster_id: Some("cluster_1".to_owned()),
2014                tags: None,
2015                redirect: None,
2016                redirect_scheme: None,
2017                redirect_template: None,
2018                rewrite_host: None,
2019                rewrite_path: None,
2020                rewrite_port: None,
2021                required_auth: None,
2022                headers: Vec::new(),
2023                hsts: None,
2024            })
2025            .expect("Could not add http frontend");
2026
2027        let address = SocketAddress::new_v4(127, 0, 0, 1, 1030);
2028
2029        let default_config = ListenerBuilder::new_http(address)
2030            .to_http(None)
2031            .expect("Could not create default HTTP listener config");
2032
2033        let listener = HttpListener {
2034            listener: None,
2035            address: address.into(),
2036            fronts,
2037            answers: Rc::new(RefCell::new(HttpAnswers::new(&BTreeMap::new()).unwrap())),
2038            config: default_config,
2039            token: Token(0),
2040            active: true,
2041            tags: BTreeMap::new(),
2042        };
2043
2044        let frontend1 = listener.frontend_from_request("lolcatho.st", "/", &Method::Get);
2045        let frontend2 = listener.frontend_from_request("lolcatho.st", "/test", &Method::Get);
2046        let frontend3 = listener.frontend_from_request("lolcatho.st", "/yolo/test", &Method::Get);
2047        let frontend4 = listener.frontend_from_request("lolcatho.st", "/yolo/swag", &Method::Get);
2048        let frontend5 = listener.frontend_from_request("domain", "/", &Method::Get);
2049        assert_eq!(
2050            frontend1
2051                .expect("should find frontend")
2052                .cluster_id
2053                .as_deref(),
2054            Some("cluster_1")
2055        );
2056        assert_eq!(
2057            frontend2
2058                .expect("should find frontend")
2059                .cluster_id
2060                .as_deref(),
2061            Some("cluster_1")
2062        );
2063        assert_eq!(
2064            frontend3
2065                .expect("should find frontend")
2066                .cluster_id
2067                .as_deref(),
2068            Some("cluster_2")
2069        );
2070        assert_eq!(
2071            frontend4
2072                .expect("should find frontend")
2073                .cluster_id
2074                .as_deref(),
2075            Some("cluster_3")
2076        );
2077        assert!(frontend5.is_err());
2078    }
2079
2080    #[test]
2081    fn h2_stream_idle_timeout_inherits_back_timeout() {
2082        let address = SocketAddress::new_v4(127, 0, 0, 1, 1040);
2083        let build = |back_timeout: u32, explicit: Option<u32>| -> HttpListener {
2084            let mut cfg = ListenerBuilder::new_http(address)
2085                .to_http(None)
2086                .expect("default HTTP listener config");
2087            cfg.back_timeout = back_timeout;
2088            cfg.h2_stream_idle_timeout_seconds = explicit;
2089            HttpListener::new(cfg, Token(0)).expect("build listener")
2090        };
2091
2092        // Knob unset: inherit back_timeout when it exceeds the 30s floor.
2093        assert_eq!(
2094            build(180, None).get_h2_stream_idle_timeout(),
2095            Duration::from_secs(180)
2096        );
2097
2098        // Knob unset, back_timeout below floor: stay at 30s to preserve the
2099        // slow-multiplex Slowloris mitigation.
2100        assert_eq!(
2101            build(5, None).get_h2_stream_idle_timeout(),
2102            Duration::from_secs(30)
2103        );
2104
2105        // Explicit values win in both directions — including below the floor,
2106        // so operators under attack can tighten the deadline.
2107        assert_eq!(
2108            build(180, Some(10)).get_h2_stream_idle_timeout(),
2109            Duration::from_secs(10)
2110        );
2111        assert_eq!(
2112            build(5, Some(600)).get_h2_stream_idle_timeout(),
2113            Duration::from_secs(600)
2114        );
2115
2116        // `Some(0)` is clamped to 1s to keep the deadline non-degenerate.
2117        assert_eq!(
2118            build(180, Some(0)).get_h2_stream_idle_timeout(),
2119            Duration::from_secs(1)
2120        );
2121    }
2122}