varta_client/client.rs
1//! Agent surface — `Varta` connects to the observer over a configured
2//! transport and `beat()` emits one fire-and-forget 32-byte VLP frame per call.
3
4use std::fmt;
5use std::io::{self, Write};
6use std::path::Path;
7use std::time::Instant;
8
9use varta_vlp::{Frame, Status, NONCE_TERMINAL};
10
11use crate::transport::{BeatTransport, UdsTransport};
12
13#[cfg(feature = "udp")]
14use crate::transport::UdpTransport;
15
16#[cfg(feature = "secure-udp")]
17use crate::secure_transport::SecureUdpTransport;
18
19#[cfg(feature = "secure-udp")]
20use varta_vlp::crypto::Key;
21
/// `ENOBUFS` on Linux / Android for every architecture that uses the generic
/// errno table in `include/uapi/asm-generic/errno.h` (Linux 2.6+, verified
/// against 6.12): x86, x86_64, ARM, AArch64, RISC-V, PowerPC, s390x,
/// LoongArch and others. MIPS and SPARC ship their own errno tables and get
/// dedicated constants below. Hard-coded to preserve the zero-dependency
/// invariant; do not replace with `libc`.
#[cfg(all(
    any(target_os = "linux", target_os = "android"),
    not(any(
        target_arch = "mips",
        target_arch = "mips64",
        target_arch = "mips32r6",
        target_arch = "mips64r6",
        target_arch = "sparc",
        target_arch = "sparc64",
    ))
))]
const ENOBUFS: i32 = 105;

/// Linux / MIPS value of `ENOBUFS` from `arch/mips/include/uapi/asm/errno.h`
/// (SVR4-derived numbering). The generic value 105 would be wrong here.
/// Hard-coded for the same reason.
#[cfg(all(
    any(target_os = "linux", target_os = "android"),
    any(
        target_arch = "mips",
        target_arch = "mips64",
        target_arch = "mips32r6",
        target_arch = "mips64r6",
    )
))]
const ENOBUFS: i32 = 132;

/// Linux / SPARC value of `ENOBUFS` from `arch/sparc/include/uapi/asm/errno.h`
/// (SunOS / BSD-compatible numbering). Hard-coded for the same reason.
#[cfg(all(
    any(target_os = "linux", target_os = "android"),
    any(target_arch = "sparc", target_arch = "sparc64")
))]
const ENOBUFS: i32 = 55;

/// Darwin / BSD value of `ENOBUFS` from `<sys/errno.h>` (macOS 15 / XNU and
/// the other Apple OSes sharing it, FreeBSD 14, NetBSD 10, OpenBSD 7,
/// DragonFly 6). Hard-coded for the same reason.
#[cfg(any(
    target_os = "macos",
    target_os = "ios",
    target_os = "tvos",
    target_os = "watchos",
    target_os = "freebsd",
    target_os = "netbsd",
    target_os = "openbsd",
    target_os = "dragonfly",
))]
const ENOBUFS: i32 = 55;

/// Solaris / illumos value of `ENOBUFS` from `<sys/errno.h>`. These systems
/// use SVR4 errno numbering (the same numbering Linux/MIPS inherited), where
/// `ENOBUFS` is 132. Hard-coded for the same reason.
#[cfg(any(target_os = "solaris", target_os = "illumos"))]
const ENOBUFS: i32 = 132;

/// Catch-all for targets whose `ENOBUFS` value has not been verified.
/// Silently using a wrong value would turn transient buffer pressure into a
/// hard `Failed` outcome, so refuse to compile instead.
#[cfg(not(any(
    target_os = "linux",
    target_os = "android",
    target_os = "macos",
    target_os = "ios",
    target_os = "tvos",
    target_os = "watchos",
    target_os = "freebsd",
    target_os = "netbsd",
    target_os = "openbsd",
    target_os = "dragonfly",
    target_os = "solaris",
    target_os = "illumos",
)))]
compile_error!("ENOBUFS value is unknown for this target — add it to the cfg gates above");
61
62/// Classify a `send(2)` error into a [`BeatOutcome`].
63///
64/// Checks the raw OS error code before the `ErrorKind` match so that
65/// `ENOBUFS` (kernel buffer pressure, transient) is caught even when the
66/// toolchain maps it to `ErrorKind::Other`.
67pub fn classify_send_error(e: &io::Error) -> BeatOutcome {
68 // (a) Raw-OS path first — catches ENOBUFS even when libstd has not
69 // minted a dedicated ErrorKind for it on this toolchain.
70 if let Some(code) = e.raw_os_error() {
71 if code == ENOBUFS {
72 return BeatOutcome::Dropped(DropReason::KernelQueueFull);
73 }
74 }
75
76 match e.kind() {
77 // (b) Transient kernel pressure.
78 io::ErrorKind::WouldBlock => BeatOutcome::Dropped(DropReason::KernelQueueFull),
79 // (c) Observer not bound yet or socket file missing.
80 io::ErrorKind::ConnectionRefused | io::ErrorKind::NotFound => {
81 BeatOutcome::Dropped(DropReason::NoObserver)
82 }
83 // (d) Peer socket torn down since the last beat.
84 io::ErrorKind::ConnectionReset
85 | io::ErrorKind::NotConnected
86 | io::ErrorKind::BrokenPipe => BeatOutcome::Dropped(DropReason::PeerGone),
87 // (e) Host out of disk space.
88 io::ErrorKind::StorageFull => BeatOutcome::Dropped(DropReason::StorageFull),
89 // (f) Unexpected error: capture as a Copy POD that cannot allocate.
90 _ => BeatOutcome::Failed(BeatError::from_io(e)),
91 }
92}
93
/// Payload of [`BeatOutcome::Failed`]: a `Copy` snapshot of an `io::Error`.
///
/// Only the errno and the [`io::ErrorKind`] are kept, so building a
/// `BeatError` can never allocate and the [`Varta::beat`] slow path stays
/// allocation-free no matter how the source `io::Error` was represented.
/// Use [`Self::to_io_error`] to get a full `io::Error` back.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub struct BeatError {
    /// Raw OS errno when the failure came from a syscall, otherwise
    /// [`BeatError::UNKNOWN_ERRNO`] (0). A real syscall failure never
    /// reports errno 0, so the sentinel is unambiguous.
    pub errno: i32,
    /// libstd's [`io::ErrorKind`] classification; always populated.
    pub kind: io::ErrorKind,
}

impl BeatError {
    /// Sentinel stored in [`errno`](Self::errno) when no OS error number exists.
    pub const UNKNOWN_ERRNO: i32 = 0;

    /// Snapshot `e` by copying its errno (if any) and kind; never allocates.
    pub fn from_io(e: &io::Error) -> Self {
        let kind = e.kind();
        let errno = match e.raw_os_error() {
            Some(code) => code,
            None => Self::UNKNOWN_ERRNO,
        };
        Self { errno, kind }
    }

    /// Rebuild an `io::Error`: from the raw errno when one was captured,
    /// otherwise from the bare `kind`. Neither construction allocates.
    pub fn to_io_error(self) -> io::Error {
        match self.errno {
            Self::UNKNOWN_ERRNO => io::Error::from(self.kind),
            code => io::Error::from_raw_os_error(code),
        }
    }
}

impl fmt::Display for BeatError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "send failed: {:?}", self.kind)?;
        // Only OS-derived failures carry a meaningful errno.
        if self.errno != Self::UNKNOWN_ERRNO {
            write!(f, " (errno={})", self.errno)?;
        }
        Ok(())
    }
}

impl std::error::Error for BeatError {}
153
/// Why a [`BeatOutcome::Dropped`] was produced.
///
/// Covers every `io::Error` that [`classify_send_error`] maps to `Dropped`.
/// The enum is `Copy` and one byte wide (checked at compile time below).
/// Operators can match on it to tell "observer not listening yet" apart from
/// "socket torn down" and "host full".
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum DropReason {
    /// The kernel send buffer or socket queue was full (transient).
    ///
    /// Source: `WouldBlock` or `ENOBUFS`. The observer is likely alive;
    /// retry after a short back-off or rely on
    /// [`set_reconnect_after`](Varta::set_reconnect_after).
    KernelQueueFull,
    /// No observer is bound at the target path or address.
    ///
    /// Source: `NotFound` or `ConnectionRefused`. Expected during a rolling
    /// observer restart, before the new process has bound the socket.
    NoObserver,
    /// The peer socket went away after an earlier successful beat.
    ///
    /// Source: `ConnectionReset`, `NotConnected` or `BrokenPipe`. The channel
    /// was live and then disappeared — typically an observer crash or explicit
    /// shutdown. Call [`reconnect`](Varta::reconnect) to recover.
    PeerGone,
    /// The host filesystem is out of space (UDS socket path on a full volume).
    ///
    /// Source: `StorageFull`. Retrying will not help until disk space is
    /// freed; operator intervention is required.
    StorageFull,
}

impl fmt::Display for DropReason {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text = match self {
            Self::KernelQueueFull => "kernel queue full",
            Self::NoObserver => "no observer",
            Self::PeerGone => "peer gone",
            Self::StorageFull => "storage full",
        };
        f.write_str(text)
    }
}

// A single byte per reason: the drop path carries no heap footprint.
const _: () = assert!(core::mem::size_of::<DropReason>() == 1);
201
202/// Result of a single [`Varta::beat`] call.
203///
204/// `beat()` never blocks and never panics; the kernel's view of the send is
205/// translated into one of three steady-state outcomes. `Failed` carries the
206/// underlying error for higher layers that wish to log or escalate.
207#[must_use]
208pub enum BeatOutcome {
209 /// The 32-byte datagram was accepted by the kernel.
210 Sent,
211 /// The kernel could not accept the datagram; the agent should treat
212 /// this as a no-op. The [`DropReason`] payload identifies the underlying
213 /// cause and lets operators distinguish "observer absent" from "peer gone"
214 /// from "kernel pressure" from "disk full".
215 Dropped(DropReason),
216 /// An unexpected I/O error surfaced from the underlying `send(2)`.
217 /// Callers wanting an `io::Error` can call [`BeatError::to_io_error`].
218 Failed(BeatError),
219}
220
221impl fmt::Debug for BeatOutcome {
222 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
223 match self {
224 Self::Sent => write!(f, "Sent"),
225 Self::Dropped(r) => write!(f, "Dropped({r:?})"),
226 Self::Failed(e) => write!(f, "Failed({e:?})"),
227 }
228 }
229}
230
231impl fmt::Display for BeatOutcome {
232 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
233 match self {
234 Self::Sent => write!(f, "sent"),
235 Self::Dropped(r) => write!(f, "dropped: {r}"),
236 Self::Failed(e) => write!(f, "failed: {e}"),
237 }
238 }
239}
240
241/// Agent-side handle that owns a configured [`BeatTransport`] and a 32-byte
242/// scratch buffer.
243///
244/// `Varta::connect` is the single allocation point: it creates the transport,
245/// switches it to non-blocking mode (where applicable), and captures the epoch
246/// used for monotonic timestamps. The process ID is fetched afresh via
247/// [`std::process::id`] on every [`beat`](Self::beat) so forked children
248/// report their own PID. Every subsequent `beat()` reuses the owned buffer
249/// and emits a frame without touching the heap.
250///
251/// The default transport is [`UdsTransport`] (Unix Domain Socket). Use
252/// `Varta::connect()` to create a UDS-backed agent. Other transports (e.g.
253/// UDP) are available behind feature flags.
254///
255/// # Examples
256///
257/// ```no_run
258/// use varta_client::{Status, Varta};
259/// let mut agent = Varta::connect("/tmp/varta.sock")?;
260/// agent.beat(Status::Ok, 0);
261/// # Ok::<(), std::io::Error>(())
262/// ```
263/// # Thread safety
264///
265/// `Varta` is [`Send`]: the underlying transport is `Send`, and a beat
266/// issues no shared state. `Varta` is **not** [`Sync`]: concurrent
267/// `&Varta::beat` calls would race on the kernel-side socket send buffer
268/// ordering. To share across threads, wrap in a [`std::sync::Mutex`] or move
269/// the handle into a dedicated emitter thread or channel.
270///
271/// After `fork(2)` the child inherits this handle. Fork is **auto-detected**
272/// on the next [`beat`](Self::beat): if `std::process::id()` differs from the
273/// PID captured at [`connect`](Self::connect) time, the underlying transport's
274/// [`reconnect`](crate::transport::BeatTransport::reconnect) is invoked
275/// **before** the frame is built. On secure-UDP this re-reads OS entropy and
276/// rotates the AEAD session salt, making catastrophic nonce reuse across the
277/// fork boundary structurally impossible. The recovery is silent — the caller
278/// sees [`BeatOutcome::Sent`] — and is observable via
279/// [`fork_recoveries`](Self::fork_recoveries). Calling
280/// [`reconnect`](Self::reconnect) explicitly in the child is still supported
281/// and idempotent.
282pub struct Varta<T: BeatTransport = UdsTransport> {
283 transport: T,
284 buf: [u8; 32],
285 start: Instant,
286 nonce: u64,
287 consecutive_dropped: u32,
288 reconnect_after: u32,
289 last_timestamp: u64,
290 clock_regressions: u64,
291 /// PID captured at `connect` / `reconnect` time. Compared against
292 /// `std::process::id()` on every [`beat`](Self::beat) to detect `fork(2)`
293 /// and trigger transport refresh before any frame leaves the process.
294 /// See the struct-level docstring for the safety contract.
295 connect_pid: u32,
296 /// Saturating count of fork-recovery events surfaced via
297 /// [`fork_recoveries`](Self::fork_recoveries).
298 fork_recoveries: u64,
299}
300
301// Static assertion: Varta<UdsTransport> is Send and must remain so.
302const _: () = {
303 const fn assert_send<T: Send>() {}
304 assert_send::<Varta<UdsTransport>>();
305};
306
307impl Varta<UdsTransport> {
308 /// Connect to the observer listening on `path` via Unix Domain Socket and
309 /// prepare the agent for non-blocking emission.
310 ///
311 /// Stores an `Instant` for per-frame elapsed-nanosecond timestamps. The
312 /// process ID is read afresh on every [`Varta::beat`] via
313 /// [`std::process::id`] so each frame carries the current PID. Subsequent
314 /// calls to [`Varta::beat`] do not allocate.
315 ///
316 /// # Errors
317 ///
318 /// Returns an [`io::Error`] if the socket cannot be created, the peer
319 /// path cannot be reached, or non-blocking mode cannot be enabled.
320 pub fn connect<P: AsRef<Path>>(path: P) -> io::Result<Self> {
321 let transport = UdsTransport::connect(path)?;
322 Ok(Self {
323 transport,
324 buf: [0u8; 32],
325 start: Instant::now(),
326 nonce: 0,
327 consecutive_dropped: 0,
328 reconnect_after: 0,
329 last_timestamp: 0,
330 clock_regressions: 0,
331 connect_pid: std::process::id(),
332 fork_recoveries: 0,
333 })
334 }
335}
336
337#[cfg(feature = "udp")]
338impl Varta<UdpTransport> {
339 /// Connect to the observer listening on `addr` via UDP and prepare the
340 /// agent for non-blocking emission.
341 ///
342 /// The socket is bound to an ephemeral source port and connected to the
343 /// target address. On a connected UDP socket, `send` writes to the fixed
344 /// peer and ICMP errors (e.g. port-unreachable) are surfaced as I/O
345 /// errors handled by [`classify_send_error`].
346 ///
347 /// UDP semantics: there is no connection state — `beat()` returns
348 /// [`BeatOutcome::Sent`] even if no observer is listening. The observer
349 /// must be bound before the first beat is emitted. Reconnect creates a
350 /// fresh ephemeral socket.
351 ///
352 /// # Errors
353 ///
354 /// Returns an [`io::Error`] if the socket cannot be created, connected,
355 /// or switched to non-blocking mode.
356 pub fn connect_udp(addr: std::net::SocketAddr) -> io::Result<Self> {
357 let transport = UdpTransport::connect(addr)?;
358 Ok(Self {
359 transport,
360 buf: [0u8; 32],
361 start: Instant::now(),
362 nonce: 0,
363 consecutive_dropped: 0,
364 reconnect_after: 0,
365 last_timestamp: 0,
366 clock_regressions: 0,
367 connect_pid: std::process::id(),
368 fork_recoveries: 0,
369 })
370 }
371}
372
373#[cfg(feature = "secure-udp")]
374impl Varta<SecureUdpTransport> {
375 /// Connect to the observer listening on `addr` via secure UDP
376 /// (ChaCha20-Poly1305 AEAD) and prepare the agent for non-blocking
377 /// emission.
378 ///
379 /// Every [`beat`](Self::beat) is encrypted and authenticated with the
380 /// provided pre-shared `key`. The observer must be configured with the
381 /// same key and the `secure-udp` feature enabled.
382 ///
383 /// The IV random prefix is read from `/dev/urandom` at connect time —
384 /// no file I/O on the beat path.
385 ///
386 /// # Errors
387 ///
388 /// Returns an [`io::Error`] if the socket cannot be created, connected,
389 /// or switched to non-blocking mode.
390 pub fn connect_secure_udp(addr: std::net::SocketAddr, key: Key) -> io::Result<Self> {
391 let transport = SecureUdpTransport::connect(addr, key)?;
392 Ok(Self {
393 transport,
394 buf: [0u8; 32],
395 start: Instant::now(),
396 nonce: 0,
397 consecutive_dropped: 0,
398 reconnect_after: 0,
399 last_timestamp: 0,
400 clock_regressions: 0,
401 connect_pid: std::process::id(),
402 fork_recoveries: 0,
403 })
404 }
405
406 /// Connect via ChaCha20-Poly1305 AEAD over UDP using a master key.
407 ///
408 /// The per-agent key is derived via
409 /// [`varta_vlp::crypto::kdf::derive_agent_key`] from the master key
410 /// and the calling process's PID. The PID is also embedded in the
411 /// `iv_random` prefix so the observer can derive the same agent key
412 /// before decrypting the frame.
413 ///
414 /// Per-agent keys mean that compromise of one agent's derived key
415 /// does not reveal other agents' keys or the master key.
416 ///
417 /// # Errors
418 ///
419 /// Returns an [`io::Error`] if the socket cannot be created, connected,
420 /// or switched to non-blocking mode.
421 pub fn connect_secure_udp_with_master(
422 addr: std::net::SocketAddr,
423 master_key: Key,
424 ) -> io::Result<Self> {
425 let transport = SecureUdpTransport::connect_with_master(addr, master_key)?;
426 Ok(Self {
427 transport,
428 buf: [0u8; 32],
429 start: Instant::now(),
430 nonce: 0,
431 consecutive_dropped: 0,
432 reconnect_after: 0,
433 last_timestamp: 0,
434 clock_regressions: 0,
435 connect_pid: std::process::id(),
436 fork_recoveries: 0,
437 })
438 }
439
440 /// Test-only: fast-forward the AEAD counter on the underlying transport
441 /// so the next beat exercises the counter-wrap rotation path without
442 /// emitting 2^32 beats. H6 integration test surface.
443 #[cfg(any(test, feature = "test-hooks"))]
444 pub fn set_iv_counter_for_test(&mut self, value: u32) {
445 self.transport.set_iv_counter_for_test(value);
446 }
447
448 /// Test-only: read the currently-derived 8-byte IV prefix.
449 #[cfg(any(test, feature = "test-hooks"))]
450 pub fn iv_prefix_for_test(&self) -> [u8; 8] {
451 self.transport.iv_prefix_for_test()
452 }
453
454 /// Test-only: read the current prefix index.
455 #[cfg(any(test, feature = "test-hooks"))]
456 pub fn iv_prefix_index_for_test(&self) -> u32 {
457 self.transport.iv_prefix_index_for_test()
458 }
459
460 /// Test-only: read the currently-committed AEAD counter. Pairs with
461 /// `set_iv_counter_for_test` to assert commit-on-success semantics on
462 /// `BeatTransport::send` — a `Dropped` beat must NOT advance this value.
463 #[cfg(any(test, feature = "test-hooks"))]
464 pub fn iv_counter_for_test(&self) -> u32 {
465 self.transport.iv_counter_for_test()
466 }
467}
468
469impl<T: BeatTransport> Varta<T> {
470 /// Test-only: spoof the connect-time PID snapshot so the next
471 /// [`beat`](Self::beat) trips the fork-detection branch without an
472 /// actual `fork(2)` syscall. Pair with the transport-level
473 /// `iv_prefix_for_test()` accessor to assert that the IV salt rotated.
474 #[cfg(any(test, feature = "test-hooks"))]
475 pub fn set_connect_pid_for_test(&mut self, pid: u32) {
476 self.connect_pid = pid;
477 }
478}
479
/// Report that the frame nonce ran out and is wrapping back to 0.
///
/// Called once per wraparound. Writes a fixed byte string straight to
/// stderr — no `format!`, no `eprintln!` — so the diagnostic adds no heap
/// allocation to the beat path. Practically unreachable under any realistic
/// beat rate, but kept as a signal for correctness audits.
#[cold]
fn warn_nonce_wrapping() {
    const MESSAGE: &[u8] = b"[varta-client] nonce exhausted; wrapping to 0\n";
    let mut stderr = io::stderr();
    // Best-effort: a failed diagnostic write must never affect the beat.
    let _ = stderr.write_all(MESSAGE);
}
490
491impl<T: BeatTransport> Varta<T> {
492 fn send_frame(&mut self) -> BeatOutcome {
493 match self.transport.send(&self.buf) {
494 Ok(_) => BeatOutcome::Sent,
495 Err(e) => classify_send_error(&e),
496 }
497 }
498
499 /// Emit a single VLP frame carrying `status` and an opaque 8-byte
500 /// `payload`.
501 ///
502 /// The nonce increments first (starting from 1) and wraps to 0 on
503 /// exhaustion; the very first beat after `connect` carries `nonce == 1`. The frame is
504 /// constructed on the stack, encoded into the owned scratch buffer, and
505 /// handed to `send(2)`. The steady-state path (`Sent` / `Dropped`) neither
506 /// blocks nor allocates; the rare `Failed` path may allocate when cloning
507 /// the underlying [`io::Error`].
508 ///
509 /// When [`set_reconnect_after`](Self::set_reconnect_after) is enabled and
510 /// the consecutive-dropped threshold is crossed, `beat` will internally
511 /// reconnect the socket and retry the send before returning. The retry
512 /// path allocates a fresh socket; this is acceptable because observer
513 /// restarts are rare and the steady-state path remains allocation-free.
514 pub fn beat(&mut self, status: Status, payload: u32) -> BeatOutcome {
515 let pid = std::process::id();
516 if pid != self.connect_pid {
517 // Fork detected. Refresh the underlying transport so any
518 // session-keyed state (e.g. secure-UDP iv_session_salt /
519 // iv_prefix_index / iv_counter) is re-seeded from OS entropy
520 // before the next frame is built. This is the only correct
521 // response to a fork on the secure path — without it, the
522 // child would derive the same 12-byte AEAD nonce its parent
523 // has already used under the same ChaCha20-Poly1305 key.
524 //
525 // Reset the local epoch and frame counters so the child's
526 // wire stream looks like a fresh session to the observer
527 // (per-pid tracker keying makes this safe).
528 match self.transport.reconnect() {
529 Ok(()) => {
530 self.connect_pid = pid;
531 self.fork_recoveries = self.fork_recoveries.saturating_add(1);
532 self.nonce = 0;
533 self.start = Instant::now();
534 self.last_timestamp = 0;
535 self.consecutive_dropped = 0;
536 }
537 Err(e) => return BeatOutcome::Failed(BeatError::from_io(&e)),
538 }
539 }
540 if self.nonce < NONCE_TERMINAL - 1 {
541 self.nonce += 1;
542 } else {
543 warn_nonce_wrapping();
544 self.nonce = 0;
545 }
546 // Saturate the nanosecond timestamp at `u64::MAX as u128` so the cast
547 // never wraps. `u64::MAX` itself is reserved as a wire-level sentinel
548 // (`DecodeError::BadTimestamp`); reaching it would require ~584.5
549 // years of continuous uptime on a single connect handle, which is
550 // physically unreachable. Note the resulting decode/encode asymmetry:
551 // a hypothetical saturated agent observes `BeatOutcome::Sent` from
552 // `send(2)` while the observer drops the frame as `BadTimestamp`.
553 // Documented for completeness; matches `Frame::decode` in
554 // `varta-vlp/src/lib.rs`.
555 let raw_elapsed = self.start.elapsed().as_nanos().min(u64::MAX as u128) as u64;
556 if raw_elapsed < self.last_timestamp {
557 // Underlying Instant::now() regressed — surface via the counter
558 // while preserving wire-format monotonicity through the .max()
559 // clamp below.
560 self.clock_regressions = self.clock_regressions.saturating_add(1);
561 }
562 self.last_timestamp = self.last_timestamp.max(raw_elapsed);
563 let timestamp = self.last_timestamp;
564 debug_assert!(
565 self.nonce != NONCE_TERMINAL,
566 "regular beat nonce must not equal NONCE_TERMINAL sentinel"
567 );
568 let frame = Frame::new(status, pid, timestamp, self.nonce, payload);
569 frame.encode(&mut self.buf);
570 let outcome = self.send_frame();
571 match &outcome {
572 BeatOutcome::Dropped(_) => {
573 self.consecutive_dropped = self.consecutive_dropped.saturating_add(1);
574 if self.reconnect_after > 0
575 && self.consecutive_dropped >= self.reconnect_after
576 && self.transport.reconnect().is_ok()
577 {
578 let retry = self.send_frame();
579 if matches!(&retry, BeatOutcome::Dropped(_)) {
580 self.consecutive_dropped = self.reconnect_after;
581 } else {
582 self.consecutive_dropped = 0;
583 }
584 return retry;
585 }
586 outcome
587 }
588 _ => {
589 self.consecutive_dropped = 0;
590 outcome
591 }
592 }
593 }
594
595 /// Re-establish the underlying transport connection.
596 ///
597 /// After an observer restart the old channel is stale — every `beat()`
598 /// returns [`BeatOutcome::Dropped`] (with reason [`DropReason::PeerGone`]
599 /// or [`DropReason::NoObserver`]) until reconnected. Call `reconnect` to establish
600 /// a fresh connection to the target stored at [`connect`](Self::connect)
601 /// time. Agent identity (`nonce`, `start` clock) is preserved.
602 ///
603 /// Also refreshes the internal fork-detection snapshot so an explicit
604 /// reconnect issued from a forked child (the documented manual escape
605 /// hatch) cannot leave a stale parent PID behind that would re-trigger
606 /// auto-recovery on the next beat.
607 ///
608 /// This is the only post-[`connect`](Self::connect) allocation site and
609 /// should only be called when recovery is needed, not on the steady-state
610 /// beat path.
611 pub fn reconnect(&mut self) -> io::Result<()> {
612 self.transport.reconnect()?;
613 self.connect_pid = std::process::id();
614 Ok(())
615 }
616
617 /// Enable automatic reconnect after `n` consecutive
618 /// [`BeatOutcome::Dropped`] outcomes. Set to `0` to disable (the
619 /// default).
620 ///
621 /// When enabled, [`beat`](Self::beat) increments an internal counter on
622 /// each `Dropped` outcome. After `n` consecutive drops — a strong signal
623 /// that the observer channel is stale — `beat` calls [`reconnect`](Self::reconnect)
624 /// internally and retries the send before returning. The counter resets
625 /// to zero on any `Sent` or `Failed` outcome, and after a successful
626 /// reconnect.
627 ///
628 /// Resets the internal consecutive-dropped counter to zero so that the
629 /// new threshold gates future drops rather than immediately triggering
630 /// on a past-saturated counter.
631 pub fn set_reconnect_after(&mut self, n: u32) {
632 self.reconnect_after = n;
633 self.consecutive_dropped = 0;
634 }
635
636 /// Number of times [`beat`](Self::beat) has observed
637 /// [`Instant::now`](std::time::Instant::now) regress since
638 /// [`connect`](Self::connect). Saturating; never wraps.
639 ///
640 /// The wire-format timestamp remains monotonic because `beat()` clamps
641 /// it through `.max()`, so a regression manifests on the wire as a
642 /// duplicate timestamp rather than a backwards jump. A non-zero value
643 /// here is the only in-process signal of the underlying platform-clock
644 /// bug.
645 ///
646 /// Consumers wiring a Prometheus exporter SHOULD publish this as a
647 /// counter named `varta_client_clock_regression_total`.
648 pub fn clock_regressions(&self) -> u64 {
649 self.clock_regressions
650 }
651
652 /// Number of times [`beat`](Self::beat) has observed a `fork(2)`
653 /// transition (i.e. `std::process::id()` differing from the PID captured
654 /// at [`connect`](Self::connect) time) and refreshed the underlying
655 /// transport in response. Saturating; never wraps.
656 ///
657 /// A non-zero value is the operational signal that auto-recovery has
658 /// fired. On the secure-UDP transport, each event corresponds to one
659 /// AEAD session-salt rotation in the forked child — the structural
660 /// guarantee against nonce reuse across the fork boundary.
661 ///
662 /// Consumers wiring a Prometheus exporter SHOULD publish this as a
663 /// counter named `varta_client_fork_recoveries_total`.
664 pub fn fork_recoveries(&self) -> u64 {
665 self.fork_recoveries
666 }
667}
668
#[cfg(test)]
mod tests {
    use super::*;
    use std::os::unix::fs::PermissionsExt;
    use std::os::unix::net::UnixDatagram;
    use std::time::{SystemTime, UNIX_EPOCH};

    /// A bound UDS listener living in its own unique tempdir.
    ///
    /// The socket stays open so `Varta::connect` has a peer; datagrams sent
    /// to it are never read. Dropping the value removes the socket file and
    /// the tempdir. `Drop` also runs while a panicking test unwinds, so a
    /// failed assertion does not leak a `varta-clock-*` directory.
    struct TempListener {
        _sock: UnixDatagram,
        path: std::path::PathBuf,
    }

    impl Drop for TempListener {
        fn drop(&mut self) {
            // Best-effort cleanup: leftovers must never fail a test.
            let _ = std::fs::remove_file(&self.path);
            if let Some(dir) = self.path.parent() {
                let _ = std::fs::remove_dir(dir);
            }
        }
    }

    /// Bind a fresh UDS listener at a unique tempdir path.
    ///
    /// The path embeds (pid, nanos, atomic counter). The counter guards
    /// against same-process collisions when two unit tests in the same
    /// `cargo test` run hit the same `SystemTime::now()` nanosecond value.
    /// This has been observed on macOS CI runners, whose clock granularity
    /// is coarser than on Apple Silicon. Without the counter, parallel test
    /// execution can produce `AlreadyExists` from `create_dir`.
    fn bind_listener() -> TempListener {
        use std::sync::atomic::{AtomicU64, Ordering};
        static TEMPDIR_COUNTER: AtomicU64 = AtomicU64::new(0);

        let counter = TEMPDIR_COUNTER.fetch_add(1, Ordering::Relaxed);
        let dir = std::env::temp_dir().join(format!(
            "varta-clock-{}-{}-{}",
            std::process::id(),
            SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .map(|d| d.as_nanos())
                .unwrap_or(0),
            counter
        ));
        std::fs::create_dir(&dir).expect("create tempdir");
        // Observed 2026-05-13: a process-wide umask from a concurrent
        // UnixDatagram::bind elsewhere can strip the executable bit; force
        // 0o755 before any further open() inside this dir.
        std::fs::set_permissions(&dir, std::fs::Permissions::from_mode(0o755))
            .expect("chmod 0o755");
        let path = dir.join("varta.sock");
        let sock = UnixDatagram::bind(&path).expect("bind listener");
        TempListener { _sock: sock, path }
    }

    #[test]
    fn clock_regression_counter_stays_zero_on_forward_clock() {
        let listener = bind_listener();
        let mut agent = Varta::connect(&listener.path).expect("connect");
        // Outcome is irrelevant; this test asserts on the regression counter
        // side effect, not the send result. The explicit `let _` discharges
        // the `#[must_use]` on `BeatOutcome`.
        let _ = agent.beat(Status::Ok, 0);
        let _ = agent.beat(Status::Ok, 0);
        assert_eq!(
            agent.clock_regressions(),
            0,
            "no regression should be observed on a forward clock"
        );
    }

    #[test]
    fn clock_regression_counter_increments_on_backwards_clock() {
        let listener = bind_listener();
        let mut agent = Varta::connect(&listener.path).expect("connect");

        // Jam the high-water mark past any plausible `start.elapsed()` so
        // every subsequent beat trips the regression branch.
        agent.last_timestamp = u64::MAX / 2;
        let baseline_ts = agent.last_timestamp;

        // Outcome is irrelevant; this test asserts on the regression counter
        // side effect. The explicit `let _` discharges `#[must_use]`.
        let _ = agent.beat(Status::Ok, 0);
        assert_eq!(agent.clock_regressions(), 1);
        // Wire timestamp must remain monotonic — `.max()` is unchanged.
        assert_eq!(agent.last_timestamp, baseline_ts);

        let _ = agent.beat(Status::Ok, 0);
        assert_eq!(
            agent.clock_regressions(),
            2,
            "counter must accumulate across consecutive regressions"
        );
        assert_eq!(agent.last_timestamp, baseline_ts);
    }

    /// Steady state (no fork): the recovery counter must stay at zero
    /// across many beats. This guards against accidental triggering of
    /// the fork-detection branch while the PID is unchanged.
    #[test]
    fn same_pid_does_not_trigger_fork_recovery() {
        let listener = bind_listener();
        let mut agent = Varta::connect(&listener.path).expect("connect");
        for _ in 0..64 {
            let _ = agent.beat(Status::Ok, 0);
        }
        assert_eq!(
            agent.fork_recoveries(),
            0,
            "no fork-recovery should fire in a single-process beat loop"
        );
    }

    /// Spoof a PID change via the test hook. The next `beat()` must
    /// detect the mismatch, refresh the UDS transport, and increment
    /// the fork-recovery counter. The frame counter (`nonce`) resets, so
    /// the post-recovery beat carries `nonce == 1`.
    #[test]
    fn spoofed_fork_triggers_uds_transport_reconnect() {
        let listener = bind_listener();
        let mut agent = Varta::connect(&listener.path).expect("connect");

        // Emit a few beats so the counters are non-trivial pre-fork.
        let _ = agent.beat(Status::Ok, 0);
        let _ = agent.beat(Status::Ok, 0);
        assert_eq!(agent.fork_recoveries(), 0);
        assert_eq!(agent.nonce, 2);

        // Spoof: pretend a fork happened. The actual pid is unchanged;
        // the snapshot we lie about is `connect_pid`.
        let real_pid = std::process::id();
        agent.set_connect_pid_for_test(real_pid.wrapping_add(1));

        // The next beat must observe the mismatch and recover.
        let _ = agent.beat(Status::Ok, 0);
        assert_eq!(
            agent.fork_recoveries(),
            1,
            "fork-recovery counter must increment exactly once"
        );
        assert_eq!(
            agent.connect_pid, real_pid,
            "connect_pid must be refreshed to the current pid"
        );
        assert_eq!(
            agent.nonce, 1,
            "nonce must reset to 0 on recovery, then increment to 1 for the beat"
        );
    }

    /// A `BeatTransport` whose `reconnect()` always fails. Used to assert
    /// that a fork-recovery whose transport refresh fails surfaces as
    /// `BeatOutcome::Failed` *and* does not increment the counter.
    struct AlwaysFailReconnect {
        sent: u32,
    }

    impl BeatTransport for AlwaysFailReconnect {
        fn send(&mut self, _buf: &[u8; 32]) -> io::Result<usize> {
            self.sent = self.sent.saturating_add(1);
            Ok(32)
        }

        fn reconnect(&mut self) -> io::Result<()> {
            Err(io::Error::from(io::ErrorKind::PermissionDenied))
        }
    }

    /// Build an agent around an arbitrary (test) transport in the same
    /// initial state the public constructors produce.
    fn varta_with_transport<T: BeatTransport>(transport: T) -> Varta<T> {
        Varta {
            transport,
            buf: [0u8; 32],
            start: Instant::now(),
            nonce: 0,
            consecutive_dropped: 0,
            reconnect_after: 0,
            last_timestamp: 0,
            clock_regressions: 0,
            connect_pid: std::process::id(),
            fork_recoveries: 0,
        }
    }

    #[test]
    fn fork_recovery_with_failing_reconnect_returns_failed() {
        let mut agent = varta_with_transport(AlwaysFailReconnect { sent: 0 });

        // Spoof a fork.
        agent.set_connect_pid_for_test(std::process::id().wrapping_add(1));

        let outcome = agent.beat(Status::Ok, 0);
        match outcome {
            BeatOutcome::Failed(e) => {
                assert_eq!(e.kind, io::ErrorKind::PermissionDenied);
            }
            other => panic!("expected Failed on reconnect failure, got {other:?}"),
        }
        assert_eq!(
            agent.fork_recoveries(),
            0,
            "counter must NOT increment when the recovery transport refresh fails"
        );
        assert_eq!(
            agent.transport.sent, 0,
            "no frame should be sent when fork-recovery fails"
        );
    }

    /// Secure-UDP path: spoofing a fork must rotate the AEAD session salt
    /// and IV prefix. This is the load-bearing test — without salt
    /// rotation, the child would derive the parent's nonce stream under
    /// the same key.
    #[cfg(feature = "secure-udp")]
    #[test]
    fn spoofed_fork_rotates_secure_udp_session_salt() {
        use std::net::{Ipv6Addr, SocketAddr, SocketAddrV6};
        use varta_vlp::crypto::Key;

        let addr = SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::LOCALHOST, 9876, 0, 0));
        let key = Key::from_bytes([0x42; 32]);
        let mut agent = Varta::connect_secure_udp(addr, key).expect("connect");

        // Snapshot pre-fork crypto state.
        let prefix_before = agent.iv_prefix_for_test();

        // Spoof: pretend a fork happened.
        agent.set_connect_pid_for_test(std::process::id().wrapping_add(1));

        // The destination is a fixed localhost port that is expected to have
        // no listener, so the send may fail at the network layer. The
        // fork-recovery + reconnect logic runs before the syscall, and the
        // recovery and IV rotation are what we assert on.
        let _ = agent.beat(Status::Ok, 0);

        assert_eq!(
            agent.fork_recoveries(),
            1,
            "secure-UDP fork-recovery counter must increment"
        );
        let prefix_after = agent.iv_prefix_for_test();
        assert_ne!(
            prefix_before, prefix_after,
            "IV prefix must rotate on fork-recovery (defeats nonce reuse)"
        );
        assert_eq!(
            agent.iv_prefix_index_for_test(),
            0,
            "prefix_index must reset to 0 on transport.reconnect()"
        );
    }
}