Skip to main content

common/
histogram_logger.rs

//! Process-wide histogram logger task.
//!
//! Owns one `Arc<Mutex<HistogramAccumulator>>` per registered (side, op)
//! pair (each shared with the corresponding `ControlUnit`). On each tick,
//! takes a snapshot of every accumulator, publishes the snapshot through
//! a per-unit watch channel for the live display, and (when a log path
//! was configured) appends a binary record per non-empty snapshot.
//!
//! When a tick fires later than its scheduled snapshot interval, the
//! record's `unix_micros` field carries the actual snapshot end time
//! rather than the scheduled one — readers see the true coverage even if
//! the host was loaded.
12
13use congestion::format::{
14    LogHeader, write_file_header, write_histogram_record, write_progress_record,
15};
16use congestion::{HistogramAccumulator, MetadataOp, Side};
17
/// Callback producing the current progress snapshot as JSON-encoded bytes.
///
/// The logger invokes it once per snapshot pass, and only while a log
/// file is being written; the returned bytes become the payload of one
/// Progress record. An empty return value means "skip this pass" and no
/// record is emitted. The callback is boxed so the concrete snapshot
/// type can live in the caller's crate without the logger depending on
/// it.
pub type ProgressSource = Box<dyn Fn() -> Vec<u8> + Send + Sync>;
24
25/// One slot the logger owns: the accumulator (shared with a ControlUnit)
26/// and the watch sender used to publish snapshots to the display.
27pub struct LoggerUnit {
28    pub label: &'static str,
29    pub side: Side,
30    pub op: MetadataOp,
31    pub accumulator: std::sync::Arc<std::sync::Mutex<HistogramAccumulator>>,
32    pub snapshot_tx: tokio::sync::watch::Sender<hdrhistogram::Histogram<u64>>,
33}
34
35/// Configuration for the logger task.
36pub struct LoggerConfig {
37    pub interval: std::time::Duration,
38    pub log_path: Option<std::path::PathBuf>,
39    pub header: LogHeader,
40    /// Optional progress source. When set and a log file is open, the
41    /// logger calls it once per tick and writes one Progress record
42    /// carrying the returned JSON bytes — letting offline tools
43    /// correlate latency distributions with the throughput counters
44    /// from the progress bar.
45    pub progress_source: Option<ProgressSource>,
46}
47
48/// Run the logger task: ticks, snapshots, publishes, optionally writes
49/// to file. Exits when the provided cancellation token signals.
50pub async fn run_logger(
51    config: LoggerConfig,
52    units: Vec<LoggerUnit>,
53    mut cancel: tokio::sync::watch::Receiver<bool>,
54) {
55    let mut writer: Option<std::io::BufWriter<std::fs::File>> = match &config.log_path {
56        Some(path) => {
57            let mut open_options = std::fs::OpenOptions::new();
58            open_options.create(true).write(true).truncate(true);
59            #[cfg(unix)]
60            {
61                use std::os::unix::fs::OpenOptionsExt;
62                open_options.custom_flags(libc::O_NOFOLLOW);
63            }
64            match open_options.open(path) {
65                Ok(f) => {
66                    let mut w = std::io::BufWriter::new(f);
67                    if let Err(err) = write_file_header(&mut w, &config.header) {
68                        tracing::warn!(
69                            "histogram-logger: failed to write file header: {err:#}; \
70                                        disabling file output"
71                        );
72                        None
73                    } else {
74                        Some(w)
75                    }
76                }
77                Err(err) => {
78                    tracing::warn!(
79                        "histogram-logger: failed to open {path:?}: {err:#}; \
80                                    disabling file output"
81                    );
82                    None
83                }
84            }
85        }
86        None => None,
87    };
88    let progress_source = config.progress_source;
89    let mut interval = tokio::time::interval(config.interval);
90    interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
91    interval.tick().await;
92    loop {
93        tokio::select! {
94            _ = interval.tick() => {
95                writer = snapshot_and_publish_units(&units, progress_source.as_deref(), writer);
96            }
97            _ = cancel.changed() => {
98                if *cancel.borrow() {
99                    // Flush any samples accumulated since the last tick
100                    // so a short copy / partial final interval doesn't lose data.
101                    drop(snapshot_and_publish_units(&units, progress_source.as_deref(), writer));
102                    break;
103                }
104            }
105        }
106    }
107    tracing::debug!("histogram-logger: exiting");
108}
109
110/// Snapshot every accumulator, publish to its watch, optionally write
111/// to the log file. When `progress_source` is set and a writer is
112/// active, append one Progress record per call carrying the
113/// JSON-encoded snapshot the closure returns. Returns the (possibly
114/// None'd) writer back to the caller — if a write or flush fails,
115/// the returned Option is None and a warning has been emitted.
116fn snapshot_and_publish_units(
117    units: &[LoggerUnit],
118    progress_source: Option<&(dyn Fn() -> Vec<u8> + Send + Sync)>,
119    mut writer: Option<std::io::BufWriter<std::fs::File>>,
120) -> Option<std::io::BufWriter<std::fs::File>> {
121    use std::io::Write;
122    for unit in units {
123        let snap = unit
124            .accumulator
125            .lock()
126            .expect("histogram accumulator mutex poisoned")
127            .snapshot_and_reset();
128        // Capture the snapshot's end-time AFTER the lock+reset so the
129        // record's unix_micros reflects what samples are actually in
130        // the snapshot. With synchronous histogram capture in the
131        // RoutingSink, samples can land in *later* units' accumulators
132        // while this loop is still walking earlier ones; a single
133        // pre-loop timestamp would backdate those later snapshots.
134        let snapshot_micros = unix_micros_now();
135        let _ = unit.snapshot_tx.send(snap.clone());
136        if snap.is_empty() {
137            continue;
138        }
139        if let Some(w) = writer.as_mut()
140            && let Err(err) = write_histogram_record(w, snapshot_micros, unit.side, unit.op, &snap)
141        {
142            tracing::warn!(
143                "histogram-logger: write_histogram_record({label}) failed: {err:#}; \
144                 disabling file output",
145                label = unit.label,
146            );
147            writer = None;
148            break;
149        }
150    }
151    // emit a progress record after the unit loop so its timestamp
152    // bounds the tick from above: every preceding unit record is at or
153    // before this point. progress is monotonic, so we always write
154    // it — empty progress (all zeros) is meaningful at run start.
155    // an empty json payload is the source's "skip this tick" signal
156    // (e.g. transient encoding failure already logged inside src()); we
157    // drop the record rather than emit something unparseable.
158    if let Some(src) = progress_source
159        && let Some(w) = writer.as_mut()
160    {
161        let json = src();
162        let ts = unix_micros_now();
163        if !json.is_empty()
164            && let Err(err) = write_progress_record(w, ts, &json)
165        {
166            tracing::warn!(
167                "histogram-logger: write_progress_record failed: {err:#}; \
168                 disabling file output",
169            );
170            writer = None;
171        }
172    }
173    if let Some(w) = writer.as_mut()
174        && let Err(err) = w.flush()
175    {
176        tracing::warn!("histogram-logger: flush failed: {err:#}; disabling file output",);
177        writer = None;
178    }
179    writer
180}
181
/// Current wall-clock time as microseconds since the Unix epoch.
///
/// Returns 0 when the system clock reads earlier than the epoch and
/// saturates at `u64::MAX` if the microsecond count does not fit in a
/// `u64`.
fn unix_micros_now() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => u64::try_from(since_epoch.as_micros()).unwrap_or(u64::MAX),
        Err(_) => 0,
    }
}
188
#[cfg(test)]
mod tests {
    use super::*;
    use congestion::format::{
        AutoMetaSnapshot, FORMAT_VERSION, HdrSnapshot, LogHeader, Record, UnitLabel,
        read_file_header, read_record,
    };
    use std::time::Duration;

    type SharedAccumulator = std::sync::Arc<std::sync::Mutex<HistogramAccumulator>>;
    type SnapshotRx = tokio::sync::watch::Receiver<hdrhistogram::Histogram<u64>>;

    /// Minimal, fixed file header shared by all tests.
    fn header() -> LogHeader {
        LogHeader {
            format_version: FORMAT_VERSION,
            tool: "test".into(),
            tool_version: "0.0.0".into(),
            hostname: "h".into(),
            pid: 0,
            start_unix_micros: 0,
            snapshot_interval_micros: 100_000,
            auto_meta: AutoMetaSnapshot {
                initial_cwnd: 1,
                min_cwnd: 1,
                max_cwnd: 4096,
                alpha: 1.3,
                beta: 1.8,
                increase_step: 1,
                decrease_step: 1,
                baseline_percentile: 0.1,
                current_percentile: 0.5,
                long_window_micros: 10_000_000,
                short_window_micros: 1_000_000,
                tick_interval_micros: 50_000,
            },
            hdr: HdrSnapshot {
                lowest_discernible_micros: 1,
                highest_trackable_micros: 3_600_000_000,
                significant_figures: 3,
                unit: "microseconds".into(),
            },
            unit_labels: vec![UnitLabel {
                side: 0,
                op: 0,
                label: "src-stat".into(),
            }],
        }
    }

    /// Fresh, empty accumulator wrapped the way the logger expects it.
    fn new_accumulator() -> SharedAccumulator {
        std::sync::Arc::new(std::sync::Mutex::new(HistogramAccumulator::new()))
    }

    /// Record one sample of `micros` microseconds into `acc`.
    fn record_micros(acc: &SharedAccumulator, micros: u64) {
        acc.lock().unwrap().record(Duration::from_micros(micros));
    }

    /// Build a `Stat` unit around `accumulator` together with the
    /// receiving end of its snapshot watch channel. The channel starts
    /// out holding an empty histogram.
    fn stat_unit(
        label: &'static str,
        side: Side,
        accumulator: SharedAccumulator,
    ) -> (LoggerUnit, SnapshotRx) {
        let (snapshot_tx, snapshot_rx) = tokio::sync::watch::channel(
            hdrhistogram::Histogram::<u64>::new_with_bounds(1, 3_600_000_000, 3).unwrap(),
        );
        let unit = LoggerUnit {
            label,
            side,
            op: MetadataOp::Stat,
            accumulator,
            snapshot_tx,
        };
        (unit, snapshot_rx)
    }

    /// Logger configuration that writes to `path` with the test header.
    fn config_for(
        path: &std::path::Path,
        interval: Duration,
        progress_source: Option<ProgressSource>,
    ) -> LoggerConfig {
        LoggerConfig {
            interval,
            log_path: Some(path.to_path_buf()),
            header: header(),
            progress_source,
        }
    }

    /// Open the log at `path` and consume its file header, leaving the
    /// reader positioned at the first record.
    fn open_records(path: &std::path::Path) -> std::io::BufReader<std::fs::File> {
        let mut reader = std::io::BufReader::new(std::fs::File::open(path).unwrap());
        let _ = read_file_header(&mut reader).unwrap();
        reader
    }

    #[tokio::test]
    async fn writes_records_to_file_for_non_empty_snapshots() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.hdr");
        let acc = new_accumulator();
        let (unit, _snap_rx) = stat_unit("src-stat", Side::Source, acc.clone());
        let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);
        // Pre-load some samples so the first tick records something.
        record_micros(&acc, 100);
        record_micros(&acc, 200);
        let config = config_for(&path, Duration::from_millis(50), None);
        let handle = tokio::spawn(run_logger(config, vec![unit], cancel_rx));
        // Wait for at least one tick to fire and write a record.
        tokio::time::sleep(Duration::from_millis(150)).await;
        cancel_tx.send(true).unwrap();
        handle.await.unwrap();

        let mut reader = open_records(&path);
        let rec = match read_record(&mut reader)
            .unwrap()
            .expect("at least one record written")
        {
            Record::Histogram(h) => h,
            Record::Progress(_) => panic!("unexpected progress record"),
        };
        assert_eq!(rec.samples_count, 2);
        assert_eq!(rec.side, Side::Source);
        assert_eq!(rec.op, MetadataOp::Stat);
    }

    #[tokio::test]
    async fn empty_snapshots_publish_via_watch_but_skip_file() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.hdr");
        let acc = new_accumulator();
        let (unit, snap_rx) = stat_unit("src-stat", Side::Source, acc.clone());
        let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);
        let config = config_for(&path, Duration::from_millis(50), None);
        let handle = tokio::spawn(run_logger(config, vec![unit], cancel_rx));
        // Don't preload any samples; let the logger tick.
        tokio::time::sleep(Duration::from_millis(150)).await;
        // Check the watch while the logger (and therefore the sender) is
        // still alive: once the sender is dropped `has_changed` reports
        // an error, and the channel's initial value is itself an empty
        // histogram, so checking after shutdown would prove nothing.
        assert!(
            snap_rx
                .has_changed()
                .expect("logger is still running, so the sender must be alive"),
            "an empty snapshot must still be published on the watch",
        );
        assert!(snap_rx.borrow().is_empty());
        cancel_tx.send(true).unwrap();
        handle.await.unwrap();

        let mut reader = open_records(&path);
        // No records were written.
        assert!(read_record(&mut reader).unwrap().is_none());
    }

    #[tokio::test]
    async fn cancel_before_first_tick_still_writes_pending_samples() {
        // Regression: a short-lived copy may finish before the first
        // periodic tick fires. The cancel arm must take one final snapshot
        // before exiting so the log isn't header-only.
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.hdr");
        let acc = new_accumulator();
        let (unit, _snap_rx) = stat_unit("src-stat", Side::Source, acc.clone());
        let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);
        // Pre-load samples, then cancel before any tick fires.
        record_micros(&acc, 42);
        // Long interval so the periodic tick definitely doesn't fire
        // before our cancel signal does.
        let config = config_for(&path, Duration::from_secs(60), None);
        let handle = tokio::spawn(run_logger(config, vec![unit], cancel_rx));
        // Give the task a moment to start and consume the initial tick,
        // then send cancel before the next 60s tick.
        tokio::time::sleep(Duration::from_millis(50)).await;
        cancel_tx.send(true).unwrap();
        handle.await.unwrap();

        let mut reader = open_records(&path);
        let rec = match read_record(&mut reader)
            .unwrap()
            .expect("cancellation must flush a final record")
        {
            Record::Histogram(h) => h,
            Record::Progress(_) => panic!("unexpected progress record"),
        };
        assert_eq!(rec.samples_count, 1);
    }

    #[test]
    fn snapshot_and_publish_uses_per_unit_timestamps() {
        // Regression: a single pre-loop timestamp would stamp later
        // units' records with a stale time, backdating samples that
        // were synchronously recorded into them while the loop was
        // walking earlier units.
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.hdr");
        let log_header = header();
        let mut writer = Some(std::io::BufWriter::new(
            std::fs::File::create(&path).unwrap(),
        ));
        {
            use std::io::Write;
            congestion::format::write_file_header(writer.as_mut().unwrap(), &log_header).unwrap();
            writer.as_mut().unwrap().flush().unwrap();
        }

        let acc_a = new_accumulator();
        let acc_b = new_accumulator();
        record_micros(&acc_a, 10);
        record_micros(&acc_b, 20);
        let (unit_a, _rx_a) = stat_unit("src-stat", Side::Source, acc_a);
        let (unit_b, _rx_b) = stat_unit("dst-stat", Side::Destination, acc_b);
        let units = vec![unit_a, unit_b];

        let before_micros = unix_micros_now();
        writer = snapshot_and_publish_units(&units, None, writer);
        let after_micros = unix_micros_now();
        drop(writer);

        let mut reader = open_records(&path);
        let r1 = read_record(&mut reader).unwrap().expect("record 1");
        let r2 = read_record(&mut reader).unwrap().expect("record 2");
        let r1_ts = r1.unix_micros();
        let r2_ts = r2.unix_micros();
        assert!(
            r1_ts >= before_micros && r1_ts <= after_micros,
            "record 1 ts {r1_ts} not in [{before_micros}, {after_micros}]",
        );
        assert!(
            r2_ts >= r1_ts && r2_ts <= after_micros,
            "record 2 ts {r2_ts} must be >= record 1 ts {r1_ts} and <= after {after_micros}",
        );
    }

    #[tokio::test]
    async fn writes_progress_record_per_tick_when_source_set() {
        // Even when the histogram accumulator is empty (no samples were
        // recorded into it this tick), a configured progress source
        // must still emit one Progress record per tick — progress
        // counters are monotonic and meaningful from the first sample
        // onward, including zero state.
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.hdr");
        let (unit, _snap_rx) = stat_unit("src-stat", Side::Source, new_accumulator());
        let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);
        let payload = br#"{"files_copied":3}"#.to_vec();
        let payload_for_closure = payload.clone();
        let config = config_for(
            &path,
            Duration::from_millis(50),
            Some(Box::new(move || payload_for_closure.clone())),
        );
        let handle = tokio::spawn(run_logger(config, vec![unit], cancel_rx));
        tokio::time::sleep(Duration::from_millis(150)).await;
        cancel_tx.send(true).unwrap();
        handle.await.unwrap();

        let mut reader = open_records(&path);
        let mut progress_count = 0;
        while let Some(rec) = read_record(&mut reader).unwrap() {
            if let Record::Progress(p) = rec {
                assert_eq!(p.json, payload);
                progress_count += 1;
            }
        }
        assert!(
            progress_count >= 1,
            "expected ≥1 progress record, got {progress_count}",
        );
    }
}