varta_watch/tracker.rs
1//! Per-pid liveness tracker backed by a pre-allocated `Vec<Slot>` plus a
2//! fixed-size, open-addressed [`PidIndex`] for O(1) pid lookup.
3//!
4//! The tracker is the in-memory ledger the observer consults each time a
5//! frame arrives or the read timeout expires. It never reallocates: capacity
6//! is fixed at construction, the pid-index table is sized for load factor
7//! ≤ 0.5 with a bounded probe budget ([`PidIndex::MAX_PROBE`]), and an
8//! exhausted tracker yields [`Update::CapacityExceeded`] rather than growing.
9//!
10//! The custom pid index replaces `std::collections::HashMap` for two
11//! DO-178C-style reasons: (1) `HashMap` uses SipHash randomized per process,
12//! producing a non-constant memory access pattern that defeats WCET
13//! analysis, and (2) it can rehash on collision-driven growth. `PidIndex`
14//! uses a deterministic integer mixer (Murmur3 finalizer) and linear
15//! probing with a fixed budget, so every operation has a tight WCET bound.
16
17use varta_vlp::{Frame, Status};
18
19use crate::peer_cred::BeatOrigin;
20
/// Default number of distinct agent pids the observer tracks concurrently.
///
/// v0.2.0 raises this from 64 to 256. Override via `--tracker-capacity`;
/// whatever value is requested, [`Tracker::new`] caps it at [`MAX_CAPACITY`].
/// This is the capacity used by `Tracker::default()`.
pub const DEFAULT_CAPACITY: usize = 256;
25
/// Hard upper bound for `--tracker-capacity`. The tracker uses a linear scan
/// over active slots (see [`Tracker::drain_stalled_slots`]); at capacities
/// exceeding this value the scan becomes a latency spike risk in the
/// observer poll loop.
///
/// [`Tracker::new`] caps any larger requested capacity at this value.
pub const MAX_CAPACITY: usize = 4096;
30
/// Multiplier applied to the stall threshold when choosing eviction victims.
///
/// In the strict pass a slot is only evictable if (a) the observer has
/// already surfaced a stall event for its pid (`stall_emitted == true`)
/// **and** (b) the silence duration exceeds
/// `threshold * EVICTION_MULTIPLIER`. The 10× multiplier ensures that only
/// agents which have been silent for **significantly** longer than the stall
/// threshold are evicted — a slow-beating but alive agent (e.g. every 40 s
/// with a 5 s threshold) will not be evicted because it resets
/// `stall_emitted` on every beat.
///
/// The [`EvictionPolicy::Balanced`] fallback pass drops condition (a) and
/// relies on condition (b) alone.
const EVICTION_MULTIPLIER: u32 = 10;
41
/// Default maximum number of slots inspected per windowed eviction pass
/// (see [`Tracker::find_evictable_slot`]).
///
/// The eviction scan used to be O(`len`) — at [`MAX_CAPACITY`] = 4096 that
/// meant up to 4096 slot reads on **every** new-pid frame once the table was
/// full. An attacker who could send beats from many unique pids could
/// therefore force O(n) work per arriving frame on the single-threaded
/// observer poll loop.
///
/// The scan is now bounded to the tracker's `eviction_scan_window`
/// (configurable via `--eviction-scan-window`, defaulting to this constant),
/// with a rotating cursor (`eviction_scan_cursor`) that resumes where the
/// previous pass left off. A full sweep takes
/// `ceil(capacity / eviction_scan_window)` consecutive passes. First-fit
/// eviction inside the window is correct under capacity pressure (any slot
/// whose silence exceeds `threshold * EVICTION_MULTIPLIER` is a valid victim
/// — they are by definition not actively beating).
///
/// 256 was chosen as a compromise: large enough that a single call typically
/// finds a victim on tables of 1–2 k pids, small enough that the per-frame
/// upper bound stays well under the existing observer-tick budget.
pub const DEFAULT_EVICTION_SCAN_WINDOW: usize = 256;
63
/// Minimum allowed value for `--eviction-scan-window`. Window = 1 is
/// degenerate but correct; only window = 0 breaks the algorithm.
///
/// [`Tracker::new`] raises smaller values to this minimum.
pub const MIN_EVICTION_SCAN_WINDOW: usize = 1;
67
/// Maximum allowed value for `--eviction-scan-window`. Capped at
/// [`MAX_CAPACITY`] so a table scan in one call is bounded by the maximum
/// tracker size.
///
/// [`Tracker::new`] lowers larger values to this maximum.
pub const MAX_EVICTION_SCAN_WINDOW: usize = MAX_CAPACITY;
72
/// Threshold for nonce wrap detection.
///
/// When the tracker's `last_nonce` for a pid is within this distance of
/// `u64::MAX` (`last_nonce >= u64::MAX - NONCE_WRAP_THRESHOLD`) and an
/// incoming frame carries a nonce strictly below this threshold, the tracker
/// treats the gap as a nonce-space wrap (agent exhausted u64 nonces and
/// looped to 0) rather than an out-of-order beat.
///
/// The threshold is 2^20 (1,048,576). For scale: walking the whole u64
/// nonce space at 1M beats/sec takes on the order of 585,000 years, so a
/// stale beat cannot plausibly trail the newest one by almost the entire
/// nonce space — a "top of the range → bottom of the range" jump is taken
/// to be a wrap.
const NONCE_WRAP_THRESHOLD: u64 = 1 << 20;
81
/// Fixed-size, open-addressed `u32 → u32` map from agent pid to slot index.
///
/// Thin newtype over the generic [`crate::probe_table::BoundedIndex`]; see
/// that module for the full WCET argument. The hot tracker path uses this
/// type directly so the call sites stay readable while the probe-table
/// machinery is shared with `OutstandingTable` and `IpStateTable`.
///
/// `Entry<u32>` in the generic table is still 8 bytes (see the
/// `entry_u32_is_8_bytes` test in `probe_table`), so the per-slot cache
/// pressure on the hot path is unchanged across the refactor.
///
/// Every method on this type forwards directly to the wrapped table.
pub(crate) struct PidIndex(crate::probe_table::BoundedIndex<u32>);
93
94/// Re-export the generic probe-exhaustion marker so the rest of the tracker
95/// keeps referring to a `ProbeExhausted` type local to this module.
96pub(crate) use crate::probe_table::ProbeExhausted;
97
98impl PidIndex {
99 /// Hard cap on the probe sequence length per `get` / `insert` /
100 /// `remove`. Referenced from the doc comments above and from
101 /// `Tracker::take_probe_exhausted`'s remediation text; the actual
102 /// bound is enforced inside the generic `BoundedIndex`.
103 #[allow(dead_code)]
104 pub(crate) const MAX_PROBE: usize = crate::probe_table::BoundedIndex::<u32>::MAX_PROBE;
105
106 /// Build a pid index sized for `capacity` agents.
107 pub(crate) fn new(capacity: usize) -> Self {
108 Self(crate::probe_table::BoundedIndex::new(capacity))
109 }
110
111 /// Look up the slot index recorded for `pid`. Returns `None` if absent
112 /// or if the probe budget was exhausted (treated as absent so callers
113 /// fall through to insert / capacity-exceeded paths).
114 pub(crate) fn get(&self, pid: u32) -> Option<usize> {
115 self.0.get(pid)
116 }
117
118 /// Insert or update `pid → slot_idx`. Returns `Err(ProbeExhausted)` if
119 /// no free or matching slot was found within
120 /// [`Self::MAX_PROBE`] probes; table state is unchanged in that case
121 /// and the probe-exhausted counter is incremented.
122 pub(crate) fn insert(&mut self, pid: u32, slot_idx: usize) -> Result<(), ProbeExhausted> {
123 self.0.insert(pid, slot_idx)
124 }
125
126 /// Remove `pid` from the index. Returns the slot index it pointed to,
127 /// if any.
128 pub(crate) fn remove(&mut self, pid: u32) -> Option<usize> {
129 self.0.remove(pid)
130 }
131
132 /// Drain and reset the probe-exhausted counter.
133 pub(crate) fn take_probe_exhausted(&mut self) -> u64 {
134 self.0.take_probe_exhausted()
135 }
136
137 /// Number of live entries. Used by the existing occupancy invariant
138 /// tests below; production code reads occupancy through the tracker
139 /// itself, not the index.
140 #[cfg(test)]
141 pub(crate) fn len(&self) -> usize {
142 self.0.len()
143 }
144}
145
/// Controls which slot to reclaim when the tracker is at capacity and a
/// new pid arrives.
///
/// Under both policies the victim search is first-fit inside a bounded,
/// rotating scan window; neither policy searches for the globally oldest
/// slot.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum EvictionPolicy {
    /// Only evict slots that have already been surfaced as stalled and
    /// have been silent for > `threshold * EVICTION_MULTIPLIER`. This is
    /// the safest choice — a correctly-beating agent is never evicted,
    /// but a capacity-exhaustion attack can cause `CapacityExceeded`.
    Strict,
    /// Like `Strict`, but when the strict pass finds no victim, a second
    /// windowed pass accepts the first slot whose silence exceeds
    /// `threshold * EVICTION_MULTIPLIER`, whether or not a stall event has
    /// been surfaced for it yet. This makes `CapacityExceeded` less likely
    /// during a flood, at the expense of potentially evicting a
    /// slow-but-alive agent. It does not eliminate `CapacityExceeded`: if no
    /// slot in the scanned window is silent for long enough, the new pid is
    /// still rejected.
    Balanced,
}
162
/// Liveness slot for a single agent pid.
///
/// `Slot` is internal to the observer and never crosses the wire, so it uses
/// the default Rust repr (lets the compiler tighten field order). The
/// `stall_emitted` latch is private: it tracks whether the observer has
/// already surfaced an [`crate::observer::Event::Stall`] for the current
/// silence run, so a stalled pid raises the event exactly once and then stays
/// silent until a fresh beat resets it.
///
/// The type is `Copy`, which the tracker relies on to snapshot a slot before
/// overwriting it during eviction.
#[derive(Clone, Copy, Debug)]
pub struct Slot {
    /// OS process id of the tracked agent.
    pub(crate) pid: u32,
    /// Most recent nonce accepted from this pid.
    pub(crate) last_nonce: u64,
    /// Observer-local timestamp (nanoseconds since [`crate::observer::Observer`]
    /// start) of the last accepted beat for this pid.
    pub(crate) last_ns: u64,
    /// Most recent [`Status`] reported by this pid.
    pub(crate) status: Status,
    /// Transport origin pinned at the slot's first beat. Used to gate
    /// recovery-eligibility — beats from a different origin than the pinned
    /// one are rejected as [`Update::OriginConflict`] without mutating the
    /// slot. See [`BeatOrigin`] for the trust model.
    pub(crate) origin: BeatOrigin,
    /// PID-namespace inode pinned at the slot's first beat (Linux only).
    ///
    /// `None` on non-Linux platforms, for UDP transports (no kernel attestation),
    /// or when `/proc/<peer_pid>/ns/pid` was unreadable at first contact. A
    /// later beat carrying a different `Some(_)` namespace inode for the same
    /// pid is rejected as [`Update::NamespaceConflict`] without mutating the
    /// slot. A `None → Some(_)` upgrade is permitted exactly once — it
    /// represents a peer whose namespace became readable after a transient
    /// failure (e.g. peer died briefly between `recvmsg` and `readlink`).
    pub(crate) pid_ns_inode: Option<u64>,
    /// False iff this slot has never been written; observers treat the
    /// slot's other fields as undefined when `used == false`.
    pub(crate) used: bool,
    /// True iff the observer has already emitted a stall event for the
    /// current silence run. Cleared when a fresh beat arrives.
    pub(crate) stall_emitted: bool,
}

// Empty inherent impl: the tracker manipulates `Slot` through its fields.
impl Slot {}
206
/// Result of [`Tracker::record`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Update {
    /// The frame's pid was new and a slot was allocated for it — either a
    /// fresh one or one reclaimed from an evicted pid.
    Inserted,
    /// An existing slot was updated with the new nonce / timestamp / status.
    /// Also returned when a nonce-space wrap is detected for the pid.
    Refreshed,
    /// The frame's nonce was not strictly greater than the slot's last
    /// observed nonce (and did not qualify as a nonce-space wrap); the
    /// slot's nonce, timestamp and status were left untouched.
    OutOfOrder,
    /// The tracker is full and the frame's pid is not yet known. The slot
    /// table was not modified.
    CapacityExceeded,
    /// A beat arrived for a pid that is already tracked, but the beat's
    /// transport origin disagrees with the origin pinned by the slot's
    /// first beat. First-origin-wins: the slot is **not** mutated and the
    /// beat is dropped. Prevents an attacker on an untrusted transport
    /// from "tainting" a slot that legitimately belongs to a kernel-attested
    /// agent (or vice-versa).
    OriginConflict,
    /// A beat arrived for a pid that is already tracked, but the beat's
    /// kernel-attested PID-namespace inode disagrees with the inode pinned
    /// by the slot's first beat (Linux only — see
    /// [`crate::peer_cred::read_pid_namespace_inode`]). First-namespace-wins:
    /// the slot is **not** mutated and the beat is dropped. Catches the
    /// PID-collision case where two containers happen to share a numeric pid
    /// value (e.g. PID 1 in container A vs PID 1 in container B); the
    /// existing `frame.pid == peer_pid` gate at the observer fires first for
    /// most cross-namespace traffic, but a same-pid-different-namespace
    /// collision is invisible to that gate.
    NamespaceConflict,
}
239
/// Bounded per-pid liveness ledger.
///
/// The slot table is a `Vec<Slot>` pre-allocated at construction to the
/// configured capacity; subsequent inserts push into that pre-allocated
/// space without reallocation. Lookups use a fixed-size [`PidIndex`] for
/// O(1) pid-to-index mapping — replaces the original `HashMap` so the hot
/// path is WCET-bounded (deterministic hash, bounded probe budget, no
/// rehashing on growth).
pub struct Tracker {
    /// Slot table; grows by `push` up to its pre-allocated capacity, after
    /// which slots are only ever overwritten in place by eviction.
    entries: Vec<Slot>,
    /// Number of slots pushed so far; incremented alongside every push into
    /// `entries`.
    len: usize,
    /// Maps a pid to its position in `entries`.
    pid_to_index: PidIndex,
    /// Slots reclaimed since the last [`Tracker::take_evictions`] call.
    evictions: u64,
    /// Beats dropped with [`Update::CapacityExceeded`] since the last
    /// [`Tracker::take_capacity_exceeded`] call.
    capacity_exceeded: u64,
    /// Nonce-space wraps accepted since the last
    /// [`Tracker::take_nonce_wraps`] call.
    nonce_wraps: u64,
    /// Pid of the most recently evicted slot, until consumed by
    /// [`Tracker::take_evicted_pid`].
    last_evicted_pid: Option<u32>,
    /// Victim-selection policy applied when the table is full.
    eviction_policy: EvictionPolicy,
    /// Cached count of slots whose `stall_emitted` flag is currently set.
    ///
    /// Allows [`Tracker::find_evictable_slot`] to skip the strict scan
    /// entirely when no slots have surfaced a stall yet — defangs the most
    /// realistic DoS profile where an attacker fills the tracker faster
    /// than the stall threshold can elapse.
    stall_emitted_count: usize,
    /// Maximum slots inspected per [`Tracker::scan_window`] call.
    /// Configurable via `--eviction-scan-window`; defaults to
    /// [`DEFAULT_EVICTION_SCAN_WINDOW`]. A full table sweep takes
    /// `ceil(len / eviction_scan_window)` consecutive calls.
    eviction_scan_window: usize,
    /// Round-robin cursor into `entries` for the bounded eviction scan.
    /// Persists across `find_evictable_slot` calls so a sequence of N
    /// failed evictions covers the whole table in
    /// `ceil(len / eviction_scan_window)` calls without ever scanning more
    /// than `eviction_scan_window` slots in a single call.
    eviction_scan_cursor: usize,
    /// Number of times the bounded eviction scan reached its window cap
    /// without finding a victim while the table was full. Surfaced via
    /// [`Tracker::take_eviction_scan_truncated`] for Prometheus.
    ///
    /// NOTE: `find_evictable_slot` bumps this on every call that yields no
    /// victim, including the strict-policy fast-bail where no slot was
    /// actually scanned.
    eviction_scan_truncated: u64,
    /// Count of beats dropped because their transport origin disagreed with
    /// the slot's pinned origin (first-origin-wins). Surfaced via
    /// [`Tracker::take_origin_conflicts`] for Prometheus.
    origin_conflicts: u64,
    /// Count of beats dropped because their kernel-attested PID-namespace
    /// inode disagreed with the slot's pinned namespace (first-namespace-wins).
    /// Surfaced via [`Tracker::take_namespace_conflicts`] for Prometheus.
    namespace_conflicts: u64,
    /// Count of internal invariant violations encountered on the hot path —
    /// e.g. a [`PidIndex`] entry pointed at a slot index outside `entries`,
    /// or `find_evictable_slot` returned a stale index. Each violation is
    /// recovered defensively (the operation behaves as a miss or as
    /// [`Update::CapacityExceeded`]) rather than panicking. Surfaced via
    /// [`Tracker::take_invariant_violations`] for Prometheus so operators
    /// can alert on a non-zero value — in correctly-operating code this
    /// counter stays at 0 forever.
    invariant_violations: u64,
}
297
298impl Default for Tracker {
299 fn default() -> Self {
300 Self::new(
301 DEFAULT_CAPACITY,
302 EvictionPolicy::Strict,
303 DEFAULT_EVICTION_SCAN_WINDOW,
304 )
305 }
306}
307
308impl Tracker {
309 /// Create an empty tracker with capacity for `capacity` pids.
310 ///
311 /// The slot table is pre-allocated to `capacity` entries; pushing
312 /// beyond that boundary yields [`Update::CapacityExceeded`] rather
313 /// than reallocating.
314 ///
315 /// `eviction_scan_window` caps the number of slots inspected per
316 /// eviction attempt. Values outside
317 /// `[MIN_EVICTION_SCAN_WINDOW, MAX_EVICTION_SCAN_WINDOW]` are clamped
318 /// as defense in depth; the config layer rejects out-of-range values
319 /// loudly at startup.
320 pub fn new(
321 capacity: usize,
322 eviction_policy: EvictionPolicy,
323 eviction_scan_window: usize,
324 ) -> Self {
325 let cap = capacity.min(MAX_CAPACITY);
326 let window = eviction_scan_window.clamp(MIN_EVICTION_SCAN_WINDOW, MAX_EVICTION_SCAN_WINDOW);
327 Tracker {
328 entries: Vec::with_capacity(cap),
329 len: 0,
330 pid_to_index: PidIndex::new(cap),
331 evictions: 0,
332 capacity_exceeded: 0,
333 nonce_wraps: 0,
334 last_evicted_pid: None,
335 eviction_policy,
336 stall_emitted_count: 0,
337 eviction_scan_window: window,
338 eviction_scan_cursor: 0,
339 eviction_scan_truncated: 0,
340 origin_conflicts: 0,
341 namespace_conflicts: 0,
342 invariant_violations: 0,
343 }
344 }
345
346 /// Record a frame against the tracker.
347 ///
348 /// Uses O(1) HashMap pid lookup to find the slot for `frame.pid`.
349 /// Returns [`Update::Inserted`] for a brand-new pid, [`Update::Refreshed`]
350 /// for an existing pid whose nonce moved forward, [`Update::OutOfOrder`]
351 /// if the nonce did not strictly increase, [`Update::CapacityExceeded`]
352 /// if the slot table is full (and no stale slot could be reclaimed) and
353 /// the pid is not yet tracked, or [`Update::OriginConflict`] if the
354 /// frame's transport origin disagrees with the slot's pinned origin.
355 ///
356 /// `origin` is the transport-class classification surfaced by the
357 /// receiving listener (`KernelAttested` for UDS, `NetworkUnverified` for
358 /// any UDP variant). The first beat for a pid pins the slot's origin;
359 /// subsequent beats from a different origin are dropped without
360 /// mutating the slot.
361 ///
362 /// `peer_pid_ns_inode` is the kernel-attested PID-namespace inode of the
363 /// sending process (Linux only; `None` on non-Linux or when
364 /// `/proc/<peer_pid>/ns/pid` was unreadable). The first beat pins the
365 /// slot's namespace inode; a later beat carrying a different `Some(_)`
366 /// inode for the same pid is rejected as [`Update::NamespaceConflict`].
367 /// A `None → Some(_)` upgrade is permitted (peer became readable after a
368 /// transient failure); a `Some(_) → None` regression is treated as a
369 /// conflict.
370 pub fn record(
371 &mut self,
372 frame: &Frame,
373 now_ns: u64,
374 threshold_ns: u64,
375 origin: BeatOrigin,
376 peer_pid_ns_inode: Option<u64>,
377 ) -> Update {
378 let status = frame.status;
379
380 if let Some(idx) = self.pid_to_index.get(frame.pid) {
381 // Defensive: the index promised this slot exists. If it doesn't,
382 // we treat the lookup as a miss and bump the invariant counter
383 // so ops can alert; the code then falls through to the insert
384 // path. Never panics.
385 let Some(slot) = self.entries.get_mut(idx) else {
386 self.invariant_violations = self.invariant_violations.saturating_add(1);
387 // Drop the stale index entry so the next lookup is a clean miss.
388 let _ = self.pid_to_index.remove(frame.pid);
389 self.capacity_exceeded = self.capacity_exceeded.saturating_add(1);
390 return Update::CapacityExceeded;
391 };
392 if slot.used {
393 if slot.origin != origin {
394 self.origin_conflicts = self.origin_conflicts.saturating_add(1);
395 return Update::OriginConflict;
396 }
397 // First-namespace-wins. Same precedence as origin: an actively
398 // disagreeing inode is a conflict; a `None → Some` upgrade
399 // pins the now-known namespace and falls through to refresh;
400 // both-`None` is the non-Linux / unreadable case and is a
401 // no-op.
402 match (slot.pid_ns_inode, peer_pid_ns_inode) {
403 (Some(a), Some(b)) if a != b => {
404 self.namespace_conflicts = self.namespace_conflicts.saturating_add(1);
405 return Update::NamespaceConflict;
406 }
407 (Some(_), None) => {
408 // Regression — pinned-then-lost is a tampering signal.
409 self.namespace_conflicts = self.namespace_conflicts.saturating_add(1);
410 return Update::NamespaceConflict;
411 }
412 (None, Some(_)) => {
413 // Forgiving upgrade — fill in the previously-unknown
414 // inode in place and continue with refresh.
415 slot.pid_ns_inode = peer_pid_ns_inode;
416 }
417 _ => {}
418 }
419 if frame.nonce <= slot.last_nonce {
420 // Detect nonce wrap: agent exhausted u64 nonce space
421 // and looped to 0. last_nonce is near u64::MAX and
422 // the incoming nonce is near 0 — a gap this large
423 // cannot be a genuine out-of-order beat.
424 let wrap_lo = NONCE_WRAP_THRESHOLD;
425 let wrap_hi = u64::MAX.saturating_sub(NONCE_WRAP_THRESHOLD);
426 if slot.last_nonce >= wrap_hi && frame.nonce < wrap_lo {
427 slot.last_nonce = frame.nonce;
428 slot.last_ns = now_ns;
429 slot.status = status;
430 if slot.stall_emitted {
431 slot.stall_emitted = false;
432 self.stall_emitted_count = self.stall_emitted_count.saturating_sub(1);
433 }
434 self.nonce_wraps = self.nonce_wraps.saturating_add(1);
435 return Update::Refreshed;
436 }
437 return Update::OutOfOrder;
438 }
439 slot.last_nonce = frame.nonce;
440 slot.last_ns = now_ns;
441 slot.status = status;
442 if slot.stall_emitted {
443 slot.stall_emitted = false;
444 self.stall_emitted_count = self.stall_emitted_count.saturating_sub(1);
445 }
446 return Update::Refreshed;
447 }
448 }
449
450 if self.len >= self.entries.capacity() {
451 if let Some(evict_idx) = self.find_evictable_slot(now_ns, threshold_ns) {
452 // Snapshot the slot we're evicting. If `find_evictable_slot`
453 // ever returned an OOB index (invariant break), defensively
454 // surface CapacityExceeded instead of panicking.
455 let Some(&evicted_slot) = self.entries.get(evict_idx) else {
456 self.invariant_violations = self.invariant_violations.saturating_add(1);
457 self.capacity_exceeded = self.capacity_exceeded.saturating_add(1);
458 return Update::CapacityExceeded;
459 };
460 let _ = self.pid_to_index.remove(evicted_slot.pid);
461 let Some(slot_mut) = self.entries.get_mut(evict_idx) else {
462 self.invariant_violations = self.invariant_violations.saturating_add(1);
463 self.capacity_exceeded = self.capacity_exceeded.saturating_add(1);
464 return Update::CapacityExceeded;
465 };
466 *slot_mut = Slot {
467 pid: frame.pid,
468 last_nonce: frame.nonce,
469 last_ns: now_ns,
470 status,
471 origin,
472 pid_ns_inode: peer_pid_ns_inode,
473 used: true,
474 stall_emitted: false,
475 };
476 if self.pid_to_index.insert(frame.pid, evict_idx).is_err() {
477 // Probe budget exhausted — roll back the slot write so
478 // the table stays internally consistent and surface
479 // CapacityExceeded to the caller. The `stall_emitted_count`
480 // decrement is deferred to the commit point below, so no
481 // rollback of the counter is needed here.
482 if let Some(slot_mut) = self.entries.get_mut(evict_idx) {
483 *slot_mut = evicted_slot;
484 }
485 // Best-effort re-pin of the old pid; if even this insert
486 // fails the slot is logically vacant for the next call.
487 let _ = self.pid_to_index.insert(evicted_slot.pid, evict_idx);
488 self.capacity_exceeded = self.capacity_exceeded.saturating_add(1);
489 return Update::CapacityExceeded;
490 }
491 // Commit-on-success: `stall_emitted_count` is decremented only
492 // after the new pid is pinned in the index. If the index insert
493 // had failed above, the slot rollback would have restored the
494 // old `stall_emitted = true` flag — decrementing the counter
495 // before the insert (the pre-commit-on-success layout) caused
496 // an `observed > tracked` divergence, surfaced by the
497 // `tracker_record` fuzz target. Pattern mirrors cerebrum
498 // 2026-05-15 (AEAD nonce state mutation).
499 if evicted_slot.stall_emitted {
500 self.stall_emitted_count = self.stall_emitted_count.saturating_sub(1);
501 }
502 self.evictions = self.evictions.saturating_add(1);
503 self.last_evicted_pid = Some(evicted_slot.pid);
504 return Update::Inserted;
505 }
506 self.capacity_exceeded = self.capacity_exceeded.saturating_add(1);
507 return Update::CapacityExceeded;
508 }
509 let idx = self.len;
510 // Reserve the index in the pid map *before* pushing — on probe
511 // exhaustion we surface CapacityExceeded and leave entries unchanged.
512 if self.pid_to_index.insert(frame.pid, idx).is_err() {
513 self.capacity_exceeded = self.capacity_exceeded.saturating_add(1);
514 return Update::CapacityExceeded;
515 }
516 self.entries.push(Slot {
517 pid: frame.pid,
518 last_nonce: frame.nonce,
519 last_ns: now_ns,
520 status,
521 origin,
522 pid_ns_inode: peer_pid_ns_inode,
523 used: true,
524 stall_emitted: false,
525 });
526 self.len += 1;
527 Update::Inserted
528 }
529
530 /// Find a slot that can be evicted to make room for a new pid.
531 ///
532 /// A slot is evictable when both conditions hold:
533 /// 1. The observer has already surfaced a stall event for this pid
534 /// (`stall_emitted == true`).
535 /// 2. Silence duration exceeds `threshold_ns * EVICTION_MULTIPLIER`.
536 ///
537 /// **Bounded-work guarantee.** The scan visits at most
538 /// [`EVICTION_SCAN_WINDOW`] slots per call, starting at
539 /// `self.eviction_scan_cursor` and wrapping mod `self.len`. The cursor
540 /// is advanced regardless of outcome so back-to-back failed evictions
541 /// eventually cover the whole table without ever performing more than
542 /// `WINDOW` slot reads in a single call. This trades strict
543 /// global-oldest LRU for an O(1) per-frame upper bound — the right
544 /// tradeoff under capacity pressure, because every slot satisfying the
545 /// threshold criterion is by definition a safe victim.
546 ///
547 /// **Fast-bail for Strict policy.** When no slots have surfaced a stall
548 /// yet (`stall_emitted_count == 0`), the strict pass is skipped
549 /// entirely. This is the common DoS profile: an attacker can fill the
550 /// tracker faster than the threshold can elapse, so no slot has a
551 /// `stall_emitted` flag set, and the previous code wasted O(n) work
552 /// looking for one anyway.
553 ///
554 /// When the policy is [`EvictionPolicy::Balanced`] and no
555 /// strictly-evictable slot is found in the window, a second windowed
556 /// pass picks the first slot whose silence exceeds the threshold
557 /// (disregarding `stall_emitted`). This prevents capacity-exhaustion
558 /// attacks at the cost of possibly evicting a slow-but-alive agent.
559 fn find_evictable_slot(&mut self, now_ns: u64, threshold_ns: u64) -> Option<usize> {
560 let evict_threshold = threshold_ns.saturating_mul(EVICTION_MULTIPLIER as u64);
561
562 // Strict pass — cheap bail when no slots have stalled yet.
563 if self.stall_emitted_count > 0 {
564 if let Some(idx) = self.scan_window(now_ns, evict_threshold, true) {
565 return Some(idx);
566 }
567 }
568 if self.eviction_policy == EvictionPolicy::Balanced {
569 if let Some(idx) = self.scan_window(now_ns, evict_threshold, false) {
570 return Some(idx);
571 }
572 }
573 self.eviction_scan_truncated = self.eviction_scan_truncated.saturating_add(1);
574 None
575 }
576
577 /// Bounded windowed scan helper for [`Tracker::find_evictable_slot`].
578 ///
579 /// Examines at most [`EVICTION_SCAN_WINDOW`] slots starting at
580 /// `eviction_scan_cursor` (mod `self.len`). Returns the index of the
581 /// first slot whose silence exceeds `evict_threshold` and, if
582 /// `require_stall`, whose `stall_emitted` flag is set. The cursor is
583 /// advanced past the inspected window (or just past the hit) so
584 /// subsequent calls progress around the ring.
585 fn scan_window(
586 &mut self,
587 now_ns: u64,
588 evict_threshold: u64,
589 require_stall: bool,
590 ) -> Option<usize> {
591 let n = self.len.min(self.entries.len());
592 if n == 0 {
593 return None;
594 }
595 let window = self.eviction_scan_window.min(n);
596 let start = self.eviction_scan_cursor % n;
597 for i in 0..window {
598 let idx = (start + i) % n;
599 // Defensive: if `n` ever exceeded `entries.len()` this would
600 // be unreachable under invariant `n = len.min(entries.len())`,
601 // but treat OOB as "skip" rather than panic.
602 let Some(slot) = self.entries.get(idx) else {
603 self.invariant_violations = self.invariant_violations.saturating_add(1);
604 continue;
605 };
606 let stale = now_ns.saturating_sub(slot.last_ns) > evict_threshold;
607 let qualifies = stale && (!require_stall || slot.stall_emitted);
608 if qualifies {
609 self.eviction_scan_cursor = (idx + 1) % n;
610 return Some(idx);
611 }
612 }
613 self.eviction_scan_cursor = (start + window) % n;
614 None
615 }
616
617 /// Take and reset the eviction counter. Returns the number of slots
618 /// reclaimed since the last call.
619 pub fn take_evictions(&mut self) -> u64 {
620 let count = self.evictions;
621 self.evictions = 0;
622 count
623 }
624
625 /// Return the pid of the most recently evicted slot, if any slots
626 /// have been evicted since the last call.
627 pub fn take_evicted_pid(&mut self) -> Option<u32> {
628 self.last_evicted_pid.take()
629 }
630
631 /// Take and reset the nonce-wrap counter. Returns the number of
632 /// nonce-space wraps detected since the last call.
633 pub fn take_nonce_wraps(&mut self) -> u64 {
634 let count = self.nonce_wraps;
635 self.nonce_wraps = 0;
636 count
637 }
638
639 /// Take and reset the capacity-exceeded counter. Returns the number of
640 /// beats dropped due to a full tracker since the last call.
641 pub fn take_capacity_exceeded(&mut self) -> u64 {
642 let count = self.capacity_exceeded;
643 self.capacity_exceeded = 0;
644 count
645 }
646
/// Number of pids currently tracked.
///
/// Reads the `len` field, which `record` increments each time a new slot
/// is pushed; evictions replace a slot in place and leave it unchanged.
pub fn len(&self) -> usize {
    self.len
}
651
652 /// Return the `last_ns` timestamp for a tracked pid, if present.
653 /// Used by the observer for per-pid rate limiting without exposing
654 /// internal slot layout.
655 pub fn last_ns_of(&self, pid: u32) -> Option<u64> {
656 self.pid_to_index
657 .get(pid)
658 .and_then(|idx| self.entries.get(idx).map(|s| s.last_ns))
659 }
660
661 /// Return the pinned transport origin of a tracked pid, if present.
662 /// Used by the observer to populate `Event::OriginConflict::slot_origin`
663 /// before calling `record` (which may produce the conflict).
664 pub fn origin_of(&self, pid: u32) -> Option<BeatOrigin> {
665 self.pid_to_index
666 .get(pid)
667 .and_then(|idx| self.entries.get(idx))
668 .filter(|s| s.used)
669 .map(|s| s.origin)
670 }
671
672 /// Return the pinned PID-namespace inode of a tracked pid, if present.
673 ///
674 /// The outer `Option` is `Some` when the pid is tracked at all; the inner
675 /// `Option` is the inode (or `None` for non-Linux / unreadable). Used by
676 /// the observer to populate `Event::NamespaceConflict::slot_ns_inode`
677 /// without an extra slot lookup.
678 pub fn pid_ns_inode_of(&self, pid: u32) -> Option<Option<u64>> {
679 self.pid_to_index
680 .get(pid)
681 .and_then(|idx| self.entries.get(idx))
682 .filter(|s| s.used)
683 .map(|s| s.pid_ns_inode)
684 }
685
/// True iff no pids are tracked, i.e. [`Tracker::len`] would return 0.
pub fn is_empty(&self) -> bool {
    self.len == 0
}
690
691 /// Find newly-stalled slots and mark them emitted in one atomic pass.
692 ///
693 /// A slot is "newly stalled" when its silence duration exceeds
694 /// `threshold_ns` **and** the observer has not yet surfaced a stall
695 /// event for the current silence run (`stall_emitted == false`).
696 /// Qualifying slots are marked `stall_emitted = true` and the callback
697 /// is invoked with `(pid, last_nonce, last_ns, origin, pid_ns_inode)` —
698 /// all within the same mutable borrow, closing the TOCTOU window that
699 /// existed between the former `iter_stalled` / `mark_stall_emitted` pair.
700 pub fn drain_stalled_slots(
701 &mut self,
702 now_ns: u64,
703 threshold_ns: u64,
704 mut cb: impl FnMut(u32, u64, u64, BeatOrigin, Option<u64>),
705 ) {
706 // Clamp the slice to actual `entries` length so the slice
707 // expression cannot panic even if `len` somehow exceeded it
708 // (invariant violation — counted, never panicked on).
709 let upper = self.len.min(self.entries.len());
710 if upper < self.len {
711 self.invariant_violations = self.invariant_violations.saturating_add(1);
712 }
713 if let Some(slice) = self.entries.get_mut(..upper) {
714 for slot in slice {
715 if !slot.used || slot.stall_emitted {
716 continue;
717 }
718 if now_ns.saturating_sub(slot.last_ns) >= threshold_ns {
719 slot.stall_emitted = true;
720 self.stall_emitted_count = self.stall_emitted_count.saturating_add(1);
721 cb(
722 slot.pid,
723 slot.last_nonce,
724 slot.last_ns,
725 slot.origin,
726 slot.pid_ns_inode,
727 );
728 }
729 }
730 }
731 #[cfg(debug_assertions)]
732 self.debug_assert_stall_count();
733 }
734
735 /// Take and reset the origin-conflict counter.
736 ///
737 /// Surfaced as `varta_origin_conflict_total` by the Prometheus exporter;
738 /// non-zero values indicate that beats for a tracked pid arrived from a
739 /// transport other than the one that first claimed the pid — either a
740 /// misconfigured agent or an active spoofing attempt.
741 pub fn take_origin_conflicts(&mut self) -> u64 {
742 let count = self.origin_conflicts;
743 self.origin_conflicts = 0;
744 count
745 }
746
747 /// Take and reset the namespace-conflict counter.
748 ///
749 /// Surfaced as `varta_tracker_namespace_conflict_total` by the Prometheus
750 /// exporter; non-zero values mean beats for a tracked pid arrived from a
751 /// different PID namespace than the one pinned by the slot's first beat.
752 /// Linux-only signal; on non-Linux platforms this counter stays at 0.
753 pub fn take_namespace_conflicts(&mut self) -> u64 {
754 let count = self.namespace_conflicts;
755 self.namespace_conflicts = 0;
756 count
757 }
758
759 /// Take and reset the bounded-window truncated-scan counter.
760 ///
761 /// Surfaced as `varta_tracker_eviction_scan_truncated_total` by the
762 /// Prometheus exporter; non-zero values prove the window cap actually
763 /// engaged (i.e. the table was full and no victim was found within
764 /// `EVICTION_SCAN_WINDOW` slots).
765 pub fn take_eviction_scan_truncated(&mut self) -> u64 {
766 let count = self.eviction_scan_truncated;
767 self.eviction_scan_truncated = 0;
768 count
769 }
770
771 /// Take and reset the invariant-violation counter.
772 ///
773 /// Surfaced as `varta_tracker_invariant_violations_total` by the
774 /// Prometheus exporter. In correctly-operating code this counter stays
775 /// at 0 forever — non-zero values mean one of the defensive `.get()`
776 /// fall-throughs in the hot path triggered (e.g. a stale `PidIndex`
777 /// entry pointed at an out-of-range slot). The tracker recovers
778 /// without panicking; ops should still treat any non-zero value as a
779 /// bug worth investigating.
780 pub fn take_invariant_violations(&mut self) -> u64 {
781 let count = self.invariant_violations;
782 self.invariant_violations = 0;
783 count
784 }
785
786 /// Take and reset the [`PidIndex`] probe-exhaustion counter.
787 ///
788 /// Surfaced as `varta_tracker_pid_index_probe_exhausted_total` by the
789 /// Prometheus exporter. Non-zero values mean a pid lookup walked
790 /// [`PidIndex::MAX_PROBE`] slots without resolving — at load factor
791 /// ≤ 0.5 this is effectively unreachable, so any non-zero value is a
792 /// red flag (pathological pid distribution, or an attempt to fill the
793 /// index past its safe load factor).
794 pub fn take_probe_exhausted(&mut self) -> u64 {
795 self.pid_to_index.take_probe_exhausted()
796 }
797
798 /// Recompute `stall_emitted_count` from scratch and assert it matches
799 /// the maintained counter. Cheap (single linear pass over `len` slots),
800 /// gated to debug builds to keep the release-mode hot path untouched.
801 #[cfg(debug_assertions)]
802 fn debug_assert_stall_count(&self) {
803 let upper = self.len.min(self.entries.len());
804 let observed = self
805 .entries
806 .get(..upper)
807 .unwrap_or(&[])
808 .iter()
809 .filter(|s| s.stall_emitted)
810 .count();
811 debug_assert_eq!(
812 observed, self.stall_emitted_count,
813 "stall_emitted_count out of sync: observed {}, tracked {}",
814 observed, self.stall_emitted_count
815 );
816 }
817}
818
819#[cfg(test)]
820mod tests {
821 use super::*;
822 use varta_vlp::Frame;
823
824 fn frame(pid: u32, nonce: u64) -> Frame {
825 Frame::new(Status::Ok, pid, nonce, nonce, 0)
826 }
827
    /// Origin passed to `Tracker::record` by every test that is not
    /// specifically about transport-origin behaviour.
    ///
    /// `KernelAttested` is used so those tests keep modelling the common UDS
    /// path; tests that exercise origin pinning name their origins explicitly.
    const ORIGIN: BeatOrigin = BeatOrigin::KernelAttested;
832
833 /// Fill capacity entirely; never trigger a stall. find_evictable_slot
834 /// must return None without scanning any slot (Strict policy).
835 #[test]
836 fn find_evictable_slot_returns_none_when_no_stalls_emitted() {
837 let cap = 64;
838 let mut t = Tracker::new(cap, EvictionPolicy::Strict, DEFAULT_EVICTION_SCAN_WINDOW);
839 let threshold_ns = 1_000;
840 // Fill at t=0 so silence isn't a factor either.
841 for pid in 1u32..=(cap as u32) {
842 assert_eq!(
843 t.record(&frame(pid, 1), 0, threshold_ns, ORIGIN, None),
844 Update::Inserted
845 );
846 }
847 assert_eq!(t.len(), cap);
848 assert_eq!(t.stall_emitted_count, 0);
849
850 // Even at very large "now_ns" (silence >> 10× threshold), Strict
851 // policy must bail without scanning: no slot has stall_emitted=true.
852 let now_ns = threshold_ns * 100;
853 let result = t.record(&frame(99_999, 1), now_ns, threshold_ns, ORIGIN, None);
854 assert_eq!(result, Update::CapacityExceeded);
855 // Cursor must NOT have advanced through the table (fast-bail path).
856 assert_eq!(t.eviction_scan_cursor, 0);
857 }
858
859 /// drain_stalled_slots marks slots; counter must reflect that, and the
860 /// next find_evictable_slot must actually scan and (eventually) succeed.
861 #[test]
862 fn stall_counter_enables_eviction_after_drain() {
863 let cap = 8;
864 let mut t = Tracker::new(cap, EvictionPolicy::Strict, DEFAULT_EVICTION_SCAN_WINDOW);
865 let threshold_ns = 100;
866
867 for pid in 1u32..=(cap as u32) {
868 assert_eq!(
869 t.record(&frame(pid, 1), 0, threshold_ns, ORIGIN, None),
870 Update::Inserted
871 );
872 }
873 // Time advances past threshold — every slot stalls.
874 let now_ns = threshold_ns * 20;
875 let mut stalled = 0u32;
876 t.drain_stalled_slots(now_ns, threshold_ns, |_, _, _, _, _| stalled += 1);
877 assert_eq!(stalled, cap as u32);
878 assert_eq!(t.stall_emitted_count, cap);
879
880 // Silence now exceeds 10× threshold → eviction succeeds.
881 let result = t.record(&frame(9_999, 1), now_ns, threshold_ns, ORIGIN, None);
882 assert_eq!(result, Update::Inserted);
883 // The replacing slot is fresh — stall counter decremented once.
884 assert_eq!(t.stall_emitted_count, cap - 1);
885 }
886
887 /// A fresh beat on a previously-stalled slot must decrement the counter.
888 #[test]
889 fn stall_counter_decrements_on_refresh() {
890 let mut t = Tracker::new(4, EvictionPolicy::Strict, DEFAULT_EVICTION_SCAN_WINDOW);
891 let threshold_ns = 100;
892 assert_eq!(
893 t.record(&frame(1, 1), 0, threshold_ns, ORIGIN, None),
894 Update::Inserted
895 );
896 t.drain_stalled_slots(threshold_ns * 2, threshold_ns, |_, _, _, _, _| {});
897 assert_eq!(t.stall_emitted_count, 1);
898
899 // New beat with strictly increasing nonce → refresh and clear flag.
900 assert_eq!(
901 t.record(&frame(1, 2), threshold_ns * 3, threshold_ns, ORIGIN, None),
902 Update::Refreshed
903 );
904 assert_eq!(t.stall_emitted_count, 0);
905 }
906
907 /// The bounded scan window must cap per-call work. Fill 4096 slots
908 /// at t=0, stall them all, then verify each find_evictable_slot call
909 /// advances the cursor by at most the configured window.
910 #[test]
911 fn find_evictable_slot_scan_is_bounded_to_window() {
912 let cap = MAX_CAPACITY;
913 let mut t = Tracker::new(cap, EvictionPolicy::Strict, DEFAULT_EVICTION_SCAN_WINDOW);
914 let threshold_ns = 100;
915 for pid in 1u32..=(cap as u32) {
916 assert_eq!(
917 t.record(&frame(pid, 1), 0, threshold_ns, ORIGIN, None),
918 Update::Inserted
919 );
920 }
921 // Stall everything.
922 let now_ns = threshold_ns * 20;
923 t.drain_stalled_slots(now_ns, threshold_ns, |_, _, _, _, _| {});
924 assert_eq!(t.stall_emitted_count, cap);
925
926 // Each new-pid insert evicts one slot. Cursor must advance by ≤ window.
927 let window = t.eviction_scan_window;
928 let start_cursor = t.eviction_scan_cursor;
929 let _ = t.record(&frame(50_001, 1), now_ns, threshold_ns, ORIGIN, None);
930 let advanced = t.eviction_scan_cursor.wrapping_sub(start_cursor) % cap;
931 assert!(
932 advanced <= window,
933 "cursor advanced by {advanced}, expected ≤ {window}"
934 );
935 }
936
937 /// A Tracker constructed with a small eviction_scan_window must honour
938 /// that window, not the default.
939 #[test]
940 fn eviction_scan_window_is_plumbed_through() {
941 let cap = 16;
942 let window = 4;
943 let mut t = Tracker::new(cap, EvictionPolicy::Strict, window);
944 assert_eq!(t.eviction_scan_window, window);
945 let threshold_ns = 100;
946 for pid in 1u32..=(cap as u32) {
947 assert_eq!(
948 t.record(&frame(pid, 1), 0, threshold_ns, ORIGIN, None),
949 Update::Inserted
950 );
951 }
952 // Stall everything so every slot is eviction-eligible.
953 let now_ns = threshold_ns * 20;
954 t.drain_stalled_slots(now_ns, threshold_ns, |_, _, _, _, _| {});
955 assert_eq!(t.stall_emitted_count, cap);
956 // Force an eviction attempt and confirm the cursor advanced by ≤ window.
957 let start = t.eviction_scan_cursor;
958 let _ = t.record(&frame(9_999, 1), now_ns, threshold_ns, ORIGIN, None);
959 let advanced = t.eviction_scan_cursor.wrapping_sub(start) % cap;
960 assert!(
961 advanced <= window,
962 "cursor advanced {advanced}, expected ≤ {window} (configured window)"
963 );
964 }
965
966 /// Cursor must wrap past `len` correctly so a long sequence of failed
967 /// evictions doesn't go out of bounds.
968 #[test]
969 fn scan_window_cursor_wraps_correctly() {
970 let cap = 4;
971 let mut t = Tracker::new(cap, EvictionPolicy::Strict, DEFAULT_EVICTION_SCAN_WINDOW);
972 let threshold_ns = 100;
973 for pid in 1u32..=(cap as u32) {
974 assert_eq!(
975 t.record(&frame(pid, 1), 0, threshold_ns, ORIGIN, None),
976 Update::Inserted
977 );
978 }
979 // Force the cursor to advance past `len` by calling scan_window
980 // many times with no qualifying slots (threshold not exceeded).
981 for _ in 0..10 {
982 let _ = t.scan_window(50, 1_000_000, true);
983 }
984 assert!(t.eviction_scan_cursor < cap);
985 }
986
987 /// Stress: random sequence of record / drain_stalled / time advances.
988 /// debug_assert_stall_count fires inside drain_stalled_slots after every
989 /// call, so this test exercises the invariant.
990 #[test]
991 fn stall_emitted_count_invariant_holds_across_random_ops() {
992 let mut t = Tracker::new(32, EvictionPolicy::Balanced, DEFAULT_EVICTION_SCAN_WINDOW);
993 let threshold_ns = 100;
994 let mut now_ns: u64 = 0;
995 // Simple deterministic PRNG (xorshift64) — no rand dep.
996 let mut s: u64 = 0xC0FFEE;
997 let mut next = || {
998 s ^= s << 13;
999 s ^= s >> 7;
1000 s ^= s << 17;
1001 s
1002 };
1003 for _ in 0..2000 {
1004 let r = next() % 4;
1005 now_ns = now_ns.saturating_add(20);
1006 match r {
1007 0 => {
1008 let pid = (next() % 64) as u32 + 1;
1009 let _ = t.record(&frame(pid, now_ns), now_ns, threshold_ns, ORIGIN, None);
1010 }
1011 1 => {
1012 // Advance and drain (may flip flags to true).
1013 now_ns = now_ns.saturating_add(threshold_ns * 2);
1014 t.drain_stalled_slots(now_ns, threshold_ns, |_, _, _, _, _| {});
1015 }
1016 _ => {
1017 // No-op — let other ops dominate.
1018 }
1019 }
1020 }
1021 // Final consistency check (also runs implicitly in drain).
1022 let observed = t.entries[..t.len]
1023 .iter()
1024 .filter(|s| s.stall_emitted)
1025 .count();
1026 assert_eq!(observed, t.stall_emitted_count);
1027 }
1028
1029 /// Acceptance check: scan-truncated counter increments only when we
1030 /// run the full window without finding a victim.
1031 #[test]
1032 fn scan_truncated_counter_increments_on_dry_scan() {
1033 let mut t = Tracker::new(32, EvictionPolicy::Strict, DEFAULT_EVICTION_SCAN_WINDOW);
1034 let threshold_ns = 100;
1035 for pid in 1u32..=32 {
1036 assert_eq!(
1037 t.record(&frame(pid, 1), 0, threshold_ns, ORIGIN, None),
1038 Update::Inserted
1039 );
1040 }
1041 // Table full, no stalls emitted → strict bails, balanced not used →
1042 // counter still increments since we returned None at capacity.
1043 let _ = t.record(
1044 &frame(99_999, 1),
1045 threshold_ns * 100,
1046 threshold_ns,
1047 ORIGIN,
1048 None,
1049 );
1050 assert_eq!(t.take_eviction_scan_truncated(), 1);
1051 // Take resets.
1052 assert_eq!(t.take_eviction_scan_truncated(), 0);
1053 }
1054
1055 /// First-origin-wins: once a slot is pinned to an origin, a beat with a
1056 /// different origin is dropped as `OriginConflict` without mutating the
1057 /// slot or incrementing the slot's `last_ns`.
1058 #[test]
1059 fn origin_conflict_first_origin_wins() {
1060 let mut t = Tracker::new(8, EvictionPolicy::Strict, DEFAULT_EVICTION_SCAN_WINDOW);
1061 let threshold_ns = 100;
1062
1063 // Beat 1 arrives via UDS (kernel-attested) and pins the slot.
1064 assert_eq!(
1065 t.record(
1066 &frame(7, 1),
1067 10,
1068 threshold_ns,
1069 BeatOrigin::KernelAttested,
1070 None
1071 ),
1072 Update::Inserted
1073 );
1074
1075 // Beat 2 arrives via UDP with the same pid — must be rejected.
1076 assert_eq!(
1077 t.record(
1078 &frame(7, 2),
1079 20,
1080 threshold_ns,
1081 BeatOrigin::NetworkUnverified,
1082 None,
1083 ),
1084 Update::OriginConflict
1085 );
1086
1087 // Slot is untouched: nonce still 1, last_ns still 10, origin still UDS.
1088 assert_eq!(t.last_ns_of(7), Some(10));
1089 assert_eq!(t.entries[0].last_nonce, 1);
1090 assert_eq!(t.entries[0].origin, BeatOrigin::KernelAttested);
1091
1092 // Counter reflects the dropped beat.
1093 assert_eq!(t.take_origin_conflicts(), 1);
1094 assert_eq!(t.take_origin_conflicts(), 0);
1095
1096 // Same-origin follow-up still works.
1097 assert_eq!(
1098 t.record(
1099 &frame(7, 3),
1100 30,
1101 threshold_ns,
1102 BeatOrigin::KernelAttested,
1103 None
1104 ),
1105 Update::Refreshed
1106 );
1107 }
1108
1109 // ---------------------- PidIndex unit tests ----------------------
1110
1111 #[test]
1112 fn pid_index_insert_get_remove_roundtrip() {
1113 let mut idx = PidIndex::new(16);
1114 assert_eq!(idx.get(42), None);
1115 idx.insert(42, 7).expect("insert");
1116 assert_eq!(idx.get(42), Some(7));
1117
1118 // Update in place preserves occupied count.
1119 idx.insert(42, 9).expect("update");
1120 assert_eq!(idx.get(42), Some(9));
1121 assert_eq!(idx.len(), 1);
1122
1123 assert_eq!(idx.remove(42), Some(9));
1124 assert_eq!(idx.get(42), None);
1125 assert_eq!(idx.len(), 0);
1126 }
1127
1128 #[test]
1129 fn pid_index_tombstone_reuse() {
1130 // Insert N pids, remove half, re-insert: lookups must still work
1131 // even though the removed slots left tombstones along the probe
1132 // sequences.
1133 let mut idx = PidIndex::new(64);
1134 for pid in 1u32..=32 {
1135 idx.insert(pid, pid as usize).expect("insert");
1136 }
1137 for pid in 1u32..=16 {
1138 assert_eq!(idx.remove(pid), Some(pid as usize));
1139 }
1140 // The remaining 16 are still findable.
1141 for pid in 17u32..=32 {
1142 assert_eq!(idx.get(pid), Some(pid as usize));
1143 }
1144 // Re-insert the removed ones; tombstones must be reused (table is
1145 // small enough that probe walks could otherwise overflow).
1146 for pid in 1u32..=16 {
1147 idx.insert(pid, (pid + 100) as usize).expect("reinsert");
1148 }
1149 for pid in 1u32..=16 {
1150 assert_eq!(idx.get(pid), Some((pid + 100) as usize));
1151 }
1152 for pid in 17u32..=32 {
1153 assert_eq!(idx.get(pid), Some(pid as usize));
1154 }
1155 }
1156
1157 #[test]
1158 fn pid_index_probe_exhaustion_returns_error() {
1159 // Build a tiny table where MAX_PROBE is large enough to find slots
1160 // through linear probing under normal use, then deliberately fill
1161 // every slot to force exhaustion of the probe budget on insert.
1162 // Table size = next_power_of_two(4 * 2) = 8 slots.
1163 let mut idx = PidIndex::new(4);
1164 // Insert MAX_PROBE-many pids that all hash to the same bucket would
1165 // be impossible with a deterministic mix; instead we fill the
1166 // *whole* table so any new pid hashing into a fully-occupied chain
1167 // exhausts the budget.
1168 for pid in 1u32..=8 {
1169 idx.insert(pid, pid as usize).expect("fill");
1170 }
1171 // Now every slot is occupied (no EMPTY anywhere). Any new pid must
1172 // walk the full MAX_PROBE without finding an EMPTY slot.
1173 let err = idx.insert(9999, 0).expect_err("must exhaust");
1174 assert_eq!(err, ProbeExhausted);
1175 assert_eq!(idx.take_probe_exhausted(), 1);
1176 assert_eq!(idx.take_probe_exhausted(), 0);
1177 }
1178
1179 #[test]
1180 fn record_probe_exhaustion_surfaces_capacity_exceeded() {
1181 // PidIndex table size = next_power_of_two(cap * 2). At cap = 4 the
1182 // table has 8 slots. Filling the *entry* table at cap leaves 4
1183 // PidIndex slots occupied (half full), so we never exhaust the
1184 // probe budget through ordinary inserts. To force exhaustion we
1185 // need the index itself to be saturated — which only happens if
1186 // someone constructs a Tracker with capacity ≥ table_size. For
1187 // safety we verify the rollback path: a forced-error scenario is
1188 // not realistically reachable through normal API use, so we instead
1189 // assert that under heavy churn the counter stays at 0.
1190 let mut t = Tracker::new(32, EvictionPolicy::Balanced, DEFAULT_EVICTION_SCAN_WINDOW);
1191 let threshold_ns = 100;
1192 let mut now = 0u64;
1193 for pid in 1u32..=4096 {
1194 now = now.saturating_add(1);
1195 let _ = t.record(&frame(pid, 1), now, threshold_ns, ORIGIN, None);
1196 }
1197 // Under nominal use probe exhaustion is unreachable at load ≤ 0.5.
1198 assert_eq!(t.take_probe_exhausted(), 0);
1199 }
1200
1201 #[test]
1202 fn invariant_violations_stays_zero_under_random_ops() {
1203 // Mirrors `stall_emitted_count_invariant_holds_across_random_ops`
1204 // but asserts the new invariant_violations counter never ticks.
1205 let mut t = Tracker::new(32, EvictionPolicy::Balanced, DEFAULT_EVICTION_SCAN_WINDOW);
1206 let threshold_ns = 100;
1207 let mut now_ns: u64 = 0;
1208 let mut s: u64 = 0xDEADBEEF;
1209 let mut next = || {
1210 s ^= s << 13;
1211 s ^= s >> 7;
1212 s ^= s << 17;
1213 s
1214 };
1215 for _ in 0..4000 {
1216 let r = next() % 4;
1217 now_ns = now_ns.saturating_add(20);
1218 match r {
1219 0 => {
1220 let pid = (next() % 96) as u32 + 1;
1221 let _ = t.record(&frame(pid, now_ns), now_ns, threshold_ns, ORIGIN, None);
1222 }
1223 1 => {
1224 now_ns = now_ns.saturating_add(threshold_ns * 2);
1225 t.drain_stalled_slots(now_ns, threshold_ns, |_, _, _, _, _| {});
1226 }
1227 2 => {
1228 let pid = (next() % 96) as u32 + 1;
1229 let _ = t.last_ns_of(pid);
1230 let _ = t.origin_of(pid);
1231 }
1232 _ => {}
1233 }
1234 }
1235 assert_eq!(t.take_invariant_violations(), 0);
1236 assert_eq!(t.take_probe_exhausted(), 0);
1237 }
1238
1239 /// drain_stalled_slots propagates each slot's pinned origin to the
1240 /// callback so downstream consumers (Recovery) can gate on transport
1241 /// trust.
1242 #[test]
1243 fn drain_stalled_slots_emits_pinned_origin() {
1244 let mut t = Tracker::new(4, EvictionPolicy::Strict, DEFAULT_EVICTION_SCAN_WINDOW);
1245 let threshold_ns = 100;
1246
1247 assert_eq!(
1248 t.record(
1249 &frame(11, 1),
1250 0,
1251 threshold_ns,
1252 BeatOrigin::KernelAttested,
1253 None
1254 ),
1255 Update::Inserted
1256 );
1257 assert_eq!(
1258 t.record(
1259 &frame(22, 1),
1260 0,
1261 threshold_ns,
1262 BeatOrigin::NetworkUnverified,
1263 None,
1264 ),
1265 Update::Inserted
1266 );
1267
1268 let mut seen: Vec<(u32, BeatOrigin)> = Vec::new();
1269 t.drain_stalled_slots(threshold_ns * 2, threshold_ns, |pid, _, _, origin, _| {
1270 seen.push((pid, origin));
1271 });
1272 seen.sort_by_key(|(p, _)| *p);
1273 assert_eq!(
1274 seen,
1275 vec![
1276 (11, BeatOrigin::KernelAttested),
1277 (22, BeatOrigin::NetworkUnverified),
1278 ]
1279 );
1280 }
1281
1282 // ---------------------- PID-namespace gate tests ----------------------
1283
1284 /// First-namespace-wins: a beat with a different `Some(_)` inode for an
1285 /// already-tracked pid is rejected as `NamespaceConflict`.
1286 #[test]
1287 fn namespace_conflict_blocks_rebind() {
1288 let mut t = Tracker::new(8, EvictionPolicy::Strict, DEFAULT_EVICTION_SCAN_WINDOW);
1289 let threshold_ns = 100;
1290 assert_eq!(
1291 t.record(
1292 &frame(7, 1),
1293 0,
1294 threshold_ns,
1295 BeatOrigin::KernelAttested,
1296 Some(4026531836),
1297 ),
1298 Update::Inserted
1299 );
1300 let r = t.record(
1301 &frame(7, 2),
1302 10,
1303 threshold_ns,
1304 BeatOrigin::KernelAttested,
1305 Some(4026531840),
1306 );
1307 assert_eq!(r, Update::NamespaceConflict);
1308 // Slot is untouched.
1309 assert_eq!(t.pid_ns_inode_of(7), Some(Some(4026531836)));
1310 assert_eq!(t.take_namespace_conflicts(), 1);
1311 assert_eq!(t.take_namespace_conflicts(), 0);
1312 }
1313
1314 /// Same inode → normal refresh.
1315 #[test]
1316 fn namespace_match_passes_through() {
1317 let mut t = Tracker::new(8, EvictionPolicy::Strict, DEFAULT_EVICTION_SCAN_WINDOW);
1318 let threshold_ns = 100;
1319 let _ = t.record(
1320 &frame(7, 1),
1321 0,
1322 threshold_ns,
1323 BeatOrigin::KernelAttested,
1324 Some(123),
1325 );
1326 let r = t.record(
1327 &frame(7, 2),
1328 10,
1329 threshold_ns,
1330 BeatOrigin::KernelAttested,
1331 Some(123),
1332 );
1333 assert_eq!(r, Update::Refreshed);
1334 assert_eq!(t.take_namespace_conflicts(), 0);
1335 }
1336
1337 /// `Some → None` regression on a same-pid rebind is a conflict.
1338 #[test]
1339 fn namespace_some_to_none_is_conflict() {
1340 let mut t = Tracker::new(8, EvictionPolicy::Strict, DEFAULT_EVICTION_SCAN_WINDOW);
1341 let threshold_ns = 100;
1342 let _ = t.record(
1343 &frame(7, 1),
1344 0,
1345 threshold_ns,
1346 BeatOrigin::KernelAttested,
1347 Some(123),
1348 );
1349 let r = t.record(
1350 &frame(7, 2),
1351 10,
1352 threshold_ns,
1353 BeatOrigin::KernelAttested,
1354 None,
1355 );
1356 assert_eq!(r, Update::NamespaceConflict);
1357 assert_eq!(t.take_namespace_conflicts(), 1);
1358 }
1359
1360 /// `None → Some` upgrade on a same-pid rebind pins the now-known inode
1361 /// and falls through to refresh. This is the forgiving case for a peer
1362 /// whose `/proc/<pid>/ns/pid` was briefly unreadable at first contact.
1363 #[test]
1364 fn namespace_none_to_some_upgrades_in_place() {
1365 let mut t = Tracker::new(8, EvictionPolicy::Strict, DEFAULT_EVICTION_SCAN_WINDOW);
1366 let threshold_ns = 100;
1367 let _ = t.record(
1368 &frame(7, 1),
1369 0,
1370 threshold_ns,
1371 BeatOrigin::KernelAttested,
1372 None,
1373 );
1374 assert_eq!(t.pid_ns_inode_of(7), Some(None));
1375 let r = t.record(
1376 &frame(7, 2),
1377 10,
1378 threshold_ns,
1379 BeatOrigin::KernelAttested,
1380 Some(999),
1381 );
1382 assert_eq!(r, Update::Refreshed);
1383 assert_eq!(t.pid_ns_inode_of(7), Some(Some(999)));
1384 assert_eq!(t.take_namespace_conflicts(), 0);
1385 }
1386
1387 /// Both `None` (non-Linux / unreadable) → refresh, no conflict.
1388 #[test]
1389 fn namespace_both_none_is_match() {
1390 let mut t = Tracker::new(8, EvictionPolicy::Strict, DEFAULT_EVICTION_SCAN_WINDOW);
1391 let threshold_ns = 100;
1392 let _ = t.record(
1393 &frame(7, 1),
1394 0,
1395 threshold_ns,
1396 BeatOrigin::KernelAttested,
1397 None,
1398 );
1399 let r = t.record(
1400 &frame(7, 2),
1401 10,
1402 threshold_ns,
1403 BeatOrigin::KernelAttested,
1404 None,
1405 );
1406 assert_eq!(r, Update::Refreshed);
1407 assert_eq!(t.take_namespace_conflicts(), 0);
1408 }
1409
1410 // ---- C1 regression: PidIndex::insert occupancy bookkeeping ----------
1411
1412 /// `occupied` tracks live entries. Under a cyclic insert/remove cycle the
1413 /// counter must stay exactly equal to the number of live pids — neither
1414 /// drifting up (double-counting) nor drifting down (under-counting).
1415 #[test]
1416 fn pid_index_occupied_tracks_live_entries_under_churn() {
1417 // Table sized for 32 entries (64 slots, load ≤ 0.5).
1418 // We use a *cyclic* pid space (0..48) so tombstones from removed pids
1419 // fall in the same hash chains as later inserts, ensuring reuse.
1420 const CAP: usize = 32;
1421 const PID_RANGE: u32 = 48; // > CAP but < table_size; guarantees reuse
1422 let mut idx = PidIndex::new(CAP);
1423
1424 let mut expected_live: u32 = 0;
1425 let mut live_set = std::collections::HashSet::new();
1426
1427 for i in 0u32..2_000 {
1428 let pid = i % PID_RANGE;
1429 if live_set.contains(&pid) {
1430 // Already live — remove then re-insert to exercise the tombstone path.
1431 idx.remove(pid);
1432 live_set.remove(&pid);
1433 expected_live -= 1;
1434 idx.insert(pid, pid as usize).expect("re-insert");
1435 live_set.insert(pid);
1436 expected_live += 1;
1437 } else if expected_live < CAP as u32 {
1438 idx.insert(pid, pid as usize).expect("fresh insert");
1439 live_set.insert(pid);
1440 expected_live += 1;
1441 } else {
1442 // At capacity: remove the first entry and insert the new one.
1443 let victim = *live_set.iter().next().unwrap();
1444 idx.remove(victim);
1445 live_set.remove(&victim);
1446 expected_live -= 1;
1447 idx.insert(pid, pid as usize).expect("insert after evict");
1448 live_set.insert(pid);
1449 expected_live += 1;
1450 }
1451 assert_eq!(
1452 idx.len(),
1453 expected_live as usize,
1454 "i={i} pid={pid}: occupied={} expected={expected_live}",
1455 idx.len()
1456 );
1457 }
1458 }
1459
1460 /// Re-inserting a previously-removed pid via its tombstone slot must
1461 /// restore the live count. `remove()` decremented `occupied`; the
1462 /// re-insert must re-increment it so the counter stays accurate.
1463 #[test]
1464 fn pid_index_occupied_restored_on_tombstone_reuse() {
1465 let mut idx = PidIndex::new(16);
1466
1467 idx.insert(42, 0).expect("first insert");
1468 assert_eq!(idx.len(), 1);
1469
1470 idx.remove(42);
1471 assert_eq!(idx.len(), 0);
1472
1473 // Re-insert via the tombstone slot: live count must go back to 1.
1474 idx.insert(42, 5).expect("reinsert via tombstone");
1475 assert_eq!(
1476 idx.len(),
1477 1,
1478 "reinsert via tombstone did not restore occupied to 1 (was {})",
1479 idx.len()
1480 );
1481 }
1482}