sozu_lib/
lib.rs

1//! ## What this library does
2//!
3//! This library provides tools to build and start HTTP, HTTPS and TCP reverse proxies.
4//!
5//! The proxies handles network polling, HTTP parsing, TLS in a fast single threaded event
6//! loop.
7//!
8//! Each proxy is designed to receive configuration changes at runtime instead of
9//! reloading from a file regularly. The event loop runs in its own thread
10//! and receives commands through a message queue.
11//!
12//! ## Difference with the crate `sozu`
13//!
14//! To create several workers and manage them all at once (which is the most common way to
15//! use Sōzu), the crate `sozu` is more indicated than using the lib directly.
16//!
17//! The crate `sozu` provides a binary called the main process.
18//! The main process uses `sozu_lib` to start and manage workers.
19//! Each worker can handle HTTP, HTTPS and TCP traffic.
20//! The main process receives synchronizes the state of all workers, using UNIX sockets
21//! and custom channels to communicate with them.
22//! The main process itself is is configurable with a file, and has a CLI.
23//!
24//! ## How to use this library directly
25//!
26//! This documentation here explains how to write a binary that will start a single Sōzu
27//! worker and give it orders. The method has two steps:
28//!
29//! 1. Starts a Sōzu worker in a distinct thread
30//! 2. sends instructions to the worker on a UNIX socket via a Sōzu channel
31//!
32//! ### How to start a Sōzu worker
33//!
34//! Before creating an HTTP proxy, we first need to create an HTTP listener.
35//! The listener is an abstraction around a TCP socket provided by the kernel.
36//! We need the `sozu_command_lib` to build a listener.
37//!
38//! ```
39//! use sozu_command_lib::{config::ListenerBuilder, proto::command::SocketAddress};
40//!
41//! let address = SocketAddress::new_v4(127,0,0,1,8080);
42//! let http_listener = ListenerBuilder::new_http(address)
43//!     .to_http(None)
44//!     .expect("Could not create HTTP listener");
45//! ```
46//!
47//! The `http_listener` is of the type `HttpListenerConfig`, that we can be sent to the worker
48//! to start the proxy.
49//!
50//! Then create a pair of channels to communicate with the proxy.
51//! The channel is a wrapper around a unix socket.
52//!
53//! ```ignore
54//! use sozu_command_lib::{
55//!     channel::Channel,
56//!     proto::command::{WorkerRequest, WorkerResponse},
57//! };
58//!
59//! let (mut command_channel, proxy_channel): (
60//!     Channel<WorkerRequest, WorkerResponse>,
61//!     Channel<WorkerResponse, WorkerRequest>,
62//! ) = Channel::generate(1000, 10000).expect("should create a channel");
63//!```
64//!
65//! Here, the `command_channel` end is blocking, it sends `WorkerRequest`s and receives
66//! `WorkerResponses`, while the `proxy_channel` end is non-blocking, and the types are reversed.
67//! Writing the types here isn't even necessary thanks to the compiler,
68//! but it brings the point accross.
69//!
70//! You can now launch the worker in a separate thread, providing the HTTP listener config,
71//! the proxy end of the channel, and your custom number of buffers and their size:
72//!
73//! ```ignore
74//! use std::thread;
75//!
76//! let worker_thread_join_handle = thread::spawn(move || {
77//!     let max_buffers = 500;
78//!     let buffer_size = 16384;
79//!     sozu_lib::http::testing::start_http_worker(http_listener, proxy_channel, max_buffers, buffer_size);
80//! });
81//! ```
82//!
83//! ### Send orders
84//!
85//! Once the thread is launched, the proxy worker will start its event loop and handle
86//! events on the listening interface and port specified when building the HTTP Listener.
87//! Since no frontends or backends were specified for the proxy, it will receive
88//! the connections, parse the requests, then send a default (but configurable)
89//! answer.
90//!
91//! Before defining a frontend and backends, we need to define a cluster, which describes
92//! a routing configuration. A cluster contains:
93//!
94//! - one frontend
95//! - one or several backends
96//! - routing rules
97//!
98//! A cluster is identified by its `cluster_id`, which will be used to define frontends
99//! and backends later on.
100//!
101//! ```
102//! use sozu_command_lib::proto::command::{Cluster, LoadBalancingAlgorithms};
103//!
104//! let cluster = Cluster {
105//!     cluster_id: "my-cluster".to_string(),
106//!     sticky_session: false,
107//!     https_redirect: false,
108//!     load_balancing: LoadBalancingAlgorithms::RoundRobin as i32,
109//!     answer_503: Some("A custom forbidden message".to_string()),
110//!     ..Default::default()
111//! };
112//! ```
113//!
114//! The defaults are sensible, so we could define only the `cluster_id`.
115//!
116//! We can now define a frontend. A frontend is a way to recognize a request and match
117//! it to a `cluster_id`, depending on the hostname and the beginning of the URL path.
118//! The `address` field must match the one of the HTTP listener we defined before:
119//!
120//! ```
121//! use std::collections::BTreeMap;
122//!
123//!  use sozu_command_lib::proto::command::{PathRule, RequestHttpFrontend, RulePosition, SocketAddress};
124//!
125//! let http_front = RequestHttpFrontend {
126//!     cluster_id: Some("my-cluster".to_string()),
127//!     address: SocketAddress::new_v4(127,0,0,1,8080),
128//!     hostname: "example.com".to_string(),
129//!     path: PathRule::prefix(String::from("/")),
130//!     position: RulePosition::Pre.into(),
131//!     tags: BTreeMap::from([
132//!         ("owner".to_owned(), "John".to_owned()),
133//!         ("id".to_owned(), "my-own-http-front".to_owned()),
134//!     ]),
135//!     ..Default::default()
136//! };
137//! ```
138//!
139//! The `tags` are keys and values that will appear in the access logs,
140//! which can come in handy.
141//!
142//! Now let's define a backend.
143//! A backend is an instance of a backend application we want to route traffic to.
144//! The `address` field must match the IP and port of the backend server.
145//!
146//! ```
147//! use sozu_command_lib::proto::command::{AddBackend, LoadBalancingParams, SocketAddress};
148//!
149//! let http_backend = AddBackend {
150//!     cluster_id: "my-cluster".to_string(),
151//!     backend_id: "test-backend".to_string(),
152//!     address: SocketAddress::new_v4(127,0,0,1,8000),
153//!     load_balancing_parameters: Some(LoadBalancingParams::default()),
154//!     ..Default::default()
155//! };
156//! ```
157//!
158//! A cluster can have multiple backend servers, and they can be added or
159//! removed while the proxy is running. If a backend is removed from the configuration
160//! while the proxy is handling a request to that server, it will finish that
161//! request and stop sending new traffic to that server.
162//!
163//!
164//! Now we can use the other end of the channel to send all these requests to the worker,
165//! using the WorkerRequest type:
166//!
167//! ```ignore
168//! use sozu_command_lib::{
169//!     proto::command::{Request, request::RequestType, WorkerRequest},
170//! };
171//!
172//! command_channel
173//!     .write_message(&WorkerRequest {
174//!         id: String::from("add-the-cluster"),
175//!         content: RequestType::AddCluster(cluster).into(),
176//!     })
177//!     .expect("Could not send AddHttpFrontend request");
178//!
179//! command_channel
180//!     .write_message(&WorkerRequest {
181//!         id: String::from("add-the-frontend"),
182//!         content: RequestType::AddHttpFrontend(http_front).into(),
183//!     })
184//!     .expect("Could not send AddHttpFrontend request");
185//!
186//! command_channel
187//!     .write_message(&WorkerRequest {
188//!         id: String::from("add-the-backend"),
189//!         content: RequestType::AddBackend(http_backend).into(),
190//!     })
191//!     .expect("Could not send AddBackend request");
192//!
193//! println!("HTTP -> {:?}", command_channel.read_message());
194//! println!("HTTP -> {:?}", command_channel.read_message());
195//! println!("HTTP -> {:?}", command_channel.read_message());
196//! ```
197//!
198//!
199//! The event loop of the worker will process these instructions and add them to
200//! its state, and the worker will send back an acknowledgement
201//! message.
202//!
203//! Now we can let the worker thread run in the background:
204//!
205//! ```ignore
206//! let _ = worker_thread_join_handle.join();
207//! ```
208//!
209//! Here is the complete example for reference, it matches the `examples/http.rs` example:
210//!
211//! ```
212//! #[macro_use]
213//! extern crate sozu_command_lib;
214//!
215//! use std::{collections::BTreeMap, env, io::stdout, thread};
216//!
217//! use anyhow::Context;
218//! use sozu_command_lib::{
219//!     channel::Channel,
220//!     config::ListenerBuilder,
221//!     logging::setup_default_logging,
222//!     proto::command::{
223//!         request::RequestType, AddBackend, Cluster, LoadBalancingAlgorithms, LoadBalancingParams,
224//!         PathRule, Request, RequestHttpFrontend, RulePosition, SocketAddress,WorkerRequest,
225//!     },
226//! };
227//!
228//! fn main() -> anyhow::Result<()> {
229//!     setup_default_logging(true, "info", "EXAMPLE").with_context(|| "could not setup logging")?;
230//!
231//!     info!("starting up");
232//!
233//!     let http_listener = ListenerBuilder::new_http(SocketAddress::new_v4(127,0,0,1,8080))
234//!         .to_http(None)
235//!         .expect("Could not create HTTP listener");
236//!
237//!     let (mut command_channel, proxy_channel) =
238//!         Channel::generate(1000, 10000).with_context(|| "should create a channel")?;
239//!
240//!     let worker_thread_join_handle = thread::spawn(move || {
241//!         let max_buffers = 500;
242//!         let buffer_size = 16384;
243//!         sozu_lib::http::testing::start_http_worker(http_listener, proxy_channel, max_buffers, buffer_size)
244//!             .expect("The worker could not be started, or shut down");
245//!     });
246//!
247//!     let cluster = Cluster {
248//!         cluster_id: "my-cluster".to_string(),
249//!         sticky_session: false,
250//!         https_redirect: false,
251//!         load_balancing: LoadBalancingAlgorithms::RoundRobin as i32,
252//!         answer_503: Some("A custom forbidden message".to_string()),
253//!         ..Default::default()
254//!     };
255//!
256//!     let http_front = RequestHttpFrontend {
257//!         cluster_id: Some("my-cluster".to_string()),
258//!         address: SocketAddress::new_v4(127,0,0,1,8080),
259//!         hostname: "example.com".to_string(),
260//!         path: PathRule::prefix(String::from("/")),
261//!         position: RulePosition::Pre.into(),
262//!         tags: BTreeMap::from([
263//!             ("owner".to_owned(), "John".to_owned()),
264//!             ("id".to_owned(), "my-own-http-front".to_owned()),
265//!         ]),
266//!         ..Default::default()
267//!     };
268//!     let http_backend = AddBackend {
269//!         cluster_id: "my-cluster".to_string(),
270//!         backend_id: "test-backend".to_string(),
271//!         address: SocketAddress::new_v4(127,0,0,1,8000),
272//!         load_balancing_parameters: Some(LoadBalancingParams::default()),
273//!         ..Default::default()
274//!     };
275//!
276//!     command_channel
277//!         .write_message(&WorkerRequest {
278//!             id: String::from("add-the-cluster"),
279//!             content: RequestType::AddCluster(cluster).into(),
280//!         })
281//!         .expect("Could not send AddHttpFrontend request");
282//!
283//!     command_channel
284//!         .write_message(&WorkerRequest {
285//!             id: String::from("add-the-frontend"),
286//!             content: RequestType::AddHttpFrontend(http_front).into(),
287//!         })
288//!         .expect("Could not send AddHttpFrontend request");
289//!
290//!     command_channel
291//!         .write_message(&WorkerRequest {
292//!             id: String::from("add-the-backend"),
293//!             content: RequestType::AddBackend(http_backend).into(),
294//!         })
295//!         .expect("Could not send AddBackend request");
296//!
297//!     println!("HTTP -> {:?}", command_channel.read_message());
298//!     println!("HTTP -> {:?}", command_channel.read_message());
299//!
300//!     // uncomment to let it run in the background
301//!     // let _ = worker_thread_join_handle.join();
302//!     info!("good bye");
303//!     Ok(())
304//! }
305//! ```
306
307#[macro_use]
308extern crate sozu_command_lib as sozu_command;
309#[cfg(test)]
310#[macro_use]
311extern crate quickcheck;
312
313#[macro_use]
314pub mod util;
315#[macro_use]
316pub mod metrics;
317
318pub mod backends;
319pub mod features;
320pub mod http;
321pub mod load_balancing;
322pub mod pool;
323pub mod protocol;
324pub mod retry;
325pub mod router;
326pub mod socket;
327pub mod timer;
328pub mod tls;
329
330/// unused for now but may be usefull for bypassing sozu on a low level
331#[cfg(feature = "splice")]
332mod splice;
333
334pub mod server;
335pub mod tcp;
336
337pub mod https;
338
339use std::{
340    cell::RefCell,
341    collections::{BTreeMap, HashMap},
342    fmt::{self, Display, Formatter},
343    net::SocketAddr,
344    rc::Rc,
345    str,
346    time::{Duration, Instant},
347};
348
349use backends::BackendError;
350use hex::FromHexError;
351use mio::{Interest, Token, net::TcpStream};
352use protocol::http::{answers::TemplateError, parser::Method};
353use router::RouterError;
354use socket::ServerBindError;
355use sozu_command::{
356    AsStr, ObjectKind,
357    logging::{CachedTags, LogContext},
358    proto::command::{Cluster, ListenerType, RequestHttpFrontend, WorkerRequest, WorkerResponse},
359    ready::Ready,
360    state::ClusterId,
361};
362use tls::CertificateResolverError;
363
364use crate::{backends::BackendMap, router::Route};
365
366/// Anything that can be registered in mio (subscribe to kernel events)
367#[derive(Debug, Clone, Copy, PartialEq, Eq)]
368pub enum Protocol {
369    HTTP,
370    HTTPS,
371    TCP,
372    HTTPListen,
373    HTTPSListen,
374    TCPListen,
375    Channel,
376    Metrics,
377    Timer,
378}
379
380/// trait that must be implemented by listeners and client sessions
381pub trait ProxySession {
382    /// indicates the protocol associated with the session
383    ///
384    /// this is used to distinguish sessions from listenrs, channels, metrics
385    /// and timers
386    fn protocol(&self) -> Protocol;
387    /// if a session received an event or can still execute, the event loop will
388    /// call this method. Its result indicates if it can still execute, needs to
389    /// connect to a backend server, close the session
390    fn ready(&mut self, session: Rc<RefCell<dyn ProxySession>>) -> SessionIsToBeClosed;
391    /// if the event loop got an event for a token associated with the session,
392    /// it will call this method on the session
393    fn update_readiness(&mut self, token: Token, events: Ready);
394    /// close a session, frontend and backend sockets,
395    /// remove the entries from the session manager slab
396    fn close(&mut self);
397    /// if a timeout associated with the session triggers, the event loop will
398    /// call this method with the timeout's token
399    fn timeout(&mut self, t: Token) -> SessionIsToBeClosed;
400    /// last time the session got an event
401    fn last_event(&self) -> Instant;
402    /// display the session's internal state (for debugging purpose)
403    fn print_session(&self);
404    /// get the token associated with the frontend
405    fn frontend_token(&self) -> Token;
406    /// tell the session it has to shut down if possible
407    ///
408    /// if the session handles HTTP requests, it will not close until the response
409    /// is completely sent back to the client
410    fn shutting_down(&mut self) -> SessionIsToBeClosed;
411}
412
413#[macro_export]
414macro_rules! branch {
415    (if $($value:ident)? == $expected:ident { $($then:tt)* } else { $($else:tt)* }) => {
416        macro_rules! expect {
417            ($expected) => {$($then)*};
418            ($a:ident) => {$($else)*};
419            () => {$($else)*}
420        }
421        expect!($($value)?);
422    };
423    (if $($value:ident)? == $expected:ident { $($then:tt)* } ) => {
424        macro_rules! expect {
425            ($expected) => {$($then)*};
426        }
427        expect!($($value)?);
428    };
429}
430
431#[macro_export]
432macro_rules! fallback {
433    ({} $($default:tt)*) => {
434        $($default)*
435    };
436    ({$($value:tt)+} $($default:tt)*) => {
437        $($value)+
438    };
439}
440
441#[macro_export]
442macro_rules! StateMachineBuilder {
443    (
444        ($d:tt)
445        $(#[$($state_macros:tt)*])*
446        enum $state_name:ident $(impl $trait:ident)?  {
447            $($(#[$($variant_macros:tt)*])*
448            $variant_name:ident($state:ty$(,$($aux:ty),+)?) $(-> $override:expr)?),+ $(,)?
449        }
450    ) => {
451        /// A summary of the last valid State
452        #[derive(Clone, Copy, Debug)]
453        pub enum StateMarker {
454            $($variant_name,)+
455        }
456
457        $(#[$($state_macros)*])*
458        pub enum $state_name {
459            $(
460                $(#[$($variant_macros)*])*
461                $variant_name($state$(,$($aux),+)?),
462            )+
463            /// Informs about upgrade failure, contains a summary the last valid State
464            FailedUpgrade(StateMarker),
465        }
466
467        macro_rules! _fn_impl {
468            ($function:ident(&$d($mut:ident)?, self $d(,$arg_name:ident: $arg_type:ty)*) $d(-> $ret:ty)? $d(| $marker:tt => $fail:expr)?) => {
469                fn $function(&$d($mut)? self $d(,$arg_name: $arg_type)*) $d(-> $ret)? {
470                    match self {
471                        $($state_name::$variant_name(_state, ..) => $crate::fallback!({$($override)?} _state.$function($d($arg_name),*)),)+
472                        $state_name::FailedUpgrade($crate::fallback!({$d($marker)?} _)) => $crate::fallback!({$d($fail)?} unreachable!())
473                    }
474                }
475            };
476        }
477
478        impl $state_name {
479            /// Informs about the last valid State before upgrade failure
480            fn marker(&self) -> StateMarker {
481                match self {
482                    $($state_name::$variant_name(..) => StateMarker::$variant_name,)+
483                    $state_name::FailedUpgrade(marker) => *marker,
484                }
485            }
486            /// Returns wether or not the State is FailedUpgrade
487            fn failed(&self) -> bool {
488                match self {
489                    $state_name::FailedUpgrade(_) => true,
490                    _ => false,
491                }
492            }
493            /// Gives back an owned version of the State,
494            /// leaving a FailedUpgrade in its place.
495            /// The FailedUpgrade retains the marker of the previous State.
496            fn take(&mut self) -> $state_name {
497                let mut owned_state = $state_name::FailedUpgrade(self.marker());
498                std::mem::swap(&mut owned_state, self);
499                owned_state
500            }
501            _fn_impl!{front_socket(&, self) -> &mio::net::TcpStream}
502        }
503
504        $crate::branch!{
505            if $($trait)? == SessionState {
506                impl SessionState for $state_name {
507                    _fn_impl!{ready(&mut, self, session: Rc<RefCell<dyn ProxySession>>, proxy: Rc<RefCell<dyn L7Proxy>>, metrics: &mut SessionMetrics) -> SessionResult}
508                    _fn_impl!{update_readiness(&mut, self, token: Token, events: Ready)}
509                    _fn_impl!{timeout(&mut, self, token: Token, metrics: &mut SessionMetrics) -> StateResult}
510                    _fn_impl!{cancel_timeouts(&mut, self)}
511                    _fn_impl!{print_state(&, self, context: &str) | marker => error!("{} Session(FailedUpgrade({:?}))", context, marker)}
512                    _fn_impl!{close(&mut, self, proxy: Rc<RefCell<dyn L7Proxy>>, metrics: &mut SessionMetrics) | _ => {}}
513                    _fn_impl!{shutting_down(&mut, self) -> SessionIsToBeClosed | _ => true}
514                }
515            } else {}
516        }
517    };
518    ($($tt:tt)+) => {
519        StateMachineBuilder!{($) $($tt)+}
520    }
521}
522
523pub trait ListenerHandler {
524    fn get_addr(&self) -> &SocketAddr;
525
526    fn get_tags(&self, key: &str) -> Option<&CachedTags>;
527
528    fn get_concatenated_tags(&self, key: &str) -> Option<&str> {
529        self.get_tags(key).map(|tags| tags.concatenated.as_str())
530    }
531
532    fn set_tags(&mut self, key: String, tags: Option<BTreeMap<String, String>>);
533}
534
535#[derive(thiserror::Error, Debug)]
536pub enum FrontendFromRequestError {
537    #[error("Could not parse hostname from '{host}': {error}")]
538    HostParse { host: String, error: String },
539    #[error("invalid remaining chars after hostname. Host: {0}")]
540    InvalidCharsAfterHost(String),
541    #[error("no cluster: {0}")]
542    NoClusterFound(RouterError),
543}
544
545pub trait L7ListenerHandler {
546    fn get_sticky_name(&self) -> &str;
547
548    fn get_connect_timeout(&self) -> u32;
549
550    /// retrieve a frontend by parsing a request's hostname, uri and method
551    fn frontend_from_request(
552        &self,
553        host: &str,
554        uri: &str,
555        method: &Method,
556    ) -> Result<Route, FrontendFromRequestError>;
557}
558
559#[derive(Clone, Copy, Debug, PartialEq, Eq)]
560pub enum BackendConnectionStatus {
561    NotConnected,
562    Connecting(Instant),
563    Connected,
564}
565
566impl BackendConnectionStatus {
567    pub fn is_connecting(&self) -> bool {
568        matches!(self, BackendConnectionStatus::Connecting(_))
569    }
570}
571
572#[derive(Debug, PartialEq, Eq)]
573pub enum BackendConnectAction {
574    New,
575    Reuse,
576    Replace,
577}
578
579#[derive(thiserror::Error, Debug)]
580pub enum BackendConnectionError {
581    #[error("Not found: {0:?}")]
582    NotFound(ObjectKind),
583    #[error("Too many connections on cluster {0:?}")]
584    MaxConnectionRetries(Option<String>),
585    #[error("the sessions slab has reached maximum capacity")]
586    MaxSessionsMemory,
587    #[error("error from the backend: {0}")]
588    Backend(BackendError),
589    #[error("failed to retrieve the cluster: {0}")]
590    RetrieveClusterError(RetrieveClusterError),
591}
592
593/// used in kawa_h1 module for the Http session state
594#[derive(thiserror::Error, Debug)]
595pub enum RetrieveClusterError {
596    #[error("No method given")]
597    NoMethod,
598    #[error("No host given")]
599    NoHost,
600    #[error("No path given")]
601    NoPath,
602    #[error("unauthorized route")]
603    UnauthorizedRoute,
604    #[error("{0}")]
605    RetrieveFrontend(FrontendFromRequestError),
606}
607
608/// Used in sessions
609#[derive(Debug, PartialEq, Eq)]
610pub enum AcceptError {
611    IoError,
612    TooManySessions,
613    WouldBlock,
614    RegisterError,
615    WrongSocketAddress,
616    BufferCapacityReached,
617}
618
619/// returned by the HTTP, HTTPS and TCP listeners
620#[derive(thiserror::Error, Debug)]
621pub enum ListenerError {
622    #[error("failed to handle certificate request, got a resolver error, {0}")]
623    Resolver(CertificateResolverError),
624    #[error("failed to parse pem, {0}")]
625    PemParse(String),
626    #[error("failed to parse template {0}: {1}")]
627    TemplateParse(u16, TemplateError),
628    #[error("failed to build rustls context, {0}")]
629    BuildRustls(String),
630    #[error("could not activate listener with address {address:?}: {error}")]
631    Activation { address: SocketAddr, error: String },
632    #[error("Could not register listener socket: {0}")]
633    SocketRegistration(std::io::Error),
634    #[error("could not add frontend: {0}")]
635    AddFrontend(RouterError),
636    #[error("could not remove frontend: {0}")]
637    RemoveFrontend(RouterError),
638}
639
640/// Returned by the HTTP, HTTPS and TCP proxies
641#[derive(thiserror::Error, Debug)]
642pub enum ProxyError {
643    #[error("error while soft stopping {proxy_protocol} proxy: {error}")]
644    SoftStop {
645        proxy_protocol: String,
646        error: String,
647    },
648    #[error("error while hard stopping {proxy_protocol} proxy: {error}")]
649    HardStop {
650        proxy_protocol: String,
651        error: String,
652    },
653    #[error("found no listener with address {0:?}")]
654    NoListenerFound(SocketAddr),
655    #[error("a listener is already present for this token")]
656    ListenerAlreadyPresent,
657    #[error("could not add listener: {0}")]
658    AddListener(ListenerError),
659    #[error("could not add cluster: {0}")]
660    AddCluster(ListenerError),
661    #[error("failed to activate listener with address {address:?}: {listener_error}")]
662    ListenerActivation {
663        address: SocketAddr,
664        listener_error: ListenerError,
665    },
666    #[error("can not add frontend {front:?}: {error}")]
667    WrongInputFrontend {
668        front: RequestHttpFrontend,
669        error: String,
670    },
671    #[error("could not add frontend: {0}")]
672    AddFrontend(ListenerError),
673    #[error("could not remove frontend: {0}")]
674    RemoveFrontend(ListenerError),
675    #[error("could not add certificate: {0}")]
676    AddCertificate(CertificateResolverError),
677    #[error("could not remove certificate: {0}")]
678    RemoveCertificate(CertificateResolverError),
679    #[error("could not replace certificate: {0}")]
680    ReplaceCertificate(CertificateResolverError),
681    #[error("wrong certificate fingerprint: {0}")]
682    WrongCertificateFingerprint(FromHexError),
683    #[error("this request is not supported by the proxy")]
684    UnsupportedMessage,
685    #[error("failed to acquire the lock, {0}")]
686    Lock(String),
687    #[error("could not bind to socket {0:?}: {1}")]
688    BindToSocket(SocketAddr, ServerBindError),
689    #[error("error registering socket of listener: {0}")]
690    RegisterListener(std::io::Error),
691    #[error("the listener is not activated")]
692    UnactivatedListener,
693}
694
695use self::server::ListenToken;
696pub trait ProxyConfiguration {
697    fn notify(&mut self, message: WorkerRequest) -> WorkerResponse;
698    fn accept(&mut self, token: ListenToken) -> Result<TcpStream, AcceptError>;
699    fn create_session(
700        &mut self,
701        socket: TcpStream,
702        token: ListenToken,
703        wait_time: Duration,
704        proxy: Rc<RefCell<Self>>,
705        // should we insert the tags here?
706    ) -> Result<(), AcceptError>;
707}
708
709pub trait L7Proxy {
710    fn kind(&self) -> ListenerType;
711
712    fn register_socket(
713        &self,
714        socket: &mut TcpStream,
715        token: Token,
716        interest: Interest,
717    ) -> Result<(), std::io::Error>;
718
719    fn deregister_socket(&self, tcp_stream: &mut TcpStream) -> Result<(), std::io::Error>;
720
721    fn add_session(&self, session: Rc<RefCell<dyn ProxySession>>) -> Token;
722
723    /// Remove the session from the session manager slab.
724    /// Returns true if the session was actually there before deletion
725    fn remove_session(&self, token: Token) -> bool;
726
727    fn backends(&self) -> Rc<RefCell<BackendMap>>;
728
729    fn clusters(&self) -> &HashMap<ClusterId, Cluster>;
730}
731
732#[derive(Debug, PartialEq, Eq)]
733pub enum RequiredEvents {
734    FrontReadBackNone,
735    FrontWriteBackNone,
736    FrontReadWriteBackNone,
737    FrontNoneBackNone,
738    FrontReadBackRead,
739    FrontWriteBackRead,
740    FrontReadWriteBackRead,
741    FrontNoneBackRead,
742    FrontReadBackWrite,
743    FrontWriteBackWrite,
744    FrontReadWriteBackWrite,
745    FrontNoneBackWrite,
746    FrontReadBackReadWrite,
747    FrontWriteBackReadWrite,
748    FrontReadWriteBackReadWrite,
749    FrontNoneBackReadWrite,
750}
751
752impl RequiredEvents {
753    pub fn front_readable(&self) -> bool {
754        matches!(
755            *self,
756            RequiredEvents::FrontReadBackNone
757                | RequiredEvents::FrontReadWriteBackNone
758                | RequiredEvents::FrontReadBackRead
759                | RequiredEvents::FrontReadWriteBackRead
760                | RequiredEvents::FrontReadBackWrite
761                | RequiredEvents::FrontReadWriteBackWrite
762                | RequiredEvents::FrontReadBackReadWrite
763                | RequiredEvents::FrontReadWriteBackReadWrite
764        )
765    }
766
767    pub fn front_writable(&self) -> bool {
768        matches!(
769            *self,
770            RequiredEvents::FrontWriteBackNone
771                | RequiredEvents::FrontReadWriteBackNone
772                | RequiredEvents::FrontWriteBackRead
773                | RequiredEvents::FrontReadWriteBackRead
774                | RequiredEvents::FrontWriteBackWrite
775                | RequiredEvents::FrontReadWriteBackWrite
776                | RequiredEvents::FrontWriteBackReadWrite
777                | RequiredEvents::FrontReadWriteBackReadWrite
778        )
779    }
780
781    pub fn back_readable(&self) -> bool {
782        matches!(
783            *self,
784            RequiredEvents::FrontReadBackRead
785                | RequiredEvents::FrontWriteBackRead
786                | RequiredEvents::FrontReadWriteBackRead
787                | RequiredEvents::FrontNoneBackRead
788                | RequiredEvents::FrontReadBackReadWrite
789                | RequiredEvents::FrontWriteBackReadWrite
790                | RequiredEvents::FrontReadWriteBackReadWrite
791                | RequiredEvents::FrontNoneBackReadWrite
792        )
793    }
794
795    pub fn back_writable(&self) -> bool {
796        matches!(
797            *self,
798            RequiredEvents::FrontReadBackWrite
799                | RequiredEvents::FrontWriteBackWrite
800                | RequiredEvents::FrontReadWriteBackWrite
801                | RequiredEvents::FrontNoneBackWrite
802                | RequiredEvents::FrontReadBackReadWrite
803                | RequiredEvents::FrontWriteBackReadWrite
804                | RequiredEvents::FrontReadWriteBackReadWrite
805                | RequiredEvents::FrontNoneBackReadWrite
806        )
807    }
808}
809
810/// Signals transitions between states of a given Protocol
811#[derive(Debug, PartialEq, Eq)]
812pub enum StateResult {
813    /// Signals to the Protocol to close its backend
814    CloseBackend,
815    /// Signals to the parent Session to close itself
816    CloseSession,
817    /// Signals to the Protocol to connect to backend
818    ConnectBackend,
819    /// Signals to the Protocol to continue
820    Continue,
821    /// Signals to the parent Session to upgrade to the next Protocol
822    Upgrade,
823}
824
825/// Signals transitions between states of a given Session
826#[derive(Debug, Clone, Copy, PartialEq, Eq)]
827pub enum SessionResult {
828    /// Signals to the Session to close itself
829    Close,
830    /// Signals to the Session to continue
831    Continue,
832    /// Signals to the Session to upgrade its Protocol
833    Upgrade,
834}
835
836#[derive(Debug, PartialEq, Eq)]
837pub enum SocketType {
838    Listener,
839    FrontClient,
840}
841
842type SessionIsToBeClosed = bool;
843
844#[derive(Clone)]
845pub struct Readiness {
846    /// the current readiness
847    pub event: Ready,
848    /// the readiness we wish to attain
849    pub interest: Ready,
850}
851
852impl Display for Readiness {
853    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
854        let i = &mut [b'-'; 4];
855        let r = &mut [b'-'; 4];
856        let mixed = &mut [b'-'; 4];
857
858        display_ready(i, self.interest);
859        display_ready(r, self.event);
860        display_ready(mixed, self.interest & self.event);
861
862        write!(
863            f,
864            "I({:?})&R({:?})=M({:?})",
865            String::from_utf8_lossy(i),
866            String::from_utf8_lossy(r),
867            String::from_utf8_lossy(mixed)
868        )
869    }
870}
871
872impl Default for Readiness {
873    fn default() -> Self {
874        Self::new()
875    }
876}
877
878impl Readiness {
879    pub const fn new() -> Readiness {
880        Readiness {
881            event: Ready::EMPTY,
882            interest: Ready::EMPTY,
883        }
884    }
885
886    pub fn reset(&mut self) {
887        self.event = Ready::EMPTY;
888        self.interest = Ready::EMPTY;
889    }
890
891    /// filters the readiness we actually want
892    pub fn filter_interest(&self) -> Ready {
893        self.event & self.interest
894    }
895}
896
897pub fn display_ready(s: &mut [u8], readiness: Ready) {
898    if readiness.is_readable() {
899        s[0] = b'R';
900    }
901    if readiness.is_writable() {
902        s[1] = b'W';
903    }
904    if readiness.is_error() {
905        s[2] = b'E';
906    }
907    if readiness.is_hup() {
908        s[3] = b'H';
909    }
910}
911
912pub fn ready_to_string(readiness: Ready) -> String {
913    let s = &mut [b'-'; 4];
914    display_ready(s, readiness);
915    String::from_utf8(s.to_vec()).unwrap()
916}
917
918impl fmt::Debug for Readiness {
919    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
920        let i = &mut [b'-'; 4];
921        let r = &mut [b'-'; 4];
922        let mixed = &mut [b'-'; 4];
923
924        display_ready(i, self.interest);
925        display_ready(r, self.event);
926        display_ready(mixed, self.interest & self.event);
927
928        write!(
929            f,
930            "Readiness {{ interest: {}, readiness: {}, mixed: {} }}",
931            str::from_utf8(i).unwrap(),
932            str::from_utf8(r).unwrap(),
933            str::from_utf8(mixed).unwrap()
934        )
935    }
936}
937
938#[derive(Clone, Debug)]
939pub struct SessionMetrics {
940    /// date at which we started handling that request
941    pub start: Option<Instant>,
942    /// time actually spent handling the request
943    pub service_time: Duration,
944    /// time spent waiting for its turn
945    pub wait_time: Duration,
946    /// bytes received by the frontend
947    pub bin: usize,
948    /// bytes sent by the frontend
949    pub bout: usize,
950
951    /// date at which we started working on the request
952    pub service_start: Option<Instant>,
953    pub wait_start: Instant,
954
955    pub backend_id: Option<String>,
956    pub backend_start: Option<Instant>,
957    pub backend_connected: Option<Instant>,
958    pub backend_stop: Option<Instant>,
959    pub backend_bin: usize,
960    pub backend_bout: usize,
961}
962
963impl SessionMetrics {
964    pub fn new(wait_time: Option<Duration>) -> SessionMetrics {
965        SessionMetrics {
966            start: Some(Instant::now()),
967            service_time: Duration::from_secs(0),
968            wait_time: wait_time.unwrap_or_else(|| Duration::from_secs(0)),
969            bin: 0,
970            bout: 0,
971            service_start: None,
972            wait_start: Instant::now(),
973            backend_id: None,
974            backend_start: None,
975            backend_connected: None,
976            backend_stop: None,
977            backend_bin: 0,
978            backend_bout: 0,
979        }
980    }
981
982    pub fn reset(&mut self) {
983        self.start = None;
984        self.service_time = Duration::from_secs(0);
985        self.wait_time = Duration::from_secs(0);
986        self.bin = 0;
987        self.bout = 0;
988        self.service_start = None;
989        self.backend_start = None;
990        self.backend_connected = None;
991        self.backend_stop = None;
992        self.backend_bin = 0;
993        self.backend_bout = 0;
994    }
995
996    pub fn service_start(&mut self) {
997        let now = Instant::now();
998
999        if self.start.is_none() {
1000            self.start = Some(now);
1001        }
1002
1003        self.service_start = Some(now);
1004        self.wait_time += now - self.wait_start;
1005    }
1006
1007    pub fn service_stop(&mut self) {
1008        if let Some(start) = self.service_start.take() {
1009            let duration = Instant::now() - start;
1010            self.service_time += duration;
1011        }
1012    }
1013
1014    pub fn wait_start(&mut self) {
1015        self.wait_start = Instant::now();
1016    }
1017
1018    pub fn service_time(&self) -> Duration {
1019        match self.service_start {
1020            Some(start) => {
1021                let last_duration = Instant::now() - start;
1022                self.service_time + last_duration
1023            }
1024            None => self.service_time,
1025        }
1026    }
1027
1028    /// time elapsed since the beginning of the session
1029    pub fn request_time(&self) -> Duration {
1030        match self.start {
1031            Some(start) => Instant::now() - start,
1032            None => Duration::from_secs(0),
1033        }
1034    }
1035
1036    pub fn backend_start(&mut self) {
1037        self.backend_start = Some(Instant::now());
1038    }
1039
1040    pub fn backend_connected(&mut self) {
1041        self.backend_connected = Some(Instant::now());
1042    }
1043
1044    pub fn backend_stop(&mut self) {
1045        self.backend_stop = Some(Instant::now());
1046    }
1047
1048    pub fn backend_response_time(&self) -> Option<Duration> {
1049        match (self.backend_connected, self.backend_stop) {
1050            (Some(start), Some(end)) => Some(end - start),
1051            (Some(start), None) => Some(Instant::now() - start),
1052            _ => None,
1053        }
1054    }
1055
1056    pub fn backend_connection_time(&self) -> Option<Duration> {
1057        match (self.backend_start, self.backend_connected) {
1058            (Some(start), Some(end)) => Some(end - start),
1059            _ => None,
1060        }
1061    }
1062
1063    pub fn register_end_of_session(&self, context: &LogContext) {
1064        let request_time = self.request_time();
1065        let service_time = self.service_time();
1066
1067        if let Some(cluster_id) = context.cluster_id {
1068            time!("request_time", cluster_id, request_time.as_millis());
1069            time!("service_time", cluster_id, service_time.as_millis());
1070        }
1071        time!("request_time", request_time.as_millis());
1072        time!("service_time", service_time.as_millis());
1073
1074        if let Some(backend_id) = self.backend_id.as_ref() {
1075            if let Some(backend_response_time) = self.backend_response_time() {
1076                record_backend_metrics!(
1077                    context.cluster_id.as_str_or("-"),
1078                    backend_id,
1079                    backend_response_time.as_millis(),
1080                    self.backend_connection_time(),
1081                    self.backend_bin,
1082                    self.backend_bout
1083                );
1084            }
1085        }
1086
1087        incr!("access_logs.count", context.cluster_id, context.backend_id);
1088    }
1089}
1090
1091/// exponentially weighted moving average with high sensibility to latency bursts
1092///
1093/// cf Finagle for the original implementation: <https://github.com/twitter/finagle/blob/9cc08d15216497bb03a1cafda96b7266cfbbcff1/finagle-core/src/main/scala/com/twitter/finagle/loadbalancer/PeakEwma.scala>
1094#[derive(Debug, PartialEq, Clone)]
1095pub struct PeakEWMA {
1096    /// decay in nanoseconds
1097    ///
1098    /// higher values will make the EWMA decay slowly to 0
1099    pub decay: f64,
1100    /// estimated RTT in nanoseconds
1101    ///
1102    /// must be set to a high enough default value so that new backends do not
1103    /// get all the traffic right away
1104    pub rtt: f64,
1105    /// last modification
1106    pub last_event: Instant,
1107}
1108
1109impl Default for PeakEWMA {
1110    fn default() -> Self {
1111        Self::new()
1112    }
1113}
1114
1115impl PeakEWMA {
1116    // hardcoded default values for now
1117    pub fn new() -> Self {
1118        PeakEWMA {
1119            // 1s
1120            decay: 1_000_000_000f64,
1121            // 50ms
1122            rtt: 50_000_000f64,
1123            last_event: Instant::now(),
1124        }
1125    }
1126
1127    pub fn observe(&mut self, rtt: f64) {
1128        let now = Instant::now();
1129        let dur = now - self.last_event;
1130
1131        // if latency is rising, we will immediately raise the cost
1132        if rtt > self.rtt {
1133            self.rtt = rtt;
1134        } else {
1135            // new_rtt = old_rtt * e^(-elapsed/decay) + observed_rtt * (1 - e^(-elapsed/decay))
1136            let weight = (-1.0 * dur.as_nanos() as f64 / self.decay).exp();
1137            self.rtt = self.rtt * weight + rtt * (1.0 - weight);
1138        }
1139
1140        self.last_event = now;
1141    }
1142
1143    pub fn get(&mut self, active_requests: usize) -> f64 {
1144        // decay the current value
1145        // (we might not have seen a request in a long time)
1146        self.observe(0.0);
1147
1148        (active_requests + 1) as f64 * self.rtt
1149    }
1150}
1151
1152pub mod testing {
1153    pub use std::{cell::RefCell, os::fd::IntoRawFd, rc::Rc};
1154
1155    pub use anyhow::Context;
1156    pub use mio::{Poll, Registry, Token, net::UnixStream};
1157    pub use slab::Slab;
1158    pub use sozu_command::{
1159        proto::command::{
1160            HttpListenerConfig, HttpsListenerConfig, ServerConfig, TcpListenerConfig,
1161        },
1162        scm_socket::{Listeners, ScmSocket},
1163    };
1164
1165    pub use crate::{
1166        Protocol, ProxySession,
1167        backends::BackendMap,
1168        http::HttpProxy,
1169        https::HttpsProxy,
1170        pool::Pool,
1171        server::{ListenSession, ProxyChannel, Server, SessionManager},
1172        tcp::TcpProxy,
1173    };
1174
1175    /// Everything needed to create a Server
1176    pub struct ServerParts {
1177        pub event_loop: Poll,
1178        pub registry: Registry,
1179        pub sessions: Rc<RefCell<SessionManager>>,
1180        pub pool: Rc<RefCell<Pool>>,
1181        pub backends: Rc<RefCell<BackendMap>>,
1182        pub client_scm_socket: ScmSocket,
1183        pub server_scm_socket: ScmSocket,
1184        pub server_config: ServerConfig,
1185    }
1186
1187    /// Setup a standalone server, for testing purposes
1188    pub fn prebuild_server(
1189        max_buffers: usize,
1190        buffer_size: usize,
1191        send_scm: bool,
1192    ) -> anyhow::Result<ServerParts> {
1193        let event_loop = Poll::new().with_context(|| "Failed at creating event loop")?;
1194        let backends = Rc::new(RefCell::new(BackendMap::new()));
1195        let server_config = ServerConfig {
1196            max_connections: max_buffers as u64,
1197            ..Default::default()
1198        };
1199
1200        let pool = Rc::new(RefCell::new(Pool::with_capacity(
1201            1,
1202            max_buffers,
1203            buffer_size,
1204        )));
1205
1206        let mut sessions: Slab<Rc<RefCell<dyn ProxySession>>> = Slab::with_capacity(max_buffers);
1207        {
1208            let entry = sessions.vacant_entry();
1209            info!("taking token {:?} for channel", entry.key());
1210            entry.insert(Rc::new(RefCell::new(ListenSession {
1211                protocol: Protocol::Channel,
1212            })));
1213        }
1214        {
1215            let entry = sessions.vacant_entry();
1216            info!("taking token {:?} for timer", entry.key());
1217            entry.insert(Rc::new(RefCell::new(ListenSession {
1218                protocol: Protocol::Timer,
1219            })));
1220        }
1221        {
1222            let entry = sessions.vacant_entry();
1223            info!("taking token {:?} for metrics", entry.key());
1224            entry.insert(Rc::new(RefCell::new(ListenSession {
1225                protocol: Protocol::Metrics,
1226            })));
1227        }
1228        let sessions = SessionManager::new(sessions, max_buffers);
1229
1230        let registry = event_loop
1231            .registry()
1232            .try_clone()
1233            .with_context(|| "Failed at creating a registry")?;
1234
1235        let (scm_server, scm_client) =
1236            UnixStream::pair().with_context(|| "Failed at creating scm unix stream")?;
1237        let client_scm_socket = ScmSocket::new(scm_client.into_raw_fd())
1238            .with_context(|| "Failed at creating the scm client socket")?;
1239        let server_scm_socket = ScmSocket::new(scm_server.into_raw_fd())
1240            .with_context(|| "Failed at creating the scm server socket")?;
1241        if send_scm {
1242            client_scm_socket
1243                .send_listeners(&Listeners::default())
1244                .with_context(|| "Failed at sending empty listeners")?;
1245        }
1246
1247        Ok(ServerParts {
1248            event_loop,
1249            registry,
1250            sessions,
1251            pool,
1252            backends,
1253            client_scm_socket,
1254            server_scm_socket,
1255            server_config,
1256        })
1257    }
1258}