Skip to main content

varta_watch/
tracker.rs

//! Per-pid liveness tracker backed by a pre-allocated `Vec<Slot>` plus a
//! fixed-size, open-addressed [`PidIndex`] for O(1) pid lookup.
//!
//! The tracker is the in-memory ledger the observer consults each time a
//! frame arrives or the read timeout expires. It never reallocates: capacity
//! is fixed at construction, the pid-index table is sized for load factor
//! ≤ 0.5 with a bounded probe budget ([`PidIndex::MAX_PROBE`]), and an
//! exhausted tracker yields [`Update::CapacityExceeded`] rather than growing.
//!
//! The custom pid index replaces `std::collections::HashMap` for two
//! DO-178C-style reasons: (1) `HashMap` uses SipHash randomized per process,
//! producing a non-constant memory access pattern that defeats WCET
//! analysis, and (2) it can rehash on collision-driven growth. `PidIndex`
//! uses a deterministic integer mixer (Murmur3 finalizer) and linear
//! probing with a fixed budget, so every operation has a tight WCET bound.
16
17use varta_vlp::{Frame, Status};
18
19use crate::peer_cred::BeatOrigin;
20
/// Maximum number of distinct agents the observer can track concurrently.
///
/// v0.2.0 raises this from 64 to 256. Override via `--tracker-capacity`.
pub const DEFAULT_CAPACITY: usize = 256;

/// Hard upper bound for `--tracker-capacity`; [`Tracker::new`] clamps any
/// larger request down to this value.
///
/// The eviction scan inspects up to `eviction_scan_window` slots per call,
/// and that window may itself be configured as high as this value (see
/// [`MAX_EVICTION_SCAN_WINDOW`]), so the bound also caps the worst-case
/// single-call scan. At capacities exceeding this value the scan becomes a
/// latency spike risk in the observer poll loop.
pub const MAX_CAPACITY: usize = 4096;

/// Multiplier applied to the stall threshold when choosing eviction victims.
///
/// A slot is only evictable if (a) the observer has already surfaced a stall
/// event for its pid (`stall_emitted == true`) **and** (b) the silence duration
/// exceeds `threshold * EVICTION_MULTIPLIER`. The 10× multiplier ensures that
/// only agents which have been silent for **significantly** longer than the
/// stall threshold are evicted — a slow-beating but alive agent (e.g. every
/// 40 s with a 5 s threshold) will not be evicted because it resets
/// `stall_emitted` on every beat.
const EVICTION_MULTIPLIER: u32 = 10;

/// Default maximum number of slots scanned per [`Tracker::find_evictable_slot`] call.
///
/// The eviction scan used to be O(`len`) — at [`MAX_CAPACITY`] = 4096 that
/// meant up to 4096 slot reads on **every** new-pid frame once the table was
/// full. An attacker who could send beats from many unique pids could
/// therefore force O(n) work per arriving frame on the single-threaded
/// observer poll loop.
///
/// The scan is now bounded to `Tracker::eviction_scan_window` (configurable
/// via `--eviction-scan-window`, defaulting to this constant), with a rotating
/// cursor ([`Tracker::eviction_scan_cursor`]) that resumes where the previous
/// call left off. A full sweep takes `ceil(capacity / eviction_scan_window)`
/// consecutive calls. First-fit eviction inside the window is correct under
/// capacity pressure (any slot whose silence exceeds
/// `threshold * EVICTION_MULTIPLIER` is a valid victim — they are by
/// definition not actively beating).
///
/// 256 was chosen as a compromise: large enough that a single call typically
/// finds a victim on tables of 1–2 k pids, small enough that the per-frame
/// upper bound stays well under the existing observer-tick budget.
pub const DEFAULT_EVICTION_SCAN_WINDOW: usize = 256;

/// Minimum allowed value for `--eviction-scan-window`. Window = 1 is
/// degenerate but correct; only window = 0 breaks the algorithm.
pub const MIN_EVICTION_SCAN_WINDOW: usize = 1;

/// Maximum allowed value for `--eviction-scan-window`. Capped at
/// [`MAX_CAPACITY`] so a table scan in one call is bounded by the maximum
/// tracker size.
pub const MAX_EVICTION_SCAN_WINDOW: usize = MAX_CAPACITY;

/// Threshold for nonce wrap detection. When the tracker's `last_nonce` for a
/// pid is within this distance of `u64::MAX` and an incoming frame carries a
/// nonce below this threshold, the tracker treats the gap as a nonce-space
/// wrap (agent exhausted u64 nonces and looped to 0) rather than an
/// out-of-order beat. The threshold is 2^20 (~1M): a reordered beat lands
/// close to the nonce it was reordered against, so a jump from the top 2^20
/// values of the space to the bottom 2^20 cannot plausibly be reordering and
/// is classified as a wrap instead.
///
/// For scale: counting through the entire u64 space at 1M beats/sec takes
/// roughly 585,000 years (2^64 / 10^6 s), not days.
const NONCE_WRAP_THRESHOLD: u64 = 1_048_576;
81
/// Fixed-size, open-addressed `u32 → u32` map from agent pid to slot index.
///
/// Thin newtype over the generic [`crate::probe_table::BoundedIndex`]; see
/// that module for the full WCET argument. The hot tracker path uses this
/// type directly so the call sites stay readable while the probe-table
/// machinery is shared with `OutstandingTable` and `IpStateTable`.
///
/// `Entry<u32>` in the generic table is still 8 bytes (see the
/// `entry_u32_is_8_bytes` test in `probe_table`), so the per-slot cache
/// pressure on the hot path is unchanged across the refactor.
///
/// NOTE(review): the wrapper methods on this type accept and return slot
/// indices as `usize`; the `u32` value width above presumably describes the
/// stored entry inside `BoundedIndex` — confirm against that module.
pub(crate) struct PidIndex(crate::probe_table::BoundedIndex<u32>);
93
94/// Re-export the generic probe-exhaustion marker so the rest of the tracker
95/// keeps referring to a `ProbeExhausted` type local to this module.
96pub(crate) use crate::probe_table::ProbeExhausted;
97
98impl PidIndex {
99    /// Hard cap on the probe sequence length per `get` / `insert` /
100    /// `remove`.  Referenced from the doc comments above and from
101    /// `Tracker::take_probe_exhausted`'s remediation text; the actual
102    /// bound is enforced inside the generic `BoundedIndex`.
103    #[allow(dead_code)]
104    pub(crate) const MAX_PROBE: usize = crate::probe_table::BoundedIndex::<u32>::MAX_PROBE;
105
106    /// Build a pid index sized for `capacity` agents.
107    pub(crate) fn new(capacity: usize) -> Self {
108        Self(crate::probe_table::BoundedIndex::new(capacity))
109    }
110
111    /// Look up the slot index recorded for `pid`. Returns `None` if absent
112    /// or if the probe budget was exhausted (treated as absent so callers
113    /// fall through to insert / capacity-exceeded paths).
114    pub(crate) fn get(&self, pid: u32) -> Option<usize> {
115        self.0.get(pid)
116    }
117
118    /// Insert or update `pid → slot_idx`. Returns `Err(ProbeExhausted)` if
119    /// no free or matching slot was found within
120    /// [`Self::MAX_PROBE`] probes; table state is unchanged in that case
121    /// and the probe-exhausted counter is incremented.
122    pub(crate) fn insert(&mut self, pid: u32, slot_idx: usize) -> Result<(), ProbeExhausted> {
123        self.0.insert(pid, slot_idx)
124    }
125
126    /// Remove `pid` from the index. Returns the slot index it pointed to,
127    /// if any.
128    pub(crate) fn remove(&mut self, pid: u32) -> Option<usize> {
129        self.0.remove(pid)
130    }
131
132    /// Drain and reset the probe-exhausted counter.
133    pub(crate) fn take_probe_exhausted(&mut self) -> u64 {
134        self.0.take_probe_exhausted()
135    }
136
137    /// Number of live entries.  Used by the existing occupancy invariant
138    /// tests below; production code reads occupancy through the tracker
139    /// itself, not the index.
140    #[cfg(test)]
141    pub(crate) fn len(&self) -> usize {
142        self.0.len()
143    }
144}
145
/// Controls which slot to reclaim when the tracker is at capacity and a
/// new pid arrives.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum EvictionPolicy {
    /// Only evict slots that have already been surfaced as stalled and
    /// have been silent for > `threshold * EVICTION_MULTIPLIER`. This is
    /// the safest choice — a correctly-beating agent is never evicted,
    /// but a capacity-exhaustion attack can cause `CapacityExceeded`.
    Strict,
    /// Like `Strict`, but when the strict pass finds no victim, a second
    /// bounded pass accepts the first scanned slot whose silence exceeds
    /// `threshold * EVICTION_MULTIPLIER`, whether or not a stall has been
    /// surfaced for it. Selection is first-fit within the scan window, not
    /// the globally oldest slot (by `last_ns`). This relieves capacity
    /// pressure during a flood at the expense of potentially evicting a
    /// slow-but-alive agent; `CapacityExceeded` is still returned when no
    /// scanned slot has been silent that long.
    Balanced,
}
162
163/// Liveness slot for a single agent pid.
164///
165/// `Slot` is internal to the observer and never crosses the wire, so it uses
166/// the default Rust repr (lets the compiler tighten field order). The
167/// `stall_emitted` latch is private: it tracks whether the observer has
168/// already surfaced an [`crate::observer::Event::Stall`] for the current
169/// silence run, so a stalled pid raises the event exactly once and then stays
170/// silent until a fresh beat resets it.
171#[derive(Clone, Copy, Debug)]
172pub struct Slot {
173    /// OS process id of the tracked agent.
174    pub(crate) pid: u32,
175    /// Most recent nonce accepted from this pid.
176    pub(crate) last_nonce: u64,
177    /// Observer-local timestamp (nanoseconds since [`crate::observer::Observer`]
178    /// start) of the last accepted beat for this pid.
179    pub(crate) last_ns: u64,
180    /// Most recent [`Status`] reported by this pid.
181    pub(crate) status: Status,
182    /// Transport origin pinned at the slot's first beat. Used to gate
183    /// recovery-eligibility — beats from a different origin than the pinned
184    /// one are rejected as [`Update::OriginConflict`] without mutating the
185    /// slot. See [`BeatOrigin`] for the trust model.
186    pub(crate) origin: BeatOrigin,
187    /// PID-namespace inode pinned at the slot's first beat (Linux only).
188    ///
189    /// `None` on non-Linux platforms, for UDP transports (no kernel attestation),
190    /// or when `/proc/<peer_pid>/ns/pid` was unreadable at first contact. A
191    /// later beat carrying a different `Some(_)` namespace inode for the same
192    /// pid is rejected as [`Update::NamespaceConflict`] without mutating the
193    /// slot. A `None → Some(_)` upgrade is permitted exactly once — it
194    /// represents a peer whose namespace became readable after a transient
195    /// failure (e.g. peer died briefly between `recvmsg` and `readlink`).
196    pub(crate) pid_ns_inode: Option<u64>,
197    /// False iff this slot has never been written; observers treat the
198    /// slot's other fields as undefined when `used == false`.
199    pub(crate) used: bool,
200    /// True iff the observer has already emitted a stall event for the
201    /// current silence run. Cleared when a fresh beat arrives.
202    pub(crate) stall_emitted: bool,
203}
204
205impl Slot {}
206
/// Result of [`Tracker::record`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Update {
    /// The frame's pid was new and a slot was assigned to it — either a
    /// fresh slot, or one reclaimed from an evicted pid when the table was
    /// full.
    Inserted,
    /// An existing slot was updated with the new nonce / timestamp / status.
    /// Also returned when a nonce-space wrap is detected (previous nonce near
    /// `u64::MAX`, incoming nonce near 0).
    Refreshed,
    /// The frame's nonce was not strictly greater than the slot's last
    /// observed nonce (and the gap did not qualify as a nonce-space wrap);
    /// the slot's nonce, timestamp and status were left untouched.
    OutOfOrder,
    /// The frame's pid is not yet known and no slot could be assigned to
    /// it: the tracker is full with no evictable slot, or the pid index ran
    /// out of probe budget while inserting. Also returned defensively when
    /// an internal invariant violation is detected (e.g. the pid index
    /// pointed at a slot outside the table).
    CapacityExceeded,
    /// A beat arrived for a pid that is already tracked, but the beat's
    /// transport origin disagrees with the origin pinned by the slot's
    /// first beat. First-origin-wins: the slot is **not** mutated and the
    /// beat is dropped. Prevents an attacker on an untrusted transport
    /// from "tainting" a slot that legitimately belongs to a kernel-attested
    /// agent (or vice-versa).
    OriginConflict,
    /// A beat arrived for a pid that is already tracked, but the beat's
    /// kernel-attested PID-namespace inode disagrees with the inode pinned
    /// by the slot's first beat (Linux only — see
    /// [`crate::peer_cred::read_pid_namespace_inode`]). First-namespace-wins:
    /// the slot is **not** mutated and the beat is dropped. Catches the
    /// PID-collision case where two containers happen to share a numeric pid
    /// value (e.g. PID 1 in container A vs PID 1 in container B); the
    /// existing `frame.pid == peer_pid` gate at the observer fires first for
    /// most cross-namespace traffic, but a same-pid-different-namespace
    /// collision is invisible to that gate.
    ///
    /// A beat that carries no inode (`None`) for a slot whose inode is
    /// already pinned is reported as this variant too.
    NamespaceConflict,
}
239
/// Bounded per-pid liveness ledger.
///
/// The slot table is a `Vec<Slot>` pre-allocated at construction to the
/// configured capacity; subsequent inserts push into that pre-allocated
/// space without reallocation.  Lookups use a fixed-size [`PidIndex`] for
/// O(1) pid-to-index mapping — replaces the original `HashMap` so the hot
/// path is WCET-bounded (deterministic hash, bounded probe budget, no
/// rehashing on growth).
pub struct Tracker {
    /// Slot table. Pre-allocated in [`Tracker::new`]; `record` pushes into
    /// it only while `len` is below `entries.capacity()`.
    entries: Vec<Slot>,
    /// Number of slots pushed so far. Evictions reuse a slot in place and
    /// do not change this count.
    len: usize,
    /// Maps an agent pid to its index in `entries`.
    pid_to_index: PidIndex,
    /// Slots reclaimed since the last [`Tracker::take_evictions`] call.
    evictions: u64,
    /// `CapacityExceeded` results returned since the last
    /// [`Tracker::take_capacity_exceeded`] call.
    capacity_exceeded: u64,
    /// Nonce-space wraps detected since the last
    /// [`Tracker::take_nonce_wraps`] call.
    nonce_wraps: u64,
    /// Pid of the most recently evicted slot; drained by
    /// [`Tracker::take_evicted_pid`].
    last_evicted_pid: Option<u32>,
    /// Victim-selection policy applied when the table is full.
    eviction_policy: EvictionPolicy,
    /// Cached count of slots whose `stall_emitted` flag is currently set.
    ///
    /// Allows [`Tracker::find_evictable_slot`] to skip the strict scan
    /// entirely when no slots have surfaced a stall yet — defangs the most
    /// realistic DoS profile where an attacker fills the tracker faster
    /// than the stall threshold can elapse.
    stall_emitted_count: usize,
    /// Maximum slots inspected per [`Tracker::scan_window`] call.
    /// Configurable via `--eviction-scan-window`; defaults to
    /// [`DEFAULT_EVICTION_SCAN_WINDOW`]. A full table sweep takes
    /// `ceil(len / eviction_scan_window)` consecutive calls.
    eviction_scan_window: usize,
    /// Round-robin cursor into `entries` for the bounded eviction scan.
    /// Persists across `find_evictable_slot` calls so a sequence of N
    /// failed evictions covers the whole table in
    /// `ceil(len / eviction_scan_window)` calls without ever scanning more
    /// than `eviction_scan_window` slots in a single call.
    eviction_scan_cursor: usize,
    /// Number of [`Tracker::find_evictable_slot`] calls that returned no
    /// victim. This includes calls where the strict pass was skipped
    /// because no slot had surfaced a stall, so nothing was scanned.
    /// Surfaced via [`Tracker::take_eviction_scan_truncated`] for Prometheus.
    eviction_scan_truncated: u64,
    /// Count of beats dropped because their transport origin disagreed with
    /// the slot's pinned origin (first-origin-wins). Surfaced via
    /// [`Tracker::take_origin_conflicts`] for Prometheus.
    origin_conflicts: u64,
    /// Count of beats dropped because their kernel-attested PID-namespace
    /// inode disagreed with the slot's pinned namespace (first-namespace-wins).
    /// Surfaced via [`Tracker::take_namespace_conflicts`] for Prometheus.
    namespace_conflicts: u64,
    /// Count of internal invariant violations encountered on the hot path —
    /// e.g. a [`PidIndex`] entry pointed at a slot index outside `entries`,
    /// or `find_evictable_slot` returned a stale index. Each violation is
    /// recovered defensively (the operation behaves as a miss or as
    /// [`Update::CapacityExceeded`]) rather than panicking. Surfaced via
    /// [`Tracker::take_invariant_violations`] for Prometheus so operators
    /// can alert on a non-zero value — in correctly-operating code this
    /// counter stays at 0 forever.
    invariant_violations: u64,
}
297
298impl Default for Tracker {
299    fn default() -> Self {
300        Self::new(
301            DEFAULT_CAPACITY,
302            EvictionPolicy::Strict,
303            DEFAULT_EVICTION_SCAN_WINDOW,
304        )
305    }
306}
307
308impl Tracker {
309    /// Create an empty tracker with capacity for `capacity` pids.
310    ///
311    /// The slot table is pre-allocated to `capacity` entries; pushing
312    /// beyond that boundary yields [`Update::CapacityExceeded`] rather
313    /// than reallocating.
314    ///
315    /// `eviction_scan_window` caps the number of slots inspected per
316    /// eviction attempt. Values outside
317    /// `[MIN_EVICTION_SCAN_WINDOW, MAX_EVICTION_SCAN_WINDOW]` are clamped
318    /// as defense in depth; the config layer rejects out-of-range values
319    /// loudly at startup.
320    pub fn new(
321        capacity: usize,
322        eviction_policy: EvictionPolicy,
323        eviction_scan_window: usize,
324    ) -> Self {
325        let cap = capacity.min(MAX_CAPACITY);
326        let window = eviction_scan_window.clamp(MIN_EVICTION_SCAN_WINDOW, MAX_EVICTION_SCAN_WINDOW);
327        Tracker {
328            entries: Vec::with_capacity(cap),
329            len: 0,
330            pid_to_index: PidIndex::new(cap),
331            evictions: 0,
332            capacity_exceeded: 0,
333            nonce_wraps: 0,
334            last_evicted_pid: None,
335            eviction_policy,
336            stall_emitted_count: 0,
337            eviction_scan_window: window,
338            eviction_scan_cursor: 0,
339            eviction_scan_truncated: 0,
340            origin_conflicts: 0,
341            namespace_conflicts: 0,
342            invariant_violations: 0,
343        }
344    }
345
346    /// Record a frame against the tracker.
347    ///
348    /// Uses O(1) HashMap pid lookup to find the slot for `frame.pid`.
349    /// Returns [`Update::Inserted`] for a brand-new pid, [`Update::Refreshed`]
350    /// for an existing pid whose nonce moved forward, [`Update::OutOfOrder`]
351    /// if the nonce did not strictly increase, [`Update::CapacityExceeded`]
352    /// if the slot table is full (and no stale slot could be reclaimed) and
353    /// the pid is not yet tracked, or [`Update::OriginConflict`] if the
354    /// frame's transport origin disagrees with the slot's pinned origin.
355    ///
356    /// `origin` is the transport-class classification surfaced by the
357    /// receiving listener (`KernelAttested` for UDS, `NetworkUnverified` for
358    /// any UDP variant). The first beat for a pid pins the slot's origin;
359    /// subsequent beats from a different origin are dropped without
360    /// mutating the slot.
361    ///
362    /// `peer_pid_ns_inode` is the kernel-attested PID-namespace inode of the
363    /// sending process (Linux only; `None` on non-Linux or when
364    /// `/proc/<peer_pid>/ns/pid` was unreadable). The first beat pins the
365    /// slot's namespace inode; a later beat carrying a different `Some(_)`
366    /// inode for the same pid is rejected as [`Update::NamespaceConflict`].
367    /// A `None → Some(_)` upgrade is permitted (peer became readable after a
368    /// transient failure); a `Some(_) → None` regression is treated as a
369    /// conflict.
370    pub fn record(
371        &mut self,
372        frame: &Frame,
373        now_ns: u64,
374        threshold_ns: u64,
375        origin: BeatOrigin,
376        peer_pid_ns_inode: Option<u64>,
377    ) -> Update {
378        let status = frame.status;
379
380        if let Some(idx) = self.pid_to_index.get(frame.pid) {
381            // Defensive: the index promised this slot exists. If it doesn't,
382            // we treat the lookup as a miss and bump the invariant counter
383            // so ops can alert; the code then falls through to the insert
384            // path. Never panics.
385            let Some(slot) = self.entries.get_mut(idx) else {
386                self.invariant_violations = self.invariant_violations.saturating_add(1);
387                // Drop the stale index entry so the next lookup is a clean miss.
388                let _ = self.pid_to_index.remove(frame.pid);
389                self.capacity_exceeded = self.capacity_exceeded.saturating_add(1);
390                return Update::CapacityExceeded;
391            };
392            if slot.used {
393                if slot.origin != origin {
394                    self.origin_conflicts = self.origin_conflicts.saturating_add(1);
395                    return Update::OriginConflict;
396                }
397                // First-namespace-wins. Same precedence as origin: an actively
398                // disagreeing inode is a conflict; a `None → Some` upgrade
399                // pins the now-known namespace and falls through to refresh;
400                // both-`None` is the non-Linux / unreadable case and is a
401                // no-op.
402                match (slot.pid_ns_inode, peer_pid_ns_inode) {
403                    (Some(a), Some(b)) if a != b => {
404                        self.namespace_conflicts = self.namespace_conflicts.saturating_add(1);
405                        return Update::NamespaceConflict;
406                    }
407                    (Some(_), None) => {
408                        // Regression — pinned-then-lost is a tampering signal.
409                        self.namespace_conflicts = self.namespace_conflicts.saturating_add(1);
410                        return Update::NamespaceConflict;
411                    }
412                    (None, Some(_)) => {
413                        // Forgiving upgrade — fill in the previously-unknown
414                        // inode in place and continue with refresh.
415                        slot.pid_ns_inode = peer_pid_ns_inode;
416                    }
417                    _ => {}
418                }
419                if frame.nonce <= slot.last_nonce {
420                    // Detect nonce wrap: agent exhausted u64 nonce space
421                    // and looped to 0.  last_nonce is near u64::MAX and
422                    // the incoming nonce is near 0 — a gap this large
423                    // cannot be a genuine out-of-order beat.
424                    let wrap_lo = NONCE_WRAP_THRESHOLD;
425                    let wrap_hi = u64::MAX.saturating_sub(NONCE_WRAP_THRESHOLD);
426                    if slot.last_nonce >= wrap_hi && frame.nonce < wrap_lo {
427                        slot.last_nonce = frame.nonce;
428                        slot.last_ns = now_ns;
429                        slot.status = status;
430                        if slot.stall_emitted {
431                            slot.stall_emitted = false;
432                            self.stall_emitted_count = self.stall_emitted_count.saturating_sub(1);
433                        }
434                        self.nonce_wraps = self.nonce_wraps.saturating_add(1);
435                        return Update::Refreshed;
436                    }
437                    return Update::OutOfOrder;
438                }
439                slot.last_nonce = frame.nonce;
440                slot.last_ns = now_ns;
441                slot.status = status;
442                if slot.stall_emitted {
443                    slot.stall_emitted = false;
444                    self.stall_emitted_count = self.stall_emitted_count.saturating_sub(1);
445                }
446                return Update::Refreshed;
447            }
448        }
449
450        if self.len >= self.entries.capacity() {
451            if let Some(evict_idx) = self.find_evictable_slot(now_ns, threshold_ns) {
452                // Snapshot the slot we're evicting. If `find_evictable_slot`
453                // ever returned an OOB index (invariant break), defensively
454                // surface CapacityExceeded instead of panicking.
455                let Some(&evicted_slot) = self.entries.get(evict_idx) else {
456                    self.invariant_violations = self.invariant_violations.saturating_add(1);
457                    self.capacity_exceeded = self.capacity_exceeded.saturating_add(1);
458                    return Update::CapacityExceeded;
459                };
460                let _ = self.pid_to_index.remove(evicted_slot.pid);
461                let Some(slot_mut) = self.entries.get_mut(evict_idx) else {
462                    self.invariant_violations = self.invariant_violations.saturating_add(1);
463                    self.capacity_exceeded = self.capacity_exceeded.saturating_add(1);
464                    return Update::CapacityExceeded;
465                };
466                *slot_mut = Slot {
467                    pid: frame.pid,
468                    last_nonce: frame.nonce,
469                    last_ns: now_ns,
470                    status,
471                    origin,
472                    pid_ns_inode: peer_pid_ns_inode,
473                    used: true,
474                    stall_emitted: false,
475                };
476                if self.pid_to_index.insert(frame.pid, evict_idx).is_err() {
477                    // Probe budget exhausted — roll back the slot write so
478                    // the table stays internally consistent and surface
479                    // CapacityExceeded to the caller. The `stall_emitted_count`
480                    // decrement is deferred to the commit point below, so no
481                    // rollback of the counter is needed here.
482                    if let Some(slot_mut) = self.entries.get_mut(evict_idx) {
483                        *slot_mut = evicted_slot;
484                    }
485                    // Best-effort re-pin of the old pid; if even this insert
486                    // fails the slot is logically vacant for the next call.
487                    let _ = self.pid_to_index.insert(evicted_slot.pid, evict_idx);
488                    self.capacity_exceeded = self.capacity_exceeded.saturating_add(1);
489                    return Update::CapacityExceeded;
490                }
491                // Commit-on-success: `stall_emitted_count` is decremented only
492                // after the new pid is pinned in the index. If the index insert
493                // had failed above, the slot rollback would have restored the
494                // old `stall_emitted = true` flag — decrementing the counter
495                // before the insert (the pre-commit-on-success layout) caused
496                // an `observed > tracked` divergence, surfaced by the
497                // `tracker_record` fuzz target. Pattern mirrors cerebrum
498                // 2026-05-15 (AEAD nonce state mutation).
499                if evicted_slot.stall_emitted {
500                    self.stall_emitted_count = self.stall_emitted_count.saturating_sub(1);
501                }
502                self.evictions = self.evictions.saturating_add(1);
503                self.last_evicted_pid = Some(evicted_slot.pid);
504                return Update::Inserted;
505            }
506            self.capacity_exceeded = self.capacity_exceeded.saturating_add(1);
507            return Update::CapacityExceeded;
508        }
509        let idx = self.len;
510        // Reserve the index in the pid map *before* pushing — on probe
511        // exhaustion we surface CapacityExceeded and leave entries unchanged.
512        if self.pid_to_index.insert(frame.pid, idx).is_err() {
513            self.capacity_exceeded = self.capacity_exceeded.saturating_add(1);
514            return Update::CapacityExceeded;
515        }
516        self.entries.push(Slot {
517            pid: frame.pid,
518            last_nonce: frame.nonce,
519            last_ns: now_ns,
520            status,
521            origin,
522            pid_ns_inode: peer_pid_ns_inode,
523            used: true,
524            stall_emitted: false,
525        });
526        self.len += 1;
527        Update::Inserted
528    }
529
530    /// Find a slot that can be evicted to make room for a new pid.
531    ///
532    /// A slot is evictable when both conditions hold:
533    /// 1. The observer has already surfaced a stall event for this pid
534    ///    (`stall_emitted == true`).
535    /// 2. Silence duration exceeds `threshold_ns * EVICTION_MULTIPLIER`.
536    ///
537    /// **Bounded-work guarantee.** The scan visits at most
538    /// [`EVICTION_SCAN_WINDOW`] slots per call, starting at
539    /// `self.eviction_scan_cursor` and wrapping mod `self.len`. The cursor
540    /// is advanced regardless of outcome so back-to-back failed evictions
541    /// eventually cover the whole table without ever performing more than
542    /// `WINDOW` slot reads in a single call. This trades strict
543    /// global-oldest LRU for an O(1) per-frame upper bound — the right
544    /// tradeoff under capacity pressure, because every slot satisfying the
545    /// threshold criterion is by definition a safe victim.
546    ///
547    /// **Fast-bail for Strict policy.** When no slots have surfaced a stall
548    /// yet (`stall_emitted_count == 0`), the strict pass is skipped
549    /// entirely. This is the common DoS profile: an attacker can fill the
550    /// tracker faster than the threshold can elapse, so no slot has a
551    /// `stall_emitted` flag set, and the previous code wasted O(n) work
552    /// looking for one anyway.
553    ///
554    /// When the policy is [`EvictionPolicy::Balanced`] and no
555    /// strictly-evictable slot is found in the window, a second windowed
556    /// pass picks the first slot whose silence exceeds the threshold
557    /// (disregarding `stall_emitted`). This prevents capacity-exhaustion
558    /// attacks at the cost of possibly evicting a slow-but-alive agent.
559    fn find_evictable_slot(&mut self, now_ns: u64, threshold_ns: u64) -> Option<usize> {
560        let evict_threshold = threshold_ns.saturating_mul(EVICTION_MULTIPLIER as u64);
561
562        // Strict pass — cheap bail when no slots have stalled yet.
563        if self.stall_emitted_count > 0 {
564            if let Some(idx) = self.scan_window(now_ns, evict_threshold, true) {
565                return Some(idx);
566            }
567        }
568        if self.eviction_policy == EvictionPolicy::Balanced {
569            if let Some(idx) = self.scan_window(now_ns, evict_threshold, false) {
570                return Some(idx);
571            }
572        }
573        self.eviction_scan_truncated = self.eviction_scan_truncated.saturating_add(1);
574        None
575    }
576
577    /// Bounded windowed scan helper for [`Tracker::find_evictable_slot`].
578    ///
579    /// Examines at most [`EVICTION_SCAN_WINDOW`] slots starting at
580    /// `eviction_scan_cursor` (mod `self.len`). Returns the index of the
581    /// first slot whose silence exceeds `evict_threshold` and, if
582    /// `require_stall`, whose `stall_emitted` flag is set. The cursor is
583    /// advanced past the inspected window (or just past the hit) so
584    /// subsequent calls progress around the ring.
585    fn scan_window(
586        &mut self,
587        now_ns: u64,
588        evict_threshold: u64,
589        require_stall: bool,
590    ) -> Option<usize> {
591        let n = self.len.min(self.entries.len());
592        if n == 0 {
593            return None;
594        }
595        let window = self.eviction_scan_window.min(n);
596        let start = self.eviction_scan_cursor % n;
597        for i in 0..window {
598            let idx = (start + i) % n;
599            // Defensive: if `n` ever exceeded `entries.len()` this would
600            // be unreachable under invariant `n = len.min(entries.len())`,
601            // but treat OOB as "skip" rather than panic.
602            let Some(slot) = self.entries.get(idx) else {
603                self.invariant_violations = self.invariant_violations.saturating_add(1);
604                continue;
605            };
606            let stale = now_ns.saturating_sub(slot.last_ns) > evict_threshold;
607            let qualifies = stale && (!require_stall || slot.stall_emitted);
608            if qualifies {
609                self.eviction_scan_cursor = (idx + 1) % n;
610                return Some(idx);
611            }
612        }
613        self.eviction_scan_cursor = (start + window) % n;
614        None
615    }
616
617    /// Take and reset the eviction counter. Returns the number of slots
618    /// reclaimed since the last call.
619    pub fn take_evictions(&mut self) -> u64 {
620        let count = self.evictions;
621        self.evictions = 0;
622        count
623    }
624
625    /// Return the pid of the most recently evicted slot, if any slots
626    /// have been evicted since the last call.
627    pub fn take_evicted_pid(&mut self) -> Option<u32> {
628        self.last_evicted_pid.take()
629    }
630
631    /// Take and reset the nonce-wrap counter. Returns the number of
632    /// nonce-space wraps detected since the last call.
633    pub fn take_nonce_wraps(&mut self) -> u64 {
634        let count = self.nonce_wraps;
635        self.nonce_wraps = 0;
636        count
637    }
638
639    /// Take and reset the capacity-exceeded counter. Returns the number of
640    /// beats dropped due to a full tracker since the last call.
641    pub fn take_capacity_exceeded(&mut self) -> u64 {
642        let count = self.capacity_exceeded;
643        self.capacity_exceeded = 0;
644        count
645    }
646
    /// Number of pids currently tracked.
    ///
    /// This is the value of the tracker's internal `len` counter; see also
    /// [`Tracker::is_empty`].
    pub fn len(&self) -> usize {
        self.len
    }
651
652    /// Return the `last_ns` timestamp for a tracked pid, if present.
653    /// Used by the observer for per-pid rate limiting without exposing
654    /// internal slot layout.
655    pub fn last_ns_of(&self, pid: u32) -> Option<u64> {
656        self.pid_to_index
657            .get(pid)
658            .and_then(|idx| self.entries.get(idx).map(|s| s.last_ns))
659    }
660
661    /// Return the pinned transport origin of a tracked pid, if present.
662    /// Used by the observer to populate `Event::OriginConflict::slot_origin`
663    /// before calling `record` (which may produce the conflict).
664    pub fn origin_of(&self, pid: u32) -> Option<BeatOrigin> {
665        self.pid_to_index
666            .get(pid)
667            .and_then(|idx| self.entries.get(idx))
668            .filter(|s| s.used)
669            .map(|s| s.origin)
670    }
671
672    /// Return the pinned PID-namespace inode of a tracked pid, if present.
673    ///
674    /// The outer `Option` is `Some` when the pid is tracked at all; the inner
675    /// `Option` is the inode (or `None` for non-Linux / unreadable). Used by
676    /// the observer to populate `Event::NamespaceConflict::slot_ns_inode`
677    /// without an extra slot lookup.
678    pub fn pid_ns_inode_of(&self, pid: u32) -> Option<Option<u64>> {
679        self.pid_to_index
680            .get(pid)
681            .and_then(|idx| self.entries.get(idx))
682            .filter(|s| s.used)
683            .map(|s| s.pid_ns_inode)
684    }
685
686    /// True iff no pids are tracked.
687    pub fn is_empty(&self) -> bool {
688        self.len == 0
689    }
690
691    /// Find newly-stalled slots and mark them emitted in one atomic pass.
692    ///
693    /// A slot is "newly stalled" when its silence duration exceeds
694    /// `threshold_ns` **and** the observer has not yet surfaced a stall
695    /// event for the current silence run (`stall_emitted == false`).
696    /// Qualifying slots are marked `stall_emitted = true` and the callback
697    /// is invoked with `(pid, last_nonce, last_ns, origin, pid_ns_inode)` —
698    /// all within the same mutable borrow, closing the TOCTOU window that
699    /// existed between the former `iter_stalled` / `mark_stall_emitted` pair.
700    pub fn drain_stalled_slots(
701        &mut self,
702        now_ns: u64,
703        threshold_ns: u64,
704        mut cb: impl FnMut(u32, u64, u64, BeatOrigin, Option<u64>),
705    ) {
706        // Clamp the slice to actual `entries` length so the slice
707        // expression cannot panic even if `len` somehow exceeded it
708        // (invariant violation — counted, never panicked on).
709        let upper = self.len.min(self.entries.len());
710        if upper < self.len {
711            self.invariant_violations = self.invariant_violations.saturating_add(1);
712        }
713        if let Some(slice) = self.entries.get_mut(..upper) {
714            for slot in slice {
715                if !slot.used || slot.stall_emitted {
716                    continue;
717                }
718                if now_ns.saturating_sub(slot.last_ns) >= threshold_ns {
719                    slot.stall_emitted = true;
720                    self.stall_emitted_count = self.stall_emitted_count.saturating_add(1);
721                    cb(
722                        slot.pid,
723                        slot.last_nonce,
724                        slot.last_ns,
725                        slot.origin,
726                        slot.pid_ns_inode,
727                    );
728                }
729            }
730        }
731        #[cfg(debug_assertions)]
732        self.debug_assert_stall_count();
733    }
734
735    /// Take and reset the origin-conflict counter.
736    ///
737    /// Surfaced as `varta_origin_conflict_total` by the Prometheus exporter;
738    /// non-zero values indicate that beats for a tracked pid arrived from a
739    /// transport other than the one that first claimed the pid — either a
740    /// misconfigured agent or an active spoofing attempt.
741    pub fn take_origin_conflicts(&mut self) -> u64 {
742        let count = self.origin_conflicts;
743        self.origin_conflicts = 0;
744        count
745    }
746
747    /// Take and reset the namespace-conflict counter.
748    ///
749    /// Surfaced as `varta_tracker_namespace_conflict_total` by the Prometheus
750    /// exporter; non-zero values mean beats for a tracked pid arrived from a
751    /// different PID namespace than the one pinned by the slot's first beat.
752    /// Linux-only signal; on non-Linux platforms this counter stays at 0.
753    pub fn take_namespace_conflicts(&mut self) -> u64 {
754        let count = self.namespace_conflicts;
755        self.namespace_conflicts = 0;
756        count
757    }
758
759    /// Take and reset the bounded-window truncated-scan counter.
760    ///
761    /// Surfaced as `varta_tracker_eviction_scan_truncated_total` by the
762    /// Prometheus exporter; non-zero values prove the window cap actually
763    /// engaged (i.e. the table was full and no victim was found within
764    /// `EVICTION_SCAN_WINDOW` slots).
765    pub fn take_eviction_scan_truncated(&mut self) -> u64 {
766        let count = self.eviction_scan_truncated;
767        self.eviction_scan_truncated = 0;
768        count
769    }
770
771    /// Take and reset the invariant-violation counter.
772    ///
773    /// Surfaced as `varta_tracker_invariant_violations_total` by the
774    /// Prometheus exporter. In correctly-operating code this counter stays
775    /// at 0 forever — non-zero values mean one of the defensive `.get()`
776    /// fall-throughs in the hot path triggered (e.g. a stale `PidIndex`
777    /// entry pointed at an out-of-range slot). The tracker recovers
778    /// without panicking; ops should still treat any non-zero value as a
779    /// bug worth investigating.
780    pub fn take_invariant_violations(&mut self) -> u64 {
781        let count = self.invariant_violations;
782        self.invariant_violations = 0;
783        count
784    }
785
    /// Take and reset the [`PidIndex`] probe-exhaustion counter.
    ///
    /// Surfaced as `varta_tracker_pid_index_probe_exhausted_total` by the
    /// Prometheus exporter. Non-zero values mean a pid lookup walked
    /// [`PidIndex::MAX_PROBE`] slots without resolving — at load factor
    /// ≤ 0.5 this is effectively unreachable, so any non-zero value is a
    /// red flag (pathological pid distribution, or an attempt to fill the
    /// index past its safe load factor).
    ///
    /// The counter itself lives in the pid index; this method simply
    /// forwards to the index's own `take_probe_exhausted`.
    pub fn take_probe_exhausted(&mut self) -> u64 {
        self.pid_to_index.take_probe_exhausted()
    }
797
798    /// Recompute `stall_emitted_count` from scratch and assert it matches
799    /// the maintained counter. Cheap (single linear pass over `len` slots),
800    /// gated to debug builds to keep the release-mode hot path untouched.
801    #[cfg(debug_assertions)]
802    fn debug_assert_stall_count(&self) {
803        let upper = self.len.min(self.entries.len());
804        let observed = self
805            .entries
806            .get(..upper)
807            .unwrap_or(&[])
808            .iter()
809            .filter(|s| s.stall_emitted)
810            .count();
811        debug_assert_eq!(
812            observed, self.stall_emitted_count,
813            "stall_emitted_count out of sync: observed {}, tracked {}",
814            observed, self.stall_emitted_count
815        );
816    }
817}
818
819#[cfg(test)]
820mod tests {
821    use super::*;
822    use varta_vlp::Frame;
823
    /// Test helper: build a `Status::Ok` frame for `pid`.
    ///
    /// `nonce` is passed for both the third and fourth `Frame::new`
    /// arguments and the last argument is fixed at 0.
    fn frame(pid: u32, nonce: u64) -> Frame {
        Frame::new(Status::Ok, pid, nonce, nonce, 0)
    }
827
    /// Default origin for tests that do not exercise transport-origin
    /// behaviour. `KernelAttested` is used so those tests keep representing
    /// the common UDS path.
    const ORIGIN: BeatOrigin = BeatOrigin::KernelAttested;
832
833    /// Fill capacity entirely; never trigger a stall. find_evictable_slot
834    /// must return None without scanning any slot (Strict policy).
835    #[test]
836    fn find_evictable_slot_returns_none_when_no_stalls_emitted() {
837        let cap = 64;
838        let mut t = Tracker::new(cap, EvictionPolicy::Strict, DEFAULT_EVICTION_SCAN_WINDOW);
839        let threshold_ns = 1_000;
840        // Fill at t=0 so silence isn't a factor either.
841        for pid in 1u32..=(cap as u32) {
842            assert_eq!(
843                t.record(&frame(pid, 1), 0, threshold_ns, ORIGIN, None),
844                Update::Inserted
845            );
846        }
847        assert_eq!(t.len(), cap);
848        assert_eq!(t.stall_emitted_count, 0);
849
850        // Even at very large "now_ns" (silence >> 10× threshold), Strict
851        // policy must bail without scanning: no slot has stall_emitted=true.
852        let now_ns = threshold_ns * 100;
853        let result = t.record(&frame(99_999, 1), now_ns, threshold_ns, ORIGIN, None);
854        assert_eq!(result, Update::CapacityExceeded);
855        // Cursor must NOT have advanced through the table (fast-bail path).
856        assert_eq!(t.eviction_scan_cursor, 0);
857    }
858
859    /// drain_stalled_slots marks slots; counter must reflect that, and the
860    /// next find_evictable_slot must actually scan and (eventually) succeed.
861    #[test]
862    fn stall_counter_enables_eviction_after_drain() {
863        let cap = 8;
864        let mut t = Tracker::new(cap, EvictionPolicy::Strict, DEFAULT_EVICTION_SCAN_WINDOW);
865        let threshold_ns = 100;
866
867        for pid in 1u32..=(cap as u32) {
868            assert_eq!(
869                t.record(&frame(pid, 1), 0, threshold_ns, ORIGIN, None),
870                Update::Inserted
871            );
872        }
873        // Time advances past threshold — every slot stalls.
874        let now_ns = threshold_ns * 20;
875        let mut stalled = 0u32;
876        t.drain_stalled_slots(now_ns, threshold_ns, |_, _, _, _, _| stalled += 1);
877        assert_eq!(stalled, cap as u32);
878        assert_eq!(t.stall_emitted_count, cap);
879
880        // Silence now exceeds 10× threshold → eviction succeeds.
881        let result = t.record(&frame(9_999, 1), now_ns, threshold_ns, ORIGIN, None);
882        assert_eq!(result, Update::Inserted);
883        // The replacing slot is fresh — stall counter decremented once.
884        assert_eq!(t.stall_emitted_count, cap - 1);
885    }
886
887    /// A fresh beat on a previously-stalled slot must decrement the counter.
888    #[test]
889    fn stall_counter_decrements_on_refresh() {
890        let mut t = Tracker::new(4, EvictionPolicy::Strict, DEFAULT_EVICTION_SCAN_WINDOW);
891        let threshold_ns = 100;
892        assert_eq!(
893            t.record(&frame(1, 1), 0, threshold_ns, ORIGIN, None),
894            Update::Inserted
895        );
896        t.drain_stalled_slots(threshold_ns * 2, threshold_ns, |_, _, _, _, _| {});
897        assert_eq!(t.stall_emitted_count, 1);
898
899        // New beat with strictly increasing nonce → refresh and clear flag.
900        assert_eq!(
901            t.record(&frame(1, 2), threshold_ns * 3, threshold_ns, ORIGIN, None),
902            Update::Refreshed
903        );
904        assert_eq!(t.stall_emitted_count, 0);
905    }
906
907    /// The bounded scan window must cap per-call work. Fill 4096 slots
908    /// at t=0, stall them all, then verify each find_evictable_slot call
909    /// advances the cursor by at most the configured window.
910    #[test]
911    fn find_evictable_slot_scan_is_bounded_to_window() {
912        let cap = MAX_CAPACITY;
913        let mut t = Tracker::new(cap, EvictionPolicy::Strict, DEFAULT_EVICTION_SCAN_WINDOW);
914        let threshold_ns = 100;
915        for pid in 1u32..=(cap as u32) {
916            assert_eq!(
917                t.record(&frame(pid, 1), 0, threshold_ns, ORIGIN, None),
918                Update::Inserted
919            );
920        }
921        // Stall everything.
922        let now_ns = threshold_ns * 20;
923        t.drain_stalled_slots(now_ns, threshold_ns, |_, _, _, _, _| {});
924        assert_eq!(t.stall_emitted_count, cap);
925
926        // Each new-pid insert evicts one slot. Cursor must advance by ≤ window.
927        let window = t.eviction_scan_window;
928        let start_cursor = t.eviction_scan_cursor;
929        let _ = t.record(&frame(50_001, 1), now_ns, threshold_ns, ORIGIN, None);
930        let advanced = t.eviction_scan_cursor.wrapping_sub(start_cursor) % cap;
931        assert!(
932            advanced <= window,
933            "cursor advanced by {advanced}, expected ≤ {window}"
934        );
935    }
936
937    /// A Tracker constructed with a small eviction_scan_window must honour
938    /// that window, not the default.
939    #[test]
940    fn eviction_scan_window_is_plumbed_through() {
941        let cap = 16;
942        let window = 4;
943        let mut t = Tracker::new(cap, EvictionPolicy::Strict, window);
944        assert_eq!(t.eviction_scan_window, window);
945        let threshold_ns = 100;
946        for pid in 1u32..=(cap as u32) {
947            assert_eq!(
948                t.record(&frame(pid, 1), 0, threshold_ns, ORIGIN, None),
949                Update::Inserted
950            );
951        }
952        // Stall everything so every slot is eviction-eligible.
953        let now_ns = threshold_ns * 20;
954        t.drain_stalled_slots(now_ns, threshold_ns, |_, _, _, _, _| {});
955        assert_eq!(t.stall_emitted_count, cap);
956        // Force an eviction attempt and confirm the cursor advanced by ≤ window.
957        let start = t.eviction_scan_cursor;
958        let _ = t.record(&frame(9_999, 1), now_ns, threshold_ns, ORIGIN, None);
959        let advanced = t.eviction_scan_cursor.wrapping_sub(start) % cap;
960        assert!(
961            advanced <= window,
962            "cursor advanced {advanced}, expected ≤ {window} (configured window)"
963        );
964    }
965
966    /// Cursor must wrap past `len` correctly so a long sequence of failed
967    /// evictions doesn't go out of bounds.
968    #[test]
969    fn scan_window_cursor_wraps_correctly() {
970        let cap = 4;
971        let mut t = Tracker::new(cap, EvictionPolicy::Strict, DEFAULT_EVICTION_SCAN_WINDOW);
972        let threshold_ns = 100;
973        for pid in 1u32..=(cap as u32) {
974            assert_eq!(
975                t.record(&frame(pid, 1), 0, threshold_ns, ORIGIN, None),
976                Update::Inserted
977            );
978        }
979        // Force the cursor to advance past `len` by calling scan_window
980        // many times with no qualifying slots (threshold not exceeded).
981        for _ in 0..10 {
982            let _ = t.scan_window(50, 1_000_000, true);
983        }
984        assert!(t.eviction_scan_cursor < cap);
985    }
986
987    /// Stress: random sequence of record / drain_stalled / time advances.
988    /// debug_assert_stall_count fires inside drain_stalled_slots after every
989    /// call, so this test exercises the invariant.
990    #[test]
991    fn stall_emitted_count_invariant_holds_across_random_ops() {
992        let mut t = Tracker::new(32, EvictionPolicy::Balanced, DEFAULT_EVICTION_SCAN_WINDOW);
993        let threshold_ns = 100;
994        let mut now_ns: u64 = 0;
995        // Simple deterministic PRNG (xorshift64) — no rand dep.
996        let mut s: u64 = 0xC0FFEE;
997        let mut next = || {
998            s ^= s << 13;
999            s ^= s >> 7;
1000            s ^= s << 17;
1001            s
1002        };
1003        for _ in 0..2000 {
1004            let r = next() % 4;
1005            now_ns = now_ns.saturating_add(20);
1006            match r {
1007                0 => {
1008                    let pid = (next() % 64) as u32 + 1;
1009                    let _ = t.record(&frame(pid, now_ns), now_ns, threshold_ns, ORIGIN, None);
1010                }
1011                1 => {
1012                    // Advance and drain (may flip flags to true).
1013                    now_ns = now_ns.saturating_add(threshold_ns * 2);
1014                    t.drain_stalled_slots(now_ns, threshold_ns, |_, _, _, _, _| {});
1015                }
1016                _ => {
1017                    // No-op — let other ops dominate.
1018                }
1019            }
1020        }
1021        // Final consistency check (also runs implicitly in drain).
1022        let observed = t.entries[..t.len]
1023            .iter()
1024            .filter(|s| s.stall_emitted)
1025            .count();
1026        assert_eq!(observed, t.stall_emitted_count);
1027    }
1028
1029    /// Acceptance check: scan-truncated counter increments only when we
1030    /// run the full window without finding a victim.
1031    #[test]
1032    fn scan_truncated_counter_increments_on_dry_scan() {
1033        let mut t = Tracker::new(32, EvictionPolicy::Strict, DEFAULT_EVICTION_SCAN_WINDOW);
1034        let threshold_ns = 100;
1035        for pid in 1u32..=32 {
1036            assert_eq!(
1037                t.record(&frame(pid, 1), 0, threshold_ns, ORIGIN, None),
1038                Update::Inserted
1039            );
1040        }
1041        // Table full, no stalls emitted → strict bails, balanced not used →
1042        // counter still increments since we returned None at capacity.
1043        let _ = t.record(
1044            &frame(99_999, 1),
1045            threshold_ns * 100,
1046            threshold_ns,
1047            ORIGIN,
1048            None,
1049        );
1050        assert_eq!(t.take_eviction_scan_truncated(), 1);
1051        // Take resets.
1052        assert_eq!(t.take_eviction_scan_truncated(), 0);
1053    }
1054
1055    /// First-origin-wins: once a slot is pinned to an origin, a beat with a
1056    /// different origin is dropped as `OriginConflict` without mutating the
1057    /// slot or incrementing the slot's `last_ns`.
1058    #[test]
1059    fn origin_conflict_first_origin_wins() {
1060        let mut t = Tracker::new(8, EvictionPolicy::Strict, DEFAULT_EVICTION_SCAN_WINDOW);
1061        let threshold_ns = 100;
1062
1063        // Beat 1 arrives via UDS (kernel-attested) and pins the slot.
1064        assert_eq!(
1065            t.record(
1066                &frame(7, 1),
1067                10,
1068                threshold_ns,
1069                BeatOrigin::KernelAttested,
1070                None
1071            ),
1072            Update::Inserted
1073        );
1074
1075        // Beat 2 arrives via UDP with the same pid — must be rejected.
1076        assert_eq!(
1077            t.record(
1078                &frame(7, 2),
1079                20,
1080                threshold_ns,
1081                BeatOrigin::NetworkUnverified,
1082                None,
1083            ),
1084            Update::OriginConflict
1085        );
1086
1087        // Slot is untouched: nonce still 1, last_ns still 10, origin still UDS.
1088        assert_eq!(t.last_ns_of(7), Some(10));
1089        assert_eq!(t.entries[0].last_nonce, 1);
1090        assert_eq!(t.entries[0].origin, BeatOrigin::KernelAttested);
1091
1092        // Counter reflects the dropped beat.
1093        assert_eq!(t.take_origin_conflicts(), 1);
1094        assert_eq!(t.take_origin_conflicts(), 0);
1095
1096        // Same-origin follow-up still works.
1097        assert_eq!(
1098            t.record(
1099                &frame(7, 3),
1100                30,
1101                threshold_ns,
1102                BeatOrigin::KernelAttested,
1103                None
1104            ),
1105            Update::Refreshed
1106        );
1107    }
1108
1109    // ---------------------- PidIndex unit tests ----------------------
1110
1111    #[test]
1112    fn pid_index_insert_get_remove_roundtrip() {
1113        let mut idx = PidIndex::new(16);
1114        assert_eq!(idx.get(42), None);
1115        idx.insert(42, 7).expect("insert");
1116        assert_eq!(idx.get(42), Some(7));
1117
1118        // Update in place preserves occupied count.
1119        idx.insert(42, 9).expect("update");
1120        assert_eq!(idx.get(42), Some(9));
1121        assert_eq!(idx.len(), 1);
1122
1123        assert_eq!(idx.remove(42), Some(9));
1124        assert_eq!(idx.get(42), None);
1125        assert_eq!(idx.len(), 0);
1126    }
1127
1128    #[test]
1129    fn pid_index_tombstone_reuse() {
1130        // Insert N pids, remove half, re-insert: lookups must still work
1131        // even though the removed slots left tombstones along the probe
1132        // sequences.
1133        let mut idx = PidIndex::new(64);
1134        for pid in 1u32..=32 {
1135            idx.insert(pid, pid as usize).expect("insert");
1136        }
1137        for pid in 1u32..=16 {
1138            assert_eq!(idx.remove(pid), Some(pid as usize));
1139        }
1140        // The remaining 16 are still findable.
1141        for pid in 17u32..=32 {
1142            assert_eq!(idx.get(pid), Some(pid as usize));
1143        }
1144        // Re-insert the removed ones; tombstones must be reused (table is
1145        // small enough that probe walks could otherwise overflow).
1146        for pid in 1u32..=16 {
1147            idx.insert(pid, (pid + 100) as usize).expect("reinsert");
1148        }
1149        for pid in 1u32..=16 {
1150            assert_eq!(idx.get(pid), Some((pid + 100) as usize));
1151        }
1152        for pid in 17u32..=32 {
1153            assert_eq!(idx.get(pid), Some(pid as usize));
1154        }
1155    }
1156
1157    #[test]
1158    fn pid_index_probe_exhaustion_returns_error() {
1159        // Build a tiny table where MAX_PROBE is large enough to find slots
1160        // through linear probing under normal use, then deliberately fill
1161        // every slot to force exhaustion of the probe budget on insert.
1162        // Table size = next_power_of_two(4 * 2) = 8 slots.
1163        let mut idx = PidIndex::new(4);
1164        // Insert MAX_PROBE-many pids that all hash to the same bucket would
1165        // be impossible with a deterministic mix; instead we fill the
1166        // *whole* table so any new pid hashing into a fully-occupied chain
1167        // exhausts the budget.
1168        for pid in 1u32..=8 {
1169            idx.insert(pid, pid as usize).expect("fill");
1170        }
1171        // Now every slot is occupied (no EMPTY anywhere). Any new pid must
1172        // walk the full MAX_PROBE without finding an EMPTY slot.
1173        let err = idx.insert(9999, 0).expect_err("must exhaust");
1174        assert_eq!(err, ProbeExhausted);
1175        assert_eq!(idx.take_probe_exhausted(), 1);
1176        assert_eq!(idx.take_probe_exhausted(), 0);
1177    }
1178
1179    #[test]
1180    fn record_probe_exhaustion_surfaces_capacity_exceeded() {
1181        // PidIndex table size = next_power_of_two(cap * 2). At cap = 4 the
1182        // table has 8 slots. Filling the *entry* table at cap leaves 4
1183        // PidIndex slots occupied (half full), so we never exhaust the
1184        // probe budget through ordinary inserts. To force exhaustion we
1185        // need the index itself to be saturated — which only happens if
1186        // someone constructs a Tracker with capacity ≥ table_size. For
1187        // safety we verify the rollback path: a forced-error scenario is
1188        // not realistically reachable through normal API use, so we instead
1189        // assert that under heavy churn the counter stays at 0.
1190        let mut t = Tracker::new(32, EvictionPolicy::Balanced, DEFAULT_EVICTION_SCAN_WINDOW);
1191        let threshold_ns = 100;
1192        let mut now = 0u64;
1193        for pid in 1u32..=4096 {
1194            now = now.saturating_add(1);
1195            let _ = t.record(&frame(pid, 1), now, threshold_ns, ORIGIN, None);
1196        }
1197        // Under nominal use probe exhaustion is unreachable at load ≤ 0.5.
1198        assert_eq!(t.take_probe_exhausted(), 0);
1199    }
1200
1201    #[test]
1202    fn invariant_violations_stays_zero_under_random_ops() {
1203        // Mirrors `stall_emitted_count_invariant_holds_across_random_ops`
1204        // but asserts the new invariant_violations counter never ticks.
1205        let mut t = Tracker::new(32, EvictionPolicy::Balanced, DEFAULT_EVICTION_SCAN_WINDOW);
1206        let threshold_ns = 100;
1207        let mut now_ns: u64 = 0;
1208        let mut s: u64 = 0xDEADBEEF;
1209        let mut next = || {
1210            s ^= s << 13;
1211            s ^= s >> 7;
1212            s ^= s << 17;
1213            s
1214        };
1215        for _ in 0..4000 {
1216            let r = next() % 4;
1217            now_ns = now_ns.saturating_add(20);
1218            match r {
1219                0 => {
1220                    let pid = (next() % 96) as u32 + 1;
1221                    let _ = t.record(&frame(pid, now_ns), now_ns, threshold_ns, ORIGIN, None);
1222                }
1223                1 => {
1224                    now_ns = now_ns.saturating_add(threshold_ns * 2);
1225                    t.drain_stalled_slots(now_ns, threshold_ns, |_, _, _, _, _| {});
1226                }
1227                2 => {
1228                    let pid = (next() % 96) as u32 + 1;
1229                    let _ = t.last_ns_of(pid);
1230                    let _ = t.origin_of(pid);
1231                }
1232                _ => {}
1233            }
1234        }
1235        assert_eq!(t.take_invariant_violations(), 0);
1236        assert_eq!(t.take_probe_exhausted(), 0);
1237    }
1238
1239    /// drain_stalled_slots propagates each slot's pinned origin to the
1240    /// callback so downstream consumers (Recovery) can gate on transport
1241    /// trust.
1242    #[test]
1243    fn drain_stalled_slots_emits_pinned_origin() {
1244        let mut t = Tracker::new(4, EvictionPolicy::Strict, DEFAULT_EVICTION_SCAN_WINDOW);
1245        let threshold_ns = 100;
1246
1247        assert_eq!(
1248            t.record(
1249                &frame(11, 1),
1250                0,
1251                threshold_ns,
1252                BeatOrigin::KernelAttested,
1253                None
1254            ),
1255            Update::Inserted
1256        );
1257        assert_eq!(
1258            t.record(
1259                &frame(22, 1),
1260                0,
1261                threshold_ns,
1262                BeatOrigin::NetworkUnverified,
1263                None,
1264            ),
1265            Update::Inserted
1266        );
1267
1268        let mut seen: Vec<(u32, BeatOrigin)> = Vec::new();
1269        t.drain_stalled_slots(threshold_ns * 2, threshold_ns, |pid, _, _, origin, _| {
1270            seen.push((pid, origin));
1271        });
1272        seen.sort_by_key(|(p, _)| *p);
1273        assert_eq!(
1274            seen,
1275            vec![
1276                (11, BeatOrigin::KernelAttested),
1277                (22, BeatOrigin::NetworkUnverified),
1278            ]
1279        );
1280    }
1281
1282    // ---------------------- PID-namespace gate tests ----------------------
1283
1284    /// First-namespace-wins: a beat with a different `Some(_)` inode for an
1285    /// already-tracked pid is rejected as `NamespaceConflict`.
1286    #[test]
1287    fn namespace_conflict_blocks_rebind() {
1288        let mut t = Tracker::new(8, EvictionPolicy::Strict, DEFAULT_EVICTION_SCAN_WINDOW);
1289        let threshold_ns = 100;
1290        assert_eq!(
1291            t.record(
1292                &frame(7, 1),
1293                0,
1294                threshold_ns,
1295                BeatOrigin::KernelAttested,
1296                Some(4026531836),
1297            ),
1298            Update::Inserted
1299        );
1300        let r = t.record(
1301            &frame(7, 2),
1302            10,
1303            threshold_ns,
1304            BeatOrigin::KernelAttested,
1305            Some(4026531840),
1306        );
1307        assert_eq!(r, Update::NamespaceConflict);
1308        // Slot is untouched.
1309        assert_eq!(t.pid_ns_inode_of(7), Some(Some(4026531836)));
1310        assert_eq!(t.take_namespace_conflicts(), 1);
1311        assert_eq!(t.take_namespace_conflicts(), 0);
1312    }
1313
1314    /// Same inode → normal refresh.
1315    #[test]
1316    fn namespace_match_passes_through() {
1317        let mut t = Tracker::new(8, EvictionPolicy::Strict, DEFAULT_EVICTION_SCAN_WINDOW);
1318        let threshold_ns = 100;
1319        let _ = t.record(
1320            &frame(7, 1),
1321            0,
1322            threshold_ns,
1323            BeatOrigin::KernelAttested,
1324            Some(123),
1325        );
1326        let r = t.record(
1327            &frame(7, 2),
1328            10,
1329            threshold_ns,
1330            BeatOrigin::KernelAttested,
1331            Some(123),
1332        );
1333        assert_eq!(r, Update::Refreshed);
1334        assert_eq!(t.take_namespace_conflicts(), 0);
1335    }
1336
1337    /// `Some → None` regression on a same-pid rebind is a conflict.
1338    #[test]
1339    fn namespace_some_to_none_is_conflict() {
1340        let mut t = Tracker::new(8, EvictionPolicy::Strict, DEFAULT_EVICTION_SCAN_WINDOW);
1341        let threshold_ns = 100;
1342        let _ = t.record(
1343            &frame(7, 1),
1344            0,
1345            threshold_ns,
1346            BeatOrigin::KernelAttested,
1347            Some(123),
1348        );
1349        let r = t.record(
1350            &frame(7, 2),
1351            10,
1352            threshold_ns,
1353            BeatOrigin::KernelAttested,
1354            None,
1355        );
1356        assert_eq!(r, Update::NamespaceConflict);
1357        assert_eq!(t.take_namespace_conflicts(), 1);
1358    }
1359
1360    /// `None → Some` upgrade on a same-pid rebind pins the now-known inode
1361    /// and falls through to refresh. This is the forgiving case for a peer
1362    /// whose `/proc/<pid>/ns/pid` was briefly unreadable at first contact.
1363    #[test]
1364    fn namespace_none_to_some_upgrades_in_place() {
1365        let mut t = Tracker::new(8, EvictionPolicy::Strict, DEFAULT_EVICTION_SCAN_WINDOW);
1366        let threshold_ns = 100;
1367        let _ = t.record(
1368            &frame(7, 1),
1369            0,
1370            threshold_ns,
1371            BeatOrigin::KernelAttested,
1372            None,
1373        );
1374        assert_eq!(t.pid_ns_inode_of(7), Some(None));
1375        let r = t.record(
1376            &frame(7, 2),
1377            10,
1378            threshold_ns,
1379            BeatOrigin::KernelAttested,
1380            Some(999),
1381        );
1382        assert_eq!(r, Update::Refreshed);
1383        assert_eq!(t.pid_ns_inode_of(7), Some(Some(999)));
1384        assert_eq!(t.take_namespace_conflicts(), 0);
1385    }
1386
1387    /// Both `None` (non-Linux / unreadable) → refresh, no conflict.
1388    #[test]
1389    fn namespace_both_none_is_match() {
1390        let mut t = Tracker::new(8, EvictionPolicy::Strict, DEFAULT_EVICTION_SCAN_WINDOW);
1391        let threshold_ns = 100;
1392        let _ = t.record(
1393            &frame(7, 1),
1394            0,
1395            threshold_ns,
1396            BeatOrigin::KernelAttested,
1397            None,
1398        );
1399        let r = t.record(
1400            &frame(7, 2),
1401            10,
1402            threshold_ns,
1403            BeatOrigin::KernelAttested,
1404            None,
1405        );
1406        assert_eq!(r, Update::Refreshed);
1407        assert_eq!(t.take_namespace_conflicts(), 0);
1408    }
1409
1410    // ---- C1 regression: PidIndex::insert occupancy bookkeeping ----------
1411
1412    /// `occupied` tracks live entries.  Under a cyclic insert/remove cycle the
1413    /// counter must stay exactly equal to the number of live pids — neither
1414    /// drifting up (double-counting) nor drifting down (under-counting).
1415    #[test]
1416    fn pid_index_occupied_tracks_live_entries_under_churn() {
1417        // Table sized for 32 entries (64 slots, load ≤ 0.5).
1418        // We use a *cyclic* pid space (0..48) so tombstones from removed pids
1419        // fall in the same hash chains as later inserts, ensuring reuse.
1420        const CAP: usize = 32;
1421        const PID_RANGE: u32 = 48; // > CAP but < table_size; guarantees reuse
1422        let mut idx = PidIndex::new(CAP);
1423
1424        let mut expected_live: u32 = 0;
1425        let mut live_set = std::collections::HashSet::new();
1426
1427        for i in 0u32..2_000 {
1428            let pid = i % PID_RANGE;
1429            if live_set.contains(&pid) {
1430                // Already live — remove then re-insert to exercise the tombstone path.
1431                idx.remove(pid);
1432                live_set.remove(&pid);
1433                expected_live -= 1;
1434                idx.insert(pid, pid as usize).expect("re-insert");
1435                live_set.insert(pid);
1436                expected_live += 1;
1437            } else if expected_live < CAP as u32 {
1438                idx.insert(pid, pid as usize).expect("fresh insert");
1439                live_set.insert(pid);
1440                expected_live += 1;
1441            } else {
1442                // At capacity: remove the first entry and insert the new one.
1443                let victim = *live_set.iter().next().unwrap();
1444                idx.remove(victim);
1445                live_set.remove(&victim);
1446                expected_live -= 1;
1447                idx.insert(pid, pid as usize).expect("insert after evict");
1448                live_set.insert(pid);
1449                expected_live += 1;
1450            }
1451            assert_eq!(
1452                idx.len(),
1453                expected_live as usize,
1454                "i={i} pid={pid}: occupied={} expected={expected_live}",
1455                idx.len()
1456            );
1457        }
1458    }
1459
1460    /// Re-inserting a previously-removed pid via its tombstone slot must
1461    /// restore the live count.  `remove()` decremented `occupied`; the
1462    /// re-insert must re-increment it so the counter stays accurate.
1463    #[test]
1464    fn pid_index_occupied_restored_on_tombstone_reuse() {
1465        let mut idx = PidIndex::new(16);
1466
1467        idx.insert(42, 0).expect("first insert");
1468        assert_eq!(idx.len(), 1);
1469
1470        idx.remove(42);
1471        assert_eq!(idx.len(), 0);
1472
1473        // Re-insert via the tombstone slot: live count must go back to 1.
1474        idx.insert(42, 5).expect("reinsert via tombstone");
1475        assert_eq!(
1476            idx.len(),
1477            1,
1478            "reinsert via tombstone did not restore occupied to 1 (was {})",
1479            idx.len()
1480        );
1481    }
1482}