Skip to main content

oximedia_analytics/
session.rs

1//! Viewer session analytics — playback events, session metrics, playback maps,
2//! and attention heatmaps.
3
4use rayon::prelude::*;
5
/// A discrete event emitted during media playback.
///
/// A session is an ordered list of these events (see `ViewerSession::events`).
/// Fields named `position_ms`, `from_ms` and `to_ms` are offsets into the
/// content in milliseconds. The `timestamp_ms` fields and the payload of
/// `QualityChange` are carried along but not read by the analysis functions
/// in this module.
#[derive(Debug, Clone, PartialEq)]
pub enum PlaybackEvent {
    /// The viewer pressed play.
    // NOTE(review): `timestamp_ms` is presumably wall-clock Unix epoch ms,
    // like `ViewerSession::started_at_ms` — confirm with the event producer.
    Play { timestamp_ms: i64 },
    /// The viewer paused at a given content position.
    Pause { timestamp_ms: i64, position_ms: u64 },
    /// The viewer scrubbed from one position to another.
    Seek { from_ms: u64, to_ms: u64 },
    /// Buffering started at a content position.
    BufferStart { position_ms: u64 },
    /// Buffering ended; `duration_ms` is how long the stall lasted.
    BufferEnd { position_ms: u64, duration_ms: u32 },
    /// The player switched quality levels.
    // NOTE(review): units of `bitrate` are not established anywhere in this
    // file (the tests pass 5_000_000) — confirm bits per second.
    QualityChange {
        from_height: u32,
        to_height: u32,
        bitrate: u32,
    },
    /// Playback reached the end (or the user closed the player).
    ///
    /// `position_ms` is the content position at that moment;
    /// `watch_duration_ms` is the watch time reported by the player, which
    /// `analyze_session` copies into `SessionMetrics::total_watch_ms`.
    End {
        position_ms: u64,
        watch_duration_ms: u64,
    },
}
31
32/// A single viewing session for one piece of content.
33#[derive(Debug, Clone)]
34pub struct ViewerSession {
35    pub session_id: String,
36    pub user_id: Option<String>,
37    pub content_id: String,
38    /// Wall-clock start time of the session (Unix epoch ms).
39    pub started_at_ms: i64,
40    pub events: Vec<PlaybackEvent>,
41}
42
43impl ViewerSession {
44    /// Create a new empty session.
45    pub fn new(
46        session_id: impl Into<String>,
47        user_id: Option<String>,
48        content_id: impl Into<String>,
49        started_at_ms: i64,
50    ) -> Self {
51        Self {
52            session_id: session_id.into(),
53            user_id,
54            content_id: content_id.into(),
55            started_at_ms,
56            events: Vec::new(),
57        }
58    }
59
60    /// Append a playback event.
61    pub fn push_event(&mut self, event: PlaybackEvent) {
62        self.events.push(event);
63    }
64}
65
/// Aggregate metrics derived from a single `ViewerSession`.
///
/// Produced by `analyze_session`; every field is computed from the session's
/// event list and, where noted, the content duration passed by the caller.
#[derive(Debug, Clone, PartialEq)]
pub struct SessionMetrics {
    /// Watch time in milliseconds as reported by the session's last `End`
    /// event (`watch_duration_ms`); `0` when the session has no `End` event.
    pub total_watch_ms: u64,
    /// Number of unique 1-second positions watched (distinct content seconds),
    /// i.e. the count of `true` entries in the session's `PlaybackMap`.
    /// Always `0` when the content duration is passed as `0`.
    pub unique_positions_watched: u64,
    /// How many `Seek` events were recorded.
    pub seek_count: u32,
    /// How many buffering interruptions occurred (count of `BufferStart` events).
    pub buffer_events: u32,
    /// Total stall time in milliseconds (sum of `BufferEnd::duration_ms`).
    pub buffer_time_ms: u64,
    /// How many quality-level switches happened.
    pub quality_changes: u32,
    /// What fraction of the content was completed (0.0 – 100.0).
    ///
    /// Derived from the furthest content position mentioned by any event
    /// (seek origins and targets included), capped at 100; `0.0` when the
    /// content duration is passed as `0`.
    pub completion_pct: f32,
}
84
85/// Analyse a session and return aggregate metrics.
86///
87/// `content_duration_ms` is required only to compute `completion_pct` and the
88/// unique-position count; pass `0` if unknown (completion will be `0.0`).
89pub fn analyze_session(session: &ViewerSession, content_duration_ms: u64) -> SessionMetrics {
90    let mut seek_count: u32 = 0;
91    let mut buffer_events: u32 = 0;
92    let mut buffer_time_ms: u64 = 0;
93    let mut quality_changes: u32 = 0;
94    let mut max_position_ms: u64 = 0;
95    let mut total_watch_ms: u64 = 0;
96
97    // We reconstruct watch intervals by treating Play→Pause/Seek/End pairs.
98    // A simple heuristic: use the `watch_duration_ms` field in `End` events,
99    // and for sessions without an End event fall back to the last known position.
100    let mut last_end_watch_ms: Option<u64> = None;
101
102    for event in &session.events {
103        match event {
104            PlaybackEvent::Seek { from_ms, to_ms } => {
105                seek_count += 1;
106                max_position_ms = max_position_ms.max(*from_ms).max(*to_ms);
107            }
108            PlaybackEvent::BufferStart { position_ms } => {
109                buffer_events += 1;
110                max_position_ms = max_position_ms.max(*position_ms);
111            }
112            PlaybackEvent::BufferEnd {
113                position_ms,
114                duration_ms,
115            } => {
116                buffer_time_ms += u64::from(*duration_ms);
117                max_position_ms = max_position_ms.max(*position_ms);
118            }
119            PlaybackEvent::QualityChange { .. } => {
120                quality_changes += 1;
121            }
122            PlaybackEvent::End {
123                position_ms,
124                watch_duration_ms,
125            } => {
126                max_position_ms = max_position_ms.max(*position_ms);
127                last_end_watch_ms = Some(*watch_duration_ms);
128                total_watch_ms = total_watch_ms.max(*watch_duration_ms);
129            }
130            PlaybackEvent::Pause { position_ms, .. } => {
131                max_position_ms = max_position_ms.max(*position_ms);
132            }
133            PlaybackEvent::Play { .. } => {}
134        }
135    }
136
137    if let Some(w) = last_end_watch_ms {
138        total_watch_ms = w;
139    }
140
141    // Build playback map for unique positions.
142    let map = build_playback_map(session, content_duration_ms);
143    let unique_positions_watched = if content_duration_ms > 0 {
144        map.positions_watched.iter().filter(|&&b| b).count() as u64
145    } else {
146        0
147    };
148
149    let completion_pct = if content_duration_ms > 0 {
150        (max_position_ms as f32 / content_duration_ms as f32 * 100.0).min(100.0)
151    } else {
152        0.0
153    };
154
155    SessionMetrics {
156        total_watch_ms,
157        unique_positions_watched,
158        seek_count,
159        buffer_events,
160        buffer_time_ms,
161        quality_changes,
162        completion_pct,
163    }
164}
165
/// A boolean map of 1-second content buckets indicating which positions were
/// watched during a session.
///
/// Index `i` covers content time `[i * 1000, (i + 1) * 1000)` milliseconds.
/// The vector's length is fixed by whoever constructs the map; ranges that
/// extend past it are clamped rather than growing the vector.
#[derive(Debug, Clone)]
pub struct PlaybackMap {
    /// One `bool` per second of content (`true` = watched).
    pub positions_watched: Vec<bool>,
}

impl PlaybackMap {
    /// Number of whole seconds needed to cover `ms` milliseconds (rounds up).
    ///
    /// Written as quotient-plus-remainder rather than `(ms + 999) / 1000`,
    /// because the latter overflows for values within 999 of `u64::MAX`.
    fn ceil_seconds(ms: u64) -> u64 {
        ms / 1000 + u64::from(ms % 1000 != 0)
    }

    /// Convert a second count into an index no larger than `len`.
    ///
    /// The comparison happens in `u64` (widening `len` is lossless), so a
    /// huge second count saturates at `len` instead of being truncated by a
    /// narrowing cast on targets where `usize` is smaller than `u64`.
    fn clamp_index(seconds: u64, len: usize) -> usize {
        seconds.min(len as u64) as usize
    }

    /// Mark every second in `[start_ms, end_ms)` as watched.
    ///
    /// The start is rounded down and the end rounded up to whole seconds, so
    /// any second that is partially covered counts as watched. Empty or
    /// reversed ranges, and ranges that start beyond the end of the map, are
    /// no-ops; the end is clamped to the map length. Safe for any `u64`
    /// input, including `u64::MAX`.
    pub fn mark_range(&mut self, start_ms: u64, end_ms: u64) {
        if start_ms >= end_ms || self.positions_watched.is_empty() {
            return;
        }
        let len = self.positions_watched.len();
        let first = Self::clamp_index(start_ms / 1000, len);
        let last = Self::clamp_index(Self::ceil_seconds(end_ms), len);
        // `first` can equal `last` when the range lies past the map's end.
        if first < last {
            self.positions_watched[first..last].fill(true);
        }
    }

    /// Fraction of the content that was watched (0.0 – 1.0).
    ///
    /// Only the first `ceil(total_ms / 1000)` seconds are considered, clamped
    /// to the map length; the result is watched seconds divided by that
    /// count. Returns `0.0` when `total_ms` is `0` or the map is empty.
    pub fn coverage_pct(&self, total_ms: u64) -> f32 {
        if total_ms == 0 || self.positions_watched.is_empty() {
            return 0.0;
        }
        // At least 1 here: `total_ms > 0` rounds up to >= 1 second and the
        // map is non-empty, so the division below is well defined.
        let total_sec =
            Self::clamp_index(Self::ceil_seconds(total_ms), self.positions_watched.len());
        let watched = self.positions_watched[..total_sec]
            .iter()
            .filter(|&&b| b)
            .count();
        watched as f32 / total_sec as f32
    }
}
206
207/// Analyze multiple sessions in batch, returning one `SessionMetrics` per session.
208///
209/// Uses Rayon parallel iterators for throughput on large batches.  Results are
210/// returned in the same order as the input `sessions` slice.
211pub fn analyze_sessions_batch(
212    sessions: &[ViewerSession],
213    content_duration_ms: u64,
214) -> Vec<SessionMetrics> {
215    sessions
216        .par_iter()
217        .map(|s| analyze_session(s, content_duration_ms))
218        .collect()
219}
220
221/// Build a `PlaybackMap` from a session's events.
222///
223/// The function reconstructs watch intervals by pairing `Play` events with
224/// subsequent `Pause`, `Seek`, or `End` events.  A running `current_position`
225/// advances with each event so the heuristic is robust to sessions that only
226/// contain a single `End` event.
227pub fn build_playback_map(session: &ViewerSession, content_duration_ms: u64) -> PlaybackMap {
228    let num_seconds = if content_duration_ms > 0 {
229        ((content_duration_ms + 999) / 1000) as usize
230    } else {
231        // Derive a reasonable size from the maximum event position.
232        let max_pos = session.events.iter().fold(0u64, |acc, e| match e {
233            PlaybackEvent::Pause { position_ms, .. } => acc.max(*position_ms),
234            PlaybackEvent::Seek { from_ms, to_ms } => acc.max(*from_ms).max(*to_ms),
235            PlaybackEvent::BufferStart { position_ms } => acc.max(*position_ms),
236            PlaybackEvent::BufferEnd { position_ms, .. } => acc.max(*position_ms),
237            PlaybackEvent::End { position_ms, .. } => acc.max(*position_ms),
238            _ => acc,
239        });
240        if max_pos == 0 {
241            return PlaybackMap {
242                positions_watched: Vec::new(),
243            };
244        }
245        ((max_pos + 999) / 1000) as usize + 1
246    };
247
248    let mut map = PlaybackMap {
249        positions_watched: vec![false; num_seconds],
250    };
251
252    // State machine: track whether we are currently "playing" and from where.
253    let mut playing = false;
254    let mut play_start_pos: u64 = 0;
255    let mut current_pos: u64 = 0;
256
257    for event in &session.events {
258        match event {
259            PlaybackEvent::Play { .. } => {
260                playing = true;
261                play_start_pos = current_pos;
262            }
263            PlaybackEvent::Pause { position_ms, .. } => {
264                if playing {
265                    map.mark_range(play_start_pos, *position_ms);
266                    playing = false;
267                }
268                current_pos = *position_ms;
269            }
270            PlaybackEvent::Seek { from_ms, to_ms } => {
271                if playing {
272                    map.mark_range(play_start_pos, *from_ms);
273                }
274                current_pos = *to_ms;
275                if playing {
276                    play_start_pos = *to_ms;
277                }
278            }
279            PlaybackEvent::BufferStart { position_ms } => {
280                if playing {
281                    map.mark_range(play_start_pos, *position_ms);
282                    // Pause tracking during buffer stall.
283                    play_start_pos = *position_ms;
284                }
285                current_pos = *position_ms;
286            }
287            PlaybackEvent::BufferEnd { position_ms, .. } => {
288                current_pos = *position_ms;
289                if playing {
290                    play_start_pos = *position_ms;
291                }
292            }
293            PlaybackEvent::End { position_ms, .. } => {
294                if playing {
295                    map.mark_range(play_start_pos, *position_ms);
296                    playing = false;
297                }
298                current_pos = *position_ms;
299            }
300            PlaybackEvent::QualityChange { .. } => {}
301        }
302    }
303
304    // If still "playing" at end of event list (session cut off), mark up to last position.
305    if playing && current_pos > play_start_pos {
306        map.mark_range(play_start_pos, current_pos);
307    }
308
309    map
310}
311
/// A single point on an attention heatmap.
///
/// The heatmap builders in this module emit one `HeatPoint` per bucket, in
/// ascending position order.
#[derive(Debug, Clone, PartialEq)]
pub struct HeatPoint {
    /// Content position in milliseconds (start of bucket).
    pub position_ms: u64,
    /// Normalised viewer attention intensity (0.0 – 1.0): the number of
    /// sessions that watched this bucket divided by the count of the most
    /// watched bucket.
    pub intensity: f32,
}
320
321/// Compute an attention heatmap across a collection of sessions.
322///
323/// The content is divided into `bucket_ms`-wide buckets.  For each bucket the
324/// intensity is the fraction of sessions that watched any part of that bucket,
325/// normalised to the maximum bucket count so the peak bucket always has
326/// intensity 1.0.
327pub fn attention_heatmap(
328    sessions: &[ViewerSession],
329    content_duration_ms: u64,
330    bucket_ms: u32,
331) -> Vec<HeatPoint> {
332    if sessions.is_empty() || content_duration_ms == 0 || bucket_ms == 0 {
333        return Vec::new();
334    }
335
336    let bucket_ms_u64 = u64::from(bucket_ms);
337    let num_buckets = ((content_duration_ms + bucket_ms_u64 - 1) / bucket_ms_u64) as usize;
338    let mut counts = vec![0u32; num_buckets];
339
340    for session in sessions {
341        let map = build_playback_map(session, content_duration_ms);
342        // Aggregate per-bucket: bucket is "watched" if any second inside it was watched.
343        for (bucket_idx, count) in counts.iter_mut().enumerate() {
344            let bucket_start_ms = bucket_idx as u64 * bucket_ms_u64;
345            let bucket_end_ms = (bucket_start_ms + bucket_ms_u64).min(content_duration_ms);
346            let start_sec = (bucket_start_ms / 1000) as usize;
347            let end_sec = ((bucket_end_ms + 999) / 1000) as usize;
348            let end_sec = end_sec.min(map.positions_watched.len());
349            let watched = (start_sec..end_sec)
350                .any(|s| map.positions_watched.get(s).copied().unwrap_or(false));
351            if watched {
352                *count += 1;
353            }
354        }
355    }
356
357    let max_count = counts.iter().copied().max().unwrap_or(0);
358    if max_count == 0 {
359        return Vec::new();
360    }
361
362    counts
363        .into_iter()
364        .enumerate()
365        .map(|(idx, c)| HeatPoint {
366            position_ms: idx as u64 * bucket_ms_u64,
367            intensity: c as f32 / max_count as f32,
368        })
369        .collect()
370}
371
372// ─── Reservoir-sampled attention heatmap ──────────────────────────────────────
373
/// Settings for building an attention heatmap from a reservoir sample.
///
/// When the session stream is too large to hold in memory, a uniform random
/// sample of at most `reservoir_size` sessions is maintained via
/// Algorithm R (Vitter 1985) and the heatmap is computed from that sample.
#[derive(Debug, Clone)]
pub struct ReservoirHeatmapConfig {
    /// Upper bound on the number of sessions kept in the reservoir.
    pub reservoir_size: usize,
    /// Heatmap bucket width, in milliseconds.
    pub bucket_ms: u32,
    /// 64-bit seed for the pseudorandom number generator; the same seed and
    /// input yield the same sample.
    pub seed: u64,
}

impl Default for ReservoirHeatmapConfig {
    /// 1 000-session reservoir, 5-second buckets and a fixed seed.
    fn default() -> Self {
        let reservoir_size: usize = 1_000;
        let bucket_ms: u32 = 5_000;
        let seed: u64 = 0xDEAD_BEEF_CAFE_1234;
        ReservoirHeatmapConfig {
            reservoir_size,
            bucket_ms,
            seed,
        }
    }
}
397
/// A memory-bounded attention heatmap computed via reservoir sampling.
///
/// Returned by `reservoir_sampled_heatmap`; the heatmap is computed only from
/// the sessions that ended up in the reservoir.
#[derive(Debug, Clone)]
pub struct SampledHeatmap {
    /// Heatmap bucket values (same layout as [`attention_heatmap`]).
    /// Empty when no sampled session watched any bucket.
    pub points: Vec<HeatPoint>,
    /// Number of sessions offered to the reservoir.
    pub total_sessions_seen: usize,
    /// Actual number of sessions sampled (≤ `reservoir_size`).
    pub sessions_sampled: usize,
}
408
409/// Build a memory-bounded attention heatmap using reservoir sampling
410/// (Vitter Algorithm R).
411///
412/// Returns `None` when `content_duration_ms == 0`, `bucket_ms == 0`, or
413/// `reservoir_size == 0`.
414pub fn reservoir_sampled_heatmap<'a>(
415    sessions: impl Iterator<Item = &'a ViewerSession>,
416    content_duration_ms: u64,
417    config: &ReservoirHeatmapConfig,
418) -> Option<SampledHeatmap> {
419    if content_duration_ms == 0 || config.bucket_ms == 0 || config.reservoir_size == 0 {
420        return None;
421    }
422
423    let capacity = config.reservoir_size;
424    let mut reservoir: Vec<&ViewerSession> = Vec::with_capacity(capacity);
425    let mut total_seen: usize = 0;
426
427    // splitmix64 PRNG — dependency-free.
428    let mut rng_state: u64 = config.seed;
429    let next_u64 = |state: &mut u64| -> u64 {
430        *state = state.wrapping_add(0x9e37_79b9_7f4a_7c15);
431        let mut z: u64 = *state;
432        z = (z ^ (z >> 30)).wrapping_mul(0xbf58_476d_1ce4_e5b9);
433        z = (z ^ (z >> 27)).wrapping_mul(0x94d0_49bb_1331_11eb);
434        z ^ (z >> 31)
435    };
436
437    for session in sessions {
438        if total_seen < capacity {
439            reservoir.push(session);
440        } else {
441            let j = (next_u64(&mut rng_state) % (total_seen as u64 + 1)) as usize;
442            if j < capacity {
443                reservoir[j] = session;
444            }
445        }
446        total_seen += 1;
447    }
448
449    let sessions_sampled = reservoir.len();
450    let heat_points = attention_heatmap_refs(&reservoir, content_duration_ms, config.bucket_ms);
451
452    Some(SampledHeatmap {
453        points: heat_points,
454        total_sessions_seen: total_seen,
455        sessions_sampled,
456    })
457}
458
459/// Compute attention heatmap over a slice of session references.
460fn attention_heatmap_refs(
461    sessions: &[&ViewerSession],
462    content_duration_ms: u64,
463    bucket_ms: u32,
464) -> Vec<HeatPoint> {
465    if sessions.is_empty() || content_duration_ms == 0 || bucket_ms == 0 {
466        return Vec::new();
467    }
468    let bucket_ms_u64 = u64::from(bucket_ms);
469    let num_buckets = ((content_duration_ms + bucket_ms_u64 - 1) / bucket_ms_u64) as usize;
470    let mut counts = vec![0u32; num_buckets];
471
472    for &session in sessions {
473        let map = build_playback_map(session, content_duration_ms);
474        for (bucket_idx, count) in counts.iter_mut().enumerate() {
475            let bucket_start_ms = bucket_idx as u64 * bucket_ms_u64;
476            let bucket_end_ms = (bucket_start_ms + bucket_ms_u64).min(content_duration_ms);
477            let start_sec = (bucket_start_ms / 1000) as usize;
478            let end_sec = ((bucket_end_ms + 999) / 1000) as usize;
479            let end_sec = end_sec.min(map.positions_watched.len());
480            let watched = (start_sec..end_sec)
481                .any(|s| map.positions_watched.get(s).copied().unwrap_or(false));
482            if watched {
483                *count += 1;
484            }
485        }
486    }
487
488    let max_count = counts.iter().copied().max().unwrap_or(0);
489    if max_count == 0 {
490        return Vec::new();
491    }
492
493    counts
494        .into_iter()
495        .enumerate()
496        .map(|(idx, c)| HeatPoint {
497            position_ms: idx as u64 * bucket_ms_u64,
498            intensity: c as f32 / max_count as f32,
499        })
500        .collect()
501}
502
503// ─── Tests ──────────────────────────────────────────────────────────────────
504
// Unit tests for the session analytics in this module: `PlaybackMap`
// marking/coverage, playback-map reconstruction, per-session metrics, and the
// plain and reservoir-sampled heatmaps.
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a session with the given ids and events, no user id and a start
    /// time of `0`.
    fn simple_session(id: &str, content_id: &str, events: Vec<PlaybackEvent>) -> ViewerSession {
        ViewerSession {
            session_id: id.to_string(),
            user_id: None,
            content_id: content_id.to_string(),
            started_at_ms: 0,
            events,
        }
    }

    // ── PlaybackMap ──────────────────────────────────────────────────────────

    // The range end is exclusive: [0, 3000) covers seconds 0..=2 only.
    #[test]
    fn playback_map_mark_range_basic() {
        let mut map = PlaybackMap {
            positions_watched: vec![false; 10],
        };
        map.mark_range(0, 3000);
        assert!(map.positions_watched[0]);
        assert!(map.positions_watched[1]);
        assert!(map.positions_watched[2]);
        assert!(!map.positions_watched[3]);
    }

    // A range running past the end of the map marks every slot without panicking.
    #[test]
    fn playback_map_mark_range_clamps_to_capacity() {
        let mut map = PlaybackMap {
            positions_watched: vec![false; 5],
        };
        map.mark_range(0, 100_000);
        assert!(map.positions_watched.iter().all(|&b| b));
    }

    // start == end is an empty range and must leave the map untouched.
    #[test]
    fn playback_map_empty_range_is_noop() {
        let mut map = PlaybackMap {
            positions_watched: vec![false; 5],
        };
        map.mark_range(3000, 3000);
        assert!(map.positions_watched.iter().all(|&b| !b));
    }

    #[test]
    fn playback_map_coverage_full() {
        let map = PlaybackMap {
            positions_watched: vec![true; 10],
        };
        let pct = map.coverage_pct(10_000);
        assert!((pct - 1.0).abs() < 1e-6);
    }

    #[test]
    fn playback_map_coverage_half() {
        let mut positions = vec![false; 10];
        for i in 0..5 {
            positions[i] = true;
        }
        let map = PlaybackMap {
            positions_watched: positions,
        };
        let pct = map.coverage_pct(10_000);
        assert!((pct - 0.5).abs() < 1e-6);
    }

    // A zero total duration yields 0.0 even when every slot is marked.
    #[test]
    fn playback_map_coverage_zero_duration() {
        let map = PlaybackMap {
            positions_watched: vec![true; 10],
        };
        assert_eq!(map.coverage_pct(0), 0.0);
    }

    // ── build_playback_map ───────────────────────────────────────────────────

    #[test]
    fn build_map_play_then_end() {
        let session = simple_session(
            "s1",
            "c1",
            vec![
                PlaybackEvent::Play { timestamp_ms: 0 },
                PlaybackEvent::End {
                    position_ms: 5000,
                    watch_duration_ms: 5000,
                },
            ],
        );
        let map = build_playback_map(&session, 10_000);
        // Seconds 0-4 should be watched.
        assert!(map.positions_watched[0]);
        assert!(map.positions_watched[4]);
        assert!(!map.positions_watched[5]);
    }

    // The second Play resumes from the paused position, so the two segments
    // are contiguous.
    #[test]
    fn build_map_play_pause_play_end() {
        let session = simple_session(
            "s2",
            "c1",
            vec![
                PlaybackEvent::Play { timestamp_ms: 0 },
                PlaybackEvent::Pause {
                    timestamp_ms: 1000,
                    position_ms: 3000,
                },
                PlaybackEvent::Play { timestamp_ms: 2000 },
                PlaybackEvent::End {
                    position_ms: 7000,
                    watch_duration_ms: 7000,
                },
            ],
        );
        let map = build_playback_map(&session, 10_000);
        // 0-2 watched, 3-6 watched.
        assert!(map.positions_watched[0]);
        assert!(map.positions_watched[3]);
        assert!(map.positions_watched[6]);
        assert!(!map.positions_watched[7]);
    }

    #[test]
    fn build_map_seek_forward() {
        let session = simple_session(
            "s3",
            "c1",
            vec![
                PlaybackEvent::Play { timestamp_ms: 0 },
                PlaybackEvent::Seek {
                    from_ms: 2000,
                    to_ms: 8000,
                },
                PlaybackEvent::End {
                    position_ms: 10_000,
                    watch_duration_ms: 4000,
                },
            ],
        );
        let map = build_playback_map(&session, 12_000);
        // 0-1 watched, 2-7 NOT (skipped), 8-9 watched.
        assert!(map.positions_watched[0]);
        assert!(!map.positions_watched[5]);
        assert!(map.positions_watched[8]);
    }

    // With a known duration the map is still allocated, just entirely unwatched.
    #[test]
    fn build_map_no_events_returns_all_false() {
        let session = simple_session("s4", "c1", vec![]);
        let map = build_playback_map(&session, 5_000);
        assert!(map.positions_watched.iter().all(|&b| !b));
    }

    // ── analyze_session ──────────────────────────────────────────────────────

    // One event of each countable kind; the End position equals the content
    // duration, so completion is 100%.
    #[test]
    fn analyze_session_basic() {
        let session = simple_session(
            "s5",
            "c1",
            vec![
                PlaybackEvent::Play { timestamp_ms: 0 },
                PlaybackEvent::BufferStart { position_ms: 2000 },
                PlaybackEvent::BufferEnd {
                    position_ms: 2000,
                    duration_ms: 500,
                },
                PlaybackEvent::Seek {
                    from_ms: 4000,
                    to_ms: 1000,
                },
                PlaybackEvent::QualityChange {
                    from_height: 720,
                    to_height: 1080,
                    bitrate: 5_000_000,
                },
                PlaybackEvent::End {
                    position_ms: 10_000,
                    watch_duration_ms: 9000,
                },
            ],
        );
        let metrics = analyze_session(&session, 10_000);
        assert_eq!(metrics.buffer_events, 1);
        assert_eq!(metrics.buffer_time_ms, 500);
        assert_eq!(metrics.seek_count, 1);
        assert_eq!(metrics.quality_changes, 1);
        assert_eq!(metrics.total_watch_ms, 9000);
        assert!((metrics.completion_pct - 100.0).abs() < 1e-3);
    }

    // Without an End event, completion still comes from the furthest position
    // seen (5000 of 20000 ms = 25%).
    #[test]
    fn analyze_session_no_end_event() {
        let session = simple_session(
            "s6",
            "c1",
            vec![
                PlaybackEvent::Play { timestamp_ms: 0 },
                PlaybackEvent::Pause {
                    timestamp_ms: 5000,
                    position_ms: 5000,
                },
            ],
        );
        let metrics = analyze_session(&session, 20_000);
        assert_eq!(metrics.seek_count, 0);
        assert!((metrics.completion_pct - 25.0).abs() < 1e-3);
    }

    // An unknown (zero) content duration reports 0% completion.
    #[test]
    fn analyze_session_zero_duration() {
        let session = simple_session(
            "s7",
            "c1",
            vec![PlaybackEvent::End {
                position_ms: 5000,
                watch_duration_ms: 5000,
            }],
        );
        let metrics = analyze_session(&session, 0);
        assert_eq!(metrics.completion_pct, 0.0);
    }

    // ── attention_heatmap ────────────────────────────────────────────────────

    #[test]
    fn heatmap_empty_sessions() {
        let result = attention_heatmap(&[], 60_000, 1000);
        assert!(result.is_empty());
    }

    #[test]
    fn heatmap_single_session_full_watch() {
        let session = simple_session(
            "h1",
            "c1",
            vec![
                PlaybackEvent::Play { timestamp_ms: 0 },
                PlaybackEvent::End {
                    position_ms: 10_000,
                    watch_duration_ms: 10_000,
                },
            ],
        );
        let heat = attention_heatmap(&[session], 10_000, 2000);
        // Peak intensity = 1.0 for all buckets that were watched.
        assert!(!heat.is_empty());
        let peak = heat.iter().map(|h| h.intensity).fold(0.0f32, f32::max);
        assert!((peak - 1.0).abs() < 1e-6);
    }

    // Every bucket is reported, watched or not, at the bucket's start offset.
    #[test]
    fn heatmap_bucket_positions_correct() {
        let session = simple_session(
            "h2",
            "c1",
            vec![
                PlaybackEvent::Play { timestamp_ms: 0 },
                PlaybackEvent::End {
                    position_ms: 5000,
                    watch_duration_ms: 5000,
                },
            ],
        );
        let heat = attention_heatmap(&[session], 10_000, 5000);
        assert_eq!(heat.len(), 2);
        assert_eq!(heat[0].position_ms, 0);
        assert_eq!(heat[1].position_ms, 5000);
    }

    #[test]
    fn heatmap_zero_bucket_ms_returns_empty() {
        let session = simple_session("h3", "c1", vec![]);
        let heat = attention_heatmap(&[session], 10_000, 0);
        assert!(heat.is_empty());
    }

    #[test]
    fn viewer_session_new() {
        let s = ViewerSession::new("id1", Some("u1".to_string()), "vid1", 12345);
        assert_eq!(s.session_id, "id1");
        assert_eq!(s.user_id, Some("u1".to_string()));
        assert!(s.events.is_empty());
    }

    #[test]
    fn viewer_session_push_event() {
        let mut s = ViewerSession::new("id2", None, "vid2", 0);
        s.push_event(PlaybackEvent::Play { timestamp_ms: 100 });
        assert_eq!(s.events.len(), 1);
    }

    // ── reservoir_sampled_heatmap ─────────────────────────────────────────────

    /// Session that plays from position 0 straight through to `content_ms`.
    fn full_watch(id: &str, content_ms: u64) -> ViewerSession {
        simple_session(
            id,
            "c1",
            vec![
                PlaybackEvent::Play { timestamp_ms: 0 },
                PlaybackEvent::End {
                    position_ms: content_ms,
                    watch_duration_ms: content_ms,
                },
            ],
        )
    }

    /// Session that plays from position 0 and ends at `content_ms / 2`.
    fn half_watch(id: &str, content_ms: u64) -> ViewerSession {
        simple_session(
            id,
            "c1",
            vec![
                PlaybackEvent::Play { timestamp_ms: 0 },
                PlaybackEvent::End {
                    position_ms: content_ms / 2,
                    watch_duration_ms: content_ms / 2,
                },
            ],
        )
    }

    #[test]
    fn reservoir_none_on_zero_duration() {
        let sessions = vec![full_watch("s1", 10_000)];
        assert!(
            reservoir_sampled_heatmap(sessions.iter(), 0, &ReservoirHeatmapConfig::default())
                .is_none()
        );
    }

    #[test]
    fn reservoir_none_on_zero_bucket() {
        let sessions = vec![full_watch("s1", 10_000)];
        let cfg = ReservoirHeatmapConfig {
            bucket_ms: 0,
            ..Default::default()
        };
        assert!(reservoir_sampled_heatmap(sessions.iter(), 10_000, &cfg).is_none());
    }

    #[test]
    fn reservoir_none_on_zero_capacity() {
        let sessions = vec![full_watch("s1", 10_000)];
        let cfg = ReservoirHeatmapConfig {
            reservoir_size: 0,
            ..Default::default()
        };
        assert!(reservoir_sampled_heatmap(sessions.iter(), 10_000, &cfg).is_none());
    }

    // A valid config with no input is `Some` with zero counts and no points,
    // not `None`.
    #[test]
    fn reservoir_empty_input_empty_points() {
        let sessions: Vec<ViewerSession> = vec![];
        let cfg = ReservoirHeatmapConfig {
            reservoir_size: 5,
            bucket_ms: 2_000,
            seed: 42,
        };
        let r = reservoir_sampled_heatmap(sessions.iter(), 10_000, &cfg).expect("result");
        assert_eq!(r.total_sessions_seen, 0);
        assert_eq!(r.sessions_sampled, 0);
        assert!(r.points.is_empty());
    }

    #[test]
    fn reservoir_fewer_sessions_keeps_all() {
        let sessions: Vec<ViewerSession> = (0..5)
            .map(|i| full_watch(&format!("s{i}"), 10_000))
            .collect();
        let cfg = ReservoirHeatmapConfig {
            reservoir_size: 100,
            bucket_ms: 2_000,
            seed: 1,
        };
        let r = reservoir_sampled_heatmap(sessions.iter(), 10_000, &cfg).expect("result");
        assert_eq!(r.total_sessions_seen, 5);
        assert_eq!(r.sessions_sampled, 5);
    }

    #[test]
    fn reservoir_caps_at_capacity() {
        let sessions: Vec<ViewerSession> = (0..50)
            .map(|i| full_watch(&format!("s{i}"), 10_000))
            .collect();
        let cfg = ReservoirHeatmapConfig {
            reservoir_size: 10,
            bucket_ms: 2_000,
            seed: 42,
        };
        let r = reservoir_sampled_heatmap(sessions.iter(), 10_000, &cfg).expect("result");
        assert_eq!(r.total_sessions_seen, 50);
        assert_eq!(r.sessions_sampled, 10);
    }

    #[test]
    fn reservoir_full_watch_all_max_intensity() {
        let sessions: Vec<ViewerSession> = (0..20)
            .map(|i| full_watch(&format!("s{i}"), 10_000))
            .collect();
        let cfg = ReservoirHeatmapConfig {
            reservoir_size: 20,
            bucket_ms: 2_000,
            seed: 7,
        };
        let r = reservoir_sampled_heatmap(sessions.iter(), 10_000, &cfg).expect("result");
        assert!(!r.points.is_empty());
        for p in &r.points {
            assert!(
                (p.intensity - 1.0).abs() < 1e-6,
                "bucket {}ms intensity={}",
                p.position_ms,
                p.intensity
            );
        }
    }

    // The reservoir (20) holds all 20 sessions, so no sampling occurs: the
    // first bucket is watched by everyone, the second only by the 10 full
    // watchers.
    #[test]
    fn reservoir_partial_watch_intensity_gradient() {
        let mut sessions: Vec<ViewerSession> = (0..10)
            .map(|i| full_watch(&format!("full_{i}"), 10_000))
            .collect();
        sessions.extend((0..10).map(|i| half_watch(&format!("half_{i}"), 10_000)));
        let cfg = ReservoirHeatmapConfig {
            reservoir_size: 20,
            bucket_ms: 5_000,
            seed: 99,
        };
        let r = reservoir_sampled_heatmap(sessions.iter(), 10_000, &cfg).expect("result");
        assert_eq!(r.points.len(), 2);
        assert!((r.points[0].intensity - 1.0).abs() < 1e-6);
        assert!(
            (r.points[1].intensity - 0.5).abs() < 0.1,
            "expected ~0.5, got {}",
            r.points[1].intensity
        );
    }

    // 100 sessions into a 30-slot reservoir forces real sampling; two runs
    // with the same seed must agree bucket for bucket.
    #[test]
    fn reservoir_deterministic_with_same_seed() {
        let sessions: Vec<ViewerSession> = (0..100)
            .map(|i| {
                if i % 3 == 0 {
                    half_watch(&format!("s{i}"), 30_000)
                } else {
                    full_watch(&format!("s{i}"), 30_000)
                }
            })
            .collect();
        let cfg = ReservoirHeatmapConfig {
            reservoir_size: 30,
            bucket_ms: 5_000,
            seed: 0xABCD_EF01,
        };
        let r1 = reservoir_sampled_heatmap(sessions.iter(), 30_000, &cfg).expect("r1");
        let r2 = reservoir_sampled_heatmap(sessions.iter(), 30_000, &cfg).expect("r2");
        assert_eq!(r1.sessions_sampled, r2.sessions_sampled);
        assert_eq!(r1.points.len(), r2.points.len());
        for (a, b) in r1.points.iter().zip(r2.points.iter()) {
            assert!((a.intensity - b.intensity).abs() < 1e-9);
        }
    }
}