varta_watch/observer.rs
1//! Single-threaded observer: bind one or more transport listeners, decode
2//! incoming VLP frames, surface beats / stalls / decode errors via [`Event`].
3//!
4//! The observer never spawns threads, never allocates after setup,
5//! and surfaces at most one [`Event`] per call to [`Observer::poll`]. The
6//! caller drives the loop — see `main.rs` for the daemon entrypoint.
7//!
//! Multiple listeners (e.g. UDS + UDP) are polled round-robin. Each call to
//! [`Observer::poll`] tries every listener exactly once; the first event
//! produced is returned, and the remaining listeners are still received from
//! (their beats still update the tracker), so a busy listener cannot starve
//! co-located listeners. Every call also refreshes the stall queue once the
//! previous batch has been consumed, and returns `None` when no listener
//! produced an event. Stall events are retrieved separately via
//! [`Observer::poll_pending`].
13
14use std::io;
15use std::path::Path;
16use std::time::Duration;
17
18use varta_vlp::{DecodeError, Frame, Status};
19
20use crate::clock::{Clock, ClockSource};
21use crate::listener::{BeatListener, PreThreadAttestation, UdsListener};
22use crate::peer_cred::{BeatOrigin, RecvResult};
23use crate::tracker::{EvictionPolicy, Tracker, Update};
24
/// Why the ingress path discarded a beat before it reached the tracker.
///
/// The discriminant doubles as an index into the observer's per-reason
/// drop-counter array, so values must stay dense and below `RATE_LIMIT_N`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum RateLimitReason {
    /// The per-pid minimum inter-beat interval had not yet elapsed.
    PerPid = 0,
    /// The shared, observer-wide token bucket was empty.
    Global = 1,
}
31
/// Number of `RateLimitReason` variants, i.e. the length of the
/// per-reason rate-limit drop-counter array.
pub(crate) const RATE_LIMIT_N: usize = 2;
33
/// Largest advance between two consecutive clock readings that still counts
/// as ordinary poll-tick progress. A bigger step is recorded as an anomalous
/// forward jump (sleep/wake, VM live migration, hypervisor pause). Five
/// seconds is far above worst-case poll-tick latency on a loaded host and far
/// below any plausible sleep or migration interval.
const CLOCK_JUMP_FORWARD_THRESHOLD_NS: u64 = 5 * 1_000_000_000;
39
/// Minimum spacing (60 s) between re-reads of `/proc/sys/kernel/pid_max`.
///
/// Short enough that an operator's `sysctl -w kernel.pid_max=...` takes
/// effect without restarting the daemon. Long enough that the `/proc` read
/// never shows up on a latency profile: the refresh runs in the maintenance
/// phase, not on the poll hot path. Deliberately hardcoded (no CLI knob),
/// following the self-watchdog cadence convention.
const PID_MAX_REFRESH_INTERVAL_NS: u64 = 60 * 1_000_000_000;
47
/// Global per-observer token bucket — one shared across all senders.
///
/// Guards against per-pid rotation attacks where an attacker cycles through
/// fake pids to keep every per-pid bucket empty.
///
/// Disabled when `capacity_milli == 0`. All arithmetic is integer-only
/// (milli-tokens) to stay allocation-free on the hot path.
pub(crate) struct GlobalRateLimit {
    /// Current token count in milli-tokens (1000 milli-tokens = 1 frame allowed).
    tokens_milli: u64,
    /// Maximum token count (= burst × 1000).
    capacity_milli: u64,
    /// Refill rate in milli-tokens per second (= `rate_per_sec × 1000`).
    /// An elapsed interval is worth
    /// `elapsed_ns × refill_numerator / refill_denominator` milli-tokens,
    /// so no float division is needed.
    refill_numerator: u64,
    /// Denominator for refill: 1_000_000_000 (ns per sec).
    refill_denominator: u64,
    /// Timestamp (ns) up to which elapsed time has already been converted
    /// into tokens. It only advances by the time actually credited, so
    /// fractional milli-tokens carry over to the next call instead of being
    /// lost to integer truncation.
    last_refill_ns: u64,
}

impl GlobalRateLimit {
    /// Milli-tokens consumed by one admitted frame.
    const MILLI_PER_TOKEN: u64 = 1_000;
    /// Nanoseconds per second — the refill denominator.
    const NS_PER_SEC: u64 = 1_000_000_000;

    /// Construct a new token bucket that admits `rate_per_sec` frames per
    /// second on average, with bursts of up to `burst` frames. The bucket
    /// starts full. `rate_per_sec = 0` or `burst = 0` produces a disabled
    /// bucket, which always allows.
    pub(crate) fn new(rate_per_sec: u32, burst: u32) -> Self {
        if rate_per_sec == 0 || burst == 0 {
            return GlobalRateLimit {
                tokens_milli: 0,
                capacity_milli: 0,
                refill_numerator: 0,
                refill_denominator: 1,
                last_refill_ns: 0,
            };
        }
        // u32::MAX × 1000 fits comfortably in u64; saturating is belt-and-braces.
        let capacity_milli = u64::from(burst).saturating_mul(Self::MILLI_PER_TOKEN);
        GlobalRateLimit {
            tokens_milli: capacity_milli,
            capacity_milli,
            // milli-tokens per second: rate tokens/s × 1000 milli-tokens/token.
            // (A factor of 1_000_000 here would refill 1000× too fast.)
            refill_numerator: u64::from(rate_per_sec).saturating_mul(Self::MILLI_PER_TOKEN),
            refill_denominator: Self::NS_PER_SEC,
            last_refill_ns: 0,
        }
    }

    /// Disabled when capacity is 0 — all frames pass.
    #[inline]
    pub(crate) fn is_disabled(&self) -> bool {
        self.capacity_milli == 0
    }

    /// Try to consume one token. Returns `true` if the frame is allowed,
    /// `false` if the global bucket is exhausted.
    #[inline]
    pub(crate) fn try_consume(&mut self, now_ns: u64) -> bool {
        if self.is_disabled() {
            return true;
        }
        self.refill(now_ns);
        if self.tokens_milli >= Self::MILLI_PER_TOKEN {
            self.tokens_milli -= Self::MILLI_PER_TOKEN;
            true
        } else {
            false
        }
    }

    /// Lazily credit milli-tokens for the time elapsed since `last_refill_ns`.
    ///
    /// The product is computed in u128, so it is exact for every
    /// `u64` × `u32`-derived rate. When the bucket reaches capacity, the
    /// surplus time is discarded. Otherwise the timestamp advances only by
    /// the nanoseconds that were actually converted into tokens. This
    /// matters under a flood of closely spaced calls: each call's interval
    /// may be worth less than one milli-token, and without this the
    /// remainder would be thrown away on every call.
    fn refill(&mut self, now_ns: u64) {
        let elapsed_ns = now_ns.saturating_sub(self.last_refill_ns);
        if elapsed_ns == 0 || self.refill_numerator == 0 {
            return;
        }
        let num = u128::from(self.refill_numerator);
        let den = u128::from(self.refill_denominator);
        let added = u128::from(elapsed_ns) * num / den;
        let room = self.capacity_milli.saturating_sub(self.tokens_milli);
        if added >= u128::from(room) {
            self.tokens_milli = self.capacity_milli;
            self.last_refill_ns = now_ns;
        } else if added > 0 {
            // added < room ≤ u64::MAX, so the cast is lossless and the sum
            // stays ≤ capacity.
            self.tokens_milli += added as u64;
            // ceil(added × den / num) ≤ elapsed_ns because
            // added × den ≤ elapsed_ns × num, so this never over-credits
            // and never moves past `now_ns`.
            let credited_ns = (added * den + num - 1) / num;
            self.last_refill_ns += credited_ns as u64;
        }
        // added == 0: keep `last_refill_ns` so sub-milli-token slices accumulate.
    }
}
127
128/// Event surfaced by [`Observer::poll`].
129///
130/// Each call to `poll` returns at most one event. Unknown-pid overflow and
131/// out-of-order beats are silently dropped at this layer; the bench / metrics
132/// sessions can layer counters on top without changing this enum.
133#[derive(Debug)]
134pub enum Event {
135 /// A well-formed beat was accepted for a tracked pid.
136 Beat {
137 /// OS process id of the emitting agent.
138 pid: u32,
139 /// Decoded health status of the beat.
140 status: Status,
141 /// Application-defined payload carried by the beat.
142 payload: u32,
143 /// Monotonic nonce of the beat.
144 nonce: u64,
145 /// Transport-class classification of the beat (see [`BeatOrigin`]).
146 /// Recovery commands consult this to refuse firing on non-kernel-attested origins.
147 origin: BeatOrigin,
148 /// Kernel-attested PID-namespace inode of the sender (Linux only).
149 /// `None` for non-Linux platforms, UDP transports, or when the peer's
150 /// `/proc/<pid>/ns/pid` was unreadable.
151 pid_ns_inode: Option<u64>,
152 /// Observer-local timestamp (ns since [`Observer`] start) when this
153 /// event was produced.
154 observer_ns: u64,
155 },
156 /// A tracked pid has not beaten within the configured threshold and the
157 /// observer has not yet surfaced a stall event for this silence run.
158 Stall {
159 /// OS process id of the silent agent.
160 pid: u32,
161 /// Last nonce observed for this pid.
162 last_nonce: u64,
163 /// Observer-local timestamp (ns since [`Observer`] start) of the
164 /// last accepted beat for this pid.
165 last_ns: u64,
166 /// Transport origin pinned by the slot's first beat. Recovery
167 /// refuses to spawn for `NetworkUnverified` unless the operator has
168 /// explicitly opted in via
169 /// `--i-accept-recovery-on-unauthenticated-transport`.
170 origin: BeatOrigin,
171 /// PID-namespace inode pinned by the slot's first beat (Linux only).
172 /// Used by main.rs to construct the recovery `StallSource`: a
173 /// `Some(_)` value that differs from the observer's namespace inode
174 /// indicates a cross-namespace agent and gates recovery refusal.
175 pid_ns_inode: Option<u64>,
176 /// Observer-local timestamp (ns since [`Observer`] start) when this
177 /// stall event was produced.
178 observer_ns: u64,
179 },
180 /// A 32-byte payload arrived but failed VLP decoding.
181 Decode(DecodeError, u64),
182 /// Frame decoded but the `frame.pid` does not match the kernel-verified
183 /// peer PID of the sender. The claimed pid is preserved so exporters can
184 /// record what the frame *claimed* to be.
185 AuthFailure {
186 /// The pid the frame on the wire claimed to be.
187 claimed_pid: u32,
188 /// Observer-local timestamp (ns since [`Observer`] start) when this
189 /// event was produced.
190 observer_ns: u64,
191 },
192 /// A beat arrived for an already-tracked pid, but its transport origin
193 /// disagreed with the origin pinned by the slot's first beat. The slot
194 /// was not mutated; the beat was dropped. First-origin-wins prevents an
195 /// attacker on an untrusted transport from "tainting" a slot that
196 /// legitimately belongs to a kernel-attested agent.
197 OriginConflict {
198 /// The pid claimed by the dropped beat (same as the existing slot's pid).
199 claimed_pid: u32,
200 /// Transport origin observed on this datagram.
201 observed_origin: BeatOrigin,
202 /// Origin pinned by the slot (the one that "won" the conflict).
203 slot_origin: BeatOrigin,
204 /// Observer-local timestamp (ns since [`Observer`] start) when this
205 /// event was produced.
206 observer_ns: u64,
207 },
208 /// A kernel-attested beat arrived whose peer PID-namespace inode differs
209 /// from the observer's namespace (Linux only). Recovery for the
210 /// associated pid cannot safely fire because the pid is in a different
211 /// namespace — `kill(2)` and `systemctl` would target the wrong process.
212 /// The beat was dropped at receive; the tracker was not modified.
213 NamespaceConflict {
214 /// The pid claimed by the dropped beat.
215 claimed_pid: u32,
216 /// PID-namespace inode of the sender (Linux only; `None` when
217 /// `/proc/<peer_pid>/ns/pid` was unreadable).
218 observed_ns_inode: Option<u64>,
219 /// The observer's own PID-namespace inode (cached at startup; `None`
220 /// when `/proc/self/ns/pid` is unreadable, which usually means the
221 /// platform isn't Linux).
222 observer_ns_inode: Option<u64>,
223 /// Observer-local timestamp (ns since [`Observer`] start) when this
224 /// event was produced.
225 observer_ns: u64,
226 },
227 /// Receiving from a listener failed with an error other than
228 /// `WouldBlock` / `TimedOut`.
229 Io(io::Error, u64),
230 /// Ancillary data truncated by the kernel (`MSG_CTRUNC` on Linux).
231 /// Indicates the kernel's ancillary-data buffer was too small for the
232 /// per-message metadata — a kernel-level buffer sizing issue.
233 CtrlTruncated(io::Error, u64),
234}
235
236/// Observer bound to one or more transport listeners.
237///
238/// The observer owns all listeners; cleanup (e.g. socket file unlink) happens
239/// when the [`Observer`] is dropped.
240pub struct Observer {
241 listeners: Vec<Box<dyn BeatListener>>,
242 tracker: Tracker,
243 threshold_ns: u64,
244 clock: Clock,
245 stall_queue: Vec<Option<Event>>,
246 stall_cursor: usize,
247 /// Next index to start polling from for fair round-robin across listeners.
248 next_listener_start: usize,
249 /// Minimum inter-beat interval applied per pid, in nanoseconds.
250 /// `None` means no rate limiting (the default).
251 rate_limit_interval_ns: Option<u64>,
252 /// Beats dropped by the per-pid and global rate limiters since the last drain.
253 /// Index 0 = per-pid (`RateLimitReason::PerPid`), 1 = global (`RateLimitReason::Global`).
254 rate_limited_total: [u64; RATE_LIMIT_N],
255 /// Global per-observer token bucket for defeating per-pid rotation attacks.
256 global_rl: GlobalRateLimit,
257 /// Monotonicity guard — last `now_ns()` value, clamped forward-only to
258 /// survive TSC drift and VM live migration.
259 last_now_ns: u64,
260 /// Count of times the underlying monotonic clock returned a value
261 /// strictly less than `last_now_ns` and the clamp absorbed the
262 /// regression. Surfaced as `varta_observer_clock_regression_total` so
263 /// operators can alert on TSC drift / VM-live-migration events that
264 /// would otherwise be invisible. Drained via
265 /// [`Observer::drain_clock_regressions`].
266 clock_regressions: u64,
267 /// Count of times consecutive `now_ns()` readings advanced by more than
268 /// [`CLOCK_JUMP_FORWARD_THRESHOLD_NS`] in a single poll tick. This
269 /// captures sleep/wake on `monotonic-raw`/`boottime`, VM live migration,
270 /// and hypervisor pauses that are invisible to the regression counter.
271 /// Surfaced as `varta_observer_clock_jump_forward_total`. Drained via
272 /// [`Observer::drain_clock_jumps_forward`].
273 clock_jumps_forward: u64,
274 /// When true, beats from agents whose kernel-attested PID namespace
275 /// differs from the observer's are admitted into the tracker (and may
276 /// later be passed to recovery). Set by `--allow-cross-namespace-agents`.
277 /// Default `false` — beats from cross-namespace agents are dropped at
278 /// ingress and counted via [`Observer::drain_cross_namespace_drops`].
279 allow_cross_namespace: bool,
280 /// Count of beats dropped at ingress because the kernel-attested peer's
281 /// PID namespace inode differs from the observer's. Linux-only signal;
282 /// 0 on other platforms.
283 cross_namespace_drops: u64,
284 /// Maximum PID accepted on the wire — cached from
285 /// `/proc/sys/kernel/pid_max` on Linux at observer startup. On non-Linux
286 /// targets and when `/proc` is unreadable, this is `u32::MAX` (gate
287 /// effectively disabled). See [`crate::pid_max::read_pid_max`].
288 pid_max: u32,
289 /// Count of beats dropped at ingress because `frame.pid > pid_max`.
290 /// Surfaced as `varta_frame_rejected_pid_above_max_total`.
291 pid_above_max_drops: u64,
292 /// Monotonic-clock timestamp (ns) of the most recent `pid_max` refresh
293 /// from `/proc/sys/kernel/pid_max`. `0` until the first periodic refresh
294 /// fires from [`Observer::maybe_refresh_pid_max`]; the value cached at
295 /// `Observer::new` covers the startup window until then. Compared against
296 /// `self.now_ns()` with [`PID_MAX_REFRESH_INTERVAL_NS`].
297 last_pid_max_refresh_ns: u64,
298 /// Effective `SO_RCVBUF` size granted by the kernel for the observer UDS,
299 /// in bytes. `0` if `--uds-rcvbuf-bytes 0` was used or tuning failed.
300 /// Set by [`Observer::bind`] from the [`UdsListener::rcvbuf_bytes`] accessor.
301 pub uds_rcvbuf_bytes: u32,
302}
303
304impl Observer {
305 /// Create an empty observer with no listeners. Use
306 /// [`Observer::add_listener`] to attach transports, or call
307 /// [`Observer::bind`] for the common single-UDS case.
308 ///
309 /// `tracker_capacity` sets the maximum number of distinct agent pids
310 /// tracked concurrently. Beats for new pids beyond this limit are
311 /// dropped with [`Update::CapacityExceeded`] (the counter is surfaced
312 /// via `varta_tracker_capacity_exceeded_total`).
313 ///
314 /// `eviction_policy` controls which slot to reclaim when the tracker
315 /// is full and a new pid arrives ([`EvictionPolicy::Strict`] only
316 /// evicts confirmed-stalled agents; [`EvictionPolicy::Balanced`] also
317 /// evicts the oldest active slot to prevent capacity exhaustion).
318 ///
319 /// `max_beat_rate` is an optional per-pid rate limit in beats per
320 /// second. When set, beats arriving faster than this rate from the
321 /// same pid are dropped and counted via [`Observer::drain_rate_limited`].
322 /// `None` (the default) disables rate limiting.
323 #[allow(clippy::too_many_arguments)]
324 pub fn new(
325 threshold: Duration,
326 tracker_capacity: usize,
327 eviction_policy: EvictionPolicy,
328 eviction_scan_window: usize,
329 max_beat_rate: Option<u32>,
330 global_beat_rate: u32,
331 global_beat_burst: u32,
332 clock_source: ClockSource,
333 ) -> io::Result<Self> {
334 let threshold_ns = threshold.as_nanos().min(u64::MAX as u128) as u64;
335 let rate_limit_interval_ns = max_beat_rate.and_then(|rps| {
336 if rps == 0 {
337 None
338 } else {
339 // Convert beats/sec to nanosecond interval.
340 // Saturate at 1 ns (1 GHz rate) to avoid overflow.
341 let interval_ns = 1_000_000_000u64.checked_div(rps as u64).unwrap_or(1);
342 Some(interval_ns)
343 }
344 });
345 let clock = Clock::new(clock_source).map_err(io::Error::from)?;
346 Ok(Observer {
347 listeners: Vec::new(),
348 tracker: Tracker::new(tracker_capacity, eviction_policy, eviction_scan_window),
349 threshold_ns,
350 clock,
351 stall_queue: Vec::with_capacity(tracker_capacity),
352 stall_cursor: 0,
353 next_listener_start: 0,
354 rate_limit_interval_ns,
355 rate_limited_total: [0; RATE_LIMIT_N],
356 global_rl: GlobalRateLimit::new(global_beat_rate, global_beat_burst),
357 last_now_ns: 0,
358 clock_regressions: 0,
359 clock_jumps_forward: 0,
360 allow_cross_namespace: false,
361 cross_namespace_drops: 0,
362 pid_max: crate::pid_max::read_pid_max(),
363 pid_above_max_drops: 0,
364 last_pid_max_refresh_ns: 0,
365 uds_rcvbuf_bytes: 0,
366 })
367 }
368
369 /// Allow beats from agents whose kernel-attested PID namespace differs
370 /// from the observer's own namespace. Default `false`. Wired from the
371 /// `--allow-cross-namespace-agents` CLI flag.
372 pub fn with_allow_cross_namespace(mut self, allow: bool) -> Self {
373 self.allow_cross_namespace = allow;
374 self
375 }
376
377 /// Create an observer from a single already-configured listener.
378 #[allow(clippy::too_many_arguments)]
379 pub fn from_listener<L: BeatListener + 'static>(
380 listener: L,
381 threshold: Duration,
382 tracker_capacity: usize,
383 eviction_policy: EvictionPolicy,
384 eviction_scan_window: usize,
385 max_beat_rate: Option<u32>,
386 global_beat_rate: u32,
387 global_beat_burst: u32,
388 clock_source: ClockSource,
389 ) -> io::Result<Self> {
390 let mut obs = Self::new(
391 threshold,
392 tracker_capacity,
393 eviction_policy,
394 eviction_scan_window,
395 max_beat_rate,
396 global_beat_rate,
397 global_beat_burst,
398 clock_source,
399 )?;
400 obs.add_listener(Box::new(listener));
401 Ok(obs)
402 }
403
404 /// Bind a Unix datagram socket at `path` and return an [`Observer`]
405 /// with that single UDS listener.
406 ///
407 /// This is the backward-compatible convenience constructor for the common
408 /// single-UDS case. For multi-transport setups, use [`Observer::new`]
409 /// followed by [`Observer::add_listener`].
410 #[allow(clippy::too_many_arguments)]
411 pub fn bind(
412 path: impl AsRef<Path>,
413 threshold: Duration,
414 socket_mode: u32,
415 read_timeout: Duration,
416 uds_rcvbuf_bytes: u32,
417 tracker_capacity: usize,
418 eviction_policy: EvictionPolicy,
419 eviction_scan_window: usize,
420 max_beat_rate: Option<u32>,
421 global_beat_rate: u32,
422 global_beat_burst: u32,
423 clock_source: ClockSource,
424 pre_thread: &PreThreadAttestation,
425 ) -> io::Result<Self> {
426 let listener = UdsListener::bind(
427 path,
428 socket_mode,
429 read_timeout,
430 uds_rcvbuf_bytes,
431 pre_thread,
432 )?;
433 let rcvbuf = listener.rcvbuf_bytes();
434 let mut obs = Self::from_listener(
435 listener,
436 threshold,
437 tracker_capacity,
438 eviction_policy,
439 eviction_scan_window,
440 max_beat_rate,
441 global_beat_rate,
442 global_beat_burst,
443 clock_source,
444 )?;
445 obs.uds_rcvbuf_bytes = rcvbuf;
446 Ok(obs)
447 }
448
449 /// Add a listener to the observer. The listener is polled in round-robin
450 /// order alongside any existing listeners.
451 pub fn add_listener(&mut self, listener: Box<dyn BeatListener>) {
452 self.listeners.push(listener);
453 }
454
    /// Poll every listener once, round-robin, and return the first [`Event`]
    /// produced during this call, if any.
    ///
    /// Each listener's `recv` is invoked exactly once per call, so a busy
    /// listener cannot starve the others. Listeners scanned after the one
    /// that produced the returned event are still received from: their
    /// beats update the tracker and their drops bump counters. Any event
    /// they would have produced is discarded.
    ///
    /// The scan starts at `next_listener_start`. The cursor moves to the
    /// listener after `i` whenever listener `i` yields `Authenticated`,
    /// `CtrlTruncated`, or `IoError` while no event has been chosen yet.
    /// For `Authenticated` this happens before decoding, so even a frame
    /// that is later dropped without an event advances the cursor.
    ///
    /// Some frames are dropped without producing an event:
    /// - pid above `pid_max`, global bucket exhausted, or per-pid interval
    ///   not yet elapsed (each bumps a counter drained via the `drain_*`
    ///   accessors);
    /// - tracker `OutOfOrder` / `CapacityExceeded`;
    /// - `WouldBlock` / `ShortRead` receives.
    ///
    /// **Latency bound:** worst-case per-call work is
    /// `N_listeners × per-listener-recv-cost + eviction_scan_window`.
    /// Under the canonical stress profile (3 listeners, 4096 tracker
    /// capacity, 256-slot eviction window) the p99 iteration time is
    /// ≤ 5 ms — see `book/src/architecture/observer-liveness.md` and the
    /// `tick-distribution` bench (`cargo run -p varta-bench --release --
    /// tick-distribution`) which asserts this bound under sustained load.
    ///
    /// After the scan, the stall queue is refilled if the previous batch has
    /// been fully consumed. This method never returns [`Event::Stall`];
    /// queued stall events must be retrieved via [`Observer::poll_pending`].
    pub fn poll(&mut self) -> Option<Event> {
        let len = self.listeners.len();
        let start = self.next_listener_start;
        let mut first_event: Option<Event> = None;
        let mut round = 0;
        while round < len {
            let i = (start + round) % len;
            round += 1;
            match self.listeners[i].recv() {
                RecvResult::Authenticated {
                    peer_pid,
                    peer_uid: _,
                    peer_pid_ns_inode,
                    origin,
                    data,
                } => {
                    let now_ns = self.now_ns();
                    // Advance the round-robin cursor before decoding, so a
                    // datagram from this listener moves the cursor even if
                    // it is later dropped without an event.
                    if first_event.is_none() {
                        self.next_listener_start = (i + 1) % len;
                    }
                    match Frame::decode(&data) {
                        Ok(frame) => {
                            // Observer-side PID range gate. VLP rejects 0/1
                            // as wire-format `BadPid`; here we additionally
                            // reject any pid above the kernel's configured
                            // `pid_max` (Linux) — no live process can hold
                            // that id, so the frame is either corrupted or
                            // forged. Non-Linux: `pid_max == u32::MAX`,
                            // gate is a no-op.
                            if frame.pid > self.pid_max {
                                self.pid_above_max_drops =
                                    self.pid_above_max_drops.saturating_add(1);
                                continue;
                            }
                            // Per-datagram PID verification — works on Linux
                            // (SCM_CREDENTIALS via SO_PASSCRED) and macOS
                            // (LOCAL_PEERTOKEN via getsockopt). For transports
                            // without kernel credential support, peer_pid is 0
                            // and this check is a no-op.
                            if peer_pid != 0 && frame.pid != peer_pid {
                                if first_event.is_none() {
                                    first_event = Some(Event::AuthFailure {
                                        claimed_pid: frame.pid,
                                        observer_ns: now_ns,
                                    });
                                }
                                continue;
                            }
                            // Global token bucket: drop BEFORE namespace /
                            // per-pid classification so a rotation attack
                            // cannot exhaust classification work.
                            if !self.global_rl.try_consume(now_ns) {
                                self.rate_limited_total[RateLimitReason::Global as usize] =
                                    self.rate_limited_total[RateLimitReason::Global as usize]
                                        .saturating_add(1);
                                continue;
                            }
                            // Cross-namespace gate (Linux only). When the
                            // kernel-attested peer's PID namespace inode
                            // differs from the observer's, the frame.pid
                            // cannot safely be used to target recovery
                            // commands. The check is a no-op on non-Linux
                            // (both inodes are `None`), for UDP transports
                            // (peer inode is `None`), and when the operator
                            // has opted in via --allow-cross-namespace-agents.
                            let observer_ns_inode =
                                crate::peer_cred::observer_pid_namespace_inode();
                            let cross_ns = matches!(
                                (observer_ns_inode, peer_pid_ns_inode),
                                (Some(a), Some(b)) if a != b
                            );
                            if cross_ns && !self.allow_cross_namespace {
                                self.cross_namespace_drops =
                                    self.cross_namespace_drops.saturating_add(1);
                                if first_event.is_none() {
                                    first_event = Some(Event::NamespaceConflict {
                                        claimed_pid: frame.pid,
                                        observed_ns_inode: peer_pid_ns_inode,
                                        observer_ns_inode,
                                        observer_ns: now_ns,
                                    });
                                }
                                continue;
                            }
                            // Per-pid rate limiting: if a minimum inter-beat
                            // interval is configured, skip frames that arrive
                            // too soon after the pid's last accepted beat.
                            if let Some(interval_ns) = self.rate_limit_interval_ns {
                                if let Some(last_ns) = self.tracker.last_ns_of(frame.pid) {
                                    if now_ns.saturating_sub(last_ns) < interval_ns {
                                        self.rate_limited_total[RateLimitReason::PerPid as usize] =
                                            self.rate_limited_total
                                                [RateLimitReason::PerPid as usize]
                                                .saturating_add(1);
                                        continue;
                                    }
                                }
                            }
                            // Capture the slot's pre-record pinned origin (if
                            // any) so an OriginConflict event can report what
                            // the slot was pinned to without an extra lookup
                            // afterwards.
                            let slot_origin_before = self.tracker.origin_of(frame.pid);
                            match self.tracker.record(
                                &frame,
                                now_ns,
                                self.threshold_ns,
                                origin,
                                peer_pid_ns_inode,
                            ) {
                                Update::Inserted | Update::Refreshed => {
                                    if first_event.is_none() {
                                        first_event = Some(Event::Beat {
                                            pid: frame.pid,
                                            status: frame.status,
                                            payload: frame.payload,
                                            nonce: frame.nonce,
                                            origin,
                                            pid_ns_inode: peer_pid_ns_inode,
                                            observer_ns: now_ns,
                                        });
                                    }
                                }
                                Update::OriginConflict => {
                                    if first_event.is_none() {
                                        first_event = Some(Event::OriginConflict {
                                            claimed_pid: frame.pid,
                                            observed_origin: origin,
                                            slot_origin: slot_origin_before.unwrap_or(origin),
                                            observer_ns: now_ns,
                                        });
                                    }
                                }
                                Update::NamespaceConflict => {
                                    // NOTE: on this tracker-level path,
                                    // `observer_ns_inode` carries the slot's
                                    // pinned namespace inode (via
                                    // `pid_ns_inode_of`), not the observer's
                                    // own namespace inode as on the ingress
                                    // gate above.
                                    if first_event.is_none() {
                                        first_event = Some(Event::NamespaceConflict {
                                            claimed_pid: frame.pid,
                                            observed_ns_inode: peer_pid_ns_inode,
                                            observer_ns_inode: self
                                                .tracker
                                                .pid_ns_inode_of(frame.pid)
                                                .flatten(),
                                            observer_ns: now_ns,
                                        });
                                    }
                                }
                                // Dropped without an event at this layer.
                                Update::OutOfOrder | Update::CapacityExceeded => {}
                            }
                        }
                        Err(e) => {
                            if first_event.is_none() {
                                first_event = Some(Event::Decode(e, now_ns));
                            }
                        }
                    }
                }
                RecvResult::WouldBlock => continue,
                RecvResult::ShortRead => continue,
                RecvResult::CtrlTruncated(e) => {
                    if first_event.is_none() {
                        self.next_listener_start = (i + 1) % len;
                        first_event = Some(Event::CtrlTruncated(e, self.now_ns()));
                    }
                }
                RecvResult::IoError(e) => {
                    if first_event.is_none() {
                        self.next_listener_start = (i + 1) % len;
                        first_event = Some(Event::Io(e, self.now_ns()));
                    }
                }
            }
        }
        // Refill the stall queue (no-op while a previous batch is pending).
        self.drain_stalls();
        first_event
    }
646
647 /// Return the next queued [`Event::Stall`], if any.
648 pub fn poll_pending(&mut self) -> Option<Event> {
649 if self.stall_cursor < self.stall_queue.len() {
650 let stall = self.stall_queue[self.stall_cursor].take();
651 self.stall_cursor += 1;
652 return stall;
653 }
654 None
655 }
656
657 /// Whether the stall queue has unconsumed [`Event::Stall`] entries.
658 pub fn has_pending_stalls(&self) -> bool {
659 self.stall_cursor < self.stall_queue.len()
660 }
661
662 /// Observer-local nanosecond timestamp (ns since [`Observer`] start).
663 ///
664 /// Clamped to never decrease — on some platforms (VMs with TSC drift,
665 /// live-migration pause-and-resume), the underlying clock can produce
666 /// values that appear to go backwards. Without clamping, a forward clock
667 /// jump after a backward excursion can cause false stall detections.
668 ///
669 /// The kernel clock backing this reading is selected via
670 /// [`crate::clock::ClockSource`] (`--clock-source` CLI flag); see
671 /// `book/src/architecture/safety-profiles.md` for the SRE vs. medical
672 /// deployment matrix.
673 pub fn now_ns(&mut self) -> u64 {
674 let raw = self.clock.now_ns();
675 self.apply_raw_clock(raw)
676 }
677
678 fn apply_raw_clock(&mut self, raw: u64) -> u64 {
679 if raw < self.last_now_ns {
680 self.clock_regressions = self.clock_regressions.saturating_add(1);
681 } else if self.last_now_ns > 0
682 && raw.saturating_sub(self.last_now_ns) > CLOCK_JUMP_FORWARD_THRESHOLD_NS
683 {
684 self.clock_jumps_forward = self.clock_jumps_forward.saturating_add(1);
685 }
686 self.last_now_ns = self.last_now_ns.max(raw);
687 self.last_now_ns
688 }
689
690 /// Feed a synthetic raw clock value directly, bypassing `self.clock`.
691 /// Only available in tests; allows forward-jump and regression scenarios
692 /// without waiting for real time to advance.
693 #[cfg(test)]
694 pub(crate) fn apply_raw_clock_test(&mut self, raw: u64) -> u64 {
695 self.apply_raw_clock(raw)
696 }
697
698 /// Drain and reset the clock-regression counter — number of times the
699 /// kernel monotonic clock returned a value strictly less than the
700 /// previously observed one and the forward clamp absorbed the
701 /// regression. Non-zero values surface TSC drift, VM live migration,
702 /// or other anomalous clock behavior that would otherwise be invisible.
703 /// Surfaced as `varta_observer_clock_regression_total`.
704 pub fn drain_clock_regressions(&mut self) -> u64 {
705 let n = self.clock_regressions;
706 self.clock_regressions = 0;
707 n
708 }
709
710 /// Drain and reset the forward-jump counter — number of times the kernel
711 /// monotonic clock advanced by more than [`CLOCK_JUMP_FORWARD_THRESHOLD_NS`]
712 /// between adjacent poll ticks. Non-zero values indicate sleep/wake on
713 /// `monotonic-raw`/`boottime`, VM live migration, or a hypervisor pause.
714 /// Surfaced as `varta_observer_clock_jump_forward_total`.
715 pub fn drain_clock_jumps_forward(&mut self) -> u64 {
716 let n = self.clock_jumps_forward;
717 self.clock_jumps_forward = 0;
718 n
719 }
720
721 /// Inspect the kernel clock backing this observer's stall accounting.
722 pub fn clock_source(&self) -> ClockSource {
723 self.clock.source()
724 }
725
726 fn drain_stalls(&mut self) {
727 if self.stall_cursor < self.stall_queue.len() {
728 return;
729 }
730 let now_ns = self.now_ns();
731 self.stall_queue.clear();
732 self.stall_cursor = 0;
733 self.tracker.drain_stalled_slots(
734 now_ns,
735 self.threshold_ns,
736 |pid, last_nonce, last_ns, origin, pid_ns_inode| {
737 self.stall_queue.push(Some(Event::Stall {
738 pid,
739 last_nonce,
740 last_ns,
741 origin,
742 pid_ns_inode,
743 observer_ns: now_ns,
744 }));
745 },
746 );
747 }
748
749 /// Drain and reset the eviction counter.
750 pub fn drain_evictions(&mut self) -> u64 {
751 self.tracker.take_evictions()
752 }
753
754 /// Drain the pid of the most recently evicted slot, if any.
755 pub fn drain_evicted_pid(&mut self) -> Option<u32> {
756 self.tracker.take_evicted_pid()
757 }
758
759 /// Drain and reset the capacity-exceeded counter.
760 pub fn drain_capacity_exceeded(&mut self) -> u64 {
761 self.tracker.take_capacity_exceeded()
762 }
763
764 /// Drain and reset the nonce-wrap counter.
765 pub fn drain_nonce_wraps(&mut self) -> u64 {
766 self.tracker.take_nonce_wraps()
767 }
768
769 /// Drain and reset the count of bounded eviction-scan calls that ran
770 /// the full [`crate::tracker::EVICTION_SCAN_WINDOW`] without finding a
771 /// victim. Non-zero values prove the per-frame work cap engaged — i.e.
772 /// the tracker was full and an attacker would otherwise have forced
773 /// O(n) work per arriving frame.
774 pub fn drain_eviction_scan_truncated(&mut self) -> u64 {
775 self.tracker.take_eviction_scan_truncated()
776 }
777
778 /// Drain and reset the per-tracker origin-conflict counter — number of
779 /// beats dropped because their transport origin disagreed with the
780 /// slot's pinned origin (first-origin-wins). Surfaced as
781 /// `varta_origin_conflict_total` in the Prometheus exporter.
782 pub fn drain_origin_conflicts(&mut self) -> u64 {
783 self.tracker.take_origin_conflicts()
784 }
785
786 /// Drain and reset the count of beats dropped at ingress because the
787 /// peer's PID-namespace inode differs from the observer's. Surfaced as
788 /// `varta_frame_namespace_mismatch_total` in the Prometheus exporter.
789 pub fn drain_cross_namespace_drops(&mut self) -> u64 {
790 let n = self.cross_namespace_drops;
791 self.cross_namespace_drops = 0;
792 n
793 }
794
795 /// Drain and reset the count of beats dropped at ingress because
796 /// `frame.pid` exceeded the kernel's configured `pid_max`. Surfaced as
797 /// `varta_frame_rejected_pid_above_max_total` in the Prometheus
798 /// exporter. Linux-only signal; 0 on platforms where the gate defaults
799 /// to `u32::MAX`.
800 pub fn drain_pid_above_max_drops(&mut self) -> u64 {
801 let n = self.pid_above_max_drops;
802 self.pid_above_max_drops = 0;
803 n
804 }
805
806 /// Observer's cached `pid_max`. Linux-only meaningful value; otherwise
807 /// `u32::MAX`. Exposed for tests and for the Prometheus exporter's
808 /// gauge.
809 pub fn pid_max(&self) -> u32 {
810 self.pid_max
811 }
812
813 /// Re-read `/proc/sys/kernel/pid_max` if at least
814 /// [`PID_MAX_REFRESH_INTERVAL_NS`] has elapsed since the last refresh.
815 /// Cheap no-op otherwise (single `u64` compare).
816 ///
817 /// Intended to be called from the daemon's maintenance phase — *not*
818 /// from `poll()` — so the I/O hot path stays untouched. Picks up
819 /// runtime `sysctl -w kernel.pid_max=...` changes within one interval.
820 /// On non-Linux targets, [`crate::pid_max::read_pid_max`] returns
821 /// `u32::MAX` so the gate stays effectively disabled and this method
822 /// is a steady no-op.
823 ///
824 /// Returns `true` when a refresh actually ran this call (regardless of
825 /// whether the read value changed), `false` when gated by the interval.
826 pub fn maybe_refresh_pid_max(&mut self) -> bool {
827 let now_ns = self.now_ns();
828 if now_ns.saturating_sub(self.last_pid_max_refresh_ns) < PID_MAX_REFRESH_INTERVAL_NS {
829 return false;
830 }
831 self.pid_max = crate::pid_max::read_pid_max();
832 self.last_pid_max_refresh_ns = now_ns;
833 true
834 }
835
836 /// Drain and reset the per-tracker namespace-conflict counter — beats
837 /// dropped because the beat's namespace inode disagreed with the slot's
838 /// pinned namespace inode (first-namespace-wins). Surfaced as
839 /// `varta_tracker_namespace_conflict_total`.
840 pub fn drain_namespace_conflicts(&mut self) -> u64 {
841 self.tracker.take_namespace_conflicts()
842 }
843
844 /// Observer's own PID-namespace inode (Linux only; cached). Used by
845 /// `main.rs` to construct recovery `StallSource` values that include
846 /// the observer's namespace for the audit record.
847 pub fn observer_pid_namespace_inode(&self) -> Option<u64> {
848 crate::peer_cred::observer_pid_namespace_inode()
849 }
850
851 /// Drain and reset the tracker invariant-violation counter. Non-zero
852 /// values surface that a defensive fall-through in the hot path
853 /// triggered (e.g. a stale `PidIndex` entry pointed at an out-of-range
854 /// slot). Exposed as `varta_tracker_invariant_violations_total`.
855 pub fn drain_invariant_violations(&mut self) -> u64 {
856 self.tracker.take_invariant_violations()
857 }
858
859 /// Drain and reset the `PidIndex` probe-exhaustion counter — number of
860 /// times a pid lookup ran the full `MAX_PROBE` budget without finding
861 /// a match. Surfaced as `varta_tracker_pid_index_probe_exhausted_total`.
862 pub fn drain_pid_index_probe_exhausted(&mut self) -> u64 {
863 self.tracker.take_probe_exhausted()
864 }
865
866 /// Drain and reset the per-pid rate-limited counter.
867 pub fn drain_per_pid_rate_limited(&mut self) -> u64 {
868 let n = self.rate_limited_total[RateLimitReason::PerPid as usize];
869 self.rate_limited_total[RateLimitReason::PerPid as usize] = 0;
870 n
871 }
872
873 /// Drain and reset the global rate-limited counter.
874 pub fn drain_global_rate_limited(&mut self) -> u64 {
875 let n = self.rate_limited_total[RateLimitReason::Global as usize];
876 self.rate_limited_total[RateLimitReason::Global as usize] = 0;
877 n
878 }
879
880 /// Effective `SO_RCVBUF` size granted by the kernel for the observer UDS.
881 pub fn uds_rcvbuf_bytes(&self) -> u32 {
882 self.uds_rcvbuf_bytes
883 }
884
885 /// Drain and reset the AEAD decryption failure counter across all
886 /// listeners.
887 pub fn drain_decrypt_failures(&mut self) -> u64 {
888 self.listeners
889 .iter_mut()
890 .map(|l| l.drain_decrypt_failures())
891 .sum()
892 }
893
894 /// Drain and reset the truncated-datagram counter across all listeners.
895 pub fn drain_truncated(&mut self) -> u64 {
896 self.listeners.iter_mut().map(|l| l.drain_truncated()).sum()
897 }
898
899 /// Drain and reset the sender-state-full counter across all listeners.
900 pub fn drain_sender_state_full(&mut self) -> u64 {
901 self.listeners
902 .iter_mut()
903 .map(|l| l.drain_sender_state_full())
904 .sum()
905 }
906
907 /// Drain and reset the AEAD-decryption-attempt counter across all
908 /// listeners. In steady state this equals
909 /// `frames_received * (keys.len() + master_key_configured as u64)` for
910 /// the secure-UDP listener — every loaded key is tried per frame to
911 /// remove the key-rotation timing side-channel.
912 pub fn drain_aead_attempts(&mut self) -> u64 {
913 self.listeners
914 .iter_mut()
915 .map(|l| l.drain_aead_attempts())
916 .sum()
917 }
918
919 /// Drain and reset the parent-directory fsync failure counter for UDS
920 /// bind. Non-zero only when the OS returned an error from `fsync(2)` on
921 /// the socket's parent directory during startup. Surfaced as
922 /// `varta_socket_bind_dir_fsync_failed_total`.
923 pub fn drain_bind_dir_fsync_failures() -> u64 {
924 crate::listener::drain_bind_dir_fsync_failures()
925 }
926}
927
#[cfg(test)]
mod tests {
    // Unit tests for `Observer`: socket unlink on drop, the `pid_max`
    // refresh cadence gate, and the clock regression / forward-jump
    // counters.
    use super::*;
    use crate::tracker::DEFAULT_EVICTION_SCAN_WINDOW;
    use std::path::PathBuf;
    use std::sync::atomic::{AtomicU64, Ordering};

    /// Per-call suffix so successive `unique_sock_path` calls within one
    /// process never return the same path.
    static TEST_COUNTER: AtomicU64 = AtomicU64::new(0);

    /// Build a socket path in the system temp dir, unique per process (pid)
    /// and per call (counter). Any stale file at that path is removed first.
    fn unique_sock_path() -> PathBuf {
        let n = TEST_COUNTER.fetch_add(1, Ordering::Relaxed);
        let mut p = std::env::temp_dir();
        p.push(format!(
            "varta-observer-drop-{}-{}.sock",
            std::process::id(),
            n
        ));
        // Best-effort cleanup; the path normally does not exist yet.
        let _ = std::fs::remove_file(&p);
        p
    }

    #[test]
    // `unsafe_code` is allowed only for the attestation constructor below.
    #[allow(unsafe_code)]
    fn drop_unlinks_bound_socket() {
        // SAFETY: unit-test runner may be multi-threaded; the umask window is
        // benign since no concurrent thread creates files at our temp path.
        let pre = unsafe { PreThreadAttestation::new_unchecked() };
        let path = unique_sock_path();
        let obs = Observer::bind(
            &path,
            Duration::from_secs(1),
            0o600,
            Duration::from_millis(100),
            0,
            64,
            EvictionPolicy::Strict,
            DEFAULT_EVICTION_SCAN_WINDOW,
            None,
            0,
            0,
            ClockSource::Monotonic,
            &pre,
        )
        .expect("bind should succeed on a clean temp path");
        assert!(path.exists(), "socket file must exist after bind");
        drop(obs);
        assert!(
            !path.exists(),
            "socket file must be removed after observer drop"
        );
    }

    #[test]
    fn maybe_refresh_pid_max_respects_interval() {
        // Drive the cadence gate and assert its *timing* contract, not the
        // value. The refresh does perform the `/proc` read, but the value
        // `read_pid_max` returns is host-dependent (kernel default
        // 4_194_304 on Linux, u32::MAX elsewhere).
        //
        // The observer's monotonic clock is anchored to `Observer::new` (see
        // `Clock::new`), so `now_ns()` starts near zero and only crosses
        // PID_MAX_REFRESH_INTERVAL_NS after ~60 s of real uptime. To avoid
        // sleeping, the test writes the observer's `last_now_ns` directly.
        // It relies on the forward clamp to make later `now_ns()` readings
        // report at least that value.
        let mut obs = Observer::new(
            Duration::from_secs(1),
            64,
            EvictionPolicy::Strict,
            DEFAULT_EVICTION_SCAN_WINDOW,
            None,
            0,
            0,
            ClockSource::Monotonic,
        )
        .expect("Observer::new should succeed");

        let initial = obs.pid_max();
        assert_eq!(
            obs.last_pid_max_refresh_ns, 0,
            "fresh Observer has not yet run a periodic refresh"
        );

        // Immediately after construction the observer clock is still inside
        // the startup window. `now_ns() - 0 < INTERVAL`, so the gate skips:
        // `Observer::new` has already read pid_max, no need to re-read yet.
        let refreshed_at_startup = obs.maybe_refresh_pid_max();
        assert!(
            !refreshed_at_startup,
            "first call within startup window must skip (Observer::new already read pid_max)"
        );
        assert_eq!(
            obs.last_pid_max_refresh_ns, 0,
            "skip must leave the timestamp untouched"
        );

        // Simulate >60 s of observer uptime by pushing the forward-clamped
        // monotonic anchor past the interval. The next `now_ns()` reading
        // will be clamped to at least this value.
        obs.last_now_ns = PID_MAX_REFRESH_INTERVAL_NS + 1_000_000_000;
        // From here on, each `now_ns()` sees the real raw clock below this
        // anchor, so the clamp also records clock regressions as a side
        // effect. They are drained at the end of the test.
        let refreshed_after_interval = obs.maybe_refresh_pid_max();
        assert!(
            refreshed_after_interval,
            "refresh must fire once the interval has elapsed since startup"
        );
        let first_ts = obs.last_pid_max_refresh_ns;
        assert!(
            first_ts >= PID_MAX_REFRESH_INTERVAL_NS,
            "post-interval refresh stamps a fresh timestamp >= INTERVAL"
        );
        assert_eq!(
            obs.pid_max(),
            initial,
            "refresh re-reads the same host value within a single test process"
        );

        // Immediate follow-up: the gate must close again until another full
        // interval elapses.
        let refreshed_again = obs.maybe_refresh_pid_max();
        assert!(
            !refreshed_again,
            "second call within new interval must skip"
        );
        assert_eq!(
            obs.last_pid_max_refresh_ns, first_ts,
            "skip must leave the new timestamp untouched"
        );

        // Rewind the recorded timestamp by more than the interval and confirm
        // the gate opens again.
        obs.last_pid_max_refresh_ns = first_ts.saturating_sub(PID_MAX_REFRESH_INTERVAL_NS + 1);
        let refreshed_after_rewind = obs.maybe_refresh_pid_max();
        assert!(
            refreshed_after_rewind,
            "refresh must fire after rewinding the recorded timestamp"
        );
        assert!(
            obs.last_pid_max_refresh_ns >= first_ts,
            "rewind-driven refresh records a fresh timestamp"
        );

        // Pushing `last_now_ns` past the real raw clock produced clock
        // regressions as a side effect; drain them so the observer ends in
        // a neutral state. The count is deliberately not asserted: it
        // reflects `now_ns()`'s regression bookkeeping, not the pid_max gate
        // under test.
        let _ = obs.drain_clock_regressions();
    }

    #[test]
    fn clock_regression_counter_increments_on_backward_clock() {
        let mut obs = Observer::new(
            Duration::from_secs(1),
            64,
            EvictionPolicy::Strict,
            DEFAULT_EVICTION_SCAN_WINDOW,
            None,
            0,
            0,
            ClockSource::Monotonic,
        )
        .expect("Observer::new should succeed");

        // Baseline reading — the forward clamp seeds `last_now_ns` from the
        // current monotonic value. No regression yet.
        let _ = obs.now_ns();
        assert_eq!(
            obs.drain_clock_regressions(),
            0,
            "no regressions after the first reading"
        );

        // Simulate the kernel clock having previously reported a value far
        // in the future (e.g. before a VM live migration that rewound the
        // TSC). The next `now_ns()` call reads a real value strictly less
        // than `last_now_ns`, so the forward clamp absorbs it AND the
        // regression counter must increment.
        obs.last_now_ns = u64::MAX / 2;
        let clamped = obs.now_ns();
        assert_eq!(
            clamped,
            u64::MAX / 2,
            "forward clamp preserves the larger value"
        );
        assert_eq!(
            obs.drain_clock_regressions(),
            1,
            "exactly one regression observed"
        );

        // Drain resets — a second drain reads zero.
        assert_eq!(
            obs.drain_clock_regressions(),
            0,
            "drain must reset the counter"
        );

        // Two further backward excursions accumulate until the next drain.
        obs.last_now_ns = u64::MAX / 2;
        let _ = obs.now_ns();
        obs.last_now_ns = u64::MAX / 2;
        let _ = obs.now_ns();
        assert_eq!(
            obs.drain_clock_regressions(),
            2,
            "counter is saturating-add cumulative until drained"
        );
    }

    #[test]
    fn clock_jump_forward_counter_increments_on_large_advance() {
        let mut obs = Observer::new(
            Duration::from_secs(1),
            64,
            EvictionPolicy::Strict,
            DEFAULT_EVICTION_SCAN_WINDOW,
            None,
            0,
            0,
            ClockSource::Monotonic,
        )
        .expect("Observer::new should succeed");

        // Feed synthetic timestamps via apply_raw_clock_test so we don't need
        // to wait real time. Simulate a baseline reading, then an ~11 s jump
        // (well above the 5 s CLOCK_JUMP_FORWARD_THRESHOLD_NS).
        let _ = obs.apply_raw_clock_test(1_000_000); // prime: 1 ms from baseline
        let _ = obs.apply_raw_clock_test(11_000_000_000); // +~11 s jump after the prime
        assert_eq!(
            obs.drain_clock_jumps_forward(),
            1,
            "forward jump exceeding threshold must increment the counter"
        );
        assert_eq!(
            obs.drain_clock_regressions(),
            0,
            "a forward jump must not also count as a regression"
        );

        // Drain resets — second drain reads zero.
        assert_eq!(
            obs.drain_clock_jumps_forward(),
            0,
            "drain must reset the forward-jump counter"
        );

        // A sub-threshold advance (2 s) must not be counted.
        let _ = obs.apply_raw_clock_test(13_000_000_000); // +2 s — below 5 s sentinel
        assert_eq!(
            obs.drain_clock_jumps_forward(),
            0,
            "advance below threshold must not be counted as a jump"
        );

        // Bootstrap case: last_now_ns == 0 must not trigger a jump (startup).
        obs.last_now_ns = 0;
        let _ = obs.apply_raw_clock_test(10_000_000_000); // 10 s from zero
        assert_eq!(
            obs.drain_clock_jumps_forward(),
            0,
            "initial read from last_now_ns==0 must not count as a forward jump"
        );
    }
}