use crate::video_transcode::{TranscodeError, TranscodeTarget};
use papaya::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Instant;
use tokio::sync::Notify;
/// Identifies a single cached HLS artifact.
///
/// Two keys are equal only if they are the same variant and every field
/// matches, so a playlist and a segment for the same source never collide, and
/// neither do entries for different transcode targets or segment indices.
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub enum HlsCacheKey {
/// Key for a playlist entry, identified by source path and transcode target.
Playlist {
/// Path of the source the playlist was generated for.
path: PathBuf,
/// Transcode target the playlist was generated for.
target: TranscodeTarget,
},
/// Key for one segment entry, identified by source path, transcode target
/// and segment index.
Segment {
/// Path of the source the segment was generated for.
path: PathBuf,
/// Transcode target the segment was generated for.
target: TranscodeTarget,
/// Index of the segment within its stream.
segment_index: u32,
},
}
impl HlsCacheKey {
pub fn playlist(path: PathBuf, target: TranscodeTarget) -> Self {
Self::Playlist { path, target }
}
pub fn segment(path: PathBuf, target: TranscodeTarget, segment_index: u32) -> Self {
Self::Segment {
path,
target,
segment_index,
}
}
}
/// Lifecycle state of one cache entry.
///
/// Cloning is cheap for the first two variants because the payloads are
/// behind `Arc`; cloning `Failed` copies the message string.
#[derive(Clone)]
pub enum HlsCacheState {
/// Generation has been started and has not finished yet. The `Notify` is
/// signalled with `notify_waiters` when the entry is completed or failed.
InProgress(Arc<Notify>),
/// Generation finished; holds the generated bytes.
Complete(Arc<Vec<u8>>),
/// Generation failed; holds the error rendered as text.
Failed(String),
}
/// Value stored in the cache map for each key.
struct CacheEntry {
/// Current lifecycle state (and payload, once complete).
state: HlsCacheState,
/// When this entry was written; eviction removes the oldest entries first.
created_at: Instant,
/// Payload length in bytes for completed entries; `0` for in-progress and
/// failed entries.
size_bytes: usize,
}
/// Size-bounded cache of generated HLS playlists and segments.
///
/// All methods take `&self`; the map and the byte counter are both usable
/// through a shared reference.
pub struct HlsCache {
/// Entries keyed by the artifact they describe.
cache: HashMap<HlsCacheKey, CacheEntry>,
/// Running total of payload bytes: increased when an entry is completed and
/// decreased when an entry is evicted.
current_size: AtomicUsize,
/// Byte budget that triggers eviction when exceeded. `0` disables the cache.
max_size: usize,
}
impl HlsCache {
pub fn new(max_size_bytes: usize) -> Self {
Self {
cache: HashMap::new(),
current_size: AtomicUsize::new(0),
max_size: max_size_bytes,
}
}
pub fn get_state(&self, key: &HlsCacheKey) -> Option<HlsCacheState> {
if self.max_size == 0 {
return None;
}
let guard = self.cache.pin();
guard.get(key).map(|entry| entry.state.clone())
}
pub fn start_generation(&self, key: HlsCacheKey) -> HlsCacheStartResult {
if self.max_size == 0 {
return HlsCacheStartResult::CacheDisabled;
}
let guard = self.cache.pin();
if let Some(entry) = guard.get(&key) {
return match &entry.state {
HlsCacheState::InProgress(notify) => {
HlsCacheStartResult::AlreadyInProgress(notify.clone())
}
HlsCacheState::Complete(data) => HlsCacheStartResult::AlreadyComplete(data.clone()),
HlsCacheState::Failed(msg) => HlsCacheStartResult::PreviouslyFailed(msg.clone()),
};
}
let notify = Arc::new(Notify::new());
let entry = CacheEntry {
state: HlsCacheState::InProgress(notify.clone()),
created_at: Instant::now(),
size_bytes: 0,
};
guard.insert(key.clone(), entry);
tracing::debug!("Started generation for {:?}", key);
HlsCacheStartResult::Started(notify)
}
pub fn complete_generation(&self, key: HlsCacheKey, data: Vec<u8>) {
if self.max_size == 0 {
return;
}
let guard = self.cache.pin();
let size_bytes = data.len();
let notify = if let Some(entry) = guard.get(&key) {
if let HlsCacheState::InProgress(n) = &entry.state {
Some(n.clone())
} else {
None
}
} else {
None
};
let data = Arc::new(data);
let entry = CacheEntry {
state: HlsCacheState::Complete(data),
created_at: Instant::now(),
size_bytes,
};
guard.insert(key.clone(), entry);
let new_size = self.current_size.fetch_add(size_bytes, Ordering::Relaxed) + size_bytes;
tracing::debug!(
"Generation complete for {:?} ({} bytes, cache size: {} bytes)",
key,
size_bytes,
new_size
);
if let Some(n) = notify {
n.notify_waiters();
}
if new_size > self.max_size {
self.evict_oldest(new_size - self.max_size);
}
}
pub fn fail_generation(&self, key: HlsCacheKey, error: &TranscodeError) {
if self.max_size == 0 {
return;
}
let guard = self.cache.pin();
let notify = if let Some(entry) = guard.get(&key) {
if let HlsCacheState::InProgress(n) = &entry.state {
Some(n.clone())
} else {
None
}
} else {
None
};
let entry = CacheEntry {
state: HlsCacheState::Failed(error.to_string()),
created_at: Instant::now(),
size_bytes: 0,
};
guard.insert(key.clone(), entry);
tracing::warn!("Generation failed for {:?}: {}", key, error);
if let Some(n) = notify {
n.notify_waiters();
}
}
#[allow(dead_code)]
pub fn clear_failed(&self, key: &HlsCacheKey) {
let guard = self.cache.pin();
if let Some(entry) = guard.get(key)
&& matches!(entry.state, HlsCacheState::Failed(_))
{
guard.remove(key);
tracing::debug!("Cleared failed entry for {:?}", key);
}
}
fn evict_oldest(&self, target_bytes: usize) {
let guard = self.cache.pin();
let mut entries: Vec<(HlsCacheKey, Instant, usize, bool)> = guard
.iter()
.filter_map(|(k, v)| {
if matches!(v.state, HlsCacheState::Complete(_)) && v.size_bytes > 0 {
let is_playlist = matches!(k, HlsCacheKey::Playlist { .. });
Some((k.clone(), v.created_at, v.size_bytes, is_playlist))
} else {
None
}
})
.collect();
entries.sort_by(
|(_, time_a, _, is_playlist_a), (_, time_b, _, is_playlist_b)| {
is_playlist_a
.cmp(is_playlist_b)
.then_with(|| time_a.cmp(time_b))
},
);
let mut freed = 0usize;
let mut evict_count = 0usize;
for (key, _, size, _) in entries {
if freed >= target_bytes {
break;
}
if guard.remove(&key).is_some() {
freed += size;
evict_count += 1;
self.current_size.fetch_sub(size, Ordering::Relaxed);
}
}
if evict_count > 0 {
tracing::info!(
"HLS cache evicted {} entries ({} bytes freed)",
evict_count,
freed
);
}
}
#[cfg(test)]
pub fn current_size(&self) -> usize {
self.current_size.load(Ordering::Relaxed)
}
#[cfg(test)]
pub fn len(&self) -> usize {
self.cache.pin().len()
}
#[cfg(test)]
pub fn is_empty(&self) -> bool {
self.cache.pin().is_empty()
}
}
/// Outcome of `HlsCache::start_generation`.
pub enum HlsCacheStartResult {
/// No entry existed; an in-progress entry was inserted and this is its
/// notification handle.
Started(Arc<Notify>),
/// An in-progress entry already exists; this is its notification handle.
AlreadyInProgress(Arc<Notify>),
/// A completed entry already exists; this is its payload.
AlreadyComplete(Arc<Vec<u8>>),
/// A failed entry already exists; this is the stored error text.
PreviouslyFailed(String),
/// The cache was created with a maximum size of `0`, so nothing is tracked.
CacheDisabled,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for a playlist key built from a string path.
    fn make_playlist_key(path: &str, target: TranscodeTarget) -> HlsCacheKey {
        HlsCacheKey::playlist(PathBuf::from(path), target)
    }

    /// Shorthand for a segment key built from a string path.
    fn make_segment_key(path: &str, target: TranscodeTarget, index: u32) -> HlsCacheKey {
        HlsCacheKey::segment(PathBuf::from(path), target, index)
    }

    #[test]
    fn test_start_and_complete_playlist() {
        let cache = HlsCache::new(1024 * 1024);
        let key = make_playlist_key("/videos/test.mp4", TranscodeTarget::Resolution720p);

        let result = cache.start_generation(key.clone());
        assert!(matches!(result, HlsCacheStartResult::Started(_)));

        let state = cache.get_state(&key);
        assert!(matches!(state, Some(HlsCacheState::InProgress(_))));

        let data = b"#EXTM3U\n#EXT-X-VERSION:3\n".to_vec();
        cache.complete_generation(key.clone(), data);

        let state = cache.get_state(&key);
        assert!(matches!(state, Some(HlsCacheState::Complete(_))));
    }

    #[test]
    fn test_start_and_complete_segment() {
        let cache = HlsCache::new(1024 * 1024);
        let key = make_segment_key("/videos/test.mp4", TranscodeTarget::Resolution720p, 5);

        let result = cache.start_generation(key.clone());
        assert!(matches!(result, HlsCacheStartResult::Started(_)));

        let data = vec![0u8; 1000];
        cache.complete_generation(key.clone(), data);

        let state = cache.get_state(&key);
        assert!(matches!(state, Some(HlsCacheState::Complete(_))));
    }

    #[test]
    fn test_concurrent_start_returns_in_progress() {
        let cache = HlsCache::new(1024 * 1024);
        let key = make_segment_key("/videos/test.mp4", TranscodeTarget::Resolution720p, 0);

        let result1 = cache.start_generation(key.clone());
        assert!(matches!(result1, HlsCacheStartResult::Started(_)));

        let result2 = cache.start_generation(key.clone());
        assert!(matches!(result2, HlsCacheStartResult::AlreadyInProgress(_)));
    }

    #[test]
    fn test_complete_returns_already_complete() {
        let cache = HlsCache::new(1024 * 1024);
        let key = make_segment_key("/videos/test.mp4", TranscodeTarget::Resolution720p, 0);

        cache.start_generation(key.clone());
        cache.complete_generation(key.clone(), vec![0u8; 100]);

        let result = cache.start_generation(key.clone());
        assert!(matches!(result, HlsCacheStartResult::AlreadyComplete(_)));
    }

    #[test]
    fn test_failed_generation() {
        let cache = HlsCache::new(1024 * 1024);
        let key = make_segment_key("/videos/test.mp4", TranscodeTarget::Resolution720p, 0);

        cache.start_generation(key.clone());
        cache.fail_generation(
            key.clone(),
            &TranscodeError::TranscodeFailed("Test failure".to_string()),
        );

        let state = cache.get_state(&key);
        assert!(matches!(state, Some(HlsCacheState::Failed(_))));

        let result = cache.start_generation(key.clone());
        assert!(matches!(result, HlsCacheStartResult::PreviouslyFailed(_)));
    }

    #[test]
    fn test_clear_failed_allows_retry() {
        let cache = HlsCache::new(1024 * 1024);
        let key = make_segment_key("/videos/test.mp4", TranscodeTarget::Resolution720p, 0);

        cache.start_generation(key.clone());
        cache.fail_generation(
            key.clone(),
            &TranscodeError::TranscodeFailed("Test failure".to_string()),
        );
        cache.clear_failed(&key);

        let result = cache.start_generation(key.clone());
        assert!(matches!(result, HlsCacheStartResult::Started(_)));
    }

    #[test]
    fn test_disabled_cache() {
        let cache = HlsCache::new(0);
        let key = make_segment_key("/videos/test.mp4", TranscodeTarget::Resolution720p, 0);

        let result = cache.start_generation(key.clone());
        assert!(matches!(result, HlsCacheStartResult::CacheDisabled));
        assert!(cache.get_state(&key).is_none());
        // A disabled cache must not record anything at all.
        assert!(cache.is_empty());
    }

    #[test]
    fn test_different_resolutions_are_separate() {
        let cache = HlsCache::new(1024 * 1024);
        let key_720 = make_segment_key("/videos/test.mp4", TranscodeTarget::Resolution720p, 0);
        let key_480 = make_segment_key("/videos/test.mp4", TranscodeTarget::Resolution480p, 0);

        cache.start_generation(key_720.clone());
        cache.complete_generation(key_720.clone(), vec![0u8; 100]);

        let result = cache.start_generation(key_480.clone());
        assert!(matches!(result, HlsCacheStartResult::Started(_)));
    }

    #[test]
    fn test_different_segments_are_separate() {
        let cache = HlsCache::new(1024 * 1024);
        let key_0 = make_segment_key("/videos/test.mp4", TranscodeTarget::Resolution720p, 0);
        let key_1 = make_segment_key("/videos/test.mp4", TranscodeTarget::Resolution720p, 1);

        cache.start_generation(key_0.clone());
        cache.complete_generation(key_0.clone(), vec![0u8; 100]);

        let result = cache.start_generation(key_1.clone());
        assert!(matches!(result, HlsCacheStartResult::Started(_)));
    }

    #[test]
    fn test_playlist_and_segment_are_separate() {
        let cache = HlsCache::new(1024 * 1024);
        let playlist_key = make_playlist_key("/videos/test.mp4", TranscodeTarget::Resolution720p);
        let segment_key = make_segment_key("/videos/test.mp4", TranscodeTarget::Resolution720p, 0);

        cache.start_generation(playlist_key.clone());
        cache.complete_generation(playlist_key.clone(), b"#EXTM3U\n".to_vec());

        let result = cache.start_generation(segment_key.clone());
        assert!(matches!(result, HlsCacheStartResult::Started(_)));
    }

    #[test]
    fn test_size_tracking() {
        let cache = HlsCache::new(1024 * 1024);
        assert_eq!(cache.current_size(), 0);

        let key = make_segment_key("/videos/test.mp4", TranscodeTarget::Resolution720p, 0);
        cache.start_generation(key.clone());
        cache.complete_generation(key.clone(), vec![0u8; 500]);

        assert_eq!(cache.current_size(), 500);
        assert_eq!(cache.len(), 1);
    }

    #[test]
    fn test_eviction_prefers_segments_over_playlists() {
        let cache = HlsCache::new(500);
        let playlist_key = make_playlist_key("/videos/test.mp4", TranscodeTarget::Resolution720p);

        cache.start_generation(playlist_key.clone());
        cache.complete_generation(playlist_key.clone(), vec![0u8; 50]);

        for i in 0..10 {
            let key = make_segment_key("/videos/test.mp4", TranscodeTarget::Resolution720p, i);
            cache.start_generation(key.clone());
            cache.complete_generation(key, vec![0u8; 100]);
        }

        // The playlist is the oldest entry, yet it must outlive the segments.
        assert!(matches!(
            cache.get_state(&playlist_key),
            Some(HlsCacheState::Complete(_))
        ));
        assert!(cache.current_size() <= 500);
    }

    #[test]
    fn test_eviction_on_size_limit() {
        let cache = HlsCache::new(1000);

        for i in 0..10 {
            let key = make_segment_key(
                &format!("/videos/test{}.mp4", i),
                TranscodeTarget::Resolution720p,
                0,
            );
            cache.start_generation(key.clone());
            cache.complete_generation(key, vec![0u8; 200]);
        }

        // Eviction runs synchronously inside `complete_generation`, so once
        // every insert has returned the cache must be back within its budget:
        // at most five 200-byte entries fit in 1000 bytes.
        assert!(cache.current_size() <= 1000);
        assert!(cache.len() <= 5);
        assert!(!cache.is_empty());
    }
}