1use rayon::prelude::*;
5
6#[derive(Debug, Clone, PartialEq)]
8pub enum PlaybackEvent {
9 Play { timestamp_ms: i64 },
11 Pause { timestamp_ms: i64, position_ms: u64 },
13 Seek { from_ms: u64, to_ms: u64 },
15 BufferStart { position_ms: u64 },
17 BufferEnd { position_ms: u64, duration_ms: u32 },
19 QualityChange {
21 from_height: u32,
22 to_height: u32,
23 bitrate: u32,
24 },
25 End {
27 position_ms: u64,
28 watch_duration_ms: u64,
29 },
30}
31
32#[derive(Debug, Clone)]
34pub struct ViewerSession {
35 pub session_id: String,
36 pub user_id: Option<String>,
37 pub content_id: String,
38 pub started_at_ms: i64,
40 pub events: Vec<PlaybackEvent>,
41}
42
43impl ViewerSession {
44 pub fn new(
46 session_id: impl Into<String>,
47 user_id: Option<String>,
48 content_id: impl Into<String>,
49 started_at_ms: i64,
50 ) -> Self {
51 Self {
52 session_id: session_id.into(),
53 user_id,
54 content_id: content_id.into(),
55 started_at_ms,
56 events: Vec::new(),
57 }
58 }
59
60 pub fn push_event(&mut self, event: PlaybackEvent) {
62 self.events.push(event);
63 }
64}
65
66#[derive(Debug, Clone, PartialEq)]
68pub struct SessionMetrics {
69 pub total_watch_ms: u64,
71 pub unique_positions_watched: u64,
73 pub seek_count: u32,
75 pub buffer_events: u32,
77 pub buffer_time_ms: u64,
79 pub quality_changes: u32,
81 pub completion_pct: f32,
83}
84
85pub fn analyze_session(session: &ViewerSession, content_duration_ms: u64) -> SessionMetrics {
90 let mut seek_count: u32 = 0;
91 let mut buffer_events: u32 = 0;
92 let mut buffer_time_ms: u64 = 0;
93 let mut quality_changes: u32 = 0;
94 let mut max_position_ms: u64 = 0;
95 let mut total_watch_ms: u64 = 0;
96
97 let mut last_end_watch_ms: Option<u64> = None;
101
102 for event in &session.events {
103 match event {
104 PlaybackEvent::Seek { from_ms, to_ms } => {
105 seek_count += 1;
106 max_position_ms = max_position_ms.max(*from_ms).max(*to_ms);
107 }
108 PlaybackEvent::BufferStart { position_ms } => {
109 buffer_events += 1;
110 max_position_ms = max_position_ms.max(*position_ms);
111 }
112 PlaybackEvent::BufferEnd {
113 position_ms,
114 duration_ms,
115 } => {
116 buffer_time_ms += u64::from(*duration_ms);
117 max_position_ms = max_position_ms.max(*position_ms);
118 }
119 PlaybackEvent::QualityChange { .. } => {
120 quality_changes += 1;
121 }
122 PlaybackEvent::End {
123 position_ms,
124 watch_duration_ms,
125 } => {
126 max_position_ms = max_position_ms.max(*position_ms);
127 last_end_watch_ms = Some(*watch_duration_ms);
128 total_watch_ms = total_watch_ms.max(*watch_duration_ms);
129 }
130 PlaybackEvent::Pause { position_ms, .. } => {
131 max_position_ms = max_position_ms.max(*position_ms);
132 }
133 PlaybackEvent::Play { .. } => {}
134 }
135 }
136
137 if let Some(w) = last_end_watch_ms {
138 total_watch_ms = w;
139 }
140
141 let map = build_playback_map(session, content_duration_ms);
143 let unique_positions_watched = if content_duration_ms > 0 {
144 map.positions_watched.iter().filter(|&&b| b).count() as u64
145 } else {
146 0
147 };
148
149 let completion_pct = if content_duration_ms > 0 {
150 (max_position_ms as f32 / content_duration_ms as f32 * 100.0).min(100.0)
151 } else {
152 0.0
153 };
154
155 SessionMetrics {
156 total_watch_ms,
157 unique_positions_watched,
158 seek_count,
159 buffer_events,
160 buffer_time_ms,
161 quality_changes,
162 completion_pct,
163 }
164}
165
166#[derive(Debug, Clone)]
169pub struct PlaybackMap {
170 pub positions_watched: Vec<bool>,
172}
173
174impl PlaybackMap {
175 pub fn mark_range(&mut self, start_ms: u64, end_ms: u64) {
177 if start_ms >= end_ms || self.positions_watched.is_empty() {
178 return;
179 }
180 let start_sec = (start_ms / 1000) as usize;
181 let end_sec = ((end_ms + 999) / 1000) as usize; let cap = self.positions_watched.len();
183 let end_sec = end_sec.min(cap);
184 for i in start_sec..end_sec {
185 self.positions_watched[i] = true;
186 }
187 }
188
189 pub fn coverage_pct(&self, total_ms: u64) -> f32 {
191 if total_ms == 0 || self.positions_watched.is_empty() {
192 return 0.0;
193 }
194 let total_sec = ((total_ms + 999) / 1000) as usize;
195 let total_sec = total_sec.min(self.positions_watched.len());
196 if total_sec == 0 {
197 return 0.0;
198 }
199 let watched = self.positions_watched[..total_sec]
200 .iter()
201 .filter(|&&b| b)
202 .count();
203 watched as f32 / total_sec as f32
204 }
205}
206
207pub fn analyze_sessions_batch(
212 sessions: &[ViewerSession],
213 content_duration_ms: u64,
214) -> Vec<SessionMetrics> {
215 sessions
216 .par_iter()
217 .map(|s| analyze_session(s, content_duration_ms))
218 .collect()
219}
220
221pub fn build_playback_map(session: &ViewerSession, content_duration_ms: u64) -> PlaybackMap {
228 let num_seconds = if content_duration_ms > 0 {
229 ((content_duration_ms + 999) / 1000) as usize
230 } else {
231 let max_pos = session.events.iter().fold(0u64, |acc, e| match e {
233 PlaybackEvent::Pause { position_ms, .. } => acc.max(*position_ms),
234 PlaybackEvent::Seek { from_ms, to_ms } => acc.max(*from_ms).max(*to_ms),
235 PlaybackEvent::BufferStart { position_ms } => acc.max(*position_ms),
236 PlaybackEvent::BufferEnd { position_ms, .. } => acc.max(*position_ms),
237 PlaybackEvent::End { position_ms, .. } => acc.max(*position_ms),
238 _ => acc,
239 });
240 if max_pos == 0 {
241 return PlaybackMap {
242 positions_watched: Vec::new(),
243 };
244 }
245 ((max_pos + 999) / 1000) as usize + 1
246 };
247
248 let mut map = PlaybackMap {
249 positions_watched: vec![false; num_seconds],
250 };
251
252 let mut playing = false;
254 let mut play_start_pos: u64 = 0;
255 let mut current_pos: u64 = 0;
256
257 for event in &session.events {
258 match event {
259 PlaybackEvent::Play { .. } => {
260 playing = true;
261 play_start_pos = current_pos;
262 }
263 PlaybackEvent::Pause { position_ms, .. } => {
264 if playing {
265 map.mark_range(play_start_pos, *position_ms);
266 playing = false;
267 }
268 current_pos = *position_ms;
269 }
270 PlaybackEvent::Seek { from_ms, to_ms } => {
271 if playing {
272 map.mark_range(play_start_pos, *from_ms);
273 }
274 current_pos = *to_ms;
275 if playing {
276 play_start_pos = *to_ms;
277 }
278 }
279 PlaybackEvent::BufferStart { position_ms } => {
280 if playing {
281 map.mark_range(play_start_pos, *position_ms);
282 play_start_pos = *position_ms;
284 }
285 current_pos = *position_ms;
286 }
287 PlaybackEvent::BufferEnd { position_ms, .. } => {
288 current_pos = *position_ms;
289 if playing {
290 play_start_pos = *position_ms;
291 }
292 }
293 PlaybackEvent::End { position_ms, .. } => {
294 if playing {
295 map.mark_range(play_start_pos, *position_ms);
296 playing = false;
297 }
298 current_pos = *position_ms;
299 }
300 PlaybackEvent::QualityChange { .. } => {}
301 }
302 }
303
304 if playing && current_pos > play_start_pos {
306 map.mark_range(play_start_pos, current_pos);
307 }
308
309 map
310}
311
312#[derive(Debug, Clone, PartialEq)]
314pub struct HeatPoint {
315 pub position_ms: u64,
317 pub intensity: f32,
319}
320
321pub fn attention_heatmap(
328 sessions: &[ViewerSession],
329 content_duration_ms: u64,
330 bucket_ms: u32,
331) -> Vec<HeatPoint> {
332 if sessions.is_empty() || content_duration_ms == 0 || bucket_ms == 0 {
333 return Vec::new();
334 }
335
336 let bucket_ms_u64 = u64::from(bucket_ms);
337 let num_buckets = ((content_duration_ms + bucket_ms_u64 - 1) / bucket_ms_u64) as usize;
338 let mut counts = vec![0u32; num_buckets];
339
340 for session in sessions {
341 let map = build_playback_map(session, content_duration_ms);
342 for (bucket_idx, count) in counts.iter_mut().enumerate() {
344 let bucket_start_ms = bucket_idx as u64 * bucket_ms_u64;
345 let bucket_end_ms = (bucket_start_ms + bucket_ms_u64).min(content_duration_ms);
346 let start_sec = (bucket_start_ms / 1000) as usize;
347 let end_sec = ((bucket_end_ms + 999) / 1000) as usize;
348 let end_sec = end_sec.min(map.positions_watched.len());
349 let watched = (start_sec..end_sec)
350 .any(|s| map.positions_watched.get(s).copied().unwrap_or(false));
351 if watched {
352 *count += 1;
353 }
354 }
355 }
356
357 let max_count = counts.iter().copied().max().unwrap_or(0);
358 if max_count == 0 {
359 return Vec::new();
360 }
361
362 counts
363 .into_iter()
364 .enumerate()
365 .map(|(idx, c)| HeatPoint {
366 position_ms: idx as u64 * bucket_ms_u64,
367 intensity: c as f32 / max_count as f32,
368 })
369 .collect()
370}
371
372#[derive(Debug, Clone)]
379pub struct ReservoirHeatmapConfig {
380 pub reservoir_size: usize,
382 pub bucket_ms: u32,
384 pub seed: u64,
386}
387
388impl Default for ReservoirHeatmapConfig {
389 fn default() -> Self {
390 Self {
391 reservoir_size: 1_000,
392 bucket_ms: 5_000,
393 seed: 0xDEAD_BEEF_CAFE_1234,
394 }
395 }
396}
397
398#[derive(Debug, Clone)]
400pub struct SampledHeatmap {
401 pub points: Vec<HeatPoint>,
403 pub total_sessions_seen: usize,
405 pub sessions_sampled: usize,
407}
408
409pub fn reservoir_sampled_heatmap<'a>(
415 sessions: impl Iterator<Item = &'a ViewerSession>,
416 content_duration_ms: u64,
417 config: &ReservoirHeatmapConfig,
418) -> Option<SampledHeatmap> {
419 if content_duration_ms == 0 || config.bucket_ms == 0 || config.reservoir_size == 0 {
420 return None;
421 }
422
423 let capacity = config.reservoir_size;
424 let mut reservoir: Vec<&ViewerSession> = Vec::with_capacity(capacity);
425 let mut total_seen: usize = 0;
426
427 let mut rng_state: u64 = config.seed;
429 let next_u64 = |state: &mut u64| -> u64 {
430 *state = state.wrapping_add(0x9e37_79b9_7f4a_7c15);
431 let mut z: u64 = *state;
432 z = (z ^ (z >> 30)).wrapping_mul(0xbf58_476d_1ce4_e5b9);
433 z = (z ^ (z >> 27)).wrapping_mul(0x94d0_49bb_1331_11eb);
434 z ^ (z >> 31)
435 };
436
437 for session in sessions {
438 if total_seen < capacity {
439 reservoir.push(session);
440 } else {
441 let j = (next_u64(&mut rng_state) % (total_seen as u64 + 1)) as usize;
442 if j < capacity {
443 reservoir[j] = session;
444 }
445 }
446 total_seen += 1;
447 }
448
449 let sessions_sampled = reservoir.len();
450 let heat_points = attention_heatmap_refs(&reservoir, content_duration_ms, config.bucket_ms);
451
452 Some(SampledHeatmap {
453 points: heat_points,
454 total_sessions_seen: total_seen,
455 sessions_sampled,
456 })
457}
458
459fn attention_heatmap_refs(
461 sessions: &[&ViewerSession],
462 content_duration_ms: u64,
463 bucket_ms: u32,
464) -> Vec<HeatPoint> {
465 if sessions.is_empty() || content_duration_ms == 0 || bucket_ms == 0 {
466 return Vec::new();
467 }
468 let bucket_ms_u64 = u64::from(bucket_ms);
469 let num_buckets = ((content_duration_ms + bucket_ms_u64 - 1) / bucket_ms_u64) as usize;
470 let mut counts = vec![0u32; num_buckets];
471
472 for &session in sessions {
473 let map = build_playback_map(session, content_duration_ms);
474 for (bucket_idx, count) in counts.iter_mut().enumerate() {
475 let bucket_start_ms = bucket_idx as u64 * bucket_ms_u64;
476 let bucket_end_ms = (bucket_start_ms + bucket_ms_u64).min(content_duration_ms);
477 let start_sec = (bucket_start_ms / 1000) as usize;
478 let end_sec = ((bucket_end_ms + 999) / 1000) as usize;
479 let end_sec = end_sec.min(map.positions_watched.len());
480 let watched = (start_sec..end_sec)
481 .any(|s| map.positions_watched.get(s).copied().unwrap_or(false));
482 if watched {
483 *count += 1;
484 }
485 }
486 }
487
488 let max_count = counts.iter().copied().max().unwrap_or(0);
489 if max_count == 0 {
490 return Vec::new();
491 }
492
493 counts
494 .into_iter()
495 .enumerate()
496 .map(|(idx, c)| HeatPoint {
497 position_ms: idx as u64 * bucket_ms_u64,
498 intensity: c as f32 / max_count as f32,
499 })
500 .collect()
501}
502
503#[cfg(test)]
506mod tests {
507 use super::*;
508
509 fn simple_session(id: &str, content_id: &str, events: Vec<PlaybackEvent>) -> ViewerSession {
510 ViewerSession {
511 session_id: id.to_string(),
512 user_id: None,
513 content_id: content_id.to_string(),
514 started_at_ms: 0,
515 events,
516 }
517 }
518
519 #[test]
522 fn playback_map_mark_range_basic() {
523 let mut map = PlaybackMap {
524 positions_watched: vec![false; 10],
525 };
526 map.mark_range(0, 3000);
527 assert!(map.positions_watched[0]);
528 assert!(map.positions_watched[1]);
529 assert!(map.positions_watched[2]);
530 assert!(!map.positions_watched[3]);
531 }
532
533 #[test]
534 fn playback_map_mark_range_clamps_to_capacity() {
535 let mut map = PlaybackMap {
536 positions_watched: vec![false; 5],
537 };
538 map.mark_range(0, 100_000);
539 assert!(map.positions_watched.iter().all(|&b| b));
540 }
541
542 #[test]
543 fn playback_map_empty_range_is_noop() {
544 let mut map = PlaybackMap {
545 positions_watched: vec![false; 5],
546 };
547 map.mark_range(3000, 3000);
548 assert!(map.positions_watched.iter().all(|&b| !b));
549 }
550
551 #[test]
552 fn playback_map_coverage_full() {
553 let map = PlaybackMap {
554 positions_watched: vec![true; 10],
555 };
556 let pct = map.coverage_pct(10_000);
557 assert!((pct - 1.0).abs() < 1e-6);
558 }
559
560 #[test]
561 fn playback_map_coverage_half() {
562 let mut positions = vec![false; 10];
563 for i in 0..5 {
564 positions[i] = true;
565 }
566 let map = PlaybackMap {
567 positions_watched: positions,
568 };
569 let pct = map.coverage_pct(10_000);
570 assert!((pct - 0.5).abs() < 1e-6);
571 }
572
573 #[test]
574 fn playback_map_coverage_zero_duration() {
575 let map = PlaybackMap {
576 positions_watched: vec![true; 10],
577 };
578 assert_eq!(map.coverage_pct(0), 0.0);
579 }
580
581 #[test]
584 fn build_map_play_then_end() {
585 let session = simple_session(
586 "s1",
587 "c1",
588 vec![
589 PlaybackEvent::Play { timestamp_ms: 0 },
590 PlaybackEvent::End {
591 position_ms: 5000,
592 watch_duration_ms: 5000,
593 },
594 ],
595 );
596 let map = build_playback_map(&session, 10_000);
597 assert!(map.positions_watched[0]);
599 assert!(map.positions_watched[4]);
600 assert!(!map.positions_watched[5]);
601 }
602
603 #[test]
604 fn build_map_play_pause_play_end() {
605 let session = simple_session(
606 "s2",
607 "c1",
608 vec![
609 PlaybackEvent::Play { timestamp_ms: 0 },
610 PlaybackEvent::Pause {
611 timestamp_ms: 1000,
612 position_ms: 3000,
613 },
614 PlaybackEvent::Play { timestamp_ms: 2000 },
615 PlaybackEvent::End {
616 position_ms: 7000,
617 watch_duration_ms: 7000,
618 },
619 ],
620 );
621 let map = build_playback_map(&session, 10_000);
622 assert!(map.positions_watched[0]);
624 assert!(map.positions_watched[3]);
625 assert!(map.positions_watched[6]);
626 assert!(!map.positions_watched[7]);
627 }
628
629 #[test]
630 fn build_map_seek_forward() {
631 let session = simple_session(
632 "s3",
633 "c1",
634 vec![
635 PlaybackEvent::Play { timestamp_ms: 0 },
636 PlaybackEvent::Seek {
637 from_ms: 2000,
638 to_ms: 8000,
639 },
640 PlaybackEvent::End {
641 position_ms: 10_000,
642 watch_duration_ms: 4000,
643 },
644 ],
645 );
646 let map = build_playback_map(&session, 12_000);
647 assert!(map.positions_watched[0]);
649 assert!(!map.positions_watched[5]);
650 assert!(map.positions_watched[8]);
651 }
652
653 #[test]
654 fn build_map_no_events_returns_all_false() {
655 let session = simple_session("s4", "c1", vec![]);
656 let map = build_playback_map(&session, 5_000);
657 assert!(map.positions_watched.iter().all(|&b| !b));
658 }
659
660 #[test]
663 fn analyze_session_basic() {
664 let session = simple_session(
665 "s5",
666 "c1",
667 vec![
668 PlaybackEvent::Play { timestamp_ms: 0 },
669 PlaybackEvent::BufferStart { position_ms: 2000 },
670 PlaybackEvent::BufferEnd {
671 position_ms: 2000,
672 duration_ms: 500,
673 },
674 PlaybackEvent::Seek {
675 from_ms: 4000,
676 to_ms: 1000,
677 },
678 PlaybackEvent::QualityChange {
679 from_height: 720,
680 to_height: 1080,
681 bitrate: 5_000_000,
682 },
683 PlaybackEvent::End {
684 position_ms: 10_000,
685 watch_duration_ms: 9000,
686 },
687 ],
688 );
689 let metrics = analyze_session(&session, 10_000);
690 assert_eq!(metrics.buffer_events, 1);
691 assert_eq!(metrics.buffer_time_ms, 500);
692 assert_eq!(metrics.seek_count, 1);
693 assert_eq!(metrics.quality_changes, 1);
694 assert_eq!(metrics.total_watch_ms, 9000);
695 assert!((metrics.completion_pct - 100.0).abs() < 1e-3);
696 }
697
698 #[test]
699 fn analyze_session_no_end_event() {
700 let session = simple_session(
701 "s6",
702 "c1",
703 vec![
704 PlaybackEvent::Play { timestamp_ms: 0 },
705 PlaybackEvent::Pause {
706 timestamp_ms: 5000,
707 position_ms: 5000,
708 },
709 ],
710 );
711 let metrics = analyze_session(&session, 20_000);
712 assert_eq!(metrics.seek_count, 0);
713 assert!((metrics.completion_pct - 25.0).abs() < 1e-3);
714 }
715
716 #[test]
717 fn analyze_session_zero_duration() {
718 let session = simple_session(
719 "s7",
720 "c1",
721 vec![PlaybackEvent::End {
722 position_ms: 5000,
723 watch_duration_ms: 5000,
724 }],
725 );
726 let metrics = analyze_session(&session, 0);
727 assert_eq!(metrics.completion_pct, 0.0);
728 }
729
730 #[test]
733 fn heatmap_empty_sessions() {
734 let result = attention_heatmap(&[], 60_000, 1000);
735 assert!(result.is_empty());
736 }
737
738 #[test]
739 fn heatmap_single_session_full_watch() {
740 let session = simple_session(
741 "h1",
742 "c1",
743 vec![
744 PlaybackEvent::Play { timestamp_ms: 0 },
745 PlaybackEvent::End {
746 position_ms: 10_000,
747 watch_duration_ms: 10_000,
748 },
749 ],
750 );
751 let heat = attention_heatmap(&[session], 10_000, 2000);
752 assert!(!heat.is_empty());
754 let peak = heat.iter().map(|h| h.intensity).fold(0.0f32, f32::max);
755 assert!((peak - 1.0).abs() < 1e-6);
756 }
757
758 #[test]
759 fn heatmap_bucket_positions_correct() {
760 let session = simple_session(
761 "h2",
762 "c1",
763 vec![
764 PlaybackEvent::Play { timestamp_ms: 0 },
765 PlaybackEvent::End {
766 position_ms: 5000,
767 watch_duration_ms: 5000,
768 },
769 ],
770 );
771 let heat = attention_heatmap(&[session], 10_000, 5000);
772 assert_eq!(heat.len(), 2);
773 assert_eq!(heat[0].position_ms, 0);
774 assert_eq!(heat[1].position_ms, 5000);
775 }
776
777 #[test]
778 fn heatmap_zero_bucket_ms_returns_empty() {
779 let session = simple_session("h3", "c1", vec![]);
780 let heat = attention_heatmap(&[session], 10_000, 0);
781 assert!(heat.is_empty());
782 }
783
784 #[test]
785 fn viewer_session_new() {
786 let s = ViewerSession::new("id1", Some("u1".to_string()), "vid1", 12345);
787 assert_eq!(s.session_id, "id1");
788 assert_eq!(s.user_id, Some("u1".to_string()));
789 assert!(s.events.is_empty());
790 }
791
792 #[test]
793 fn viewer_session_push_event() {
794 let mut s = ViewerSession::new("id2", None, "vid2", 0);
795 s.push_event(PlaybackEvent::Play { timestamp_ms: 100 });
796 assert_eq!(s.events.len(), 1);
797 }
798
799 fn full_watch(id: &str, content_ms: u64) -> ViewerSession {
802 simple_session(
803 id,
804 "c1",
805 vec![
806 PlaybackEvent::Play { timestamp_ms: 0 },
807 PlaybackEvent::End {
808 position_ms: content_ms,
809 watch_duration_ms: content_ms,
810 },
811 ],
812 )
813 }
814
815 fn half_watch(id: &str, content_ms: u64) -> ViewerSession {
816 simple_session(
817 id,
818 "c1",
819 vec![
820 PlaybackEvent::Play { timestamp_ms: 0 },
821 PlaybackEvent::End {
822 position_ms: content_ms / 2,
823 watch_duration_ms: content_ms / 2,
824 },
825 ],
826 )
827 }
828
829 #[test]
830 fn reservoir_none_on_zero_duration() {
831 let sessions = vec![full_watch("s1", 10_000)];
832 assert!(
833 reservoir_sampled_heatmap(sessions.iter(), 0, &ReservoirHeatmapConfig::default())
834 .is_none()
835 );
836 }
837
838 #[test]
839 fn reservoir_none_on_zero_bucket() {
840 let sessions = vec![full_watch("s1", 10_000)];
841 let cfg = ReservoirHeatmapConfig {
842 bucket_ms: 0,
843 ..Default::default()
844 };
845 assert!(reservoir_sampled_heatmap(sessions.iter(), 10_000, &cfg).is_none());
846 }
847
848 #[test]
849 fn reservoir_none_on_zero_capacity() {
850 let sessions = vec![full_watch("s1", 10_000)];
851 let cfg = ReservoirHeatmapConfig {
852 reservoir_size: 0,
853 ..Default::default()
854 };
855 assert!(reservoir_sampled_heatmap(sessions.iter(), 10_000, &cfg).is_none());
856 }
857
858 #[test]
859 fn reservoir_empty_input_empty_points() {
860 let sessions: Vec<ViewerSession> = vec![];
861 let cfg = ReservoirHeatmapConfig {
862 reservoir_size: 5,
863 bucket_ms: 2_000,
864 seed: 42,
865 };
866 let r = reservoir_sampled_heatmap(sessions.iter(), 10_000, &cfg).expect("result");
867 assert_eq!(r.total_sessions_seen, 0);
868 assert_eq!(r.sessions_sampled, 0);
869 assert!(r.points.is_empty());
870 }
871
872 #[test]
873 fn reservoir_fewer_sessions_keeps_all() {
874 let sessions: Vec<ViewerSession> = (0..5)
875 .map(|i| full_watch(&format!("s{i}"), 10_000))
876 .collect();
877 let cfg = ReservoirHeatmapConfig {
878 reservoir_size: 100,
879 bucket_ms: 2_000,
880 seed: 1,
881 };
882 let r = reservoir_sampled_heatmap(sessions.iter(), 10_000, &cfg).expect("result");
883 assert_eq!(r.total_sessions_seen, 5);
884 assert_eq!(r.sessions_sampled, 5);
885 }
886
887 #[test]
888 fn reservoir_caps_at_capacity() {
889 let sessions: Vec<ViewerSession> = (0..50)
890 .map(|i| full_watch(&format!("s{i}"), 10_000))
891 .collect();
892 let cfg = ReservoirHeatmapConfig {
893 reservoir_size: 10,
894 bucket_ms: 2_000,
895 seed: 42,
896 };
897 let r = reservoir_sampled_heatmap(sessions.iter(), 10_000, &cfg).expect("result");
898 assert_eq!(r.total_sessions_seen, 50);
899 assert_eq!(r.sessions_sampled, 10);
900 }
901
902 #[test]
903 fn reservoir_full_watch_all_max_intensity() {
904 let sessions: Vec<ViewerSession> = (0..20)
905 .map(|i| full_watch(&format!("s{i}"), 10_000))
906 .collect();
907 let cfg = ReservoirHeatmapConfig {
908 reservoir_size: 20,
909 bucket_ms: 2_000,
910 seed: 7,
911 };
912 let r = reservoir_sampled_heatmap(sessions.iter(), 10_000, &cfg).expect("result");
913 assert!(!r.points.is_empty());
914 for p in &r.points {
915 assert!(
916 (p.intensity - 1.0).abs() < 1e-6,
917 "bucket {}ms intensity={}",
918 p.position_ms,
919 p.intensity
920 );
921 }
922 }
923
924 #[test]
925 fn reservoir_partial_watch_intensity_gradient() {
926 let mut sessions: Vec<ViewerSession> = (0..10)
927 .map(|i| full_watch(&format!("full_{i}"), 10_000))
928 .collect();
929 sessions.extend((0..10).map(|i| half_watch(&format!("half_{i}"), 10_000)));
930 let cfg = ReservoirHeatmapConfig {
931 reservoir_size: 20,
932 bucket_ms: 5_000,
933 seed: 99,
934 };
935 let r = reservoir_sampled_heatmap(sessions.iter(), 10_000, &cfg).expect("result");
936 assert_eq!(r.points.len(), 2);
937 assert!((r.points[0].intensity - 1.0).abs() < 1e-6);
938 assert!(
939 (r.points[1].intensity - 0.5).abs() < 0.1,
940 "expected ~0.5, got {}",
941 r.points[1].intensity
942 );
943 }
944
945 #[test]
946 fn reservoir_deterministic_with_same_seed() {
947 let sessions: Vec<ViewerSession> = (0..100)
948 .map(|i| {
949 if i % 3 == 0 {
950 half_watch(&format!("s{i}"), 30_000)
951 } else {
952 full_watch(&format!("s{i}"), 30_000)
953 }
954 })
955 .collect();
956 let cfg = ReservoirHeatmapConfig {
957 reservoir_size: 30,
958 bucket_ms: 5_000,
959 seed: 0xABCD_EF01,
960 };
961 let r1 = reservoir_sampled_heatmap(sessions.iter(), 30_000, &cfg).expect("r1");
962 let r2 = reservoir_sampled_heatmap(sessions.iter(), 30_000, &cfg).expect("r2");
963 assert_eq!(r1.sessions_sampled, r2.sessions_sampled);
964 assert_eq!(r1.points.len(), r2.points.len());
965 for (a, b) in r1.points.iter().zip(r2.points.iter()) {
966 assert!((a.intensity - b.intensity).abs() < 1e-9);
967 }
968 }
969}