1use std::collections::VecDeque;
10
11use crate::error::AnalyticsError;
12
13#[derive(Debug, Clone)]
17pub enum RealtimeEvent {
18 ViewerJoin {
20 viewer_id: String,
21 timestamp_ms: i64,
22 },
23 ViewerLeave {
25 viewer_id: String,
26 timestamp_ms: i64,
27 },
28 BitrateReport {
30 viewer_id: String,
31 timestamp_ms: i64,
32 bitrate_bps: u64,
33 },
34 BufferEvent {
36 viewer_id: String,
37 timestamp_ms: i64,
38 duration_ms: u32,
39 },
40}
41
42impl RealtimeEvent {
43 fn timestamp_ms(&self) -> i64 {
44 match self {
45 RealtimeEvent::ViewerJoin { timestamp_ms, .. } => *timestamp_ms,
46 RealtimeEvent::ViewerLeave { timestamp_ms, .. } => *timestamp_ms,
47 RealtimeEvent::BitrateReport { timestamp_ms, .. } => *timestamp_ms,
48 RealtimeEvent::BufferEvent { timestamp_ms, .. } => *timestamp_ms,
49 }
50 }
51}
52
53#[derive(Debug, Clone, Default)]
57pub struct BucketMetrics {
58 pub bucket_start_ms: i64,
60 pub peak_concurrent_viewers: u32,
62 pub avg_bitrate_bps: f64,
64 pub min_bitrate_bps: u64,
66 pub max_bitrate_bps: u64,
68 pub buffer_event_count: u32,
70 pub buffer_stall_ms: u64,
72 pub bitrate_sample_count: u32,
74}
75
76#[derive(Debug)]
85pub struct SlidingWindowAggregator {
86 window_duration_ms: i64,
88 bucket_ms: i64,
90 buckets: VecDeque<BucketMetrics>,
92 concurrent_viewers: i64,
94 latest_ms: i64,
96}
97
98impl SlidingWindowAggregator {
99 pub fn new(window_duration_ms: i64, bucket_ms: i64) -> Result<Self, AnalyticsError> {
103 if bucket_ms <= 0 || window_duration_ms <= 0 {
104 return Err(AnalyticsError::ConfigError(
105 "window and bucket duration must be positive".to_string(),
106 ));
107 }
108 if window_duration_ms < bucket_ms {
109 return Err(AnalyticsError::ConfigError(
110 "window_duration_ms must be >= bucket_ms".to_string(),
111 ));
112 }
113 Ok(Self {
114 window_duration_ms,
115 bucket_ms,
116 buckets: VecDeque::new(),
117 concurrent_viewers: 0,
118 latest_ms: i64::MIN,
119 })
120 }
121
122 pub fn ingest(&mut self, event: RealtimeEvent) {
128 let ts = event.timestamp_ms();
129 if self.latest_ms == i64::MIN {
130 self.latest_ms = ts;
131 } else {
132 self.latest_ms = self.latest_ms.max(ts);
133 }
134
135 let window_start = self.latest_ms - self.window_duration_ms;
137 while self
138 .buckets
139 .front()
140 .map(|b| b.bucket_start_ms + self.bucket_ms <= window_start)
141 .unwrap_or(false)
142 {
143 self.buckets.pop_front();
144 }
145
146 let bucket_start = ts - ts.rem_euclid(self.bucket_ms);
148 if bucket_start < window_start {
149 return;
151 }
152
153 let _bucket = self.get_or_create_bucket(bucket_start);
154
155 let new_concurrent = match &event {
157 RealtimeEvent::ViewerJoin { .. } => {
158 self.concurrent_viewers += 1;
159 Some(self.concurrent_viewers.max(0) as u32)
160 }
161 RealtimeEvent::ViewerLeave { .. } => {
162 self.concurrent_viewers = (self.concurrent_viewers - 1).max(0);
163 None
164 }
165 _ => None,
166 };
167
168 let bucket = self.get_or_create_bucket(bucket_start);
169
170 match &event {
171 RealtimeEvent::ViewerJoin { .. } => {
172 if let Some(c) = new_concurrent {
173 if c > bucket.peak_concurrent_viewers {
174 bucket.peak_concurrent_viewers = c;
175 }
176 }
177 }
178 RealtimeEvent::ViewerLeave { .. } => {}
179 RealtimeEvent::BitrateReport { bitrate_bps, .. } => {
180 let bps = *bitrate_bps;
181 bucket.bitrate_sample_count += 1;
182 let n = bucket.bitrate_sample_count as f64;
183 bucket.avg_bitrate_bps += (bps as f64 - bucket.avg_bitrate_bps) / n;
184 if bucket.min_bitrate_bps == 0 || bps < bucket.min_bitrate_bps {
185 bucket.min_bitrate_bps = bps;
186 }
187 if bps > bucket.max_bitrate_bps {
188 bucket.max_bitrate_bps = bps;
189 }
190 }
191 RealtimeEvent::BufferEvent { duration_ms, .. } => {
192 bucket.buffer_event_count += 1;
193 bucket.buffer_stall_ms += u64::from(*duration_ms);
194 }
195 }
196 }
197
198 pub fn buckets(&self) -> &VecDeque<BucketMetrics> {
200 &self.buckets
201 }
202
203 pub fn concurrent_viewers(&self) -> u32 {
205 self.concurrent_viewers.max(0) as u32
206 }
207
208 pub fn window_bitrate_stats(&self) -> (f64, u64, u64) {
213 let mut total_weight = 0u64;
214 let mut weighted_sum = 0.0f64;
215 let mut min_bps = u64::MAX;
216 let mut max_bps = 0u64;
217
218 for bucket in &self.buckets {
219 if bucket.bitrate_sample_count > 0 {
220 let w = bucket.bitrate_sample_count as u64;
221 total_weight += w;
222 weighted_sum += bucket.avg_bitrate_bps * w as f64;
223 if bucket.min_bitrate_bps < min_bps {
224 min_bps = bucket.min_bitrate_bps;
225 }
226 if bucket.max_bitrate_bps > max_bps {
227 max_bps = bucket.max_bitrate_bps;
228 }
229 }
230 }
231
232 if total_weight == 0 {
233 return (0.0, 0, 0);
234 }
235 (weighted_sum / total_weight as f64, min_bps, max_bps)
236 }
237
238 pub fn window_buffer_events(&self) -> u32 {
240 self.buckets.iter().map(|b| b.buffer_event_count).sum()
241 }
242
243 pub fn window_peak_concurrent(&self) -> u32 {
245 self.buckets
246 .iter()
247 .map(|b| b.peak_concurrent_viewers)
248 .max()
249 .unwrap_or(0)
250 }
251
252 fn get_or_create_bucket(&mut self, bucket_start: i64) -> &mut BucketMetrics {
255 if self
257 .buckets
258 .back()
259 .map(|b| b.bucket_start_ms == bucket_start)
260 .unwrap_or(false)
261 {
262 return self.buckets.back_mut().unwrap_or_else(|| {
263 unreachable!("back_mut after back returned Some")
265 });
266 }
267
268 let pos = self
270 .buckets
271 .iter()
272 .position(|b| b.bucket_start_ms == bucket_start);
273
274 if pos.is_none() {
275 let insert_pos = self
277 .buckets
278 .iter()
279 .position(|b| b.bucket_start_ms > bucket_start)
280 .unwrap_or(self.buckets.len());
281 self.buckets.insert(
282 insert_pos,
283 BucketMetrics {
284 bucket_start_ms: bucket_start,
285 ..Default::default()
286 },
287 );
288 }
289
290 let idx = self
295 .buckets
296 .iter()
297 .position(|b| b.bucket_start_ms == bucket_start)
298 .unwrap_or(self.buckets.len().saturating_sub(1));
299 &mut self.buckets[idx]
300 }
301}
302
303#[cfg(test)]
306mod tests {
307 use super::*;
308
309 fn aggregator() -> SlidingWindowAggregator {
310 SlidingWindowAggregator::new(60_000, 10_000).expect("new should succeed")
311 }
312
313 #[test]
316 fn aggregator_new_invalid_params() {
317 assert!(SlidingWindowAggregator::new(0, 1000).is_err());
318 assert!(SlidingWindowAggregator::new(1000, 0).is_err());
319 assert!(SlidingWindowAggregator::new(500, 1000).is_err());
320 }
321
322 #[test]
323 fn aggregator_new_valid() {
324 assert!(SlidingWindowAggregator::new(60_000, 10_000).is_ok());
325 }
326
327 #[test]
330 fn concurrent_viewers_join_leave() {
331 let mut agg = aggregator();
332 agg.ingest(RealtimeEvent::ViewerJoin {
333 viewer_id: "a".to_string(),
334 timestamp_ms: 1_000,
335 });
336 agg.ingest(RealtimeEvent::ViewerJoin {
337 viewer_id: "b".to_string(),
338 timestamp_ms: 2_000,
339 });
340 assert_eq!(agg.concurrent_viewers(), 2);
341 agg.ingest(RealtimeEvent::ViewerLeave {
342 viewer_id: "a".to_string(),
343 timestamp_ms: 3_000,
344 });
345 assert_eq!(agg.concurrent_viewers(), 1);
346 }
347
348 #[test]
349 fn concurrent_viewers_does_not_go_negative() {
350 let mut agg = aggregator();
351 agg.ingest(RealtimeEvent::ViewerLeave {
352 viewer_id: "ghost".to_string(),
353 timestamp_ms: 1_000,
354 });
355 assert_eq!(agg.concurrent_viewers(), 0);
356 }
357
358 #[test]
361 fn bitrate_stats_basic() {
362 let mut agg = aggregator();
363 for bps in [1_000_000u64, 2_000_000, 3_000_000] {
364 agg.ingest(RealtimeEvent::BitrateReport {
365 viewer_id: "v".to_string(),
366 timestamp_ms: 5_000,
367 bitrate_bps: bps,
368 });
369 }
370 let (avg, min, max) = agg.window_bitrate_stats();
371 assert!((avg - 2_000_000.0).abs() < 1.0, "avg={avg}");
372 assert_eq!(min, 1_000_000);
373 assert_eq!(max, 3_000_000);
374 }
375
376 #[test]
377 fn bitrate_stats_empty_window() {
378 let agg = aggregator();
379 assert_eq!(agg.window_bitrate_stats(), (0.0, 0, 0));
380 }
381
382 #[test]
385 fn buffer_events_counted() {
386 let mut agg = aggregator();
387 for i in 0..5 {
388 agg.ingest(RealtimeEvent::BufferEvent {
389 viewer_id: "v".to_string(),
390 timestamp_ms: i * 1_000 + 1_000,
391 duration_ms: 200,
392 });
393 }
394 assert_eq!(agg.window_buffer_events(), 5);
395 }
396
397 #[test]
400 fn window_evicts_old_buckets() {
401 let mut agg = SlidingWindowAggregator::new(20_000, 10_000).expect("new should succeed");
402 agg.ingest(RealtimeEvent::BitrateReport {
404 viewer_id: "v".to_string(),
405 timestamp_ms: 5_000,
406 bitrate_bps: 1_000_000,
407 });
408 agg.ingest(RealtimeEvent::BitrateReport {
410 viewer_id: "v".to_string(),
411 timestamp_ms: 35_000,
412 bitrate_bps: 2_000_000,
413 });
414 let (avg, _, _) = agg.window_bitrate_stats();
416 assert!((avg - 2_000_000.0).abs() < 1.0, "avg after eviction={avg}");
418 }
419
420 #[test]
423 fn peak_concurrent_tracked_per_bucket() {
424 let mut agg = aggregator();
425 for i in 0..5 {
426 agg.ingest(RealtimeEvent::ViewerJoin {
427 viewer_id: format!("v{i}"),
428 timestamp_ms: 5_000,
429 });
430 }
431 assert_eq!(agg.window_peak_concurrent(), 5);
432 }
433}