use std::io::{self, ErrorKind};
use std::os::unix::fs::PermissionsExt;
use std::os::unix::io::AsRawFd;
use std::os::unix::net::UnixDatagram;
use std::path::{Path, PathBuf};
use std::time::{Duration, Instant};
use varta_vlp::{DecodeError, Frame, Status};
use crate::peer_cred::{self, RecvResult};
use crate::tracker::{Tracker, Update, CAPACITY};
/// Receive timeout installed on the observer socket (100 ms): a single receive in
/// `Observer::poll` waits at most this long before reporting "nothing arrived".
const READ_TIMEOUT: Duration = Duration::from_micros(100_000);
/// An occurrence reported by [`Observer::poll`], one per call.
///
/// Timestamps named `observer_ns` (and the trailing `u64` of the tuple variants) come from the
/// observer's own clock: nanoseconds elapsed since the observer finished binding, saturating
/// at `u64::MAX`.
#[derive(Debug)]
pub enum Event {
/// A decoded frame that the tracker accepted as a new or refreshed heartbeat.
Beat {
/// Process id claimed in the frame (checked against the sender on Linux only).
pid: u32,
/// Status field copied from the frame.
status: Status,
/// Payload field copied from the frame.
payload: u64,
/// Nonce copied from the frame.
nonce: u64,
/// Observer-clock time at which the frame was received.
observer_ns: u64,
},
/// A pid returned by the tracker's stall scan whose stall had not been reported yet.
Stall {
/// The stalled process id.
pid: u32,
/// Nonce of the last heartbeat the tracker recorded for this pid.
last_nonce: u64,
/// The tracker's timestamp of that last heartbeat: presumably observer-clock nanoseconds,
/// since `Tracker::record` is fed the observer clock. TODO confirm in `tracker`.
last_ns: u64,
/// Observer-clock time of the scan that detected the stall.
observer_ns: u64,
},
/// An authenticated datagram failed `Frame::decode`; carries the decode error and the
/// observer-clock time of receipt.
Decode(DecodeError, u64),
/// (Linux only) The pid claimed in a frame differs from the sender pid reported by
/// `peer_cred::recv_authenticated`; such a frame is not recorded.
AuthFailure {
/// The pid the frame claimed to come from.
claimed_pid: u32,
/// Observer-clock time of receipt.
observer_ns: u64,
},
/// `peer_cred::recv_authenticated` returned an I/O error; carries the error and the
/// observer-clock time at which it was seen.
Io(io::Error, u64),
}
/// Receives heartbeat datagrams on a bound Unix datagram socket and turns them into
/// [`Event`]s via [`Observer::poll`].
pub struct Observer {
    /// Bound datagram socket, configured with a `READ_TIMEOUT` receive timeout.
    sock: UnixDatagram,
    /// Filesystem path the socket is bound to.
    path: PathBuf,
    /// Device number of the socket file right after binding. Together with `bound_ino`, it
    /// identifies the socket file this observer created.
    bound_dev: u64,
    /// Inode number of the socket file right after binding.
    bound_ino: u64,
    /// Heartbeat bookkeeping per pid (see `Tracker`).
    tracker: Tracker,
    /// Stall threshold in nanoseconds, saturated from the `Duration` given to `bind`.
    threshold_ns: u64,
    /// Origin of the observer clock returned by `now_ns`.
    start: Instant,
    /// Stall events from the most recent scan; each slot is taken when handed out.
    stall_queue: Vec<Option<Event>>,
    /// Scratch `(pid, last_nonce, last_ns)` list reused across scans, preallocated to
    /// `CAPACITY` entries.
    stall_pending: Vec<(u32, u64, u64)>,
    /// Index of the next `stall_queue` slot to hand out.
    stall_cursor: usize,
    /// Observer-clock time of the most recent stall scan.
    last_scan_ns: u64,
}

impl Observer {
    /// Binds a heartbeat observer to the Unix datagram socket at `path`.
    ///
    /// `threshold` is passed to the tracker as the stall threshold. `socket_mode` is applied
    /// to the socket file with `chmod` right after binding.
    ///
    /// If `path` already holds a socket that nobody is receiving on (e.g. left behind by a
    /// crashed observer), that socket file is removed and the path is bound again.
    ///
    /// # Errors
    ///
    /// * `AddrInUse` if another process is receiving on `path`.
    /// * `AddrInUse` if `path` exists but is not a socket. The entry is never deleted in
    ///   that case.
    /// * The probe's error kind, with the path added as context, if the liveness of an
    ///   existing socket cannot be determined (e.g. permission denied).
    /// * Any error from binding, `chmod`, or socket configuration. A socket file created by
    ///   this call is removed again before such an error is returned.
    pub fn bind(path: impl AsRef<Path>, threshold: Duration, socket_mode: u32) -> io::Result<Self> {
        let path = path.as_ref();
        let sock = match UnixDatagram::bind(path) {
            Ok(sock) => sock,
            Err(e) if e.kind() == ErrorKind::AddrInUse => {
                Self::reclaim_stale_path(path)?;
                UnixDatagram::bind(path)?
            }
            Err(e) => return Err(e),
        };
        Self::configure_new_socket(sock, path, threshold, socket_mode)
    }

    /// Clears a pre-existing filesystem entry at `path` so it can be bound again.
    ///
    /// Only a socket that no process is receiving on is removed. A live socket, or anything
    /// that is not a socket (regular file, directory, symlink), yields an error and is left
    /// untouched. An entry that disappears concurrently counts as already cleared.
    fn reclaim_stale_path(path: &Path) -> io::Result<()> {
        use std::os::unix::fs::FileTypeExt;

        let meta = match std::fs::symlink_metadata(path) {
            Ok(meta) => meta,
            Err(e) if e.kind() == ErrorKind::NotFound => return Ok(()),
            Err(e) => return Err(e),
        };
        // `bind` reports AddrInUse for *any* existing entry, and connecting to a non-socket
        // fails like a dead socket does. Without this check, a misconfigured path pointing
        // at a regular file would be deleted.
        if !meta.file_type().is_socket() {
            return Err(io::Error::new(
                ErrorKind::AddrInUse,
                format!(
                    "{} exists and is not a socket; refusing to remove it",
                    path.display()
                ),
            ));
        }

        match probe_live(path) {
            Ok(true) => Err(io::Error::new(
                ErrorKind::AddrInUse,
                format!(
                    "another varta-watch is already running at {}",
                    path.display()
                ),
            )),
            Ok(false) => match std::fs::remove_file(path) {
                // Someone else removed it first; the path is free either way.
                Err(e) if e.kind() == ErrorKind::NotFound => Ok(()),
                other => other,
            },
            Err(e) => Err(io::Error::new(
                e.kind(),
                format!("cannot probe socket at {}: {e}", path.display()),
            )),
        }
    }

    /// Applies `socket_mode` to a freshly bound socket file and builds the observer.
    ///
    /// If anything fails, the socket file is unlinked: this call just created it, and no
    /// `Observer` exists yet whose `Drop` could clean it up.
    fn configure_new_socket(
        sock: UnixDatagram,
        path: &Path,
        threshold: Duration,
        socket_mode: u32,
    ) -> io::Result<Self> {
        let result = std::fs::set_permissions(path, std::fs::Permissions::from_mode(socket_mode))
            .and_then(|()| Self::finish_bind(sock, threshold, path.to_path_buf()));
        if result.is_err() {
            let _ = std::fs::remove_file(path);
        }
        result
    }

    /// Returns the next event, or `None` if this call produced nothing reportable.
    ///
    /// Stall events queued by an earlier scan are handed out first, one per call. Otherwise,
    /// a stall scan runs if at least `READ_TIMEOUT` has passed since the previous one. Then
    /// one datagram is received, waiting at most `READ_TIMEOUT`; a receive timeout also
    /// triggers a scan.
    ///
    /// `None` is returned for short reads, for beats the tracker classifies as out of order
    /// or over capacity, and for timeouts that yield no new stalls.
    pub fn poll(&mut self) -> Option<Event> {
        if let Some(stall) = self.next_queued_stall() {
            return Some(stall);
        }
        // Under sustained traffic `recv` never times out. A scan tied only to `WouldBlock`
        // would therefore never run, and stalled pids would go unreported.
        if self.stall_scan_due() {
            self.drain_stalls();
            if let Some(stall) = self.next_queued_stall() {
                return Some(stall);
            }
        }
        match peer_cred::recv_authenticated(self.sock.as_raw_fd()) {
            RecvResult::Authenticated { peer_pid, data } => {
                let now_ns = self.now_ns();
                let frame = match Frame::decode(&data) {
                    Ok(frame) => frame,
                    Err(e) => return Some(Event::Decode(e, now_ns)),
                };
                // The sender pid reported by `recv_authenticated` is only compared on Linux;
                // elsewhere the pid claimed in the frame is trusted as-is.
                #[cfg(target_os = "linux")]
                if frame.pid != peer_pid {
                    return Some(Event::AuthFailure {
                        claimed_pid: frame.pid,
                        observer_ns: now_ns,
                    });
                }
                #[cfg(not(target_os = "linux"))]
                let _ = peer_pid;
                match self.tracker.record(&frame, now_ns, self.threshold_ns) {
                    Update::Inserted | Update::Refreshed => Some(Event::Beat {
                        pid: frame.pid,
                        status: frame.status,
                        payload: frame.payload,
                        nonce: frame.nonce,
                        observer_ns: now_ns,
                    }),
                    Update::OutOfOrder | Update::CapacityExceeded => None,
                }
            }
            RecvResult::WouldBlock => {
                self.drain_stalls();
                self.next_queued_stall()
            }
            RecvResult::ShortRead => None,
            RecvResult::IoError(e) => Some(Event::Io(e, self.now_ns())),
        }
    }

    /// Hands out the next not-yet-returned event of the current stall batch, if any.
    fn next_queued_stall(&mut self) -> Option<Event> {
        let slot = self.stall_queue.get_mut(self.stall_cursor)?;
        self.stall_cursor += 1;
        slot.take()
    }

    /// Whether at least `READ_TIMEOUT` of observer time has passed since the last stall scan.
    fn stall_scan_due(&self) -> bool {
        let interval_ns = READ_TIMEOUT.as_nanos().min(u64::MAX as u128) as u64;
        self.now_ns().saturating_sub(self.last_scan_ns) >= interval_ns
    }

    /// Observer clock: nanoseconds since `start`, saturating at `u64::MAX`.
    fn now_ns(&self) -> u64 {
        let elapsed = self.start.elapsed().as_nanos();
        elapsed.min(u64::MAX as u128) as u64
    }

    /// Rebuilds the stall queue from the tracker's stalled slots.
    ///
    /// Asks the tracker for stalled slots (`iter_stalled` with the current time and
    /// `threshold_ns`) and skips those with `stall_emitted` already set. It queues one
    /// [`Event::Stall`] per remaining pid and marks each with `Tracker::mark_stall_emitted`.
    ///
    /// The previous queue is discarded, so callers must only invoke this once
    /// `next_queued_stall` has returned `None`.
    fn drain_stalls(&mut self) {
        let now_ns = self.now_ns();
        self.last_scan_ns = now_ns;
        self.stall_queue.clear();
        self.stall_cursor = 0;
        // Collect first, so the tracker is no longer borrowed by `iter_stalled` when
        // `mark_stall_emitted` runs below.
        self.stall_pending.clear();
        for slot in self
            .tracker
            .iter_stalled(now_ns, self.threshold_ns)
            .filter(|slot| !slot.stall_emitted)
        {
            self.stall_pending
                .push((slot.pid, slot.last_nonce, slot.last_ns));
        }
        for &(pid, last_nonce, last_ns) in &self.stall_pending {
            self.stall_queue.push(Some(Event::Stall {
                pid,
                last_nonce,
                last_ns,
                observer_ns: now_ns,
            }));
            self.tracker.mark_stall_emitted(pid);
        }
    }

    /// Forwards to `Tracker::take_evictions`; presumably returns the number of evictions
    /// since the previous call.
    pub fn drain_evictions(&mut self) -> u64 {
        self.tracker.take_evictions()
    }

    /// Forwards to `Tracker::take_capacity_exceeded`; presumably returns the number of
    /// beats rejected for capacity since the previous call.
    pub fn drain_capacity_exceeded(&mut self) -> u64 {
        self.tracker.take_capacity_exceeded()
    }

    /// Configures a freshly bound socket and builds the observer.
    ///
    /// Sets the `READ_TIMEOUT` receive timeout, enables credential passing, and records the
    /// device and inode of the socket file. Errors are propagated unchanged.
    fn finish_bind(sock: UnixDatagram, threshold: Duration, path: PathBuf) -> io::Result<Self> {
        use std::os::unix::fs::MetadataExt;
        sock.set_read_timeout(Some(READ_TIMEOUT))?;
        let raw_fd = sock.as_raw_fd();
        peer_cred::enable_credential_passing(raw_fd)?;
        let threshold_ns = threshold.as_nanos().min(u64::MAX as u128) as u64;
        let meta = std::fs::metadata(&path)?;
        let bound_dev = meta.dev();
        let bound_ino = meta.ino();
        Ok(Observer {
            sock,
            path,
            bound_dev,
            bound_ino,
            tracker: Tracker::new(),
            threshold_ns,
            start: Instant::now(),
            stall_queue: Vec::new(),
            stall_pending: Vec::with_capacity(CAPACITY),
            stall_cursor: 0,
            last_scan_ns: 0,
        })
    }
}
/// Reports whether some process is currently receiving on the Unix datagram socket at `path`.
///
/// Connects an unbound datagram socket to `path` and sends one zero-length datagram.
///
/// Returns:
/// * `Ok(true)` if the connect succeeded and the datagram was either accepted or refused
///   only because the peer's receive queue is full (`WouldBlock`). The peer is alive, just
///   busy.
/// * `Ok(false)` for any other connect or send failure. `Observer::bind` treats this as a
///   stale socket and removes it.
/// * `Err` for permission errors, so an entry we may not talk to is never reported stale.
///
/// The probe socket is non-blocking, so a live but backlogged peer cannot hang the caller.
fn probe_live(path: &Path) -> io::Result<bool> {
    let probe = UnixDatagram::unbound()?;
    // A blocking `send` to a connected peer with a full receive queue would wait forever.
    probe.set_nonblocking(true)?;
    if let Err(e) = probe.connect(path) {
        return match e.kind() {
            ErrorKind::PermissionDenied => Err(e),
            // e.g. ConnectionRefused or NotFound: nobody is receiving there.
            _ => Ok(false),
        };
    }
    match probe.send(&[]) {
        Ok(_) => Ok(true),
        Err(e) if e.kind() == ErrorKind::WouldBlock => Ok(true),
        Err(e) if e.kind() == ErrorKind::PermissionDenied => Err(e),
        Err(_) => Ok(false),
    }
}
impl Drop for Observer {
    /// Unlinks the socket file at `path`, but only if it still has the device and inode
    /// numbers recorded at bind time.
    ///
    /// A file that another process re-created at the same path is left alone. Failures
    /// (missing file, failed unlink) are ignored.
    fn drop(&mut self) {
        use std::os::unix::fs::MetadataExt;

        let still_ours = std::fs::metadata(&self.path)
            .map(|meta| meta.dev() == self.bound_dev && meta.ino() == self.bound_ino)
            .unwrap_or(false);
        if still_ours {
            // Best effort: `drop` has no way to report a failure.
            let _ = std::fs::remove_file(&self.path);
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::Duration;

    static TEST_COUNTER: AtomicU64 = AtomicU64::new(0);

    /// Returns a socket path in the temp dir that is unique to this process and call, with
    /// any leftover entry removed.
    fn unique_sock_path() -> PathBuf {
        let n = TEST_COUNTER.fetch_add(1, Ordering::Relaxed);
        let mut p = std::env::temp_dir();
        p.push(format!(
            "varta-observer-drop-{}-{}.sock",
            std::process::id(),
            n
        ));
        let _ = std::fs::remove_file(&p);
        p
    }

    #[test]
    fn drop_unlinks_bound_socket() {
        let path = unique_sock_path();
        let obs = Observer::bind(&path, Duration::from_secs(1), 0o600)
            .expect("bind should succeed on a clean temp path");
        assert!(path.exists(), "socket file must exist after bind");
        drop(obs);
        assert!(!path.exists(), "socket file must be removed after drop");
    }

    #[test]
    fn bind_reclaims_stale_socket() {
        let path = unique_sock_path();
        // Bind and immediately close a plain socket: the file stays behind with no receiver.
        drop(UnixDatagram::bind(&path).expect("plain bind on a clean temp path"));
        assert!(path.exists(), "stale socket file must remain after its socket closes");
        let obs = Observer::bind(&path, Duration::from_secs(1), 0o600)
            .expect("bind must reclaim a socket nobody is receiving on");
        assert!(path.exists(), "socket file must exist after reclaiming bind");
        drop(obs);
        assert!(!path.exists(), "socket file must be removed after drop");
    }

    #[test]
    fn bind_refuses_live_socket() {
        let path = unique_sock_path();
        let first = Observer::bind(&path, Duration::from_secs(1), 0o600)
            .expect("first bind should succeed on a clean temp path");
        let err = Observer::bind(&path, Duration::from_secs(1), 0o600)
            .err()
            .expect("second bind must fail while the first observer is alive");
        assert_eq!(err.kind(), ErrorKind::AddrInUse);
        assert!(path.exists(), "the live observer's socket must not be removed");
        drop(first);
        assert!(!path.exists(), "socket file must be removed after drop");
    }

    #[test]
    fn drop_leaves_replacement_socket_alone() {
        let path = unique_sock_path();
        let obs = Observer::bind(&path, Duration::from_secs(1), 0o600)
            .expect("bind should succeed on a clean temp path");
        std::fs::remove_file(&path).expect("unlink the observer's socket file");
        let replacement = UnixDatagram::bind(&path).expect("bind a replacement socket");
        drop(obs);
        assert!(path.exists(), "drop must not unlink a socket it did not create");
        drop(replacement);
        let _ = std::fs::remove_file(&path);
    }
}