varta_client/client.rs
1//! Agent surface — `Varta` connects to the observer's UDS and `beat()` emits
2//! one fire-and-forget 32-byte VLP frame per call.
3
4use std::io;
5use std::os::unix::net::UnixDatagram;
6use std::path::{Path, PathBuf};
7use std::time::Instant;
8
9use varta_vlp::{Frame, Status, NONCE_TERMINAL};
10
/// `ENOBUFS` on Linux / Android architectures that use the generic kernel
/// errno table (`include/uapi/asm-generic/errno.h`). Hard-coded to preserve
/// the zero-dependency invariant; do not replace with `libc`.
#[cfg(all(
    any(target_os = "linux", target_os = "android"),
    not(any(
        target_arch = "mips",
        target_arch = "mips64",
        target_arch = "mips32r6",
        target_arch = "mips64r6",
        target_arch = "sparc",
        target_arch = "sparc64",
    )),
))]
const ENOBUFS: i32 = 105;

/// `ENOBUFS` on Linux MIPS, whose kernel errno table
/// (`arch/mips/include/uapi/asm/errno.h`) renumbers the networking errors.
/// Hard-coded for the same zero-dependency reason.
#[cfg(all(
    any(target_os = "linux", target_os = "android"),
    any(
        target_arch = "mips",
        target_arch = "mips64",
        target_arch = "mips32r6",
        target_arch = "mips64r6",
    ),
))]
const ENOBUFS: i32 = 132;

/// `ENOBUFS` on Darwin / BSD (`<sys/errno.h>`) and on Linux SPARC, whose
/// kernel errno table (`arch/sparc/include/uapi/asm/errno.h`) keeps the BSD
/// numbering for networking errors. Hard-coded for the same reason.
#[cfg(any(
    target_os = "macos",
    target_os = "ios",
    target_os = "tvos",
    target_os = "watchos",
    target_os = "visionos",
    target_os = "freebsd",
    target_os = "netbsd",
    target_os = "openbsd",
    target_os = "dragonfly",
    all(
        any(target_os = "linux", target_os = "android"),
        any(target_arch = "sparc", target_arch = "sparc64"),
    ),
))]
const ENOBUFS: i32 = 55;
27
28/// Classify a `send(2)` error into a [`BeatOutcome`].
29///
30/// Checks the raw OS error code before the `ErrorKind` match so that
31/// `ENOBUFS` (kernel buffer pressure, transient) is caught even when the
32/// toolchain maps it to `ErrorKind::Other`. The `Failed` branch constructs
33/// the returned error without heap allocation.
34pub fn classify_send_error(e: &io::Error) -> BeatOutcome {
35 // (a) Raw-OS path first — catches ENOBUFS even when libstd has not
36 // minted a dedicated ErrorKind for it on this toolchain.
37 if let Some(code) = e.raw_os_error() {
38 if code == ENOBUFS {
39 return BeatOutcome::Dropped;
40 }
41 }
42
43 match e.kind() {
44 // (b) Peer not present or channel transiently full.
45 io::ErrorKind::WouldBlock
46 | io::ErrorKind::ConnectionRefused
47 | io::ErrorKind::ConnectionReset
48 | io::ErrorKind::NotFound
49 | io::ErrorKind::NotConnected
50 | io::ErrorKind::BrokenPipe
51 // (c) Belt-and-braces: covers toolchains that surface ENOBUFS as a
52 // kind rather than a raw_os_error.
53 | io::ErrorKind::OutOfMemory
54 | io::ErrorKind::StorageFull => BeatOutcome::Dropped,
55
56 // (d) Unexpected error: clone heap-free and escalate.
57 _ => {
58 let cloned = match e.raw_os_error() {
59 // Repr::Os(i32) — no heap allocation.
60 Some(code) => io::Error::from_raw_os_error(code),
61 // Repr::Simple(kind) — no heap allocation.
62 None => io::Error::from(e.kind()),
63 };
64 BeatOutcome::Failed(cloned)
65 }
66 }
67}
68
/// Result of a single [`Varta::beat`] call.
///
/// The agent's socket is non-blocking, so `beat()` does not wait on the
/// observer. The kernel's answer to `send(2)` is translated into one of three
/// outcomes.
#[derive(Debug)]
pub enum BeatOutcome {
    /// The 32-byte datagram was accepted by the kernel.
    Sent,
    /// The kernel did not accept the datagram for a reason the agent should
    /// treat as transient and ignore. Possible causes:
    /// - the observer is not listening;
    /// - the socket file vanished;
    /// - the per-socket queue is full (`WouldBlock` under non-blocking I/O);
    /// - the kernel reported resource exhaustion (`ENOBUFS`, or errors std
    ///   classifies as `OutOfMemory` / `StorageFull`).
    Dropped,
    /// An unexpected I/O error surfaced from the underlying `send(2)`.
    ///
    /// The inner [`io::Error`] is a heap-free copy rebuilt from the original
    /// error's raw OS code, or from its [`io::ErrorKind`] when it has none.
    /// Any custom payload on the original error is not preserved.
    Failed(io::Error),
}
88
/// Agent-side handle that owns a connected, non-blocking [`UnixDatagram`]
/// and a 32-byte scratch buffer.
///
/// [`Varta::connect`] performs the up-front work:
/// - copies the observer path;
/// - creates the socket and switches it to non-blocking mode;
/// - captures the epoch used for monotonic timestamps.
///
/// The process ID is fetched afresh via [`std::process::id`] on every
/// [`beat`](Self::beat), so forked children report their own PID.
/// Steady-state `beat()` calls reuse the owned buffer and emit a frame
/// without touching the heap. Only [`reconnect`](Self::reconnect) creates a
/// new socket after `connect`. It runs either when called explicitly or
/// automatically once [`set_reconnect_after`](Self::set_reconnect_after)'s
/// threshold is crossed.
///
/// # Examples
///
/// ```no_run
/// use varta_client::{Status, Varta};
/// let mut agent = Varta::connect("/tmp/varta.sock")?;
/// agent.beat(Status::Ok, 0);
/// # Ok::<(), std::io::Error>(())
/// ```
#[derive(Debug)]
pub struct Varta {
    /// Socket connected to the observer; replaced wholesale by `reconnect`.
    sock: UnixDatagram,
    /// Scratch buffer each frame is encoded into before `send(2)`.
    buf: [u8; 32],
    /// Epoch for the per-frame elapsed-nanosecond timestamp.
    start: Instant,
    /// Last nonce emitted; `0` until the first beat.
    nonce: u64,
    /// Observer socket path captured at `connect`, reused by `reconnect`.
    path: PathBuf,
    /// Number of `Dropped` outcomes in a row since the last reset.
    consecutive_dropped: u32,
    /// Auto-reconnect threshold; `0` disables automatic reconnect.
    reconnect_after: u32,
}
116
117impl Varta {
118 /// Connect to the observer listening on `path` and prepare the agent for
119 /// non-blocking emission.
120 ///
121 /// Stores an `Instant` for per-frame elapsed-nanosecond timestamps. The
122 /// process ID is intentionally not cached here — it is read afresh on
123 /// every [`Varta::beat`] via [`std::process::id`] so a child that forks
124 /// after `connect` reports its own PID, not the parent's. Subsequent
125 /// calls to [`Varta::beat`] do not allocate.
126 ///
127 /// # Errors
128 ///
129 /// Returns an [`io::Error`] if the socket cannot be created, the peer
130 /// path cannot be reached, or non-blocking mode cannot be enabled.
131 pub fn connect<P: AsRef<Path>>(path: P) -> io::Result<Self> {
132 let path = path.as_ref().to_path_buf();
133 let sock = UnixDatagram::unbound()?;
134 sock.connect(&path)?;
135 sock.set_nonblocking(true)?;
136 Ok(Self {
137 sock,
138 buf: [0u8; 32],
139 start: Instant::now(),
140 nonce: 0,
141 path,
142 consecutive_dropped: 0,
143 reconnect_after: 0,
144 })
145 }
146
147 fn send_frame(&mut self) -> BeatOutcome {
148 match self.sock.send(&self.buf) {
149 Ok(_) => BeatOutcome::Sent,
150 Err(e) => classify_send_error(&e),
151 }
152 }
153
154 /// Emit a single VLP frame carrying `status` and an opaque 8-byte
155 /// `payload`.
156 ///
157 /// The nonce increments first (capping at `NONCE_TERMINAL - 1`), so the
158 /// very first beat after `connect` carries `nonce == 1`. The frame is
159 /// constructed on the stack, encoded into the owned scratch buffer, and
160 /// handed to `send(2)`. This call neither blocks nor allocates on the
161 /// heap on the steady-state path.
162 ///
163 /// When [`set_reconnect_after`](Self::set_reconnect_after) is enabled and
164 /// the consecutive-dropped threshold is crossed, `beat` will internally
165 /// reconnect the socket and retry the send before returning. The retry
166 /// path allocates a fresh socket; this is acceptable because observer
167 /// restarts are rare and the steady-state path remains allocation-free.
168 pub fn beat(&mut self, status: Status, payload: u64) -> BeatOutcome {
169 self.nonce = self.nonce.saturating_add(1).min(NONCE_TERMINAL - 1);
170 let timestamp = self.start.elapsed().as_nanos() as u64;
171 let frame = Frame::new(status, std::process::id(), timestamp, self.nonce, payload);
172 frame.encode(&mut self.buf);
173 let outcome = self.send_frame();
174 match &outcome {
175 BeatOutcome::Dropped => {
176 self.consecutive_dropped += 1;
177 if self.reconnect_after > 0
178 && self.consecutive_dropped >= self.reconnect_after
179 && self.reconnect().is_ok()
180 {
181 return self.send_frame();
182 }
183 outcome
184 }
185 _ => {
186 self.consecutive_dropped = 0;
187 outcome
188 }
189 }
190 }
191
192 /// Re-bind the Unix datagram socket to the original observer path.
193 ///
194 /// After an observer restart the old socket inode is stale — every
195 /// `beat()` returns [`BeatOutcome::Dropped`] forever. Call `reconnect`
196 /// to bind a fresh socket against the path stored at [`connect`](Self::connect)
197 /// time. Agent identity (`nonce`, `start` clock) is preserved; the PID
198 /// is re-read from the kernel on every beat so reconnect cannot strand
199 /// a stale identity.
200 ///
201 /// This is the only post-[`connect`](Self::connect) allocation site and
202 /// should only be called when recovery is needed, not on the steady-state
203 /// beat path.
204 pub fn reconnect(&mut self) -> io::Result<()> {
205 let sock = UnixDatagram::unbound()?;
206 sock.connect(&self.path)?;
207 sock.set_nonblocking(true)?;
208 self.sock = sock;
209 self.consecutive_dropped = 0;
210 Ok(())
211 }
212
213 /// Enable automatic reconnect after `n` consecutive
214 /// [`BeatOutcome::Dropped`] outcomes. Set to `0` to disable (the
215 /// default).
216 ///
217 /// When enabled, [`beat`](Self::beat) increments an internal counter on
218 /// each `Dropped` outcome. After `n` consecutive drops — a strong signal
219 /// that the observer socket is stale — `beat` calls [`reconnect`](Self::reconnect)
220 /// internally and retries the send before returning. The counter resets
221 /// to zero on any `Sent` or `Failed` outcome, and after a successful
222 /// reconnect.
223 pub fn set_reconnect_after(&mut self, n: u32) {
224 self.reconnect_after = n;
225 }
226}