Skip to main content

ff_preview/playback/
decode_buffer.rs

1//! Background-threaded video frame buffer for ff-preview.
2//!
3//! [`DecodeBuffer`] decouples decoder latency from the presentation loop by
4//! running a [`VideoDecoder`] on a background thread and buffering decoded
5//! frames in a bounded ring channel.
6
7use std::path::{Path, PathBuf};
8use std::sync::Arc;
9use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
10use std::sync::mpsc::{Receiver, Sender, SyncSender, channel, sync_channel};
11use std::thread::{self, JoinHandle};
12use std::time::Duration;
13
14use ff_decode::{SeekMode, VideoDecoder};
15use ff_format::VideoFrame;
16
17use crate::error::PreviewError;
18
19// ── Constants ─────────────────────────────────────────────────────────────────
20
21/// Default ring buffer capacity for [`DecodeBuffer`] (frames).
22const DEFAULT_DECODE_BUFFER_CAPACITY: usize = 8;
23
24// ── FrameResult ───────────────────────────────────────────────────────────────
25
26/// The result of a [`DecodeBuffer::pop_frame`] call.
27///
28/// Callers should match on all three variants; discarding `Seeking` is a
29/// common pattern for scrub-bar UIs that want to display the last good frame
30/// while a seek is in progress.
31#[derive(Debug, Clone)]
32pub enum FrameResult {
33    /// A decoded frame ready for presentation.
34    Frame(VideoFrame),
35    /// A seek is in progress; the wrapped value is the last successfully
36    /// decoded frame, or `None` if no frame has been decoded yet.
37    /// Call [`pop_frame`](DecodeBuffer::pop_frame) again after a short delay
38    /// to check whether seeking has completed.
39    Seeking(Option<VideoFrame>),
40    /// End of file — no more frames will be produced.
41    Eof,
42}
43
44// ── SeekEvent ─────────────────────────────────────────────────────────────────
45
46/// An event emitted by [`DecodeBuffer`] after a
47/// [`seek_async`](DecodeBuffer::seek_async) completes.
48///
49/// Obtain the receiver via [`DecodeBuffer::seek_events`] and poll it with
50/// `try_recv()` (non-blocking) or `recv()` (blocking).
51#[derive(Debug)]
52pub enum SeekEvent {
53    /// The seek initiated by `seek_async` has completed.
54    ///
55    /// `pts` is the presentation timestamp of the first frame available after
56    /// the seek. Events are typically delivered within ~200 ms for local files.
57    Completed { pts: Duration },
58}
59
60// ── DecodeBufferBuilder ───────────────────────────────────────────────────────
61
62/// Builder for [`DecodeBuffer`].
63///
64/// Created via [`DecodeBuffer::open`]; call [`capacity`](Self::capacity) to
65/// override the default ring buffer size, then [`build`](Self::build) to start
66/// the background decode thread and obtain a [`DecodeBuffer`].
67pub struct DecodeBufferBuilder {
68    pub(super) path: PathBuf,
69    pub(super) capacity: usize,
70}
71
72impl DecodeBufferBuilder {
73    /// Set the ring buffer capacity in frames. Default: 8.
74    ///
75    /// The background thread blocks when the buffer is full and resumes as soon
76    /// as the consumer calls [`DecodeBuffer::pop_frame`].
77    #[must_use]
78    pub fn capacity(self, n: usize) -> Self {
79        Self {
80            capacity: n,
81            ..self
82        }
83    }
84
85    /// Build and start the background decode thread.
86    ///
87    /// The thread pre-fills the ring buffer; frames are delivered in
88    /// presentation order. The caller receives a [`DecodeBuffer`] immediately;
89    /// frames become available as the thread decodes them.
90    ///
91    /// # Errors
92    ///
93    /// Returns [`PreviewError`] if the video file cannot be opened or contains
94    /// no decodable video stream.
95    pub fn build(self) -> Result<DecodeBuffer, PreviewError> {
96        // Open decoder on the calling thread for early validation.
97        // Propagates FileNotFound / NoVideoStream / Ffmpeg errors immediately.
98        let mut decoder = VideoDecoder::open(&self.path).build()?;
99
100        let (tx, rx) = sync_channel(self.capacity);
101        let buffered = Arc::new(AtomicUsize::new(0));
102        let cancel = Arc::new(AtomicBool::new(false));
103
104        let buffered_thread = Arc::clone(&buffered);
105        let cancel_thread = Arc::clone(&cancel);
106
107        let handle = thread::spawn(move || -> VideoDecoder {
108            decode_loop(&mut decoder, &tx, &buffered_thread, &cancel_thread);
109            decoder
110        });
111
112        let (seek_tx, seek_rx) = channel::<SeekEvent>();
113
114        Ok(DecodeBuffer {
115            rx: Some(rx),
116            buffered,
117            handle: Some(handle),
118            cancel,
119            capacity: self.capacity,
120            seeking: Arc::new(AtomicBool::new(false)),
121            last_good_frame: None,
122            seek_tx,
123            seek_rx,
124        })
125    }
126}
127
128// ── DecodeBuffer ──────────────────────────────────────────────────────────────
129
130/// Pre-decodes frames from a video file into a ring buffer on a background thread.
131///
132/// `DecodeBuffer` decouples decoder latency from the presentation loop: the
133/// background thread keeps the buffer filled so [`pop_frame`](Self::pop_frame)
134/// can return the next frame without waiting for the decoder.
135///
136/// The default ring buffer capacity is 8 frames. Use
137/// [`open`](Self::open) → [`capacity`](DecodeBufferBuilder::capacity) →
138/// [`build`](DecodeBufferBuilder::build) to configure a different size.
139///
140/// # Usage
141///
142/// ```ignore
143/// let mut buf = DecodeBuffer::open(Path::new("clip.mp4"))
144///     .capacity(16)
145///     .build()?;
146///
147/// while let Some(frame) = buf.pop_frame() {
148///     // present frame…
149/// }
150/// ```
151///
152/// # Thread safety
153///
154/// `DecodeBuffer` is `Send` but **not** `Sync`; it must be owned by a single
155/// consumer. The internal [`std::sync::mpsc::Receiver`] enforces this.
156pub struct DecodeBuffer {
157    /// `Option` so `Drop` can take and drop the receiver before joining the thread.
158    rx: Option<Receiver<VideoFrame>>,
159    /// Approximate count of frames waiting in the ring buffer.
160    /// Incremented by the background thread on send; decremented by `pop_frame`.
161    buffered: Arc<AtomicUsize>,
162    /// Background decode thread handle. Returns the decoder on exit so `seek()`
163    /// can recover it without reopening the file.
164    handle: Option<JoinHandle<VideoDecoder>>,
165    /// Set to `true` to ask the background thread to exit its decode loop.
166    cancel: Arc<AtomicBool>,
167    /// Channel capacity; needed by `seek()` to create a replacement channel.
168    capacity: usize,
169    /// Set to `true` while an async seek is in progress.
170    seeking: Arc<AtomicBool>,
171    /// The last frame returned by `pop_frame`; replayed as a placeholder
172    /// while `seeking` is true.
173    last_good_frame: Option<VideoFrame>,
174    /// Sender side of the seek event channel; cloned into each seek worker.
175    seek_tx: Sender<SeekEvent>,
176    /// Receiver for seek completion events; exposed via `seek_events()`.
177    seek_rx: Receiver<SeekEvent>,
178}
179
180impl DecodeBuffer {
181    /// Open the video at `path` and return a builder for configuring the buffer.
182    ///
183    /// Chain with [`DecodeBufferBuilder::capacity`] and
184    /// [`DecodeBufferBuilder::build`] to start decoding.
185    #[must_use]
186    pub fn open(path: &Path) -> DecodeBufferBuilder {
187        DecodeBufferBuilder {
188            path: path.to_path_buf(),
189            capacity: DEFAULT_DECODE_BUFFER_CAPACITY,
190        }
191    }
192
193    /// Pop the next decoded frame.
194    ///
195    /// - Returns [`FrameResult::Seeking`] immediately (non-blocking) while a
196    ///   [`seek_async`](Self::seek_async) is in progress.
197    /// - Returns [`FrameResult::Frame`] when a frame is available; blocks until
198    ///   the background thread produces one.
199    /// - Returns [`FrameResult::Eof`] when the background thread reaches end of
200    ///   file or the channel is disconnected.
201    #[must_use]
202    pub fn pop_frame(&mut self) -> FrameResult {
203        if self.seeking.load(Ordering::Acquire) {
204            return FrameResult::Seeking(self.last_good_frame.clone());
205        }
206        match self.rx.as_ref().and_then(|rx| rx.recv().ok()) {
207            Some(frame) => {
208                self.buffered.fetch_sub(1, Ordering::Relaxed);
209                self.last_good_frame = Some(frame.clone());
210                FrameResult::Frame(frame)
211            }
212            None => FrameResult::Eof,
213        }
214    }
215
216    /// Returns an approximation of the number of decoded frames currently
217    /// waiting in the buffer.
218    ///
219    /// This value is advisory only; it may lag the actual buffer state by one
220    /// scheduling quantum. Use it for diagnostics, not flow control.
221    #[must_use]
222    pub fn buffered_frames(&self) -> usize {
223        self.buffered.load(Ordering::Relaxed)
224    }
225
226    /// Returns a reference to the seek event receiver.
227    ///
228    /// After calling [`seek_async`](Self::seek_async), poll this receiver to
229    /// detect when the seek has completed:
230    /// - `try_recv()` — non-blocking; returns `Err(TryRecvError::Empty)` while
231    ///   the seek is still in progress.
232    /// - `recv()` — blocks until the seek finishes.
233    ///
234    /// Events are delivered within ~200 ms for local files.
235    /// Unconsumed events accumulate in the channel (one per completed seek).
236    #[must_use]
237    pub fn seek_events(&self) -> &Receiver<SeekEvent> {
238        &self.seek_rx
239    }
240
241    /// Frame-accurate seek to `target_pts`.
242    ///
243    /// Stops the background decode thread, seeks the underlying decoder to the
244    /// nearest preceding I-frame (`AVSEEK_FLAG_BACKWARD` + codec buffer flush),
245    /// then restarts the thread. The restarted thread discards frames until
246    /// `PTS ≥ target_pts` before making them available via [`pop_frame`](Self::pop_frame).
247    ///
248    /// Blocks until the thread has stopped and the seek has been accepted by
249    /// the decoder. Frames are filled asynchronously after the method returns.
250    ///
251    /// # Errors
252    ///
253    /// Returns [`PreviewError::SeekFailed`] if the decode thread panicked or
254    /// if the underlying `FFmpeg` seek fails.
255    pub fn seek(&mut self, target_pts: Duration) -> Result<(), PreviewError> {
256        let (mut decoder, tx) = self.stop_and_seek(target_pts)?;
257        let buffered_thread = Arc::clone(&self.buffered);
258        let cancel_thread = Arc::clone(&self.cancel);
259
260        self.handle = Some(thread::spawn(move || -> VideoDecoder {
261            // Forward-decode discard: drop frames whose PTS is before target_pts.
262            loop {
263                if cancel_thread.load(Ordering::Acquire) {
264                    return decoder;
265                }
266                match decoder.decode_one() {
267                    Ok(Some(frame)) => {
268                        let pts = if frame.timestamp().is_valid() {
269                            frame.timestamp().as_duration()
270                        } else {
271                            Duration::ZERO
272                        };
273                        if pts >= target_pts {
274                            if tx.send(frame).is_ok() {
275                                buffered_thread.fetch_add(1, Ordering::Relaxed);
276                            } else {
277                                return decoder; // receiver dropped
278                            }
279                            break; // target frame sent; switch to normal loop
280                        }
281                        // Frame is before target — discard and continue.
282                    }
283                    Ok(None) => return decoder, // EOF before target
284                    Err(e) => {
285                        log::warn!("decode error during seek discard error={e}");
286                        return decoder;
287                    }
288                }
289            }
290
291            // Normal decode loop after the discard phase.
292            decode_loop(&mut decoder, &tx, &buffered_thread, &cancel_thread);
293            decoder
294        }));
295
296        Ok(())
297    }
298
299    /// Coarse seek to the nearest I-frame at or before `target_pts`.
300    ///
301    /// Faster than [`seek`](Self::seek) because it skips the forward-decode
302    /// discard step. The next [`pop_frame`](Self::pop_frame) returns the frame
303    /// at the I-frame position, which may be up to ±½ GOP before `target_pts`
304    /// (typically ±1–2 s for H.264 at default settings).
305    ///
306    /// **Typical use:** call repeatedly while a scrub-bar is being dragged;
307    /// call [`seek`](Self::seek) on mouse-up for frame accuracy.
308    ///
309    /// # Errors
310    ///
311    /// Returns [`PreviewError::SeekFailed`] if the decode thread panicked or
312    /// if the underlying `FFmpeg` seek fails.
313    pub fn seek_coarse(&mut self, target_pts: Duration) -> Result<(), PreviewError> {
314        log::debug!("coarse seek target_pts={target_pts:?}");
315        let (mut decoder, tx) = self.stop_and_seek(target_pts)?;
316        let buffered_thread = Arc::clone(&self.buffered);
317        let cancel_thread = Arc::clone(&self.cancel);
318
319        // No discard loop — start the normal decode loop directly from the I-frame.
320        self.handle = Some(thread::spawn(move || -> VideoDecoder {
321            decode_loop(&mut decoder, &tx, &buffered_thread, &cancel_thread);
322            decoder
323        }));
324
325        Ok(())
326    }
327
328    /// Initiate a frame-accurate seek on a background thread and return immediately.
329    ///
330    /// While seeking is in progress, [`pop_frame`](Self::pop_frame) returns
331    /// [`FrameResult::Seeking`] with the last successfully decoded frame as a
332    /// placeholder. Normal [`FrameResult::Frame`] values resume once the seek
333    /// completes.
334    ///
335    /// The seek uses the same frame-accurate strategy as [`seek`](Self::seek):
336    /// `FFmpeg` jumps to the nearest preceding I-frame, then frames before
337    /// `target_pts` are discarded before the first frame is made available.
338    ///
339    /// If called again before the previous seek completes, the new seek
340    /// supersedes the old one; the old worker exits at the next cancel check.
341    ///
342    /// # Panics
343    ///
344    /// Panics (inside the background worker thread) if the previous decode
345    /// thread panicked — an internal bug that should never occur in practice.
346    pub fn seek_async(&mut self, target_pts: Duration) {
347        log::debug!("async seek started target_pts={target_pts:?}");
348
349        self.seeking.store(true, Ordering::Release);
350        self.cancel.store(true, Ordering::Release);
351
352        if let Some(rx) = &self.rx {
353            while rx.try_recv().is_ok() {
354                self.buffered.fetch_sub(1, Ordering::Relaxed);
355            }
356        }
357
358        let old_handle = self.handle.take();
359        drop(self.rx.take());
360
361        let (new_tx, new_rx) = sync_channel(self.capacity);
362        self.rx = Some(new_rx);
363
364        let buffered = Arc::clone(&self.buffered);
365        let cancel = Arc::clone(&self.cancel);
366        let seeking = Arc::clone(&self.seeking);
367        let seek_event_tx = self.seek_tx.clone();
368
369        let worker = thread::spawn(move || -> VideoDecoder {
370            // Recover the decoder from the old thread. In normal operation the
371            // decode thread never panics so this always succeeds.
372            let Some(mut decoder) = old_handle.and_then(|h| h.join().ok()) else {
373                log::warn!(
374                    "seek_async: failed to recover decoder \
375                     target_pts={target_pts:?}"
376                );
377                if !cancel.load(Ordering::Acquire) {
378                    seeking.store(false, Ordering::Release);
379                }
380                // Unreachable: the decode thread never panics in normal operation.
381                unreachable!("seek_async: decode thread panicked; cannot recover decoder");
382            };
383
384            if let Err(e) = decoder.seek(target_pts, SeekMode::Backward) {
385                log::warn!("seek_async seek failed target_pts={target_pts:?} error={e}");
386                if !cancel.load(Ordering::Acquire) {
387                    seeking.store(false, Ordering::Release);
388                }
389                return decoder;
390            }
391
392            buffered.store(0, Ordering::Relaxed);
393            cancel.store(false, Ordering::Release);
394            // Mark seek as complete so pop_frame() transitions to blocking
395            // recv(). Only clear if no newer seek_async has superseded us.
396            if !cancel.load(Ordering::Acquire) {
397                seeking.store(false, Ordering::Release);
398            }
399
400            // Forward-decode discard: skip frames before target_pts.
401            loop {
402                if cancel.load(Ordering::Acquire) {
403                    return decoder;
404                }
405                match decoder.decode_one() {
406                    Ok(Some(frame)) => {
407                        let pts = if frame.timestamp().is_valid() {
408                            frame.timestamp().as_duration()
409                        } else {
410                            Duration::ZERO
411                        };
412                        if pts >= target_pts {
413                            let first_pts = pts;
414                            // Send the event BEFORE pushing the frame so that
415                            // when pop_frame() wakes up the event is already in
416                            // the seek_events channel (avoids a try_recv race).
417                            let _ = seek_event_tx.send(SeekEvent::Completed { pts: first_pts });
418                            if new_tx.send(frame).is_ok() {
419                                buffered.fetch_add(1, Ordering::Relaxed);
420                            } else {
421                                return decoder; // receiver dropped
422                            }
423                            break;
424                        }
425                        // Frame before target — discard.
426                    }
427                    Ok(None) => return decoder, // EOF before target
428                    Err(e) => {
429                        log::warn!("seek_async discard error error={e}");
430                        return decoder;
431                    }
432                }
433            }
434
435            decode_loop(&mut decoder, &new_tx, &buffered, &cancel);
436            decoder
437        });
438
439        self.handle = Some(worker);
440    }
441
442    /// Shared helper for `seek` and `seek_coarse`.
443    ///
444    /// 1. Signals cancel, drains the channel, joins the thread to recover the decoder.
445    /// 2. Seeks the decoder to the nearest I-frame at or before `target_pts`.
446    /// 3. Resets the buffered counter, creates a fresh channel, clears the cancel flag.
447    ///
448    /// Returns `(decoder, SyncSender)` ready for the caller to spawn a new thread.
449    fn stop_and_seek(
450        &mut self,
451        target_pts: Duration,
452    ) -> Result<(VideoDecoder, SyncSender<VideoFrame>), PreviewError> {
453        // 1. Signal the background thread to exit its decode loop.
454        self.cancel.store(true, Ordering::Release);
455
456        // 2. Drain the channel so the background thread is not blocked on send().
457        if let Some(rx) = &self.rx {
458            while rx.try_recv().is_ok() {
459                self.buffered.fetch_sub(1, Ordering::Relaxed);
460            }
461        }
462
463        // 3. Join the thread to recover the decoder.
464        let mut decoder = self
465            .handle
466            .take()
467            .and_then(|h| h.join().ok())
468            .ok_or_else(|| PreviewError::SeekFailed {
469                target: target_pts,
470                reason: "decode thread unavailable for seek".into(),
471            })?;
472
473        // 4. Seek to the nearest I-frame at or before target_pts.
474        //    avformat_seek_file with AVSEEK_FLAG_BACKWARD and avcodec_flush_buffers
475        //    are handled inside VideoDecoder::seek (ff-decode/video/decoder_inner/seeking.rs).
476        decoder
477            .seek(target_pts, SeekMode::Backward)
478            .map_err(|e| PreviewError::SeekFailed {
479                target: target_pts,
480                reason: e.to_string(),
481            })?;
482
483        // 5. Reset counter, create a fresh channel, clear the cancel flag.
484        self.buffered.store(0, Ordering::Relaxed);
485        let (tx, rx) = sync_channel(self.capacity);
486        self.rx = Some(rx);
487        self.cancel.store(false, Ordering::Release);
488
489        Ok((decoder, tx))
490    }
491}
492
493impl Drop for DecodeBuffer {
494    fn drop(&mut self) {
495        // Signal cancel so the thread exits the decode loop promptly.
496        self.cancel.store(true, Ordering::Release);
497        // Drop the receiver so SyncSender::send() returns Err, unblocking the
498        // thread if it is waiting for space in a full channel.
499        drop(self.rx.take());
500        // Join (ignoring the returned decoder).
501        if let Some(h) = self.handle.take() {
502            let _ = h.join();
503        }
504    }
505}
506
507// ── decode_loop ───────────────────────────────────────────────────────────────
508
509/// Normal decode loop body shared between `build()` and the post-seek thread.
510///
511/// Exits when EOF is reached, a decode error occurs, or the `cancel` flag is set,
512/// or the receiver drops (i.e., `DecodeBuffer` was dropped).
513pub(super) fn decode_loop(
514    decoder: &mut VideoDecoder,
515    tx: &SyncSender<VideoFrame>,
516    buffered: &AtomicUsize,
517    cancel: &AtomicBool,
518) {
519    loop {
520        if cancel.load(Ordering::Acquire) {
521            break;
522        }
523        match decoder.decode_one() {
524            Ok(Some(frame)) => {
525                if tx.send(frame).is_ok() {
526                    buffered.fetch_add(1, Ordering::Relaxed);
527                } else {
528                    // Receiver was dropped — DecodeBuffer has been dropped.
529                    break;
530                }
531            }
532            Ok(None) => break, // EOF
533            Err(e) => {
534                log::warn!("decode error in background thread error={e}");
535                break;
536            }
537        }
538    }
539}
540
541// ── Tests ─────────────────────────────────────────────────────────────────────
542
543#[cfg(test)]
544mod tests {
545    use super::*;
546    use std::path::Path;
547    use std::thread;
548
549    fn test_video_path() -> std::path::PathBuf {
550        std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("../../assets/video/gameplay.mp4")
551    }
552
553    #[test]
554    fn decode_buffer_build_should_fail_for_nonexistent_file() {
555        let result = DecodeBuffer::open(Path::new("nonexistent_placeholder.mp4")).build();
556        assert!(
557            result.is_err(),
558            "build() must return Err for a non-existent file"
559        );
560    }
561
562    #[test]
563    fn decode_buffer_open_should_use_default_capacity() {
564        let path = test_video_path();
565        let buf = match DecodeBuffer::open(&path).build() {
566            Ok(buf) => buf,
567            Err(e) => {
568                println!("skipping: video file not available: {e}");
569                return;
570            }
571        };
572        // Buffer starts empty; frames arrive asynchronously.
573        assert_eq!(
574            buf.buffered_frames(),
575            0,
576            "buffer must report 0 before any frames have been consumed"
577        );
578    }
579
580    #[test]
581    fn decode_buffer_pop_frame_should_return_some_then_none_at_eof() {
582        let path = test_video_path();
583        let mut buf = match DecodeBuffer::open(&path).capacity(4).build() {
584            Ok(buf) => buf,
585            Err(e) => {
586                println!("skipping: video file not available: {e}");
587                return;
588            }
589        };
590        // Pop at least one frame to confirm the decoder is running.
591        assert!(
592            matches!(buf.pop_frame(), FrameResult::Frame(_)),
593            "pop_frame() must return Frame for a valid video file"
594        );
595    }
596
597    #[test]
598    fn seek_should_reposition_to_target_pts() {
599        let path = test_video_path();
600        let mut buf = match DecodeBuffer::open(&path).capacity(4).build() {
601            Ok(buf) => buf,
602            Err(e) => {
603                println!("skipping: video file not available: {e}");
604                return;
605            }
606        };
607
608        // Consume a few frames to advance past the start.
609        for _ in 0..5 {
610            if matches!(buf.pop_frame(), FrameResult::Eof) {
611                println!("skipping: EOF before seek target");
612                return;
613            }
614        }
615
616        let seek_target = Duration::from_secs(1);
617        match buf.seek(seek_target) {
618            Ok(()) => {}
619            Err(e) => {
620                println!("skipping: seek not supported or failed: {e}");
621                return;
622            }
623        }
624
625        // After seek, the first frame's PTS must be at or near the target.
626        let frame = match buf.pop_frame() {
627            FrameResult::Frame(f) => f,
628            FrameResult::Eof | FrameResult::Seeking(_) => {
629                println!("skipping: no frame after seek");
630                return;
631            }
632        };
633
634        if frame.timestamp().is_valid() {
635            let pts = frame.timestamp().as_duration();
636            // Allow ±1 second of tolerance (one GOP) for I-frame alignment.
637            assert!(
638                pts >= seek_target.saturating_sub(Duration::from_secs(1)),
639                "post-seek frame PTS must be near target; target={seek_target:?} pts={pts:?}"
640            );
641        }
642    }
643
644    #[test]
645    fn seek_should_fail_for_stopped_buffer() {
646        // Build with non-existent file → build() fails.
647        // This confirms seek errors are propagated correctly.
648        let result = DecodeBuffer::open(Path::new("nonexistent.mp4")).build();
649        assert!(
650            result.is_err(),
651            "build() must fail for non-existent file (precondition for seek error path)"
652        );
653    }
654
655    #[test]
656    fn seek_async_should_send_completed_event_with_first_frame_pts() {
657        let path = test_video_path();
658        let mut buf = match DecodeBuffer::open(&path).capacity(4).build() {
659            Ok(buf) => buf,
660            Err(e) => {
661                println!("skipping: video file not available: {e}");
662                return;
663            }
664        };
665
666        // Pop one frame to establish last_good_frame.
667        match buf.pop_frame() {
668            FrameResult::Frame(_) => {}
669            _ => {
670                println!("skipping: no initial frame available");
671                return;
672            }
673        }
674
675        let seek_target = Duration::from_secs(1);
676        buf.seek_async(seek_target);
677
678        // Drive the seek to completion by polling pop_frame.
679        let deadline = std::time::Instant::now() + Duration::from_secs(10);
680        loop {
681            assert!(
682                std::time::Instant::now() < deadline,
683                "timed out waiting for seek to complete"
684            );
685            match buf.pop_frame() {
686                FrameResult::Frame(_) => break, // seek done, first post-seek frame received
687                FrameResult::Seeking(_) => thread::sleep(Duration::from_millis(10)),
688                FrameResult::Eof => {
689                    println!("skipping: EOF reached during seek event test");
690                    return;
691                }
692            }
693        }
694
695        // After pop_frame returned Frame, SeekEvent::Completed must be in the channel.
696        let event = buf.seek_events().try_recv();
697        assert!(
698            event.is_ok(),
699            "expected SeekEvent::Completed after pop_frame returned Frame; got Err"
700        );
701        if let Ok(SeekEvent::Completed { pts }) = event {
702            assert!(
703                pts >= Duration::ZERO,
704                "seek event pts must be non-negative; got pts={pts:?}"
705            );
706        }
707    }
708
709    #[test]
710    fn seek_async_should_deliver_frames_after_completion() {
711        let path = test_video_path();
712        let mut buf = match DecodeBuffer::open(&path).capacity(4).build() {
713            Ok(buf) => buf,
714            Err(e) => {
715                println!("skipping: video file not available: {e}");
716                return;
717            }
718        };
719
720        // Pop one frame to establish last_good_frame.
721        match buf.pop_frame() {
722            FrameResult::Frame(_) => {}
723            _ => {
724                println!("skipping: no initial frame available");
725                return;
726            }
727        }
728
729        let seek_target = Duration::from_secs(1);
730        buf.seek_async(seek_target);
731
732        // Poll until a Frame arrives (seek complete) or we time out.
733        let deadline = std::time::Instant::now() + Duration::from_secs(10);
734        loop {
735            match buf.pop_frame() {
736                FrameResult::Frame(_) => break, // seek completed successfully
737                FrameResult::Seeking(_) => {
738                    thread::sleep(Duration::from_millis(10));
739                }
740                FrameResult::Eof => {
741                    println!("skipping: EOF reached during seek_async test");
742                    return;
743                }
744            }
745            assert!(
746                std::time::Instant::now() < deadline,
747                "seek_async: timed out waiting for seek to complete"
748            );
749        }
750    }
751}