mod local_drain;
pub mod names;
mod network_drain;
mod writer;
use std::{
cell::RefCell,
collections::{BTreeMap, HashMap},
io::{self, Write},
net::SocketAddr,
str,
time::{Duration, Instant},
};
use mio::net::UdpSocket;
use sozu_command::config::MetricDetailLevel;
use sozu_command::proto::command::{
FilteredMetrics, MetricsConfiguration, QueryMetricsOptions, ResponseContent,
};
use crate::metrics::{local_drain::LocalDrain, network_drain::NetworkDrain};
/// Strips the cluster and/or backend labels that `detail` does not expose.
///
/// * `Process` and `Frontend` drop both labels.
/// * `Cluster` keeps the cluster label and drops the backend label.
/// * `Backend` keeps both labels.
///
/// A label that is already `None` stays `None` at every level.
fn filter_labels_for_detail<'a>(
    detail: MetricDetailLevel,
    cluster_id: Option<&'a str>,
    backend_id: Option<&'a str>,
) -> (Option<&'a str>, Option<&'a str>) {
    // Decide per label whether it survives, then apply both decisions at once.
    let (keep_cluster, keep_backend) = match detail {
        MetricDetailLevel::Process | MetricDetailLevel::Frontend => (false, false),
        MetricDetailLevel::Cluster => (true, false),
        MetricDetailLevel::Backend => (true, true),
    };
    (
        cluster_id.filter(|_| keep_cluster),
        backend_id.filter(|_| keep_backend),
    )
}
/// Maps an HTTP status code to the static name of its dedicated metric.
///
/// Returns `None` for every status code that has no dedicated metric.
pub(crate) fn http_status_code_metric_name(status: u16) -> Option<&'static str> {
    // Must stay sorted by status code: the lookup below is a binary search.
    const TRACKED: &[(u16, &str)] = &[
        (200, "http.status.200"),
        (201, "http.status.201"),
        (204, "http.status.204"),
        (301, "http.status.301"),
        (302, "http.status.302"),
        (304, "http.status.304"),
        (400, "http.status.400"),
        (401, "http.status.401"),
        (403, "http.status.403"),
        (404, "http.status.404"),
        (408, "http.status.408"),
        (413, "http.status.413"),
        (429, "http.status.429"),
        (500, "http.status.500"),
        (502, "http.status.502"),
        (503, "http.status.503"),
        (504, "http.status.504"),
        (507, "http.status.507"),
    ];
    TRACKED
        .binary_search_by_key(&status, |&(code, _)| code)
        .ok()
        .map(|index| TRACKED[index].1)
}
thread_local! {
/// Per-thread metrics aggregator, created with the `"sozu"` prefix.
///
/// The `count!`, `incr!`, `gauge!`, `gauge_add!`, `time!` and
/// `record_backend_metrics!` macros of this module all record into it, and
/// `setup` configures it.
pub static METRICS: RefCell<Aggregator> = RefCell::new(Aggregator::new(String::from("sozu")));
}
/// Errors reported by the metrics subsystem.
#[derive(thiserror::Error, Debug)]
pub enum MetricError {
/// The textual UDP address could not be parsed into a socket address
/// (returned by `udp_bind`).
#[error("Could not parse udp address {address}: {error}")]
WrongUdpAddress { address: String, error: String },
/// Binding the UDP socket failed (returned by `udp_bind`).
#[error("Could not bind to udp address {address}: {error}")]
UdpBind { address: String, error: String },
/// No metrics exist for the object with the given id.
/// NOTE(review): not constructed in this file — presumably raised by the
/// local drain when answering queries; confirm there.
#[error("No metrics found for object with id {0}")]
NoMetrics(String),
/// A histogram could not be created for a time metric.
#[error("Could not create histogram for time metric {time_metric:?}: {error}")]
HistogramCreation {
time_metric: MetricValue,
error: String,
},
/// A time metric could not be recorded.
#[error("could not record time metric {time_metric:?}: {error}")]
TimeMetricRecordingError {
time_metric: MetricValue,
error: String,
},
}
/// A single metric sample, or an update to apply to a stored metric.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MetricValue {
/// Absolute gauge value; updating a stored gauge with it overwrites the value.
Gauge(usize),
/// Signed delta to add to a stored gauge.
GaugeAdd(i64),
/// Signed increment; updating a stored count with it adds to the total.
Count(i64),
/// A timing sample. `record_backend_metrics!` fills it from
/// `Duration::as_millis`, so the unit is presumably milliseconds — confirm
/// for other producers.
Time(usize),
}
impl MetricValue {
    /// Returns `true` when this value is a [`MetricValue::Time`] sample.
    fn is_time(&self) -> bool {
        matches!(self, MetricValue::Time(_))
    }

    /// Merges the incoming value `m` into `self` and reports whether the
    /// stored value should be considered changed.
    ///
    /// Supported combinations:
    /// * `Gauge` ← `Gauge`: overwrite; changed when the value differs.
    /// * `Gauge` ← `GaugeAdd`: add the signed delta; changed when the delta is
    ///   non-zero. A result below zero is logged and clamped to `0`; a result
    ///   above `usize::MAX` is clamped to `usize::MAX`.
    /// * `Count` ← `Count`: add the increment (saturating at the `i64`
    ///   bounds); changed when the increment is non-zero.
    ///
    /// Any other combination is logged as an error, leaves `self` untouched
    /// and returns `false`. `key` is only used in log messages.
    fn update(&mut self, key: &'static str, m: MetricValue) -> bool {
        match (self, m) {
            (MetricValue::Gauge(current), MetricValue::Gauge(new_value)) => {
                let changed = *current != new_value;
                *current = new_value;
                changed
            }
            (MetricValue::Gauge(current), MetricValue::GaugeAdd(delta)) => {
                // Widen to i128 so that neither the usize -> signed conversion
                // nor the addition itself can wrap or overflow.
                let sum = *current as i128 + i128::from(delta);
                *current = if sum >= 0 {
                    usize::try_from(sum).unwrap_or(usize::MAX)
                } else {
                    error!(
                        "metric {} underflow: previous value: {}, adding: {}",
                        key, current, delta
                    );
                    0
                };
                delta != 0
            }
            (MetricValue::Count(total), MetricValue::Count(increment)) => {
                // Saturate instead of overflowing (panic in debug builds,
                // silent wrap-around in release builds).
                *total = total.saturating_add(increment);
                increment != 0
            }
            (s, m) => {
                error!(
                    "tried to update metric {} of value {:?} with an incompatible metric: {:?}",
                    key, s, m
                );
                false
            }
        }
    }
}
/// A metric value kept in a drain, together with its send bookkeeping.
#[derive(Debug, Clone)]
pub struct StoredMetricValue {
// Instant supplied at construction.
// NOTE(review): presumably refreshed by the drain each time the value is
// sent — the code that does so is not in this file; confirm there.
last_sent: Instant,
// Set to true on creation and whenever an update changes `data`; this file
// never resets it.
updated: bool,
// The current value. Never a `GaugeAdd`: `new` converts it to a `Gauge`.
data: MetricValue,
}
impl StoredMetricValue {
    /// Wraps `data` in a stored value that is flagged as updated.
    ///
    /// A `GaugeAdd` is converted to an absolute `Gauge`: a non-negative delta
    /// becomes the gauge value, a negative one is logged and clamped to `0`.
    /// Every other variant is stored unchanged.
    pub fn new(last_sent: Instant, data: MetricValue) -> StoredMetricValue {
        let data = match data {
            MetricValue::GaugeAdd(delta) if delta >= 0 => MetricValue::Gauge(delta as usize),
            MetricValue::GaugeAdd(delta) => {
                error!(
                    "stored metric created with negative GaugeAdd({}), clamping to 0",
                    delta
                );
                MetricValue::Gauge(0)
            }
            other => other,
        };
        Self {
            last_sent,
            updated: true,
            data,
        }
    }

    /// Merges `m` into the stored value and raises the `updated` flag when the
    /// merge changed it. An already raised flag is never lowered here.
    pub fn update(&mut self, key: &'static str, m: MetricValue) {
        let changed = self.data.update(key, m);
        self.updated = self.updated || changed;
    }
}
/// Configures the thread-local [`METRICS`] aggregator to also push metrics
/// to `metrics_host` over UDP.
///
/// The steps run in this order: bind a local UDP socket, apply the optional
/// `prefix`, install the network drain, then set its origin, the tagged-metrics
/// flag and the configured detail level.
///
/// # Errors
///
/// Returns the error of [`udp_bind`] when the local socket cannot be created;
/// in that case the aggregator is left untouched.
pub fn setup<O: Into<String>>(
    metrics_host: &SocketAddr,
    origin: O,
    use_tagged_metrics: bool,
    prefix: Option<String>,
    detail: MetricDetailLevel,
) -> Result<(), MetricError> {
    let metrics_socket = udp_bind()?;
    debug!(
        "setting up metrics: local address = {:#?}",
        metrics_socket.local_addr()
    );

    METRICS.with(|cell| {
        let mut aggregator = cell.borrow_mut();
        // The prefix must be set before the network drain is created, because
        // the drain is built from the aggregator's current prefix.
        if let Some(p) = prefix {
            aggregator.set_up_prefix(p);
        }
        aggregator.set_up_remote(metrics_socket, *metrics_host);
        aggregator.set_up_origin(origin.into());
        aggregator.set_up_tagged_metrics(use_tagged_metrics);
        aggregator.set_up_detail(detail);
    });

    Ok(())
}
/// A sink that metrics can be pushed into.
pub trait Subscriber {
/// Records one `metric` sample under the static name `label`, optionally
/// attributed to a cluster and to a backend.
fn receive_metric(
&mut self,
label: &'static str,
cluster_id: Option<&str>,
backend_id: Option<&str>,
metric: MetricValue,
);
}
/// Minimum time between two lease expiry sweeps, as checked by
/// `Aggregator::lease_tick_due`.
const LEASE_TICK_INTERVAL: Duration = Duration::from_secs(5);
/// Longest TTL `Aggregator::lease_apply` accepts; longer requests are refused
/// with `LeaseApplyOutcome::TtlOutOfRange`.
pub const LEASE_TTL_MAX: Duration = Duration::from_secs(300);
/// Default lease TTL. Not referenced in this file — presumably used by callers
/// when a request carries no TTL; confirm at the call sites.
pub const LEASE_TTL_DEFAULT: Duration = Duration::from_secs(60);
/// Maximum number of distinct client ids in the lease table. Renewing an id
/// that is already present is still accepted when the table is full.
pub const LEASE_TABLE_CAP: usize = 64;
/// Maximum length, in bytes, of a lease client id.
pub const LEASE_CLIENT_ID_MAX_BYTES: usize = 64;
/// Result of `Aggregator::lease_apply`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum LeaseApplyOutcome {
/// The lease was stored (created or renewed). Carries the effective detail
/// level before and after the change; the two may be equal.
Applied {
previous_effective: MetricDetailLevel,
new_effective: MetricDetailLevel,
},
/// The client id is longer than `LEASE_CLIENT_ID_MAX_BYTES`.
ClientIdTooLong,
/// The lease table already holds `LEASE_TABLE_CAP` entries and the request
/// was not a renewal.
TableFull,
/// The requested TTL exceeds `LEASE_TTL_MAX`.
TtlOutOfRange,
/// A lease with this client id exists, its binding is known, and the
/// presented binding does not match it.
Unauthorized,
}
/// Identity of the peer that owns a lease: its process id and session ULID.
///
/// Both fields default to `None`; a binding is "known" only when both are set.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct PeerBinding {
    /// Process id of the peer, when available.
    pub pid: Option<i32>,
    /// Session ULID of the peer, when available.
    pub session_ulid: Option<u128>,
}

impl PeerBinding {
    /// Returns `true` when both the pid and the session ULID are present.
    pub fn is_known(&self) -> bool {
        matches!((self.pid, self.session_ulid), (Some(_), Some(_)))
    }

    /// Returns `true` when `self` is known and `other` carries the same pid
    /// and session ULID. An unknown binding never matches anything, not even
    /// itself.
    pub fn matches(&self, other: &PeerBinding) -> bool {
        if !self.is_known() {
            return false;
        }
        *self == *other
    }
}
/// One entry of the aggregator's lease table.
#[derive(Clone, Copy, Debug)]
struct LeaseEntry {
// Detail level requested by the lease holder.
level: MetricDetailLevel,
// The lease is dropped by `lease_tick` once this instant is reached.
expires_at: Instant,
// Identity presented when the lease was last applied; checked on renewal
// and on clear when it is known.
binding: PeerBinding,
}
/// Result of `Aggregator::lease_clear`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum LeaseClearOutcome {
/// The lease was removed. Carries the effective detail level that was in
/// force before the removal.
Cleared {
previous_effective: MetricDetailLevel,
},
/// No lease exists for the given client id.
NotFound,
/// The lease has a known binding and the presented binding does not match it.
Unauthorized,
}
/// Receives every metric of the current thread and forwards it to the local
/// drain and, when configured, to the network drain.
pub struct Aggregator {
// Prefix handed to the network drain when it is created.
prefix: String,
// UDP drain; `None` until `set_up_remote` is called.
network: Option<NetworkDrain>,
// In-process drain, always present.
local: LocalDrain,
// Detail level set through `set_up_detail`.
configured: MetricDetailLevel,
// Detail level actually applied to incoming metrics: the highest of
// `configured` and the levels of all current leases.
effective: MetricDetailLevel,
// Active leases, keyed by client id.
leases: HashMap<String, LeaseEntry>,
// Instant passed to the most recent `lease_tick` (creation time before the
// first tick).
last_lease_tick: Instant,
}
impl Aggregator {
    /// Creates an aggregator with a local drain only, the default detail
    /// level, and an empty lease table.
    pub fn new(prefix: String) -> Aggregator {
        let detail = MetricDetailLevel::default();
        Aggregator {
            local: LocalDrain::new(prefix.clone()),
            prefix,
            network: None,
            configured: detail,
            effective: detail,
            leases: HashMap::new(),
            last_lease_tick: Instant::now(),
        }
    }

    /// Replaces the prefix used for network drains created afterwards.
    pub fn set_up_prefix(&mut self, prefix: String) {
        self.prefix = prefix;
    }

    /// Installs a network drain that sends to `addr` through `socket`,
    /// replacing any previous one.
    pub fn set_up_remote(&mut self, socket: UdpSocket, addr: SocketAddr) {
        let drain = NetworkDrain::new(self.prefix.clone(), socket, addr);
        self.network = Some(drain);
    }

    /// Sets the origin of the network drain; no-op without a network drain.
    pub fn set_up_origin(&mut self, origin: String) {
        if let Some(drain) = &mut self.network {
            drain.origin = origin;
        }
    }

    /// Toggles tagged metrics on the network drain; no-op without one.
    pub fn set_up_tagged_metrics(&mut self, tagged: bool) {
        if let Some(drain) = &mut self.network {
            drain.use_tagged_metrics = tagged;
        }
    }

    /// Sets the configured detail level and refreshes the effective one.
    pub fn set_up_detail(&mut self, detail: MetricDetailLevel) {
        self.configured = detail;
        self.recompute_effective();
    }

    /// Detail level set through [`Aggregator::set_up_detail`].
    pub fn detail_configured(&self) -> MetricDetailLevel {
        self.configured
    }

    /// Detail level currently applied to incoming metrics.
    pub fn detail_effective(&self) -> MetricDetailLevel {
        self.effective
    }

    /// Creates or renews the lease of `client_id`.
    ///
    /// Checks run in this order and the first failure is returned:
    /// 1. `client_id` longer than `LEASE_CLIENT_ID_MAX_BYTES` → `ClientIdTooLong`;
    /// 2. `ttl` above `LEASE_TTL_MAX` → `TtlOutOfRange`;
    /// 3. new client id while the table holds `LEASE_TABLE_CAP` entries → `TableFull`;
    /// 4. renewal of a lease whose known binding differs from `binding` → `Unauthorized`.
    ///
    /// On success the entry is (re)written with the new level, expiry and
    /// binding, and the effective detail level is recomputed.
    pub fn lease_apply(
        &mut self,
        client_id: String,
        level: MetricDetailLevel,
        ttl: Duration,
        binding: PeerBinding,
    ) -> LeaseApplyOutcome {
        if client_id.len() > LEASE_CLIENT_ID_MAX_BYTES {
            return LeaseApplyOutcome::ClientIdTooLong;
        }
        if ttl > LEASE_TTL_MAX {
            return LeaseApplyOutcome::TtlOutOfRange;
        }

        match self.leases.get(&client_id) {
            // Only a brand-new client id can be turned away for lack of room.
            None if self.leases.len() >= LEASE_TABLE_CAP => {
                return LeaseApplyOutcome::TableFull;
            }
            // A lease with a known owner can only be renewed by that owner.
            Some(existing)
                if existing.binding.is_known() && !existing.binding.matches(&binding) =>
            {
                return LeaseApplyOutcome::Unauthorized;
            }
            _ => {}
        }

        let previous_effective = self.effective;
        let entry = LeaseEntry {
            level,
            expires_at: Instant::now() + ttl,
            binding,
        };
        self.leases.insert(client_id, entry);
        self.recompute_effective();

        LeaseApplyOutcome::Applied {
            previous_effective,
            new_effective: self.effective,
        }
    }

    /// Removes the lease of `client_id`.
    ///
    /// Returns `NotFound` when there is no such lease and `Unauthorized` when
    /// the lease has a known binding that `presented` does not match. A lease
    /// whose binding is unknown can be cleared by anyone.
    pub fn lease_clear(&mut self, client_id: &str, presented: PeerBinding) -> LeaseClearOutcome {
        match self.leases.get(client_id) {
            None => return LeaseClearOutcome::NotFound,
            Some(entry) if entry.binding.is_known() && !entry.binding.matches(&presented) => {
                return LeaseClearOutcome::Unauthorized;
            }
            Some(_) => {}
        }

        let previous_effective = self.effective;
        self.leases.remove(client_id);
        self.recompute_effective();

        LeaseClearOutcome::Cleared { previous_effective }
    }

    /// Number of leases currently in the table.
    pub fn lease_count(&self) -> u32 {
        self.leases.len() as u32
    }

    /// Drops every lease whose expiry is at or before `now` and records `now`
    /// as the last tick.
    ///
    /// Returns the previous effective detail level when the sweep changed it,
    /// `None` otherwise.
    pub fn lease_tick(&mut self, now: Instant) -> Option<MetricDetailLevel> {
        self.last_lease_tick = now;

        let count_before = self.leases.len();
        self.leases.retain(|_, entry| entry.expires_at > now);
        if self.leases.len() == count_before {
            return None;
        }

        let previous = self.effective;
        self.recompute_effective();
        (previous != self.effective).then_some(previous)
    }

    /// Returns `true` once at least `LEASE_TICK_INTERVAL` has elapsed since
    /// the last tick.
    pub fn lease_tick_due(&self, now: Instant) -> bool {
        let since_last_tick = now.duration_since(self.last_lease_tick);
        since_last_tick >= LEASE_TICK_INTERVAL
    }

    /// Sets the effective level to the highest of the configured level and
    /// every leased level.
    fn recompute_effective(&mut self) {
        self.effective = self
            .leases
            .values()
            .map(|entry| entry.level)
            .fold(self.configured, |highest, level| {
                if level > highest { level } else { highest }
            });
    }

    /// Shared access to the network drain's UDP socket, if any.
    pub fn socket(&self) -> Option<&UdpSocket> {
        let drain = self.network.as_ref()?;
        Some(&drain.remote.get_ref().socket)
    }

    /// Exclusive access to the network drain's UDP socket, if any.
    pub fn socket_mut(&mut self) -> Option<&mut UdpSocket> {
        let drain = self.network.as_mut()?;
        Some(&mut drain.remote.get_mut().socket)
    }

    /// Adds `count_value` to the unlabelled counter `key`.
    pub fn count_add(&mut self, key: &'static str, count_value: i64) {
        let metric = MetricValue::Count(count_value);
        self.receive_metric(key, None, None, metric);
    }

    /// Sets the unlabelled gauge `key` to `gauge_value`.
    pub fn set_gauge(&mut self, key: &'static str, gauge_value: usize) {
        let metric = MetricValue::Gauge(gauge_value);
        self.receive_metric(key, None, None, metric);
    }

    /// Adds the signed `gauge_value` to the unlabelled gauge `key`.
    pub fn gauge_add(&mut self, key: &'static str, gauge_value: i64) {
        let metric = MetricValue::GaugeAdd(gauge_value);
        self.receive_metric(key, None, None, metric);
    }

    /// Forwards a writable notification to the network drain, if any.
    pub fn writable(&mut self) {
        if let Some(drain) = self.network.as_mut() {
            drain.writable();
        }
    }

    /// Asks the network drain, if any, to send its metrics.
    pub fn send_data(&mut self) {
        if let Some(drain) = self.network.as_mut() {
            drain.send_metrics();
        }
    }

    /// Dumps the proxy metrics of the local drain, passing an empty list as
    /// its argument.
    pub fn dump_local_proxy_metrics(&mut self) -> BTreeMap<String, FilteredMetrics> {
        self.local.dump_proxy_metrics(&Vec::new())
    }

    /// Answers a metrics query from the local drain.
    pub fn query(&mut self, q: &QueryMetricsOptions) -> Result<ResponseContent, MetricError> {
        self.local.query(q)
    }

    /// Clears the network drain (when present) and then the local drain.
    pub fn clear_local(&mut self) {
        if let Some(drain) = self.network.as_mut() {
            drain.clear();
        }
        self.local.clear();
    }

    /// Applies `config` to the local drain.
    pub fn configure(&mut self, config: &MetricsConfiguration) {
        self.local.configure(config);
    }

    /// Removes `cluster_id` from the network drain (when present) and from
    /// the local drain.
    pub fn remove_cluster(&mut self, cluster_id: &str) {
        if let Some(drain) = self.network.as_mut() {
            drain.remove_cluster(cluster_id);
        }
        self.local.remove_cluster(cluster_id);
    }

    /// Registers `cluster_id` with the network drain (when present) and with
    /// the local drain.
    pub fn add_cluster(&mut self, cluster_id: &str) {
        if let Some(drain) = self.network.as_mut() {
            drain.add_cluster(cluster_id);
        }
        self.local.add_cluster(cluster_id);
    }

    /// Removes a backend of `cluster_id` from the network drain (when
    /// present) and from the local drain.
    pub fn remove_backend(&mut self, cluster_id: &str, backend_id: &str) {
        if let Some(drain) = self.network.as_mut() {
            drain.remove_backend(cluster_id, backend_id);
        }
        self.local.remove_backend(cluster_id, backend_id);
    }
}
impl Subscriber for Aggregator {
    /// Filters the labels according to the effective detail level, then hands
    /// the metric to the network drain (when present) and to the local drain.
    fn receive_metric(
        &mut self,
        label: &'static str,
        cluster_id: Option<&str>,
        backend_id: Option<&str>,
        metric: MetricValue,
    ) {
        let (cluster_id, backend_id) =
            filter_labels_for_detail(self.effective, cluster_id, backend_id);

        // The network drain gets its own copy; the local drain consumes the
        // original.
        if let Some(drain) = self.network.as_mut() {
            drain.receive_metric(label, cluster_id, backend_id, metric.clone());
        }
        self.local
            .receive_metric(label, cluster_id, backend_id, metric);
    }
}
/// A UDP socket paired with the remote address every write is sent to.
pub struct MetricSocket {
    /// Destination of every datagram.
    pub addr: SocketAddr,
    /// Socket the datagrams are sent from.
    pub socket: UdpSocket,
}

impl Write for MetricSocket {
    /// Sends `buf` as one datagram to `addr` and returns what `send_to` reports.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let Self { addr, socket } = self;
        socket.send_to(buf, *addr)
    }

    /// Nothing is buffered here, so flushing always succeeds.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
/// Binds a UDP socket on `0.0.0.0` with an OS-assigned port.
///
/// # Errors
///
/// * [`MetricError::WrongUdpAddress`] when the address cannot be parsed;
/// * [`MetricError::UdpBind`] when the bind itself fails.
pub fn udp_bind() -> Result<UdpSocket, MetricError> {
    let address = "0.0.0.0:0";

    let udp_address: SocketAddr = match address.parse() {
        Ok(parsed) => parsed,
        Err(parse_error) => {
            return Err(MetricError::WrongUdpAddress {
                address: address.to_owned(),
                error: parse_error.to_string(),
            });
        }
    };

    match UdpSocket::bind(udp_address) {
        Ok(socket) => Ok(socket),
        Err(bind_error) => Err(MetricError::UdpBind {
            address: udp_address.to_string(),
            error: bind_error.to_string(),
        }),
    }
}
/// Adds `$value` to the unlabelled counter `$key` of the thread-local
/// aggregator.
///
/// `$value` is evaluated once, before the aggregator is borrowed.
#[macro_export]
macro_rules! count {
    ($key:expr, $value:expr) => {{
        let amount = $value;
        $crate::metrics::METRICS.with(|aggregator| {
            (*aggregator.borrow_mut()).count_add($key, amount);
        });
    }};
}
/// Increments a counter by one on the thread-local aggregator.
///
/// * `incr!(key)` increments the unlabelled counter `key`.
/// * `incr!(key, cluster_id, backend_id)` increments the counter `key`
///   attributed to the given `Option<&str>` cluster and backend ids.
#[macro_export]
macro_rules! incr (
    // `$crate::count!` rather than a bare `count!`, so that this arm also
    // expands correctly when only `incr!` is imported by path.
    ($key:expr) => ($crate::count!($key, 1));
    ($key:expr, $cluster_id:expr, $backend_id:expr) => {
        {
            use $crate::metrics::Subscriber;

            $crate::metrics::METRICS.with(|metrics| {
                (*metrics.borrow_mut()).receive_metric($key, $cluster_id, $backend_id, $crate::metrics::MetricValue::Count(1));
            });
        }
    }
);
/// Decrements the unlabelled counter `$key` by one on the thread-local
/// aggregator.
#[macro_export]
macro_rules! decr (
    // `$crate::count!` rather than a bare `count!`, so that the expansion
    // also works when only `decr!` is imported by path.
    ($key:expr) => ($crate::count!($key, -1))
);
/// Sets a gauge to an absolute value on the thread-local aggregator.
///
/// * `gauge!(key, value)` sets the unlabelled gauge `key`; `value` is passed
///   to `set_gauge` as is.
/// * `gauge!(key, value, cluster_id, backend_id)` sets the gauge attributed to
///   the given `Option<&str>` cluster and backend ids; `value` is cast to
///   `usize` with `as`.
///
/// In both forms `value` is evaluated once, before the aggregator is borrowed.
#[macro_export]
macro_rules! gauge {
    ($key:expr, $value:expr) => {{
        let level = $value;
        $crate::metrics::METRICS.with(|aggregator| {
            (*aggregator.borrow_mut()).set_gauge($key, level);
        });
    }};
    ($key:expr, $value:expr, $cluster_id:expr, $backend_id:expr) => {{
        use $crate::metrics::Subscriber;

        let level = $value;
        $crate::metrics::METRICS.with(|aggregator| {
            (*aggregator.borrow_mut()).receive_metric(
                $key,
                $cluster_id,
                $backend_id,
                $crate::metrics::MetricValue::Gauge(level as usize),
            );
        });
    }};
}
/// Adds a signed delta to a gauge on the thread-local aggregator.
///
/// * `gauge_add!(key, delta)` updates the unlabelled gauge `key`.
/// * `gauge_add!(key, delta, cluster_id, backend_id)` updates the gauge
///   attributed to the given `Option<&str>` cluster and backend ids.
///
/// In both forms `delta` is evaluated once, before the aggregator is borrowed.
#[macro_export]
macro_rules! gauge_add {
    ($key:expr, $value:expr) => {{
        let delta = $value;
        $crate::metrics::METRICS.with(|aggregator| {
            (*aggregator.borrow_mut()).gauge_add($key, delta);
        });
    }};
    ($key:expr, $value:expr, $cluster_id:expr, $backend_id:expr) => {{
        use $crate::metrics::Subscriber;

        let delta = $value;
        $crate::metrics::METRICS.with(|aggregator| {
            (*aggregator.borrow_mut()).receive_metric(
                $key,
                $cluster_id,
                $backend_id,
                $crate::metrics::MetricValue::GaugeAdd(delta),
            );
        });
    }};
}
/// Records a timing sample on the thread-local aggregator.
///
/// * `time!(key, value)` records an unlabelled sample.
/// * `time!(key, cluster_id, value)` records a sample attributed to the
///   cluster `cluster_id` (a `&str`).
///
/// `value` is evaluated once, before the aggregator is borrowed, and cast to
/// `usize` with `as`.
#[macro_export]
macro_rules! time {
    ($key:expr, $value:expr) => {{
        use $crate::metrics::{MetricValue, Subscriber};

        let elapsed = $value;
        $crate::metrics::METRICS.with(|cell| {
            let aggregator = &mut *cell.borrow_mut();
            aggregator.receive_metric($key, None, None, MetricValue::Time(elapsed as usize));
        });
    }};
    ($key:expr, $cluster_id:expr, $value:expr) => {{
        use $crate::metrics::{MetricValue, Subscriber};

        let elapsed = $value;
        $crate::metrics::METRICS.with(|cell| {
            let aggregator = &mut *cell.borrow_mut();
            let cluster: &str = $cluster_id;
            aggregator.receive_metric(
                $key,
                Some(cluster),
                None,
                MetricValue::Time(elapsed as usize),
            );
        });
    }};
}
/// Records the per-backend metrics of one finished request on the
/// thread-local aggregator.
///
/// Arguments, in order:
/// * `cluster_id`, `backend_id`: `&str` labels attached to every metric;
/// * `response_time`: recorded as the backend response time (cast to `usize`);
/// * `backend_connection_time`: an `Option` of a value exposing `as_millis()`;
///   recorded as the backend connection time only when it is `Some`;
/// * `bin`, `bout`: byte counts added to the bytes-in / bytes-out counters
///   (cast to `i64`).
///
/// The backend request counter is also incremented by one.
///
/// The expansion is a single block, so the `use` below stays private to the
/// macro: invoking it several times in one scope does not produce duplicate
/// imports, and it can be used where an expression is expected.
#[macro_export]
macro_rules! record_backend_metrics (
    ($cluster_id:expr, $backend_id:expr, $response_time: expr, $backend_connection_time: expr, $bin: expr, $bout: expr) => {
        {
            use $crate::metrics::{MetricValue,Subscriber};

            $crate::metrics::METRICS.with(|metrics| {
                let m = &mut *metrics.borrow_mut();
                let cluster_id: &str = $cluster_id;
                let backend_id: &str = $backend_id;
                m.receive_metric($crate::metrics::names::backend::BYTES_IN, Some(cluster_id), Some(backend_id), MetricValue::Count($bin as i64));
                m.receive_metric($crate::metrics::names::backend::BYTES_OUT, Some(cluster_id), Some(backend_id), MetricValue::Count($bout as i64));
                m.receive_metric($crate::metrics::names::backend::RESPONSE_TIME, Some(cluster_id), Some(backend_id), MetricValue::Time($response_time as usize));
                if let Some(t) = $backend_connection_time {
                    m.receive_metric($crate::metrics::names::backend::CONNECTION_TIME, Some(cluster_id), Some(backend_id), MetricValue::Time(t.as_millis() as usize));
                }
                m.receive_metric($crate::metrics::names::backend::REQUESTS, Some(cluster_id), Some(backend_id), MetricValue::Count(1));
            });
        }
    }
);
#[cfg(test)]
mod tests {
    use super::MetricDetailLevel::{Backend, Cluster, Frontend, Process};
    use super::*;

    /// TTL used by the tests that do not care about the exact duration.
    const MINUTE: Duration = Duration::from_secs(60);

    /// A fresh aggregator with the same prefix as the thread-local one.
    fn new_aggregator() -> Aggregator {
        Aggregator::new("sozu".to_owned())
    }

    /// The "unknown" binding: neither pid nor session ULID.
    fn anonymous() -> PeerBinding {
        PeerBinding::default()
    }

    /// Shorthand for `lease_apply` taking the client id as `&str`.
    fn apply(
        agg: &mut Aggregator,
        client_id: &str,
        level: MetricDetailLevel,
        ttl: Duration,
        binding: PeerBinding,
    ) -> LeaseApplyOutcome {
        agg.lease_apply(client_id.to_owned(), level, ttl, binding)
    }

    fn owner_binding() -> PeerBinding {
        PeerBinding {
            pid: Some(1234),
            session_ulid: Some(0x0123_4567_89ab_cdef_0123_4567_89ab_cdefu128),
        }
    }

    fn other_binding() -> PeerBinding {
        PeerBinding {
            pid: Some(5678),
            session_ulid: Some(0xfedc_ba98_7654_3210_fedc_ba98_7654_3210u128),
        }
    }

    /// Extracts `(previous_effective, new_effective)` or panics on any other
    /// outcome.
    fn unwrap_applied(outcome: LeaseApplyOutcome) -> (MetricDetailLevel, MetricDetailLevel) {
        if let LeaseApplyOutcome::Applied {
            previous_effective,
            new_effective,
        } = outcome
        {
            return (previous_effective, new_effective);
        }
        let other = outcome;
        panic!("expected LeaseApplyOutcome::Applied, got {other:?}")
    }

    // ---- label filtering -------------------------------------------------

    #[test]
    fn filter_labels_process_drops_both() {
        let filtered = filter_labels_for_detail(Process, Some("c"), Some("b"));
        assert_eq!(filtered, (None, None));
    }

    #[test]
    fn filter_labels_frontend_drops_both_today() {
        let filtered = filter_labels_for_detail(Frontend, Some("c"), Some("b"));
        assert_eq!(filtered, (None, None));
    }

    #[test]
    fn filter_labels_cluster_keeps_cluster_drops_backend() {
        let filtered = filter_labels_for_detail(Cluster, Some("c"), Some("b"));
        assert_eq!(filtered, (Some("c"), None));
    }

    #[test]
    fn filter_labels_backend_keeps_both() {
        let filtered = filter_labels_for_detail(Backend, Some("c"), Some("b"));
        assert_eq!(filtered, (Some("c"), Some("b")));
    }

    #[test]
    fn filter_labels_none_in_none_out() {
        for detail in [Process, Frontend, Cluster, Backend] {
            assert_eq!(filter_labels_for_detail(detail, None, None), (None, None));
        }
    }

    // ---- defaults --------------------------------------------------------

    #[test]
    fn aggregator_default_detail_is_cluster() {
        let agg = new_aggregator();
        assert_eq!(agg.detail_configured(), Cluster);
        assert_eq!(agg.detail_effective(), Cluster);
        assert_eq!(agg.lease_count(), 0);
    }

    // ---- lease_apply -----------------------------------------------------

    #[test]
    fn lease_apply_elevates_effective_above_configured() {
        let mut agg = new_aggregator();
        agg.set_up_detail(Cluster);

        let outcome = apply(&mut agg, "test:1", Backend, MINUTE, anonymous());
        let (prev, new) = unwrap_applied(outcome);

        assert_eq!(prev, Cluster);
        assert_eq!(new, Backend);
        assert_eq!(agg.detail_configured(), Cluster);
        assert_eq!(agg.detail_effective(), Backend);
        assert_eq!(agg.lease_count(), 1);
    }

    #[test]
    fn lease_apply_below_configured_does_not_lower_effective() {
        let mut agg = new_aggregator();
        agg.set_up_detail(Backend);

        let outcome = apply(&mut agg, "test:1", Cluster, MINUTE, anonymous());
        let (prev, new) = unwrap_applied(outcome);

        assert_eq!(prev, Backend);
        assert_eq!(new, Backend);
    }

    #[test]
    fn lease_apply_rejects_client_id_over_cap() {
        let mut agg = new_aggregator();
        let too_long = "x".repeat(LEASE_CLIENT_ID_MAX_BYTES + 1);

        let outcome = apply(&mut agg, &too_long, Backend, MINUTE, anonymous());

        assert_eq!(outcome, LeaseApplyOutcome::ClientIdTooLong);
        assert_eq!(agg.lease_count(), 0);
    }

    #[test]
    fn lease_apply_rejects_when_table_is_full() {
        let mut agg = new_aggregator();

        // Fill the table to its cap.
        for i in 0..LEASE_TABLE_CAP {
            let client_id = format!("client:{i:02}");
            let outcome = apply(&mut agg, &client_id, Backend, MINUTE, anonymous());
            assert!(matches!(outcome, LeaseApplyOutcome::Applied { .. }));
        }
        assert_eq!(agg.lease_count() as usize, LEASE_TABLE_CAP);

        // A new client id is refused...
        let newcomer = apply(&mut agg, "newcomer", Backend, MINUTE, anonymous());
        assert_eq!(newcomer, LeaseApplyOutcome::TableFull,);
        assert_eq!(agg.lease_count() as usize, LEASE_TABLE_CAP);

        // ...but an existing one can still renew.
        let renewal = apply(
            &mut agg,
            "client:00",
            Backend,
            Duration::from_secs(120),
            anonymous(),
        );
        assert!(matches!(renewal, LeaseApplyOutcome::Applied { .. }));
        assert_eq!(agg.lease_count() as usize, LEASE_TABLE_CAP);
    }

    #[test]
    fn lease_apply_rejects_ttl_over_max() {
        let mut agg = new_aggregator();
        let over_max = LEASE_TTL_MAX + Duration::from_secs(1);

        let outcome = apply(&mut agg, "client:0", Backend, over_max, anonymous());

        assert_eq!(outcome, LeaseApplyOutcome::TtlOutOfRange,);
        assert_eq!(agg.lease_count(), 0);
    }

    #[test]
    fn lease_apply_renewal_replaces_previous_for_same_client() {
        let mut agg = new_aggregator();

        for ttl_secs in [30, 60] {
            let _ = apply(
                &mut agg,
                "renewer",
                Backend,
                Duration::from_secs(ttl_secs),
                anonymous(),
            );
        }

        assert_eq!(agg.lease_count(), 1);
    }

    #[test]
    fn lease_apply_renewal_rejects_foreign_binding() {
        let mut agg = new_aggregator();
        let victim = PeerBinding {
            pid: Some(4242),
            session_ulid: Some(0x0123_4567_89AB_CDEF_FEDC_BA98_7654_3210),
        };
        let attacker = PeerBinding {
            pid: Some(9999),
            session_ulid: Some(0xDEAD_BEEF_DEAD_BEEF_DEAD_BEEF_DEAD_BEEF),
        };

        let outcome = apply(&mut agg, "topcli", Backend, MINUTE, victim);
        assert!(
            matches!(outcome, LeaseApplyOutcome::Applied { .. }),
            "victim's initial apply must succeed"
        );

        let outcome = apply(&mut agg, "topcli", Backend, MINUTE, attacker);
        assert_eq!(
            outcome,
            LeaseApplyOutcome::Unauthorized,
            "renewal with a mismatched known binding must be refused"
        );

        let clear = agg.lease_clear("topcli", victim);
        assert!(
            matches!(clear, LeaseClearOutcome::Cleared { .. }),
            "victim's original binding must still clear cleanly after \
             the foreign-binding renewal was refused"
        );
    }

    #[test]
    fn lease_apply_renewal_with_matching_binding_succeeds() {
        let mut agg = new_aggregator();
        let owner = PeerBinding {
            pid: Some(1234),
            session_ulid: Some(0xAAAA_BBBB_CCCC_DDDD_EEEE_FFFF_0000_1111),
        };

        let _ = apply(&mut agg, "topcli", Backend, Duration::from_secs(30), owner);
        let outcome = apply(&mut agg, "topcli", Backend, MINUTE, owner);

        assert!(
            matches!(outcome, LeaseApplyOutcome::Applied { .. }),
            "renewal with matching binding must succeed (otherwise the \
             TUI's own renewer thread would be locked out)"
        );
    }

    #[test]
    fn lease_apply_max_merge_two_clients() {
        let mut agg = new_aggregator();
        agg.set_up_detail(Process);

        let _ = apply(&mut agg, "scraper", Frontend, MINUTE, anonymous());
        let _ = apply(&mut agg, "topcli", Backend, MINUTE, anonymous());
        assert_eq!(agg.detail_effective(), Backend);
        assert_eq!(agg.lease_count(), 2);

        let outcome = agg.lease_clear("topcli", anonymous());
        assert_eq!(
            outcome,
            LeaseClearOutcome::Cleared {
                previous_effective: Backend,
            }
        );
        assert_eq!(agg.detail_effective(), Frontend);
        assert_eq!(agg.lease_count(), 1);
    }

    // ---- lease_clear -----------------------------------------------------

    #[test]
    fn lease_clear_unknown_id_is_silent_noop() {
        let mut agg = new_aggregator();
        let _ = apply(&mut agg, "real", Backend, MINUTE, anonymous());

        let outcome = agg.lease_clear("ghost", anonymous());

        assert_eq!(outcome, LeaseClearOutcome::NotFound);
        assert_eq!(agg.detail_effective(), Backend);
        assert_eq!(agg.lease_count(), 1);
    }

    #[test]
    fn lease_clear_with_matching_binding_authorised() {
        let mut agg = new_aggregator();
        let _ = apply(&mut agg, "owner-lease", Backend, MINUTE, owner_binding());

        let outcome = agg.lease_clear("owner-lease", owner_binding());

        assert!(matches!(outcome, LeaseClearOutcome::Cleared { .. }));
        assert_eq!(agg.lease_count(), 0);
    }

    #[test]
    fn lease_clear_with_mismatched_binding_is_unauthorized() {
        let mut agg = new_aggregator();
        let _ = apply(&mut agg, "owner-lease", Backend, MINUTE, owner_binding());

        let outcome = agg.lease_clear("owner-lease", other_binding());

        assert_eq!(outcome, LeaseClearOutcome::Unauthorized);
        assert_eq!(agg.lease_count(), 1);
    }

    #[test]
    fn lease_clear_unknown_apply_binding_accepts_any_clear() {
        let mut agg = new_aggregator();
        let _ = apply(&mut agg, "legacy", Backend, MINUTE, anonymous());

        let outcome = agg.lease_clear("legacy", owner_binding());

        assert!(matches!(outcome, LeaseClearOutcome::Cleared { .. }));
        assert_eq!(agg.lease_count(), 0);
    }

    #[test]
    fn lease_clear_known_apply_rejects_default_clear() {
        let mut agg = new_aggregator();
        let _ = apply(&mut agg, "owner-lease", Backend, MINUTE, owner_binding());

        let outcome = agg.lease_clear("owner-lease", anonymous());

        assert_eq!(outcome, LeaseClearOutcome::Unauthorized);
    }

    // ---- lease_tick ------------------------------------------------------

    #[test]
    fn lease_tick_expires_only_past_due_leases() {
        let mut agg = new_aggregator();
        agg.set_up_detail(Process);
        let now = Instant::now();

        // Entries are inserted directly so that one of them is already expired.
        let expired = LeaseEntry {
            level: Backend,
            expires_at: now - Duration::from_secs(1),
            binding: anonymous(),
        };
        let live = LeaseEntry {
            level: Frontend,
            expires_at: now + Duration::from_secs(60),
            binding: anonymous(),
        };
        agg.leases.insert("expired".to_owned(), expired);
        agg.leases.insert("live".to_owned(), live);
        agg.recompute_effective();
        assert_eq!(agg.detail_effective(), Backend);

        let prev = agg.lease_tick(now);

        assert_eq!(prev, Some(Backend));
        assert_eq!(agg.detail_effective(), Frontend);
        assert_eq!(agg.lease_count(), 1);
    }

    #[test]
    fn lease_tick_no_change_returns_none() {
        let mut agg = new_aggregator();
        let result = agg.lease_tick(Instant::now());
        assert!(result.is_none());
    }

    #[test]
    fn lease_apply_at_max_ttl_succeeds() {
        let mut agg = new_aggregator();
        let now = Instant::now();

        let outcome = apply(&mut agg, "max", Backend, LEASE_TTL_MAX, anonymous());
        assert!(matches!(outcome, LeaseApplyOutcome::Applied { .. }));

        let entry = agg.leases.get("max").unwrap();
        let latest_allowed = now + LEASE_TTL_MAX + Duration::from_millis(50);
        assert!(entry.expires_at <= latest_allowed);
    }
}