#[macro_use]
extern crate sozu_command_lib as sozu_command;
#[cfg(test)]
#[macro_use]
extern crate quickcheck;
#[macro_use]
pub mod util;
#[macro_use]
pub mod metrics;
pub mod backends;
pub mod features;
pub mod http;
pub mod load_balancing;
pub mod pool;
pub mod protocol;
pub mod retry;
pub mod router;
pub mod socket;
pub mod timer;
pub mod tls;
#[cfg(feature = "splice")]
mod splice;
pub mod server;
pub mod tcp;
pub mod https;
use std::{
cell::RefCell,
collections::{BTreeMap, HashMap},
fmt::{self, Display, Formatter},
net::SocketAddr,
rc::Rc,
str,
time::{Duration, Instant},
};
use backends::BackendError;
use hex::FromHexError;
use mio::{Interest, Token, net::TcpStream};
use protocol::http::{answers::TemplateError, parser::Method};
use router::RouterError;
use socket::ServerBindError;
use sozu_command::{
AsStr, ObjectKind,
logging::{CachedTags, LogContext},
proto::command::{Cluster, ListenerType, RequestHttpFrontend, WorkerRequest, WorkerResponse},
ready::Ready,
state::ClusterId,
};
use tls::CertificateResolverError;
use crate::{backends::BackendMap, router::Route};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Protocol {
HTTP,
HTTPS,
TCP,
HTTPListen,
HTTPSListen,
TCPListen,
Channel,
Metrics,
Timer,
}
pub trait ProxySession {
fn protocol(&self) -> Protocol;
fn ready(&mut self, session: Rc<RefCell<dyn ProxySession>>) -> SessionIsToBeClosed;
fn update_readiness(&mut self, token: Token, events: Ready);
fn close(&mut self);
fn timeout(&mut self, t: Token) -> SessionIsToBeClosed;
fn last_event(&self) -> Instant;
fn print_session(&self);
fn frontend_token(&self) -> Token;
fn shutting_down(&mut self) -> SessionIsToBeClosed;
}
#[macro_export]
macro_rules! branch {
(if $($value:ident)? == $expected:ident { $($then:tt)* } else { $($else:tt)* }) => {
macro_rules! expect {
($expected) => {$($then)*};
($a:ident) => {$($else)*};
() => {$($else)*}
}
expect!($($value)?);
};
(if $($value:ident)? == $expected:ident { $($then:tt)* } ) => {
macro_rules! expect {
($expected) => {$($then)*};
}
expect!($($value)?);
};
}
#[macro_export]
macro_rules! fallback {
({} $($default:tt)*) => {
$($default)*
};
({$($value:tt)+} $($default:tt)*) => {
$($value)+
};
}
#[macro_export]
macro_rules! StateMachineBuilder {
(
($d:tt)
$(#[$($state_macros:tt)*])*
enum $state_name:ident $(impl $trait:ident)? {
$($(#[$($variant_macros:tt)*])*
$variant_name:ident($state:ty$(,$($aux:ty),+)?) $(-> $override:expr)?),+ $(,)?
}
) => {
#[derive(Clone, Copy, Debug)]
pub enum StateMarker {
$($variant_name,)+
}
$(#[$($state_macros)*])*
pub enum $state_name {
$(
$(#[$($variant_macros)*])*
$variant_name($state$(,$($aux),+)?),
)+
FailedUpgrade(StateMarker),
}
macro_rules! _fn_impl {
($function:ident(&$d($mut:ident)?, self $d(,$arg_name:ident: $arg_type:ty)*) $d(-> $ret:ty)? $d(| $marker:tt => $fail:expr)?) => {
fn $function(&$d($mut)? self $d(,$arg_name: $arg_type)*) $d(-> $ret)? {
match self {
$($state_name::$variant_name(_state, ..) => $crate::fallback!({$($override)?} _state.$function($d($arg_name),*)),)+
$state_name::FailedUpgrade($crate::fallback!({$d($marker)?} _)) => $crate::fallback!({$d($fail)?} unreachable!())
}
}
};
}
impl $state_name {
fn marker(&self) -> StateMarker {
match self {
$($state_name::$variant_name(..) => StateMarker::$variant_name,)+
$state_name::FailedUpgrade(marker) => *marker,
}
}
fn failed(&self) -> bool {
match self {
$state_name::FailedUpgrade(_) => true,
_ => false,
}
}
fn take(&mut self) -> $state_name {
let mut owned_state = $state_name::FailedUpgrade(self.marker());
std::mem::swap(&mut owned_state, self);
owned_state
}
_fn_impl!{front_socket(&, self) -> &mio::net::TcpStream}
}
$crate::branch!{
if $($trait)? == SessionState {
impl SessionState for $state_name {
_fn_impl!{ready(&mut, self, session: Rc<RefCell<dyn ProxySession>>, proxy: Rc<RefCell<dyn L7Proxy>>, metrics: &mut SessionMetrics) -> SessionResult}
_fn_impl!{update_readiness(&mut, self, token: Token, events: Ready)}
_fn_impl!{timeout(&mut, self, token: Token, metrics: &mut SessionMetrics) -> StateResult}
_fn_impl!{cancel_timeouts(&mut, self)}
_fn_impl!{print_state(&, self, context: &str) | marker => error!("{} Session(FailedUpgrade({:?}))", context, marker)}
_fn_impl!{close(&mut, self, proxy: Rc<RefCell<dyn L7Proxy>>, metrics: &mut SessionMetrics) | _ => {}}
_fn_impl!{shutting_down(&mut, self) -> SessionIsToBeClosed | _ => true}
}
} else {}
}
};
($($tt:tt)+) => {
StateMachineBuilder!{($) $($tt)+}
}
}
pub trait ListenerHandler {
fn get_addr(&self) -> &SocketAddr;
fn get_tags(&self, key: &str) -> Option<&CachedTags>;
fn get_concatenated_tags(&self, key: &str) -> Option<&str> {
self.get_tags(key).map(|tags| tags.concatenated.as_str())
}
fn set_tags(&mut self, key: String, tags: Option<BTreeMap<String, String>>);
}
#[derive(thiserror::Error, Debug)]
pub enum FrontendFromRequestError {
#[error("Could not parse hostname from '{host}': {error}")]
HostParse { host: String, error: String },
#[error("invalid remaining chars after hostname. Host: {0}")]
InvalidCharsAfterHost(String),
#[error("no cluster: {0}")]
NoClusterFound(RouterError),
}
pub trait L7ListenerHandler {
fn get_sticky_name(&self) -> &str;
fn get_connect_timeout(&self) -> u32;
fn frontend_from_request(
&self,
host: &str,
uri: &str,
method: &Method,
) -> Result<Route, FrontendFromRequestError>;
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum BackendConnectionStatus {
NotConnected,
Connecting(Instant),
Connected,
}
impl BackendConnectionStatus {
pub fn is_connecting(&self) -> bool {
matches!(self, BackendConnectionStatus::Connecting(_))
}
}
#[derive(Debug, PartialEq, Eq)]
pub enum BackendConnectAction {
New,
Reuse,
Replace,
}
#[derive(thiserror::Error, Debug)]
pub enum BackendConnectionError {
#[error("Not found: {0:?}")]
NotFound(ObjectKind),
#[error("Too many connections on cluster {0:?}")]
MaxConnectionRetries(Option<String>),
#[error("the sessions slab has reached maximum capacity")]
MaxSessionsMemory,
#[error("error from the backend: {0}")]
Backend(BackendError),
#[error("failed to retrieve the cluster: {0}")]
RetrieveClusterError(RetrieveClusterError),
}
#[derive(thiserror::Error, Debug)]
pub enum RetrieveClusterError {
#[error("No method given")]
NoMethod,
#[error("No host given")]
NoHost,
#[error("No path given")]
NoPath,
#[error("unauthorized route")]
UnauthorizedRoute,
#[error("{0}")]
RetrieveFrontend(FrontendFromRequestError),
}
#[derive(Debug, PartialEq, Eq)]
pub enum AcceptError {
IoError,
TooManySessions,
WouldBlock,
RegisterError,
WrongSocketAddress,
BufferCapacityReached,
}
#[derive(thiserror::Error, Debug)]
pub enum ListenerError {
#[error("failed to handle certificate request, got a resolver error, {0}")]
Resolver(CertificateResolverError),
#[error("failed to parse pem, {0}")]
PemParse(String),
#[error("failed to parse template {0}: {1}")]
TemplateParse(u16, TemplateError),
#[error("failed to build rustls context, {0}")]
BuildRustls(String),
#[error("could not activate listener with address {address:?}: {error}")]
Activation { address: SocketAddr, error: String },
#[error("Could not register listener socket: {0}")]
SocketRegistration(std::io::Error),
#[error("could not add frontend: {0}")]
AddFrontend(RouterError),
#[error("could not remove frontend: {0}")]
RemoveFrontend(RouterError),
}
#[derive(thiserror::Error, Debug)]
pub enum ProxyError {
#[error("error while soft stopping {proxy_protocol} proxy: {error}")]
SoftStop {
proxy_protocol: String,
error: String,
},
#[error("error while hard stopping {proxy_protocol} proxy: {error}")]
HardStop {
proxy_protocol: String,
error: String,
},
#[error("found no listener with address {0:?}")]
NoListenerFound(SocketAddr),
#[error("a listener is already present for this token")]
ListenerAlreadyPresent,
#[error("could not add listener: {0}")]
AddListener(ListenerError),
#[error("could not add cluster: {0}")]
AddCluster(ListenerError),
#[error("failed to activate listener with address {address:?}: {listener_error}")]
ListenerActivation {
address: SocketAddr,
listener_error: ListenerError,
},
#[error("can not add frontend {front:?}: {error}")]
WrongInputFrontend {
front: RequestHttpFrontend,
error: String,
},
#[error("could not add frontend: {0}")]
AddFrontend(ListenerError),
#[error("could not remove frontend: {0}")]
RemoveFrontend(ListenerError),
#[error("could not add certificate: {0}")]
AddCertificate(CertificateResolverError),
#[error("could not remove certificate: {0}")]
RemoveCertificate(CertificateResolverError),
#[error("could not replace certificate: {0}")]
ReplaceCertificate(CertificateResolverError),
#[error("wrong certificate fingerprint: {0}")]
WrongCertificateFingerprint(FromHexError),
#[error("this request is not supported by the proxy")]
UnsupportedMessage,
#[error("failed to acquire the lock, {0}")]
Lock(String),
#[error("could not bind to socket {0:?}: {1}")]
BindToSocket(SocketAddr, ServerBindError),
#[error("error registering socket of listener: {0}")]
RegisterListener(std::io::Error),
#[error("the listener is not activated")]
UnactivatedListener,
}
use self::server::ListenToken;
pub trait ProxyConfiguration {
fn notify(&mut self, message: WorkerRequest) -> WorkerResponse;
fn accept(&mut self, token: ListenToken) -> Result<TcpStream, AcceptError>;
fn create_session(
&mut self,
socket: TcpStream,
token: ListenToken,
wait_time: Duration,
proxy: Rc<RefCell<Self>>,
) -> Result<(), AcceptError>;
}
pub trait L7Proxy {
fn kind(&self) -> ListenerType;
fn register_socket(
&self,
socket: &mut TcpStream,
token: Token,
interest: Interest,
) -> Result<(), std::io::Error>;
fn deregister_socket(&self, tcp_stream: &mut TcpStream) -> Result<(), std::io::Error>;
fn add_session(&self, session: Rc<RefCell<dyn ProxySession>>) -> Token;
fn remove_session(&self, token: Token) -> bool;
fn backends(&self) -> Rc<RefCell<BackendMap>>;
fn clusters(&self) -> &HashMap<ClusterId, Cluster>;
}
#[derive(Debug, PartialEq, Eq)]
pub enum RequiredEvents {
FrontReadBackNone,
FrontWriteBackNone,
FrontReadWriteBackNone,
FrontNoneBackNone,
FrontReadBackRead,
FrontWriteBackRead,
FrontReadWriteBackRead,
FrontNoneBackRead,
FrontReadBackWrite,
FrontWriteBackWrite,
FrontReadWriteBackWrite,
FrontNoneBackWrite,
FrontReadBackReadWrite,
FrontWriteBackReadWrite,
FrontReadWriteBackReadWrite,
FrontNoneBackReadWrite,
}
impl RequiredEvents {
pub fn front_readable(&self) -> bool {
matches!(
*self,
RequiredEvents::FrontReadBackNone
| RequiredEvents::FrontReadWriteBackNone
| RequiredEvents::FrontReadBackRead
| RequiredEvents::FrontReadWriteBackRead
| RequiredEvents::FrontReadBackWrite
| RequiredEvents::FrontReadWriteBackWrite
| RequiredEvents::FrontReadBackReadWrite
| RequiredEvents::FrontReadWriteBackReadWrite
)
}
pub fn front_writable(&self) -> bool {
matches!(
*self,
RequiredEvents::FrontWriteBackNone
| RequiredEvents::FrontReadWriteBackNone
| RequiredEvents::FrontWriteBackRead
| RequiredEvents::FrontReadWriteBackRead
| RequiredEvents::FrontWriteBackWrite
| RequiredEvents::FrontReadWriteBackWrite
| RequiredEvents::FrontWriteBackReadWrite
| RequiredEvents::FrontReadWriteBackReadWrite
)
}
pub fn back_readable(&self) -> bool {
matches!(
*self,
RequiredEvents::FrontReadBackRead
| RequiredEvents::FrontWriteBackRead
| RequiredEvents::FrontReadWriteBackRead
| RequiredEvents::FrontNoneBackRead
| RequiredEvents::FrontReadBackReadWrite
| RequiredEvents::FrontWriteBackReadWrite
| RequiredEvents::FrontReadWriteBackReadWrite
| RequiredEvents::FrontNoneBackReadWrite
)
}
pub fn back_writable(&self) -> bool {
matches!(
*self,
RequiredEvents::FrontReadBackWrite
| RequiredEvents::FrontWriteBackWrite
| RequiredEvents::FrontReadWriteBackWrite
| RequiredEvents::FrontNoneBackWrite
| RequiredEvents::FrontReadBackReadWrite
| RequiredEvents::FrontWriteBackReadWrite
| RequiredEvents::FrontReadWriteBackReadWrite
| RequiredEvents::FrontNoneBackReadWrite
)
}
}
#[derive(Debug, PartialEq, Eq)]
pub enum StateResult {
CloseBackend,
CloseSession,
ConnectBackend,
Continue,
Upgrade,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SessionResult {
Close,
Continue,
Upgrade,
}
#[derive(Debug, PartialEq, Eq)]
pub enum SocketType {
Listener,
FrontClient,
}
type SessionIsToBeClosed = bool;
#[derive(Clone)]
pub struct Readiness {
pub event: Ready,
pub interest: Ready,
}
impl Display for Readiness {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
let i = &mut [b'-'; 4];
let r = &mut [b'-'; 4];
let mixed = &mut [b'-'; 4];
display_ready(i, self.interest);
display_ready(r, self.event);
display_ready(mixed, self.interest & self.event);
write!(
f,
"I({:?})&R({:?})=M({:?})",
String::from_utf8_lossy(i),
String::from_utf8_lossy(r),
String::from_utf8_lossy(mixed)
)
}
}
impl Default for Readiness {
fn default() -> Self {
Self::new()
}
}
impl Readiness {
pub const fn new() -> Readiness {
Readiness {
event: Ready::EMPTY,
interest: Ready::EMPTY,
}
}
pub fn reset(&mut self) {
self.event = Ready::EMPTY;
self.interest = Ready::EMPTY;
}
pub fn filter_interest(&self) -> Ready {
self.event & self.interest
}
}
pub fn display_ready(s: &mut [u8], readiness: Ready) {
if readiness.is_readable() {
s[0] = b'R';
}
if readiness.is_writable() {
s[1] = b'W';
}
if readiness.is_error() {
s[2] = b'E';
}
if readiness.is_hup() {
s[3] = b'H';
}
}
pub fn ready_to_string(readiness: Ready) -> String {
let s = &mut [b'-'; 4];
display_ready(s, readiness);
String::from_utf8(s.to_vec()).unwrap()
}
impl fmt::Debug for Readiness {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let i = &mut [b'-'; 4];
let r = &mut [b'-'; 4];
let mixed = &mut [b'-'; 4];
display_ready(i, self.interest);
display_ready(r, self.event);
display_ready(mixed, self.interest & self.event);
write!(
f,
"Readiness {{ interest: {}, readiness: {}, mixed: {} }}",
str::from_utf8(i).unwrap(),
str::from_utf8(r).unwrap(),
str::from_utf8(mixed).unwrap()
)
}
}
#[derive(Clone, Debug)]
pub struct SessionMetrics {
pub start: Option<Instant>,
pub service_time: Duration,
pub wait_time: Duration,
pub bin: usize,
pub bout: usize,
pub service_start: Option<Instant>,
pub wait_start: Instant,
pub backend_id: Option<String>,
pub backend_start: Option<Instant>,
pub backend_connected: Option<Instant>,
pub backend_stop: Option<Instant>,
pub backend_bin: usize,
pub backend_bout: usize,
}
impl SessionMetrics {
pub fn new(wait_time: Option<Duration>) -> SessionMetrics {
SessionMetrics {
start: Some(Instant::now()),
service_time: Duration::from_secs(0),
wait_time: wait_time.unwrap_or_else(|| Duration::from_secs(0)),
bin: 0,
bout: 0,
service_start: None,
wait_start: Instant::now(),
backend_id: None,
backend_start: None,
backend_connected: None,
backend_stop: None,
backend_bin: 0,
backend_bout: 0,
}
}
pub fn reset(&mut self) {
self.start = None;
self.service_time = Duration::from_secs(0);
self.wait_time = Duration::from_secs(0);
self.bin = 0;
self.bout = 0;
self.service_start = None;
self.backend_start = None;
self.backend_connected = None;
self.backend_stop = None;
self.backend_bin = 0;
self.backend_bout = 0;
}
pub fn service_start(&mut self) {
let now = Instant::now();
if self.start.is_none() {
self.start = Some(now);
}
self.service_start = Some(now);
self.wait_time += now - self.wait_start;
}
pub fn service_stop(&mut self) {
if let Some(start) = self.service_start.take() {
let duration = Instant::now() - start;
self.service_time += duration;
}
}
pub fn wait_start(&mut self) {
self.wait_start = Instant::now();
}
pub fn service_time(&self) -> Duration {
match self.service_start {
Some(start) => {
let last_duration = Instant::now() - start;
self.service_time + last_duration
}
None => self.service_time,
}
}
pub fn request_time(&self) -> Duration {
match self.start {
Some(start) => Instant::now() - start,
None => Duration::from_secs(0),
}
}
pub fn backend_start(&mut self) {
self.backend_start = Some(Instant::now());
}
pub fn backend_connected(&mut self) {
self.backend_connected = Some(Instant::now());
}
pub fn backend_stop(&mut self) {
self.backend_stop = Some(Instant::now());
}
pub fn backend_response_time(&self) -> Option<Duration> {
match (self.backend_connected, self.backend_stop) {
(Some(start), Some(end)) => Some(end - start),
(Some(start), None) => Some(Instant::now() - start),
_ => None,
}
}
pub fn backend_connection_time(&self) -> Option<Duration> {
match (self.backend_start, self.backend_connected) {
(Some(start), Some(end)) => Some(end - start),
_ => None,
}
}
pub fn register_end_of_session(&self, context: &LogContext) {
let request_time = self.request_time();
let service_time = self.service_time();
if let Some(cluster_id) = context.cluster_id {
time!("request_time", cluster_id, request_time.as_millis());
time!("service_time", cluster_id, service_time.as_millis());
}
time!("request_time", request_time.as_millis());
time!("service_time", service_time.as_millis());
if let Some(backend_id) = self.backend_id.as_ref() {
if let Some(backend_response_time) = self.backend_response_time() {
record_backend_metrics!(
context.cluster_id.as_str_or("-"),
backend_id,
backend_response_time.as_millis(),
self.backend_connection_time(),
self.backend_bin,
self.backend_bout
);
}
}
incr!("access_logs.count", context.cluster_id, context.backend_id);
}
}
#[derive(Debug, PartialEq, Clone)]
pub struct PeakEWMA {
pub decay: f64,
pub rtt: f64,
pub last_event: Instant,
}
impl Default for PeakEWMA {
fn default() -> Self {
Self::new()
}
}
impl PeakEWMA {
pub fn new() -> Self {
PeakEWMA {
decay: 1_000_000_000f64,
rtt: 50_000_000f64,
last_event: Instant::now(),
}
}
pub fn observe(&mut self, rtt: f64) {
let now = Instant::now();
let dur = now - self.last_event;
if rtt > self.rtt {
self.rtt = rtt;
} else {
let weight = (-1.0 * dur.as_nanos() as f64 / self.decay).exp();
self.rtt = self.rtt * weight + rtt * (1.0 - weight);
}
self.last_event = now;
}
pub fn get(&mut self, active_requests: usize) -> f64 {
self.observe(0.0);
(active_requests + 1) as f64 * self.rtt
}
}
pub mod testing {
pub use std::{cell::RefCell, os::fd::IntoRawFd, rc::Rc};
pub use anyhow::Context;
pub use mio::{Poll, Registry, Token, net::UnixStream};
pub use slab::Slab;
pub use sozu_command::{
proto::command::{
HttpListenerConfig, HttpsListenerConfig, ServerConfig, TcpListenerConfig,
},
scm_socket::{Listeners, ScmSocket},
};
pub use crate::{
Protocol, ProxySession,
backends::BackendMap,
http::HttpProxy,
https::HttpsProxy,
pool::Pool,
server::{ListenSession, ProxyChannel, Server, SessionManager},
tcp::TcpProxy,
};
pub struct ServerParts {
pub event_loop: Poll,
pub registry: Registry,
pub sessions: Rc<RefCell<SessionManager>>,
pub pool: Rc<RefCell<Pool>>,
pub backends: Rc<RefCell<BackendMap>>,
pub client_scm_socket: ScmSocket,
pub server_scm_socket: ScmSocket,
pub server_config: ServerConfig,
}
pub fn prebuild_server(
max_buffers: usize,
buffer_size: usize,
send_scm: bool,
) -> anyhow::Result<ServerParts> {
let event_loop = Poll::new().with_context(|| "Failed at creating event loop")?;
let backends = Rc::new(RefCell::new(BackendMap::new()));
let server_config = ServerConfig {
max_connections: max_buffers as u64,
..Default::default()
};
let pool = Rc::new(RefCell::new(Pool::with_capacity(
1,
max_buffers,
buffer_size,
)));
let mut sessions: Slab<Rc<RefCell<dyn ProxySession>>> = Slab::with_capacity(max_buffers);
{
let entry = sessions.vacant_entry();
info!("taking token {:?} for channel", entry.key());
entry.insert(Rc::new(RefCell::new(ListenSession {
protocol: Protocol::Channel,
})));
}
{
let entry = sessions.vacant_entry();
info!("taking token {:?} for timer", entry.key());
entry.insert(Rc::new(RefCell::new(ListenSession {
protocol: Protocol::Timer,
})));
}
{
let entry = sessions.vacant_entry();
info!("taking token {:?} for metrics", entry.key());
entry.insert(Rc::new(RefCell::new(ListenSession {
protocol: Protocol::Metrics,
})));
}
let sessions = SessionManager::new(sessions, max_buffers);
let registry = event_loop
.registry()
.try_clone()
.with_context(|| "Failed at creating a registry")?;
let (scm_server, scm_client) =
UnixStream::pair().with_context(|| "Failed at creating scm unix stream")?;
let client_scm_socket = ScmSocket::new(scm_client.into_raw_fd())
.with_context(|| "Failed at creating the scm client socket")?;
let server_scm_socket = ScmSocket::new(scm_server.into_raw_fd())
.with_context(|| "Failed at creating the scm server socket")?;
if send_scm {
client_scm_socket
.send_listeners(&Listeners::default())
.with_context(|| "Failed at sending empty listeners")?;
}
Ok(ServerParts {
event_loop,
registry,
sessions,
pool,
backends,
client_scm_socket,
server_scm_socket,
server_config,
})
}
}