1use congestion::format::{
14 LogHeader, write_file_header, write_histogram_record, write_progress_record,
15};
16use congestion::{HistogramAccumulator, MetadataOp, Side};
17
/// Callback the logger invokes to obtain the payload of a progress record.
///
/// The returned bytes are handed unchanged to `write_progress_record`; an
/// empty vector means "nothing to report" and no record is written.
/// NOTE(review): the payload is presumably JSON-encoded (the logger names it
/// `json`) — confirm against the record format.
pub type ProgressSource = Box<dyn Fn() -> Vec<u8> + Send + Sync>;
24
/// One latency stream handled by the logger: an accumulator to drain plus
/// the channel on which each drained snapshot is published.
pub struct LoggerUnit {
    /// Human-readable name; only used in warning messages when writing this
    /// unit's record fails.
    pub label: &'static str,
    /// Side recorded alongside every histogram record written for this unit.
    pub side: Side,
    /// Metadata operation recorded alongside every histogram record written
    /// for this unit.
    pub op: MetadataOp,
    /// Shared accumulator; the logger locks it once per pass and calls
    /// `snapshot_and_reset` on it.
    pub accumulator: std::sync::Arc<std::sync::Mutex<HistogramAccumulator>>,
    /// Receives a clone of every snapshot taken, including empty ones. Send
    /// errors are ignored by the logger.
    pub snapshot_tx: tokio::sync::watch::Sender<hdrhistogram::Histogram<u64>>,
}
34
/// Settings consumed by `run_logger`.
pub struct LoggerConfig {
    /// Period of the snapshot timer.
    pub interval: std::time::Duration,
    /// Where to write the log file. `None` disables file output; the file is
    /// created or truncated when the logger starts.
    pub log_path: Option<std::path::PathBuf>,
    /// Header written once to the log file right after it is opened.
    pub header: LogHeader,
    /// Optional callback queried once per snapshot pass, while file output is
    /// active, for a progress payload to append to the log.
    pub progress_source: Option<ProgressSource>,
}
47
48pub async fn run_logger(
51 config: LoggerConfig,
52 units: Vec<LoggerUnit>,
53 mut cancel: tokio::sync::watch::Receiver<bool>,
54) {
55 let mut writer: Option<std::io::BufWriter<std::fs::File>> = match &config.log_path {
56 Some(path) => {
57 let mut open_options = std::fs::OpenOptions::new();
58 open_options.create(true).write(true).truncate(true);
59 #[cfg(unix)]
60 {
61 use std::os::unix::fs::OpenOptionsExt;
62 open_options.custom_flags(libc::O_NOFOLLOW);
63 }
64 match open_options.open(path) {
65 Ok(f) => {
66 let mut w = std::io::BufWriter::new(f);
67 if let Err(err) = write_file_header(&mut w, &config.header) {
68 tracing::warn!(
69 "histogram-logger: failed to write file header: {err:#}; \
70 disabling file output"
71 );
72 None
73 } else {
74 Some(w)
75 }
76 }
77 Err(err) => {
78 tracing::warn!(
79 "histogram-logger: failed to open {path:?}: {err:#}; \
80 disabling file output"
81 );
82 None
83 }
84 }
85 }
86 None => None,
87 };
88 let progress_source = config.progress_source;
89 let mut interval = tokio::time::interval(config.interval);
90 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
91 interval.tick().await;
92 loop {
93 tokio::select! {
94 _ = interval.tick() => {
95 writer = snapshot_and_publish_units(&units, progress_source.as_deref(), writer);
96 }
97 _ = cancel.changed() => {
98 if *cancel.borrow() {
99 drop(snapshot_and_publish_units(&units, progress_source.as_deref(), writer));
102 break;
103 }
104 }
105 }
106 }
107 tracing::debug!("histogram-logger: exiting");
108}
109
110fn snapshot_and_publish_units(
117 units: &[LoggerUnit],
118 progress_source: Option<&(dyn Fn() -> Vec<u8> + Send + Sync)>,
119 mut writer: Option<std::io::BufWriter<std::fs::File>>,
120) -> Option<std::io::BufWriter<std::fs::File>> {
121 use std::io::Write;
122 for unit in units {
123 let snap = unit
124 .accumulator
125 .lock()
126 .expect("histogram accumulator mutex poisoned")
127 .snapshot_and_reset();
128 let snapshot_micros = unix_micros_now();
135 let _ = unit.snapshot_tx.send(snap.clone());
136 if snap.is_empty() {
137 continue;
138 }
139 if let Some(w) = writer.as_mut()
140 && let Err(err) = write_histogram_record(w, snapshot_micros, unit.side, unit.op, &snap)
141 {
142 tracing::warn!(
143 "histogram-logger: write_histogram_record({label}) failed: {err:#}; \
144 disabling file output",
145 label = unit.label,
146 );
147 writer = None;
148 break;
149 }
150 }
151 if let Some(src) = progress_source
159 && let Some(w) = writer.as_mut()
160 {
161 let json = src();
162 let ts = unix_micros_now();
163 if !json.is_empty()
164 && let Err(err) = write_progress_record(w, ts, &json)
165 {
166 tracing::warn!(
167 "histogram-logger: write_progress_record failed: {err:#}; \
168 disabling file output",
169 );
170 writer = None;
171 }
172 }
173 if let Some(w) = writer.as_mut()
174 && let Err(err) = w.flush()
175 {
176 tracing::warn!("histogram-logger: flush failed: {err:#}; disabling file output",);
177 writer = None;
178 }
179 writer
180}
181
/// Current wall-clock time as microseconds since the Unix epoch.
///
/// Yields `0` when the system clock reads earlier than the epoch and
/// saturates at `u64::MAX` if the microsecond count does not fit in a `u64`.
fn unix_micros_now() -> u64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => u64::try_from(since_epoch.as_micros()).unwrap_or(u64::MAX),
        Err(_) => 0,
    }
}
188
#[cfg(test)]
mod tests {
    use super::*;
    use congestion::format::{
        AutoMetaSnapshot, FORMAT_VERSION, HdrSnapshot, LogHeader, Record, UnitLabel,
        read_file_header, read_record,
    };

    /// Accumulator handle shared between a test and the logger under test.
    type SharedAccumulator = std::sync::Arc<std::sync::Mutex<HistogramAccumulator>>;

    /// Histogram type carried on the snapshot watch channels.
    type Snapshot = hdrhistogram::Histogram<u64>;

    /// Minimal, fixed file header used by every test.
    fn header() -> LogHeader {
        LogHeader {
            format_version: FORMAT_VERSION,
            tool: "test".into(),
            tool_version: "0.0.0".into(),
            hostname: "h".into(),
            pid: 0,
            start_unix_micros: 0,
            snapshot_interval_micros: 100_000,
            auto_meta: AutoMetaSnapshot {
                initial_cwnd: 1,
                min_cwnd: 1,
                max_cwnd: 4096,
                alpha: 1.3,
                beta: 1.8,
                increase_step: 1,
                decrease_step: 1,
                baseline_percentile: 0.1,
                current_percentile: 0.5,
                long_window_micros: 10_000_000,
                short_window_micros: 1_000_000,
                tick_interval_micros: 50_000,
            },
            hdr: HdrSnapshot {
                lowest_discernible_micros: 1,
                highest_trackable_micros: 3_600_000_000,
                significant_figures: 3,
                unit: "microseconds".into(),
            },
            unit_labels: vec![UnitLabel {
                side: 0,
                op: 0,
                label: "src-stat".into(),
            }],
        }
    }

    /// Empty histogram with the bounds advertised by `header()`.
    fn empty_histogram() -> Snapshot {
        hdrhistogram::Histogram::<u64>::new_with_bounds(1, 3_600_000_000, 3).unwrap()
    }

    /// Fresh accumulator wrapped for sharing with a `LoggerUnit`.
    fn new_accumulator() -> SharedAccumulator {
        std::sync::Arc::new(std::sync::Mutex::new(HistogramAccumulator::new()))
    }

    /// Records one latency sample of `micros` microseconds.
    fn record_micros(accumulator: &SharedAccumulator, micros: u64) {
        accumulator
            .lock()
            .unwrap()
            .record(std::time::Duration::from_micros(micros));
    }

    /// Builds a `Stat` unit whose watch channel starts out holding `initial`,
    /// returning the unit together with the receiving end of that channel.
    fn stat_unit(
        label: &'static str,
        side: Side,
        accumulator: SharedAccumulator,
        initial: Snapshot,
    ) -> (LoggerUnit, tokio::sync::watch::Receiver<Snapshot>) {
        let (snapshot_tx, snapshot_rx) = tokio::sync::watch::channel(initial);
        let unit = LoggerUnit {
            label,
            side,
            op: MetadataOp::Stat,
            accumulator,
            snapshot_tx,
        };
        (unit, snapshot_rx)
    }

    /// Logger configuration writing to `path` with the shared test header.
    fn logger_config(
        path: &std::path::Path,
        interval: std::time::Duration,
        progress_source: Option<ProgressSource>,
    ) -> LoggerConfig {
        LoggerConfig {
            interval,
            log_path: Some(path.to_path_buf()),
            header: header(),
            progress_source,
        }
    }

    /// Spawns the logger, lets it run for `run_for`, then cancels it and waits
    /// for the task to finish.
    async fn run_until_cancelled(
        config: LoggerConfig,
        units: Vec<LoggerUnit>,
        run_for: std::time::Duration,
    ) {
        let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);
        let handle = tokio::spawn(run_logger(config, units, cancel_rx));
        tokio::time::sleep(run_for).await;
        cancel_tx.send(true).unwrap();
        handle.await.unwrap();
    }

    /// Reads the header and then every record from the log file at `path`.
    fn read_records(path: &std::path::Path) -> Vec<Record> {
        let file = std::fs::File::open(path).unwrap();
        let mut reader = std::io::BufReader::new(file);
        let _ = read_file_header(&mut reader).unwrap();
        let mut records = Vec::new();
        while let Some(record) = read_record(&mut reader).unwrap() {
            records.push(record);
        }
        records
    }

    #[tokio::test]
    async fn writes_records_to_file_for_non_empty_snapshots() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.hdr");
        let acc = new_accumulator();
        let (unit, _snap_rx) = stat_unit("src-stat", Side::Source, acc.clone(), empty_histogram());
        record_micros(&acc, 100);
        record_micros(&acc, 200);

        let config = logger_config(&path, std::time::Duration::from_millis(50), None);
        run_until_cancelled(config, vec![unit], std::time::Duration::from_millis(150)).await;

        let first = read_records(&path)
            .into_iter()
            .next()
            .expect("at least one record written");
        let Record::Histogram(rec) = first else {
            panic!("unexpected progress record");
        };
        assert_eq!(rec.samples_count, 2);
        assert_eq!(rec.side, Side::Source);
        assert_eq!(rec.op, MetadataOp::Stat);
    }

    #[tokio::test]
    async fn empty_snapshots_publish_via_watch_but_skip_file() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.hdr");
        let acc = new_accumulator();
        // Seed the channel with a NON-empty histogram so that observing an
        // empty one afterwards proves the logger really published a snapshot.
        let mut seeded = empty_histogram();
        seeded.record(1).unwrap();
        let (unit, snap_rx) = stat_unit("src-stat", Side::Source, acc, seeded);

        let config = logger_config(&path, std::time::Duration::from_millis(50), None);
        run_until_cancelled(config, vec![unit], std::time::Duration::from_millis(150)).await;

        // Nothing was recorded, so the file holds the header and no records.
        assert!(read_records(&path).is_empty());
        // The last value stays readable after the sender is gone.
        assert!(
            snap_rx.borrow().is_empty(),
            "the seeded value must have been replaced by a published empty snapshot",
        );
    }

    #[tokio::test]
    async fn cancel_before_first_tick_still_writes_pending_samples() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.hdr");
        let acc = new_accumulator();
        let (unit, _snap_rx) = stat_unit("src-stat", Side::Source, acc.clone(), empty_histogram());
        record_micros(&acc, 42);

        // The interval is far longer than the run time, so the only snapshot
        // pass is the final one triggered by cancellation.
        let config = logger_config(&path, std::time::Duration::from_secs(60), None);
        run_until_cancelled(config, vec![unit], std::time::Duration::from_millis(50)).await;

        let first = read_records(&path)
            .into_iter()
            .next()
            .expect("cancellation must flush a final record");
        let Record::Histogram(rec) = first else {
            panic!("unexpected progress record");
        };
        assert_eq!(rec.samples_count, 1);
    }

    #[test]
    fn snapshot_and_publish_uses_per_unit_timestamps() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.hdr");
        let header = header();
        let mut writer = Some(std::io::BufWriter::new(
            std::fs::File::create(&path).unwrap(),
        ));
        {
            use std::io::Write;
            congestion::format::write_file_header(writer.as_mut().unwrap(), &header).unwrap();
            writer.as_mut().unwrap().flush().unwrap();
        }

        let acc_a = new_accumulator();
        let acc_b = new_accumulator();
        record_micros(&acc_a, 10);
        record_micros(&acc_b, 20);
        let (unit_a, _rx_a) = stat_unit("src-stat", Side::Source, acc_a, empty_histogram());
        let (unit_b, _rx_b) = stat_unit("dst-stat", Side::Destination, acc_b, empty_histogram());
        let units = vec![unit_a, unit_b];

        let before_micros = unix_micros_now();
        writer = snapshot_and_publish_units(&units, None, writer);
        let after_micros = unix_micros_now();
        // Dropping the writer closes the file before it is read back.
        drop(writer);

        let mut records = read_records(&path).into_iter();
        let r1 = records.next().expect("record 1");
        let r2 = records.next().expect("record 2");
        let r1_ts = r1.unix_micros();
        let r2_ts = r2.unix_micros();
        assert!(
            r1_ts >= before_micros && r1_ts <= after_micros,
            "record 1 ts {r1_ts} not in [{before_micros}, {after_micros}]",
        );
        assert!(
            r2_ts >= r1_ts && r2_ts <= after_micros,
            "record 2 ts {r2_ts} must be >= record 1 ts {r1_ts} and <= after {after_micros}",
        );
    }

    #[tokio::test]
    async fn writes_progress_record_per_tick_when_source_set() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.hdr");
        let (unit, _snap_rx) =
            stat_unit("src-stat", Side::Source, new_accumulator(), empty_histogram());
        let payload = br#"{"files_copied":3}"#.to_vec();
        let payload_for_closure = payload.clone();

        let config = logger_config(
            &path,
            std::time::Duration::from_millis(50),
            Some(Box::new(move || payload_for_closure.clone())),
        );
        run_until_cancelled(config, vec![unit], std::time::Duration::from_millis(150)).await;

        let mut progress_count = 0;
        for rec in read_records(&path) {
            if let Record::Progress(p) = rec {
                assert_eq!(p.json, payload);
                progress_count += 1;
            }
        }
        assert!(
            progress_count >= 1,
            "expected ≥1 progress record, got {progress_count}",
        );
    }
}