Skip to main content

sozu_lib/
lib.rs

1//! ## What this library does
2//!
3//! This library provides tools to build and start HTTP, HTTPS and TCP reverse proxies.
4//!
5//! The proxies handle network polling, HTTP parsing and TLS in a fast single-threaded event
6//! loop.
7//!
8//! Each proxy is designed to receive configuration changes at runtime instead of
9//! reloading from a file regularly. The event loop runs in its own thread
10//! and receives commands through a message queue.
11//!
12//! ## Difference with the crate `sozu`
13//!
14//! To create several workers and manage them all at once (which is the most common way to
15//! use Sōzu), the crate `sozu` is better suited than using the lib directly.
16//!
17//! The crate `sozu` provides a binary called the main process.
18//! The main process uses `sozu_lib` to start and manage workers.
19//! Each worker can handle HTTP, HTTPS and TCP traffic.
20//! The main process synchronizes the state of all workers, using UNIX sockets
21//! and custom channels to communicate with them.
22//! The main process itself is configurable with a file, and has a CLI.
23//!
24//! ## How to use this library directly
25//!
26//! This documentation explains how to write a binary that will start a single Sōzu
27//! worker and give it orders. The method has two steps:
28//!
29//! 1. Start a Sōzu worker in a distinct thread
30//! 2. Send instructions to the worker on a UNIX socket via a Sōzu channel
31//!
32//! ### How to start a Sōzu worker
33//!
34//! Before creating an HTTP proxy, we first need to create an HTTP listener.
35//! The listener is an abstraction around a TCP socket provided by the kernel.
36//! We need the `sozu_command_lib` to build a listener.
37//!
38//! ```
39//! use sozu_command_lib::{config::ListenerBuilder, proto::command::SocketAddress};
40//!
41//! let address = SocketAddress::new_v4(127,0,0,1,8080);
42//! let http_listener = ListenerBuilder::new_http(address)
43//!     .to_http(None)
44//!     .expect("Could not create HTTP listener");
45//! ```
46//!
47//! The `http_listener` is of the type `HttpListenerConfig`, which can be sent to the worker
48//! to start the proxy.
49//!
50//! Then create a pair of channels to communicate with the proxy.
51//! The channel is a wrapper around a unix socket.
52//!
53//! ```ignore
54//! use sozu_command_lib::{
55//!     channel::Channel,
56//!     proto::command::{WorkerRequest, WorkerResponse},
57//! };
58//!
59//! let (mut command_channel, proxy_channel): (
60//!     Channel<WorkerRequest, WorkerResponse>,
61//!     Channel<WorkerResponse, WorkerRequest>,
62//! ) = Channel::generate(1000, 10000).expect("should create a channel");
63//!```
64//!
65//! Here, the `command_channel` end is blocking, it sends `WorkerRequest`s and receives
66//! `WorkerResponse`s, while the `proxy_channel` end is non-blocking, and the types are reversed.
67//! Writing the types here isn't even necessary thanks to the compiler,
68//! but it brings the point across.
69//!
70//! You can now launch the worker in a separate thread, providing the HTTP listener config,
71//! the proxy end of the channel, and your custom number of buffers and their size:
72//!
73//! ```ignore
74//! use std::thread;
75//!
76//! let worker_thread_join_handle = thread::spawn(move || {
77//!     let max_buffers = 500;
78//!     let buffer_size = 16384;
79//!     sozu_lib::http::testing::start_http_worker(http_listener, proxy_channel, max_buffers, buffer_size);
80//! });
81//! ```
82//!
83//! ### Send orders
84//!
85//! Once the thread is launched, the proxy worker will start its event loop and handle
86//! events on the listening interface and port specified when building the HTTP Listener.
87//! Since no frontends or backends were specified for the proxy, it will receive
88//! the connections, parse the requests, then send a default (but configurable)
89//! answer.
90//!
91//! Before defining a frontend and backends, we need to define a cluster, which describes
92//! a routing configuration. A cluster contains:
93//!
94//! - one frontend
95//! - one or several backends
96//! - routing rules
97//!
98//! A cluster is identified by its `cluster_id`, which will be used to define frontends
99//! and backends later on.
100//!
101//! ```
102//! use sozu_command_lib::proto::command::{Cluster, LoadBalancingAlgorithms};
103//!
104//! let cluster = Cluster {
105//!     cluster_id: "my-cluster".to_string(),
106//!     sticky_session: false,
107//!     https_redirect: false,
108//!     load_balancing: LoadBalancingAlgorithms::RoundRobin as i32,
109//!     answer_503: Some("A custom forbidden message".to_string()),
110//!     ..Default::default()
111//! };
112//! ```
113//!
114//! The defaults are sensible, so we could define only the `cluster_id`.
115//!
116//! We can now define a frontend. A frontend is a way to recognize a request and match
117//! it to a `cluster_id`, depending on the hostname and the beginning of the URL path.
118//! The `address` field must match the one of the HTTP listener we defined before:
119//!
120//! ```
121//! use std::collections::BTreeMap;
122//!
123//!  use sozu_command_lib::proto::command::{PathRule, RequestHttpFrontend, RulePosition, SocketAddress};
124//!
125//! let http_front = RequestHttpFrontend {
126//!     cluster_id: Some("my-cluster".to_string()),
127//!     address: SocketAddress::new_v4(127,0,0,1,8080),
128//!     hostname: "example.com".to_string(),
129//!     path: PathRule::prefix(String::from("/")),
130//!     position: RulePosition::Pre.into(),
131//!     tags: BTreeMap::from([
132//!         ("owner".to_owned(), "John".to_owned()),
133//!         ("id".to_owned(), "my-own-http-front".to_owned()),
134//!     ]),
135//!     ..Default::default()
136//! };
137//! ```
138//!
139//! The `tags` are keys and values that will appear in the access logs,
140//! which can come in handy.
141//!
142//! Now let's define a backend.
143//! A backend is an instance of a backend application we want to route traffic to.
144//! The `address` field must match the IP and port of the backend server.
145//!
146//! ```
147//! use sozu_command_lib::proto::command::{AddBackend, LoadBalancingParams, SocketAddress};
148//!
149//! let http_backend = AddBackend {
150//!     cluster_id: "my-cluster".to_string(),
151//!     backend_id: "test-backend".to_string(),
152//!     address: SocketAddress::new_v4(127,0,0,1,8000),
153//!     load_balancing_parameters: Some(LoadBalancingParams::default()),
154//!     ..Default::default()
155//! };
156//! ```
157//!
158//! A cluster can have multiple backend servers, and they can be added or
159//! removed while the proxy is running. If a backend is removed from the configuration
160//! while the proxy is handling a request to that server, it will finish that
161//! request and stop sending new traffic to that server.
162//!
163//!
164//! Now we can use the other end of the channel to send all these requests to the worker,
165//! using the WorkerRequest type:
166//!
167//! ```ignore
168//! use sozu_command_lib::{
169//!     proto::command::{Request, request::RequestType, WorkerRequest},
170//! };
171//!
172//! command_channel
173//!     .write_message(&WorkerRequest {
174//!         id: String::from("add-the-cluster"),
175//!         content: RequestType::AddCluster(cluster).into(),
176//!     })
177//!     .expect("Could not send AddCluster request");
178//!
179//! command_channel
180//!     .write_message(&WorkerRequest {
181//!         id: String::from("add-the-frontend"),
182//!         content: RequestType::AddHttpFrontend(http_front).into(),
183//!     })
184//!     .expect("Could not send AddHttpFrontend request");
185//!
186//! command_channel
187//!     .write_message(&WorkerRequest {
188//!         id: String::from("add-the-backend"),
189//!         content: RequestType::AddBackend(http_backend).into(),
190//!     })
191//!     .expect("Could not send AddBackend request");
192//!
193//! println!("HTTP -> {:?}", command_channel.read_message());
194//! println!("HTTP -> {:?}", command_channel.read_message());
195//! println!("HTTP -> {:?}", command_channel.read_message());
196//! ```
197//!
198//!
199//! The event loop of the worker will process these instructions and add them to
200//! its state, and the worker will send back an acknowledgement
201//! message.
202//!
203//! Now we can let the worker thread run in the background:
204//!
205//! ```ignore
206//! let _ = worker_thread_join_handle.join();
207//! ```
208//!
209//! Here is the complete example for reference, it matches the `examples/http.rs` example:
210//!
211//! ```
212//! #[macro_use]
213//! extern crate sozu_command_lib;
214//!
215//! use std::{collections::BTreeMap, env, io::stdout, thread};
216//!
217//! use anyhow::Context;
218//! use sozu_command_lib::{
219//!     channel::Channel,
220//!     config::ListenerBuilder,
221//!     logging::setup_default_logging,
222//!     proto::command::{
223//!         request::RequestType, AddBackend, Cluster, LoadBalancingAlgorithms, LoadBalancingParams,
224//!         PathRule, Request, RequestHttpFrontend, RulePosition, SocketAddress,WorkerRequest,
225//!     },
226//! };
227//!
228//! fn main() -> anyhow::Result<()> {
229//!     setup_default_logging(true, "info", "EXAMPLE").with_context(|| "could not setup logging")?;
230//!
231//!     info!("starting up");
232//!
233//!     let http_listener = ListenerBuilder::new_http(SocketAddress::new_v4(127,0,0,1,8080))
234//!         .to_http(None)
235//!         .expect("Could not create HTTP listener");
236//!
237//!     let (mut command_channel, proxy_channel) =
238//!         Channel::generate(1000, 10000).with_context(|| "should create a channel")?;
239//!
240//!     let worker_thread_join_handle = thread::spawn(move || {
241//!         let max_buffers = 500;
242//!         let buffer_size = 16384;
243//!         sozu_lib::http::testing::start_http_worker(http_listener, proxy_channel, max_buffers, buffer_size)
244//!             .expect("The worker could not be started, or shut down");
245//!     });
246//!
247//!     let cluster = Cluster {
248//!         cluster_id: "my-cluster".to_string(),
249//!         sticky_session: false,
250//!         https_redirect: false,
251//!         load_balancing: LoadBalancingAlgorithms::RoundRobin as i32,
252//!         answer_503: Some("A custom forbidden message".to_string()),
253//!         ..Default::default()
254//!     };
255//!
256//!     let http_front = RequestHttpFrontend {
257//!         cluster_id: Some("my-cluster".to_string()),
258//!         address: SocketAddress::new_v4(127,0,0,1,8080),
259//!         hostname: "example.com".to_string(),
260//!         path: PathRule::prefix(String::from("/")),
261//!         position: RulePosition::Pre.into(),
262//!         tags: BTreeMap::from([
263//!             ("owner".to_owned(), "John".to_owned()),
264//!             ("id".to_owned(), "my-own-http-front".to_owned()),
265//!         ]),
266//!         ..Default::default()
267//!     };
268//!     let http_backend = AddBackend {
269//!         cluster_id: "my-cluster".to_string(),
270//!         backend_id: "test-backend".to_string(),
271//!         address: SocketAddress::new_v4(127,0,0,1,8000),
272//!         load_balancing_parameters: Some(LoadBalancingParams::default()),
273//!         ..Default::default()
274//!     };
275//!
276//!     command_channel
277//!         .write_message(&WorkerRequest {
278//!             id: String::from("add-the-cluster"),
279//!             content: RequestType::AddCluster(cluster).into(),
280//!         })
281//!         .expect("Could not send AddCluster request");
282//!
283//!     command_channel
284//!         .write_message(&WorkerRequest {
285//!             id: String::from("add-the-frontend"),
286//!             content: RequestType::AddHttpFrontend(http_front).into(),
287//!         })
288//!         .expect("Could not send AddHttpFrontend request");
289//!
290//!     command_channel
291//!         .write_message(&WorkerRequest {
292//!             id: String::from("add-the-backend"),
293//!             content: RequestType::AddBackend(http_backend).into(),
294//!         })
295//!         .expect("Could not send AddBackend request");
296//!
297//!     println!("HTTP -> {:?}", command_channel.read_message());
298//!     println!("HTTP -> {:?}", command_channel.read_message());
299//!
300//!     // uncomment to let it run in the background
301//!     // let _ = worker_thread_join_handle.join();
302//!     info!("good bye");
303//!     Ok(())
304//! }
305//! ```
306
307#[macro_use]
308extern crate sozu_command_lib as sozu_command;
309
310#[macro_use]
311pub mod util;
312#[macro_use]
313pub mod metrics;
314
315pub mod backends;
316pub mod crypto;
317pub mod features;
318pub mod health_check;
319pub mod http;
320pub mod load_balancing;
321pub mod pool;
322pub mod protocol;
323pub mod retry;
324pub mod router;
325pub mod socket;
326pub mod timer;
327pub mod tls;
328
329/// Linux zero-copy TCP forwarder. Used by `protocol::pipe::Pipe` when
330/// the listener is `Protocol::TCP` and the `splice` feature is enabled.
331#[cfg(all(target_os = "linux", feature = "splice"))]
332pub(crate) mod splice;
333
334pub mod server;
335pub mod tcp;
336
337pub mod https;
338
339use std::{
340    cell::RefCell,
341    collections::{BTreeMap, HashMap},
342    fmt::{self, Display, Formatter},
343    net::SocketAddr,
344    rc::Rc,
345    str,
346    time::{Duration, Instant},
347};
348
349use backends::BackendError;
350use hex::FromHexError;
351use mio::{Interest, Token, net::TcpStream};
352use protocol::http::{answers::HttpAnswers, answers::TemplateError, parser::Method};
353use router::RouterError;
354use socket::ServerBindError;
355use sozu_command::{
356    AsStr, ObjectKind,
357    logging::{CachedTags, LogContext},
358    proto::command::{Cluster, ListenerType, RequestHttpFrontend, WorkerRequest, WorkerResponse},
359    ready::Ready,
360    state::ClusterId,
361};
362use tls::CertificateResolverError;
363
364use crate::{backends::BackendMap, metrics::names, router::RouteResult};
365
/// Kind of entity that can be registered in mio, i.e. that subscribes to
/// kernel events.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Protocol {
    // Presumably the client sessions, one variant per proxied protocol —
    // TODO confirm against the session implementations.
    HTTP,
    HTTPS,
    TCP,
    // Presumably the listeners matching the three protocols above — TODO confirm.
    HTTPListen,
    HTTPSListen,
    TCPListen,
    // Remaining registered sources: `ProxySession::protocol` is documented as
    // the way to tell sessions apart from listeners, channels, metrics and
    // timers.
    Channel,
    Metrics,
    Timer,
}
379
380/// trait that must be implemented by listeners and client sessions
381pub trait ProxySession {
382    /// indicates the protocol associated with the session
383    ///
384    /// this is used to distinguish sessions from listenrs, channels, metrics
385    /// and timers
386    fn protocol(&self) -> Protocol;
387    /// if a session received an event or can still execute, the event loop will
388    /// call this method. Its result indicates if it can still execute, needs to
389    /// connect to a backend server, close the session
390    fn ready(&mut self, session: Rc<RefCell<dyn ProxySession>>) -> SessionIsToBeClosed;
391    /// if the event loop got an event for a token associated with the session,
392    /// it will call this method on the session
393    fn update_readiness(&mut self, token: Token, events: Ready);
394    /// close a session, frontend and backend sockets,
395    /// remove the entries from the session manager slab
396    fn close(&mut self);
397    /// if a timeout associated with the session triggers, the event loop will
398    /// call this method with the timeout's token
399    fn timeout(&mut self, t: Token) -> SessionIsToBeClosed;
400    /// last time the session got an event
401    fn last_event(&self) -> Instant;
402    /// display the session's internal state (for debugging purpose)
403    fn print_session(&self);
404    /// get the token associated with the frontend
405    fn frontend_token(&self) -> Token;
406    /// tell the session it has to shut down if possible
407    ///
408    /// if the session handles HTTP requests, it will not close until the response
409    /// is completely sent back to the client
410    fn shutting_down(&mut self) -> SessionIsToBeClosed;
411    /// Best-effort identifier of the cluster currently routed to by this
412    /// session. Returns `None` for `ListenSession` (no per-session
413    /// cluster), and for client sessions before routing has resolved.
414    /// H2 sessions multiplex many streams over one frontend token and may
415    /// touch several clusters; the returned value is whichever cluster
416    /// the session most recently keep-alive'd to. Used for log/metric
417    /// attribution, not for accounting (the tracker keeps the canonical
418    /// per-stream `(cluster, IP)` set).
419    fn cluster_id(&self) -> Option<String> {
420        None
421    }
422    /// Source address as observed by Sōzu, with proxy-protocol awareness.
423    /// HTTP/HTTPS/TCP client sessions return the parsed PROXY-protocol
424    /// source when present, else `peer_addr`. `ListenSession` returns
425    /// `None`. Used to attribute per-(cluster, source-IP) tracking and
426    /// access logs to the real client behind a layer-4 PROXY frontend.
427    fn session_address(&self) -> Option<SocketAddr> {
428        None
429    }
430}
431
/// Compile-time `if`/`else` on identifiers.
///
/// `branch!{ if A == B { then } else { otherwise } }` expands to `then` when
/// the optional identifier `A` is the very same identifier as `B`, and to
/// `otherwise` when `A` is another identifier or is missing altogether.
///
/// The form without `else` only accepts the matching case: any other input
/// finds no rule in the generated matcher and fails to compile.
#[macro_export]
macro_rules! branch {
    (if $($actual:ident)? == $wanted:ident { $($on_match:tt)* } else { $($otherwise:tt)* }) => {
        // A throwaway matcher is generated with the wanted identifier as a
        // literal pattern, then applied to the actual identifier (if any).
        macro_rules! expect {
            ($wanted) => {$($on_match)*};
            ($other:ident) => {$($otherwise)*};
            () => {$($otherwise)*}
        }
        expect!($($actual)?);
    };
    (if $($actual:ident)? == $wanted:ident { $($on_match:tt)* } ) => {
        macro_rules! expect {
            ($wanted) => {$($on_match)*};
        }
        expect!($($actual)?);
    };
}
449
/// Expands to the tokens found between the braces when there are some, and
/// to the tokens following the braces when the braces are empty.
///
/// `fallback!({a} b)` yields `a`; `fallback!({} b)` yields `b`.
#[macro_export]
macro_rules! fallback {
    // Non-empty braces: the preferred tokens win, the default is discarded.
    ({$($preferred:tt)+} $($otherwise:tt)*) => {
        $($preferred)+
    };
    // Empty braces: only the default remains.
    ({} $($otherwise:tt)*) => {
        $($otherwise)*
    };
}
459
/// Generates a state-machine enum and its boilerplate from an enum-like
/// declaration.
///
/// Input: optional attributes, then `enum Name`, optionally followed by
/// `impl Trait`, then a comma-separated list of variants written
/// `Variant(State, Aux...)`, each optionally followed by `-> expr`.
///
/// Output:
/// - `StateMarker`, a `Clone + Copy + Debug` enum with one unit variant per
///   declared variant;
/// - `Name`, a public enum holding the declared variants plus
///   `FailedUpgrade(StateMarker)`;
/// - inherent `marker`, `failed`, `take` and `front_socket` methods;
/// - when `impl SessionState` was written, a `SessionState` implementation
///   whose methods forward to the state held by the current variant.
///
/// A variant's `-> expr`, when present, replaces the forwarded call for that
/// variant in every generated forwarding method.
///
/// NOTE: apart from `$crate::fallback!` and `$crate::branch!`, the generated
/// code names its dependencies (`Rc`, `RefCell`, `Token`, `Ready`, `mio`,
/// `SessionState`, `L7Proxy`, `SessionMetrics`, `error!`, ...) without a
/// `$crate::` path, so they must be in scope where the macro is invoked.
460#[macro_export]
461macro_rules! StateMachineBuilder {
    // Internal arm. `$d` is bound to a literal `$` token by the entry arm at
    // the bottom, which lets the nested `_fn_impl` macro declare repetitions
    // and metavariables of its own.
462    (
463        ($d:tt)
464        $(#[$($state_macros:tt)*])*
465        enum $state_name:ident $(impl $trait:ident)?  {
466            $($(#[$($variant_macros:tt)*])*
467            $variant_name:ident($state:ty$(,$($aux:ty),+)?) $(-> $override:expr)?),+ $(,)?
468        }
469    ) => {
470        /// A summary of the last valid State
471        #[derive(Clone, Copy, Debug)]
472        pub enum StateMarker {
473            $($variant_name,)+
474        }
475
476        $(#[$($state_macros)*])*
477        #[allow(clippy::large_enum_variant)]
478        pub enum $state_name {
479            $(
480                $(#[$($variant_macros)*])*
481                $variant_name($state$(,$($aux),+)?),
482            )+
483            /// Informs about upgrade failure, contains a summary of the last valid State
484            FailedUpgrade(StateMarker),
485        }
486
        // Helper generating one forwarding method. Its input is a method
        // signature written as `name(&[mut], self, args...) [-> ret]`,
        // optionally followed by `| marker_pattern => expr`, the behaviour
        // for the `FailedUpgrade` variant.
487        macro_rules! _fn_impl {
488            ($function:ident(&$d($mut:ident)?, self $d(,$arg_name:ident: $arg_type:ty)*) $d(-> $ret:ty)? $d(| $marker:tt => $fail:expr)?) => {
489                fn $function(&$d($mut)? self $d(,$arg_name: $arg_type)*) $d(-> $ret)? {
490                    match self {
                        // Regular variants: use the variant's `-> expr`
                        // override if one was declared, else call the method
                        // of the same name on the wrapped state.
491                        $($state_name::$variant_name(_state, ..) => $crate::fallback!({$($override)?} _state.$function($d($arg_name),*)),)+
                        // FailedUpgrade: bind the marker with the provided
                        // pattern (or `_`) and evaluate the provided
                        // expression, or `unreachable!()` when none was given.
492                        $state_name::FailedUpgrade($crate::fallback!({$d($marker)?} _)) => $crate::fallback!({$d($fail)?} unreachable!())
493                    }
494                }
495            };
496        }
497
498        impl $state_name {
499            /// Informs about the last valid State before upgrade failure
500            fn marker(&self) -> StateMarker {
501                match self {
502                    $($state_name::$variant_name(..) => StateMarker::$variant_name,)+
503                    $state_name::FailedUpgrade(marker) => *marker,
504                }
505            }
506            /// Returns whether or not the State is FailedUpgrade
507            fn failed(&self) -> bool {
508                match self {
509                    $state_name::FailedUpgrade(_) => true,
510                    _ => false,
511                }
512            }
513            /// Gives back an owned version of the State,
514            /// leaving a FailedUpgrade in its place.
515            /// The FailedUpgrade retains the marker of the previous State.
516            fn take(&mut self) -> $state_name {
517                let mut owned_state = $state_name::FailedUpgrade(self.marker());
518                std::mem::swap(&mut owned_state, self);
519                owned_state
520            }
            // No `| marker => expr` fallback is given here, so calling
            // `front_socket` on a `FailedUpgrade` hits `unreachable!()`.
521            _fn_impl!{front_socket(&, self) -> &mio::net::TcpStream}
522        }
523
        // The trait implementation is only emitted when the declaration was
        // written with `impl SessionState`; otherwise nothing is generated.
524        $crate::branch!{
525            if $($trait)? == SessionState {
526                impl SessionState for $state_name {
527                    _fn_impl!{ready(&mut, self, session: Rc<RefCell<dyn ProxySession>>, proxy: Rc<RefCell<dyn L7Proxy>>, metrics: &mut SessionMetrics) -> SessionResult}
528                    _fn_impl!{update_readiness(&mut, self, token: Token, events: Ready)}
529                    _fn_impl!{timeout(&mut, self, token: Token, metrics: &mut SessionMetrics) -> StateResult}
530                    _fn_impl!{cancel_timeouts(&mut, self)}
531                    _fn_impl!{print_state(&, self, context: &str) | marker => error!("{} Session(FailedUpgrade({:?}))", context, marker)}
532                    _fn_impl!{close(&mut, self, proxy: Rc<RefCell<dyn L7Proxy>>, metrics: &mut SessionMetrics) | _ => {}}
533                    _fn_impl!{shutting_down(&mut, self) -> SessionIsToBeClosed | _ => true}
534                }
535            } else {}
536        }
537    };
    // Entry arm: forwards the caller's tokens to the internal arm above,
    // prefixed with `($)` so that `$d` gets bound to a literal `$`.
538    ($($tt:tt)+) => {
539        StateMachineBuilder!{($) $($tt)+}
540    }
541}
542
543pub trait ListenerHandler {
544    fn get_addr(&self) -> &SocketAddr;
545
546    fn get_tags(&self, key: &str) -> Option<&CachedTags>;
547
548    fn get_concatenated_tags(&self, key: &str) -> Option<&str> {
549        self.get_tags(key).map(|tags| tags.concatenated.as_str())
550    }
551
552    fn set_tags(&mut self, key: String, tags: Option<BTreeMap<String, String>>);
553
554    fn protocol(&self) -> Protocol;
555
556    fn public_address(&self) -> SocketAddr;
557}
558
559#[derive(thiserror::Error, Debug)]
560pub enum FrontendFromRequestError {
561    #[error("Could not parse hostname from '{host}': {error}")]
562    HostParse { host: String, error: String },
563    #[error("invalid remaining chars after hostname. Host: {0}")]
564    InvalidCharsAfterHost(String),
565    #[error("no cluster: {0}")]
566    NoClusterFound(RouterError),
567}
568
569pub trait L7ListenerHandler {
570    fn get_sticky_name(&self) -> &str;
571
572    /// Name of the correlation header Sozu injects into every request and
573    /// response body. Default: `"Sozu-Id"`. Operators can rebrand via the
574    /// `sozu_id_header` listener config knob.
575    fn get_sozu_id_header(&self) -> &str {
576        "Sozu-Id"
577    }
578
579    fn get_connect_timeout(&self) -> u32;
580
581    /// retrieve a frontend by parsing a request's hostname, uri and method
582    fn frontend_from_request(
583        &self,
584        host: &str,
585        uri: &str,
586        method: &Method,
587    ) -> Result<RouteResult, FrontendFromRequestError>;
588
589    /// retrieve the listener's configured HTTP answers (templates)
590    fn get_answers(&self) -> &Rc<RefCell<HttpAnswers>>;
591
592    /// H2 flood detection thresholds from the listener config.
593    /// Returns the default config when the listener does not provide custom values.
594    fn get_h2_flood_config(&self) -> protocol::mux::H2FloodConfig {
595        protocol::mux::H2FloodConfig::default()
596    }
597
598    /// H2 connection tuning from the listener config.
599    /// Returns the default config when the listener does not provide custom values.
600    fn get_h2_connection_config(&self) -> protocol::mux::H2ConnectionConfig {
601        protocol::mux::H2ConnectionConfig::default()
602    }
603
604    /// Whether requests must have their `:authority` / `Host` exact-match
605    /// the TLS SNI negotiated at handshake (CWE-346 / CWE-444).
606    ///
607    /// Defaults to `true` — the safe setting that closes the
608    /// CWE-346 / CWE-444 cross-SNI smuggling vector. Operators can opt
609    /// out per-listener via `HttpsListenerConfig::strict_sni_binding =
610    /// false` when cross-SNI routing is explicitly required. Plaintext
611    /// HTTP listeners return the default value; they never have an SNI
612    /// to compare against, so the routing-layer check short-circuits on
613    /// `tls_server_name: None`.
614    fn get_strict_sni_binding(&self) -> bool {
615        true
616    }
617
618    /// Whether to strip any client-supplied `X-Real-IP` header from
619    /// forwarded requests (anti-spoofing).
620    ///
621    /// Defaults to `false` — preserves the historical pass-through
622    /// behaviour. Operators opt in via
623    /// `HttpListenerConfig::elide_x_real_ip = true` (and the equivalent on
624    /// HTTPS listeners). Independent of [`Self::get_send_x_real_ip`]: the
625    /// two flags can be combined freely (anti-spoof only, send only, both,
626    /// or neither). The elision branch lives in
627    /// `HttpContext::on_request_headers`, so it covers H1 and H2 alike.
628    fn get_elide_x_real_ip(&self) -> bool {
629        false
630    }
631
632    /// Whether to append a proxy-generated `X-Real-IP` header carrying the
633    /// connection peer IP (post-PROXY-v2 unwrap, i.e. the original client
634    /// IP) to every forwarded request.
635    ///
636    /// Defaults to `false` — preserves the historical no-injection
637    /// behaviour. Operators opt in via
638    /// `HttpListenerConfig::send_x_real_ip = true` (and the equivalent on
639    /// HTTPS listeners). Independent of [`Self::get_elide_x_real_ip`]: the
640    /// two flags can be combined freely. The injection branch lives next
641    /// to the existing X-Forwarded-For / Forwarded synthesis in
642    /// `HttpContext::on_request_headers`.
643    fn get_send_x_real_ip(&self) -> bool {
644        false
645    }
646
647    /// Per-stream idle timeout for H2 connections. An open stream that makes
648    /// no forward progress for this duration is cancelled (RST_STREAM / CANCEL).
649    /// Mitigates slow-multiplex Slowloris where a client keeps connection-level
650    /// activity high (resetting the connection idle timer on every frame) while
651    /// pinning streams for the full nominal connection timeout.
652    ///
653    /// Listeners inherit `max(30s, back_timeout)` when `h2_stream_idle_timeout_seconds`
654    /// is absent so operators who raised the socket-level backend budget do not
655    /// have to duplicate the value here; the 30 s floor preserves the baseline
656    /// slow-multiplex mitigation when `back_timeout` is shorter. Set the knob
657    /// explicitly to cap the per-stream deadline below `back_timeout` (useful
658    /// when under a slow-multiplex attack).
659    fn get_h2_stream_idle_timeout(&self) -> std::time::Duration {
660        std::time::Duration::from_secs(30)
661    }
662
663    /// Wall-clock budget granted to in-flight H2 streams after soft-stop sent
664    /// the initial `GOAWAY(NO_ERROR)`. Once the deadline elapses the mux
665    /// transitions to a forced close (final GOAWAY + session teardown).
666    ///
667    /// Returning `None` disables the forced close entirely — shutdown waits
668    /// for every stream to drain naturally. Returning `Some(d)` enforces the
669    /// budget. Default: `Some(Duration::from_secs(5))` (matches the historic
670    /// hard-coded 5 s deadline). Listeners expose the
671    /// `h2_graceful_shutdown_deadline_seconds` knob; value `0` maps to `None`.
672    fn get_h2_graceful_shutdown_deadline(&self) -> Option<std::time::Duration> {
673        Some(std::time::Duration::from_secs(5))
674    }
675}
676
/// Connection state of a session towards its backend.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum BackendConnectionStatus {
    /// No backend connection.
    NotConnected,
    /// A connection attempt is in progress; carries an `Instant`
    /// (presumably when the attempt started — TODO confirm).
    Connecting(Instant),
    /// The backend connection is established.
    Connected,
}

impl BackendConnectionStatus {
    /// Returns `true` only for the `Connecting` variant.
    pub fn is_connecting(&self) -> bool {
        match self {
            Self::Connecting(_) => true,
            Self::NotConnected | Self::Connected => false,
        }
    }
}
689
/// Outcome kind of a backend connection step.
///
/// NOTE(review): the exact meaning of each variant is not visible in this
/// part of the file; the names suggest "new connection", "existing
/// connection reused" and "existing connection replaced" — confirm against
/// the call sites.
#[derive(Debug, Eq, PartialEq)]
pub enum BackendConnectAction {
    New,
    Reuse,
    Replace,
}
696
697#[derive(thiserror::Error, Debug)]
698pub enum BackendConnectionError {
699    #[error("Not found: {0:?}")]
700    NotFound(ObjectKind),
701    #[error("Too many connections on cluster {0:?}")]
702    MaxConnectionRetries(Option<String>),
703    #[error("the sessions slab has reached maximum capacity")]
704    MaxSessionsMemory,
705    #[error("error from the backend: {0}")]
706    Backend(BackendError),
707    #[error("failed to retrieve the cluster: {0}")]
708    RetrieveClusterError(RetrieveClusterError),
709    #[error("maximum number of buffers reached")]
710    MaxBuffers,
711    /// Per-(cluster, source-IP) connection limit reached. The protocol
712    /// layer translates this into HTTP 429 Too Many Requests (with an
713    /// optional `Retry-After`) for HTTP/HTTPS sessions, or a graceful TCP
714    /// close for raw TCP. The `cluster_id` is included so log/metric
715    /// pipelines can attribute the rejection.
716    #[error("per-(cluster, source-IP) connection limit reached for cluster {cluster_id:?}")]
717    TooManyConnectionsPerIp { cluster_id: String },
718}
719
720/// used in kawa_h1 module for the Http session state
721#[derive(thiserror::Error, Debug)]
722pub enum RetrieveClusterError {
723    #[error("No method given")]
724    NoMethod,
725    #[error("No host given")]
726    NoHost,
727    #[error("No path given")]
728    NoPath,
729    #[error("unauthorized route")]
730    UnauthorizedRoute,
731    #[error("{0}")]
732    RetrieveFrontend(FrontendFromRequestError),
733    #[error("HTTPS redirect required")]
734    HttpsRedirect,
735    /// The HTTP `:authority` / `Host` host does not match the TLS SNI that was
736    /// negotiated for this connection, which would cross the TLS trust boundary.
737    /// Maps to HTTP 421 Misdirected Request (RFC 9110 §15.5.20).
738    #[error("TLS SNI {sni:?} does not match HTTP authority {authority:?}")]
739    SniAuthorityMismatch { sni: String, authority: String },
740}
741
/// Failure to accept a new connection; used in sessions.
///
/// NOTE(review): the variants are not documented in the original; their
/// names are the only indication of their meaning.
#[derive(Debug, Eq, PartialEq)]
pub enum AcceptError {
    IoError,
    TooManySessions,
    WouldBlock,
    RegisterError,
    WrongSocketAddress,
    BufferCapacityReached,
}
752
753/// returned by the HTTP, HTTPS and TCP listeners
754#[derive(thiserror::Error, Debug)]
755pub enum ListenerError {
756    #[error("failed to handle certificate request, got a resolver error, {0}")]
757    Resolver(CertificateResolverError),
758    #[error("failed to parse pem, {0}")]
759    PemParse(String),
760    #[error("failed to parse template {0:?}: {1}")]
761    TemplateParse(String, TemplateError),
762    #[error("failed to build rustls context, {0}")]
763    BuildRustls(String),
764    #[error("could not activate listener with address {address:?}: {error}")]
765    Activation { address: SocketAddr, error: String },
766    #[error("Could not register listener socket: {0}")]
767    SocketRegistration(std::io::Error),
768    #[error("could not add frontend: {0}")]
769    AddFrontend(RouterError),
770    #[error("could not remove frontend: {0}")]
771    RemoveFrontend(RouterError),
772    #[error("invalid value for field '{field}': {reason}")]
773    InvalidValue {
774        field: &'static str,
775        reason: &'static str,
776    },
777    /// `UpdateHttpsListenerConfig.hsts` was present but its `enabled`
778    /// field was unset. Per the partial-update contract, `enabled` is
779    /// the explicit-disambiguator between "explicit disable" (false) and
780    /// "explicit enable" (true); the patch handler refuses an
781    /// `enabled = None` block rather than silently picking one.
782    #[error(
783        "UpdateHttpsListenerConfig.hsts is present but `enabled` is unset; the partial-update \
784         contract requires `enabled` whenever the `hsts` block is present"
785    )]
786    HstsEnabledRequired,
787}
788
789/// Lift control-plane validation errors into listener-level errors so the
790/// worker can surface the same message without duplicating the match.
791/// Non-`InvalidValue` variants fall back to a generic `InvalidValue` — they
792/// are not expected on the worker's `update_config` path (state lookups
793/// happen on the master) but we avoid panicking if one slips through.
794impl From<sozu_command::state::StateError> for ListenerError {
795    fn from(err: sozu_command::state::StateError) -> Self {
796        match err {
797            sozu_command::state::StateError::InvalidValue { field, reason } => {
798                ListenerError::InvalidValue { field, reason }
799            }
800            _ => ListenerError::InvalidValue {
801                field: "state",
802                reason: "unexpected state error on worker path",
803            },
804        }
805    }
806}
807
808/// Returned by the HTTP, HTTPS and TCP proxies
809#[derive(thiserror::Error, Debug)]
810pub enum ProxyError {
811    #[error("error while soft stopping {proxy_protocol} proxy: {error}")]
812    SoftStop {
813        proxy_protocol: String,
814        error: String,
815    },
816    #[error("error while hard stopping {proxy_protocol} proxy: {error}")]
817    HardStop {
818        proxy_protocol: String,
819        error: String,
820    },
821    #[error("found no listener with address {0:?}")]
822    NoListenerFound(SocketAddr),
823    #[error("a listener is already present for this token")]
824    ListenerAlreadyPresent,
825    #[error("could not add listener: {0}")]
826    AddListener(ListenerError),
827    #[error("could not add cluster: {0}")]
828    AddCluster(ListenerError),
829    #[error("failed to activate listener with address {address:?}: {listener_error}")]
830    ListenerActivation {
831        address: SocketAddr,
832        listener_error: ListenerError,
833    },
834    #[error("can not add frontend {front:?}: {error}")]
835    WrongInputFrontend {
836        front: Box<RequestHttpFrontend>,
837        error: String,
838    },
839    #[error("could not add frontend: {0}")]
840    AddFrontend(ListenerError),
841    #[error("could not remove frontend: {0}")]
842    RemoveFrontend(ListenerError),
843    #[error("could not add certificate: {0}")]
844    AddCertificate(CertificateResolverError),
845    #[error("could not remove certificate: {0}")]
846    RemoveCertificate(CertificateResolverError),
847    #[error("could not replace certificate: {0}")]
848    ReplaceCertificate(CertificateResolverError),
849    #[error("wrong certificate fingerprint: {0}")]
850    WrongCertificateFingerprint(FromHexError),
851    #[error("this request is not supported by the proxy")]
852    UnsupportedMessage,
853    #[error("failed to acquire the lock, {0}")]
854    Lock(String),
855    #[error("could not bind to socket {0:?}: {1}")]
856    BindToSocket(SocketAddr, ServerBindError),
857    #[error("error registering socket of listener: {0}")]
858    RegisterListener(std::io::Error),
859    #[error("the listener is not activated")]
860    UnactivatedListener,
861    /// HSTS (RFC 6797) was attached to a frontend on a plain-HTTP
862    /// listener. RFC 6797 §7.2 forbids `Strict-Transport-Security` on
863    /// plaintext-HTTP responses; the worker rejects the request rather
864    /// than ship a non-conformant policy. The TOML loader rejects the
865    /// same shape at config-load time
866    /// (`command/src/config.rs::ConfigError::HstsOnPlainHttp`); this
867    /// arm catches the same misconfiguration when the request reaches
868    /// the worker over the IPC channel without going through the TOML
869    /// path (e.g. via `sozu frontend http add`).
870    #[error(
871        "HSTS is only valid on HTTPS frontends; rejecting AddHttpFrontend with hsts.enabled = \
872         true on address {0:?} (RFC 6797 §7.2)"
873    )]
874    HstsOnPlainHttp(SocketAddr),
875}
876
877use self::server::ListenToken;
878pub trait ProxyConfiguration {
879    fn notify(&mut self, message: WorkerRequest) -> WorkerResponse;
880    fn accept(&mut self, token: ListenToken) -> Result<TcpStream, AcceptError>;
881    fn create_session(
882        &mut self,
883        socket: TcpStream,
884        token: ListenToken,
885        wait_time: Duration,
886        proxy: Rc<RefCell<Self>>,
887        // should we insert the tags here?
888    ) -> Result<(), AcceptError>;
889}
890
891pub trait L7Proxy {
892    fn kind(&self) -> ListenerType;
893
894    fn register_socket(
895        &self,
896        socket: &mut TcpStream,
897        token: Token,
898        interest: Interest,
899    ) -> Result<(), std::io::Error>;
900
901    fn deregister_socket(&self, tcp_stream: &mut TcpStream) -> Result<(), std::io::Error>;
902
903    fn add_session(&self, session: Rc<RefCell<dyn ProxySession>>) -> Token;
904
905    /// Remove the session from the session manager slab.
906    /// Returns true if the session was actually there before deletion
907    fn remove_session(&self, token: Token) -> bool;
908
909    fn backends(&self) -> Rc<RefCell<BackendMap>>;
910
911    fn clusters(&self) -> &HashMap<ClusterId, Cluster>;
912
913    /// Access the worker's [`SessionManager`] for per-(cluster, source-IP)
914    /// connection-limit accounting. The mux uses this to track / untrack
915    /// stream-granular `(cluster_id, ip)` entries and consult the
916    /// `cluster_ip_at_limit` gate before each backend connect.
917    fn sessions(&self) -> Rc<RefCell<crate::server::SessionManager>>;
918}
919
/// The readiness a protocol requires on its frontend and backend sockets.
///
/// Each variant name spells the frontend requirement (`None`, `Read`,
/// `Write` or `ReadWrite`) followed by the backend requirement, which gives
/// 4 x 4 = 16 variants.
#[derive(Debug, Eq, PartialEq)]
pub enum RequiredEvents {
    FrontReadBackNone,
    FrontWriteBackNone,
    FrontReadWriteBackNone,
    FrontNoneBackNone,
    FrontReadBackRead,
    FrontWriteBackRead,
    FrontReadWriteBackRead,
    FrontNoneBackRead,
    FrontReadBackWrite,
    FrontWriteBackWrite,
    FrontReadWriteBackWrite,
    FrontNoneBackWrite,
    FrontReadBackReadWrite,
    FrontWriteBackReadWrite,
    FrontReadWriteBackReadWrite,
    FrontNoneBackReadWrite,
}

impl RequiredEvents {
    /// Bit set when the frontend must be readable.
    const FRONT_R: u8 = 1 << 0;
    /// Bit set when the frontend must be writable.
    const FRONT_W: u8 = 1 << 1;
    /// Bit set when the backend must be readable.
    const BACK_R: u8 = 1 << 2;
    /// Bit set when the backend must be writable.
    const BACK_W: u8 = 1 << 3;

    /// Decomposes the variant into the four elementary requirements.
    ///
    /// The match is exhaustive on purpose: adding a variant will not compile
    /// until its requirements are spelled out here.
    fn mask(&self) -> u8 {
        let front = match self {
            Self::FrontNoneBackNone
            | Self::FrontNoneBackRead
            | Self::FrontNoneBackWrite
            | Self::FrontNoneBackReadWrite => 0,
            Self::FrontReadBackNone
            | Self::FrontReadBackRead
            | Self::FrontReadBackWrite
            | Self::FrontReadBackReadWrite => Self::FRONT_R,
            Self::FrontWriteBackNone
            | Self::FrontWriteBackRead
            | Self::FrontWriteBackWrite
            | Self::FrontWriteBackReadWrite => Self::FRONT_W,
            Self::FrontReadWriteBackNone
            | Self::FrontReadWriteBackRead
            | Self::FrontReadWriteBackWrite
            | Self::FrontReadWriteBackReadWrite => Self::FRONT_R | Self::FRONT_W,
        };
        let back = match self {
            Self::FrontReadBackNone
            | Self::FrontWriteBackNone
            | Self::FrontReadWriteBackNone
            | Self::FrontNoneBackNone => 0,
            Self::FrontReadBackRead
            | Self::FrontWriteBackRead
            | Self::FrontReadWriteBackRead
            | Self::FrontNoneBackRead => Self::BACK_R,
            Self::FrontReadBackWrite
            | Self::FrontWriteBackWrite
            | Self::FrontReadWriteBackWrite
            | Self::FrontNoneBackWrite => Self::BACK_W,
            Self::FrontReadBackReadWrite
            | Self::FrontWriteBackReadWrite
            | Self::FrontReadWriteBackReadWrite
            | Self::FrontNoneBackReadWrite => Self::BACK_R | Self::BACK_W,
        };
        front | back
    }

    /// True when the frontend part of the variant is `Read` or `ReadWrite`.
    pub fn front_readable(&self) -> bool {
        (self.mask() & Self::FRONT_R) != 0
    }

    /// True when the frontend part of the variant is `Write` or `ReadWrite`.
    pub fn front_writable(&self) -> bool {
        (self.mask() & Self::FRONT_W) != 0
    }

    /// True when the backend part of the variant is `Read` or `ReadWrite`.
    pub fn back_readable(&self) -> bool {
        (self.mask() & Self::BACK_R) != 0
    }

    /// True when the backend part of the variant is `Write` or `ReadWrite`.
    pub fn back_writable(&self) -> bool {
        (self.mask() & Self::BACK_W) != 0
    }
}
997
/// Outcome of a step of a given Protocol, signalling the state transition
/// that should follow.
#[derive(Debug, Eq, PartialEq)]
pub enum StateResult {
    /// The Protocol should close its backend.
    CloseBackend,
    /// The parent Session should close itself.
    CloseSession,
    /// The Protocol should connect to a backend.
    ConnectBackend,
    /// The Protocol should carry on.
    Continue,
    /// The parent Session should upgrade to the next Protocol.
    Upgrade,
}
1012
/// Outcome of a step of a given Session, signalling the state transition
/// that should follow.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SessionResult {
    /// The Session should close itself.
    Close,
    /// The Session should carry on.
    Continue,
    /// The Session should upgrade its Protocol.
    Upgrade,
}
1023
/// Distinguishes the two kinds of socket named by its variants.
///
/// NOTE(review): presumably `Listener` is a listening socket and
/// `FrontClient` a socket connected to a client; usage is outside this
/// section, confirm against the callers.
#[derive(Debug, Eq, PartialEq)]
pub enum SocketType {
    Listener,
    FrontClient,
}
1029
/// Readable name for a `bool` return value.
///
/// NOTE(review): going by the name, `true` means the session must be closed;
/// the users of this alias are outside this section, confirm there.
type SessionIsToBeClosed = bool;
1031
1032#[derive(Clone)]
1033pub struct Readiness {
1034    /// the current readiness
1035    pub event: Ready,
1036    /// the readiness we wish to attain
1037    pub interest: Ready,
1038}
1039
1040impl Display for Readiness {
1041    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1042        let i = &mut [b'-'; 4];
1043        let r = &mut [b'-'; 4];
1044        let mixed = &mut [b'-'; 4];
1045
1046        display_ready(i, self.interest);
1047        display_ready(r, self.event);
1048        display_ready(mixed, self.interest & self.event);
1049
1050        write!(
1051            f,
1052            "I({:?})&R({:?})=M({:?})",
1053            String::from_utf8_lossy(i),
1054            String::from_utf8_lossy(r),
1055            String::from_utf8_lossy(mixed)
1056        )
1057    }
1058}
1059
1060impl Default for Readiness {
1061    fn default() -> Self {
1062        Self::new()
1063    }
1064}
1065
1066impl Readiness {
1067    pub const fn new() -> Readiness {
1068        Readiness {
1069            event: Ready::EMPTY,
1070            interest: Ready::EMPTY,
1071        }
1072    }
1073
1074    pub fn reset(&mut self) {
1075        self.event = Ready::EMPTY;
1076        self.interest = Ready::EMPTY;
1077    }
1078
1079    /// filters the readiness we actually want
1080    pub fn filter_interest(&self) -> Ready {
1081        self.event & self.interest
1082    }
1083
1084    /// Signal that the socket has buffered data to write (e.g., TLS internal
1085    /// buffers) that won't generate a new epoll WRITABLE event.
1086    pub fn signal_pending_write(&mut self) {
1087        self.event.insert(Ready::WRITABLE);
1088    }
1089
1090    /// Signal that the socket has buffered data to read (e.g., TLS plaintext
1091    /// buffer after a 1xx clear) that won't generate a new epoll READABLE event.
1092    pub fn signal_pending_read(&mut self) {
1093        self.event.insert(Ready::READABLE);
1094    }
1095
1096    /// Pair `Ready::WRITABLE` insert with `signal_pending_write` — the canonical
1097    /// invariant-15 form for any path that writes bytes to sozu-owned buffers
1098    /// under edge-triggered epoll. See `lib/src/protocol/mux/LIFECYCLE.md`.
1099    #[inline]
1100    pub fn arm_writable(&mut self) {
1101        self.interest.insert(Ready::WRITABLE);
1102        self.signal_pending_write();
1103    }
1104}
1105
#[cfg(test)]
mod readiness_tests {
    use super::{Readiness, Ready};

    /// A single call must set WRITABLE on both the interest and the event.
    #[test]
    fn arm_writable_sets_interest_and_event() {
        let mut readiness = Readiness::new();

        readiness.arm_writable();

        assert!(readiness.interest.is_writable());
        assert!(readiness.event.is_writable());
    }

    /// Calling twice must leave exactly WRITABLE in both fields.
    #[test]
    fn arm_writable_is_idempotent() {
        let mut readiness = Readiness::new();

        for _ in 0..2 {
            readiness.arm_writable();
        }

        assert_eq!(readiness.interest, Ready::WRITABLE);
        assert_eq!(readiness.event, Ready::WRITABLE);
    }
}
1127
1128pub fn display_ready(s: &mut [u8], readiness: Ready) {
1129    if readiness.is_readable() {
1130        s[0] = b'R';
1131    }
1132    if readiness.is_writable() {
1133        s[1] = b'W';
1134    }
1135    if readiness.is_error() {
1136        s[2] = b'E';
1137    }
1138    if readiness.is_hup() {
1139        s[3] = b'H';
1140    }
1141}
1142
1143pub fn ready_to_string(readiness: Ready) -> String {
1144    let s = &mut [b'-'; 4];
1145    display_ready(s, readiness);
1146    String::from_utf8(s.to_vec()).unwrap()
1147}
1148
1149impl fmt::Debug for Readiness {
1150    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1151        let i = &mut [b'-'; 4];
1152        let r = &mut [b'-'; 4];
1153        let mixed = &mut [b'-'; 4];
1154
1155        display_ready(i, self.interest);
1156        display_ready(r, self.event);
1157        display_ready(mixed, self.interest & self.event);
1158
1159        write!(
1160            f,
1161            "Readiness {{ interest: {}, readiness: {}, mixed: {} }}",
1162            str::from_utf8(i).unwrap(),
1163            str::from_utf8(r).unwrap(),
1164            str::from_utf8(mixed).unwrap()
1165        )
1166    }
1167}
1168
/// Timing and byte counters collected over the life of a session.
#[derive(Clone, Debug)]
pub struct SessionMetrics {
    /// Date at which we started handling that request.
    pub start: Option<Instant>,
    /// Time actually spent handling the request.
    pub service_time: Duration,
    /// Time spent waiting for its turn.
    pub wait_time: Duration,
    /// Bytes received by the frontend.
    pub bin: usize,
    /// Bytes sent by the frontend.
    pub bout: usize,

    /// Date at which we started working on the request; `None` while no
    /// service period is running.
    pub service_start: Option<Instant>,
    /// Date at which the current waiting period began.
    pub wait_start: Instant,

    /// Identifier of the backend, when one is known.
    pub backend_id: Option<String>,
    /// Date recorded by `backend_start()`.
    pub backend_start: Option<Instant>,
    /// Date recorded by `backend_connected()`.
    pub backend_connected: Option<Instant>,
    /// Date recorded by `backend_stop()`.
    pub backend_stop: Option<Instant>,
    /// Backend byte counter (NOTE(review): presumably bytes received from
    /// the backend; confirm against the code that increments it).
    pub backend_bin: usize,
    /// Backend byte counter (NOTE(review): presumably bytes sent to the
    /// backend; confirm against the code that increments it).
    pub backend_bout: usize,
}
1193
1194impl SessionMetrics {
1195    pub fn new(wait_time: Option<Duration>) -> SessionMetrics {
1196        SessionMetrics {
1197            start: Some(Instant::now()),
1198            service_time: Duration::from_secs(0),
1199            wait_time: wait_time.unwrap_or_else(|| Duration::from_secs(0)),
1200            bin: 0,
1201            bout: 0,
1202            service_start: None,
1203            wait_start: Instant::now(),
1204            backend_id: None,
1205            backend_start: None,
1206            backend_connected: None,
1207            backend_stop: None,
1208            backend_bin: 0,
1209            backend_bout: 0,
1210        }
1211    }
1212
1213    pub fn reset(&mut self) {
1214        self.start = None;
1215        self.service_time = Duration::from_secs(0);
1216        self.wait_time = Duration::from_secs(0);
1217        self.bin = 0;
1218        self.bout = 0;
1219        self.service_start = None;
1220        self.backend_start = None;
1221        self.backend_connected = None;
1222        self.backend_stop = None;
1223        self.backend_bin = 0;
1224        self.backend_bout = 0;
1225    }
1226
1227    pub fn service_start(&mut self) {
1228        let now = Instant::now();
1229
1230        if self.start.is_none() {
1231            self.start = Some(now);
1232        }
1233
1234        self.service_start = Some(now);
1235        self.wait_time += now - self.wait_start;
1236    }
1237
1238    pub fn service_stop(&mut self) {
1239        if let Some(start) = self.service_start.take() {
1240            let duration = Instant::now() - start;
1241            self.service_time += duration;
1242        }
1243    }
1244
1245    pub fn wait_start(&mut self) {
1246        self.wait_start = Instant::now();
1247    }
1248
1249    pub fn service_time(&self) -> Duration {
1250        match self.service_start {
1251            Some(start) => {
1252                let last_duration = Instant::now() - start;
1253                self.service_time + last_duration
1254            }
1255            None => self.service_time,
1256        }
1257    }
1258
1259    /// time elapsed since the beginning of the session
1260    pub fn request_time(&self) -> Duration {
1261        match self.start {
1262            Some(start) => Instant::now() - start,
1263            None => Duration::from_secs(0),
1264        }
1265    }
1266
1267    pub fn backend_start(&mut self) {
1268        self.backend_start = Some(Instant::now());
1269    }
1270
1271    pub fn backend_connected(&mut self) {
1272        self.backend_connected = Some(Instant::now());
1273    }
1274
1275    pub fn backend_stop(&mut self) {
1276        self.backend_stop = Some(Instant::now());
1277    }
1278
1279    pub fn backend_response_time(&self) -> Option<Duration> {
1280        match (self.backend_connected, self.backend_stop) {
1281            (Some(start), Some(end)) => Some(end - start),
1282            (Some(start), None) => Some(Instant::now() - start),
1283            _ => None,
1284        }
1285    }
1286
1287    pub fn backend_connection_time(&self) -> Option<Duration> {
1288        match (self.backend_start, self.backend_connected) {
1289            (Some(start), Some(end)) => Some(end - start),
1290            _ => None,
1291        }
1292    }
1293
1294    pub fn register_end_of_session(&self, context: &LogContext) {
1295        let request_time = self.request_time();
1296        let service_time = self.service_time();
1297
1298        if let Some(cluster_id) = context.cluster_id {
1299            time!(
1300                names::event_loop::REQUEST_TIME,
1301                cluster_id,
1302                request_time.as_millis()
1303            );
1304            time!(
1305                names::event_loop::SERVICE_TIME,
1306                cluster_id,
1307                service_time.as_millis()
1308            );
1309        }
1310        time!(names::event_loop::REQUEST_TIME, request_time.as_millis());
1311        time!(names::event_loop::SERVICE_TIME, service_time.as_millis());
1312
1313        if let Some(backend_id) = self.backend_id.as_ref() {
1314            if let Some(backend_response_time) = self.backend_response_time() {
1315                record_backend_metrics!(
1316                    context.cluster_id.as_str_or("-"),
1317                    backend_id,
1318                    backend_response_time.as_millis(),
1319                    self.backend_connection_time(),
1320                    self.backend_bin,
1321                    self.backend_bout
1322                );
1323            }
1324        }
1325
1326        incr!(
1327            names::access_logs::COUNT,
1328            context.cluster_id,
1329            context.backend_id
1330        );
1331    }
1332}
1333
/// Exponentially weighted moving average with high sensitivity to latency bursts.
///
/// cf Finagle for the original implementation: <https://github.com/twitter/finagle/blob/9cc08d15216497bb03a1cafda96b7266cfbbcff1/finagle-core/src/main/scala/com/twitter/finagle/loadbalancer/PeakEwma.scala>
#[derive(Clone, Debug, PartialEq)]
pub struct PeakEWMA {
    /// Decay in nanoseconds.
    ///
    /// Higher values will make the EWMA decay slowly to 0.
    pub decay: f64,
    /// Estimated RTT in nanoseconds.
    ///
    /// Must be set to a high enough default value so that new backends do not
    /// get all the traffic right away.
    pub rtt: f64,
    /// Date of the last modification.
    pub last_event: Instant,
}

impl Default for PeakEWMA {
    fn default() -> Self {
        PeakEWMA::new()
    }
}

impl PeakEWMA {
    /// Builds an average with a 1 s decay and a 50 ms initial RTT
    /// (hardcoded default values for now).
    pub fn new() -> Self {
        let one_second_ns = 1_000_000_000f64;
        let fifty_millis_ns = 50_000_000f64;

        PeakEWMA {
            decay: one_second_ns,
            rtt: fifty_millis_ns,
            last_event: Instant::now(),
        }
    }

    /// Feeds one RTT measurement, in nanoseconds, into the average.
    ///
    /// A measurement above the current estimate replaces it immediately, so
    /// rising latency raises the cost right away. A lower one is blended in:
    /// `new_rtt = old_rtt * e^(-elapsed/decay) + observed_rtt * (1 - e^(-elapsed/decay))`.
    pub fn observe(&mut self, rtt: f64) {
        let now = Instant::now();
        let elapsed_ns = (now - self.last_event).as_nanos() as f64;

        self.rtt = if rtt > self.rtt {
            rtt
        } else {
            let weight = (-elapsed_ns / self.decay).exp();
            self.rtt * weight + rtt * (1.0 - weight)
        };
        self.last_event = now;
    }

    /// Returns the current cost: the decayed RTT multiplied by
    /// `active_requests + 1`.
    pub fn get(&mut self, active_requests: usize) -> f64 {
        // Decay the current value first: we might not have seen a request
        // in a long time.
        self.observe(0.0);

        let load = (active_requests + 1) as f64;
        load * self.rtt
    }
}
1394
1395pub mod testing {
1396    pub use std::{cell::RefCell, os::fd::IntoRawFd, rc::Rc};
1397
1398    pub use anyhow::Context;
1399    pub use mio::{Poll, Registry, Token, net::UnixStream};
1400    pub use slab::Slab;
1401    pub use sozu_command::{
1402        proto::command::{
1403            HttpListenerConfig, HttpsListenerConfig, ServerConfig, TcpListenerConfig,
1404        },
1405        scm_socket::{Listeners, ScmSocket},
1406    };
1407
1408    pub use crate::{
1409        Protocol, ProxySession,
1410        backends::BackendMap,
1411        http::HttpProxy,
1412        https::HttpsProxy,
1413        pool::Pool,
1414        server::{ListenSession, ProxyChannel, Server, SessionManager},
1415        tcp::TcpProxy,
1416    };
1417
1418    use std::sync::atomic::{AtomicU16, Ordering};
1419
1420    /// Port counter for sozu listener addresses in lib tests.
1421    /// Starts at 10000 to avoid collision with:
1422    /// - Privileged ports (<1024)
1423    /// - e2e suite (starts at 2000)
1424    /// - Ephemeral port range (typically 32768+)
1425    static PORT_PROVIDER: AtomicU16 = AtomicU16::new(10000);
1426
1427    /// Get a unique port for a sozu listener address.
1428    /// Each call returns a different port, safe for parallel test execution.
1429    pub fn provide_port() -> u16 {
1430        PORT_PROVIDER.fetch_add(1, Ordering::SeqCst)
1431    }
1432
1433    /// Everything needed to create a Server
1434    pub struct ServerParts {
1435        pub event_loop: Poll,
1436        pub registry: Registry,
1437        pub sessions: Rc<RefCell<SessionManager>>,
1438        pub pool: Rc<RefCell<Pool>>,
1439        pub backends: Rc<RefCell<BackendMap>>,
1440        pub client_scm_socket: ScmSocket,
1441        pub server_scm_socket: ScmSocket,
1442        pub server_config: ServerConfig,
1443    }
1444
1445    /// Setup a standalone server, for testing purposes
1446    pub fn prebuild_server(
1447        max_buffers: usize,
1448        buffer_size: usize,
1449        send_scm: bool,
1450    ) -> anyhow::Result<ServerParts> {
1451        let event_loop = Poll::new().with_context(|| "Failed at creating event loop")?;
1452        let backends = Rc::new(RefCell::new(BackendMap::new()));
1453        let server_config = ServerConfig {
1454            max_connections: max_buffers as u64,
1455            ..Default::default()
1456        };
1457
1458        let pool = Rc::new(RefCell::new(Pool::with_capacity(
1459            1,
1460            max_buffers,
1461            buffer_size,
1462        )));
1463
1464        let mut sessions: Slab<Rc<RefCell<dyn ProxySession>>> = Slab::with_capacity(max_buffers);
1465        {
1466            let entry = sessions.vacant_entry();
1467            info!("taking token {:?} for channel", entry.key());
1468            entry.insert(Rc::new(RefCell::new(ListenSession {
1469                protocol: Protocol::Channel,
1470            })));
1471        }
1472        {
1473            let entry = sessions.vacant_entry();
1474            info!("taking token {:?} for timer", entry.key());
1475            entry.insert(Rc::new(RefCell::new(ListenSession {
1476                protocol: Protocol::Timer,
1477            })));
1478        }
1479        {
1480            let entry = sessions.vacant_entry();
1481            info!("taking token {:?} for metrics", entry.key());
1482            entry.insert(Rc::new(RefCell::new(ListenSession {
1483                protocol: Protocol::Metrics,
1484            })));
1485        }
1486        // Test fixture: feature disabled (max_connections_per_ip = 0).
1487        let sessions = SessionManager::new(sessions, max_buffers, 0, 0);
1488
1489        let registry = event_loop
1490            .registry()
1491            .try_clone()
1492            .with_context(|| "Failed at creating a registry")?;
1493
1494        let (scm_server, scm_client) =
1495            UnixStream::pair().with_context(|| "Failed at creating scm unix stream")?;
1496        let client_scm_socket = ScmSocket::new(scm_client.into_raw_fd())
1497            .with_context(|| "Failed at creating the scm client socket")?;
1498        let server_scm_socket = ScmSocket::new(scm_server.into_raw_fd())
1499            .with_context(|| "Failed at creating the scm server socket")?;
1500        if send_scm {
1501            client_scm_socket
1502                .send_listeners(&Listeners::default())
1503                .with_context(|| "Failed at sending empty listeners")?;
1504        }
1505
1506        Ok(ServerParts {
1507            event_loop,
1508            registry,
1509            sessions,
1510            pool,
1511            backends,
1512            client_scm_socket,
1513            server_scm_socket,
1514            server_config,
1515        })
1516    }
1517}