lvqr_admin/slo.rs
1//! Latency SLO tracker for the `/api/v1/slo` admin route.
2//!
3//! Tier 4 item 4.7 session A. Records per-subscriber glass-to-glass
4//! latency samples on every egress fragment delivery and exposes a
5//! percentile snapshot for the admin JSON route + a Prometheus
6//! histogram (`lvqr_subscriber_glass_to_glass_ms`) for long-term
7//! observability.
8//!
9//! # Shape
10//!
11//! The tracker is a shared `Arc<LatencyTracker>` handed to every
12//! egress surface that wants to contribute samples. Each
13//! `record(broadcast, transport, latency_ms)` call:
14//!
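//! 1. Fires a `metrics::histogram!` value on the process-wide
//!    metrics recorder (one observation per sample, tagged with
//!    `broadcast` + `transport` labels). Bucket boundaries (or
//!    summary quantiles) are chosen by whichever exporter is
//!    installed, not by this module. NOTE(review): confirm the
//!    exporter's buckets span the 0..=60_000 ms range we care about
//!    for live video; stock Prometheus default buckets are sized for
//!    seconds, not milliseconds.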
20//! 2. Pushes the latency sample into a per-(`broadcast`, `transport`)
21//! ring buffer of up to `MAX_SAMPLES_PER_KEY` samples. The ring
22//! buffer is the source of truth for the JSON `/api/v1/slo`
23//! route; we compute p50 / p95 / p99 on demand by sorting the
24//! buffer (cheap: 1024 u64 sort is ~10 us on a modern host, the
25//! route is low-QPS).
26//!
27//! # Anti-scope (107 A)
28//!
29//! * **Streaming-quantile estimators (CKMS, HDR, etc.)**. Simple
30//! fixed-size ring buffer + sort-on-query is fine for the admin
31//! route's low-QPS use case and avoids a new dep.
32//! * **Per-subscriber breakdowns**. The tracker keys on
33//! `(broadcast, transport)` so admin operators see the aggregate
34//! picture per egress surface; per-subscriber latency drilldown
35//! is a future Grafana-side query atop the histogram output.
36//! * **Time-windowed retention**. The ring buffer is size-bounded,
37//! not time-bounded. Samples are FIFO-evicted regardless of age;
38//! a busy broadcast keeps ~1024 recent samples, a quiet one keeps
39//! older but still-relevant samples until new traffic arrives.
40
41use std::sync::Arc;
42
43use parking_lot::RwLock;
44use serde::{Deserialize, Serialize};
45
/// Upper bound on retained samples per `(broadcast, transport)` key.
/// At 1024 samples the top 1% (the p99 tail) still holds about ten
/// observations, while sorting the whole window on each snapshot
/// stays cheap.
const MAX_SAMPLES_PER_KEY: usize = 1 << 10;
51
52/// Combined key for a single tracker bucket. Kept separate from
53/// `SloEntry`'s shape so the tracker can hash/compare the key
54/// without double-cloning strings on every `record` call.
55#[derive(Debug, Clone, PartialEq, Eq, Hash)]
56struct TrackerKey {
57 broadcast: String,
58 transport: String,
59}
60
/// Fixed-capacity ring buffer of latency samples (milliseconds) for a
/// single tracker key. Once full, each new sample overwrites the
/// oldest retained one.
#[derive(Default)]
struct SampleBuffer {
    /// Retained samples. Grows up to `MAX_SAMPLES_PER_KEY`, then keeps
    /// that length and is overwritten in place.
    samples: Vec<u64>,
    /// Slot the next overwrite targets once `samples` is full.
    next_index: usize,
    /// Count of samples ever pushed, saturating at `u64::MAX`.
    total: u64,
}
68
69impl SampleBuffer {
70 fn push(&mut self, sample: u64) {
71 if self.samples.len() < MAX_SAMPLES_PER_KEY {
72 self.samples.push(sample);
73 } else {
74 self.samples[self.next_index] = sample;
75 self.next_index = (self.next_index + 1) % MAX_SAMPLES_PER_KEY;
76 }
77 self.total = self.total.saturating_add(1);
78 }
79
80 /// Compute p50 / p95 / p99 / max over the retained samples.
81 /// Returns all-zero values when the buffer is empty.
82 fn percentiles(&self) -> (u64, u64, u64, u64) {
83 if self.samples.is_empty() {
84 return (0, 0, 0, 0);
85 }
86 let mut sorted: Vec<u64> = self.samples.clone();
87 sorted.sort_unstable();
88 let n = sorted.len();
89 let p50 = sorted[(n * 50 / 100).min(n - 1)];
90 let p95 = sorted[(n * 95 / 100).min(n - 1)];
91 let p99 = sorted[(n * 99 / 100).min(n - 1)];
92 let max = *sorted.last().unwrap();
93 (p50, p95, p99, max)
94 }
95}
96
97/// Per-broadcast + per-transport latency snapshot returned by
98/// [`LatencyTracker::snapshot`] and serialized on the
99/// `GET /api/v1/slo` route.
100#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
101pub struct SloEntry {
102 /// Broadcast name (e.g. `"live/demo"`).
103 pub broadcast: String,
104 /// Egress surface: `"hls"`, `"ws"`, `"dash"`, `"moq"`, etc.
105 pub transport: String,
106 /// 50th percentile latency in milliseconds across the retained
107 /// sample window.
108 pub p50_ms: u64,
109 /// 95th percentile latency.
110 pub p95_ms: u64,
111 /// 99th percentile latency.
112 pub p99_ms: u64,
113 /// Peak observed latency in the retained sample window.
114 pub max_ms: u64,
115 /// Count of samples retained in the ring buffer (`<=
116 /// MAX_SAMPLES_PER_KEY`).
117 pub sample_count: usize,
118 /// Total samples ever observed for this key since tracker
119 /// construction (not bounded by the ring buffer).
120 pub total_observed: u64,
121}
122
123/// Thread-safe latency SLO tracker. Cheap to clone (internal state
124/// is behind `Arc`).
125#[derive(Clone, Default)]
126pub struct LatencyTracker {
127 buckets: Arc<RwLock<std::collections::HashMap<TrackerKey, SampleBuffer>>>,
128}
129
130impl LatencyTracker {
131 /// Build a fresh tracker with no samples. Cheap; the typical
132 /// caller constructs one per server and clones the handle out
133 /// to every egress surface + the admin state.
134 pub fn new() -> Self {
135 Self::default()
136 }
137
138 /// Record one latency sample. `broadcast` is the source
139 /// broadcast name (e.g. `"live/demo"`); `transport` is the
140 /// egress surface producing the delivery (e.g. `"hls"`,
141 /// `"ws"`). `latency_ms` is the UNIX-wall-clock delta between
142 /// the ingest-side fragment stamp and the egress-side emit
143 /// point. A sample of `0` is still recorded -- zero latency is
144 /// valid (same-tick delivery) and we do not want to conflate
145 /// that with an unset ingest timestamp (the caller is expected
146 /// to skip the call entirely when the ingest-time stamp is
147 /// missing).
148 pub fn record(&self, broadcast: &str, transport: &str, latency_ms: u64) {
149 metrics::histogram!(
150 "lvqr_subscriber_glass_to_glass_ms",
151 "broadcast" => broadcast.to_string(),
152 "transport" => transport.to_string(),
153 )
154 .record(latency_ms as f64);
155
156 let key = TrackerKey {
157 broadcast: broadcast.to_string(),
158 transport: transport.to_string(),
159 };
160 let mut guard = self.buckets.write();
161 guard.entry(key).or_default().push(latency_ms);
162 }
163
164 /// Snapshot every tracked `(broadcast, transport)` key with the
165 /// current p50 / p95 / p99 / max + sample counts. Sorted by
166 /// broadcast then transport so the admin route's JSON output is
167 /// deterministic.
168 pub fn snapshot(&self) -> Vec<SloEntry> {
169 let guard = self.buckets.read();
170 let mut out: Vec<SloEntry> = guard
171 .iter()
172 .map(|(key, buf)| {
173 let (p50, p95, p99, max) = buf.percentiles();
174 SloEntry {
175 broadcast: key.broadcast.clone(),
176 transport: key.transport.clone(),
177 p50_ms: p50,
178 p95_ms: p95,
179 p99_ms: p99,
180 max_ms: max,
181 sample_count: buf.samples.len(),
182 total_observed: buf.total,
183 }
184 })
185 .collect();
186 out.sort_by(|a, b| {
187 a.broadcast
188 .cmp(&b.broadcast)
189 .then_with(|| a.transport.cmp(&b.transport))
190 });
191 out
192 }
193
194 /// Clear every bucket. Test-oriented; no production caller
195 /// should need this.
196 #[cfg(test)]
197 pub(crate) fn clear(&self) {
198 self.buckets.write().clear();
199 }
200}
201
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn record_populates_snapshot_with_percentiles() {
        let tracker = LatencyTracker::new();
        // Monotonically increasing samples so p50 / p95 / p99 map
        // cleanly to indices.
        for ms in 1..=100u64 {
            tracker.record("live/demo", "hls", ms);
        }
        let snap = tracker.snapshot();
        assert_eq!(snap.len(), 1);
        let e = &snap[0];
        assert_eq!(e.broadcast, "live/demo");
        assert_eq!(e.transport, "hls");
        assert_eq!(e.sample_count, 100);
        assert_eq!(e.total_observed, 100);
        assert_eq!(e.p50_ms, 51);
        assert_eq!(e.p95_ms, 96);
        assert_eq!(e.p99_ms, 100);
        assert_eq!(e.max_ms, 100);
    }

    #[test]
    fn ring_buffer_evicts_oldest_past_cap() {
        let tracker = LatencyTracker::new();
        let overflow = 500u64;
        let total = MAX_SAMPLES_PER_KEY as u64 + overflow;
        for ms in 1..=total {
            tracker.record("live/demo", "hls", ms);
        }
        let snap = tracker.snapshot();
        assert_eq!(snap.len(), 1);
        let e = &snap[0];
        assert_eq!(e.sample_count, MAX_SAMPLES_PER_KEY);
        assert_eq!(e.total_observed, total);
        // The first `overflow` samples were evicted, so the retained
        // window is exactly (overflow + 1)..=total. Sorted index i holds
        // overflow + 1 + i, which pins every percentile exactly.
        let n = MAX_SAMPLES_PER_KEY;
        let at = |pct: usize| overflow + 1 + (n * pct / 100) as u64;
        assert_eq!(e.p50_ms, at(50));
        assert_eq!(e.p95_ms, at(95));
        assert_eq!(e.p99_ms, at(99));
        assert_eq!(e.max_ms, total);
    }

    #[test]
    fn separate_keys_track_separately() {
        let tracker = LatencyTracker::new();
        tracker.record("live/demo", "hls", 100);
        tracker.record("live/demo", "hls", 200);
        tracker.record("live/demo", "ws", 50);
        tracker.record("live/other", "hls", 10);

        let snap = tracker.snapshot();
        assert_eq!(snap.len(), 3);
        // Sorted by (broadcast, transport).
        assert_eq!(
            (snap[0].broadcast.as_str(), snap[0].transport.as_str()),
            ("live/demo", "hls")
        );
        assert_eq!(
            (snap[1].broadcast.as_str(), snap[1].transport.as_str()),
            ("live/demo", "ws")
        );
        assert_eq!(
            (snap[2].broadcast.as_str(), snap[2].transport.as_str()),
            ("live/other", "hls")
        );
        assert_eq!(snap[0].sample_count, 2);
        assert_eq!(snap[1].sample_count, 1);
        assert_eq!(snap[2].sample_count, 1);
    }

    #[test]
    fn empty_tracker_snapshots_to_empty_vec() {
        let tracker = LatencyTracker::new();
        assert!(tracker.snapshot().is_empty());
    }

    #[test]
    fn zero_latency_sample_is_recorded() {
        let tracker = LatencyTracker::new();
        tracker.record("live/demo", "ws", 0);
        let snap = tracker.snapshot();
        assert_eq!(snap.len(), 1);
        assert_eq!(snap[0].sample_count, 1);
        assert_eq!(snap[0].total_observed, 1);
        assert_eq!(snap[0].max_ms, 0);
    }

    #[test]
    fn single_sample_fills_every_percentile() {
        let tracker = LatencyTracker::new();
        tracker.record("live/demo", "dash", 42);
        let snap = tracker.snapshot();
        let e = &snap[0];
        assert_eq!((e.p50_ms, e.p95_ms, e.p99_ms, e.max_ms), (42, 42, 42, 42));
    }

    #[test]
    fn cloned_handle_shares_buckets() {
        let tracker = LatencyTracker::new();
        let handle = tracker.clone();
        handle.record("live/demo", "moq", 7);
        let snap = tracker.snapshot();
        assert_eq!(snap.len(), 1);
        assert_eq!(snap[0].transport, "moq");
        assert_eq!(snap[0].max_ms, 7);
    }

    #[test]
    fn clear_resets_the_tracker() {
        let tracker = LatencyTracker::new();
        tracker.record("a", "hls", 10);
        tracker.record("b", "hls", 20);
        assert_eq!(tracker.snapshot().len(), 2);
        tracker.clear();
        assert!(tracker.snapshot().is_empty());
    }
}