1use std::collections::{BTreeMap, VecDeque};
2use std::path::{Path, PathBuf};
3use std::sync::{
4 atomic::{AtomicBool, Ordering},
5 Arc,
6};
7use std::thread;
8use std::time::Duration;
9
10use anyhow::{Context, Result};
11use crossbeam_channel::{Receiver, Sender, TrySendError};
12
13use crate::backend::{create_default_h264_decoder, H264Decoder};
14use crate::mp4::{EncodedSample, Mp4H264Source};
15
/// Pixel layout tag attached to every [`VideoFrame`].
///
/// NOTE(review): the variant names suggest 8 bits per channel in the named
/// channel order; confirm against the decoder backend that produces the data.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PixelFormat {
    /// Channels ordered R, G, B, A (presumed from the name).
    Rgba8,
    /// Channels ordered B, G, R, A (presumed from the name).
    Bgra8,
}
23
24#[derive(Debug)]
31pub struct FrameData {
32 buf: Vec<u8>,
33 pool: Option<Arc<parking_lot::Mutex<Vec<Vec<u8>>>>>,
34 pool_cap: usize,
35}
36
37impl FrameData {
38 pub fn new(buf: Vec<u8>) -> Self {
39 Self {
40 buf,
41 pool: None,
42 pool_cap: 0,
43 }
44 }
45
46 pub fn with_pool(
47 buf: Vec<u8>,
48 pool: Arc<parking_lot::Mutex<Vec<Vec<u8>>>>,
49 pool_cap: usize,
50 ) -> Self {
51 Self {
52 buf,
53 pool: Some(pool),
54 pool_cap,
55 }
56 }
57
58 pub fn as_slice(&self) -> &[u8] {
59 &self.buf
60 }
61
62 pub fn as_mut_slice(&mut self) -> &mut [u8] {
63 &mut self.buf
64 }
65
66 pub fn into_vec(mut self) -> Vec<u8> {
68 self.pool = None;
69 std::mem::take(&mut self.buf)
70 }
71}
72
73impl std::ops::Deref for FrameData {
74 type Target = [u8];
75 fn deref(&self) -> &Self::Target {
76 self.as_slice()
77 }
78}
79
80impl std::ops::DerefMut for FrameData {
81 fn deref_mut(&mut self) -> &mut Self::Target {
82 self.as_mut_slice()
83 }
84}
85
86impl Drop for FrameData {
87 fn drop(&mut self) {
88 let Some(pool) = self.pool.take() else { return; };
89 if self.pool_cap == 0 {
90 return;
91 }
92 let mut g = pool.lock();
94 if g.len() < self.pool_cap {
95 let mut v = Vec::new();
96 std::mem::swap(&mut v, &mut self.buf);
97 g.push(v);
98 }
99 }
100}
101
102#[derive(Debug)]
103pub struct VideoFrame {
104 pub width: u32,
105 pub height: u32,
106 pub pts_us: i64,
107 pub format: PixelFormat,
108 pub data: FrameData,
112}
113
114pub struct VideoCore {
115 path: PathBuf,
116 src: Mp4H264Source,
117 dec: Box<dyn H264Decoder>,
118
119 pending: Option<EncodedSample>,
120 stash: VecDeque<VideoFrame>,
121
122 eof: bool,
123 flushed: bool,
124}
125
126impl VideoCore {
127 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
128 let path = path.as_ref().to_path_buf();
129 let src = Mp4H264Source::open(&path).context("open mp4 source")?;
130 let dec = create_default_h264_decoder(&src.config).context("create decoder backend")?;
131
132 Ok(Self {
133 path,
134 src,
135 dec,
136 pending: None,
137 stash: VecDeque::new(),
138 eof: false,
139 flushed: false,
140 })
141 }
142
143 pub fn width(&self) -> u32 {
144 self.src.config.width
145 }
146
147 pub fn height(&self) -> u32 {
148 self.src.config.height
149 }
150
151 pub fn is_eof(&self) -> bool {
152 self.eof
153 }
154
155 pub fn reset(&mut self) -> Result<()> {
156 let src = Mp4H264Source::open(&self.path)?;
157 let dec = create_default_h264_decoder(&src.config)?;
158 self.src = src;
159 self.dec = dec;
160
161 self.pending = None;
162 self.stash.clear();
163 self.eof = false;
164 self.flushed = false;
165 Ok(())
166 }
167
168 pub fn pump(&mut self) -> Result<()> {
173 const FEED_BUDGET: usize = 4;
175
176 if !self.eof {
177 for _ in 0..FEED_BUDGET {
178 match self.next_sample_cached()? {
179 Some(s) => {
180 self.dec.push(s)?;
181 }
182 None => {
183 self.eof = true;
184 if !self.flushed {
185 self.dec.flush()?;
186 self.flushed = true;
187 }
188 break;
189 }
190 }
191 }
192 }
193
194 while let Some(f) = self.dec.try_receive()? {
196 self.stash.push_back(VideoFrame {
197 width: f.width,
198 height: f.height,
199 pts_us: f.pts_us,
200 format: f.format,
201 data: f.data,
202 });
203 }
204
205 Ok(())
206 }
207
208 pub fn pop_decoded(&mut self) -> Option<VideoFrame> {
210 self.stash.pop_front()
211 }
212
213 pub fn is_finished(&self) -> bool {
215 self.eof && self.flushed && self.pending.is_none() && self.stash.is_empty()
216 }
217
218 fn next_sample_cached(&mut self) -> Result<Option<EncodedSample>> {
219 if let Some(s) = self.pending.take() {
220 return Ok(Some(s));
221 }
222 self.src.next_sample()
223 }
224}
225
226#[derive(Debug)]
227pub struct VideoStream {
228 width: u32,
229 height: u32,
230 rx: Receiver<VideoFrame>,
231 stop: Arc<AtomicBool>,
232 finished: Arc<AtomicBool>,
233 join: Option<std::thread::JoinHandle<()>>,
234}
235
236impl VideoStream {
237 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
238 Self::open_with_options(path, VideoStreamOptions::default())
239 }
240
241 pub fn open_with_options(path: impl AsRef<Path>, opt: VideoStreamOptions) -> Result<Self> {
242 let path = path.as_ref().to_path_buf();
243
244 let src = Mp4H264Source::open(&path).context("open mp4 source for config")?;
245 let width = src.config.width;
246 let height = src.config.height;
247 drop(src);
248
249 let (tx, rx) = crossbeam_channel::bounded::<VideoFrame>(opt.channel_depth.max(1));
250 let stop = Arc::new(AtomicBool::new(false));
251 let finished = Arc::new(AtomicBool::new(false));
252
253 let stop_t = stop.clone();
254 let finished_t = finished.clone();
255 let path_t = path.clone();
256
257 fn push_reorder(
261 reorder: &mut BTreeMap<i64, VecDeque<VideoFrame>>,
262 reorder_len: &mut usize,
263 opt: &VideoStreamOptions,
264 f: VideoFrame,
265 ) {
266 reorder.entry(f.pts_us).or_default().push_back(f);
267 *reorder_len += 1;
268
269 while *reorder_len > opt.reorder_max_frames {
272 let k = match reorder.keys().next().copied() {
273 Some(k) => k,
274 None => break,
275 };
276
277 let mut remove_key = false;
278 let dropped = {
279 let q = match reorder.get_mut(&k) {
280 Some(q) => q,
281 None => break,
282 };
283 let dropped = q.pop_front().is_some();
284 if q.is_empty() {
285 remove_key = true;
286 }
287 dropped
288 };
289
290 if remove_key {
291 reorder.remove(&k);
292 }
293
294 if dropped {
295 *reorder_len = (*reorder_len).saturating_sub(1);
296 } else {
297 break;
299 }
300 }
301 }
302
303 fn pop_next_ready(
304 reorder: &mut BTreeMap<i64, VecDeque<VideoFrame>>,
305 reorder_len: &mut usize,
306 ) -> Option<VideoFrame> {
307 let k = reorder.keys().next().copied()?;
308
309 let mut remove_key = false;
310 let f = {
311 let q = reorder.get_mut(&k)?;
312 let f = q.pop_front();
313 if q.is_empty() {
314 remove_key = true;
315 }
316 f
317 };
318
319 if remove_key {
320 reorder.remove(&k);
321 }
322 if f.is_some() {
323 *reorder_len = (*reorder_len).saturating_sub(1);
324 }
325 f
326 }
327
328 let join = thread::spawn(move || {
329 let mut core = match VideoCore::open(&path_t) {
330 Ok(c) => c,
331 Err(e) => {
332 log::error!("VideoCore::open failed in decode thread: {e:?}");
333 finished_t.store(true, Ordering::Relaxed);
334 return;
335 }
336 };
337
338 let mut reorder: BTreeMap<i64, VecDeque<VideoFrame>> = BTreeMap::new();
342 let mut reorder_len: usize = 0;
343 let mut started_at: Option<std::time::Instant> = None;
344 let mut base_pts_us: i64 = 0;
345
346 let mut pace_next = |pts_us: i64| {
347 if !opt.paced {
348 return;
349 }
350 let now = std::time::Instant::now();
351 if started_at.is_none() {
352 started_at = Some(now);
353 base_pts_us = pts_us;
354 return;
355 }
356 let delta_us = pts_us.saturating_sub(base_pts_us).max(0) as u64;
357 let target = started_at.unwrap() + Duration::from_micros(delta_us);
358 if target > now {
359 let mut remaining = target.duration_since(now);
361 while remaining > Duration::from_millis(5) {
362 thread::sleep(Duration::from_millis(5));
363 if stop_t.load(Ordering::Relaxed) {
364 return;
365 }
366 remaining = target.saturating_duration_since(std::time::Instant::now());
367 }
368 if remaining > Duration::from_micros(0) {
369 thread::sleep(remaining);
370 }
371 }
372 };
373
374 loop {
377 if stop_t.load(Ordering::Relaxed) {
378 break;
379 }
380
381 let want = opt.ahead_frames.max(opt.reorder_depth_frames);
383 while reorder_len < want {
384 if let Err(e) = core.pump() {
385 log::error!("video decode thread pump error: {e:?}");
386 finished_t.store(true, Ordering::Relaxed);
387 return;
388 }
389 let mut produced_any = false;
390 while let Some(frame) = core.pop_decoded() {
391 produced_any = true;
392 push_reorder(&mut reorder, &mut reorder_len, &opt, frame);
393 }
394 if !produced_any {
395 break;
396 }
397 }
398
399 let have_ready = reorder_len >= opt.reorder_depth_frames || core.is_finished();
402 if have_ready {
403 if let Some(mut frame) = pop_next_ready(&mut reorder, &mut reorder_len) {
404 pace_next(frame.pts_us);
405
406 if stop_t.load(Ordering::Relaxed) {
407 break;
408 }
409
410 match tx.try_send(frame) {
412 Ok(()) => {}
413 Err(TrySendError::Full(f)) => {
414 drop(f);
416 thread::yield_now();
417 }
418 Err(TrySendError::Disconnected(_)) => {
419 finished_t.store(true, Ordering::Relaxed);
420 return;
421 }
422 }
423 } else {
424 thread::sleep(Duration::from_millis(1));
425 }
426 } else {
427 thread::sleep(Duration::from_millis(1));
429 }
430
431 if core.is_finished() && reorder_len == 0 {
432 finished_t.store(true, Ordering::Relaxed);
433 break;
434 }
435 }
436 });
437
438 Ok(Self {
439 width,
440 height,
441 rx,
442 stop,
443 finished,
444 join: Some(join),
445 })
446 }
447
448 pub fn width(&self) -> u32 {
449 self.width
450 }
451
452 pub fn height(&self) -> u32 {
453 self.height
454 }
455
456 pub fn is_finished(&self) -> bool {
457 self.finished.load(Ordering::Relaxed)
458 }
459
460 pub fn try_recv_one(&self) -> Option<VideoFrame> {
461 match self.rx.try_recv() {
462 Ok(f) => Some(f),
463 Err(_) => None,
464 }
465 }
466
467 pub fn stop(&self) {
468 self.stop.store(true, Ordering::Relaxed);
469 }
470}
471
/// Tuning knobs for the decode thread started by `VideoStream`.
#[derive(Debug, Clone, Copy)]
pub struct VideoStreamOptions {
    /// When `true`, the decode thread sleeps before sending each frame so that
    /// frames go out at the rate implied by their timestamps, measured from
    /// the first frame sent.
    pub paced: bool,
    /// Capacity of the bounded frame channel (a value of `0` is treated as `1`).
    pub channel_depth: usize,
    /// Together with `reorder_depth_frames` (the larger of the two is used),
    /// how many decoded frames the thread tries to keep buffered.
    pub ahead_frames: usize,

    /// Frames are emitted only once at least this many are buffered, or the
    /// decoder has finished.
    pub reorder_depth_frames: usize,

    /// Upper bound on buffered frames; beyond it the frames with the smallest
    /// timestamps are discarded.
    pub reorder_max_frames: usize,
}

impl Default for VideoStreamOptions {
    /// Paced playback with an 8-frame channel, 6 frames of look-ahead, a
    /// 16-frame reorder window and a 64-frame buffer cap.
    fn default() -> Self {
        VideoStreamOptions {
            paced: true,
            channel_depth: 8,
            ahead_frames: 6,
            reorder_depth_frames: 16,
            reorder_max_frames: 64,
        }
    }
}
499
500impl Drop for VideoStream {
501 fn drop(&mut self) {
502 self.stop.store(true, Ordering::Relaxed);
503 if let Some(j) = self.join.take() {
504 let _ = j.join();
505 }
506 }
507}