playlists/
lib.rs

1pub mod chunk_cache;
2pub mod m3u8_cache;
3pub mod m3u8_manifest;
4
5use access_unit::Fmp4;
6use chunk_cache::ChunkCache;
7use m3u8_cache::M3u8Cache;
8use m3u8_manifest::M3u8Manifest;
9use std::collections::BTreeMap;
10use std::sync::atomic::{AtomicUsize, Ordering};
11use std::sync::{Arc, Mutex};
12use thiserror::Error;
13use tracing::info;
14
15#[derive(Error, Debug)]
16pub enum CacheError {
17    #[error("IO error: {0}")]
18    Io(#[from] std::io::Error),
19    #[error("Index out of bounds")]
20    IndexOutOfBounds,
21    #[error("Stream not found")]
22    StreamNotFound,
23    #[error("Buffer overflow")]
24    BufferOverflow,
25    #[error("Arithmetic overflow")]
26    ArithmeticOverflow,
27}
28
/// Configuration shared by the playlist bookkeeping and its caches.
///
/// The same value is handed to `ChunkCache::new`, `M3u8Cache::new` and
/// `M3u8Manifest::new`. Within this file only `num_playlists` is interpreted
/// (as the cap on concurrently tracked streams); the meaning of the remaining
/// fields is defined by those modules.
#[derive(Clone, Copy, Debug)]
pub struct Options {
    /// NOTE(review): presumably the number of segments retained per playlist — confirm.
    pub max_segments: usize,
    /// Upper bound on simultaneously active streams enforced by `Playlists::add`.
    pub num_playlists: usize,
    /// NOTE(review): presumably the cap on parts within one segment — confirm.
    pub max_parts_per_segment: usize,
    /// NOTE(review): presumably how many segments keep their part listing — confirm.
    pub max_parted_segments: usize,
    /// NOTE(review): presumably the minimum segment duration in milliseconds — confirm.
    pub segment_min_ms: u32,
    /// NOTE(review): presumably a buffer size in kilobytes — confirm against the cache modules.
    pub buffer_size_kb: usize,
    /// NOTE(review): presumably the init-segment buffer size in kilobytes — confirm.
    pub init_size_kb: usize,
}
39
40impl Default for Options {
41    fn default() -> Self {
42        Options {
43            max_segments: 32,
44            num_playlists: 5,
45            max_parts_per_segment: 128,
46            max_parted_segments: 32,
47            segment_min_ms: 1500,
48            buffer_size_kb: 800,
49            init_size_kb: 5,
50        }
51    }
52}
53
54pub struct Playlists {
55    pub chunk_cache: Arc<ChunkCache>,
56    m3u8_cache: Arc<M3u8Cache>,
57    playlists: Mutex<BTreeMap<u64, M3u8Manifest>>,
58    active: AtomicUsize,
59    options: Options,
60}
61
62impl Playlists {
63    pub fn new(options: Options) -> (Arc<Self>, Arc<ChunkCache>, Arc<M3u8Cache>) {
64        let chunk_cache = Arc::new(ChunkCache::new(options));
65        let m3u8_cache = Arc::new(M3u8Cache::new(options));
66
67        (
68            Arc::new(Self {
69                chunk_cache: Arc::clone(&chunk_cache),
70                m3u8_cache: Arc::clone(&m3u8_cache),
71                playlists: Mutex::new(BTreeMap::new()),
72                active: AtomicUsize::new(0),
73                options,
74            }),
75            Arc::clone(&chunk_cache),
76            Arc::clone(&m3u8_cache),
77        )
78    }
79
80    pub fn active(&self) -> usize {
81        self.active.load(Ordering::SeqCst)
82    }
83
84    pub fn fin(&self, id: u64) {
85        let removed = {
86            let mut playlists = self.playlists.lock().unwrap();
87            playlists.remove(&id).is_some()
88        };
89        if removed {
90            self.active.fetch_sub(1, Ordering::SeqCst);
91        }
92        self.m3u8_cache.zero_stream_id(id);
93        if let Ok(handle) = tokio::runtime::Handle::try_current() {
94            let chunk_cache = Arc::clone(&self.chunk_cache);
95            let _ = handle.spawn(async move {
96                chunk_cache.zero_stream_id(id).await;
97            });
98        } else if let Ok(rt) = tokio::runtime::Builder::new_current_thread()
99            .enable_all()
100            .build()
101        {
102            rt.block_on(self.chunk_cache.zero_stream_id(id));
103        }
104    }
105
106    pub fn add(&self, stream_id: u64, fmp4: Fmp4) -> bool {
107        let mut playlists = self.playlists.lock().unwrap();
108
109        if !playlists.contains_key(&stream_id) {
110            if self.active.load(Ordering::SeqCst) >= self.chunk_cache.options.num_playlists {
111                return false;
112            }
113
114            let n = self.active.fetch_add(1, Ordering::SeqCst);
115            info!("PLAY:NEW active={}", n + 1);
116        }
117
118        let (m3u8, seg, seq, idx, new_seg) = {
119            let playlist: &mut M3u8Manifest = playlists
120                .entry(stream_id)
121                .or_insert_with(|| M3u8Manifest::new(self.options));
122            playlist.add_part(fmp4.duration, fmp4.key)
123        };
124
125        if new_seg {
126            info!("PLAY:UP active={}", self.active());
127        }
128
129        if let Some(init) = fmp4.init {
130            let _ = self.m3u8_cache.set_init(stream_id, init);
131        }
132        //self.fmp4_cache.add(stream_id, seq as usize, fmp4.data);
133        let _ = self.m3u8_cache.add(stream_id, seg, seq, idx, m3u8);
134
135        true
136    }
137}
138
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use tokio::time::{timeout, Duration};

    /// After `fin`, the active count drops to zero and the chunk cache entry
    /// for the stream disappears within the polling deadline.
    #[tokio::test]
    async fn test_fin_clears_chunk_cache_entry() {
        let stream_id: u64 = 101;
        let (playlists, chunk_cache, _m3u8_cache) = Playlists::new(Options::default());

        // Register the stream with a single key fragment.
        let accepted = playlists.add(
            stream_id,
            Fmp4 {
                init: None,
                key: true,
                data: Bytes::from_static(b"test"),
                duration: 500,
            },
        );
        assert!(accepted);
        assert_eq!(playlists.active(), 1);

        // Give the chunk cache an entry for the stream so there is something to clear.
        let _ = chunk_cache.add_stream_id(stream_id).await;
        assert!(chunk_cache.get_stream_idx(stream_id).await.is_some());

        playlists.fin(stream_id);
        assert_eq!(playlists.active(), 0);

        // `fin` clears the chunk cache on a spawned task, so poll until the
        // entry is gone, yielding between checks to let that task run.
        let wait_for_clear = async {
            while chunk_cache.get_stream_idx(stream_id).await.is_some() {
                tokio::task::yield_now().await;
            }
        };
        let outcome = timeout(Duration::from_millis(200), wait_for_clear).await;

        assert!(outcome.is_ok());
    }
}