//! ## What this library does
//!
//! This library provides tools to build and start HTTP, HTTPS and TCP reverse proxies.
//!
//! The proxies handle network polling, HTTP parsing and TLS in a fast, single-threaded event
//! loop.
//!
//! Each proxy is designed to receive configuration changes at runtime instead of
//! reloading from a file regularly. The event loop runs in its own thread
//! and receives commands through a message queue.
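//!
//! The general pattern is an event loop thread that owns its state and applies commands
//! arriving on a queue. It can be sketched with the standard library alone. This is an
//! illustrative sketch, not Sōzu's implementation: the `Command` enum and the `HashMap`
//! of routes are hypothetical stand-ins, and `std::sync::mpsc` replaces the UNIX-socket
//! channels Sōzu actually uses.
//!
//! ```rust
//! use std::{collections::HashMap, sync::mpsc, thread};
//!
//! // Hypothetical commands; Sōzu's real requests are `WorkerRequest`s.
//! enum Command {
//!     AddRoute { host: String, cluster_id: String },
//!     RemoveRoute { host: String },
//!     Stop,
//! }
//!
//! let (tx, rx) = mpsc::channel::<Command>();
//!
//! // The "event loop" owns its routing table; only this thread mutates it.
//! let worker = thread::spawn(move || {
//!     let mut routes: HashMap<String, String> = HashMap::new();
//!     for command in rx {
//!         match command {
//!             Command::AddRoute { host, cluster_id } => {
//!                 routes.insert(host, cluster_id);
//!             }
//!             Command::RemoveRoute { host } => {
//!                 routes.remove(&host);
//!             }
//!             Command::Stop => break,
//!         }
//!     }
//!     routes // hand the final state back for inspection
//! });
//!
//! // Configuration changes are sent at runtime; nothing is reloaded from a file.
//! let add = |host: &str, cluster_id: &str| Command::AddRoute {
//!     host: host.to_owned(),
//!     cluster_id: cluster_id.to_owned(),
//! };
//! tx.send(add("example.com", "my-cluster")).unwrap();
//! tx.send(add("other.com", "other-cluster")).unwrap();
//! tx.send(Command::RemoveRoute { host: "other.com".to_owned() }).unwrap();
//! tx.send(Command::Stop).unwrap();
//!
//! let routes = worker.join().unwrap();
//! assert_eq!(routes.get("example.com").map(String::as_str), Some("my-cluster"));
//! assert!(!routes.contains_key("other.com"));
//! ```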
//!
//! ## Difference with the crate `sozu`
//!
//! To create several workers and manage them all at once (which is the most common way to
//! use Sōzu), the crate `sozu` is better suited than using the lib directly.
//!
//! The crate `sozu` provides a binary called the main process.
//! The main process uses `sozu_lib` to start and manage workers.
//! Each worker can handle HTTP, HTTPS and TCP traffic.
//! The main process synchronizes the state of all workers, using UNIX sockets
//! and custom channels to communicate with them.
//! The main process itself is configurable with a file, and has a CLI.
//!
//! ## How to use this library directly
//!
//! This documentation explains how to write a binary that will start a single Sōzu
//! worker and give it orders. The method has two steps:
//!
//! 1. start a Sōzu worker in a distinct thread;
//! 2. send instructions to the worker on a UNIX socket via a Sōzu channel.
//!
//! ### How to start a Sōzu worker
//!
//! Before creating an HTTP proxy, we first need to create an HTTP listener.
//! The listener is an abstraction around a TCP socket provided by the kernel.
//! We need the `sozu_command_lib` crate to build a listener.
//!
//! ```
//! use sozu_command_lib::{config::ListenerBuilder, proto::command::SocketAddress};
//!
//! let address = SocketAddress::new_v4(127, 0, 0, 1, 8080);
//! let http_listener = ListenerBuilder::new_http(address)
//!     .to_http(None)
//!     .expect("Could not create HTTP listener");
//! ```
//!
//! The `http_listener` is of type `HttpListenerConfig`, which can be sent to the worker
//! to start the proxy.
//!
//! Then create a pair of channels to communicate with the proxy.
//! The channel is a wrapper around a UNIX socket.
//!
//! ```ignore
//! use sozu_command_lib::{
//!     channel::Channel,
//!     proto::command::{WorkerRequest, WorkerResponse},
//! };
//!
//! let (mut command_channel, proxy_channel): (
//!     Channel<WorkerRequest, WorkerResponse>,
//!     Channel<WorkerResponse, WorkerRequest>,
//! ) = Channel::generate(1000, 10000).expect("should create a channel");
//! ```
//!
//! Here, the `command_channel` end is blocking: it sends `WorkerRequest`s and receives
//! `WorkerResponse`s, while the `proxy_channel` end is non-blocking, and the types are reversed.
//! Writing the types here isn't even necessary thanks to type inference,
//! but it brings the point across.
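//!
//! The blocking/non-blocking split can be illustrated with a plain
//! `std::os::unix::net::UnixStream` pair. This is only a sketch of the idea, assuming a
//! Unix platform: Sōzu's `Channel` adds message framing and typing on top of such a socket,
//! and none of that is reproduced here. A non-blocking read on an empty socket returns
//! `WouldBlock` instead of waiting, which is what lets an event loop poll the socket
//! alongside its other work.
//!
//! ```rust
//! use std::io::{ErrorKind, Read, Write};
//! use std::os::unix::net::UnixStream;
//!
//! let (mut command_end, mut proxy_end) = UnixStream::pair().expect("socket pair");
//!
//! // The proxy end is polled by an event loop, so it must never block.
//! proxy_end.set_nonblocking(true).unwrap();
//!
//! let mut buf = [0u8; 16];
//! // Nothing has been written yet: a non-blocking read reports WouldBlock.
//! let err = proxy_end.read(&mut buf).unwrap_err();
//! assert_eq!(err.kind(), ErrorKind::WouldBlock);
//!
//! // The command end stays blocking: write_all returns once the bytes are sent.
//! command_end.write_all(b"ping").unwrap();
//! let n = proxy_end.read(&mut buf).unwrap();
//! assert_eq!(&buf[..n], b"ping");
//! ```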
//!
//! You can now launch the worker in a separate thread, providing the HTTP listener config,
//! the proxy end of the channel, and your custom number of buffers and their size:
//!
//! ```ignore
//! use std::thread;
//!
//! let worker_thread_join_handle = thread::spawn(move || {
//!     let max_buffers = 500;
//!     let buffer_size = 16384;
//!     sozu_lib::http::testing::start_http_worker(http_listener, proxy_channel, max_buffers, buffer_size)
//!         .expect("The worker could not be started, or shut down");
//! });
//! ```
//!
//! ### Send orders
//!
//! Once the thread is launched, the proxy worker will start its event loop and handle
//! events on the listening interface and port specified when building the HTTP listener.
//! Since no frontends or backends were specified for the proxy, it will receive
//! the connections, parse the requests, then send a default (but configurable)
//! answer.
//!
//! Before defining a frontend and backends, we need to define a cluster, which describes
//! a routing configuration. A cluster contains:
//!
//! - one frontend
//! - one or several backends
//! - routing rules
//!
//! A cluster is identified by its `cluster_id`, which will be used to define frontends
//! and backends later on.
//!
//! ```
//! use sozu_command_lib::proto::command::{Cluster, LoadBalancingAlgorithms};
//!
//! let cluster = Cluster {
//!     cluster_id: "my-cluster".to_string(),
//!     sticky_session: false,
//!     https_redirect: false,
//!     load_balancing: LoadBalancingAlgorithms::RoundRobin as i32,
//!     answer_503: Some("A custom forbidden message".to_string()),
//!     ..Default::default()
//! };
//! ```
//!
//! The defaults are sensible, so we could define only the `cluster_id`.
//!
//! We can now define a frontend. A frontend is a way to recognize a request and match
//! it to a `cluster_id`, depending on the hostname and the beginning of the URL path.
//! The `address` field must match that of the HTTP listener we defined before:
//!
//! ```
//! use std::collections::BTreeMap;
//!
//! use sozu_command_lib::proto::command::{PathRule, RequestHttpFrontend, RulePosition, SocketAddress};
//!
//! let http_front = RequestHttpFrontend {
//!     cluster_id: Some("my-cluster".to_string()),
//!     address: SocketAddress::new_v4(127, 0, 0, 1, 8080),
//!     hostname: "example.com".to_string(),
//!     path: PathRule::prefix(String::from("/")),
//!     position: RulePosition::Pre.into(),
//!     tags: BTreeMap::from([
//!         ("owner".to_owned(), "John".to_owned()),
//!         ("id".to_owned(), "my-own-http-front".to_owned()),
//!     ]),
//!     ..Default::default()
//! };
//! ```
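//!
//! Conceptually, frontend matching picks the cluster whose hostname equals the request's
//! host and whose path prefix matches the start of the URL path. The sketch below
//! illustrates that idea with a hypothetical `Frontend` struct and a "longest prefix wins"
//! tie-break; it is not Sōzu's router, which handles more cases (other `PathRule` kinds
//! and `RulePosition`s among them).
//!
//! ```rust
//! // Hypothetical, simplified frontend: exact hostname + path prefix.
//! struct Frontend {
//!     hostname: &'static str,
//!     path_prefix: &'static str,
//!     cluster_id: &'static str,
//! }
//!
//! fn route<'a>(frontends: &'a [Frontend], host: &str, path: &str) -> Option<&'a str> {
//!     frontends
//!         .iter()
//!         .filter(|f| f.hostname == host && path.starts_with(f.path_prefix))
//!         // Prefer the most specific (longest) matching prefix.
//!         .max_by_key(|f| f.path_prefix.len())
//!         .map(|f| f.cluster_id)
//! }
//!
//! let frontends = [
//!     Frontend { hostname: "example.com", path_prefix: "/", cluster_id: "my-cluster" },
//!     Frontend { hostname: "example.com", path_prefix: "/api", cluster_id: "api-cluster" },
//! ];
//!
//! assert_eq!(route(&frontends, "example.com", "/index.html"), Some("my-cluster"));
//! assert_eq!(route(&frontends, "example.com", "/api/users"), Some("api-cluster"));
//! assert_eq!(route(&frontends, "unknown.org", "/"), None);
//! ```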
//!
//! The `tags` are keys and values that will appear in the access logs,
//! which can come in handy.
//!
//! Now let's define a backend.
//! A backend is an instance of a backend application we want to route traffic to.
//! The `address` field must match the IP and port of the backend server.
//!
//! ```
//! use sozu_command_lib::proto::command::{AddBackend, LoadBalancingParams, SocketAddress};
//!
//! let http_backend = AddBackend {
//!     cluster_id: "my-cluster".to_string(),
//!     backend_id: "test-backend".to_string(),
//!     address: SocketAddress::new_v4(127, 0, 0, 1, 8000),
//!     load_balancing_parameters: Some(LoadBalancingParams::default()),
//!     ..Default::default()
//! };
//! ```
//!
//! A cluster can have multiple backend servers, and they can be added or
//! removed while the proxy is running. If a backend is removed from the configuration
//! while the proxy is handling a request to that server, it will finish that
//! request and stop sending new traffic to that server.
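//!
//! One way to picture this "drain" behaviour: a removed backend is excluded from the
//! selection of new requests, while requests already assigned to it stay tracked until
//! they complete. The sketch below is a hypothetical model of that rule; the `Backend`
//! struct, its `active_requests` counter and the round-robin `pick` function are
//! illustrative assumptions, not Sōzu's `BackendMap`.
//!
//! ```rust
//! struct Backend {
//!     id: &'static str,
//!     removed: bool,
//!     active_requests: usize,
//! }
//!
//! // Round-robin over backends that have not been removed.
//! fn pick(backends: &mut [Backend], next: &mut usize) -> Option<usize> {
//!     let len = backends.len();
//!     for _ in 0..len {
//!         let i = *next % len;
//!         *next += 1;
//!         if !backends[i].removed {
//!             backends[i].active_requests += 1;
//!             return Some(i);
//!         }
//!     }
//!     None
//! }
//!
//! let mut backends = vec![
//!     Backend { id: "backend-a", removed: false, active_requests: 0 },
//!     Backend { id: "backend-b", removed: false, active_requests: 0 },
//! ];
//! let mut next = 0;
//!
//! let in_flight = pick(&mut backends, &mut next).unwrap(); // goes to backend-a
//! backends[in_flight].removed = true; // backend-a is removed mid-request
//!
//! // New traffic only reaches the remaining backend...
//! let i = pick(&mut backends, &mut next).unwrap();
//! assert_eq!(backends[i].id, "backend-b");
//! let i = pick(&mut backends, &mut next).unwrap();
//! assert_eq!(backends[i].id, "backend-b");
//!
//! // ...while the request already sent to backend-a is still tracked until it completes.
//! assert_eq!(backends[in_flight].active_requests, 1);
//! backends[in_flight].active_requests -= 1;
//!
//! // Once drained, the removed backend can be forgotten.
//! backends.retain(|b| !(b.removed && b.active_requests == 0));
//! assert_eq!(backends.len(), 1);
//! assert_eq!(backends[0].id, "backend-b");
//! ```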
//!
//! Now we can use the other end of the channel to send all these requests to the worker,
//! using the `WorkerRequest` type:
//!
//! ```ignore
//! use sozu_command_lib::proto::command::{request::RequestType, WorkerRequest};
//!
//! command_channel
//!     .write_message(&WorkerRequest {
//!         id: String::from("add-the-cluster"),
//!         content: RequestType::AddCluster(cluster).into(),
//!     })
//!     .expect("Could not send AddCluster request");
//!
//! command_channel
//!     .write_message(&WorkerRequest {
//!         id: String::from("add-the-frontend"),
//!         content: RequestType::AddHttpFrontend(http_front).into(),
//!     })
//!     .expect("Could not send AddHttpFrontend request");
//!
//! command_channel
//!     .write_message(&WorkerRequest {
//!         id: String::from("add-the-backend"),
//!         content: RequestType::AddBackend(http_backend).into(),
//!     })
//!     .expect("Could not send AddBackend request");
//!
//! // Read back one response per request sent above.
//! println!("HTTP -> {:?}", command_channel.read_message());
//! println!("HTTP -> {:?}", command_channel.read_message());
//! println!("HTTP -> {:?}", command_channel.read_message());
//! ```
//!
//! The event loop of the worker will process these instructions and add them to
//! its state, and the worker will send back an acknowledgement
//! message for each of them.
//!
//! Finally, join the worker thread. This blocks the main thread for as long as the
//! worker runs, which keeps the proxy serving traffic:
//!
//! ```ignore
//! let _ = worker_thread_join_handle.join();
//! ```
//!
//! Here is the complete example for reference; it matches the `examples/http.rs` example:
//!
//! ```
//! #[macro_use]
//! extern crate sozu_command_lib;
//!
//! use std::{collections::BTreeMap, thread};
//!
//! use anyhow::Context;
//! use sozu_command_lib::{
//!     channel::Channel,
//!     config::ListenerBuilder,
//!     logging::setup_default_logging,
//!     proto::command::{
//!         request::RequestType, AddBackend, Cluster, LoadBalancingAlgorithms, LoadBalancingParams,
//!         PathRule, RequestHttpFrontend, RulePosition, SocketAddress, WorkerRequest,
//!     },
//! };
//!
//! fn main() -> anyhow::Result<()> {
//!     setup_default_logging(true, "info", "EXAMPLE").with_context(|| "could not setup logging")?;
//!
//!     info!("starting up");
//!
//!     let http_listener = ListenerBuilder::new_http(SocketAddress::new_v4(127, 0, 0, 1, 8080))
//!         .to_http(None)
//!         .expect("Could not create HTTP listener");
//!
//!     let (mut command_channel, proxy_channel) =
//!         Channel::generate(1000, 10000).with_context(|| "should create a channel")?;
//!
//!     let worker_thread_join_handle = thread::spawn(move || {
//!         let max_buffers = 500;
//!         let buffer_size = 16384;
//!         sozu_lib::http::testing::start_http_worker(http_listener, proxy_channel, max_buffers, buffer_size)
//!             .expect("The worker could not be started, or shut down");
//!     });
//!
//!     let cluster = Cluster {
//!         cluster_id: "my-cluster".to_string(),
//!         sticky_session: false,
//!         https_redirect: false,
//!         load_balancing: LoadBalancingAlgorithms::RoundRobin as i32,
//!         answer_503: Some("A custom forbidden message".to_string()),
//!         ..Default::default()
//!     };
//!
//!     let http_front = RequestHttpFrontend {
//!         cluster_id: Some("my-cluster".to_string()),
//!         address: SocketAddress::new_v4(127, 0, 0, 1, 8080),
//!         hostname: "example.com".to_string(),
//!         path: PathRule::prefix(String::from("/")),
//!         position: RulePosition::Pre.into(),
//!         tags: BTreeMap::from([
//!             ("owner".to_owned(), "John".to_owned()),
//!             ("id".to_owned(), "my-own-http-front".to_owned()),
//!         ]),
//!         ..Default::default()
//!     };
//!
//!     let http_backend = AddBackend {
//!         cluster_id: "my-cluster".to_string(),
//!         backend_id: "test-backend".to_string(),
//!         address: SocketAddress::new_v4(127, 0, 0, 1, 8000),
//!         load_balancing_parameters: Some(LoadBalancingParams::default()),
//!         ..Default::default()
//!     };
//!
//!     command_channel
//!         .write_message(&WorkerRequest {
//!             id: String::from("add-the-cluster"),
//!             content: RequestType::AddCluster(cluster).into(),
//!         })
//!         .expect("Could not send AddCluster request");
//!
//!     command_channel
//!         .write_message(&WorkerRequest {
//!             id: String::from("add-the-frontend"),
//!             content: RequestType::AddHttpFrontend(http_front).into(),
//!         })
//!         .expect("Could not send AddHttpFrontend request");
//!
//!     command_channel
//!         .write_message(&WorkerRequest {
//!             id: String::from("add-the-backend"),
//!             content: RequestType::AddBackend(http_backend).into(),
//!         })
//!         .expect("Could not send AddBackend request");
//!
//!     println!("HTTP -> {:?}", command_channel.read_message());
//!     println!("HTTP -> {:?}", command_channel.read_message());
//!
//!     // uncomment to keep the proxy running (joining blocks until the worker stops)
//!     // let _ = worker_thread_join_handle.join();
//!     info!("good bye");
//!     Ok(())
//! }
//! ```

#[macro_use]
extern crate sozu_command_lib as sozu_command;

#[macro_use]
pub mod util;
#[macro_use]
pub mod metrics;

pub mod backends;
pub mod crypto;
pub mod features;
pub mod health_check;
pub mod http;
pub mod load_balancing;
pub mod pool;
pub mod protocol;
pub mod retry;
pub mod router;
pub mod socket;
pub mod timer;
pub mod tls;

/// Linux zero-copy TCP forwarder. Used by `protocol::pipe::Pipe` when
/// the listener is `Protocol::TCP` and the `splice` feature is enabled.
#[cfg(all(target_os = "linux", feature = "splice"))]
pub(crate) mod splice;

pub mod server;
pub mod tcp;

pub mod https;

use std::{
    cell::RefCell,
    collections::{BTreeMap, HashMap},
    fmt::{self, Display, Formatter},
    net::SocketAddr,
    rc::Rc,
    str,
    time::{Duration, Instant},
};

use backends::BackendError;
use hex::FromHexError;
use mio::{Interest, Token, net::TcpStream};
use protocol::http::{answers::HttpAnswers, answers::TemplateError, parser::Method};
use router::RouterError;
use socket::ServerBindError;
use sozu_command::{
    AsStr, ObjectKind,
    logging::{CachedTags, LogContext},
    proto::command::{Cluster, ListenerType, RequestHttpFrontend, WorkerRequest, WorkerResponse},
    ready::Ready,
    state::ClusterId,
};
use tls::CertificateResolverError;

use crate::{backends::BackendMap, metrics::names, router::RouteResult};

/// Anything that can be registered in mio (subscribe to kernel events)
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Protocol {
    HTTP,
    HTTPS,
    TCP,
    HTTPListen,
    HTTPSListen,
    TCPListen,
    Channel,
    Metrics,
    Timer,
}

/// trait that must be implemented by listeners and client sessions
pub trait ProxySession {
    /// indicates the protocol associated with the session
    ///
    /// this is used to distinguish sessions from listeners, channels, metrics
    /// and timers
    fn protocol(&self) -> Protocol;
    /// if a session received an event or can still execute, the event loop will
    /// call this method. Its result indicates whether the session must now be
    /// closed
    fn ready(&mut self, session: Rc<RefCell<dyn ProxySession>>) -> SessionIsToBeClosed;
    /// if the event loop got an event for a token associated with the session,
    /// it will call this method on the session
    fn update_readiness(&mut self, token: Token, events: Ready);
    /// close a session, frontend and backend sockets,
    /// remove the entries from the session manager slab
    fn close(&mut self);
    /// if a timeout associated with the session triggers, the event loop will
    /// call this method with the timeout's token
    fn timeout(&mut self, t: Token) -> SessionIsToBeClosed;
    /// last time the session got an event
    fn last_event(&self) -> Instant;
    /// display the session's internal state (for debugging purposes)
    fn print_session(&self);
    /// get the token associated with the frontend
    fn frontend_token(&self) -> Token;
    /// tell the session it has to shut down if possible
    ///
    /// if the session handles HTTP requests, it will not close until the response
    /// is completely sent back to the client
    fn shutting_down(&mut self) -> SessionIsToBeClosed;
    /// Best-effort identifier of the cluster currently routed to by this
    /// session. Returns `None` for `ListenSession` (no per-session
    /// cluster), and for client sessions before routing has resolved.
    /// H2 sessions multiplex many streams over one frontend token and may
    /// touch several clusters; the returned value is whichever cluster
    /// the session most recently keep-alive'd to. Used for log/metric
    /// attribution, not for accounting (the tracker keeps the canonical
    /// per-stream `(cluster, IP)` set).
    fn cluster_id(&self) -> Option<String> {
        None
    }
    /// Source address as observed by Sōzu, with proxy-protocol awareness.
    /// HTTP/HTTPS/TCP client sessions return the parsed PROXY-protocol
    /// source when present, else `peer_addr`. `ListenSession` returns
    /// `None`. Used to attribute per-(cluster, source-IP) tracking and
    /// access logs to the real client behind a layer-4 PROXY frontend.
    fn session_address(&self) -> Option<SocketAddr> {
        None
    }
}

#[macro_export]
macro_rules! branch {
    (if $($value:ident)? == $expected:ident { $($then:tt)* } else { $($else:tt)* }) => {
        macro_rules! expect {
            ($expected) => {$($then)*};
            ($a:ident) => {$($else)*};
            () => {$($else)*}
        }
        expect!($($value)?);
    };
    (if $($value:ident)? == $expected:ident { $($then:tt)* } ) => {
        macro_rules! expect {
            ($expected) => {$($then)*};
        }
        expect!($($value)?);
    };
}

#[macro_export]
macro_rules! fallback {
    ({} $($default:tt)*) => {
        $($default)*
    };
    ({$($value:tt)+} $($default:tt)*) => {
        $($value)+
    };
}

#[macro_export]
macro_rules! StateMachineBuilder {
    (
        ($d:tt)
        $(#[$($state_macros:tt)*])*
        enum $state_name:ident $(impl $trait:ident)? {
            $($(#[$($variant_macros:tt)*])*
            $variant_name:ident($state:ty$(,$($aux:ty),+)?) $(-> $override:expr)?),+ $(,)?
        }
    ) => {
        /// A summary of the last valid State
        #[derive(Clone, Copy, Debug)]
        pub enum StateMarker {
            $($variant_name,)+
        }

        $(#[$($state_macros)*])*
        #[allow(clippy::large_enum_variant)]
        pub enum $state_name {
            $(
                $(#[$($variant_macros)*])*
                $variant_name($state$(,$($aux),+)?),
            )+
            /// Informs about upgrade failure, contains a summary of the last valid State
            FailedUpgrade(StateMarker),
        }

        macro_rules! _fn_impl {
            ($function:ident(&$d($mut:ident)?, self $d(,$arg_name:ident: $arg_type:ty)*) $d(-> $ret:ty)? $d(| $marker:tt => $fail:expr)?) => {
                fn $function(&$d($mut)? self $d(,$arg_name: $arg_type)*) $d(-> $ret)? {
                    match self {
                        $($state_name::$variant_name(_state, ..) => $crate::fallback!({$($override)?} _state.$function($d($arg_name),*)),)+
                        $state_name::FailedUpgrade($crate::fallback!({$d($marker)?} _)) => $crate::fallback!({$d($fail)?} unreachable!())
                    }
                }
            };
        }

        impl $state_name {
            /// Informs about the last valid State before upgrade failure
            fn marker(&self) -> StateMarker {
                match self {
                    $($state_name::$variant_name(..) => StateMarker::$variant_name,)+
                    $state_name::FailedUpgrade(marker) => *marker,
                }
            }
            /// Returns whether or not the State is FailedUpgrade
            fn failed(&self) -> bool {
                match self {
                    $state_name::FailedUpgrade(_) => true,
                    _ => false,
                }
            }
            /// Gives back an owned version of the State,
            /// leaving a FailedUpgrade in its place.
            /// The FailedUpgrade retains the marker of the previous State.
            fn take(&mut self) -> $state_name {
                let mut owned_state = $state_name::FailedUpgrade(self.marker());
                std::mem::swap(&mut owned_state, self);
                owned_state
            }
            _fn_impl!{front_socket(&, self) -> &mio::net::TcpStream}
        }

        $crate::branch!{
            if $($trait)? == SessionState {
                impl SessionState for $state_name {
                    _fn_impl!{ready(&mut, self, session: Rc<RefCell<dyn ProxySession>>, proxy: Rc<RefCell<dyn L7Proxy>>, metrics: &mut SessionMetrics) -> SessionResult}
                    _fn_impl!{update_readiness(&mut, self, token: Token, events: Ready)}
                    _fn_impl!{timeout(&mut, self, token: Token, metrics: &mut SessionMetrics) -> StateResult}
                    _fn_impl!{cancel_timeouts(&mut, self)}
                    _fn_impl!{print_state(&, self, context: &str) | marker => error!("{} Session(FailedUpgrade({:?}))", context, marker)}
                    _fn_impl!{close(&mut, self, proxy: Rc<RefCell<dyn L7Proxy>>, metrics: &mut SessionMetrics) | _ => {}}
                    _fn_impl!{shutting_down(&mut, self) -> SessionIsToBeClosed | _ => true}
                }
            } else {}
        }
    };
    ($($tt:tt)+) => {
        StateMachineBuilder!{($) $($tt)+}
    }
}

pub trait ListenerHandler {
    fn get_addr(&self) -> &SocketAddr;

    fn get_tags(&self, key: &str) -> Option<&CachedTags>;

    fn get_concatenated_tags(&self, key: &str) -> Option<&str> {
        self.get_tags(key).map(|tags| tags.concatenated.as_str())
    }

    fn set_tags(&mut self, key: String, tags: Option<BTreeMap<String, String>>);

    fn protocol(&self) -> Protocol;

    fn public_address(&self) -> SocketAddr;
}

#[derive(thiserror::Error, Debug)]
pub enum FrontendFromRequestError {
    #[error("Could not parse hostname from '{host}': {error}")]
    HostParse { host: String, error: String },
    #[error("invalid remaining chars after hostname. Host: {0}")]
    InvalidCharsAfterHost(String),
    #[error("no cluster: {0}")]
    NoClusterFound(RouterError),
}

pub trait L7ListenerHandler {
    fn get_sticky_name(&self) -> &str;

    /// Name of the correlation header Sozu injects into every request and
    /// response body. Default: `"Sozu-Id"`. Operators can rebrand via the
    /// `sozu_id_header` listener config knob.
    fn get_sozu_id_header(&self) -> &str {
        "Sozu-Id"
    }

    fn get_connect_timeout(&self) -> u32;

    /// retrieve a frontend by parsing a request's hostname, uri and method
    fn frontend_from_request(
        &self,
        host: &str,
        uri: &str,
        method: &Method,
    ) -> Result<RouteResult, FrontendFromRequestError>;

    /// retrieve the listener's configured HTTP answers (templates)
    fn get_answers(&self) -> &Rc<RefCell<HttpAnswers>>;

    /// H2 flood detection thresholds from the listener config.
    /// Returns the default config when the listener does not provide custom values.
    fn get_h2_flood_config(&self) -> protocol::mux::H2FloodConfig {
        protocol::mux::H2FloodConfig::default()
    }

    /// H2 connection tuning from the listener config.
    /// Returns the default config when the listener does not provide custom values.
    fn get_h2_connection_config(&self) -> protocol::mux::H2ConnectionConfig {
        protocol::mux::H2ConnectionConfig::default()
    }

    /// Whether requests must have their `:authority` / `Host` exactly match
    /// the TLS SNI negotiated at handshake (CWE-346 / CWE-444).
    ///
    /// Defaults to `true`: the safe setting that closes the
    /// CWE-346 / CWE-444 cross-SNI smuggling vector. Operators can opt
    /// out per-listener via `HttpsListenerConfig::strict_sni_binding =
    /// false` when cross-SNI routing is explicitly required. Plaintext
    /// HTTP listeners return the default value; they never have an SNI
    /// to compare against, so the routing-layer check short-circuits on
    /// `tls_server_name: None`.
    fn get_strict_sni_binding(&self) -> bool {
        true
    }

    /// Whether to strip any client-supplied `X-Real-IP` header from
    /// forwarded requests (anti-spoofing).
    ///
    /// Defaults to `false`, which preserves the historical pass-through
    /// behaviour. Operators opt in via
    /// `HttpListenerConfig::elide_x_real_ip = true` (and the equivalent on
    /// HTTPS listeners). Independent of [`Self::get_send_x_real_ip`]: the
    /// two flags can be combined freely (anti-spoof only, send only, both,
    /// or neither). The elision branch lives in
    /// `HttpContext::on_request_headers`, so it covers H1 and H2 alike.
    fn get_elide_x_real_ip(&self) -> bool {
        false
    }

    /// Whether to append a proxy-generated `X-Real-IP` header carrying the
    /// connection peer IP (post-PROXY-v2 unwrap, i.e. the original client
    /// IP) to every forwarded request.
    ///
    /// Defaults to `false`, which preserves the historical no-injection
    /// behaviour. Operators opt in via
    /// `HttpListenerConfig::send_x_real_ip = true` (and the equivalent on
    /// HTTPS listeners). Independent of [`Self::get_elide_x_real_ip`]: the
    /// two flags can be combined freely. The injection branch lives next
    /// to the existing X-Forwarded-For / Forwarded synthesis in
    /// `HttpContext::on_request_headers`.
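    ///
    /// The combinations of the elide and send flags can be modelled as below. This is a
    /// hypothetical sketch over a plain `Vec<(String, String)>` header list; the
    /// `apply_x_real_ip` helper is illustrative and not part of this crate, whose real
    /// logic lives in `HttpContext::on_request_headers`.
    ///
    /// ```rust
    /// use std::net::{IpAddr, Ipv4Addr};
    ///
    /// fn apply_x_real_ip(headers: &mut Vec<(String, String)>, peer: IpAddr, elide: bool, send: bool) {
    ///     if elide {
    ///         // Anti-spoofing: drop whatever the client claimed (header names are case-insensitive).
    ///         headers.retain(|(name, _)| !name.eq_ignore_ascii_case("x-real-ip"));
    ///     }
    ///     if send {
    ///         // Append the proxy's own view of the client IP.
    ///         headers.push(("X-Real-IP".to_owned(), peer.to_string()));
    ///     }
    /// }
    ///
    /// let peer = IpAddr::V4(Ipv4Addr::new(203, 0, 113, 7));
    /// let spoofed = || vec![("X-Real-IP".to_owned(), "1.2.3.4".to_owned())];
    ///
    /// // Neither flag: the client-supplied value passes through untouched.
    /// let mut h = spoofed();
    /// apply_x_real_ip(&mut h, peer, false, false);
    /// assert_eq!(h, spoofed());
    ///
    /// // Both flags: only the proxy-generated value remains.
    /// let mut h = spoofed();
    /// apply_x_real_ip(&mut h, peer, true, true);
    /// assert_eq!(h, vec![("X-Real-IP".to_owned(), "203.0.113.7".to_owned())]);
    ///
    /// // Send only: the client value is kept and the proxy's value is appended after it.
    /// let mut h = spoofed();
    /// apply_x_real_ip(&mut h, peer, false, true);
    /// assert_eq!(h.len(), 2);
    /// ```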
    fn get_send_x_real_ip(&self) -> bool {
        false
    }

    /// Per-stream idle timeout for H2 connections. An open stream that makes
    /// no forward progress for this duration is cancelled (RST_STREAM / CANCEL).
    /// Mitigates slow-multiplex Slowloris where a client keeps connection-level
    /// activity high (resetting the connection idle timer on every frame) while
    /// pinning streams for the full nominal connection timeout.
    ///
    /// Listeners inherit `max(30s, back_timeout)` when `h2_stream_idle_timeout_seconds`
    /// is absent so operators who raised the socket-level backend budget do not
    /// have to duplicate the value here; the 30 s floor preserves the baseline
    /// slow-multiplex mitigation when `back_timeout` is shorter. Set the knob
    /// explicitly to cap the per-stream deadline below `back_timeout` (useful
    /// when under a slow-multiplex attack).
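    ///
    /// The inheritance rule amounts to the following arithmetic, sketched with a
    /// hypothetical free function (`effective_stream_idle_timeout` is illustrative only;
    /// its arguments stand for the knob and `back_timeout` described above):
    ///
    /// ```rust
    /// use std::time::Duration;
    ///
    /// fn effective_stream_idle_timeout(explicit: Option<Duration>, back_timeout: Duration) -> Duration {
    ///     // An explicit knob always wins, even when it is below `back_timeout`.
    ///     explicit.unwrap_or_else(|| back_timeout.max(Duration::from_secs(30)))
    /// }
    ///
    /// // Knob absent, short back_timeout: the 30 s floor applies.
    /// assert_eq!(effective_stream_idle_timeout(None, Duration::from_secs(10)), Duration::from_secs(30));
    /// // Knob absent, raised back_timeout: it is inherited.
    /// assert_eq!(effective_stream_idle_timeout(None, Duration::from_secs(120)), Duration::from_secs(120));
    /// // Knob set explicitly: it caps the deadline below back_timeout.
    /// assert_eq!(
    ///     effective_stream_idle_timeout(Some(Duration::from_secs(15)), Duration::from_secs(120)),
    ///     Duration::from_secs(15)
    /// );
    /// ```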
    fn get_h2_stream_idle_timeout(&self) -> std::time::Duration {
        std::time::Duration::from_secs(30)
    }

    /// Wall-clock budget granted to in-flight H2 streams after soft-stop sent
    /// the initial `GOAWAY(NO_ERROR)`. Once the deadline elapses the mux
    /// transitions to a forced close (final GOAWAY + session teardown).
    ///
    /// Returning `None` disables the forced close entirely: shutdown waits
    /// for every stream to drain naturally. Returning `Some(d)` enforces the
    /// budget. Default: `Some(Duration::from_secs(5))` (matches the historic
    /// hard-coded 5 s deadline). Listeners expose the
    /// `h2_graceful_shutdown_deadline_seconds` knob; value `0` maps to `None`.
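    ///
    /// A sketch of that knob-to-deadline mapping, assuming the knob is an unsigned
    /// number of seconds (the `deadline_from_knob` helper is hypothetical):
    ///
    /// ```rust
    /// use std::time::Duration;
    ///
    /// fn deadline_from_knob(seconds: u64) -> Option<Duration> {
    ///     // 0 means "no forced close": wait for every stream to drain naturally.
    ///     (seconds != 0).then(|| Duration::from_secs(seconds))
    /// }
    ///
    /// assert_eq!(deadline_from_knob(0), None);
    /// assert_eq!(deadline_from_knob(5), Some(Duration::from_secs(5)));
    /// ```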
    fn get_h2_graceful_shutdown_deadline(&self) -> Option<std::time::Duration> {
        Some(std::time::Duration::from_secs(5))
    }
}

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum BackendConnectionStatus {
    NotConnected,
    Connecting(Instant),
    Connected,
}

impl BackendConnectionStatus {
    pub fn is_connecting(&self) -> bool {
        matches!(self, BackendConnectionStatus::Connecting(_))
    }
}

#[derive(Debug, PartialEq, Eq)]
pub enum BackendConnectAction {
    New,
    Reuse,
    Replace,
}

#[derive(thiserror::Error, Debug)]
pub enum BackendConnectionError {
    #[error("Not found: {0:?}")]
    NotFound(ObjectKind),
    #[error("Too many connections on cluster {0:?}")]
    MaxConnectionRetries(Option<String>),
    #[error("the sessions slab has reached maximum capacity")]
    MaxSessionsMemory,
    #[error("error from the backend: {0}")]
    Backend(BackendError),
    #[error("failed to retrieve the cluster: {0}")]
    RetrieveClusterError(RetrieveClusterError),
    #[error("maximum number of buffers reached")]
    MaxBuffers,
    /// Per-(cluster, source-IP) connection limit reached. The protocol
    /// layer translates this into HTTP 429 Too Many Requests (with an
    /// optional `Retry-After`) for HTTP/HTTPS sessions, or a graceful TCP
    /// close for raw TCP. The `cluster_id` is included so log/metric
    /// pipelines can attribute the rejection.
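    ///
    /// The per-(cluster, source-IP) accounting can be pictured as a counter keyed by
    /// `(cluster_id, IpAddr)`. The sketch below is a hypothetical model (`Tracker`,
    /// `try_acquire` and `release` are illustrative names, not this crate's tracker):
    /// a new connection is refused once the key's count reaches the limit, and
    /// closing a connection frees a slot.
    ///
    /// ```rust
    /// use std::collections::HashMap;
    /// use std::net::{IpAddr, Ipv4Addr};
    ///
    /// struct Tracker {
    ///     limit: usize,
    ///     counts: HashMap<(String, IpAddr), usize>,
    /// }
    ///
    /// impl Tracker {
    ///     // Returns `false` when the limit is reached (the caller would then reject).
    ///     fn try_acquire(&mut self, cluster_id: &str, ip: IpAddr) -> bool {
    ///         let count = self.counts.entry((cluster_id.to_owned(), ip)).or_insert(0);
    ///         if *count >= self.limit {
    ///             return false;
    ///         }
    ///         *count += 1;
    ///         true
    ///     }
    ///
    ///     fn release(&mut self, cluster_id: &str, ip: IpAddr) {
    ///         let key = (cluster_id.to_owned(), ip);
    ///         if let Some(count) = self.counts.get_mut(&key) {
    ///             *count -= 1;
    ///             if *count == 0 {
    ///                 self.counts.remove(&key);
    ///             }
    ///         }
    ///     }
    /// }
    ///
    /// let ip = IpAddr::V4(Ipv4Addr::new(198, 51, 100, 1));
    /// let mut tracker = Tracker { limit: 2, counts: HashMap::new() };
    ///
    /// assert!(tracker.try_acquire("my-cluster", ip));
    /// assert!(tracker.try_acquire("my-cluster", ip));
    /// assert!(!tracker.try_acquire("my-cluster", ip)); // third connection rejected
    /// assert!(tracker.try_acquire("other-cluster", ip)); // limits are per cluster
    /// tracker.release("my-cluster", ip);
    /// assert!(tracker.try_acquire("my-cluster", ip)); // a slot was freed
    /// ```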
    #[error("per-(cluster, source-IP) connection limit reached for cluster {cluster_id:?}")]
    TooManyConnectionsPerIp { cluster_id: String },
}

/// used in kawa_h1 module for the Http session state
#[derive(thiserror::Error, Debug)]
pub enum RetrieveClusterError {
    #[error("No method given")]
    NoMethod,
    #[error("No host given")]
    NoHost,
    #[error("No path given")]
    NoPath,
    #[error("unauthorized route")]
    UnauthorizedRoute,
    #[error("{0}")]
    RetrieveFrontend(FrontendFromRequestError),
    #[error("HTTPS redirect required")]
    HttpsRedirect,
    /// The HTTP `:authority` / `Host` host does not match the TLS SNI that was
    /// negotiated for this connection, which would cross the TLS trust boundary.
    /// Maps to HTTP 421 Misdirected Request (RFC 9110 §15.5.20).
    #[error("TLS SNI {sni:?} does not match HTTP authority {authority:?}")]
    SniAuthorityMismatch { sni: String, authority: String },
}

/// Used in sessions
#[derive(Debug, PartialEq, Eq)]
pub enum AcceptError {
    IoError,
    TooManySessions,
    WouldBlock,
    RegisterError,
    WrongSocketAddress,
    BufferCapacityReached,
}

/// returned by the HTTP, HTTPS and TCP listeners
#[derive(thiserror::Error, Debug)]
pub enum ListenerError {
    #[error("failed to handle certificate request, got a resolver error, {0}")]
    Resolver(CertificateResolverError),
    #[error("failed to parse pem, {0}")]
    PemParse(String),
    #[error("failed to parse template {0:?}: {1}")]
    TemplateParse(String, TemplateError),
    #[error("failed to build rustls context, {0}")]
    BuildRustls(String),
    #[error("could not activate listener with address {address:?}: {error}")]
    Activation { address: SocketAddr, error: String },
    #[error("Could not register listener socket: {0}")]
    SocketRegistration(std::io::Error),
    #[error("could not add frontend: {0}")]
    AddFrontend(RouterError),
    #[error("could not remove frontend: {0}")]
    RemoveFrontend(RouterError),
    #[error("invalid value for field '{field}': {reason}")]
    InvalidValue {
        field: &'static str,
        reason: &'static str,
    },
    /// `UpdateHttpsListenerConfig.hsts` was present but its `enabled`
    /// field was unset. Per the partial-update contract, `enabled` is
    /// the explicit-disambiguator between "explicit disable" (false) and
    /// "explicit enable" (true); the patch handler refuses an
    /// `enabled = None` block rather than silently picking one.
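    ///
    /// The contract can be sketched as follows, with a hypothetical `HstsPatch` struct
    /// standing in for the real message (only the `enabled: Option<bool>` shape is
    /// taken from the description above; `resolve` is likewise illustrative):
    ///
    /// ```rust
    /// struct HstsPatch {
    ///     enabled: Option<bool>,
    /// }
    ///
    /// // `None` for the whole block means "leave HSTS untouched".
    /// fn resolve(patch: Option<HstsPatch>) -> Result<Option<bool>, &'static str> {
    ///     match patch {
    ///         None => Ok(None),
    ///         Some(HstsPatch { enabled: Some(enabled) }) => Ok(Some(enabled)),
    ///         // Block present but ambiguous: refuse instead of guessing.
    ///         Some(HstsPatch { enabled: None }) => Err("hsts present but `enabled` unset"),
    ///     }
    /// }
    ///
    /// assert_eq!(resolve(None), Ok(None));
    /// assert_eq!(resolve(Some(HstsPatch { enabled: Some(false) })), Ok(Some(false)));
    /// assert!(resolve(Some(HstsPatch { enabled: None })).is_err());
    /// ```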
782 #[error(
783 "UpdateHttpsListenerConfig.hsts is present but `enabled` is unset; the partial-update \
784 contract requires `enabled` whenever the `hsts` block is present"
785 )]
786 HstsEnabledRequired,
787}
788
789/// Lift control-plane validation errors into listener-level errors so the
790/// worker can surface the same message without duplicating the match.
791/// Non-`InvalidValue` variants fall back to a generic `InvalidValue` — they
792/// are not expected on the worker's `update_config` path (state lookups
793/// happen on the master) but we avoid panicking if one slips through.
794impl From<sozu_command::state::StateError> for ListenerError {
795 fn from(err: sozu_command::state::StateError) -> Self {
796 match err {
797 sozu_command::state::StateError::InvalidValue { field, reason } => {
798 ListenerError::InvalidValue { field, reason }
799 }
800 _ => ListenerError::InvalidValue {
801 field: "state",
802 reason: "unexpected state error on worker path",
803 },
804 }
805 }
806}
807
808/// Returned by the HTTP, HTTPS and TCP proxies
809#[derive(thiserror::Error, Debug)]
810pub enum ProxyError {
811 #[error("error while soft stopping {proxy_protocol} proxy: {error}")]
812 SoftStop {
813 proxy_protocol: String,
814 error: String,
815 },
816 #[error("error while hard stopping {proxy_protocol} proxy: {error}")]
817 HardStop {
818 proxy_protocol: String,
819 error: String,
820 },
821 #[error("found no listener with address {0:?}")]
822 NoListenerFound(SocketAddr),
823 #[error("a listener is already present for this token")]
824 ListenerAlreadyPresent,
825 #[error("could not add listener: {0}")]
826 AddListener(ListenerError),
827 #[error("could not add cluster: {0}")]
828 AddCluster(ListenerError),
829 #[error("failed to activate listener with address {address:?}: {listener_error}")]
830 ListenerActivation {
831 address: SocketAddr,
832 listener_error: ListenerError,
833 },
834 #[error("can not add frontend {front:?}: {error}")]
835 WrongInputFrontend {
836 front: Box<RequestHttpFrontend>,
837 error: String,
838 },
839 #[error("could not add frontend: {0}")]
840 AddFrontend(ListenerError),
841 #[error("could not remove frontend: {0}")]
842 RemoveFrontend(ListenerError),
843 #[error("could not add certificate: {0}")]
844 AddCertificate(CertificateResolverError),
845 #[error("could not remove certificate: {0}")]
846 RemoveCertificate(CertificateResolverError),
847 #[error("could not replace certificate: {0}")]
848 ReplaceCertificate(CertificateResolverError),
849 #[error("wrong certificate fingerprint: {0}")]
850 WrongCertificateFingerprint(FromHexError),
851 #[error("this request is not supported by the proxy")]
852 UnsupportedMessage,
853 #[error("failed to acquire the lock, {0}")]
854 Lock(String),
855 #[error("could not bind to socket {0:?}: {1}")]
856 BindToSocket(SocketAddr, ServerBindError),
857 #[error("error registering socket of listener: {0}")]
858 RegisterListener(std::io::Error),
859 #[error("the listener is not activated")]
860 UnactivatedListener,
861 /// HSTS (RFC 6797) was attached to a frontend on a plain-HTTP
862 /// listener. RFC 6797 §7.2 forbids `Strict-Transport-Security` on
863 /// plaintext-HTTP responses; the worker rejects the request rather
864 /// than ship a non-conformant policy. The TOML loader rejects the
865 /// same shape at config-load time
866 /// (`command/src/config.rs::ConfigError::HstsOnPlainHttp`); this
867 /// arm catches the same misconfiguration when the request reaches
868 /// the worker over the IPC channel without going through the TOML
869 /// path (e.g. via `sozu frontend http add`).
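 ///
 /// A sketch of the worker-side guard this variant describes. The names
 /// `FrontendRequest`, `FrontendError` and `check_hsts` are hypothetical and
 /// do not mirror sozu's real request types. The guard rejects an enabled
 /// HSTS policy on a plaintext listener. Like `HstsOnPlainHttp(SocketAddr)`,
 /// the error carries the listener address.
 ///
 /// ```rust
 /// use std::net::SocketAddr;
 ///
 /// // Hypothetical, simplified view of an incoming frontend request.
 /// struct FrontendRequest {
 ///     address: SocketAddr,
 ///     hsts_enabled: bool,
 /// }
 ///
 /// #[derive(Debug, PartialEq)]
 /// enum FrontendError {
 ///     HstsOnPlainHttp(SocketAddr),
 /// }
 ///
 /// // Rejects `Strict-Transport-Security` on plaintext listeners (RFC 6797 §7.2).
 /// fn check_hsts(req: &FrontendRequest, is_https_listener: bool) -> Result<(), FrontendError> {
 ///     if req.hsts_enabled && !is_https_listener {
 ///         return Err(FrontendError::HstsOnPlainHttp(req.address));
 ///     }
 ///     Ok(())
 /// }
 /// ```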
870 #[error(
871 "HSTS is only valid on HTTPS frontends; rejecting AddHttpFrontend with hsts.enabled = \
872 true on address {0:?} (RFC 6797 §7.2)"
873 )]
874 HstsOnPlainHttp(SocketAddr),
875}
876
877use self::server::ListenToken;
878pub trait ProxyConfiguration {
879 fn notify(&mut self, message: WorkerRequest) -> WorkerResponse;
880 fn accept(&mut self, token: ListenToken) -> Result<TcpStream, AcceptError>;
881 fn create_session(
882 &mut self,
883 socket: TcpStream,
884 token: ListenToken,
885 wait_time: Duration,
886 proxy: Rc<RefCell<Self>>,
887 // should we insert the tags here?
888 ) -> Result<(), AcceptError>;
889}
890
891pub trait L7Proxy {
892 fn kind(&self) -> ListenerType;
893
894 fn register_socket(
895 &self,
896 socket: &mut TcpStream,
897 token: Token,
898 interest: Interest,
899 ) -> Result<(), std::io::Error>;
900
901 fn deregister_socket(&self, tcp_stream: &mut TcpStream) -> Result<(), std::io::Error>;
902
903 fn add_session(&self, session: Rc<RefCell<dyn ProxySession>>) -> Token;
904
 /// Removes the session from the session manager slab.
 /// Returns `true` if the session was present before removal.
907 fn remove_session(&self, token: Token) -> bool;
908
909 fn backends(&self) -> Rc<RefCell<BackendMap>>;
910
911 fn clusters(&self) -> &HashMap<ClusterId, Cluster>;
912
913 /// Access the worker's [`SessionManager`] for per-(cluster, source-IP)
914 /// connection-limit accounting. The mux uses this to track / untrack
915 /// stream-granular `(cluster_id, ip)` entries and consult the
916 /// `cluster_ip_at_limit` gate before each backend connect.
917 fn sessions(&self) -> Rc<RefCell<crate::server::SessionManager>>;
918}
919
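/// The sixteen variants cover every combination of front interest (`None`,
/// `Read`, `Write`, `ReadWrite`) with back interest. The hypothetical
/// `Interest` and `Required` types below only illustrate that structure.
/// They encode the same information as two independent fields, so each of
/// the four accessors reduces to a single field check. The flat enum remains
/// what sozu uses.
///
/// ```rust
/// // Hypothetical per-side interest.
/// #[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// enum Interest {
///     None,
///     Read,
///     Write,
///     ReadWrite,
/// }
///
/// impl Interest {
///     fn readable(self) -> bool {
///         matches!(self, Interest::Read | Interest::ReadWrite)
///     }
///     fn writable(self) -> bool {
///         matches!(self, Interest::Write | Interest::ReadWrite)
///     }
/// }
///
/// // Hypothetical two-field equivalent of `RequiredEvents`:
/// // `Required { front: Interest::Read, back: Interest::Write }`
/// // corresponds to `RequiredEvents::FrontReadBackWrite`.
/// #[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// struct Required {
///     front: Interest,
///     back: Interest,
/// }
///
/// impl Required {
///     fn front_readable(&self) -> bool { self.front.readable() }
///     fn front_writable(&self) -> bool { self.front.writable() }
///     fn back_readable(&self) -> bool { self.back.readable() }
///     fn back_writable(&self) -> bool { self.back.writable() }
/// }
/// ```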
920#[derive(Debug, PartialEq, Eq)]
921pub enum RequiredEvents {
922 FrontReadBackNone,
923 FrontWriteBackNone,
924 FrontReadWriteBackNone,
925 FrontNoneBackNone,
926 FrontReadBackRead,
927 FrontWriteBackRead,
928 FrontReadWriteBackRead,
929 FrontNoneBackRead,
930 FrontReadBackWrite,
931 FrontWriteBackWrite,
932 FrontReadWriteBackWrite,
933 FrontNoneBackWrite,
934 FrontReadBackReadWrite,
935 FrontWriteBackReadWrite,
936 FrontReadWriteBackReadWrite,
937 FrontNoneBackReadWrite,
938}
939
940impl RequiredEvents {
941 pub fn front_readable(&self) -> bool {
942 matches!(
943 *self,
944 RequiredEvents::FrontReadBackNone
945 | RequiredEvents::FrontReadWriteBackNone
946 | RequiredEvents::FrontReadBackRead
947 | RequiredEvents::FrontReadWriteBackRead
948 | RequiredEvents::FrontReadBackWrite
949 | RequiredEvents::FrontReadWriteBackWrite
950 | RequiredEvents::FrontReadBackReadWrite
951 | RequiredEvents::FrontReadWriteBackReadWrite
952 )
953 }
954
955 pub fn front_writable(&self) -> bool {
956 matches!(
957 *self,
958 RequiredEvents::FrontWriteBackNone
959 | RequiredEvents::FrontReadWriteBackNone
960 | RequiredEvents::FrontWriteBackRead
961 | RequiredEvents::FrontReadWriteBackRead
962 | RequiredEvents::FrontWriteBackWrite
963 | RequiredEvents::FrontReadWriteBackWrite
964 | RequiredEvents::FrontWriteBackReadWrite
965 | RequiredEvents::FrontReadWriteBackReadWrite
966 )
967 }
968
969 pub fn back_readable(&self) -> bool {
970 matches!(
971 *self,
972 RequiredEvents::FrontReadBackRead
973 | RequiredEvents::FrontWriteBackRead
974 | RequiredEvents::FrontReadWriteBackRead
975 | RequiredEvents::FrontNoneBackRead
976 | RequiredEvents::FrontReadBackReadWrite
977 | RequiredEvents::FrontWriteBackReadWrite
978 | RequiredEvents::FrontReadWriteBackReadWrite
979 | RequiredEvents::FrontNoneBackReadWrite
980 )
981 }
982
983 pub fn back_writable(&self) -> bool {
984 matches!(
985 *self,
986 RequiredEvents::FrontReadBackWrite
987 | RequiredEvents::FrontWriteBackWrite
988 | RequiredEvents::FrontReadWriteBackWrite
989 | RequiredEvents::FrontNoneBackWrite
990 | RequiredEvents::FrontReadBackReadWrite
991 | RequiredEvents::FrontWriteBackReadWrite
992 | RequiredEvents::FrontReadWriteBackReadWrite
993 | RequiredEvents::FrontNoneBackReadWrite
994 )
995 }
996}
997
998/// Signals transitions between states of a given Protocol
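///
/// A hedged sketch of how a driver could fold these protocol-level signals
/// into a session-level result. The mirror enums `ProtoSignal` /
/// `SessionSignal` and the `drive` function are hypothetical. The sketch
/// assumes backend actions are handled locally through caller-supplied
/// closures, after which the session keeps running. Only `CloseSession` and
/// `Upgrade` propagate to the session.
///
/// ```rust
/// // Hypothetical mirror of `StateResult`.
/// #[derive(Debug, PartialEq, Eq)]
/// enum ProtoSignal {
///     CloseBackend,
///     CloseSession,
///     ConnectBackend,
///     Continue,
///     Upgrade,
/// }
///
/// // Hypothetical mirror of `SessionResult`.
/// #[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// enum SessionSignal {
///     Close,
///     Continue,
///     Upgrade,
/// }
///
/// // Hypothetical driver: backend-level signals run their action and continue.
/// fn drive(
///     signal: ProtoSignal,
///     close_backend: impl FnOnce(),
///     connect_backend: impl FnOnce(),
/// ) -> SessionSignal {
///     match signal {
///         ProtoSignal::CloseBackend => {
///             close_backend();
///             SessionSignal::Continue
///         }
///         ProtoSignal::ConnectBackend => {
///             connect_backend();
///             SessionSignal::Continue
///         }
///         ProtoSignal::Continue => SessionSignal::Continue,
///         ProtoSignal::CloseSession => SessionSignal::Close,
///         ProtoSignal::Upgrade => SessionSignal::Upgrade,
///     }
/// }
/// ```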
999#[derive(Debug, PartialEq, Eq)]
1000pub enum StateResult {
1001 /// Signals to the Protocol to close its backend
1002 CloseBackend,
1003 /// Signals to the parent Session to close itself
1004 CloseSession,
1005 /// Signals to the Protocol to connect to backend
1006 ConnectBackend,
1007 /// Signals to the Protocol to continue
1008 Continue,
1009 /// Signals to the parent Session to upgrade to the next Protocol
1010 Upgrade,
1011}
1012
1013/// Signals transitions between states of a given Session
1014#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1015pub enum SessionResult {
1016 /// Signals to the Session to close itself
1017 Close,
1018 /// Signals to the Session to continue
1019 Continue,
1020 /// Signals to the Session to upgrade its Protocol
1021 Upgrade,
1022}
1023
1024#[derive(Debug, PartialEq, Eq)]
1025pub enum SocketType {
1026 Listener,
1027 FrontClient,
1028}
1029
1030type SessionIsToBeClosed = bool;
1031
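/// `Readiness` pairs what the poller last reported (`event`) with what the
/// session wants (`interest`). The intersection, returned by
/// `filter_interest`, is what the session should act on. The sketch below
/// reproduces that logic with a hypothetical `Flags` bitmask standing in for
/// sozu's `Ready` type.
///
/// ```rust
/// // Hypothetical stand-in for `Ready`: one bit per readiness kind.
/// #[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// struct Flags(u8);
///
/// impl Flags {
///     const EMPTY: Flags = Flags(0);
///     const READABLE: Flags = Flags(0b01);
///     const WRITABLE: Flags = Flags(0b10);
///
///     fn insert(&mut self, other: Flags) {
///         self.0 |= other.0;
///     }
/// }
///
/// impl std::ops::BitAnd for Flags {
///     type Output = Flags;
///     fn bitand(self, rhs: Flags) -> Flags {
///         Flags(self.0 & rhs.0)
///     }
/// }
///
/// struct Readiness {
///     event: Flags,    // what the poller reported
///     interest: Flags, // what the session wants
/// }
///
/// impl Readiness {
///     // Same rule as `Readiness::filter_interest`: event AND interest.
///     fn filter_interest(&self) -> Flags {
///         self.event & self.interest
///     }
/// }
/// ```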
1032#[derive(Clone)]
1033pub struct Readiness {
1034 /// the current readiness
1035 pub event: Ready,
1036 /// the readiness we wish to attain
1037 pub interest: Ready,
1038}
1039
1040impl Display for Readiness {
1041 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1042 let i = &mut [b'-'; 4];
1043 let r = &mut [b'-'; 4];
1044 let mixed = &mut [b'-'; 4];
1045
1046 display_ready(i, self.interest);
1047 display_ready(r, self.event);
1048 display_ready(mixed, self.interest & self.event);
1049
1050 write!(
1051 f,
1052 "I({:?})&R({:?})=M({:?})",
1053 String::from_utf8_lossy(i),
1054 String::from_utf8_lossy(r),
1055 String::from_utf8_lossy(mixed)
1056 )
1057 }
1058}
1059
1060impl Default for Readiness {
1061 fn default() -> Self {
1062 Self::new()
1063 }
1064}
1065
1066impl Readiness {
1067 pub const fn new() -> Readiness {
1068 Readiness {
1069 event: Ready::EMPTY,
1070 interest: Ready::EMPTY,
1071 }
1072 }
1073
1074 pub fn reset(&mut self) {
1075 self.event = Ready::EMPTY;
1076 self.interest = Ready::EMPTY;
1077 }
1078
 /// Returns the readiness that is both reported (`event`) and wanted (`interest`).
1080 pub fn filter_interest(&self) -> Ready {
1081 self.event & self.interest
1082 }
1083
1084 /// Signal that the socket has buffered data to write (e.g., TLS internal
1085 /// buffers) that won't generate a new epoll WRITABLE event.
1086 pub fn signal_pending_write(&mut self) {
1087 self.event.insert(Ready::WRITABLE);
1088 }
1089
1090 /// Signal that the socket has buffered data to read (e.g., TLS plaintext
1091 /// buffer after a 1xx clear) that won't generate a new epoll READABLE event.
1092 pub fn signal_pending_read(&mut self) {
1093 self.event.insert(Ready::READABLE);
1094 }
1095
 /// Inserts `Ready::WRITABLE` into `interest` and pairs it with
 /// `signal_pending_write`: the canonical invariant-15 form for any path that
 /// writes bytes to sozu-owned buffers under edge-triggered epoll. See
 /// `lib/src/protocol/mux/LIFECYCLE.md`.
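 ///
 /// Why both halves are needed: under edge-triggered polling, a socket that
 /// is already writable produces no new WRITABLE edge. Adding the interest
 /// alone therefore leaves `filter_interest()` empty until some later edge
 /// arrives. The sketch models this with plain `u8` masks, which are a
 /// hypothetical stand-in for `Ready`.
 ///
 /// ```rust
 /// // Hypothetical bit for "writable"; sozu uses `Ready::WRITABLE`.
 /// const WRITABLE: u8 = 0b10;
 ///
 /// #[derive(Default)]
 /// struct Readiness {
 ///     event: u8,
 ///     interest: u8,
 /// }
 ///
 /// impl Readiness {
 ///     fn filter_interest(&self) -> u8 {
 ///         self.event & self.interest
 ///     }
 ///
 ///     // Interest only: waits for an edge that may never come.
 ///     fn want_writable(&mut self) {
 ///         self.interest |= WRITABLE;
 ///     }
 ///
 ///     // Interest plus a synthetic event, as `arm_writable` does.
 ///     fn arm_writable(&mut self) {
 ///         self.interest |= WRITABLE;
 ///         self.event |= WRITABLE;
 ///     }
 /// }
 /// ```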
1099 #[inline]
1100 pub fn arm_writable(&mut self) {
1101 self.interest.insert(Ready::WRITABLE);
1102 self.signal_pending_write();
1103 }
1104}
1105
1106#[cfg(test)]
1107mod readiness_tests {
1108 use super::{Readiness, Ready};
1109
1110 #[test]
1111 fn arm_writable_sets_interest_and_event() {
1112 let mut r = Readiness::new();
1113 r.arm_writable();
1114 assert!(r.interest.is_writable());
1115 assert!(r.event.is_writable());
1116 }
1117
1118 #[test]
1119 fn arm_writable_is_idempotent() {
1120 let mut r = Readiness::new();
1121 r.arm_writable();
1122 r.arm_writable();
1123 assert_eq!(r.interest, Ready::WRITABLE);
1124 assert_eq!(r.event, Ready::WRITABLE);
1125 }
1126}
1127
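/// `display_ready` writes one character per set flag into a caller-supplied
/// 4-byte buffer pre-filled with `-`. The order is fixed: R(eadable),
/// W(ritable), E(rror), H(up). Below is a std-only sketch of the same
/// rendering. The hypothetical `render` helper takes plain booleans instead
/// of `Ready`, an assumption made so the example is self-contained.
///
/// ```rust
/// // Hypothetical helper: same layout as `display_ready` / `ready_to_string`.
/// fn render(readable: bool, writable: bool, error: bool, hup: bool) -> String {
///     // Start from "----" and overwrite the slot of each set flag.
///     let mut s = [b'-'; 4];
///     let flags = [(readable, b'R'), (writable, b'W'), (error, b'E'), (hup, b'H')];
///     for (slot, &(set, c)) in s.iter_mut().zip(flags.iter()) {
///         if set {
///             *slot = c;
///         }
///     }
///     String::from_utf8(s.to_vec()).unwrap()
/// }
/// ```
///
/// For example, a socket that is readable and has hung up renders as `R--H`.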
1128pub fn display_ready(s: &mut [u8], readiness: Ready) {
1129 if readiness.is_readable() {
1130 s[0] = b'R';
1131 }
1132 if readiness.is_writable() {
1133 s[1] = b'W';
1134 }
1135 if readiness.is_error() {
1136 s[2] = b'E';
1137 }
1138 if readiness.is_hup() {
1139 s[3] = b'H';
1140 }
1141}
1142
1143pub fn ready_to_string(readiness: Ready) -> String {
1144 let s = &mut [b'-'; 4];
1145 display_ready(s, readiness);
1146 String::from_utf8(s.to_vec()).unwrap()
1147}
1148
1149impl fmt::Debug for Readiness {
1150 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1151 let i = &mut [b'-'; 4];
1152 let r = &mut [b'-'; 4];
1153 let mixed = &mut [b'-'; 4];
1154
1155 display_ready(i, self.interest);
1156 display_ready(r, self.event);
1157 display_ready(mixed, self.interest & self.event);
1158
1159 write!(
1160 f,
1161 "Readiness {{ interest: {}, readiness: {}, mixed: {} }}",
1162 str::from_utf8(i).unwrap(),
1163 str::from_utf8(r).unwrap(),
1164 str::from_utf8(mixed).unwrap()
1165 )
1166 }
1167}
1168
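/// A session alternates between waiting (`wait_start`) and being serviced
/// (`service_start` / `service_stop`). Each transition folds the elapsed span
/// into `wait_time` or `service_time`. The sketch below reproduces that
/// bookkeeping on a hypothetical `Timers` struct. It takes an explicit `now`
/// argument, which is an assumption made so the arithmetic can be checked
/// without sleeping. The real methods read `Instant::now()`.
///
/// ```rust
/// use std::time::{Duration, Instant};
///
/// // Hypothetical subset of `SessionMetrics` with an injected clock.
/// struct Timers {
///     service_time: Duration,
///     wait_time: Duration,
///     service_start: Option<Instant>,
///     wait_start: Instant,
/// }
///
/// impl Timers {
///     fn new(now: Instant) -> Self {
///         Timers {
///             service_time: Duration::ZERO,
///             wait_time: Duration::ZERO,
///             service_start: None,
///             wait_start: now,
///         }
///     }
///
///     // Mirrors `service_start`: close the wait span, open a service span.
///     fn service_start(&mut self, now: Instant) {
///         self.service_start = Some(now);
///         self.wait_time += now - self.wait_start;
///     }
///
///     // Mirrors `service_stop`: close the service span, if one is open.
///     fn service_stop(&mut self, now: Instant) {
///         if let Some(start) = self.service_start.take() {
///             self.service_time += now - start;
///         }
///     }
///
///     // Mirrors `wait_start`.
///     fn wait_start(&mut self, now: Instant) {
///         self.wait_start = now;
///     }
/// }
/// ```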
1169#[derive(Clone, Debug)]
1170pub struct SessionMetrics {
1171 /// date at which we started handling that request
1172 pub start: Option<Instant>,
1173 /// time actually spent handling the request
1174 pub service_time: Duration,
1175 /// time spent waiting for its turn
1176 pub wait_time: Duration,
1177 /// bytes received by the frontend
1178 pub bin: usize,
1179 /// bytes sent by the frontend
1180 pub bout: usize,
1181
1182 /// date at which we started working on the request
1183 pub service_start: Option<Instant>,
1184 pub wait_start: Instant,
1185
1186 pub backend_id: Option<String>,
1187 pub backend_start: Option<Instant>,
1188 pub backend_connected: Option<Instant>,
1189 pub backend_stop: Option<Instant>,
1190 pub backend_bin: usize,
1191 pub backend_bout: usize,
1192}
1193
1194impl SessionMetrics {
1195 pub fn new(wait_time: Option<Duration>) -> SessionMetrics {
1196 SessionMetrics {
1197 start: Some(Instant::now()),
1198 service_time: Duration::from_secs(0),
1199 wait_time: wait_time.unwrap_or_else(|| Duration::from_secs(0)),
1200 bin: 0,
1201 bout: 0,
1202 service_start: None,
1203 wait_start: Instant::now(),
1204 backend_id: None,
1205 backend_start: None,
1206 backend_connected: None,
1207 backend_stop: None,
1208 backend_bin: 0,
1209 backend_bout: 0,
1210 }
1211 }
1212
1213 pub fn reset(&mut self) {
1214 self.start = None;
1215 self.service_time = Duration::from_secs(0);
1216 self.wait_time = Duration::from_secs(0);
1217 self.bin = 0;
1218 self.bout = 0;
1219 self.service_start = None;
1220 self.backend_start = None;
1221 self.backend_connected = None;
1222 self.backend_stop = None;
1223 self.backend_bin = 0;
1224 self.backend_bout = 0;
1225 }
1226
1227 pub fn service_start(&mut self) {
1228 let now = Instant::now();
1229
1230 if self.start.is_none() {
1231 self.start = Some(now);
1232 }
1233
1234 self.service_start = Some(now);
1235 self.wait_time += now - self.wait_start;
1236 }
1237
1238 pub fn service_stop(&mut self) {
1239 if let Some(start) = self.service_start.take() {
1240 let duration = Instant::now() - start;
1241 self.service_time += duration;
1242 }
1243 }
1244
1245 pub fn wait_start(&mut self) {
1246 self.wait_start = Instant::now();
1247 }
1248
1249 pub fn service_time(&self) -> Duration {
1250 match self.service_start {
1251 Some(start) => {
1252 let last_duration = Instant::now() - start;
1253 self.service_time + last_duration
1254 }
1255 None => self.service_time,
1256 }
1257 }
1258
1259 /// time elapsed since the beginning of the session
1260 pub fn request_time(&self) -> Duration {
1261 match self.start {
1262 Some(start) => Instant::now() - start,
1263 None => Duration::from_secs(0),
1264 }
1265 }
1266
1267 pub fn backend_start(&mut self) {
1268 self.backend_start = Some(Instant::now());
1269 }
1270
1271 pub fn backend_connected(&mut self) {
1272 self.backend_connected = Some(Instant::now());
1273 }
1274
1275 pub fn backend_stop(&mut self) {
1276 self.backend_stop = Some(Instant::now());
1277 }
1278
1279 pub fn backend_response_time(&self) -> Option<Duration> {
1280 match (self.backend_connected, self.backend_stop) {
1281 (Some(start), Some(end)) => Some(end - start),
1282 (Some(start), None) => Some(Instant::now() - start),
1283 _ => None,
1284 }
1285 }
1286
1287 pub fn backend_connection_time(&self) -> Option<Duration> {
1288 match (self.backend_start, self.backend_connected) {
1289 (Some(start), Some(end)) => Some(end - start),
1290 _ => None,
1291 }
1292 }
1293
1294 pub fn register_end_of_session(&self, context: &LogContext) {
1295 let request_time = self.request_time();
1296 let service_time = self.service_time();
1297
1298 if let Some(cluster_id) = context.cluster_id {
1299 time!(
1300 names::event_loop::REQUEST_TIME,
1301 cluster_id,
1302 request_time.as_millis()
1303 );
1304 time!(
1305 names::event_loop::SERVICE_TIME,
1306 cluster_id,
1307 service_time.as_millis()
1308 );
1309 }
1310 time!(names::event_loop::REQUEST_TIME, request_time.as_millis());
1311 time!(names::event_loop::SERVICE_TIME, service_time.as_millis());
1312
1313 if let Some(backend_id) = self.backend_id.as_ref() {
1314 if let Some(backend_response_time) = self.backend_response_time() {
1315 record_backend_metrics!(
1316 context.cluster_id.as_str_or("-"),
1317 backend_id,
1318 backend_response_time.as_millis(),
1319 self.backend_connection_time(),
1320 self.backend_bin,
1321 self.backend_bout
1322 );
1323 }
1324 }
1325
1326 incr!(
1327 names::access_logs::COUNT,
1328 context.cluster_id,
1329 context.backend_id
1330 );
1331 }
1332}
1333
/// Exponentially weighted moving average with high sensitivity to latency bursts
///
/// cf. Finagle for the original implementation: <https://github.com/twitter/finagle/blob/9cc08d15216497bb03a1cafda96b7266cfbbcff1/finagle-core/src/main/scala/com/twitter/finagle/loadbalancer/PeakEwma.scala>
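///
/// A worked sketch of the update rule in `observe`. A higher sample replaces
/// the estimate immediately (the "peak"). A lower one is blended in, with
/// weight `e^(-elapsed/decay)` on the old value. The hypothetical `Ewma`
/// struct takes the elapsed time as an explicit argument, an assumption made
/// for testability; the real type reads `Instant::now()`. The test values
/// reuse the 1 s decay and 50 ms starting RTT of `PeakEWMA::new`.
///
/// ```rust
/// // Hypothetical stand-in for `PeakEWMA` with an injected elapsed time.
/// struct Ewma {
///     decay_ns: f64,
///     rtt_ns: f64,
/// }
///
/// impl Ewma {
///     fn observe(&mut self, rtt_ns: f64, elapsed_ns: f64) {
///         if rtt_ns > self.rtt_ns {
///             // Latency is rising: jump straight to the new sample.
///             self.rtt_ns = rtt_ns;
///         } else {
///             // Latency is falling: decay towards the sample.
///             let weight = (-elapsed_ns / self.decay_ns).exp();
///             self.rtt_ns = self.rtt_ns * weight + rtt_ns * (1.0 - weight);
///         }
///     }
/// }
/// ```
///
/// Suppose a 100 ms sample raises the estimate to 100 ms, and a zero sample
/// follows one full decay period (1 s) later. The estimate then falls to
/// `100 ms * e^-1`, about 37 ms.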
1337#[derive(Debug, PartialEq, Clone)]
1338pub struct PeakEWMA {
1339 /// decay in nanoseconds
1340 ///
 /// higher values make the EWMA decay more slowly towards 0
1342 pub decay: f64,
1343 /// estimated RTT in nanoseconds
1344 ///
1345 /// must be set to a high enough default value so that new backends do not
1346 /// get all the traffic right away
1347 pub rtt: f64,
1348 /// last modification
1349 pub last_event: Instant,
1350}
1351
1352impl Default for PeakEWMA {
1353 fn default() -> Self {
1354 Self::new()
1355 }
1356}
1357
1358impl PeakEWMA {
1359 // hardcoded default values for now
1360 pub fn new() -> Self {
1361 PeakEWMA {
1362 // 1s
1363 decay: 1_000_000_000f64,
1364 // 50ms
1365 rtt: 50_000_000f64,
1366 last_event: Instant::now(),
1367 }
1368 }
1369
1370 pub fn observe(&mut self, rtt: f64) {
1371 let now = Instant::now();
1372 let dur = now - self.last_event;
1373
1374 // if latency is rising, we will immediately raise the cost
1375 if rtt > self.rtt {
1376 self.rtt = rtt;
1377 } else {
1378 // new_rtt = old_rtt * e^(-elapsed/decay) + observed_rtt * (1 - e^(-elapsed/decay))
1379 let weight = (-(dur.as_nanos() as f64) / self.decay).exp();
1380 self.rtt = self.rtt * weight + rtt * (1.0 - weight);
1381 }
1382
1383 self.last_event = now;
1384 }
1385
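 /// Returns the load-balancing cost of this backend,
 /// `(active_requests + 1) * rtt`. Before computing it, the estimate is
 /// decayed towards zero for the time since the last observation.
 ///
 /// A hedged sketch of how such a cost could drive backend selection, by
 /// picking the lowest cost. The `cost` and `pick` helpers and their tuple
 /// inputs are hypothetical and are not sozu's load-balancer API.
 ///
 /// ```rust
 /// // Same formula as `PeakEWMA::get`, minus the decay step.
 /// fn cost(active_requests: usize, rtt_ns: f64) -> f64 {
 ///     (active_requests + 1) as f64 * rtt_ns
 /// }
 ///
 /// // Hypothetical selector over (name, active_requests, rtt_ns) candidates.
 /// fn pick<'a>(candidates: &[(&'a str, usize, f64)]) -> Option<&'a str> {
 ///     candidates
 ///         .iter()
 ///         .min_by(|a, b| cost(a.1, a.2).total_cmp(&cost(b.1, b.2)))
 ///         .map(|c| c.0)
 /// }
 /// ```
 ///
 /// With 3 requests in flight, a 10 ms backend costs 40 ms. An idle 30 ms
 /// backend is therefore preferred.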
1386 pub fn get(&mut self, active_requests: usize) -> f64 {
1387 // decay the current value
1388 // (we might not have seen a request in a long time)
1389 self.observe(0.0);
1390
1391 (active_requests + 1) as f64 * self.rtt
1392 }
1393}
1394
1395pub mod testing {
1396 pub use std::{cell::RefCell, os::fd::IntoRawFd, rc::Rc};
1397
1398 pub use anyhow::Context;
1399 pub use mio::{Poll, Registry, Token, net::UnixStream};
1400 pub use slab::Slab;
1401 pub use sozu_command::{
1402 proto::command::{
1403 HttpListenerConfig, HttpsListenerConfig, ServerConfig, TcpListenerConfig,
1404 },
1405 scm_socket::{Listeners, ScmSocket},
1406 };
1407
1408 pub use crate::{
1409 Protocol, ProxySession,
1410 backends::BackendMap,
1411 http::HttpProxy,
1412 https::HttpsProxy,
1413 pool::Pool,
1414 server::{ListenSession, ProxyChannel, Server, SessionManager},
1415 tcp::TcpProxy,
1416 };
1417
1418 use std::sync::atomic::{AtomicU16, Ordering};
1419
1420 /// Port counter for sozu listener addresses in lib tests.
1421 /// Starts at 10000 to avoid collision with:
1422 /// - Privileged ports (<1024)
1423 /// - e2e suite (starts at 2000)
1424 /// - Ephemeral port range (typically 32768+)
1425 static PORT_PROVIDER: AtomicU16 = AtomicU16::new(10000);
1426
1427 /// Get a unique port for a sozu listener address.
1428 /// Each call returns a different port, safe for parallel test execution.
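 ///
 /// A std-only sketch of why a shared `AtomicU16` counter is safe under
 /// parallel tests. `fetch_add` is a single atomic read-modify-write, so
 /// concurrent callers never receive the same value. The `NEXT` static and
 /// the `next_port` function are hypothetical copies of `PORT_PROVIDER` and
 /// `provide_port`. `fetch_add` wraps around on overflow past `u16::MAX`;
 /// the sketch assumes a test run never allocates that many ports.
 ///
 /// ```rust
 /// use std::sync::atomic::{AtomicU16, Ordering};
 ///
 /// // Hypothetical copy of `PORT_PROVIDER`.
 /// static NEXT: AtomicU16 = AtomicU16::new(10000);
 ///
 /// fn next_port() -> u16 {
 ///     // Atomic read-modify-write: each caller gets a distinct value.
 ///     NEXT.fetch_add(1, Ordering::SeqCst)
 /// }
 /// ```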
1429 pub fn provide_port() -> u16 {
1430 PORT_PROVIDER.fetch_add(1, Ordering::SeqCst)
1431 }
1432
1433 /// Everything needed to create a Server
1434 pub struct ServerParts {
1435 pub event_loop: Poll,
1436 pub registry: Registry,
1437 pub sessions: Rc<RefCell<SessionManager>>,
1438 pub pool: Rc<RefCell<Pool>>,
1439 pub backends: Rc<RefCell<BackendMap>>,
1440 pub client_scm_socket: ScmSocket,
1441 pub server_scm_socket: ScmSocket,
1442 pub server_config: ServerConfig,
1443 }
1444
 /// Set up a standalone server for testing purposes.
1446 pub fn prebuild_server(
1447 max_buffers: usize,
1448 buffer_size: usize,
1449 send_scm: bool,
1450 ) -> anyhow::Result<ServerParts> {
1451 let event_loop = Poll::new().with_context(|| "Failed at creating event loop")?;
1452 let backends = Rc::new(RefCell::new(BackendMap::new()));
1453 let server_config = ServerConfig {
1454 max_connections: max_buffers as u64,
1455 ..Default::default()
1456 };
1457
1458 let pool = Rc::new(RefCell::new(Pool::with_capacity(
1459 1,
1460 max_buffers,
1461 buffer_size,
1462 )));
1463
1464 let mut sessions: Slab<Rc<RefCell<dyn ProxySession>>> = Slab::with_capacity(max_buffers);
1465 {
1466 let entry = sessions.vacant_entry();
1467 info!("taking token {:?} for channel", entry.key());
1468 entry.insert(Rc::new(RefCell::new(ListenSession {
1469 protocol: Protocol::Channel,
1470 })));
1471 }
1472 {
1473 let entry = sessions.vacant_entry();
1474 info!("taking token {:?} for timer", entry.key());
1475 entry.insert(Rc::new(RefCell::new(ListenSession {
1476 protocol: Protocol::Timer,
1477 })));
1478 }
1479 {
1480 let entry = sessions.vacant_entry();
1481 info!("taking token {:?} for metrics", entry.key());
1482 entry.insert(Rc::new(RefCell::new(ListenSession {
1483 protocol: Protocol::Metrics,
1484 })));
1485 }
1486 // Test fixture: feature disabled (max_connections_per_ip = 0).
1487 let sessions = SessionManager::new(sessions, max_buffers, 0, 0);
1488
1489 let registry = event_loop
1490 .registry()
1491 .try_clone()
1492 .with_context(|| "Failed at creating a registry")?;
1493
1494 let (scm_server, scm_client) =
1495 UnixStream::pair().with_context(|| "Failed at creating scm unix stream")?;
1496 let client_scm_socket = ScmSocket::new(scm_client.into_raw_fd())
1497 .with_context(|| "Failed at creating the scm client socket")?;
1498 let server_scm_socket = ScmSocket::new(scm_server.into_raw_fd())
1499 .with_context(|| "Failed at creating the scm server socket")?;
1500 if send_scm {
1501 client_scm_socket
1502 .send_listeners(&Listeners::default())
1503 .with_context(|| "Failed at sending empty listeners")?;
1504 }
1505
1506 Ok(ServerParts {
1507 event_loop,
1508 registry,
1509 sessions,
1510 pool,
1511 backends,
1512 client_scm_socket,
1513 server_scm_socket,
1514 server_config,
1515 })
1516 }
1517}