Skip to main content

varta_watch/
observer.rs

1//! Single-threaded observer: bind one or more transport listeners, decode
2//! incoming VLP frames, surface beats / stalls / decode errors via [`Event`].
3//!
4//! The observer never spawns threads, never allocates after setup,
5//! and surfaces at most one [`Event`] per call to [`Observer::poll`]. The
6//! caller drives the loop — see `main.rs` for the daemon entrypoint.
7//!
//! Multiple listeners (e.g. UDS + UDP) are polled round-robin. Each call to
//! [`Observer::poll`] tries every listener once; the first event produced is
//! returned, but the remaining listeners are still read, so a busy listener
//! cannot starve co-located listeners. Stalls are drained at the end of every
//! call and retrieved separately via [`Observer::poll_pending`]; if no
//! listener produced an event, `None` is returned.
13
14use std::io;
15use std::path::Path;
16use std::time::Duration;
17
18use varta_vlp::{DecodeError, Frame, Status};
19
20use crate::clock::{Clock, ClockSource};
21use crate::listener::{BeatListener, PreThreadAttestation, UdsListener};
22use crate::peer_cred::{BeatOrigin, RecvResult};
23use crate::tracker::{EvictionPolicy, Tracker, Update};
24
/// Why the observer's rate limiting discarded a beat.
///
/// The discriminant doubles as the index into the observer's
/// `rate_limited_total` counter array, so the values are fixed explicitly.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum RateLimitReason {
    /// The sender's pid beat again before its minimum inter-beat interval
    /// had elapsed.
    PerPid = 0,
    /// The observer-wide token bucket shared by all senders was empty.
    Global = 1,
}

/// Number of [`RateLimitReason`] variants, i.e. the length of the
/// per-reason drop-counter array. Derived from the last discriminant so it
/// tracks the enum.
pub(crate) const RATE_LIMIT_N: usize = RateLimitReason::Global as usize + 1;
33
/// Largest forward step, in nanoseconds, that one poll tick may take before
/// it is counted as an anomalous forward clock jump (sleep/wake, VM live
/// migration, hypervisor pause). Five seconds sits well above worst-case
/// poll-tick latency on a loaded host and well below any plausible sleep or
/// migration interval.
const CLOCK_JUMP_FORWARD_THRESHOLD_NS: u64 = 5 * 1_000_000_000;
39
/// Minimum spacing, in nanoseconds (60 s), between re-reads of
/// `/proc/sys/kernel/pid_max`.
///
/// Short enough that an operator's `sysctl -w kernel.pid_max=...` takes
/// effect without restarting the daemon; long enough that the `/proc` read
/// never shows up on a latency profile. The refresh runs in the maintenance
/// phase, not on the poll hot path. Deliberately hardcoded (no CLI knob),
/// following the self-watchdog cadence convention.
const PID_MAX_REFRESH_INTERVAL_NS: u64 = 60 * 1_000_000_000;
47
/// Global per-observer token bucket — one shared across all senders.
///
/// Guards against per-pid rotation attacks where an attacker cycles through
/// fake pids to keep every per-pid bucket empty.
///
/// Disabled when `capacity_milli == 0`. All arithmetic is integer-only
/// (milli-tokens) to stay allocation-free on the hot path:
/// - the bucket holds at most `burst × 1000` milli-tokens;
/// - it refills at `rate_per_sec × 1000` milli-tokens per second;
/// - each admitted frame costs 1000 milli-tokens.
pub(crate) struct GlobalRateLimit {
    /// Current token count in milli-tokens (1000 milli-tokens = 1 frame allowed).
    tokens_milli: u64,
    /// Maximum token count in milli-tokens (= burst × 1000).
    capacity_milli: u64,
    /// Refill numerator: milli-tokens gained per second (`rate_per_sec × 1000`).
    /// Divided by `refill_denominator` it gives milli-tokens per nanosecond
    /// without floating point.
    refill_numerator: u64,
    /// Refill denominator: 1_000_000_000 (ns per second).
    refill_denominator: u64,
    /// Nanosecond timestamp up to which elapsed time has already been
    /// converted into tokens. It may lag the latest `try_consume` timestamp,
    /// so refill fractions smaller than one milli-token accumulate across
    /// calls instead of being discarded.
    last_refill_ns: u64,
}

impl GlobalRateLimit {
    /// Milli-tokens per whole token; one admitted frame costs one token.
    const MILLI_PER_TOKEN: u64 = 1_000;

    /// Construct a new, initially full token bucket. `rate_per_sec = 0` or
    /// `burst = 0` produces a disabled bucket (always allows).
    pub(crate) fn new(rate_per_sec: u32, burst: u32) -> Self {
        if rate_per_sec == 0 || burst == 0 {
            return GlobalRateLimit {
                tokens_milli: 0,
                capacity_milli: 0,
                refill_numerator: 0,
                refill_denominator: 1,
                last_refill_ns: 0,
            };
        }
        // u32 × 1000 always fits in u64, so these products cannot overflow.
        let capacity_milli = u64::from(burst) * Self::MILLI_PER_TOKEN;
        GlobalRateLimit {
            tokens_milli: capacity_milli,
            capacity_milli,
            // `rate_per_sec` tokens/s == `rate_per_sec × 1000` milli-tokens/s.
            refill_numerator: u64::from(rate_per_sec) * Self::MILLI_PER_TOKEN,
            refill_denominator: 1_000_000_000,
            last_refill_ns: 0,
        }
    }

    /// Disabled when capacity is 0 — all frames pass.
    #[inline]
    pub(crate) fn is_disabled(&self) -> bool {
        self.capacity_milli == 0
    }

    /// Try to consume one token at time `now_ns`.
    ///
    /// Returns `true` if the frame is allowed and `false` if the global
    /// bucket is exhausted. A disabled bucket always returns `true`.
    #[inline]
    pub(crate) fn try_consume(&mut self, now_ns: u64) -> bool {
        if self.is_disabled() {
            return true;
        }
        self.refill(now_ns);
        if self.tokens_milli >= Self::MILLI_PER_TOKEN {
            self.tokens_milli -= Self::MILLI_PER_TOKEN;
            true
        } else {
            false
        }
    }

    /// Lazily convert the time elapsed since `last_refill_ns` into tokens,
    /// capped at capacity, carrying any sub-milli-token remainder forward.
    fn refill(&mut self, now_ns: u64) {
        let elapsed_ns = now_ns.saturating_sub(self.last_refill_ns);
        if elapsed_ns == 0 {
            return;
        }
        // Exact product: u64 × u64 always fits in u128.
        let scaled = u128::from(elapsed_ns) * u128::from(self.refill_numerator);
        let denominator = u128::from(self.refill_denominator);
        let added = scaled / denominator;
        if added == 0 {
            // Less than one milli-token accrued: keep `last_refill_ns` so the
            // elapsed time keeps accumulating instead of being thrown away.
            return;
        }
        let headroom = u128::from(self.capacity_milli.saturating_sub(self.tokens_milli));
        if added >= headroom {
            // A full bucket cannot bank extra time; discard the surplus.
            self.tokens_milli = self.capacity_milli;
            self.last_refill_ns = now_ns;
        } else {
            // `added < headroom <= u64::MAX`, so the cast is lossless.
            self.tokens_milli += added as u64;
            // Time not yet worth a whole milli-token stays "unconverted".
            // `scaled % denominator < scaled`, so `leftover_ns < elapsed_ns`
            // and the subtraction cannot underflow past `last_refill_ns`.
            let leftover_ns = (scaled % denominator) / u128::from(self.refill_numerator);
            self.last_refill_ns = now_ns - leftover_ns as u64;
        }
    }
}
127
128/// Event surfaced by [`Observer::poll`].
129///
130/// Each call to `poll` returns at most one event. Unknown-pid overflow and
131/// out-of-order beats are silently dropped at this layer; the bench / metrics
132/// sessions can layer counters on top without changing this enum.
133#[derive(Debug)]
134pub enum Event {
135    /// A well-formed beat was accepted for a tracked pid.
136    Beat {
137        /// OS process id of the emitting agent.
138        pid: u32,
139        /// Decoded health status of the beat.
140        status: Status,
141        /// Application-defined payload carried by the beat.
142        payload: u32,
143        /// Monotonic nonce of the beat.
144        nonce: u64,
145        /// Transport-class classification of the beat (see [`BeatOrigin`]).
146        /// Recovery commands consult this to refuse firing on non-kernel-attested origins.
147        origin: BeatOrigin,
148        /// Kernel-attested PID-namespace inode of the sender (Linux only).
149        /// `None` for non-Linux platforms, UDP transports, or when the peer's
150        /// `/proc/<pid>/ns/pid` was unreadable.
151        pid_ns_inode: Option<u64>,
152        /// Observer-local timestamp (ns since [`Observer`] start) when this
153        /// event was produced.
154        observer_ns: u64,
155    },
156    /// A tracked pid has not beaten within the configured threshold and the
157    /// observer has not yet surfaced a stall event for this silence run.
158    Stall {
159        /// OS process id of the silent agent.
160        pid: u32,
161        /// Last nonce observed for this pid.
162        last_nonce: u64,
163        /// Observer-local timestamp (ns since [`Observer`] start) of the
164        /// last accepted beat for this pid.
165        last_ns: u64,
166        /// Transport origin pinned by the slot's first beat. Recovery
167        /// refuses to spawn for `NetworkUnverified` unless the operator has
168        /// explicitly opted in via
169        /// `--i-accept-recovery-on-unauthenticated-transport`.
170        origin: BeatOrigin,
171        /// PID-namespace inode pinned by the slot's first beat (Linux only).
172        /// Used by main.rs to construct the recovery `StallSource`: a
173        /// `Some(_)` value that differs from the observer's namespace inode
174        /// indicates a cross-namespace agent and gates recovery refusal.
175        pid_ns_inode: Option<u64>,
176        /// Observer-local timestamp (ns since [`Observer`] start) when this
177        /// stall event was produced.
178        observer_ns: u64,
179    },
180    /// A 32-byte payload arrived but failed VLP decoding.
181    Decode(DecodeError, u64),
182    /// Frame decoded but the `frame.pid` does not match the kernel-verified
183    /// peer PID of the sender. The claimed pid is preserved so exporters can
184    /// record what the frame *claimed* to be.
185    AuthFailure {
186        /// The pid the frame on the wire claimed to be.
187        claimed_pid: u32,
188        /// Observer-local timestamp (ns since [`Observer`] start) when this
189        /// event was produced.
190        observer_ns: u64,
191    },
192    /// A beat arrived for an already-tracked pid, but its transport origin
193    /// disagreed with the origin pinned by the slot's first beat. The slot
194    /// was not mutated; the beat was dropped. First-origin-wins prevents an
195    /// attacker on an untrusted transport from "tainting" a slot that
196    /// legitimately belongs to a kernel-attested agent.
197    OriginConflict {
198        /// The pid claimed by the dropped beat (same as the existing slot's pid).
199        claimed_pid: u32,
200        /// Transport origin observed on this datagram.
201        observed_origin: BeatOrigin,
202        /// Origin pinned by the slot (the one that "won" the conflict).
203        slot_origin: BeatOrigin,
204        /// Observer-local timestamp (ns since [`Observer`] start) when this
205        /// event was produced.
206        observer_ns: u64,
207    },
208    /// A kernel-attested beat arrived whose peer PID-namespace inode differs
209    /// from the observer's namespace (Linux only). Recovery for the
210    /// associated pid cannot safely fire because the pid is in a different
211    /// namespace — `kill(2)` and `systemctl` would target the wrong process.
212    /// The beat was dropped at receive; the tracker was not modified.
213    NamespaceConflict {
214        /// The pid claimed by the dropped beat.
215        claimed_pid: u32,
216        /// PID-namespace inode of the sender (Linux only; `None` when
217        /// `/proc/<peer_pid>/ns/pid` was unreadable).
218        observed_ns_inode: Option<u64>,
219        /// The observer's own PID-namespace inode (cached at startup; `None`
220        /// when `/proc/self/ns/pid` is unreadable, which usually means the
221        /// platform isn't Linux).
222        observer_ns_inode: Option<u64>,
223        /// Observer-local timestamp (ns since [`Observer`] start) when this
224        /// event was produced.
225        observer_ns: u64,
226    },
227    /// Receiving from a listener failed with an error other than
228    /// `WouldBlock` / `TimedOut`.
229    Io(io::Error, u64),
230    /// Ancillary data truncated by the kernel (`MSG_CTRUNC` on Linux).
231    /// Indicates the kernel's ancillary-data buffer was too small for the
232    /// per-message metadata — a kernel-level buffer sizing issue.
233    CtrlTruncated(io::Error, u64),
234}
235
236/// Observer bound to one or more transport listeners.
237///
238/// The observer owns all listeners; cleanup (e.g. socket file unlink) happens
239/// when the [`Observer`] is dropped.
240pub struct Observer {
241    listeners: Vec<Box<dyn BeatListener>>,
242    tracker: Tracker,
243    threshold_ns: u64,
244    clock: Clock,
245    stall_queue: Vec<Option<Event>>,
246    stall_cursor: usize,
247    /// Next index to start polling from for fair round-robin across listeners.
248    next_listener_start: usize,
249    /// Minimum inter-beat interval applied per pid, in nanoseconds.
250    /// `None` means no rate limiting (the default).
251    rate_limit_interval_ns: Option<u64>,
252    /// Beats dropped by the per-pid and global rate limiters since the last drain.
253    /// Index 0 = per-pid (`RateLimitReason::PerPid`), 1 = global (`RateLimitReason::Global`).
254    rate_limited_total: [u64; RATE_LIMIT_N],
255    /// Global per-observer token bucket for defeating per-pid rotation attacks.
256    global_rl: GlobalRateLimit,
257    /// Monotonicity guard — last `now_ns()` value, clamped forward-only to
258    /// survive TSC drift and VM live migration.
259    last_now_ns: u64,
260    /// Count of times the underlying monotonic clock returned a value
261    /// strictly less than `last_now_ns` and the clamp absorbed the
262    /// regression. Surfaced as `varta_observer_clock_regression_total` so
263    /// operators can alert on TSC drift / VM-live-migration events that
264    /// would otherwise be invisible. Drained via
265    /// [`Observer::drain_clock_regressions`].
266    clock_regressions: u64,
267    /// Count of times consecutive `now_ns()` readings advanced by more than
268    /// [`CLOCK_JUMP_FORWARD_THRESHOLD_NS`] in a single poll tick. This
269    /// captures sleep/wake on `monotonic-raw`/`boottime`, VM live migration,
270    /// and hypervisor pauses that are invisible to the regression counter.
271    /// Surfaced as `varta_observer_clock_jump_forward_total`. Drained via
272    /// [`Observer::drain_clock_jumps_forward`].
273    clock_jumps_forward: u64,
274    /// When true, beats from agents whose kernel-attested PID namespace
275    /// differs from the observer's are admitted into the tracker (and may
276    /// later be passed to recovery). Set by `--allow-cross-namespace-agents`.
277    /// Default `false` — beats from cross-namespace agents are dropped at
278    /// ingress and counted via [`Observer::drain_cross_namespace_drops`].
279    allow_cross_namespace: bool,
280    /// Count of beats dropped at ingress because the kernel-attested peer's
281    /// PID namespace inode differs from the observer's. Linux-only signal;
282    /// 0 on other platforms.
283    cross_namespace_drops: u64,
284    /// Maximum PID accepted on the wire — cached from
285    /// `/proc/sys/kernel/pid_max` on Linux at observer startup. On non-Linux
286    /// targets and when `/proc` is unreadable, this is `u32::MAX` (gate
287    /// effectively disabled). See [`crate::pid_max::read_pid_max`].
288    pid_max: u32,
289    /// Count of beats dropped at ingress because `frame.pid > pid_max`.
290    /// Surfaced as `varta_frame_rejected_pid_above_max_total`.
291    pid_above_max_drops: u64,
292    /// Monotonic-clock timestamp (ns) of the most recent `pid_max` refresh
293    /// from `/proc/sys/kernel/pid_max`. `0` until the first periodic refresh
294    /// fires from [`Observer::maybe_refresh_pid_max`]; the value cached at
295    /// `Observer::new` covers the startup window until then. Compared against
296    /// `self.now_ns()` with [`PID_MAX_REFRESH_INTERVAL_NS`].
297    last_pid_max_refresh_ns: u64,
298    /// Effective `SO_RCVBUF` size granted by the kernel for the observer UDS,
299    /// in bytes.  `0` if `--uds-rcvbuf-bytes 0` was used or tuning failed.
300    /// Set by [`Observer::bind`] from the [`UdsListener::rcvbuf_bytes`] accessor.
301    pub uds_rcvbuf_bytes: u32,
302}
303
304impl Observer {
305    /// Create an empty observer with no listeners. Use
306    /// [`Observer::add_listener`] to attach transports, or call
307    /// [`Observer::bind`] for the common single-UDS case.
308    ///
309    /// `tracker_capacity` sets the maximum number of distinct agent pids
310    /// tracked concurrently. Beats for new pids beyond this limit are
311    /// dropped with [`Update::CapacityExceeded`] (the counter is surfaced
312    /// via `varta_tracker_capacity_exceeded_total`).
313    ///
314    /// `eviction_policy` controls which slot to reclaim when the tracker
315    /// is full and a new pid arrives ([`EvictionPolicy::Strict`] only
316    /// evicts confirmed-stalled agents; [`EvictionPolicy::Balanced`] also
317    /// evicts the oldest active slot to prevent capacity exhaustion).
318    ///
319    /// `max_beat_rate` is an optional per-pid rate limit in beats per
320    /// second.  When set, beats arriving faster than this rate from the
321    /// same pid are dropped and counted via [`Observer::drain_rate_limited`].
322    /// `None` (the default) disables rate limiting.
323    #[allow(clippy::too_many_arguments)]
324    pub fn new(
325        threshold: Duration,
326        tracker_capacity: usize,
327        eviction_policy: EvictionPolicy,
328        eviction_scan_window: usize,
329        max_beat_rate: Option<u32>,
330        global_beat_rate: u32,
331        global_beat_burst: u32,
332        clock_source: ClockSource,
333    ) -> io::Result<Self> {
334        let threshold_ns = threshold.as_nanos().min(u64::MAX as u128) as u64;
335        let rate_limit_interval_ns = max_beat_rate.and_then(|rps| {
336            if rps == 0 {
337                None
338            } else {
339                // Convert beats/sec to nanosecond interval.
340                // Saturate at 1 ns (1 GHz rate) to avoid overflow.
341                let interval_ns = 1_000_000_000u64.checked_div(rps as u64).unwrap_or(1);
342                Some(interval_ns)
343            }
344        });
345        let clock = Clock::new(clock_source).map_err(io::Error::from)?;
346        Ok(Observer {
347            listeners: Vec::new(),
348            tracker: Tracker::new(tracker_capacity, eviction_policy, eviction_scan_window),
349            threshold_ns,
350            clock,
351            stall_queue: Vec::with_capacity(tracker_capacity),
352            stall_cursor: 0,
353            next_listener_start: 0,
354            rate_limit_interval_ns,
355            rate_limited_total: [0; RATE_LIMIT_N],
356            global_rl: GlobalRateLimit::new(global_beat_rate, global_beat_burst),
357            last_now_ns: 0,
358            clock_regressions: 0,
359            clock_jumps_forward: 0,
360            allow_cross_namespace: false,
361            cross_namespace_drops: 0,
362            pid_max: crate::pid_max::read_pid_max(),
363            pid_above_max_drops: 0,
364            last_pid_max_refresh_ns: 0,
365            uds_rcvbuf_bytes: 0,
366        })
367    }
368
369    /// Allow beats from agents whose kernel-attested PID namespace differs
370    /// from the observer's own namespace. Default `false`. Wired from the
371    /// `--allow-cross-namespace-agents` CLI flag.
372    pub fn with_allow_cross_namespace(mut self, allow: bool) -> Self {
373        self.allow_cross_namespace = allow;
374        self
375    }
376
377    /// Create an observer from a single already-configured listener.
378    #[allow(clippy::too_many_arguments)]
379    pub fn from_listener<L: BeatListener + 'static>(
380        listener: L,
381        threshold: Duration,
382        tracker_capacity: usize,
383        eviction_policy: EvictionPolicy,
384        eviction_scan_window: usize,
385        max_beat_rate: Option<u32>,
386        global_beat_rate: u32,
387        global_beat_burst: u32,
388        clock_source: ClockSource,
389    ) -> io::Result<Self> {
390        let mut obs = Self::new(
391            threshold,
392            tracker_capacity,
393            eviction_policy,
394            eviction_scan_window,
395            max_beat_rate,
396            global_beat_rate,
397            global_beat_burst,
398            clock_source,
399        )?;
400        obs.add_listener(Box::new(listener));
401        Ok(obs)
402    }
403
404    /// Bind a Unix datagram socket at `path` and return an [`Observer`]
405    /// with that single UDS listener.
406    ///
407    /// This is the backward-compatible convenience constructor for the common
408    /// single-UDS case. For multi-transport setups, use [`Observer::new`]
409    /// followed by [`Observer::add_listener`].
410    #[allow(clippy::too_many_arguments)]
411    pub fn bind(
412        path: impl AsRef<Path>,
413        threshold: Duration,
414        socket_mode: u32,
415        read_timeout: Duration,
416        uds_rcvbuf_bytes: u32,
417        tracker_capacity: usize,
418        eviction_policy: EvictionPolicy,
419        eviction_scan_window: usize,
420        max_beat_rate: Option<u32>,
421        global_beat_rate: u32,
422        global_beat_burst: u32,
423        clock_source: ClockSource,
424        pre_thread: &PreThreadAttestation,
425    ) -> io::Result<Self> {
426        let listener = UdsListener::bind(
427            path,
428            socket_mode,
429            read_timeout,
430            uds_rcvbuf_bytes,
431            pre_thread,
432        )?;
433        let rcvbuf = listener.rcvbuf_bytes();
434        let mut obs = Self::from_listener(
435            listener,
436            threshold,
437            tracker_capacity,
438            eviction_policy,
439            eviction_scan_window,
440            max_beat_rate,
441            global_beat_rate,
442            global_beat_burst,
443            clock_source,
444        )?;
445        obs.uds_rcvbuf_bytes = rcvbuf;
446        Ok(obs)
447    }
448
449    /// Add a listener to the observer. The listener is polled in round-robin
450    /// order alongside any existing listeners.
451    pub fn add_listener(&mut self, listener: Box<dyn BeatListener>) {
452        self.listeners.push(listener);
453    }
454
455    /// Poll every listener once round-robin and return the first
456    /// non-`WouldBlock` [`Event`] found. Each listener is tried exactly
457    /// once per call — a busy listener cannot starve others because the
458    /// round-robin cursor (`next_listener_start`) advances past each
459    /// non-`WouldBlock` listener on every call.
460    ///
461    /// **Latency bound:** worst-case per-call work is
462    /// `N_listeners × per-listener-recv-cost + eviction_scan_window`.
463    /// Under the canonical stress profile (3 listeners, 4096 tracker
464    /// capacity, 256-slot eviction window) the p99 iteration time is
465    /// ≤ 5 ms — see `book/src/architecture/observer-liveness.md` and the
466    /// `tick-distribution` bench (`cargo run -p varta-bench --release --
467    /// tick-distribution`) which asserts this bound under sustained load.
468    ///
469    /// This method never returns [`Event::Stall`] — queued stall events must
470    /// be retrieved via [`Observer::poll_pending`].
471    pub fn poll(&mut self) -> Option<Event> {
472        let len = self.listeners.len();
473        let start = self.next_listener_start;
474        let mut first_event: Option<Event> = None;
475        let mut round = 0;
476        while round < len {
477            let i = (start + round) % len;
478            round += 1;
479            match self.listeners[i].recv() {
480                RecvResult::Authenticated {
481                    peer_pid,
482                    peer_uid: _,
483                    peer_pid_ns_inode,
484                    origin,
485                    data,
486                } => {
487                    let now_ns = self.now_ns();
488                    if first_event.is_none() {
489                        self.next_listener_start = (i + 1) % len;
490                    }
491                    match Frame::decode(&data) {
492                        Ok(frame) => {
493                            // Observer-side PID range gate. VLP rejects 0/1
494                            // as wire-format `BadPid`; here we additionally
495                            // reject any pid above the kernel's configured
496                            // `pid_max` (Linux) — no live process can hold
497                            // that id, so the frame is either corrupted or
498                            // forged. Non-Linux: `pid_max == u32::MAX`,
499                            // gate is a no-op.
500                            if frame.pid > self.pid_max {
501                                self.pid_above_max_drops =
502                                    self.pid_above_max_drops.saturating_add(1);
503                                continue;
504                            }
505                            // Per-datagram PID verification — works on Linux
506                            // (SCM_CREDENTIALS via SO_PASSCRED) and macOS
507                            // (LOCAL_PEERTOKEN via getsockopt). For transports
508                            // without kernel credential support, peer_pid is 0
509                            // and this check is a no-op.
510                            if peer_pid != 0 && frame.pid != peer_pid {
511                                if first_event.is_none() {
512                                    first_event = Some(Event::AuthFailure {
513                                        claimed_pid: frame.pid,
514                                        observer_ns: now_ns,
515                                    });
516                                }
517                                continue;
518                            }
519                            // Global token bucket: drop BEFORE namespace /
520                            // per-pid classification so a rotation attack
521                            // cannot exhaust classification work.
522                            if !self.global_rl.try_consume(now_ns) {
523                                self.rate_limited_total[RateLimitReason::Global as usize] =
524                                    self.rate_limited_total[RateLimitReason::Global as usize]
525                                        .saturating_add(1);
526                                continue;
527                            }
528                            // Cross-namespace gate (Linux only). When the
529                            // kernel-attested peer's PID namespace inode
530                            // differs from the observer's, the frame.pid
531                            // cannot safely be used to target recovery
532                            // commands. The check is a no-op on non-Linux
533                            // (both inodes are `None`), for UDP transports
534                            // (peer inode is `None`), and when the operator
535                            // has opted in via --allow-cross-namespace-agents.
536                            let observer_ns_inode =
537                                crate::peer_cred::observer_pid_namespace_inode();
538                            let cross_ns = matches!(
539                                (observer_ns_inode, peer_pid_ns_inode),
540                                (Some(a), Some(b)) if a != b
541                            );
542                            if cross_ns && !self.allow_cross_namespace {
543                                self.cross_namespace_drops =
544                                    self.cross_namespace_drops.saturating_add(1);
545                                if first_event.is_none() {
546                                    first_event = Some(Event::NamespaceConflict {
547                                        claimed_pid: frame.pid,
548                                        observed_ns_inode: peer_pid_ns_inode,
549                                        observer_ns_inode,
550                                        observer_ns: now_ns,
551                                    });
552                                }
553                                continue;
554                            }
555                            // Per-pid rate limiting: if a minimum inter-beat
556                            // interval is configured, skip frames that arrive
557                            // too soon from the same pid.
558                            if let Some(interval_ns) = self.rate_limit_interval_ns {
559                                if let Some(last_ns) = self.tracker.last_ns_of(frame.pid) {
560                                    if now_ns.saturating_sub(last_ns) < interval_ns {
561                                        self.rate_limited_total[RateLimitReason::PerPid as usize] =
562                                            self.rate_limited_total
563                                                [RateLimitReason::PerPid as usize]
564                                                .saturating_add(1);
565                                        continue;
566                                    }
567                                }
568                            }
569                            // Capture the slot's pre-record pinned origin (if
570                            // any) so an OriginConflict event can report what
571                            // the slot was pinned to without an extra lookup
572                            // afterwards.
573                            let slot_origin_before = self.tracker.origin_of(frame.pid);
574                            match self.tracker.record(
575                                &frame,
576                                now_ns,
577                                self.threshold_ns,
578                                origin,
579                                peer_pid_ns_inode,
580                            ) {
581                                Update::Inserted | Update::Refreshed => {
582                                    if first_event.is_none() {
583                                        first_event = Some(Event::Beat {
584                                            pid: frame.pid,
585                                            status: frame.status,
586                                            payload: frame.payload,
587                                            nonce: frame.nonce,
588                                            origin,
589                                            pid_ns_inode: peer_pid_ns_inode,
590                                            observer_ns: now_ns,
591                                        });
592                                    }
593                                }
594                                Update::OriginConflict => {
595                                    if first_event.is_none() {
596                                        first_event = Some(Event::OriginConflict {
597                                            claimed_pid: frame.pid,
598                                            observed_origin: origin,
599                                            slot_origin: slot_origin_before.unwrap_or(origin),
600                                            observer_ns: now_ns,
601                                        });
602                                    }
603                                }
604                                Update::NamespaceConflict => {
605                                    if first_event.is_none() {
606                                        first_event = Some(Event::NamespaceConflict {
607                                            claimed_pid: frame.pid,
608                                            observed_ns_inode: peer_pid_ns_inode,
609                                            observer_ns_inode: self
610                                                .tracker
611                                                .pid_ns_inode_of(frame.pid)
612                                                .flatten(),
613                                            observer_ns: now_ns,
614                                        });
615                                    }
616                                }
617                                Update::OutOfOrder | Update::CapacityExceeded => {}
618                            }
619                        }
620                        Err(e) => {
621                            if first_event.is_none() {
622                                first_event = Some(Event::Decode(e, now_ns));
623                            }
624                        }
625                    }
626                }
627                RecvResult::WouldBlock => continue,
628                RecvResult::ShortRead => continue,
629                RecvResult::CtrlTruncated(e) => {
630                    if first_event.is_none() {
631                        self.next_listener_start = (i + 1) % len;
632                        first_event = Some(Event::CtrlTruncated(e, self.now_ns()));
633                    }
634                }
635                RecvResult::IoError(e) => {
636                    if first_event.is_none() {
637                        self.next_listener_start = (i + 1) % len;
638                        first_event = Some(Event::Io(e, self.now_ns()));
639                    }
640                }
641            }
642        }
643        self.drain_stalls();
644        first_event
645    }
646
647    /// Return the next queued [`Event::Stall`], if any.
648    pub fn poll_pending(&mut self) -> Option<Event> {
649        if self.stall_cursor < self.stall_queue.len() {
650            let stall = self.stall_queue[self.stall_cursor].take();
651            self.stall_cursor += 1;
652            return stall;
653        }
654        None
655    }
656
657    /// Whether the stall queue has unconsumed [`Event::Stall`] entries.
658    pub fn has_pending_stalls(&self) -> bool {
659        self.stall_cursor < self.stall_queue.len()
660    }
661
662    /// Observer-local nanosecond timestamp (ns since [`Observer`] start).
663    ///
664    /// Clamped to never decrease — on some platforms (VMs with TSC drift,
665    /// live-migration pause-and-resume), the underlying clock can produce
666    /// values that appear to go backwards. Without clamping, a forward clock
667    /// jump after a backward excursion can cause false stall detections.
668    ///
669    /// The kernel clock backing this reading is selected via
670    /// [`crate::clock::ClockSource`] (`--clock-source` CLI flag); see
671    /// `book/src/architecture/safety-profiles.md` for the SRE vs. medical
672    /// deployment matrix.
673    pub fn now_ns(&mut self) -> u64 {
674        let raw = self.clock.now_ns();
675        self.apply_raw_clock(raw)
676    }
677
678    fn apply_raw_clock(&mut self, raw: u64) -> u64 {
679        if raw < self.last_now_ns {
680            self.clock_regressions = self.clock_regressions.saturating_add(1);
681        } else if self.last_now_ns > 0
682            && raw.saturating_sub(self.last_now_ns) > CLOCK_JUMP_FORWARD_THRESHOLD_NS
683        {
684            self.clock_jumps_forward = self.clock_jumps_forward.saturating_add(1);
685        }
686        self.last_now_ns = self.last_now_ns.max(raw);
687        self.last_now_ns
688    }
689
690    /// Feed a synthetic raw clock value directly, bypassing `self.clock`.
691    /// Only available in tests; allows forward-jump and regression scenarios
692    /// without waiting for real time to advance.
693    #[cfg(test)]
694    pub(crate) fn apply_raw_clock_test(&mut self, raw: u64) -> u64 {
695        self.apply_raw_clock(raw)
696    }
697
698    /// Drain and reset the clock-regression counter — number of times the
699    /// kernel monotonic clock returned a value strictly less than the
700    /// previously observed one and the forward clamp absorbed the
701    /// regression. Non-zero values surface TSC drift, VM live migration,
702    /// or other anomalous clock behavior that would otherwise be invisible.
703    /// Surfaced as `varta_observer_clock_regression_total`.
704    pub fn drain_clock_regressions(&mut self) -> u64 {
705        let n = self.clock_regressions;
706        self.clock_regressions = 0;
707        n
708    }
709
710    /// Drain and reset the forward-jump counter — number of times the kernel
711    /// monotonic clock advanced by more than [`CLOCK_JUMP_FORWARD_THRESHOLD_NS`]
712    /// between adjacent poll ticks. Non-zero values indicate sleep/wake on
713    /// `monotonic-raw`/`boottime`, VM live migration, or a hypervisor pause.
714    /// Surfaced as `varta_observer_clock_jump_forward_total`.
715    pub fn drain_clock_jumps_forward(&mut self) -> u64 {
716        let n = self.clock_jumps_forward;
717        self.clock_jumps_forward = 0;
718        n
719    }
720
721    /// Inspect the kernel clock backing this observer's stall accounting.
722    pub fn clock_source(&self) -> ClockSource {
723        self.clock.source()
724    }
725
726    fn drain_stalls(&mut self) {
727        if self.stall_cursor < self.stall_queue.len() {
728            return;
729        }
730        let now_ns = self.now_ns();
731        self.stall_queue.clear();
732        self.stall_cursor = 0;
733        self.tracker.drain_stalled_slots(
734            now_ns,
735            self.threshold_ns,
736            |pid, last_nonce, last_ns, origin, pid_ns_inode| {
737                self.stall_queue.push(Some(Event::Stall {
738                    pid,
739                    last_nonce,
740                    last_ns,
741                    origin,
742                    pid_ns_inode,
743                    observer_ns: now_ns,
744                }));
745            },
746        );
747    }
748
749    /// Drain and reset the eviction counter.
750    pub fn drain_evictions(&mut self) -> u64 {
751        self.tracker.take_evictions()
752    }
753
754    /// Drain the pid of the most recently evicted slot, if any.
755    pub fn drain_evicted_pid(&mut self) -> Option<u32> {
756        self.tracker.take_evicted_pid()
757    }
758
759    /// Drain and reset the capacity-exceeded counter.
760    pub fn drain_capacity_exceeded(&mut self) -> u64 {
761        self.tracker.take_capacity_exceeded()
762    }
763
764    /// Drain and reset the nonce-wrap counter.
765    pub fn drain_nonce_wraps(&mut self) -> u64 {
766        self.tracker.take_nonce_wraps()
767    }
768
769    /// Drain and reset the count of bounded eviction-scan calls that ran
770    /// the full [`crate::tracker::EVICTION_SCAN_WINDOW`] without finding a
771    /// victim. Non-zero values prove the per-frame work cap engaged — i.e.
772    /// the tracker was full and an attacker would otherwise have forced
773    /// O(n) work per arriving frame.
774    pub fn drain_eviction_scan_truncated(&mut self) -> u64 {
775        self.tracker.take_eviction_scan_truncated()
776    }
777
778    /// Drain and reset the per-tracker origin-conflict counter — number of
779    /// beats dropped because their transport origin disagreed with the
780    /// slot's pinned origin (first-origin-wins). Surfaced as
781    /// `varta_origin_conflict_total` in the Prometheus exporter.
782    pub fn drain_origin_conflicts(&mut self) -> u64 {
783        self.tracker.take_origin_conflicts()
784    }
785
786    /// Drain and reset the count of beats dropped at ingress because the
787    /// peer's PID-namespace inode differs from the observer's. Surfaced as
788    /// `varta_frame_namespace_mismatch_total` in the Prometheus exporter.
789    pub fn drain_cross_namespace_drops(&mut self) -> u64 {
790        let n = self.cross_namespace_drops;
791        self.cross_namespace_drops = 0;
792        n
793    }
794
795    /// Drain and reset the count of beats dropped at ingress because
796    /// `frame.pid` exceeded the kernel's configured `pid_max`. Surfaced as
797    /// `varta_frame_rejected_pid_above_max_total` in the Prometheus
798    /// exporter. Linux-only signal; 0 on platforms where the gate defaults
799    /// to `u32::MAX`.
800    pub fn drain_pid_above_max_drops(&mut self) -> u64 {
801        let n = self.pid_above_max_drops;
802        self.pid_above_max_drops = 0;
803        n
804    }
805
806    /// Observer's cached `pid_max`. Linux-only meaningful value; otherwise
807    /// `u32::MAX`. Exposed for tests and for the Prometheus exporter's
808    /// gauge.
809    pub fn pid_max(&self) -> u32 {
810        self.pid_max
811    }
812
813    /// Re-read `/proc/sys/kernel/pid_max` if at least
814    /// [`PID_MAX_REFRESH_INTERVAL_NS`] has elapsed since the last refresh.
815    /// Cheap no-op otherwise (single `u64` compare).
816    ///
817    /// Intended to be called from the daemon's maintenance phase — *not*
818    /// from `poll()` — so the I/O hot path stays untouched. Picks up
819    /// runtime `sysctl -w kernel.pid_max=...` changes within one interval.
820    /// On non-Linux targets, [`crate::pid_max::read_pid_max`] returns
821    /// `u32::MAX` so the gate stays effectively disabled and this method
822    /// is a steady no-op.
823    ///
824    /// Returns `true` when a refresh actually ran this call (regardless of
825    /// whether the read value changed), `false` when gated by the interval.
826    pub fn maybe_refresh_pid_max(&mut self) -> bool {
827        let now_ns = self.now_ns();
828        if now_ns.saturating_sub(self.last_pid_max_refresh_ns) < PID_MAX_REFRESH_INTERVAL_NS {
829            return false;
830        }
831        self.pid_max = crate::pid_max::read_pid_max();
832        self.last_pid_max_refresh_ns = now_ns;
833        true
834    }
835
836    /// Drain and reset the per-tracker namespace-conflict counter — beats
837    /// dropped because the beat's namespace inode disagreed with the slot's
838    /// pinned namespace inode (first-namespace-wins). Surfaced as
839    /// `varta_tracker_namespace_conflict_total`.
840    pub fn drain_namespace_conflicts(&mut self) -> u64 {
841        self.tracker.take_namespace_conflicts()
842    }
843
844    /// Observer's own PID-namespace inode (Linux only; cached). Used by
845    /// `main.rs` to construct recovery `StallSource` values that include
846    /// the observer's namespace for the audit record.
847    pub fn observer_pid_namespace_inode(&self) -> Option<u64> {
848        crate::peer_cred::observer_pid_namespace_inode()
849    }
850
851    /// Drain and reset the tracker invariant-violation counter. Non-zero
852    /// values surface that a defensive fall-through in the hot path
853    /// triggered (e.g. a stale `PidIndex` entry pointed at an out-of-range
854    /// slot). Exposed as `varta_tracker_invariant_violations_total`.
855    pub fn drain_invariant_violations(&mut self) -> u64 {
856        self.tracker.take_invariant_violations()
857    }
858
859    /// Drain and reset the `PidIndex` probe-exhaustion counter — number of
860    /// times a pid lookup ran the full `MAX_PROBE` budget without finding
861    /// a match. Surfaced as `varta_tracker_pid_index_probe_exhausted_total`.
862    pub fn drain_pid_index_probe_exhausted(&mut self) -> u64 {
863        self.tracker.take_probe_exhausted()
864    }
865
866    /// Drain and reset the per-pid rate-limited counter.
867    pub fn drain_per_pid_rate_limited(&mut self) -> u64 {
868        let n = self.rate_limited_total[RateLimitReason::PerPid as usize];
869        self.rate_limited_total[RateLimitReason::PerPid as usize] = 0;
870        n
871    }
872
873    /// Drain and reset the global rate-limited counter.
874    pub fn drain_global_rate_limited(&mut self) -> u64 {
875        let n = self.rate_limited_total[RateLimitReason::Global as usize];
876        self.rate_limited_total[RateLimitReason::Global as usize] = 0;
877        n
878    }
879
880    /// Effective `SO_RCVBUF` size granted by the kernel for the observer UDS.
881    pub fn uds_rcvbuf_bytes(&self) -> u32 {
882        self.uds_rcvbuf_bytes
883    }
884
885    /// Drain and reset the AEAD decryption failure counter across all
886    /// listeners.
887    pub fn drain_decrypt_failures(&mut self) -> u64 {
888        self.listeners
889            .iter_mut()
890            .map(|l| l.drain_decrypt_failures())
891            .sum()
892    }
893
894    /// Drain and reset the truncated-datagram counter across all listeners.
895    pub fn drain_truncated(&mut self) -> u64 {
896        self.listeners.iter_mut().map(|l| l.drain_truncated()).sum()
897    }
898
899    /// Drain and reset the sender-state-full counter across all listeners.
900    pub fn drain_sender_state_full(&mut self) -> u64 {
901        self.listeners
902            .iter_mut()
903            .map(|l| l.drain_sender_state_full())
904            .sum()
905    }
906
907    /// Drain and reset the AEAD-decryption-attempt counter across all
908    /// listeners. In steady state this equals
909    /// `frames_received * (keys.len() + master_key_configured as u64)` for
910    /// the secure-UDP listener — every loaded key is tried per frame to
911    /// remove the key-rotation timing side-channel.
912    pub fn drain_aead_attempts(&mut self) -> u64 {
913        self.listeners
914            .iter_mut()
915            .map(|l| l.drain_aead_attempts())
916            .sum()
917    }
918
919    /// Drain and reset the parent-directory fsync failure counter for UDS
920    /// bind.  Non-zero only when the OS returned an error from `fsync(2)` on
921    /// the socket's parent directory during startup.  Surfaced as
922    /// `varta_socket_bind_dir_fsync_failed_total`.
923    pub fn drain_bind_dir_fsync_failures() -> u64 {
924        crate::listener::drain_bind_dir_fsync_failures()
925    }
926}
927
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tracker::DEFAULT_EVICTION_SCAN_WINDOW;
    use std::path::PathBuf;
    use std::sync::atomic::{AtomicU64, Ordering};

    static TEST_COUNTER: AtomicU64 = AtomicU64::new(0);

    /// Socket path under the temp dir that is unique per process and per
    /// call, so concurrently running tests never share a socket file. Any
    /// leftover file at the path is removed first.
    fn unique_sock_path() -> PathBuf {
        let n = TEST_COUNTER.fetch_add(1, Ordering::Relaxed);
        let mut p = std::env::temp_dir();
        p.push(format!(
            "varta-observer-drop-{}-{}.sock",
            std::process::id(),
            n
        ));
        let _ = std::fs::remove_file(&p);
        p
    }

    /// Unbound observer built with the argument set shared by the clock
    /// and `pid_max` tests below.
    fn new_test_observer() -> Observer {
        Observer::new(
            Duration::from_secs(1),
            64,
            EvictionPolicy::Strict,
            DEFAULT_EVICTION_SCAN_WINDOW,
            None,
            0,
            0,
            ClockSource::Monotonic,
        )
        .expect("Observer::new should succeed")
    }

    #[test]
    #[allow(unsafe_code)]
    fn drop_unlinks_bound_socket() {
        // SAFETY: unit-test runner may be multi-threaded; the umask window is
        // benign since no concurrent thread creates files at our temp path.
        let pre = unsafe { PreThreadAttestation::new_unchecked() };
        let path = unique_sock_path();
        let obs = Observer::bind(
            &path,
            Duration::from_secs(1),
            0o600,
            Duration::from_millis(100),
            0,
            64,
            EvictionPolicy::Strict,
            DEFAULT_EVICTION_SCAN_WINDOW,
            None,
            0,
            0,
            ClockSource::Monotonic,
            &pre,
        )
        .expect("bind should succeed on a clean temp path");
        assert!(path.exists(), "socket file must exist after bind");
        drop(obs);
        assert!(
            !path.exists(),
            "socket file must be removed after observer drop"
        );
    }

    #[test]
    fn maybe_refresh_pid_max_respects_interval() {
        // Drive the cadence gate without exercising the /proc read itself —
        // the value `read_pid_max` returns is host-dependent (kernel default
        // 4_194_304 on Linux, u32::MAX elsewhere); we assert the gate's
        // *timing* contract, not the value.
        //
        // The observer's monotonic clock is anchored to `Observer::new` (see
        // `Clock::new`), so `now_ns()` starts near zero and only crosses
        // PID_MAX_REFRESH_INTERVAL_NS after ~60 s of real uptime. The test
        // advances the observer's `last_now_ns` directly via the forward
        // clamp to simulate elapsed time without sleeping.
        let mut obs = new_test_observer();

        let initial = obs.pid_max();
        assert_eq!(
            obs.last_pid_max_refresh_ns, 0,
            "fresh Observer has not yet run a periodic refresh"
        );

        // Immediately after construction the observer clock is still inside
        // the startup window. `now_ns() - 0 < INTERVAL`, so the gate skips:
        // `Observer::new` has already read pid_max, no need to re-read yet.
        let refreshed_at_startup = obs.maybe_refresh_pid_max();
        assert!(
            !refreshed_at_startup,
            "first call within startup window must skip (Observer::new already read pid_max)"
        );
        assert_eq!(
            obs.last_pid_max_refresh_ns, 0,
            "skip must leave the timestamp untouched"
        );

        // Simulate >60 s of observer uptime by pushing the forward-clamped
        // monotonic anchor past the interval. The next `now_ns()` reading
        // will be clamped to at least this value.
        obs.last_now_ns = PID_MAX_REFRESH_INTERVAL_NS + 1_000_000_000;
        // From here on, every real raw reading is below this synthetic
        // anchor, so each `now_ns()` issued by `maybe_refresh_pid_max` is
        // clamped up to it and counted as a clock regression. Those counts
        // are drained at the end of the test.
        let refreshed_after_interval = obs.maybe_refresh_pid_max();
        assert!(
            refreshed_after_interval,
            "refresh must fire once the interval has elapsed since startup"
        );
        let first_ts = obs.last_pid_max_refresh_ns;
        assert!(
            first_ts >= PID_MAX_REFRESH_INTERVAL_NS,
            "post-interval refresh stamps a fresh timestamp >= INTERVAL"
        );
        assert_eq!(
            obs.pid_max(),
            initial,
            "refresh re-reads the same host value within a single test process"
        );

        // Immediate follow-up: the gate must close again until another full
        // interval elapses.
        let refreshed_again = obs.maybe_refresh_pid_max();
        assert!(
            !refreshed_again,
            "second call within new interval must skip"
        );
        assert_eq!(
            obs.last_pid_max_refresh_ns, first_ts,
            "skip must leave the new timestamp untouched"
        );

        // Rewind the recorded timestamp by more than the interval and confirm
        // the gate opens again.
        obs.last_pid_max_refresh_ns = first_ts.saturating_sub(PID_MAX_REFRESH_INTERVAL_NS + 1);
        let refreshed_after_rewind = obs.maybe_refresh_pid_max();
        assert!(
            refreshed_after_rewind,
            "refresh must fire after rewinding the recorded timestamp"
        );
        assert!(
            obs.last_pid_max_refresh_ns >= first_ts,
            "rewind-driven refresh records a fresh timestamp"
        );

        // The regressions are a side effect of the synthetic anchor, not
        // something this test asserts on. Their exact count depends on how
        // many `now_ns()` calls `maybe_refresh_pid_max` issued. Drain them
        // so the observer ends in a neutral state.
        let _ = obs.drain_clock_regressions();
    }

    #[test]
    fn clock_regression_counter_increments_on_backward_clock() {
        let mut obs = new_test_observer();

        // Baseline reading — the forward clamp seeds `last_now_ns` from the
        // current monotonic value. No regression yet.
        let _ = obs.now_ns();
        assert_eq!(
            obs.drain_clock_regressions(),
            0,
            "no regressions after the first reading"
        );

        // Simulate the kernel clock having previously reported a value far
        // in the future (e.g. before a VM live migration that rewound the
        // TSC). The next `now_ns()` call reads a real value strictly less
        // than `last_now_ns`, so the forward clamp absorbs it AND the
        // regression counter must increment.
        obs.last_now_ns = u64::MAX / 2;
        let clamped = obs.now_ns();
        assert_eq!(
            clamped,
            u64::MAX / 2,
            "forward clamp preserves the larger value"
        );
        assert_eq!(
            obs.drain_clock_regressions(),
            1,
            "exactly one regression observed"
        );

        // Drain resets — a second drain reads zero.
        assert_eq!(
            obs.drain_clock_regressions(),
            0,
            "drain must reset the counter"
        );

        // Two more backward excursions accumulate until drained. The anchor
        // is re-armed before each read for clarity; the clamp would keep it
        // at `u64::MAX / 2` anyway.
        obs.last_now_ns = u64::MAX / 2;
        let _ = obs.now_ns();
        obs.last_now_ns = u64::MAX / 2;
        let _ = obs.now_ns();
        assert_eq!(
            obs.drain_clock_regressions(),
            2,
            "counter is saturating-add cumulative until drained"
        );
    }

    #[test]
    fn clock_jump_forward_counter_increments_on_large_advance() {
        let mut obs = new_test_observer();

        // Feed synthetic timestamps via apply_raw_clock_test so we don't need
        // to wait real time. Prime a baseline at 1 ms, then advance ~11 s.
        let _ = obs.apply_raw_clock_test(1_000_000); // prime: 1 ms from baseline
        let _ = obs.apply_raw_clock_test(11_000_000_000); // ~+11 s jump
        assert_eq!(
            obs.drain_clock_jumps_forward(),
            1,
            "forward jump exceeding threshold must increment the counter"
        );
        assert_eq!(
            obs.drain_clock_regressions(),
            0,
            "a forward jump must not also count as a regression"
        );

        // Drain resets — second drain reads zero.
        assert_eq!(
            obs.drain_clock_jumps_forward(),
            0,
            "drain must reset the forward-jump counter"
        );

        // A sub-threshold advance (2 s) must not be counted.
        let _ = obs.apply_raw_clock_test(13_000_000_000); // +2 s — below 5 s sentinel
        assert_eq!(
            obs.drain_clock_jumps_forward(),
            0,
            "advance below threshold must not be counted as a jump"
        );

        // A backward step is a regression, never a forward jump, and the
        // clamp keeps the previous (larger) timestamp.
        let clamped = obs.apply_raw_clock_test(12_000_000_000); // -1 s
        assert_eq!(
            clamped, 13_000_000_000,
            "backward step must be clamped to the previous timestamp"
        );
        assert_eq!(
            obs.drain_clock_regressions(),
            1,
            "backward step must be counted as exactly one regression"
        );
        assert_eq!(
            obs.drain_clock_jumps_forward(),
            0,
            "backward step must not be counted as a forward jump"
        );

        // Bootstrap case: last_now_ns == 0 must not trigger a jump (startup).
        obs.last_now_ns = 0;
        let _ = obs.apply_raw_clock_test(10_000_000_000); // 10 s from zero
        assert_eq!(
            obs.drain_clock_jumps_forward(),
            0,
            "initial read from last_now_ns==0 must not count as a forward jump"
        );
    }
}