ff_preview/playback/decode_buffer.rs
1//! Background-threaded video frame buffer for ff-preview.
2//!
3//! [`DecodeBuffer`] decouples decoder latency from the presentation loop by
4//! running a [`VideoDecoder`] on a background thread and buffering decoded
5//! frames in a bounded ring channel.
6
7use std::path::{Path, PathBuf};
8use std::sync::Arc;
9use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
10use std::sync::mpsc::{Receiver, Sender, SyncSender, channel, sync_channel};
11use std::thread::{self, JoinHandle};
12use std::time::Duration;
13
14use ff_decode::{SeekMode, VideoDecoder};
15use ff_format::VideoFrame;
16
17use crate::error::PreviewError;
18
19// ── Constants ─────────────────────────────────────────────────────────────────
20
21/// Default ring buffer capacity for [`DecodeBuffer`] (frames).
22const DEFAULT_DECODE_BUFFER_CAPACITY: usize = 8;
23
24// ── FrameResult ───────────────────────────────────────────────────────────────
25
26/// The result of a [`DecodeBuffer::pop_frame`] call.
27///
28/// Callers should match on all three variants; discarding `Seeking` is a
29/// common pattern for scrub-bar UIs that want to display the last good frame
30/// while a seek is in progress.
31#[derive(Debug, Clone)]
32pub enum FrameResult {
33 /// A decoded frame ready for presentation.
34 Frame(VideoFrame),
35 /// A seek is in progress; the wrapped value is the last successfully
36 /// decoded frame, or `None` if no frame has been decoded yet.
37 /// Call [`pop_frame`](DecodeBuffer::pop_frame) again after a short delay
38 /// to check whether seeking has completed.
39 Seeking(Option<VideoFrame>),
40 /// End of file — no more frames will be produced.
41 Eof,
42}
43
44// ── SeekEvent ─────────────────────────────────────────────────────────────────
45
46/// An event emitted by [`DecodeBuffer`] after a
47/// [`seek_async`](DecodeBuffer::seek_async) completes.
48///
49/// Obtain the receiver via [`DecodeBuffer::seek_events`] and poll it with
50/// `try_recv()` (non-blocking) or `recv()` (blocking).
51#[derive(Debug)]
52pub enum SeekEvent {
53 /// The seek initiated by `seek_async` has completed.
54 ///
55 /// `pts` is the presentation timestamp of the first frame available after
56 /// the seek. Events are typically delivered within ~200 ms for local files.
57 Completed { pts: Duration },
58}
59
60// ── DecodeBufferBuilder ───────────────────────────────────────────────────────
61
62/// Builder for [`DecodeBuffer`].
63///
64/// Created via [`DecodeBuffer::open`]; call [`capacity`](Self::capacity) to
65/// override the default ring buffer size, then [`build`](Self::build) to start
66/// the background decode thread and obtain a [`DecodeBuffer`].
67pub struct DecodeBufferBuilder {
68 pub(super) path: PathBuf,
69 pub(super) capacity: usize,
70}
71
72impl DecodeBufferBuilder {
73 /// Set the ring buffer capacity in frames. Default: 8.
74 ///
75 /// The background thread blocks when the buffer is full and resumes as soon
76 /// as the consumer calls [`DecodeBuffer::pop_frame`].
77 #[must_use]
78 pub fn capacity(self, n: usize) -> Self {
79 Self {
80 capacity: n,
81 ..self
82 }
83 }
84
85 /// Build and start the background decode thread.
86 ///
87 /// The thread pre-fills the ring buffer; frames are delivered in
88 /// presentation order. The caller receives a [`DecodeBuffer`] immediately;
89 /// frames become available as the thread decodes them.
90 ///
91 /// # Errors
92 ///
93 /// Returns [`PreviewError`] if the video file cannot be opened or contains
94 /// no decodable video stream.
95 pub fn build(self) -> Result<DecodeBuffer, PreviewError> {
96 // Open decoder on the calling thread for early validation.
97 // Propagates FileNotFound / NoVideoStream / Ffmpeg errors immediately.
98 let mut decoder = VideoDecoder::open(&self.path).build()?;
99
100 let (tx, rx) = sync_channel(self.capacity);
101 let buffered = Arc::new(AtomicUsize::new(0));
102 let cancel = Arc::new(AtomicBool::new(false));
103
104 let buffered_thread = Arc::clone(&buffered);
105 let cancel_thread = Arc::clone(&cancel);
106
107 let handle = thread::spawn(move || -> VideoDecoder {
108 decode_loop(&mut decoder, &tx, &buffered_thread, &cancel_thread);
109 decoder
110 });
111
112 let (seek_tx, seek_rx) = channel::<SeekEvent>();
113
114 Ok(DecodeBuffer {
115 rx: Some(rx),
116 buffered,
117 handle: Some(handle),
118 cancel,
119 capacity: self.capacity,
120 seeking: Arc::new(AtomicBool::new(false)),
121 last_good_frame: None,
122 seek_tx,
123 seek_rx,
124 })
125 }
126}
127
128// ── DecodeBuffer ──────────────────────────────────────────────────────────────
129
130/// Pre-decodes frames from a video file into a ring buffer on a background thread.
131///
132/// `DecodeBuffer` decouples decoder latency from the presentation loop: the
133/// background thread keeps the buffer filled so [`pop_frame`](Self::pop_frame)
134/// can return the next frame without waiting for the decoder.
135///
136/// The default ring buffer capacity is 8 frames. Use
137/// [`open`](Self::open) → [`capacity`](DecodeBufferBuilder::capacity) →
138/// [`build`](DecodeBufferBuilder::build) to configure a different size.
139///
140/// # Usage
141///
142/// ```ignore
143/// let mut buf = DecodeBuffer::open(Path::new("clip.mp4"))
144/// .capacity(16)
145/// .build()?;
146///
147/// while let Some(frame) = buf.pop_frame() {
148/// // present frame…
149/// }
150/// ```
151///
152/// # Thread safety
153///
154/// `DecodeBuffer` is `Send` but **not** `Sync`; it must be owned by a single
155/// consumer. The internal [`std::sync::mpsc::Receiver`] enforces this.
156pub struct DecodeBuffer {
157 /// `Option` so `Drop` can take and drop the receiver before joining the thread.
158 rx: Option<Receiver<VideoFrame>>,
159 /// Approximate count of frames waiting in the ring buffer.
160 /// Incremented by the background thread on send; decremented by `pop_frame`.
161 buffered: Arc<AtomicUsize>,
162 /// Background decode thread handle. Returns the decoder on exit so `seek()`
163 /// can recover it without reopening the file.
164 handle: Option<JoinHandle<VideoDecoder>>,
165 /// Set to `true` to ask the background thread to exit its decode loop.
166 cancel: Arc<AtomicBool>,
167 /// Channel capacity; needed by `seek()` to create a replacement channel.
168 capacity: usize,
169 /// Set to `true` while an async seek is in progress.
170 seeking: Arc<AtomicBool>,
171 /// The last frame returned by `pop_frame`; replayed as a placeholder
172 /// while `seeking` is true.
173 last_good_frame: Option<VideoFrame>,
174 /// Sender side of the seek event channel; cloned into each seek worker.
175 seek_tx: Sender<SeekEvent>,
176 /// Receiver for seek completion events; exposed via `seek_events()`.
177 seek_rx: Receiver<SeekEvent>,
178}
179
180impl DecodeBuffer {
181 /// Open the video at `path` and return a builder for configuring the buffer.
182 ///
183 /// Chain with [`DecodeBufferBuilder::capacity`] and
184 /// [`DecodeBufferBuilder::build`] to start decoding.
185 #[must_use]
186 pub fn open(path: &Path) -> DecodeBufferBuilder {
187 DecodeBufferBuilder {
188 path: path.to_path_buf(),
189 capacity: DEFAULT_DECODE_BUFFER_CAPACITY,
190 }
191 }
192
193 /// Pop the next decoded frame.
194 ///
195 /// - Returns [`FrameResult::Seeking`] immediately (non-blocking) while a
196 /// [`seek_async`](Self::seek_async) is in progress.
197 /// - Returns [`FrameResult::Frame`] when a frame is available; blocks until
198 /// the background thread produces one.
199 /// - Returns [`FrameResult::Eof`] when the background thread reaches end of
200 /// file or the channel is disconnected.
201 #[must_use]
202 pub fn pop_frame(&mut self) -> FrameResult {
203 if self.seeking.load(Ordering::Acquire) {
204 return FrameResult::Seeking(self.last_good_frame.clone());
205 }
206 match self.rx.as_ref().and_then(|rx| rx.recv().ok()) {
207 Some(frame) => {
208 self.buffered.fetch_sub(1, Ordering::Relaxed);
209 self.last_good_frame = Some(frame.clone());
210 FrameResult::Frame(frame)
211 }
212 None => FrameResult::Eof,
213 }
214 }
215
216 /// Returns an approximation of the number of decoded frames currently
217 /// waiting in the buffer.
218 ///
219 /// This value is advisory only; it may lag the actual buffer state by one
220 /// scheduling quantum. Use it for diagnostics, not flow control.
221 #[must_use]
222 pub fn buffered_frames(&self) -> usize {
223 self.buffered.load(Ordering::Relaxed)
224 }
225
226 /// Returns a reference to the seek event receiver.
227 ///
228 /// After calling [`seek_async`](Self::seek_async), poll this receiver to
229 /// detect when the seek has completed:
230 /// - `try_recv()` — non-blocking; returns `Err(TryRecvError::Empty)` while
231 /// the seek is still in progress.
232 /// - `recv()` — blocks until the seek finishes.
233 ///
234 /// Events are delivered within ~200 ms for local files.
235 /// Unconsumed events accumulate in the channel (one per completed seek).
236 #[must_use]
237 pub fn seek_events(&self) -> &Receiver<SeekEvent> {
238 &self.seek_rx
239 }
240
241 /// Frame-accurate seek to `target_pts`.
242 ///
243 /// Stops the background decode thread, seeks the underlying decoder to the
244 /// nearest preceding I-frame (`AVSEEK_FLAG_BACKWARD` + codec buffer flush),
245 /// then restarts the thread. The restarted thread discards frames until
246 /// `PTS ≥ target_pts` before making them available via [`pop_frame`](Self::pop_frame).
247 ///
248 /// Blocks until the thread has stopped and the seek has been accepted by
249 /// the decoder. Frames are filled asynchronously after the method returns.
250 ///
251 /// # Errors
252 ///
253 /// Returns [`PreviewError::SeekFailed`] if the decode thread panicked or
254 /// if the underlying `FFmpeg` seek fails.
255 pub fn seek(&mut self, target_pts: Duration) -> Result<(), PreviewError> {
256 let (mut decoder, tx) = self.stop_and_seek(target_pts)?;
257 let buffered_thread = Arc::clone(&self.buffered);
258 let cancel_thread = Arc::clone(&self.cancel);
259
260 self.handle = Some(thread::spawn(move || -> VideoDecoder {
261 // Forward-decode discard: drop frames whose PTS is before target_pts.
262 loop {
263 if cancel_thread.load(Ordering::Acquire) {
264 return decoder;
265 }
266 match decoder.decode_one() {
267 Ok(Some(frame)) => {
268 let pts = if frame.timestamp().is_valid() {
269 frame.timestamp().as_duration()
270 } else {
271 Duration::ZERO
272 };
273 if pts >= target_pts {
274 if tx.send(frame).is_ok() {
275 buffered_thread.fetch_add(1, Ordering::Relaxed);
276 } else {
277 return decoder; // receiver dropped
278 }
279 break; // target frame sent; switch to normal loop
280 }
281 // Frame is before target — discard and continue.
282 }
283 Ok(None) => return decoder, // EOF before target
284 Err(e) => {
285 log::warn!("decode error during seek discard error={e}");
286 return decoder;
287 }
288 }
289 }
290
291 // Normal decode loop after the discard phase.
292 decode_loop(&mut decoder, &tx, &buffered_thread, &cancel_thread);
293 decoder
294 }));
295
296 Ok(())
297 }
298
299 /// Coarse seek to the nearest I-frame at or before `target_pts`.
300 ///
301 /// Faster than [`seek`](Self::seek) because it skips the forward-decode
302 /// discard step. The next [`pop_frame`](Self::pop_frame) returns the frame
303 /// at the I-frame position, which may be up to ±½ GOP before `target_pts`
304 /// (typically ±1–2 s for H.264 at default settings).
305 ///
306 /// **Typical use:** call repeatedly while a scrub-bar is being dragged;
307 /// call [`seek`](Self::seek) on mouse-up for frame accuracy.
308 ///
309 /// # Errors
310 ///
311 /// Returns [`PreviewError::SeekFailed`] if the decode thread panicked or
312 /// if the underlying `FFmpeg` seek fails.
313 pub fn seek_coarse(&mut self, target_pts: Duration) -> Result<(), PreviewError> {
314 log::debug!("coarse seek target_pts={target_pts:?}");
315 let (mut decoder, tx) = self.stop_and_seek(target_pts)?;
316 let buffered_thread = Arc::clone(&self.buffered);
317 let cancel_thread = Arc::clone(&self.cancel);
318
319 // No discard loop — start the normal decode loop directly from the I-frame.
320 self.handle = Some(thread::spawn(move || -> VideoDecoder {
321 decode_loop(&mut decoder, &tx, &buffered_thread, &cancel_thread);
322 decoder
323 }));
324
325 Ok(())
326 }
327
328 /// Initiate a frame-accurate seek on a background thread and return immediately.
329 ///
330 /// While seeking is in progress, [`pop_frame`](Self::pop_frame) returns
331 /// [`FrameResult::Seeking`] with the last successfully decoded frame as a
332 /// placeholder. Normal [`FrameResult::Frame`] values resume once the seek
333 /// completes.
334 ///
335 /// The seek uses the same frame-accurate strategy as [`seek`](Self::seek):
336 /// `FFmpeg` jumps to the nearest preceding I-frame, then frames before
337 /// `target_pts` are discarded before the first frame is made available.
338 ///
339 /// If called again before the previous seek completes, the new seek
340 /// supersedes the old one; the old worker exits at the next cancel check.
341 ///
342 /// # Panics
343 ///
344 /// Panics (inside the background worker thread) if the previous decode
345 /// thread panicked — an internal bug that should never occur in practice.
346 pub fn seek_async(&mut self, target_pts: Duration) {
347 log::debug!("async seek started target_pts={target_pts:?}");
348
349 self.seeking.store(true, Ordering::Release);
350 self.cancel.store(true, Ordering::Release);
351
352 if let Some(rx) = &self.rx {
353 while rx.try_recv().is_ok() {
354 self.buffered.fetch_sub(1, Ordering::Relaxed);
355 }
356 }
357
358 let old_handle = self.handle.take();
359 drop(self.rx.take());
360
361 let (new_tx, new_rx) = sync_channel(self.capacity);
362 self.rx = Some(new_rx);
363
364 let buffered = Arc::clone(&self.buffered);
365 let cancel = Arc::clone(&self.cancel);
366 let seeking = Arc::clone(&self.seeking);
367 let seek_event_tx = self.seek_tx.clone();
368
369 let worker = thread::spawn(move || -> VideoDecoder {
370 // Recover the decoder from the old thread. In normal operation the
371 // decode thread never panics so this always succeeds.
372 let Some(mut decoder) = old_handle.and_then(|h| h.join().ok()) else {
373 log::warn!(
374 "seek_async: failed to recover decoder \
375 target_pts={target_pts:?}"
376 );
377 if !cancel.load(Ordering::Acquire) {
378 seeking.store(false, Ordering::Release);
379 }
380 // Unreachable: the decode thread never panics in normal operation.
381 unreachable!("seek_async: decode thread panicked; cannot recover decoder");
382 };
383
384 if let Err(e) = decoder.seek(target_pts, SeekMode::Backward) {
385 log::warn!("seek_async seek failed target_pts={target_pts:?} error={e}");
386 if !cancel.load(Ordering::Acquire) {
387 seeking.store(false, Ordering::Release);
388 }
389 return decoder;
390 }
391
392 buffered.store(0, Ordering::Relaxed);
393 cancel.store(false, Ordering::Release);
394 // Mark seek as complete so pop_frame() transitions to blocking
395 // recv(). Only clear if no newer seek_async has superseded us.
396 if !cancel.load(Ordering::Acquire) {
397 seeking.store(false, Ordering::Release);
398 }
399
400 // Forward-decode discard: skip frames before target_pts.
401 loop {
402 if cancel.load(Ordering::Acquire) {
403 return decoder;
404 }
405 match decoder.decode_one() {
406 Ok(Some(frame)) => {
407 let pts = if frame.timestamp().is_valid() {
408 frame.timestamp().as_duration()
409 } else {
410 Duration::ZERO
411 };
412 if pts >= target_pts {
413 let first_pts = pts;
414 // Send the event BEFORE pushing the frame so that
415 // when pop_frame() wakes up the event is already in
416 // the seek_events channel (avoids a try_recv race).
417 let _ = seek_event_tx.send(SeekEvent::Completed { pts: first_pts });
418 if new_tx.send(frame).is_ok() {
419 buffered.fetch_add(1, Ordering::Relaxed);
420 } else {
421 return decoder; // receiver dropped
422 }
423 break;
424 }
425 // Frame before target — discard.
426 }
427 Ok(None) => return decoder, // EOF before target
428 Err(e) => {
429 log::warn!("seek_async discard error error={e}");
430 return decoder;
431 }
432 }
433 }
434
435 decode_loop(&mut decoder, &new_tx, &buffered, &cancel);
436 decoder
437 });
438
439 self.handle = Some(worker);
440 }
441
442 /// Shared helper for `seek` and `seek_coarse`.
443 ///
444 /// 1. Signals cancel, drains the channel, joins the thread to recover the decoder.
445 /// 2. Seeks the decoder to the nearest I-frame at or before `target_pts`.
446 /// 3. Resets the buffered counter, creates a fresh channel, clears the cancel flag.
447 ///
448 /// Returns `(decoder, SyncSender)` ready for the caller to spawn a new thread.
449 fn stop_and_seek(
450 &mut self,
451 target_pts: Duration,
452 ) -> Result<(VideoDecoder, SyncSender<VideoFrame>), PreviewError> {
453 // 1. Signal the background thread to exit its decode loop.
454 self.cancel.store(true, Ordering::Release);
455
456 // 2. Drain the channel so the background thread is not blocked on send().
457 if let Some(rx) = &self.rx {
458 while rx.try_recv().is_ok() {
459 self.buffered.fetch_sub(1, Ordering::Relaxed);
460 }
461 }
462
463 // 3. Join the thread to recover the decoder.
464 let mut decoder = self
465 .handle
466 .take()
467 .and_then(|h| h.join().ok())
468 .ok_or_else(|| PreviewError::SeekFailed {
469 target: target_pts,
470 reason: "decode thread unavailable for seek".into(),
471 })?;
472
473 // 4. Seek to the nearest I-frame at or before target_pts.
474 // avformat_seek_file with AVSEEK_FLAG_BACKWARD and avcodec_flush_buffers
475 // are handled inside VideoDecoder::seek (ff-decode/video/decoder_inner/seeking.rs).
476 decoder
477 .seek(target_pts, SeekMode::Backward)
478 .map_err(|e| PreviewError::SeekFailed {
479 target: target_pts,
480 reason: e.to_string(),
481 })?;
482
483 // 5. Reset counter, create a fresh channel, clear the cancel flag.
484 self.buffered.store(0, Ordering::Relaxed);
485 let (tx, rx) = sync_channel(self.capacity);
486 self.rx = Some(rx);
487 self.cancel.store(false, Ordering::Release);
488
489 Ok((decoder, tx))
490 }
491}
492
493impl Drop for DecodeBuffer {
494 fn drop(&mut self) {
495 // Signal cancel so the thread exits the decode loop promptly.
496 self.cancel.store(true, Ordering::Release);
497 // Drop the receiver so SyncSender::send() returns Err, unblocking the
498 // thread if it is waiting for space in a full channel.
499 drop(self.rx.take());
500 // Join (ignoring the returned decoder).
501 if let Some(h) = self.handle.take() {
502 let _ = h.join();
503 }
504 }
505}
506
507// ── decode_loop ───────────────────────────────────────────────────────────────
508
509/// Normal decode loop body shared between `build()` and the post-seek thread.
510///
511/// Exits when EOF is reached, a decode error occurs, or the `cancel` flag is set,
512/// or the receiver drops (i.e., `DecodeBuffer` was dropped).
513pub(super) fn decode_loop(
514 decoder: &mut VideoDecoder,
515 tx: &SyncSender<VideoFrame>,
516 buffered: &AtomicUsize,
517 cancel: &AtomicBool,
518) {
519 loop {
520 if cancel.load(Ordering::Acquire) {
521 break;
522 }
523 match decoder.decode_one() {
524 Ok(Some(frame)) => {
525 if tx.send(frame).is_ok() {
526 buffered.fetch_add(1, Ordering::Relaxed);
527 } else {
528 // Receiver was dropped — DecodeBuffer has been dropped.
529 break;
530 }
531 }
532 Ok(None) => break, // EOF
533 Err(e) => {
534 log::warn!("decode error in background thread error={e}");
535 break;
536 }
537 }
538 }
539}
540
541// ── Tests ─────────────────────────────────────────────────────────────────────
542
543#[cfg(test)]
544mod tests {
545 use super::*;
546 use std::path::Path;
547 use std::thread;
548
549 fn test_video_path() -> std::path::PathBuf {
550 std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("../../assets/video/gameplay.mp4")
551 }
552
553 #[test]
554 fn decode_buffer_build_should_fail_for_nonexistent_file() {
555 let result = DecodeBuffer::open(Path::new("nonexistent_placeholder.mp4")).build();
556 assert!(
557 result.is_err(),
558 "build() must return Err for a non-existent file"
559 );
560 }
561
562 #[test]
563 fn decode_buffer_open_should_use_default_capacity() {
564 let path = test_video_path();
565 let buf = match DecodeBuffer::open(&path).build() {
566 Ok(buf) => buf,
567 Err(e) => {
568 println!("skipping: video file not available: {e}");
569 return;
570 }
571 };
572 // Buffer starts empty; frames arrive asynchronously.
573 assert_eq!(
574 buf.buffered_frames(),
575 0,
576 "buffer must report 0 before any frames have been consumed"
577 );
578 }
579
580 #[test]
581 fn decode_buffer_pop_frame_should_return_some_then_none_at_eof() {
582 let path = test_video_path();
583 let mut buf = match DecodeBuffer::open(&path).capacity(4).build() {
584 Ok(buf) => buf,
585 Err(e) => {
586 println!("skipping: video file not available: {e}");
587 return;
588 }
589 };
590 // Pop at least one frame to confirm the decoder is running.
591 assert!(
592 matches!(buf.pop_frame(), FrameResult::Frame(_)),
593 "pop_frame() must return Frame for a valid video file"
594 );
595 }
596
597 #[test]
598 fn seek_should_reposition_to_target_pts() {
599 let path = test_video_path();
600 let mut buf = match DecodeBuffer::open(&path).capacity(4).build() {
601 Ok(buf) => buf,
602 Err(e) => {
603 println!("skipping: video file not available: {e}");
604 return;
605 }
606 };
607
608 // Consume a few frames to advance past the start.
609 for _ in 0..5 {
610 if matches!(buf.pop_frame(), FrameResult::Eof) {
611 println!("skipping: EOF before seek target");
612 return;
613 }
614 }
615
616 let seek_target = Duration::from_secs(1);
617 match buf.seek(seek_target) {
618 Ok(()) => {}
619 Err(e) => {
620 println!("skipping: seek not supported or failed: {e}");
621 return;
622 }
623 }
624
625 // After seek, the first frame's PTS must be at or near the target.
626 let frame = match buf.pop_frame() {
627 FrameResult::Frame(f) => f,
628 FrameResult::Eof | FrameResult::Seeking(_) => {
629 println!("skipping: no frame after seek");
630 return;
631 }
632 };
633
634 if frame.timestamp().is_valid() {
635 let pts = frame.timestamp().as_duration();
636 // Allow ±1 second of tolerance (one GOP) for I-frame alignment.
637 assert!(
638 pts >= seek_target.saturating_sub(Duration::from_secs(1)),
639 "post-seek frame PTS must be near target; target={seek_target:?} pts={pts:?}"
640 );
641 }
642 }
643
644 #[test]
645 fn seek_should_fail_for_stopped_buffer() {
646 // Build with non-existent file → build() fails.
647 // This confirms seek errors are propagated correctly.
648 let result = DecodeBuffer::open(Path::new("nonexistent.mp4")).build();
649 assert!(
650 result.is_err(),
651 "build() must fail for non-existent file (precondition for seek error path)"
652 );
653 }
654
655 #[test]
656 fn seek_async_should_send_completed_event_with_first_frame_pts() {
657 let path = test_video_path();
658 let mut buf = match DecodeBuffer::open(&path).capacity(4).build() {
659 Ok(buf) => buf,
660 Err(e) => {
661 println!("skipping: video file not available: {e}");
662 return;
663 }
664 };
665
666 // Pop one frame to establish last_good_frame.
667 match buf.pop_frame() {
668 FrameResult::Frame(_) => {}
669 _ => {
670 println!("skipping: no initial frame available");
671 return;
672 }
673 }
674
675 let seek_target = Duration::from_secs(1);
676 buf.seek_async(seek_target);
677
678 // Drive the seek to completion by polling pop_frame.
679 let deadline = std::time::Instant::now() + Duration::from_secs(10);
680 loop {
681 assert!(
682 std::time::Instant::now() < deadline,
683 "timed out waiting for seek to complete"
684 );
685 match buf.pop_frame() {
686 FrameResult::Frame(_) => break, // seek done, first post-seek frame received
687 FrameResult::Seeking(_) => thread::sleep(Duration::from_millis(10)),
688 FrameResult::Eof => {
689 println!("skipping: EOF reached during seek event test");
690 return;
691 }
692 }
693 }
694
695 // After pop_frame returned Frame, SeekEvent::Completed must be in the channel.
696 let event = buf.seek_events().try_recv();
697 assert!(
698 event.is_ok(),
699 "expected SeekEvent::Completed after pop_frame returned Frame; got Err"
700 );
701 if let Ok(SeekEvent::Completed { pts }) = event {
702 assert!(
703 pts >= Duration::ZERO,
704 "seek event pts must be non-negative; got pts={pts:?}"
705 );
706 }
707 }
708
709 #[test]
710 fn seek_async_should_deliver_frames_after_completion() {
711 let path = test_video_path();
712 let mut buf = match DecodeBuffer::open(&path).capacity(4).build() {
713 Ok(buf) => buf,
714 Err(e) => {
715 println!("skipping: video file not available: {e}");
716 return;
717 }
718 };
719
720 // Pop one frame to establish last_good_frame.
721 match buf.pop_frame() {
722 FrameResult::Frame(_) => {}
723 _ => {
724 println!("skipping: no initial frame available");
725 return;
726 }
727 }
728
729 let seek_target = Duration::from_secs(1);
730 buf.seek_async(seek_target);
731
732 // Poll until a Frame arrives (seek complete) or we time out.
733 let deadline = std::time::Instant::now() + Duration::from_secs(10);
734 loop {
735 match buf.pop_frame() {
736 FrameResult::Frame(_) => break, // seek completed successfully
737 FrameResult::Seeking(_) => {
738 thread::sleep(Duration::from_millis(10));
739 }
740 FrameResult::Eof => {
741 println!("skipping: EOF reached during seek_async test");
742 return;
743 }
744 }
745 assert!(
746 std::time::Instant::now() < deadline,
747 "seek_async: timed out waiting for seek to complete"
748 );
749 }
750 }
751}