use std::fmt;
use std::io::{self, Write};
use std::path::Path;
use std::time::Instant;
use varta_vlp::{Frame, Status, NONCE_TERMINAL};
use crate::transport::{BeatTransport, UdsTransport};
#[cfg(feature = "udp")]
use crate::transport::UdpTransport;
#[cfg(feature = "secure-udp")]
use crate::secure_transport::SecureUdpTransport;
#[cfg(feature = "secure-udp")]
use varta_vlp::crypto::Key;
// `ENOBUFS` ("No buffer space available") is matched by raw errno in
// `classify_send_error`. Its numeric value is not portable: it follows the
// kernel's errno table, which differs between OS families and, on Linux,
// between architectures (MIPS and SPARC Linux do not use the generic table).

/// Linux / Android on architectures that use the generic errno table.
#[cfg(all(
    any(target_os = "linux", target_os = "android"),
    not(any(
        target_arch = "mips",
        target_arch = "mips64",
        target_arch = "sparc",
        target_arch = "sparc64",
    )),
))]
const ENOBUFS: i32 = 105;

/// Linux on MIPS, plus Solaris and illumos, all number `ENOBUFS` as 132.
#[cfg(any(
    all(
        any(target_os = "linux", target_os = "android"),
        any(target_arch = "mips", target_arch = "mips64"),
    ),
    target_os = "solaris",
    target_os = "illumos",
))]
const ENOBUFS: i32 = 132;

/// Apple platforms, the BSDs, and Linux on SPARC number `ENOBUFS` as 55.
#[cfg(any(
    all(
        any(target_os = "linux", target_os = "android"),
        any(target_arch = "sparc", target_arch = "sparc64"),
    ),
    target_os = "macos",
    target_os = "ios",
    target_os = "freebsd",
    target_os = "netbsd",
    target_os = "openbsd",
    target_os = "dragonfly",
))]
const ENOBUFS: i32 = 55;

// Refuse to build rather than silently misclassify ENOBUFS on an unknown target.
#[cfg(not(any(
    target_os = "linux",
    target_os = "android",
    target_os = "macos",
    target_os = "ios",
    target_os = "freebsd",
    target_os = "netbsd",
    target_os = "openbsd",
    target_os = "dragonfly",
    target_os = "solaris",
    target_os = "illumos",
)))]
compile_error!("ENOBUFS value is unknown for this target — add it to the cfg gates above");
/// Maps an error returned by a transport send onto a [`BeatOutcome`].
///
/// Expected, transient conditions become `Dropped` with a [`DropReason`]:
/// - `ENOBUFS` (checked by raw errno, ahead of the kind mapping) and `WouldBlock`
///   mean the kernel queue is full;
/// - `ConnectionRefused` / `NotFound` mean nobody is listening;
/// - `ConnectionReset` / `NotConnected` / `BrokenPipe` mean the peer went away;
/// - `StorageFull` is reported as such.
///
/// Anything else is surfaced as `Failed`, carrying the errno and kind.
pub fn classify_send_error(e: &io::Error) -> BeatOutcome {
    if e.raw_os_error() == Some(ENOBUFS) {
        return BeatOutcome::Dropped(DropReason::KernelQueueFull);
    }
    let reason = match e.kind() {
        io::ErrorKind::WouldBlock => DropReason::KernelQueueFull,
        io::ErrorKind::ConnectionRefused | io::ErrorKind::NotFound => DropReason::NoObserver,
        io::ErrorKind::ConnectionReset
        | io::ErrorKind::NotConnected
        | io::ErrorKind::BrokenPipe => DropReason::PeerGone,
        io::ErrorKind::StorageFull => DropReason::StorageFull,
        _ => return BeatOutcome::Failed(BeatError::from_io(e)),
    };
    BeatOutcome::Dropped(reason)
}
/// A send failure that could not be classified as an expected drop.
///
/// Stores only the raw OS error number and the `io::ErrorKind`, so the value is
/// `Copy` and allocation-free. It can be turned back into an `io::Error` with
/// [`BeatError::to_io_error`].
///
/// `Debug` is derived; it prints `BeatError { errno: .., kind: .. }`.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub struct BeatError {
    /// Raw OS error code, or [`BeatError::UNKNOWN_ERRNO`] when the source error had none.
    pub errno: i32,
    /// The `io::ErrorKind` of the source error.
    pub kind: io::ErrorKind,
}

impl BeatError {
    /// Sentinel stored in `errno` when the source error carried no OS error code.
    pub const UNKNOWN_ERRNO: i32 = 0;

    /// Captures the errno (if any) and kind of `e`.
    pub fn from_io(e: &io::Error) -> Self {
        Self {
            errno: e.raw_os_error().unwrap_or(Self::UNKNOWN_ERRNO),
            kind: e.kind(),
        }
    }

    /// Rebuilds an `io::Error`.
    ///
    /// When the errno is known, the error is built from it, so std re-derives the
    /// kind. Otherwise it is built from `kind` alone.
    pub fn to_io_error(self) -> io::Error {
        if self.errno != Self::UNKNOWN_ERRNO {
            io::Error::from_raw_os_error(self.errno)
        } else {
            io::Error::from(self.kind)
        }
    }
}

impl fmt::Display for BeatError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        if self.errno != Self::UNKNOWN_ERRNO {
            write!(f, "send failed: {:?} (errno={})", self.kind, self.errno)
        } else {
            write!(f, "send failed: {:?}", self.kind)
        }
    }
}

impl std::error::Error for BeatError {}
/// Why a beat was not delivered, when the cause is an expected condition
/// rather than a hard failure.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum DropReason {
    /// Send buffer exhausted (`ENOBUFS`) or the send would block.
    KernelQueueFull,
    /// Nothing is listening at the destination (refused / not found).
    NoObserver,
    /// The connected peer went away (reset / not connected / broken pipe).
    PeerGone,
    /// The send reported `StorageFull`.
    StorageFull,
}

impl fmt::Display for DropReason {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match *self {
            DropReason::KernelQueueFull => "kernel queue full",
            DropReason::NoObserver => "no observer",
            DropReason::PeerGone => "peer gone",
            DropReason::StorageFull => "storage full",
        };
        f.write_str(label)
    }
}

// Compile-time guard: `DropReason` must remain a single byte.
const _: () = assert!(core::mem::size_of::<DropReason>() == 1);
/// Result of a single beat send.
///
/// `#[must_use]` so a dropped or failed beat is not ignored by accident; callers
/// that deliberately don't care can write `let _ = …`.
///
/// All payloads are `Copy + Eq`, so the outcome is too. This lets callers copy it
/// freely and compare with `==` instead of only pattern-matching.
#[must_use]
#[derive(Clone, Copy, PartialEq, Eq)]
pub enum BeatOutcome {
    /// The transport accepted the frame.
    Sent,
    /// The beat was not delivered, for an expected / transient reason.
    Dropped(DropReason),
    /// The send, or a reconnect required before sending, failed with an
    /// unclassified error.
    Failed(BeatError),
}

impl fmt::Debug for BeatOutcome {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Sent => write!(f, "Sent"),
            Self::Dropped(r) => write!(f, "Dropped({r:?})"),
            Self::Failed(e) => write!(f, "Failed({e:?})"),
        }
    }
}

impl fmt::Display for BeatOutcome {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Sent => write!(f, "sent"),
            Self::Dropped(r) => write!(f, "dropped: {r}"),
            Self::Failed(e) => write!(f, "failed: {e}"),
        }
    }
}
/// Heartbeat agent: encodes each beat into a 32-byte frame and sends it over a
/// [`BeatTransport`] (`UdsTransport` by default).
///
/// The agent remembers the process id it connected from. A beat issued from a
/// different pid (i.e. after a fork) first reconnects the transport and restarts
/// the per-process sequencing state.
pub struct Varta<T: BeatTransport = UdsTransport> {
    /// Transport the encoded frames are sent through.
    transport: T,
    /// Scratch buffer each beat's frame is encoded into before sending.
    buf: [u8; 32],
    /// Origin for frame timestamps (nanoseconds elapsed since this instant).
    start: Instant,
    /// Nonce of the most recently encoded frame (0 right after connect / fork recovery).
    nonce: u64,
    /// Number of consecutive beats whose outcome was `Dropped`.
    consecutive_dropped: u32,
    /// Consecutive-drop count that triggers an automatic reconnect; 0 disables it.
    reconnect_after: u32,
    /// Highest timestamp emitted so far; frame timestamps never go below it.
    last_timestamp: u64,
    /// How many times the measured elapsed time was below `last_timestamp`.
    clock_regressions: u64,
    /// Process id that established the current transport connection.
    connect_pid: u32,
    /// Number of successful post-fork transport refreshes performed by `beat`.
    fork_recoveries: u64,
}
// Compile-time check that the default (UDS) agent is `Send`.
const _: () = {
    const fn assert_send<T: Send>() {}
    assert_send::<Varta<UdsTransport>>();
};
impl Varta<UdsTransport> {
    /// Connects an agent to the Unix-domain socket at `path`.
    ///
    /// # Errors
    /// Propagates any error from `UdsTransport::connect`.
    pub fn connect<P: AsRef<Path>>(path: P) -> io::Result<Self> {
        Ok(Self::from_transport(UdsTransport::connect(path)?))
    }
}

#[cfg(feature = "udp")]
impl Varta<UdpTransport> {
    /// Connects an agent that sends plain UDP datagrams to `addr`.
    ///
    /// # Errors
    /// Propagates any error from `UdpTransport::connect`.
    pub fn connect_udp(addr: std::net::SocketAddr) -> io::Result<Self> {
        Ok(Self::from_transport(UdpTransport::connect(addr)?))
    }
}

#[cfg(feature = "secure-udp")]
impl Varta<SecureUdpTransport> {
    /// Connects an agent that sends frames to `addr` through `SecureUdpTransport`,
    /// using `key`.
    ///
    /// # Errors
    /// Propagates any error from `SecureUdpTransport::connect`.
    pub fn connect_secure_udp(addr: std::net::SocketAddr, key: Key) -> io::Result<Self> {
        Ok(Self::from_transport(SecureUdpTransport::connect(addr, key)?))
    }

    /// Like [`Varta::connect_secure_udp`], but builds the transport with
    /// `SecureUdpTransport::connect_with_master` from `master_key`.
    ///
    /// # Errors
    /// Propagates any error from `SecureUdpTransport::connect_with_master`.
    pub fn connect_secure_udp_with_master(
        addr: std::net::SocketAddr,
        master_key: Key,
    ) -> io::Result<Self> {
        Ok(Self::from_transport(SecureUdpTransport::connect_with_master(
            addr, master_key,
        )?))
    }

    #[cfg(any(test, feature = "test-hooks"))]
    pub fn set_iv_counter_for_test(&mut self, value: u32) {
        self.transport.set_iv_counter_for_test(value);
    }

    #[cfg(any(test, feature = "test-hooks"))]
    pub fn iv_prefix_for_test(&self) -> [u8; 8] {
        self.transport.iv_prefix_for_test()
    }

    #[cfg(any(test, feature = "test-hooks"))]
    pub fn iv_prefix_index_for_test(&self) -> u32 {
        self.transport.iv_prefix_index_for_test()
    }

    #[cfg(any(test, feature = "test-hooks"))]
    pub fn iv_counter_for_test(&self) -> u32 {
        self.transport.iv_counter_for_test()
    }
}

impl<T: BeatTransport> Varta<T> {
    /// Wraps an already-connected transport in an agent with fresh state.
    ///
    /// The new agent has nonce 0, zeroed counters, automatic reconnect disabled,
    /// `start` set to now, and `connect_pid` set to the calling process. This is the
    /// single place that initialises every field, so adding a field only needs one edit.
    fn from_transport(transport: T) -> Self {
        Self {
            transport,
            buf: [0u8; 32],
            start: Instant::now(),
            nonce: 0,
            consecutive_dropped: 0,
            reconnect_after: 0,
            last_timestamp: 0,
            clock_regressions: 0,
            connect_pid: std::process::id(),
            fork_recoveries: 0,
        }
    }

    /// Overrides the recorded connect pid so tests can simulate a fork.
    #[cfg(any(test, feature = "test-hooks"))]
    pub fn set_connect_pid_for_test(&mut self, pid: u32) {
        self.connect_pid = pid;
    }
}
/// Writes a one-line warning to stderr when the beat nonce space is exhausted.
///
/// Best-effort: a failing stderr write is deliberately ignored so it can never
/// disturb the beat path. `#[cold]` because this only runs on a nonce wrap.
#[cold]
fn warn_nonce_wrapping() {
    const MESSAGE: &[u8] = b"[varta-client] nonce exhausted; wrapping to 0\n";
    let mut stderr = io::stderr();
    let _ = stderr.write_all(MESSAGE);
}
impl<T: BeatTransport> Varta<T> {
    /// Sends the frame currently encoded in `self.buf` and classifies the result.
    fn send_frame(&mut self) -> BeatOutcome {
        match self.transport.send(&self.buf) {
            Ok(_) => BeatOutcome::Sent,
            Err(e) => classify_send_error(&e),
        }
    }

    /// Resets per-process sequencing state after the transport was refreshed in a
    /// new process `pid` (a fork was detected).
    ///
    /// Shared by `beat`'s automatic recovery and by a manual `reconnect`, so both
    /// paths leave the agent in the same state.
    fn complete_fork_recovery(&mut self, pid: u32) {
        self.connect_pid = pid;
        self.fork_recoveries = self.fork_recoveries.saturating_add(1);
        self.nonce = 0;
        self.start = Instant::now();
        self.last_timestamp = 0;
        self.consecutive_dropped = 0;
    }

    /// Advances the nonce for the next frame.
    ///
    /// On reaching `NONCE_TERMINAL - 1` it wraps to 0 (with a stderr warning)
    /// instead of ever producing the `NONCE_TERMINAL` sentinel.
    fn advance_nonce(&mut self) {
        if self.nonce < NONCE_TERMINAL - 1 {
            self.nonce += 1;
        } else {
            warn_nonce_wrapping();
            self.nonce = 0;
        }
    }

    /// Returns the frame timestamp: nanoseconds elapsed since `start`, saturated to
    /// `u64` and clamped so it never drops below a previously returned value.
    ///
    /// Each time the raw reading is below the last timestamp, `clock_regressions`
    /// is incremented.
    fn next_timestamp(&mut self) -> u64 {
        let raw_elapsed = self.start.elapsed().as_nanos().min(u64::MAX as u128) as u64;
        if raw_elapsed < self.last_timestamp {
            self.clock_regressions = self.clock_regressions.saturating_add(1);
        }
        self.last_timestamp = self.last_timestamp.max(raw_elapsed);
        self.last_timestamp
    }

    /// Encodes and sends one heartbeat carrying `status` and `payload`.
    ///
    /// The steps run in this order:
    /// 1. If the current pid differs from the one that connected (a fork happened),
    ///    the transport is reconnected and per-process state is reset. If that
    ///    reconnect fails, [`BeatOutcome::Failed`] is returned and nothing is sent.
    /// 2. The nonce is advanced (wrapping to 0 before `NONCE_TERMINAL`).
    /// 3. The timestamp is taken from the monotonic, never-decreasing clock.
    /// 4. The frame is sent. On `Dropped`, once `reconnect_after` (if non-zero)
    ///    consecutive drops have accumulated, the transport is reconnected and the
    ///    same frame is re-sent once; the retry's outcome is returned.
    pub fn beat(&mut self, status: Status, payload: u32) -> BeatOutcome {
        let pid = std::process::id();
        if pid != self.connect_pid {
            if let Err(e) = self.transport.reconnect() {
                return BeatOutcome::Failed(BeatError::from_io(&e));
            }
            self.complete_fork_recovery(pid);
        }

        self.advance_nonce();
        let timestamp = self.next_timestamp();
        debug_assert!(
            self.nonce != NONCE_TERMINAL,
            "regular beat nonce must not equal NONCE_TERMINAL sentinel"
        );
        let frame = Frame::new(status, pid, timestamp, self.nonce, payload);
        frame.encode(&mut self.buf);

        let outcome = self.send_frame();
        if !matches!(&outcome, BeatOutcome::Dropped(_)) {
            self.consecutive_dropped = 0;
            return outcome;
        }

        self.consecutive_dropped = self.consecutive_dropped.saturating_add(1);
        let threshold_hit =
            self.reconnect_after > 0 && self.consecutive_dropped >= self.reconnect_after;
        // A reconnect is only attempted at the threshold. If it fails, the incremented
        // drop count is kept so the next drop tries again.
        if !threshold_hit || self.transport.reconnect().is_err() {
            return outcome;
        }

        let retry = self.send_frame();
        // Still dropping after a fresh connection: pin the count at the threshold so
        // the very next drop triggers another reconnect attempt.
        self.consecutive_dropped = if matches!(&retry, BeatOutcome::Dropped(_)) {
            self.reconnect_after
        } else {
            0
        };
        retry
    }

    /// Re-establishes the transport connection on demand.
    ///
    /// If called from a different process than the one that connected (e.g. in a
    /// forked child), this also applies the same state reset as `beat`'s automatic
    /// fork recovery: nonce, clock origin, timestamp floor and drop count restart, and
    /// `fork_recoveries` is incremented. Without this, a manual reconnect would mark
    /// the fork as handled while the child kept the parent's nonce sequence.
    ///
    /// # Errors
    /// Returns the transport's reconnect error; the agent's own bookkeeping is
    /// then left unchanged.
    pub fn reconnect(&mut self) -> io::Result<()> {
        self.transport.reconnect()?;
        let pid = std::process::id();
        if pid != self.connect_pid {
            self.complete_fork_recovery(pid);
        }
        Ok(())
    }

    /// Sets how many consecutive `Dropped` beats trigger an automatic reconnect
    /// (0 disables it) and resets the current drop count.
    pub fn set_reconnect_after(&mut self, n: u32) {
        self.reconnect_after = n;
        self.consecutive_dropped = 0;
    }

    /// Number of times the measured elapsed time went backwards relative to the
    /// last emitted timestamp.
    pub fn clock_regressions(&self) -> u64 {
        self.clock_regressions
    }

    /// Number of successful post-fork transport refreshes.
    pub fn fork_recoveries(&self) -> u64 {
        self.fork_recoveries
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::os::unix::fs::PermissionsExt;
    use std::os::unix::net::UnixDatagram;
    use std::time::{SystemTime, UNIX_EPOCH};

    /// A bound UDS datagram listener living in its own temp directory.
    ///
    /// Dropping it removes the socket file and then the directory. Because this
    /// happens in `Drop`, cleanup also runs when a test panics part-way through.
    struct TempListener {
        // Kept alive so the socket stays bound for the whole test.
        _listener: UnixDatagram,
        path: std::path::PathBuf,
    }

    impl Drop for TempListener {
        fn drop(&mut self) {
            // Best-effort: a leftover temp entry must not cause a second panic.
            let _ = std::fs::remove_file(&self.path);
            if let Some(dir) = self.path.parent() {
                let _ = std::fs::remove_dir(dir);
            }
        }
    }

    /// Binds a datagram listener at `<unique tempdir>/varta.sock`.
    ///
    /// The directory name combines the pid, wall-clock nanoseconds and a
    /// process-wide counter, so concurrently running tests never collide.
    fn bind_listener() -> TempListener {
        use std::sync::atomic::{AtomicU64, Ordering};
        static TEMPDIR_COUNTER: AtomicU64 = AtomicU64::new(0);
        let counter = TEMPDIR_COUNTER.fetch_add(1, Ordering::Relaxed);
        let dir = std::env::temp_dir().join(format!(
            "varta-clock-{}-{}-{}",
            std::process::id(),
            SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .map(|d| d.as_nanos())
                .unwrap_or(0),
            counter
        ));
        std::fs::create_dir(&dir).expect("create tempdir");
        std::fs::set_permissions(&dir, std::fs::Permissions::from_mode(0o755))
            .expect("chmod 0o755");
        let sock_path = dir.join("varta.sock");
        let listener = UnixDatagram::bind(&sock_path).expect("bind listener");
        TempListener {
            _listener: listener,
            path: sock_path,
        }
    }

    #[test]
    fn clock_regression_counter_stays_zero_on_forward_clock() {
        let sock = bind_listener();
        let mut agent = Varta::connect(&sock.path).expect("connect");
        let _ = agent.beat(Status::Ok, 0);
        let _ = agent.beat(Status::Ok, 0);
        assert_eq!(
            agent.clock_regressions(),
            0,
            "no regression should be observed on a forward clock"
        );
    }

    #[test]
    fn clock_regression_counter_increments_on_backwards_clock() {
        let sock = bind_listener();
        let mut agent = Varta::connect(&sock.path).expect("connect");
        // Push the timestamp floor far ahead so every real reading looks like a regression.
        agent.last_timestamp = u64::MAX / 2;
        let baseline_ts = agent.last_timestamp;
        let _ = agent.beat(Status::Ok, 0);
        assert_eq!(agent.clock_regressions(), 1);
        assert_eq!(agent.last_timestamp, baseline_ts);
        let _ = agent.beat(Status::Ok, 0);
        assert_eq!(
            agent.clock_regressions(),
            2,
            "counter must accumulate across consecutive regressions"
        );
        assert_eq!(agent.last_timestamp, baseline_ts);
    }

    #[test]
    fn same_pid_does_not_trigger_fork_recovery() {
        let sock = bind_listener();
        let mut agent = Varta::connect(&sock.path).expect("connect");
        for _ in 0..64 {
            let _ = agent.beat(Status::Ok, 0);
        }
        assert_eq!(
            agent.fork_recoveries(),
            0,
            "no fork-recovery should fire in a single-process beat loop"
        );
    }

    #[test]
    fn spoofed_fork_triggers_uds_transport_reconnect() {
        let sock = bind_listener();
        let mut agent = Varta::connect(&sock.path).expect("connect");
        let _ = agent.beat(Status::Ok, 0);
        let _ = agent.beat(Status::Ok, 0);
        assert_eq!(agent.fork_recoveries(), 0);
        assert_eq!(agent.nonce, 2);
        let real_pid = std::process::id();
        agent.set_connect_pid_for_test(real_pid.wrapping_add(1));
        let _ = agent.beat(Status::Ok, 0);
        assert_eq!(
            agent.fork_recoveries(),
            1,
            "fork-recovery counter must increment exactly once"
        );
        assert_eq!(
            agent.connect_pid, real_pid,
            "connect_pid must be refreshed to the current pid"
        );
        assert_eq!(
            agent.nonce, 1,
            "nonce must reset to 0 on recovery, then increment to 1 for the beat"
        );
    }

    /// Transport whose sends always succeed but whose reconnect always fails.
    struct AlwaysFailReconnect {
        sent: u32,
    }

    impl BeatTransport for AlwaysFailReconnect {
        fn send(&mut self, _buf: &[u8; 32]) -> io::Result<usize> {
            self.sent = self.sent.saturating_add(1);
            Ok(32)
        }
        fn reconnect(&mut self) -> io::Result<()> {
            Err(io::Error::from(io::ErrorKind::PermissionDenied))
        }
    }

    /// Builds an agent around an arbitrary test transport with freshly-connected state.
    fn varta_with_transport<T: BeatTransport>(transport: T) -> Varta<T> {
        Varta {
            transport,
            buf: [0u8; 32],
            start: Instant::now(),
            nonce: 0,
            consecutive_dropped: 0,
            reconnect_after: 0,
            last_timestamp: 0,
            clock_regressions: 0,
            connect_pid: std::process::id(),
            fork_recoveries: 0,
        }
    }

    #[test]
    fn fork_recovery_with_failing_reconnect_returns_failed() {
        let mut agent = varta_with_transport(AlwaysFailReconnect { sent: 0 });
        agent.set_connect_pid_for_test(std::process::id().wrapping_add(1));
        let outcome = agent.beat(Status::Ok, 0);
        match outcome {
            BeatOutcome::Failed(e) => {
                assert_eq!(e.kind, io::ErrorKind::PermissionDenied);
            }
            other => panic!("expected Failed on reconnect failure, got {other:?}"),
        }
        assert_eq!(
            agent.fork_recoveries(),
            0,
            "counter must NOT increment when the recovery transport refresh fails"
        );
        assert_eq!(
            agent.transport.sent, 0,
            "no frame should be sent when fork-recovery fails"
        );
    }

    #[cfg(feature = "secure-udp")]
    #[test]
    fn spoofed_fork_rotates_secure_udp_session_salt() {
        use std::net::{Ipv6Addr, SocketAddr, SocketAddrV6};
        use varta_vlp::crypto::Key;
        let addr = SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::LOCALHOST, 9876, 0, 0));
        let key = Key::from_bytes([0x42; 32]);
        let mut agent = Varta::connect_secure_udp(addr, key).expect("connect");
        let prefix_before = agent.iv_prefix_for_test();
        agent.set_connect_pid_for_test(std::process::id().wrapping_add(1));
        let _ = agent.beat(Status::Ok, 0);
        assert_eq!(
            agent.fork_recoveries(),
            1,
            "secure-UDP fork-recovery counter must increment"
        );
        let prefix_after = agent.iv_prefix_for_test();
        assert_ne!(
            prefix_before, prefix_after,
            "IV prefix must rotate on fork-recovery (defeats nonce reuse)"
        );
        assert_eq!(
            agent.iv_prefix_index_for_test(),
            0,
            "prefix_index must reset to 0 on transport.reconnect()"
        );
    }
}