use std::fs::{File, OpenOptions};
use std::io::{self, BufWriter, Write};
use std::path::{Path, PathBuf};
use varta_vlp::Status;
use crate::observer::Event;
pub trait Exporter {
fn record(&mut self, ev: &Event) -> io::Result<()>;
fn flush(&mut self) -> io::Result<()>;
}
pub struct FileExporter {
sink: BufWriter<File>,
pending_err: Option<io::Error>,
path: PathBuf,
max_bytes: Option<u64>,
bytes_written: u64,
sync_every: u32,
writes_since_sync: u32,
}
const MAX_ROTATION_GENERATIONS: u32 = 5;
impl FileExporter {
pub fn create(
path: impl AsRef<Path>,
max_bytes: Option<u64>,
sync_every: u32,
) -> io::Result<Self> {
let file = OpenOptions::new()
.create(true)
.append(true)
.open(path.as_ref())?;
let bytes_written = file.metadata().map(|m| m.len()).unwrap_or(0);
Ok(FileExporter {
sink: BufWriter::new(file),
pending_err: None,
path: path.as_ref().to_path_buf(),
max_bytes,
bytes_written,
sync_every,
writes_since_sync: 0,
})
}
fn flush_and_sync(&mut self) -> io::Result<()> {
self.sink.flush()?;
self.sink.get_ref().sync_data()
}
pub fn record_eviction_pid(&mut self, pid: u32, observer_ns: u64) {
let result = writeln!(self.sink, "{observer_ns}\teviction\t{pid}\t-\t-\t-",);
if let Err(e) = result {
self.pending_err = Some(e);
} else {
self.pending_err = None;
let line_len = decimal_digits(observer_ns) as u64
+ 1 + 8 + 1 + decimal_digits(pid as u64) as u64
+ 1 + 1 + 1 + 1 + 1 + 1 + 1; self.after_write(line_len);
}
}
fn after_write(&mut self, line_len: u64) {
if self.sync_every > 0 {
self.writes_since_sync = self.writes_since_sync.saturating_add(1);
if self.writes_since_sync >= self.sync_every {
match self.flush_and_sync() {
Ok(()) => self.writes_since_sync = 0,
Err(e) => {
self.pending_err = Some(e);
}
}
}
}
let Some(max) = self.max_bytes else {
return;
};
self.bytes_written = self.bytes_written.saturating_add(line_len);
if self.bytes_written < max {
return;
}
if let Err(e) = self.sink.flush() {
self.pending_err = Some(e);
return;
}
if let Err(e) = Self::rotate(&self.path) {
self.pending_err = Some(e);
return;
}
match OpenOptions::new()
.create(true)
.append(true)
.open(&self.path)
{
Ok(file) => {
self.sink = BufWriter::new(file);
self.bytes_written = 0;
self.writes_since_sync = 0;
}
Err(e) => {
self.pending_err = Some(e);
}
}
}
fn rotate(path: &Path) -> io::Result<()> {
let path_str = path.to_string_lossy();
let oldest = format!("{path_str}.{MAX_ROTATION_GENERATIONS}");
match std::fs::remove_file(&oldest) {
Ok(()) => {}
Err(e) if e.kind() == io::ErrorKind::NotFound => {}
Err(e) => return Err(e),
}
for gen in (1..MAX_ROTATION_GENERATIONS).rev() {
let src = format!("{path_str}.{gen}");
let dst = format!("{path_str}.{}", gen + 1);
match std::fs::rename(&src, &dst) {
Ok(()) => {}
Err(e) if e.kind() == io::ErrorKind::NotFound => {}
Err(e) => return Err(e),
}
}
let first = format!("{path_str}.1");
#[allow(clippy::incompatible_msrv)]
match std::fs::rename(path, &first) {
Ok(()) => {}
Err(e) if e.kind() == io::ErrorKind::NotFound => {}
Err(e) if e.kind() == io::ErrorKind::CrossesDevices => {
std::fs::copy(path, &first)?;
std::fs::remove_file(path)?;
}
Err(e) => return Err(e),
}
Ok(())
}
}
impl Exporter for FileExporter {
fn record(&mut self, ev: &Event) -> io::Result<()> {
let line_len: u64 = match ev {
Event::Beat {
pid,
status,
payload,
nonce,
observer_ns,
origin: _,
pid_ns_inode: _,
} => {
let label = status_label(*status);
decimal_digits(*observer_ns) as u64
+ 1 + 4 + 1 + decimal_digits(*pid as u64) as u64
+ 1 + decimal_digits(*nonce) as u64
+ 1 + label.len() as u64
+ 1 + decimal_digits(*payload as u64) as u64
+ 1 }
Event::Stall {
pid,
last_nonce,
observer_ns,
..
} => {
decimal_digits(*observer_ns) as u64
+ 1 + 5 + 1 + decimal_digits(*pid as u64) as u64
+ 1 + decimal_digits(*last_nonce) as u64
+ 1 + 5 + 1 + 1 + 1
} Event::Decode(_, _)
| Event::Io(_, _)
| Event::CtrlTruncated(_, _)
| Event::OriginConflict { .. }
| Event::NamespaceConflict { .. } => 0,
Event::AuthFailure {
claimed_pid,
observer_ns,
} => {
decimal_digits(*observer_ns) as u64
+ 1 + 8 + 1 + decimal_digits(*claimed_pid as u64) as u64
+ 1 + 1 + 1 + 1 + 1 + 13 + 1
} };
let result = match ev {
Event::Beat {
pid,
status,
payload,
nonce,
observer_ns,
origin: _,
pid_ns_inode: _,
} => writeln!(
self.sink,
"{observer_ns}\tbeat\t{pid}\t{nonce}\t{}\t{payload}",
status_label(*status),
),
Event::Stall {
pid,
last_nonce,
last_ns: _,
observer_ns,
origin: _,
pid_ns_inode: _,
} => writeln!(
self.sink,
"{observer_ns}\tstall\t{pid}\t{last_nonce}\tstall\t-",
),
Event::Decode(err, observer_ns) => {
writeln!(self.sink, "{observer_ns}\tdecode\t-\t-\t-\t{err:?}")
}
Event::Io(err, observer_ns) => {
writeln!(self.sink, "{observer_ns}\tio\t-\t-\t-\t{err}")
}
Event::AuthFailure {
claimed_pid,
observer_ns,
} => {
writeln!(
self.sink,
"{observer_ns}\tmismatch\t{claimed_pid}\t-\t-\tauth_failure",
)
}
Event::OriginConflict {
claimed_pid,
observer_ns,
..
} => {
writeln!(
self.sink,
"{observer_ns}\tmismatch\t{claimed_pid}\t-\t-\torigin_conflict",
)
}
Event::NamespaceConflict {
claimed_pid,
observer_ns,
..
} => {
writeln!(
self.sink,
"{observer_ns}\tmismatch\t{claimed_pid}\t-\t-\tnamespace_conflict",
)
}
Event::CtrlTruncated(err, observer_ns) => {
writeln!(self.sink, "{observer_ns}\tctrunc\t-\t-\t-\t{err}")
}
};
if let Err(ref e) = result {
self.pending_err = Some(io::Error::new(e.kind(), e.to_string()));
}
match result {
Err(e) => Err(e),
Ok(()) => {
self.pending_err = None;
let actual_len = if line_len > 0 {
line_len
} else {
match ev {
Event::Decode(err, observer_ns) => {
format!("{observer_ns}\tdecode\t-\t-\t-\t{err:?}\n").len() as u64
}
Event::Io(err, observer_ns) => {
let msg = err.to_string();
format!("{observer_ns}\tio\t-\t-\t-\t{msg}\n").len() as u64
}
Event::CtrlTruncated(err, observer_ns) => {
let msg = err.to_string();
format!("{observer_ns}\tctrunc\t-\t-\t-\t{msg}\n").len() as u64
}
Event::OriginConflict {
claimed_pid,
observer_ns,
..
} => format!(
"{observer_ns}\tmismatch\t{claimed_pid}\t-\t-\torigin_conflict\n"
)
.len() as u64,
_ => unreachable!(),
}
};
self.after_write(actual_len);
Ok(())
}
}
}
fn flush(&mut self) -> io::Result<()> {
let sink_result = self.sink.flush();
match (self.pending_err.take(), sink_result) {
(Some(e), _) => Err(e),
(None, Err(e)) => Err(e),
(None, Ok(())) => Ok(()),
}
}
}
fn decimal_digits(mut n: u64) -> usize {
if n == 0 {
return 1;
}
let mut digits = 0;
while n > 0 {
n /= 10;
digits += 1;
}
digits
}
fn status_label(s: Status) -> &'static str {
match s {
Status::Ok => "ok",
Status::Degraded => "degraded",
Status::Critical => "critical",
Status::Stall => "stall",
}
}