use std::collections::HashMap;
use std::fmt::Write as _;
use std::fs::{File, OpenOptions};
use std::io::{self, BufWriter, ErrorKind, Read, Write};
use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream};
use std::path::Path;
use std::time::{Duration, Instant};
use varta_vlp::{DecodeError, Status};
use crate::observer::Event;
/// A sink for observer events.
///
/// `record` has no return value, so implementations cannot report a failure
/// from it directly; `flush` is the fallible call that returns an
/// `io::Result`.
pub trait Exporter {
/// Consumes one observer event.
fn record(&mut self, ev: &Event);
/// Flushes the exporter and returns any I/O error it has to report.
fn flush(&mut self) -> io::Result<()>;
}
/// Exporter that writes one tab-separated text line per event to a file.
pub struct FileExporter {
// Buffered handle to the output file.
sink: BufWriter<File>,
// Write error captured by `record`; held here until `flush` returns it.
pending_err: Option<io::Error>,
}
impl FileExporter {
pub fn create(path: impl AsRef<Path>) -> io::Result<Self> {
let file = OpenOptions::new()
.create(true)
.append(true)
.open(path.as_ref())?;
Ok(FileExporter {
sink: BufWriter::new(file),
pending_err: None,
})
}
}
impl Exporter for FileExporter {
    /// Renders `ev` as one tab-separated line and hands it to the buffered
    /// writer.
    ///
    /// A write error is stored instead of returned. While an error is stored,
    /// further events are dropped until `flush` takes it.
    fn record(&mut self, ev: &Event) {
        if self.pending_err.is_none() {
            // Columns: timestamp, record type, pid, nonce, status, detail.
            // Columns that do not apply to a record type are written as "-".
            let rendered = match ev {
                Event::Beat {
                    pid,
                    status,
                    payload,
                    nonce,
                    observer_ns,
                } => {
                    let label = status_label(*status);
                    format!("{observer_ns}\tbeat\t{pid}\t{nonce}\t{label}\t{payload}\n")
                }
                Event::Stall {
                    pid,
                    last_nonce,
                    observer_ns,
                    ..
                } => format!("{observer_ns}\tstall\t{pid}\t{last_nonce}\tstall\t-\n"),
                Event::Decode(err, observer_ns) => {
                    format!("{observer_ns}\tdecode\t-\t-\t-\t{err:?}\n")
                }
                Event::Io(err, observer_ns) => format!("{observer_ns}\tio\t-\t-\t-\t{err}\n"),
                Event::AuthFailure {
                    claimed_pid,
                    observer_ns,
                } => format!("{observer_ns}\tmismatch\t{claimed_pid}\t-\t-\tauth_failure\n"),
            };
            // The whole line goes to the writer in a single call.
            // No error is stored at this point, so assigning `None` on
            // success changes nothing.
            self.pending_err = self.sink.write_all(rendered.as_bytes()).err();
        }
    }

    /// Flushes the buffered writer and reports the outcome.
    ///
    /// # Errors
    ///
    /// Returns the error stored by `record` if there is one (taking it, so
    /// recording resumes afterwards); otherwise returns the result of
    /// flushing the writer. The flush is always attempted.
    fn flush(&mut self) -> io::Result<()> {
        let flushed = self.sink.flush();
        match self.pending_err.take() {
            Some(deferred) => Err(deferred),
            None => flushed,
        }
    }
}
/// Returns the lowercase text label for a status.
fn status_label(s: Status) -> &'static str {
    let label = match s {
        Status::Stall => "stall",
        Status::Critical => "critical",
        Status::Degraded => "degraded",
        Status::Ok => "ok",
    };
    label
}
/// Returns the numeric code for a status:
/// 0 = ok, 1 = degraded, 2 = critical, 3 = stall.
fn status_code(s: Status) -> u8 {
    let code = match s {
        Status::Stall => 3,
        Status::Critical => 2,
        Status::Degraded => 1,
        Status::Ok => 0,
    };
    code
}
/// Values of the `kind` label on `varta_decode_errors_total`, in the slot
/// order produced by `decode_kind_index`.
const DECODE_KIND_LABELS: [&str; 3] = ["bad_magic", "bad_version", "bad_status"];
/// Maps a decode error to its slot in the per-kind counter array.
///
/// Slots follow the order of `DECODE_KIND_LABELS`:
/// 0 = bad magic, 1 = bad version, 2 = bad status.
fn decode_kind_index(err: &DecodeError) -> usize {
    let slot = match err {
        DecodeError::BadStatus(_) => 2,
        DecodeError::BadVersion => 1,
        DecodeError::BadMagic => 0,
    };
    slot
}
/// Per-pid counters and last status kept by the Prometheus exporter.
#[derive(Clone, Copy, Debug)]
struct GaugeRow {
    /// Number of beat events recorded for the pid.
    beats_total: u64,
    /// Number of stall events recorded for the pid.
    stalls_total: u64,
    /// Numeric code of the most recently recorded status, or `None` if no
    /// status has been recorded yet.
    last_status: Option<u8>,
}

impl GaugeRow {
    /// Returns a row with both counters at zero and no status recorded.
    const fn new() -> Self {
        Self {
            last_status: None,
            stalls_total: 0,
            beats_total: 0,
        }
    }
}
/// Time budget `serve_one` allows for reading a scrape request.
const PROM_READ_DEADLINE: Duration = Duration::from_millis(10);
/// Time budget `serve_one` allows for writing the scrape response.
const PROM_WRITE_TIMEOUT: Duration = Duration::from_millis(50);
/// Number of request bytes after which `serve_one` stops reading.
const PROM_REQUEST_CAP: usize = 4096;
/// Exporter that keeps counters in memory and serves them as Prometheus
/// text-format metrics over HTTP.
pub struct PromExporter {
// Listening socket; put into non-blocking mode by `bind`.
listener: TcpListener,
// Per-pid counters and last status, keyed by pid.
rows: HashMap<u32, GaugeRow>,
// Running total passed to `record_eviction`.
evicted_total: u64,
// Number of `Event::AuthFailure` events recorded.
auth_failures_total: u64,
// Number of `Event::Decode` events recorded, one slot per
// `decode_kind_index` value.
decode_errors_total: [u64; 3],
// Number of `Event::Io` events recorded.
io_errors_total: u64,
// Running total passed to `record_capacity_exceeded`.
capacity_exceeded_total: u64,
}
impl PromExporter {
pub fn bind(addr: SocketAddr) -> io::Result<Self> {
let listener = TcpListener::bind(addr)?;
listener.set_nonblocking(true)?;
Ok(PromExporter {
listener,
rows: HashMap::new(),
evicted_total: 0,
auth_failures_total: 0,
decode_errors_total: [0; 3],
io_errors_total: 0,
capacity_exceeded_total: 0,
})
}
pub fn local_addr(&self) -> io::Result<SocketAddr> {
self.listener.local_addr()
}
pub fn record_eviction(&mut self, count: u64) {
self.evicted_total = self.evicted_total.saturating_add(count);
}
pub fn record_capacity_exceeded(&mut self, count: u64) {
self.capacity_exceeded_total = self.capacity_exceeded_total.saturating_add(count);
}
pub fn serve_pending(&mut self) -> io::Result<()> {
let serve_deadline = Instant::now() + Duration::from_millis(100);
loop {
if Instant::now() >= serve_deadline {
return Ok(());
}
match self.listener.accept() {
Ok((stream, _)) => self.serve_one(stream)?,
Err(e) if e.kind() == ErrorKind::WouldBlock => return Ok(()),
Err(e) => return Err(e),
}
}
}
fn serve_one(&self, mut stream: TcpStream) -> io::Result<()> {
let deadline = Instant::now() + PROM_READ_DEADLINE;
let mut buf = [0u8; 512];
let mut total = 0;
loop {
if Instant::now() >= deadline {
break;
}
match stream.read(&mut buf) {
Ok(0) => break,
Ok(n) => {
total += n;
let preview = &buf[..n];
if contains_subsequence(preview, b"\r\n\r\n") || total >= PROM_REQUEST_CAP {
break;
}
}
Err(e) if e.kind() == ErrorKind::WouldBlock => break,
Err(e) => return Err(e),
}
}
let body = self.render_body();
let response = format!(
"HTTP/1.0 200 OK\r\n\
Content-Type: text/plain; version=0.0.4\r\n\
Content-Length: {len}\r\n\
Connection: close\r\n\
\r\n\
{body}",
len = body.len(),
);
let buf = response.as_bytes();
let mut written = 0;
let write_deadline = Instant::now() + PROM_WRITE_TIMEOUT;
while written < buf.len() {
if Instant::now() >= write_deadline {
break;
}
match stream.write(&buf[written..]) {
Ok(0) => break,
Ok(n) => written += n,
Err(e) if e.kind() == ErrorKind::WouldBlock => {
std::hint::spin_loop();
continue;
}
Err(e) => return Err(e),
}
}
let _ = stream.shutdown(Shutdown::Both);
Ok(())
}
fn render_body(&self) -> String {
let mut pids: Vec<u32> = self.rows.keys().copied().collect();
pids.sort_unstable();
let mut out = String::with_capacity(256 + pids.len() * 96);
out.push_str("# HELP varta_beats_total Total accepted beats per agent pid.\n");
out.push_str("# TYPE varta_beats_total counter\n");
for pid in &pids {
let row = &self.rows[pid];
let _ = writeln!(
out,
"varta_beats_total{{pid=\"{pid}\"}} {}",
row.beats_total
);
}
out.push_str("# HELP varta_stalls_total Total observer-detected stalls per agent pid.\n");
out.push_str("# TYPE varta_stalls_total counter\n");
for pid in &pids {
let row = &self.rows[pid];
let _ = writeln!(
out,
"varta_stalls_total{{pid=\"{pid}\"}} {}",
row.stalls_total
);
}
out.push_str("# HELP varta_status Last reported status code per agent pid (0=ok,1=degraded,2=critical,3=stall).\n");
out.push_str("# TYPE varta_status gauge\n");
for pid in &pids {
let row = &self.rows[pid];
if let Some(code) = row.last_status {
let _ = writeln!(out, "varta_status{{pid=\"{pid}\"}} {code}");
}
}
if self.evicted_total > 0 {
out.push_str("# HELP varta_tracker_evicted_total Total tracker slots reclaimed from dead agents.\n");
out.push_str("# TYPE varta_tracker_evicted_total counter\n");
let _ = writeln!(out, "varta_tracker_evicted_total {}", self.evicted_total);
}
out.push_str(
"# HELP varta_frame_auth_failures_total Frames rejected due to PID spoofing or authentication failure.\n",
);
out.push_str("# TYPE varta_frame_auth_failures_total counter\n");
let _ = writeln!(
out,
"varta_frame_auth_failures_total {}",
self.auth_failures_total
);
out.push_str("# HELP varta_decode_errors_total Total VLP decode failures by kind.\n");
out.push_str("# TYPE varta_decode_errors_total counter\n");
for (idx, kind) in DECODE_KIND_LABELS.iter().enumerate() {
let _ = writeln!(
out,
"varta_decode_errors_total{{kind=\"{kind}\"}} {}",
self.decode_errors_total[idx]
);
}
out.push_str("# HELP varta_io_errors_total Total socket receive errors.\n");
out.push_str("# TYPE varta_io_errors_total counter\n");
let _ = writeln!(out, "varta_io_errors_total {}", self.io_errors_total);
if self.capacity_exceeded_total > 0 {
out.push_str("# HELP varta_tracker_capacity_exceeded_total Total beats dropped because tracker is full.\n");
out.push_str("# TYPE varta_tracker_capacity_exceeded_total counter\n");
let _ = writeln!(
out,
"varta_tracker_capacity_exceeded_total {}",
self.capacity_exceeded_total
);
}
out
}
}
impl Exporter for PromExporter {
    /// Updates the in-memory counters for `ev`.
    ///
    /// Beat and stall events create the pid's row on first sight; the other
    /// event kinds only touch the exporter-wide counters. All counters
    /// saturate at `u64::MAX`.
    fn record(&mut self, ev: &Event) {
        match ev {
            Event::Beat { pid, status, .. } => {
                let gauge = self.rows.entry(*pid).or_insert_with(GaugeRow::new);
                gauge.last_status = Some(status_code(*status));
                gauge.beats_total = gauge.beats_total.saturating_add(1);
            }
            Event::Stall { pid, .. } => {
                let gauge = self.rows.entry(*pid).or_insert_with(GaugeRow::new);
                gauge.last_status = Some(status_code(Status::Stall));
                gauge.stalls_total = gauge.stalls_total.saturating_add(1);
            }
            Event::AuthFailure { .. } => {
                self.auth_failures_total = self.auth_failures_total.saturating_add(1);
            }
            Event::Decode(err, _) => {
                let counter = &mut self.decode_errors_total[decode_kind_index(err)];
                *counter = counter.saturating_add(1);
            }
            Event::Io(..) => {
                self.io_errors_total = self.io_errors_total.saturating_add(1);
            }
        }
    }

    /// Does nothing and always returns `Ok(())`.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
/// Reports whether `needle` occurs as a contiguous run of bytes in
/// `haystack`.
///
/// An empty `needle` is found in any haystack, including an empty one.
fn contains_subsequence(haystack: &[u8], needle: &[u8]) -> bool {
    match needle.len() {
        // Handled first: `windows(0)` would panic.
        0 => true,
        len if len > haystack.len() => false,
        len => haystack.windows(len).any(|window| window == needle),
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Binds an exporter to an OS-assigned loopback port.
    fn ephemeral_exporter() -> PromExporter {
        let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
        PromExporter::bind(addr).expect("bind")
    }

    /// Builds an `Ok` beat for `pid` with fixed filler values elsewhere.
    fn ok_beat(pid: u32) -> Event {
        Event::Beat {
            pid,
            status: Status::Ok,
            nonce: 1,
            payload: 0,
            observer_ns: 0,
        }
    }

    #[test]
    fn render_body_sorts_pids_numerically() {
        let mut prom = ephemeral_exporter();
        // Insertion order differs from both numeric and lexical order.
        for pid in [30, 2, 11] {
            prom.record(&ok_beat(pid));
        }

        let body = prom.render_body();
        let pos2 = body.find("pid=\"2\"").expect("pid 2");
        let pos11 = body.find("pid=\"11\"").expect("pid 11");
        let pos30 = body.find("pid=\"30\"").expect("pid 30");
        assert!(pos2 < pos11 && pos11 < pos30, "sort order broken:\n{body}");
    }

    #[test]
    fn decode_and_io_events_do_not_create_rows() {
        let mut prom = ephemeral_exporter();
        prom.record(&Event::Decode(DecodeError::BadMagic, 0));
        prom.record(&Event::Io(io::Error::other("x"), 0));
        assert!(prom.rows.is_empty());
    }

    #[test]
    fn decode_errors_emit_kind_label_for_every_variant_even_at_zero() {
        let mut prom = ephemeral_exporter();
        let failures = [
            DecodeError::BadMagic,
            DecodeError::BadMagic,
            DecodeError::BadStatus(0xff),
        ];
        for failure in failures {
            prom.record(&Event::Decode(failure, 0));
        }

        let body = prom.render_body();
        assert!(
            body.contains("varta_decode_errors_total{kind=\"bad_magic\"} 2"),
            "missing or wrong bad_magic series:\n{body}"
        );
        assert!(
            body.contains("varta_decode_errors_total{kind=\"bad_version\"} 0"),
            "missing zero-valued bad_version series:\n{body}"
        );
        assert!(
            body.contains("varta_decode_errors_total{kind=\"bad_status\"} 1"),
            "missing or wrong bad_status series:\n{body}"
        );
    }
}