#[macro_use]
extern crate sozu_command_lib as sozu_command;
#[macro_use]
pub mod util;
#[macro_use]
pub mod metrics;
pub mod backends;
pub mod crypto;
pub mod features;
pub mod health_check;
pub mod http;
pub mod load_balancing;
pub mod pool;
pub mod protocol;
pub mod retry;
pub mod router;
pub mod socket;
pub mod timer;
pub mod tls;
#[cfg(all(target_os = "linux", feature = "splice"))]
pub(crate) mod splice;
pub mod server;
pub mod tcp;
pub mod udp;
pub mod https;
use std::{
cell::RefCell,
collections::{BTreeMap, HashMap},
fmt::{self, Display, Formatter},
net::SocketAddr,
rc::Rc,
str,
time::{Duration, Instant, SystemTime},
};
use backends::BackendError;
use hex::FromHexError;
use mio::{Interest, Token, net::TcpStream};
use protocol::http::{answers::HttpAnswers, answers::TemplateError, parser::Method};
use router::RouterError;
use socket::ServerBindError;
use sozu_command::{
AsStr, ObjectKind,
logging::{CachedTags, LogContext},
proto::command::{Cluster, ListenerType, RequestHttpFrontend, WorkerRequest, WorkerResponse},
ready::Ready,
state::ClusterId,
};
use tls::CertificateResolverError;
use crate::{backends::BackendMap, metrics::names, router::RouteResult};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Protocol {
HTTP,
HTTPS,
TCP,
UDP,
HTTPListen,
HTTPSListen,
TCPListen,
UDPListen,
Channel,
Metrics,
Timer,
}
pub trait ProxySession {
fn protocol(&self) -> Protocol;
fn ready(&mut self, session: Rc<RefCell<dyn ProxySession>>) -> SessionIsToBeClosed;
fn update_readiness(&mut self, token: Token, events: Ready);
fn close(&mut self);
fn timeout(&mut self, t: Token) -> SessionIsToBeClosed;
fn last_event(&self) -> Instant;
fn print_session(&self);
fn frontend_token(&self) -> Token;
fn shutting_down(&mut self) -> SessionIsToBeClosed;
fn cluster_id(&self) -> Option<String> {
None
}
fn session_address(&self) -> Option<SocketAddr> {
None
}
}
#[macro_export]
macro_rules! branch {
(if $($value:ident)? == $expected:ident { $($then:tt)* } else { $($else:tt)* }) => {
macro_rules! expect {
($expected) => {$($then)*};
($a:ident) => {$($else)*};
() => {$($else)*}
}
expect!($($value)?);
};
(if $($value:ident)? == $expected:ident { $($then:tt)* } ) => {
macro_rules! expect {
($expected) => {$($then)*};
}
expect!($($value)?);
};
}
#[macro_export]
macro_rules! fallback {
({} $($default:tt)*) => {
$($default)*
};
({$($value:tt)+} $($default:tt)*) => {
$($value)+
};
}
#[macro_export]
macro_rules! StateMachineBuilder {
(
($d:tt)
$(#[$($state_macros:tt)*])*
enum $state_name:ident $(impl $trait:ident)? {
$($(#[$($variant_macros:tt)*])*
$variant_name:ident($state:ty$(,$($aux:ty),+)?) $(-> $override:expr)?),+ $(,)?
}
) => {
#[derive(Clone, Copy, Debug)]
pub enum StateMarker {
$($variant_name,)+
}
$(#[$($state_macros)*])*
#[allow(clippy::large_enum_variant)]
pub enum $state_name {
$(
$(#[$($variant_macros)*])*
$variant_name($state$(,$($aux),+)?),
)+
FailedUpgrade(StateMarker),
}
macro_rules! _fn_impl {
($function:ident(&$d($mut:ident)?, self $d(,$arg_name:ident: $arg_type:ty)*) $d(-> $ret:ty)? $d(| $marker:tt => $fail:expr)?) => {
fn $function(&$d($mut)? self $d(,$arg_name: $arg_type)*) $d(-> $ret)? {
match self {
$($state_name::$variant_name(_state, ..) => $crate::fallback!({$($override)?} _state.$function($d($arg_name),*)),)+
$state_name::FailedUpgrade($crate::fallback!({$d($marker)?} _)) => $crate::fallback!({$d($fail)?} unreachable!())
}
}
};
}
impl $state_name {
fn marker(&self) -> StateMarker {
match self {
$($state_name::$variant_name(..) => StateMarker::$variant_name,)+
$state_name::FailedUpgrade(marker) => *marker,
}
}
fn failed(&self) -> bool {
match self {
$state_name::FailedUpgrade(_) => true,
_ => false,
}
}
fn take(&mut self) -> $state_name {
let mut owned_state = $state_name::FailedUpgrade(self.marker());
std::mem::swap(&mut owned_state, self);
owned_state
}
_fn_impl!{front_socket(&, self) -> &mio::net::TcpStream}
}
$crate::branch!{
if $($trait)? == SessionState {
impl SessionState for $state_name {
_fn_impl!{ready(&mut, self, session: Rc<RefCell<dyn ProxySession>>, proxy: Rc<RefCell<dyn L7Proxy>>, metrics: &mut SessionMetrics) -> SessionResult}
_fn_impl!{update_readiness(&mut, self, token: Token, events: Ready)}
_fn_impl!{timeout(&mut, self, token: Token, metrics: &mut SessionMetrics) -> StateResult}
_fn_impl!{cancel_timeouts(&mut, self)}
_fn_impl!{print_state(&, self, context: &str) | marker => error!("{} Session(FailedUpgrade({:?}))", context, marker)}
_fn_impl!{close(&mut, self, proxy: Rc<RefCell<dyn L7Proxy>>, metrics: &mut SessionMetrics) | _ => {}}
_fn_impl!{shutting_down(&mut, self) -> SessionIsToBeClosed | _ => true}
}
} else {}
}
};
($($tt:tt)+) => {
StateMachineBuilder!{($) $($tt)+}
}
}
pub trait ListenerHandler {
fn get_addr(&self) -> &SocketAddr;
fn get_tags(&self, key: &str) -> Option<&CachedTags>;
fn get_concatenated_tags(&self, key: &str) -> Option<&str> {
self.get_tags(key).map(|tags| tags.concatenated.as_str())
}
fn set_tags(&mut self, key: String, tags: Option<BTreeMap<String, String>>);
fn protocol(&self) -> Protocol;
fn public_address(&self) -> SocketAddr;
}
#[derive(thiserror::Error, Debug)]
pub enum FrontendFromRequestError {
#[error("Could not parse hostname from '{host}': {error}")]
HostParse { host: String, error: String },
#[error("invalid remaining chars after hostname. Host: {0}")]
InvalidCharsAfterHost(String),
#[error("no cluster: {0}")]
NoClusterFound(RouterError),
}
pub trait L7ListenerHandler {
fn get_sticky_name(&self) -> &str;
fn get_sozu_id_header(&self) -> &str {
"Sozu-Id"
}
fn get_connect_timeout(&self) -> u32;
fn frontend_from_request(
&self,
host: &str,
uri: &str,
method: &Method,
) -> Result<RouteResult, FrontendFromRequestError>;
fn get_answers(&self) -> &Rc<RefCell<HttpAnswers>>;
fn get_h2_flood_config(&self) -> protocol::mux::H2FloodConfig {
protocol::mux::H2FloodConfig::default()
}
fn get_h2_connection_config(&self) -> protocol::mux::H2ConnectionConfig {
protocol::mux::H2ConnectionConfig::default()
}
fn get_strict_sni_binding(&self) -> bool {
true
}
fn get_elide_x_real_ip(&self) -> bool {
false
}
fn get_send_x_real_ip(&self) -> bool {
false
}
fn get_h2_stream_idle_timeout(&self) -> std::time::Duration {
std::time::Duration::from_secs(30)
}
fn get_h2_graceful_shutdown_deadline(&self) -> Option<std::time::Duration> {
Some(std::time::Duration::from_secs(5))
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum BackendConnectionStatus {
NotConnected,
Connecting(Instant),
Connected,
}
impl BackendConnectionStatus {
pub fn is_connecting(&self) -> bool {
matches!(self, BackendConnectionStatus::Connecting(_))
}
}
#[derive(Debug, PartialEq, Eq)]
pub enum BackendConnectAction {
New,
Reuse,
Replace,
}
#[derive(thiserror::Error, Debug)]
pub enum BackendConnectionError {
#[error("Not found: {0:?}")]
NotFound(ObjectKind),
#[error("Too many connections on cluster {0:?}")]
MaxConnectionRetries(Option<String>),
#[error("the sessions slab has reached maximum capacity")]
MaxSessionsMemory,
#[error("error from the backend: {0}")]
Backend(BackendError),
#[error("failed to retrieve the cluster: {0}")]
RetrieveClusterError(RetrieveClusterError),
#[error("maximum number of buffers reached")]
MaxBuffers,
#[error("per-(cluster, source-IP) connection limit reached for cluster {cluster_id:?}")]
TooManyConnectionsPerIp { cluster_id: String },
}
#[derive(thiserror::Error, Debug)]
pub enum RetrieveClusterError {
#[error("No method given")]
NoMethod,
#[error("No host given")]
NoHost,
#[error("No path given")]
NoPath,
#[error("unauthorized route")]
UnauthorizedRoute,
#[error("{0}")]
RetrieveFrontend(FrontendFromRequestError),
#[error("HTTPS redirect required")]
HttpsRedirect,
#[error("TLS SNI {sni:?} does not match HTTP authority {authority:?}")]
SniAuthorityMismatch { sni: String, authority: String },
}
#[derive(Debug, PartialEq, Eq)]
pub enum AcceptError {
IoError,
TooManySessions,
WouldBlock,
RegisterError,
WrongSocketAddress,
BufferCapacityReached,
}
#[derive(thiserror::Error, Debug)]
pub enum ListenerError {
#[error("failed to handle certificate request, got a resolver error, {0}")]
Resolver(CertificateResolverError),
#[error("failed to parse pem, {0}")]
PemParse(String),
#[error("failed to parse template {0:?}: {1}")]
TemplateParse(String, TemplateError),
#[error("failed to build rustls context, {0}")]
BuildRustls(String),
#[error("could not activate listener with address {address:?}: {error}")]
Activation { address: SocketAddr, error: String },
#[error("Could not register listener socket: {0}")]
SocketRegistration(std::io::Error),
#[error("could not add frontend: {0}")]
AddFrontend(RouterError),
#[error("could not remove frontend: {0}")]
RemoveFrontend(RouterError),
#[error("invalid value for field '{field}': {reason}")]
InvalidValue {
field: &'static str,
reason: &'static str,
},
#[error(
"UpdateHttpsListenerConfig.hsts is present but `enabled` is unset; the partial-update \
contract requires `enabled` whenever the `hsts` block is present"
)]
HstsEnabledRequired,
}
impl From<sozu_command::state::StateError> for ListenerError {
fn from(err: sozu_command::state::StateError) -> Self {
match err {
sozu_command::state::StateError::InvalidValue { field, reason } => {
ListenerError::InvalidValue { field, reason }
}
_ => ListenerError::InvalidValue {
field: "state",
reason: "unexpected state error on worker path",
},
}
}
}
#[derive(thiserror::Error, Debug)]
pub enum ProxyError {
#[error("error while soft stopping {proxy_protocol} proxy: {error}")]
SoftStop {
proxy_protocol: String,
error: String,
},
#[error("error while hard stopping {proxy_protocol} proxy: {error}")]
HardStop {
proxy_protocol: String,
error: String,
},
#[error("found no listener with address {0:?}")]
NoListenerFound(SocketAddr),
#[error("a listener is already present for this token")]
ListenerAlreadyPresent,
#[error("could not add listener: {0}")]
AddListener(ListenerError),
#[error("could not add cluster: {0}")]
AddCluster(ListenerError),
#[error("failed to activate listener with address {address:?}: {listener_error}")]
ListenerActivation {
address: SocketAddr,
listener_error: ListenerError,
},
#[error("can not add frontend {front:?}: {error}")]
WrongInputFrontend {
front: Box<RequestHttpFrontend>,
error: String,
},
#[error("could not add frontend: {0}")]
AddFrontend(ListenerError),
#[error("could not remove frontend: {0}")]
RemoveFrontend(ListenerError),
#[error("could not add certificate: {0}")]
AddCertificate(CertificateResolverError),
#[error("could not remove certificate: {0}")]
RemoveCertificate(CertificateResolverError),
#[error("could not replace certificate: {0}")]
ReplaceCertificate(CertificateResolverError),
#[error("wrong certificate fingerprint: {0}")]
WrongCertificateFingerprint(FromHexError),
#[error("this request is not supported by the proxy")]
UnsupportedMessage,
#[error("failed to acquire the lock, {0}")]
Lock(String),
#[error("could not bind to socket {0:?}: {1}")]
BindToSocket(SocketAddr, ServerBindError),
#[error("error registering socket of listener: {0}")]
RegisterListener(std::io::Error),
#[error("the listener is not activated")]
UnactivatedListener,
#[error(
"HSTS is only valid on HTTPS frontends; rejecting AddHttpFrontend with hsts.enabled = \
true on address {0:?} (RFC 6797 §7.2)"
)]
HstsOnPlainHttp(SocketAddr),
}
use self::server::ListenToken;
pub trait ProxyConfiguration {
fn notify(&mut self, message: WorkerRequest) -> WorkerResponse;
fn accept(&mut self, token: ListenToken) -> Result<TcpStream, AcceptError>;
fn create_session(
&mut self,
socket: TcpStream,
token: ListenToken,
wait_time: Duration,
proxy: Rc<RefCell<Self>>,
) -> Result<(), AcceptError>;
}
pub trait L7Proxy {
fn kind(&self) -> ListenerType;
fn register_socket(
&self,
socket: &mut TcpStream,
token: Token,
interest: Interest,
) -> Result<(), std::io::Error>;
fn deregister_socket(&self, tcp_stream: &mut TcpStream) -> Result<(), std::io::Error>;
fn add_session(&self, session: Rc<RefCell<dyn ProxySession>>) -> Token;
fn remove_session(&self, token: Token) -> bool;
fn backends(&self) -> Rc<RefCell<BackendMap>>;
fn clusters(&self) -> &HashMap<ClusterId, Cluster>;
fn sessions(&self) -> Rc<RefCell<crate::server::SessionManager>>;
}
#[derive(Debug, PartialEq, Eq)]
pub enum RequiredEvents {
FrontReadBackNone,
FrontWriteBackNone,
FrontReadWriteBackNone,
FrontNoneBackNone,
FrontReadBackRead,
FrontWriteBackRead,
FrontReadWriteBackRead,
FrontNoneBackRead,
FrontReadBackWrite,
FrontWriteBackWrite,
FrontReadWriteBackWrite,
FrontNoneBackWrite,
FrontReadBackReadWrite,
FrontWriteBackReadWrite,
FrontReadWriteBackReadWrite,
FrontNoneBackReadWrite,
}
impl RequiredEvents {
pub fn front_readable(&self) -> bool {
matches!(
*self,
RequiredEvents::FrontReadBackNone
| RequiredEvents::FrontReadWriteBackNone
| RequiredEvents::FrontReadBackRead
| RequiredEvents::FrontReadWriteBackRead
| RequiredEvents::FrontReadBackWrite
| RequiredEvents::FrontReadWriteBackWrite
| RequiredEvents::FrontReadBackReadWrite
| RequiredEvents::FrontReadWriteBackReadWrite
)
}
pub fn front_writable(&self) -> bool {
matches!(
*self,
RequiredEvents::FrontWriteBackNone
| RequiredEvents::FrontReadWriteBackNone
| RequiredEvents::FrontWriteBackRead
| RequiredEvents::FrontReadWriteBackRead
| RequiredEvents::FrontWriteBackWrite
| RequiredEvents::FrontReadWriteBackWrite
| RequiredEvents::FrontWriteBackReadWrite
| RequiredEvents::FrontReadWriteBackReadWrite
)
}
pub fn back_readable(&self) -> bool {
matches!(
*self,
RequiredEvents::FrontReadBackRead
| RequiredEvents::FrontWriteBackRead
| RequiredEvents::FrontReadWriteBackRead
| RequiredEvents::FrontNoneBackRead
| RequiredEvents::FrontReadBackReadWrite
| RequiredEvents::FrontWriteBackReadWrite
| RequiredEvents::FrontReadWriteBackReadWrite
| RequiredEvents::FrontNoneBackReadWrite
)
}
pub fn back_writable(&self) -> bool {
matches!(
*self,
RequiredEvents::FrontReadBackWrite
| RequiredEvents::FrontWriteBackWrite
| RequiredEvents::FrontReadWriteBackWrite
| RequiredEvents::FrontNoneBackWrite
| RequiredEvents::FrontReadBackReadWrite
| RequiredEvents::FrontWriteBackReadWrite
| RequiredEvents::FrontReadWriteBackReadWrite
| RequiredEvents::FrontNoneBackReadWrite
)
}
}
#[derive(Debug, PartialEq, Eq)]
pub enum StateResult {
CloseBackend,
CloseSession,
ConnectBackend,
Continue,
Upgrade,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SessionResult {
Close,
Continue,
Upgrade,
}
#[derive(Debug, PartialEq, Eq)]
pub enum SocketType {
Listener,
FrontClient,
}
type SessionIsToBeClosed = bool;
#[derive(Clone)]
pub struct Readiness {
pub event: Ready,
pub interest: Ready,
}
impl Display for Readiness {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
let i = &mut [b'-'; 4];
let r = &mut [b'-'; 4];
let mixed = &mut [b'-'; 4];
display_ready(i, self.interest);
display_ready(r, self.event);
display_ready(mixed, self.interest & self.event);
write!(
f,
"I({:?})&R({:?})=M({:?})",
String::from_utf8_lossy(i),
String::from_utf8_lossy(r),
String::from_utf8_lossy(mixed)
)
}
}
impl Default for Readiness {
fn default() -> Self {
Self::new()
}
}
impl Readiness {
const KNOWN_BITS: Ready =
Ready(Ready::READABLE.0 | Ready::WRITABLE.0 | Ready::ERROR.0 | Ready::HUP.0);
pub const fn new() -> Readiness {
Readiness {
event: Ready::EMPTY,
interest: Ready::EMPTY,
}
}
#[cfg(debug_assertions)]
fn check_invariants(&self) {
debug_assert_eq!(
self.event & Self::KNOWN_BITS,
self.event,
"Readiness.event carries a bit outside READABLE|WRITABLE|ERROR|HUP"
);
debug_assert_eq!(
self.interest & Self::KNOWN_BITS,
self.interest,
"Readiness.interest carries a bit outside READABLE|WRITABLE|ERROR|HUP"
);
}
pub fn reset(&mut self) {
self.event = Ready::EMPTY;
self.interest = Ready::EMPTY;
debug_assert!(
self.event.is_empty() && self.interest.is_empty(),
"reset must clear both event and interest"
);
#[cfg(debug_assertions)]
self.check_invariants();
}
pub fn filter_interest(&self) -> Ready {
#[cfg(debug_assertions)]
self.check_invariants();
let filtered = self.event & self.interest;
debug_assert_eq!(
filtered & Self::KNOWN_BITS,
filtered,
"filter_interest must not yield an unknown bit"
);
debug_assert!(
self.interest.contains(filtered) && self.event.contains(filtered),
"filtered readiness must be present in both interest and event"
);
filtered
}
pub fn signal_pending_write(&mut self) {
let other_event_before = Ready(self.event.0 & !Ready::WRITABLE.0);
self.event.insert(Ready::WRITABLE);
debug_assert!(
self.event.is_writable(),
"signal_pending_write must set the WRITABLE event bit"
);
debug_assert_eq!(
Ready(self.event.0 & !Ready::WRITABLE.0),
other_event_before,
"signal_pending_write must touch only the WRITABLE bit"
);
#[cfg(debug_assertions)]
self.check_invariants();
}
pub fn signal_pending_read(&mut self) {
let other_event_before = Ready(self.event.0 & !Ready::READABLE.0);
self.event.insert(Ready::READABLE);
debug_assert!(
self.event.is_readable(),
"signal_pending_read must set the READABLE event bit"
);
debug_assert_eq!(
Ready(self.event.0 & !Ready::READABLE.0),
other_event_before,
"signal_pending_read must touch only the READABLE bit"
);
#[cfg(debug_assertions)]
self.check_invariants();
}
#[inline]
pub fn arm_writable(&mut self) {
let other_interest_before = Ready(self.interest.0 & !Ready::WRITABLE.0);
let other_event_before = Ready(self.event.0 & !Ready::WRITABLE.0);
self.interest.insert(Ready::WRITABLE);
self.signal_pending_write();
debug_assert!(
self.interest.is_writable() && self.event.is_writable(),
"arm_writable must set WRITABLE in both interest and event"
);
debug_assert_eq!(
Ready(self.interest.0 & !Ready::WRITABLE.0),
other_interest_before,
"arm_writable must touch only the WRITABLE interest bit"
);
debug_assert_eq!(
Ready(self.event.0 & !Ready::WRITABLE.0),
other_event_before,
"arm_writable must touch only the WRITABLE event bit"
);
#[cfg(debug_assertions)]
self.check_invariants();
}
}
#[cfg(test)]
mod readiness_tests {
use super::{Readiness, Ready};
#[test]
fn arm_writable_sets_interest_and_event() {
let mut r = Readiness::new();
r.arm_writable();
assert!(r.interest.is_writable());
assert!(r.event.is_writable());
}
#[test]
fn arm_writable_is_idempotent() {
let mut r = Readiness::new();
r.arm_writable();
r.arm_writable();
assert_eq!(r.interest, Ready::WRITABLE);
assert_eq!(r.event, Ready::WRITABLE);
}
}
pub fn display_ready(s: &mut [u8], readiness: Ready) {
if readiness.is_readable() {
s[0] = b'R';
}
if readiness.is_writable() {
s[1] = b'W';
}
if readiness.is_error() {
s[2] = b'E';
}
if readiness.is_hup() {
s[3] = b'H';
}
}
pub fn ready_to_string(readiness: Ready) -> String {
let s = &mut [b'-'; 4];
display_ready(s, readiness);
String::from_utf8(s.to_vec()).unwrap()
}
impl fmt::Debug for Readiness {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let i = &mut [b'-'; 4];
let r = &mut [b'-'; 4];
let mixed = &mut [b'-'; 4];
display_ready(i, self.interest);
display_ready(r, self.event);
display_ready(mixed, self.interest & self.event);
write!(
f,
"Readiness {{ interest: {}, readiness: {}, mixed: {} }}",
str::from_utf8(i).unwrap(),
str::from_utf8(r).unwrap(),
str::from_utf8(mixed).unwrap()
)
}
}
#[derive(Clone, Debug)]
pub struct SessionMetrics {
pub start: Option<Instant>,
pub start_wall: Option<SystemTime>,
pub service_time: Duration,
pub wait_time: Duration,
pub bin: usize,
pub bout: usize,
pub service_start: Option<Instant>,
pub wait_start: Instant,
pub backend_id: Option<String>,
pub backend_start: Option<Instant>,
pub backend_connected: Option<Instant>,
pub backend_stop: Option<Instant>,
pub backend_bin: usize,
pub backend_bout: usize,
}
impl SessionMetrics {
pub fn new(wait_time: Option<Duration>) -> SessionMetrics {
SessionMetrics {
start: Some(Instant::now()),
start_wall: Some(SystemTime::now()),
service_time: Duration::from_secs(0),
wait_time: wait_time.unwrap_or_else(|| Duration::from_secs(0)),
bin: 0,
bout: 0,
service_start: None,
wait_start: Instant::now(),
backend_id: None,
backend_start: None,
backend_connected: None,
backend_stop: None,
backend_bin: 0,
backend_bout: 0,
}
}
pub fn reset(&mut self) {
self.start = None;
self.start_wall = None;
self.service_time = Duration::from_secs(0);
self.wait_time = Duration::from_secs(0);
self.bin = 0;
self.bout = 0;
self.service_start = None;
self.backend_start = None;
self.backend_connected = None;
self.backend_stop = None;
self.backend_bin = 0;
self.backend_bout = 0;
}
pub fn service_start(&mut self) {
let now = if self.start.is_none() {
self.mark_request_start()
} else {
Instant::now()
};
self.service_start = Some(now);
self.wait_time += now - self.wait_start;
}
pub fn service_stop(&mut self) {
if let Some(start) = self.service_start.take() {
let duration = Instant::now() - start;
self.service_time += duration;
}
}
pub fn wait_start(&mut self) {
self.wait_start = Instant::now();
}
pub fn service_time(&self) -> Duration {
match self.service_start {
Some(start) => {
let last_duration = Instant::now() - start;
self.service_time + last_duration
}
None => self.service_time,
}
}
pub fn mark_request_start(&mut self) -> Instant {
let now = Instant::now();
self.start = Some(now);
self.start_wall = Some(SystemTime::now());
now
}
pub fn request_time(&self) -> Duration {
match self.start {
Some(start) => Instant::now() - start,
None => Duration::from_secs(0),
}
}
pub fn start_wall_ns(&self) -> Option<i128> {
self.start_wall.and_then(|t| {
t.duration_since(SystemTime::UNIX_EPOCH)
.ok()
.map(|d| d.as_nanos() as i128)
})
}
pub fn backend_start(&mut self) {
self.backend_start = Some(Instant::now());
}
pub fn backend_connected(&mut self) {
self.backend_connected = Some(Instant::now());
}
pub fn backend_stop(&mut self) {
self.backend_stop = Some(Instant::now());
}
pub fn backend_response_time(&self) -> Option<Duration> {
match (self.backend_connected, self.backend_stop) {
(Some(start), Some(end)) => Some(end - start),
(Some(start), None) => Some(Instant::now() - start),
_ => None,
}
}
pub fn backend_connection_time(&self) -> Option<Duration> {
match (self.backend_start, self.backend_connected) {
(Some(start), Some(end)) => Some(end - start),
_ => None,
}
}
pub fn register_end_of_session(&self, context: &LogContext) {
let request_time = self.request_time();
let service_time = self.service_time();
if let Some(cluster_id) = context.cluster_id {
time!(
names::event_loop::REQUEST_TIME,
cluster_id,
request_time.as_millis()
);
time!(
names::event_loop::SERVICE_TIME,
cluster_id,
service_time.as_millis()
);
}
time!(names::event_loop::REQUEST_TIME, request_time.as_millis());
time!(names::event_loop::SERVICE_TIME, service_time.as_millis());
if let Some(backend_id) = self.backend_id.as_ref() {
if let Some(backend_response_time) = self.backend_response_time() {
record_backend_metrics!(
context.cluster_id.as_str_or("-"),
backend_id,
backend_response_time.as_millis(),
self.backend_connection_time(),
self.backend_bin,
self.backend_bout
);
}
}
incr!(
names::access_logs::COUNT,
context.cluster_id,
context.backend_id
);
}
}
#[derive(Debug, PartialEq, Clone)]
pub struct PeakEWMA {
pub decay: f64,
pub rtt: f64,
pub last_event: Instant,
}
impl Default for PeakEWMA {
fn default() -> Self {
Self::new()
}
}
impl PeakEWMA {
pub fn new() -> Self {
PeakEWMA {
decay: 1_000_000_000f64,
rtt: 50_000_000f64,
last_event: Instant::now(),
}
}
pub fn observe(&mut self, rtt: f64) {
let now = Instant::now();
let dur = now - self.last_event;
if rtt > self.rtt {
self.rtt = rtt;
} else {
let weight = (-(dur.as_nanos() as f64) / self.decay).exp();
self.rtt = self.rtt * weight + rtt * (1.0 - weight);
}
self.last_event = now;
}
pub fn get(&mut self, active_requests: usize) -> f64 {
self.observe(0.0);
(active_requests + 1) as f64 * self.rtt
}
}
pub mod testing {
pub use std::{cell::RefCell, os::fd::IntoRawFd, rc::Rc};
pub use anyhow::Context;
pub use mio::{Poll, Registry, Token, net::UnixStream};
pub use slab::Slab;
pub use sozu_command::{
proto::command::{
HttpListenerConfig, HttpsListenerConfig, ServerConfig, TcpListenerConfig,
},
scm_socket::{Listeners, ScmSocket},
};
pub use crate::{
Protocol, ProxySession,
backends::BackendMap,
http::HttpProxy,
https::HttpsProxy,
pool::Pool,
server::{ListenSession, ProxyChannel, Server, SessionManager},
tcp::TcpProxy,
};
use std::sync::atomic::{AtomicU16, Ordering};
static PORT_PROVIDER: AtomicU16 = AtomicU16::new(10000);
pub fn provide_port() -> u16 {
PORT_PROVIDER.fetch_add(1, Ordering::SeqCst)
}
pub struct ServerParts {
pub event_loop: Poll,
pub registry: Registry,
pub sessions: Rc<RefCell<SessionManager>>,
pub pool: Rc<RefCell<Pool>>,
pub backends: Rc<RefCell<BackendMap>>,
pub client_scm_socket: ScmSocket,
pub server_scm_socket: ScmSocket,
pub server_config: ServerConfig,
}
pub fn prebuild_server(
max_buffers: usize,
buffer_size: usize,
send_scm: bool,
) -> anyhow::Result<ServerParts> {
let event_loop = Poll::new().with_context(|| "Failed at creating event loop")?;
let backends = Rc::new(RefCell::new(BackendMap::new()));
let server_config = ServerConfig {
max_connections: max_buffers as u64,
..Default::default()
};
let pool = Rc::new(RefCell::new(Pool::with_capacity(
1,
max_buffers,
buffer_size,
)));
let mut sessions: Slab<Rc<RefCell<dyn ProxySession>>> = Slab::with_capacity(max_buffers);
{
let entry = sessions.vacant_entry();
info!("taking token {:?} for channel", entry.key());
entry.insert(Rc::new(RefCell::new(ListenSession {
protocol: Protocol::Channel,
})));
}
{
let entry = sessions.vacant_entry();
info!("taking token {:?} for timer", entry.key());
entry.insert(Rc::new(RefCell::new(ListenSession {
protocol: Protocol::Timer,
})));
}
{
let entry = sessions.vacant_entry();
info!("taking token {:?} for metrics", entry.key());
entry.insert(Rc::new(RefCell::new(ListenSession {
protocol: Protocol::Metrics,
})));
}
let sessions = SessionManager::new(sessions, max_buffers, 0, 0);
let registry = event_loop
.registry()
.try_clone()
.with_context(|| "Failed at creating a registry")?;
let (scm_server, scm_client) =
UnixStream::pair().with_context(|| "Failed at creating scm unix stream")?;
let client_scm_socket = ScmSocket::new(scm_client.into_raw_fd())
.with_context(|| "Failed at creating the scm client socket")?;
let server_scm_socket = ScmSocket::new(scm_server.into_raw_fd())
.with_context(|| "Failed at creating the scm server socket")?;
if send_scm {
client_scm_socket
.send_listeners(&Listeners::default())
.with_context(|| "Failed at sending empty listeners")?;
}
Ok(ServerParts {
event_loop,
registry,
sessions,
pool,
backends,
client_scm_socket,
server_scm_socket,
server_config,
})
}
}