pub struct Varta<T: BeatTransport = UdsTransport> { /* private fields */ }Expand description
Agent-side handle that owns a configured BeatTransport and a 32-byte
scratch buffer.
Varta::connect is the single allocation point: it creates the transport,
switches it to non-blocking mode (where applicable), and captures the epoch
used for monotonic timestamps. The process ID is fetched afresh via
std::process::id on every beat so forked children
report their own PID. Every subsequent beat() reuses the owned buffer
and emits a frame without touching the heap.
The default transport is UdsTransport (Unix Domain Socket). Use
Varta::connect() to create a UDS-backed agent. Other transports (e.g.
UDP) are available behind feature flags.
§Examples
use varta_client::{Status, Varta};
let mut agent = Varta::connect("/tmp/varta.sock")?;
agent.beat(Status::Ok, 0);§Thread safety
Varta is Send: the underlying transport is Send, and a beat
issues no shared state. Varta is not Sync: concurrent
&Varta::beat calls would race on the kernel-side socket send buffer
ordering. To share across threads, wrap in a std::sync::Mutex or move
the handle into a dedicated emitter thread or channel.
After fork(2) the child inherits this handle. Fork is auto-detected
on the next beat: if std::process::id() differs from the
PID captured at connect time, the underlying transport’s
reconnect is invoked
before the frame is built. On secure-UDP this re-reads OS entropy and
rotates the AEAD session salt, making catastrophic nonce reuse across the
fork boundary structurally impossible. The recovery is silent — the caller
sees BeatOutcome::Sent — and is observable via
fork_recoveries. Calling
reconnect explicitly in the child is still supported
and idempotent.
Implementations§
Source§impl Varta<UdsTransport>
impl Varta<UdsTransport>
Sourcepub fn connect<P: AsRef<Path>>(path: P) -> Result<Self>
pub fn connect<P: AsRef<Path>>(path: P) -> Result<Self>
Connect to the observer listening on path via Unix Domain Socket and
prepare the agent for non-blocking emission.
Stores an Instant for per-frame elapsed-nanosecond timestamps. The
process ID is read afresh on every Varta::beat via
std::process::id so each frame carries the current PID. Subsequent
calls to Varta::beat do not allocate.
§Errors
Returns an io::Error if the socket cannot be created, the peer
path cannot be reached, or non-blocking mode cannot be enabled.
Examples found in repository?
10fn main() -> std::io::Result<()> {
11 let mut agent = varta_client::Varta::connect("/tmp/varta.sock")?;
12 loop {
13 match agent.beat(varta_client::Status::Ok, 0) {
14 varta_client::BeatOutcome::Sent => {}
15 varta_client::BeatOutcome::Dropped(_) => {
16 eprintln!("varta: beat dropped (observer down or queue full)");
17 }
18 varta_client::BeatOutcome::Failed(e) => {
19 eprintln!("varta: beat failed: {e}");
20 }
21 }
22 std::thread::sleep(std::time::Duration::from_millis(500));
23 }
24}More examples
23fn main() -> std::io::Result<()> {
24 let mut agent = varta_client::Varta::connect("/tmp/varta.sock")?;
25 loop {
26 let depth = QUEUE_DEPTH.load(Ordering::Relaxed);
27 let err = LAST_ERROR.load(Ordering::Relaxed);
28 let payload = ((depth as u32) << 16) | (err as u32);
29 match agent.beat(varta_client::Status::Ok, payload) {
30 varta_client::BeatOutcome::Sent => {}
31 varta_client::BeatOutcome::Dropped(_) => {
32 eprintln!("varta: beat dropped (observer down or queue full)");
33 }
34 varta_client::BeatOutcome::Failed(e) => {
35 eprintln!("varta: beat failed: {e}");
36 }
37 }
38 std::thread::sleep(std::time::Duration::from_millis(500));
39 }
40}Source§impl<T: BeatTransport> Varta<T>
impl<T: BeatTransport> Varta<T>
Sourcepub fn beat(&mut self, status: Status, payload: u32) -> BeatOutcome
pub fn beat(&mut self, status: Status, payload: u32) -> BeatOutcome
Emit a single VLP frame carrying status and an opaque 8-byte
payload.
The nonce increments first (starting from 1) and wraps to 0 on
exhaustion; the very first beat after connect carries nonce == 1. The frame is
constructed on the stack, encoded into the owned scratch buffer, and
handed to send(2). The steady-state path (Sent / Dropped) neither
blocks nor allocates; the rare Failed path may allocate when cloning
the underlying io::Error.
When set_reconnect_after is enabled and
the consecutive-dropped threshold is crossed, beat will internally
reconnect the socket and retry the send before returning. The retry
path allocates a fresh socket; this is acceptable because observer
restarts are rare and the steady-state path remains allocation-free.
Examples found in repository?
10fn main() -> std::io::Result<()> {
11 let mut agent = varta_client::Varta::connect("/tmp/varta.sock")?;
12 loop {
13 match agent.beat(varta_client::Status::Ok, 0) {
14 varta_client::BeatOutcome::Sent => {}
15 varta_client::BeatOutcome::Dropped(_) => {
16 eprintln!("varta: beat dropped (observer down or queue full)");
17 }
18 varta_client::BeatOutcome::Failed(e) => {
19 eprintln!("varta: beat failed: {e}");
20 }
21 }
22 std::thread::sleep(std::time::Duration::from_millis(500));
23 }
24}More examples
23fn main() -> std::io::Result<()> {
24 let mut agent = varta_client::Varta::connect("/tmp/varta.sock")?;
25 loop {
26 let depth = QUEUE_DEPTH.load(Ordering::Relaxed);
27 let err = LAST_ERROR.load(Ordering::Relaxed);
28 let payload = ((depth as u32) << 16) | (err as u32);
29 match agent.beat(varta_client::Status::Ok, payload) {
30 varta_client::BeatOutcome::Sent => {}
31 varta_client::BeatOutcome::Dropped(_) => {
32 eprintln!("varta: beat dropped (observer down or queue full)");
33 }
34 varta_client::BeatOutcome::Failed(e) => {
35 eprintln!("varta: beat failed: {e}");
36 }
37 }
38 std::thread::sleep(std::time::Duration::from_millis(500));
39 }
40}Sourcepub fn reconnect(&mut self) -> Result<()>
pub fn reconnect(&mut self) -> Result<()>
Re-establish the underlying transport connection.
After an observer restart the old channel is stale — every beat()
returns BeatOutcome::Dropped (with reason DropReason::PeerGone
or DropReason::NoObserver) until reconnected. Call reconnect to establish
a fresh connection to the target stored at connect
time. Agent identity (nonce, start clock) is preserved.
Also refreshes the internal fork-detection snapshot so an explicit reconnect issued from a forked child (the documented manual escape hatch) cannot leave a stale parent PID behind that would re-trigger auto-recovery on the next beat.
This is the only post-connect allocation site and
should only be called when recovery is needed, not on the steady-state
beat path.
Sourcepub fn set_reconnect_after(&mut self, n: u32)
pub fn set_reconnect_after(&mut self, n: u32)
Enable automatic reconnect after n consecutive
BeatOutcome::Dropped outcomes. Set to 0 to disable (the
default).
When enabled, beat increments an internal counter on
each Dropped outcome. After n consecutive drops — a strong signal
that the observer channel is stale — beat calls reconnect
internally and retries the send before returning. The counter resets
to zero on any Sent or Failed outcome, and after a successful
reconnect.
Resets the internal consecutive-dropped counter to zero so that the new threshold gates future drops rather than immediately triggering on a past-saturated counter.
Sourcepub fn clock_regressions(&self) -> u64
pub fn clock_regressions(&self) -> u64
Number of times beat has observed
Instant::now regress since
connect. Saturating; never wraps.
The wire-format timestamp remains monotonic because beat() clamps
it through .max(), so a regression manifests on the wire as a
duplicate timestamp rather than a backwards jump. A non-zero value
here is the only in-process signal of the underlying platform-clock
bug.
Consumers wiring a Prometheus exporter SHOULD publish this as a
counter named varta_client_clock_regression_total.
Sourcepub fn fork_recoveries(&self) -> u64
pub fn fork_recoveries(&self) -> u64
Number of times beat has observed a fork(2)
transition (i.e. std::process::id() differing from the PID captured
at connect time) and refreshed the underlying
transport in response. Saturating; never wraps.
A non-zero value is the operational signal that auto-recovery has fired. On the secure-UDP transport, each event corresponds to one AEAD session-salt rotation in the forked child — the structural guarantee against nonce reuse across the fork boundary.
Consumers wiring a Prometheus exporter SHOULD publish this as a
counter named varta_client_fork_recoveries_total.