Skip to main content

sozu_lib/metrics/
mod.rs

//! Metrics façade.
//!
//! Defines the per-thread `Aggregator`, the `incr!`/`count!`/`gauge!`/
//! `gauge_add!`/`time!` macros consumed across the lib + bin crates, the
//! local-vs-network drain dispatch, and the label allow/deny filtering
//! used to keep cardinality bounded. Gauge underflow is treated as a
//! correctness bug (saturating clamp + `error!` log), not a rounding artefact.
8
9mod local_drain;
10pub mod names;
11mod network_drain;
12mod writer;
13
14use std::{
15    cell::RefCell,
16    collections::{BTreeMap, HashMap},
17    io::{self, Write},
18    net::SocketAddr,
19    str,
20    time::{Duration, Instant},
21};
22
23use mio::net::UdpSocket;
24use sozu_command::config::MetricDetailLevel;
25use sozu_command::proto::command::{
26    FilteredMetrics, MetricsConfiguration, QueryMetricsOptions, ResponseContent,
27};
28
29use crate::metrics::{local_drain::LocalDrain, network_drain::NetworkDrain};
30
31/// Filter `(cluster_id, backend_id)` labels at emission time according to the
32/// configured [`MetricDetailLevel`]. Each level is a SUPERSET of the previous
33/// in cardinality terms:
34///
35/// - `Process`: drop both labels — proxy-only counters (smallest keyspace).
36/// - `Frontend`: same as `Process` today; reserved for the per-listener
37///   counters tracked as a follow-up. Listed in the proto + config so
38///   operators can opt in once per-listener labels land.
39/// - `Cluster`: keep `cluster_id`, drop `backend_id` — current default.
40/// - `Backend`: keep both — current historical behaviour.
41///
42/// Pure function so the filter can be unit-tested exhaustively without the
43/// drain machinery in scope.
44fn filter_labels_for_detail<'a>(
45    detail: MetricDetailLevel,
46    cluster_id: Option<&'a str>,
47    backend_id: Option<&'a str>,
48) -> (Option<&'a str>, Option<&'a str>) {
49    let (out_cluster, out_backend) = match detail {
50        MetricDetailLevel::Process | MetricDetailLevel::Frontend => (None, None),
51        MetricDetailLevel::Cluster => (cluster_id, None),
52        MetricDetailLevel::Backend => (cluster_id, backend_id),
53    };
54    // The filter only ever DROPS labels: an absent input can never become a
55    // present output, and a kept output must be the exact same borrow we
56    // were handed (no synthesised label). This is the cardinality-safety
57    // post-condition that keeps the local CLI view and the wire consistent.
58    debug_assert!(
59        cluster_id.is_some() || out_cluster.is_none(),
60        "filter must never invent a cluster_id"
61    );
62    debug_assert!(
63        backend_id.is_some() || out_backend.is_none(),
64        "filter must never invent a backend_id"
65    );
66    debug_assert!(
67        out_backend.is_none() || out_cluster.is_some(),
68        "a kept backend label implies a kept cluster label (no orphan backend)"
69    );
70    (out_cluster, out_backend)
71}
72
/// Map a numeric HTTP status code to its dedicated counter name, if any.
///
/// Returns `Some("http.status.<code>")` for the eighteen status codes Sōzu
/// either generates as a default answer or that operators routinely chart
/// (`200/201/204`, `301/302/304`, `400/401/403/404/408/413/429`, plus the
/// `5xx` family Sōzu can synthesise — `500/502/503/504/507`). All other
/// codes return `None` so the bucket counter (`http.status.{1xx,…,other}`)
/// remains the sole emission and metric cardinality stays bounded.
///
/// Hoisted out of the protocol modules so H1 (`kawa_h1::save_http_status_metric`)
/// and H2 (`mux::stream::generate_access_log`) cannot drift on which codes
/// get a per-code counter.
pub(crate) fn http_status_code_metric_name(status: u16) -> Option<&'static str> {
    // Single source of truth: every code that owns a per-code counter,
    // paired with that counter's name.
    const DEDICATED_COUNTERS: &[(u16, &str)] = &[
        (200, "http.status.200"),
        (201, "http.status.201"),
        (204, "http.status.204"),
        (301, "http.status.301"),
        (302, "http.status.302"),
        (304, "http.status.304"),
        (400, "http.status.400"),
        (401, "http.status.401"),
        (403, "http.status.403"),
        (404, "http.status.404"),
        (408, "http.status.408"),
        (413, "http.status.413"),
        (429, "http.status.429"),
        (500, "http.status.500"),
        (502, "http.status.502"),
        (503, "http.status.503"),
        (504, "http.status.504"),
        (507, "http.status.507"),
    ];

    DEDICATED_COUNTERS
        .iter()
        .find(|(code, _)| *code == status)
        .map(|&(_, name)| name)
}
108
thread_local! {
  /// Per-thread metrics aggregator, created with the `"sozu"` prefix. The
  /// module-level `setup` function configures this instance in place.
  pub static METRICS: RefCell<Aggregator> = RefCell::new(Aggregator::new(String::from("sozu")));
}
112
/// Errors surfaced by the metrics module.
#[derive(thiserror::Error, Debug)]
pub enum MetricError {
    /// A UDP address could not be parsed; `error` carries the parser message.
    #[error("Could not parse udp address {address}: {error}")]
    WrongUdpAddress { address: String, error: String },
    /// Binding a UDP socket to `address` failed.
    #[error("Could not bind to udp address {address}: {error}")]
    UdpBind { address: String, error: String },
    /// No metrics are recorded for the object with the given id.
    #[error("No metrics found for object with id {0}")]
    NoMetrics(String),
    /// A histogram could not be created for the given time metric.
    #[error("Could not create histogram for time metric {time_metric:?}: {error}")]
    HistogramCreation {
        time_metric: MetricValue,
        error: String,
    },
    /// A time metric sample could not be recorded.
    #[error("could not record time metric {time_metric:?}: {error}")]
    TimeMetricRecordingError {
        time_metric: MetricValue,
        error: String,
    },
}
132
133#[derive(Debug, Clone, PartialEq, Eq)]
134pub enum MetricValue {
135    Gauge(usize),
136    GaugeAdd(i64),
137    Count(i64),
138    Time(usize),
139}
140
141impl MetricValue {
142    fn is_time(&self) -> bool {
143        matches!(self, &MetricValue::Time(_))
144    }
145
146    fn update(&mut self, key: &'static str, m: MetricValue) -> bool {
147        match (self, m) {
148            (&mut MetricValue::Gauge(ref mut v1), MetricValue::Gauge(v2)) => {
149                // `changed` reflects the absolute set: true iff the stored
150                // value actually moves. Pair-assert the post-state matches
151                // the requested value and that `changed` is the equality
152                // relationship between before and after.
153                let before = *v1;
154                let changed = *v1 != v2;
155                *v1 = v2;
156                debug_assert_eq!(*v1, v2, "gauge set must store exactly the requested value");
157                debug_assert_eq!(
158                    changed,
159                    before != v2,
160                    "`changed` must report whether the gauge actually moved"
161                );
162                changed
163            }
164            (&mut MetricValue::Gauge(ref mut v1), MetricValue::GaugeAdd(v2)) => {
165                // Saturating clamp + `error!` log on underflow, symmetric
166                // with `AggregatedMetric::update` in `local_drain.rs`. The
167                // `debug_assert!` that lived here previously made debug
168                // builds panic while release silently clamped; H2
169                // `ConnectionH2::Drop` rebalance + the saturating clamps
170                // make underflow structurally impossible from live code,
171                // and a panic on a metric mismatch is too aggressive — the
172                // log line names the offending key for observability.
173                //
174                // INVARIANT (the canonical gauge-underflow guard): the post
175                // value is non-negative, equals `before + v2` on the
176                // non-underflow path, and is exactly the `0` floor whenever
177                // `before + v2` would have gone negative. We assert the
178                // *post-condition* (never panic the recovery path) rather
179                // than a `before >= -v2` pre-condition, because a real
180                // underflow is recovered (clamp + log) by design.
181                let before = *v1;
182                let changed = v2 != 0;
183                let res = before as i64 + v2;
184                *v1 = if res >= 0 {
185                    res as usize
186                } else {
187                    error!(
188                        "metric {} underflow: previous value: {}, adding: {}",
189                        key, before, v2
190                    );
191                    0
192                };
193                debug_assert!(
194                    res >= 0 || *v1 == 0,
195                    "gauge underflow must clamp to exactly 0 (never wrap)"
196                );
197                debug_assert!(
198                    res < 0 || *v1 as i64 == before as i64 + v2,
199                    "non-underflow gauge add must equal before + v2 exactly"
200                );
201
202                changed
203            }
204            (&mut MetricValue::Count(ref mut v1), MetricValue::Count(v2)) => {
205                // Counter delta: `v2` may be negative (e.g. `decr!`), so the
206                // monotonic-between-resets invariant is encoded as the exact
207                // delta — the post value equals the pre value plus `v2`.
208                let before = *v1;
209                let changed = v2 != 0;
210                *v1 += v2;
211                debug_assert_eq!(*v1, before + v2, "count update must advance by exactly v2");
212                changed
213            }
214            (s, m) => {
215                // Symmetric with `AggregatedMetric::update` in `local_drain.rs`:
216                // a type-mismatch is a coding bug (a `metric_name` was
217                // re-emitted with a different `MetricValue` variant), not a
218                // network-driven event, but log-and-continue is still the
219                // right policy — taking the worker process down on a metric
220                // bookkeeping mismatch is too aggressive for a single-bug
221                // recovery path. The log line names the offending key.
222                error!(
223                    "tried to update metric {} of value {:?} with an incompatible metric: {:?}",
224                    key, s, m
225                );
226                false
227            }
228        }
229    }
230}
231
232#[derive(Debug, Clone)]
233pub struct StoredMetricValue {
234    last_sent: Instant,
235    updated: bool,
236    data: MetricValue,
237}
238
239impl StoredMetricValue {
240    pub fn new(last_sent: Instant, data: MetricValue) -> StoredMetricValue {
241        // Symmetric with `AggregatedMetric::new`: a first emission with a
242        // negative `GaugeAdd` is a coding bug (a `-1` ran with no paired
243        // `+1` earlier in the worker's lifetime); clamp to 0 and log so the
244        // offending pattern shows up in operator logs alongside the
245        // local-drain side.
246        let data = if let MetricValue::GaugeAdd(v) = data {
247            if v >= 0 {
248                MetricValue::Gauge(v as usize)
249            } else {
250                error!(
251                    "stored metric created with negative GaugeAdd({}), clamping to 0",
252                    v
253                );
254                MetricValue::Gauge(0)
255            }
256        } else {
257            data
258        };
259        // Post-conditions: a `GaugeAdd` seed never survives as `GaugeAdd`
260        // (it is normalised to a non-negative `Gauge`), and a fresh stored
261        // value is always flagged `updated` so the next drain emits it.
262        debug_assert!(
263            !matches!(data, MetricValue::GaugeAdd(_)),
264            "GaugeAdd must be normalised to a non-negative Gauge at construction"
265        );
266        StoredMetricValue {
267            last_sent,
268            updated: true,
269            data,
270        }
271    }
272
273    pub fn update(&mut self, key: &'static str, m: MetricValue) {
274        // The `updated` flag is a monotone latch within one drain cycle: it
275        // may only go from false→true here (the drain resets it to false on
276        // emit). So once set, it stays set; and if this update reports a
277        // change, the flag must be set afterwards.
278        let was_updated = self.updated;
279        let updated = self.data.update(key, m);
280        if !self.updated {
281            self.updated = updated;
282        }
283        debug_assert!(
284            self.updated || (!was_updated && !updated),
285            "`updated` must stay set once latched; a reported change must latch it"
286        );
287        debug_assert!(
288            !was_updated || self.updated,
289            "`updated` is a monotone latch within a drain cycle (never cleared here)"
290        );
291    }
292}
293
294pub fn setup<O: Into<String>>(
295    metrics_host: &SocketAddr,
296    origin: O,
297    use_tagged_metrics: bool,
298    prefix: Option<String>,
299    detail: MetricDetailLevel,
300) -> Result<(), MetricError> {
301    let metrics_socket = udp_bind()?;
302
303    debug!(
304        "setting up metrics: local address = {:#?}",
305        metrics_socket.local_addr()
306    );
307
308    METRICS.with(|metrics| {
309        if let Some(p) = prefix {
310            (*metrics.borrow_mut()).set_up_prefix(p);
311        }
312        (*metrics.borrow_mut()).set_up_remote(metrics_socket, *metrics_host);
313        (*metrics.borrow_mut()).set_up_origin(origin.into());
314        (*metrics.borrow_mut()).set_up_tagged_metrics(use_tagged_metrics);
315        (*metrics.borrow_mut()).set_up_detail(detail);
316    });
317    Ok(())
318}
319
/// A sink that metric emissions are delivered to.
pub trait Subscriber {
    /// Deliver one emission.
    ///
    /// `label` is the static metric name, `cluster_id` / `backend_id` are
    /// the optional labels attached to the emission, and `metric` is the
    /// sample itself.
    fn receive_metric(
        &mut self,
        label: &'static str,
        cluster_id: Option<&str>,
        backend_id: Option<&str>,
        metric: MetricValue,
    );
}
329
/// How often `lease_tick` actually does work; cheaper than recomputing the
/// effective level on every metric emission. Polled at the top of the worker's
/// `notify` loop, so the cadence floats with traffic but is bounded by this.
const LEASE_TICK_INTERVAL: Duration = Duration::from_secs(5);

/// Hard cap on lease TTL, mirroring the proto comment on `SetMetricDetail`.
/// Bounds the worst-case effect of a stuck renewer.
pub const LEASE_TTL_MAX: Duration = Duration::from_secs(300);

/// Default lease TTL applied when the proto request omits `ttl_seconds` (or
/// passes `0`). Matches the proto comment.
pub const LEASE_TTL_DEFAULT: Duration = Duration::from_secs(60);

/// Hard cap on the number of simultaneous leases held by the aggregator.
/// `lease_apply` rejects new entries (with [`LeaseApplyOutcome::TableFull`])
/// once the table reaches this size. Bounds the lease table's memory and
/// neutralises the CWE-770 vector where a same-UID attacker rolls
/// `client_id` faster than expiry to grow the map unbounded. 64 is well
/// above any plausible TUI fleet (one TUI per operator + a handful of
/// metric scrapers); legitimate callers renewing the same `client_id`
/// REPLACE rather than insert and therefore don't bump the count.
pub const LEASE_TABLE_CAP: usize = 64;

/// Hard cap on `client_id` length accepted by `lease_apply`; longer ids are
/// rejected with [`LeaseApplyOutcome::ClientIdTooLong`]. The TUI
/// uses `top:<pid>:<8 hex chars>` ≤ 24 bytes; 64 leaves headroom for
/// other operator-side scrapers while keeping the lease table's per-
/// entry memory bounded and the audit-log lease_id column small.
pub const LEASE_CLIENT_ID_MAX_BYTES: usize = 64;
358
/// Outcome of [`Aggregator::lease_apply`]. The success arm returns the
/// `(previous_effective, new_effective)` pair the caller can use to
/// decide whether to emit a `MetricDetailChanged` audit event; the
/// failure arms surface bounded-input rejections and binding mismatches.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum LeaseApplyOutcome {
    /// Lease inserted / renewed.
    Applied {
        /// Effective level before the lease was applied.
        previous_effective: MetricDetailLevel,
        /// Effective level after the lease was applied.
        new_effective: MetricDetailLevel,
    },
    /// `client_id` length exceeds [`LEASE_CLIENT_ID_MAX_BYTES`].
    ClientIdTooLong,
    /// Lease table is at [`LEASE_TABLE_CAP`] and the request is a new
    /// insert (not a renewal). Callers MUST surface this as a `FAILURE`
    /// to the wire so the lessor can back off.
    TableFull,
    /// The requested TTL exceeds [`LEASE_TTL_MAX`]. This arm is
    /// theoretically unreachable in the normal flow because the
    /// dispatch site rejects out-of-range values before reaching the
    /// aggregator; surfacing it explicitly catches callers that bypass
    /// dispatch (proto fuzzing, future internal use) instead of
    /// silently clamping their intent.
    TtlOutOfRange,
    /// A renewal request was presented with a [`PeerBinding`] that
    /// disagrees with the existing lease's apply-time binding. Returned
    /// only when the existing binding is fully known (per
    /// [`PeerBinding::is_known`]); unknown-binding leases (no
    /// `SO_PEERCRED` available, pre-binding callers) continue to
    /// accept any renewer. Closes the same-UID `client_id`-collision
    /// takeover where an attacker re-applies a lease against a
    /// victim's id and replaces the binding to lock the victim out of
    /// their own clear.
    Unauthorized,
}
394
/// Master-populated peer binding stored alongside each lease so subsequent
/// `clear` requests can be authorised against the apply-time owner. The
/// binding pairs an OS pid (from `SO_PEERCRED` on Linux, captured by the
/// master at command-socket accept time) with the master-side connection
/// session ULID. A clear request must present BOTH values matching the
/// apply-time binding. A binding of `(None, None)` ("unknown") at apply
/// time means the connection had no peer credentials available — the
/// worker then accepts any clear for that `client_id` to preserve compat
/// with non-Linux callers and intermediate proxies. See the proto comment
/// on `SetMetricDetail.peer_pid` for the trust model.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct PeerBinding {
    /// OS process id of the peer, when known.
    pub pid: Option<i32>,
    /// Session ULID rendered as a `u128` (the master's session anchor).
    /// Stored as the raw u128 rather than the original `Ulid` to avoid
    /// dragging that crate into the metrics-aggregator dependencies.
    pub session_ulid: Option<u128>,
}

impl PeerBinding {
    /// `true` when both halves are known. A fully-known binding is the
    /// only one against which `clear` may be authorised; partial bindings
    /// (one half `None`) degrade to "accept any clear" per the proto
    /// contract on `SetMetricDetail.peer_pid` / `peer_session_ulid`.
    pub fn is_known(&self) -> bool {
        matches!((self.pid, self.session_ulid), (Some(_), Some(_)))
    }

    /// True when `self` is fully known AND `other` carries the same `pid`
    /// and the same `session_ulid`. An unknown or partial `self` never
    /// matches anything, including itself. Used by the lease apply/clear
    /// paths to reject mismatched requests.
    pub fn matches(&self, other: &PeerBinding) -> bool {
        if !self.is_known() {
            return false;
        }
        (self.pid, self.session_ulid) == (other.pid, other.session_ulid)
    }
}
430
/// One lease entry kept inside [`Aggregator::leases`]. Carries the
/// requested cardinality level, the absolute expiry instant, and the
/// master-supplied [`PeerBinding`] captured at apply time.
#[derive(Clone, Copy, Debug)]
struct LeaseEntry {
    /// Detail level requested by the lessor.
    level: MetricDetailLevel,
    /// Instant after which the lease no longer counts; `lease_tick` keeps
    /// only entries whose `expires_at` is strictly later than its `now`.
    expires_at: Instant,
    /// Binding recorded when the lease was applied (or last renewed).
    binding: PeerBinding,
}
440
/// Outcome of [`Aggregator::lease_clear`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum LeaseClearOutcome {
    /// The lease was found, the binding matched (or was unknown at apply
    /// time), and the entry has been removed. Carries the worker's
    /// previous effective level so the caller can decide whether to
    /// emit a `MetricDetailChanged` audit event.
    Cleared {
        /// Effective level as it was before the lease was removed.
        previous_effective: MetricDetailLevel,
    },
    /// No lease was held by the requested `client_id`. Silent no-op.
    NotFound,
    /// A lease existed but the peer binding presented with the clear did
    /// not match the apply-time binding. The lease is left intact. The
    /// worker MUST surface this as a `FAILURE` response to discourage
    /// guessing attacks against another operator's lease.
    Unauthorized,
}
459
/// Per-thread metrics hub: owns the local drain, the optional network
/// drain, and the cardinality (detail level) state including TTL leases.
pub struct Aggregator {
    /// Prefix for metric keys; the thread-local `METRICS` instance is
    /// created with "sozu". Copied into each drain when that drain is built.
    prefix: String,
    /// gathers metrics and sends them on a UDP socket
    network: Option<NetworkDrain>,
    /// gather metrics locally, queried by the CLI
    local: LocalDrain,
    /// Static cardinality knob — set once at boot from `MetricsConfig.detail`.
    /// Filters `(cluster_id, backend_id)` labels at emission time. Each level
    /// is a SUPERSET of the previous one (`Process ⊆ Frontend ⊆ Cluster ⊆ Backend`).
    configured: MetricDetailLevel,
    /// Effective cardinality knob actually applied at emission. Equal to
    /// `max(configured, max(active leases))`. Recomputed only when leases
    /// change, so the hot path (`receive_metric`) reads a single field.
    effective: MetricDetailLevel,
    /// Active TTL leases keyed by `client_id` from `SetMetricDetail`. A live
    /// lease holds the worker's effective level at-or-above its requested
    /// detail until it expires or is explicitly cleared. Multiple clients
    /// (e.g. several `sozu top` sessions) lease independently.
    leases: HashMap<String, LeaseEntry>,
    /// Monotonic (`Instant`) anchor for the polled lease janitor. Updated on
    /// every `lease_tick` call regardless of whether expiry happened, so the
    /// caller's "is it time to tick?" check stays cheap.
    last_lease_tick: Instant,
}
485
486impl Aggregator {
487    pub fn new(prefix: String) -> Aggregator {
488        let default_detail = MetricDetailLevel::default();
489        Aggregator {
490            prefix: prefix.clone(),
491            network: None,
492            local: LocalDrain::new(prefix),
493            configured: default_detail,
494            effective: default_detail,
495            leases: HashMap::new(),
496            last_lease_tick: Instant::now(),
497        }
498    }
499
    /// Replace the stored metric-key prefix.
    ///
    /// Only the `prefix` field changes: drains copy the prefix at the time
    /// they are built, so this must run before `set_up_remote` for the
    /// network drain to pick it up.
    // NOTE(review): the local drain built in `new` keeps the prefix it was
    // created with — confirm that is intended.
    pub fn set_up_prefix(&mut self, prefix: String) {
        self.prefix = prefix;
    }
503
504    pub fn set_up_remote(&mut self, socket: UdpSocket, addr: SocketAddr) {
505        self.network = Some(NetworkDrain::new(self.prefix.clone(), socket, addr));
506    }
507
508    pub fn set_up_origin(&mut self, origin: String) {
509        if let Some(n) = self.network.as_mut() {
510            n.origin = origin;
511        }
512    }
513
514    pub fn set_up_tagged_metrics(&mut self, tagged: bool) {
515        if let Some(n) = self.network.as_mut() {
516            n.use_tagged_metrics = tagged;
517        }
518    }
519
    /// Set the static cardinality floor (`MetricsConfig.detail` from the TOML
    /// configuration). Live leases applied via [`Self::lease_apply`] can
    /// elevate the effective level at runtime; the configured floor is the
    /// lower bound the worker falls back to when no lease is active.
    ///
    /// The effective level is recomputed immediately after the floor is
    /// stored.
    ///
    /// See [`MetricDetailLevel`] and [`filter_labels_for_detail`] for the
    /// per-level filtering rules.
    pub fn set_up_detail(&mut self, detail: MetricDetailLevel) {
        self.configured = detail;
        self.recompute_effective();
        // Whichever direction the floor moved, the recomputed effective
        // level must dominate it; the full invariant set must hold
        // afterwards (checked in debug builds only).
        debug_assert!(
            self.effective >= self.configured,
            "effective must dominate the freshly-set configured floor"
        );
        #[cfg(debug_assertions)]
        self.check_lease_invariants();
    }
539
    /// Returns the static (configured) cardinality floor, as last stored by
    /// `set_up_detail`. Independent of runtime leases.
    pub fn detail_configured(&self) -> MetricDetailLevel {
        self.configured
    }
545
    /// Returns the cardinality level actually applied to emissions. Equal to
    /// `max(configured, max(active leases))`. This reads the cached field;
    /// nothing is recomputed here.
    pub fn detail_effective(&self) -> MetricDetailLevel {
        self.effective
    }
551
552    /// Apply or renew a runtime cardinality lease for `client_id`. If a lease
553    /// for the same client already exists it is REPLACED (used for renewals).
554    /// `ttl` values above [`LEASE_TTL_MAX`] are **rejected** with
555    /// [`LeaseApplyOutcome::TtlOutOfRange`] (NOT clamped); callers must
556    /// handle that arm or pre-validate the TTL. On success the call returns
557    /// `(previous_effective, new_effective)` so callers can decide whether
558    /// to emit a `MetricDetailChanged` audit event.
559    ///
560    /// The proto contract on `SetMetricDetail.ttl_seconds` is that the worker
561    /// **rejects** out-of-range values with a `FAILURE` response — that
562    /// enforcement lives at the dispatch site in `lib/src/server.rs::notify`.
563    /// `lease_apply` itself returns [`LeaseApplyOutcome::TtlOutOfRange`]
564    /// when called with `ttl > LEASE_TTL_MAX` so callers that bypass the
565    /// dispatch site (proto fuzzing, future internal use) see a loud
566    /// rejection instead of silently capped semantics. Same shape for
567    /// over-long `client_id` and a full lease table — see
568    /// [`LeaseApplyOutcome`] for the failure arms.
569    pub fn lease_apply(
570        &mut self,
571        client_id: String,
572        level: MetricDetailLevel,
573        ttl: Duration,
574        binding: PeerBinding,
575    ) -> LeaseApplyOutcome {
576        if client_id.len() > LEASE_CLIENT_ID_MAX_BYTES {
577            return LeaseApplyOutcome::ClientIdTooLong;
578        }
579        if ttl > LEASE_TTL_MAX {
580            return LeaseApplyOutcome::TtlOutOfRange;
581        }
582        // Cap the table size BEFORE the insert, but only when the caller
583        // is inserting a fresh entry. Renewals (same `client_id` already
584        // present) REPLACE the existing entry and therefore keep the
585        // count stable — they must always succeed so an active operator
586        // never loses their lease just because the table is full.
587        let is_renewal = self.leases.contains_key(&client_id);
588        if !is_renewal && self.leases.len() >= LEASE_TABLE_CAP {
589            return LeaseApplyOutcome::TableFull;
590        }
591        // Renewal-binding gate: when a lease already exists for this
592        // `client_id` and its apply-time binding is fully known, the
593        // renewer's presented binding MUST match. Without this check
594        // any same-UID caller that learns the `client_id` (PID from
595        // /proc, suffix from audit log) could re-apply against it,
596        // overwriting the binding to point at the attacker's session
597        // and then driving the victim's Drop-time `clear` into
598        // `Unauthorized`. Unknown apply-time bindings continue to
599        // accept any renewer per the proto contract on
600        // `SetMetricDetail.peer_pid` / `peer_session_ulid`.
601        if is_renewal
602            && let Some(entry) = self.leases.get(&client_id)
603            && entry.binding.is_known()
604            && !entry.binding.matches(&binding)
605        {
606            return LeaseApplyOutcome::Unauthorized;
607        }
608        let expires_at = Instant::now() + ttl;
609        let before_len = self.leases.len();
610        self.leases.insert(
611            client_id,
612            LeaseEntry {
613                level,
614                expires_at,
615                binding,
616            },
617        );
618        // Map-accounting invariant: a renewal REPLACES in place (count
619        // stable) while a fresh insert grows the table by exactly one. The
620        // table must never exceed the cap on the insert path because the
621        // fresh-insert branch was gated by the `>= LEASE_TABLE_CAP` check
622        // above.
623        debug_assert_eq!(
624            self.leases.len(),
625            before_len + (!is_renewal) as usize,
626            "lease_apply grows the table by exactly one iff this is a fresh insert"
627        );
628        debug_assert!(
629            self.leases.len() <= LEASE_TABLE_CAP,
630            "lease table must never exceed LEASE_TABLE_CAP after an accepted apply"
631        );
632        let previous_effective = self.effective;
633        self.recompute_effective();
634        // An apply can only ELEVATE the effective level (a fresh/renewed
635        // lease adds to the max), never lower it below the prior effective.
636        debug_assert!(
637            self.effective >= previous_effective,
638            "lease_apply must not lower the effective detail level"
639        );
640        debug_assert!(
641            self.effective >= self.configured,
642            "effective must never drop below the configured floor"
643        );
644        #[cfg(debug_assertions)]
645        self.check_lease_invariants();
646        LeaseApplyOutcome::Applied {
647            previous_effective,
648            new_effective: self.effective,
649        }
650    }
651
652    /// Explicitly release a lease keyed by `client_id`. The clear is
653    /// authorised against the apply-time [`PeerBinding`] when one was
654    /// recorded — see [`LeaseClearOutcome`] for the three result states.
655    /// A clear request with `presented = PeerBinding::default()` matches
656    /// only leases whose apply-time binding was also unknown, preserving
657    /// compat with pre-binding callers and platforms without
658    /// `SO_PEERCRED`.
659    pub fn lease_clear(&mut self, client_id: &str, presented: PeerBinding) -> LeaseClearOutcome {
660        let Some(entry) = self.leases.get(client_id) else {
661            return LeaseClearOutcome::NotFound;
662        };
663        // If the apply-time binding is fully known we MUST authorise the
664        // clear against it; presenting `default()` (an empty binding) is
665        // a mismatch. If the apply-time binding is unknown (a pre-binding
666        // caller, or a non-Linux peer with no credentials), we permit
667        // any clear — there is nothing to authorise against.
668        if entry.binding.is_known() && !entry.binding.matches(&presented) {
669            return LeaseClearOutcome::Unauthorized;
670        }
671        // We only reach here after the `get` above proved the key is
672        // present, so the removal evicts exactly one entry.
673        let before_len = self.leases.len();
674        self.leases.remove(client_id);
675        debug_assert!(
676            !self.leases.contains_key(client_id),
677            "lease_clear must evict the cleared client_id"
678        );
679        debug_assert_eq!(
680            self.leases.len(),
681            before_len - 1,
682            "lease_clear removes exactly one entry on the authorised path"
683        );
684        let previous = self.effective;
685        self.recompute_effective();
686        // Clearing a lease can only LOWER or hold the effective level (one
687        // contributor to the max is gone), never raise it.
688        debug_assert!(
689            self.effective <= previous,
690            "lease_clear must not raise the effective detail level"
691        );
692        debug_assert!(
693            self.effective >= self.configured,
694            "effective must never drop below the configured floor"
695        );
696        #[cfg(debug_assertions)]
697        self.check_lease_invariants();
698        LeaseClearOutcome::Cleared {
699            previous_effective: previous,
700        }
701    }
702
703    /// Returns the number of active (non-expired-as-of-last-tick) leases.
704    /// Surfaced in `WorkerMetricDetailStatus` so the TUI can show
705    /// "another client is still leasing this level".
706    pub fn lease_count(&self) -> u32 {
707        self.leases.len() as u32
708    }
709
710    /// Polled lease-expiry janitor. Called from the worker's `notify` loop
711    /// (and from periodic timers); cheap when nothing has expired. Returns
712    /// `Some(previous_effective)` when at least one lease expired AND that
713    /// expiry actually moved the effective level (so the caller can emit a
714    /// `MetricDetailChanged` audit event), or `None` for the no-change path.
715    ///
716    /// `now` is parameterised so unit tests can advance the clock
717    /// deterministically without sleeping.
718    pub fn lease_tick(&mut self, now: Instant) -> Option<MetricDetailLevel> {
719        self.last_lease_tick = now;
720        let before = self.leases.len();
721        self.leases.retain(|_, entry| entry.expires_at > now);
722        // Expiry is a pure shrink: `retain` can only drop entries, never
723        // add them, and every surviving lease must be strictly in the
724        // future relative to `now`.
725        debug_assert!(
726            self.leases.len() <= before,
727            "lease_tick (expiry) can only shrink the table, never grow it"
728        );
729        debug_assert!(
730            self.leases.values().all(|entry| entry.expires_at > now),
731            "every surviving lease must expire strictly after `now`"
732        );
733        if self.leases.len() == before {
734            return None;
735        }
736        let previous = self.effective;
737        self.recompute_effective();
738        // Losing leases can only lower or hold the effective level.
739        debug_assert!(
740            self.effective <= previous,
741            "lease expiry must not raise the effective detail level"
742        );
743        debug_assert!(
744            self.effective >= self.configured,
745            "effective must never drop below the configured floor"
746        );
747        #[cfg(debug_assertions)]
748        self.check_lease_invariants();
749        if previous != self.effective {
750            Some(previous)
751        } else {
752            None
753        }
754    }
755
756    /// True when at least [`LEASE_TICK_INTERVAL`] has passed since the last
757    /// `lease_tick`. Use to gate the polled janitor at the top of `notify`
758    /// without paying a HashMap walk on every event-loop iteration.
759    pub fn lease_tick_due(&self, now: Instant) -> bool {
760        now.duration_since(self.last_lease_tick) >= LEASE_TICK_INTERVAL
761    }
762
763    /// Recompute `effective = max(configured, max(active leases))`. Cheap (one
764    /// linear pass over the lease table) and only called on apply/clear/tick,
765    /// never on the metric-emission hot path.
766    fn recompute_effective(&mut self) {
767        let mut max_lease = self.configured;
768        for entry in self.leases.values() {
769            if entry.level > max_lease {
770                max_lease = entry.level;
771            }
772        }
773        self.effective = max_lease;
774        // Post-condition: effective is the floor AND every active lease's
775        // ceiling — i.e. it is at least `configured` and at least each
776        // lease level, and it equals one of those contributors (max).
777        debug_assert!(
778            self.effective >= self.configured,
779            "effective is bounded below by the configured floor"
780        );
781        debug_assert!(
782            self.leases.values().all(|e| self.effective >= e.level),
783            "effective dominates every active lease level"
784        );
785        debug_assert!(
786            self.effective == self.configured
787                || self.leases.values().any(|e| e.level == self.effective),
788            "effective must equal the configured floor or one active lease level"
789        );
790    }
791
792    /// Debug-only full invariant sweep for the lease table, called as a
793    /// run-to-completion post-condition from every lease-mutating method.
794    /// Gathers the cross-field relationships that the individual delta
795    /// assertions cannot express on their own.
796    #[cfg(debug_assertions)]
797    fn check_lease_invariants(&self) {
798        debug_assert!(
799            self.leases.len() <= LEASE_TABLE_CAP,
800            "lease table size must stay within LEASE_TABLE_CAP"
801        );
802        debug_assert!(
803            self.leases
804                .keys()
805                .all(|id| id.len() <= LEASE_CLIENT_ID_MAX_BYTES),
806            "every stored client_id must respect LEASE_CLIENT_ID_MAX_BYTES"
807        );
808        debug_assert!(
809            self.effective >= self.configured,
810            "effective is at least the configured floor"
811        );
812        debug_assert!(
813            self.leases.values().all(|e| self.effective >= e.level),
814            "effective dominates every active lease level"
815        );
816        debug_assert!(
817            self.effective == self.configured
818                || self.leases.values().any(|e| e.level == self.effective),
819            "effective equals the configured floor or some active lease level"
820        );
821    }
822
823    pub fn socket(&self) -> Option<&UdpSocket> {
824        self.network.as_ref().map(|n| &n.remote.get_ref().socket)
825    }
826
827    pub fn socket_mut(&mut self) -> Option<&mut UdpSocket> {
828        self.network
829            .as_mut()
830            .map(|n| &mut n.remote.get_mut().socket)
831    }
832
833    pub fn count_add(&mut self, key: &'static str, count_value: i64) {
834        self.receive_metric(key, None, None, MetricValue::Count(count_value));
835    }
836
837    pub fn set_gauge(&mut self, key: &'static str, gauge_value: usize) {
838        self.receive_metric(key, None, None, MetricValue::Gauge(gauge_value));
839    }
840
841    pub fn gauge_add(&mut self, key: &'static str, gauge_value: i64) {
842        self.receive_metric(key, None, None, MetricValue::GaugeAdd(gauge_value));
843    }
844
845    pub fn writable(&mut self) {
846        if let Some(ref mut net) = self.network.as_mut() {
847            net.writable();
848        }
849    }
850
851    pub fn send_data(&mut self) {
852        if let Some(ref mut net) = self.network.as_mut() {
853            net.send_metrics();
854        }
855    }
856
857    pub fn dump_local_proxy_metrics(&mut self) -> BTreeMap<String, FilteredMetrics> {
858        self.local.dump_proxy_metrics(&Vec::new())
859    }
860
861    pub fn query(&mut self, q: &QueryMetricsOptions) -> Result<ResponseContent, MetricError> {
862        self.local.query(q)
863    }
864
865    pub fn clear_local(&mut self) {
866        if let Some(ref mut net) = self.network.as_mut() {
867            net.clear();
868        }
869        self.local.clear();
870    }
871
872    pub fn configure(&mut self, config: &MetricsConfiguration) {
873        self.local.configure(config);
874    }
875
876    /// Drop all metric storage for a cluster across BOTH drains and arm
877    /// the per-drain tombstone so subsequent emissions for the cluster are
878    /// dropped on the floor instead of resurrecting the row via
879    /// `entry().or_default()`. Called from the worker IPC dispatch on
880    /// [`RequestType::RemoveCluster`]. Network-side draining the queued
881    /// `MetricLine`s produces immediate silence on the wire (any unsent
882    /// statsd interval for the cluster is discarded).
883    ///
884    /// Worker-event-loop only — `METRICS` is a `thread_local!`, and the
885    /// mutable borrow taken here must not be re-entered from a session-
886    /// bound code path. Calling this from a metrics emission site would
887    /// deadlock the thread-local on `borrow_mut`.
888    pub fn remove_cluster(&mut self, cluster_id: &str) {
889        if let Some(ref mut net) = self.network.as_mut() {
890            net.remove_cluster(cluster_id);
891        }
892        self.local.remove_cluster(cluster_id);
893    }
894
895    /// Re-arm a previously-removed cluster id across BOTH drains. Called
896    /// from the worker IPC dispatch on [`RequestType::AddCluster`].
897    /// Idempotent on ids that were never removed. Without this hook a
898    /// cluster removed then re-added would stay tombstoned forever and
899    /// every fresh metric emission would be dropped.
900    pub fn add_cluster(&mut self, cluster_id: &str) {
901        if let Some(ref mut net) = self.network.as_mut() {
902            net.add_cluster(cluster_id);
903        }
904        self.local.add_cluster(cluster_id);
905    }
906
907    /// Drop all metric storage for one backend across BOTH drains. Called
908    /// from the worker IPC dispatch on [`RequestType::RemoveBackend`].
909    /// Does NOT tombstone the cluster (only `remove_cluster` does).
910    pub fn remove_backend(&mut self, cluster_id: &str, backend_id: &str) {
911        if let Some(ref mut net) = self.network.as_mut() {
912            net.remove_backend(cluster_id, backend_id);
913        }
914        self.local.remove_backend(cluster_id, backend_id);
915    }
916}
917
918impl Subscriber for Aggregator {
919    fn receive_metric(
920        &mut self,
921        label: &'static str,
922        cluster_id: Option<&str>,
923        backend_id: Option<&str>,
924        metric: MetricValue,
925    ) {
926        // Apply the cardinality knob BEFORE handing the metric to either
927        // drain. Both drains see the same filtered labels, keeping the local
928        // CLI view consistent with what statsd receives on the wire. Reads
929        // `effective` (max of the configured floor and any active leases),
930        // which is recomputed off the hot path.
931        let (cluster_id, backend_id) =
932            filter_labels_for_detail(self.effective, cluster_id, backend_id);
933        if let Some(ref mut net) = self.network.as_mut() {
934            net.receive_metric(label, cluster_id, backend_id, metric.to_owned());
935        }
936        self.local
937            .receive_metric(label, cluster_id, backend_id, metric);
938    }
939}
940
941pub struct MetricSocket {
942    pub addr: SocketAddr,
943    pub socket: UdpSocket,
944}
945
946impl Write for MetricSocket {
947    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
948        self.socket.send_to(buf, self.addr)
949    }
950
951    fn flush(&mut self) -> io::Result<()> {
952        Ok(())
953    }
954}
955
956pub fn udp_bind() -> Result<UdpSocket, MetricError> {
957    let address = "0.0.0.0:0";
958
959    let udp_address =
960        address
961            .parse::<SocketAddr>()
962            .map_err(|parse_error| MetricError::WrongUdpAddress {
963                address: address.to_owned(),
964                error: parse_error.to_string(),
965            })?;
966
967    UdpSocket::bind(udp_address).map_err(|parse_error| MetricError::UdpBind {
968        address: udp_address.to_string(),
969        error: parse_error.to_string(),
970    })
971}
972
/// Adds `$value` to the unlabelled counter named `$key`.
///
/// `$value` is evaluated once, before the thread-local `METRICS` cell is
/// mutably borrowed.
#[macro_export]
macro_rules! count (
  ($key:expr, $value:expr) => {{
    let delta = $value;
    $crate::metrics::METRICS.with(|cell| {
      cell.borrow_mut().count_add($key, delta);
    });
  }}
);
983
/// Adds 1 to a counter.
///
/// - `incr!(key)` bumps the unlabelled counter through `count!`. The call
///   goes through `$crate::` so it resolves even when the caller imported
///   only `incr!` by path and `count!` is not in scope.
/// - `incr!(key, cluster_id, backend_id)` emits `MetricValue::Count(1)` with
///   the given `Option<&str>` labels through `Subscriber::receive_metric`.
#[macro_export]
macro_rules! incr (
  ($key:expr) => ($crate::count!($key, 1));
  ($key:expr, $cluster_id:expr, $backend_id:expr) => {
    {
        // Scoped import: the trait is needed for the method call below and
        // must not leak into the caller's scope.
        use $crate::metrics::Subscriber;

        $crate::metrics::METRICS.with(|metrics| {
          (*metrics.borrow_mut()).receive_metric($key, $cluster_id, $backend_id, $crate::metrics::MetricValue::Count(1));
        });
    }
  }
);
998
/// Subtracts 1 from the unlabelled counter named `$key`.
///
/// Delegates to `count!` through `$crate::` so the call resolves even when
/// the caller imported only `decr!` by path and `count!` is not in scope.
#[macro_export]
macro_rules! decr (
  ($key:expr) => ($crate::count!($key, -1))
);
1003
/// Sets a gauge.
///
/// - `gauge!(key, value)` sets the unlabelled gauge through `set_gauge`.
/// - `gauge!(key, value, cluster_id, backend_id)` emits
///   `MetricValue::Gauge(value as usize)` with the given labels.
///
/// In both forms `$value` is evaluated once, before the thread-local
/// `METRICS` cell is mutably borrowed.
#[macro_export]
macro_rules! gauge (
  ($key:expr, $value:expr) => {{
    let level = $value;
    $crate::metrics::METRICS.with(|cell| {
      cell.borrow_mut().set_gauge($key, level);
    });
  }};
  ($key:expr, $value:expr, $cluster_id:expr, $backend_id:expr) => {{
    use $crate::metrics::Subscriber;
    let level = $value;

    $crate::metrics::METRICS.with(|cell| {
      cell.borrow_mut().receive_metric(
        $key,
        $cluster_id,
        $backend_id,
        $crate::metrics::MetricValue::Gauge(level as usize),
      );
    });
  }}
);
1023
/// Adds a signed delta to a gauge.
///
/// - `gauge_add!(key, value)` adjusts the unlabelled gauge through
///   `gauge_add`.
/// - `gauge_add!(key, value, cluster_id, backend_id)` emits
///   `MetricValue::GaugeAdd(value)` with the given labels.
///
/// In both forms `$value` is evaluated once, before the thread-local
/// `METRICS` cell is mutably borrowed.
#[macro_export]
macro_rules! gauge_add (
  ($key:expr, $value:expr) => {{
    let delta = $value;
    $crate::metrics::METRICS.with(|cell| {
      cell.borrow_mut().gauge_add($key, delta);
    });
  }};
  ($key:expr, $value:expr, $cluster_id:expr, $backend_id:expr) => {{
    use $crate::metrics::Subscriber;
    let delta = $value;

    $crate::metrics::METRICS.with(|cell| {
      cell.borrow_mut().receive_metric(
        $key,
        $cluster_id,
        $backend_id,
        $crate::metrics::MetricValue::GaugeAdd(delta),
      );
    });
  }}
);
1043
/// Records a timing sample as `MetricValue::Time(value as usize)`.
///
/// - `time!(key, value)` records it without labels.
/// - `time!(key, cluster_id, value)` records it against `cluster_id`, which
///   must evaluate to a `&str`.
///
/// `$value` is evaluated once, before the thread-local `METRICS` cell is
/// mutably borrowed; `$cluster_id` is evaluated while the borrow is held.
#[macro_export]
macro_rules! time (
  ($key:expr, $value:expr) => {{
    use $crate::metrics::{MetricValue, Subscriber};
    let sample = $value;
    $crate::metrics::METRICS.with(|cell| {
      let aggregator = &mut *cell.borrow_mut();

      aggregator.receive_metric($key, None, None, MetricValue::Time(sample as usize));
    });
  }};
  ($key:expr, $cluster_id:expr, $value:expr) => {{
    use $crate::metrics::{MetricValue, Subscriber};
    let sample = $value;
    $crate::metrics::METRICS.with(|cell| {
      let aggregator = &mut *cell.borrow_mut();
      let cluster_label: &str = $cluster_id;

      aggregator.receive_metric($key, Some(cluster_label), None, MetricValue::Time(sample as usize));
    });
  }}
);
1066
/// Records the per-backend metrics for one completed exchange, all labelled
/// with `(cluster_id, backend_id)`:
///
/// - `names::backend::BYTES_IN` / `BYTES_OUT` as counts of `$bin` / `$bout`,
/// - `names::backend::RESPONSE_TIME` as a timing of `$response_time`,
/// - `names::backend::CONNECTION_TIME` as a timing in milliseconds, only
///   when `$backend_connection_time` is `Some`,
/// - `names::backend::REQUESTS` as a count of 1.
///
/// `$cluster_id` and `$backend_id` must evaluate to `&str`. The expansion is
/// wrapped in its own block so the `use` it needs stays private to the
/// macro. It can therefore be invoked several times in the same scope
/// without the imports colliding or leaking into the caller.
#[macro_export]
macro_rules! record_backend_metrics (
  ($cluster_id:expr, $backend_id:expr, $response_time:expr, $backend_connection_time:expr, $bin:expr, $bout:expr) => {
    {
      use $crate::metrics::{MetricValue, Subscriber};
      $crate::metrics::METRICS.with(|metrics| {
        let m = &mut *metrics.borrow_mut();
        let cluster_id: &str = $cluster_id;
        let backend_id: &str = $backend_id;

        m.receive_metric($crate::metrics::names::backend::BYTES_IN, Some(cluster_id), Some(backend_id), MetricValue::Count($bin as i64));
        m.receive_metric($crate::metrics::names::backend::BYTES_OUT, Some(cluster_id), Some(backend_id), MetricValue::Count($bout as i64));
        m.receive_metric($crate::metrics::names::backend::RESPONSE_TIME, Some(cluster_id), Some(backend_id), MetricValue::Time($response_time as usize));
        // The connection time is optional and only recorded when present.
        if let Some(t) = $backend_connection_time {
          m.receive_metric($crate::metrics::names::backend::CONNECTION_TIME, Some(cluster_id), Some(backend_id), MetricValue::Time(t.as_millis() as usize));
        }

        m.receive_metric($crate::metrics::names::backend::REQUESTS, Some(cluster_id), Some(backend_id), MetricValue::Count(1));
      });
    }
  }
);
1087
1088#[cfg(test)]
1089mod tests {
1090    use super::*;
1091
1092    #[test]
1093    fn filter_labels_process_drops_both() {
1094        assert_eq!(
1095            filter_labels_for_detail(MetricDetailLevel::Process, Some("c"), Some("b")),
1096            (None, None),
1097        );
1098    }
1099
1100    #[test]
1101    fn filter_labels_frontend_drops_both_today() {
1102        // Reserved for per-listener counters; same as Process for now.
1103        assert_eq!(
1104            filter_labels_for_detail(MetricDetailLevel::Frontend, Some("c"), Some("b")),
1105            (None, None),
1106        );
1107    }
1108
1109    #[test]
1110    fn filter_labels_cluster_keeps_cluster_drops_backend() {
1111        assert_eq!(
1112            filter_labels_for_detail(MetricDetailLevel::Cluster, Some("c"), Some("b")),
1113            (Some("c"), None),
1114        );
1115    }
1116
1117    #[test]
1118    fn filter_labels_backend_keeps_both() {
1119        assert_eq!(
1120            filter_labels_for_detail(MetricDetailLevel::Backend, Some("c"), Some("b")),
1121            (Some("c"), Some("b")),
1122        );
1123    }
1124
1125    #[test]
1126    fn filter_labels_none_in_none_out() {
1127        // Absent labels stay absent regardless of detail level — the filter
1128        // never invents a label, only drops.
1129        for detail in [
1130            MetricDetailLevel::Process,
1131            MetricDetailLevel::Frontend,
1132            MetricDetailLevel::Cluster,
1133            MetricDetailLevel::Backend,
1134        ] {
1135            assert_eq!(filter_labels_for_detail(detail, None, None), (None, None));
1136        }
1137    }
1138
1139    #[test]
1140    fn aggregator_default_detail_is_cluster() {
1141        // Pre-knob behaviour preserved: if a worker / process never calls
1142        // `set_up_detail`, cluster-scoped metrics still reach the drains.
1143        let agg = Aggregator::new("sozu".to_owned());
1144        assert_eq!(agg.detail_configured(), MetricDetailLevel::Cluster);
1145        assert_eq!(agg.detail_effective(), MetricDetailLevel::Cluster);
1146        assert_eq!(agg.lease_count(), 0);
1147    }
1148
1149    /// Fully-known binding used by tests that don't otherwise care about the
1150    /// peer-binding mechanic. Two distinct values (`OWNER_*` / `OTHER_*`)
1151    /// let `lease_clear` tests assert authorised vs unauthorised paths.
1152    fn owner_binding() -> PeerBinding {
1153        PeerBinding {
1154            pid: Some(1234),
1155            session_ulid: Some(0x0123_4567_89ab_cdef_0123_4567_89ab_cdefu128),
1156        }
1157    }
1158
1159    fn other_binding() -> PeerBinding {
1160        PeerBinding {
1161            pid: Some(5678),
1162            session_ulid: Some(0xfedc_ba98_7654_3210_fedc_ba98_7654_3210u128),
1163        }
1164    }
1165
1166    /// Extract `(previous_effective, new_effective)` from a successful
1167    /// `lease_apply`; panics on any failure arm so tests that don't care
1168    /// about the failure paths stay compact.
1169    fn unwrap_applied(outcome: LeaseApplyOutcome) -> (MetricDetailLevel, MetricDetailLevel) {
1170        match outcome {
1171            LeaseApplyOutcome::Applied {
1172                previous_effective,
1173                new_effective,
1174            } => (previous_effective, new_effective),
1175            other => panic!("expected LeaseApplyOutcome::Applied, got {other:?}"),
1176        }
1177    }
1178
1179    #[test]
1180    fn lease_apply_elevates_effective_above_configured() {
1181        // Configured floor stays at Cluster; a lease for Backend lifts the
1182        // effective level until the lease expires or is cleared.
1183        let mut agg = Aggregator::new("sozu".to_owned());
1184        agg.set_up_detail(MetricDetailLevel::Cluster);
1185        let (prev, new) = unwrap_applied(agg.lease_apply(
1186            "test:1".to_owned(),
1187            MetricDetailLevel::Backend,
1188            Duration::from_secs(60),
1189            PeerBinding::default(),
1190        ));
1191        assert_eq!(prev, MetricDetailLevel::Cluster);
1192        assert_eq!(new, MetricDetailLevel::Backend);
1193        assert_eq!(agg.detail_configured(), MetricDetailLevel::Cluster);
1194        assert_eq!(agg.detail_effective(), MetricDetailLevel::Backend);
1195        assert_eq!(agg.lease_count(), 1);
1196    }
1197
1198    #[test]
1199    fn lease_apply_below_configured_does_not_lower_effective() {
1200        // A lease can only ELEVATE the floor, never push below `configured`.
1201        let mut agg = Aggregator::new("sozu".to_owned());
1202        agg.set_up_detail(MetricDetailLevel::Backend);
1203        let (prev, new) = unwrap_applied(agg.lease_apply(
1204            "test:1".to_owned(),
1205            MetricDetailLevel::Cluster,
1206            Duration::from_secs(60),
1207            PeerBinding::default(),
1208        ));
1209        assert_eq!(prev, MetricDetailLevel::Backend);
1210        assert_eq!(new, MetricDetailLevel::Backend);
1211    }
1212
1213    #[test]
1214    fn lease_apply_rejects_client_id_over_cap() {
1215        // Defence-in-depth: even if dispatch lets a too-long id through,
1216        // the aggregator refuses to store it.
1217        let mut agg = Aggregator::new("sozu".to_owned());
1218        let too_long = "x".repeat(LEASE_CLIENT_ID_MAX_BYTES + 1);
1219        assert_eq!(
1220            agg.lease_apply(
1221                too_long,
1222                MetricDetailLevel::Backend,
1223                Duration::from_secs(60),
1224                PeerBinding::default(),
1225            ),
1226            LeaseApplyOutcome::ClientIdTooLong
1227        );
1228        assert_eq!(agg.lease_count(), 0);
1229    }
1230
1231    #[test]
1232    fn lease_apply_rejects_when_table_is_full() {
1233        // Fill the table to capacity with distinct client_ids; one more
1234        // insert is refused. A RENEWAL of an existing entry must still
1235        // succeed (replaces in place, count unchanged).
1236        let mut agg = Aggregator::new("sozu".to_owned());
1237        for i in 0..LEASE_TABLE_CAP {
1238            assert!(matches!(
1239                agg.lease_apply(
1240                    format!("client:{i:02}"),
1241                    MetricDetailLevel::Backend,
1242                    Duration::from_secs(60),
1243                    PeerBinding::default(),
1244                ),
1245                LeaseApplyOutcome::Applied { .. }
1246            ));
1247        }
1248        assert_eq!(agg.lease_count() as usize, LEASE_TABLE_CAP);
1249        // New distinct client → rejected.
1250        assert_eq!(
1251            agg.lease_apply(
1252                "newcomer".to_owned(),
1253                MetricDetailLevel::Backend,
1254                Duration::from_secs(60),
1255                PeerBinding::default(),
1256            ),
1257            LeaseApplyOutcome::TableFull,
1258        );
1259        assert_eq!(agg.lease_count() as usize, LEASE_TABLE_CAP);
1260        // Renewal of an existing entry → still accepted.
1261        assert!(matches!(
1262            agg.lease_apply(
1263                "client:00".to_owned(),
1264                MetricDetailLevel::Backend,
1265                Duration::from_secs(120),
1266                PeerBinding::default(),
1267            ),
1268            LeaseApplyOutcome::Applied { .. }
1269        ));
1270        assert_eq!(agg.lease_count() as usize, LEASE_TABLE_CAP);
1271    }
1272
1273    #[test]
1274    fn lease_apply_rejects_ttl_over_max() {
1275        // The aggregator no longer silently clamps oversize TTLs.
1276        let mut agg = Aggregator::new("sozu".to_owned());
1277        assert_eq!(
1278            agg.lease_apply(
1279                "client:0".to_owned(),
1280                MetricDetailLevel::Backend,
1281                LEASE_TTL_MAX + Duration::from_secs(1),
1282                PeerBinding::default(),
1283            ),
1284            LeaseApplyOutcome::TtlOutOfRange,
1285        );
1286        assert_eq!(agg.lease_count(), 0);
1287    }
1288
1289    #[test]
1290    fn lease_apply_renewal_replaces_previous_for_same_client() {
1291        // The renewer client re-sends with the same `client_id`; the entry
1292        // is REPLACED (not duplicated). Lease count stays at 1.
1293        // Unknown bindings on both sides skip the renewal-binding gate.
1294        let mut agg = Aggregator::new("sozu".to_owned());
1295        let _ = agg.lease_apply(
1296            "renewer".to_owned(),
1297            MetricDetailLevel::Backend,
1298            Duration::from_secs(30),
1299            PeerBinding::default(),
1300        );
1301        let _ = agg.lease_apply(
1302            "renewer".to_owned(),
1303            MetricDetailLevel::Backend,
1304            Duration::from_secs(60),
1305            PeerBinding::default(),
1306        );
1307        assert_eq!(agg.lease_count(), 1);
1308    }
1309
1310    #[test]
1311    fn lease_apply_renewal_rejects_foreign_binding() {
1312        // Same-UID `client_id` collision attack: the victim applies with
1313        // a known binding; an attacker that learns the `client_id`
1314        // attempts to renew under a different (pid, session_ulid). The
1315        // renewal must be refused so the victim's apply-time binding
1316        // remains the sole authoritative owner — both for subsequent
1317        // renewals AND for the victim's own Drop-time `clear`.
1318        let mut agg = Aggregator::new("sozu".to_owned());
1319        let victim = PeerBinding {
1320            pid: Some(4242),
1321            session_ulid: Some(0x0123_4567_89AB_CDEF_FEDC_BA98_7654_3210),
1322        };
1323        let outcome = agg.lease_apply(
1324            "topcli".to_owned(),
1325            MetricDetailLevel::Backend,
1326            Duration::from_secs(60),
1327            victim,
1328        );
1329        assert!(
1330            matches!(outcome, LeaseApplyOutcome::Applied { .. }),
1331            "victim's initial apply must succeed"
1332        );
1333        let attacker = PeerBinding {
1334            pid: Some(9999),
1335            session_ulid: Some(0xDEAD_BEEF_DEAD_BEEF_DEAD_BEEF_DEAD_BEEF),
1336        };
1337        let outcome = agg.lease_apply(
1338            "topcli".to_owned(),
1339            MetricDetailLevel::Backend,
1340            Duration::from_secs(60),
1341            attacker,
1342        );
1343        assert_eq!(
1344            outcome,
1345            LeaseApplyOutcome::Unauthorized,
1346            "renewal with a mismatched known binding must be refused"
1347        );
1348        // The victim can still clear their own lease — proof the
1349        // refused attempt did not corrupt the stored binding.
1350        let clear = agg.lease_clear("topcli", victim);
1351        assert!(
1352            matches!(clear, LeaseClearOutcome::Cleared { .. }),
1353            "victim's original binding must still clear cleanly after \
1354             the foreign-binding renewal was refused"
1355        );
1356    }
1357
1358    #[test]
1359    fn lease_apply_renewal_with_matching_binding_succeeds() {
1360        // Symmetry case: the legitimate owner re-applies with the same
1361        // (pid, session_ulid). The renewal must succeed so the TUI's
1362        // own renewer thread keeps the lease alive across its TTL.
1363        let mut agg = Aggregator::new("sozu".to_owned());
1364        let owner = PeerBinding {
1365            pid: Some(1234),
1366            session_ulid: Some(0xAAAA_BBBB_CCCC_DDDD_EEEE_FFFF_0000_1111),
1367        };
1368        let _ = agg.lease_apply(
1369            "topcli".to_owned(),
1370            MetricDetailLevel::Backend,
1371            Duration::from_secs(30),
1372            owner,
1373        );
1374        let outcome = agg.lease_apply(
1375            "topcli".to_owned(),
1376            MetricDetailLevel::Backend,
1377            Duration::from_secs(60),
1378            owner,
1379        );
1380        assert!(
1381            matches!(outcome, LeaseApplyOutcome::Applied { .. }),
1382            "renewal with matching binding must succeed (otherwise the \
1383             TUI's own renewer thread would be locked out)"
1384        );
1385    }
1386
1387    #[test]
1388    fn lease_apply_max_merge_two_clients() {
1389        // Two clients, two levels: effective = max(both leases, configured).
1390        // Use `Process` as the floor so the Frontend lease is observable
1391        // after the Backend lease is cleared (otherwise the configured
1392        // Cluster floor would mask the Frontend lease).
1393        let mut agg = Aggregator::new("sozu".to_owned());
1394        agg.set_up_detail(MetricDetailLevel::Process);
1395        let _ = agg.lease_apply(
1396            "scraper".to_owned(),
1397            MetricDetailLevel::Frontend,
1398            Duration::from_secs(60),
1399            PeerBinding::default(),
1400        );
1401        let _ = agg.lease_apply(
1402            "topcli".to_owned(),
1403            MetricDetailLevel::Backend,
1404            Duration::from_secs(60),
1405            PeerBinding::default(),
1406        );
1407        assert_eq!(agg.detail_effective(), MetricDetailLevel::Backend);
1408        assert_eq!(agg.lease_count(), 2);
1409        // Clearing the higher lease drops effective back to the lower one.
1410        let outcome = agg.lease_clear("topcli", PeerBinding::default());
1411        assert_eq!(
1412            outcome,
1413            LeaseClearOutcome::Cleared {
1414                previous_effective: MetricDetailLevel::Backend,
1415            }
1416        );
1417        assert_eq!(agg.detail_effective(), MetricDetailLevel::Frontend);
1418        assert_eq!(agg.lease_count(), 1);
1419    }
1420
1421    #[test]
1422    fn lease_clear_unknown_id_is_silent_noop() {
1423        // Mismatched IDs are silently ignored (other clients' leases unaffected).
1424        let mut agg = Aggregator::new("sozu".to_owned());
1425        let _ = agg.lease_apply(
1426            "real".to_owned(),
1427            MetricDetailLevel::Backend,
1428            Duration::from_secs(60),
1429            PeerBinding::default(),
1430        );
1431        assert_eq!(
1432            agg.lease_clear("ghost", PeerBinding::default()),
1433            LeaseClearOutcome::NotFound
1434        );
1435        assert_eq!(agg.detail_effective(), MetricDetailLevel::Backend);
1436        assert_eq!(agg.lease_count(), 1);
1437    }
1438
1439    #[test]
1440    fn lease_clear_with_matching_binding_authorised() {
1441        // Apply with a known binding, clear with the same binding -> Cleared.
1442        let mut agg = Aggregator::new("sozu".to_owned());
1443        let _ = agg.lease_apply(
1444            "owner-lease".to_owned(),
1445            MetricDetailLevel::Backend,
1446            Duration::from_secs(60),
1447            owner_binding(),
1448        );
1449        let outcome = agg.lease_clear("owner-lease", owner_binding());
1450        assert!(matches!(outcome, LeaseClearOutcome::Cleared { .. }));
1451        assert_eq!(agg.lease_count(), 0);
1452    }
1453
1454    #[test]
1455    fn lease_clear_with_mismatched_binding_is_unauthorized() {
1456        // Apply with one binding, attempt clear with a different binding ->
1457        // Unauthorized; lease left intact.
1458        let mut agg = Aggregator::new("sozu".to_owned());
1459        let _ = agg.lease_apply(
1460            "owner-lease".to_owned(),
1461            MetricDetailLevel::Backend,
1462            Duration::from_secs(60),
1463            owner_binding(),
1464        );
1465        let outcome = agg.lease_clear("owner-lease", other_binding());
1466        assert_eq!(outcome, LeaseClearOutcome::Unauthorized);
1467        assert_eq!(agg.lease_count(), 1);
1468    }
1469
1470    #[test]
1471    fn lease_clear_unknown_apply_binding_accepts_any_clear() {
1472        // Pre-binding / non-Linux apply -> any clear authorised.
1473        let mut agg = Aggregator::new("sozu".to_owned());
1474        let _ = agg.lease_apply(
1475            "legacy".to_owned(),
1476            MetricDetailLevel::Backend,
1477            Duration::from_secs(60),
1478            PeerBinding::default(),
1479        );
1480        let outcome = agg.lease_clear("legacy", owner_binding());
1481        assert!(matches!(outcome, LeaseClearOutcome::Cleared { .. }));
1482        assert_eq!(agg.lease_count(), 0);
1483    }
1484
#[test]
fn lease_clear_known_apply_rejects_default_clear() {
    // Known apply binding -> a default ("unknown") clear is rejected, and
    // the rejected clear must leave the lease in place.
    let mut agg = Aggregator::new("sozu".to_owned());
    let _ = agg.lease_apply(
        "owner-lease".to_owned(),
        MetricDetailLevel::Backend,
        Duration::from_secs(60),
        owner_binding(),
    );
    let outcome = agg.lease_clear("owner-lease", PeerBinding::default());
    assert_eq!(outcome, LeaseClearOutcome::Unauthorized);
    // Mirror the mismatched-binding test: an Unauthorized outcome alone does
    // not prove the entry survived, so pin the table size as well.
    assert_eq!(agg.lease_count(), 1);
}
1498
#[test]
fn lease_tick_expires_only_past_due_leases() {
    // `lease_tick` takes the clock as an argument, so expiry can be tested
    // without sleeping. Two leases are seeded straight into the table: one
    // already past due and one still active.
    //
    // The floor is lowered to `Process` so that, once the Backend lease is
    // gone, the surviving Frontend lease is what determines the effective
    // level (a Cluster floor would otherwise mask it).
    let mut aggregator = Aggregator::new(String::from("sozu"));
    aggregator.set_up_detail(MetricDetailLevel::Process);

    let tick_at = Instant::now();
    // (lease id, requested level, expiry) — written directly into the table
    // so `expires_at` is fully deterministic.
    let seeded = [
        (
            "expired",
            MetricDetailLevel::Backend,
            tick_at - Duration::from_secs(1),
        ),
        (
            "live",
            MetricDetailLevel::Frontend,
            tick_at + Duration::from_secs(60),
        ),
    ];
    for (lease_id, level, expires_at) in seeded {
        aggregator.leases.insert(
            lease_id.to_owned(),
            LeaseEntry {
                level,
                expires_at,
                binding: PeerBinding::default(),
            },
        );
    }
    aggregator.recompute_effective();

    // Before the tick the Backend lease is in force.
    assert_eq!(aggregator.detail_effective(), MetricDetailLevel::Backend);

    // The tick reports the previous effective level and drops only the
    // past-due lease.
    let previous_level = aggregator.lease_tick(tick_at);
    assert_eq!(previous_level, Some(MetricDetailLevel::Backend));
    assert_eq!(aggregator.detail_effective(), MetricDetailLevel::Frontend);
    assert_eq!(aggregator.lease_count(), 1);
}
1533
#[test]
fn lease_tick_no_change_returns_none() {
    // With an empty lease table a tick has nothing to expire, so it must
    // not produce an audit signal.
    let mut aggregator = Aggregator::new(String::from("sozu"));
    let tick_result = aggregator.lease_tick(Instant::now());
    assert!(tick_result.is_none());
}
1540
#[test]
fn lease_apply_at_max_ttl_succeeds() {
    // Boundary: exactly LEASE_TTL_MAX is allowed; LEASE_TTL_MAX + 1ns is
    // rejected (covered by lease_apply_rejects_ttl_over_max above).
    let mut agg = Aggregator::new("sozu".to_owned());

    // Bracket the call with two clock samples instead of allowing a fixed
    // wall-clock slack: `Instant` is monotonic, so whatever instant
    // `lease_apply` samples internally lies in `[before, after]` no matter
    // how long the test thread is descheduled.
    let before = Instant::now();
    let outcome = agg.lease_apply(
        "max".to_owned(),
        MetricDetailLevel::Backend,
        LEASE_TTL_MAX,
        PeerBinding::default(),
    );
    let after = Instant::now();

    assert!(matches!(outcome, LeaseApplyOutcome::Applied { .. }));
    let entry = agg
        .leases
        .get("max")
        .expect("an applied lease must be present in the lease table");
    // Lower bound: the full TTL was granted, not a truncated one.
    assert!(entry.expires_at >= before + LEASE_TTL_MAX);
    // Upper bound: the expiry was not pushed past the requested TTL.
    assert!(entry.expires_at <= after + LEASE_TTL_MAX);
}
1557}