1pub mod chunk_cache;
2pub mod m3u8_cache;
3pub mod m3u8_manifest;
4
5use access_unit::Fmp4;
6use chunk_cache::ChunkCache;
7use m3u8_cache::M3u8Cache;
8use m3u8_manifest::M3u8Manifest;
9use std::collections::BTreeMap;
10use std::sync::atomic::{AtomicUsize, Ordering};
11use std::sync::{Arc, Mutex};
12use thiserror::Error;
13use tracing::info;
14
15#[derive(Error, Debug)]
16pub enum CacheError {
17 #[error("IO error: {0}")]
18 Io(#[from] std::io::Error),
19 #[error("Index out of bounds")]
20 IndexOutOfBounds,
21 #[error("Stream not found")]
22 StreamNotFound,
23 #[error("Buffer overflow")]
24 BufferOverflow,
25 #[error("Arithmetic overflow")]
26 ArithmeticOverflow,
27}
28
/// Sizing and capacity settings handed to [`Playlists::new`].
///
/// The same value is passed on to `ChunkCache::new`, `M3u8Cache::new` and
/// `M3u8Manifest::new`. The type is `Copy`, so every consumer gets its own copy.
#[derive(Debug, Clone, Copy)]
pub struct Options {
    /// Segment-count limit; presumably the number of segments retained per
    /// playlist (enforced outside this module — confirm there).
    pub max_segments: usize,
    /// Upper bound on simultaneously active playlists; `Playlists::add`
    /// refuses a new stream once this many are active.
    pub num_playlists: usize,
    /// Parts-per-segment limit; presumably caps how many parts one segment may
    /// hold (confirm against the manifest implementation).
    pub max_parts_per_segment: usize,
    /// Limit on "parted" segments; presumably how many segments keep their
    /// individual parts listed (confirm against the manifest implementation).
    pub max_parted_segments: usize,
    /// Minimum segment length in milliseconds, going by the name (confirm
    /// against the manifest implementation).
    pub segment_min_ms: u32,
    /// Buffer size, in kilobytes going by the name.
    pub buffer_size_kb: usize,
    /// Init-data size, in kilobytes going by the name.
    pub init_size_kb: usize,
}
39
40impl Default for Options {
41 fn default() -> Self {
42 Options {
43 max_segments: 32,
44 num_playlists: 5,
45 max_parts_per_segment: 128,
46 max_parted_segments: 32,
47 segment_min_ms: 1500,
48 buffer_size_kb: 800,
49 init_size_kb: 5,
50 }
51 }
52}
53
54pub struct Playlists {
55 pub chunk_cache: Arc<ChunkCache>,
56 m3u8_cache: Arc<M3u8Cache>,
57 playlists: Mutex<BTreeMap<u64, M3u8Manifest>>,
58 active: AtomicUsize,
59 options: Options,
60}
61
62impl Playlists {
63 pub fn new(options: Options) -> (Arc<Self>, Arc<ChunkCache>, Arc<M3u8Cache>) {
64 let chunk_cache = Arc::new(ChunkCache::new(options));
65 let m3u8_cache = Arc::new(M3u8Cache::new(options));
66
67 (
68 Arc::new(Self {
69 chunk_cache: Arc::clone(&chunk_cache),
70 m3u8_cache: Arc::clone(&m3u8_cache),
71 playlists: Mutex::new(BTreeMap::new()),
72 active: AtomicUsize::new(0),
73 options,
74 }),
75 Arc::clone(&chunk_cache),
76 Arc::clone(&m3u8_cache),
77 )
78 }
79
80 pub fn active(&self) -> usize {
81 self.active.load(Ordering::SeqCst)
82 }
83
84 pub fn fin(&self, id: u64) {
85 let removed = {
86 let mut playlists = self.playlists.lock().unwrap();
87 playlists.remove(&id).is_some()
88 };
89 if removed {
90 self.active.fetch_sub(1, Ordering::SeqCst);
91 }
92 self.m3u8_cache.zero_stream_id(id);
93 if let Ok(handle) = tokio::runtime::Handle::try_current() {
94 let chunk_cache = Arc::clone(&self.chunk_cache);
95 let _ = handle.spawn(async move {
96 chunk_cache.zero_stream_id(id).await;
97 });
98 } else if let Ok(rt) = tokio::runtime::Builder::new_current_thread()
99 .enable_all()
100 .build()
101 {
102 rt.block_on(self.chunk_cache.zero_stream_id(id));
103 }
104 }
105
106 pub fn add(&self, stream_id: u64, fmp4: Fmp4) -> bool {
107 let mut playlists = self.playlists.lock().unwrap();
108
109 if !playlists.contains_key(&stream_id) {
110 if self.active.load(Ordering::SeqCst) >= self.chunk_cache.options.num_playlists {
111 return false;
112 }
113
114 let n = self.active.fetch_add(1, Ordering::SeqCst);
115 info!("PLAY:NEW active={}", n + 1);
116 }
117
118 let (m3u8, seg, seq, idx, new_seg) = {
119 let playlist: &mut M3u8Manifest = playlists
120 .entry(stream_id)
121 .or_insert_with(|| M3u8Manifest::new(self.options));
122 playlist.add_part(fmp4.duration, fmp4.key)
123 };
124
125 if new_seg {
126 info!("PLAY:UP active={}", self.active());
127 }
128
129 if let Some(init) = fmp4.init {
130 let _ = self.m3u8_cache.set_init(stream_id, init);
131 }
132 let _ = self.m3u8_cache.add(stream_id, seg, seq, idx, m3u8);
134
135 true
136 }
137}
138
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use tokio::time::{timeout, Duration};

    /// `fin` must drop the active count back to zero and the chunk cache must
    /// forget the stream shortly afterwards.
    #[tokio::test]
    async fn test_fin_clears_chunk_cache_entry() {
        const STREAM_ID: u64 = 101;

        let (playlists, chunk_cache, _m3u8_cache) = Playlists::new(Options::default());

        // Register the stream with a single key-frame part.
        let part = Fmp4 {
            init: None,
            key: true,
            data: Bytes::from_static(b"test"),
            duration: 500,
        };
        assert!(playlists.add(STREAM_ID, part));
        assert_eq!(playlists.active(), 1);

        // Make the chunk cache aware of the stream as well.
        let _ = chunk_cache.add_stream_id(STREAM_ID).await;
        assert!(chunk_cache.get_stream_idx(STREAM_ID).await.is_some());

        playlists.fin(STREAM_ID);
        assert_eq!(playlists.active(), 0);

        // Inside a runtime `fin` only spawns the chunk-cache cleanup, so poll
        // (yielding to let the spawned task run) until the entry is gone.
        let wait_until_cleared = async {
            while chunk_cache.get_stream_idx(STREAM_ID).await.is_some() {
                tokio::task::yield_now().await;
            }
        };
        let cleared = timeout(Duration::from_millis(200), wait_until_cleared).await;

        assert!(cleared.is_ok());
    }
}