sozu_lib/lib.rs
1//! ## What this library does
2//!
3//! This library provides tools to build and start HTTP, HTTPS and TCP reverse proxies.
4//!
//! The proxies handle network polling, HTTP parsing and TLS in a fast, single-threaded event
//! loop.
7//!
8//! Each proxy is designed to receive configuration changes at runtime instead of
9//! reloading from a file regularly. The event loop runs in its own thread
10//! and receives commands through a message queue.
11//!
12//! ## Difference with the crate `sozu`
13//!
//! To create several workers and manage them all at once (which is the most common way to
//! use Sōzu), the crate `sozu` is better suited than using this library directly.
16//!
17//! The crate `sozu` provides a binary called the main process.
18//! The main process uses `sozu_lib` to start and manage workers.
19//! Each worker can handle HTTP, HTTPS and TCP traffic.
//! The main process synchronizes the state of all workers, using UNIX sockets
//! and custom channels to communicate with them.
//! The main process itself is configurable with a file, and has a CLI.
23//!
24//! ## How to use this library directly
25//!
//! This documentation explains how to write a binary that starts a single Sōzu
//! worker and gives it orders. The method has two steps:
//!
//! 1. Start a Sōzu worker in a distinct thread
//! 2. Send instructions to the worker on a UNIX socket via a Sōzu channel
31//!
32//! ### How to start a Sōzu worker
33//!
34//! Before creating an HTTP proxy, we first need to create an HTTP listener.
35//! The listener is an abstraction around a TCP socket provided by the kernel.
36//! We need the `sozu_command_lib` to build a listener.
37//!
38//! ```
39//! use sozu_command_lib::{config::ListenerBuilder, proto::command::SocketAddress};
40//!
41//! let address = SocketAddress::new_v4(127,0,0,1,8080);
42//! let http_listener = ListenerBuilder::new_http(address)
43//! .to_http(None)
44//! .expect("Could not create HTTP listener");
45//! ```
46//!
//! The `http_listener` is of the type `HttpListenerConfig`, which can be sent to the worker
//! to start the proxy.
//!
//! Then create a pair of channels to communicate with the proxy.
//! A channel is a wrapper around a UNIX socket.
52//!
53//! ```ignore
54//! use sozu_command_lib::{
55//! channel::Channel,
56//! proto::command::{WorkerRequest, WorkerResponse},
57//! };
58//!
59//! let (mut command_channel, proxy_channel): (
60//! Channel<WorkerRequest, WorkerResponse>,
61//! Channel<WorkerResponse, WorkerRequest>,
62//! ) = Channel::generate(1000, 10000).expect("should create a channel");
//! ```
64//!
//! Here, the `command_channel` end is blocking: it sends `WorkerRequest`s and receives
//! `WorkerResponse`s. The `proxy_channel` end is non-blocking, and its types are reversed.
//! Writing out the types is not strictly necessary, since the compiler can infer them,
//! but it gets the point across.
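//!
//! As a rough, standard-library-only illustration of what a blocking and a non-blocking
//! end mean, here is a minimal sketch on a raw UNIX socket pair. It does not use the Sōzu
//! `Channel` API; the helpers `socket_pair` and `try_read` are hypothetical names:
//!
//! ```rust
//! use std::io::{ErrorKind, Read, Write};
//! use std::os::unix::net::UnixStream;
//!
//! // Hypothetical helper: creates a connected socket pair and switches
//! // the second end to non-blocking mode.
//! fn socket_pair() -> std::io::Result<(UnixStream, UnixStream)> {
//!     let (blocking_end, non_blocking_end) = UnixStream::pair()?;
//!     non_blocking_end.set_nonblocking(true)?;
//!     Ok((blocking_end, non_blocking_end))
//! }
//!
//! // Hypothetical helper: tries to read once and returns `None`
//! // when no data is available yet, instead of waiting.
//! fn try_read(socket: &mut UnixStream) -> std::io::Result<Option<Vec<u8>>> {
//!     let mut buffer = [0u8; 64];
//!     match socket.read(&mut buffer) {
//!         Ok(size) => Ok(Some(buffer[..size].to_vec())),
//!         Err(error) if error.kind() == ErrorKind::WouldBlock => Ok(None),
//!         Err(error) => Err(error),
//!     }
//! }
//!
//! fn main() -> std::io::Result<()> {
//!     let (mut command_end, mut proxy_end) = socket_pair()?;
//!
//!     // nothing was sent yet: the non-blocking end returns immediately
//!     assert_eq!(try_read(&mut proxy_end)?, None);
//!
//!     // once the other end has written, the data can be read
//!     command_end.write_all(b"ping")?;
//!     assert_eq!(try_read(&mut proxy_end)?, Some(b"ping".to_vec()));
//!     Ok(())
//! }
//! ```
//!
//! A Sōzu `Channel` adds typed messages on top of such a socket.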
69//!
70//! You can now launch the worker in a separate thread, providing the HTTP listener config,
71//! the proxy end of the channel, and your custom number of buffers and their size:
72//!
73//! ```ignore
74//! use std::thread;
75//!
//! let worker_thread_join_handle = thread::spawn(move || {
//!     let max_buffers = 500;
//!     let buffer_size = 16384;
//!     sozu_lib::http::testing::start_http_worker(http_listener, proxy_channel, max_buffers, buffer_size)
//!         .expect("The worker could not be started, or shut down");
//! });
81//! ```
82//!
83//! ### Send orders
84//!
85//! Once the thread is launched, the proxy worker will start its event loop and handle
86//! events on the listening interface and port specified when building the HTTP Listener.
87//! Since no frontends or backends were specified for the proxy, it will receive
88//! the connections, parse the requests, then send a default (but configurable)
89//! answer.
90//!
91//! Before defining a frontend and backends, we need to define a cluster, which describes
92//! a routing configuration. A cluster contains:
93//!
94//! - one frontend
95//! - one or several backends
96//! - routing rules
97//!
98//! A cluster is identified by its `cluster_id`, which will be used to define frontends
99//! and backends later on.
100//!
101//! ```
102//! use sozu_command_lib::proto::command::{Cluster, LoadBalancingAlgorithms};
103//!
104//! let cluster = Cluster {
105//! cluster_id: "my-cluster".to_string(),
106//! sticky_session: false,
107//! https_redirect: false,
108//! load_balancing: LoadBalancingAlgorithms::RoundRobin as i32,
109//! answer_503: Some("A custom forbidden message".to_string()),
110//! ..Default::default()
111//! };
112//! ```
113//!
114//! The defaults are sensible, so we could define only the `cluster_id`.
115//!
116//! We can now define a frontend. A frontend is a way to recognize a request and match
117//! it to a `cluster_id`, depending on the hostname and the beginning of the URL path.
118//! The `address` field must match the one of the HTTP listener we defined before:
119//!
120//! ```
121//! use std::collections::BTreeMap;
122//!
123//! use sozu_command_lib::proto::command::{PathRule, RequestHttpFrontend, RulePosition, SocketAddress};
124//!
125//! let http_front = RequestHttpFrontend {
126//! cluster_id: Some("my-cluster".to_string()),
127//! address: SocketAddress::new_v4(127,0,0,1,8080),
128//! hostname: "example.com".to_string(),
129//! path: PathRule::prefix(String::from("/")),
130//! position: RulePosition::Pre.into(),
131//! tags: BTreeMap::from([
132//! ("owner".to_owned(), "John".to_owned()),
133//! ("id".to_owned(), "my-own-http-front".to_owned()),
134//! ]),
135//! ..Default::default()
136//! };
137//! ```
138//!
139//! The `tags` are keys and values that will appear in the access logs,
140//! which can come in handy.
141//!
142//! Now let's define a backend.
143//! A backend is an instance of a backend application we want to route traffic to.
144//! The `address` field must match the IP and port of the backend server.
145//!
146//! ```
147//! use sozu_command_lib::proto::command::{AddBackend, LoadBalancingParams, SocketAddress};
148//!
149//! let http_backend = AddBackend {
150//! cluster_id: "my-cluster".to_string(),
151//! backend_id: "test-backend".to_string(),
152//! address: SocketAddress::new_v4(127,0,0,1,8000),
153//! load_balancing_parameters: Some(LoadBalancingParams::default()),
154//! ..Default::default()
155//! };
156//! ```
157//!
158//! A cluster can have multiple backend servers, and they can be added or
159//! removed while the proxy is running. If a backend is removed from the configuration
160//! while the proxy is handling a request to that server, it will finish that
161//! request and stop sending new traffic to that server.
162//!
//! Now we can use the other end of the channel to send all these requests to the worker,
//! using the `WorkerRequest` type:
166//!
167//! ```ignore
168//! use sozu_command_lib::{
169//! proto::command::{Request, request::RequestType, WorkerRequest},
170//! };
171//!
172//! command_channel
173//! .write_message(&WorkerRequest {
174//! id: String::from("add-the-cluster"),
175//! content: RequestType::AddCluster(cluster).into(),
176//! })
//! .expect("Could not send AddCluster request");
178//!
179//! command_channel
180//! .write_message(&WorkerRequest {
181//! id: String::from("add-the-frontend"),
182//! content: RequestType::AddHttpFrontend(http_front).into(),
183//! })
184//! .expect("Could not send AddHttpFrontend request");
185//!
186//! command_channel
187//! .write_message(&WorkerRequest {
188//! id: String::from("add-the-backend"),
189//! content: RequestType::AddBackend(http_backend).into(),
190//! })
191//! .expect("Could not send AddBackend request");
192//!
193//! println!("HTTP -> {:?}", command_channel.read_message());
194//! println!("HTTP -> {:?}", command_channel.read_message());
195//! println!("HTTP -> {:?}", command_channel.read_message());
196//! ```
197//!
198//!
199//! The event loop of the worker will process these instructions and add them to
200//! its state, and the worker will send back an acknowledgement
201//! message.
202//!
//! Now we can wait for the worker thread, so that the program keeps running as long as the worker does:
204//!
205//! ```ignore
206//! let _ = worker_thread_join_handle.join();
207//! ```
208//!
//! Here is the complete example for reference. It matches the `examples/http.rs` example:
210//!
211//! ```
212//! #[macro_use]
213//! extern crate sozu_command_lib;
214//!
215//! use std::{collections::BTreeMap, env, io::stdout, thread};
216//!
217//! use anyhow::Context;
218//! use sozu_command_lib::{
219//! channel::Channel,
220//! config::ListenerBuilder,
221//! logging::setup_default_logging,
222//! proto::command::{
223//! request::RequestType, AddBackend, Cluster, LoadBalancingAlgorithms, LoadBalancingParams,
//! PathRule, Request, RequestHttpFrontend, RulePosition, SocketAddress, WorkerRequest,
225//! },
226//! };
227//!
228//! fn main() -> anyhow::Result<()> {
229//! setup_default_logging(true, "info", "EXAMPLE").with_context(|| "could not setup logging")?;
230//!
231//! info!("starting up");
232//!
233//! let http_listener = ListenerBuilder::new_http(SocketAddress::new_v4(127,0,0,1,8080))
234//! .to_http(None)
235//! .expect("Could not create HTTP listener");
236//!
237//! let (mut command_channel, proxy_channel) =
238//! Channel::generate(1000, 10000).with_context(|| "should create a channel")?;
239//!
240//! let worker_thread_join_handle = thread::spawn(move || {
241//! let max_buffers = 500;
242//! let buffer_size = 16384;
243//! sozu_lib::http::testing::start_http_worker(http_listener, proxy_channel, max_buffers, buffer_size)
244//! .expect("The worker could not be started, or shut down");
245//! });
246//!
247//! let cluster = Cluster {
248//! cluster_id: "my-cluster".to_string(),
249//! sticky_session: false,
250//! https_redirect: false,
251//! load_balancing: LoadBalancingAlgorithms::RoundRobin as i32,
252//! answer_503: Some("A custom forbidden message".to_string()),
253//! ..Default::default()
254//! };
255//!
256//! let http_front = RequestHttpFrontend {
257//! cluster_id: Some("my-cluster".to_string()),
258//! address: SocketAddress::new_v4(127,0,0,1,8080),
259//! hostname: "example.com".to_string(),
260//! path: PathRule::prefix(String::from("/")),
261//! position: RulePosition::Pre.into(),
262//! tags: BTreeMap::from([
263//! ("owner".to_owned(), "John".to_owned()),
264//! ("id".to_owned(), "my-own-http-front".to_owned()),
265//! ]),
266//! ..Default::default()
267//! };
268//! let http_backend = AddBackend {
269//! cluster_id: "my-cluster".to_string(),
270//! backend_id: "test-backend".to_string(),
271//! address: SocketAddress::new_v4(127,0,0,1,8000),
272//! load_balancing_parameters: Some(LoadBalancingParams::default()),
273//! ..Default::default()
274//! };
275//!
276//! command_channel
277//! .write_message(&WorkerRequest {
278//! id: String::from("add-the-cluster"),
279//! content: RequestType::AddCluster(cluster).into(),
280//! })
//! .expect("Could not send AddCluster request");
282//!
283//! command_channel
284//! .write_message(&WorkerRequest {
285//! id: String::from("add-the-frontend"),
286//! content: RequestType::AddHttpFrontend(http_front).into(),
287//! })
288//! .expect("Could not send AddHttpFrontend request");
289//!
290//! command_channel
291//! .write_message(&WorkerRequest {
292//! id: String::from("add-the-backend"),
293//! content: RequestType::AddBackend(http_backend).into(),
294//! })
295//! .expect("Could not send AddBackend request");
296//!
297//! println!("HTTP -> {:?}", command_channel.read_message());
298//! println!("HTTP -> {:?}", command_channel.read_message());
299//!
300//! // uncomment to let it run in the background
301//! // let _ = worker_thread_join_handle.join();
302//! info!("good bye");
303//! Ok(())
304//! }
305//! ```
306
307#[macro_use]
308extern crate sozu_command_lib as sozu_command;
309
310#[macro_use]
311pub mod util;
312#[macro_use]
313pub mod metrics;
314
315pub mod backends;
316pub mod crypto;
317pub mod features;
318pub mod health_check;
319pub mod http;
320pub mod load_balancing;
321pub mod pool;
322pub mod protocol;
323pub mod retry;
324pub mod router;
325pub mod socket;
326pub mod timer;
327pub mod tls;
328
329/// Linux zero-copy TCP forwarder. Used by `protocol::pipe::Pipe` when
330/// the listener is `Protocol::TCP` and the `splice` feature is enabled.
331#[cfg(all(target_os = "linux", feature = "splice"))]
332pub(crate) mod splice;
333
334pub mod server;
335pub mod tcp;
336pub mod udp;
337
338pub mod https;
339
340use std::{
341 cell::RefCell,
342 collections::{BTreeMap, HashMap},
343 fmt::{self, Display, Formatter},
344 net::SocketAddr,
345 rc::Rc,
346 str,
347 time::{Duration, Instant, SystemTime},
348};
349
350use backends::BackendError;
351use hex::FromHexError;
352use mio::{Interest, Token, net::TcpStream};
353use protocol::http::{answers::HttpAnswers, answers::TemplateError, parser::Method};
354use router::RouterError;
355use socket::ServerBindError;
356use sozu_command::{
357 AsStr, ObjectKind,
358 logging::{CachedTags, LogContext},
359 proto::command::{Cluster, ListenerType, RequestHttpFrontend, WorkerRequest, WorkerResponse},
360 ready::Ready,
361 state::ClusterId,
362};
363use tls::CertificateResolverError;
364
365use crate::{backends::BackendMap, metrics::names, router::RouteResult};
366
367/// Anything that can be registered in mio (subscribe to kernel events)
368#[derive(Debug, Clone, Copy, PartialEq, Eq)]
369pub enum Protocol {
370 HTTP,
371 HTTPS,
372 TCP,
373 UDP,
374 HTTPListen,
375 HTTPSListen,
376 TCPListen,
377 UDPListen,
378 Channel,
379 Metrics,
380 Timer,
381}
382
383/// trait that must be implemented by listeners and client sessions
384pub trait ProxySession {
385 /// indicates the protocol associated with the session
386 ///
    /// this is used to distinguish sessions from listeners, channels, metrics
388 /// and timers
389 fn protocol(&self) -> Protocol;
    /// if a session received an event or can still execute, the event loop will
    /// call this method. Its result indicates whether the session can still execute,
    /// needs to connect to a backend server, or must be closed
393 fn ready(&mut self, session: Rc<RefCell<dyn ProxySession>>) -> SessionIsToBeClosed;
394 /// if the event loop got an event for a token associated with the session,
395 /// it will call this method on the session
396 fn update_readiness(&mut self, token: Token, events: Ready);
397 /// close a session, frontend and backend sockets,
398 /// remove the entries from the session manager slab
399 fn close(&mut self);
400 /// if a timeout associated with the session triggers, the event loop will
401 /// call this method with the timeout's token
402 fn timeout(&mut self, t: Token) -> SessionIsToBeClosed;
403 /// last time the session got an event
404 fn last_event(&self) -> Instant;
    /// display the session's internal state (for debugging purposes)
406 fn print_session(&self);
407 /// get the token associated with the frontend
408 fn frontend_token(&self) -> Token;
409 /// tell the session it has to shut down if possible
410 ///
411 /// if the session handles HTTP requests, it will not close until the response
412 /// is completely sent back to the client
413 fn shutting_down(&mut self) -> SessionIsToBeClosed;
414 /// Best-effort identifier of the cluster currently routed to by this
415 /// session. Returns `None` for `ListenSession` (no per-session
416 /// cluster), and for client sessions before routing has resolved.
417 /// H2 sessions multiplex many streams over one frontend token and may
418 /// touch several clusters; the returned value is whichever cluster
419 /// the session most recently keep-alive'd to. Used for log/metric
420 /// attribution, not for accounting (the tracker keeps the canonical
421 /// per-stream `(cluster, IP)` set).
422 fn cluster_id(&self) -> Option<String> {
423 None
424 }
425 /// Source address as observed by Sōzu, with proxy-protocol awareness.
426 /// HTTP/HTTPS/TCP client sessions return the parsed PROXY-protocol
427 /// source when present, else `peer_addr`. `ListenSession` returns
428 /// `None`. Used to attribute per-(cluster, source-IP) tracking and
429 /// access logs to the real client behind a layer-4 PROXY frontend.
430 fn session_address(&self) -> Option<SocketAddr> {
431 None
432 }
433}
434
435#[macro_export]
436macro_rules! branch {
437 (if $($value:ident)? == $expected:ident { $($then:tt)* } else { $($else:tt)* }) => {
438 macro_rules! expect {
439 ($expected) => {$($then)*};
440 ($a:ident) => {$($else)*};
441 () => {$($else)*}
442 }
443 expect!($($value)?);
444 };
445 (if $($value:ident)? == $expected:ident { $($then:tt)* } ) => {
446 macro_rules! expect {
447 ($expected) => {$($then)*};
448 }
449 expect!($($value)?);
450 };
451}
452
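/// Expands to the tokens inside the leading braces when they are non-empty,
/// and to the default tokens that follow the braces otherwise.
///
/// A minimal standalone sketch of both cases. The macro is copied into the
/// example so that it compiles on its own, and the function names are
/// hypothetical:
///
/// ```rust
/// macro_rules! fallback {
///     ({} $($default:tt)*) => {
///         $($default)*
///     };
///     ({$($value:tt)+} $($default:tt)*) => {
///         $($value)+
///     };
/// }
///
/// // empty braces: the default tokens are used
/// fn default_timeout_seconds() -> u32 {
///     fallback!({} 30)
/// }
///
/// // non-empty braces: the override replaces the default
/// fn overridden_timeout_seconds() -> u32 {
///     fallback!({5} 30)
/// }
///
/// fn main() {
///     assert_eq!(default_timeout_seconds(), 30);
///     assert_eq!(overridden_timeout_seconds(), 5);
/// }
/// ```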
453#[macro_export]
454macro_rules! fallback {
455 ({} $($default:tt)*) => {
456 $($default)*
457 };
458 ({$($value:tt)+} $($default:tt)*) => {
459 $($value)+
460 };
461}
462
463#[macro_export]
464macro_rules! StateMachineBuilder {
465 (
466 ($d:tt)
467 $(#[$($state_macros:tt)*])*
468 enum $state_name:ident $(impl $trait:ident)? {
469 $($(#[$($variant_macros:tt)*])*
470 $variant_name:ident($state:ty$(,$($aux:ty),+)?) $(-> $override:expr)?),+ $(,)?
471 }
472 ) => {
473 /// A summary of the last valid State
474 #[derive(Clone, Copy, Debug)]
475 pub enum StateMarker {
476 $($variant_name,)+
477 }
478
479 $(#[$($state_macros)*])*
480 #[allow(clippy::large_enum_variant)]
481 pub enum $state_name {
482 $(
483 $(#[$($variant_macros)*])*
484 $variant_name($state$(,$($aux),+)?),
485 )+
            /// Informs about upgrade failure, contains a summary of the last valid State
487 FailedUpgrade(StateMarker),
488 }
489
490 macro_rules! _fn_impl {
491 ($function:ident(&$d($mut:ident)?, self $d(,$arg_name:ident: $arg_type:ty)*) $d(-> $ret:ty)? $d(| $marker:tt => $fail:expr)?) => {
492 fn $function(&$d($mut)? self $d(,$arg_name: $arg_type)*) $d(-> $ret)? {
493 match self {
494 $($state_name::$variant_name(_state, ..) => $crate::fallback!({$($override)?} _state.$function($d($arg_name),*)),)+
495 $state_name::FailedUpgrade($crate::fallback!({$d($marker)?} _)) => $crate::fallback!({$d($fail)?} unreachable!())
496 }
497 }
498 };
499 }
500
501 impl $state_name {
502 /// Informs about the last valid State before upgrade failure
503 fn marker(&self) -> StateMarker {
504 match self {
505 $($state_name::$variant_name(..) => StateMarker::$variant_name,)+
506 $state_name::FailedUpgrade(marker) => *marker,
507 }
508 }
            /// Returns whether or not the State is FailedUpgrade
510 fn failed(&self) -> bool {
511 match self {
512 $state_name::FailedUpgrade(_) => true,
513 _ => false,
514 }
515 }
516 /// Gives back an owned version of the State,
517 /// leaving a FailedUpgrade in its place.
518 /// The FailedUpgrade retains the marker of the previous State.
519 fn take(&mut self) -> $state_name {
520 let mut owned_state = $state_name::FailedUpgrade(self.marker());
521 std::mem::swap(&mut owned_state, self);
522 owned_state
523 }
524 _fn_impl!{front_socket(&, self) -> &mio::net::TcpStream}
525 }
526
527 $crate::branch!{
528 if $($trait)? == SessionState {
529 impl SessionState for $state_name {
530 _fn_impl!{ready(&mut, self, session: Rc<RefCell<dyn ProxySession>>, proxy: Rc<RefCell<dyn L7Proxy>>, metrics: &mut SessionMetrics) -> SessionResult}
531 _fn_impl!{update_readiness(&mut, self, token: Token, events: Ready)}
532 _fn_impl!{timeout(&mut, self, token: Token, metrics: &mut SessionMetrics) -> StateResult}
533 _fn_impl!{cancel_timeouts(&mut, self)}
534 _fn_impl!{print_state(&, self, context: &str) | marker => error!("{} Session(FailedUpgrade({:?}))", context, marker)}
535 _fn_impl!{close(&mut, self, proxy: Rc<RefCell<dyn L7Proxy>>, metrics: &mut SessionMetrics) | _ => {}}
536 _fn_impl!{shutting_down(&mut, self) -> SessionIsToBeClosed | _ => true}
537 }
538 } else {}
539 }
540 };
541 ($($tt:tt)+) => {
542 StateMachineBuilder!{($) $($tt)+}
543 }
544}
545
546pub trait ListenerHandler {
547 fn get_addr(&self) -> &SocketAddr;
548
549 fn get_tags(&self, key: &str) -> Option<&CachedTags>;
550
551 fn get_concatenated_tags(&self, key: &str) -> Option<&str> {
552 self.get_tags(key).map(|tags| tags.concatenated.as_str())
553 }
554
555 fn set_tags(&mut self, key: String, tags: Option<BTreeMap<String, String>>);
556
557 fn protocol(&self) -> Protocol;
558
559 fn public_address(&self) -> SocketAddr;
560}
561
562#[derive(thiserror::Error, Debug)]
563pub enum FrontendFromRequestError {
564 #[error("Could not parse hostname from '{host}': {error}")]
565 HostParse { host: String, error: String },
566 #[error("invalid remaining chars after hostname. Host: {0}")]
567 InvalidCharsAfterHost(String),
568 #[error("no cluster: {0}")]
569 NoClusterFound(RouterError),
570}
571
572pub trait L7ListenerHandler {
573 fn get_sticky_name(&self) -> &str;
574
575 /// Name of the correlation header Sozu injects into every request and
576 /// response body. Default: `"Sozu-Id"`. Operators can rebrand via the
577 /// `sozu_id_header` listener config knob.
578 fn get_sozu_id_header(&self) -> &str {
579 "Sozu-Id"
580 }
581
582 fn get_connect_timeout(&self) -> u32;
583
584 /// retrieve a frontend by parsing a request's hostname, uri and method
585 fn frontend_from_request(
586 &self,
587 host: &str,
588 uri: &str,
589 method: &Method,
590 ) -> Result<RouteResult, FrontendFromRequestError>;
591
592 /// retrieve the listener's configured HTTP answers (templates)
593 fn get_answers(&self) -> &Rc<RefCell<HttpAnswers>>;
594
595 /// H2 flood detection thresholds from the listener config.
596 /// Returns the default config when the listener does not provide custom values.
597 fn get_h2_flood_config(&self) -> protocol::mux::H2FloodConfig {
598 protocol::mux::H2FloodConfig::default()
599 }
600
601 /// H2 connection tuning from the listener config.
602 /// Returns the default config when the listener does not provide custom values.
603 fn get_h2_connection_config(&self) -> protocol::mux::H2ConnectionConfig {
604 protocol::mux::H2ConnectionConfig::default()
605 }
606
607 /// Whether requests must have their `:authority` / `Host` exact-match
608 /// the TLS SNI negotiated at handshake (CWE-346 / CWE-444).
609 ///
610 /// Defaults to `true` — the safe setting that closes the
611 /// CWE-346 / CWE-444 cross-SNI smuggling vector. Operators can opt
612 /// out per-listener via `HttpsListenerConfig::strict_sni_binding =
613 /// false` when cross-SNI routing is explicitly required. Plaintext
614 /// HTTP listeners return the default value; they never have an SNI
615 /// to compare against, so the routing-layer check short-circuits on
616 /// `tls_server_name: None`.
617 fn get_strict_sni_binding(&self) -> bool {
618 true
619 }
620
621 /// Whether to strip any client-supplied `X-Real-IP` header from
622 /// forwarded requests (anti-spoofing).
623 ///
624 /// Defaults to `false` — preserves the historical pass-through
625 /// behaviour. Operators opt in via
626 /// `HttpListenerConfig::elide_x_real_ip = true` (and the equivalent on
627 /// HTTPS listeners). Independent of [`Self::get_send_x_real_ip`]: the
628 /// two flags can be combined freely (anti-spoof only, send only, both,
629 /// or neither). The elision branch lives in
630 /// `HttpContext::on_request_headers`, so it covers H1 and H2 alike.
631 fn get_elide_x_real_ip(&self) -> bool {
632 false
633 }
634
635 /// Whether to append a proxy-generated `X-Real-IP` header carrying the
636 /// connection peer IP (post-PROXY-v2 unwrap, i.e. the original client
637 /// IP) to every forwarded request.
638 ///
639 /// Defaults to `false` — preserves the historical no-injection
640 /// behaviour. Operators opt in via
641 /// `HttpListenerConfig::send_x_real_ip = true` (and the equivalent on
642 /// HTTPS listeners). Independent of [`Self::get_elide_x_real_ip`]: the
643 /// two flags can be combined freely. The injection branch lives next
644 /// to the existing X-Forwarded-For / Forwarded synthesis in
645 /// `HttpContext::on_request_headers`.
646 fn get_send_x_real_ip(&self) -> bool {
647 false
648 }
649
650 /// Per-stream idle timeout for H2 connections. An open stream that makes
651 /// no forward progress for this duration is cancelled (RST_STREAM / CANCEL).
652 /// Mitigates slow-multiplex Slowloris where a client keeps connection-level
653 /// activity high (resetting the connection idle timer on every frame) while
654 /// pinning streams for the full nominal connection timeout.
655 ///
656 /// Listeners inherit `max(30s, back_timeout)` when `h2_stream_idle_timeout_seconds`
657 /// is absent so operators who raised the socket-level backend budget do not
658 /// have to duplicate the value here; the 30 s floor preserves the baseline
659 /// slow-multiplex mitigation when `back_timeout` is shorter. Set the knob
660 /// explicitly to cap the per-stream deadline below `back_timeout` (useful
661 /// when under a slow-multiplex attack).
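    ///
    /// The inheritance rule can be sketched as follows. This is only an
    /// illustration: `effective_stream_idle_timeout` is a hypothetical helper,
    /// and representing both values as whole seconds is an assumption.
    ///
    /// ```rust
    /// use std::time::Duration;
    ///
    /// // `explicit_seconds` stands for the optional `h2_stream_idle_timeout_seconds`
    /// // knob, `back_timeout_seconds` for the listener's `back_timeout`.
    /// fn effective_stream_idle_timeout(
    ///     explicit_seconds: Option<u64>,
    ///     back_timeout_seconds: u64,
    /// ) -> Duration {
    ///     // an explicit value wins; otherwise inherit `back_timeout` with a 30 s floor
    ///     let seconds = explicit_seconds.unwrap_or_else(|| back_timeout_seconds.max(30));
    ///     Duration::from_secs(seconds)
    /// }
    ///
    /// fn main() {
    ///     // knob absent, short `back_timeout`: the 30 s floor applies
    ///     assert_eq!(effective_stream_idle_timeout(None, 10), Duration::from_secs(30));
    ///     // knob absent, long `back_timeout`: the value is inherited
    ///     assert_eq!(effective_stream_idle_timeout(None, 120), Duration::from_secs(120));
    ///     // knob set: it can cap the deadline below `back_timeout`
    ///     assert_eq!(effective_stream_idle_timeout(Some(5), 120), Duration::from_secs(5));
    /// }
    /// ```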
662 fn get_h2_stream_idle_timeout(&self) -> std::time::Duration {
663 std::time::Duration::from_secs(30)
664 }
665
666 /// Wall-clock budget granted to in-flight H2 streams after soft-stop sent
667 /// the initial `GOAWAY(NO_ERROR)`. Once the deadline elapses the mux
668 /// transitions to a forced close (final GOAWAY + session teardown).
669 ///
670 /// Returning `None` disables the forced close entirely — shutdown waits
671 /// for every stream to drain naturally. Returning `Some(d)` enforces the
672 /// budget. Default: `Some(Duration::from_secs(5))` (matches the historic
673 /// hard-coded 5 s deadline). Listeners expose the
674 /// `h2_graceful_shutdown_deadline_seconds` knob; value `0` maps to `None`.
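    ///
    /// The mapping from the knob to the returned value can be sketched as
    /// follows. `deadline_from_knob` is a hypothetical helper, and the `u32`
    /// type of the knob is an assumption.
    ///
    /// ```rust
    /// use std::time::Duration;
    ///
    /// // `0` disables the forced close, any other value is a budget in seconds
    /// fn deadline_from_knob(seconds: u32) -> Option<Duration> {
    ///     if seconds == 0 {
    ///         None
    ///     } else {
    ///         Some(Duration::from_secs(u64::from(seconds)))
    ///     }
    /// }
    ///
    /// fn main() {
    ///     assert_eq!(deadline_from_knob(0), None);
    ///     assert_eq!(deadline_from_knob(5), Some(Duration::from_secs(5)));
    /// }
    /// ```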
675 fn get_h2_graceful_shutdown_deadline(&self) -> Option<std::time::Duration> {
676 Some(std::time::Duration::from_secs(5))
677 }
678}
679
680#[derive(Clone, Copy, Debug, PartialEq, Eq)]
681pub enum BackendConnectionStatus {
682 NotConnected,
683 Connecting(Instant),
684 Connected,
685}
686
687impl BackendConnectionStatus {
688 pub fn is_connecting(&self) -> bool {
689 matches!(self, BackendConnectionStatus::Connecting(_))
690 }
691}
692
693#[derive(Debug, PartialEq, Eq)]
694pub enum BackendConnectAction {
695 New,
696 Reuse,
697 Replace,
698}
699
700#[derive(thiserror::Error, Debug)]
701pub enum BackendConnectionError {
702 #[error("Not found: {0:?}")]
703 NotFound(ObjectKind),
704 #[error("Too many connections on cluster {0:?}")]
705 MaxConnectionRetries(Option<String>),
706 #[error("the sessions slab has reached maximum capacity")]
707 MaxSessionsMemory,
708 #[error("error from the backend: {0}")]
709 Backend(BackendError),
710 #[error("failed to retrieve the cluster: {0}")]
711 RetrieveClusterError(RetrieveClusterError),
712 #[error("maximum number of buffers reached")]
713 MaxBuffers,
714 /// Per-(cluster, source-IP) connection limit reached. The protocol
715 /// layer translates this into HTTP 429 Too Many Requests (with an
716 /// optional `Retry-After`) for HTTP/HTTPS sessions, or a graceful TCP
717 /// close for raw TCP. The `cluster_id` is included so log/metric
718 /// pipelines can attribute the rejection.
719 #[error("per-(cluster, source-IP) connection limit reached for cluster {cluster_id:?}")]
720 TooManyConnectionsPerIp { cluster_id: String },
721}
722
723/// used in kawa_h1 module for the Http session state
724#[derive(thiserror::Error, Debug)]
725pub enum RetrieveClusterError {
726 #[error("No method given")]
727 NoMethod,
728 #[error("No host given")]
729 NoHost,
730 #[error("No path given")]
731 NoPath,
732 #[error("unauthorized route")]
733 UnauthorizedRoute,
734 #[error("{0}")]
735 RetrieveFrontend(FrontendFromRequestError),
736 #[error("HTTPS redirect required")]
737 HttpsRedirect,
738 /// The HTTP `:authority` / `Host` host does not match the TLS SNI that was
739 /// negotiated for this connection, which would cross the TLS trust boundary.
740 /// Maps to HTTP 421 Misdirected Request (RFC 9110 §15.5.20).
741 #[error("TLS SNI {sni:?} does not match HTTP authority {authority:?}")]
742 SniAuthorityMismatch { sni: String, authority: String },
743}
744
745/// Used in sessions
746#[derive(Debug, PartialEq, Eq)]
747pub enum AcceptError {
748 IoError,
749 TooManySessions,
750 WouldBlock,
751 RegisterError,
752 WrongSocketAddress,
753 BufferCapacityReached,
754}
755
756/// returned by the HTTP, HTTPS and TCP listeners
757#[derive(thiserror::Error, Debug)]
758pub enum ListenerError {
759 #[error("failed to handle certificate request, got a resolver error, {0}")]
760 Resolver(CertificateResolverError),
761 #[error("failed to parse pem, {0}")]
762 PemParse(String),
763 #[error("failed to parse template {0:?}: {1}")]
764 TemplateParse(String, TemplateError),
765 #[error("failed to build rustls context, {0}")]
766 BuildRustls(String),
767 #[error("could not activate listener with address {address:?}: {error}")]
768 Activation { address: SocketAddr, error: String },
769 #[error("Could not register listener socket: {0}")]
770 SocketRegistration(std::io::Error),
771 #[error("could not add frontend: {0}")]
772 AddFrontend(RouterError),
773 #[error("could not remove frontend: {0}")]
774 RemoveFrontend(RouterError),
775 #[error("invalid value for field '{field}': {reason}")]
776 InvalidValue {
777 field: &'static str,
778 reason: &'static str,
779 },
780 /// `UpdateHttpsListenerConfig.hsts` was present but its `enabled`
781 /// field was unset. Per the partial-update contract, `enabled` is
782 /// the explicit-disambiguator between "explicit disable" (false) and
783 /// "explicit enable" (true); the patch handler refuses an
784 /// `enabled = None` block rather than silently picking one.
785 #[error(
786 "UpdateHttpsListenerConfig.hsts is present but `enabled` is unset; the partial-update \
787 contract requires `enabled` whenever the `hsts` block is present"
788 )]
789 HstsEnabledRequired,
790}
791
792/// Lift control-plane validation errors into listener-level errors so the
793/// worker can surface the same message without duplicating the match.
794/// Non-`InvalidValue` variants fall back to a generic `InvalidValue` — they
795/// are not expected on the worker's `update_config` path (state lookups
796/// happen on the master) but we avoid panicking if one slips through.
797impl From<sozu_command::state::StateError> for ListenerError {
798 fn from(err: sozu_command::state::StateError) -> Self {
799 match err {
800 sozu_command::state::StateError::InvalidValue { field, reason } => {
801 ListenerError::InvalidValue { field, reason }
802 }
803 _ => ListenerError::InvalidValue {
804 field: "state",
805 reason: "unexpected state error on worker path",
806 },
807 }
808 }
809}

/// Returned by the HTTP, HTTPS and TCP proxies
#[derive(thiserror::Error, Debug)]
pub enum ProxyError {
    #[error("error while soft stopping {proxy_protocol} proxy: {error}")]
    SoftStop {
        proxy_protocol: String,
        error: String,
    },
    #[error("error while hard stopping {proxy_protocol} proxy: {error}")]
    HardStop {
        proxy_protocol: String,
        error: String,
    },
    #[error("found no listener with address {0:?}")]
    NoListenerFound(SocketAddr),
    #[error("a listener is already present for this token")]
    ListenerAlreadyPresent,
    #[error("could not add listener: {0}")]
    AddListener(ListenerError),
    #[error("could not add cluster: {0}")]
    AddCluster(ListenerError),
    #[error("failed to activate listener with address {address:?}: {listener_error}")]
    ListenerActivation {
        address: SocketAddr,
        listener_error: ListenerError,
    },
    #[error("can not add frontend {front:?}: {error}")]
    WrongInputFrontend {
        front: Box<RequestHttpFrontend>,
        error: String,
    },
    #[error("could not add frontend: {0}")]
    AddFrontend(ListenerError),
    #[error("could not remove frontend: {0}")]
    RemoveFrontend(ListenerError),
    #[error("could not add certificate: {0}")]
    AddCertificate(CertificateResolverError),
    #[error("could not remove certificate: {0}")]
    RemoveCertificate(CertificateResolverError),
    #[error("could not replace certificate: {0}")]
    ReplaceCertificate(CertificateResolverError),
    #[error("wrong certificate fingerprint: {0}")]
    WrongCertificateFingerprint(FromHexError),
    #[error("this request is not supported by the proxy")]
    UnsupportedMessage,
    #[error("failed to acquire the lock, {0}")]
    Lock(String),
    #[error("could not bind to socket {0:?}: {1}")]
    BindToSocket(SocketAddr, ServerBindError),
    #[error("error registering socket of listener: {0}")]
    RegisterListener(std::io::Error),
    #[error("the listener is not activated")]
    UnactivatedListener,
    /// HSTS (RFC 6797) was attached to a frontend on a plain-HTTP
    /// listener. RFC 6797 §7.2 forbids `Strict-Transport-Security` on
    /// plaintext-HTTP responses; the worker rejects the request rather
    /// than ship a non-conformant policy. The TOML loader rejects the
    /// same shape at config-load time
    /// (`command/src/config.rs::ConfigError::HstsOnPlainHttp`); this
    /// arm catches the same misconfiguration when the request reaches
    /// the worker over the IPC channel without going through the TOML
    /// path (e.g. via `sozu frontend http add`).
    #[error(
        "HSTS is only valid on HTTPS frontends; rejecting AddHttpFrontend with hsts.enabled = \
         true on address {0:?} (RFC 6797 §7.2)"
    )]
    HstsOnPlainHttp(SocketAddr),
}

use self::server::ListenToken;
pub trait ProxyConfiguration {
    fn notify(&mut self, message: WorkerRequest) -> WorkerResponse;
    fn accept(&mut self, token: ListenToken) -> Result<TcpStream, AcceptError>;
    fn create_session(
        &mut self,
        socket: TcpStream,
        token: ListenToken,
        wait_time: Duration,
        proxy: Rc<RefCell<Self>>,
        // should we insert the tags here?
    ) -> Result<(), AcceptError>;
}

pub trait L7Proxy {
    fn kind(&self) -> ListenerType;

    fn register_socket(
        &self,
        socket: &mut TcpStream,
        token: Token,
        interest: Interest,
    ) -> Result<(), std::io::Error>;

    fn deregister_socket(&self, tcp_stream: &mut TcpStream) -> Result<(), std::io::Error>;

    fn add_session(&self, session: Rc<RefCell<dyn ProxySession>>) -> Token;

    /// Remove the session from the session manager slab.
    /// Returns true if the session was actually there before deletion
    fn remove_session(&self, token: Token) -> bool;

    fn backends(&self) -> Rc<RefCell<BackendMap>>;

    fn clusters(&self) -> &HashMap<ClusterId, Cluster>;

    /// Access the worker's [`SessionManager`] for per-(cluster, source-IP)
    /// connection-limit accounting. The mux uses this to track / untrack
    /// stream-granular `(cluster_id, ip)` entries and consult the
    /// `cluster_ip_at_limit` gate before each backend connect.
    fn sessions(&self) -> Rc<RefCell<crate::server::SessionManager>>;
}

#[derive(Debug, PartialEq, Eq)]
pub enum RequiredEvents {
    FrontReadBackNone,
    FrontWriteBackNone,
    FrontReadWriteBackNone,
    FrontNoneBackNone,
    FrontReadBackRead,
    FrontWriteBackRead,
    FrontReadWriteBackRead,
    FrontNoneBackRead,
    FrontReadBackWrite,
    FrontWriteBackWrite,
    FrontReadWriteBackWrite,
    FrontNoneBackWrite,
    FrontReadBackReadWrite,
    FrontWriteBackReadWrite,
    FrontReadWriteBackReadWrite,
    FrontNoneBackReadWrite,
}

impl RequiredEvents {
    pub fn front_readable(&self) -> bool {
        matches!(
            *self,
            RequiredEvents::FrontReadBackNone
                | RequiredEvents::FrontReadWriteBackNone
                | RequiredEvents::FrontReadBackRead
                | RequiredEvents::FrontReadWriteBackRead
                | RequiredEvents::FrontReadBackWrite
                | RequiredEvents::FrontReadWriteBackWrite
                | RequiredEvents::FrontReadBackReadWrite
                | RequiredEvents::FrontReadWriteBackReadWrite
        )
    }

    pub fn front_writable(&self) -> bool {
        matches!(
            *self,
            RequiredEvents::FrontWriteBackNone
                | RequiredEvents::FrontReadWriteBackNone
                | RequiredEvents::FrontWriteBackRead
                | RequiredEvents::FrontReadWriteBackRead
                | RequiredEvents::FrontWriteBackWrite
                | RequiredEvents::FrontReadWriteBackWrite
                | RequiredEvents::FrontWriteBackReadWrite
                | RequiredEvents::FrontReadWriteBackReadWrite
        )
    }

    pub fn back_readable(&self) -> bool {
        matches!(
            *self,
            RequiredEvents::FrontReadBackRead
                | RequiredEvents::FrontWriteBackRead
                | RequiredEvents::FrontReadWriteBackRead
                | RequiredEvents::FrontNoneBackRead
                | RequiredEvents::FrontReadBackReadWrite
                | RequiredEvents::FrontWriteBackReadWrite
                | RequiredEvents::FrontReadWriteBackReadWrite
                | RequiredEvents::FrontNoneBackReadWrite
        )
    }

    pub fn back_writable(&self) -> bool {
        matches!(
            *self,
            RequiredEvents::FrontReadBackWrite
                | RequiredEvents::FrontWriteBackWrite
                | RequiredEvents::FrontReadWriteBackWrite
                | RequiredEvents::FrontNoneBackWrite
                | RequiredEvents::FrontReadBackReadWrite
                | RequiredEvents::FrontWriteBackReadWrite
                | RequiredEvents::FrontReadWriteBackReadWrite
                | RequiredEvents::FrontNoneBackReadWrite
        )
    }
}

/// Signals transitions between states of a given Protocol
#[derive(Debug, PartialEq, Eq)]
pub enum StateResult {
    /// Signals to the Protocol to close its backend
    CloseBackend,
    /// Signals to the parent Session to close itself
    CloseSession,
    /// Signals to the Protocol to connect to backend
    ConnectBackend,
    /// Signals to the Protocol to continue
    Continue,
    /// Signals to the parent Session to upgrade to the next Protocol
    Upgrade,
}

/// Signals transitions between states of a given Session
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SessionResult {
    /// Signals to the Session to close itself
    Close,
    /// Signals to the Session to continue
    Continue,
    /// Signals to the Session to upgrade its Protocol
    Upgrade,
}

#[derive(Debug, PartialEq, Eq)]
pub enum SocketType {
    Listener,
    FrontClient,
}

type SessionIsToBeClosed = bool;

#[derive(Clone)]
pub struct Readiness {
    /// the current readiness
    pub event: Ready,
    /// the readiness we wish to attain
    pub interest: Ready,
}

impl Display for Readiness {
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        let i = &mut [b'-'; 4];
        let r = &mut [b'-'; 4];
        let mixed = &mut [b'-'; 4];

        display_ready(i, self.interest);
        display_ready(r, self.event);
        display_ready(mixed, self.interest & self.event);

        write!(
            f,
            "I({:?})&R({:?})=M({:?})",
            String::from_utf8_lossy(i),
            String::from_utf8_lossy(r),
            String::from_utf8_lossy(mixed)
        )
    }
}

impl Default for Readiness {
    fn default() -> Self {
        Self::new()
    }
}

impl Readiness {
    /// Mask of every bit `Ready` defines (READABLE | WRITABLE | ERROR | HUP).
    /// Any bit outside this set in `event` or `interest` is a corrupted
    /// readiness word, checked by [`Self::check_invariants`]. Not
    /// `#[cfg(debug_assertions)]`-gated: it is read from inside `debug_assert!`s
    /// whose arguments must still compile in release (HARD RULE 2 / E0425).
    const KNOWN_BITS: Ready =
        Ready(Ready::READABLE.0 | Ready::WRITABLE.0 | Ready::ERROR.0 | Ready::HUP.0);

    pub const fn new() -> Readiness {
        Readiness {
            event: Ready::EMPTY,
            interest: Ready::EMPTY,
        }
    }

    /// Cross-field invariant sweep: neither `event` nor `interest` may carry a
    /// bit `Ready` does not define. A stray bit would silently widen
    /// `filter_interest`'s mask and wake (or starve) a session on a phantom
    /// readiness. Cheap enough to call as a postcondition from every mutator.
    #[cfg(debug_assertions)]
    fn check_invariants(&self) {
        debug_assert_eq!(
            self.event & Self::KNOWN_BITS,
            self.event,
            "Readiness.event carries a bit outside READABLE|WRITABLE|ERROR|HUP"
        );
        debug_assert_eq!(
            self.interest & Self::KNOWN_BITS,
            self.interest,
            "Readiness.interest carries a bit outside READABLE|WRITABLE|ERROR|HUP"
        );
    }

    pub fn reset(&mut self) {
        self.event = Ready::EMPTY;
        self.interest = Ready::EMPTY;
        // Post-condition: a reset clears *both* words; a half-reset leaves a
        // session armed on stale interest after teardown.
        debug_assert!(
            self.event.is_empty() && self.interest.is_empty(),
            "reset must clear both event and interest"
        );
        #[cfg(debug_assertions)]
        self.check_invariants();
    }

    /// Returns the readiness bits that are both present in `event` and
    /// requested in `interest`.
    pub fn filter_interest(&self) -> Ready {
        // Pre-condition: both source words must be well-formed before we mask.
        // A stray bit upstream would leak through the intersection and wake (or
        // starve) a session on a phantom readiness.
        #[cfg(debug_assertions)]
        self.check_invariants();
        let filtered = self.event & self.interest;
        // Post-condition: the result is a subset of the recognized bits and can
        // only contain bits the session both saw AND asked for.
        debug_assert_eq!(
            filtered & Self::KNOWN_BITS,
            filtered,
            "filter_interest must not yield an unknown bit"
        );
        debug_assert!(
            self.interest.contains(filtered) && self.event.contains(filtered),
            "filtered readiness must be present in both interest and event"
        );
        filtered
    }

    /// Signal that the socket has buffered data to write (e.g., TLS internal
    /// buffers) that won't generate a new epoll WRITABLE event.
    pub fn signal_pending_write(&mut self) {
        // Snapshot the unrelated bits (everything but WRITABLE) so we can prove
        // we flipped exactly the WRITABLE bit. `Ready` exposes no `!`, so mask on
        // the public `.0` word.
        let other_event_before = Ready(self.event.0 & !Ready::WRITABLE.0);
        self.event.insert(Ready::WRITABLE);
        debug_assert!(
            self.event.is_writable(),
            "signal_pending_write must set the WRITABLE event bit"
        );
        debug_assert_eq!(
            Ready(self.event.0 & !Ready::WRITABLE.0),
            other_event_before,
            "signal_pending_write must touch only the WRITABLE bit"
        );
        #[cfg(debug_assertions)]
        self.check_invariants();
    }

    /// Signal that the socket has buffered data to read (e.g., TLS plaintext
    /// buffer after a 1xx clear) that won't generate a new epoll READABLE event.
    pub fn signal_pending_read(&mut self) {
        let other_event_before = Ready(self.event.0 & !Ready::READABLE.0);
        self.event.insert(Ready::READABLE);
        debug_assert!(
            self.event.is_readable(),
            "signal_pending_read must set the READABLE event bit"
        );
        debug_assert_eq!(
            Ready(self.event.0 & !Ready::READABLE.0),
            other_event_before,
            "signal_pending_read must touch only the READABLE bit"
        );
        #[cfg(debug_assertions)]
        self.check_invariants();
    }

    /// Pair `Ready::WRITABLE` insert with `signal_pending_write`: the canonical
    /// invariant-15 form for any path that writes bytes to sozu-owned buffers
    /// under edge-triggered epoll. See `lib/src/protocol/mux/LIFECYCLE.md`.
    #[inline]
    pub fn arm_writable(&mut self) {
        // Snapshot the non-WRITABLE bits of both words: arm_writable must set the
        // WRITABLE bit in *both* interest and event and leave everything else as-is.
        let other_interest_before = Ready(self.interest.0 & !Ready::WRITABLE.0);
        let other_event_before = Ready(self.event.0 & !Ready::WRITABLE.0);
        self.interest.insert(Ready::WRITABLE);
        self.signal_pending_write();
        debug_assert!(
            self.interest.is_writable() && self.event.is_writable(),
            "arm_writable must set WRITABLE in both interest and event"
        );
        debug_assert_eq!(
            Ready(self.interest.0 & !Ready::WRITABLE.0),
            other_interest_before,
            "arm_writable must touch only the WRITABLE interest bit"
        );
        debug_assert_eq!(
            Ready(self.event.0 & !Ready::WRITABLE.0),
            other_event_before,
            "arm_writable must touch only the WRITABLE event bit"
        );
        #[cfg(debug_assertions)]
        self.check_invariants();
    }
}
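
/*
A minimal, std-only sketch of the masking logic above, for readers who want to
try it outside the crate. The bit values below are hypothetical: the real
`Ready` constants are defined elsewhere and may differ. `filter_interest` and
`arm_writable` here are free functions on plain `u8` words, not the methods of
`Readiness`.

```rust
// Hypothetical bit layout, chosen only for illustration.
const READABLE: u8 = 0b0001;
const WRITABLE: u8 = 0b0010;
const ERROR: u8 = 0b0100;
const HUP: u8 = 0b1000;
const KNOWN_BITS: u8 = READABLE | WRITABLE | ERROR | HUP;

/// Bits that were both observed (`event`) and requested (`interest`).
fn filter_interest(event: u8, interest: u8) -> u8 {
    // Both words must only carry defined bits before they are intersected.
    debug_assert_eq!(event & KNOWN_BITS, event);
    debug_assert_eq!(interest & KNOWN_BITS, interest);
    event & interest
}

/// Sets WRITABLE in both words and leaves every other bit untouched.
fn arm_writable(event: &mut u8, interest: &mut u8) {
    *interest |= WRITABLE;
    *event |= WRITABLE;
}
```

Setting the event bit by hand matters under edge-triggered polling: the kernel
reports a transition once, so data already sitting in a userspace buffer would
otherwise never trigger another wakeup.
*/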

#[cfg(test)]
mod readiness_tests {
    use super::{Readiness, Ready};

    #[test]
    fn arm_writable_sets_interest_and_event() {
        let mut r = Readiness::new();
        r.arm_writable();
        assert!(r.interest.is_writable());
        assert!(r.event.is_writable());
    }

    #[test]
    fn arm_writable_is_idempotent() {
        let mut r = Readiness::new();
        r.arm_writable();
        r.arm_writable();
        assert_eq!(r.interest, Ready::WRITABLE);
        assert_eq!(r.event, Ready::WRITABLE);
    }
}

pub fn display_ready(s: &mut [u8], readiness: Ready) {
    if readiness.is_readable() {
        s[0] = b'R';
    }
    if readiness.is_writable() {
        s[1] = b'W';
    }
    if readiness.is_error() {
        s[2] = b'E';
    }
    if readiness.is_hup() {
        s[3] = b'H';
    }
}

pub fn ready_to_string(readiness: Ready) -> String {
    let s = &mut [b'-'; 4];
    display_ready(s, readiness);
    String::from_utf8(s.to_vec()).unwrap()
}
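
/*
A minimal, std-only sketch of the four-character rendering produced by
`display_ready` and `ready_to_string`: one slot per flag, in the order
Readable, Writable, Error, Hup, with `-` for an unset flag. The bit values and
the `render` helper are hypothetical; the real code queries `Ready` through
its `is_*` methods.

```rust
/// Renders a readiness word as four characters.
fn render(ready: u8) -> String {
    // (bit, letter) pairs in display order. Bit values are illustrative only.
    let flags = [(0b0001u8, 'R'), (0b0010, 'W'), (0b0100, 'E'), (0b1000, 'H')];
    flags
        .iter()
        .map(|&(bit, letter)| if (ready & bit) != 0 { letter } else { '-' })
        .collect()
}
```

With this layout, `render(0b0011)` yields `RW--` and an empty word yields
`----`.
*/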

impl fmt::Debug for Readiness {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let i = &mut [b'-'; 4];
        let r = &mut [b'-'; 4];
        let mixed = &mut [b'-'; 4];

        display_ready(i, self.interest);
        display_ready(r, self.event);
        display_ready(mixed, self.interest & self.event);

        write!(
            f,
            "Readiness {{ interest: {}, readiness: {}, mixed: {} }}",
            str::from_utf8(i).unwrap(),
            str::from_utf8(r).unwrap(),
            str::from_utf8(mixed).unwrap()
        )
    }
}

#[derive(Clone, Debug)]
pub struct SessionMetrics {
    /// instant at which we started handling that request
    pub start: Option<Instant>,
    /// wall-clock timestamp captured alongside `start`, for access-log
    /// consumers that need an absolute start time (e.g. OTel span
    /// reconstruction) without subtracting a monotonic duration from a
    /// wall-clock end time, which would mix two unsynchronised clock sources.
    pub start_wall: Option<SystemTime>,
    /// time actually spent handling the request
    pub service_time: Duration,
    /// time spent waiting for its turn
    pub wait_time: Duration,
    /// bytes received by the frontend
    pub bin: usize,
    /// bytes sent by the frontend
    pub bout: usize,

    /// instant at which we started working on the request
    pub service_start: Option<Instant>,
    pub wait_start: Instant,

    pub backend_id: Option<String>,
    pub backend_start: Option<Instant>,
    pub backend_connected: Option<Instant>,
    pub backend_stop: Option<Instant>,
    pub backend_bin: usize,
    pub backend_bout: usize,
}

impl SessionMetrics {
    pub fn new(wait_time: Option<Duration>) -> SessionMetrics {
        SessionMetrics {
            start: Some(Instant::now()),
            start_wall: Some(SystemTime::now()),
            service_time: Duration::from_secs(0),
            wait_time: wait_time.unwrap_or_else(|| Duration::from_secs(0)),
            bin: 0,
            bout: 0,
            service_start: None,
            wait_start: Instant::now(),
            backend_id: None,
            backend_start: None,
            backend_connected: None,
            backend_stop: None,
            backend_bin: 0,
            backend_bout: 0,
        }
    }

    pub fn reset(&mut self) {
        self.start = None;
        self.start_wall = None;
        self.service_time = Duration::from_secs(0);
        self.wait_time = Duration::from_secs(0);
        self.bin = 0;
        self.bout = 0;
        self.service_start = None;
        self.backend_start = None;
        self.backend_connected = None;
        self.backend_stop = None;
        self.backend_bin = 0;
        self.backend_bout = 0;
    }

    pub fn service_start(&mut self) {
        let now = if self.start.is_none() {
            self.mark_request_start()
        } else {
            Instant::now()
        };
        self.service_start = Some(now);
        self.wait_time += now - self.wait_start;
    }

    pub fn service_stop(&mut self) {
        if let Some(start) = self.service_start.take() {
            let duration = Instant::now() - start;
            self.service_time += duration;
        }
    }

    pub fn wait_start(&mut self) {
        self.wait_start = Instant::now();
    }

    pub fn service_time(&self) -> Duration {
        match self.service_start {
            Some(start) => {
                let last_duration = Instant::now() - start;
                self.service_time + last_duration
            }
            None => self.service_time,
        }
    }

    /// Arm both the monotonic and wall-clock start timestamps together.
    /// This must be the single place that sets `start` + `start_wall` outside
    /// of `new()`, so the two fields can never desynchronize.
    /// Returns the monotonic instant so callers that need it (e.g.
    /// `service_start`) can reuse it without a second syscall.
    pub fn mark_request_start(&mut self) -> Instant {
        let now = Instant::now();
        self.start = Some(now);
        self.start_wall = Some(SystemTime::now());
        now
    }

    /// Time elapsed since `start`, or zero if `start` is unset.
    pub fn request_time(&self) -> Duration {
        match self.start {
            Some(start) => Instant::now() - start,
            None => Duration::from_secs(0),
        }
    }

    /// Wall-clock start time as nanoseconds since the Unix epoch, or `None`
    /// if `start_wall` is unset (post-`reset()`, pre-`service_start()`) or
    /// predates the Unix epoch.
    pub fn start_wall_ns(&self) -> Option<i128> {
        self.start_wall.and_then(|t| {
            t.duration_since(SystemTime::UNIX_EPOCH)
                .ok()
                .map(|d| d.as_nanos() as i128)
        })
    }

    pub fn backend_start(&mut self) {
        self.backend_start = Some(Instant::now());
    }

    pub fn backend_connected(&mut self) {
        self.backend_connected = Some(Instant::now());
    }

    pub fn backend_stop(&mut self) {
        self.backend_stop = Some(Instant::now());
    }

    pub fn backend_response_time(&self) -> Option<Duration> {
        match (self.backend_connected, self.backend_stop) {
            (Some(start), Some(end)) => Some(end - start),
            (Some(start), None) => Some(Instant::now() - start),
            _ => None,
        }
    }

    pub fn backend_connection_time(&self) -> Option<Duration> {
        match (self.backend_start, self.backend_connected) {
            (Some(start), Some(end)) => Some(end - start),
            _ => None,
        }
    }

    pub fn register_end_of_session(&self, context: &LogContext) {
        let request_time = self.request_time();
        let service_time = self.service_time();

        if let Some(cluster_id) = context.cluster_id {
            time!(
                names::event_loop::REQUEST_TIME,
                cluster_id,
                request_time.as_millis()
            );
            time!(
                names::event_loop::SERVICE_TIME,
                cluster_id,
                service_time.as_millis()
            );
        }
        time!(names::event_loop::REQUEST_TIME, request_time.as_millis());
        time!(names::event_loop::SERVICE_TIME, service_time.as_millis());

        if let Some(backend_id) = self.backend_id.as_ref() {
            if let Some(backend_response_time) = self.backend_response_time() {
                record_backend_metrics!(
                    context.cluster_id.as_str_or("-"),
                    backend_id,
                    backend_response_time.as_millis(),
                    self.backend_connection_time(),
                    self.backend_bin,
                    self.backend_bout
                );
            }
        }

        incr!(
            names::access_logs::COUNT,
            context.cluster_id,
            context.backend_id
        );
    }
}
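
/*
A minimal, std-only sketch of why `start` and `start_wall` are captured as a
pair: durations are measured on the monotonic clock, while the absolute start
time comes from the wall clock, and the two are never subtracted from each
other. `StartStamps` and its methods are hypothetical names used only for
illustration.

```rust
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

/// Hypothetical pair of start timestamps, captured together.
struct StartStamps {
    monotonic: Instant,
    wall: SystemTime,
}

impl StartStamps {
    /// Reads both clocks back to back so the two fields describe the same moment.
    fn now() -> Self {
        StartStamps {
            monotonic: Instant::now(),
            wall: SystemTime::now(),
        }
    }

    /// Elapsed time, measured on the monotonic clock only.
    fn elapsed(&self) -> Duration {
        self.monotonic.elapsed()
    }

    /// Wall-clock start as nanoseconds since the Unix epoch, or `None` if the
    /// wall clock reads earlier than the epoch.
    fn wall_ns(&self) -> Option<i128> {
        self.wall
            .duration_since(UNIX_EPOCH)
            .ok()
            .map(|d| d.as_nanos() as i128)
    }
}
```

A consumer that needs an absolute start time reads `wall_ns()` directly
instead of computing "wall-clock end minus monotonic duration".
*/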

/// exponentially weighted moving average with high sensitivity to latency bursts
///
/// cf Finagle for the original implementation: <https://github.com/twitter/finagle/blob/9cc08d15216497bb03a1cafda96b7266cfbbcff1/finagle-core/src/main/scala/com/twitter/finagle/loadbalancer/PeakEwma.scala>
#[derive(Debug, PartialEq, Clone)]
pub struct PeakEWMA {
    /// decay in nanoseconds
    ///
    /// higher values make the EWMA decay more slowly towards 0
    pub decay: f64,
    /// estimated RTT in nanoseconds
    ///
    /// must be set to a high enough default value so that new backends do not
    /// get all the traffic right away
    pub rtt: f64,
    /// last modification
    pub last_event: Instant,
}

impl Default for PeakEWMA {
    fn default() -> Self {
        Self::new()
    }
}

impl PeakEWMA {
    // hardcoded default values for now
    pub fn new() -> Self {
        PeakEWMA {
            // 1s
            decay: 1_000_000_000f64,
            // 50ms
            rtt: 50_000_000f64,
            last_event: Instant::now(),
        }
    }

    pub fn observe(&mut self, rtt: f64) {
        let now = Instant::now();
        let dur = now - self.last_event;

        // if latency is rising, we will immediately raise the cost
        if rtt > self.rtt {
            self.rtt = rtt;
        } else {
            // new_rtt = old_rtt * e^(-elapsed/decay) + observed_rtt * (1 - e^(-elapsed/decay))
            let weight = (-(dur.as_nanos() as f64) / self.decay).exp();
            self.rtt = self.rtt * weight + rtt * (1.0 - weight);
        }

        self.last_event = now;
    }

    pub fn get(&mut self, active_requests: usize) -> f64 {
        // decay the current value
        // (we might not have seen a request in a long time)
        self.observe(0.0);

        (active_requests + 1) as f64 * self.rtt
    }
}
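
/*
A minimal, std-only sketch of the update rule used by `PeakEWMA::observe` and
of the cost computed by `PeakEWMA::get`, with the elapsed time passed in
explicitly so the arithmetic can be checked by hand. The free functions
`observe` and `cost` are hypothetical helpers, not part of this crate.

```rust
/// Returns the new RTT estimate, all values in nanoseconds.
fn observe(current_rtt: f64, observed_rtt: f64, elapsed_ns: f64, decay_ns: f64) -> f64 {
    if observed_rtt > current_rtt {
        // Peak behaviour: a rising latency replaces the estimate immediately.
        observed_rtt
    } else {
        // Otherwise the old estimate decays exponentially towards the observation.
        let weight = (-elapsed_ns / decay_ns).exp();
        current_rtt * weight + observed_rtt * (1.0 - weight)
    }
}

/// Load estimate: the RTT estimate scaled by the in-flight requests plus one.
fn cost(rtt: f64, active_requests: usize) -> f64 {
    (active_requests + 1) as f64 * rtt
}
```

With the defaults above (decay of 1 s, initial RTT of 50 ms), observing `0.0`
one full decay period later multiplies the estimate by `e^-1`, which gives
roughly 18.4 ms.
*/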

pub mod testing {
    pub use std::{cell::RefCell, os::fd::IntoRawFd, rc::Rc};

    pub use anyhow::Context;
    pub use mio::{Poll, Registry, Token, net::UnixStream};
    pub use slab::Slab;
    pub use sozu_command::{
        proto::command::{
            HttpListenerConfig, HttpsListenerConfig, ServerConfig, TcpListenerConfig,
        },
        scm_socket::{Listeners, ScmSocket},
    };

    pub use crate::{
        Protocol, ProxySession,
        backends::BackendMap,
        http::HttpProxy,
        https::HttpsProxy,
        pool::Pool,
        server::{ListenSession, ProxyChannel, Server, SessionManager},
        tcp::TcpProxy,
    };

    use std::sync::atomic::{AtomicU16, Ordering};

    /// Port counter for sozu listener addresses in lib tests.
    /// Starts at 10000 to avoid collision with:
    /// - Privileged ports (<1024)
    /// - e2e suite (starts at 2000)
    /// - Ephemeral port range (typically 32768+)
    static PORT_PROVIDER: AtomicU16 = AtomicU16::new(10000);

    /// Get a unique port for a sozu listener address.
    /// Each call returns a different port, safe for parallel test execution.
    pub fn provide_port() -> u16 {
        PORT_PROVIDER.fetch_add(1, Ordering::SeqCst)
    }

    /// Everything needed to create a Server
    pub struct ServerParts {
        pub event_loop: Poll,
        pub registry: Registry,
        pub sessions: Rc<RefCell<SessionManager>>,
        pub pool: Rc<RefCell<Pool>>,
        pub backends: Rc<RefCell<BackendMap>>,
        pub client_scm_socket: ScmSocket,
        pub server_scm_socket: ScmSocket,
        pub server_config: ServerConfig,
    }

    /// Set up the parts of a standalone server, for testing purposes
    pub fn prebuild_server(
        max_buffers: usize,
        buffer_size: usize,
        send_scm: bool,
    ) -> anyhow::Result<ServerParts> {
        let event_loop = Poll::new().with_context(|| "Failed at creating event loop")?;
        let backends = Rc::new(RefCell::new(BackendMap::new()));
        let server_config = ServerConfig {
            max_connections: max_buffers as u64,
            ..Default::default()
        };

        let pool = Rc::new(RefCell::new(Pool::with_capacity(
            1,
            max_buffers,
            buffer_size,
        )));

        let mut sessions: Slab<Rc<RefCell<dyn ProxySession>>> = Slab::with_capacity(max_buffers);
        {
            let entry = sessions.vacant_entry();
            info!("taking token {:?} for channel", entry.key());
            entry.insert(Rc::new(RefCell::new(ListenSession {
                protocol: Protocol::Channel,
            })));
        }
        {
            let entry = sessions.vacant_entry();
            info!("taking token {:?} for timer", entry.key());
            entry.insert(Rc::new(RefCell::new(ListenSession {
                protocol: Protocol::Timer,
            })));
        }
        {
            let entry = sessions.vacant_entry();
            info!("taking token {:?} for metrics", entry.key());
            entry.insert(Rc::new(RefCell::new(ListenSession {
                protocol: Protocol::Metrics,
            })));
        }
        // Test fixture: feature disabled (max_connections_per_ip = 0).
        let sessions = SessionManager::new(sessions, max_buffers, 0, 0);

        let registry = event_loop
            .registry()
            .try_clone()
            .with_context(|| "Failed at creating a registry")?;

        let (scm_server, scm_client) =
            UnixStream::pair().with_context(|| "Failed at creating scm unix stream")?;
        let client_scm_socket = ScmSocket::new(scm_client.into_raw_fd())
            .with_context(|| "Failed at creating the scm client socket")?;
        let server_scm_socket = ScmSocket::new(scm_server.into_raw_fd())
            .with_context(|| "Failed at creating the scm server socket")?;
        if send_scm {
            client_scm_socket
                .send_listeners(&Listeners::default())
                .with_context(|| "Failed at sending empty listeners")?;
        }

        Ok(ServerParts {
            event_loop,
            registry,
            sessions,
            pool,
            backends,
            client_scm_socket,
            server_scm_socket,
            server_config,
        })
    }
}
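
/*
A minimal, std-only sketch of the port provider in the `testing` module above:
an atomic counter hands out a distinct port to every caller, even when tests
run on several threads. `NEXT_PORT`, `next_port` and `draw_ports` are
hypothetical names; only the `fetch_add` pattern mirrors the code above.

```rust
use std::collections::HashSet;
use std::sync::atomic::{AtomicU16, Ordering};
use std::thread;

static NEXT_PORT: AtomicU16 = AtomicU16::new(10000);

fn next_port() -> u16 {
    // fetch_add returns the previous value, so no two callers see the same port.
    NEXT_PORT.fetch_add(1, Ordering::SeqCst)
}

/// Draws `per_thread` ports from each of `threads` threads and returns them all.
fn draw_ports(threads: usize, per_thread: usize) -> Vec<u16> {
    let handles: Vec<_> = (0..threads)
        .map(|_| {
            thread::spawn(move || (0..per_thread).map(|_| next_port()).collect::<Vec<u16>>())
        })
        .collect();
    handles
        .into_iter()
        .flat_map(|handle| handle.join().unwrap())
        .collect()
}
```

Collecting the drawn ports into a `HashSet` and comparing its length with the
number of draws is a quick way to confirm that no port was handed out twice.
*/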