Skip to main content

Varta

Struct Varta 

Source
pub struct Varta<T: BeatTransport = UdsTransport> { /* private fields */ }
Expand description

Agent-side handle that owns a configured BeatTransport and a 32-byte scratch buffer.

Varta::connect is the single allocation point: it creates the transport, switches it to non-blocking mode (where applicable), and captures the epoch used for monotonic timestamps. The process ID is fetched afresh via std::process::id on every beat so forked children report their own PID. Every subsequent beat() reuses the owned buffer and emits a frame without touching the heap.

The default transport is UdsTransport (Unix Domain Socket). Use Varta::connect() to create a UDS-backed agent. Other transports (e.g. UDP) are available behind feature flags.

§Examples

use varta_client::{Status, Varta};
let mut agent = Varta::connect("/tmp/varta.sock")?;
agent.beat(Status::Ok, 0);

§Thread safety

Varta is Send: the underlying transport is Send, and a beat issues no shared state. Varta is not Sync: concurrent &Varta::beat calls would race on the kernel-side socket send buffer ordering. To share across threads, wrap in a std::sync::Mutex or move the handle into a dedicated emitter thread or channel.

After fork(2) the child inherits this handle. Fork is auto-detected on the next beat: if std::process::id() differs from the PID captured at connect time, the underlying transport’s reconnect is invoked before the frame is built. On secure-UDP this re-reads OS entropy and rotates the AEAD session salt, making catastrophic nonce reuse across the fork boundary structurally impossible. The recovery is silent — the caller sees BeatOutcome::Sent — and is observable via fork_recoveries. Calling reconnect explicitly in the child is still supported and idempotent.

Implementations§

Source§

impl Varta<UdsTransport>

Source

pub fn connect<P: AsRef<Path>>(path: P) -> Result<Self>

Connect to the observer listening on path via Unix Domain Socket and prepare the agent for non-blocking emission.

Stores an Instant for per-frame elapsed-nanosecond timestamps. The process ID is read afresh on every Varta::beat via std::process::id so each frame carries the current PID. Subsequent calls to Varta::beat do not allocate.

§Errors

Returns an io::Error if the socket cannot be created, the peer path cannot be reached, or non-blocking mode cannot be enabled.

Examples found in repository?
examples/basic.rs (line 11)
10fn main() -> std::io::Result<()> {
11    let mut agent = varta_client::Varta::connect("/tmp/varta.sock")?;
12    loop {
13        match agent.beat(varta_client::Status::Ok, 0) {
14            varta_client::BeatOutcome::Sent => {}
15            varta_client::BeatOutcome::Dropped(_) => {
16                eprintln!("varta: beat dropped (observer down or queue full)");
17            }
18            varta_client::BeatOutcome::Failed(e) => {
19                eprintln!("varta: beat failed: {e}");
20            }
21        }
22        std::thread::sleep(std::time::Duration::from_millis(500));
23    }
24}
More examples
Hide additional examples
examples/with_payload.rs (line 24)
23fn main() -> std::io::Result<()> {
24    let mut agent = varta_client::Varta::connect("/tmp/varta.sock")?;
25    loop {
26        let depth = QUEUE_DEPTH.load(Ordering::Relaxed);
27        let err = LAST_ERROR.load(Ordering::Relaxed);
28        let payload = ((depth as u32) << 16) | (err as u32);
29        match agent.beat(varta_client::Status::Ok, payload) {
30            varta_client::BeatOutcome::Sent => {}
31            varta_client::BeatOutcome::Dropped(_) => {
32                eprintln!("varta: beat dropped (observer down or queue full)");
33            }
34            varta_client::BeatOutcome::Failed(e) => {
35                eprintln!("varta: beat failed: {e}");
36            }
37        }
38        std::thread::sleep(std::time::Duration::from_millis(500));
39    }
40}
Source§

impl<T: BeatTransport> Varta<T>

Source

pub fn beat(&mut self, status: Status, payload: u32) -> BeatOutcome

Emit a single VLP frame carrying status and an opaque 8-byte payload.

The nonce increments first (starting from 1) and wraps to 0 on exhaustion; the very first beat after connect carries nonce == 1. The frame is constructed on the stack, encoded into the owned scratch buffer, and handed to send(2). The steady-state path (Sent / Dropped) neither blocks nor allocates; the rare Failed path may allocate when cloning the underlying io::Error.

When set_reconnect_after is enabled and the consecutive-dropped threshold is crossed, beat will internally reconnect the socket and retry the send before returning. The retry path allocates a fresh socket; this is acceptable because observer restarts are rare and the steady-state path remains allocation-free.

Examples found in repository?
examples/basic.rs (line 13)
10fn main() -> std::io::Result<()> {
11    let mut agent = varta_client::Varta::connect("/tmp/varta.sock")?;
12    loop {
13        match agent.beat(varta_client::Status::Ok, 0) {
14            varta_client::BeatOutcome::Sent => {}
15            varta_client::BeatOutcome::Dropped(_) => {
16                eprintln!("varta: beat dropped (observer down or queue full)");
17            }
18            varta_client::BeatOutcome::Failed(e) => {
19                eprintln!("varta: beat failed: {e}");
20            }
21        }
22        std::thread::sleep(std::time::Duration::from_millis(500));
23    }
24}
More examples
Hide additional examples
examples/with_payload.rs (line 29)
23fn main() -> std::io::Result<()> {
24    let mut agent = varta_client::Varta::connect("/tmp/varta.sock")?;
25    loop {
26        let depth = QUEUE_DEPTH.load(Ordering::Relaxed);
27        let err = LAST_ERROR.load(Ordering::Relaxed);
28        let payload = ((depth as u32) << 16) | (err as u32);
29        match agent.beat(varta_client::Status::Ok, payload) {
30            varta_client::BeatOutcome::Sent => {}
31            varta_client::BeatOutcome::Dropped(_) => {
32                eprintln!("varta: beat dropped (observer down or queue full)");
33            }
34            varta_client::BeatOutcome::Failed(e) => {
35                eprintln!("varta: beat failed: {e}");
36            }
37        }
38        std::thread::sleep(std::time::Duration::from_millis(500));
39    }
40}
Source

pub fn reconnect(&mut self) -> Result<()>

Re-establish the underlying transport connection.

After an observer restart the old channel is stale — every beat() returns BeatOutcome::Dropped (with reason DropReason::PeerGone or DropReason::NoObserver) until reconnected. Call reconnect to establish a fresh connection to the target stored at connect time. Agent identity (nonce, start clock) is preserved.

Also refreshes the internal fork-detection snapshot so an explicit reconnect issued from a forked child (the documented manual escape hatch) cannot leave a stale parent PID behind that would re-trigger auto-recovery on the next beat.

This is the only post-connect allocation site and should only be called when recovery is needed, not on the steady-state beat path.

Source

pub fn set_reconnect_after(&mut self, n: u32)

Enable automatic reconnect after n consecutive BeatOutcome::Dropped outcomes. Set to 0 to disable (the default).

When enabled, beat increments an internal counter on each Dropped outcome. After n consecutive drops — a strong signal that the observer channel is stale — beat calls reconnect internally and retries the send before returning. The counter resets to zero on any Sent or Failed outcome, and after a successful reconnect.

Resets the internal consecutive-dropped counter to zero so that the new threshold gates future drops rather than immediately triggering on a past-saturated counter.

Source

pub fn clock_regressions(&self) -> u64

Number of times beat has observed Instant::now regress since connect. Saturating; never wraps.

The wire-format timestamp remains monotonic because beat() clamps it through .max(), so a regression manifests on the wire as a duplicate timestamp rather than a backwards jump. A non-zero value here is the only in-process signal of the underlying platform-clock bug.

Consumers wiring a Prometheus exporter SHOULD publish this as a counter named varta_client_clock_regression_total.

Source

pub fn fork_recoveries(&self) -> u64

Number of times beat has observed a fork(2) transition (i.e. std::process::id() differing from the PID captured at connect time) and refreshed the underlying transport in response. Saturating; never wraps.

A non-zero value is the operational signal that auto-recovery has fired. On the secure-UDP transport, each event corresponds to one AEAD session-salt rotation in the forked child — the structural guarantee against nonce reuse across the fork boundary.

Consumers wiring a Prometheus exporter SHOULD publish this as a counter named varta_client_fork_recoveries_total.

Auto Trait Implementations§

§

impl<T> Freeze for Varta<T>
where T: Freeze,

§

impl<T> RefUnwindSafe for Varta<T>
where T: RefUnwindSafe,

§

impl<T> Send for Varta<T>

§

impl<T> Sync for Varta<T>
where T: Sync,

§

impl<T> Unpin for Varta<T>
where T: Unpin,

§

impl<T> UnsafeUnpin for Varta<T>
where T: UnsafeUnpin,

§

impl<T> UnwindSafe for Varta<T>
where T: UnwindSafe,

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.