Skip to main content

ff_preview/playback/
decode_buffer.rs

1//! Background-threaded video frame buffer for ff-preview.
2//!
3//! [`DecodeBuffer`] decouples decoder latency from the presentation loop by
4//! running a [`VideoDecoder`] on a background thread and buffering decoded
5//! frames in a bounded ring channel.
6
7use std::path::{Path, PathBuf};
8use std::sync::Arc;
9use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
10use std::sync::mpsc::{Receiver, Sender, SyncSender, channel, sync_channel};
11use std::thread::{self, JoinHandle};
12use std::time::Duration;
13
14use ff_decode::{HardwareAccel, SeekMode, VideoDecoder};
15use ff_format::VideoFrame;
16
17use crate::error::PreviewError;
18
19// ── Constants ─────────────────────────────────────────────────────────────────
20
21/// Default ring buffer capacity for [`DecodeBuffer`] (frames).
22const DEFAULT_DECODE_BUFFER_CAPACITY: usize = 8;
23
24// ── FrameResult ───────────────────────────────────────────────────────────────
25
26/// The result of a [`DecodeBuffer::pop_frame`] call.
27///
28/// Callers should match on all three variants; discarding `Seeking` is a
29/// common pattern for scrub-bar UIs that want to display the last good frame
30/// while a seek is in progress.
31#[derive(Debug, Clone)]
32pub enum FrameResult {
33    /// A decoded frame ready for presentation.
34    Frame(VideoFrame),
35    /// A seek is in progress; the wrapped value is the last successfully
36    /// decoded frame, or `None` if no frame has been decoded yet.
37    /// Call [`pop_frame`](DecodeBuffer::pop_frame) again after a short delay
38    /// to check whether seeking has completed.
39    Seeking(Option<VideoFrame>),
40    /// End of file — no more frames will be produced.
41    Eof,
42}
43
44// ── SeekEvent ─────────────────────────────────────────────────────────────────
45
46/// An event emitted by [`DecodeBuffer`] after a
47/// [`seek_async`](DecodeBuffer::seek_async) completes.
48///
49/// Obtain the receiver via [`DecodeBuffer::seek_events`] and poll it with
50/// `try_recv()` (non-blocking) or `recv()` (blocking).
51#[derive(Debug)]
52pub enum SeekEvent {
53    /// The seek initiated by `seek_async` has completed.
54    ///
55    /// `pts` is the presentation timestamp of the first frame available after
56    /// the seek. Events are typically delivered within ~200 ms for local files.
57    Completed { pts: Duration },
58}
59
60// ── DecodeBufferBuilder ───────────────────────────────────────────────────────
61
62/// Builder for [`DecodeBuffer`].
63///
64/// Created via [`DecodeBuffer::open`]; call [`capacity`](Self::capacity) to
65/// override the default ring buffer size, then [`build`](Self::build) to start
66/// the background decode thread and obtain a [`DecodeBuffer`].
67pub struct DecodeBufferBuilder {
68    pub(super) path: PathBuf,
69    pub(super) capacity: usize,
70    pub(super) hw_accel: HardwareAccel,
71}
72
73impl DecodeBufferBuilder {
74    /// Set the ring buffer capacity in frames. Default: 8.
75    ///
76    /// The background thread blocks when the buffer is full and resumes as soon
77    /// as the consumer calls [`DecodeBuffer::pop_frame`].
78    #[must_use]
79    pub fn capacity(self, n: usize) -> Self {
80        Self {
81            capacity: n,
82            ..self
83        }
84    }
85
86    /// Set the hardware acceleration mode. Default: [`HardwareAccel::Auto`].
87    ///
88    /// [`HardwareAccel::Auto`] probes available backends in priority order
89    /// (NVDEC → QSV → `VideoToolbox` → VAAPI → AMF) and falls back to software
90    /// decoding without error if none are available.
91    ///
92    /// [`HardwareAccel::None`] forces CPU-only decoding.
93    #[must_use]
94    pub fn hardware_accel(self, accel: HardwareAccel) -> Self {
95        Self {
96            hw_accel: accel,
97            ..self
98        }
99    }
100
101    /// Build and start the background decode thread.
102    ///
103    /// The thread pre-fills the ring buffer; frames are delivered in
104    /// presentation order. The caller receives a [`DecodeBuffer`] immediately;
105    /// frames become available as the thread decodes them.
106    ///
107    /// # Errors
108    ///
109    /// Returns [`PreviewError`] if the video file cannot be opened or contains
110    /// no decodable video stream.
111    pub fn build(self) -> Result<DecodeBuffer, PreviewError> {
112        // Open decoder on the calling thread for early validation.
113        // Propagates FileNotFound / NoVideoStream / Ffmpeg errors immediately.
114        let mut decoder = VideoDecoder::open(&self.path)
115            .hardware_accel(self.hw_accel)
116            .build()?;
117
118        let (tx, rx) = sync_channel(self.capacity);
119        let buffered = Arc::new(AtomicUsize::new(0));
120        let cancel = Arc::new(AtomicBool::new(false));
121
122        let buffered_thread = Arc::clone(&buffered);
123        let cancel_thread = Arc::clone(&cancel);
124
125        let (seek_tx, seek_rx) = channel::<SeekEvent>();
126        let (error_tx, error_rx) = channel::<String>();
127
128        let error_tx_thread = error_tx.clone();
129        let handle = thread::spawn(move || -> VideoDecoder {
130            decode_loop(
131                &mut decoder,
132                &tx,
133                &buffered_thread,
134                &cancel_thread,
135                &error_tx_thread,
136            );
137            decoder
138        });
139
140        Ok(DecodeBuffer {
141            rx: Some(rx),
142            buffered,
143            handle: Some(handle),
144            cancel,
145            capacity: self.capacity,
146            seeking: Arc::new(AtomicBool::new(false)),
147            last_good_frame: None,
148            seek_tx,
149            seek_rx,
150            error_tx,
151            error_rx,
152        })
153    }
154}
155
156// ── DecodeBuffer ──────────────────────────────────────────────────────────────
157
158/// Pre-decodes frames from a video file into a ring buffer on a background thread.
159///
160/// `DecodeBuffer` decouples decoder latency from the presentation loop: the
161/// background thread keeps the buffer filled so [`pop_frame`](Self::pop_frame)
162/// can return the next frame without waiting for the decoder.
163///
164/// The default ring buffer capacity is 8 frames. Use
165/// [`open`](Self::open) → [`capacity`](DecodeBufferBuilder::capacity) →
166/// [`build`](DecodeBufferBuilder::build) to configure a different size.
167///
168/// # Usage
169///
170/// ```ignore
171/// let mut buf = DecodeBuffer::open(Path::new("clip.mp4"))
172///     .capacity(16)
173///     .build()?;
174///
175/// while let Some(frame) = buf.pop_frame() {
176///     // present frame…
177/// }
178/// ```
179///
180/// # Thread safety
181///
182/// `DecodeBuffer` is `Send` but **not** `Sync`; it must be owned by a single
183/// consumer. The internal [`std::sync::mpsc::Receiver`] enforces this.
184pub struct DecodeBuffer {
185    /// `Option` so `Drop` can take and drop the receiver before joining the thread.
186    rx: Option<Receiver<VideoFrame>>,
187    /// Approximate count of frames waiting in the ring buffer.
188    /// Incremented by the background thread on send; decremented by `pop_frame`.
189    buffered: Arc<AtomicUsize>,
190    /// Background decode thread handle. Returns the decoder on exit so `seek()`
191    /// can recover it without reopening the file.
192    handle: Option<JoinHandle<VideoDecoder>>,
193    /// Set to `true` to ask the background thread to exit its decode loop.
194    cancel: Arc<AtomicBool>,
195    /// Channel capacity; needed by `seek()` to create a replacement channel.
196    capacity: usize,
197    /// Set to `true` while an async seek is in progress.
198    seeking: Arc<AtomicBool>,
199    /// The last frame returned by `pop_frame`; replayed as a placeholder
200    /// while `seeking` is true.
201    last_good_frame: Option<VideoFrame>,
202    /// Sender side of the seek event channel; cloned into each seek worker.
203    seek_tx: Sender<SeekEvent>,
204    /// Receiver for seek completion events; exposed via `seek_events()`.
205    seek_rx: Receiver<SeekEvent>,
206    /// Sender side of the decode error channel; cloned into each decode thread.
207    error_tx: Sender<String>,
208    /// Receiver for non-fatal decode error messages; exposed via `error_events()`.
209    error_rx: Receiver<String>,
210}
211
212impl DecodeBuffer {
213    /// Open the video at `path` and return a builder for configuring the buffer.
214    ///
215    /// Chain with [`DecodeBufferBuilder::capacity`] and
216    /// [`DecodeBufferBuilder::build`] to start decoding.
217    #[must_use]
218    pub fn open(path: &Path) -> DecodeBufferBuilder {
219        DecodeBufferBuilder {
220            path: path.to_path_buf(),
221            capacity: DEFAULT_DECODE_BUFFER_CAPACITY,
222            hw_accel: HardwareAccel::Auto,
223        }
224    }
225
226    /// Pop the next decoded frame.
227    ///
228    /// - Returns [`FrameResult::Seeking`] immediately (non-blocking) while a
229    ///   [`seek_async`](Self::seek_async) is in progress.
230    /// - Returns [`FrameResult::Frame`] when a frame is available; blocks until
231    ///   the background thread produces one.
232    /// - Returns [`FrameResult::Eof`] when the background thread reaches end of
233    ///   file or the channel is disconnected.
234    #[must_use]
235    pub fn pop_frame(&mut self) -> FrameResult {
236        if self.seeking.load(Ordering::Acquire) {
237            return FrameResult::Seeking(self.last_good_frame.clone());
238        }
239        match self.rx.as_ref().and_then(|rx| rx.recv().ok()) {
240            Some(frame) => {
241                self.buffered.fetch_sub(1, Ordering::Relaxed);
242                self.last_good_frame = Some(frame.clone());
243                FrameResult::Frame(frame)
244            }
245            None => FrameResult::Eof,
246        }
247    }
248
249    /// Returns an approximation of the number of decoded frames currently
250    /// waiting in the buffer.
251    ///
252    /// This value is advisory only; it may lag the actual buffer state by one
253    /// scheduling quantum. Use it for diagnostics, not flow control.
254    #[must_use]
255    pub fn buffered_frames(&self) -> usize {
256        self.buffered.load(Ordering::Relaxed)
257    }
258
259    /// Returns a reference to the seek event receiver.
260    ///
261    /// After calling [`seek_async`](Self::seek_async), poll this receiver to
262    /// detect when the seek has completed:
263    /// - `try_recv()` — non-blocking; returns `Err(TryRecvError::Empty)` while
264    ///   the seek is still in progress.
265    /// - `recv()` — blocks until the seek finishes.
266    ///
267    /// Events are delivered within ~200 ms for local files.
268    /// Unconsumed events accumulate in the channel (one per completed seek).
269    #[must_use]
270    pub fn seek_events(&self) -> &Receiver<SeekEvent> {
271        &self.seek_rx
272    }
273
274    /// Returns the receiver for non-fatal decode error messages.
275    ///
276    /// Poll with `try_recv()` in the presentation loop. Each message
277    /// corresponds to one failed `decode_one()` call in the background thread.
278    /// The background thread exits after sending the error, so
279    /// [`pop_frame`](Self::pop_frame) will return `Eof` shortly after.
280    #[must_use]
281    pub fn error_events(&self) -> &Receiver<String> {
282        &self.error_rx
283    }
284
285    /// Frame-accurate seek to `target_pts`.
286    ///
287    /// Stops the background decode thread, seeks the underlying decoder to the
288    /// nearest preceding I-frame (`AVSEEK_FLAG_BACKWARD` + codec buffer flush),
289    /// then restarts the thread. The restarted thread discards frames until
290    /// `PTS ≥ target_pts` before making them available via [`pop_frame`](Self::pop_frame).
291    ///
292    /// Blocks until the thread has stopped and the seek has been accepted by
293    /// the decoder. Frames are filled asynchronously after the method returns.
294    ///
295    /// # Errors
296    ///
297    /// Returns [`PreviewError::SeekFailed`] if the decode thread panicked or
298    /// if the underlying `FFmpeg` seek fails.
299    pub fn seek(&mut self, target_pts: Duration) -> Result<(), PreviewError> {
300        let (mut decoder, tx) = self.stop_and_seek(target_pts)?;
301        let buffered_thread = Arc::clone(&self.buffered);
302        let cancel_thread = Arc::clone(&self.cancel);
303        let error_tx_thread = self.error_tx.clone();
304
305        self.handle = Some(thread::spawn(move || -> VideoDecoder {
306            // Forward-decode discard: drop frames whose PTS is before target_pts.
307            loop {
308                if cancel_thread.load(Ordering::Acquire) {
309                    return decoder;
310                }
311                match decoder.decode_one() {
312                    Ok(Some(frame)) => {
313                        let pts = if frame.timestamp().is_valid() {
314                            frame.timestamp().as_duration()
315                        } else {
316                            Duration::ZERO
317                        };
318                        if pts >= target_pts {
319                            if tx.send(frame).is_ok() {
320                                buffered_thread.fetch_add(1, Ordering::Relaxed);
321                            } else {
322                                return decoder; // receiver dropped
323                            }
324                            break; // target frame sent; switch to normal loop
325                        }
326                        // Frame is before target — discard and continue.
327                    }
328                    Ok(None) => return decoder, // EOF before target
329                    Err(e) => {
330                        log::warn!("decode error during seek discard error={e}");
331                        let _ = error_tx_thread.send(e.to_string());
332                        return decoder;
333                    }
334                }
335            }
336
337            // Normal decode loop after the discard phase.
338            decode_loop(
339                &mut decoder,
340                &tx,
341                &buffered_thread,
342                &cancel_thread,
343                &error_tx_thread,
344            );
345            decoder
346        }));
347
348        Ok(())
349    }
350
351    /// Coarse seek to the nearest I-frame at or before `target_pts`.
352    ///
353    /// Faster than [`seek`](Self::seek) because it skips the forward-decode
354    /// discard step. The next [`pop_frame`](Self::pop_frame) returns the frame
355    /// at the I-frame position, which may be up to ±½ GOP before `target_pts`
356    /// (typically ±1–2 s for H.264 at default settings).
357    ///
358    /// **Typical use:** call repeatedly while a scrub-bar is being dragged;
359    /// call [`seek`](Self::seek) on mouse-up for frame accuracy.
360    ///
361    /// # Errors
362    ///
363    /// Returns [`PreviewError::SeekFailed`] if the decode thread panicked or
364    /// if the underlying `FFmpeg` seek fails.
365    pub fn seek_coarse(&mut self, target_pts: Duration) -> Result<(), PreviewError> {
366        log::debug!("coarse seek target_pts={target_pts:?}");
367        let (mut decoder, tx) = self.stop_and_seek(target_pts)?;
368        let buffered_thread = Arc::clone(&self.buffered);
369        let cancel_thread = Arc::clone(&self.cancel);
370        let error_tx_thread = self.error_tx.clone();
371
372        // No discard loop — start the normal decode loop directly from the I-frame.
373        self.handle = Some(thread::spawn(move || -> VideoDecoder {
374            decode_loop(
375                &mut decoder,
376                &tx,
377                &buffered_thread,
378                &cancel_thread,
379                &error_tx_thread,
380            );
381            decoder
382        }));
383
384        Ok(())
385    }
386
387    /// Initiate a frame-accurate seek on a background thread and return immediately.
388    ///
389    /// While seeking is in progress, [`pop_frame`](Self::pop_frame) returns
390    /// [`FrameResult::Seeking`] with the last successfully decoded frame as a
391    /// placeholder. Normal [`FrameResult::Frame`] values resume once the seek
392    /// completes.
393    ///
394    /// The seek uses the same frame-accurate strategy as [`seek`](Self::seek):
395    /// `FFmpeg` jumps to the nearest preceding I-frame, then frames before
396    /// `target_pts` are discarded before the first frame is made available.
397    ///
398    /// If called again before the previous seek completes, the new seek
399    /// supersedes the old one; the old worker exits at the next cancel check.
400    ///
401    /// # Panics
402    ///
403    /// Panics (inside the background worker thread) if the previous decode
404    /// thread panicked — an internal bug that should never occur in practice.
405    pub fn seek_async(&mut self, target_pts: Duration) {
406        log::debug!("async seek started target_pts={target_pts:?}");
407
408        self.seeking.store(true, Ordering::Release);
409        self.cancel.store(true, Ordering::Release);
410
411        if let Some(rx) = &self.rx {
412            while rx.try_recv().is_ok() {
413                self.buffered.fetch_sub(1, Ordering::Relaxed);
414            }
415        }
416
417        let old_handle = self.handle.take();
418        drop(self.rx.take());
419
420        let (new_tx, new_rx) = sync_channel(self.capacity);
421        self.rx = Some(new_rx);
422
423        let buffered = Arc::clone(&self.buffered);
424        let cancel = Arc::clone(&self.cancel);
425        let seeking = Arc::clone(&self.seeking);
426        let seek_event_tx = self.seek_tx.clone();
427        let error_tx_async = self.error_tx.clone();
428
429        let worker = thread::spawn(move || -> VideoDecoder {
430            // Recover the decoder from the old thread. In normal operation the
431            // decode thread never panics so this always succeeds.
432            let Some(mut decoder) = old_handle.and_then(|h| h.join().ok()) else {
433                log::warn!(
434                    "seek_async: failed to recover decoder \
435                     target_pts={target_pts:?}"
436                );
437                if !cancel.load(Ordering::Acquire) {
438                    seeking.store(false, Ordering::Release);
439                }
440                // Unreachable: the decode thread never panics in normal operation.
441                unreachable!("seek_async: decode thread panicked; cannot recover decoder");
442            };
443
444            if let Err(e) = decoder.seek(target_pts, SeekMode::Backward) {
445                log::warn!("seek_async seek failed target_pts={target_pts:?} error={e}");
446                if !cancel.load(Ordering::Acquire) {
447                    seeking.store(false, Ordering::Release);
448                }
449                return decoder;
450            }
451
452            buffered.store(0, Ordering::Relaxed);
453            cancel.store(false, Ordering::Release);
454            // Mark seek as complete so pop_frame() transitions to blocking
455            // recv(). Only clear if no newer seek_async has superseded us.
456            if !cancel.load(Ordering::Acquire) {
457                seeking.store(false, Ordering::Release);
458            }
459
460            // Forward-decode discard: skip frames before target_pts.
461            loop {
462                if cancel.load(Ordering::Acquire) {
463                    return decoder;
464                }
465                match decoder.decode_one() {
466                    Ok(Some(frame)) => {
467                        let pts = if frame.timestamp().is_valid() {
468                            frame.timestamp().as_duration()
469                        } else {
470                            Duration::ZERO
471                        };
472                        if pts >= target_pts {
473                            let first_pts = pts;
474                            // Send the event BEFORE pushing the frame so that
475                            // when pop_frame() wakes up the event is already in
476                            // the seek_events channel (avoids a try_recv race).
477                            let _ = seek_event_tx.send(SeekEvent::Completed { pts: first_pts });
478                            if new_tx.send(frame).is_ok() {
479                                buffered.fetch_add(1, Ordering::Relaxed);
480                            } else {
481                                return decoder; // receiver dropped
482                            }
483                            break;
484                        }
485                        // Frame before target — discard.
486                    }
487                    Ok(None) => return decoder, // EOF before target
488                    Err(e) => {
489                        log::warn!("seek_async discard error error={e}");
490                        let _ = error_tx_async.send(e.to_string());
491                        return decoder;
492                    }
493                }
494            }
495
496            decode_loop(&mut decoder, &new_tx, &buffered, &cancel, &error_tx_async);
497            decoder
498        });
499
500        self.handle = Some(worker);
501    }
502
503    /// Shared helper for `seek` and `seek_coarse`.
504    ///
505    /// 1. Signals cancel, drains the channel, joins the thread to recover the decoder.
506    /// 2. Seeks the decoder to the nearest I-frame at or before `target_pts`.
507    /// 3. Resets the buffered counter, creates a fresh channel, clears the cancel flag.
508    ///
509    /// Returns `(decoder, SyncSender)` ready for the caller to spawn a new thread.
510    fn stop_and_seek(
511        &mut self,
512        target_pts: Duration,
513    ) -> Result<(VideoDecoder, SyncSender<VideoFrame>), PreviewError> {
514        // 1. Signal the background thread to exit its decode loop.
515        self.cancel.store(true, Ordering::Release);
516
517        // 2. Drain the channel so the background thread is not blocked on send().
518        if let Some(rx) = &self.rx {
519            while rx.try_recv().is_ok() {
520                self.buffered.fetch_sub(1, Ordering::Relaxed);
521            }
522        }
523
524        // 3. Join the thread to recover the decoder.
525        let mut decoder = self
526            .handle
527            .take()
528            .and_then(|h| h.join().ok())
529            .ok_or_else(|| PreviewError::SeekFailed {
530                target: target_pts,
531                reason: "decode thread unavailable for seek".into(),
532            })?;
533
534        // 4. Seek to the nearest I-frame at or before target_pts.
535        //    avformat_seek_file with AVSEEK_FLAG_BACKWARD and avcodec_flush_buffers
536        //    are handled inside VideoDecoder::seek (ff-decode/video/decoder_inner/seeking.rs).
537        decoder
538            .seek(target_pts, SeekMode::Backward)
539            .map_err(|e| PreviewError::SeekFailed {
540                target: target_pts,
541                reason: e.to_string(),
542            })?;
543
544        // 5. Reset counter, create a fresh channel, clear the cancel flag.
545        self.buffered.store(0, Ordering::Relaxed);
546        let (tx, rx) = sync_channel(self.capacity);
547        self.rx = Some(rx);
548        self.cancel.store(false, Ordering::Release);
549
550        Ok((decoder, tx))
551    }
552}
553
554impl Drop for DecodeBuffer {
555    fn drop(&mut self) {
556        // Signal cancel so the thread exits the decode loop promptly.
557        self.cancel.store(true, Ordering::Release);
558        // Drop the receiver so SyncSender::send() returns Err, unblocking the
559        // thread if it is waiting for space in a full channel.
560        drop(self.rx.take());
561        // Join (ignoring the returned decoder).
562        if let Some(h) = self.handle.take() {
563            let _ = h.join();
564        }
565    }
566}
567
568// ── decode_loop ───────────────────────────────────────────────────────────────
569
570/// Normal decode loop body shared between `build()` and the post-seek thread.
571///
572/// Exits when EOF is reached, a decode error occurs, or the `cancel` flag is set,
573/// or the receiver drops (i.e., `DecodeBuffer` was dropped).
574pub(super) fn decode_loop(
575    decoder: &mut VideoDecoder,
576    tx: &SyncSender<VideoFrame>,
577    buffered: &AtomicUsize,
578    cancel: &AtomicBool,
579    error_tx: &Sender<String>,
580) {
581    loop {
582        if cancel.load(Ordering::Acquire) {
583            break;
584        }
585        match decoder.decode_one() {
586            Ok(Some(frame)) => {
587                if tx.send(frame).is_ok() {
588                    buffered.fetch_add(1, Ordering::Relaxed);
589                } else {
590                    // Receiver was dropped — DecodeBuffer has been dropped.
591                    break;
592                }
593            }
594            Ok(None) => break, // EOF
595            Err(e) => {
596                log::warn!("decode error in background thread error={e}");
597                let _ = error_tx.send(e.to_string());
598                break;
599            }
600        }
601    }
602}
603
604// ── Tests ─────────────────────────────────────────────────────────────────────
605
606#[cfg(test)]
607mod tests {
608    use super::*;
609    use std::path::Path;
610    use std::thread;
611
612    fn test_video_path() -> std::path::PathBuf {
613        std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("../../assets/video/gameplay.mp4")
614    }
615
616    #[test]
617    fn decode_buffer_build_should_fail_for_nonexistent_file() {
618        let result = DecodeBuffer::open(Path::new("nonexistent_placeholder.mp4")).build();
619        assert!(
620            result.is_err(),
621            "build() must return Err for a non-existent file"
622        );
623    }
624
625    #[test]
626    fn decode_buffer_open_should_use_default_capacity() {
627        let path = test_video_path();
628        let buf = match DecodeBuffer::open(&path).build() {
629            Ok(buf) => buf,
630            Err(e) => {
631                println!("skipping: video file not available: {e}");
632                return;
633            }
634        };
635        // Buffer starts empty; frames arrive asynchronously.
636        assert_eq!(
637            buf.buffered_frames(),
638            0,
639            "buffer must report 0 before any frames have been consumed"
640        );
641    }
642
643    #[test]
644    fn decode_buffer_pop_frame_should_return_some_then_none_at_eof() {
645        let path = test_video_path();
646        let mut buf = match DecodeBuffer::open(&path).capacity(4).build() {
647            Ok(buf) => buf,
648            Err(e) => {
649                println!("skipping: video file not available: {e}");
650                return;
651            }
652        };
653        // Pop at least one frame to confirm the decoder is running.
654        assert!(
655            matches!(buf.pop_frame(), FrameResult::Frame(_)),
656            "pop_frame() must return Frame for a valid video file"
657        );
658    }
659
660    #[test]
661    fn seek_should_reposition_to_target_pts() {
662        let path = test_video_path();
663        let mut buf = match DecodeBuffer::open(&path).capacity(4).build() {
664            Ok(buf) => buf,
665            Err(e) => {
666                println!("skipping: video file not available: {e}");
667                return;
668            }
669        };
670
671        // Consume a few frames to advance past the start.
672        for _ in 0..5 {
673            if matches!(buf.pop_frame(), FrameResult::Eof) {
674                println!("skipping: EOF before seek target");
675                return;
676            }
677        }
678
679        let seek_target = Duration::from_secs(1);
680        match buf.seek(seek_target) {
681            Ok(()) => {}
682            Err(e) => {
683                println!("skipping: seek not supported or failed: {e}");
684                return;
685            }
686        }
687
688        // After seek, the first frame's PTS must be at or near the target.
689        let frame = match buf.pop_frame() {
690            FrameResult::Frame(f) => f,
691            FrameResult::Eof | FrameResult::Seeking(_) => {
692                println!("skipping: no frame after seek");
693                return;
694            }
695        };
696
697        if frame.timestamp().is_valid() {
698            let pts = frame.timestamp().as_duration();
699            // Allow ±1 second of tolerance (one GOP) for I-frame alignment.
700            assert!(
701                pts >= seek_target.saturating_sub(Duration::from_secs(1)),
702                "post-seek frame PTS must be near target; target={seek_target:?} pts={pts:?}"
703            );
704        }
705    }
706
707    #[test]
708    fn seek_should_fail_for_stopped_buffer() {
709        // Build with non-existent file → build() fails.
710        // This confirms seek errors are propagated correctly.
711        let result = DecodeBuffer::open(Path::new("nonexistent.mp4")).build();
712        assert!(
713            result.is_err(),
714            "build() must fail for non-existent file (precondition for seek error path)"
715        );
716    }
717
718    #[test]
719    fn seek_async_should_send_completed_event_with_first_frame_pts() {
720        let path = test_video_path();
721        let mut buf = match DecodeBuffer::open(&path).capacity(4).build() {
722            Ok(buf) => buf,
723            Err(e) => {
724                println!("skipping: video file not available: {e}");
725                return;
726            }
727        };
728
729        // Pop one frame to establish last_good_frame.
730        match buf.pop_frame() {
731            FrameResult::Frame(_) => {}
732            _ => {
733                println!("skipping: no initial frame available");
734                return;
735            }
736        }
737
738        let seek_target = Duration::from_secs(1);
739        buf.seek_async(seek_target);
740
741        // Drive the seek to completion by polling pop_frame.
742        let deadline = std::time::Instant::now() + Duration::from_secs(10);
743        loop {
744            assert!(
745                std::time::Instant::now() < deadline,
746                "timed out waiting for seek to complete"
747            );
748            match buf.pop_frame() {
749                FrameResult::Frame(_) => break, // seek done, first post-seek frame received
750                FrameResult::Seeking(_) => thread::sleep(Duration::from_millis(10)),
751                FrameResult::Eof => {
752                    println!("skipping: EOF reached during seek event test");
753                    return;
754                }
755            }
756        }
757
758        // After pop_frame returned Frame, SeekEvent::Completed must be in the channel.
759        let event = buf.seek_events().try_recv();
760        assert!(
761            event.is_ok(),
762            "expected SeekEvent::Completed after pop_frame returned Frame; got Err"
763        );
764        if let Ok(SeekEvent::Completed { pts }) = event {
765            assert!(
766                pts >= Duration::ZERO,
767                "seek event pts must be non-negative; got pts={pts:?}"
768            );
769        }
770    }
771
772    #[test]
773    fn seek_async_should_deliver_frames_after_completion() {
774        let path = test_video_path();
775        let mut buf = match DecodeBuffer::open(&path).capacity(4).build() {
776            Ok(buf) => buf,
777            Err(e) => {
778                println!("skipping: video file not available: {e}");
779                return;
780            }
781        };
782
783        // Pop one frame to establish last_good_frame.
784        match buf.pop_frame() {
785            FrameResult::Frame(_) => {}
786            _ => {
787                println!("skipping: no initial frame available");
788                return;
789            }
790        }
791
792        let seek_target = Duration::from_secs(1);
793        buf.seek_async(seek_target);
794
795        // Poll until a Frame arrives (seek complete) or we time out.
796        let deadline = std::time::Instant::now() + Duration::from_secs(10);
797        loop {
798            match buf.pop_frame() {
799                FrameResult::Frame(_) => break, // seek completed successfully
800                FrameResult::Seeking(_) => {
801                    thread::sleep(Duration::from_millis(10));
802                }
803                FrameResult::Eof => {
804                    println!("skipping: EOF reached during seek_async test");
805                    return;
806                }
807            }
808            assert!(
809                std::time::Instant::now() < deadline,
810                "seek_async: timed out waiting for seek to complete"
811            );
812        }
813    }
814}