use std::fs::{self, File, OpenOptions};
use std::io::Write;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use serde::Serialize;
use crate::metrics::{MetricEvent, Metrics, WindowMetrics};
use crate::prometheus::PrometheusEncoder;
use crate::{Error, ErrorKind, Result};
const JSONL_SCHEMA_VERSION: u32 = 1;
static TEMP_FILE_COUNTER: AtomicU64 = AtomicU64::new(0);
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum MetricsFileFormat {
Jsonl,
Prometheus,
}
impl MetricsFileFormat {
pub fn parse(raw: &str) -> Option<Self> {
Self::parse_trimmed_bytes(raw.trim().as_bytes())
}
fn parse_trimmed_bytes(raw: &[u8]) -> Option<Self> {
match raw {
b"jsonl" => Some(Self::Jsonl),
b"prometheus" => Some(Self::Prometheus),
_ => None,
}
}
}
#[derive(Debug, Clone)]
pub struct MetricsFileSink {
path: PathBuf,
format: MetricsFileFormat,
encoder: PrometheusEncoder,
}
impl MetricsFileSink {
pub fn new(path: impl Into<PathBuf>, format: MetricsFileFormat) -> Result<Self> {
Self::with_prefix(path, format, PrometheusEncoder::DEFAULT_PREFIX)
}
pub fn with_prefix(
path: impl Into<PathBuf>,
format: MetricsFileFormat,
metric_prefix: impl Into<String>,
) -> Result<Self> {
Self::with_prefix_and_labels(
path,
format,
metric_prefix,
std::iter::empty::<(String, String)>(),
)
}
pub fn with_prefix_and_labels<I, K, V>(
path: impl Into<PathBuf>,
format: MetricsFileFormat,
metric_prefix: impl Into<String>,
labels: I,
) -> Result<Self>
where
I: IntoIterator<Item = (K, V)>,
K: Into<String>,
V: Into<String>,
{
let sink = Self {
path: path.into(),
format,
encoder: PrometheusEncoder::with_labels(metric_prefix, labels)?,
};
sink.create_empty_file()?;
Ok(sink)
}
pub fn path(&self) -> &Path {
&self.path
}
pub fn format(&self) -> MetricsFileFormat {
self.format
}
pub fn write_event(&self, event: &MetricEvent) -> Result<()> {
match event {
MetricEvent::Interval(metrics) => self.write_interval(metrics),
MetricEvent::Window(metrics) => self.write_window(metrics),
}
}
pub fn write_interval(&self, metrics: &Metrics) -> Result<()> {
match self.format {
MetricsFileFormat::Jsonl => self.append_jsonl("interval", metrics),
MetricsFileFormat::Prometheus => self.write_prometheus(metrics),
}
}
pub fn write_window(&self, metrics: &WindowMetrics) -> Result<()> {
match self.format {
MetricsFileFormat::Jsonl => self.append_jsonl("window", metrics),
MetricsFileFormat::Prometheus => self.write_window_prometheus(metrics),
}
}
fn create_empty_file(&self) -> Result<()> {
File::create(&self.path)
.map(|_| ())
.map_err(|err| file_error("failed to create metrics file", &self.path, err))
}
fn append_jsonl<T>(&self, event: &'static str, metrics: &T) -> Result<()>
where
T: Serialize,
{
let mut file = OpenOptions::new()
.append(true)
.open(&self.path)
.map_err(|err| file_error("failed to open metrics file", &self.path, err))?;
serde_json::to_writer(
&mut file,
&JsonlEvent {
schema_version: JSONL_SCHEMA_VERSION,
event,
metrics,
},
)
.map_err(|err| {
Error::with_source(ErrorKind::MetricsFile, "failed to encode metrics JSON", err)
})?;
file.write_all(b"\n")
.map_err(|err| file_error("failed to write metrics file", &self.path, err))
}
fn write_prometheus(&self, metrics: &Metrics) -> Result<()> {
atomic_write(&self.path, self.encoder.encode_interval(metrics).as_bytes())
}
fn write_window_prometheus(&self, metrics: &WindowMetrics) -> Result<()> {
atomic_write(&self.path, self.encoder.encode_window(metrics).as_bytes())
}
}
#[derive(Serialize)]
struct JsonlEvent<'a, T> {
schema_version: u32,
event: &'static str,
#[serde(flatten)]
metrics: &'a T,
}
fn file_error(
message: &'static str,
path: &Path,
source: impl std::error::Error + Send + Sync + 'static,
) -> Error {
Error::with_source(
ErrorKind::MetricsFile,
format!("{message}: {}", path.display()),
source,
)
}
fn atomic_write(path: &Path, contents: &[u8]) -> Result<()> {
let temp_path = temp_path_for(path);
let result = write_temp_then_rename(&temp_path, path, contents);
if result.is_err() {
let _ = fs::remove_file(&temp_path);
}
result.map_err(|err| file_error("failed to write metrics file", path, err))
}
fn write_temp_then_rename(temp_path: &Path, path: &Path, contents: &[u8]) -> std::io::Result<()> {
let mut file = OpenOptions::new()
.write(true)
.create_new(true)
.open(temp_path)?;
file.write_all(contents)?;
file.flush()?;
drop(file);
fs::rename(temp_path, path)
}
fn temp_path_for(path: &Path) -> PathBuf {
let parent = path
.parent()
.filter(|parent| !parent.as_os_str().is_empty())
.unwrap_or_else(|| Path::new("."));
let file_name = path
.file_name()
.map(|name| name.to_string_lossy())
.unwrap_or_else(|| "metrics".into());
let counter = TEMP_FILE_COUNTER.fetch_add(1, Ordering::Relaxed);
parent.join(format!(".{file_name}.tmp-{}-{counter}", std::process::id()))
}
#[cfg(kani)]
mod verification {
use super::*;
#[kani::proof]
#[kani::unwind(12)]
fn metrics_format_parser_matches_documented_values_for_bounded_bytes() {
let len: usize = kani::any();
kani::assume(len <= 10);
let bytes: [u8; 10] = kani::any();
let raw = &bytes[..len];
let expected = match raw {
b"jsonl" => Some(MetricsFileFormat::Jsonl),
b"prometheus" => Some(MetricsFileFormat::Prometheus),
_ => None,
};
assert_eq!(MetricsFileFormat::parse_trimmed_bytes(raw), expected);
}
}
#[cfg(test)]
mod tests {
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};
use super::*;
static TEMP_COUNTER: AtomicUsize = AtomicUsize::new(0);
#[test]
fn jsonl_format_appends_interval_events() {
let path = temp_path("jsonl");
let sink = MetricsFileSink::new(&path, MetricsFileFormat::Jsonl).unwrap();
sink.write_interval(&sample_metrics(1.0)).unwrap();
sink.write_interval(&sample_metrics(2.0)).unwrap();
let contents = fs::read_to_string(&path).unwrap();
let lines = contents.lines().collect::<Vec<_>>();
assert_eq!(lines.len(), 2);
assert!(lines[0].contains(r#""schema_version":1"#));
assert!(lines[0].contains(r#""event":"interval""#));
assert!(lines[0].contains(r#""transferred_bytes":1.0"#));
assert!(lines[1].contains(r#""transferred_bytes":2.0"#));
let _ = fs::remove_file(path);
}
#[test]
fn prometheus_format_replaces_latest_snapshot() {
let path = temp_path("prom");
let sink = MetricsFileSink::with_prefix_and_labels(
&path,
MetricsFileFormat::Prometheus,
"nettest",
[("site", "ci")],
)
.unwrap();
sink.write_interval(&sample_metrics(1.0)).unwrap();
sink.write_interval(&sample_metrics(2.0)).unwrap();
let contents = fs::read_to_string(&path).unwrap();
assert!(contents.contains("nettest_transferred_bytes{site=\"ci\"} 2\n"));
assert!(!contents.contains("nettest_transferred_bytes{site=\"ci\"} 1\n"));
assert_no_temp_files_for(&path);
let _ = fs::remove_file(path);
}
#[test]
fn write_event_supports_window_jsonl() {
let path = temp_path("jsonl");
let sink = MetricsFileSink::new(&path, MetricsFileFormat::Jsonl).unwrap();
sink.write_event(&MetricEvent::Window(WindowMetrics {
timestamp_unix_seconds: 123.0,
role: crate::Role::Client,
direction: crate::MetricDirection::Sender,
stream_count: 2,
protocol: crate::TransportProtocol::Tcp,
duration_seconds: 2.0,
transferred_bytes: 64.0,
..WindowMetrics::default()
}))
.unwrap();
let contents = fs::read_to_string(&path).unwrap();
assert!(contents.contains(r#""schema_version":1"#));
assert!(contents.contains(r#""event":"window""#));
assert!(contents.contains(r#""timestamp_unix_seconds":123.0"#));
assert!(contents.contains(r#""role":"Client""#));
assert!(contents.contains(r#""direction":"Sender""#));
assert!(contents.contains(r#""stream_count":2"#));
assert!(contents.contains(r#""protocol":"Tcp""#));
assert!(contents.contains(r#""duration_seconds":2.0"#));
assert!(contents.contains(r#""transferred_bytes":64.0"#));
let _ = fs::remove_file(path);
}
#[test]
fn write_interval_reports_file_errors() {
let path = temp_path("jsonl");
let sink = MetricsFileSink::new(&path, MetricsFileFormat::Jsonl).unwrap();
fs::remove_file(&path).unwrap();
fs::create_dir(&path).unwrap();
let err = sink.write_interval(&sample_metrics(1.0)).unwrap_err();
assert_eq!(err.kind(), ErrorKind::MetricsFile);
assert!(
err.to_string().contains("failed to open metrics file"),
"{err:#}"
);
let _ = fs::remove_dir(path);
}
fn sample_metrics(transferred_bytes: f64) -> Metrics {
Metrics {
transferred_bytes,
bandwidth_bits_per_second: transferred_bytes * 8.0,
interval_duration_seconds: 1.0,
..Metrics::default()
}
}
fn temp_path(extension: &str) -> PathBuf {
let nonce = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_nanos();
let counter = TEMP_COUNTER.fetch_add(1, Ordering::Relaxed);
std::env::temp_dir().join(format!(
"iperf3-rs-metrics-file-{}-{nonce}-{counter}.{extension}",
std::process::id()
))
}
fn assert_no_temp_files_for(path: &Path) {
let parent = path.parent().unwrap();
let file_name = path.file_name().unwrap().to_string_lossy();
let prefix = format!(".{file_name}.tmp-");
let leftovers = fs::read_dir(parent)
.unwrap()
.filter_map(|entry| entry.ok())
.filter(|entry| entry.file_name().to_string_lossy().starts_with(&prefix))
.collect::<Vec<_>>();
assert!(
leftovers.is_empty(),
"Prometheus atomic writes should not leave temp files"
);
}
}