ff_preview/playback/decode_buffer.rs
1//! Background-threaded video frame buffer for ff-preview.
2//!
3//! [`DecodeBuffer`] decouples decoder latency from the presentation loop by
4//! running a [`VideoDecoder`] on a background thread and buffering decoded
5//! frames in a bounded ring channel.
6
7use std::path::{Path, PathBuf};
8use std::sync::Arc;
9use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
10use std::sync::mpsc::{Receiver, Sender, SyncSender, channel, sync_channel};
11use std::thread::{self, JoinHandle};
12use std::time::Duration;
13
14use ff_decode::{HardwareAccel, SeekMode, VideoDecoder};
15use ff_format::VideoFrame;
16
17use crate::error::PreviewError;
18
19// ── Constants ─────────────────────────────────────────────────────────────────
20
/// Ring buffer size, in frames, that [`DecodeBuffer::open`] starts from when
/// the caller does not override it via [`DecodeBufferBuilder::capacity`].
const DEFAULT_DECODE_BUFFER_CAPACITY: usize = 8;
23
24// ── FrameResult ───────────────────────────────────────────────────────────────
25
26/// The result of a [`DecodeBuffer::pop_frame`] call.
27///
28/// Callers should match on all three variants; discarding `Seeking` is a
29/// common pattern for scrub-bar UIs that want to display the last good frame
30/// while a seek is in progress.
31#[derive(Debug, Clone)]
32pub enum FrameResult {
33 /// A decoded frame ready for presentation.
34 Frame(VideoFrame),
35 /// A seek is in progress; the wrapped value is the last successfully
36 /// decoded frame, or `None` if no frame has been decoded yet.
37 /// Call [`pop_frame`](DecodeBuffer::pop_frame) again after a short delay
38 /// to check whether seeking has completed.
39 Seeking(Option<VideoFrame>),
40 /// End of file — no more frames will be produced.
41 Eof,
42}
43
44// ── SeekEvent ─────────────────────────────────────────────────────────────────
45
/// An event emitted by [`DecodeBuffer`] after a
/// [`seek_async`](DecodeBuffer::seek_async) completes.
///
/// Obtain the receiver via [`DecodeBuffer::seek_events`] and poll it with
/// `try_recv()` (non-blocking) or `recv()` (blocking).
///
/// No event is emitted when the seek itself fails or when end of file is
/// reached before a frame at or after the target is found; in those cases
/// [`DecodeBuffer::pop_frame`] reports [`FrameResult::Eof`] instead.
///
/// The type is plain data (a single [`Duration`]), so it is `Copy` and
/// comparable, which lets callers store, duplicate and compare events.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SeekEvent {
    /// The seek initiated by `seek_async` has completed.
    ///
    /// `pts` is the presentation timestamp of the first frame available after
    /// the seek. Events are typically delivered within ~200 ms for local files.
    Completed { pts: Duration },
}
59
60// ── DecodeBufferBuilder ───────────────────────────────────────────────────────
61
62/// Builder for [`DecodeBuffer`].
63///
64/// Created via [`DecodeBuffer::open`]; call [`capacity`](Self::capacity) to
65/// override the default ring buffer size, then [`build`](Self::build) to start
66/// the background decode thread and obtain a [`DecodeBuffer`].
67pub struct DecodeBufferBuilder {
68 pub(super) path: PathBuf,
69 pub(super) capacity: usize,
70 pub(super) hw_accel: HardwareAccel,
71}
72
73impl DecodeBufferBuilder {
74 /// Set the ring buffer capacity in frames. Default: 8.
75 ///
76 /// The background thread blocks when the buffer is full and resumes as soon
77 /// as the consumer calls [`DecodeBuffer::pop_frame`].
78 #[must_use]
79 pub fn capacity(self, n: usize) -> Self {
80 Self {
81 capacity: n,
82 ..self
83 }
84 }
85
86 /// Set the hardware acceleration mode. Default: [`HardwareAccel::Auto`].
87 ///
88 /// [`HardwareAccel::Auto`] probes available backends in priority order
89 /// (NVDEC → QSV → `VideoToolbox` → VAAPI → AMF) and falls back to software
90 /// decoding without error if none are available.
91 ///
92 /// [`HardwareAccel::None`] forces CPU-only decoding.
93 #[must_use]
94 pub fn hardware_accel(self, accel: HardwareAccel) -> Self {
95 Self {
96 hw_accel: accel,
97 ..self
98 }
99 }
100
101 /// Build and start the background decode thread.
102 ///
103 /// The thread pre-fills the ring buffer; frames are delivered in
104 /// presentation order. The caller receives a [`DecodeBuffer`] immediately;
105 /// frames become available as the thread decodes them.
106 ///
107 /// # Errors
108 ///
109 /// Returns [`PreviewError`] if the video file cannot be opened or contains
110 /// no decodable video stream.
111 pub fn build(self) -> Result<DecodeBuffer, PreviewError> {
112 // Open decoder on the calling thread for early validation.
113 // Propagates FileNotFound / NoVideoStream / Ffmpeg errors immediately.
114 let mut decoder = VideoDecoder::open(&self.path)
115 .hardware_accel(self.hw_accel)
116 .build()?;
117
118 let (tx, rx) = sync_channel(self.capacity);
119 let buffered = Arc::new(AtomicUsize::new(0));
120 let cancel = Arc::new(AtomicBool::new(false));
121
122 let buffered_thread = Arc::clone(&buffered);
123 let cancel_thread = Arc::clone(&cancel);
124
125 let (seek_tx, seek_rx) = channel::<SeekEvent>();
126 let (error_tx, error_rx) = channel::<String>();
127
128 let error_tx_thread = error_tx.clone();
129 let handle = thread::spawn(move || -> VideoDecoder {
130 decode_loop(
131 &mut decoder,
132 &tx,
133 &buffered_thread,
134 &cancel_thread,
135 &error_tx_thread,
136 );
137 decoder
138 });
139
140 Ok(DecodeBuffer {
141 rx: Some(rx),
142 buffered,
143 handle: Some(handle),
144 cancel,
145 capacity: self.capacity,
146 seeking: Arc::new(AtomicBool::new(false)),
147 last_good_frame: None,
148 seek_tx,
149 seek_rx,
150 error_tx,
151 error_rx,
152 })
153 }
154}
155
156// ── DecodeBuffer ──────────────────────────────────────────────────────────────
157
158/// Pre-decodes frames from a video file into a ring buffer on a background thread.
159///
160/// `DecodeBuffer` decouples decoder latency from the presentation loop: the
161/// background thread keeps the buffer filled so [`pop_frame`](Self::pop_frame)
162/// can return the next frame without waiting for the decoder.
163///
164/// The default ring buffer capacity is 8 frames. Use
165/// [`open`](Self::open) → [`capacity`](DecodeBufferBuilder::capacity) →
166/// [`build`](DecodeBufferBuilder::build) to configure a different size.
167///
168/// # Usage
169///
170/// ```ignore
171/// let mut buf = DecodeBuffer::open(Path::new("clip.mp4"))
172/// .capacity(16)
173/// .build()?;
174///
175/// while let Some(frame) = buf.pop_frame() {
176/// // present frame…
177/// }
178/// ```
179///
180/// # Thread safety
181///
182/// `DecodeBuffer` is `Send` but **not** `Sync`; it must be owned by a single
183/// consumer. The internal [`std::sync::mpsc::Receiver`] enforces this.
184pub struct DecodeBuffer {
185 /// `Option` so `Drop` can take and drop the receiver before joining the thread.
186 rx: Option<Receiver<VideoFrame>>,
187 /// Approximate count of frames waiting in the ring buffer.
188 /// Incremented by the background thread on send; decremented by `pop_frame`.
189 buffered: Arc<AtomicUsize>,
190 /// Background decode thread handle. Returns the decoder on exit so `seek()`
191 /// can recover it without reopening the file.
192 handle: Option<JoinHandle<VideoDecoder>>,
193 /// Set to `true` to ask the background thread to exit its decode loop.
194 cancel: Arc<AtomicBool>,
195 /// Channel capacity; needed by `seek()` to create a replacement channel.
196 capacity: usize,
197 /// Set to `true` while an async seek is in progress.
198 seeking: Arc<AtomicBool>,
199 /// The last frame returned by `pop_frame`; replayed as a placeholder
200 /// while `seeking` is true.
201 last_good_frame: Option<VideoFrame>,
202 /// Sender side of the seek event channel; cloned into each seek worker.
203 seek_tx: Sender<SeekEvent>,
204 /// Receiver for seek completion events; exposed via `seek_events()`.
205 seek_rx: Receiver<SeekEvent>,
206 /// Sender side of the decode error channel; cloned into each decode thread.
207 error_tx: Sender<String>,
208 /// Receiver for non-fatal decode error messages; exposed via `error_events()`.
209 error_rx: Receiver<String>,
210}
211
212impl DecodeBuffer {
213 /// Open the video at `path` and return a builder for configuring the buffer.
214 ///
215 /// Chain with [`DecodeBufferBuilder::capacity`] and
216 /// [`DecodeBufferBuilder::build`] to start decoding.
217 #[must_use]
218 pub fn open(path: &Path) -> DecodeBufferBuilder {
219 DecodeBufferBuilder {
220 path: path.to_path_buf(),
221 capacity: DEFAULT_DECODE_BUFFER_CAPACITY,
222 hw_accel: HardwareAccel::Auto,
223 }
224 }
225
226 /// Pop the next decoded frame.
227 ///
228 /// - Returns [`FrameResult::Seeking`] immediately (non-blocking) while a
229 /// [`seek_async`](Self::seek_async) is in progress.
230 /// - Returns [`FrameResult::Frame`] when a frame is available; blocks until
231 /// the background thread produces one.
232 /// - Returns [`FrameResult::Eof`] when the background thread reaches end of
233 /// file or the channel is disconnected.
234 #[must_use]
235 pub fn pop_frame(&mut self) -> FrameResult {
236 if self.seeking.load(Ordering::Acquire) {
237 return FrameResult::Seeking(self.last_good_frame.clone());
238 }
239 match self.rx.as_ref().and_then(|rx| rx.recv().ok()) {
240 Some(frame) => {
241 self.buffered.fetch_sub(1, Ordering::Relaxed);
242 self.last_good_frame = Some(frame.clone());
243 FrameResult::Frame(frame)
244 }
245 None => FrameResult::Eof,
246 }
247 }
248
249 /// Returns an approximation of the number of decoded frames currently
250 /// waiting in the buffer.
251 ///
252 /// This value is advisory only; it may lag the actual buffer state by one
253 /// scheduling quantum. Use it for diagnostics, not flow control.
254 #[must_use]
255 pub fn buffered_frames(&self) -> usize {
256 self.buffered.load(Ordering::Relaxed)
257 }
258
259 /// Returns a reference to the seek event receiver.
260 ///
261 /// After calling [`seek_async`](Self::seek_async), poll this receiver to
262 /// detect when the seek has completed:
263 /// - `try_recv()` — non-blocking; returns `Err(TryRecvError::Empty)` while
264 /// the seek is still in progress.
265 /// - `recv()` — blocks until the seek finishes.
266 ///
267 /// Events are delivered within ~200 ms for local files.
268 /// Unconsumed events accumulate in the channel (one per completed seek).
269 #[must_use]
270 pub fn seek_events(&self) -> &Receiver<SeekEvent> {
271 &self.seek_rx
272 }
273
274 /// Returns the receiver for non-fatal decode error messages.
275 ///
276 /// Poll with `try_recv()` in the presentation loop. Each message
277 /// corresponds to one failed `decode_one()` call in the background thread.
278 /// The background thread exits after sending the error, so
279 /// [`pop_frame`](Self::pop_frame) will return `Eof` shortly after.
280 #[must_use]
281 pub fn error_events(&self) -> &Receiver<String> {
282 &self.error_rx
283 }
284
285 /// Frame-accurate seek to `target_pts`.
286 ///
287 /// Stops the background decode thread, seeks the underlying decoder to the
288 /// nearest preceding I-frame (`AVSEEK_FLAG_BACKWARD` + codec buffer flush),
289 /// then restarts the thread. The restarted thread discards frames until
290 /// `PTS ≥ target_pts` before making them available via [`pop_frame`](Self::pop_frame).
291 ///
292 /// Blocks until the thread has stopped and the seek has been accepted by
293 /// the decoder. Frames are filled asynchronously after the method returns.
294 ///
295 /// # Errors
296 ///
297 /// Returns [`PreviewError::SeekFailed`] if the decode thread panicked or
298 /// if the underlying `FFmpeg` seek fails.
299 pub fn seek(&mut self, target_pts: Duration) -> Result<(), PreviewError> {
300 let (mut decoder, tx) = self.stop_and_seek(target_pts)?;
301 let buffered_thread = Arc::clone(&self.buffered);
302 let cancel_thread = Arc::clone(&self.cancel);
303 let error_tx_thread = self.error_tx.clone();
304
305 self.handle = Some(thread::spawn(move || -> VideoDecoder {
306 // Forward-decode discard: drop frames whose PTS is before target_pts.
307 loop {
308 if cancel_thread.load(Ordering::Acquire) {
309 return decoder;
310 }
311 match decoder.decode_one() {
312 Ok(Some(frame)) => {
313 let pts = if frame.timestamp().is_valid() {
314 frame.timestamp().as_duration()
315 } else {
316 Duration::ZERO
317 };
318 if pts >= target_pts {
319 if tx.send(frame).is_ok() {
320 buffered_thread.fetch_add(1, Ordering::Relaxed);
321 } else {
322 return decoder; // receiver dropped
323 }
324 break; // target frame sent; switch to normal loop
325 }
326 // Frame is before target — discard and continue.
327 }
328 Ok(None) => return decoder, // EOF before target
329 Err(e) => {
330 log::warn!("decode error during seek discard error={e}");
331 let _ = error_tx_thread.send(e.to_string());
332 return decoder;
333 }
334 }
335 }
336
337 // Normal decode loop after the discard phase.
338 decode_loop(
339 &mut decoder,
340 &tx,
341 &buffered_thread,
342 &cancel_thread,
343 &error_tx_thread,
344 );
345 decoder
346 }));
347
348 Ok(())
349 }
350
351 /// Coarse seek to the nearest I-frame at or before `target_pts`.
352 ///
353 /// Faster than [`seek`](Self::seek) because it skips the forward-decode
354 /// discard step. The next [`pop_frame`](Self::pop_frame) returns the frame
355 /// at the I-frame position, which may be up to ±½ GOP before `target_pts`
356 /// (typically ±1–2 s for H.264 at default settings).
357 ///
358 /// **Typical use:** call repeatedly while a scrub-bar is being dragged;
359 /// call [`seek`](Self::seek) on mouse-up for frame accuracy.
360 ///
361 /// # Errors
362 ///
363 /// Returns [`PreviewError::SeekFailed`] if the decode thread panicked or
364 /// if the underlying `FFmpeg` seek fails.
365 pub fn seek_coarse(&mut self, target_pts: Duration) -> Result<(), PreviewError> {
366 log::debug!("coarse seek target_pts={target_pts:?}");
367 let (mut decoder, tx) = self.stop_and_seek(target_pts)?;
368 let buffered_thread = Arc::clone(&self.buffered);
369 let cancel_thread = Arc::clone(&self.cancel);
370 let error_tx_thread = self.error_tx.clone();
371
372 // No discard loop — start the normal decode loop directly from the I-frame.
373 self.handle = Some(thread::spawn(move || -> VideoDecoder {
374 decode_loop(
375 &mut decoder,
376 &tx,
377 &buffered_thread,
378 &cancel_thread,
379 &error_tx_thread,
380 );
381 decoder
382 }));
383
384 Ok(())
385 }
386
387 /// Initiate a frame-accurate seek on a background thread and return immediately.
388 ///
389 /// While seeking is in progress, [`pop_frame`](Self::pop_frame) returns
390 /// [`FrameResult::Seeking`] with the last successfully decoded frame as a
391 /// placeholder. Normal [`FrameResult::Frame`] values resume once the seek
392 /// completes.
393 ///
394 /// The seek uses the same frame-accurate strategy as [`seek`](Self::seek):
395 /// `FFmpeg` jumps to the nearest preceding I-frame, then frames before
396 /// `target_pts` are discarded before the first frame is made available.
397 ///
398 /// If called again before the previous seek completes, the new seek
399 /// supersedes the old one; the old worker exits at the next cancel check.
400 ///
401 /// # Panics
402 ///
403 /// Panics (inside the background worker thread) if the previous decode
404 /// thread panicked — an internal bug that should never occur in practice.
405 pub fn seek_async(&mut self, target_pts: Duration) {
406 log::debug!("async seek started target_pts={target_pts:?}");
407
408 self.seeking.store(true, Ordering::Release);
409 self.cancel.store(true, Ordering::Release);
410
411 if let Some(rx) = &self.rx {
412 while rx.try_recv().is_ok() {
413 self.buffered.fetch_sub(1, Ordering::Relaxed);
414 }
415 }
416
417 let old_handle = self.handle.take();
418 drop(self.rx.take());
419
420 let (new_tx, new_rx) = sync_channel(self.capacity);
421 self.rx = Some(new_rx);
422
423 let buffered = Arc::clone(&self.buffered);
424 let cancel = Arc::clone(&self.cancel);
425 let seeking = Arc::clone(&self.seeking);
426 let seek_event_tx = self.seek_tx.clone();
427 let error_tx_async = self.error_tx.clone();
428
429 let worker = thread::spawn(move || -> VideoDecoder {
430 // Recover the decoder from the old thread. In normal operation the
431 // decode thread never panics so this always succeeds.
432 let Some(mut decoder) = old_handle.and_then(|h| h.join().ok()) else {
433 log::warn!(
434 "seek_async: failed to recover decoder \
435 target_pts={target_pts:?}"
436 );
437 if !cancel.load(Ordering::Acquire) {
438 seeking.store(false, Ordering::Release);
439 }
440 // Unreachable: the decode thread never panics in normal operation.
441 unreachable!("seek_async: decode thread panicked; cannot recover decoder");
442 };
443
444 if let Err(e) = decoder.seek(target_pts, SeekMode::Backward) {
445 log::warn!("seek_async seek failed target_pts={target_pts:?} error={e}");
446 if !cancel.load(Ordering::Acquire) {
447 seeking.store(false, Ordering::Release);
448 }
449 return decoder;
450 }
451
452 buffered.store(0, Ordering::Relaxed);
453 cancel.store(false, Ordering::Release);
454 // Mark seek as complete so pop_frame() transitions to blocking
455 // recv(). Only clear if no newer seek_async has superseded us.
456 if !cancel.load(Ordering::Acquire) {
457 seeking.store(false, Ordering::Release);
458 }
459
460 // Forward-decode discard: skip frames before target_pts.
461 loop {
462 if cancel.load(Ordering::Acquire) {
463 return decoder;
464 }
465 match decoder.decode_one() {
466 Ok(Some(frame)) => {
467 let pts = if frame.timestamp().is_valid() {
468 frame.timestamp().as_duration()
469 } else {
470 Duration::ZERO
471 };
472 if pts >= target_pts {
473 let first_pts = pts;
474 // Send the event BEFORE pushing the frame so that
475 // when pop_frame() wakes up the event is already in
476 // the seek_events channel (avoids a try_recv race).
477 let _ = seek_event_tx.send(SeekEvent::Completed { pts: first_pts });
478 if new_tx.send(frame).is_ok() {
479 buffered.fetch_add(1, Ordering::Relaxed);
480 } else {
481 return decoder; // receiver dropped
482 }
483 break;
484 }
485 // Frame before target — discard.
486 }
487 Ok(None) => return decoder, // EOF before target
488 Err(e) => {
489 log::warn!("seek_async discard error error={e}");
490 let _ = error_tx_async.send(e.to_string());
491 return decoder;
492 }
493 }
494 }
495
496 decode_loop(&mut decoder, &new_tx, &buffered, &cancel, &error_tx_async);
497 decoder
498 });
499
500 self.handle = Some(worker);
501 }
502
503 /// Shared helper for `seek` and `seek_coarse`.
504 ///
505 /// 1. Signals cancel, drains the channel, joins the thread to recover the decoder.
506 /// 2. Seeks the decoder to the nearest I-frame at or before `target_pts`.
507 /// 3. Resets the buffered counter, creates a fresh channel, clears the cancel flag.
508 ///
509 /// Returns `(decoder, SyncSender)` ready for the caller to spawn a new thread.
510 fn stop_and_seek(
511 &mut self,
512 target_pts: Duration,
513 ) -> Result<(VideoDecoder, SyncSender<VideoFrame>), PreviewError> {
514 // 1. Signal the background thread to exit its decode loop.
515 self.cancel.store(true, Ordering::Release);
516
517 // 2. Drain the channel so the background thread is not blocked on send().
518 if let Some(rx) = &self.rx {
519 while rx.try_recv().is_ok() {
520 self.buffered.fetch_sub(1, Ordering::Relaxed);
521 }
522 }
523
524 // 3. Join the thread to recover the decoder.
525 let mut decoder = self
526 .handle
527 .take()
528 .and_then(|h| h.join().ok())
529 .ok_or_else(|| PreviewError::SeekFailed {
530 target: target_pts,
531 reason: "decode thread unavailable for seek".into(),
532 })?;
533
534 // 4. Seek to the nearest I-frame at or before target_pts.
535 // avformat_seek_file with AVSEEK_FLAG_BACKWARD and avcodec_flush_buffers
536 // are handled inside VideoDecoder::seek (ff-decode/video/decoder_inner/seeking.rs).
537 decoder
538 .seek(target_pts, SeekMode::Backward)
539 .map_err(|e| PreviewError::SeekFailed {
540 target: target_pts,
541 reason: e.to_string(),
542 })?;
543
544 // 5. Reset counter, create a fresh channel, clear the cancel flag.
545 self.buffered.store(0, Ordering::Relaxed);
546 let (tx, rx) = sync_channel(self.capacity);
547 self.rx = Some(rx);
548 self.cancel.store(false, Ordering::Release);
549
550 Ok((decoder, tx))
551 }
552}
553
554impl Drop for DecodeBuffer {
555 fn drop(&mut self) {
556 // Signal cancel so the thread exits the decode loop promptly.
557 self.cancel.store(true, Ordering::Release);
558 // Drop the receiver so SyncSender::send() returns Err, unblocking the
559 // thread if it is waiting for space in a full channel.
560 drop(self.rx.take());
561 // Join (ignoring the returned decoder).
562 if let Some(h) = self.handle.take() {
563 let _ = h.join();
564 }
565 }
566}
567
568// ── decode_loop ───────────────────────────────────────────────────────────────
569
570/// Normal decode loop body shared between `build()` and the post-seek thread.
571///
572/// Exits when EOF is reached, a decode error occurs, or the `cancel` flag is set,
573/// or the receiver drops (i.e., `DecodeBuffer` was dropped).
574pub(super) fn decode_loop(
575 decoder: &mut VideoDecoder,
576 tx: &SyncSender<VideoFrame>,
577 buffered: &AtomicUsize,
578 cancel: &AtomicBool,
579 error_tx: &Sender<String>,
580) {
581 loop {
582 if cancel.load(Ordering::Acquire) {
583 break;
584 }
585 match decoder.decode_one() {
586 Ok(Some(frame)) => {
587 if tx.send(frame).is_ok() {
588 buffered.fetch_add(1, Ordering::Relaxed);
589 } else {
590 // Receiver was dropped — DecodeBuffer has been dropped.
591 break;
592 }
593 }
594 Ok(None) => break, // EOF
595 Err(e) => {
596 log::warn!("decode error in background thread error={e}");
597 let _ = error_tx.send(e.to_string());
598 break;
599 }
600 }
601 }
602}
603
604// ── Tests ─────────────────────────────────────────────────────────────────────
605
606#[cfg(test)]
607mod tests {
608 use super::*;
609 use std::path::Path;
610 use std::thread;
611
612 fn test_video_path() -> std::path::PathBuf {
613 std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("../../assets/video/gameplay.mp4")
614 }
615
/// `build()` must surface an error when the file does not exist.
#[test]
fn decode_buffer_build_should_fail_for_nonexistent_file() {
    let missing = Path::new("nonexistent_placeholder.mp4");
    let outcome = DecodeBuffer::open(missing).build();
    assert!(
        outcome.is_err(),
        "build() must return Err for a non-existent file"
    );
}
624
/// `open()` must start from the default capacity and `build()` must carry it
/// over into the running buffer.
#[test]
fn decode_buffer_open_should_use_default_capacity() {
    let path = test_video_path();

    // The builder can be checked without touching the file at all.
    let builder = DecodeBuffer::open(&path);
    assert_eq!(
        builder.capacity, DEFAULT_DECODE_BUFFER_CAPACITY,
        "open() must start from the default ring buffer capacity"
    );

    let buf = match builder.build() {
        Ok(buf) => buf,
        Err(e) => {
            println!("skipping: video file not available: {e}");
            return;
        }
    };
    assert_eq!(
        buf.capacity, DEFAULT_DECODE_BUFFER_CAPACITY,
        "build() must carry the default capacity over to the buffer"
    );

    // The decode thread is already running, so the number of buffered frames
    // is not deterministic here; it can only be bounded. The `+ 1` allows for
    // a frame that is counted while the producer waits for a free slot.
    assert!(
        buf.buffered_frames() <= DEFAULT_DECODE_BUFFER_CAPACITY + 1,
        "buffered frame count must stay within the ring buffer capacity"
    );
}
642
/// The first `pop_frame()` on a valid file must yield a frame.
///
/// Only the first frame is checked; the stream is not drained to EOF.
#[test]
fn decode_buffer_pop_frame_should_return_some_then_none_at_eof() {
    let built = DecodeBuffer::open(&test_video_path()).capacity(4).build();
    let mut buf = match built {
        Err(e) => {
            println!("skipping: video file not available: {e}");
            return;
        }
        Ok(buf) => buf,
    };

    // One successful pop is enough to show that the decoder is running.
    let first = buf.pop_frame();
    assert!(
        matches!(first, FrameResult::Frame(_)),
        "pop_frame() must return Frame for a valid video file"
    );
}
659
/// After `seek()`, the first frame handed out must lie near the target.
#[test]
fn seek_should_reposition_to_target_pts() {
    let built = DecodeBuffer::open(&test_video_path()).capacity(4).build();
    let mut buf = match built {
        Err(e) => {
            println!("skipping: video file not available: {e}");
            return;
        }
        Ok(buf) => buf,
    };

    // Move away from the start of the stream first.
    for _ in 0..5 {
        if let FrameResult::Eof = buf.pop_frame() {
            println!("skipping: EOF before seek target");
            return;
        }
    }

    let seek_target = Duration::from_secs(1);
    if let Err(e) = buf.seek(seek_target) {
        println!("skipping: seek not supported or failed: {e}");
        return;
    }

    // The first frame after the seek is the one whose PTS is checked.
    let FrameResult::Frame(frame) = buf.pop_frame() else {
        println!("skipping: no frame after seek");
        return;
    };

    if !frame.timestamp().is_valid() {
        return;
    }
    let pts = frame.timestamp().as_duration();
    // One second (one GOP) of slack is tolerated for I-frame alignment.
    let earliest_allowed = seek_target.saturating_sub(Duration::from_secs(1));
    assert!(
        pts >= earliest_allowed,
        "post-seek frame PTS must be near target; target={seek_target:?} pts={pts:?}"
    );
}
706
/// Precondition check for the seek error path.
///
/// No buffer can be built from a missing file, so this only verifies that
/// `build()` itself reports the failure; `seek()` is not called here.
#[test]
fn seek_should_fail_for_stopped_buffer() {
    let missing = Path::new("nonexistent.mp4");
    let outcome = DecodeBuffer::open(missing).build();
    assert!(
        outcome.is_err(),
        "build() must fail for non-existent file (precondition for seek error path)"
    );
}
717
/// `seek_async` must queue a `SeekEvent::Completed` before the first
/// post-seek frame becomes visible, and the reported PTS must not lie
/// before the seek target.
#[test]
fn seek_async_should_send_completed_event_with_first_frame_pts() {
    let path = test_video_path();
    let mut buf = match DecodeBuffer::open(&path).capacity(4).build() {
        Ok(buf) => buf,
        Err(e) => {
            println!("skipping: video file not available: {e}");
            return;
        }
    };

    // Pop one frame to establish last_good_frame.
    match buf.pop_frame() {
        FrameResult::Frame(_) => {}
        FrameResult::Seeking(_) | FrameResult::Eof => {
            println!("skipping: no initial frame available");
            return;
        }
    }

    let seek_target = Duration::from_secs(1);
    buf.seek_async(seek_target);

    // Drive the seek to completion by polling pop_frame.
    let deadline = std::time::Instant::now() + Duration::from_secs(10);
    loop {
        assert!(
            std::time::Instant::now() < deadline,
            "timed out waiting for seek to complete"
        );
        match buf.pop_frame() {
            FrameResult::Frame(_) => break, // seek done, first post-seek frame received
            FrameResult::Seeking(_) => thread::sleep(Duration::from_millis(10)),
            FrameResult::Eof => {
                println!("skipping: EOF reached during seek event test");
                return;
            }
        }
    }

    // The worker sends the event before the frame, so once pop_frame has
    // returned Frame the event must already be queued.
    let Ok(SeekEvent::Completed { pts }) = buf.seek_events().try_recv() else {
        panic!("expected SeekEvent::Completed after pop_frame returned Frame; got Err");
    };

    // The worker only reports a frame whose PTS has reached the target, so
    // the event must never point before it. (A comparison against
    // `Duration::ZERO` would always hold and verify nothing.)
    assert!(
        pts >= seek_target,
        "seek event pts must not precede the seek target; target={seek_target:?} pts={pts:?}"
    );
}
771
/// After `seek_async`, polling `pop_frame` must eventually yield a frame again.
#[test]
fn seek_async_should_deliver_frames_after_completion() {
    let built = DecodeBuffer::open(&test_video_path()).capacity(4).build();
    let mut buf = match built {
        Err(e) => {
            println!("skipping: video file not available: {e}");
            return;
        }
        Ok(buf) => buf,
    };

    // A first frame is needed so that a placeholder exists during the seek.
    if !matches!(buf.pop_frame(), FrameResult::Frame(_)) {
        println!("skipping: no initial frame available");
        return;
    }

    let seek_target = Duration::from_secs(1);
    buf.seek_async(seek_target);

    // Keep polling until a frame shows up; give up after ten seconds.
    let deadline = std::time::Instant::now() + Duration::from_secs(10);
    loop {
        let polled = buf.pop_frame();
        if let FrameResult::Frame(_) = polled {
            break; // seek completed successfully
        }
        if let FrameResult::Eof = polled {
            println!("skipping: EOF reached during seek_async test");
            return;
        }
        // Still seeking: back off briefly before the next poll.
        thread::sleep(Duration::from_millis(10));
        assert!(
            std::time::Instant::now() < deadline,
            "seek_async: timed out waiting for seek to complete"
        );
    }
}
814}