use crate::session::{build_playback_map, ViewerSession};
/// A labelled time range, with both bounds expressed in milliseconds.
#[derive(Debug, Clone, PartialEq)]
pub struct ContentSegment {
    /// Label identifying the segment.
    pub name: String,
    /// Lower bound of the range, in milliseconds.
    pub start_ms: u64,
    /// Upper bound of the range, in milliseconds.
    pub end_ms: u64,
}

impl ContentSegment {
    /// Returns `end_ms - start_ms`, clamped to `0` when `end_ms` is smaller than `start_ms`.
    pub fn duration_ms(&self) -> u64 {
        let Self {
            start_ms, end_ms, ..
        } = self;
        end_ms.saturating_sub(*start_ms)
    }
}
/// Retention statistics for a single content segment, as returned by
/// `compute_segment_retention`.
#[derive(Debug, Clone, PartialEq)]
pub struct SegmentRetentionResult {
/// Name copied from the analysed segment.
pub segment_name: String,
/// Segment start offset in milliseconds, copied from the analysed segment.
pub start_ms: u64,
/// Segment end offset in milliseconds, copied from the analysed segment.
pub end_ms: u64,
/// `viewers_entered` as a percentage of all sessions analysed.
pub entry_retention_pct: f32,
/// `viewers_completed` as a percentage of all sessions analysed.
pub exit_retention_pct: f32,
/// Mean fraction (a ratio, not a percentage) of the segment's one-second slots
/// that were watched, averaged over `viewers_entered`; `0.0` when nobody entered.
pub avg_segment_completion: f32,
/// Number of sessions that watched at least one one-second slot of the segment.
pub viewers_entered: u32,
/// Number of sessions that watched the segment's final one-second slot.
pub viewers_completed: u32,
}
/// Computes per-segment retention statistics across a set of viewer sessions.
///
/// Every session is turned into a playback map with `build_playback_map`, and each
/// segment is evaluated against the map's one-second slots
/// `start_ms / 1000 .. ceil(end_ms / 1000)`.
///
/// For each segment:
/// * a viewer has *entered* when at least one slot of the segment is watched;
/// * a viewer has *completed* when they entered and the segment's last slot is watched;
/// * `entry_retention_pct` / `exit_retention_pct` express those counts as a percentage
///   of all sessions;
/// * `avg_segment_completion` is the mean fraction of the segment's slots watched,
///   averaged over the viewers that entered (`0.0` when nobody entered).
///
/// Returns an empty vector when `sessions` or `segments` is empty, or when
/// `content_duration_ms` is zero. A segment that covers no slots (for example one whose
/// `end_ms` lies before its `start_ms`) yields zero viewers entered and zero completed.
pub fn compute_segment_retention(
    sessions: &[ViewerSession],
    segments: &[ContentSegment],
    content_duration_ms: u64,
) -> Vec<SegmentRetentionResult> {
    if sessions.is_empty() || segments.is_empty() || content_duration_ms == 0 {
        return Vec::new();
    }

    let maps: Vec<_> = sessions
        .iter()
        .map(|s| build_playback_map(s, content_duration_ms))
        .collect();
    let total_starts = sessions.len() as f32;

    segments
        .iter()
        .map(|seg| {
            let start_sec = (seg.start_ms / 1000) as usize;
            // Round the end up so a partially covered final second still belongs to the segment.
            let end_sec_exclusive = ((seg.end_ms + 999) / 1000) as usize;
            let last_sec = end_sec_exclusive.saturating_sub(1);
            // `.max(1)` keeps the coverage division well-defined for slot-less segments.
            let seg_len = end_sec_exclusive.saturating_sub(start_sec).max(1);

            let mut viewers_entered = 0u32;
            let mut viewers_completed = 0u32;
            let mut total_segment_coverage = 0.0f64;

            for map in &maps {
                // Slots outside the map are treated as unwatched.
                let is_watched =
                    |sec: usize| map.positions_watched.get(sec).copied().unwrap_or(false);

                let watched_in_seg = (start_sec..end_sec_exclusive)
                    .filter(|&sec| is_watched(sec))
                    .count();
                if watched_in_seg == 0 {
                    continue;
                }

                viewers_entered += 1;
                total_segment_coverage += watched_in_seg as f64 / seg_len as f64;

                // Completion is only meaningful for viewers who entered: for a slot-less
                // segment `last_sec` lies outside the segment, and counting it would let
                // `viewers_completed` exceed `viewers_entered`.
                if is_watched(last_sec) {
                    viewers_completed += 1;
                }
            }

            let entry_retention_pct = viewers_entered as f32 / total_starts * 100.0;
            let exit_retention_pct = viewers_completed as f32 / total_starts * 100.0;
            let avg_segment_completion = if viewers_entered > 0 {
                (total_segment_coverage / viewers_entered as f64) as f32
            } else {
                0.0
            };

            SegmentRetentionResult {
                segment_name: seg.name.clone(),
                start_ms: seg.start_ms,
                end_ms: seg.end_ms,
                entry_retention_pct,
                exit_retention_pct,
                avg_segment_completion,
                viewers_entered,
                viewers_completed,
            }
        })
        .collect()
}
/// Accumulator that builds a [`RetentionCurve`] from sessions fed in one at a time,
/// keeping only per-second tallies rather than one playback map per session.
#[derive(Debug, Clone)]
pub struct IncrementalRetentionState {
    /// For each one-second slot, how many sessions marked it as watched.
    counts: Vec<u32>,
    /// Number of sessions added so far.
    total_starts: u32,
    /// Number of sessions that watched the slot at `completion_threshold_sec`.
    completed_views: u32,
    /// Content length in milliseconds, forwarded to `build_playback_map`.
    content_duration_ms: u64,
    // Stored at construction but never read; `counts.len()` holds the same value.
    #[allow(dead_code)]
    total_seconds: usize,
    /// Slot index located at 95% of the content duration.
    completion_threshold_sec: usize,
    /// Number of buckets emitted by `finalise`.
    num_buckets: usize,
}

impl IncrementalRetentionState {
    /// Creates an empty accumulator.
    ///
    /// Returns `None` when `content_duration_ms` or `num_buckets` is zero.
    pub fn new(content_duration_ms: u64, num_buckets: usize) -> Option<Self> {
        if content_duration_ms == 0 || num_buckets == 0 {
            return None;
        }

        // Number of one-second slots, rounding a trailing partial second up.
        let total_seconds = ((content_duration_ms + 999) / 1000) as usize;
        let completion_threshold_ms = (content_duration_ms as f64 * 0.95) as u64;

        Some(Self {
            counts: vec![0u32; total_seconds],
            total_starts: 0,
            completed_views: 0,
            content_duration_ms,
            total_seconds,
            completion_threshold_sec: (completion_threshold_ms / 1000) as usize,
            num_buckets,
        })
    }

    /// Folds one session into the running tallies.
    pub fn add_session(&mut self, session: &ViewerSession) {
        let map = build_playback_map(session, self.content_duration_ms);
        self.total_starts += 1;

        // `zip` stops at the shorter side, so map slots beyond `counts` are ignored.
        for (tally, watched) in self.counts.iter_mut().zip(map.positions_watched.iter()) {
            if *watched {
                *tally += 1;
            }
        }

        let reached_threshold = map
            .positions_watched
            .get(self.completion_threshold_sec)
            .copied()
            .unwrap_or(false);
        if reached_threshold {
            self.completed_views += 1;
        }
    }

    /// Folds every session of `sessions` into the running tallies, in order.
    pub fn add_sessions(&mut self, sessions: &[ViewerSession]) {
        sessions.iter().for_each(|session| self.add_session(session));
    }

    /// Produces the retention curve for the sessions added so far.
    ///
    /// Buckets are spaced evenly from 0% to 100% of the content (a single bucket sits at
    /// 0%). With no sessions added, the curve is empty.
    ///
    /// NOTE(review): with two or more buckets and a duration that is an exact multiple of
    /// 1000 ms, the 100% bucket reads slot `content_duration_ms / 1000`, which is one past
    /// the end of `counts`, so it always reports 0% retention.
    pub fn finalise(&self) -> RetentionCurve {
        if self.total_starts == 0 {
            return RetentionCurve {
                buckets: Vec::new(),
                total_starts: 0,
                completed_views: 0,
            };
        }

        let last_index = (self.num_buckets - 1).max(1) as f32;
        let starts = self.total_starts as f32;

        let buckets = (0..self.num_buckets)
            .map(|i| {
                let position_pct = i as f32 / last_index * 100.0;
                let position_ms =
                    (position_pct as f64 / 100.0 * self.content_duration_ms as f64) as u64;
                let slot = (position_ms / 1000) as usize;
                let viewers = self.counts.get(slot).copied().unwrap_or(0);
                RetentionBucket {
                    position_pct,
                    retention_pct: viewers as f32 / starts * 100.0,
                }
            })
            .collect();

        RetentionCurve {
            buckets,
            total_starts: self.total_starts,
            completed_views: self.completed_views,
        }
    }

    /// Number of sessions added so far.
    pub fn sessions_processed(&self) -> u32 {
        self.total_starts
    }
}
/// Builds a retention curve by feeding `sessions` through an
/// [`IncrementalRetentionState`] in chunks of `chunk_size` sessions.
///
/// A `chunk_size` of zero is treated as one. An empty curve (no buckets, zero counters)
/// is returned when `sessions` is empty, when `num_buckets` or `content_duration_ms` is
/// zero, or when the state cannot be constructed.
pub fn compute_retention_incremental(
    sessions: &[ViewerSession],
    content_duration_ms: u64,
    num_buckets: usize,
    chunk_size: usize,
) -> RetentionCurve {
    let empty_curve = || RetentionCurve {
        buckets: Vec::new(),
        total_starts: 0,
        completed_views: 0,
    };

    if sessions.is_empty() || num_buckets == 0 || content_duration_ms == 0 {
        return empty_curve();
    }

    match IncrementalRetentionState::new(content_duration_ms, num_buckets) {
        None => empty_curve(),
        Some(mut state) => {
            // `chunks` panics on a zero size, hence the clamp.
            sessions
                .chunks(chunk_size.max(1))
                .for_each(|chunk| state.add_sessions(chunk));
            state.finalise()
        }
    }
}
/// One sample point of a retention curve.
#[derive(Debug, Clone, PartialEq)]
pub struct RetentionBucket {
/// Position within the content, as a percentage of its duration (0 to 100).
pub position_pct: f32,
/// Percentage of sessions whose playback map marks this position as watched.
pub retention_pct: f32,
}
/// A retention curve together with the session counters it was derived from.
#[derive(Debug, Clone)]
pub struct RetentionCurve {
/// Sample points in order of increasing position; empty when there was nothing to analyse.
pub buckets: Vec<RetentionBucket>,
/// Number of sessions that contributed to the curve.
pub total_starts: u32,
/// Number of sessions that watched the one-second slot at 95% of the content duration.
pub completed_views: u32,
}
/// Computes a retention curve from all `sessions` at once.
///
/// Each session is converted with `build_playback_map`. The curve has `num_buckets`
/// points spaced evenly from 0% to 100% of the content (a single bucket sits at 0%), and
/// each point reports the percentage of sessions that watched the one-second slot at that
/// position. A session counts as a completed view when it watched the slot at 95% of
/// `content_duration_ms`.
///
/// Returns an empty curve when `sessions` is empty or when `num_buckets` or
/// `content_duration_ms` is zero.
///
/// NOTE(review): the 100% bucket probes slot `content_duration_ms / 1000`; whether the
/// playback map has a slot at that index depends on `build_playback_map` — TODO confirm.
pub fn compute_retention(
    sessions: &[ViewerSession],
    content_duration_ms: u64,
    num_buckets: usize,
) -> RetentionCurve {
    if sessions.is_empty() || num_buckets == 0 || content_duration_ms == 0 {
        return RetentionCurve {
            buckets: Vec::new(),
            total_starts: 0,
            completed_views: 0,
        };
    }

    let maps: Vec<_> = sessions
        .iter()
        .map(|session| build_playback_map(session, content_duration_ms))
        .collect();

    // Number of sessions that watched a given slot; slots outside a map count as unwatched.
    let viewers_at = |sec: usize| {
        maps.iter()
            .filter(|m| m.positions_watched.get(sec).copied().unwrap_or(false))
            .count()
    };

    let total_starts = sessions.len() as u32;

    let completion_threshold_ms = (content_duration_ms as f64 * 0.95) as u64;
    let completed_views = viewers_at((completion_threshold_ms / 1000) as usize) as u32;

    let last_index = (num_buckets - 1).max(1) as f32;
    let buckets = (0..num_buckets)
        .map(|i| {
            let position_pct = i as f32 / last_index * 100.0;
            let position_ms = (position_pct as f64 / 100.0 * content_duration_ms as f64) as u64;
            let viewers_at_position = viewers_at((position_ms / 1000) as usize);
            RetentionBucket {
                position_pct,
                retention_pct: viewers_at_position as f32 / total_starts as f32 * 100.0,
            }
        })
        .collect();

    RetentionCurve {
        buckets,
        total_starts,
        completed_views,
    }
}
/// Integrates the retention curve with the trapezoid rule and divides the area by 100.
///
/// For a curve whose buckets span 0% to 100% this is the mean retention percentage over
/// the whole content. With a single bucket its `retention_pct` is returned unchanged;
/// with no buckets the result is `0.0`.
pub fn average_view_duration(curve: &RetentionCurve) -> f32 {
    let buckets = &curve.buckets;
    if buckets.len() < 2 {
        return buckets.first().map_or(0.0, |only| only.retention_pct);
    }

    // Sum the trapezoids between each pair of neighbouring buckets, left to right.
    let area = buckets.windows(2).fold(0.0f32, |acc, pair| {
        let (prev, next) = (&pair[0], &pair[1]);
        let dx = next.position_pct - prev.position_pct;
        let avg_y = (next.retention_pct + prev.retention_pct) / 2.0;
        acc + dx * avg_y
    });

    area / 100.0
}
/// Lists the positions at which retention falls sharply between neighbouring buckets.
///
/// For each pair of adjacent buckets, if retention decreases by strictly more than
/// `threshold_pct_drop` percentage points, the `position_pct` of the later bucket is
/// reported. Results follow bucket order.
pub fn drop_off_points(curve: &RetentionCurve, threshold_pct_drop: f32) -> Vec<f32> {
    curve
        .buckets
        .windows(2)
        .filter(|pair| pair[0].retention_pct - pair[1].retention_pct > threshold_pct_drop)
        .map(|pair| pair[1].position_pct)
        .collect()
}
/// Finds runs of consecutive one-second slots whose watch tally exceeds the number of
/// sessions, returned as `(start_ms, end_ms)` pairs with an exclusive end.
///
/// Returns an empty vector when `sessions` is empty or `content_duration_ms` is zero.
///
/// NOTE(review): every session contributes at most one to a slot's tally (the playback
/// map holds one boolean per slot), while a slot only qualifies when its tally is
/// strictly greater than `sessions.len()`. As written, no slot can qualify, so the
/// result is always empty. Detecting re-watches would need per-slot play counts.
pub fn re_watch_segments(sessions: &[ViewerSession], content_duration_ms: u64) -> Vec<(u64, u64)> {
    if sessions.is_empty() || content_duration_ms == 0 {
        return Vec::new();
    }

    let total_sec = ((content_duration_ms + 999) / 1000) as usize;
    let mut watch_counts = vec![0u32; total_sec];
    for session in sessions {
        let map = build_playback_map(session, content_duration_ms);
        // `zip` stops at the shorter side, so map slots beyond the content are ignored.
        for (tally, watched) in watch_counts.iter_mut().zip(map.positions_watched.iter()) {
            if *watched {
                *tally += 1;
            }
        }
    }

    let session_count = sessions.len() as f32;
    let mut segments: Vec<(u64, u64)> = Vec::new();
    // Start slot of the run currently being extended, if any.
    let mut run_start: Option<usize> = None;

    for (sec, &tally) in watch_counts.iter().enumerate() {
        let rewatched = tally as f32 > session_count;
        match (rewatched, run_start) {
            (true, None) => run_start = Some(sec),
            (false, Some(start)) => {
                segments.push((start as u64 * 1000, sec as u64 * 1000));
                run_start = None;
            }
            _ => {}
        }
    }

    // A run that reaches the end of the content is closed at the final slot boundary.
    if let Some(start) = run_start {
        segments.push((start as u64 * 1000, total_sec as u64 * 1000));
    }

    segments
}
/// Reference retention levels for a category of content, consumed by
/// `compare_to_benchmark`.
#[derive(Debug, Clone)]
pub struct RetentionBenchmark {
/// Label of the content category this benchmark describes.
pub content_type: String,
/// Expected retention percentage a quarter of the way through the content.
pub expected_at_25pct: f32,
/// Expected retention percentage halfway through the content.
pub expected_at_50pct: f32,
/// Expected retention percentage three quarters of the way through the content.
pub expected_at_75pct: f32,
}
/// Benchmark retention levels for broadcast content.
///
/// `content_type` is left empty here; `broadcast_benchmark()` returns the same numbers
/// with the label filled in.
pub const BROADCAST_BENCHMARK: RetentionBenchmark = RetentionBenchmark {
content_type: String::new(), expected_at_25pct: 85.0,
expected_at_50pct: 70.0,
expected_at_75pct: 55.0,
};
/// Benchmark retention levels for video-on-demand content.
///
/// `content_type` is left empty here; `vod_benchmark()` returns the same numbers with
/// the label filled in.
pub const VOD_BENCHMARK: RetentionBenchmark = RetentionBenchmark {
content_type: String::new(),
expected_at_25pct: 80.0,
expected_at_50pct: 60.0,
expected_at_75pct: 40.0,
};
/// Benchmark retention levels for short-form content.
///
/// `content_type` is left empty here; `short_form_benchmark()` returns the same numbers
/// with the label filled in.
pub const SHORT_FORM_BENCHMARK: RetentionBenchmark = RetentionBenchmark {
content_type: String::new(),
expected_at_25pct: 95.0,
expected_at_50pct: 88.0,
expected_at_75pct: 78.0,
};
/// Returns the values of [`BROADCAST_BENCHMARK`] with `content_type` set to `"broadcast"`.
pub fn broadcast_benchmark() -> RetentionBenchmark {
    // Each use of a `const` yields a fresh value, so it can be taken and relabelled directly.
    let mut benchmark = BROADCAST_BENCHMARK;
    benchmark.content_type = String::from("broadcast");
    benchmark
}
/// Returns the values of [`VOD_BENCHMARK`] with `content_type` set to `"vod"`.
pub fn vod_benchmark() -> RetentionBenchmark {
    // Each use of a `const` yields a fresh value, so it can be taken and relabelled directly.
    let mut benchmark = VOD_BENCHMARK;
    benchmark.content_type = String::from("vod");
    benchmark
}
/// Returns the values of [`SHORT_FORM_BENCHMARK`] with `content_type` set to `"short_form"`.
pub fn short_form_benchmark() -> RetentionBenchmark {
    // Each use of a `const` yields a fresh value, so it can be taken and relabelled directly.
    let mut benchmark = SHORT_FORM_BENCHMARK;
    benchmark.content_type = String::from("short_form");
    benchmark
}
/// Scores a retention curve against a benchmark on a 0–100 scale.
///
/// Retention is sampled at 25%, 50% and 75% of the content, interpolating linearly
/// between the nearest buckets on either side (buckets are expected in order of
/// increasing `position_pct`). Each sample is divided by the benchmark's expected value
/// and capped at `1.0`; a benchmark value that is not positive scores `1.0`. The result
/// is the mean of the three scores multiplied by 100.
///
/// Returns `0.0` for a curve without buckets.
pub fn compare_to_benchmark(curve: &RetentionCurve, benchmark: &RetentionBenchmark) -> f32 {
    let buckets = &curve.buckets;
    if buckets.is_empty() {
        return 0.0;
    }

    // Retention at `target_pct`, interpolated between the surrounding buckets.
    let retention_at = |target_pct: f32| -> f32 {
        if let [only] = buckets.as_slice() {
            return only.retention_pct;
        }

        let lower = buckets.iter().rfind(|b| b.position_pct <= target_pct);
        let upper = buckets.iter().find(|b| b.position_pct >= target_pct);

        match (lower, upper) {
            (Some(lo), Some(hi)) => {
                let span = hi.position_pct - lo.position_pct;
                if span.abs() < 1e-6 {
                    // Both sides are the same position: nothing to interpolate.
                    lo.retention_pct
                } else {
                    let t = (target_pct - lo.position_pct) / span;
                    lo.retention_pct + t * (hi.retention_pct - lo.retention_pct)
                }
            }
            // The target lies outside the sampled range: use the nearest bucket.
            (Some(edge), None) | (None, Some(edge)) => edge.retention_pct,
            (None, None) => 0.0,
        }
    };

    // Ratio of actual to expected retention, capped so beating the benchmark earns no bonus.
    let score = |actual: f32, expected: f32| -> f32 {
        if expected > 0.0 {
            (actual / expected).min(1.0)
        } else {
            1.0
        }
    };

    let score_25 = score(retention_at(25.0), benchmark.expected_at_25pct);
    let score_50 = score(retention_at(50.0), benchmark.expected_at_50pct);
    let score_75 = score(retention_at(75.0), benchmark.expected_at_75pct);

    (score_25 + score_50 + score_75) / 3.0 * 100.0
}
// Unit tests for the retention computations defined in this file.
#[cfg(test)]
mod tests {
use super::*;
use crate::session::{PlaybackEvent, ViewerSession};
// Builds a session for content "c1" consisting of a `Play` event at timestamp 0 followed
// by an `End` event at `watch_end_ms`. The `_content_ms` argument is unused.
fn make_session(id: &str, watch_end_ms: u64, _content_ms: u64) -> ViewerSession {
ViewerSession {
session_id: id.to_string(),
user_id: None,
content_id: "c1".to_string(),
started_at_ms: 0,
events: vec![
PlaybackEvent::Play { timestamp_ms: 0 },
PlaybackEvent::End {
position_ms: watch_end_ms,
watch_duration_ms: watch_end_ms,
},
],
}
}
// No sessions: the curve has no buckets and zero starts.
#[test]
fn compute_retention_empty_sessions() {
let curve = compute_retention(&[], 60_000, 10);
assert!(curve.buckets.is_empty());
assert_eq!(curve.total_starts, 0);
}
// Three sessions are all counted, and the first bucket is close to 100% retention.
#[test]
fn compute_retention_basic() {
let sessions = vec![
make_session("s1", 10_000, 10_000),
make_session("s2", 10_000, 10_000),
make_session("s3", 5_000, 10_000),
];
let curve = compute_retention(&sessions, 10_000, 5);
assert_eq!(curve.total_starts, 3);
assert!((curve.buckets[0].retention_pct - 100.0).abs() < 5.0);
}
// Sessions ending at 10 000 ms and 9 500 ms count as completed; the one ending at
// 5 000 ms does not.
#[test]
fn compute_retention_completed_views() {
let sessions = vec![
make_session("s1", 10_000, 10_000),
make_session("s2", 9_500, 10_000), make_session("s3", 5_000, 10_000), ];
let curve = compute_retention(&sessions, 10_000, 10);
assert_eq!(curve.completed_views, 2);
}
// A flat 100% curve integrates to 100.
#[test]
fn average_view_duration_full_retention() {
let curve = RetentionCurve {
buckets: vec![
RetentionBucket {
position_pct: 0.0,
retention_pct: 100.0,
},
RetentionBucket {
position_pct: 50.0,
retention_pct: 100.0,
},
RetentionBucket {
position_pct: 100.0,
retention_pct: 100.0,
},
],
total_starts: 1,
completed_views: 1,
};
let avg = average_view_duration(&curve);
assert!((avg - 100.0).abs() < 1e-3);
}
// A curve without buckets yields 0.
#[test]
fn average_view_duration_empty_curve() {
let curve = RetentionCurve {
buckets: vec![],
total_starts: 0,
completed_views: 0,
};
assert_eq!(average_view_duration(&curve), 0.0);
}
// A straight line from 100% down to 0% integrates to 50.
#[test]
fn average_view_duration_linear_decay() {
let curve = RetentionCurve {
buckets: vec![
RetentionBucket {
position_pct: 0.0,
retention_pct: 100.0,
},
RetentionBucket {
position_pct: 100.0,
retention_pct: 0.0,
},
],
total_starts: 1,
completed_views: 0,
};
let avg = average_view_duration(&curve);
assert!((avg - 50.0).abs() < 1e-3);
}
// Only the 30-point fall between the 25% and 50% buckets exceeds the 20-point threshold.
#[test]
fn drop_off_points_detects_large_drop() {
let curve = RetentionCurve {
buckets: vec![
RetentionBucket {
position_pct: 0.0,
retention_pct: 100.0,
},
RetentionBucket {
position_pct: 25.0,
retention_pct: 90.0,
},
RetentionBucket {
position_pct: 50.0,
retention_pct: 60.0,
}, RetentionBucket {
position_pct: 75.0,
retention_pct: 58.0,
},
RetentionBucket {
position_pct: 100.0,
retention_pct: 55.0,
},
],
total_starts: 10,
completed_views: 5,
};
let drops = drop_off_points(&curve, 20.0);
assert_eq!(drops.len(), 1);
assert!((drops[0] - 50.0).abs() < 1e-3);
}
// A 2-point fall stays below the 5-point threshold.
#[test]
fn drop_off_points_no_drop() {
let curve = RetentionCurve {
buckets: vec![
RetentionBucket {
position_pct: 0.0,
retention_pct: 100.0,
},
RetentionBucket {
position_pct: 100.0,
retention_pct: 98.0,
},
],
total_starts: 1,
completed_views: 1,
};
let drops = drop_off_points(&curve, 5.0);
assert!(drops.is_empty());
}
// A single session produces no re-watched segments.
#[test]
fn re_watch_segments_none() {
let sessions = vec![make_session("s1", 5000, 10_000)];
let segs = re_watch_segments(&sessions, 10_000);
assert!(segs.is_empty());
}
// NOTE(review): despite its name, this test asserts that NO segment is reported for
// three identical sessions, and the `_sessions` vector is never used.
#[test]
fn re_watch_segments_detected() {
let _sessions = vec![
make_session("s1", 3000, 10_000),
make_session("s2", 3000, 10_000),
];
let sessions3 = vec![
make_session("s1", 3000, 10_000),
make_session("s2", 3000, 10_000),
make_session("s3", 3000, 10_000),
];
let segs = re_watch_segments(&sessions3, 10_000);
assert!(segs.is_empty());
}
// A curve that hits the VOD benchmark exactly at 25/50/75% scores 100.
#[test]
fn compare_to_benchmark_perfect_match() {
let benchmark = vod_benchmark();
let curve = RetentionCurve {
buckets: vec![
RetentionBucket {
position_pct: 0.0,
retention_pct: 100.0,
},
RetentionBucket {
position_pct: 25.0,
retention_pct: 80.0,
},
RetentionBucket {
position_pct: 50.0,
retention_pct: 60.0,
},
RetentionBucket {
position_pct: 75.0,
retention_pct: 40.0,
},
RetentionBucket {
position_pct: 100.0,
retention_pct: 20.0,
},
],
total_starts: 100,
completed_views: 20,
};
let score = compare_to_benchmark(&curve, &benchmark);
assert!((score - 100.0).abs() < 1e-3);
}
// A curve without buckets scores 0.
#[test]
fn compare_to_benchmark_empty_curve() {
let benchmark = broadcast_benchmark();
let curve = RetentionCurve {
buckets: vec![],
total_starts: 0,
completed_views: 0,
};
assert_eq!(compare_to_benchmark(&curve, &benchmark), 0.0);
}
// A curve at half the VOD benchmark values scores below 60.
#[test]
fn compare_to_benchmark_below_benchmark() {
let benchmark = vod_benchmark();
let curve = RetentionCurve {
buckets: vec![
RetentionBucket {
position_pct: 0.0,
retention_pct: 100.0,
},
RetentionBucket {
position_pct: 25.0,
retention_pct: 40.0,
}, RetentionBucket {
position_pct: 50.0,
retention_pct: 30.0,
}, RetentionBucket {
position_pct: 75.0,
retention_pct: 20.0,
}, RetentionBucket {
position_pct: 100.0,
retention_pct: 5.0,
},
],
total_starts: 100,
completed_views: 5,
};
let score = compare_to_benchmark(&curve, &benchmark);
assert!(score < 60.0, "score={score}");
}
// Spot-checks one value from each benchmark constant.
#[test]
fn benchmark_constants_values() {
assert!((BROADCAST_BENCHMARK.expected_at_25pct - 85.0).abs() < 1e-6);
assert!((VOD_BENCHMARK.expected_at_50pct - 60.0).abs() < 1e-6);
assert!((SHORT_FORM_BENCHMARK.expected_at_75pct - 78.0).abs() < 1e-6);
}
// Requesting one bucket yields exactly one bucket.
#[test]
fn compute_retention_single_bucket() {
let sessions = vec![make_session("s1", 10_000, 10_000)];
let curve = compute_retention(&sessions, 10_000, 1);
assert_eq!(curve.buckets.len(), 1);
}
// `total_starts` equals the number of sessions supplied.
#[test]
fn retention_curve_total_starts_matches_sessions() {
let sessions: Vec<_> = (0..7)
.map(|i| make_session(&i.to_string(), 5000, 10_000))
.collect();
let curve = compute_retention(&sessions, 10_000, 5);
assert_eq!(curve.total_starts, 7);
}
// Duration is the difference between end and start.
#[test]
fn content_segment_duration_ms() {
let seg = ContentSegment {
name: "intro".to_string(),
start_ms: 0,
end_ms: 5_000,
};
assert_eq!(seg.duration_ms(), 5_000);
}
// An end before the start clamps the duration to 0 instead of underflowing.
#[test]
fn content_segment_duration_ms_saturates_on_underflow() {
let seg = ContentSegment {
name: "bad".to_string(),
start_ms: 10_000,
end_ms: 5_000,
};
assert_eq!(seg.duration_ms(), 0);
}
// When every session watches the whole content, every segment is entered by all viewers.
#[test]
fn segment_retention_all_viewers_watch_all_segments() {
let sessions = vec![
make_session("s1", 10_000, 10_000),
make_session("s2", 10_000, 10_000),
make_session("s3", 10_000, 10_000),
];
let segments = vec![
ContentSegment {
name: "intro".to_string(),
start_ms: 0,
end_ms: 5_000,
},
ContentSegment {
name: "main".to_string(),
start_ms: 5_000,
end_ms: 10_000,
},
];
let results = compute_segment_retention(&sessions, &segments, 10_000);
assert_eq!(results.len(), 2);
assert!((results[0].entry_retention_pct - 100.0).abs() < 1.0);
assert_eq!(results[0].viewers_entered, 3);
assert_eq!(results[1].viewers_entered, 3);
}
// A session that stops at 3 000 ms enters the intro but not the later segment.
#[test]
fn segment_retention_partial_viewers() {
let sessions = vec![
make_session("s1", 3_000, 3_000),
make_session("s2", 10_000, 10_000),
make_session("s3", 10_000, 10_000),
];
let segments = vec![
ContentSegment {
name: "intro".to_string(),
start_ms: 0,
end_ms: 3_000,
},
ContentSegment {
name: "main".to_string(),
start_ms: 5_000,
end_ms: 10_000,
},
];
let results = compute_segment_retention(&sessions, &segments, 10_000);
assert_eq!(results[0].viewers_entered, 3);
assert_eq!(results[1].viewers_entered, 2);
assert!((results[1].entry_retention_pct - 100.0 * 2.0 / 3.0).abs() < 1.0);
}
// No sessions: no results.
#[test]
fn segment_retention_empty_sessions() {
let segments = vec![ContentSegment {
name: "s".to_string(),
start_ms: 0,
end_ms: 5_000,
}];
let results = compute_segment_retention(&[], &segments, 10_000);
assert!(results.is_empty());
}
// No segments: no results.
#[test]
fn segment_retention_empty_segments() {
let sessions = vec![make_session("s1", 10_000, 10_000)];
let results = compute_segment_retention(&sessions, &[], 10_000);
assert!(results.is_empty());
}
// The chunked incremental path must agree with the batch path on counters and, within
// one percentage point, on every bucket.
#[test]
fn incremental_state_matches_batch_compute() {
let sessions: Vec<_> = (0..20)
.map(|i| {
if i % 2 == 0 {
make_session(&i.to_string(), 10_000, 10_000)
} else {
make_session(&i.to_string(), 5_000, 5_000)
}
})
.collect();
let batch = compute_retention(&sessions, 10_000, 10);
let incremental = compute_retention_incremental(&sessions, 10_000, 10, 5);
assert_eq!(batch.total_starts, incremental.total_starts);
assert_eq!(batch.completed_views, incremental.completed_views);
assert_eq!(batch.buckets.len(), incremental.buckets.len());
for (b, inc) in batch.buckets.iter().zip(incremental.buckets.iter()) {
assert!(
(b.retention_pct - inc.retention_pct).abs() < 1.0,
"mismatch at pos={}: batch={} incremental={}",
b.position_pct,
b.retention_pct,
inc.retention_pct
);
}
}
// Adding sessions one at a time is reflected in both the curve and the processed counter.
#[test]
fn incremental_state_add_sessions_incrementally() {
let sessions: Vec<_> = (0..10)
.map(|i| make_session(&i.to_string(), 10_000, 10_000))
.collect();
let mut state = IncrementalRetentionState::new(10_000, 5).expect("new should succeed");
for s in &sessions {
state.add_session(s);
}
let curve = state.finalise();
assert_eq!(curve.total_starts, 10);
assert_eq!(state.sessions_processed(), 10);
}
// A zero duration or a zero bucket count is rejected at construction.
#[test]
fn incremental_state_new_invalid_returns_none() {
assert!(IncrementalRetentionState::new(0, 10).is_none());
assert!(IncrementalRetentionState::new(10_000, 0).is_none());
}
// No sessions: the incremental entry point returns an empty curve.
#[test]
fn compute_retention_incremental_empty_sessions() {
let curve = compute_retention_incremental(&[], 10_000, 10, 100);
assert!(curve.buckets.is_empty());
assert_eq!(curve.total_starts, 0);
}
}