use std::io;
use std::os::unix::net::UnixDatagram;
use std::path::{Path, PathBuf};
use std::time::Instant;
use varta_vlp::{Frame, Status, NONCE_TERMINAL};
#[cfg(target_os = "linux")]
const ENOBUFS: i32 = 105;
#[cfg(any(
target_os = "macos",
target_os = "ios",
target_os = "freebsd",
target_os = "netbsd",
target_os = "openbsd",
target_os = "dragonfly",
))]
const ENOBUFS: i32 = 55;
pub fn classify_send_error(e: &io::Error) -> BeatOutcome {
if let Some(code) = e.raw_os_error() {
if code == ENOBUFS {
return BeatOutcome::Dropped;
}
}
match e.kind() {
io::ErrorKind::WouldBlock
| io::ErrorKind::ConnectionRefused
| io::ErrorKind::ConnectionReset
| io::ErrorKind::NotFound
| io::ErrorKind::NotConnected
| io::ErrorKind::BrokenPipe
| io::ErrorKind::OutOfMemory
| io::ErrorKind::StorageFull => BeatOutcome::Dropped,
_ => {
let cloned = match e.raw_os_error() {
Some(code) => io::Error::from_raw_os_error(code),
None => io::Error::from(e.kind()),
};
BeatOutcome::Failed(cloned)
}
}
}
#[derive(Debug)]
pub enum BeatOutcome {
Sent,
Dropped,
Failed(io::Error),
}
pub struct Varta {
sock: UnixDatagram,
buf: [u8; 32],
start: Instant,
nonce: u64,
path: PathBuf,
consecutive_dropped: u32,
reconnect_after: u32,
}
impl Varta {
pub fn connect<P: AsRef<Path>>(path: P) -> io::Result<Self> {
let path = path.as_ref().to_path_buf();
let sock = UnixDatagram::unbound()?;
sock.connect(&path)?;
sock.set_nonblocking(true)?;
Ok(Self {
sock,
buf: [0u8; 32],
start: Instant::now(),
nonce: 0,
path,
consecutive_dropped: 0,
reconnect_after: 0,
})
}
fn send_frame(&mut self) -> BeatOutcome {
match self.sock.send(&self.buf) {
Ok(_) => BeatOutcome::Sent,
Err(e) => classify_send_error(&e),
}
}
pub fn beat(&mut self, status: Status, payload: u64) -> BeatOutcome {
self.nonce = self.nonce.saturating_add(1).min(NONCE_TERMINAL - 1);
let timestamp = self.start.elapsed().as_nanos() as u64;
let frame = Frame::new(status, std::process::id(), timestamp, self.nonce, payload);
frame.encode(&mut self.buf);
let outcome = self.send_frame();
match &outcome {
BeatOutcome::Dropped => {
self.consecutive_dropped += 1;
if self.reconnect_after > 0
&& self.consecutive_dropped >= self.reconnect_after
&& self.reconnect().is_ok()
{
return self.send_frame();
}
outcome
}
_ => {
self.consecutive_dropped = 0;
outcome
}
}
}
pub fn reconnect(&mut self) -> io::Result<()> {
let sock = UnixDatagram::unbound()?;
sock.connect(&self.path)?;
sock.set_nonblocking(true)?;
self.sock = sock;
self.consecutive_dropped = 0;
Ok(())
}
pub fn set_reconnect_after(&mut self, n: u32) {
self.reconnect_after = n;
}
}