Skip to main content

iperf3_rs/
metrics_file.rs

1//! File-backed metrics output.
2
3use std::fs::{self, File, OpenOptions};
4use std::io::Write;
5use std::path::{Path, PathBuf};
6use std::sync::atomic::{AtomicU64, Ordering};
7
8use serde::Serialize;
9
10use crate::metrics::{MetricEvent, Metrics, WindowMetrics};
11use crate::prometheus::PrometheusEncoder;
12use crate::{Error, ErrorKind, Result};
13
14const JSONL_SCHEMA_VERSION: u32 = 1;
15static TEMP_FILE_COUNTER: AtomicU64 = AtomicU64::new(0);
16
/// On-disk representation used when persisting metrics.
///
/// The enum is `#[non_exhaustive]`, so code outside this crate must include
/// a wildcard arm when matching on it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum MetricsFileFormat {
    /// JSON Lines: every write appends one JSON object on its own line.
    Jsonl,
    /// Prometheus text exposition: every write replaces the file with the
    /// most recent snapshot.
    Prometheus,
}
26
27impl MetricsFileFormat {
28    /// Parse a CLI-compatible metrics file format name.
29    pub fn parse(raw: &str) -> Option<Self> {
30        Self::parse_trimmed_bytes(raw.trim().as_bytes())
31    }
32
33    fn parse_trimmed_bytes(raw: &[u8]) -> Option<Self> {
34        match raw {
35            b"jsonl" => Some(Self::Jsonl),
36            b"prometheus" => Some(Self::Prometheus),
37            _ => None,
38        }
39    }
40}
41
42/// Writer for one metrics output file.
43///
44/// JSONL output appends one object per event. Prometheus output atomically
45/// replaces the file with the latest encoded snapshot on each write.
46#[derive(Debug, Clone)]
47pub struct MetricsFileSink {
48    path: PathBuf,
49    format: MetricsFileFormat,
50    encoder: PrometheusEncoder,
51}
52
53impl MetricsFileSink {
54    /// Create a sink with the default Prometheus metric prefix.
55    pub fn new(path: impl Into<PathBuf>, format: MetricsFileFormat) -> Result<Self> {
56        Self::with_prefix(path, format, PrometheusEncoder::DEFAULT_PREFIX)
57    }
58
59    /// Create a sink with a custom Prometheus metric prefix.
60    pub fn with_prefix(
61        path: impl Into<PathBuf>,
62        format: MetricsFileFormat,
63        metric_prefix: impl Into<String>,
64    ) -> Result<Self> {
65        Self::with_prefix_and_labels(
66            path,
67            format,
68            metric_prefix,
69            std::iter::empty::<(String, String)>(),
70        )
71    }
72
73    /// Create a sink with a custom Prometheus metric prefix and labels.
74    ///
75    /// Labels are applied only to Prometheus text exposition output. JSONL
76    /// output carries structured metric fields and its own schema version.
77    pub fn with_prefix_and_labels<I, K, V>(
78        path: impl Into<PathBuf>,
79        format: MetricsFileFormat,
80        metric_prefix: impl Into<String>,
81        labels: I,
82    ) -> Result<Self>
83    where
84        I: IntoIterator<Item = (K, V)>,
85        K: Into<String>,
86        V: Into<String>,
87    {
88        let sink = Self {
89            path: path.into(),
90            format,
91            encoder: PrometheusEncoder::with_labels(metric_prefix, labels)?,
92        };
93        sink.create_empty_file()?;
94        Ok(sink)
95    }
96
97    /// Return the output path this sink writes.
98    pub fn path(&self) -> &Path {
99        &self.path
100    }
101
102    /// Return the configured output format.
103    pub fn format(&self) -> MetricsFileFormat {
104        self.format
105    }
106
107    /// Write one metrics stream event.
108    pub fn write_event(&self, event: &MetricEvent) -> Result<()> {
109        match event {
110            MetricEvent::Interval(metrics) => self.write_interval(metrics),
111            MetricEvent::Window(metrics) => self.write_window(metrics),
112        }
113    }
114
115    /// Write one immediate interval sample.
116    pub fn write_interval(&self, metrics: &Metrics) -> Result<()> {
117        match self.format {
118            MetricsFileFormat::Jsonl => self.append_jsonl("interval", metrics),
119            MetricsFileFormat::Prometheus => self.write_prometheus(metrics),
120        }
121    }
122
123    /// Write one aggregated window summary.
124    pub fn write_window(&self, metrics: &WindowMetrics) -> Result<()> {
125        match self.format {
126            MetricsFileFormat::Jsonl => self.append_jsonl("window", metrics),
127            MetricsFileFormat::Prometheus => self.write_window_prometheus(metrics),
128        }
129    }
130
131    fn create_empty_file(&self) -> Result<()> {
132        File::create(&self.path)
133            .map(|_| ())
134            .map_err(|err| file_error("failed to create metrics file", &self.path, err))
135    }
136
137    fn append_jsonl<T>(&self, event: &'static str, metrics: &T) -> Result<()>
138    where
139        T: Serialize,
140    {
141        let mut file = OpenOptions::new()
142            .append(true)
143            .open(&self.path)
144            .map_err(|err| file_error("failed to open metrics file", &self.path, err))?;
145        serde_json::to_writer(
146            &mut file,
147            &JsonlEvent {
148                schema_version: JSONL_SCHEMA_VERSION,
149                event,
150                metrics,
151            },
152        )
153        .map_err(|err| {
154            Error::with_source(ErrorKind::MetricsFile, "failed to encode metrics JSON", err)
155        })?;
156        file.write_all(b"\n")
157            .map_err(|err| file_error("failed to write metrics file", &self.path, err))
158    }
159
160    fn write_prometheus(&self, metrics: &Metrics) -> Result<()> {
161        atomic_write(&self.path, self.encoder.encode_interval(metrics).as_bytes())
162    }
163
164    fn write_window_prometheus(&self, metrics: &WindowMetrics) -> Result<()> {
165        atomic_write(&self.path, self.encoder.encode_window(metrics).as_bytes())
166    }
167}
168
169#[derive(Serialize)]
170struct JsonlEvent<'a, T> {
171    schema_version: u32,
172    event: &'static str,
173    #[serde(flatten)]
174    metrics: &'a T,
175}
176
177fn file_error(
178    message: &'static str,
179    path: &Path,
180    source: impl std::error::Error + Send + Sync + 'static,
181) -> Error {
182    Error::with_source(
183        ErrorKind::MetricsFile,
184        format!("{message}: {}", path.display()),
185        source,
186    )
187}
188
189fn atomic_write(path: &Path, contents: &[u8]) -> Result<()> {
190    let temp_path = temp_path_for(path);
191    let result = write_temp_then_rename(&temp_path, path, contents);
192    if result.is_err() {
193        let _ = fs::remove_file(&temp_path);
194    }
195    result.map_err(|err| file_error("failed to write metrics file", path, err))
196}
197
/// Write `contents` to a brand-new file at `temp_path`, then rename it to
/// `path`.
///
/// The staging file is opened with `create_new`, so an existing file at
/// `temp_path` is an error rather than being overwritten. The handle is
/// closed before the rename.
fn write_temp_then_rename(temp_path: &Path, path: &Path, contents: &[u8]) -> std::io::Result<()> {
    {
        let mut staged = OpenOptions::new();
        let mut staged = staged.write(true).create_new(true).open(temp_path)?;
        staged.write_all(contents)?;
        staged.flush()?;
        // `staged` is dropped (closed) at the end of this scope.
    }
    fs::rename(temp_path, path)
}
208
209fn temp_path_for(path: &Path) -> PathBuf {
210    let parent = path
211        .parent()
212        .filter(|parent| !parent.as_os_str().is_empty())
213        .unwrap_or_else(|| Path::new("."));
214    let file_name = path
215        .file_name()
216        .map(|name| name.to_string_lossy())
217        .unwrap_or_else(|| "metrics".into());
218    let counter = TEMP_FILE_COUNTER.fetch_add(1, Ordering::Relaxed);
219    parent.join(format!(".{file_name}.tmp-{}-{counter}", std::process::id()))
220}
221
#[cfg(kani)]
mod verification {
    use super::*;

    /// Longest input explored by the proof (the length of `"prometheus"`).
    const MAX_INPUT_LEN: usize = 10;

    /// For every byte string of at most `MAX_INPUT_LEN` bytes, the parser
    /// accepts exactly the two documented names and rejects everything else.
    #[kani::proof]
    #[kani::unwind(12)]
    fn metrics_format_parser_matches_documented_values_for_bounded_bytes() {
        let used: usize = kani::any();
        kani::assume(used <= MAX_INPUT_LEN);
        let buffer: [u8; MAX_INPUT_LEN] = kani::any();
        let candidate = &buffer[..used];

        // Independent statement of the documented mapping.
        let documented = match candidate {
            b"jsonl" => Some(MetricsFileFormat::Jsonl),
            b"prometheus" => Some(MetricsFileFormat::Prometheus),
            _ => None,
        };

        let parsed = MetricsFileFormat::parse_trimmed_bytes(candidate);
        assert_eq!(parsed, documented);
    }
}
243
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    use super::*;

    /// Keeps scratch paths distinct even when created in the same nanosecond.
    static TEMP_COUNTER: AtomicUsize = AtomicUsize::new(0);

    /// Two interval writes produce two JSONL lines, in order.
    #[test]
    fn jsonl_format_appends_interval_events() {
        let path = temp_path("jsonl");
        let sink = MetricsFileSink::new(&path, MetricsFileFormat::Jsonl).unwrap();

        for transferred_bytes in [1.0, 2.0] {
            sink.write_interval(&sample_metrics(transferred_bytes))
                .unwrap();
        }

        let contents = fs::read_to_string(&path).unwrap();
        let lines: Vec<&str> = contents.lines().collect();
        assert_eq!(lines.len(), 2);
        let first_line_needles = [
            r#""schema_version":1"#,
            r#""event":"interval""#,
            r#""transferred_bytes":1.0"#,
        ];
        for needle in first_line_needles {
            assert!(lines[0].contains(needle));
        }
        assert!(lines[1].contains(r#""transferred_bytes":2.0"#));
        let _ = fs::remove_file(path);
    }

    /// Prometheus output keeps only the latest snapshot and leaves no
    /// staging files behind.
    #[test]
    fn prometheus_format_replaces_latest_snapshot() {
        let path = temp_path("prom");
        let labels = [("site", "ci")];
        let sink = MetricsFileSink::with_prefix_and_labels(
            &path,
            MetricsFileFormat::Prometheus,
            "nettest",
            labels,
        )
        .unwrap();

        for transferred_bytes in [1.0, 2.0] {
            sink.write_interval(&sample_metrics(transferred_bytes))
                .unwrap();
        }

        let contents = fs::read_to_string(&path).unwrap();
        let latest = "nettest_transferred_bytes{site=\"ci\"} 2\n";
        let superseded = "nettest_transferred_bytes{site=\"ci\"} 1\n";
        assert!(contents.contains(latest));
        assert!(!contents.contains(superseded));
        assert_no_temp_files_for(&path);
        let _ = fs::remove_file(path);
    }

    /// Window events routed through `write_event` are serialized as JSONL.
    #[test]
    fn write_event_supports_window_jsonl() {
        let path = temp_path("jsonl");
        let sink = MetricsFileSink::new(&path, MetricsFileFormat::Jsonl).unwrap();

        let window = WindowMetrics {
            timestamp_unix_seconds: 123.0,
            role: crate::Role::Client,
            direction: crate::MetricDirection::Sender,
            stream_count: 2,
            protocol: crate::TransportProtocol::Tcp,
            duration_seconds: 2.0,
            transferred_bytes: 64.0,
            ..WindowMetrics::default()
        };
        sink.write_event(&MetricEvent::Window(window)).unwrap();

        let contents = fs::read_to_string(&path).unwrap();
        let needles = [
            r#""schema_version":1"#,
            r#""event":"window""#,
            r#""timestamp_unix_seconds":123.0"#,
            r#""role":"Client""#,
            r#""direction":"Sender""#,
            r#""stream_count":2"#,
            r#""protocol":"Tcp""#,
            r#""duration_seconds":2.0"#,
            r#""transferred_bytes":64.0"#,
        ];
        for needle in needles {
            assert!(contents.contains(needle));
        }
        let _ = fs::remove_file(path);
    }

    /// Replacing the output file with a directory makes writes fail with a
    /// metrics-file error that names the open step.
    #[test]
    fn write_interval_reports_file_errors() {
        let path = temp_path("jsonl");
        let sink = MetricsFileSink::new(&path, MetricsFileFormat::Jsonl).unwrap();
        fs::remove_file(&path).unwrap();
        fs::create_dir(&path).unwrap();

        let outcome = sink.write_interval(&sample_metrics(1.0));
        let err = outcome.unwrap_err();

        assert_eq!(err.kind(), ErrorKind::MetricsFile);
        let rendered = err.to_string();
        assert!(
            rendered.contains("failed to open metrics file"),
            "{err:#}"
        );
        let _ = fs::remove_dir(path);
    }

    /// Interval sample whose bandwidth is derived from `transferred_bytes`
    /// over a one-second interval.
    fn sample_metrics(transferred_bytes: f64) -> Metrics {
        let bandwidth_bits_per_second = transferred_bytes * 8.0;
        Metrics {
            transferred_bytes,
            bandwidth_bits_per_second,
            interval_duration_seconds: 1.0,
            ..Metrics::default()
        }
    }

    /// Unique scratch path in the system temp directory.
    fn temp_path(extension: &str) -> PathBuf {
        let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
        let nonce = since_epoch.as_nanos();
        let counter = TEMP_COUNTER.fetch_add(1, Ordering::Relaxed);
        let pid = std::process::id();
        let file_name = format!("iperf3-rs-metrics-file-{pid}-{nonce}-{counter}.{extension}");
        std::env::temp_dir().join(file_name)
    }

    /// Assert that no `.<name>.tmp-*` staging file remains next to `path`.
    fn assert_no_temp_files_for(path: &Path) {
        let parent = path.parent().unwrap();
        let prefix = format!(".{}.tmp-", path.file_name().unwrap().to_string_lossy());
        let has_leftovers = fs::read_dir(parent)
            .unwrap()
            .filter_map(Result::ok)
            .any(|entry| entry.file_name().to_string_lossy().starts_with(&prefix));
        assert!(
            !has_leftovers,
            "Prometheus atomic writes should not leave temp files"
        );
    }
}