sozu_lib/
lib.rs

1//! ## What this library does
2//!
3//! This library provides tools to build and start HTTP, HTTPS and TCP reverse proxies.
4//!
5//! The proxies handles network polling, HTTP parsing, TLS in a fast single threaded event
6//! loop.
7//!
8//! Each proxy is designed to receive configuration changes at runtime instead of
9//! reloading from a file regularly. The event loop runs in its own thread
10//! and receives commands through a message queue.
11//!
12//! ## Difference with the crate `sozu`
13//!
14//! To create several workers and manage them all at once (which is the most common way to
15//! use Sōzu), the crate `sozu` is more indicated than using the lib directly.
16//!
17//! The crate `sozu` provides a binary called the main process.
18//! The main process uses `sozu_lib` to start and manage workers.
19//! Each worker can handle HTTP, HTTPS and TCP traffic.
20//! The main process receives synchronizes the state of all workers, using UNIX sockets
21//! and custom channels to communicate with them.
22//! The main process itself is is configurable with a file, and has a CLI.
23//!
24//! ## How to use this library directly
25//!
26//! This documentation here explains how to write a binary that will start a single Sōzu
27//! worker and give it orders. The method has two steps:
28//!
29//! 1. Starts a Sōzu worker in a distinct thread
30//! 2. sends instructions to the worker on a UNIX socket via a Sōzu channel
31//!
32//! ### How to start a Sōzu worker
33//!
34//! Before creating an HTTP proxy, we first need to create an HTTP listener.
35//! The listener is an abstraction around a TCP socket provided by the kernel.
36//! We need the `sozu_command_lib` to build a listener.
37//!
38//! ```
39//! use sozu_command_lib::{config::ListenerBuilder, proto::command::SocketAddress};
40//!
41//! let address = SocketAddress::new_v4(127,0,0,1,8080);
42//! let http_listener = ListenerBuilder::new_http(address)
43//!     .to_http(None)
44//!     .expect("Could not create HTTP listener");
45//! ```
46//!
47//! The `http_listener` is of the type `HttpListenerConfig`, that we can be sent to the worker
48//! to start the proxy.
49//!
50//! Then create a pair of channels to communicate with the proxy.
51//! The channel is a wrapper around a unix socket.
52//!
53//! ```ignore
54//! use sozu_command_lib::{
55//!     channel::Channel,
56//!     proto::command::{WorkerRequest, WorkerResponse},
57//! };
58//!
59//! let (mut command_channel, proxy_channel): (
60//!     Channel<WorkerRequest, WorkerResponse>,
61//!     Channel<WorkerResponse, WorkerRequest>,
62//! ) = Channel::generate(1000, 10000).expect("should create a channel");
63//!```
64//!
65//! Here, the `command_channel` end is blocking, it sends `WorkerRequest`s and receives
66//! `WorkerResponses`, while the `proxy_channel` end is non-blocking, and the types are reversed.
67//! Writing the types here isn't even necessary thanks to the compiler,
68//! but it brings the point accross.
69//!
70//! You can now launch the worker in a separate thread, providing the HTTP listener config,
71//! the proxy end of the channel, and your custom number of buffers and their size:
72//!
73//! ```ignore
74//! use std::thread;
75//!
76//! let worker_thread_join_handle = thread::spawn(move || {
77//!     let max_buffers = 500;
78//!     let buffer_size = 16384;
79//!     sozu_lib::http::testing::start_http_worker(http_listener, proxy_channel, max_buffers, buffer_size);
80//! });
81//! ```
82//!
83//! ### Send orders
84//!
85//! Once the thread is launched, the proxy worker will start its event loop and handle
86//! events on the listening interface and port specified when building the HTTP Listener.
87//! Since no frontends or backends were specified for the proxy, it will receive
88//! the connections, parse the requests, then send a default (but configurable)
89//! answer.
90//!
91//! Before defining a frontend and backends, we need to define a cluster, which describes
92//! a routing configuration. A cluster contains:
93//!
94//! - one frontend
95//! - one or several backends
96//! - routing rules
97//!
98//! A cluster is identified by its `cluster_id`, which will be used to define frontends
99//! and backends later on.
100//!
101//! ```
102//! use sozu_command_lib::proto::command::{Cluster, LoadBalancingAlgorithms};
103//!
104//! let cluster = Cluster {
105//!     cluster_id: "my-cluster".to_string(),
106//!     sticky_session: false,
107//!     https_redirect: false,
108//!     load_balancing: LoadBalancingAlgorithms::RoundRobin as i32,
109//!     answer_503: Some("A custom forbidden message".to_string()),
110//!     ..Default::default()
111//! };
112//! ```
113//!
114//! The defaults are sensible, so we could define only the `cluster_id`.
115//!
116//! We can now define a frontend. A frontend is a way to recognize a request and match
117//! it to a `cluster_id`, depending on the hostname and the beginning of the URL path.
118//! The `address` field must match the one of the HTTP listener we defined before:
119//!
120//! ```
121//! use std::collections::BTreeMap;
122//!
123//!  use sozu_command_lib::proto::command::{PathRule, RequestHttpFrontend, RulePosition, SocketAddress};
124//!
125//! let http_front = RequestHttpFrontend {
126//!     cluster_id: Some("my-cluster".to_string()),
127//!     address: SocketAddress::new_v4(127,0,0,1,8080),
128//!     hostname: "example.com".to_string(),
129//!     path: PathRule::prefix(String::from("/")),
130//!     position: RulePosition::Pre.into(),
131//!     tags: BTreeMap::from([
132//!         ("owner".to_owned(), "John".to_owned()),
133//!         ("id".to_owned(), "my-own-http-front".to_owned()),
134//!     ]),
135//!     ..Default::default()
136//! };
137//! ```
138//!
139//! The `tags` are keys and values that will appear in the access logs,
140//! which can come in handy.
141//!
142//! Now let's define a backend.
143//! A backend is an instance of a backend application we want to route traffic to.
144//! The `address` field must match the IP and port of the backend server.
145//!
146//! ```
147//! use sozu_command_lib::proto::command::{AddBackend, LoadBalancingParams, SocketAddress};
148//!
149//! let http_backend = AddBackend {
150//!     cluster_id: "my-cluster".to_string(),
151//!     backend_id: "test-backend".to_string(),
152//!     address: SocketAddress::new_v4(127,0,0,1,8000),
153//!     load_balancing_parameters: Some(LoadBalancingParams::default()),
154//!     ..Default::default()
155//! };
156//! ```
157//!
158//! A cluster can have multiple backend servers, and they can be added or
159//! removed while the proxy is running. If a backend is removed from the configuration
160//! while the proxy is handling a request to that server, it will finish that
161//! request and stop sending new traffic to that server.
162//!
163//!
164//! Now we can use the other end of the channel to send all these requests to the worker,
165//! using the WorkerRequest type:
166//!
167//! ```ignore
168//! use sozu_command_lib::{
169//!     proto::command::{Request, request::RequestType, WorkerRequest},
170//! };
171//!
172//! command_channel
173//!     .write_message(&WorkerRequest {
174//!         id: String::from("add-the-cluster"),
175//!         content: RequestType::AddCluster(cluster).into(),
176//!     })
177//!     .expect("Could not send AddHttpFrontend request");
178//!
179//! command_channel
180//!     .write_message(&WorkerRequest {
181//!         id: String::from("add-the-frontend"),
182//!         content: RequestType::AddHttpFrontend(http_front).into(),
183//!     })
184//!     .expect("Could not send AddHttpFrontend request");
185//!
186//! command_channel
187//!     .write_message(&WorkerRequest {
188//!         id: String::from("add-the-backend"),
189//!         content: RequestType::AddBackend(http_backend).into(),
190//!     })
191//!     .expect("Could not send AddBackend request");
192//!
193//! println!("HTTP -> {:?}", command_channel.read_message());
194//! println!("HTTP -> {:?}", command_channel.read_message());
195//! println!("HTTP -> {:?}", command_channel.read_message());
196//! ```
197//!
198//!
199//! The event loop of the worker will process these instructions and add them to
200//! its state, and the worker will send back an acknowledgement
201//! message.
202//!
203//! Now we can let the worker thread run in the background:
204//!
205//! ```ignore
206//! let _ = worker_thread_join_handle.join();
207//! ```
208//!
209//! Here is the complete example for reference, it matches the `examples/http.rs` example:
210//!
211//! ```
212//! #[macro_use]
213//! extern crate sozu_command_lib;
214//!
215//! use std::{collections::BTreeMap, env, io::stdout, thread};
216//!
217//! use anyhow::Context;
218//! use sozu_command_lib::{
219//!     channel::Channel,
220//!     config::ListenerBuilder,
221//!     logging::setup_default_logging,
222//!     proto::command::{
223//!         request::RequestType, AddBackend, Cluster, LoadBalancingAlgorithms, LoadBalancingParams,
224//!         PathRule, Request, RequestHttpFrontend, RulePosition, SocketAddress,WorkerRequest,
225//!     },
226//! };
227//!
228//! fn main() -> anyhow::Result<()> {
229//!     setup_default_logging(true, "info", "EXAMPLE").with_context(|| "could not setup logging")?;
230//!
231//!     info!("starting up");
232//!
233//!     let http_listener = ListenerBuilder::new_http(SocketAddress::new_v4(127,0,0,1,8080))
234//!         .to_http(None)
235//!         .expect("Could not create HTTP listener");
236//!
237//!     let (mut command_channel, proxy_channel) =
238//!         Channel::generate(1000, 10000).with_context(|| "should create a channel")?;
239//!
240//!     let worker_thread_join_handle = thread::spawn(move || {
241//!         let max_buffers = 500;
242//!         let buffer_size = 16384;
243//!         sozu_lib::http::testing::start_http_worker(http_listener, proxy_channel, max_buffers, buffer_size)
244//!             .expect("The worker could not be started, or shut down");
245//!     });
246//!
247//!     let cluster = Cluster {
248//!         cluster_id: "my-cluster".to_string(),
249//!         sticky_session: false,
250//!         https_redirect: false,
251//!         load_balancing: LoadBalancingAlgorithms::RoundRobin as i32,
252//!         answer_503: Some("A custom forbidden message".to_string()),
253//!         ..Default::default()
254//!     };
255//!
256//!     let http_front = RequestHttpFrontend {
257//!         cluster_id: Some("my-cluster".to_string()),
258//!         address: SocketAddress::new_v4(127,0,0,1,8080),
259//!         hostname: "example.com".to_string(),
260//!         path: PathRule::prefix(String::from("/")),
261//!         position: RulePosition::Pre.into(),
262//!         tags: BTreeMap::from([
263//!             ("owner".to_owned(), "John".to_owned()),
264//!             ("id".to_owned(), "my-own-http-front".to_owned()),
265//!         ]),
266//!         ..Default::default()
267//!     };
268//!     let http_backend = AddBackend {
269//!         cluster_id: "my-cluster".to_string(),
270//!         backend_id: "test-backend".to_string(),
271//!         address: SocketAddress::new_v4(127,0,0,1,8000),
272//!         load_balancing_parameters: Some(LoadBalancingParams::default()),
273//!         ..Default::default()
274//!     };
275//!
276//!     command_channel
277//!         .write_message(&WorkerRequest {
278//!             id: String::from("add-the-cluster"),
279//!             content: RequestType::AddCluster(cluster).into(),
280//!         })
281//!         .expect("Could not send AddHttpFrontend request");
282//!
283//!     command_channel
284//!         .write_message(&WorkerRequest {
285//!             id: String::from("add-the-frontend"),
286//!             content: RequestType::AddHttpFrontend(http_front).into(),
287//!         })
288//!         .expect("Could not send AddHttpFrontend request");
289//!
290//!     command_channel
291//!         .write_message(&WorkerRequest {
292//!             id: String::from("add-the-backend"),
293//!             content: RequestType::AddBackend(http_backend).into(),
294//!         })
295//!         .expect("Could not send AddBackend request");
296//!
297//!     println!("HTTP -> {:?}", command_channel.read_message());
298//!     println!("HTTP -> {:?}", command_channel.read_message());
299//!
300//!     // uncomment to let it run in the background
301//!     // let _ = worker_thread_join_handle.join();
302//!     info!("good bye");
303//!     Ok(())
304//! }
305//! ```
306
307#[macro_use]
308extern crate sozu_command_lib as sozu_command;
309#[cfg(test)]
310#[macro_use]
311extern crate quickcheck;
312
313#[macro_use]
314pub mod util;
315#[macro_use]
316pub mod metrics;
317
318pub mod backends;
319pub mod features;
320pub mod http;
321pub mod load_balancing;
322pub mod pool;
323pub mod protocol;
324pub mod retry;
325pub mod router;
326pub mod socket;
327pub mod timer;
328pub mod tls;
329
330/// unused for now but may be usefull for bypassing sozu on a low level
331#[cfg(feature = "splice")]
332mod splice;
333
334pub mod server;
335pub mod tcp;
336
337pub mod https;
338
339use std::{
340    cell::RefCell,
341    collections::{BTreeMap, HashMap},
342    fmt::{self, Display, Formatter},
343    net::SocketAddr,
344    rc::Rc,
345    str,
346    time::{Duration, Instant},
347};
348
349use backends::BackendError;
350use hex::FromHexError;
351use mio::{net::TcpStream, Interest, Token};
352use protocol::http::{answers::TemplateError, parser::Method};
353use router::RouterError;
354use socket::ServerBindError;
355use tls::CertificateResolverError;
356
357use sozu_command::{
358    logging::{CachedTags, LogContext},
359    proto::command::{Cluster, ListenerType, RequestHttpFrontend, WorkerRequest, WorkerResponse},
360    ready::Ready,
361    state::ClusterId,
362    AsStr, ObjectKind,
363};
364
365use crate::{backends::BackendMap, router::Route};
366
367/// Anything that can be registered in mio (subscribe to kernel events)
368#[derive(Debug, Clone, Copy, PartialEq, Eq)]
369pub enum Protocol {
370    HTTP,
371    HTTPS,
372    TCP,
373    HTTPListen,
374    HTTPSListen,
375    TCPListen,
376    Channel,
377    Metrics,
378    Timer,
379}
380
381/// trait that must be implemented by listeners and client sessions
382pub trait ProxySession {
383    /// indicates the protocol associated with the session
384    ///
385    /// this is used to distinguish sessions from listenrs, channels, metrics
386    /// and timers
387    fn protocol(&self) -> Protocol;
388    /// if a session received an event or can still execute, the event loop will
389    /// call this method. Its result indicates if it can still execute, needs to
390    /// connect to a backend server, close the session
391    fn ready(&mut self, session: Rc<RefCell<dyn ProxySession>>) -> SessionIsToBeClosed;
392    /// if the event loop got an event for a token associated with the session,
393    /// it will call this method on the session
394    fn update_readiness(&mut self, token: Token, events: Ready);
395    /// close a session, frontend and backend sockets,
396    /// remove the entries from the session manager slab
397    fn close(&mut self);
398    /// if a timeout associated with the session triggers, the event loop will
399    /// call this method with the timeout's token
400    fn timeout(&mut self, t: Token) -> SessionIsToBeClosed;
401    /// last time the session got an event
402    fn last_event(&self) -> Instant;
403    /// display the session's internal state (for debugging purpose)
404    fn print_session(&self);
405    /// get the token associated with the frontend
406    fn frontend_token(&self) -> Token;
407    /// tell the session it has to shut down if possible
408    ///
409    /// if the session handles HTTP requests, it will not close until the response
410    /// is completely sent back to the client
411    fn shutting_down(&mut self) -> SessionIsToBeClosed;
412}
413
414#[macro_export]
415macro_rules! branch {
416    (if $($value:ident)? == $expected:ident { $($then:tt)* } else { $($else:tt)* }) => {
417        macro_rules! expect {
418            ($expected) => {$($then)*};
419            ($a:ident) => {$($else)*};
420            () => {$($else)*}
421        }
422        expect!($($value)?);
423    };
424    (if $($value:ident)? == $expected:ident { $($then:tt)* } ) => {
425        macro_rules! expect {
426            ($expected) => {$($then)*};
427        }
428        expect!($($value)?);
429    };
430}
431
432#[macro_export]
433macro_rules! fallback {
434    ({} $($default:tt)*) => {
435        $($default)*
436    };
437    ({$($value:tt)+} $($default:tt)*) => {
438        $($value)+
439    };
440}
441
442#[macro_export]
443macro_rules! StateMachineBuilder {
444    (
445        ($d:tt)
446        $(#[$($state_macros:tt)*])*
447        enum $state_name:ident $(impl $trait:ident)?  {
448            $($(#[$($variant_macros:tt)*])*
449            $variant_name:ident($state:ty$(,$($aux:ty),+)?) $(-> $override:expr)?),+ $(,)?
450        }
451    ) => {
452        /// A summary of the last valid State
453        #[derive(Clone, Copy, Debug)]
454        pub enum StateMarker {
455            $($variant_name,)+
456        }
457
458        $(#[$($state_macros)*])*
459        pub enum $state_name {
460            $(
461                $(#[$($variant_macros)*])*
462                $variant_name($state$(,$($aux),+)?),
463            )+
464            /// Informs about upgrade failure, contains a summary the last valid State
465            FailedUpgrade(StateMarker),
466        }
467
468        macro_rules! _fn_impl {
469            ($function:ident(&$d($mut:ident)?, self $d(,$arg_name:ident: $arg_type:ty)*) $d(-> $ret:ty)? $d(| $marker:tt => $fail:expr)?) => {
470                fn $function(&$d($mut)? self $d(,$arg_name: $arg_type)*) $d(-> $ret)? {
471                    match self {
472                        $($state_name::$variant_name(_state, ..) => $crate::fallback!({$($override)?} _state.$function($d($arg_name),*)),)+
473                        $state_name::FailedUpgrade($crate::fallback!({$d($marker)?} _)) => $crate::fallback!({$d($fail)?} unreachable!())
474                    }
475                }
476            };
477        }
478
479        impl $state_name {
480            /// Informs about the last valid State before upgrade failure
481            fn marker(&self) -> StateMarker {
482                match self {
483                    $($state_name::$variant_name(..) => StateMarker::$variant_name,)+
484                    $state_name::FailedUpgrade(marker) => *marker,
485                }
486            }
487            /// Returns wether or not the State is FailedUpgrade
488            fn failed(&self) -> bool {
489                match self {
490                    $state_name::FailedUpgrade(_) => true,
491                    _ => false,
492                }
493            }
494            /// Gives back an owned version of the State,
495            /// leaving a FailedUpgrade in its place.
496            /// The FailedUpgrade retains the marker of the previous State.
497            fn take(&mut self) -> $state_name {
498                let mut owned_state = $state_name::FailedUpgrade(self.marker());
499                std::mem::swap(&mut owned_state, self);
500                owned_state
501            }
502            _fn_impl!{front_socket(&, self) -> &mio::net::TcpStream}
503        }
504
505        $crate::branch!{
506            if $($trait)? == SessionState {
507                impl SessionState for $state_name {
508                    _fn_impl!{ready(&mut, self, session: Rc<RefCell<dyn ProxySession>>, proxy: Rc<RefCell<dyn L7Proxy>>, metrics: &mut SessionMetrics) -> SessionResult}
509                    _fn_impl!{update_readiness(&mut, self, token: Token, events: Ready)}
510                    _fn_impl!{timeout(&mut, self, token: Token, metrics: &mut SessionMetrics) -> StateResult}
511                    _fn_impl!{cancel_timeouts(&mut, self)}
512                    _fn_impl!{print_state(&, self, context: &str) | marker => error!("{} Session(FailedUpgrade({:?}))", context, marker)}
513                    _fn_impl!{close(&mut, self, proxy: Rc<RefCell<dyn L7Proxy>>, metrics: &mut SessionMetrics) | _ => {}}
514                    _fn_impl!{shutting_down(&mut, self) -> SessionIsToBeClosed | _ => true}
515                }
516            } else {}
517        }
518    };
519    ($($tt:tt)+) => {
520        StateMachineBuilder!{($) $($tt)+}
521    }
522}
523
524pub trait ListenerHandler {
525    fn get_addr(&self) -> &SocketAddr;
526
527    fn get_tags(&self, key: &str) -> Option<&CachedTags>;
528
529    fn get_concatenated_tags(&self, key: &str) -> Option<&str> {
530        self.get_tags(key).map(|tags| tags.concatenated.as_str())
531    }
532
533    fn set_tags(&mut self, key: String, tags: Option<BTreeMap<String, String>>);
534}
535
536#[derive(thiserror::Error, Debug)]
537pub enum FrontendFromRequestError {
538    #[error("Could not parse hostname from '{host}': {error}")]
539    HostParse { host: String, error: String },
540    #[error("invalid remaining chars after hostname. Host: {0}")]
541    InvalidCharsAfterHost(String),
542    #[error("no cluster: {0}")]
543    NoClusterFound(RouterError),
544}
545
546pub trait L7ListenerHandler {
547    fn get_sticky_name(&self) -> &str;
548
549    fn get_connect_timeout(&self) -> u32;
550
551    /// retrieve a frontend by parsing a request's hostname, uri and method
552    fn frontend_from_request(
553        &self,
554        host: &str,
555        uri: &str,
556        method: &Method,
557    ) -> Result<Route, FrontendFromRequestError>;
558}
559
560#[derive(Clone, Copy, Debug, PartialEq, Eq)]
561pub enum BackendConnectionStatus {
562    NotConnected,
563    Connecting(Instant),
564    Connected,
565}
566
567impl BackendConnectionStatus {
568    pub fn is_connecting(&self) -> bool {
569        matches!(self, BackendConnectionStatus::Connecting(_))
570    }
571}
572
573#[derive(Debug, PartialEq, Eq)]
574pub enum BackendConnectAction {
575    New,
576    Reuse,
577    Replace,
578}
579
580#[derive(thiserror::Error, Debug)]
581pub enum BackendConnectionError {
582    #[error("Not found: {0:?}")]
583    NotFound(ObjectKind),
584    #[error("Too many connections on cluster {0:?}")]
585    MaxConnectionRetries(Option<String>),
586    #[error("the sessions slab has reached maximum capacity")]
587    MaxSessionsMemory,
588    #[error("error from the backend: {0}")]
589    Backend(BackendError),
590    #[error("failed to retrieve the cluster: {0}")]
591    RetrieveClusterError(RetrieveClusterError),
592}
593
594/// used in kawa_h1 module for the Http session state
595#[derive(thiserror::Error, Debug)]
596pub enum RetrieveClusterError {
597    #[error("No method given")]
598    NoMethod,
599    #[error("No host given")]
600    NoHost,
601    #[error("No path given")]
602    NoPath,
603    #[error("unauthorized route")]
604    UnauthorizedRoute,
605    #[error("{0}")]
606    RetrieveFrontend(FrontendFromRequestError),
607}
608
609/// Used in sessions
610#[derive(Debug, PartialEq, Eq)]
611pub enum AcceptError {
612    IoError,
613    TooManySessions,
614    WouldBlock,
615    RegisterError,
616    WrongSocketAddress,
617    BufferCapacityReached,
618}
619
620/// returned by the HTTP, HTTPS and TCP listeners
621#[derive(thiserror::Error, Debug)]
622pub enum ListenerError {
623    #[error("failed to handle certificate request, got a resolver error, {0}")]
624    Resolver(CertificateResolverError),
625    #[error("failed to parse pem, {0}")]
626    PemParse(String),
627    #[error("failed to parse template {0}: {1}")]
628    TemplateParse(u16, TemplateError),
629    #[error("failed to build rustls context, {0}")]
630    BuildRustls(String),
631    #[error("could not activate listener with address {address:?}: {error}")]
632    Activation { address: SocketAddr, error: String },
633    #[error("Could not register listener socket: {0}")]
634    SocketRegistration(std::io::Error),
635    #[error("could not add frontend: {0}")]
636    AddFrontend(RouterError),
637    #[error("could not remove frontend: {0}")]
638    RemoveFrontend(RouterError),
639}
640
641/// Returned by the HTTP, HTTPS and TCP proxies
642#[derive(thiserror::Error, Debug)]
643pub enum ProxyError {
644    #[error("error while soft stopping {proxy_protocol} proxy: {error}")]
645    SoftStop {
646        proxy_protocol: String,
647        error: String,
648    },
649    #[error("error while hard stopping {proxy_protocol} proxy: {error}")]
650    HardStop {
651        proxy_protocol: String,
652        error: String,
653    },
654    #[error("found no listener with address {0:?}")]
655    NoListenerFound(SocketAddr),
656    #[error("a listener is already present for this token")]
657    ListenerAlreadyPresent,
658    #[error("could not add listener: {0}")]
659    AddListener(ListenerError),
660    #[error("could not add cluster: {0}")]
661    AddCluster(ListenerError),
662    #[error("failed to activate listener with address {address:?}: {listener_error}")]
663    ListenerActivation {
664        address: SocketAddr,
665        listener_error: ListenerError,
666    },
667    #[error("can not add frontend {front:?}: {error}")]
668    WrongInputFrontend {
669        front: RequestHttpFrontend,
670        error: String,
671    },
672    #[error("could not add frontend: {0}")]
673    AddFrontend(ListenerError),
674    #[error("could not remove frontend: {0}")]
675    RemoveFrontend(ListenerError),
676    #[error("could not add certificate: {0}")]
677    AddCertificate(CertificateResolverError),
678    #[error("could not remove certificate: {0}")]
679    RemoveCertificate(CertificateResolverError),
680    #[error("could not replace certificate: {0}")]
681    ReplaceCertificate(CertificateResolverError),
682    #[error("wrong certificate fingerprint: {0}")]
683    WrongCertificateFingerprint(FromHexError),
684    #[error("this request is not supported by the proxy")]
685    UnsupportedMessage,
686    #[error("failed to acquire the lock, {0}")]
687    Lock(String),
688    #[error("could not bind to socket {0:?}: {1}")]
689    BindToSocket(SocketAddr, ServerBindError),
690    #[error("error registering socket of listener: {0}")]
691    RegisterListener(std::io::Error),
692    #[error("the listener is not activated")]
693    UnactivatedListener,
694}
695
696use self::server::ListenToken;
697pub trait ProxyConfiguration {
698    fn notify(&mut self, message: WorkerRequest) -> WorkerResponse;
699    fn accept(&mut self, token: ListenToken) -> Result<TcpStream, AcceptError>;
700    fn create_session(
701        &mut self,
702        socket: TcpStream,
703        token: ListenToken,
704        wait_time: Duration,
705        proxy: Rc<RefCell<Self>>,
706        // should we insert the tags here?
707    ) -> Result<(), AcceptError>;
708}
709
710pub trait L7Proxy {
711    fn kind(&self) -> ListenerType;
712
713    fn register_socket(
714        &self,
715        socket: &mut TcpStream,
716        token: Token,
717        interest: Interest,
718    ) -> Result<(), std::io::Error>;
719
720    fn deregister_socket(&self, tcp_stream: &mut TcpStream) -> Result<(), std::io::Error>;
721
722    fn add_session(&self, session: Rc<RefCell<dyn ProxySession>>) -> Token;
723
724    /// Remove the session from the session manager slab.
725    /// Returns true if the session was actually there before deletion
726    fn remove_session(&self, token: Token) -> bool;
727
728    fn backends(&self) -> Rc<RefCell<BackendMap>>;
729
730    fn clusters(&self) -> &HashMap<ClusterId, Cluster>;
731}
732
733#[derive(Debug, PartialEq, Eq)]
734pub enum RequiredEvents {
735    FrontReadBackNone,
736    FrontWriteBackNone,
737    FrontReadWriteBackNone,
738    FrontNoneBackNone,
739    FrontReadBackRead,
740    FrontWriteBackRead,
741    FrontReadWriteBackRead,
742    FrontNoneBackRead,
743    FrontReadBackWrite,
744    FrontWriteBackWrite,
745    FrontReadWriteBackWrite,
746    FrontNoneBackWrite,
747    FrontReadBackReadWrite,
748    FrontWriteBackReadWrite,
749    FrontReadWriteBackReadWrite,
750    FrontNoneBackReadWrite,
751}
752
753impl RequiredEvents {
754    pub fn front_readable(&self) -> bool {
755        matches!(
756            *self,
757            RequiredEvents::FrontReadBackNone
758                | RequiredEvents::FrontReadWriteBackNone
759                | RequiredEvents::FrontReadBackRead
760                | RequiredEvents::FrontReadWriteBackRead
761                | RequiredEvents::FrontReadBackWrite
762                | RequiredEvents::FrontReadWriteBackWrite
763                | RequiredEvents::FrontReadBackReadWrite
764                | RequiredEvents::FrontReadWriteBackReadWrite
765        )
766    }
767
768    pub fn front_writable(&self) -> bool {
769        matches!(
770            *self,
771            RequiredEvents::FrontWriteBackNone
772                | RequiredEvents::FrontReadWriteBackNone
773                | RequiredEvents::FrontWriteBackRead
774                | RequiredEvents::FrontReadWriteBackRead
775                | RequiredEvents::FrontWriteBackWrite
776                | RequiredEvents::FrontReadWriteBackWrite
777                | RequiredEvents::FrontWriteBackReadWrite
778                | RequiredEvents::FrontReadWriteBackReadWrite
779        )
780    }
781
782    pub fn back_readable(&self) -> bool {
783        matches!(
784            *self,
785            RequiredEvents::FrontReadBackRead
786                | RequiredEvents::FrontWriteBackRead
787                | RequiredEvents::FrontReadWriteBackRead
788                | RequiredEvents::FrontNoneBackRead
789                | RequiredEvents::FrontReadBackReadWrite
790                | RequiredEvents::FrontWriteBackReadWrite
791                | RequiredEvents::FrontReadWriteBackReadWrite
792                | RequiredEvents::FrontNoneBackReadWrite
793        )
794    }
795
796    pub fn back_writable(&self) -> bool {
797        matches!(
798            *self,
799            RequiredEvents::FrontReadBackWrite
800                | RequiredEvents::FrontWriteBackWrite
801                | RequiredEvents::FrontReadWriteBackWrite
802                | RequiredEvents::FrontNoneBackWrite
803                | RequiredEvents::FrontReadBackReadWrite
804                | RequiredEvents::FrontWriteBackReadWrite
805                | RequiredEvents::FrontReadWriteBackReadWrite
806                | RequiredEvents::FrontNoneBackReadWrite
807        )
808    }
809}
810
811/// Signals transitions between states of a given Protocol
812#[derive(Debug, PartialEq, Eq)]
813pub enum StateResult {
814    /// Signals to the Protocol to close its backend
815    CloseBackend,
816    /// Signals to the parent Session to close itself
817    CloseSession,
818    /// Signals to the Protocol to connect to backend
819    ConnectBackend,
820    /// Signals to the Protocol to continue
821    Continue,
822    /// Signals to the parent Session to upgrade to the next Protocol
823    Upgrade,
824}
825
826/// Signals transitions between states of a given Session
827#[derive(Debug, Clone, Copy, PartialEq, Eq)]
828pub enum SessionResult {
829    /// Signals to the Session to close itself
830    Close,
831    /// Signals to the Session to continue
832    Continue,
833    /// Signals to the Session to upgrade its Protocol
834    Upgrade,
835}
836
837#[derive(Debug, PartialEq, Eq)]
838pub enum SocketType {
839    Listener,
840    FrontClient,
841}
842
843type SessionIsToBeClosed = bool;
844
845#[derive(Clone)]
846pub struct Readiness {
847    /// the current readiness
848    pub event: Ready,
849    /// the readiness we wish to attain
850    pub interest: Ready,
851}
852
853impl Display for Readiness {
854    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
855        let i = &mut [b'-'; 4];
856        let r = &mut [b'-'; 4];
857        let mixed = &mut [b'-'; 4];
858
859        display_ready(i, self.interest);
860        display_ready(r, self.event);
861        display_ready(mixed, self.interest & self.event);
862
863        write!(
864            f,
865            "I({:?})&R({:?})=M({:?})",
866            String::from_utf8_lossy(i),
867            String::from_utf8_lossy(r),
868            String::from_utf8_lossy(mixed)
869        )
870    }
871}
872
873impl Default for Readiness {
874    fn default() -> Self {
875        Self::new()
876    }
877}
878
879impl Readiness {
880    pub const fn new() -> Readiness {
881        Readiness {
882            event: Ready::EMPTY,
883            interest: Ready::EMPTY,
884        }
885    }
886
887    pub fn reset(&mut self) {
888        self.event = Ready::EMPTY;
889        self.interest = Ready::EMPTY;
890    }
891
892    /// filters the readiness we actually want
893    pub fn filter_interest(&self) -> Ready {
894        self.event & self.interest
895    }
896}
897
898pub fn display_ready(s: &mut [u8], readiness: Ready) {
899    if readiness.is_readable() {
900        s[0] = b'R';
901    }
902    if readiness.is_writable() {
903        s[1] = b'W';
904    }
905    if readiness.is_error() {
906        s[2] = b'E';
907    }
908    if readiness.is_hup() {
909        s[3] = b'H';
910    }
911}
912
913pub fn ready_to_string(readiness: Ready) -> String {
914    let s = &mut [b'-'; 4];
915    display_ready(s, readiness);
916    String::from_utf8(s.to_vec()).unwrap()
917}
918
919impl fmt::Debug for Readiness {
920    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
921        let i = &mut [b'-'; 4];
922        let r = &mut [b'-'; 4];
923        let mixed = &mut [b'-'; 4];
924
925        display_ready(i, self.interest);
926        display_ready(r, self.event);
927        display_ready(mixed, self.interest & self.event);
928
929        write!(
930            f,
931            "Readiness {{ interest: {}, readiness: {}, mixed: {} }}",
932            str::from_utf8(i).unwrap(),
933            str::from_utf8(r).unwrap(),
934            str::from_utf8(mixed).unwrap()
935        )
936    }
937}
938
939#[derive(Clone, Debug)]
940pub struct SessionMetrics {
941    /// date at which we started handling that request
942    pub start: Option<Instant>,
943    /// time actually spent handling the request
944    pub service_time: Duration,
945    /// time spent waiting for its turn
946    pub wait_time: Duration,
947    /// bytes received by the frontend
948    pub bin: usize,
949    /// bytes sent by the frontend
950    pub bout: usize,
951
952    /// date at which we started working on the request
953    pub service_start: Option<Instant>,
954    pub wait_start: Instant,
955
956    pub backend_id: Option<String>,
957    pub backend_start: Option<Instant>,
958    pub backend_connected: Option<Instant>,
959    pub backend_stop: Option<Instant>,
960    pub backend_bin: usize,
961    pub backend_bout: usize,
962}
963
964impl SessionMetrics {
965    pub fn new(wait_time: Option<Duration>) -> SessionMetrics {
966        SessionMetrics {
967            start: Some(Instant::now()),
968            service_time: Duration::from_secs(0),
969            wait_time: wait_time.unwrap_or_else(|| Duration::from_secs(0)),
970            bin: 0,
971            bout: 0,
972            service_start: None,
973            wait_start: Instant::now(),
974            backend_id: None,
975            backend_start: None,
976            backend_connected: None,
977            backend_stop: None,
978            backend_bin: 0,
979            backend_bout: 0,
980        }
981    }
982
983    pub fn reset(&mut self) {
984        self.start = None;
985        self.service_time = Duration::from_secs(0);
986        self.wait_time = Duration::from_secs(0);
987        self.bin = 0;
988        self.bout = 0;
989        self.service_start = None;
990        self.backend_start = None;
991        self.backend_connected = None;
992        self.backend_stop = None;
993        self.backend_bin = 0;
994        self.backend_bout = 0;
995    }
996
997    pub fn service_start(&mut self) {
998        let now = Instant::now();
999
1000        if self.start.is_none() {
1001            self.start = Some(now);
1002        }
1003
1004        self.service_start = Some(now);
1005        self.wait_time += now - self.wait_start;
1006    }
1007
1008    pub fn service_stop(&mut self) {
1009        if let Some(start) = self.service_start.take() {
1010            let duration = Instant::now() - start;
1011            self.service_time += duration;
1012        }
1013    }
1014
1015    pub fn wait_start(&mut self) {
1016        self.wait_start = Instant::now();
1017    }
1018
1019    pub fn service_time(&self) -> Duration {
1020        match self.service_start {
1021            Some(start) => {
1022                let last_duration = Instant::now() - start;
1023                self.service_time + last_duration
1024            }
1025            None => self.service_time,
1026        }
1027    }
1028
1029    /// time elapsed since the beginning of the session
1030    pub fn request_time(&self) -> Duration {
1031        match self.start {
1032            Some(start) => Instant::now() - start,
1033            None => Duration::from_secs(0),
1034        }
1035    }
1036
1037    pub fn backend_start(&mut self) {
1038        self.backend_start = Some(Instant::now());
1039    }
1040
1041    pub fn backend_connected(&mut self) {
1042        self.backend_connected = Some(Instant::now());
1043    }
1044
1045    pub fn backend_stop(&mut self) {
1046        self.backend_stop = Some(Instant::now());
1047    }
1048
1049    pub fn backend_response_time(&self) -> Option<Duration> {
1050        match (self.backend_connected, self.backend_stop) {
1051            (Some(start), Some(end)) => Some(end - start),
1052            (Some(start), None) => Some(Instant::now() - start),
1053            _ => None,
1054        }
1055    }
1056
1057    pub fn backend_connection_time(&self) -> Option<Duration> {
1058        match (self.backend_start, self.backend_connected) {
1059            (Some(start), Some(end)) => Some(end - start),
1060            _ => None,
1061        }
1062    }
1063
1064    pub fn register_end_of_session(&self, context: &LogContext) {
1065        let request_time = self.request_time();
1066        let service_time = self.service_time();
1067
1068        if let Some(cluster_id) = context.cluster_id {
1069            time!("request_time", cluster_id, request_time.as_millis());
1070            time!("service_time", cluster_id, service_time.as_millis());
1071        }
1072        time!("request_time", request_time.as_millis());
1073        time!("service_time", service_time.as_millis());
1074
1075        if let Some(backend_id) = self.backend_id.as_ref() {
1076            if let Some(backend_response_time) = self.backend_response_time() {
1077                record_backend_metrics!(
1078                    context.cluster_id.as_str_or("-"),
1079                    backend_id,
1080                    backend_response_time.as_millis(),
1081                    self.backend_connection_time(),
1082                    self.backend_bin,
1083                    self.backend_bout
1084                );
1085            }
1086        }
1087
1088        incr!("access_logs.count", context.cluster_id, context.backend_id);
1089    }
1090}
1091
1092/// exponentially weighted moving average with high sensibility to latency bursts
1093///
1094/// cf Finagle for the original implementation: <https://github.com/twitter/finagle/blob/9cc08d15216497bb03a1cafda96b7266cfbbcff1/finagle-core/src/main/scala/com/twitter/finagle/loadbalancer/PeakEwma.scala>
1095#[derive(Debug, PartialEq, Clone)]
1096pub struct PeakEWMA {
1097    /// decay in nanoseconds
1098    ///
1099    /// higher values will make the EWMA decay slowly to 0
1100    pub decay: f64,
1101    /// estimated RTT in nanoseconds
1102    ///
1103    /// must be set to a high enough default value so that new backends do not
1104    /// get all the traffic right away
1105    pub rtt: f64,
1106    /// last modification
1107    pub last_event: Instant,
1108}
1109
1110impl Default for PeakEWMA {
1111    fn default() -> Self {
1112        Self::new()
1113    }
1114}
1115
1116impl PeakEWMA {
1117    // hardcoded default values for now
1118    pub fn new() -> Self {
1119        PeakEWMA {
1120            // 1s
1121            decay: 1_000_000_000f64,
1122            // 50ms
1123            rtt: 50_000_000f64,
1124            last_event: Instant::now(),
1125        }
1126    }
1127
1128    pub fn observe(&mut self, rtt: f64) {
1129        let now = Instant::now();
1130        let dur = now - self.last_event;
1131
1132        // if latency is rising, we will immediately raise the cost
1133        if rtt > self.rtt {
1134            self.rtt = rtt;
1135        } else {
1136            // new_rtt = old_rtt * e^(-elapsed/decay) + observed_rtt * (1 - e^(-elapsed/decay))
1137            let weight = (-1.0 * dur.as_nanos() as f64 / self.decay).exp();
1138            self.rtt = self.rtt * weight + rtt * (1.0 - weight);
1139        }
1140
1141        self.last_event = now;
1142    }
1143
1144    pub fn get(&mut self, active_requests: usize) -> f64 {
1145        // decay the current value
1146        // (we might not have seen a request in a long time)
1147        self.observe(0.0);
1148
1149        (active_requests + 1) as f64 * self.rtt
1150    }
1151}
1152
1153pub mod testing {
1154    pub use std::{cell::RefCell, os::fd::IntoRawFd, rc::Rc};
1155
1156    pub use anyhow::Context;
1157    pub use mio::{net::UnixStream, Poll, Registry, Token};
1158    pub use slab::Slab;
1159    pub use sozu_command::{
1160        proto::command::{
1161            HttpListenerConfig, HttpsListenerConfig, ServerConfig, TcpListenerConfig,
1162        },
1163        scm_socket::{Listeners, ScmSocket},
1164    };
1165
1166    pub use crate::{
1167        backends::BackendMap,
1168        http::HttpProxy,
1169        https::HttpsProxy,
1170        pool::Pool,
1171        server::Server,
1172        server::{ListenSession, ProxyChannel, SessionManager},
1173        tcp::TcpProxy,
1174        Protocol, ProxySession,
1175    };
1176
1177    /// Everything needed to create a Server
1178    pub struct ServerParts {
1179        pub event_loop: Poll,
1180        pub registry: Registry,
1181        pub sessions: Rc<RefCell<SessionManager>>,
1182        pub pool: Rc<RefCell<Pool>>,
1183        pub backends: Rc<RefCell<BackendMap>>,
1184        pub client_scm_socket: ScmSocket,
1185        pub server_scm_socket: ScmSocket,
1186        pub server_config: ServerConfig,
1187    }
1188
1189    /// Setup a standalone server, for testing purposes
1190    pub fn prebuild_server(
1191        max_buffers: usize,
1192        buffer_size: usize,
1193        send_scm: bool,
1194    ) -> anyhow::Result<ServerParts> {
1195        let event_loop = Poll::new().with_context(|| "Failed at creating event loop")?;
1196        let backends = Rc::new(RefCell::new(BackendMap::new()));
1197        let server_config = ServerConfig {
1198            max_connections: max_buffers as u64,
1199            ..Default::default()
1200        };
1201
1202        let pool = Rc::new(RefCell::new(Pool::with_capacity(
1203            1,
1204            max_buffers,
1205            buffer_size,
1206        )));
1207
1208        let mut sessions: Slab<Rc<RefCell<dyn ProxySession>>> = Slab::with_capacity(max_buffers);
1209        {
1210            let entry = sessions.vacant_entry();
1211            info!("taking token {:?} for channel", entry.key());
1212            entry.insert(Rc::new(RefCell::new(ListenSession {
1213                protocol: Protocol::Channel,
1214            })));
1215        }
1216        {
1217            let entry = sessions.vacant_entry();
1218            info!("taking token {:?} for timer", entry.key());
1219            entry.insert(Rc::new(RefCell::new(ListenSession {
1220                protocol: Protocol::Timer,
1221            })));
1222        }
1223        {
1224            let entry = sessions.vacant_entry();
1225            info!("taking token {:?} for metrics", entry.key());
1226            entry.insert(Rc::new(RefCell::new(ListenSession {
1227                protocol: Protocol::Metrics,
1228            })));
1229        }
1230        let sessions = SessionManager::new(sessions, max_buffers);
1231
1232        let registry = event_loop
1233            .registry()
1234            .try_clone()
1235            .with_context(|| "Failed at creating a registry")?;
1236
1237        let (scm_server, scm_client) =
1238            UnixStream::pair().with_context(|| "Failed at creating scm unix stream")?;
1239        let client_scm_socket = ScmSocket::new(scm_client.into_raw_fd())
1240            .with_context(|| "Failed at creating the scm client socket")?;
1241        let server_scm_socket = ScmSocket::new(scm_server.into_raw_fd())
1242            .with_context(|| "Failed at creating the scm server socket")?;
1243        if send_scm {
1244            client_scm_socket
1245                .send_listeners(&Listeners::default())
1246                .with_context(|| "Failed at sending empty listeners")?;
1247        }
1248
1249        Ok(ServerParts {
1250            event_loop,
1251            registry,
1252            sessions,
1253            pool,
1254            backends,
1255            client_scm_socket,
1256            server_scm_socket,
1257            server_config,
1258        })
1259    }
1260}