Skip to main content

sozu_lib/metrics/
mod.rs

1//! Metrics façade.
2//!
3//! Defines the per-thread `Aggregator`, the `incr!`/`count!`/`gauge!`/
4//! `gauge_add!`/`time!` macros consumed across the lib + bin crates, the
5//! local-vs-network drain dispatch, and the label allow/deny filtering
6//! used to keep cardinality bounded. Gauge underflow is treated as a
//! correctness bug (saturating clamp + `error!` log), not a rounding artefact.
8
9mod local_drain;
10pub mod names;
11mod network_drain;
12mod writer;
13
14use std::{
15    cell::RefCell,
16    collections::{BTreeMap, HashMap},
17    io::{self, Write},
18    net::SocketAddr,
19    str,
20    time::{Duration, Instant},
21};
22
23use mio::net::UdpSocket;
24use sozu_command::config::MetricDetailLevel;
25use sozu_command::proto::command::{
26    FilteredMetrics, MetricsConfiguration, QueryMetricsOptions, ResponseContent,
27};
28
29use crate::metrics::{local_drain::LocalDrain, network_drain::NetworkDrain};
30
31/// Filter `(cluster_id, backend_id)` labels at emission time according to the
32/// configured [`MetricDetailLevel`]. Each level is a SUPERSET of the previous
33/// in cardinality terms:
34///
35/// - `Process`: drop both labels — proxy-only counters (smallest keyspace).
36/// - `Frontend`: same as `Process` today; reserved for the per-listener
37///   counters tracked as a follow-up. Listed in the proto + config so
38///   operators can opt in once per-listener labels land.
39/// - `Cluster`: keep `cluster_id`, drop `backend_id` — current default.
40/// - `Backend`: keep both — current historical behaviour.
41///
42/// Pure function so the filter can be unit-tested exhaustively without the
43/// drain machinery in scope.
44fn filter_labels_for_detail<'a>(
45    detail: MetricDetailLevel,
46    cluster_id: Option<&'a str>,
47    backend_id: Option<&'a str>,
48) -> (Option<&'a str>, Option<&'a str>) {
49    match detail {
50        MetricDetailLevel::Process | MetricDetailLevel::Frontend => (None, None),
51        MetricDetailLevel::Cluster => (cluster_id, None),
52        MetricDetailLevel::Backend => (cluster_id, backend_id),
53    }
54}
55
/// Look up the dedicated counter name for an HTTP status code.
///
/// Exactly eighteen codes get a per-code counter, named
/// `"http.status.<code>"`: the answers Sōzu generates by default plus the
/// ones operators commonly chart — `200/201/204`, `301/302/304`,
/// `400/401/403/404/408/413/429`, and the `5xx` codes Sōzu can synthesise
/// (`500/502/503/504/507`). Any other code yields `None`, so only the
/// bucket counter (`http.status.{1xx,…,other}`) is emitted for it and
/// metric cardinality stays bounded.
///
/// Shared by H1 (`kawa_h1::save_http_status_metric`) and H2
/// (`mux::stream::generate_access_log`) so both protocols agree on the set.
pub(crate) fn http_status_code_metric_name(status: u16) -> Option<&'static str> {
    // (status code, counter name) for every code with a dedicated counter.
    const DEDICATED: [(u16, &str); 18] = [
        (200, "http.status.200"),
        (201, "http.status.201"),
        (204, "http.status.204"),
        (301, "http.status.301"),
        (302, "http.status.302"),
        (304, "http.status.304"),
        (400, "http.status.400"),
        (401, "http.status.401"),
        (403, "http.status.403"),
        (404, "http.status.404"),
        (408, "http.status.408"),
        (413, "http.status.413"),
        (429, "http.status.429"),
        (500, "http.status.500"),
        (502, "http.status.502"),
        (503, "http.status.503"),
        (504, "http.status.504"),
        (507, "http.status.507"),
    ];

    DEDICATED
        .iter()
        .find(|(code, _)| *code == status)
        .map(|&(_, name)| name)
}
91
92thread_local! {
93  pub static METRICS: RefCell<Aggregator> = RefCell::new(Aggregator::new(String::from("sozu")));
94}
95
96#[derive(thiserror::Error, Debug)]
97pub enum MetricError {
98    #[error("Could not parse udp address {address}: {error}")]
99    WrongUdpAddress { address: String, error: String },
100    #[error("Could not bind to udp address {address}: {error}")]
101    UdpBind { address: String, error: String },
102    #[error("No metrics found for object with id {0}")]
103    NoMetrics(String),
104    #[error("Could not create histogram for time metric {time_metric:?}: {error}")]
105    HistogramCreation {
106        time_metric: MetricValue,
107        error: String,
108    },
109    #[error("could not record time metric {time_metric:?}: {error}")]
110    TimeMetricRecordingError {
111        time_metric: MetricValue,
112        error: String,
113    },
114}
115
/// One metric sample as handed to the drains.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum MetricValue {
    /// Absolute gauge value; replaces whatever was stored before.
    Gauge(usize),
    /// Signed delta applied on top of a stored gauge.
    GaugeAdd(i64),
    /// Signed increment accumulated into a counter.
    Count(i64),
    /// Timing sample (unit not visible in this module — TODO confirm).
    Time(usize),
}
123
124impl MetricValue {
125    fn is_time(&self) -> bool {
126        matches!(self, &MetricValue::Time(_))
127    }
128
129    fn update(&mut self, key: &'static str, m: MetricValue) -> bool {
130        match (self, m) {
131            (&mut MetricValue::Gauge(ref mut v1), MetricValue::Gauge(v2)) => {
132                let changed = *v1 != v2;
133                *v1 = v2;
134                changed
135            }
136            (&mut MetricValue::Gauge(ref mut v1), MetricValue::GaugeAdd(v2)) => {
137                // Saturating clamp + `error!` log on underflow, symmetric
138                // with `AggregatedMetric::update` in `local_drain.rs`. The
139                // `debug_assert!` that lived here previously made debug
140                // builds panic while release silently clamped; H2
141                // `ConnectionH2::Drop` rebalance + the saturating clamps
142                // make underflow structurally impossible from live code,
143                // and a panic on a metric mismatch is too aggressive — the
144                // log line names the offending key for observability.
145                let changed = v2 != 0;
146                let res = *v1 as i64 + v2;
147                *v1 = if res >= 0 {
148                    res as usize
149                } else {
150                    error!(
151                        "metric {} underflow: previous value: {}, adding: {}",
152                        key, v1, v2
153                    );
154                    0
155                };
156
157                changed
158            }
159            (&mut MetricValue::Count(ref mut v1), MetricValue::Count(v2)) => {
160                let changed = v2 != 0;
161                *v1 += v2;
162                changed
163            }
164            (s, m) => {
165                // Symmetric with `AggregatedMetric::update` in `local_drain.rs`:
166                // a type-mismatch is a coding bug (a `metric_name` was
167                // re-emitted with a different `MetricValue` variant), not a
168                // network-driven event, but log-and-continue is still the
169                // right policy — taking the worker process down on a metric
170                // bookkeeping mismatch is too aggressive for a single-bug
171                // recovery path. The log line names the offending key.
172                error!(
173                    "tried to update metric {} of value {:?} with an incompatible metric: {:?}",
174                    key, s, m
175                );
176                false
177            }
178        }
179    }
180}
181
182#[derive(Debug, Clone)]
183pub struct StoredMetricValue {
184    last_sent: Instant,
185    updated: bool,
186    data: MetricValue,
187}
188
189impl StoredMetricValue {
190    pub fn new(last_sent: Instant, data: MetricValue) -> StoredMetricValue {
191        // Symmetric with `AggregatedMetric::new`: a first emission with a
192        // negative `GaugeAdd` is a coding bug (a `-1` ran with no paired
193        // `+1` earlier in the worker's lifetime); clamp to 0 and log so the
194        // offending pattern shows up in operator logs alongside the
195        // local-drain side.
196        let data = if let MetricValue::GaugeAdd(v) = data {
197            if v >= 0 {
198                MetricValue::Gauge(v as usize)
199            } else {
200                error!(
201                    "stored metric created with negative GaugeAdd({}), clamping to 0",
202                    v
203                );
204                MetricValue::Gauge(0)
205            }
206        } else {
207            data
208        };
209        StoredMetricValue {
210            last_sent,
211            updated: true,
212            data,
213        }
214    }
215
216    pub fn update(&mut self, key: &'static str, m: MetricValue) {
217        let updated = self.data.update(key, m);
218        if !self.updated {
219            self.updated = updated;
220        }
221    }
222}
223
224pub fn setup<O: Into<String>>(
225    metrics_host: &SocketAddr,
226    origin: O,
227    use_tagged_metrics: bool,
228    prefix: Option<String>,
229    detail: MetricDetailLevel,
230) -> Result<(), MetricError> {
231    let metrics_socket = udp_bind()?;
232
233    debug!(
234        "setting up metrics: local address = {:#?}",
235        metrics_socket.local_addr()
236    );
237
238    METRICS.with(|metrics| {
239        if let Some(p) = prefix {
240            (*metrics.borrow_mut()).set_up_prefix(p);
241        }
242        (*metrics.borrow_mut()).set_up_remote(metrics_socket, *metrics_host);
243        (*metrics.borrow_mut()).set_up_origin(origin.into());
244        (*metrics.borrow_mut()).set_up_tagged_metrics(use_tagged_metrics);
245        (*metrics.borrow_mut()).set_up_detail(detail);
246    });
247    Ok(())
248}
249
250pub trait Subscriber {
251    fn receive_metric(
252        &mut self,
253        label: &'static str,
254        cluster_id: Option<&str>,
255        backend_id: Option<&str>,
256        metric: MetricValue,
257    );
258}
259
/// Minimum spacing between two lease-janitor passes; see
/// `Aggregator::lease_tick_due`. Intended to be polled from the worker's
/// `notify` loop, so the real cadence follows traffic but never runs
/// faster than this.
const LEASE_TICK_INTERVAL: Duration = Duration::from_millis(5_000);

/// Largest TTL `Aggregator::lease_apply` accepts (mirrors the proto
/// comment on `SetMetricDetail`). Caps how long a stuck renewer can pin an
/// elevated detail level.
pub const LEASE_TTL_MAX: Duration = Duration::from_secs(5 * 60);

/// TTL meant to be used when a request omits `ttl_seconds` or sends `0`,
/// per the proto comment. `lease_apply` itself does not apply it; this is
/// presumably done at the dispatch site — TODO confirm.
pub const LEASE_TTL_DEFAULT: Duration = Duration::from_millis(60_000);

/// Maximum number of leases the aggregator holds at once. Once the table
/// is this full, `lease_apply` refuses NEW `client_id`s with
/// `LeaseApplyOutcome::TableFull`. Renewals replace their entry in place
/// and always fit. This bounds the table's memory and defuses the CWE-770
/// pattern of a same-UID caller cycling `client_id`s faster than they
/// expire. 64 is far above a realistic fleet of TUIs and scrapers.
pub const LEASE_TABLE_CAP: usize = 64;

/// Maximum `client_id` length, in bytes, accepted by `lease_apply`. The
/// TUI's `top:<pid>:<8 hex chars>` ids stay within 24 bytes; the extra
/// headroom serves other scrapers while keeping per-entry memory and the
/// audit log's lease_id column bounded.
pub const LEASE_CLIENT_ID_MAX_BYTES: usize = 64;
288
289/// Outcome of [`Aggregator::lease_apply`]. The success arm returns the
290/// `(previous_effective, new_effective)` pair the caller can use to
291/// decide whether to emit a `MetricDetailChanged` audit event; the
292/// failure arms surface bounded-input rejections.
293#[derive(Clone, Copy, Debug, PartialEq, Eq)]
294pub enum LeaseApplyOutcome {
295    /// Lease inserted / renewed.
296    Applied {
297        previous_effective: MetricDetailLevel,
298        new_effective: MetricDetailLevel,
299    },
300    /// `client_id` length exceeds [`LEASE_CLIENT_ID_MAX_BYTES`].
301    ClientIdTooLong,
302    /// Lease table is at [`LEASE_TABLE_CAP`] and the request is a new
303    /// insert (not a renewal). Callers MUST surface this as a `FAILURE`
304    /// to the wire so the lessor can back off.
305    TableFull,
306    /// The requested TTL exceeds [`LEASE_TTL_MAX`]. This arm is
307    /// theoretically unreachable in the normal flow because the
308    /// dispatch site rejects out-of-range values before reaching the
309    /// aggregator; surfacing it explicitly catches callers that bypass
310    /// dispatch (proto fuzzing, future internal use) instead of
311    /// silently clamping their intent.
312    TtlOutOfRange,
313    /// A renewal request was presented with a [`PeerBinding`] that
314    /// disagrees with the existing lease's apply-time binding. Returned
315    /// only when the existing binding is fully known (per
316    /// [`PeerBinding::is_known`]); unknown-binding leases (no
317    /// `SO_PEERCRED` available, pre-binding callers) continue to
318    /// accept any renewer. Closes the same-UID `client_id`-collision
319    /// takeover where an attacker re-applies a lease against a
320    /// victim's id and replaces the binding to lock the victim out of
321    /// their own clear.
322    Unauthorized,
323}
324
/// Apply-time owner of a lease, recorded so later `clear` and renewal
/// requests can be checked against it.
///
/// Per the proto comment on `SetMetricDetail.peer_pid`, the master fills
/// in two values:
/// - the peer's OS pid (from `SO_PEERCRED` on Linux, taken when the
///   command-socket connection is accepted);
/// - the ULID of the master-side connection session.
///
/// A binding is authoritative only when both halves are present. An empty
/// `(None, None)` binding — or a partially filled one — means no
/// credentials were available. The worker then lets any caller clear or
/// renew that lease, for compatibility with non-Linux peers and
/// intermediate proxies.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct PeerBinding {
    /// OS process id of the peer, if known.
    pub pid: Option<i32>,
    /// Master session ULID as a raw `u128`, kept in that form so the
    /// metrics aggregator does not depend on the `Ulid` crate.
    pub session_ulid: Option<u128>,
}

impl PeerBinding {
    /// `true` only when both `pid` and `session_ulid` are set. Only such
    /// bindings can authorise or refuse a request. A partially known
    /// binding is treated like an unknown one (any caller is accepted).
    pub fn is_known(&self) -> bool {
        matches!(
            self,
            PeerBinding {
                pid: Some(_),
                session_ulid: Some(_),
            }
        )
    }

    /// `true` when `self` is fully known and `other` carries the same pid
    /// and session ULID. An unknown `self` never matches anything, not even
    /// an identical binding. Used to reject mismatched clears and renewals.
    pub fn matches(&self, other: &PeerBinding) -> bool {
        if !self.is_known() {
            return false;
        }
        (self.pid, self.session_ulid) == (other.pid, other.session_ulid)
    }
}
360
361/// One lease entry kept inside [`Aggregator::leases`]. Carries the
362/// requested cardinality level, the absolute expiry instant, and the
363/// master-supplied [`PeerBinding`] captured at apply time.
364#[derive(Clone, Copy, Debug)]
365struct LeaseEntry {
366    level: MetricDetailLevel,
367    expires_at: Instant,
368    binding: PeerBinding,
369}
370
371/// Outcome of [`Aggregator::lease_clear`].
372#[derive(Clone, Copy, Debug, PartialEq, Eq)]
373pub enum LeaseClearOutcome {
374    /// The lease was found, the binding matched (or was unknown at apply
375    /// time), and the entry has been removed. Carries the worker's
376    /// previous effective level so the caller can decide whether to
377    /// emit a `MetricDetailChanged` audit event.
378    Cleared {
379        previous_effective: MetricDetailLevel,
380    },
381    /// No lease was held by the requested `client_id`. Silent no-op.
382    NotFound,
383    /// A lease existed but the peer binding presented with the clear did
384    /// not match the apply-time binding. The lease is left intact. The
385    /// worker MUST surface this as a `FAILURE` response to discourage
386    /// guessing attacks against another operator's lease.
387    Unauthorized,
388}
389
390pub struct Aggregator {
391    /// appended to metric keys, usually "sozu-"
392    prefix: String,
393    /// gathers metrics and sends them on a UDP socket
394    network: Option<NetworkDrain>,
395    /// gather metrics locally, queried by the CLI
396    local: LocalDrain,
397    /// Static cardinality knob — set once at boot from `MetricsConfig.detail`.
398    /// Filters `(cluster_id, backend_id)` labels at emission time. Each level
399    /// is a SUPERSET of the previous one (`Process ⊆ Frontend ⊆ Cluster ⊆ Backend`).
400    configured: MetricDetailLevel,
401    /// Effective cardinality knob actually applied at emission. Equal to
402    /// `max(configured, max(active leases))`. Recomputed only when leases
403    /// change, so the hot path (`receive_metric`) reads a single field.
404    effective: MetricDetailLevel,
405    /// Active TTL leases keyed by `client_id` from `SetMetricDetail`. A live
406    /// lease holds the worker's effective level at-or-above its requested
407    /// detail until it expires or is explicitly cleared. Multiple clients
408    /// (e.g. several `sozu top` sessions) lease independently.
409    leases: HashMap<String, LeaseEntry>,
410    /// Wall-clock anchor for the polled lease janitor. Updated on every
411    /// `lease_tick` call regardless of whether expiry happened, so the
412    /// caller's "is it time to tick?" check stays cheap.
413    last_lease_tick: Instant,
414}
415
416impl Aggregator {
417    pub fn new(prefix: String) -> Aggregator {
418        let default_detail = MetricDetailLevel::default();
419        Aggregator {
420            prefix: prefix.clone(),
421            network: None,
422            local: LocalDrain::new(prefix),
423            configured: default_detail,
424            effective: default_detail,
425            leases: HashMap::new(),
426            last_lease_tick: Instant::now(),
427        }
428    }
429
430    pub fn set_up_prefix(&mut self, prefix: String) {
431        self.prefix = prefix;
432    }
433
434    pub fn set_up_remote(&mut self, socket: UdpSocket, addr: SocketAddr) {
435        self.network = Some(NetworkDrain::new(self.prefix.clone(), socket, addr));
436    }
437
438    pub fn set_up_origin(&mut self, origin: String) {
439        if let Some(n) = self.network.as_mut() {
440            n.origin = origin;
441        }
442    }
443
444    pub fn set_up_tagged_metrics(&mut self, tagged: bool) {
445        if let Some(n) = self.network.as_mut() {
446            n.use_tagged_metrics = tagged;
447        }
448    }
449
450    /// Set the static cardinality floor (`MetricsConfig.detail` from the TOML
451    /// configuration). Live leases applied via [`Self::lease_apply`] can
452    /// elevate the effective level at runtime; the configured floor is the
453    /// lower bound the worker falls back to when no lease is active.
454    ///
455    /// See [`MetricDetailLevel`] and [`filter_labels_for_detail`] for the
456    /// per-level filtering rules.
457    pub fn set_up_detail(&mut self, detail: MetricDetailLevel) {
458        self.configured = detail;
459        self.recompute_effective();
460    }
461
462    /// Returns the static (configured) cardinality floor. Independent of
463    /// runtime leases.
464    pub fn detail_configured(&self) -> MetricDetailLevel {
465        self.configured
466    }
467
468    /// Returns the cardinality level actually applied to emissions. Equal to
469    /// `max(configured, max(active leases))`.
470    pub fn detail_effective(&self) -> MetricDetailLevel {
471        self.effective
472    }
473
474    /// Apply or renew a runtime cardinality lease for `client_id`. If a lease
475    /// for the same client already exists it is REPLACED (used for renewals).
476    /// `ttl` values above [`LEASE_TTL_MAX`] are **rejected** with
477    /// [`LeaseApplyOutcome::TtlOutOfRange`] (NOT clamped); callers must
478    /// handle that arm or pre-validate the TTL. On success the call returns
479    /// `(previous_effective, new_effective)` so callers can decide whether
480    /// to emit a `MetricDetailChanged` audit event.
481    ///
482    /// The proto contract on `SetMetricDetail.ttl_seconds` is that the worker
483    /// **rejects** out-of-range values with a `FAILURE` response — that
484    /// enforcement lives at the dispatch site in `lib/src/server.rs::notify`.
485    /// `lease_apply` itself returns [`LeaseApplyOutcome::TtlOutOfRange`]
486    /// when called with `ttl > LEASE_TTL_MAX` so callers that bypass the
487    /// dispatch site (proto fuzzing, future internal use) see a loud
488    /// rejection instead of silently capped semantics. Same shape for
489    /// over-long `client_id` and a full lease table — see
490    /// [`LeaseApplyOutcome`] for the failure arms.
491    pub fn lease_apply(
492        &mut self,
493        client_id: String,
494        level: MetricDetailLevel,
495        ttl: Duration,
496        binding: PeerBinding,
497    ) -> LeaseApplyOutcome {
498        if client_id.len() > LEASE_CLIENT_ID_MAX_BYTES {
499            return LeaseApplyOutcome::ClientIdTooLong;
500        }
501        if ttl > LEASE_TTL_MAX {
502            return LeaseApplyOutcome::TtlOutOfRange;
503        }
504        // Cap the table size BEFORE the insert, but only when the caller
505        // is inserting a fresh entry. Renewals (same `client_id` already
506        // present) REPLACE the existing entry and therefore keep the
507        // count stable — they must always succeed so an active operator
508        // never loses their lease just because the table is full.
509        let is_renewal = self.leases.contains_key(&client_id);
510        if !is_renewal && self.leases.len() >= LEASE_TABLE_CAP {
511            return LeaseApplyOutcome::TableFull;
512        }
513        // Renewal-binding gate: when a lease already exists for this
514        // `client_id` and its apply-time binding is fully known, the
515        // renewer's presented binding MUST match. Without this check
516        // any same-UID caller that learns the `client_id` (PID from
517        // /proc, suffix from audit log) could re-apply against it,
518        // overwriting the binding to point at the attacker's session
519        // and then driving the victim's Drop-time `clear` into
520        // `Unauthorized`. Unknown apply-time bindings continue to
521        // accept any renewer per the proto contract on
522        // `SetMetricDetail.peer_pid` / `peer_session_ulid`.
523        if is_renewal
524            && let Some(entry) = self.leases.get(&client_id)
525            && entry.binding.is_known()
526            && !entry.binding.matches(&binding)
527        {
528            return LeaseApplyOutcome::Unauthorized;
529        }
530        let expires_at = Instant::now() + ttl;
531        self.leases.insert(
532            client_id,
533            LeaseEntry {
534                level,
535                expires_at,
536                binding,
537            },
538        );
539        let previous_effective = self.effective;
540        self.recompute_effective();
541        LeaseApplyOutcome::Applied {
542            previous_effective,
543            new_effective: self.effective,
544        }
545    }
546
547    /// Explicitly release a lease keyed by `client_id`. The clear is
548    /// authorised against the apply-time [`PeerBinding`] when one was
549    /// recorded — see [`LeaseClearOutcome`] for the three result states.
550    /// A clear request with `presented = PeerBinding::default()` matches
551    /// only leases whose apply-time binding was also unknown, preserving
552    /// compat with pre-binding callers and platforms without
553    /// `SO_PEERCRED`.
554    pub fn lease_clear(&mut self, client_id: &str, presented: PeerBinding) -> LeaseClearOutcome {
555        let Some(entry) = self.leases.get(client_id) else {
556            return LeaseClearOutcome::NotFound;
557        };
558        // If the apply-time binding is fully known we MUST authorise the
559        // clear against it; presenting `default()` (an empty binding) is
560        // a mismatch. If the apply-time binding is unknown (a pre-binding
561        // caller, or a non-Linux peer with no credentials), we permit
562        // any clear — there is nothing to authorise against.
563        if entry.binding.is_known() && !entry.binding.matches(&presented) {
564            return LeaseClearOutcome::Unauthorized;
565        }
566        self.leases.remove(client_id);
567        let previous = self.effective;
568        self.recompute_effective();
569        LeaseClearOutcome::Cleared {
570            previous_effective: previous,
571        }
572    }
573
574    /// Returns the number of active (non-expired-as-of-last-tick) leases.
575    /// Surfaced in `WorkerMetricDetailStatus` so the TUI can show
576    /// "another client is still leasing this level".
577    pub fn lease_count(&self) -> u32 {
578        self.leases.len() as u32
579    }
580
581    /// Polled lease-expiry janitor. Called from the worker's `notify` loop
582    /// (and from periodic timers); cheap when nothing has expired. Returns
583    /// `Some(previous_effective)` when at least one lease expired AND that
584    /// expiry actually moved the effective level (so the caller can emit a
585    /// `MetricDetailChanged` audit event), or `None` for the no-change path.
586    ///
587    /// `now` is parameterised so unit tests can advance the clock
588    /// deterministically without sleeping.
589    pub fn lease_tick(&mut self, now: Instant) -> Option<MetricDetailLevel> {
590        self.last_lease_tick = now;
591        let before = self.leases.len();
592        self.leases.retain(|_, entry| entry.expires_at > now);
593        if self.leases.len() == before {
594            return None;
595        }
596        let previous = self.effective;
597        self.recompute_effective();
598        if previous != self.effective {
599            Some(previous)
600        } else {
601            None
602        }
603    }
604
605    /// True when at least [`LEASE_TICK_INTERVAL`] has passed since the last
606    /// `lease_tick`. Use to gate the polled janitor at the top of `notify`
607    /// without paying a HashMap walk on every event-loop iteration.
608    pub fn lease_tick_due(&self, now: Instant) -> bool {
609        now.duration_since(self.last_lease_tick) >= LEASE_TICK_INTERVAL
610    }
611
612    /// Recompute `effective = max(configured, max(active leases))`. Cheap (one
613    /// linear pass over the lease table) and only called on apply/clear/tick,
614    /// never on the metric-emission hot path.
615    fn recompute_effective(&mut self) {
616        let mut max_lease = self.configured;
617        for entry in self.leases.values() {
618            if entry.level > max_lease {
619                max_lease = entry.level;
620            }
621        }
622        self.effective = max_lease;
623    }
624
625    pub fn socket(&self) -> Option<&UdpSocket> {
626        self.network.as_ref().map(|n| &n.remote.get_ref().socket)
627    }
628
629    pub fn socket_mut(&mut self) -> Option<&mut UdpSocket> {
630        self.network
631            .as_mut()
632            .map(|n| &mut n.remote.get_mut().socket)
633    }
634
635    pub fn count_add(&mut self, key: &'static str, count_value: i64) {
636        self.receive_metric(key, None, None, MetricValue::Count(count_value));
637    }
638
639    pub fn set_gauge(&mut self, key: &'static str, gauge_value: usize) {
640        self.receive_metric(key, None, None, MetricValue::Gauge(gauge_value));
641    }
642
643    pub fn gauge_add(&mut self, key: &'static str, gauge_value: i64) {
644        self.receive_metric(key, None, None, MetricValue::GaugeAdd(gauge_value));
645    }
646
647    pub fn writable(&mut self) {
648        if let Some(ref mut net) = self.network.as_mut() {
649            net.writable();
650        }
651    }
652
653    pub fn send_data(&mut self) {
654        if let Some(ref mut net) = self.network.as_mut() {
655            net.send_metrics();
656        }
657    }
658
659    pub fn dump_local_proxy_metrics(&mut self) -> BTreeMap<String, FilteredMetrics> {
660        self.local.dump_proxy_metrics(&Vec::new())
661    }
662
663    pub fn query(&mut self, q: &QueryMetricsOptions) -> Result<ResponseContent, MetricError> {
664        self.local.query(q)
665    }
666
667    pub fn clear_local(&mut self) {
668        if let Some(ref mut net) = self.network.as_mut() {
669            net.clear();
670        }
671        self.local.clear();
672    }
673
674    pub fn configure(&mut self, config: &MetricsConfiguration) {
675        self.local.configure(config);
676    }
677
678    /// Drop all metric storage for a cluster across BOTH drains and arm
679    /// the per-drain tombstone so subsequent emissions for the cluster are
680    /// dropped on the floor instead of resurrecting the row via
681    /// `entry().or_default()`. Called from the worker IPC dispatch on
682    /// [`RequestType::RemoveCluster`]. Network-side draining the queued
683    /// `MetricLine`s produces immediate silence on the wire (any unsent
684    /// statsd interval for the cluster is discarded).
685    ///
686    /// Worker-event-loop only — `METRICS` is a `thread_local!`, and the
687    /// mutable borrow taken here must not be re-entered from a session-
688    /// bound code path. Calling this from a metrics emission site would
689    /// deadlock the thread-local on `borrow_mut`.
690    pub fn remove_cluster(&mut self, cluster_id: &str) {
691        if let Some(ref mut net) = self.network.as_mut() {
692            net.remove_cluster(cluster_id);
693        }
694        self.local.remove_cluster(cluster_id);
695    }
696
697    /// Re-arm a previously-removed cluster id across BOTH drains. Called
698    /// from the worker IPC dispatch on [`RequestType::AddCluster`].
699    /// Idempotent on ids that were never removed. Without this hook a
700    /// cluster removed then re-added would stay tombstoned forever and
701    /// every fresh metric emission would be dropped.
702    pub fn add_cluster(&mut self, cluster_id: &str) {
703        if let Some(ref mut net) = self.network.as_mut() {
704            net.add_cluster(cluster_id);
705        }
706        self.local.add_cluster(cluster_id);
707    }
708
709    /// Drop all metric storage for one backend across BOTH drains. Called
710    /// from the worker IPC dispatch on [`RequestType::RemoveBackend`].
711    /// Does NOT tombstone the cluster (only `remove_cluster` does).
712    pub fn remove_backend(&mut self, cluster_id: &str, backend_id: &str) {
713        if let Some(ref mut net) = self.network.as_mut() {
714            net.remove_backend(cluster_id, backend_id);
715        }
716        self.local.remove_backend(cluster_id, backend_id);
717    }
718}
719
720impl Subscriber for Aggregator {
721    fn receive_metric(
722        &mut self,
723        label: &'static str,
724        cluster_id: Option<&str>,
725        backend_id: Option<&str>,
726        metric: MetricValue,
727    ) {
728        // Apply the cardinality knob BEFORE handing the metric to either
729        // drain. Both drains see the same filtered labels, keeping the local
730        // CLI view consistent with what statsd receives on the wire. Reads
731        // `effective` (max of the configured floor and any active leases),
732        // which is recomputed off the hot path.
733        let (cluster_id, backend_id) =
734            filter_labels_for_detail(self.effective, cluster_id, backend_id);
735        if let Some(ref mut net) = self.network.as_mut() {
736            net.receive_metric(label, cluster_id, backend_id, metric.to_owned());
737        }
738        self.local
739            .receive_metric(label, cluster_id, backend_id, metric);
740    }
741}
742
743pub struct MetricSocket {
744    pub addr: SocketAddr,
745    pub socket: UdpSocket,
746}
747
748impl Write for MetricSocket {
749    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
750        self.socket.send_to(buf, self.addr)
751    }
752
753    fn flush(&mut self) -> io::Result<()> {
754        Ok(())
755    }
756}
757
758pub fn udp_bind() -> Result<UdpSocket, MetricError> {
759    let address = "0.0.0.0:0";
760
761    let udp_address =
762        address
763            .parse::<SocketAddr>()
764            .map_err(|parse_error| MetricError::WrongUdpAddress {
765                address: address.to_owned(),
766                error: parse_error.to_string(),
767            })?;
768
769    UdpSocket::bind(udp_address).map_err(|parse_error| MetricError::UdpBind {
770        address: udp_address.to_string(),
771        error: parse_error.to_string(),
772    })
773}
774
/// Add `$value` to the unlabelled counter `$key` on the thread-local
/// `METRICS` (via its `count_add`).
///
/// `$value` is evaluated once, before `METRICS` is borrowed.
#[macro_export]
macro_rules! count {
    ($key:expr, $value:expr) => {{
        let delta = $value;
        $crate::metrics::METRICS.with(|metrics| {
            metrics.borrow_mut().count_add($key, delta);
        });
    }};
}
785
/// Add 1 to a counter.
///
/// - `incr!(key)`: unlabelled; expands to `count!(key, 1)`.
/// - `incr!(key, cluster_id, backend_id)`: sends `MetricValue::Count(1)`
///   through `Subscriber::receive_metric` on the thread-local `METRICS`,
///   with the two `Option<&str>` labels.
///
/// The one-argument form reaches `count!` through `$crate`. It therefore
/// also expands correctly in crates that invoke `incr!` by path (e.g.
/// `sozu_lib::incr!`) without having `count!` imported.
#[macro_export]
macro_rules! incr (
  ($key:expr) => ($crate::count!($key, 1));
  ($key:expr, $cluster_id:expr, $backend_id:expr) => {
    {
        use $crate::metrics::Subscriber;

        $crate::metrics::METRICS.with(|metrics| {
          (*metrics.borrow_mut()).receive_metric($key, $cluster_id, $backend_id, $crate::metrics::MetricValue::Count(1));
        });
    }
  }
);
800
/// Subtract 1 from the unlabelled counter `$key` (expands to
/// `count!(key, -1)`).
///
/// `count!` is reached through `$crate`, so callers in other crates do not
/// need it in scope.
#[macro_export]
macro_rules! decr (
  ($key:expr) => ($crate::count!($key, -1))
);
805
/// Set a gauge.
///
/// - `gauge!(key, value)`: sets the unlabelled gauge through the
///   thread-local `METRICS`' `set_gauge`.
/// - `gauge!(key, value, cluster_id, backend_id)`: sends
///   `MetricValue::Gauge(value as usize)` through
///   `Subscriber::receive_metric` with the two optional labels.
///
/// `value` is evaluated once, before `METRICS` is borrowed.
#[macro_export]
macro_rules! gauge {
    ($key:expr, $value:expr) => {{
        let level = $value;
        $crate::metrics::METRICS.with(|metrics| {
            metrics.borrow_mut().set_gauge($key, level);
        });
    }};
    ($key:expr, $value:expr, $cluster_id:expr, $backend_id:expr) => {{
        use $crate::metrics::Subscriber;
        let level = $value;
        $crate::metrics::METRICS.with(|metrics| {
            metrics.borrow_mut().receive_metric(
                $key,
                $cluster_id,
                $backend_id,
                $crate::metrics::MetricValue::Gauge(level as usize),
            );
        });
    }};
}
825
/// Apply a delta to a gauge.
///
/// - `gauge_add!(key, delta)`: updates the unlabelled gauge through the
///   thread-local `METRICS`' `gauge_add`.
/// - `gauge_add!(key, delta, cluster_id, backend_id)`: sends
///   `MetricValue::GaugeAdd(delta)` through `Subscriber::receive_metric`
///   with the two optional labels.
///
/// `delta` is evaluated once, before `METRICS` is borrowed.
#[macro_export]
macro_rules! gauge_add {
    ($key:expr, $value:expr) => {{
        let delta = $value;
        $crate::metrics::METRICS.with(|metrics| {
            metrics.borrow_mut().gauge_add($key, delta);
        });
    }};
    ($key:expr, $value:expr, $cluster_id:expr, $backend_id:expr) => {{
        use $crate::metrics::Subscriber;
        let delta = $value;
        $crate::metrics::METRICS.with(|metrics| {
            metrics.borrow_mut().receive_metric(
                $key,
                $cluster_id,
                $backend_id,
                $crate::metrics::MetricValue::GaugeAdd(delta),
            );
        });
    }};
}
845
/// Record a timing sample (`MetricValue::Time`) under `key`.
///
/// - `time!(key, value)`: no labels.
/// - `time!(key, cluster_id, value)`: labelled with `cluster_id` (a `&str`);
///   the backend label is always absent.
///
/// `value` is cast with `as usize` and evaluated once, before the
/// thread-local `METRICS` is borrowed.
#[macro_export]
macro_rules! time {
    ($key:expr, $value:expr) => {{
        use $crate::metrics::{MetricValue, Subscriber};
        let elapsed = $value;
        $crate::metrics::METRICS.with(|metrics| {
            let aggregator = &mut *metrics.borrow_mut();
            aggregator.receive_metric($key, None, None, MetricValue::Time(elapsed as usize));
        });
    }};
    ($key:expr, $cluster_id:expr, $value:expr) => {{
        use $crate::metrics::{MetricValue, Subscriber};
        let elapsed = $value;
        $crate::metrics::METRICS.with(|metrics| {
            let aggregator = &mut *metrics.borrow_mut();
            let cluster: &str = $cluster_id;
            aggregator.receive_metric(
                $key,
                Some(cluster),
                None,
                MetricValue::Time(elapsed as usize),
            );
        });
    }};
}
868
/// Record the per-request backend metrics for one `(cluster_id, backend_id)`
/// pair: bytes in, bytes out, response time, optional connection time, and
/// one request.
///
/// Arguments, in order:
/// - `$cluster_id`, `$backend_id`: `&str` labels (both sent as `Some(..)`);
/// - `$response_time`: cast with `as usize` into `MetricValue::Time`;
/// - `$backend_connection_time`: an `Option` whose payload has `as_millis()`
///   (e.g. `Option<Duration>`); `None` skips the connection-time sample;
/// - `$bin`, `$bout`: byte counts, cast with `as i64` into `MetricValue::Count`.
///
/// The expansion is a single block. The macro can therefore be invoked
/// several times in one scope, or in expression position, without leaking
/// its `use` into the caller. Every argument is evaluated exactly once,
/// before the thread-local `METRICS` is borrowed, in the same way as
/// `count!`/`gauge!`/`time!`.
#[macro_export]
macro_rules! record_backend_metrics (
  ($cluster_id:expr, $backend_id:expr, $response_time: expr, $backend_connection_time: expr, $bin: expr, $bout: expr) => {{
    use $crate::metrics::{MetricValue, Subscriber};
    let cluster_id: &str = $cluster_id;
    let backend_id: &str = $backend_id;
    let response_time = $response_time;
    let connection_time = $backend_connection_time;
    let bytes_in = $bin;
    let bytes_out = $bout;

    $crate::metrics::METRICS.with(|metrics| {
      let m = &mut *metrics.borrow_mut();

      m.receive_metric($crate::metrics::names::backend::BYTES_IN, Some(cluster_id), Some(backend_id), MetricValue::Count(bytes_in as i64));
      m.receive_metric($crate::metrics::names::backend::BYTES_OUT, Some(cluster_id), Some(backend_id), MetricValue::Count(bytes_out as i64));
      m.receive_metric($crate::metrics::names::backend::RESPONSE_TIME, Some(cluster_id), Some(backend_id), MetricValue::Time(response_time as usize));
      if let Some(t) = connection_time {
        m.receive_metric($crate::metrics::names::backend::CONNECTION_TIME, Some(cluster_id), Some(backend_id), MetricValue::Time(t.as_millis() as usize));
      }

      m.receive_metric($crate::metrics::names::backend::REQUESTS, Some(cluster_id), Some(backend_id), MetricValue::Count(1));
    });
  }}
);
889
#[cfg(test)]
mod tests {
    // Unit tests for the label filter and for the aggregator's lease table
    // (apply / renew / clear / tick and the effective detail level).
    use super::*;

    #[test]
    fn filter_labels_process_drops_both() {
        assert_eq!(
            filter_labels_for_detail(MetricDetailLevel::Process, Some("c"), Some("b")),
            (None, None),
        );
    }

    #[test]
    fn filter_labels_frontend_drops_both_today() {
        // Reserved for per-listener counters; same as Process for now.
        assert_eq!(
            filter_labels_for_detail(MetricDetailLevel::Frontend, Some("c"), Some("b")),
            (None, None),
        );
    }

    #[test]
    fn filter_labels_cluster_keeps_cluster_drops_backend() {
        assert_eq!(
            filter_labels_for_detail(MetricDetailLevel::Cluster, Some("c"), Some("b")),
            (Some("c"), None),
        );
    }

    #[test]
    fn filter_labels_backend_keeps_both() {
        assert_eq!(
            filter_labels_for_detail(MetricDetailLevel::Backend, Some("c"), Some("b")),
            (Some("c"), Some("b")),
        );
    }

    #[test]
    fn filter_labels_none_in_none_out() {
        // Absent labels stay absent regardless of detail level — the filter
        // never invents a label, only drops.
        for detail in [
            MetricDetailLevel::Process,
            MetricDetailLevel::Frontend,
            MetricDetailLevel::Cluster,
            MetricDetailLevel::Backend,
        ] {
            assert_eq!(filter_labels_for_detail(detail, None, None), (None, None));
        }
    }

    #[test]
    fn aggregator_default_detail_is_cluster() {
        // Pre-knob behaviour preserved: if a worker / process never calls
        // `set_up_detail`, cluster-scoped metrics still reach the drains.
        let agg = Aggregator::new("sozu".to_owned());
        assert_eq!(agg.detail_configured(), MetricDetailLevel::Cluster);
        assert_eq!(agg.detail_effective(), MetricDetailLevel::Cluster);
        assert_eq!(agg.lease_count(), 0);
    }

    /// Fully-known binding used by tests that don't otherwise care about the
    /// peer-binding mechanic. Together with `other_binding` (a second,
    /// distinct fully-known binding) it lets the `lease_clear` tests assert
    /// authorised vs unauthorised paths.
    fn owner_binding() -> PeerBinding {
        PeerBinding {
            pid: Some(1234),
            session_ulid: Some(0x0123_4567_89ab_cdef_0123_4567_89ab_cdefu128),
        }
    }

    /// Second fully-known binding, differing from `owner_binding` in both
    /// `pid` and `session_ulid`; stands in for a foreign caller.
    fn other_binding() -> PeerBinding {
        PeerBinding {
            pid: Some(5678),
            session_ulid: Some(0xfedc_ba98_7654_3210_fedc_ba98_7654_3210u128),
        }
    }

    /// Extract `(previous_effective, new_effective)` from a successful
    /// `lease_apply`; panics on any failure arm so tests that don't care
    /// about the failure paths stay compact.
    fn unwrap_applied(outcome: LeaseApplyOutcome) -> (MetricDetailLevel, MetricDetailLevel) {
        match outcome {
            LeaseApplyOutcome::Applied {
                previous_effective,
                new_effective,
            } => (previous_effective, new_effective),
            other => panic!("expected LeaseApplyOutcome::Applied, got {other:?}"),
        }
    }

    #[test]
    fn lease_apply_elevates_effective_above_configured() {
        // Configured floor stays at Cluster; a lease for Backend lifts the
        // effective level until the lease expires or is cleared.
        let mut agg = Aggregator::new("sozu".to_owned());
        agg.set_up_detail(MetricDetailLevel::Cluster);
        let (prev, new) = unwrap_applied(agg.lease_apply(
            "test:1".to_owned(),
            MetricDetailLevel::Backend,
            Duration::from_secs(60),
            PeerBinding::default(),
        ));
        assert_eq!(prev, MetricDetailLevel::Cluster);
        assert_eq!(new, MetricDetailLevel::Backend);
        assert_eq!(agg.detail_configured(), MetricDetailLevel::Cluster);
        assert_eq!(agg.detail_effective(), MetricDetailLevel::Backend);
        assert_eq!(agg.lease_count(), 1);
    }

    #[test]
    fn lease_apply_below_configured_does_not_lower_effective() {
        // A lease can only ELEVATE the floor, never push below `configured`.
        let mut agg = Aggregator::new("sozu".to_owned());
        agg.set_up_detail(MetricDetailLevel::Backend);
        let (prev, new) = unwrap_applied(agg.lease_apply(
            "test:1".to_owned(),
            MetricDetailLevel::Cluster,
            Duration::from_secs(60),
            PeerBinding::default(),
        ));
        assert_eq!(prev, MetricDetailLevel::Backend);
        assert_eq!(new, MetricDetailLevel::Backend);
    }

    #[test]
    fn lease_apply_rejects_client_id_over_cap() {
        // Defence-in-depth: even if dispatch lets a too-long id through,
        // the aggregator refuses to store it.
        let mut agg = Aggregator::new("sozu".to_owned());
        let too_long = "x".repeat(LEASE_CLIENT_ID_MAX_BYTES + 1);
        assert_eq!(
            agg.lease_apply(
                too_long,
                MetricDetailLevel::Backend,
                Duration::from_secs(60),
                PeerBinding::default(),
            ),
            LeaseApplyOutcome::ClientIdTooLong
        );
        assert_eq!(agg.lease_count(), 0);
    }

    #[test]
    fn lease_apply_rejects_when_table_is_full() {
        // Fill the table to capacity with distinct client_ids; one more
        // insert is refused. A RENEWAL of an existing entry must still
        // succeed (replaces in place, count unchanged).
        let mut agg = Aggregator::new("sozu".to_owned());
        for i in 0..LEASE_TABLE_CAP {
            assert!(matches!(
                agg.lease_apply(
                    format!("client:{i:02}"),
                    MetricDetailLevel::Backend,
                    Duration::from_secs(60),
                    PeerBinding::default(),
                ),
                LeaseApplyOutcome::Applied { .. }
            ));
        }
        assert_eq!(agg.lease_count() as usize, LEASE_TABLE_CAP);
        // New distinct client → rejected.
        assert_eq!(
            agg.lease_apply(
                "newcomer".to_owned(),
                MetricDetailLevel::Backend,
                Duration::from_secs(60),
                PeerBinding::default(),
            ),
            LeaseApplyOutcome::TableFull,
        );
        assert_eq!(agg.lease_count() as usize, LEASE_TABLE_CAP);
        // Renewal of an existing entry → still accepted.
        assert!(matches!(
            agg.lease_apply(
                "client:00".to_owned(),
                MetricDetailLevel::Backend,
                Duration::from_secs(120),
                PeerBinding::default(),
            ),
            LeaseApplyOutcome::Applied { .. }
        ));
        assert_eq!(agg.lease_count() as usize, LEASE_TABLE_CAP);
    }

    #[test]
    fn lease_apply_rejects_ttl_over_max() {
        // An oversize TTL is rejected with `TtlOutOfRange` instead of being
        // silently clamped, and nothing is stored.
        let mut agg = Aggregator::new("sozu".to_owned());
        assert_eq!(
            agg.lease_apply(
                "client:0".to_owned(),
                MetricDetailLevel::Backend,
                LEASE_TTL_MAX + Duration::from_secs(1),
                PeerBinding::default(),
            ),
            LeaseApplyOutcome::TtlOutOfRange,
        );
        assert_eq!(agg.lease_count(), 0);
    }

    #[test]
    fn lease_apply_renewal_replaces_previous_for_same_client() {
        // The renewer client re-sends with the same `client_id`; the entry
        // is REPLACED (not duplicated). Lease count stays at 1.
        // Unknown bindings on both sides skip the renewal-binding gate.
        let mut agg = Aggregator::new("sozu".to_owned());
        let _ = agg.lease_apply(
            "renewer".to_owned(),
            MetricDetailLevel::Backend,
            Duration::from_secs(30),
            PeerBinding::default(),
        );
        let _ = agg.lease_apply(
            "renewer".to_owned(),
            MetricDetailLevel::Backend,
            Duration::from_secs(60),
            PeerBinding::default(),
        );
        assert_eq!(agg.lease_count(), 1);
    }

    #[test]
    fn lease_apply_renewal_rejects_foreign_binding() {
        // Same-UID `client_id` collision attack: the victim applies with
        // a known binding; an attacker that learns the `client_id`
        // attempts to renew under a different (pid, session_ulid). The
        // renewal must be refused so the victim's apply-time binding
        // remains the sole authoritative owner — both for subsequent
        // renewals AND for the victim's own Drop-time `clear`.
        let mut agg = Aggregator::new("sozu".to_owned());
        let victim = PeerBinding {
            pid: Some(4242),
            session_ulid: Some(0x0123_4567_89AB_CDEF_FEDC_BA98_7654_3210),
        };
        let outcome = agg.lease_apply(
            "topcli".to_owned(),
            MetricDetailLevel::Backend,
            Duration::from_secs(60),
            victim,
        );
        assert!(
            matches!(outcome, LeaseApplyOutcome::Applied { .. }),
            "victim's initial apply must succeed"
        );
        let attacker = PeerBinding {
            pid: Some(9999),
            session_ulid: Some(0xDEAD_BEEF_DEAD_BEEF_DEAD_BEEF_DEAD_BEEF),
        };
        let outcome = agg.lease_apply(
            "topcli".to_owned(),
            MetricDetailLevel::Backend,
            Duration::from_secs(60),
            attacker,
        );
        assert_eq!(
            outcome,
            LeaseApplyOutcome::Unauthorized,
            "renewal with a mismatched known binding must be refused"
        );
        // The victim can still clear their own lease — proof the
        // refused attempt did not corrupt the stored binding.
        let clear = agg.lease_clear("topcli", victim);
        assert!(
            matches!(clear, LeaseClearOutcome::Cleared { .. }),
            "victim's original binding must still clear cleanly after \
             the foreign-binding renewal was refused"
        );
    }

    #[test]
    fn lease_apply_renewal_with_matching_binding_succeeds() {
        // Symmetry case: the legitimate owner re-applies with the same
        // (pid, session_ulid). The renewal must succeed so the TUI's
        // own renewer thread keeps the lease alive across its TTL.
        let mut agg = Aggregator::new("sozu".to_owned());
        let owner = PeerBinding {
            pid: Some(1234),
            session_ulid: Some(0xAAAA_BBBB_CCCC_DDDD_EEEE_FFFF_0000_1111),
        };
        let _ = agg.lease_apply(
            "topcli".to_owned(),
            MetricDetailLevel::Backend,
            Duration::from_secs(30),
            owner,
        );
        let outcome = agg.lease_apply(
            "topcli".to_owned(),
            MetricDetailLevel::Backend,
            Duration::from_secs(60),
            owner,
        );
        assert!(
            matches!(outcome, LeaseApplyOutcome::Applied { .. }),
            "renewal with matching binding must succeed (otherwise the \
             TUI's own renewer thread would be locked out)"
        );
    }

    #[test]
    fn lease_apply_max_merge_two_clients() {
        // Two clients, two levels: effective = max(both leases, configured).
        // Use `Process` as the floor so the Frontend lease is observable
        // after the Backend lease is cleared (otherwise the configured
        // Cluster floor would mask the Frontend lease).
        let mut agg = Aggregator::new("sozu".to_owned());
        agg.set_up_detail(MetricDetailLevel::Process);
        let _ = agg.lease_apply(
            "scraper".to_owned(),
            MetricDetailLevel::Frontend,
            Duration::from_secs(60),
            PeerBinding::default(),
        );
        let _ = agg.lease_apply(
            "topcli".to_owned(),
            MetricDetailLevel::Backend,
            Duration::from_secs(60),
            PeerBinding::default(),
        );
        assert_eq!(agg.detail_effective(), MetricDetailLevel::Backend);
        assert_eq!(agg.lease_count(), 2);
        // Clearing the higher lease drops effective back to the lower one.
        let outcome = agg.lease_clear("topcli", PeerBinding::default());
        assert_eq!(
            outcome,
            LeaseClearOutcome::Cleared {
                previous_effective: MetricDetailLevel::Backend,
            }
        );
        assert_eq!(agg.detail_effective(), MetricDetailLevel::Frontend);
        assert_eq!(agg.lease_count(), 1);
    }

    #[test]
    fn lease_clear_unknown_id_is_silent_noop() {
        // Clearing an id that has no lease reports `NotFound` and leaves
        // other clients' leases (and the effective level) untouched.
        let mut agg = Aggregator::new("sozu".to_owned());
        let _ = agg.lease_apply(
            "real".to_owned(),
            MetricDetailLevel::Backend,
            Duration::from_secs(60),
            PeerBinding::default(),
        );
        assert_eq!(
            agg.lease_clear("ghost", PeerBinding::default()),
            LeaseClearOutcome::NotFound
        );
        assert_eq!(agg.detail_effective(), MetricDetailLevel::Backend);
        assert_eq!(agg.lease_count(), 1);
    }

    #[test]
    fn lease_clear_with_matching_binding_authorised() {
        // Apply with a known binding, clear with the same binding -> Cleared.
        let mut agg = Aggregator::new("sozu".to_owned());
        let _ = agg.lease_apply(
            "owner-lease".to_owned(),
            MetricDetailLevel::Backend,
            Duration::from_secs(60),
            owner_binding(),
        );
        let outcome = agg.lease_clear("owner-lease", owner_binding());
        assert!(matches!(outcome, LeaseClearOutcome::Cleared { .. }));
        assert_eq!(agg.lease_count(), 0);
    }

    #[test]
    fn lease_clear_with_mismatched_binding_is_unauthorized() {
        // Apply with one binding, attempt clear with a different binding ->
        // Unauthorized; lease left intact.
        let mut agg = Aggregator::new("sozu".to_owned());
        let _ = agg.lease_apply(
            "owner-lease".to_owned(),
            MetricDetailLevel::Backend,
            Duration::from_secs(60),
            owner_binding(),
        );
        let outcome = agg.lease_clear("owner-lease", other_binding());
        assert_eq!(outcome, LeaseClearOutcome::Unauthorized);
        assert_eq!(agg.lease_count(), 1);
    }

    #[test]
    fn lease_clear_unknown_apply_binding_accepts_any_clear() {
        // Pre-binding / non-Linux apply -> any clear authorised.
        let mut agg = Aggregator::new("sozu".to_owned());
        let _ = agg.lease_apply(
            "legacy".to_owned(),
            MetricDetailLevel::Backend,
            Duration::from_secs(60),
            PeerBinding::default(),
        );
        let outcome = agg.lease_clear("legacy", owner_binding());
        assert!(matches!(outcome, LeaseClearOutcome::Cleared { .. }));
        assert_eq!(agg.lease_count(), 0);
    }

    #[test]
    fn lease_clear_known_apply_rejects_default_clear() {
        // Known apply binding -> a default ("unknown") clear is rejected.
        let mut agg = Aggregator::new("sozu".to_owned());
        let _ = agg.lease_apply(
            "owner-lease".to_owned(),
            MetricDetailLevel::Backend,
            Duration::from_secs(60),
            owner_binding(),
        );
        let outcome = agg.lease_clear("owner-lease", PeerBinding::default());
        assert_eq!(outcome, LeaseClearOutcome::Unauthorized);
    }

    #[test]
    fn lease_tick_expires_only_past_due_leases() {
        // `lease_tick(now)` parameterises the clock so we can test expiry
        // without sleeping. Setup: one lease past due, one still active.
        // Use `Process` as the floor so the surviving Frontend lease drives
        // the effective level after the Backend lease expires (otherwise
        // the Cluster floor would mask it).
        let mut agg = Aggregator::new("sozu".to_owned());
        agg.set_up_detail(MetricDetailLevel::Process);
        let now = Instant::now();
        // Inject directly into the table to control expires_at deterministically.
        agg.leases.insert(
            "expired".to_owned(),
            LeaseEntry {
                level: MetricDetailLevel::Backend,
                expires_at: now - Duration::from_secs(1),
                binding: PeerBinding::default(),
            },
        );
        agg.leases.insert(
            "live".to_owned(),
            LeaseEntry {
                level: MetricDetailLevel::Frontend,
                expires_at: now + Duration::from_secs(60),
                binding: PeerBinding::default(),
            },
        );
        agg.recompute_effective();
        assert_eq!(agg.detail_effective(), MetricDetailLevel::Backend);
        let prev = agg.lease_tick(now);
        assert_eq!(prev, Some(MetricDetailLevel::Backend));
        assert_eq!(agg.detail_effective(), MetricDetailLevel::Frontend);
        assert_eq!(agg.lease_count(), 1);
    }

    #[test]
    fn lease_tick_no_change_returns_none() {
        // No leases -> no-op tick, no audit signal.
        let mut agg = Aggregator::new("sozu".to_owned());
        assert!(agg.lease_tick(Instant::now()).is_none());
    }

    #[test]
    fn lease_apply_at_max_ttl_succeeds() {
        // Boundary: exactly LEASE_TTL_MAX is allowed; LEASE_TTL_MAX + 1s is
        // rejected (covered by lease_apply_rejects_ttl_over_max above).
        let mut agg = Aggregator::new("sozu".to_owned());
        let now = Instant::now();
        let outcome = agg.lease_apply(
            "max".to_owned(),
            MetricDetailLevel::Backend,
            LEASE_TTL_MAX,
            PeerBinding::default(),
        );
        assert!(matches!(outcome, LeaseApplyOutcome::Applied { .. }));
        let entry = agg.leases.get("max").unwrap();
        // NOTE(review): this only bounds `expires_at` from above (50ms of
        // slack for time elapsed since `now`). A lower bound would also catch
        // a TTL that was silently shortened.
        assert!(entry.expires_at <= now + LEASE_TTL_MAX + Duration::from_millis(50));
    }
}