Skip to main content

varta_client/
client.rs

//! Agent surface — `Varta` connects to the observer over a configured
//! transport and `beat()` emits one fire-and-forget 32-byte VLP frame per call.
3
4use std::fmt;
5use std::io::{self, Write};
6use std::path::Path;
7use std::time::Instant;
8
9use varta_vlp::{Frame, Status, NONCE_TERMINAL};
10
11use crate::transport::{BeatTransport, UdsTransport};
12
13#[cfg(feature = "udp")]
14use crate::transport::UdpTransport;
15
16#[cfg(feature = "secure-udp")]
17use crate::secure_transport::SecureUdpTransport;
18
19#[cfg(feature = "secure-udp")]
20use varta_vlp::crypto::Key;
21
// `ENOBUFS` is hard-coded per target to preserve the zero-dependency
// invariant; do not replace these constants with `libc`. Every supported
// target resolves to exactly one definition below, and any other target fails
// to build instead of silently misclassifying kernel buffer pressure.

/// `ENOBUFS` on Linux-kernel targets (Linux, and Android whose bionic libc
/// uses the kernel's errno table), from `<asm-generic/errno.h>` (Linux 2.6+,
/// verified against 6.12). MIPS and SPARC kernels use their own errno
/// numbering and are handled by the gates below.
#[cfg(all(
    any(target_os = "linux", target_os = "android"),
    not(any(
        target_arch = "mips",
        target_arch = "mips64",
        target_arch = "mips32r6",
        target_arch = "mips64r6",
        target_arch = "sparc",
        target_arch = "sparc64",
    )),
))]
const ENOBUFS: i32 = 105;

/// `ENOBUFS` under BSD socket-errno numbering: Darwin / BSD `<sys/errno.h>`
/// (macOS 15 / XNU and the other Apple OSes, FreeBSD 14, NetBSD 10,
/// OpenBSD 7, DragonFly 6). Linux on SPARC keeps the same SunOS/BSD
/// numbering (`arch/sparc/include/uapi/asm/errno.h`).
#[cfg(any(
    target_os = "macos",
    target_os = "ios",
    target_os = "tvos",
    target_os = "watchos",
    target_os = "freebsd",
    target_os = "netbsd",
    target_os = "openbsd",
    target_os = "dragonfly",
    all(
        any(target_os = "linux", target_os = "android"),
        any(target_arch = "sparc", target_arch = "sparc64"),
    ),
))]
const ENOBUFS: i32 = 55;

/// `ENOBUFS` under SVR4 errno numbering: Solaris / illumos `<sys/errno.h>`,
/// and Linux on MIPS (`arch/mips/include/uapi/asm/errno.h`), whose kernel
/// errno table follows the same SVR4 layout.
#[cfg(any(
    target_os = "solaris",
    target_os = "illumos",
    all(
        any(target_os = "linux", target_os = "android"),
        any(
            target_arch = "mips",
            target_arch = "mips64",
            target_arch = "mips32r6",
            target_arch = "mips64r6",
        ),
    ),
))]
const ENOBUFS: i32 = 132;

// Catch-all: any target not listed above (Unix or not). Guessing a value
// would silently misclassify `ENOBUFS` on cross-compiled targets, so fail at
// compile time instead. Plain `//` comments are used here because rustdoc
// ignores doc comments on macro invocations.
#[cfg(not(any(
    target_os = "linux",
    target_os = "android",
    target_os = "macos",
    target_os = "ios",
    target_os = "tvos",
    target_os = "watchos",
    target_os = "freebsd",
    target_os = "netbsd",
    target_os = "openbsd",
    target_os = "dragonfly",
    target_os = "solaris",
    target_os = "illumos",
)))]
compile_error!("ENOBUFS value is unknown for this target — add it to the cfg gates above");
61
62/// Classify a `send(2)` error into a [`BeatOutcome`].
63///
64/// Checks the raw OS error code before the `ErrorKind` match so that
65/// `ENOBUFS` (kernel buffer pressure, transient) is caught even when the
66/// toolchain maps it to `ErrorKind::Other`.
67pub fn classify_send_error(e: &io::Error) -> BeatOutcome {
68    // (a) Raw-OS path first — catches ENOBUFS even when libstd has not
69    //     minted a dedicated ErrorKind for it on this toolchain.
70    if let Some(code) = e.raw_os_error() {
71        if code == ENOBUFS {
72            return BeatOutcome::Dropped(DropReason::KernelQueueFull);
73        }
74    }
75
76    match e.kind() {
77        // (b) Transient kernel pressure.
78        io::ErrorKind::WouldBlock => BeatOutcome::Dropped(DropReason::KernelQueueFull),
79        // (c) Observer not bound yet or socket file missing.
80        io::ErrorKind::ConnectionRefused | io::ErrorKind::NotFound => {
81            BeatOutcome::Dropped(DropReason::NoObserver)
82        }
83        // (d) Peer socket torn down since the last beat.
84        io::ErrorKind::ConnectionReset
85        | io::ErrorKind::NotConnected
86        | io::ErrorKind::BrokenPipe => BeatOutcome::Dropped(DropReason::PeerGone),
87        // (e) Host out of disk space.
88        io::ErrorKind::StorageFull => BeatOutcome::Dropped(DropReason::StorageFull),
89        // (f) Unexpected error: capture as a Copy POD that cannot allocate.
90        _ => BeatOutcome::Failed(BeatError::from_io(e)),
91    }
92}
93
/// Payload of [`BeatOutcome::Failed`]: a plain-data snapshot of an
/// `io::Error`.
///
/// Only the raw OS error number and the libstd [`io::ErrorKind`] are kept,
/// which makes the type `Copy`. Building one therefore never touches the heap,
/// so the `Failed` branch of [`Varta::beat`] stays allocation-free however the
/// originating `io::Error` was represented. Use [`Self::to_io_error`] to get an
/// `io::Error` back.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub struct BeatError {
    /// Raw OS error number when the failure came from a syscall, otherwise
    /// [`BeatError::UNKNOWN_ERRNO`] (`0`). POSIX never reports errno `0` for a
    /// real syscall failure, so the sentinel cannot collide with a real error.
    pub errno: i32,
    /// libstd classification of the failure; populated in every case.
    pub kind: io::ErrorKind,
}

impl BeatError {
    /// Sentinel stored in `errno` when the error carries no OS error number.
    pub const UNKNOWN_ERRNO: i32 = 0;

    /// Snapshot `e` by copying its OS error number and kind; nothing is
    /// cloned or allocated.
    pub fn from_io(e: &io::Error) -> Self {
        let errno = match e.raw_os_error() {
            Some(code) => code,
            None => Self::UNKNOWN_ERRNO,
        };
        Self {
            errno,
            kind: e.kind(),
        }
    }

    /// Rebuild an `io::Error` from this snapshot.
    ///
    /// An OS-derived snapshot round-trips through
    /// [`io::Error::from_raw_os_error`]; otherwise the error is rebuilt from
    /// `kind` alone. Neither branch allocates.
    pub fn to_io_error(self) -> io::Error {
        match self.errno {
            Self::UNKNOWN_ERRNO => io::Error::from(self.kind),
            code => io::Error::from_raw_os_error(code),
        }
    }
}

impl fmt::Display for BeatError {
    /// Renders `send failed: <Kind>`, followed by ` (errno=<n>)` when an OS
    /// error number is known.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "send failed: {:?}", self.kind)?;
        if self.errno != Self::UNKNOWN_ERRNO {
            write!(f, " (errno={})", self.errno)?;
        }
        Ok(())
    }
}

impl std::error::Error for BeatError {}
153
/// Why a [`BeatOutcome::Dropped`] was produced.
///
/// Covers every `io::Error` that [`classify_send_error`] treats as a drop.
/// The enum is `Copy` and one byte wide (asserted below), so it adds nothing
/// to the beat path. Operators can match on it to tell "observer not yet
/// listening" apart from "socket torn down" and "host full".
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum DropReason {
    /// The kernel send buffer or socket queue was full (transient).
    ///
    /// Source: `WouldBlock` or `ENOBUFS`. The observer is probably alive;
    /// retry after a short back-off or rely on
    /// [`set_reconnect_after`](Varta::set_reconnect_after).
    KernelQueueFull,
    /// No observer is bound at the target path or address.
    ///
    /// Source: `NotFound` or `ConnectionRefused`. Expected during a rolling
    /// observer restart, before the new process has bound the socket.
    NoObserver,
    /// The peer socket was torn down since the last successful beat.
    ///
    /// Source: `ConnectionReset`, `NotConnected`, or `BrokenPipe`. The channel
    /// was live and then vanished — typically a crash or an explicit observer
    /// shutdown. Call [`reconnect`](Varta::reconnect) to recover.
    PeerGone,
    /// The host filesystem is out of space (UDS socket path on a full volume).
    ///
    /// Source: `StorageFull`. Retrying will not help until disk space is
    /// freed; operator intervention is required.
    StorageFull,
}

impl DropReason {
    /// Lower-case, human-readable label rendered by the `Display` impl.
    const fn label(self) -> &'static str {
        match self {
            Self::KernelQueueFull => "kernel queue full",
            Self::NoObserver => "no observer",
            Self::PeerGone => "peer gone",
            Self::StorageFull => "storage full",
        }
    }
}

impl fmt::Display for DropReason {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.label())
    }
}

// A fieldless four-variant enum occupies a single byte, so carrying it inside
// `BeatOutcome` adds no heap footprint to the beat path.
const _: () = assert!(core::mem::size_of::<DropReason>() == 1);
201
202/// Result of a single [`Varta::beat`] call.
203///
204/// `beat()` never blocks and never panics; the kernel's view of the send is
205/// translated into one of three steady-state outcomes. `Failed` carries the
206/// underlying error for higher layers that wish to log or escalate.
207#[must_use]
208pub enum BeatOutcome {
209    /// The 32-byte datagram was accepted by the kernel.
210    Sent,
211    /// The kernel could not accept the datagram; the agent should treat
212    /// this as a no-op. The [`DropReason`] payload identifies the underlying
213    /// cause and lets operators distinguish "observer absent" from "peer gone"
214    /// from "kernel pressure" from "disk full".
215    Dropped(DropReason),
216    /// An unexpected I/O error surfaced from the underlying `send(2)`.
217    /// Callers wanting an `io::Error` can call [`BeatError::to_io_error`].
218    Failed(BeatError),
219}
220
221impl fmt::Debug for BeatOutcome {
222    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
223        match self {
224            Self::Sent => write!(f, "Sent"),
225            Self::Dropped(r) => write!(f, "Dropped({r:?})"),
226            Self::Failed(e) => write!(f, "Failed({e:?})"),
227        }
228    }
229}
230
231impl fmt::Display for BeatOutcome {
232    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
233        match self {
234            Self::Sent => write!(f, "sent"),
235            Self::Dropped(r) => write!(f, "dropped: {r}"),
236            Self::Failed(e) => write!(f, "failed: {e}"),
237        }
238    }
239}
240
241/// Agent-side handle that owns a configured [`BeatTransport`] and a 32-byte
242/// scratch buffer.
243///
244/// `Varta::connect` is the single allocation point: it creates the transport,
245/// switches it to non-blocking mode (where applicable), and captures the epoch
246/// used for monotonic timestamps. The process ID is fetched afresh via
247/// [`std::process::id`] on every [`beat`](Self::beat) so forked children
248/// report their own PID. Every subsequent `beat()` reuses the owned buffer
249/// and emits a frame without touching the heap.
250///
251/// The default transport is [`UdsTransport`] (Unix Domain Socket). Use
252/// `Varta::connect()` to create a UDS-backed agent. Other transports (e.g.
253/// UDP) are available behind feature flags.
254///
255/// # Examples
256///
257/// ```no_run
258/// use varta_client::{Status, Varta};
259/// let mut agent = Varta::connect("/tmp/varta.sock")?;
260/// agent.beat(Status::Ok, 0);
261/// # Ok::<(), std::io::Error>(())
262/// ```
263/// # Thread safety
264///
265/// `Varta` is [`Send`]: the underlying transport is `Send`, and a beat
266/// issues no shared state. `Varta` is **not** [`Sync`]: concurrent
267/// `&Varta::beat` calls would race on the kernel-side socket send buffer
268/// ordering. To share across threads, wrap in a [`std::sync::Mutex`] or move
269/// the handle into a dedicated emitter thread or channel.
270///
271/// After `fork(2)` the child inherits this handle. Fork is **auto-detected**
272/// on the next [`beat`](Self::beat): if `std::process::id()` differs from the
273/// PID captured at [`connect`](Self::connect) time, the underlying transport's
274/// [`reconnect`](crate::transport::BeatTransport::reconnect) is invoked
275/// **before** the frame is built. On secure-UDP this re-reads OS entropy and
276/// rotates the AEAD session salt, making catastrophic nonce reuse across the
277/// fork boundary structurally impossible. The recovery is silent — the caller
278/// sees [`BeatOutcome::Sent`] — and is observable via
279/// [`fork_recoveries`](Self::fork_recoveries). Calling
280/// [`reconnect`](Self::reconnect) explicitly in the child is still supported
281/// and idempotent.
282pub struct Varta<T: BeatTransport = UdsTransport> {
283    transport: T,
284    buf: [u8; 32],
285    start: Instant,
286    nonce: u64,
287    consecutive_dropped: u32,
288    reconnect_after: u32,
289    last_timestamp: u64,
290    clock_regressions: u64,
291    /// PID captured at `connect` / `reconnect` time. Compared against
292    /// `std::process::id()` on every [`beat`](Self::beat) to detect `fork(2)`
293    /// and trigger transport refresh before any frame leaves the process.
294    /// See the struct-level docstring for the safety contract.
295    connect_pid: u32,
296    /// Saturating count of fork-recovery events surfaced via
297    /// [`fork_recoveries`](Self::fork_recoveries).
298    fork_recoveries: u64,
299}
300
301// Static assertion: Varta<UdsTransport> is Send and must remain so.
302const _: () = {
303    const fn assert_send<T: Send>() {}
304    assert_send::<Varta<UdsTransport>>();
305};
306
307impl Varta<UdsTransport> {
308    /// Connect to the observer listening on `path` via Unix Domain Socket and
309    /// prepare the agent for non-blocking emission.
310    ///
311    /// Stores an `Instant` for per-frame elapsed-nanosecond timestamps. The
312    /// process ID is read afresh on every [`Varta::beat`] via
313    /// [`std::process::id`] so each frame carries the current PID. Subsequent
314    /// calls to [`Varta::beat`] do not allocate.
315    ///
316    /// # Errors
317    ///
318    /// Returns an [`io::Error`] if the socket cannot be created, the peer
319    /// path cannot be reached, or non-blocking mode cannot be enabled.
320    pub fn connect<P: AsRef<Path>>(path: P) -> io::Result<Self> {
321        let transport = UdsTransport::connect(path)?;
322        Ok(Self {
323            transport,
324            buf: [0u8; 32],
325            start: Instant::now(),
326            nonce: 0,
327            consecutive_dropped: 0,
328            reconnect_after: 0,
329            last_timestamp: 0,
330            clock_regressions: 0,
331            connect_pid: std::process::id(),
332            fork_recoveries: 0,
333        })
334    }
335}
336
#[cfg(feature = "udp")]
impl Varta<UdpTransport> {
    /// Open a connected UDP socket towards the observer at `addr` and return
    /// an agent ready for non-blocking emission.
    ///
    /// The socket binds an ephemeral local port and is connected to `addr`,
    /// so every `send` targets that fixed peer. Because the socket is
    /// connected, ICMP errors such as port-unreachable surface as I/O errors
    /// on the send path, where [`classify_send_error`] handles them.
    ///
    /// UDP itself keeps no connection state. A beat can therefore report
    /// [`BeatOutcome::Sent`] even though no observer is listening, so bind the
    /// observer before the first beat. A reconnect creates a fresh ephemeral
    /// socket.
    ///
    /// # Errors
    ///
    /// Propagates the [`io::Error`] raised while creating or connecting the
    /// socket, or switching it to non-blocking mode.
    pub fn connect_udp(addr: std::net::SocketAddr) -> io::Result<Self> {
        let transport = UdpTransport::connect(addr)?;
        let agent = Self {
            transport,
            buf: [0; 32],
            start: Instant::now(),
            nonce: 0,
            consecutive_dropped: 0,
            reconnect_after: 0,
            last_timestamp: 0,
            clock_regressions: 0,
            connect_pid: std::process::id(),
            fork_recoveries: 0,
        };
        Ok(agent)
    }
}
372
#[cfg(feature = "secure-udp")]
impl Varta<SecureUdpTransport> {
    /// Wrap an already-connected secure transport in a fresh agent: a new
    /// timestamp epoch, zeroed counters, and a PID snapshot for fork detection.
    fn from_secure_transport(transport: SecureUdpTransport) -> Self {
        Self {
            transport,
            buf: [0; 32],
            start: Instant::now(),
            nonce: 0,
            consecutive_dropped: 0,
            reconnect_after: 0,
            last_timestamp: 0,
            clock_regressions: 0,
            connect_pid: std::process::id(),
            fork_recoveries: 0,
        }
    }

    /// Open a secure UDP channel (ChaCha20-Poly1305 AEAD) to the observer at
    /// `addr` and return an agent ready for non-blocking emission.
    ///
    /// Every [`beat`](Self::beat) is encrypted and authenticated with the
    /// pre-shared `key`. The observer must be built with the `secure-udp`
    /// feature and configured with the same key.
    ///
    /// The IV random prefix is read from `/dev/urandom` at connect time, so
    /// the beat path performs no file I/O.
    ///
    /// # Errors
    ///
    /// Propagates the [`io::Error`] raised while creating or connecting the
    /// socket, or switching it to non-blocking mode.
    pub fn connect_secure_udp(addr: std::net::SocketAddr, key: Key) -> io::Result<Self> {
        SecureUdpTransport::connect(addr, key).map(Self::from_secure_transport)
    }

    /// Open a secure UDP channel (ChaCha20-Poly1305 AEAD) keyed from a master
    /// key.
    ///
    /// The per-agent key comes from
    /// [`varta_vlp::crypto::kdf::derive_agent_key`] applied to the master key
    /// and the calling process's PID. The PID is also embedded in the
    /// `iv_random` prefix, so the observer can derive the same agent key
    /// before decrypting the frame.
    ///
    /// With per-agent keys, compromising one agent's derived key reveals
    /// neither other agents' keys nor the master key.
    ///
    /// # Errors
    ///
    /// Propagates the [`io::Error`] raised while creating or connecting the
    /// socket, or switching it to non-blocking mode.
    pub fn connect_secure_udp_with_master(
        addr: std::net::SocketAddr,
        master_key: Key,
    ) -> io::Result<Self> {
        SecureUdpTransport::connect_with_master(addr, master_key).map(Self::from_secure_transport)
    }

    /// Test hook: jump the transport's AEAD counter to `value`. The next beat
    /// then exercises the counter-wrap rotation path without emitting 2^32
    /// beats first (H6 integration test surface).
    #[cfg(any(test, feature = "test-hooks"))]
    pub fn set_iv_counter_for_test(&mut self, value: u32) {
        self.transport.set_iv_counter_for_test(value);
    }

    /// Test hook: the 8-byte IV prefix currently derived by the transport.
    #[cfg(any(test, feature = "test-hooks"))]
    pub fn iv_prefix_for_test(&self) -> [u8; 8] {
        self.transport.iv_prefix_for_test()
    }

    /// Test hook: the transport's current IV prefix index.
    #[cfg(any(test, feature = "test-hooks"))]
    pub fn iv_prefix_index_for_test(&self) -> u32 {
        self.transport.iv_prefix_index_for_test()
    }

    /// Test hook: the AEAD counter the transport has committed so far.
    ///
    /// Pair with `set_iv_counter_for_test` to check commit-on-success
    /// semantics of `BeatTransport::send`: a `Dropped` beat must NOT advance
    /// this value.
    #[cfg(any(test, feature = "test-hooks"))]
    pub fn iv_counter_for_test(&self) -> u32 {
        self.transport.iv_counter_for_test()
    }
}
468
469impl<T: BeatTransport> Varta<T> {
470    /// Test-only: spoof the connect-time PID snapshot so the next
471    /// [`beat`](Self::beat) trips the fork-detection branch without an
472    /// actual `fork(2)` syscall. Pair with the transport-level
473    /// `iv_prefix_for_test()` accessor to assert that the IV salt rotated.
474    #[cfg(any(test, feature = "test-hooks"))]
475    pub fn set_connect_pid_for_test(&mut self, pid: u32) {
476        self.connect_pid = pid;
477    }
478}
479
/// Cold-path diagnostic written each time the beat nonce wraps back to 0.
///
/// The message is a byte-string literal handed straight to stderr, so no
/// formatting machinery ([`format!`], [`eprintln!`]) and no heap allocation is
/// involved. A wrap requires exhausting the nonce space, which is practically
/// unreachable at any realistic beat rate; the warning exists as a signal for
/// correctness audits. Write failures are ignored because the warning is
/// best-effort.
#[cold]
fn warn_nonce_wrapping() {
    const MESSAGE: &[u8] = b"[varta-client] nonce exhausted; wrapping to 0\n";
    let mut sink = io::stderr();
    let _ = sink.write_all(MESSAGE);
}
490
491impl<T: BeatTransport> Varta<T> {
492    fn send_frame(&mut self) -> BeatOutcome {
493        match self.transport.send(&self.buf) {
494            Ok(_) => BeatOutcome::Sent,
495            Err(e) => classify_send_error(&e),
496        }
497    }
498
499    /// Emit a single VLP frame carrying `status` and an opaque 8-byte
500    /// `payload`.
501    ///
502    /// The nonce increments first (starting from 1) and wraps to 0 on
503    /// exhaustion; the very first beat after `connect` carries `nonce == 1`. The frame is
504    /// constructed on the stack, encoded into the owned scratch buffer, and
505    /// handed to `send(2)`. The steady-state path (`Sent` / `Dropped`) neither
506    /// blocks nor allocates; the rare `Failed` path may allocate when cloning
507    /// the underlying [`io::Error`].
508    ///
509    /// When [`set_reconnect_after`](Self::set_reconnect_after) is enabled and
510    /// the consecutive-dropped threshold is crossed, `beat` will internally
511    /// reconnect the socket and retry the send before returning. The retry
512    /// path allocates a fresh socket; this is acceptable because observer
513    /// restarts are rare and the steady-state path remains allocation-free.
514    pub fn beat(&mut self, status: Status, payload: u32) -> BeatOutcome {
515        let pid = std::process::id();
516        if pid != self.connect_pid {
517            // Fork detected. Refresh the underlying transport so any
518            // session-keyed state (e.g. secure-UDP iv_session_salt /
519            // iv_prefix_index / iv_counter) is re-seeded from OS entropy
520            // before the next frame is built. This is the only correct
521            // response to a fork on the secure path — without it, the
522            // child would derive the same 12-byte AEAD nonce its parent
523            // has already used under the same ChaCha20-Poly1305 key.
524            //
525            // Reset the local epoch and frame counters so the child's
526            // wire stream looks like a fresh session to the observer
527            // (per-pid tracker keying makes this safe).
528            match self.transport.reconnect() {
529                Ok(()) => {
530                    self.connect_pid = pid;
531                    self.fork_recoveries = self.fork_recoveries.saturating_add(1);
532                    self.nonce = 0;
533                    self.start = Instant::now();
534                    self.last_timestamp = 0;
535                    self.consecutive_dropped = 0;
536                }
537                Err(e) => return BeatOutcome::Failed(BeatError::from_io(&e)),
538            }
539        }
540        if self.nonce < NONCE_TERMINAL - 1 {
541            self.nonce += 1;
542        } else {
543            warn_nonce_wrapping();
544            self.nonce = 0;
545        }
546        // Saturate the nanosecond timestamp at `u64::MAX as u128` so the cast
547        // never wraps. `u64::MAX` itself is reserved as a wire-level sentinel
548        // (`DecodeError::BadTimestamp`); reaching it would require ~584.5
549        // years of continuous uptime on a single connect handle, which is
550        // physically unreachable. Note the resulting decode/encode asymmetry:
551        // a hypothetical saturated agent observes `BeatOutcome::Sent` from
552        // `send(2)` while the observer drops the frame as `BadTimestamp`.
553        // Documented for completeness; matches `Frame::decode` in
554        // `varta-vlp/src/lib.rs`.
555        let raw_elapsed = self.start.elapsed().as_nanos().min(u64::MAX as u128) as u64;
556        if raw_elapsed < self.last_timestamp {
557            // Underlying Instant::now() regressed — surface via the counter
558            // while preserving wire-format monotonicity through the .max()
559            // clamp below.
560            self.clock_regressions = self.clock_regressions.saturating_add(1);
561        }
562        self.last_timestamp = self.last_timestamp.max(raw_elapsed);
563        let timestamp = self.last_timestamp;
564        debug_assert!(
565            self.nonce != NONCE_TERMINAL,
566            "regular beat nonce must not equal NONCE_TERMINAL sentinel"
567        );
568        let frame = Frame::new(status, pid, timestamp, self.nonce, payload);
569        frame.encode(&mut self.buf);
570        let outcome = self.send_frame();
571        match &outcome {
572            BeatOutcome::Dropped(_) => {
573                self.consecutive_dropped = self.consecutive_dropped.saturating_add(1);
574                if self.reconnect_after > 0
575                    && self.consecutive_dropped >= self.reconnect_after
576                    && self.transport.reconnect().is_ok()
577                {
578                    let retry = self.send_frame();
579                    if matches!(&retry, BeatOutcome::Dropped(_)) {
580                        self.consecutive_dropped = self.reconnect_after;
581                    } else {
582                        self.consecutive_dropped = 0;
583                    }
584                    return retry;
585                }
586                outcome
587            }
588            _ => {
589                self.consecutive_dropped = 0;
590                outcome
591            }
592        }
593    }
594
595    /// Re-establish the underlying transport connection.
596    ///
597    /// After an observer restart the old channel is stale — every `beat()`
598    /// returns [`BeatOutcome::Dropped`] (with reason [`DropReason::PeerGone`]
599    /// or [`DropReason::NoObserver`]) until reconnected. Call `reconnect` to establish
600    /// a fresh connection to the target stored at [`connect`](Self::connect)
601    /// time. Agent identity (`nonce`, `start` clock) is preserved.
602    ///
603    /// Also refreshes the internal fork-detection snapshot so an explicit
604    /// reconnect issued from a forked child (the documented manual escape
605    /// hatch) cannot leave a stale parent PID behind that would re-trigger
606    /// auto-recovery on the next beat.
607    ///
608    /// This is the only post-[`connect`](Self::connect) allocation site and
609    /// should only be called when recovery is needed, not on the steady-state
610    /// beat path.
611    pub fn reconnect(&mut self) -> io::Result<()> {
612        self.transport.reconnect()?;
613        self.connect_pid = std::process::id();
614        Ok(())
615    }
616
617    /// Enable automatic reconnect after `n` consecutive
618    /// [`BeatOutcome::Dropped`] outcomes. Set to `0` to disable (the
619    /// default).
620    ///
621    /// When enabled, [`beat`](Self::beat) increments an internal counter on
622    /// each `Dropped` outcome. After `n` consecutive drops — a strong signal
623    /// that the observer channel is stale — `beat` calls [`reconnect`](Self::reconnect)
624    /// internally and retries the send before returning. The counter resets
625    /// to zero on any `Sent` or `Failed` outcome, and after a successful
626    /// reconnect.
627    ///
628    /// Resets the internal consecutive-dropped counter to zero so that the
629    /// new threshold gates future drops rather than immediately triggering
630    /// on a past-saturated counter.
631    pub fn set_reconnect_after(&mut self, n: u32) {
632        self.reconnect_after = n;
633        self.consecutive_dropped = 0;
634    }
635
636    /// Number of times [`beat`](Self::beat) has observed
637    /// [`Instant::now`](std::time::Instant::now) regress since
638    /// [`connect`](Self::connect). Saturating; never wraps.
639    ///
640    /// The wire-format timestamp remains monotonic because `beat()` clamps
641    /// it through `.max()`, so a regression manifests on the wire as a
642    /// duplicate timestamp rather than a backwards jump. A non-zero value
643    /// here is the only in-process signal of the underlying platform-clock
644    /// bug.
645    ///
646    /// Consumers wiring a Prometheus exporter SHOULD publish this as a
647    /// counter named `varta_client_clock_regression_total`.
648    pub fn clock_regressions(&self) -> u64 {
649        self.clock_regressions
650    }
651
652    /// Number of times [`beat`](Self::beat) has observed a `fork(2)`
653    /// transition (i.e. `std::process::id()` differing from the PID captured
654    /// at [`connect`](Self::connect) time) and refreshed the underlying
655    /// transport in response. Saturating; never wraps.
656    ///
657    /// A non-zero value is the operational signal that auto-recovery has
658    /// fired. On the secure-UDP transport, each event corresponds to one
659    /// AEAD session-salt rotation in the forked child — the structural
660    /// guarantee against nonce reuse across the fork boundary.
661    ///
662    /// Consumers wiring a Prometheus exporter SHOULD publish this as a
663    /// counter named `varta_client_fork_recoveries_total`.
664    pub fn fork_recoveries(&self) -> u64 {
665        self.fork_recoveries
666    }
667}
668
669#[cfg(test)]
670mod tests {
671    use super::*;
672    use std::os::unix::fs::PermissionsExt;
673    use std::os::unix::net::UnixDatagram;
674    use std::time::{SystemTime, UNIX_EPOCH};
675
676    /// Bind a fresh UDS listener at a unique tempdir path and return both
677    /// the listener (kept alive by the caller) and its path. The listener
678    /// silently drops every datagram — enough to satisfy `Varta::connect`.
679    ///
680    /// The path embeds (pid, nanos, atomic counter). The counter guards
681    /// against same-process collisions when two unit tests in the same
682    /// `cargo test` run hit the same `SystemTime::now()` nanosecond value —
683    /// observed on macOS CI runners where the clock granularity is coarser
684    /// than on Apple Silicon. Without the counter, parallel test execution
685    /// can produce `AlreadyExists` from `create_dir`.
686    fn bind_listener() -> (UnixDatagram, std::path::PathBuf) {
687        use std::sync::atomic::{AtomicU64, Ordering};
688        static TEMPDIR_COUNTER: AtomicU64 = AtomicU64::new(0);
689
690        let counter = TEMPDIR_COUNTER.fetch_add(1, Ordering::Relaxed);
691        let dir = std::env::temp_dir().join(format!(
692            "varta-clock-{}-{}-{}",
693            std::process::id(),
694            SystemTime::now()
695                .duration_since(UNIX_EPOCH)
696                .map(|d| d.as_nanos())
697                .unwrap_or(0),
698            counter
699        ));
700        std::fs::create_dir(&dir).expect("create tempdir");
701        // Cerebrum 2026-05-13: process-wide umask from a concurrent
702        // UnixDatagram::bind elsewhere can strip the executable bit; force
703        // 0o755 before any further open() inside this dir.
704        std::fs::set_permissions(&dir, std::fs::Permissions::from_mode(0o755))
705            .expect("chmod 0o755");
706        let sock_path = dir.join("varta.sock");
707        let listener = UnixDatagram::bind(&sock_path).expect("bind listener");
708        (listener, sock_path)
709    }
710
711    #[test]
712    fn clock_regression_counter_stays_zero_on_forward_clock() {
713        let (_listener, path) = bind_listener();
714        let mut agent = Varta::connect(&path).expect("connect");
715        // Outcome is irrelevant; this test asserts on the regression counter
716        // side effect, not the send result. The explicit `let _` discharges
717        // the `#[must_use]` on `BeatOutcome`.
718        let _ = agent.beat(Status::Ok, 0);
719        let _ = agent.beat(Status::Ok, 0);
720        assert_eq!(
721            agent.clock_regressions(),
722            0,
723            "no regression should be observed on a forward clock"
724        );
725        let _ = std::fs::remove_file(&path);
726        let _ = std::fs::remove_dir(path.parent().unwrap());
727    }
728
729    #[test]
730    fn clock_regression_counter_increments_on_backwards_clock() {
731        let (_listener, path) = bind_listener();
732        let mut agent = Varta::connect(&path).expect("connect");
733
734        // Jam the high-water mark past any plausible `start.elapsed()` so
735        // every subsequent beat trips the regression branch.
736        agent.last_timestamp = u64::MAX / 2;
737        let baseline_ts = agent.last_timestamp;
738
739        // Outcome is irrelevant; this test asserts on the regression counter
740        // side effect. The explicit `let _` discharges `#[must_use]`.
741        let _ = agent.beat(Status::Ok, 0);
742        assert_eq!(agent.clock_regressions(), 1);
743        // Wire timestamp must remain monotonic — `.max()` is unchanged.
744        assert_eq!(agent.last_timestamp, baseline_ts);
745
746        let _ = agent.beat(Status::Ok, 0);
747        assert_eq!(
748            agent.clock_regressions(),
749            2,
750            "counter must accumulate across consecutive regressions"
751        );
752        assert_eq!(agent.last_timestamp, baseline_ts);
753
754        let _ = std::fs::remove_file(&path);
755        let _ = std::fs::remove_dir(path.parent().unwrap());
756    }
757
758    /// Steady-state (no fork): the recovery counter must stay at zero
759    /// across many beats. This guards against accidental triggering of
760    /// the fork-detection branch on a forward-only PID.
761    #[test]
762    fn same_pid_does_not_trigger_fork_recovery() {
763        let (_listener, path) = bind_listener();
764        let mut agent = Varta::connect(&path).expect("connect");
765        for _ in 0..64 {
766            let _ = agent.beat(Status::Ok, 0);
767        }
768        assert_eq!(
769            agent.fork_recoveries(),
770            0,
771            "no fork-recovery should fire in a single-process beat loop"
772        );
773        let _ = std::fs::remove_file(&path);
774        let _ = std::fs::remove_dir(path.parent().unwrap());
775    }
776
777    /// Spoof a PID change via the test hook. The next `beat()` must
778    /// detect the mismatch, refresh the UDS transport, and increment
779    /// the fork-recovery counter. The frame counter (`nonce`) resets so
780    /// the post-recovery beat carries `nonce == 1`.
781    #[test]
782    fn spoofed_fork_triggers_uds_transport_reconnect() {
783        let (_listener, path) = bind_listener();
784        let mut agent = Varta::connect(&path).expect("connect");
785
786        // Emit a few beats so the counters are non-trivial pre-fork.
787        let _ = agent.beat(Status::Ok, 0);
788        let _ = agent.beat(Status::Ok, 0);
789        assert_eq!(agent.fork_recoveries(), 0);
790        assert_eq!(agent.nonce, 2);
791
792        // Spoof: pretend a fork happened. The actual pid is unchanged;
793        // the snapshot we lie about is `connect_pid`.
794        let real_pid = std::process::id();
795        agent.set_connect_pid_for_test(real_pid.wrapping_add(1));
796
797        // The next beat must observe the mismatch and recover.
798        let _ = agent.beat(Status::Ok, 0);
799        assert_eq!(
800            agent.fork_recoveries(),
801            1,
802            "fork-recovery counter must increment exactly once"
803        );
804        assert_eq!(
805            agent.connect_pid, real_pid,
806            "connect_pid must be refreshed to the current pid"
807        );
808        assert_eq!(
809            agent.nonce, 1,
810            "nonce must reset to 0 on recovery, then increment to 1 for the beat"
811        );
812
813        let _ = std::fs::remove_file(&path);
814        let _ = std::fs::remove_dir(path.parent().unwrap());
815    }
816
817    /// A `BeatTransport` whose `reconnect()` always fails. Used to assert
818    /// that a fork-recovery whose transport refresh fails surfaces as
819    /// `BeatOutcome::Failed` *and* does not increment the counter.
820    struct AlwaysFailReconnect {
821        sent: u32,
822    }
823
824    impl BeatTransport for AlwaysFailReconnect {
825        fn send(&mut self, _buf: &[u8; 32]) -> io::Result<usize> {
826            self.sent = self.sent.saturating_add(1);
827            Ok(32)
828        }
829
830        fn reconnect(&mut self) -> io::Result<()> {
831            Err(io::Error::from(io::ErrorKind::PermissionDenied))
832        }
833    }
834
835    fn varta_with_transport<T: BeatTransport>(transport: T) -> Varta<T> {
836        Varta {
837            transport,
838            buf: [0u8; 32],
839            start: Instant::now(),
840            nonce: 0,
841            consecutive_dropped: 0,
842            reconnect_after: 0,
843            last_timestamp: 0,
844            clock_regressions: 0,
845            connect_pid: std::process::id(),
846            fork_recoveries: 0,
847        }
848    }
849
850    #[test]
851    fn fork_recovery_with_failing_reconnect_returns_failed() {
852        let mut agent = varta_with_transport(AlwaysFailReconnect { sent: 0 });
853
854        // Spoof a fork.
855        agent.set_connect_pid_for_test(std::process::id().wrapping_add(1));
856
857        let outcome = agent.beat(Status::Ok, 0);
858        match outcome {
859            BeatOutcome::Failed(e) => {
860                assert_eq!(e.kind, io::ErrorKind::PermissionDenied);
861            }
862            other => panic!("expected Failed on reconnect failure, got {other:?}"),
863        }
864        assert_eq!(
865            agent.fork_recoveries(),
866            0,
867            "counter must NOT increment when the recovery transport refresh fails"
868        );
869        assert_eq!(
870            agent.transport.sent, 0,
871            "no frame should be sent when fork-recovery fails"
872        );
873    }
874
875    /// Secure-UDP path: spoofing a fork must rotate the AEAD session salt
876    /// and IV prefix. This is the load-bearing test — without salt
877    /// rotation, the child would derive the parent's nonce stream under
878    /// the same key.
879    #[cfg(feature = "secure-udp")]
880    #[test]
881    fn spoofed_fork_rotates_secure_udp_session_salt() {
882        use std::net::{Ipv6Addr, SocketAddr, SocketAddrV6};
883        use varta_vlp::crypto::Key;
884
885        let addr = SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::LOCALHOST, 9876, 0, 0));
886        let key = Key::from_bytes([0x42; 32]);
887        let mut agent = Varta::connect_secure_udp(addr, key).expect("connect");
888
889        // Snapshot pre-fork crypto state.
890        let prefix_before = agent.iv_prefix_for_test();
891
892        // Spoof: pretend a fork happened.
893        agent.set_connect_pid_for_test(std::process::id().wrapping_add(1));
894
895        // The destination is a closed ephemeral address so the send may
896        // fail at the network layer, but the fork-recovery + reconnect
897        // logic runs before the syscall. The recovery and the IV
898        // rotation are what we are asserting on.
899        let _ = agent.beat(Status::Ok, 0);
900
901        assert_eq!(
902            agent.fork_recoveries(),
903            1,
904            "secure-UDP fork-recovery counter must increment"
905        );
906        let prefix_after = agent.iv_prefix_for_test();
907        assert_ne!(
908            prefix_before, prefix_after,
909            "IV prefix must rotate on fork-recovery (defeats nonce reuse)"
910        );
911        assert_eq!(
912            agent.iv_prefix_index_for_test(),
913            0,
914            "prefix_index must reset to 0 on transport.reconnect()"
915        );
916    }
917}