Skip to main content

lvqr_admin/
slo.rs

1//! Latency SLO tracker for the `/api/v1/slo` admin route.
2//!
3//! Tier 4 item 4.7 session A. Records per-subscriber glass-to-glass
4//! latency samples on every egress fragment delivery and exposes a
5//! percentile snapshot for the admin JSON route + a Prometheus
6//! histogram (`lvqr_subscriber_glass_to_glass_ms`) for long-term
7//! observability.
8//!
9//! # Shape
10//!
11//! The tracker is a shared `Arc<LatencyTracker>` handed to every
12//! egress surface that wants to contribute samples. Each
13//! `record(broadcast, transport, latency_ms)` call:
14//!
15//! 1. Fires a `metrics::histogram!` value on the process-wide
16//!    metrics recorder (one metric per sample, tagged with
17//!    `broadcast` + `transport` labels). The default Prometheus
18//!    histogram bucket layout covers the 0..=60_000 ms range we
19//!    care about for live video.
20//! 2. Pushes the latency sample into a per-(`broadcast`, `transport`)
21//!    ring buffer of up to `MAX_SAMPLES_PER_KEY` samples. The ring
22//!    buffer is the source of truth for the JSON `/api/v1/slo`
23//!    route; we compute p50 / p95 / p99 on demand by sorting the
24//!    buffer (cheap: 1024 u64 sort is ~10 us on a modern host, the
25//!    route is low-QPS).
26//!
27//! # Anti-scope (107 A)
28//!
29//! * **Streaming-quantile estimators (CKMS, HDR, etc.)**. Simple
30//!   fixed-size ring buffer + sort-on-query is fine for the admin
31//!   route's low-QPS use case and avoids a new dep.
32//! * **Per-subscriber breakdowns**. The tracker keys on
33//!   `(broadcast, transport)` so admin operators see the aggregate
34//!   picture per egress surface; per-subscriber latency drilldown
35//!   is a future Grafana-side query atop the histogram output.
36//! * **Time-windowed retention**. The ring buffer is size-bounded,
37//!   not time-bounded. Samples are FIFO-evicted regardless of age;
38//!   a busy broadcast keeps ~1024 recent samples, a quiet one keeps
39//!   older but still-relevant samples until new traffic arrives.
40
41use std::sync::Arc;
42
43use parking_lot::RwLock;
44use serde::{Deserialize, Serialize};
45
/// Upper bound on the number of latency samples retained for one
/// (broadcast, transport) key.
///
/// At 1024 samples the p99 index (`1024 * 99 / 100 = 1013`) still
/// leaves ten samples above it, so the tail percentile is backed by
/// more than a single outlier, while the clone + sort performed on
/// every percentile query stays small.
const MAX_SAMPLES_PER_KEY: usize = 1024;
51
/// Identity of one tracker bucket: the broadcast name paired with the
/// egress transport that delivered the fragment.
///
/// Owns both strings and derives `Eq` + `Hash` so it can be used
/// directly as the key of the tracker's bucket map.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct TrackerKey {
    /// Source broadcast name.
    broadcast: String,
    /// Egress surface label.
    transport: String,
}
60
61/// Ring buffer of latency samples. Oldest samples evict first.
62#[derive(Default)]
63struct SampleBuffer {
64    samples: Vec<u64>,
65    next_index: usize,
66    total: u64,
67}
68
69impl SampleBuffer {
70    fn push(&mut self, sample: u64) {
71        if self.samples.len() < MAX_SAMPLES_PER_KEY {
72            self.samples.push(sample);
73        } else {
74            self.samples[self.next_index] = sample;
75            self.next_index = (self.next_index + 1) % MAX_SAMPLES_PER_KEY;
76        }
77        self.total = self.total.saturating_add(1);
78    }
79
80    /// Compute p50 / p95 / p99 / max over the retained samples.
81    /// Returns all-zero values when the buffer is empty.
82    fn percentiles(&self) -> (u64, u64, u64, u64) {
83        if self.samples.is_empty() {
84            return (0, 0, 0, 0);
85        }
86        let mut sorted: Vec<u64> = self.samples.clone();
87        sorted.sort_unstable();
88        let n = sorted.len();
89        let p50 = sorted[(n * 50 / 100).min(n - 1)];
90        let p95 = sorted[(n * 95 / 100).min(n - 1)];
91        let p99 = sorted[(n * 99 / 100).min(n - 1)];
92        let max = *sorted.last().unwrap();
93        (p50, p95, p99, max)
94    }
95}
96
97/// Per-broadcast + per-transport latency snapshot returned by
98/// [`LatencyTracker::snapshot`] and serialized on the
99/// `GET /api/v1/slo` route.
100#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
101pub struct SloEntry {
102    /// Broadcast name (e.g. `"live/demo"`).
103    pub broadcast: String,
104    /// Egress surface: `"hls"`, `"ws"`, `"dash"`, `"moq"`, etc.
105    pub transport: String,
106    /// 50th percentile latency in milliseconds across the retained
107    /// sample window.
108    pub p50_ms: u64,
109    /// 95th percentile latency.
110    pub p95_ms: u64,
111    /// 99th percentile latency.
112    pub p99_ms: u64,
113    /// Peak observed latency in the retained sample window.
114    pub max_ms: u64,
115    /// Count of samples retained in the ring buffer (`<=
116    /// MAX_SAMPLES_PER_KEY`).
117    pub sample_count: usize,
118    /// Total samples ever observed for this key since tracker
119    /// construction (not bounded by the ring buffer).
120    pub total_observed: u64,
121}
122
123/// Thread-safe latency SLO tracker. Cheap to clone (internal state
124/// is behind `Arc`).
125#[derive(Clone, Default)]
126pub struct LatencyTracker {
127    buckets: Arc<RwLock<std::collections::HashMap<TrackerKey, SampleBuffer>>>,
128}
129
130impl LatencyTracker {
131    /// Build a fresh tracker with no samples. Cheap; the typical
132    /// caller constructs one per server and clones the handle out
133    /// to every egress surface + the admin state.
134    pub fn new() -> Self {
135        Self::default()
136    }
137
138    /// Record one latency sample. `broadcast` is the source
139    /// broadcast name (e.g. `"live/demo"`); `transport` is the
140    /// egress surface producing the delivery (e.g. `"hls"`,
141    /// `"ws"`). `latency_ms` is the UNIX-wall-clock delta between
142    /// the ingest-side fragment stamp and the egress-side emit
143    /// point. A sample of `0` is still recorded -- zero latency is
144    /// valid (same-tick delivery) and we do not want to conflate
145    /// that with an unset ingest timestamp (the caller is expected
146    /// to skip the call entirely when the ingest-time stamp is
147    /// missing).
148    pub fn record(&self, broadcast: &str, transport: &str, latency_ms: u64) {
149        metrics::histogram!(
150            "lvqr_subscriber_glass_to_glass_ms",
151            "broadcast" => broadcast.to_string(),
152            "transport" => transport.to_string(),
153        )
154        .record(latency_ms as f64);
155
156        let key = TrackerKey {
157            broadcast: broadcast.to_string(),
158            transport: transport.to_string(),
159        };
160        let mut guard = self.buckets.write();
161        guard.entry(key).or_default().push(latency_ms);
162    }
163
164    /// Snapshot every tracked `(broadcast, transport)` key with the
165    /// current p50 / p95 / p99 / max + sample counts. Sorted by
166    /// broadcast then transport so the admin route's JSON output is
167    /// deterministic.
168    pub fn snapshot(&self) -> Vec<SloEntry> {
169        let guard = self.buckets.read();
170        let mut out: Vec<SloEntry> = guard
171            .iter()
172            .map(|(key, buf)| {
173                let (p50, p95, p99, max) = buf.percentiles();
174                SloEntry {
175                    broadcast: key.broadcast.clone(),
176                    transport: key.transport.clone(),
177                    p50_ms: p50,
178                    p95_ms: p95,
179                    p99_ms: p99,
180                    max_ms: max,
181                    sample_count: buf.samples.len(),
182                    total_observed: buf.total,
183                }
184            })
185            .collect();
186        out.sort_by(|a, b| {
187            a.broadcast
188                .cmp(&b.broadcast)
189                .then_with(|| a.transport.cmp(&b.transport))
190        });
191        out
192    }
193
194    /// Clear every bucket. Test-oriented; no production caller
195    /// should need this.
196    #[cfg(test)]
197    pub(crate) fn clear(&self) {
198        self.buckets.write().clear();
199    }
200}
201
#[cfg(test)]
mod tests {
    use super::*;

    /// 100 distinct samples land in one bucket and the percentile
    /// indices resolve to known values.
    #[test]
    fn record_populates_snapshot_with_percentiles() {
        let tracker = LatencyTracker::new();
        // Monotonically increasing samples so p50 / p95 / p99 map
        // cleanly to indices.
        for ms in 1..=100u64 {
            tracker.record("live/demo", "hls", ms);
        }
        let snap = tracker.snapshot();
        assert_eq!(snap.len(), 1);
        let e = &snap[0];
        assert_eq!(e.broadcast, "live/demo");
        assert_eq!(e.transport, "hls");
        assert_eq!(e.sample_count, 100);
        assert_eq!(e.total_observed, 100);
        assert_eq!(e.p50_ms, 51);
        assert_eq!(e.p95_ms, 96);
        assert_eq!(e.p99_ms, 100);
        assert_eq!(e.max_ms, 100);
    }

    /// Pushing past the cap keeps the count bounded and shifts the
    /// reported percentiles onto the newest samples.
    #[test]
    fn ring_buffer_evicts_oldest_past_cap() {
        const OVERFLOW: u64 = 500;
        let cap = MAX_SAMPLES_PER_KEY as u64;

        let tracker = LatencyTracker::new();
        for ms in 1..=(cap + OVERFLOW) {
            tracker.record("live/demo", "hls", ms);
        }
        let snap = tracker.snapshot();
        let e = &snap[0];
        assert_eq!(e.sample_count, MAX_SAMPLES_PER_KEY);
        assert_eq!(e.total_observed, cap + OVERFLOW);

        // The retained window must be exactly OVERFLOW+1 ..= cap+OVERFLOW.
        // Pin the values rather than a lower bound: a loose `>=` check
        // would still pass if eviction kept overwriting a single slot.
        assert_eq!(e.max_ms, cap + OVERFLOW);
        let expected_p50 = OVERFLOW + 1 + (MAX_SAMPLES_PER_KEY * 50 / 100) as u64;
        assert_eq!(
            e.p50_ms, expected_p50,
            "p50 should sit in the middle of the newest window after eviction"
        );
    }

    /// After wrapping, the ring holds precisely the newest
    /// `MAX_SAMPLES_PER_KEY` samples and nothing older.
    #[test]
    fn sample_buffer_retains_exactly_the_newest_window() {
        const OVERFLOW: u64 = 500;
        let cap = MAX_SAMPLES_PER_KEY as u64;

        let mut buf = SampleBuffer::default();
        for ms in 1..=(cap + OVERFLOW) {
            buf.push(ms);
        }

        // Storage order is rotated once the ring wraps, so compare as
        // a sorted set.
        let mut retained = buf.samples.clone();
        retained.sort_unstable();
        let expected: Vec<u64> = (OVERFLOW + 1..=cap + OVERFLOW).collect();
        assert_eq!(retained, expected);
    }

    /// Distinct (broadcast, transport) pairs get independent buckets
    /// and come back in sorted order.
    #[test]
    fn separate_keys_track_separately() {
        let tracker = LatencyTracker::new();
        tracker.record("live/demo", "hls", 100);
        tracker.record("live/demo", "hls", 200);
        tracker.record("live/demo", "ws", 50);
        tracker.record("live/other", "hls", 10);

        let snap = tracker.snapshot();
        assert_eq!(snap.len(), 3);
        // Sorted by (broadcast, transport).
        assert_eq!(
            (snap[0].broadcast.as_str(), snap[0].transport.as_str()),
            ("live/demo", "hls")
        );
        assert_eq!(
            (snap[1].broadcast.as_str(), snap[1].transport.as_str()),
            ("live/demo", "ws")
        );
        assert_eq!(
            (snap[2].broadcast.as_str(), snap[2].transport.as_str()),
            ("live/other", "hls")
        );
        assert_eq!(snap[0].sample_count, 2);
        assert_eq!(snap[1].sample_count, 1);
        assert_eq!(snap[2].sample_count, 1);
    }

    /// A tracker with no recorded samples reports no entries.
    #[test]
    fn empty_tracker_snapshots_to_empty_vec() {
        let tracker = LatencyTracker::new();
        assert!(tracker.snapshot().is_empty());
    }

    /// `clear` removes every bucket, not just their samples.
    #[test]
    fn clear_resets_the_tracker() {
        let tracker = LatencyTracker::new();
        tracker.record("a", "hls", 10);
        tracker.record("b", "hls", 20);
        assert_eq!(tracker.snapshot().len(), 2);
        tracker.clear();
        assert!(tracker.snapshot().is_empty());
    }
}