Skip to main content

common/
observability.rs

1//! Process-wide registry of congestion-control snapshot streams.
2//!
3//! When the auto-meta-throttle setup spawns a `ControlUnit`, it
4//! [`register_unit`]s the unit's snapshot watch receiver. Renderers (the
5//! progress bar, text-update mode, future telemetry exporters) call
6//! [`registered_units`] to enumerate the active controllers and read
7//! their latest snapshots without subscribing to the underlying watch.
8//!
9//! The registry is empty unless adaptive control is active; non-adaptive
10//! runs see an empty list and can render a plain progress bar.
11
12use congestion::ControllerSnapshot;
13
14/// One entry in the registry: the unit's stable label plus a watch
15/// receiver for its snapshot stream. The receiver is cheap to clone
16/// (one ref-count bump), so renderers can pull the latest value
17/// non-blockingly with `borrow()`.
18#[derive(Clone)]
19pub struct RegisteredUnit {
20    pub label: &'static str,
21    pub snapshot_rx: tokio::sync::watch::Receiver<ControllerSnapshot>,
22}
23
24static REGISTRY: std::sync::LazyLock<std::sync::RwLock<Vec<RegisteredUnit>>> =
25    std::sync::LazyLock::new(|| std::sync::RwLock::new(Vec::new()));
26
27/// Register a unit's snapshot stream with the process-wide registry.
28///
29/// Called once per spawned `ControlUnit`. Order is preserved — renderers
30/// display units in registration order, so callers should register in
31/// the order they want lines to appear.
32pub fn register_unit(
33    label: &'static str,
34    snapshot_rx: tokio::sync::watch::Receiver<ControllerSnapshot>,
35) {
36    REGISTRY
37        .write()
38        .expect("observability registry poisoned")
39        .push(RegisteredUnit { label, snapshot_rx });
40}
41
42/// Snapshot of the current registry. Cheap (clones the inner Vec of
43/// `Arc`-backed receivers); intended to be called once per
44/// progress-render tick.
45#[must_use]
46pub fn registered_units() -> Vec<RegisteredUnit> {
47    REGISTRY
48        .read()
49        .expect("observability registry poisoned")
50        .clone()
51}
52
53/// Drop all registered units. Called from the process-wide reset path
54/// in `crate::run` so a second invocation of `run()` in the same
55/// process starts with a clean registry.
56pub fn clear() {
57    REGISTRY
58        .write()
59        .expect("observability registry poisoned")
60        .clear();
61    HIST_REGISTRY
62        .write()
63        .expect("histogram registry poisoned")
64        .clear();
65}
66
67/// One entry in the histogram registry.
68#[derive(Clone)]
69pub struct RegisteredHistogram {
70    pub label: &'static str,
71    pub snapshot_rx: tokio::sync::watch::Receiver<hdrhistogram::Histogram<u64>>,
72    /// The configured snapshot interval — used by the panel renderer to
73    /// label its "(last X.Xs)" header so the label matches the actual
74    /// cadence even when the user overrode the default.
75    pub interval: std::time::Duration,
76}
77
78static HIST_REGISTRY: std::sync::LazyLock<std::sync::RwLock<Vec<RegisteredHistogram>>> =
79    std::sync::LazyLock::new(|| std::sync::RwLock::new(Vec::new()));
80
81/// Register a per-(side, op) histogram snapshot stream. Called once per
82/// active unit by the auto-meta setup when histograms are enabled.
83pub fn register_histogram(
84    label: &'static str,
85    snapshot_rx: tokio::sync::watch::Receiver<hdrhistogram::Histogram<u64>>,
86    interval: std::time::Duration,
87) {
88    HIST_REGISTRY
89        .write()
90        .expect("histogram registry poisoned")
91        .push(RegisteredHistogram {
92            label,
93            snapshot_rx,
94            interval,
95        });
96}
97
98/// Cheap clone of the current histogram registry.
99#[must_use]
100pub fn registered_histograms() -> Vec<RegisteredHistogram> {
101    HIST_REGISTRY
102        .read()
103        .expect("histogram registry poisoned")
104        .clone()
105}
106
/// Minimum width, in display characters, of each right-aligned numeric
/// column in the rendered panel. Sized for the widest values expected in
/// practice: `999.9µs`, `1234.5×`, `999.9k`. Padding never truncates — a
/// longer value simply widens its own row.
const FIELD_WIDTH: usize = 7;
111
/// Dashed rule drawn above the auto-meta panel. It uses the same dashed
/// style as the other progress-printer sections, so the panel reads as one
/// more section break rather than free-floating text.
const SEPARATOR: &str = "-----------------------";
116
117/// Render the registered units as a multi-line block suitable for
118/// appending to the progress display. Returns an empty string when
119/// either (a) no units are registered (non-adaptive run) or (b) every
120/// registered unit has zero samples — typically a brief startup window
121/// before the first probe lands, or a unit class that the current tool
122/// never exercises (e.g. `mkdir` for `rcmp`, which only stats both
123/// sides). With per-syscall controllers we register up to 18 units
124/// (Side × MetadataOp); per-tool only a handful actually fire probes
125/// and the rest stay hidden via this `samples_seen > 0` filter.
126///
127/// The format is one fixed-width line per unit, prefixed by a dashed
128/// separator so the panel sits visually apart from the COPIED/REMOVED/
129/// SKIPPED sections above it:
130///
131/// ```text
132/// -----------------------
133/// src-stat   cwnd=  42  base=  0.8ms  curr=  2.1ms  ratio=   2.6×  samples=   1.2k
134/// unlink     cwnd=  18  base=  1.2ms  curr=  3.0ms  ratio=   2.5×  samples= 980.0
135/// rmdir      cwnd=   4  base=  2.4ms  curr=  6.1ms  ratio=   2.5×  samples=  80.0
136/// ```
137///
138/// Unit labels are padded to a uniform width so columns align even
139/// across labels of varying length (`stat`, `dst-read-link`, `open-create`).
140/// Numeric columns are right-aligned to a uniform fixed width.
141#[must_use]
142pub fn render_lines() -> String {
143    let units = registered_units();
144    if units.is_empty() {
145        return String::new();
146    }
147    // Snapshot once per render so a probe completing mid-render can't
148    // make a row appear/disappear between the empty check and the loop.
149    let snapshots: Vec<(&'static str, ControllerSnapshot)> = units
150        .iter()
151        .map(|u| (u.label, *u.snapshot_rx.borrow()))
152        .collect();
153    let visible: Vec<(&'static str, ControllerSnapshot)> = snapshots
154        .into_iter()
155        .filter(|(_, snap)| snap.samples_seen > 0)
156        .collect();
157    if visible.is_empty() {
158        return String::new();
159    }
160    let label_width = visible.iter().map(|(l, _)| l.len()).max().unwrap_or(0);
161    let mut out = String::new();
162    out.push('\n');
163    out.push_str(SEPARATOR);
164    for (label, snap) in &visible {
165        out.push('\n');
166        out.push_str(&format_unit_line(label, label_width, *snap));
167    }
168    out
169}
170
171fn format_unit_line(label: &str, label_width: usize, snap: ControllerSnapshot) -> String {
172    let ratio = if snap.baseline_latency.is_zero() || snap.current_latency.is_zero() {
173        // Either statistic missing → no meaningful ratio. The
174        // controller surfaces both as `Duration::ZERO` in the snapshot
175        // when its underlying `Option<u64>` was `None` (e.g. an empty
176        // short window holds cwnd but leaves the current statistic
177        // unset). Treat both as the unset sentinel and emit "—" rather
178        // than rendering `ratio=0.0×`, which would imply an actual
179        // faster-than-baseline reading.
180        String::from("—")
181    } else {
182        let ratio =
183            snap.current_latency.as_nanos() as f64 / snap.baseline_latency.as_nanos() as f64;
184        format!("{ratio:.1}×")
185    };
186    format!(
187        "{label:<lwidth$}  cwnd={cwnd:>4}  base={base:>fwidth$}  curr={curr:>fwidth$}  ratio={ratio:>fwidth$}  samples={samples:>fwidth$}",
188        label = label,
189        lwidth = label_width,
190        fwidth = FIELD_WIDTH,
191        cwnd = snap.cwnd,
192        base = format_duration(snap.baseline_latency),
193        curr = format_duration(snap.current_latency),
194        ratio = ratio,
195        samples = format_count(snap.samples_seen),
196    )
197}
198
/// Compact latency formatter.
///
/// * zero → `"—"` (the "unset" sentinel);
/// * below 1µs → whole nanoseconds, e.g. `"58ns"`;
/// * otherwise one decimal in the smallest of `µs`, `ms`, `s` whose rounded
///   value stays below `1000.0`, e.g. `"33.5µs"`, `"1.7ms"`, `"2.0s"`.
///
/// A value that would *round* to `1000.0` in its unit is promoted to the
/// next unit (`999_960ns` → `"1.0ms"`, not `"1000.0µs"`), so the result
/// never exceeds [`FIELD_WIDTH`] display characters for durations below
/// 1000 seconds. Seconds are the largest unit and grow without bound.
fn format_duration(d: std::time::Duration) -> String {
    // (nanoseconds per unit, suffix), smallest unit first.
    const SCALES: [(f64, &str); 3] = [(1e3, "µs"), (1e6, "ms"), (1e9, "s")];

    if d.is_zero() {
        return String::from("—");
    }
    let nanos = d.as_nanos();
    if nanos < 1_000 {
        return format!("{nanos}ns");
    }

    let nanos = nanos as f64;
    for (per_unit, suffix) in SCALES {
        let scaled = nanos / per_unit;
        if scaled < 1_000.0 {
            // Compare the formatted text rather than re-deriving the
            // rounding rule: this is exactly what would be displayed.
            let text = format!("{scaled:.1}");
            if text != "1000.0" {
                return format!("{text}{suffix}");
            }
        }
    }
    // 1000 seconds or more (or rounds to that): no larger unit to move to.
    format!("{:.1}s", nanos / 1e9)
}
217
/// Compact thousands formatting.
///
/// Counts below 1000 are printed verbatim (`980` → `"980"`). Larger counts
/// get one decimal and a `k` / `M` / `G` suffix: `1234` → `"1.2k"`,
/// `1_500_000` → `"1.5M"`. `G` (10^9) is the largest suffix.
///
/// A value that would *round* to `1000.0` in its unit is promoted to the
/// next one (`999_999` → `"1.0M"`, not `"1000.0k"`), so the numeric part
/// stays at most `999.9` for every suffix except `G`.
fn format_count(n: u64) -> String {
    // (count per unit, suffix), smallest unit first.
    const SCALES: [(f64, char); 3] = [(1e3, 'k'), (1e6, 'M'), (1e9, 'G')];

    if n < 1_000 {
        return n.to_string();
    }

    let value = n as f64;
    for (per_unit, suffix) in SCALES {
        let scaled = value / per_unit;
        if scaled < 1_000.0 {
            // Compare the formatted text rather than re-deriving the
            // rounding rule: this is exactly what would be displayed.
            let text = format!("{scaled:.1}");
            if text != "1000.0" {
                return format!("{text}{suffix}");
            }
        }
    }
    // 10^12 or more (or rounds to that): stay on the largest suffix.
    format!("{:.1}G", value / 1e9)
}
231
#[cfg(test)]
mod tests {
    use super::*;

    /// The registry is global, so these tests serialize via this guard
    /// to avoid stepping on each other when run concurrently.
    static GUARD: std::sync::Mutex<()> = std::sync::Mutex::new(());

    /// Take the serialization guard and start from empty registries.
    ///
    /// A test that panics while holding `GUARD` poisons it. The mutex
    /// protects no data of its own, so the poison is ignored; otherwise one
    /// real failure would make every later test fail with an unrelated
    /// `PoisonError` and bury the actual culprit.
    fn exclusive_registry() -> std::sync::MutexGuard<'static, ()> {
        let guard = GUARD
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        clear();
        guard
    }

    #[test]
    fn empty_registry_returns_empty_vec() {
        let _g = exclusive_registry();
        assert!(registered_units().is_empty());
    }

    #[test]
    fn registered_units_preserve_insertion_order() {
        let _g = exclusive_registry();
        let (_tx_a, rx_a) = tokio::sync::watch::channel(ControllerSnapshot::default());
        let (_tx_b, rx_b) = tokio::sync::watch::channel(ControllerSnapshot::default());
        register_unit("first", rx_a);
        register_unit("second", rx_b);
        let units = registered_units();
        assert_eq!(units.len(), 2);
        assert_eq!(units[0].label, "first");
        assert_eq!(units[1].label, "second");
        clear();
    }

    #[test]
    fn snapshot_updates_visible_via_registered_receiver() {
        let _g = exclusive_registry();
        let (tx, rx) = tokio::sync::watch::channel(ControllerSnapshot::default());
        register_unit("only", rx);
        let new_snapshot = ControllerSnapshot {
            cwnd: 42,
            ..ControllerSnapshot::default()
        };
        tx.send(new_snapshot).expect("send snapshot");
        let units = registered_units();
        assert_eq!(units[0].snapshot_rx.borrow().cwnd, 42);
        clear();
    }

    #[test]
    fn render_lines_is_empty_when_registry_is_empty() {
        let _g = exclusive_registry();
        assert_eq!(render_lines(), "");
    }

    #[test]
    fn render_lines_shows_one_line_per_unit_with_aligned_labels() {
        let _g = exclusive_registry();
        let (_tx_a, rx_a) = tokio::sync::watch::channel(ControllerSnapshot {
            cwnd: 8,
            baseline_latency: std::time::Duration::from_micros(800),
            current_latency: std::time::Duration::from_millis(2),
            samples_seen: 1234,
        });
        let (_tx_b, rx_b) = tokio::sync::watch::channel(ControllerSnapshot {
            cwnd: 16,
            baseline_latency: std::time::Duration::from_millis(1),
            current_latency: std::time::Duration::from_millis(3),
            samples_seen: 5678,
        });
        register_unit("walk-src", rx_a);
        register_unit("meta-dst", rx_b);
        let out = render_lines();
        let lines: Vec<&str> = out.split('\n').filter(|s| !s.is_empty()).collect();
        // Separator + 2 unit rows.
        assert_eq!(lines.len(), 3);
        assert_eq!(lines[0], SEPARATOR);
        assert!(lines[1].contains("walk-src"));
        // cwnd is right-aligned to 4 chars; numeric columns to FIELD_WIDTH.
        assert!(lines[1].contains("cwnd=   8"));
        // current / baseline = 2ms / 800µs = 2.5×
        assert!(lines[1].contains("ratio=   2.5×"));
        assert!(lines[1].contains("samples=   1.2k"));
        assert!(lines[2].contains("meta-dst"));
        assert!(lines[2].contains("cwnd=  16"));
        assert!(lines[2].contains("samples=   5.7k"));
        clear();
    }

    #[test]
    fn render_lines_skips_units_with_zero_samples() {
        // Tools that don't exercise a side (e.g. rrm never walks the
        // destination tree) leave that controller's `samples_seen` at
        // zero. We don't show the row at all rather than render a
        // permanent placeholder of dashes.
        let _g = exclusive_registry();
        let (_tx_a, rx_a) = tokio::sync::watch::channel(ControllerSnapshot {
            cwnd: 8,
            baseline_latency: std::time::Duration::from_micros(800),
            current_latency: std::time::Duration::from_millis(2),
            samples_seen: 1234,
        });
        let (_tx_b, rx_b) = tokio::sync::watch::channel(ControllerSnapshot {
            cwnd: 1,
            baseline_latency: std::time::Duration::ZERO,
            current_latency: std::time::Duration::ZERO,
            samples_seen: 0,
        });
        register_unit("walk-src", rx_a);
        register_unit("walk-dst", rx_b);
        let out = render_lines();
        assert!(out.contains("walk-src"));
        assert!(!out.contains("walk-dst"));
        clear();
    }

    #[test]
    fn render_lines_is_empty_when_all_units_have_zero_samples() {
        // At startup, before any probes have completed, every controller
        // reports samples_seen = 0. The panel shouldn't render a bare
        // separator with nothing under it.
        let _g = exclusive_registry();
        let (_tx, rx) = tokio::sync::watch::channel(ControllerSnapshot::default());
        register_unit("walk-src", rx);
        assert_eq!(render_lines(), "");
        clear();
    }

    #[test]
    fn render_lines_shows_em_dash_when_baseline_unset() {
        // It's possible (briefly) for a unit to have samples_seen > 0
        // but the published snapshot's baseline_latency still at zero, if
        // the snapshot was captured between on_sample and the first
        // sample-bearing on_tick. Guard against the resulting 0/0 by
        // emitting "—" for ratio.
        let _g = exclusive_registry();
        let (_tx, rx) = tokio::sync::watch::channel(ControllerSnapshot {
            cwnd: 1,
            baseline_latency: std::time::Duration::ZERO,
            current_latency: std::time::Duration::ZERO,
            samples_seen: 1,
        });
        register_unit("walk-src", rx);
        let out = render_lines();
        assert!(out.contains("ratio="));
        assert!(out.contains("—"));
        clear();
    }

    #[test]
    fn render_lines_shows_em_dash_when_only_current_unset() {
        // Regression: when the long window has samples but the short
        // window is empty (a common state on ticks where the activity
        // gap exceeds short_window), the controller publishes a
        // populated baseline_latency and `current_latency =
        // Duration::ZERO`. The renderer must treat that as the unset
        // sentinel and emit "—"; computing
        // `ratio = current / baseline = 0.0×` would falsely imply a
        // faster-than-baseline reading.
        let _g = exclusive_registry();
        let (_tx, rx) = tokio::sync::watch::channel(ControllerSnapshot {
            cwnd: 5,
            baseline_latency: std::time::Duration::from_millis(2),
            current_latency: std::time::Duration::ZERO,
            samples_seen: 42,
        });
        register_unit("idle-short-window", rx);
        let out = render_lines();
        assert!(out.contains("ratio="));
        assert!(
            out.contains("—"),
            "expected '—' for unset current, got {out:?}"
        );
        assert!(
            !out.contains("ratio=   0.0×"),
            "ratio must not render as 0.0× when current is unset: {out}",
        );
        clear();
    }

    #[test]
    fn render_lines_columns_are_aligned_across_rows() {
        // Regression: durations like "58ns" (4 chars) and "33.5µs" (6
        // chars) used to land in unpadded columns, so consecutive rows
        // were visually misaligned. With FIELD_WIDTH right-alignment,
        // each "key=" anchor must start at the same display-column on
        // every rendered row. We compare char counts rather than byte
        // offsets so the multi-byte `µ` doesn't confuse the check.
        let _g = exclusive_registry();
        let (_tx_a, rx_a) = tokio::sync::watch::channel(ControllerSnapshot {
            cwnd: 1,
            baseline_latency: std::time::Duration::from_nanos(58),
            current_latency: std::time::Duration::from_micros(33),
            samples_seen: 629_000,
        });
        let (_tx_b, rx_b) = tokio::sync::watch::channel(ControllerSnapshot {
            cwnd: 1,
            baseline_latency: std::time::Duration::from_micros(1700),
            current_latency: std::time::Duration::from_micros(3500),
            samples_seen: 64_600,
        });
        register_unit("walk-src", rx_a);
        register_unit("meta-src", rx_b);
        let out = render_lines();
        let row_lines: Vec<&str> = out
            .split('\n')
            .filter(|s| !s.is_empty() && *s != SEPARATOR)
            .collect();
        assert_eq!(row_lines.len(), 2);
        let char_offset = |row: &str, key: &str| -> Option<usize> {
            let byte = row.find(key)?;
            Some(row[..byte].chars().count())
        };
        for key in ["cwnd=", "base=", "curr=", "ratio=", "samples="] {
            let col_a = char_offset(row_lines[0], key);
            let col_b = char_offset(row_lines[1], key);
            assert_eq!(col_a, col_b, "{key} column misaligned: {row_lines:?}");
            assert!(col_a.is_some(), "{key} missing from row: {row_lines:?}");
        }
        clear();
    }

    #[test]
    fn histogram_registry_starts_empty() {
        let _g = exclusive_registry();
        assert!(registered_histograms().is_empty());
    }

    #[test]
    fn registered_histograms_preserve_order() {
        let _g = exclusive_registry();
        let h_empty = hdrhistogram::Histogram::<u64>::new_with_bounds(1, 1_000_000, 3).unwrap();
        let (_tx_a, rx_a) = tokio::sync::watch::channel(h_empty.clone());
        let (_tx_b, rx_b) = tokio::sync::watch::channel(h_empty);
        register_histogram("first", rx_a, std::time::Duration::from_secs(1));
        register_histogram("second", rx_b, std::time::Duration::from_secs(1));
        let units = registered_histograms();
        assert_eq!(units.len(), 2);
        assert_eq!(units[0].label, "first");
        assert_eq!(units[1].label, "second");
        clear();
    }

    #[test]
    fn clear_removes_histogram_registrations_too() {
        let _g = exclusive_registry();
        let h_empty = hdrhistogram::Histogram::<u64>::new_with_bounds(1, 1_000_000, 3).unwrap();
        let (_tx, rx) = tokio::sync::watch::channel(h_empty);
        register_histogram("only", rx, std::time::Duration::from_secs(1));
        assert_eq!(registered_histograms().len(), 1);
        clear();
        assert!(registered_histograms().is_empty());
    }
}