use congestion::ControllerSnapshot;
/// A labelled source of [`ControllerSnapshot`] values held in the unit registry.
///
/// Entries are created by [`register_unit`] and handed out as clones by
/// [`registered_units`].
#[derive(Clone)]
pub struct RegisteredUnit {
/// Name of the unit; `render_lines` prints it as the first column of the unit's row.
pub label: &'static str,
/// Receiving half of a `tokio::sync::watch` channel; readers call `borrow()` on it
/// to obtain the most recently sent snapshot.
pub snapshot_rx: tokio::sync::watch::Receiver<ControllerSnapshot>,
}
/// Process-wide list of registered units, kept in registration order.
///
/// Created empty on first access and guarded by an `RwLock`; it is appended to by
/// `register_unit`, cloned by `registered_units`, and emptied by `clear`.
static REGISTRY: std::sync::LazyLock<std::sync::RwLock<Vec<RegisteredUnit>>> =
std::sync::LazyLock::new(|| std::sync::RwLock::new(Vec::new()));
/// Appends a unit to the end of the global unit registry.
///
/// `label` identifies the unit in rendered output and `snapshot_rx` is the watch
/// receiver its snapshots are read from. No de-duplication is performed: calling
/// this twice with the same label stores two entries.
///
/// # Panics
///
/// Panics if the registry lock is poisoned.
pub fn register_unit(
    label: &'static str,
    snapshot_rx: tokio::sync::watch::Receiver<ControllerSnapshot>,
) {
    let entry = RegisteredUnit { label, snapshot_rx };
    let mut units = REGISTRY.write().expect("observability registry poisoned");
    units.push(entry);
}
/// Returns a copy of every registered unit, in registration order.
///
/// The registry's read lock is held only while the entries are cloned; the
/// returned vector is independent of later registrations or `clear` calls.
///
/// # Panics
///
/// Panics if the registry lock is poisoned.
#[must_use]
pub fn registered_units() -> Vec<RegisteredUnit> {
    let units = REGISTRY.read().expect("observability registry poisoned");
    units.to_vec()
}
/// Removes every entry from both the unit registry and the histogram registry.
///
/// The two locks are taken one after the other, never at the same time: the unit
/// registry's write guard is released before the histogram registry is locked.
///
/// # Panics
///
/// Panics if either registry lock is poisoned.
pub fn clear() {
    {
        let mut units = REGISTRY.write().expect("observability registry poisoned");
        units.clear();
    }
    let mut histograms = HIST_REGISTRY
        .write()
        .expect("histogram registry poisoned");
    histograms.clear();
}
/// A labelled source of `hdrhistogram::Histogram<u64>` snapshots held in the
/// histogram registry.
///
/// Entries are created by [`register_histogram`] and handed out as clones by
/// [`registered_histograms`].
#[derive(Clone)]
pub struct RegisteredHistogram {
/// Name the histogram was registered under.
pub label: &'static str,
/// Receiving half of the `tokio::sync::watch` channel carrying histogram snapshots.
pub snapshot_rx: tokio::sync::watch::Receiver<hdrhistogram::Histogram<u64>>,
/// Duration supplied at registration. This file only stores it; presumably it is
/// the period each published histogram covers — confirm with the consumer.
pub interval: std::time::Duration,
}
/// Process-wide list of registered histograms, kept in registration order.
///
/// Created empty on first access and guarded by an `RwLock`; it is appended to by
/// `register_histogram`, cloned by `registered_histograms`, and emptied by `clear`.
static HIST_REGISTRY: std::sync::LazyLock<std::sync::RwLock<Vec<RegisteredHistogram>>> =
std::sync::LazyLock::new(|| std::sync::RwLock::new(Vec::new()));
/// Appends a histogram source to the end of the global histogram registry.
///
/// All three arguments are stored unchanged in a [`RegisteredHistogram`]. No
/// de-duplication is performed.
///
/// # Panics
///
/// Panics if the histogram registry lock is poisoned.
pub fn register_histogram(
    label: &'static str,
    snapshot_rx: tokio::sync::watch::Receiver<hdrhistogram::Histogram<u64>>,
    interval: std::time::Duration,
) {
    let entry = RegisteredHistogram {
        label,
        snapshot_rx,
        interval,
    };
    let mut histograms = HIST_REGISTRY
        .write()
        .expect("histogram registry poisoned");
    histograms.push(entry);
}
/// Returns a copy of every registered histogram source, in registration order.
///
/// The registry's read lock is held only while the entries are cloned.
///
/// # Panics
///
/// Panics if the histogram registry lock is poisoned.
#[must_use]
pub fn registered_histograms() -> Vec<RegisteredHistogram> {
    let histograms = HIST_REGISTRY
        .read()
        .expect("histogram registry poisoned");
    histograms.to_vec()
}
/// Minimum width, in characters, to which `format_unit_line` right-aligns the
/// `base`, `curr`, `ratio` and `samples` values.
const FIELD_WIDTH: usize = 7;
/// Horizontal rule that `render_lines` emits before the first unit row.
const SEPARATOR: &str = "-----------------------";
/// Renders the status table for every registered unit that has recorded samples.
///
/// Returns an empty string when no unit is registered, or when every unit's latest
/// snapshot has `samples_seen == 0`. Otherwise the result is a newline, then
/// [`SEPARATOR`], then one newline-prefixed row per visible unit in registration
/// order. Labels are left-aligned and padded to the byte length of the longest
/// visible label so the following columns line up.
///
/// # Panics
///
/// Panics if the unit registry lock is poisoned (see [`registered_units`]).
#[must_use]
pub fn render_lines() -> String {
    let units = registered_units();

    // Copy the latest snapshot out of each watch channel and keep only the units
    // that have seen at least one sample.
    let rows: Vec<(&'static str, ControllerSnapshot)> = units
        .iter()
        .map(|unit| (unit.label, *unit.snapshot_rx.borrow()))
        .filter(|(_, snapshot)| snapshot.samples_seen > 0)
        .collect();
    if rows.is_empty() {
        return String::new();
    }

    let widest_label = rows.iter().map(|(name, _)| name.len()).max().unwrap_or(0);

    let mut rendered = ["\n", SEPARATOR].concat();
    for &(name, snapshot) in &rows {
        rendered.push('\n');
        rendered.push_str(&format_unit_line(name, widest_label, snapshot));
    }
    rendered
}
/// Formats one table row for a unit.
///
/// `label` is left-aligned in a column `label_width` wide; `cwnd` is right-aligned
/// to four characters and the remaining values to [`FIELD_WIDTH`]. The ratio column
/// shows `current_latency / baseline_latency` with one decimal place, or an em
/// dash when either latency is zero (i.e. not yet measured).
fn format_unit_line(label: &str, label_width: usize, snap: ControllerSnapshot) -> String {
    let both_measured = !snap.baseline_latency.is_zero() && !snap.current_latency.is_zero();
    let ratio = if both_measured {
        let current_ns = snap.current_latency.as_nanos() as f64;
        let baseline_ns = snap.baseline_latency.as_nanos() as f64;
        format!("{:.1}×", current_ns / baseline_ns)
    } else {
        String::from("—")
    };

    // Bind every placeholder to a local so the format string can capture them by name.
    let lwidth = label_width;
    let fwidth = FIELD_WIDTH;
    let cwnd = snap.cwnd;
    let base = format_duration(snap.baseline_latency);
    let curr = format_duration(snap.current_latency);
    let samples = format_count(snap.samples_seen);

    format!(
        "{label:<lwidth$} cwnd={cwnd:>4} base={base:>fwidth$} curr={curr:>fwidth$} ratio={ratio:>fwidth$} samples={samples:>fwidth$}"
    )
}
/// Renders a latency as a short human-readable string.
///
/// A zero duration is treated as "not measured" and rendered as an em dash.
/// Anything below one microsecond is shown in whole nanoseconds; larger values
/// are shown with one decimal place in `µs`, `ms` or `s`.
///
/// The unit is chosen so the rounded mantissa never reaches `1000.0`: 999_999ns
/// is rendered as `1.0ms`, not `1000.0µs`. This keeps the result at most seven
/// characters wide for any duration under 1000 seconds.
fn format_duration(d: std::time::Duration) -> String {
    // At one decimal place, 999.95 of a unit and above rounds to "1000.0", so
    // such values are promoted to the next unit instead.
    const MICROS_LIMIT_NS: u128 = 999_950;
    const MILLIS_LIMIT_NS: u128 = 999_950_000;

    if d.is_zero() {
        return String::from("—");
    }
    let nanos = d.as_nanos();
    if nanos < 1_000 {
        format!("{nanos}ns")
    } else if nanos < MICROS_LIMIT_NS {
        format!("{:.1}µs", nanos as f64 / 1_000.0)
    } else if nanos < MILLIS_LIMIT_NS {
        format!("{:.1}ms", nanos as f64 / 1_000_000.0)
    } else {
        format!("{:.1}s", nanos as f64 / 1_000_000_000.0)
    }
}
/// Renders a count compactly: exact below 1000, otherwise with one decimal place
/// and a `k`, `M` or `G` suffix (powers of 1000).
///
/// The suffix is chosen so the rounded mantissa never reaches `1000.0`: 999_999
/// is rendered as `1.0M`, not `1000.0k`.
fn format_count(n: u64) -> String {
    // Range bounds sit at 999.95 of each unit, the point where one-decimal
    // rounding would otherwise produce "1000.0".
    match n {
        0..=999 => n.to_string(),
        1_000..=999_949 => format!("{:.1}k", n as f64 / 1_000.0),
        999_950..=999_949_999 => format!("{:.1}M", n as f64 / 1_000_000.0),
        _ => format!("{:.1}G", n as f64 / 1_000_000_000.0),
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Serialises the tests: they all mutate the same process-wide registries.
    static GUARD: std::sync::Mutex<()> = std::sync::Mutex::new(());

    /// Takes the test lock, recovering it if an earlier test panicked while holding it.
    ///
    /// The mutex protects no data of its own, so a poisoned guard is safe to reuse.
    /// Without the recovery a single failing test would make every later test fail
    /// with `PoisonError` and hide the real failure.
    fn serial() -> std::sync::MutexGuard<'static, ()> {
        GUARD
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    #[test]
    fn empty_registry_returns_empty_vec() {
        let _g = serial();
        clear();
        assert!(registered_units().is_empty());
    }

    #[test]
    fn registered_units_preserve_insertion_order() {
        let _g = serial();
        clear();
        let (_tx_a, rx_a) = tokio::sync::watch::channel(ControllerSnapshot::default());
        let (_tx_b, rx_b) = tokio::sync::watch::channel(ControllerSnapshot::default());
        register_unit("first", rx_a);
        register_unit("second", rx_b);
        let units = registered_units();
        assert_eq!(units.len(), 2);
        assert_eq!(units[0].label, "first");
        assert_eq!(units[1].label, "second");
        clear();
    }

    #[test]
    fn snapshot_updates_visible_via_registered_receiver() {
        let _g = serial();
        clear();
        let (tx, rx) = tokio::sync::watch::channel(ControllerSnapshot::default());
        register_unit("only", rx);
        let new_snapshot = ControllerSnapshot {
            cwnd: 42,
            ..ControllerSnapshot::default()
        };
        tx.send(new_snapshot).expect("send snapshot");
        let units = registered_units();
        assert_eq!(units[0].snapshot_rx.borrow().cwnd, 42);
        clear();
    }

    #[test]
    fn render_lines_is_empty_when_registry_is_empty() {
        let _g = serial();
        clear();
        assert_eq!(render_lines(), "");
    }

    #[test]
    fn render_lines_shows_one_line_per_unit_with_aligned_labels() {
        let _g = serial();
        clear();
        let (_tx_a, rx_a) = tokio::sync::watch::channel(ControllerSnapshot {
            cwnd: 8,
            baseline_latency: std::time::Duration::from_micros(800),
            current_latency: std::time::Duration::from_millis(2),
            samples_seen: 1234,
        });
        let (_tx_b, rx_b) = tokio::sync::watch::channel(ControllerSnapshot {
            cwnd: 16,
            baseline_latency: std::time::Duration::from_millis(1),
            current_latency: std::time::Duration::from_millis(3),
            samples_seen: 5678,
        });
        register_unit("walk-src", rx_a);
        register_unit("meta-dst", rx_b);
        let out = render_lines();
        let lines: Vec<&str> = out.split('\n').filter(|s| !s.is_empty()).collect();
        assert_eq!(lines.len(), 3);
        assert_eq!(lines[0], SEPARATOR);
        // Expected values include the padding: cwnd is right-aligned to 4 characters,
        // the other columns to FIELD_WIDTH (7).
        assert!(lines[1].contains("walk-src"));
        assert!(lines[1].contains("cwnd=   8"));
        assert!(lines[1].contains("ratio=   2.5×"));
        assert!(lines[1].contains("samples=   1.2k"));
        assert!(lines[2].contains("meta-dst"));
        assert!(lines[2].contains("cwnd=  16"));
        assert!(lines[2].contains("samples=   5.7k"));
        clear();
    }

    #[test]
    fn render_lines_skips_units_with_zero_samples() {
        let _g = serial();
        clear();
        let (_tx_a, rx_a) = tokio::sync::watch::channel(ControllerSnapshot {
            cwnd: 8,
            baseline_latency: std::time::Duration::from_micros(800),
            current_latency: std::time::Duration::from_millis(2),
            samples_seen: 1234,
        });
        let (_tx_b, rx_b) = tokio::sync::watch::channel(ControllerSnapshot {
            cwnd: 1,
            baseline_latency: std::time::Duration::ZERO,
            current_latency: std::time::Duration::ZERO,
            samples_seen: 0,
        });
        register_unit("walk-src", rx_a);
        register_unit("walk-dst", rx_b);
        let out = render_lines();
        assert!(out.contains("walk-src"));
        assert!(!out.contains("walk-dst"));
        clear();
    }

    #[test]
    fn render_lines_is_empty_when_all_units_have_zero_samples() {
        let _g = serial();
        clear();
        let (_tx, rx) = tokio::sync::watch::channel(ControllerSnapshot::default());
        register_unit("walk-src", rx);
        assert_eq!(render_lines(), "");
        clear();
    }

    #[test]
    fn render_lines_shows_em_dash_when_baseline_unset() {
        let _g = serial();
        clear();
        let (_tx, rx) = tokio::sync::watch::channel(ControllerSnapshot {
            cwnd: 1,
            baseline_latency: std::time::Duration::ZERO,
            current_latency: std::time::Duration::ZERO,
            samples_seen: 1,
        });
        register_unit("walk-src", rx);
        let out = render_lines();
        assert!(out.contains("ratio="));
        assert!(out.contains("—"));
        clear();
    }

    #[test]
    fn render_lines_shows_em_dash_when_only_current_unset() {
        let _g = serial();
        clear();
        let (_tx, rx) = tokio::sync::watch::channel(ControllerSnapshot {
            cwnd: 5,
            baseline_latency: std::time::Duration::from_millis(2),
            current_latency: std::time::Duration::ZERO,
            samples_seen: 42,
        });
        register_unit("idle-short-window", rx);
        let out = render_lines();
        assert!(out.contains("ratio="));
        assert!(
            out.contains("—"),
            "expected '—' for unset current, got {out:?}"
        );
        // Match on the value alone: the ratio column is padded, so a pattern that
        // includes "ratio=" plus a single space could never match and the check
        // would pass vacuously.
        assert!(
            !out.contains("0.0×"),
            "ratio must not render as 0.0× when current is unset: {out}",
        );
        clear();
    }

    #[test]
    fn render_lines_columns_are_aligned_across_rows() {
        let _g = serial();
        clear();
        let (_tx_a, rx_a) = tokio::sync::watch::channel(ControllerSnapshot {
            cwnd: 1,
            baseline_latency: std::time::Duration::from_nanos(58),
            current_latency: std::time::Duration::from_micros(33),
            samples_seen: 629_000,
        });
        let (_tx_b, rx_b) = tokio::sync::watch::channel(ControllerSnapshot {
            cwnd: 1,
            baseline_latency: std::time::Duration::from_micros(1700),
            current_latency: std::time::Duration::from_micros(3500),
            samples_seen: 64_600,
        });
        register_unit("walk-src", rx_a);
        register_unit("meta-src", rx_b);
        let out = render_lines();
        let row_lines: Vec<&str> = out
            .split('\n')
            .filter(|s| !s.is_empty() && *s != SEPARATOR)
            .collect();
        assert_eq!(row_lines.len(), 2);
        // Compare column positions in characters, not bytes: rows contain multi-byte
        // characters such as 'µ' and '×'.
        let char_offset = |row: &str, key: &str| -> Option<usize> {
            let byte = row.find(key)?;
            Some(row[..byte].chars().count())
        };
        for key in ["cwnd=", "base=", "curr=", "ratio=", "samples="] {
            let col_a = char_offset(row_lines[0], key);
            let col_b = char_offset(row_lines[1], key);
            assert_eq!(col_a, col_b, "{key} column misaligned: {row_lines:?}");
            assert!(col_a.is_some(), "{key} missing from row: {row_lines:?}");
        }
        clear();
    }

    #[test]
    fn histogram_registry_starts_empty() {
        let _g = serial();
        clear();
        assert!(registered_histograms().is_empty());
    }

    #[test]
    fn registered_histograms_preserve_order() {
        let _g = serial();
        clear();
        let h_empty = hdrhistogram::Histogram::<u64>::new_with_bounds(1, 1_000_000, 3).unwrap();
        let (_tx_a, rx_a) = tokio::sync::watch::channel(h_empty.clone());
        let (_tx_b, rx_b) = tokio::sync::watch::channel(h_empty);
        register_histogram("first", rx_a, std::time::Duration::from_secs(1));
        register_histogram("second", rx_b, std::time::Duration::from_secs(1));
        let units = registered_histograms();
        assert_eq!(units.len(), 2);
        assert_eq!(units[0].label, "first");
        assert_eq!(units[1].label, "second");
        clear();
    }

    #[test]
    fn clear_removes_histogram_registrations_too() {
        let _g = serial();
        clear();
        let h_empty = hdrhistogram::Histogram::<u64>::new_with_bounds(1, 1_000_000, 3).unwrap();
        let (_tx, rx) = tokio::sync::watch::channel(h_empty);
        register_histogram("only", rx, std::time::Duration::from_secs(1));
        assert_eq!(registered_histograms().len(), 1);
        clear();
        assert!(registered_histograms().is_empty());
    }
}