1use congestion::ControllerSnapshot;
13
/// One entry in the unit registry: a display label paired with the watch
/// receiver from which the unit's latest [`ControllerSnapshot`] is read.
#[derive(Clone)]
pub struct RegisteredUnit {
    /// Name printed in the first column of the unit's rendered row.
    pub label: &'static str,
    /// Receiving half of the unit's watch channel; `borrow()` on it yields the
    /// most recently published snapshot.
    pub snapshot_rx: tokio::sync::watch::Receiver<ControllerSnapshot>,
}
23
/// Process-wide list of registered units, stored in registration order.
/// Created empty on first access and guarded by an `RwLock`.
static REGISTRY: std::sync::LazyLock<std::sync::RwLock<Vec<RegisteredUnit>>> =
    std::sync::LazyLock::new(|| std::sync::RwLock::new(Vec::new()));
26
27pub fn register_unit(
33 label: &'static str,
34 snapshot_rx: tokio::sync::watch::Receiver<ControllerSnapshot>,
35) {
36 REGISTRY
37 .write()
38 .expect("observability registry poisoned")
39 .push(RegisteredUnit { label, snapshot_rx });
40}
41
42#[must_use]
46pub fn registered_units() -> Vec<RegisteredUnit> {
47 REGISTRY
48 .read()
49 .expect("observability registry poisoned")
50 .clone()
51}
52
53pub fn clear() {
57 REGISTRY
58 .write()
59 .expect("observability registry poisoned")
60 .clear();
61 HIST_REGISTRY
62 .write()
63 .expect("histogram registry poisoned")
64 .clear();
65}
66
/// One entry in the histogram registry: a display label, the watch receiver
/// carrying the histogram, and the interval supplied at registration.
#[derive(Clone)]
pub struct RegisteredHistogram {
    /// Name identifying the histogram.
    pub label: &'static str,
    /// Receiving half of the watch channel on which histograms are published.
    pub snapshot_rx: tokio::sync::watch::Receiver<hdrhistogram::Histogram<u64>>,
    /// Duration passed to `register_histogram`.
    // NOTE(review): presumably the window each published histogram covers —
    // confirm against the publisher.
    pub interval: std::time::Duration,
}
77
/// Process-wide list of registered histograms, stored in registration order.
/// Created empty on first access and guarded by an `RwLock`.
static HIST_REGISTRY: std::sync::LazyLock<std::sync::RwLock<Vec<RegisteredHistogram>>> =
    std::sync::LazyLock::new(|| std::sync::RwLock::new(Vec::new()));
80
81pub fn register_histogram(
84 label: &'static str,
85 snapshot_rx: tokio::sync::watch::Receiver<hdrhistogram::Histogram<u64>>,
86 interval: std::time::Duration,
87) {
88 HIST_REGISTRY
89 .write()
90 .expect("histogram registry poisoned")
91 .push(RegisteredHistogram {
92 label,
93 snapshot_rx,
94 interval,
95 });
96}
97
98#[must_use]
100pub fn registered_histograms() -> Vec<RegisteredHistogram> {
101 HIST_REGISTRY
102 .read()
103 .expect("histogram registry poisoned")
104 .clone()
105}
106
/// Minimum width, in characters, to which the `base`, `curr`, `ratio` and
/// `samples` values are right-aligned in each rendered unit row.
const FIELD_WIDTH: usize = 7;
111
/// Rule line emitted once by `render_lines`, directly above the unit rows.
const SEPARATOR: &str = "-----------------------";
116
117#[must_use]
142pub fn render_lines() -> String {
143 let units = registered_units();
144 if units.is_empty() {
145 return String::new();
146 }
147 let snapshots: Vec<(&'static str, ControllerSnapshot)> = units
150 .iter()
151 .map(|u| (u.label, *u.snapshot_rx.borrow()))
152 .collect();
153 let visible: Vec<(&'static str, ControllerSnapshot)> = snapshots
154 .into_iter()
155 .filter(|(_, snap)| snap.samples_seen > 0)
156 .collect();
157 if visible.is_empty() {
158 return String::new();
159 }
160 let label_width = visible.iter().map(|(l, _)| l.len()).max().unwrap_or(0);
161 let mut out = String::new();
162 out.push('\n');
163 out.push_str(SEPARATOR);
164 for (label, snap) in &visible {
165 out.push('\n');
166 out.push_str(&format_unit_line(label, label_width, *snap));
167 }
168 out
169}
170
171fn format_unit_line(label: &str, label_width: usize, snap: ControllerSnapshot) -> String {
172 let ratio = if snap.baseline_latency.is_zero() || snap.current_latency.is_zero() {
173 String::from("—")
181 } else {
182 let ratio =
183 snap.current_latency.as_nanos() as f64 / snap.baseline_latency.as_nanos() as f64;
184 format!("{ratio:.1}×")
185 };
186 format!(
187 "{label:<lwidth$} cwnd={cwnd:>4} base={base:>fwidth$} curr={curr:>fwidth$} ratio={ratio:>fwidth$} samples={samples:>fwidth$}",
188 label = label,
189 lwidth = label_width,
190 fwidth = FIELD_WIDTH,
191 cwnd = snap.cwnd,
192 base = format_duration(snap.baseline_latency),
193 curr = format_duration(snap.current_latency),
194 ratio = ratio,
195 samples = format_count(snap.samples_seen),
196 )
197}
198
/// Formats a duration compactly for the fixed-width latency columns.
///
/// * zero renders as `—`;
/// * below 1 µs renders as whole nanoseconds (`58ns`);
/// * otherwise renders with one decimal in the largest of `µs`, `ms`, `s`
///   whose printed value stays below `1000.0`.
///
/// A value that would round up to `1000.0` in one unit is promoted to the
/// next unit (`999_960ns` → `1.0ms`, not `1000.0µs`), so sub-second output
/// never exceeds seven characters.
fn format_duration(d: std::time::Duration) -> String {
    // Smallest nanosecond counts whose one-decimal rendering in µs / ms could
    // round up to "1000.0"; from here on the next larger unit is used.
    const MICROS_LIMIT: u128 = 999_950;
    const MILLIS_LIMIT: u128 = 999_950_000;

    if d.is_zero() {
        return String::from("—");
    }
    let nanos = d.as_nanos();
    if nanos < 1_000 {
        format!("{nanos}ns")
    } else if nanos < MICROS_LIMIT {
        format!("{:.1}µs", nanos as f64 / 1_000.0)
    } else if nanos < MILLIS_LIMIT {
        format!("{:.1}ms", nanos as f64 / 1_000_000.0)
    } else {
        format!("{:.1}s", nanos as f64 / 1_000_000_000.0)
    }
}
217
/// Formats a count compactly for the fixed-width `samples` column.
///
/// Values below 1000 are printed as-is. Larger values are printed with one
/// decimal and a `k`, `M` or `G` suffix, using the largest suffix whose
/// printed value stays below `1000.0`.
///
/// A value that would round up to `1000.0` under one suffix is promoted to
/// the next (`999_999` → `1.0M`, not `1000.0k`).
fn format_count(n: u64) -> String {
    // Smallest counts whose one-decimal rendering with `k` / `M` could round
    // up to "1000.0"; from here on the next larger suffix is used.
    const KILO_LIMIT: u64 = 999_950;
    const MEGA_LIMIT: u64 = 999_950_000;

    if n < 1_000 {
        n.to_string()
    } else if n < KILO_LIMIT {
        format!("{:.1}k", n as f64 / 1_000.0)
    } else if n < MEGA_LIMIT {
        format!("{:.1}M", n as f64 / 1_000_000.0)
    } else {
        format!("{:.1}G", n as f64 / 1_000_000_000.0)
    }
}
231
#[cfg(test)]
mod tests {
    use super::*;

    /// Serialises the tests in this module: they all mutate the process-wide
    /// registries, and the test harness runs tests on several threads.
    static GUARD: std::sync::Mutex<()> = std::sync::Mutex::new(());

    /// Takes the module-wide test lock and empties both registries.
    ///
    /// A poisoned lock is recovered rather than unwrapped. The mutex protects
    /// no data, so after one test panics the remaining tests can still run and
    /// report their own results instead of all failing on the poison error.
    fn exclusive_registry() -> std::sync::MutexGuard<'static, ()> {
        let guard = GUARD
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        clear();
        guard
    }

    /// Returns the whitespace-delimited value that follows `key` in `text`.
    ///
    /// Assertions use this so they check the rendered value itself and do not
    /// depend on how many padding spaces the column layout inserts.
    fn field<'a>(text: &'a str, key: &str) -> Option<&'a str> {
        let value_start = text.find(key)? + key.len();
        text[value_start..].split_whitespace().next()
    }

    #[test]
    fn empty_registry_returns_empty_vec() {
        let _g = exclusive_registry();
        assert!(registered_units().is_empty());
    }

    #[test]
    fn registered_units_preserve_insertion_order() {
        let _g = exclusive_registry();
        let (_tx_a, rx_a) = tokio::sync::watch::channel(ControllerSnapshot::default());
        let (_tx_b, rx_b) = tokio::sync::watch::channel(ControllerSnapshot::default());
        register_unit("first", rx_a);
        register_unit("second", rx_b);
        let units = registered_units();
        assert_eq!(units.len(), 2);
        assert_eq!(units[0].label, "first");
        assert_eq!(units[1].label, "second");
        clear();
    }

    #[test]
    fn snapshot_updates_visible_via_registered_receiver() {
        let _g = exclusive_registry();
        let (tx, rx) = tokio::sync::watch::channel(ControllerSnapshot::default());
        register_unit("only", rx);
        let new_snapshot = ControllerSnapshot {
            cwnd: 42,
            ..ControllerSnapshot::default()
        };
        tx.send(new_snapshot).expect("send snapshot");
        let units = registered_units();
        assert_eq!(units[0].snapshot_rx.borrow().cwnd, 42);
        clear();
    }

    #[test]
    fn render_lines_is_empty_when_registry_is_empty() {
        let _g = exclusive_registry();
        assert_eq!(render_lines(), "");
    }

    #[test]
    fn render_lines_shows_one_line_per_unit_with_aligned_labels() {
        let _g = exclusive_registry();
        let (_tx_a, rx_a) = tokio::sync::watch::channel(ControllerSnapshot {
            cwnd: 8,
            baseline_latency: std::time::Duration::from_micros(800),
            current_latency: std::time::Duration::from_millis(2),
            samples_seen: 1234,
        });
        let (_tx_b, rx_b) = tokio::sync::watch::channel(ControllerSnapshot {
            cwnd: 16,
            baseline_latency: std::time::Duration::from_millis(1),
            current_latency: std::time::Duration::from_millis(3),
            samples_seen: 5678,
        });
        register_unit("walk-src", rx_a);
        register_unit("meta-dst", rx_b);
        let out = render_lines();
        let lines: Vec<&str> = out.split('\n').filter(|s| !s.is_empty()).collect();
        // One separator line followed by one row per unit.
        assert_eq!(lines.len(), 3);
        assert_eq!(lines[0], SEPARATOR);
        assert!(lines[1].contains("walk-src"));
        assert_eq!(field(lines[1], "cwnd="), Some("8"));
        // 2 ms over 800 µs.
        assert_eq!(field(lines[1], "ratio="), Some("2.5×"));
        assert_eq!(field(lines[1], "samples="), Some("1.2k"));
        assert!(lines[2].contains("meta-dst"));
        assert_eq!(field(lines[2], "cwnd="), Some("16"));
        assert_eq!(field(lines[2], "samples="), Some("5.7k"));
        clear();
    }

    #[test]
    fn render_lines_skips_units_with_zero_samples() {
        let _g = exclusive_registry();
        let (_tx_a, rx_a) = tokio::sync::watch::channel(ControllerSnapshot {
            cwnd: 8,
            baseline_latency: std::time::Duration::from_micros(800),
            current_latency: std::time::Duration::from_millis(2),
            samples_seen: 1234,
        });
        let (_tx_b, rx_b) = tokio::sync::watch::channel(ControllerSnapshot {
            cwnd: 1,
            baseline_latency: std::time::Duration::ZERO,
            current_latency: std::time::Duration::ZERO,
            samples_seen: 0,
        });
        register_unit("walk-src", rx_a);
        register_unit("walk-dst", rx_b);
        let out = render_lines();
        assert!(out.contains("walk-src"));
        assert!(!out.contains("walk-dst"));
        clear();
    }

    #[test]
    fn render_lines_is_empty_when_all_units_have_zero_samples() {
        let _g = exclusive_registry();
        let (_tx, rx) = tokio::sync::watch::channel(ControllerSnapshot::default());
        register_unit("walk-src", rx);
        assert_eq!(render_lines(), "");
        clear();
    }

    #[test]
    fn render_lines_shows_em_dash_when_baseline_unset() {
        let _g = exclusive_registry();
        let (_tx, rx) = tokio::sync::watch::channel(ControllerSnapshot {
            cwnd: 1,
            baseline_latency: std::time::Duration::ZERO,
            current_latency: std::time::Duration::ZERO,
            samples_seen: 1,
        });
        register_unit("walk-src", rx);
        let out = render_lines();
        assert_eq!(
            field(&out, "ratio="),
            Some("—"),
            "expected '—' ratio for unset baseline, got {out:?}"
        );
        clear();
    }

    #[test]
    fn render_lines_shows_em_dash_when_only_current_unset() {
        let _g = exclusive_registry();
        let (_tx, rx) = tokio::sync::watch::channel(ControllerSnapshot {
            cwnd: 5,
            baseline_latency: std::time::Duration::from_millis(2),
            current_latency: std::time::Duration::ZERO,
            samples_seen: 42,
        });
        register_unit("idle-short-window", rx);
        let out = render_lines();
        // Checking the ratio field itself, not the whole line, rules out a
        // pass caused by the `—` in the `curr=` column.
        assert_eq!(
            field(&out, "ratio="),
            Some("—"),
            "ratio must render as '—', not 0.0×, when current is unset: {out:?}"
        );
        clear();
    }

    #[test]
    fn render_lines_columns_are_aligned_across_rows() {
        let _g = exclusive_registry();
        let (_tx_a, rx_a) = tokio::sync::watch::channel(ControllerSnapshot {
            cwnd: 1,
            baseline_latency: std::time::Duration::from_nanos(58),
            current_latency: std::time::Duration::from_micros(33),
            samples_seen: 629_000,
        });
        let (_tx_b, rx_b) = tokio::sync::watch::channel(ControllerSnapshot {
            cwnd: 1,
            baseline_latency: std::time::Duration::from_micros(1700),
            current_latency: std::time::Duration::from_micros(3500),
            samples_seen: 64_600,
        });
        register_unit("walk-src", rx_a);
        register_unit("meta-src", rx_b);
        let out = render_lines();
        let row_lines: Vec<&str> = out
            .split('\n')
            .filter(|s| !s.is_empty() && *s != SEPARATOR)
            .collect();
        assert_eq!(row_lines.len(), 2);
        // Columns are compared in characters, not bytes, because the rows
        // contain multi-byte characters such as `µ` and `×`.
        let char_offset = |row: &str, key: &str| -> Option<usize> {
            let byte = row.find(key)?;
            Some(row[..byte].chars().count())
        };
        for key in ["cwnd=", "base=", "curr=", "ratio=", "samples="] {
            let col_a = char_offset(row_lines[0], key);
            let col_b = char_offset(row_lines[1], key);
            assert_eq!(col_a, col_b, "{key} column misaligned: {row_lines:?}");
            assert!(col_a.is_some(), "{key} missing from row: {row_lines:?}");
        }
        clear();
    }

    #[test]
    fn histogram_registry_starts_empty() {
        let _g = exclusive_registry();
        assert!(registered_histograms().is_empty());
    }

    #[test]
    fn registered_histograms_preserve_order() {
        let _g = exclusive_registry();
        let h_empty = hdrhistogram::Histogram::<u64>::new_with_bounds(1, 1_000_000, 3).unwrap();
        let (_tx_a, rx_a) = tokio::sync::watch::channel(h_empty.clone());
        let (_tx_b, rx_b) = tokio::sync::watch::channel(h_empty);
        register_histogram("first", rx_a, std::time::Duration::from_secs(1));
        register_histogram("second", rx_b, std::time::Duration::from_secs(1));
        let units = registered_histograms();
        assert_eq!(units.len(), 2);
        assert_eq!(units[0].label, "first");
        assert_eq!(units[1].label, "second");
        clear();
    }

    #[test]
    fn clear_removes_histogram_registrations_too() {
        let _g = exclusive_registry();
        let h_empty = hdrhistogram::Histogram::<u64>::new_with_bounds(1, 1_000_000, 3).unwrap();
        let (_tx, rx) = tokio::sync::watch::channel(h_empty);
        register_histogram("only", rx, std::time::Duration::from_secs(1));
        assert_eq!(registered_histograms().len(), 1);
        clear();
        assert!(registered_histograms().is_empty());
    }
}