Skip to main content

video_sys/
core.rs

1use std::collections::{BTreeMap, VecDeque};
2use std::path::{Path, PathBuf};
3use std::sync::{
4    atomic::{AtomicBool, Ordering},
5    Arc,
6};
7use std::thread;
8use std::time::Duration;
9
10use anyhow::{Context, Result};
11use crossbeam_channel::{Receiver, Sender, TrySendError};
12
13use crate::backend::{create_default_h264_decoder, H264Decoder};
14use crate::mp4::{EncodedSample, Mp4H264Source};
15
/// Byte layout of a single pixel inside a decoded frame buffer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PixelFormat {
    /// Four 8-bit channels stored as R, G, B, A (memory order).
    Rgba8,
    /// Four 8-bit channels stored as B, G, R, A (memory order).
    Bgra8,
}
23
24/// Frame payload bytes.
25///
26/// Some backends (notably macOS VideoToolbox) can allocate multi-megabyte
27/// buffers per frame. Repeated allocations can cause RSS growth due to allocator
28/// behavior. `FrameData` optionally carries a pool handle so the buffer can be
29/// recycled on drop, keeping memory stable.
30#[derive(Debug)]
31pub struct FrameData {
32    buf: Vec<u8>,
33    pool: Option<Arc<parking_lot::Mutex<Vec<Vec<u8>>>>>,
34    pool_cap: usize,
35}
36
37impl FrameData {
38    pub fn new(buf: Vec<u8>) -> Self {
39        Self {
40            buf,
41            pool: None,
42            pool_cap: 0,
43        }
44    }
45
46    pub fn with_pool(
47        buf: Vec<u8>,
48        pool: Arc<parking_lot::Mutex<Vec<Vec<u8>>>>,
49        pool_cap: usize,
50    ) -> Self {
51        Self {
52            buf,
53            pool: Some(pool),
54            pool_cap,
55        }
56    }
57
58    pub fn as_slice(&self) -> &[u8] {
59        &self.buf
60    }
61
62    pub fn as_mut_slice(&mut self) -> &mut [u8] {
63        &mut self.buf
64    }
65
66    /// Detach from the pool and return the owned `Vec<u8>`.
67    pub fn into_vec(mut self) -> Vec<u8> {
68        self.pool = None;
69        std::mem::take(&mut self.buf)
70    }
71}
72
73impl std::ops::Deref for FrameData {
74    type Target = [u8];
75    fn deref(&self) -> &Self::Target {
76        self.as_slice()
77    }
78}
79
80impl std::ops::DerefMut for FrameData {
81    fn deref_mut(&mut self) -> &mut Self::Target {
82        self.as_mut_slice()
83    }
84}
85
86impl Drop for FrameData {
87    fn drop(&mut self) {
88        let Some(pool) = self.pool.take() else { return; };
89        if self.pool_cap == 0 {
90            return;
91        }
92        // Return buffer to pool if there's room.
93        let mut g = pool.lock();
94        if g.len() < self.pool_cap {
95            let mut v = Vec::new();
96            std::mem::swap(&mut v, &mut self.buf);
97            g.push(v);
98        }
99    }
100}
101
102#[derive(Debug)]
103pub struct VideoFrame {
104    pub width: u32,
105    pub height: u32,
106    pub pts_us: i64,
107    pub format: PixelFormat,
108    /// Tight-packed pixel buffer.
109    ///
110    /// For `Rgba8` or `Bgra8`, length is `width * height * 4`.
111    pub data: FrameData,
112}
113
114pub struct VideoCore {
115    path: PathBuf,
116    src: Mp4H264Source,
117    dec: Box<dyn H264Decoder>,
118
119    pending: Option<EncodedSample>,
120    stash: VecDeque<VideoFrame>,
121
122    eof: bool,
123    flushed: bool,
124}
125
126impl VideoCore {
127    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
128        let path = path.as_ref().to_path_buf();
129        let src = Mp4H264Source::open(&path).context("open mp4 source")?;
130        let dec = create_default_h264_decoder(&src.config).context("create decoder backend")?;
131
132        Ok(Self {
133            path,
134            src,
135            dec,
136            pending: None,
137            stash: VecDeque::new(),
138            eof: false,
139            flushed: false,
140        })
141    }
142
143    pub fn width(&self) -> u32 {
144        self.src.config.width
145    }
146
147    pub fn height(&self) -> u32 {
148        self.src.config.height
149    }
150
151    pub fn is_eof(&self) -> bool {
152        self.eof
153    }
154
155    pub fn reset(&mut self) -> Result<()> {
156        let src = Mp4H264Source::open(&self.path)?;
157        let dec = create_default_h264_decoder(&src.config)?;
158        self.src = src;
159        self.dec = dec;
160
161        self.pending = None;
162        self.stash.clear();
163        self.eof = false;
164        self.flushed = false;
165        Ok(())
166    }
167
168    /// Feed some compressed samples into the decoder and drain all available decoded frames.
169    ///
170    /// This is a pure "producer" pump: it does NOT follow wall-clock or playhead.
171    /// The renderer/player should decide what to present based on PTS.
172    pub fn pump(&mut self) -> Result<()> {
173        // Feed a bounded number of samples per pump to avoid monopolizing CPU.
174        const FEED_BUDGET: usize = 4;
175
176        if !self.eof {
177            for _ in 0..FEED_BUDGET {
178                match self.next_sample_cached()? {
179                    Some(s) => {
180                        self.dec.push(s)?;
181                    }
182                    None => {
183                        self.eof = true;
184                        if !self.flushed {
185                            self.dec.flush()?;
186                            self.flushed = true;
187                        }
188                        break;
189                    }
190                }
191            }
192        }
193
194        // Drain decoder outputs into stash (do NOT drop anything here).
195        while let Some(f) = self.dec.try_receive()? {
196            self.stash.push_back(VideoFrame {
197                width: f.width,
198                height: f.height,
199                pts_us: f.pts_us,
200                format: f.format,
201                data: f.data,
202            });
203        }
204
205        Ok(())
206    }
207
208    /// Pop the next decoded frame in presentation order (if any).
209    pub fn pop_decoded(&mut self) -> Option<VideoFrame> {
210        self.stash.pop_front()
211    }
212
213    /// Finished means: we reached EOF, flushed, and no pending input/output remains.
214    pub fn is_finished(&self) -> bool {
215        self.eof && self.flushed && self.pending.is_none() && self.stash.is_empty()
216    }
217
218    fn next_sample_cached(&mut self) -> Result<Option<EncodedSample>> {
219        if let Some(s) = self.pending.take() {
220            return Ok(Some(s));
221        }
222        self.src.next_sample()
223    }
224}
225
226#[derive(Debug)]
227pub struct VideoStream {
228    width: u32,
229    height: u32,
230    rx: Receiver<VideoFrame>,
231    stop: Arc<AtomicBool>,
232    finished: Arc<AtomicBool>,
233    join: Option<std::thread::JoinHandle<()>>,
234}
235
236impl VideoStream {
237    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
238        Self::open_with_options(path, VideoStreamOptions::default())
239    }
240
241    pub fn open_with_options(path: impl AsRef<Path>, opt: VideoStreamOptions) -> Result<Self> {
242        let path = path.as_ref().to_path_buf();
243
244        let src = Mp4H264Source::open(&path).context("open mp4 source for config")?;
245        let width = src.config.width;
246        let height = src.config.height;
247        drop(src);
248
249        let (tx, rx) = crossbeam_channel::bounded::<VideoFrame>(opt.channel_depth.max(1));
250        let stop = Arc::new(AtomicBool::new(false));
251        let finished = Arc::new(AtomicBool::new(false));
252
253        let stop_t = stop.clone();
254        let finished_t = finished.clone();
255        let path_t = path.clone();
256
257        // Helper functions for PTS reordering.
258        // NOTE: these are plain functions (not closures) to avoid borrowing `reorder`
259        // mutably from multiple closures at the same time.
260        fn push_reorder(
261            reorder: &mut BTreeMap<i64, VecDeque<VideoFrame>>,
262            reorder_len: &mut usize,
263            opt: &VideoStreamOptions,
264            f: VideoFrame,
265        ) {
266            reorder.entry(f.pts_us).or_default().push_back(f);
267            *reorder_len += 1;
268
269            // Hard cap: if something goes wrong (consumer stalls, timestamps weird),
270            // never allow unbounded growth.
271            while *reorder_len > opt.reorder_max_frames {
272                let k = match reorder.keys().next().copied() {
273                    Some(k) => k,
274                    None => break,
275                };
276
277                let mut remove_key = false;
278                let dropped = {
279                    let q = match reorder.get_mut(&k) {
280                        Some(q) => q,
281                        None => break,
282                    };
283                    let dropped = q.pop_front().is_some();
284                    if q.is_empty() {
285                        remove_key = true;
286                    }
287                    dropped
288                };
289
290                if remove_key {
291                    reorder.remove(&k);
292                }
293
294                if dropped {
295                    *reorder_len = (*reorder_len).saturating_sub(1);
296                } else {
297                    // If we couldn't drop anything, stop to avoid spinning.
298                    break;
299                }
300            }
301        }
302
303        fn pop_next_ready(
304            reorder: &mut BTreeMap<i64, VecDeque<VideoFrame>>,
305            reorder_len: &mut usize,
306        ) -> Option<VideoFrame> {
307            let k = reorder.keys().next().copied()?;
308
309            let mut remove_key = false;
310            let f = {
311                let q = reorder.get_mut(&k)?;
312                let f = q.pop_front();
313                if q.is_empty() {
314                    remove_key = true;
315                }
316                f
317            };
318
319            if remove_key {
320                reorder.remove(&k);
321            }
322            if f.is_some() {
323                *reorder_len = (*reorder_len).saturating_sub(1);
324            }
325            f
326        }
327
328        let join = thread::spawn(move || {
329            let mut core = match VideoCore::open(&path_t) {
330                Ok(c) => c,
331                Err(e) => {
332                    log::error!("VideoCore::open failed in decode thread: {e:?}");
333                    finished_t.store(true, Ordering::Relaxed);
334                    return;
335                }
336            };
337
338            // PTS reordering buffer: some decoders (depending on backend and flags)
339            // can output frames in decode order. For H.264 with B-frames, PTS is
340            // not monotonic in decode order. We reorder by PTS with a small window.
341            let mut reorder: BTreeMap<i64, VecDeque<VideoFrame>> = BTreeMap::new();
342            let mut reorder_len: usize = 0;
343            let mut started_at: Option<std::time::Instant> = None;
344            let mut base_pts_us: i64 = 0;
345
346            let mut pace_next = |pts_us: i64| {
347                if !opt.paced {
348                    return;
349                }
350                let now = std::time::Instant::now();
351                if started_at.is_none() {
352                    started_at = Some(now);
353                    base_pts_us = pts_us;
354                    return;
355                }
356                let delta_us = pts_us.saturating_sub(base_pts_us).max(0) as u64;
357                let target = started_at.unwrap() + Duration::from_micros(delta_us);
358                if target > now {
359                    // Sleep in small slices so we can still observe stop signals.
360                    let mut remaining = target.duration_since(now);
361                    while remaining > Duration::from_millis(5) {
362                        thread::sleep(Duration::from_millis(5));
363                        if stop_t.load(Ordering::Relaxed) {
364                            return;
365                        }
366                        remaining = target.saturating_duration_since(std::time::Instant::now());
367                    }
368                    if remaining > Duration::from_micros(0) {
369                        thread::sleep(remaining);
370                    }
371                }
372            };
373
374            // `push_reorder`/`pop_next_ready` are plain fns (defined above).
375
376            loop {
377                if stop_t.load(Ordering::Relaxed) {
378                    break;
379                }
380
381                // 1) Ensure we have a small amount of decoded frames buffered.
382                let want = opt.ahead_frames.max(opt.reorder_depth_frames);
383                while reorder_len < want {
384                    if let Err(e) = core.pump() {
385                        log::error!("video decode thread pump error: {e:?}");
386                        finished_t.store(true, Ordering::Relaxed);
387                        return;
388                    }
389                    let mut produced_any = false;
390                    while let Some(frame) = core.pop_decoded() {
391                        produced_any = true;
392                        push_reorder(&mut reorder, &mut reorder_len, &opt, frame);
393                    }
394                    if !produced_any {
395                        break;
396                    }
397                }
398
399                // 2) Ship the next frame (paced) or idle.
400                // We wait until we have enough frames to safely reorder, unless at EOF.
401                let have_ready = reorder_len >= opt.reorder_depth_frames || core.is_finished();
402                if have_ready {
403                    if let Some(mut frame) = pop_next_ready(&mut reorder, &mut reorder_len) {
404                        pace_next(frame.pts_us);
405
406                        if stop_t.load(Ordering::Relaxed) {
407                            break;
408                        }
409
410                        // Try to send; if consumer is behind, drop frames (real-time).
411                        match tx.try_send(frame) {
412                            Ok(()) => {}
413                            Err(TrySendError::Full(f)) => {
414                                // Drop this frame to avoid unbounded latency.
415                                drop(f);
416                                thread::yield_now();
417                            }
418                            Err(TrySendError::Disconnected(_)) => {
419                                finished_t.store(true, Ordering::Relaxed);
420                                return;
421                            }
422                        }
423                    } else {
424                        thread::sleep(Duration::from_millis(1));
425                    }
426                } else {
427                    // No decoded frames available yet.
428                    thread::sleep(Duration::from_millis(1));
429                }
430
431                if core.is_finished() && reorder_len == 0 {
432                    finished_t.store(true, Ordering::Relaxed);
433                    break;
434                }
435            }
436        });
437
438        Ok(Self {
439            width,
440            height,
441            rx,
442            stop,
443            finished,
444            join: Some(join),
445        })
446    }
447
448    pub fn width(&self) -> u32 {
449        self.width
450    }
451
452    pub fn height(&self) -> u32 {
453        self.height
454    }
455
456    pub fn is_finished(&self) -> bool {
457        self.finished.load(Ordering::Relaxed)
458    }
459
460    pub fn try_recv_one(&self) -> Option<VideoFrame> {
461        match self.rx.try_recv() {
462            Ok(f) => Some(f),
463            Err(_) => None,
464        }
465    }
466
467    pub fn stop(&self) {
468        self.stop.store(true, Ordering::Relaxed);
469    }
470}
471
/// Tuning knobs for the decode thread behind a `VideoStream`.
#[derive(Debug, Clone, Copy)]
pub struct VideoStreamOptions {
    /// When true, frames are released according to their PTS relative to the
    /// first frame; when false, they are sent as soon as they are ready.
    pub paced: bool,
    /// Capacity of the channel between the decode thread and the consumer.
    pub channel_depth: usize,
    /// Number of decoded frames the thread tries to keep buffered ahead.
    pub ahead_frames: usize,

    /// Reorder window (frames). If output PTS is already monotonic, this is harmless.
    pub reorder_depth_frames: usize,

    /// Hard cap to prevent unbounded memory growth due to timestamp/pathological cases.
    pub reorder_max_frames: usize,
}
487
488impl Default for VideoStreamOptions {
489    fn default() -> Self {
490        Self {
491            paced: true,
492            channel_depth: 8,
493            ahead_frames: 6,
494            reorder_depth_frames: 16,
495            reorder_max_frames: 64,
496        }
497    }
498}
499
500impl Drop for VideoStream {
501    fn drop(&mut self) {
502        self.stop.store(true, Ordering::Relaxed);
503        if let Some(j) = self.join.take() {
504            let _ = j.join();
505        }
506    }
507}