1use std::fs::{self, File, OpenOptions};
4use std::io::Write;
5use std::path::{Path, PathBuf};
6use std::sync::atomic::{AtomicU64, Ordering};
7
8use serde::Serialize;
9
10use crate::metrics::{MetricEvent, Metrics, WindowMetrics};
11use crate::prometheus::PrometheusEncoder;
12use crate::{Error, ErrorKind, Result};
13
/// Value written to the `schema_version` field of every JSONL record.
const JSONL_SCHEMA_VERSION: u32 = 1;
/// Process-wide counter mixed into temporary file names so that successive
/// names produced by this process differ.
static TEMP_FILE_COUNTER: AtomicU64 = AtomicU64::new(0);
16
/// Encoding used when metrics are written to a file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum MetricsFileFormat {
    /// Selected by the name `jsonl`.
    Jsonl,
    /// Selected by the name `prometheus`.
    Prometheus,
}

impl MetricsFileFormat {
    /// Resolves a format name into a [`MetricsFileFormat`].
    ///
    /// Surrounding whitespace is ignored. The comparison is exact and
    /// case-sensitive: only `jsonl` and `prometheus` are recognised, and any
    /// other input yields `None`.
    pub fn parse(raw: &str) -> Option<Self> {
        let name = raw.trim();
        Self::parse_trimmed_bytes(name.as_bytes())
    }

    /// Matches already-trimmed bytes against the known format names.
    ///
    /// Kept separate from [`Self::parse`] so the comparison can be exercised
    /// on raw byte slices without requiring valid UTF-8.
    fn parse_trimmed_bytes(raw: &[u8]) -> Option<Self> {
        let format = match raw {
            b"prometheus" => Self::Prometheus,
            b"jsonl" => Self::Jsonl,
            _ => return None,
        };
        Some(format)
    }
}
41
42#[derive(Debug, Clone)]
47pub struct MetricsFileSink {
48 path: PathBuf,
49 format: MetricsFileFormat,
50 encoder: PrometheusEncoder,
51}
52
53impl MetricsFileSink {
54 pub fn new(path: impl Into<PathBuf>, format: MetricsFileFormat) -> Result<Self> {
56 Self::with_prefix(path, format, PrometheusEncoder::DEFAULT_PREFIX)
57 }
58
59 pub fn with_prefix(
61 path: impl Into<PathBuf>,
62 format: MetricsFileFormat,
63 metric_prefix: impl Into<String>,
64 ) -> Result<Self> {
65 Self::with_prefix_and_labels(
66 path,
67 format,
68 metric_prefix,
69 std::iter::empty::<(String, String)>(),
70 )
71 }
72
73 pub fn with_prefix_and_labels<I, K, V>(
78 path: impl Into<PathBuf>,
79 format: MetricsFileFormat,
80 metric_prefix: impl Into<String>,
81 labels: I,
82 ) -> Result<Self>
83 where
84 I: IntoIterator<Item = (K, V)>,
85 K: Into<String>,
86 V: Into<String>,
87 {
88 let sink = Self {
89 path: path.into(),
90 format,
91 encoder: PrometheusEncoder::with_labels(metric_prefix, labels)?,
92 };
93 sink.create_empty_file()?;
94 Ok(sink)
95 }
96
97 pub fn path(&self) -> &Path {
99 &self.path
100 }
101
102 pub fn format(&self) -> MetricsFileFormat {
104 self.format
105 }
106
107 pub fn write_event(&self, event: &MetricEvent) -> Result<()> {
109 match event {
110 MetricEvent::Interval(metrics) => self.write_interval(metrics),
111 MetricEvent::Window(metrics) => self.write_window(metrics),
112 }
113 }
114
115 pub fn write_interval(&self, metrics: &Metrics) -> Result<()> {
117 match self.format {
118 MetricsFileFormat::Jsonl => self.append_jsonl("interval", metrics),
119 MetricsFileFormat::Prometheus => self.write_prometheus(metrics),
120 }
121 }
122
123 pub fn write_window(&self, metrics: &WindowMetrics) -> Result<()> {
125 match self.format {
126 MetricsFileFormat::Jsonl => self.append_jsonl("window", metrics),
127 MetricsFileFormat::Prometheus => self.write_window_prometheus(metrics),
128 }
129 }
130
131 fn create_empty_file(&self) -> Result<()> {
132 File::create(&self.path)
133 .map(|_| ())
134 .map_err(|err| file_error("failed to create metrics file", &self.path, err))
135 }
136
137 fn append_jsonl<T>(&self, event: &'static str, metrics: &T) -> Result<()>
138 where
139 T: Serialize,
140 {
141 let mut file = OpenOptions::new()
142 .append(true)
143 .open(&self.path)
144 .map_err(|err| file_error("failed to open metrics file", &self.path, err))?;
145 serde_json::to_writer(
146 &mut file,
147 &JsonlEvent {
148 schema_version: JSONL_SCHEMA_VERSION,
149 event,
150 metrics,
151 },
152 )
153 .map_err(|err| {
154 Error::with_source(ErrorKind::MetricsFile, "failed to encode metrics JSON", err)
155 })?;
156 file.write_all(b"\n")
157 .map_err(|err| file_error("failed to write metrics file", &self.path, err))
158 }
159
160 fn write_prometheus(&self, metrics: &Metrics) -> Result<()> {
161 atomic_write(&self.path, self.encoder.encode_interval(metrics).as_bytes())
162 }
163
164 fn write_window_prometheus(&self, metrics: &WindowMetrics) -> Result<()> {
165 atomic_write(&self.path, self.encoder.encode_window(metrics).as_bytes())
166 }
167}
168
/// Serialized shape of one JSONL record: two envelope fields plus the metric
/// payload.
#[derive(Serialize)]
struct JsonlEvent<'a, T> {
    /// Version of the record layout.
    schema_version: u32,
    /// Event kind label, e.g. `"interval"` or `"window"`.
    event: &'static str,
    /// Metric payload. `flatten` inlines its fields next to the envelope
    /// fields instead of nesting them under a `metrics` key.
    #[serde(flatten)]
    metrics: &'a T,
}
176
177fn file_error(
178 message: &'static str,
179 path: &Path,
180 source: impl std::error::Error + Send + Sync + 'static,
181) -> Error {
182 Error::with_source(
183 ErrorKind::MetricsFile,
184 format!("{message}: {}", path.display()),
185 source,
186 )
187}
188
189fn atomic_write(path: &Path, contents: &[u8]) -> Result<()> {
190 let temp_path = temp_path_for(path);
191 let result = write_temp_then_rename(&temp_path, path, contents);
192 if result.is_err() {
193 let _ = fs::remove_file(&temp_path);
194 }
195 result.map_err(|err| file_error("failed to write metrics file", path, err))
196}
197
/// Writes `contents` to a newly created file at `temp_path`, then renames it
/// to `path`.
///
/// `create_new` makes the open fail if `temp_path` already exists, so an
/// existing file is never overwritten by the staging step.
fn write_temp_then_rename(temp_path: &Path, path: &Path, contents: &[u8]) -> std::io::Result<()> {
    {
        let mut options = OpenOptions::new();
        options.write(true).create_new(true);
        let mut staged = options.open(temp_path)?;
        staged.write_all(contents)?;
        staged.flush()?;
        // `staged` is closed at the end of this scope, before the rename.
    }
    fs::rename(temp_path, path)
}
208
209fn temp_path_for(path: &Path) -> PathBuf {
210 let parent = path
211 .parent()
212 .filter(|parent| !parent.as_os_str().is_empty())
213 .unwrap_or_else(|| Path::new("."));
214 let file_name = path
215 .file_name()
216 .map(|name| name.to_string_lossy())
217 .unwrap_or_else(|| "metrics".into());
218 let counter = TEMP_FILE_COUNTER.fetch_add(1, Ordering::Relaxed);
219 parent.join(format!(".{file_name}.tmp-{}-{counter}", std::process::id()))
220}
221
// Proof harnesses; compiled only when the `kani` cfg is set.
#[cfg(kani)]
mod verification {
    use super::*;

    /// Checks that `parse_trimmed_bytes` agrees with the reference mapping
    /// below for every byte string of length 0 through 10.
    #[kani::proof]
    #[kani::unwind(12)]
    fn metrics_format_parser_matches_documented_values_for_bounded_bytes() {
        // Symbolic length, capped at 10 bytes (the length of "prometheus",
        // the longest accepted name).
        let len: usize = kani::any();
        kani::assume(len <= 10);
        // Symbolic buffer; only its first `len` bytes are passed to the parser.
        let bytes: [u8; 10] = kani::any();
        let raw = &bytes[..len];

        // Reference mapping the parser is expected to match.
        let expected = match raw {
            b"jsonl" => Some(MetricsFileFormat::Jsonl),
            b"prometheus" => Some(MetricsFileFormat::Prometheus),
            _ => None,
        };

        assert_eq!(MetricsFileFormat::parse_trimmed_bytes(raw), expected);
    }
}
243
// Tests for the file sink; each test writes to its own uniquely named file
// under the system temp directory.
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    use super::*;

    /// Per-process counter that keeps test file names distinct.
    static TEMP_COUNTER: AtomicUsize = AtomicUsize::new(0);

    /// Two interval writes in JSONL mode must produce two lines, in order,
    /// each carrying the envelope fields.
    #[test]
    fn jsonl_format_appends_interval_events() {
        let path = temp_path("jsonl");
        let sink = MetricsFileSink::new(&path, MetricsFileFormat::Jsonl).unwrap();

        sink.write_interval(&sample_metrics(1.0)).unwrap();
        sink.write_interval(&sample_metrics(2.0)).unwrap();

        let contents = fs::read_to_string(&path).unwrap();
        let lines = contents.lines().collect::<Vec<_>>();
        assert_eq!(lines.len(), 2);
        assert!(lines[0].contains(r#""schema_version":1"#));
        assert!(lines[0].contains(r#""event":"interval""#));
        assert!(lines[0].contains(r#""transferred_bytes":1.0"#));
        assert!(lines[1].contains(r#""transferred_bytes":2.0"#));
        // Best-effort cleanup; skipped if an assertion above panics.
        let _ = fs::remove_file(path);
    }

    /// In Prometheus mode the second write must replace the first, and no
    /// temporary file may be left next to the target.
    #[test]
    fn prometheus_format_replaces_latest_snapshot() {
        let path = temp_path("prom");
        let sink = MetricsFileSink::with_prefix_and_labels(
            &path,
            MetricsFileFormat::Prometheus,
            "nettest",
            [("site", "ci")],
        )
        .unwrap();

        sink.write_interval(&sample_metrics(1.0)).unwrap();
        sink.write_interval(&sample_metrics(2.0)).unwrap();

        let contents = fs::read_to_string(&path).unwrap();
        // Only the value from the latest write may be present.
        assert!(contents.contains("nettest_transferred_bytes{site=\"ci\"} 2\n"));
        assert!(!contents.contains("nettest_transferred_bytes{site=\"ci\"} 1\n"));
        assert_no_temp_files_for(&path);
        let _ = fs::remove_file(path);
    }

    /// `write_event` with a window event must emit a JSONL record tagged
    /// `"window"` containing the window's fields.
    #[test]
    fn write_event_supports_window_jsonl() {
        let path = temp_path("jsonl");
        let sink = MetricsFileSink::new(&path, MetricsFileFormat::Jsonl).unwrap();

        sink.write_event(&MetricEvent::Window(WindowMetrics {
            timestamp_unix_seconds: 123.0,
            role: crate::Role::Client,
            direction: crate::MetricDirection::Sender,
            stream_count: 2,
            protocol: crate::TransportProtocol::Tcp,
            duration_seconds: 2.0,
            transferred_bytes: 64.0,
            ..WindowMetrics::default()
        }))
        .unwrap();

        let contents = fs::read_to_string(&path).unwrap();
        assert!(contents.contains(r#""schema_version":1"#));
        assert!(contents.contains(r#""event":"window""#));
        assert!(contents.contains(r#""timestamp_unix_seconds":123.0"#));
        assert!(contents.contains(r#""role":"Client""#));
        assert!(contents.contains(r#""direction":"Sender""#));
        assert!(contents.contains(r#""stream_count":2"#));
        assert!(contents.contains(r#""protocol":"Tcp""#));
        assert!(contents.contains(r#""duration_seconds":2.0"#));
        assert!(contents.contains(r#""transferred_bytes":64.0"#));
        let _ = fs::remove_file(path);
    }

    /// A failure to open the target must surface as a `MetricsFile` error
    /// with the "failed to open" message.
    #[test]
    fn write_interval_reports_file_errors() {
        let path = temp_path("jsonl");
        let sink = MetricsFileSink::new(&path, MetricsFileFormat::Jsonl).unwrap();
        // Swap the file for a directory so the next write cannot open it.
        fs::remove_file(&path).unwrap();
        fs::create_dir(&path).unwrap();

        let err = sink.write_interval(&sample_metrics(1.0)).unwrap_err();

        assert_eq!(err.kind(), ErrorKind::MetricsFile);
        assert!(
            err.to_string().contains("failed to open metrics file"),
            "{err:#}"
        );
        let _ = fs::remove_dir(path);
    }

    /// Builds interval metrics with the given byte count, a bandwidth of
    /// eight times that value, and a one-second interval; other fields take
    /// their defaults.
    fn sample_metrics(transferred_bytes: f64) -> Metrics {
        Metrics {
            transferred_bytes,
            bandwidth_bits_per_second: transferred_bytes * 8.0,
            interval_duration_seconds: 1.0,
            ..Metrics::default()
        }
    }

    /// Returns a path in the system temp directory whose name combines the
    /// process id, the current time in nanoseconds, and a counter.
    fn temp_path(extension: &str) -> PathBuf {
        let nonce = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        let counter = TEMP_COUNTER.fetch_add(1, Ordering::Relaxed);
        std::env::temp_dir().join(format!(
            "iperf3-rs-metrics-file-{}-{nonce}-{counter}.{extension}",
            std::process::id()
        ))
    }

    /// Asserts that the directory containing `path` holds no entry whose name
    /// starts with `.<file name>.tmp-`, the prefix used for staging files.
    fn assert_no_temp_files_for(path: &Path) {
        let parent = path.parent().unwrap();
        let file_name = path.file_name().unwrap().to_string_lossy();
        let prefix = format!(".{file_name}.tmp-");
        let leftovers = fs::read_dir(parent)
            .unwrap()
            // Entries that cannot be read are skipped rather than failing the test.
            .filter_map(|entry| entry.ok())
            .filter(|entry| entry.file_name().to_string_lossy().starts_with(&prefix))
            .collect::<Vec<_>>();
        assert!(
            leftovers.is_empty(),
            "Prometheus atomic writes should not leave temp files"
        );
    }
}