Skip to main content

oximedia_analytics/
geo_device.rs

1//! Geographic and device breakdown analytics for viewer session metrics.
2//!
3//! Aggregates session-level data across two orthogonal dimensions — geographic
4//! region and device type — and computes per-slice metrics such as view count,
5//! unique viewer count, total watch time, and average watch time.
6//!
7//! ## Usage
8//!
9//! ```rust
10//! use oximedia_analytics::geo_device::{SessionRecord, BreakdownAnalyzer, DeviceType, Region};
11//!
12//! let mut analyzer = BreakdownAnalyzer::new();
13//! analyzer.ingest(SessionRecord {
14//!     viewer_id: "v1".into(),
15//!     region: Region::NorthAmerica,
16//!     device: DeviceType::Desktop,
17//!     watch_seconds: 120.0,
18//! });
19//! analyzer.ingest(SessionRecord {
20//!     viewer_id: "v2".into(),
21//!     region: Region::NorthAmerica,
22//!     device: DeviceType::Mobile,
23//!     watch_seconds: 60.0,
24//! });
25//! let by_region = analyzer.breakdown_by_region();
26//! assert_eq!(by_region.len(), 1); // one region: NorthAmerica
27//! ```
28
29use crate::error::AnalyticsError;
30use std::collections::HashMap;
31
32// ─── DeviceType ──────────────────────────────────────────────────────────────
33
34/// Broad device category inferred from the user-agent or client metadata.
35#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
36pub enum DeviceType {
37    /// Traditional laptop or desktop computer.
38    Desktop,
39    /// Smartphone.
40    Mobile,
41    /// Tablet (iPad, Android tablet, etc.).
42    Tablet,
43    /// Smart TV, streaming stick, or set-top box.
44    SmartTv,
45    /// Game console.
46    Console,
47    /// Device type could not be determined.
48    Unknown,
49}
50
51impl DeviceType {
52    /// Returns a human-readable label for the device type.
53    #[must_use]
54    pub fn label(&self) -> &'static str {
55        match self {
56            Self::Desktop => "desktop",
57            Self::Mobile => "mobile",
58            Self::Tablet => "tablet",
59            Self::SmartTv => "smart_tv",
60            Self::Console => "console",
61            Self::Unknown => "unknown",
62        }
63    }
64}
65
66// ─── Region ──────────────────────────────────────────────────────────────────
67
68/// Geographic region grouping for broadcast and streaming analytics.
69///
70/// Regions correspond to broad audience groupings commonly used in media
71/// rights and advertising markets.
72#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
73pub enum Region {
74    /// United States, Canada, Mexico.
75    NorthAmerica,
76    /// Brazil, Argentina, Colombia, and other LATAM markets.
77    LatinAmerica,
78    /// UK, France, Germany, and other European markets.
79    Europe,
80    /// Russia and other CIS/Eastern European markets.
81    EasternEurope,
82    /// Middle East and Africa.
83    Mea,
84    /// India, Southeast Asia, Australia, New Zealand.
85    AsiaPacific,
86    /// Japan, South Korea, China.
87    EastAsia,
88    /// Region could not be mapped.
89    Unknown,
90}
91
92impl Region {
93    /// Returns a human-readable label for the region.
94    #[must_use]
95    pub fn label(&self) -> &'static str {
96        match self {
97            Self::NorthAmerica => "north_america",
98            Self::LatinAmerica => "latin_america",
99            Self::Europe => "europe",
100            Self::EasternEurope => "eastern_europe",
101            Self::Mea => "mea",
102            Self::AsiaPacific => "asia_pacific",
103            Self::EastAsia => "east_asia",
104            Self::Unknown => "unknown",
105        }
106    }
107}
108
109// ─── SessionRecord ───────────────────────────────────────────────────────────
110
111/// A single viewer session enriched with geographic and device metadata.
112#[derive(Debug, Clone)]
113pub struct SessionRecord {
114    /// Opaque viewer / user identifier (used for unique viewer counting).
115    pub viewer_id: String,
116    /// Geographic region of the viewer.
117    pub region: Region,
118    /// Device category used for playback.
119    pub device: DeviceType,
120    /// Total watch time for this session in seconds.
121    pub watch_seconds: f64,
122}
123
124// ─── SliceMetrics ────────────────────────────────────────────────────────────
125
126/// Aggregated metrics for a single dimension slice (region or device).
127#[derive(Debug, Clone, PartialEq)]
128pub struct SliceMetrics {
129    /// Total number of sessions in this slice.
130    pub sessions: u64,
131    /// Number of distinct `viewer_id` values (approximate unique viewers).
132    pub unique_viewers: u64,
133    /// Sum of all `watch_seconds` values in this slice.
134    pub total_watch_seconds: f64,
135    /// Average watch time per session (`total_watch_seconds / sessions`).
136    pub avg_watch_seconds: f64,
137}
138
139impl SliceMetrics {
140    fn new(sessions: u64, unique_viewers: u64, total_watch_seconds: f64) -> Self {
141        let avg = if sessions == 0 {
142            0.0
143        } else {
144            total_watch_seconds / sessions as f64
145        };
146        Self {
147            sessions,
148            unique_viewers,
149            total_watch_seconds,
150            avg_watch_seconds: avg,
151        }
152    }
153}
154
155// ─── BreakdownAnalyzer ───────────────────────────────────────────────────────
156
157/// Accumulates session records and computes breakdowns by region and device.
158#[derive(Debug, Default)]
159pub struct BreakdownAnalyzer {
160    records: Vec<SessionRecord>,
161}
162
163impl BreakdownAnalyzer {
164    /// Creates an empty analyzer.
165    #[must_use]
166    pub fn new() -> Self {
167        Self {
168            records: Vec::new(),
169        }
170    }
171
172    /// Ingests a single session record.
173    pub fn ingest(&mut self, record: SessionRecord) {
174        self.records.push(record);
175    }
176
177    /// Ingests a batch of session records.
178    pub fn ingest_batch(&mut self, records: impl IntoIterator<Item = SessionRecord>) {
179        self.records.extend(records);
180    }
181
182    /// Total number of session records ingested so far.
183    #[must_use]
184    pub fn session_count(&self) -> usize {
185        self.records.len()
186    }
187
188    /// Computes per-region aggregates.
189    ///
190    /// Returns a map from [`Region`] to [`SliceMetrics`].
191    /// Regions with no sessions are omitted.
192    #[must_use]
193    pub fn breakdown_by_region(&self) -> HashMap<Region, SliceMetrics> {
194        self.aggregate(|r| r.region)
195    }
196
197    /// Computes per-device aggregates.
198    ///
199    /// Returns a map from [`DeviceType`] to [`SliceMetrics`].
200    /// Device types with no sessions are omitted.
201    #[must_use]
202    pub fn breakdown_by_device(&self) -> HashMap<DeviceType, SliceMetrics> {
203        self.aggregate(|r| r.device)
204    }
205
206    /// Computes a cross-tab: breakdown by `(region, device)` pair.
207    ///
208    /// Returns a map from `(Region, DeviceType)` to [`SliceMetrics`].
209    #[must_use]
210    pub fn breakdown_by_region_and_device(&self) -> HashMap<(Region, DeviceType), SliceMetrics> {
211        self.aggregate(|r| (r.region, r.device))
212    }
213
214    /// Finds the region with the highest total watch time.
215    ///
216    /// # Errors
217    ///
218    /// Returns [`AnalyticsError::InsufficientData`] when no records have been
219    /// ingested.
220    pub fn top_region_by_watch_time(&self) -> Result<Region, AnalyticsError> {
221        if self.records.is_empty() {
222            return Err(AnalyticsError::InsufficientData(
223                "no sessions ingested".into(),
224            ));
225        }
226        let breakdown = self.breakdown_by_region();
227        breakdown
228            .into_iter()
229            .max_by(|(_, a), (_, b)| {
230                a.total_watch_seconds
231                    .partial_cmp(&b.total_watch_seconds)
232                    .unwrap_or(std::cmp::Ordering::Equal)
233            })
234            .map(|(region, _)| region)
235            .ok_or_else(|| AnalyticsError::InsufficientData("breakdown empty".into()))
236    }
237
238    /// Finds the device type with the highest session count.
239    ///
240    /// # Errors
241    ///
242    /// Returns [`AnalyticsError::InsufficientData`] when no records have been
243    /// ingested.
244    pub fn top_device_by_sessions(&self) -> Result<DeviceType, AnalyticsError> {
245        if self.records.is_empty() {
246            return Err(AnalyticsError::InsufficientData(
247                "no sessions ingested".into(),
248            ));
249        }
250        let breakdown = self.breakdown_by_device();
251        breakdown
252            .into_iter()
253            .max_by_key(|(_, m)| m.sessions)
254            .map(|(device, _)| device)
255            .ok_or_else(|| AnalyticsError::InsufficientData("breakdown empty".into()))
256    }
257
258    // ── private helpers ──────────────────────────────────────────────────────
259
260    /// Generic aggregation keyed by the output of `key_fn`.
261    fn aggregate<K, F>(&self, key_fn: F) -> HashMap<K, SliceMetrics>
262    where
263        K: Eq + std::hash::Hash,
264        F: Fn(&SessionRecord) -> K,
265    {
266        // Phase 1: accumulate totals.
267        struct Acc {
268            sessions: u64,
269            viewers: std::collections::HashSet<String>,
270            total_watch: f64,
271        }
272
273        let mut map: HashMap<K, Acc> = HashMap::new();
274
275        for rec in &self.records {
276            let key = key_fn(rec);
277            let acc = map.entry(key).or_insert_with(|| Acc {
278                sessions: 0,
279                viewers: std::collections::HashSet::new(),
280                total_watch: 0.0,
281            });
282            acc.sessions += 1;
283            acc.viewers.insert(rec.viewer_id.clone());
284            acc.total_watch += rec.watch_seconds;
285        }
286
287        // Phase 2: finalise into SliceMetrics.
288        map.into_iter()
289            .map(|(k, acc)| {
290                (
291                    k,
292                    SliceMetrics::new(acc.sessions, acc.viewers.len() as u64, acc.total_watch),
293                )
294            })
295            .collect()
296    }
297}
298
299// ─── Period Comparison ───────────────────────────────────────────────────────
300
301/// A tagged session record with an epoch timestamp for temporal comparisons.
302#[derive(Debug, Clone)]
303pub struct TimestampedRecord {
304    /// The underlying session record.
305    pub record: SessionRecord,
306    /// Unix epoch timestamp in seconds when this session occurred.
307    pub timestamp_s: i64,
308}
309
310impl TimestampedRecord {
311    /// Wrap an existing record with a timestamp.
312    pub fn new(record: SessionRecord, timestamp_s: i64) -> Self {
313        Self {
314            record,
315            timestamp_s,
316        }
317    }
318}
319
320/// Comparison of a metric across two time periods (baseline vs comparison).
321#[derive(Debug, Clone, PartialEq)]
322pub struct PeriodDelta {
323    /// Metric value in the baseline period.
324    pub baseline: f64,
325    /// Metric value in the comparison period.
326    pub comparison: f64,
327    /// Absolute change: `comparison − baseline`.
328    pub absolute_change: f64,
329    /// Relative (percentage) change: `(comparison − baseline) / baseline * 100`.
330    ///
331    /// `NaN` when `baseline` is zero.
332    pub relative_change_pct: f64,
333}
334
335impl PeriodDelta {
336    fn new(baseline: f64, comparison: f64) -> Self {
337        let absolute_change = comparison - baseline;
338        let relative_change_pct = if baseline.abs() < f64::EPSILON {
339            f64::NAN
340        } else {
341            absolute_change / baseline * 100.0
342        };
343        Self {
344            baseline,
345            comparison,
346            absolute_change,
347            relative_change_pct,
348        }
349    }
350
351    /// Returns `true` when the comparison period shows growth over baseline.
352    pub fn is_growing(&self) -> bool {
353        self.absolute_change > 0.0
354    }
355}
356
357/// Period-over-period breakdown comparison for a single region or device slice.
358#[derive(Debug, Clone)]
359pub struct SliceComparison {
360    /// Change in session count.
361    pub sessions: PeriodDelta,
362    /// Change in unique viewers.
363    pub unique_viewers: PeriodDelta,
364    /// Change in total watch seconds.
365    pub total_watch_seconds: PeriodDelta,
366    /// Change in average watch seconds per session.
367    pub avg_watch_seconds: PeriodDelta,
368}
369
370/// A full geo/device breakdown report with optional period comparison.
371#[derive(Debug, Clone)]
372pub struct GeoDeviceReport {
373    /// Per-region session metrics.
374    pub by_region: HashMap<Region, SliceMetrics>,
375    /// Per-device session metrics.
376    pub by_device: HashMap<DeviceType, SliceMetrics>,
377    /// Cross-tab (region × device) metrics.
378    pub cross_tab: HashMap<(Region, DeviceType), SliceMetrics>,
379    /// Total number of sessions in the report.
380    pub total_sessions: u64,
381    /// Total unique viewer count across all sessions.
382    pub total_unique_viewers: u64,
383    /// Total watch time in seconds.
384    pub total_watch_seconds: f64,
385    /// Average watch time per session across all sessions.
386    pub overall_avg_watch_seconds: f64,
387}
388
389impl GeoDeviceReport {
390    /// Fraction of total sessions that belong to `region` (0.0–1.0).
391    ///
392    /// Returns `0.0` when no sessions are recorded.
393    pub fn region_share(&self, region: Region) -> f64 {
394        if self.total_sessions == 0 {
395            return 0.0;
396        }
397        self.by_region
398            .get(&region)
399            .map(|m| m.sessions as f64 / self.total_sessions as f64)
400            .unwrap_or(0.0)
401    }
402
403    /// Fraction of total sessions that belong to `device` (0.0–1.0).
404    ///
405    /// Returns `0.0` when no sessions are recorded.
406    pub fn device_share(&self, device: DeviceType) -> f64 {
407        if self.total_sessions == 0 {
408            return 0.0;
409        }
410        self.by_device
411            .get(&device)
412            .map(|m| m.sessions as f64 / self.total_sessions as f64)
413            .unwrap_or(0.0)
414    }
415
416    /// Returns the region with the highest session count, or `None` when empty.
417    pub fn dominant_region_by_sessions(&self) -> Option<Region> {
418        self.by_region
419            .iter()
420            .max_by_key(|(_, m)| m.sessions)
421            .map(|(&r, _)| r)
422    }
423
424    /// Returns the device type with the highest unique viewer count, or `None`
425    /// when empty.
426    pub fn dominant_device_by_viewers(&self) -> Option<DeviceType> {
427        self.by_device
428            .iter()
429            .max_by_key(|(_, m)| m.unique_viewers)
430            .map(|(&d, _)| d)
431    }
432}
433
434impl BreakdownAnalyzer {
435    /// Generate a full [`GeoDeviceReport`] from the ingested records.
436    pub fn build_report(&self) -> GeoDeviceReport {
437        let by_region = self.breakdown_by_region();
438        let by_device = self.breakdown_by_device();
439        let cross_tab = self.breakdown_by_region_and_device();
440
441        let total_sessions = self.records.len() as u64;
442        let total_watch_seconds: f64 = self.records.iter().map(|r| r.watch_seconds).sum();
443        let total_unique_viewers = {
444            let unique: std::collections::HashSet<&str> =
445                self.records.iter().map(|r| r.viewer_id.as_str()).collect();
446            unique.len() as u64
447        };
448        let overall_avg_watch_seconds = if total_sessions == 0 {
449            0.0
450        } else {
451            total_watch_seconds / total_sessions as f64
452        };
453
454        GeoDeviceReport {
455            by_region,
456            by_device,
457            cross_tab,
458            total_sessions,
459            total_unique_viewers,
460            total_watch_seconds,
461            overall_avg_watch_seconds,
462        }
463    }
464
465    /// Compare metrics for a [`Region`] between two sub-sets of records split
466    /// at `split_timestamp_s`.  Records at or before the split are the
467    /// *baseline*; records after are the *comparison* period.
468    ///
469    /// Returns `None` when no records are tagged with timestamps (i.e., the
470    /// [`TimestampedRecord`] API has not been used) or when the region is
471    /// absent from either period.
472    ///
473    /// Use [`BreakdownAnalyzer::ingest_timestamped`] to build an analyzer with
474    /// timestamp metadata.
475    pub fn compare_region_periods(
476        &self,
477        timestamped: &[TimestampedRecord],
478        region: Region,
479        split_timestamp_s: i64,
480    ) -> Option<SliceComparison> {
481        let baseline: Vec<SessionRecord> = timestamped
482            .iter()
483            .filter(|t| t.timestamp_s <= split_timestamp_s && t.record.region == region)
484            .map(|t| t.record.clone())
485            .collect();
486        let comparison: Vec<SessionRecord> = timestamped
487            .iter()
488            .filter(|t| t.timestamp_s > split_timestamp_s && t.record.region == region)
489            .map(|t| t.record.clone())
490            .collect();
491
492        if baseline.is_empty() && comparison.is_empty() {
493            return None;
494        }
495
496        let base_metrics = Self::compute_slice_metrics(&baseline);
497        let comp_metrics = Self::compute_slice_metrics(&comparison);
498
499        Some(SliceComparison {
500            sessions: PeriodDelta::new(base_metrics.sessions as f64, comp_metrics.sessions as f64),
501            unique_viewers: PeriodDelta::new(
502                base_metrics.unique_viewers as f64,
503                comp_metrics.unique_viewers as f64,
504            ),
505            total_watch_seconds: PeriodDelta::new(
506                base_metrics.total_watch_seconds,
507                comp_metrics.total_watch_seconds,
508            ),
509            avg_watch_seconds: PeriodDelta::new(
510                base_metrics.avg_watch_seconds,
511                comp_metrics.avg_watch_seconds,
512            ),
513        })
514    }
515
516    /// Compare metrics for a [`DeviceType`] between two time periods split at
517    /// `split_timestamp_s`.
518    pub fn compare_device_periods(
519        &self,
520        timestamped: &[TimestampedRecord],
521        device: DeviceType,
522        split_timestamp_s: i64,
523    ) -> Option<SliceComparison> {
524        let baseline: Vec<SessionRecord> = timestamped
525            .iter()
526            .filter(|t| t.timestamp_s <= split_timestamp_s && t.record.device == device)
527            .map(|t| t.record.clone())
528            .collect();
529        let comparison: Vec<SessionRecord> = timestamped
530            .iter()
531            .filter(|t| t.timestamp_s > split_timestamp_s && t.record.device == device)
532            .map(|t| t.record.clone())
533            .collect();
534
535        if baseline.is_empty() && comparison.is_empty() {
536            return None;
537        }
538
539        let base_metrics = Self::compute_slice_metrics(&baseline);
540        let comp_metrics = Self::compute_slice_metrics(&comparison);
541
542        Some(SliceComparison {
543            sessions: PeriodDelta::new(base_metrics.sessions as f64, comp_metrics.sessions as f64),
544            unique_viewers: PeriodDelta::new(
545                base_metrics.unique_viewers as f64,
546                comp_metrics.unique_viewers as f64,
547            ),
548            total_watch_seconds: PeriodDelta::new(
549                base_metrics.total_watch_seconds,
550                comp_metrics.total_watch_seconds,
551            ),
552            avg_watch_seconds: PeriodDelta::new(
553                base_metrics.avg_watch_seconds,
554                comp_metrics.avg_watch_seconds,
555            ),
556        })
557    }
558
559    /// Ingest a collection of [`TimestampedRecord`]s (discards the timestamp
560    /// metadata for the core analyzer, but returns the slice for use with
561    /// comparison functions).
562    pub fn ingest_timestamped(&mut self, records: &[TimestampedRecord]) {
563        for tr in records {
564            self.records.push(tr.record.clone());
565        }
566    }
567
568    // ── private ──────────────────────────────────────────────────────────────
569
570    /// Compute `SliceMetrics` directly from an arbitrary slice of records.
571    fn compute_slice_metrics(records: &[SessionRecord]) -> SliceMetrics {
572        if records.is_empty() {
573            return SliceMetrics::new(0, 0, 0.0);
574        }
575        let sessions = records.len() as u64;
576        let total_watch: f64 = records.iter().map(|r| r.watch_seconds).sum();
577        let unique: std::collections::HashSet<&str> =
578            records.iter().map(|r| r.viewer_id.as_str()).collect();
579        SliceMetrics::new(sessions, unique.len() as u64, total_watch)
580    }
581}
582
583// ─── Tests ────────────────────────────────────────────────────────────────────
584
585#[cfg(test)]
586mod tests {
587    use super::*;
588
589    fn rec(viewer_id: &str, region: Region, device: DeviceType, watch: f64) -> SessionRecord {
590        SessionRecord {
591            viewer_id: viewer_id.into(),
592            region,
593            device,
594            watch_seconds: watch,
595        }
596    }
597
598    #[test]
599    fn empty_analyzer_has_zero_sessions() {
600        let a = BreakdownAnalyzer::new();
601        assert_eq!(a.session_count(), 0);
602    }
603
604    #[test]
605    fn single_record_breakdown_by_region() {
606        let mut a = BreakdownAnalyzer::new();
607        a.ingest(rec("v1", Region::Europe, DeviceType::Desktop, 300.0));
608        let bd = a.breakdown_by_region();
609        let m = bd.get(&Region::Europe).expect("Europe present");
610        assert_eq!(m.sessions, 1);
611        assert_eq!(m.unique_viewers, 1);
612        assert!((m.total_watch_seconds - 300.0).abs() < 1e-9);
613        assert!((m.avg_watch_seconds - 300.0).abs() < 1e-9);
614    }
615
616    #[test]
617    fn multiple_regions_counted_separately() {
618        let mut a = BreakdownAnalyzer::new();
619        a.ingest(rec("v1", Region::Europe, DeviceType::Desktop, 100.0));
620        a.ingest(rec("v2", Region::AsiaPacific, DeviceType::Mobile, 200.0));
621        a.ingest(rec("v3", Region::Europe, DeviceType::Tablet, 150.0));
622
623        let bd = a.breakdown_by_region();
624        let eu = bd.get(&Region::Europe).expect("EU");
625        let ap = bd.get(&Region::AsiaPacific).expect("AP");
626
627        assert_eq!(eu.sessions, 2);
628        assert_eq!(ap.sessions, 1);
629        assert!((eu.total_watch_seconds - 250.0).abs() < 1e-9);
630    }
631
632    #[test]
633    fn unique_viewers_deduplicated() {
634        let mut a = BreakdownAnalyzer::new();
635        // Same viewer, three sessions in Europe.
636        a.ingest(rec("v1", Region::Europe, DeviceType::Desktop, 60.0));
637        a.ingest(rec("v1", Region::Europe, DeviceType::Mobile, 45.0));
638        a.ingest(rec("v2", Region::Europe, DeviceType::Desktop, 90.0));
639
640        let bd = a.breakdown_by_region();
641        let eu = bd.get(&Region::Europe).expect("EU");
642        assert_eq!(eu.sessions, 3);
643        assert_eq!(eu.unique_viewers, 2); // v1 and v2
644    }
645
646    #[test]
647    fn breakdown_by_device_type() {
648        let mut a = BreakdownAnalyzer::new();
649        a.ingest(rec("v1", Region::NorthAmerica, DeviceType::Mobile, 100.0));
650        a.ingest(rec("v2", Region::NorthAmerica, DeviceType::Mobile, 80.0));
651        a.ingest(rec("v3", Region::Europe, DeviceType::Desktop, 200.0));
652
653        let bd = a.breakdown_by_device();
654        let mob = bd.get(&DeviceType::Mobile).expect("Mobile");
655        let desk = bd.get(&DeviceType::Desktop).expect("Desktop");
656
657        assert_eq!(mob.sessions, 2);
658        assert_eq!(desk.sessions, 1);
659    }
660
661    #[test]
662    fn breakdown_cross_tab() {
663        let mut a = BreakdownAnalyzer::new();
664        a.ingest(rec("v1", Region::Europe, DeviceType::Mobile, 60.0));
665        a.ingest(rec("v2", Region::Europe, DeviceType::Desktop, 120.0));
666        a.ingest(rec("v3", Region::NorthAmerica, DeviceType::Mobile, 90.0));
667
668        let bd = a.breakdown_by_region_and_device();
669        assert_eq!(
670            bd.get(&(Region::Europe, DeviceType::Mobile))
671                .map(|m| m.sessions),
672            Some(1)
673        );
674        assert_eq!(
675            bd.get(&(Region::Europe, DeviceType::Desktop))
676                .map(|m| m.sessions),
677            Some(1)
678        );
679        assert_eq!(
680            bd.get(&(Region::NorthAmerica, DeviceType::Mobile))
681                .map(|m| m.sessions),
682            Some(1)
683        );
684    }
685
686    #[test]
687    fn top_region_by_watch_time() {
688        let mut a = BreakdownAnalyzer::new();
689        a.ingest(rec("v1", Region::Europe, DeviceType::Desktop, 100.0));
690        a.ingest(rec("v2", Region::AsiaPacific, DeviceType::Mobile, 500.0));
691        a.ingest(rec("v3", Region::AsiaPacific, DeviceType::Tablet, 300.0));
692
693        let top = a.top_region_by_watch_time().expect("top region");
694        assert_eq!(top, Region::AsiaPacific);
695    }
696
697    #[test]
698    fn top_device_by_sessions() {
699        let mut a = BreakdownAnalyzer::new();
700        a.ingest(rec("v1", Region::Europe, DeviceType::Mobile, 60.0));
701        a.ingest(rec("v2", Region::Europe, DeviceType::Mobile, 60.0));
702        a.ingest(rec("v3", Region::Europe, DeviceType::SmartTv, 200.0));
703
704        let top = a.top_device_by_sessions().expect("top device");
705        assert_eq!(top, DeviceType::Mobile);
706    }
707
708    #[test]
709    fn empty_analyzer_top_region_errors() {
710        let a = BreakdownAnalyzer::new();
711        assert!(a.top_region_by_watch_time().is_err());
712    }
713
714    #[test]
715    fn empty_analyzer_top_device_errors() {
716        let a = BreakdownAnalyzer::new();
717        assert!(a.top_device_by_sessions().is_err());
718    }
719
720    #[test]
721    fn device_type_labels_are_stable() {
722        assert_eq!(DeviceType::Desktop.label(), "desktop");
723        assert_eq!(DeviceType::Mobile.label(), "mobile");
724        assert_eq!(DeviceType::SmartTv.label(), "smart_tv");
725        assert_eq!(DeviceType::Unknown.label(), "unknown");
726    }
727
728    #[test]
729    fn region_labels_are_stable() {
730        assert_eq!(Region::NorthAmerica.label(), "north_america");
731        assert_eq!(Region::AsiaPacific.label(), "asia_pacific");
732        assert_eq!(Region::Unknown.label(), "unknown");
733    }
734
735    #[test]
736    fn ingest_batch() {
737        let mut a = BreakdownAnalyzer::new();
738        let records = vec![
739            rec("v1", Region::Europe, DeviceType::Desktop, 100.0),
740            rec("v2", Region::Europe, DeviceType::Desktop, 200.0),
741        ];
742        a.ingest_batch(records);
743        assert_eq!(a.session_count(), 2);
744    }
745
746    // ── GeoDeviceReport ───────────────────────────────────────────────────────
747
748    #[test]
749    fn build_report_totals_correct() {
750        let mut a = BreakdownAnalyzer::new();
751        a.ingest(rec("v1", Region::Europe, DeviceType::Desktop, 100.0));
752        a.ingest(rec("v2", Region::Europe, DeviceType::Mobile, 200.0));
753        a.ingest(rec("v3", Region::AsiaPacific, DeviceType::Mobile, 300.0));
754
755        let report = a.build_report();
756        assert_eq!(report.total_sessions, 3);
757        assert_eq!(report.total_unique_viewers, 3);
758        assert!((report.total_watch_seconds - 600.0).abs() < 1e-9);
759        assert!((report.overall_avg_watch_seconds - 200.0).abs() < 1e-9);
760    }
761
762    #[test]
763    fn build_report_empty_analyzer() {
764        let a = BreakdownAnalyzer::new();
765        let report = a.build_report();
766        assert_eq!(report.total_sessions, 0);
767        assert_eq!(report.total_unique_viewers, 0);
768        assert_eq!(report.total_watch_seconds, 0.0);
769        assert_eq!(report.overall_avg_watch_seconds, 0.0);
770        assert!(report.by_region.is_empty());
771    }
772
773    #[test]
774    fn region_share_sums_to_one() {
775        let mut a = BreakdownAnalyzer::new();
776        a.ingest(rec("v1", Region::Europe, DeviceType::Desktop, 100.0));
777        a.ingest(rec("v2", Region::Europe, DeviceType::Desktop, 100.0));
778        a.ingest(rec("v3", Region::AsiaPacific, DeviceType::Mobile, 100.0));
779
780        let report = a.build_report();
781        let eu_share = report.region_share(Region::Europe);
782        let ap_share = report.region_share(Region::AsiaPacific);
783        let unknown_share = report.region_share(Region::Unknown);
784        let total = eu_share + ap_share + unknown_share;
785        assert!((total - (eu_share + ap_share)).abs() < 1e-9); // unknown is 0
786        assert!((eu_share + ap_share - 1.0).abs() < 1e-9);
787    }
788
789    #[test]
790    fn device_share_correct() {
791        let mut a = BreakdownAnalyzer::new();
792        a.ingest(rec("v1", Region::Europe, DeviceType::Mobile, 100.0));
793        a.ingest(rec("v2", Region::Europe, DeviceType::Mobile, 100.0));
794        a.ingest(rec("v3", Region::Europe, DeviceType::Desktop, 100.0));
795
796        let report = a.build_report();
797        let mobile_share = report.device_share(DeviceType::Mobile);
798        let desktop_share = report.device_share(DeviceType::Desktop);
799        assert!((mobile_share - 2.0 / 3.0).abs() < 1e-9);
800        assert!((desktop_share - 1.0 / 3.0).abs() < 1e-9);
801    }
802
803    #[test]
804    fn dominant_region_by_sessions() {
805        let mut a = BreakdownAnalyzer::new();
806        a.ingest(rec("v1", Region::Europe, DeviceType::Desktop, 100.0));
807        a.ingest(rec("v2", Region::Europe, DeviceType::Mobile, 100.0));
808        a.ingest(rec("v3", Region::AsiaPacific, DeviceType::Mobile, 100.0));
809
810        let report = a.build_report();
811        assert_eq!(report.dominant_region_by_sessions(), Some(Region::Europe));
812    }
813
814    #[test]
815    fn dominant_device_by_viewers() {
816        let mut a = BreakdownAnalyzer::new();
817        a.ingest(rec("v1", Region::Europe, DeviceType::SmartTv, 100.0));
818        a.ingest(rec("v2", Region::Europe, DeviceType::SmartTv, 100.0));
819        a.ingest(rec("v3", Region::Europe, DeviceType::Desktop, 100.0));
820
821        let report = a.build_report();
822        assert_eq!(
823            report.dominant_device_by_viewers(),
824            Some(DeviceType::SmartTv)
825        );
826    }
827
828    // ── Period comparison ─────────────────────────────────────────────────────
829
830    fn ts_rec(
831        viewer_id: &str,
832        region: Region,
833        device: DeviceType,
834        watch: f64,
835        ts: i64,
836    ) -> TimestampedRecord {
837        TimestampedRecord::new(
838            SessionRecord {
839                viewer_id: viewer_id.into(),
840                region,
841                device,
842                watch_seconds: watch,
843            },
844            ts,
845        )
846    }
847
848    #[test]
849    fn compare_region_periods_sessions_grow() {
850        let a = BreakdownAnalyzer::new();
851        let records = vec![
852            ts_rec("v1", Region::Europe, DeviceType::Desktop, 60.0, 100),
853            ts_rec("v2", Region::Europe, DeviceType::Mobile, 90.0, 200),
854            ts_rec("v3", Region::Europe, DeviceType::Desktop, 120.0, 201),
855            ts_rec("v4", Region::Europe, DeviceType::Mobile, 80.0, 300),
856        ];
857        // Split at 200: baseline has 2 sessions (v1,v2), comparison has 2 (v3,v4).
858        let cmp = a
859            .compare_region_periods(&records, Region::Europe, 200)
860            .expect("comparison");
861        assert_eq!(cmp.sessions.baseline as u64, 2);
862        assert_eq!(cmp.sessions.comparison as u64, 2);
863        assert!((cmp.sessions.absolute_change).abs() < 1e-9); // no change
864    }
865
866    #[test]
867    fn compare_region_periods_watch_time_grows() {
868        let a = BreakdownAnalyzer::new();
869        let records = vec![
870            ts_rec("v1", Region::NorthAmerica, DeviceType::Mobile, 60.0, 50),
871            ts_rec("v2", Region::NorthAmerica, DeviceType::Mobile, 600.0, 150),
872        ];
873        // Baseline (≤100): 60s watch; comparison (>100): 600s watch.
874        let cmp = a
875            .compare_region_periods(&records, Region::NorthAmerica, 100)
876            .expect("comparison");
877        assert!((cmp.total_watch_seconds.baseline - 60.0).abs() < 1e-9);
878        assert!((cmp.total_watch_seconds.comparison - 600.0).abs() < 1e-9);
879        assert!(cmp.total_watch_seconds.is_growing());
880    }
881
882    #[test]
883    fn compare_device_periods_absent_region_returns_none() {
884        let a = BreakdownAnalyzer::new();
885        let records = vec![ts_rec(
886            "v1",
887            Region::AsiaPacific,
888            DeviceType::Mobile,
889            60.0,
890            100,
891        )];
892        // Tablet has no records in either period.
893        let cmp = a.compare_device_periods(&records, DeviceType::Tablet, 50);
894        assert!(cmp.is_none());
895    }
896
897    #[test]
898    fn period_delta_relative_change_computed() {
899        let delta = PeriodDelta::new(100.0, 150.0);
900        assert!((delta.relative_change_pct - 50.0).abs() < 1e-9);
901        assert!(delta.is_growing());
902    }
903
904    #[test]
905    fn period_delta_zero_baseline_gives_nan_relative() {
906        let delta = PeriodDelta::new(0.0, 10.0);
907        assert!(delta.relative_change_pct.is_nan());
908    }
909
910    #[test]
911    fn ingest_timestamped_populates_analyzer() {
912        let mut a = BreakdownAnalyzer::new();
913        let records = vec![
914            ts_rec("v1", Region::Europe, DeviceType::Desktop, 100.0, 1),
915            ts_rec("v2", Region::Europe, DeviceType::Mobile, 200.0, 2),
916        ];
917        a.ingest_timestamped(&records);
918        assert_eq!(a.session_count(), 2);
919    }
920}