pub mod flow;
pub mod manager;
pub mod proxy_protocol;
use std::{net::SocketAddr, time::Duration};
use sozu_command::state::ClusterId;
pub use crate::protocol::udp::{
flow::{CloseReason, UdpFlow},
manager::{FlowKeyExtractor, SourceTupleExtractor, UdpManager},
};
pub type FlowId = usize;
pub type BackendId = String;
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct FlowKey {
pub src: SocketAddr,
}
impl FlowKey {
pub fn from_src(src: SocketAddr, with_port: bool) -> Self {
if with_port {
FlowKey { src }
} else {
let mut src = src;
src.set_port(0);
FlowKey { src }
}
}
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Transmit {
pub dst: SocketAddr,
pub segment_size: Option<usize>,
pub payload: Vec<u8>,
}
#[derive(Debug)]
pub enum ManagerInput<'a> {
ClientDatagram {
src: SocketAddr,
payload: &'a [u8],
},
BackendDatagram {
flow: FlowId,
payload: &'a [u8],
},
Config(ConfigEvent),
BackendResolved {
flow: FlowId,
backend: BackendId,
addr: SocketAddr,
},
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Output {
SelectBackend {
flow: FlowId,
cluster: ClusterId,
key: u64,
},
OpenUpstream {
flow: FlowId,
backend: SocketAddr,
},
SendToBackend(Transmit),
SendToClient(Transmit),
ArmTimer(std::time::Instant),
Metric(MetricEvent),
CloseFlow(FlowId),
Drop(DropReason),
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DropReason {
Invalid,
Truncated,
NoBackend,
Shed,
UnknownFlow,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum MetricEvent {
FlowCreated,
FlowEvicted,
FlowShed,
DatagramIn(usize),
DatagramOut(usize),
DatagramDropped(DropReason),
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ClusterConfig {
pub cluster: ClusterId,
pub affinity_with_port: bool,
pub responses: u32,
pub requests: u32,
pub front_timeout: Duration,
pub back_timeout: Duration,
pub send_proxy_protocol: bool,
pub proxy_protocol_every_datagram: bool,
}
impl Default for ClusterConfig {
fn default() -> Self {
ClusterConfig {
cluster: String::new(),
affinity_with_port: false,
responses: 0,
requests: 0,
front_timeout: Duration::from_secs(30),
back_timeout: Duration::from_secs(30),
send_proxy_protocol: false,
proxy_protocol_every_datagram: false,
}
}
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ConfigEvent {
SetCluster(ClusterConfig),
SetMaxFlows(usize),
SetMaxRxDatagramSize(usize),
Drain,
}
#[allow(unused_macros)]
macro_rules! log_context {
($self:expr) => {{
let (open, reset, grey, gray, white) = sozu_command::logging::ansi_palette();
format!(
"[- - - -]\t{open}UDP{reset}\t{grey}Manager{reset}({gray}flows{reset}={white}{flows}{reset}, {gray}max_flows{reset}={white}{max_flows}{reset}, {gray}draining{reset}={white}{draining}{reset})\t >>>",
open = open,
reset = reset,
grey = grey,
gray = gray,
white = white,
flows = $self.flow_count(),
max_flows = $self.max_flows(),
draining = $self.is_draining(),
)
}};
}
#[allow(unused_macros)]
macro_rules! log_context_lite {
($flow_id:expr, $flow:expr) => {{
let (open, reset, grey, gray, white) = sozu_command::logging::ansi_palette();
format!(
"[- - - -]\t{open}UDP-FLOW{reset}\t{grey}Flow{reset}({gray}id{reset}={white}{id}{reset}, {gray}client{reset}={white}{client}{reset}, {gray}backend{reset}={white}{backend:?}{reset})\t >>>",
open = open,
reset = reset,
grey = grey,
gray = gray,
white = white,
id = $flow_id,
client = $flow.client,
backend = $flow.backend_addr,
)
}};
}
#[allow(unused_imports)]
pub(crate) use {log_context, log_context_lite};