use crate::error::AnalyticsError;
use std::collections::HashMap;
/// Device category recorded on a [`SessionRecord`] and used as a grouping key
/// in device breakdowns.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum DeviceType {
    /// Labelled `"desktop"`.
    Desktop,
    /// Labelled `"mobile"`.
    Mobile,
    /// Labelled `"tablet"`.
    Tablet,
    /// Labelled `"smart_tv"`.
    SmartTv,
    /// Labelled `"console"`.
    Console,
    /// Labelled `"unknown"`.
    Unknown,
}

impl DeviceType {
    /// Returns the fixed lowercase `snake_case` label for this device type.
    ///
    /// The returned string is a compile-time constant, so it can be stored or
    /// compared without allocation.
    #[must_use]
    pub fn label(&self) -> &'static str {
        let text: &'static str = match *self {
            DeviceType::Desktop => "desktop",
            DeviceType::Mobile => "mobile",
            DeviceType::Tablet => "tablet",
            DeviceType::SmartTv => "smart_tv",
            DeviceType::Console => "console",
            DeviceType::Unknown => "unknown",
        };
        text
    }
}
/// Geographic region recorded on a [`SessionRecord`] and used as a grouping
/// key in region breakdowns.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Region {
    /// Labelled `"north_america"`.
    NorthAmerica,
    /// Labelled `"latin_america"`.
    LatinAmerica,
    /// Labelled `"europe"`.
    Europe,
    /// Labelled `"eastern_europe"`.
    EasternEurope,
    /// Labelled `"mea"`.
    Mea,
    /// Labelled `"asia_pacific"`.
    AsiaPacific,
    /// Labelled `"east_asia"`.
    EastAsia,
    /// Labelled `"unknown"`.
    Unknown,
}

impl Region {
    /// Returns the fixed lowercase `snake_case` label for this region.
    ///
    /// The returned string is a compile-time constant, so it can be stored or
    /// compared without allocation.
    #[must_use]
    pub fn label(&self) -> &'static str {
        let text: &'static str = match *self {
            Region::NorthAmerica => "north_america",
            Region::LatinAmerica => "latin_america",
            Region::Europe => "europe",
            Region::EasternEurope => "eastern_europe",
            Region::Mea => "mea",
            Region::AsiaPacific => "asia_pacific",
            Region::EastAsia => "east_asia",
            Region::Unknown => "unknown",
        };
        text
    }
}
/// One viewing session: who watched, where, on what, and for how long.
///
/// This is the unit that `BreakdownAnalyzer` stores and aggregates.
#[derive(Debug, Clone)]
pub struct SessionRecord {
/// Identifier of the viewer; distinct values are counted as unique viewers.
pub viewer_id: String,
/// Region this session is grouped under in region breakdowns.
pub region: Region,
/// Device type this session is grouped under in device breakdowns.
pub device: DeviceType,
/// Watch time of this session, in seconds per the field name; summed and
/// averaged per slice.
pub watch_seconds: f64,
}
/// Aggregated metrics for one slice (group) of sessions.
#[derive(Debug, Clone, PartialEq)]
pub struct SliceMetrics {
    /// Number of sessions in the slice.
    pub sessions: u64,
    /// Number of distinct viewers in the slice.
    pub unique_viewers: u64,
    /// Sum of `watch_seconds` over the slice.
    pub total_watch_seconds: f64,
    /// `total_watch_seconds / sessions`, or `0.0` when the slice has no sessions.
    pub avg_watch_seconds: f64,
}

impl SliceMetrics {
    /// Builds the metrics for a slice, deriving the average from the totals.
    ///
    /// A slice with zero sessions gets an average of `0.0` instead of
    /// dividing by zero.
    fn new(sessions: u64, unique_viewers: u64, total_watch_seconds: f64) -> Self {
        let avg_watch_seconds = match sessions {
            0 => 0.0,
            count => total_watch_seconds / count as f64,
        };
        SliceMetrics {
            sessions,
            unique_viewers,
            total_watch_seconds,
            avg_watch_seconds,
        }
    }
}
/// Accumulates [`SessionRecord`]s and slices them by region and/or device.
#[derive(Debug, Default)]
pub struct BreakdownAnalyzer {
    /// Every ingested session, in ingestion order.
    records: Vec<SessionRecord>,
}

impl BreakdownAnalyzer {
    /// Creates an analyzer with no sessions.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Appends a single session.
    pub fn ingest(&mut self, record: SessionRecord) {
        self.records.push(record);
    }

    /// Appends every session yielded by `records`, preserving their order.
    pub fn ingest_batch(&mut self, records: impl IntoIterator<Item = SessionRecord>) {
        self.records.extend(records);
    }

    /// Number of sessions ingested so far.
    #[must_use]
    pub fn session_count(&self) -> usize {
        self.records.len()
    }

    /// Metrics per [`Region`]; regions with no sessions are absent from the map.
    #[must_use]
    pub fn breakdown_by_region(&self) -> HashMap<Region, SliceMetrics> {
        self.aggregate(|r| r.region)
    }

    /// Metrics per [`DeviceType`]; device types with no sessions are absent
    /// from the map.
    #[must_use]
    pub fn breakdown_by_device(&self) -> HashMap<DeviceType, SliceMetrics> {
        self.aggregate(|r| r.device)
    }

    /// Metrics per `(region, device)` pair; pairs with no sessions are absent
    /// from the map.
    #[must_use]
    pub fn breakdown_by_region_and_device(&self) -> HashMap<(Region, DeviceType), SliceMetrics> {
        self.aggregate(|r| (r.region, r.device))
    }

    /// Returns the region with the largest total watch time.
    ///
    /// Ties are resolved deterministically in favour of the region whose
    /// [`Region::label`] sorts first; without this the winner would depend on
    /// the hash map's arbitrary iteration order. Totals that cannot be
    /// ordered (NaN) are treated as equal to whatever they are compared with.
    ///
    /// # Errors
    ///
    /// Returns [`AnalyticsError::InsufficientData`] when no sessions have
    /// been ingested.
    pub fn top_region_by_watch_time(&self) -> Result<Region, AnalyticsError> {
        if self.records.is_empty() {
            return Err(AnalyticsError::InsufficientData(
                "no sessions ingested".into(),
            ));
        }
        self.breakdown_by_region()
            .into_iter()
            .max_by(|(region_a, a), (region_b, b)| {
                a.total_watch_seconds
                    .partial_cmp(&b.total_watch_seconds)
                    .unwrap_or(std::cmp::Ordering::Equal)
                    // Labels are compared in reverse so that, among equal
                    // totals, the alphabetically first label is the maximum.
                    .then_with(|| region_b.label().cmp(region_a.label()))
            })
            .map(|(region, _)| region)
            .ok_or_else(|| AnalyticsError::InsufficientData("breakdown empty".into()))
    }

    /// Returns the device type with the most sessions.
    ///
    /// Ties are resolved deterministically in favour of the device whose
    /// [`DeviceType::label`] sorts first; without this the winner would
    /// depend on the hash map's arbitrary iteration order.
    ///
    /// # Errors
    ///
    /// Returns [`AnalyticsError::InsufficientData`] when no sessions have
    /// been ingested.
    pub fn top_device_by_sessions(&self) -> Result<DeviceType, AnalyticsError> {
        if self.records.is_empty() {
            return Err(AnalyticsError::InsufficientData(
                "no sessions ingested".into(),
            ));
        }
        self.breakdown_by_device()
            .into_iter()
            // `Reverse` makes the alphabetically first label the largest key
            // among devices with the same session count.
            .max_by_key(|(device, m)| (m.sessions, std::cmp::Reverse(device.label())))
            .map(|(device, _)| device)
            .ok_or_else(|| AnalyticsError::InsufficientData("breakdown empty".into()))
    }

    /// Groups all ingested sessions by `key_fn` and computes the metrics of
    /// each group in a single pass over the records.
    fn aggregate<K, F>(&self, key_fn: F) -> HashMap<K, SliceMetrics>
    where
        K: Eq + std::hash::Hash,
        F: Fn(&SessionRecord) -> K,
    {
        /// Running totals for one group. Viewer ids are borrowed from the
        /// stored records, so counting unique viewers allocates no strings.
        #[derive(Default)]
        struct Acc<'a> {
            sessions: u64,
            viewers: std::collections::HashSet<&'a str>,
            total_watch: f64,
        }

        let mut groups: HashMap<K, Acc<'_>> = HashMap::new();
        for rec in &self.records {
            let acc = groups.entry(key_fn(rec)).or_default();
            acc.sessions += 1;
            acc.viewers.insert(rec.viewer_id.as_str());
            acc.total_watch += rec.watch_seconds;
        }
        groups
            .into_iter()
            .map(|(key, acc)| {
                (
                    key,
                    SliceMetrics::new(acc.sessions, acc.viewers.len() as u64, acc.total_watch),
                )
            })
            .collect()
    }
}
/// A [`SessionRecord`] paired with the time it is attributed to.
#[derive(Debug, Clone)]
pub struct TimestampedRecord {
    /// The session itself.
    pub record: SessionRecord,
    /// Timestamp in seconds per the field name; it is compared against the
    /// split timestamp in period comparisons. The epoch is not specified here.
    pub timestamp_s: i64,
}

impl TimestampedRecord {
    /// Pairs `record` with `timestamp_s`.
    pub fn new(record: SessionRecord, timestamp_s: i64) -> Self {
        TimestampedRecord {
            timestamp_s,
            record,
        }
    }
}
/// Change of a single metric between a baseline and a comparison period.
#[derive(Debug, Clone, PartialEq)]
pub struct PeriodDelta {
    /// Metric value in the baseline period.
    pub baseline: f64,
    /// Metric value in the comparison period.
    pub comparison: f64,
    /// `comparison - baseline`.
    pub absolute_change: f64,
    /// `absolute_change / baseline * 100.0`, or NaN when the baseline is
    /// within `f64::EPSILON` of zero and a percentage is therefore undefined.
    pub relative_change_pct: f64,
}

impl PeriodDelta {
    /// Computes the absolute and relative change from `baseline` to `comparison`.
    fn new(baseline: f64, comparison: f64) -> Self {
        let delta = comparison - baseline;
        // A (near-)zero baseline has no meaningful percentage change.
        let baseline_is_zero = baseline.abs() < f64::EPSILON;
        let percent = if baseline_is_zero {
            f64::NAN
        } else {
            delta / baseline * 100.0
        };
        PeriodDelta {
            baseline,
            comparison,
            absolute_change: delta,
            relative_change_pct: percent,
        }
    }

    /// True when the metric strictly increased between the two periods.
    pub fn is_growing(&self) -> bool {
        0.0 < self.absolute_change
    }
}
/// Period-over-period deltas for every metric of one slice.
///
/// Built by `BreakdownAnalyzer::compare_region_periods` and
/// `BreakdownAnalyzer::compare_device_periods`; each field is derived from the
/// same-named [`SliceMetrics`] field of the baseline and comparison periods.
#[derive(Debug, Clone)]
pub struct SliceComparison {
/// Change in the number of sessions.
pub sessions: PeriodDelta,
/// Change in the number of distinct viewers.
pub unique_viewers: PeriodDelta,
/// Change in the summed watch time.
pub total_watch_seconds: PeriodDelta,
/// Change in the average watch time per session.
pub avg_watch_seconds: PeriodDelta,
}
#[derive(Debug, Clone)]
pub struct GeoDeviceReport {
pub by_region: HashMap<Region, SliceMetrics>,
pub by_device: HashMap<DeviceType, SliceMetrics>,
pub cross_tab: HashMap<(Region, DeviceType), SliceMetrics>,
pub total_sessions: u64,
pub total_unique_viewers: u64,
pub total_watch_seconds: f64,
pub overall_avg_watch_seconds: f64,
}
impl GeoDeviceReport {
pub fn region_share(&self, region: Region) -> f64 {
if self.total_sessions == 0 {
return 0.0;
}
self.by_region
.get(®ion)
.map(|m| m.sessions as f64 / self.total_sessions as f64)
.unwrap_or(0.0)
}
pub fn device_share(&self, device: DeviceType) -> f64 {
if self.total_sessions == 0 {
return 0.0;
}
self.by_device
.get(&device)
.map(|m| m.sessions as f64 / self.total_sessions as f64)
.unwrap_or(0.0)
}
pub fn dominant_region_by_sessions(&self) -> Option<Region> {
self.by_region
.iter()
.max_by_key(|(_, m)| m.sessions)
.map(|(&r, _)| r)
}
pub fn dominant_device_by_viewers(&self) -> Option<DeviceType> {
self.by_device
.iter()
.max_by_key(|(_, m)| m.unique_viewers)
.map(|(&d, _)| d)
}
}
impl BreakdownAnalyzer {
    /// Builds a [`GeoDeviceReport`] from every ingested session.
    ///
    /// The report contains the region, device and cross-tab breakdowns plus
    /// overall totals. An analyzer with no sessions yields empty maps and
    /// zeroed totals.
    pub fn build_report(&self) -> GeoDeviceReport {
        // The overall totals are the metrics of the slice holding every record.
        let totals = Self::compute_slice_metrics(&self.records);
        GeoDeviceReport {
            by_region: self.breakdown_by_region(),
            by_device: self.breakdown_by_device(),
            cross_tab: self.breakdown_by_region_and_device(),
            total_sessions: totals.sessions,
            total_unique_viewers: totals.unique_viewers,
            total_watch_seconds: totals.total_watch_seconds,
            overall_avg_watch_seconds: totals.avg_watch_seconds,
        }
    }

    /// Compares one region's metrics before and after `split_timestamp_s`.
    ///
    /// Records of `region` with `timestamp_s <= split_timestamp_s` form the
    /// baseline period; later ones form the comparison period. Only
    /// `timestamped` is read; the analyzer's own records are not consulted.
    ///
    /// Returns `None` when `timestamped` holds no record for `region`.
    pub fn compare_region_periods(
        &self,
        timestamped: &[TimestampedRecord],
        region: Region,
        split_timestamp_s: i64,
    ) -> Option<SliceComparison> {
        Self::compare_periods(timestamped, split_timestamp_s, |r| r.region == region)
    }

    /// Compares one device type's metrics before and after `split_timestamp_s`.
    ///
    /// Records of `device` with `timestamp_s <= split_timestamp_s` form the
    /// baseline period; later ones form the comparison period. Only
    /// `timestamped` is read; the analyzer's own records are not consulted.
    ///
    /// Returns `None` when `timestamped` holds no record for `device`.
    pub fn compare_device_periods(
        &self,
        timestamped: &[TimestampedRecord],
        device: DeviceType,
        split_timestamp_s: i64,
    ) -> Option<SliceComparison> {
        Self::compare_periods(timestamped, split_timestamp_s, |r| r.device == device)
    }

    /// Appends a copy of the session inside each timestamped record; the
    /// timestamps themselves are not stored.
    pub fn ingest_timestamped(&mut self, records: &[TimestampedRecord]) {
        self.records
            .extend(records.iter().map(|tr| tr.record.clone()));
    }

    /// Shared implementation of the period comparisons.
    ///
    /// `in_slice` selects the records of interest. The selected records are
    /// split at `split_timestamp_s` (inclusive on the baseline side) and the
    /// metrics of both halves are turned into deltas. Records are only
    /// borrowed, never cloned.
    fn compare_periods<P>(
        timestamped: &[TimestampedRecord],
        split_timestamp_s: i64,
        in_slice: P,
    ) -> Option<SliceComparison>
    where
        P: Fn(&SessionRecord) -> bool,
    {
        let baseline = Self::compute_slice_metrics(
            timestamped
                .iter()
                .filter(|t| t.timestamp_s <= split_timestamp_s && in_slice(&t.record))
                .map(|t| &t.record),
        );
        let comparison = Self::compute_slice_metrics(
            timestamped
                .iter()
                .filter(|t| t.timestamp_s > split_timestamp_s && in_slice(&t.record))
                .map(|t| &t.record),
        );
        // No matching record in either period: there is nothing to compare.
        if baseline.sessions == 0 && comparison.sessions == 0 {
            return None;
        }
        Some(SliceComparison {
            sessions: PeriodDelta::new(baseline.sessions as f64, comparison.sessions as f64),
            unique_viewers: PeriodDelta::new(
                baseline.unique_viewers as f64,
                comparison.unique_viewers as f64,
            ),
            total_watch_seconds: PeriodDelta::new(
                baseline.total_watch_seconds,
                comparison.total_watch_seconds,
            ),
            avg_watch_seconds: PeriodDelta::new(
                baseline.avg_watch_seconds,
                comparison.avg_watch_seconds,
            ),
        })
    }

    /// Computes the metrics of an arbitrary collection of borrowed sessions
    /// in a single pass.
    ///
    /// Accepts anything that iterates over `&SessionRecord` (a slice, a
    /// `&Vec`, or a filtered iterator). An empty input yields all-zero
    /// metrics.
    fn compute_slice_metrics<'a, I>(records: I) -> SliceMetrics
    where
        I: IntoIterator<Item = &'a SessionRecord>,
    {
        let mut sessions: u64 = 0;
        let mut total_watch: f64 = 0.0;
        let mut unique: std::collections::HashSet<&'a str> = std::collections::HashSet::new();
        for record in records {
            sessions += 1;
            total_watch += record.watch_seconds;
            unique.insert(record.viewer_id.as_str());
        }
        SliceMetrics::new(sessions, unique.len() as u64, total_watch)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Absolute tolerance for floating-point comparisons in these tests.
    const TOLERANCE: f64 = 1e-9;

    /// True when `actual` is within [`TOLERANCE`] of `expected`.
    fn close(actual: f64, expected: f64) -> bool {
        (actual - expected).abs() < TOLERANCE
    }

    /// Shorthand constructor for a session record.
    fn rec(viewer_id: &str, region: Region, device: DeviceType, watch: f64) -> SessionRecord {
        SessionRecord {
            viewer_id: viewer_id.to_string(),
            region,
            device,
            watch_seconds: watch,
        }
    }

    /// Shorthand constructor for a timestamped session record.
    fn ts_rec(
        viewer_id: &str,
        region: Region,
        device: DeviceType,
        watch: f64,
        ts: i64,
    ) -> TimestampedRecord {
        TimestampedRecord::new(rec(viewer_id, region, device, watch), ts)
    }

    /// Builds an analyzer by ingesting `records` one at a time, in order.
    fn analyzer_with(records: Vec<SessionRecord>) -> BreakdownAnalyzer {
        let mut analyzer = BreakdownAnalyzer::new();
        for record in records {
            analyzer.ingest(record);
        }
        analyzer
    }

    #[test]
    fn empty_analyzer_has_zero_sessions() {
        let analyzer = BreakdownAnalyzer::new();
        assert_eq!(analyzer.session_count(), 0);
    }

    #[test]
    fn single_record_breakdown_by_region() {
        let analyzer = analyzer_with(vec![rec(
            "v1",
            Region::Europe,
            DeviceType::Desktop,
            300.0,
        )]);
        let by_region = analyzer.breakdown_by_region();
        let europe = by_region.get(&Region::Europe).expect("Europe present");
        assert_eq!(europe.sessions, 1);
        assert_eq!(europe.unique_viewers, 1);
        assert!(close(europe.total_watch_seconds, 300.0));
        assert!(close(europe.avg_watch_seconds, 300.0));
    }

    #[test]
    fn multiple_regions_counted_separately() {
        let analyzer = analyzer_with(vec![
            rec("v1", Region::Europe, DeviceType::Desktop, 100.0),
            rec("v2", Region::AsiaPacific, DeviceType::Mobile, 200.0),
            rec("v3", Region::Europe, DeviceType::Tablet, 150.0),
        ]);
        let by_region = analyzer.breakdown_by_region();
        let europe = by_region.get(&Region::Europe).expect("EU");
        let asia_pacific = by_region.get(&Region::AsiaPacific).expect("AP");
        assert_eq!(europe.sessions, 2);
        assert_eq!(asia_pacific.sessions, 1);
        assert!(close(europe.total_watch_seconds, 250.0));
    }

    /// The same viewer appearing in two sessions counts once.
    #[test]
    fn unique_viewers_deduplicated() {
        let analyzer = analyzer_with(vec![
            rec("v1", Region::Europe, DeviceType::Desktop, 60.0),
            rec("v1", Region::Europe, DeviceType::Mobile, 45.0),
            rec("v2", Region::Europe, DeviceType::Desktop, 90.0),
        ]);
        let by_region = analyzer.breakdown_by_region();
        let europe = by_region.get(&Region::Europe).expect("EU");
        assert_eq!(europe.sessions, 3);
        assert_eq!(europe.unique_viewers, 2);
    }

    #[test]
    fn breakdown_by_device_type() {
        let analyzer = analyzer_with(vec![
            rec("v1", Region::NorthAmerica, DeviceType::Mobile, 100.0),
            rec("v2", Region::NorthAmerica, DeviceType::Mobile, 80.0),
            rec("v3", Region::Europe, DeviceType::Desktop, 200.0),
        ]);
        let by_device = analyzer.breakdown_by_device();
        let mobile = by_device.get(&DeviceType::Mobile).expect("Mobile");
        let desktop = by_device.get(&DeviceType::Desktop).expect("Desktop");
        assert_eq!(mobile.sessions, 2);
        assert_eq!(desktop.sessions, 1);
    }

    #[test]
    fn breakdown_cross_tab() {
        let analyzer = analyzer_with(vec![
            rec("v1", Region::Europe, DeviceType::Mobile, 60.0),
            rec("v2", Region::Europe, DeviceType::Desktop, 120.0),
            rec("v3", Region::NorthAmerica, DeviceType::Mobile, 90.0),
        ]);
        let cross_tab = analyzer.breakdown_by_region_and_device();
        let sessions_for = |key: (Region, DeviceType)| cross_tab.get(&key).map(|m| m.sessions);
        assert_eq!(sessions_for((Region::Europe, DeviceType::Mobile)), Some(1));
        assert_eq!(sessions_for((Region::Europe, DeviceType::Desktop)), Some(1));
        assert_eq!(
            sessions_for((Region::NorthAmerica, DeviceType::Mobile)),
            Some(1)
        );
    }

    #[test]
    fn top_region_by_watch_time() {
        let analyzer = analyzer_with(vec![
            rec("v1", Region::Europe, DeviceType::Desktop, 100.0),
            rec("v2", Region::AsiaPacific, DeviceType::Mobile, 500.0),
            rec("v3", Region::AsiaPacific, DeviceType::Tablet, 300.0),
        ]);
        let winner = analyzer.top_region_by_watch_time().expect("top region");
        assert_eq!(winner, Region::AsiaPacific);
    }

    #[test]
    fn top_device_by_sessions() {
        let analyzer = analyzer_with(vec![
            rec("v1", Region::Europe, DeviceType::Mobile, 60.0),
            rec("v2", Region::Europe, DeviceType::Mobile, 60.0),
            rec("v3", Region::Europe, DeviceType::SmartTv, 200.0),
        ]);
        let winner = analyzer.top_device_by_sessions().expect("top device");
        assert_eq!(winner, DeviceType::Mobile);
    }

    #[test]
    fn empty_analyzer_top_region_errors() {
        let analyzer = BreakdownAnalyzer::new();
        assert!(analyzer.top_region_by_watch_time().is_err());
    }

    #[test]
    fn empty_analyzer_top_device_errors() {
        let analyzer = BreakdownAnalyzer::new();
        assert!(analyzer.top_device_by_sessions().is_err());
    }

    #[test]
    fn device_type_labels_are_stable() {
        assert_eq!(DeviceType::Desktop.label(), "desktop");
        assert_eq!(DeviceType::Mobile.label(), "mobile");
        assert_eq!(DeviceType::SmartTv.label(), "smart_tv");
        assert_eq!(DeviceType::Unknown.label(), "unknown");
    }

    #[test]
    fn region_labels_are_stable() {
        assert_eq!(Region::NorthAmerica.label(), "north_america");
        assert_eq!(Region::AsiaPacific.label(), "asia_pacific");
        assert_eq!(Region::Unknown.label(), "unknown");
    }

    #[test]
    fn ingest_batch() {
        let mut analyzer = BreakdownAnalyzer::new();
        analyzer.ingest_batch(vec![
            rec("v1", Region::Europe, DeviceType::Desktop, 100.0),
            rec("v2", Region::Europe, DeviceType::Desktop, 200.0),
        ]);
        assert_eq!(analyzer.session_count(), 2);
    }

    #[test]
    fn build_report_totals_correct() {
        let analyzer = analyzer_with(vec![
            rec("v1", Region::Europe, DeviceType::Desktop, 100.0),
            rec("v2", Region::Europe, DeviceType::Mobile, 200.0),
            rec("v3", Region::AsiaPacific, DeviceType::Mobile, 300.0),
        ]);
        let report = analyzer.build_report();
        assert_eq!(report.total_sessions, 3);
        assert_eq!(report.total_unique_viewers, 3);
        assert!(close(report.total_watch_seconds, 600.0));
        assert!(close(report.overall_avg_watch_seconds, 200.0));
    }

    #[test]
    fn build_report_empty_analyzer() {
        let report = BreakdownAnalyzer::new().build_report();
        assert_eq!(report.total_sessions, 0);
        assert_eq!(report.total_unique_viewers, 0);
        assert_eq!(report.total_watch_seconds, 0.0);
        assert_eq!(report.overall_avg_watch_seconds, 0.0);
        assert!(report.by_region.is_empty());
    }

    /// Shares of the regions that are present add up to 1, and a region that
    /// is absent contributes nothing to the sum.
    #[test]
    fn region_share_sums_to_one() {
        let analyzer = analyzer_with(vec![
            rec("v1", Region::Europe, DeviceType::Desktop, 100.0),
            rec("v2", Region::Europe, DeviceType::Desktop, 100.0),
            rec("v3", Region::AsiaPacific, DeviceType::Mobile, 100.0),
        ]);
        let report = analyzer.build_report();
        let eu_share = report.region_share(Region::Europe);
        let ap_share = report.region_share(Region::AsiaPacific);
        let unknown_share = report.region_share(Region::Unknown);
        let total = eu_share + ap_share + unknown_share;
        assert!(close(total, eu_share + ap_share));
        assert!(close(eu_share + ap_share, 1.0));
    }

    #[test]
    fn device_share_correct() {
        let analyzer = analyzer_with(vec![
            rec("v1", Region::Europe, DeviceType::Mobile, 100.0),
            rec("v2", Region::Europe, DeviceType::Mobile, 100.0),
            rec("v3", Region::Europe, DeviceType::Desktop, 100.0),
        ]);
        let report = analyzer.build_report();
        assert!(close(report.device_share(DeviceType::Mobile), 2.0 / 3.0));
        assert!(close(report.device_share(DeviceType::Desktop), 1.0 / 3.0));
    }

    #[test]
    fn dominant_region_by_sessions() {
        let analyzer = analyzer_with(vec![
            rec("v1", Region::Europe, DeviceType::Desktop, 100.0),
            rec("v2", Region::Europe, DeviceType::Mobile, 100.0),
            rec("v3", Region::AsiaPacific, DeviceType::Mobile, 100.0),
        ]);
        let report = analyzer.build_report();
        assert_eq!(report.dominant_region_by_sessions(), Some(Region::Europe));
    }

    #[test]
    fn dominant_device_by_viewers() {
        let analyzer = analyzer_with(vec![
            rec("v1", Region::Europe, DeviceType::SmartTv, 100.0),
            rec("v2", Region::Europe, DeviceType::SmartTv, 100.0),
            rec("v3", Region::Europe, DeviceType::Desktop, 100.0),
        ]);
        let report = analyzer.build_report();
        assert_eq!(
            report.dominant_device_by_viewers(),
            Some(DeviceType::SmartTv)
        );
    }

    /// Despite the name, the fixture puts two sessions on each side of the
    /// split (the boundary timestamp 200 belongs to the baseline), so the
    /// asserted change in sessions is zero.
    #[test]
    fn compare_region_periods_sessions_grow() {
        let analyzer = BreakdownAnalyzer::new();
        let timeline = vec![
            ts_rec("v1", Region::Europe, DeviceType::Desktop, 60.0, 100),
            ts_rec("v2", Region::Europe, DeviceType::Mobile, 90.0, 200),
            ts_rec("v3", Region::Europe, DeviceType::Desktop, 120.0, 201),
            ts_rec("v4", Region::Europe, DeviceType::Mobile, 80.0, 300),
        ];
        let outcome = analyzer
            .compare_region_periods(&timeline, Region::Europe, 200)
            .expect("comparison");
        assert_eq!(outcome.sessions.baseline as u64, 2);
        assert_eq!(outcome.sessions.comparison as u64, 2);
        assert!(close(outcome.sessions.absolute_change, 0.0));
    }

    #[test]
    fn compare_region_periods_watch_time_grows() {
        let analyzer = BreakdownAnalyzer::new();
        let timeline = vec![
            ts_rec("v1", Region::NorthAmerica, DeviceType::Mobile, 60.0, 50),
            ts_rec("v2", Region::NorthAmerica, DeviceType::Mobile, 600.0, 150),
        ];
        let outcome = analyzer
            .compare_region_periods(&timeline, Region::NorthAmerica, 100)
            .expect("comparison");
        assert!(close(outcome.total_watch_seconds.baseline, 60.0));
        assert!(close(outcome.total_watch_seconds.comparison, 600.0));
        assert!(outcome.total_watch_seconds.is_growing());
    }

    /// Despite "region" in the name, this filters on a device type (Tablet)
    /// that no record has, so no comparison can be produced.
    #[test]
    fn compare_device_periods_absent_region_returns_none() {
        let analyzer = BreakdownAnalyzer::new();
        let timeline = vec![ts_rec(
            "v1",
            Region::AsiaPacific,
            DeviceType::Mobile,
            60.0,
            100,
        )];
        let outcome = analyzer.compare_device_periods(&timeline, DeviceType::Tablet, 50);
        assert!(outcome.is_none());
    }

    #[test]
    fn period_delta_relative_change_computed() {
        let delta = PeriodDelta::new(100.0, 150.0);
        assert!(close(delta.relative_change_pct, 50.0));
        assert!(delta.is_growing());
    }

    #[test]
    fn period_delta_zero_baseline_gives_nan_relative() {
        let delta = PeriodDelta::new(0.0, 10.0);
        assert!(delta.relative_change_pct.is_nan());
    }

    #[test]
    fn ingest_timestamped_populates_analyzer() {
        let mut analyzer = BreakdownAnalyzer::new();
        let timeline = vec![
            ts_rec("v1", Region::Europe, DeviceType::Desktop, 100.0, 1),
            ts_rec("v2", Region::Europe, DeviceType::Mobile, 200.0, 2),
        ];
        analyzer.ingest_timestamped(&timeline);
        assert_eq!(analyzer.session_count(), 2);
    }
}