Skip to main content

oximedia_analytics/
realtime.rs

1//! Real-time analytics aggregation with sliding window metrics.
2//!
3//! Provides a time-bucketed sliding window that tracks concurrent viewers,
4//! bitrate statistics, and buffer event counts over a configurable window
5//! horizon and bucket granularity.
6//!
7//! All timestamps are Unix epoch milliseconds.
8
9use std::collections::VecDeque;
10
11use crate::error::AnalyticsError;
12
13// ─── Viewer event ─────────────────────────────────────────────────────────────
14
15/// A real-time analytics event representing one viewer action.
16#[derive(Debug, Clone)]
17pub enum RealtimeEvent {
18    /// A viewer started (or resumed) watching.
19    ViewerJoin {
20        viewer_id: String,
21        timestamp_ms: i64,
22    },
23    /// A viewer left (or paused / closed the player).
24    ViewerLeave {
25        viewer_id: String,
26        timestamp_ms: i64,
27    },
28    /// A bitrate sample from the player (bits per second).
29    BitrateReport {
30        viewer_id: String,
31        timestamp_ms: i64,
32        bitrate_bps: u64,
33    },
34    /// A buffering event with duration.
35    BufferEvent {
36        viewer_id: String,
37        timestamp_ms: i64,
38        duration_ms: u32,
39    },
40}
41
42impl RealtimeEvent {
43    fn timestamp_ms(&self) -> i64 {
44        match self {
45            RealtimeEvent::ViewerJoin { timestamp_ms, .. } => *timestamp_ms,
46            RealtimeEvent::ViewerLeave { timestamp_ms, .. } => *timestamp_ms,
47            RealtimeEvent::BitrateReport { timestamp_ms, .. } => *timestamp_ms,
48            RealtimeEvent::BufferEvent { timestamp_ms, .. } => *timestamp_ms,
49        }
50    }
51}
52
53// ─── Per-bucket aggregation ───────────────────────────────────────────────────
54
55/// Aggregated metrics for one time bucket.
56#[derive(Debug, Clone, Default)]
57pub struct BucketMetrics {
58    /// Start of this bucket (epoch ms).
59    pub bucket_start_ms: i64,
60    /// Peak concurrent-viewer count observed within this bucket.
61    pub peak_concurrent_viewers: u32,
62    /// Average bitrate across all samples in this bucket (bps, 0 if none).
63    pub avg_bitrate_bps: f64,
64    /// Minimum bitrate sample in this bucket (0 if no samples).
65    pub min_bitrate_bps: u64,
66    /// Maximum bitrate sample in this bucket (0 if no samples).
67    pub max_bitrate_bps: u64,
68    /// Total number of buffer events in this bucket.
69    pub buffer_event_count: u32,
70    /// Total buffer stall time in this bucket (ms).
71    pub buffer_stall_ms: u64,
72    /// Number of bitrate samples contributing to avg/min/max.
73    pub bitrate_sample_count: u32,
74}
75
76// ─── Sliding window aggregator ────────────────────────────────────────────────
77
78/// A sliding window analytics aggregator for real-time media metrics.
79///
80/// Events are ingested in order via [`SlidingWindowAggregator::ingest`].  The
81/// aggregator maintains a rolling window of `window_duration_ms` worth of
82/// time buckets, each `bucket_ms` wide.  Old buckets outside the window are
83/// automatically evicted.
84#[derive(Debug)]
85pub struct SlidingWindowAggregator {
86    /// Window duration in milliseconds.
87    window_duration_ms: i64,
88    /// Bucket width in milliseconds.
89    bucket_ms: i64,
90    /// Ordered queue of active buckets (front = oldest).
91    buckets: VecDeque<BucketMetrics>,
92    /// Current concurrent viewer count (active joins minus leaves).
93    concurrent_viewers: i64,
94    /// Watermark: the latest timestamp seen.
95    latest_ms: i64,
96}
97
98impl SlidingWindowAggregator {
99    /// Create a new aggregator.
100    ///
101    /// Returns an error if `window_duration_ms < bucket_ms` or either is zero.
102    pub fn new(window_duration_ms: i64, bucket_ms: i64) -> Result<Self, AnalyticsError> {
103        if bucket_ms <= 0 || window_duration_ms <= 0 {
104            return Err(AnalyticsError::ConfigError(
105                "window and bucket duration must be positive".to_string(),
106            ));
107        }
108        if window_duration_ms < bucket_ms {
109            return Err(AnalyticsError::ConfigError(
110                "window_duration_ms must be >= bucket_ms".to_string(),
111            ));
112        }
113        Ok(Self {
114            window_duration_ms,
115            bucket_ms,
116            buckets: VecDeque::new(),
117            concurrent_viewers: 0,
118            latest_ms: i64::MIN,
119        })
120    }
121
122    /// Ingest a real-time event.
123    ///
124    /// Events should be delivered roughly in time order; out-of-order events
125    /// within the current bucket are merged correctly, but events older than
126    /// the window start are silently dropped.
127    pub fn ingest(&mut self, event: RealtimeEvent) {
128        let ts = event.timestamp_ms();
129        if self.latest_ms == i64::MIN {
130            self.latest_ms = ts;
131        } else {
132            self.latest_ms = self.latest_ms.max(ts);
133        }
134
135        // Evict expired buckets.
136        let window_start = self.latest_ms - self.window_duration_ms;
137        while self
138            .buckets
139            .front()
140            .map(|b| b.bucket_start_ms + self.bucket_ms <= window_start)
141            .unwrap_or(false)
142        {
143            self.buckets.pop_front();
144        }
145
146        // Find or create the bucket for this timestamp.
147        let bucket_start = ts - ts.rem_euclid(self.bucket_ms);
148        if bucket_start < window_start {
149            // Event is too old; drop it.
150            return;
151        }
152
153        let _bucket = self.get_or_create_bucket(bucket_start);
154
155        // Update concurrent_viewers for join/leave before borrowing bucket.
156        let new_concurrent = match &event {
157            RealtimeEvent::ViewerJoin { .. } => {
158                self.concurrent_viewers += 1;
159                Some(self.concurrent_viewers.max(0) as u32)
160            }
161            RealtimeEvent::ViewerLeave { .. } => {
162                self.concurrent_viewers = (self.concurrent_viewers - 1).max(0);
163                None
164            }
165            _ => None,
166        };
167
168        let bucket = self.get_or_create_bucket(bucket_start);
169
170        match &event {
171            RealtimeEvent::ViewerJoin { .. } => {
172                if let Some(c) = new_concurrent {
173                    if c > bucket.peak_concurrent_viewers {
174                        bucket.peak_concurrent_viewers = c;
175                    }
176                }
177            }
178            RealtimeEvent::ViewerLeave { .. } => {}
179            RealtimeEvent::BitrateReport { bitrate_bps, .. } => {
180                let bps = *bitrate_bps;
181                bucket.bitrate_sample_count += 1;
182                let n = bucket.bitrate_sample_count as f64;
183                bucket.avg_bitrate_bps += (bps as f64 - bucket.avg_bitrate_bps) / n;
184                if bucket.min_bitrate_bps == 0 || bps < bucket.min_bitrate_bps {
185                    bucket.min_bitrate_bps = bps;
186                }
187                if bps > bucket.max_bitrate_bps {
188                    bucket.max_bitrate_bps = bps;
189                }
190            }
191            RealtimeEvent::BufferEvent { duration_ms, .. } => {
192                bucket.buffer_event_count += 1;
193                bucket.buffer_stall_ms += u64::from(*duration_ms);
194            }
195        }
196    }
197
198    /// Return a snapshot of all active buckets (oldest first).
199    pub fn buckets(&self) -> &VecDeque<BucketMetrics> {
200        &self.buckets
201    }
202
203    /// Current instantaneous concurrent viewer count.
204    pub fn concurrent_viewers(&self) -> u32 {
205        self.concurrent_viewers.max(0) as u32
206    }
207
208    /// Aggregate bitrate statistics across all active buckets.
209    ///
210    /// Returns `(avg_bps, min_bps, max_bps)`.  Returns `(0.0, 0, 0)` if no
211    /// bitrate samples exist in the window.
212    pub fn window_bitrate_stats(&self) -> (f64, u64, u64) {
213        let mut total_weight = 0u64;
214        let mut weighted_sum = 0.0f64;
215        let mut min_bps = u64::MAX;
216        let mut max_bps = 0u64;
217
218        for bucket in &self.buckets {
219            if bucket.bitrate_sample_count > 0 {
220                let w = bucket.bitrate_sample_count as u64;
221                total_weight += w;
222                weighted_sum += bucket.avg_bitrate_bps * w as f64;
223                if bucket.min_bitrate_bps < min_bps {
224                    min_bps = bucket.min_bitrate_bps;
225                }
226                if bucket.max_bitrate_bps > max_bps {
227                    max_bps = bucket.max_bitrate_bps;
228                }
229            }
230        }
231
232        if total_weight == 0 {
233            return (0.0, 0, 0);
234        }
235        (weighted_sum / total_weight as f64, min_bps, max_bps)
236    }
237
238    /// Total buffer events in the current window.
239    pub fn window_buffer_events(&self) -> u32 {
240        self.buckets.iter().map(|b| b.buffer_event_count).sum()
241    }
242
243    /// Peak concurrent viewers across all active buckets.
244    pub fn window_peak_concurrent(&self) -> u32 {
245        self.buckets
246            .iter()
247            .map(|b| b.peak_concurrent_viewers)
248            .max()
249            .unwrap_or(0)
250    }
251
252    // ── Helpers ───────────────────────────────────────────────────────────────
253
254    fn get_or_create_bucket(&mut self, bucket_start: i64) -> &mut BucketMetrics {
255        // Check if the newest bucket matches.
256        if self
257            .buckets
258            .back()
259            .map(|b| b.bucket_start_ms == bucket_start)
260            .unwrap_or(false)
261        {
262            return self.buckets.back_mut().unwrap_or_else(|| {
263                // Unreachable but needed for type safety.
264                unreachable!("back_mut after back returned Some")
265            });
266        }
267
268        // Find existing bucket or insert new one at the right position.
269        let pos = self
270            .buckets
271            .iter()
272            .position(|b| b.bucket_start_ms == bucket_start);
273
274        if pos.is_none() {
275            // Insert in sorted order.
276            let insert_pos = self
277                .buckets
278                .iter()
279                .position(|b| b.bucket_start_ms > bucket_start)
280                .unwrap_or(self.buckets.len());
281            self.buckets.insert(
282                insert_pos,
283                BucketMetrics {
284                    bucket_start_ms: bucket_start,
285                    ..Default::default()
286                },
287            );
288        }
289
290        // Now find and return the mutable reference.
291        // Safety: we just inserted or confirmed the bucket exists above,
292        // so this position lookup will always succeed. Use saturating
293        // fallback to last element if somehow not found.
294        let idx = self
295            .buckets
296            .iter()
297            .position(|b| b.bucket_start_ms == bucket_start)
298            .unwrap_or(self.buckets.len().saturating_sub(1));
299        &mut self.buckets[idx]
300    }
301}
302
303// ─── Tests ────────────────────────────────────────────────────────────────────
304
305#[cfg(test)]
306mod tests {
307    use super::*;
308
309    fn aggregator() -> SlidingWindowAggregator {
310        SlidingWindowAggregator::new(60_000, 10_000).expect("new should succeed")
311    }
312
313    // ── constructor ──────────────────────────────────────────────────────────
314
315    #[test]
316    fn aggregator_new_invalid_params() {
317        assert!(SlidingWindowAggregator::new(0, 1000).is_err());
318        assert!(SlidingWindowAggregator::new(1000, 0).is_err());
319        assert!(SlidingWindowAggregator::new(500, 1000).is_err());
320    }
321
322    #[test]
323    fn aggregator_new_valid() {
324        assert!(SlidingWindowAggregator::new(60_000, 10_000).is_ok());
325    }
326
327    // ── concurrent viewers ───────────────────────────────────────────────────
328
329    #[test]
330    fn concurrent_viewers_join_leave() {
331        let mut agg = aggregator();
332        agg.ingest(RealtimeEvent::ViewerJoin {
333            viewer_id: "a".to_string(),
334            timestamp_ms: 1_000,
335        });
336        agg.ingest(RealtimeEvent::ViewerJoin {
337            viewer_id: "b".to_string(),
338            timestamp_ms: 2_000,
339        });
340        assert_eq!(agg.concurrent_viewers(), 2);
341        agg.ingest(RealtimeEvent::ViewerLeave {
342            viewer_id: "a".to_string(),
343            timestamp_ms: 3_000,
344        });
345        assert_eq!(agg.concurrent_viewers(), 1);
346    }
347
348    #[test]
349    fn concurrent_viewers_does_not_go_negative() {
350        let mut agg = aggregator();
351        agg.ingest(RealtimeEvent::ViewerLeave {
352            viewer_id: "ghost".to_string(),
353            timestamp_ms: 1_000,
354        });
355        assert_eq!(agg.concurrent_viewers(), 0);
356    }
357
358    // ── bitrate stats ────────────────────────────────────────────────────────
359
360    #[test]
361    fn bitrate_stats_basic() {
362        let mut agg = aggregator();
363        for bps in [1_000_000u64, 2_000_000, 3_000_000] {
364            agg.ingest(RealtimeEvent::BitrateReport {
365                viewer_id: "v".to_string(),
366                timestamp_ms: 5_000,
367                bitrate_bps: bps,
368            });
369        }
370        let (avg, min, max) = agg.window_bitrate_stats();
371        assert!((avg - 2_000_000.0).abs() < 1.0, "avg={avg}");
372        assert_eq!(min, 1_000_000);
373        assert_eq!(max, 3_000_000);
374    }
375
376    #[test]
377    fn bitrate_stats_empty_window() {
378        let agg = aggregator();
379        assert_eq!(agg.window_bitrate_stats(), (0.0, 0, 0));
380    }
381
382    // ── buffer events ────────────────────────────────────────────────────────
383
384    #[test]
385    fn buffer_events_counted() {
386        let mut agg = aggregator();
387        for i in 0..5 {
388            agg.ingest(RealtimeEvent::BufferEvent {
389                viewer_id: "v".to_string(),
390                timestamp_ms: i * 1_000 + 1_000,
391                duration_ms: 200,
392            });
393        }
394        assert_eq!(agg.window_buffer_events(), 5);
395    }
396
397    // ── window eviction ──────────────────────────────────────────────────────
398
399    #[test]
400    fn window_evicts_old_buckets() {
401        let mut agg = SlidingWindowAggregator::new(20_000, 10_000).expect("new should succeed");
402        // Bucket at t=0–10s.
403        agg.ingest(RealtimeEvent::BitrateReport {
404            viewer_id: "v".to_string(),
405            timestamp_ms: 5_000,
406            bitrate_bps: 1_000_000,
407        });
408        // Advance time beyond window: t=30s → bucket at t=0 should evict.
409        agg.ingest(RealtimeEvent::BitrateReport {
410            viewer_id: "v".to_string(),
411            timestamp_ms: 35_000,
412            bitrate_bps: 2_000_000,
413        });
414        // Only the recent bucket should remain.
415        let (avg, _, _) = agg.window_bitrate_stats();
416        // avg should be 2_000_000 (old bucket evicted).
417        assert!((avg - 2_000_000.0).abs() < 1.0, "avg after eviction={avg}");
418    }
419
420    // ── peak concurrent ──────────────────────────────────────────────────────
421
422    #[test]
423    fn peak_concurrent_tracked_per_bucket() {
424        let mut agg = aggregator();
425        for i in 0..5 {
426            agg.ingest(RealtimeEvent::ViewerJoin {
427                viewer_id: format!("v{i}"),
428                timestamp_ms: 5_000,
429            });
430        }
431        assert_eq!(agg.window_peak_concurrent(), 5);
432    }
433}