Skip to main content

sozu_lib/
lib.rs

1//! ## What this library does
2//!
3//! This library provides tools to build and start HTTP, HTTPS and TCP reverse proxies.
4//!
5//! The proxies handles network polling, HTTP parsing, TLS in a fast single threaded event
6//! loop.
7//!
8//! Each proxy is designed to receive configuration changes at runtime instead of
9//! reloading from a file regularly. The event loop runs in its own thread
10//! and receives commands through a message queue.
11//!
12//! ## Difference with the crate `sozu`
13//!
14//! To create several workers and manage them all at once (which is the most common way to
15//! use Sōzu), the crate `sozu` is more indicated than using the lib directly.
16//!
17//! The crate `sozu` provides a binary called the main process.
18//! The main process uses `sozu_lib` to start and manage workers.
19//! Each worker can handle HTTP, HTTPS and TCP traffic.
20//! The main process receives synchronizes the state of all workers, using UNIX sockets
21//! and custom channels to communicate with them.
22//! The main process itself is is configurable with a file, and has a CLI.
23//!
24//! ## How to use this library directly
25//!
26//! This documentation here explains how to write a binary that will start a single Sōzu
27//! worker and give it orders. The method has two steps:
28//!
29//! 1. Starts a Sōzu worker in a distinct thread
30//! 2. sends instructions to the worker on a UNIX socket via a Sōzu channel
31//!
32//! ### How to start a Sōzu worker
33//!
34//! Before creating an HTTP proxy, we first need to create an HTTP listener.
35//! The listener is an abstraction around a TCP socket provided by the kernel.
36//! We need the `sozu_command_lib` to build a listener.
37//!
38//! ```
39//! use sozu_command_lib::{config::ListenerBuilder, proto::command::SocketAddress};
40//!
41//! let address = SocketAddress::new_v4(127,0,0,1,8080);
42//! let http_listener = ListenerBuilder::new_http(address)
43//!     .to_http(None)
44//!     .expect("Could not create HTTP listener");
45//! ```
46//!
47//! The `http_listener` is of the type `HttpListenerConfig`, that we can be sent to the worker
48//! to start the proxy.
49//!
50//! Then create a pair of channels to communicate with the proxy.
51//! The channel is a wrapper around a unix socket.
52//!
53//! ```ignore
54//! use sozu_command_lib::{
55//!     channel::Channel,
56//!     proto::command::{WorkerRequest, WorkerResponse},
57//! };
58//!
59//! let (mut command_channel, proxy_channel): (
60//!     Channel<WorkerRequest, WorkerResponse>,
61//!     Channel<WorkerResponse, WorkerRequest>,
62//! ) = Channel::generate(1000, 10000).expect("should create a channel");
63//!```
64//!
65//! Here, the `command_channel` end is blocking, it sends `WorkerRequest`s and receives
66//! `WorkerResponses`, while the `proxy_channel` end is non-blocking, and the types are reversed.
67//! Writing the types here isn't even necessary thanks to the compiler,
68//! but it brings the point accross.
69//!
70//! You can now launch the worker in a separate thread, providing the HTTP listener config,
71//! the proxy end of the channel, and your custom number of buffers and their size:
72//!
73//! ```ignore
74//! use std::thread;
75//!
76//! let worker_thread_join_handle = thread::spawn(move || {
77//!     let max_buffers = 500;
78//!     let buffer_size = 16384;
79//!     sozu_lib::http::testing::start_http_worker(http_listener, proxy_channel, max_buffers, buffer_size);
80//! });
81//! ```
82//!
83//! ### Send orders
84//!
85//! Once the thread is launched, the proxy worker will start its event loop and handle
86//! events on the listening interface and port specified when building the HTTP Listener.
87//! Since no frontends or backends were specified for the proxy, it will receive
88//! the connections, parse the requests, then send a default (but configurable)
89//! answer.
90//!
91//! Before defining a frontend and backends, we need to define a cluster, which describes
92//! a routing configuration. A cluster contains:
93//!
94//! - one frontend
95//! - one or several backends
96//! - routing rules
97//!
98//! A cluster is identified by its `cluster_id`, which will be used to define frontends
99//! and backends later on.
100//!
101//! ```
102//! use sozu_command_lib::proto::command::{Cluster, LoadBalancingAlgorithms};
103//!
104//! let cluster = Cluster {
105//!     cluster_id: "my-cluster".to_string(),
106//!     sticky_session: false,
107//!     https_redirect: false,
108//!     load_balancing: LoadBalancingAlgorithms::RoundRobin as i32,
109//!     answer_503: Some("A custom forbidden message".to_string()),
110//!     ..Default::default()
111//! };
112//! ```
113//!
114//! The defaults are sensible, so we could define only the `cluster_id`.
115//!
116//! We can now define a frontend. A frontend is a way to recognize a request and match
117//! it to a `cluster_id`, depending on the hostname and the beginning of the URL path.
118//! The `address` field must match the one of the HTTP listener we defined before:
119//!
120//! ```
121//! use std::collections::BTreeMap;
122//!
123//!  use sozu_command_lib::proto::command::{PathRule, RequestHttpFrontend, RulePosition, SocketAddress};
124//!
125//! let http_front = RequestHttpFrontend {
126//!     cluster_id: Some("my-cluster".to_string()),
127//!     address: SocketAddress::new_v4(127,0,0,1,8080),
128//!     hostname: "example.com".to_string(),
129//!     path: PathRule::prefix(String::from("/")),
130//!     position: RulePosition::Pre.into(),
131//!     tags: BTreeMap::from([
132//!         ("owner".to_owned(), "John".to_owned()),
133//!         ("id".to_owned(), "my-own-http-front".to_owned()),
134//!     ]),
135//!     ..Default::default()
136//! };
137//! ```
138//!
139//! The `tags` are keys and values that will appear in the access logs,
140//! which can come in handy.
141//!
142//! Now let's define a backend.
143//! A backend is an instance of a backend application we want to route traffic to.
144//! The `address` field must match the IP and port of the backend server.
145//!
146//! ```
147//! use sozu_command_lib::proto::command::{AddBackend, LoadBalancingParams, SocketAddress};
148//!
149//! let http_backend = AddBackend {
150//!     cluster_id: "my-cluster".to_string(),
151//!     backend_id: "test-backend".to_string(),
152//!     address: SocketAddress::new_v4(127,0,0,1,8000),
153//!     load_balancing_parameters: Some(LoadBalancingParams::default()),
154//!     ..Default::default()
155//! };
156//! ```
157//!
158//! A cluster can have multiple backend servers, and they can be added or
159//! removed while the proxy is running. If a backend is removed from the configuration
160//! while the proxy is handling a request to that server, it will finish that
161//! request and stop sending new traffic to that server.
162//!
163//!
164//! Now we can use the other end of the channel to send all these requests to the worker,
165//! using the WorkerRequest type:
166//!
167//! ```ignore
168//! use sozu_command_lib::{
169//!     proto::command::{Request, request::RequestType, WorkerRequest},
170//! };
171//!
172//! command_channel
173//!     .write_message(&WorkerRequest {
174//!         id: String::from("add-the-cluster"),
175//!         content: RequestType::AddCluster(cluster).into(),
176//!     })
177//!     .expect("Could not send AddHttpFrontend request");
178//!
179//! command_channel
180//!     .write_message(&WorkerRequest {
181//!         id: String::from("add-the-frontend"),
182//!         content: RequestType::AddHttpFrontend(http_front).into(),
183//!     })
184//!     .expect("Could not send AddHttpFrontend request");
185//!
186//! command_channel
187//!     .write_message(&WorkerRequest {
188//!         id: String::from("add-the-backend"),
189//!         content: RequestType::AddBackend(http_backend).into(),
190//!     })
191//!     .expect("Could not send AddBackend request");
192//!
193//! println!("HTTP -> {:?}", command_channel.read_message());
194//! println!("HTTP -> {:?}", command_channel.read_message());
195//! println!("HTTP -> {:?}", command_channel.read_message());
196//! ```
197//!
198//!
199//! The event loop of the worker will process these instructions and add them to
200//! its state, and the worker will send back an acknowledgement
201//! message.
202//!
203//! Now we can let the worker thread run in the background:
204//!
205//! ```ignore
206//! let _ = worker_thread_join_handle.join();
207//! ```
208//!
209//! Here is the complete example for reference, it matches the `examples/http.rs` example:
210//!
211//! ```
212//! #[macro_use]
213//! extern crate sozu_command_lib;
214//!
215//! use std::{collections::BTreeMap, env, io::stdout, thread};
216//!
217//! use anyhow::Context;
218//! use sozu_command_lib::{
219//!     channel::Channel,
220//!     config::ListenerBuilder,
221//!     logging::setup_default_logging,
222//!     proto::command::{
223//!         request::RequestType, AddBackend, Cluster, LoadBalancingAlgorithms, LoadBalancingParams,
224//!         PathRule, Request, RequestHttpFrontend, RulePosition, SocketAddress,WorkerRequest,
225//!     },
226//! };
227//!
228//! fn main() -> anyhow::Result<()> {
229//!     setup_default_logging(true, "info", "EXAMPLE").with_context(|| "could not setup logging")?;
230//!
231//!     info!("starting up");
232//!
233//!     let http_listener = ListenerBuilder::new_http(SocketAddress::new_v4(127,0,0,1,8080))
234//!         .to_http(None)
235//!         .expect("Could not create HTTP listener");
236//!
237//!     let (mut command_channel, proxy_channel) =
238//!         Channel::generate(1000, 10000).with_context(|| "should create a channel")?;
239//!
240//!     let worker_thread_join_handle = thread::spawn(move || {
241//!         let max_buffers = 500;
242//!         let buffer_size = 16384;
243//!         sozu_lib::http::testing::start_http_worker(http_listener, proxy_channel, max_buffers, buffer_size)
244//!             .expect("The worker could not be started, or shut down");
245//!     });
246//!
247//!     let cluster = Cluster {
248//!         cluster_id: "my-cluster".to_string(),
249//!         sticky_session: false,
250//!         https_redirect: false,
251//!         load_balancing: LoadBalancingAlgorithms::RoundRobin as i32,
252//!         answer_503: Some("A custom forbidden message".to_string()),
253//!         ..Default::default()
254//!     };
255//!
256//!     let http_front = RequestHttpFrontend {
257//!         cluster_id: Some("my-cluster".to_string()),
258//!         address: SocketAddress::new_v4(127,0,0,1,8080),
259//!         hostname: "example.com".to_string(),
260//!         path: PathRule::prefix(String::from("/")),
261//!         position: RulePosition::Pre.into(),
262//!         tags: BTreeMap::from([
263//!             ("owner".to_owned(), "John".to_owned()),
264//!             ("id".to_owned(), "my-own-http-front".to_owned()),
265//!         ]),
266//!         ..Default::default()
267//!     };
268//!     let http_backend = AddBackend {
269//!         cluster_id: "my-cluster".to_string(),
270//!         backend_id: "test-backend".to_string(),
271//!         address: SocketAddress::new_v4(127,0,0,1,8000),
272//!         load_balancing_parameters: Some(LoadBalancingParams::default()),
273//!         ..Default::default()
274//!     };
275//!
276//!     command_channel
277//!         .write_message(&WorkerRequest {
278//!             id: String::from("add-the-cluster"),
279//!             content: RequestType::AddCluster(cluster).into(),
280//!         })
281//!         .expect("Could not send AddHttpFrontend request");
282//!
283//!     command_channel
284//!         .write_message(&WorkerRequest {
285//!             id: String::from("add-the-frontend"),
286//!             content: RequestType::AddHttpFrontend(http_front).into(),
287//!         })
288//!         .expect("Could not send AddHttpFrontend request");
289//!
290//!     command_channel
291//!         .write_message(&WorkerRequest {
292//!             id: String::from("add-the-backend"),
293//!             content: RequestType::AddBackend(http_backend).into(),
294//!         })
295//!         .expect("Could not send AddBackend request");
296//!
297//!     println!("HTTP -> {:?}", command_channel.read_message());
298//!     println!("HTTP -> {:?}", command_channel.read_message());
299//!
300//!     // uncomment to let it run in the background
301//!     // let _ = worker_thread_join_handle.join();
302//!     info!("good bye");
303//!     Ok(())
304//! }
305//! ```
306
307#[macro_use]
308extern crate sozu_command_lib as sozu_command;
309
310#[macro_use]
311pub mod util;
312#[macro_use]
313pub mod metrics;
314
315pub mod backends;
316pub mod crypto;
317pub mod features;
318pub mod health_check;
319pub mod http;
320pub mod load_balancing;
321pub mod pool;
322pub mod protocol;
323pub mod retry;
324pub mod router;
325pub mod socket;
326pub mod timer;
327pub mod tls;
328
329/// Linux zero-copy TCP forwarder. Used by `protocol::pipe::Pipe` when
330/// the listener is `Protocol::TCP` and the `splice` feature is enabled.
331#[cfg(all(target_os = "linux", feature = "splice"))]
332pub(crate) mod splice;
333
334pub mod server;
335pub mod tcp;
336pub mod udp;
337
338pub mod https;
339
340use std::{
341    cell::RefCell,
342    collections::{BTreeMap, HashMap},
343    fmt::{self, Display, Formatter},
344    net::SocketAddr,
345    rc::Rc,
346    str,
347    time::{Duration, Instant, SystemTime},
348};
349
350use backends::BackendError;
351use hex::FromHexError;
352use mio::{Interest, Token, net::TcpStream};
353use protocol::http::{answers::HttpAnswers, answers::TemplateError, parser::Method};
354use router::RouterError;
355use socket::ServerBindError;
356use sozu_command::{
357    AsStr, ObjectKind,
358    logging::{CachedTags, LogContext},
359    proto::command::{Cluster, ListenerType, RequestHttpFrontend, WorkerRequest, WorkerResponse},
360    ready::Ready,
361    state::ClusterId,
362};
363use tls::CertificateResolverError;
364
365use crate::{backends::BackendMap, metrics::names, router::RouteResult};
366
367/// Anything that can be registered in mio (subscribe to kernel events)
368#[derive(Debug, Clone, Copy, PartialEq, Eq)]
369pub enum Protocol {
370    HTTP,
371    HTTPS,
372    TCP,
373    UDP,
374    HTTPListen,
375    HTTPSListen,
376    TCPListen,
377    UDPListen,
378    Channel,
379    Metrics,
380    Timer,
381}
382
383/// trait that must be implemented by listeners and client sessions
384pub trait ProxySession {
385    /// indicates the protocol associated with the session
386    ///
387    /// this is used to distinguish sessions from listenrs, channels, metrics
388    /// and timers
389    fn protocol(&self) -> Protocol;
390    /// if a session received an event or can still execute, the event loop will
391    /// call this method. Its result indicates if it can still execute, needs to
392    /// connect to a backend server, close the session
393    fn ready(&mut self, session: Rc<RefCell<dyn ProxySession>>) -> SessionIsToBeClosed;
394    /// if the event loop got an event for a token associated with the session,
395    /// it will call this method on the session
396    fn update_readiness(&mut self, token: Token, events: Ready);
397    /// close a session, frontend and backend sockets,
398    /// remove the entries from the session manager slab
399    fn close(&mut self);
400    /// if a timeout associated with the session triggers, the event loop will
401    /// call this method with the timeout's token
402    fn timeout(&mut self, t: Token) -> SessionIsToBeClosed;
403    /// last time the session got an event
404    fn last_event(&self) -> Instant;
405    /// display the session's internal state (for debugging purpose)
406    fn print_session(&self);
407    /// get the token associated with the frontend
408    fn frontend_token(&self) -> Token;
409    /// tell the session it has to shut down if possible
410    ///
411    /// if the session handles HTTP requests, it will not close until the response
412    /// is completely sent back to the client
413    fn shutting_down(&mut self) -> SessionIsToBeClosed;
414    /// Best-effort identifier of the cluster currently routed to by this
415    /// session. Returns `None` for `ListenSession` (no per-session
416    /// cluster), and for client sessions before routing has resolved.
417    /// H2 sessions multiplex many streams over one frontend token and may
418    /// touch several clusters; the returned value is whichever cluster
419    /// the session most recently keep-alive'd to. Used for log/metric
420    /// attribution, not for accounting (the tracker keeps the canonical
421    /// per-stream `(cluster, IP)` set).
422    fn cluster_id(&self) -> Option<String> {
423        None
424    }
425    /// Source address as observed by Sōzu, with proxy-protocol awareness.
426    /// HTTP/HTTPS/TCP client sessions return the parsed PROXY-protocol
427    /// source when present, else `peer_addr`. `ListenSession` returns
428    /// `None`. Used to attribute per-(cluster, source-IP) tracking and
429    /// access logs to the real client behind a layer-4 PROXY frontend.
430    fn session_address(&self) -> Option<SocketAddr> {
431        None
432    }
433}
434
435#[macro_export]
436macro_rules! branch {
437    (if $($value:ident)? == $expected:ident { $($then:tt)* } else { $($else:tt)* }) => {
438        macro_rules! expect {
439            ($expected) => {$($then)*};
440            ($a:ident) => {$($else)*};
441            () => {$($else)*}
442        }
443        expect!($($value)?);
444    };
445    (if $($value:ident)? == $expected:ident { $($then:tt)* } ) => {
446        macro_rules! expect {
447            ($expected) => {$($then)*};
448        }
449        expect!($($value)?);
450    };
451}
452
453#[macro_export]
454macro_rules! fallback {
455    ({} $($default:tt)*) => {
456        $($default)*
457    };
458    ({$($value:tt)+} $($default:tt)*) => {
459        $($value)+
460    };
461}
462
463#[macro_export]
464macro_rules! StateMachineBuilder {
465    (
466        ($d:tt)
467        $(#[$($state_macros:tt)*])*
468        enum $state_name:ident $(impl $trait:ident)?  {
469            $($(#[$($variant_macros:tt)*])*
470            $variant_name:ident($state:ty$(,$($aux:ty),+)?) $(-> $override:expr)?),+ $(,)?
471        }
472    ) => {
473        /// A summary of the last valid State
474        #[derive(Clone, Copy, Debug)]
475        pub enum StateMarker {
476            $($variant_name,)+
477        }
478
479        $(#[$($state_macros)*])*
480        #[allow(clippy::large_enum_variant)]
481        pub enum $state_name {
482            $(
483                $(#[$($variant_macros)*])*
484                $variant_name($state$(,$($aux),+)?),
485            )+
486            /// Informs about upgrade failure, contains a summary the last valid State
487            FailedUpgrade(StateMarker),
488        }
489
490        macro_rules! _fn_impl {
491            ($function:ident(&$d($mut:ident)?, self $d(,$arg_name:ident: $arg_type:ty)*) $d(-> $ret:ty)? $d(| $marker:tt => $fail:expr)?) => {
492                fn $function(&$d($mut)? self $d(,$arg_name: $arg_type)*) $d(-> $ret)? {
493                    match self {
494                        $($state_name::$variant_name(_state, ..) => $crate::fallback!({$($override)?} _state.$function($d($arg_name),*)),)+
495                        $state_name::FailedUpgrade($crate::fallback!({$d($marker)?} _)) => $crate::fallback!({$d($fail)?} unreachable!())
496                    }
497                }
498            };
499        }
500
501        impl $state_name {
502            /// Informs about the last valid State before upgrade failure
503            fn marker(&self) -> StateMarker {
504                match self {
505                    $($state_name::$variant_name(..) => StateMarker::$variant_name,)+
506                    $state_name::FailedUpgrade(marker) => *marker,
507                }
508            }
509            /// Returns wether or not the State is FailedUpgrade
510            fn failed(&self) -> bool {
511                match self {
512                    $state_name::FailedUpgrade(_) => true,
513                    _ => false,
514                }
515            }
516            /// Gives back an owned version of the State,
517            /// leaving a FailedUpgrade in its place.
518            /// The FailedUpgrade retains the marker of the previous State.
519            fn take(&mut self) -> $state_name {
520                let mut owned_state = $state_name::FailedUpgrade(self.marker());
521                std::mem::swap(&mut owned_state, self);
522                owned_state
523            }
524            _fn_impl!{front_socket(&, self) -> &mio::net::TcpStream}
525        }
526
527        $crate::branch!{
528            if $($trait)? == SessionState {
529                impl SessionState for $state_name {
530                    _fn_impl!{ready(&mut, self, session: Rc<RefCell<dyn ProxySession>>, proxy: Rc<RefCell<dyn L7Proxy>>, metrics: &mut SessionMetrics) -> SessionResult}
531                    _fn_impl!{update_readiness(&mut, self, token: Token, events: Ready)}
532                    _fn_impl!{timeout(&mut, self, token: Token, metrics: &mut SessionMetrics) -> StateResult}
533                    _fn_impl!{cancel_timeouts(&mut, self)}
534                    _fn_impl!{print_state(&, self, context: &str) | marker => error!("{} Session(FailedUpgrade({:?}))", context, marker)}
535                    _fn_impl!{close(&mut, self, proxy: Rc<RefCell<dyn L7Proxy>>, metrics: &mut SessionMetrics) | _ => {}}
536                    _fn_impl!{shutting_down(&mut, self) -> SessionIsToBeClosed | _ => true}
537                }
538            } else {}
539        }
540    };
541    ($($tt:tt)+) => {
542        StateMachineBuilder!{($) $($tt)+}
543    }
544}
545
546pub trait ListenerHandler {
547    fn get_addr(&self) -> &SocketAddr;
548
549    fn get_tags(&self, key: &str) -> Option<&CachedTags>;
550
551    fn get_concatenated_tags(&self, key: &str) -> Option<&str> {
552        self.get_tags(key).map(|tags| tags.concatenated.as_str())
553    }
554
555    fn set_tags(&mut self, key: String, tags: Option<BTreeMap<String, String>>);
556
557    fn protocol(&self) -> Protocol;
558
559    fn public_address(&self) -> SocketAddr;
560}
561
562#[derive(thiserror::Error, Debug)]
563pub enum FrontendFromRequestError {
564    #[error("Could not parse hostname from '{host}': {error}")]
565    HostParse { host: String, error: String },
566    #[error("invalid remaining chars after hostname. Host: {0}")]
567    InvalidCharsAfterHost(String),
568    #[error("no cluster: {0}")]
569    NoClusterFound(RouterError),
570}
571
572pub trait L7ListenerHandler {
573    fn get_sticky_name(&self) -> &str;
574
575    /// Name of the correlation header Sozu injects into every request and
576    /// response body. Default: `"Sozu-Id"`. Operators can rebrand via the
577    /// `sozu_id_header` listener config knob.
578    fn get_sozu_id_header(&self) -> &str {
579        "Sozu-Id"
580    }
581
582    fn get_connect_timeout(&self) -> u32;
583
584    /// retrieve a frontend by parsing a request's hostname, uri and method
585    fn frontend_from_request(
586        &self,
587        host: &str,
588        uri: &str,
589        method: &Method,
590    ) -> Result<RouteResult, FrontendFromRequestError>;
591
592    /// retrieve the listener's configured HTTP answers (templates)
593    fn get_answers(&self) -> &Rc<RefCell<HttpAnswers>>;
594
595    /// H2 flood detection thresholds from the listener config.
596    /// Returns the default config when the listener does not provide custom values.
597    fn get_h2_flood_config(&self) -> protocol::mux::H2FloodConfig {
598        protocol::mux::H2FloodConfig::default()
599    }
600
601    /// H2 connection tuning from the listener config.
602    /// Returns the default config when the listener does not provide custom values.
603    fn get_h2_connection_config(&self) -> protocol::mux::H2ConnectionConfig {
604        protocol::mux::H2ConnectionConfig::default()
605    }
606
607    /// Whether requests must have their `:authority` / `Host` exact-match
608    /// the TLS SNI negotiated at handshake (CWE-346 / CWE-444).
609    ///
610    /// Defaults to `true` — the safe setting that closes the
611    /// CWE-346 / CWE-444 cross-SNI smuggling vector. Operators can opt
612    /// out per-listener via `HttpsListenerConfig::strict_sni_binding =
613    /// false` when cross-SNI routing is explicitly required. Plaintext
614    /// HTTP listeners return the default value; they never have an SNI
615    /// to compare against, so the routing-layer check short-circuits on
616    /// `tls_server_name: None`.
617    fn get_strict_sni_binding(&self) -> bool {
618        true
619    }
620
621    /// Whether to strip any client-supplied `X-Real-IP` header from
622    /// forwarded requests (anti-spoofing).
623    ///
624    /// Defaults to `false` — preserves the historical pass-through
625    /// behaviour. Operators opt in via
626    /// `HttpListenerConfig::elide_x_real_ip = true` (and the equivalent on
627    /// HTTPS listeners). Independent of [`Self::get_send_x_real_ip`]: the
628    /// two flags can be combined freely (anti-spoof only, send only, both,
629    /// or neither). The elision branch lives in
630    /// `HttpContext::on_request_headers`, so it covers H1 and H2 alike.
631    fn get_elide_x_real_ip(&self) -> bool {
632        false
633    }
634
635    /// Whether to append a proxy-generated `X-Real-IP` header carrying the
636    /// connection peer IP (post-PROXY-v2 unwrap, i.e. the original client
637    /// IP) to every forwarded request.
638    ///
639    /// Defaults to `false` — preserves the historical no-injection
640    /// behaviour. Operators opt in via
641    /// `HttpListenerConfig::send_x_real_ip = true` (and the equivalent on
642    /// HTTPS listeners). Independent of [`Self::get_elide_x_real_ip`]: the
643    /// two flags can be combined freely. The injection branch lives next
644    /// to the existing X-Forwarded-For / Forwarded synthesis in
645    /// `HttpContext::on_request_headers`.
646    fn get_send_x_real_ip(&self) -> bool {
647        false
648    }
649
650    /// Per-stream idle timeout for H2 connections. An open stream that makes
651    /// no forward progress for this duration is cancelled (RST_STREAM / CANCEL).
652    /// Mitigates slow-multiplex Slowloris where a client keeps connection-level
653    /// activity high (resetting the connection idle timer on every frame) while
654    /// pinning streams for the full nominal connection timeout.
655    ///
656    /// Listeners inherit `max(30s, back_timeout)` when `h2_stream_idle_timeout_seconds`
657    /// is absent so operators who raised the socket-level backend budget do not
658    /// have to duplicate the value here; the 30 s floor preserves the baseline
659    /// slow-multiplex mitigation when `back_timeout` is shorter. Set the knob
660    /// explicitly to cap the per-stream deadline below `back_timeout` (useful
661    /// when under a slow-multiplex attack).
662    fn get_h2_stream_idle_timeout(&self) -> std::time::Duration {
663        std::time::Duration::from_secs(30)
664    }
665
666    /// Wall-clock budget granted to in-flight H2 streams after soft-stop sent
667    /// the initial `GOAWAY(NO_ERROR)`. Once the deadline elapses the mux
668    /// transitions to a forced close (final GOAWAY + session teardown).
669    ///
670    /// Returning `None` disables the forced close entirely — shutdown waits
671    /// for every stream to drain naturally. Returning `Some(d)` enforces the
672    /// budget. Default: `Some(Duration::from_secs(5))` (matches the historic
673    /// hard-coded 5 s deadline). Listeners expose the
674    /// `h2_graceful_shutdown_deadline_seconds` knob; value `0` maps to `None`.
675    fn get_h2_graceful_shutdown_deadline(&self) -> Option<std::time::Duration> {
676        Some(std::time::Duration::from_secs(5))
677    }
678}
679
680#[derive(Clone, Copy, Debug, PartialEq, Eq)]
681pub enum BackendConnectionStatus {
682    NotConnected,
683    Connecting(Instant),
684    Connected,
685}
686
687impl BackendConnectionStatus {
688    pub fn is_connecting(&self) -> bool {
689        matches!(self, BackendConnectionStatus::Connecting(_))
690    }
691}
692
693#[derive(Debug, PartialEq, Eq)]
694pub enum BackendConnectAction {
695    New,
696    Reuse,
697    Replace,
698}
699
700#[derive(thiserror::Error, Debug)]
701pub enum BackendConnectionError {
702    #[error("Not found: {0:?}")]
703    NotFound(ObjectKind),
704    #[error("Too many connections on cluster {0:?}")]
705    MaxConnectionRetries(Option<String>),
706    #[error("the sessions slab has reached maximum capacity")]
707    MaxSessionsMemory,
708    #[error("error from the backend: {0}")]
709    Backend(BackendError),
710    #[error("failed to retrieve the cluster: {0}")]
711    RetrieveClusterError(RetrieveClusterError),
712    #[error("maximum number of buffers reached")]
713    MaxBuffers,
714    /// Per-(cluster, source-IP) connection limit reached. The protocol
715    /// layer translates this into HTTP 429 Too Many Requests (with an
716    /// optional `Retry-After`) for HTTP/HTTPS sessions, or a graceful TCP
717    /// close for raw TCP. The `cluster_id` is included so log/metric
718    /// pipelines can attribute the rejection.
719    #[error("per-(cluster, source-IP) connection limit reached for cluster {cluster_id:?}")]
720    TooManyConnectionsPerIp { cluster_id: String },
721}
722
723/// used in kawa_h1 module for the Http session state
724#[derive(thiserror::Error, Debug)]
725pub enum RetrieveClusterError {
726    #[error("No method given")]
727    NoMethod,
728    #[error("No host given")]
729    NoHost,
730    #[error("No path given")]
731    NoPath,
732    #[error("unauthorized route")]
733    UnauthorizedRoute,
734    #[error("{0}")]
735    RetrieveFrontend(FrontendFromRequestError),
736    #[error("HTTPS redirect required")]
737    HttpsRedirect,
738    /// The HTTP `:authority` / `Host` host does not match the TLS SNI that was
739    /// negotiated for this connection, which would cross the TLS trust boundary.
740    /// Maps to HTTP 421 Misdirected Request (RFC 9110 §15.5.20).
741    #[error("TLS SNI {sni:?} does not match HTTP authority {authority:?}")]
742    SniAuthorityMismatch { sni: String, authority: String },
743}
744
745/// Used in sessions
746#[derive(Debug, PartialEq, Eq)]
747pub enum AcceptError {
748    IoError,
749    TooManySessions,
750    WouldBlock,
751    RegisterError,
752    WrongSocketAddress,
753    BufferCapacityReached,
754}
755
756/// returned by the HTTP, HTTPS and TCP listeners
757#[derive(thiserror::Error, Debug)]
758pub enum ListenerError {
759    #[error("failed to handle certificate request, got a resolver error, {0}")]
760    Resolver(CertificateResolverError),
761    #[error("failed to parse pem, {0}")]
762    PemParse(String),
763    #[error("failed to parse template {0:?}: {1}")]
764    TemplateParse(String, TemplateError),
765    #[error("failed to build rustls context, {0}")]
766    BuildRustls(String),
767    #[error("could not activate listener with address {address:?}: {error}")]
768    Activation { address: SocketAddr, error: String },
769    #[error("Could not register listener socket: {0}")]
770    SocketRegistration(std::io::Error),
771    #[error("could not add frontend: {0}")]
772    AddFrontend(RouterError),
773    #[error("could not remove frontend: {0}")]
774    RemoveFrontend(RouterError),
775    #[error("invalid value for field '{field}': {reason}")]
776    InvalidValue {
777        field: &'static str,
778        reason: &'static str,
779    },
780    /// `UpdateHttpsListenerConfig.hsts` was present but its `enabled`
781    /// field was unset. Per the partial-update contract, `enabled` is
782    /// the explicit-disambiguator between "explicit disable" (false) and
783    /// "explicit enable" (true); the patch handler refuses an
784    /// `enabled = None` block rather than silently picking one.
785    #[error(
786        "UpdateHttpsListenerConfig.hsts is present but `enabled` is unset; the partial-update \
787         contract requires `enabled` whenever the `hsts` block is present"
788    )]
789    HstsEnabledRequired,
790}
791
792/// Lift control-plane validation errors into listener-level errors so the
793/// worker can surface the same message without duplicating the match.
794/// Non-`InvalidValue` variants fall back to a generic `InvalidValue` — they
795/// are not expected on the worker's `update_config` path (state lookups
796/// happen on the master) but we avoid panicking if one slips through.
797impl From<sozu_command::state::StateError> for ListenerError {
798    fn from(err: sozu_command::state::StateError) -> Self {
799        match err {
800            sozu_command::state::StateError::InvalidValue { field, reason } => {
801                ListenerError::InvalidValue { field, reason }
802            }
803            _ => ListenerError::InvalidValue {
804                field: "state",
805                reason: "unexpected state error on worker path",
806            },
807        }
808    }
809}
810
811/// Returned by the HTTP, HTTPS and TCP proxies
812#[derive(thiserror::Error, Debug)]
813pub enum ProxyError {
814    #[error("error while soft stopping {proxy_protocol} proxy: {error}")]
815    SoftStop {
816        proxy_protocol: String,
817        error: String,
818    },
819    #[error("error while hard stopping {proxy_protocol} proxy: {error}")]
820    HardStop {
821        proxy_protocol: String,
822        error: String,
823    },
824    #[error("found no listener with address {0:?}")]
825    NoListenerFound(SocketAddr),
826    #[error("a listener is already present for this token")]
827    ListenerAlreadyPresent,
828    #[error("could not add listener: {0}")]
829    AddListener(ListenerError),
830    #[error("could not add cluster: {0}")]
831    AddCluster(ListenerError),
832    #[error("failed to activate listener with address {address:?}: {listener_error}")]
833    ListenerActivation {
834        address: SocketAddr,
835        listener_error: ListenerError,
836    },
837    #[error("can not add frontend {front:?}: {error}")]
838    WrongInputFrontend {
839        front: Box<RequestHttpFrontend>,
840        error: String,
841    },
842    #[error("could not add frontend: {0}")]
843    AddFrontend(ListenerError),
844    #[error("could not remove frontend: {0}")]
845    RemoveFrontend(ListenerError),
846    #[error("could not add certificate: {0}")]
847    AddCertificate(CertificateResolverError),
848    #[error("could not remove certificate: {0}")]
849    RemoveCertificate(CertificateResolverError),
850    #[error("could not replace certificate: {0}")]
851    ReplaceCertificate(CertificateResolverError),
852    #[error("wrong certificate fingerprint: {0}")]
853    WrongCertificateFingerprint(FromHexError),
854    #[error("this request is not supported by the proxy")]
855    UnsupportedMessage,
856    #[error("failed to acquire the lock, {0}")]
857    Lock(String),
858    #[error("could not bind to socket {0:?}: {1}")]
859    BindToSocket(SocketAddr, ServerBindError),
860    #[error("error registering socket of listener: {0}")]
861    RegisterListener(std::io::Error),
862    #[error("the listener is not activated")]
863    UnactivatedListener,
864    /// HSTS (RFC 6797) was attached to a frontend on a plain-HTTP
865    /// listener. RFC 6797 §7.2 forbids `Strict-Transport-Security` on
866    /// plaintext-HTTP responses; the worker rejects the request rather
867    /// than ship a non-conformant policy. The TOML loader rejects the
868    /// same shape at config-load time
869    /// (`command/src/config.rs::ConfigError::HstsOnPlainHttp`); this
870    /// arm catches the same misconfiguration when the request reaches
871    /// the worker over the IPC channel without going through the TOML
872    /// path (e.g. via `sozu frontend http add`).
873    #[error(
874        "HSTS is only valid on HTTPS frontends; rejecting AddHttpFrontend with hsts.enabled = \
875         true on address {0:?} (RFC 6797 §7.2)"
876    )]
877    HstsOnPlainHttp(SocketAddr),
878}
879
880use self::server::ListenToken;
881pub trait ProxyConfiguration {
882    fn notify(&mut self, message: WorkerRequest) -> WorkerResponse;
883    fn accept(&mut self, token: ListenToken) -> Result<TcpStream, AcceptError>;
884    fn create_session(
885        &mut self,
886        socket: TcpStream,
887        token: ListenToken,
888        wait_time: Duration,
889        proxy: Rc<RefCell<Self>>,
890        // should we insert the tags here?
891    ) -> Result<(), AcceptError>;
892}
893
894pub trait L7Proxy {
895    fn kind(&self) -> ListenerType;
896
897    fn register_socket(
898        &self,
899        socket: &mut TcpStream,
900        token: Token,
901        interest: Interest,
902    ) -> Result<(), std::io::Error>;
903
904    fn deregister_socket(&self, tcp_stream: &mut TcpStream) -> Result<(), std::io::Error>;
905
906    fn add_session(&self, session: Rc<RefCell<dyn ProxySession>>) -> Token;
907
908    /// Remove the session from the session manager slab.
909    /// Returns true if the session was actually there before deletion
910    fn remove_session(&self, token: Token) -> bool;
911
912    fn backends(&self) -> Rc<RefCell<BackendMap>>;
913
914    fn clusters(&self) -> &HashMap<ClusterId, Cluster>;
915
916    /// Access the worker's [`SessionManager`] for per-(cluster, source-IP)
917    /// connection-limit accounting. The mux uses this to track / untrack
918    /// stream-granular `(cluster_id, ip)` entries and consult the
919    /// `cluster_ip_at_limit` gate before each backend connect.
920    fn sessions(&self) -> Rc<RefCell<crate::server::SessionManager>>;
921}
922
923#[derive(Debug, PartialEq, Eq)]
924pub enum RequiredEvents {
925    FrontReadBackNone,
926    FrontWriteBackNone,
927    FrontReadWriteBackNone,
928    FrontNoneBackNone,
929    FrontReadBackRead,
930    FrontWriteBackRead,
931    FrontReadWriteBackRead,
932    FrontNoneBackRead,
933    FrontReadBackWrite,
934    FrontWriteBackWrite,
935    FrontReadWriteBackWrite,
936    FrontNoneBackWrite,
937    FrontReadBackReadWrite,
938    FrontWriteBackReadWrite,
939    FrontReadWriteBackReadWrite,
940    FrontNoneBackReadWrite,
941}
942
943impl RequiredEvents {
944    pub fn front_readable(&self) -> bool {
945        matches!(
946            *self,
947            RequiredEvents::FrontReadBackNone
948                | RequiredEvents::FrontReadWriteBackNone
949                | RequiredEvents::FrontReadBackRead
950                | RequiredEvents::FrontReadWriteBackRead
951                | RequiredEvents::FrontReadBackWrite
952                | RequiredEvents::FrontReadWriteBackWrite
953                | RequiredEvents::FrontReadBackReadWrite
954                | RequiredEvents::FrontReadWriteBackReadWrite
955        )
956    }
957
958    pub fn front_writable(&self) -> bool {
959        matches!(
960            *self,
961            RequiredEvents::FrontWriteBackNone
962                | RequiredEvents::FrontReadWriteBackNone
963                | RequiredEvents::FrontWriteBackRead
964                | RequiredEvents::FrontReadWriteBackRead
965                | RequiredEvents::FrontWriteBackWrite
966                | RequiredEvents::FrontReadWriteBackWrite
967                | RequiredEvents::FrontWriteBackReadWrite
968                | RequiredEvents::FrontReadWriteBackReadWrite
969        )
970    }
971
972    pub fn back_readable(&self) -> bool {
973        matches!(
974            *self,
975            RequiredEvents::FrontReadBackRead
976                | RequiredEvents::FrontWriteBackRead
977                | RequiredEvents::FrontReadWriteBackRead
978                | RequiredEvents::FrontNoneBackRead
979                | RequiredEvents::FrontReadBackReadWrite
980                | RequiredEvents::FrontWriteBackReadWrite
981                | RequiredEvents::FrontReadWriteBackReadWrite
982                | RequiredEvents::FrontNoneBackReadWrite
983        )
984    }
985
986    pub fn back_writable(&self) -> bool {
987        matches!(
988            *self,
989            RequiredEvents::FrontReadBackWrite
990                | RequiredEvents::FrontWriteBackWrite
991                | RequiredEvents::FrontReadWriteBackWrite
992                | RequiredEvents::FrontNoneBackWrite
993                | RequiredEvents::FrontReadBackReadWrite
994                | RequiredEvents::FrontWriteBackReadWrite
995                | RequiredEvents::FrontReadWriteBackReadWrite
996                | RequiredEvents::FrontNoneBackReadWrite
997        )
998    }
999}
1000
1001/// Signals transitions between states of a given Protocol
1002#[derive(Debug, PartialEq, Eq)]
1003pub enum StateResult {
1004    /// Signals to the Protocol to close its backend
1005    CloseBackend,
1006    /// Signals to the parent Session to close itself
1007    CloseSession,
1008    /// Signals to the Protocol to connect to backend
1009    ConnectBackend,
1010    /// Signals to the Protocol to continue
1011    Continue,
1012    /// Signals to the parent Session to upgrade to the next Protocol
1013    Upgrade,
1014}
1015
1016/// Signals transitions between states of a given Session
1017#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1018pub enum SessionResult {
1019    /// Signals to the Session to close itself
1020    Close,
1021    /// Signals to the Session to continue
1022    Continue,
1023    /// Signals to the Session to upgrade its Protocol
1024    Upgrade,
1025}
1026
1027#[derive(Debug, PartialEq, Eq)]
1028pub enum SocketType {
1029    Listener,
1030    FrontClient,
1031}
1032
1033type SessionIsToBeClosed = bool;
1034
1035#[derive(Clone)]
1036pub struct Readiness {
1037    /// the current readiness
1038    pub event: Ready,
1039    /// the readiness we wish to attain
1040    pub interest: Ready,
1041}
1042
1043impl Display for Readiness {
1044    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1045        let i = &mut [b'-'; 4];
1046        let r = &mut [b'-'; 4];
1047        let mixed = &mut [b'-'; 4];
1048
1049        display_ready(i, self.interest);
1050        display_ready(r, self.event);
1051        display_ready(mixed, self.interest & self.event);
1052
1053        write!(
1054            f,
1055            "I({:?})&R({:?})=M({:?})",
1056            String::from_utf8_lossy(i),
1057            String::from_utf8_lossy(r),
1058            String::from_utf8_lossy(mixed)
1059        )
1060    }
1061}
1062
1063impl Default for Readiness {
1064    fn default() -> Self {
1065        Self::new()
1066    }
1067}
1068
1069impl Readiness {
1070    /// Mask of every bit `Ready` defines (READABLE | WRITABLE | ERROR | HUP).
1071    /// Any bit outside this set in `event` or `interest` is a corrupted
1072    /// readiness word — checked by [`Self::check_invariants`]. Not
1073    /// `#[cfg(debug_assertions)]`-gated: it is read from inside `debug_assert!`s
1074    /// whose arguments must still compile in release (HARD RULE 2 / E0425).
1075    const KNOWN_BITS: Ready =
1076        Ready(Ready::READABLE.0 | Ready::WRITABLE.0 | Ready::ERROR.0 | Ready::HUP.0);
1077
1078    pub const fn new() -> Readiness {
1079        Readiness {
1080            event: Ready::EMPTY,
1081            interest: Ready::EMPTY,
1082        }
1083    }
1084
1085    /// Cross-field invariant sweep: neither `event` nor `interest` may carry a
1086    /// bit `Ready` does not define. A stray bit would silently widen
1087    /// `filter_interest`'s mask and wake (or starve) a session on a phantom
1088    /// readiness. Cheap enough to call as a postcondition from every mutator.
1089    #[cfg(debug_assertions)]
1090    fn check_invariants(&self) {
1091        debug_assert_eq!(
1092            self.event & Self::KNOWN_BITS,
1093            self.event,
1094            "Readiness.event carries a bit outside READABLE|WRITABLE|ERROR|HUP"
1095        );
1096        debug_assert_eq!(
1097            self.interest & Self::KNOWN_BITS,
1098            self.interest,
1099            "Readiness.interest carries a bit outside READABLE|WRITABLE|ERROR|HUP"
1100        );
1101    }
1102
1103    pub fn reset(&mut self) {
1104        self.event = Ready::EMPTY;
1105        self.interest = Ready::EMPTY;
1106        // Post-condition: a reset clears *both* words — a half-reset leaves a
1107        // session armed on stale interest after teardown.
1108        debug_assert!(
1109            self.event.is_empty() && self.interest.is_empty(),
1110            "reset must clear both event and interest"
1111        );
1112        #[cfg(debug_assertions)]
1113        self.check_invariants();
1114    }
1115
1116    /// filters the readiness we actually want
1117    pub fn filter_interest(&self) -> Ready {
1118        // Pre-condition: both source words must be well-formed before we mask —
1119        // a stray bit upstream would leak through the intersection and wake (or
1120        // starve) a session on a phantom readiness.
1121        #[cfg(debug_assertions)]
1122        self.check_invariants();
1123        let filtered = self.event & self.interest;
1124        // Post-condition: the result is a subset of the recognized bits and can
1125        // only contain bits the session both saw AND asked for.
1126        debug_assert_eq!(
1127            filtered & Self::KNOWN_BITS,
1128            filtered,
1129            "filter_interest must not yield an unknown bit"
1130        );
1131        debug_assert!(
1132            self.interest.contains(filtered) && self.event.contains(filtered),
1133            "filtered readiness must be present in both interest and event"
1134        );
1135        filtered
1136    }
1137
1138    /// Signal that the socket has buffered data to write (e.g., TLS internal
1139    /// buffers) that won't generate a new epoll WRITABLE event.
1140    pub fn signal_pending_write(&mut self) {
1141        // Snapshot the unrelated bits (everything but WRITABLE) so we can prove
1142        // we flipped exactly the WRITABLE bit. `Ready` exposes no `!`, so mask on
1143        // the public `.0` word.
1144        let other_event_before = Ready(self.event.0 & !Ready::WRITABLE.0);
1145        self.event.insert(Ready::WRITABLE);
1146        debug_assert!(
1147            self.event.is_writable(),
1148            "signal_pending_write must set the WRITABLE event bit"
1149        );
1150        debug_assert_eq!(
1151            Ready(self.event.0 & !Ready::WRITABLE.0),
1152            other_event_before,
1153            "signal_pending_write must touch only the WRITABLE bit"
1154        );
1155        #[cfg(debug_assertions)]
1156        self.check_invariants();
1157    }
1158
1159    /// Signal that the socket has buffered data to read (e.g., TLS plaintext
1160    /// buffer after a 1xx clear) that won't generate a new epoll READABLE event.
1161    pub fn signal_pending_read(&mut self) {
1162        let other_event_before = Ready(self.event.0 & !Ready::READABLE.0);
1163        self.event.insert(Ready::READABLE);
1164        debug_assert!(
1165            self.event.is_readable(),
1166            "signal_pending_read must set the READABLE event bit"
1167        );
1168        debug_assert_eq!(
1169            Ready(self.event.0 & !Ready::READABLE.0),
1170            other_event_before,
1171            "signal_pending_read must touch only the READABLE bit"
1172        );
1173        #[cfg(debug_assertions)]
1174        self.check_invariants();
1175    }
1176
1177    /// Pair `Ready::WRITABLE` insert with `signal_pending_write` — the canonical
1178    /// invariant-15 form for any path that writes bytes to sozu-owned buffers
1179    /// under edge-triggered epoll. See `lib/src/protocol/mux/LIFECYCLE.md`.
1180    #[inline]
1181    pub fn arm_writable(&mut self) {
1182        // Snapshot the non-WRITABLE bits of both words: arm_writable must set the
1183        // WRITABLE bit in *both* interest and event and leave everything else as-is.
1184        let other_interest_before = Ready(self.interest.0 & !Ready::WRITABLE.0);
1185        let other_event_before = Ready(self.event.0 & !Ready::WRITABLE.0);
1186        self.interest.insert(Ready::WRITABLE);
1187        self.signal_pending_write();
1188        debug_assert!(
1189            self.interest.is_writable() && self.event.is_writable(),
1190            "arm_writable must set WRITABLE in both interest and event"
1191        );
1192        debug_assert_eq!(
1193            Ready(self.interest.0 & !Ready::WRITABLE.0),
1194            other_interest_before,
1195            "arm_writable must touch only the WRITABLE interest bit"
1196        );
1197        debug_assert_eq!(
1198            Ready(self.event.0 & !Ready::WRITABLE.0),
1199            other_event_before,
1200            "arm_writable must touch only the WRITABLE event bit"
1201        );
1202        #[cfg(debug_assertions)]
1203        self.check_invariants();
1204    }
1205}
1206
1207#[cfg(test)]
1208mod readiness_tests {
1209    use super::{Readiness, Ready};
1210
1211    #[test]
1212    fn arm_writable_sets_interest_and_event() {
1213        let mut r = Readiness::new();
1214        r.arm_writable();
1215        assert!(r.interest.is_writable());
1216        assert!(r.event.is_writable());
1217    }
1218
1219    #[test]
1220    fn arm_writable_is_idempotent() {
1221        let mut r = Readiness::new();
1222        r.arm_writable();
1223        r.arm_writable();
1224        assert_eq!(r.interest, Ready::WRITABLE);
1225        assert_eq!(r.event, Ready::WRITABLE);
1226    }
1227}
1228
1229pub fn display_ready(s: &mut [u8], readiness: Ready) {
1230    if readiness.is_readable() {
1231        s[0] = b'R';
1232    }
1233    if readiness.is_writable() {
1234        s[1] = b'W';
1235    }
1236    if readiness.is_error() {
1237        s[2] = b'E';
1238    }
1239    if readiness.is_hup() {
1240        s[3] = b'H';
1241    }
1242}
1243
1244pub fn ready_to_string(readiness: Ready) -> String {
1245    let s = &mut [b'-'; 4];
1246    display_ready(s, readiness);
1247    String::from_utf8(s.to_vec()).unwrap()
1248}
1249
1250impl fmt::Debug for Readiness {
1251    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1252        let i = &mut [b'-'; 4];
1253        let r = &mut [b'-'; 4];
1254        let mixed = &mut [b'-'; 4];
1255
1256        display_ready(i, self.interest);
1257        display_ready(r, self.event);
1258        display_ready(mixed, self.interest & self.event);
1259
1260        write!(
1261            f,
1262            "Readiness {{ interest: {}, readiness: {}, mixed: {} }}",
1263            str::from_utf8(i).unwrap(),
1264            str::from_utf8(r).unwrap(),
1265            str::from_utf8(mixed).unwrap()
1266        )
1267    }
1268}
1269
1270#[derive(Clone, Debug)]
1271pub struct SessionMetrics {
1272    /// date at which we started handling that request
1273    pub start: Option<Instant>,
1274    /// wall-clock timestamp captured alongside `start`, for access-log
1275    /// consumers that need an absolute start time (e.g. OTel span
1276    /// reconstruction) without subtracting a monotonic duration from a
1277    /// wall-clock end time — which mixes two unsynchronised clock sources.
1278    pub start_wall: Option<SystemTime>,
1279    /// time actually spent handling the request
1280    pub service_time: Duration,
1281    /// time spent waiting for its turn
1282    pub wait_time: Duration,
1283    /// bytes received by the frontend
1284    pub bin: usize,
1285    /// bytes sent by the frontend
1286    pub bout: usize,
1287
1288    /// date at which we started working on the request
1289    pub service_start: Option<Instant>,
1290    pub wait_start: Instant,
1291
1292    pub backend_id: Option<String>,
1293    pub backend_start: Option<Instant>,
1294    pub backend_connected: Option<Instant>,
1295    pub backend_stop: Option<Instant>,
1296    pub backend_bin: usize,
1297    pub backend_bout: usize,
1298}
1299
1300impl SessionMetrics {
1301    pub fn new(wait_time: Option<Duration>) -> SessionMetrics {
1302        SessionMetrics {
1303            start: Some(Instant::now()),
1304            start_wall: Some(SystemTime::now()),
1305            service_time: Duration::from_secs(0),
1306            wait_time: wait_time.unwrap_or_else(|| Duration::from_secs(0)),
1307            bin: 0,
1308            bout: 0,
1309            service_start: None,
1310            wait_start: Instant::now(),
1311            backend_id: None,
1312            backend_start: None,
1313            backend_connected: None,
1314            backend_stop: None,
1315            backend_bin: 0,
1316            backend_bout: 0,
1317        }
1318    }
1319
1320    pub fn reset(&mut self) {
1321        self.start = None;
1322        self.start_wall = None;
1323        self.service_time = Duration::from_secs(0);
1324        self.wait_time = Duration::from_secs(0);
1325        self.bin = 0;
1326        self.bout = 0;
1327        self.service_start = None;
1328        self.backend_start = None;
1329        self.backend_connected = None;
1330        self.backend_stop = None;
1331        self.backend_bin = 0;
1332        self.backend_bout = 0;
1333    }
1334
1335    pub fn service_start(&mut self) {
1336        let now = if self.start.is_none() {
1337            self.mark_request_start()
1338        } else {
1339            Instant::now()
1340        };
1341        self.service_start = Some(now);
1342        self.wait_time += now - self.wait_start;
1343    }
1344
1345    pub fn service_stop(&mut self) {
1346        if let Some(start) = self.service_start.take() {
1347            let duration = Instant::now() - start;
1348            self.service_time += duration;
1349        }
1350    }
1351
1352    pub fn wait_start(&mut self) {
1353        self.wait_start = Instant::now();
1354    }
1355
1356    pub fn service_time(&self) -> Duration {
1357        match self.service_start {
1358            Some(start) => {
1359                let last_duration = Instant::now() - start;
1360                self.service_time + last_duration
1361            }
1362            None => self.service_time,
1363        }
1364    }
1365
1366    /// Arm both the monotonic and wall-clock start timestamps together.
1367    /// This must be the single place that sets `start` + `start_wall` outside
1368    /// of `new()`, so the two fields can never desynchronize.
1369    /// Returns the monotonic instant so callers that need it (e.g.
1370    /// `service_start`) can reuse it without a second syscall.
1371    pub fn mark_request_start(&mut self) -> Instant {
1372        let now = Instant::now();
1373        self.start = Some(now);
1374        self.start_wall = Some(SystemTime::now());
1375        now
1376    }
1377
1378    /// time elapsed since the beginning of the session
1379    pub fn request_time(&self) -> Duration {
1380        match self.start {
1381            Some(start) => Instant::now() - start,
1382            None => Duration::from_secs(0),
1383        }
1384    }
1385
1386    /// Wall-clock start time as nanoseconds since the Unix epoch, or `None`
1387    /// if the monotonic start has not been set yet (post-`reset()`, pre-`service_start()`).
1388    pub fn start_wall_ns(&self) -> Option<i128> {
1389        self.start_wall.and_then(|t| {
1390            t.duration_since(SystemTime::UNIX_EPOCH)
1391                .ok()
1392                .map(|d| d.as_nanos() as i128)
1393        })
1394    }
1395
1396    pub fn backend_start(&mut self) {
1397        self.backend_start = Some(Instant::now());
1398    }
1399
1400    pub fn backend_connected(&mut self) {
1401        self.backend_connected = Some(Instant::now());
1402    }
1403
1404    pub fn backend_stop(&mut self) {
1405        self.backend_stop = Some(Instant::now());
1406    }
1407
1408    pub fn backend_response_time(&self) -> Option<Duration> {
1409        match (self.backend_connected, self.backend_stop) {
1410            (Some(start), Some(end)) => Some(end - start),
1411            (Some(start), None) => Some(Instant::now() - start),
1412            _ => None,
1413        }
1414    }
1415
1416    pub fn backend_connection_time(&self) -> Option<Duration> {
1417        match (self.backend_start, self.backend_connected) {
1418            (Some(start), Some(end)) => Some(end - start),
1419            _ => None,
1420        }
1421    }
1422
1423    pub fn register_end_of_session(&self, context: &LogContext) {
1424        let request_time = self.request_time();
1425        let service_time = self.service_time();
1426
1427        if let Some(cluster_id) = context.cluster_id {
1428            time!(
1429                names::event_loop::REQUEST_TIME,
1430                cluster_id,
1431                request_time.as_millis()
1432            );
1433            time!(
1434                names::event_loop::SERVICE_TIME,
1435                cluster_id,
1436                service_time.as_millis()
1437            );
1438        }
1439        time!(names::event_loop::REQUEST_TIME, request_time.as_millis());
1440        time!(names::event_loop::SERVICE_TIME, service_time.as_millis());
1441
1442        if let Some(backend_id) = self.backend_id.as_ref() {
1443            if let Some(backend_response_time) = self.backend_response_time() {
1444                record_backend_metrics!(
1445                    context.cluster_id.as_str_or("-"),
1446                    backend_id,
1447                    backend_response_time.as_millis(),
1448                    self.backend_connection_time(),
1449                    self.backend_bin,
1450                    self.backend_bout
1451                );
1452            }
1453        }
1454
1455        incr!(
1456            names::access_logs::COUNT,
1457            context.cluster_id,
1458            context.backend_id
1459        );
1460    }
1461}
1462
1463/// exponentially weighted moving average with high sensibility to latency bursts
1464///
1465/// cf Finagle for the original implementation: <https://github.com/twitter/finagle/blob/9cc08d15216497bb03a1cafda96b7266cfbbcff1/finagle-core/src/main/scala/com/twitter/finagle/loadbalancer/PeakEwma.scala>
1466#[derive(Debug, PartialEq, Clone)]
1467pub struct PeakEWMA {
1468    /// decay in nanoseconds
1469    ///
1470    /// higher values will make the EWMA decay slowly to 0
1471    pub decay: f64,
1472    /// estimated RTT in nanoseconds
1473    ///
1474    /// must be set to a high enough default value so that new backends do not
1475    /// get all the traffic right away
1476    pub rtt: f64,
1477    /// last modification
1478    pub last_event: Instant,
1479}
1480
1481impl Default for PeakEWMA {
1482    fn default() -> Self {
1483        Self::new()
1484    }
1485}
1486
1487impl PeakEWMA {
1488    // hardcoded default values for now
1489    pub fn new() -> Self {
1490        PeakEWMA {
1491            // 1s
1492            decay: 1_000_000_000f64,
1493            // 50ms
1494            rtt: 50_000_000f64,
1495            last_event: Instant::now(),
1496        }
1497    }
1498
1499    pub fn observe(&mut self, rtt: f64) {
1500        let now = Instant::now();
1501        let dur = now - self.last_event;
1502
1503        // if latency is rising, we will immediately raise the cost
1504        if rtt > self.rtt {
1505            self.rtt = rtt;
1506        } else {
1507            // new_rtt = old_rtt * e^(-elapsed/decay) + observed_rtt * (1 - e^(-elapsed/decay))
1508            let weight = (-(dur.as_nanos() as f64) / self.decay).exp();
1509            self.rtt = self.rtt * weight + rtt * (1.0 - weight);
1510        }
1511
1512        self.last_event = now;
1513    }
1514
1515    pub fn get(&mut self, active_requests: usize) -> f64 {
1516        // decay the current value
1517        // (we might not have seen a request in a long time)
1518        self.observe(0.0);
1519
1520        (active_requests + 1) as f64 * self.rtt
1521    }
1522}
1523
1524pub mod testing {
1525    pub use std::{cell::RefCell, os::fd::IntoRawFd, rc::Rc};
1526
1527    pub use anyhow::Context;
1528    pub use mio::{Poll, Registry, Token, net::UnixStream};
1529    pub use slab::Slab;
1530    pub use sozu_command::{
1531        proto::command::{
1532            HttpListenerConfig, HttpsListenerConfig, ServerConfig, TcpListenerConfig,
1533        },
1534        scm_socket::{Listeners, ScmSocket},
1535    };
1536
1537    pub use crate::{
1538        Protocol, ProxySession,
1539        backends::BackendMap,
1540        http::HttpProxy,
1541        https::HttpsProxy,
1542        pool::Pool,
1543        server::{ListenSession, ProxyChannel, Server, SessionManager},
1544        tcp::TcpProxy,
1545    };
1546
1547    use std::sync::atomic::{AtomicU16, Ordering};
1548
1549    /// Port counter for sozu listener addresses in lib tests.
1550    /// Starts at 10000 to avoid collision with:
1551    /// - Privileged ports (<1024)
1552    /// - e2e suite (starts at 2000)
1553    /// - Ephemeral port range (typically 32768+)
1554    static PORT_PROVIDER: AtomicU16 = AtomicU16::new(10000);
1555
1556    /// Get a unique port for a sozu listener address.
1557    /// Each call returns a different port, safe for parallel test execution.
1558    pub fn provide_port() -> u16 {
1559        PORT_PROVIDER.fetch_add(1, Ordering::SeqCst)
1560    }
1561
1562    /// Everything needed to create a Server
1563    pub struct ServerParts {
1564        pub event_loop: Poll,
1565        pub registry: Registry,
1566        pub sessions: Rc<RefCell<SessionManager>>,
1567        pub pool: Rc<RefCell<Pool>>,
1568        pub backends: Rc<RefCell<BackendMap>>,
1569        pub client_scm_socket: ScmSocket,
1570        pub server_scm_socket: ScmSocket,
1571        pub server_config: ServerConfig,
1572    }
1573
1574    /// Setup a standalone server, for testing purposes
1575    pub fn prebuild_server(
1576        max_buffers: usize,
1577        buffer_size: usize,
1578        send_scm: bool,
1579    ) -> anyhow::Result<ServerParts> {
1580        let event_loop = Poll::new().with_context(|| "Failed at creating event loop")?;
1581        let backends = Rc::new(RefCell::new(BackendMap::new()));
1582        let server_config = ServerConfig {
1583            max_connections: max_buffers as u64,
1584            ..Default::default()
1585        };
1586
1587        let pool = Rc::new(RefCell::new(Pool::with_capacity(
1588            1,
1589            max_buffers,
1590            buffer_size,
1591        )));
1592
1593        let mut sessions: Slab<Rc<RefCell<dyn ProxySession>>> = Slab::with_capacity(max_buffers);
1594        {
1595            let entry = sessions.vacant_entry();
1596            info!("taking token {:?} for channel", entry.key());
1597            entry.insert(Rc::new(RefCell::new(ListenSession {
1598                protocol: Protocol::Channel,
1599            })));
1600        }
1601        {
1602            let entry = sessions.vacant_entry();
1603            info!("taking token {:?} for timer", entry.key());
1604            entry.insert(Rc::new(RefCell::new(ListenSession {
1605                protocol: Protocol::Timer,
1606            })));
1607        }
1608        {
1609            let entry = sessions.vacant_entry();
1610            info!("taking token {:?} for metrics", entry.key());
1611            entry.insert(Rc::new(RefCell::new(ListenSession {
1612                protocol: Protocol::Metrics,
1613            })));
1614        }
1615        // Test fixture: feature disabled (max_connections_per_ip = 0).
1616        let sessions = SessionManager::new(sessions, max_buffers, 0, 0);
1617
1618        let registry = event_loop
1619            .registry()
1620            .try_clone()
1621            .with_context(|| "Failed at creating a registry")?;
1622
1623        let (scm_server, scm_client) =
1624            UnixStream::pair().with_context(|| "Failed at creating scm unix stream")?;
1625        let client_scm_socket = ScmSocket::new(scm_client.into_raw_fd())
1626            .with_context(|| "Failed at creating the scm client socket")?;
1627        let server_scm_socket = ScmSocket::new(scm_server.into_raw_fd())
1628            .with_context(|| "Failed at creating the scm server socket")?;
1629        if send_scm {
1630            client_scm_socket
1631                .send_listeners(&Listeners::default())
1632                .with_context(|| "Failed at sending empty listeners")?;
1633        }
1634
1635        Ok(ServerParts {
1636            event_loop,
1637            registry,
1638            sessions,
1639            pool,
1640            backends,
1641            client_scm_socket,
1642            server_scm_socket,
1643            server_config,
1644        })
1645    }
1646}