Skip to main content

varta_client/
client.rs

1//! Agent surface — `Varta` connects to the observer's UDS and `beat()` emits
2//! one fire-and-forget 32-byte VLP frame per call.
3
4use std::io;
5use std::os::unix::net::UnixDatagram;
6use std::path::{Path, PathBuf};
7use std::time::Instant;
8
9use varta_vlp::{Frame, Status, NONCE_TERMINAL};
10
/// Value of `ENOBUFS` on Linux and Android, from `<asm-generic/errno.h>`.
/// Hard-coded to preserve the zero-dependency invariant; do not replace
/// with `libc`.
///
/// Android runs on the Linux kernel and shares its errno numbering, so it
/// is covered by the same constant instead of failing to compile.
///
/// NOTE(review): Linux on MIPS and SPARC does not use the asm-generic errno
/// numbering — confirm the value before relying on this on those
/// architectures.
#[cfg(any(target_os = "linux", target_os = "android"))]
const ENOBUFS: i32 = 105;

/// Darwin / BSD value of `ENOBUFS` from `<sys/errno.h>`. Hard-coded for
/// the same zero-dependency reason as the Linux constant.
///
/// All Apple platforms share the Darwin errno table, so tvOS and watchOS
/// are listed alongside macOS and iOS.
#[cfg(any(
    target_os = "macos",
    target_os = "ios",
    target_os = "tvos",
    target_os = "watchos",
    target_os = "freebsd",
    target_os = "netbsd",
    target_os = "openbsd",
    target_os = "dragonfly",
))]
const ENOBUFS: i32 = 55;
27
28/// Classify a `send(2)` error into a [`BeatOutcome`].
29///
30/// Checks the raw OS error code before the `ErrorKind` match so that
31/// `ENOBUFS` (kernel buffer pressure, transient) is caught even when the
32/// toolchain maps it to `ErrorKind::Other`. The `Failed` branch constructs
33/// the returned error without heap allocation.
34pub fn classify_send_error(e: &io::Error) -> BeatOutcome {
35    // (a) Raw-OS path first — catches ENOBUFS even when libstd has not
36    //     minted a dedicated ErrorKind for it on this toolchain.
37    if let Some(code) = e.raw_os_error() {
38        if code == ENOBUFS {
39            return BeatOutcome::Dropped;
40        }
41    }
42
43    match e.kind() {
44        // (b) Peer not present or channel transiently full.
45        io::ErrorKind::WouldBlock
46        | io::ErrorKind::ConnectionRefused
47        | io::ErrorKind::ConnectionReset
48        | io::ErrorKind::NotFound
49        | io::ErrorKind::NotConnected
50        | io::ErrorKind::BrokenPipe
51        // (c) Belt-and-braces: covers toolchains that surface ENOBUFS as a
52        //     kind rather than a raw_os_error.
53        | io::ErrorKind::OutOfMemory
54        | io::ErrorKind::StorageFull => BeatOutcome::Dropped,
55
56        // (d) Unexpected error: clone heap-free and escalate.
57        _ => {
58            let cloned = match e.raw_os_error() {
59                // Repr::Os(i32) — no heap allocation.
60                Some(code) => io::Error::from_raw_os_error(code),
61                // Repr::Simple(kind) — no heap allocation.
62                None => io::Error::from(e.kind()),
63            };
64            BeatOutcome::Failed(cloned)
65        }
66    }
67}
68
/// Outcome reported by every [`Varta::beat`] call.
///
/// `beat()` is documented as non-blocking and non-panicking; whatever the
/// kernel reports for the send is folded into one of the three variants
/// below. Only `Failed` carries data, for callers that want to log or
/// escalate.
#[derive(Debug)]
pub enum BeatOutcome {
    /// The kernel accepted the 32-byte datagram.
    Sent,
    /// The kernel did not accept the datagram and the agent should treat
    /// the beat as a no-op. Typical causes: no observer is listening, the
    /// socket file has vanished, or the per-socket queue is full
    /// (`WouldBlock` under non-blocking I/O).
    Dropped,
    /// `send(2)` reported an error that is not considered transient. The
    /// inner [`io::Error`] is rebuilt from the original's raw OS code (or
    /// its kind when no code is available), so creating it does not
    /// allocate on the heap.
    Failed(io::Error),
}
88
/// Agent-side handle: a connected [`UnixDatagram`] plus the 32-byte scratch
/// buffer every frame is encoded into.
///
/// [`Varta::connect`] creates the socket, puts it in non-blocking mode and
/// records the epoch used for monotonic timestamps. The process ID is not
/// stored; it is read through [`std::process::id`] on each
/// [`beat`](Self::beat), so a forked child reports its own PID. Each
/// `beat()` reuses the owned buffer and sends a frame without touching the
/// heap on the steady-state path.
///
/// # Examples
///
/// ```no_run
/// use varta_client::{Status, Varta};
/// let mut agent = Varta::connect("/tmp/varta.sock")?;
/// agent.beat(Status::Ok, 0);
/// # Ok::<(), std::io::Error>(())
/// ```
pub struct Varta {
    // --- connection ---
    /// Datagram socket currently connected to the observer.
    sock: UnixDatagram,
    /// Observer path given to `connect`; reused by `reconnect`.
    path: PathBuf,

    // --- frame state ---
    /// Scratch buffer each frame is encoded into before sending.
    buf: [u8; 32],
    /// Nonce carried by the most recently emitted frame (0 before the
    /// first beat).
    nonce: u64,
    /// Epoch captured at `connect`; frame timestamps are measured from it.
    start: Instant,

    // --- auto-reconnect policy ---
    /// Number of `Dropped` outcomes seen since the last non-dropped one.
    consecutive_dropped: u32,
    /// Drop threshold that triggers an automatic reconnect; 0 disables it.
    reconnect_after: u32,
}
116
117impl Varta {
118    /// Connect to the observer listening on `path` and prepare the agent for
119    /// non-blocking emission.
120    ///
121    /// Stores an `Instant` for per-frame elapsed-nanosecond timestamps. The
122    /// process ID is intentionally not cached here — it is read afresh on
123    /// every [`Varta::beat`] via [`std::process::id`] so a child that forks
124    /// after `connect` reports its own PID, not the parent's. Subsequent
125    /// calls to [`Varta::beat`] do not allocate.
126    ///
127    /// # Errors
128    ///
129    /// Returns an [`io::Error`] if the socket cannot be created, the peer
130    /// path cannot be reached, or non-blocking mode cannot be enabled.
131    pub fn connect<P: AsRef<Path>>(path: P) -> io::Result<Self> {
132        let path = path.as_ref().to_path_buf();
133        let sock = UnixDatagram::unbound()?;
134        sock.connect(&path)?;
135        sock.set_nonblocking(true)?;
136        Ok(Self {
137            sock,
138            buf: [0u8; 32],
139            start: Instant::now(),
140            nonce: 0,
141            path,
142            consecutive_dropped: 0,
143            reconnect_after: 0,
144        })
145    }
146
147    fn send_frame(&mut self) -> BeatOutcome {
148        match self.sock.send(&self.buf) {
149            Ok(_) => BeatOutcome::Sent,
150            Err(e) => classify_send_error(&e),
151        }
152    }
153
154    /// Emit a single VLP frame carrying `status` and an opaque 8-byte
155    /// `payload`.
156    ///
157    /// The nonce increments first (capping at `NONCE_TERMINAL - 1`), so the
158    /// very first beat after `connect` carries `nonce == 1`. The frame is
159    /// constructed on the stack, encoded into the owned scratch buffer, and
160    /// handed to `send(2)`. This call neither blocks nor allocates on the
161    /// heap on the steady-state path.
162    ///
163    /// When [`set_reconnect_after`](Self::set_reconnect_after) is enabled and
164    /// the consecutive-dropped threshold is crossed, `beat` will internally
165    /// reconnect the socket and retry the send before returning. The retry
166    /// path allocates a fresh socket; this is acceptable because observer
167    /// restarts are rare and the steady-state path remains allocation-free.
168    pub fn beat(&mut self, status: Status, payload: u64) -> BeatOutcome {
169        self.nonce = self.nonce.saturating_add(1).min(NONCE_TERMINAL - 1);
170        let timestamp = self.start.elapsed().as_nanos() as u64;
171        let frame = Frame::new(status, std::process::id(), timestamp, self.nonce, payload);
172        frame.encode(&mut self.buf);
173        let outcome = self.send_frame();
174        match &outcome {
175            BeatOutcome::Dropped => {
176                self.consecutive_dropped += 1;
177                if self.reconnect_after > 0
178                    && self.consecutive_dropped >= self.reconnect_after
179                    && self.reconnect().is_ok()
180                {
181                    return self.send_frame();
182                }
183                outcome
184            }
185            _ => {
186                self.consecutive_dropped = 0;
187                outcome
188            }
189        }
190    }
191
192    /// Re-bind the Unix datagram socket to the original observer path.
193    ///
194    /// After an observer restart the old socket inode is stale — every
195    /// `beat()` returns [`BeatOutcome::Dropped`] forever. Call `reconnect`
196    /// to bind a fresh socket against the path stored at [`connect`](Self::connect)
197    /// time. Agent identity (`nonce`, `start` clock) is preserved; the PID
198    /// is re-read from the kernel on every beat so reconnect cannot strand
199    /// a stale identity.
200    ///
201    /// This is the only post-[`connect`](Self::connect) allocation site and
202    /// should only be called when recovery is needed, not on the steady-state
203    /// beat path.
204    pub fn reconnect(&mut self) -> io::Result<()> {
205        let sock = UnixDatagram::unbound()?;
206        sock.connect(&self.path)?;
207        sock.set_nonblocking(true)?;
208        self.sock = sock;
209        self.consecutive_dropped = 0;
210        Ok(())
211    }
212
213    /// Enable automatic reconnect after `n` consecutive
214    /// [`BeatOutcome::Dropped`] outcomes. Set to `0` to disable (the
215    /// default).
216    ///
217    /// When enabled, [`beat`](Self::beat) increments an internal counter on
218    /// each `Dropped` outcome. After `n` consecutive drops — a strong signal
219    /// that the observer socket is stale — `beat` calls [`reconnect`](Self::reconnect)
220    /// internally and retries the send before returning. The counter resets
221    /// to zero on any `Sent` or `Failed` outcome, and after a successful
222    /// reconnect.
223    pub fn set_reconnect_after(&mut self, n: u32) {
224        self.reconnect_after = n;
225    }
226}