use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::{Receiver, Sender, SyncSender, channel, sync_channel};
use std::thread::{self, JoinHandle};
use std::time::Duration;
use ff_decode::{HardwareAccel, SeekMode, VideoDecoder};
use ff_format::VideoFrame;
use crate::error::PreviewError;
const DEFAULT_DECODE_BUFFER_CAPACITY: usize = 8;
#[derive(Debug, Clone)]
pub enum FrameResult {
Frame(VideoFrame),
Seeking(Option<VideoFrame>),
Eof,
}
#[derive(Debug)]
pub enum SeekEvent {
Completed { pts: Duration },
}
pub struct DecodeBufferBuilder {
pub(super) path: PathBuf,
pub(super) capacity: usize,
pub(super) hw_accel: HardwareAccel,
}
impl DecodeBufferBuilder {
#[must_use]
pub fn capacity(self, n: usize) -> Self {
Self {
capacity: n,
..self
}
}
#[must_use]
pub fn hardware_accel(self, accel: HardwareAccel) -> Self {
Self {
hw_accel: accel,
..self
}
}
pub fn build(self) -> Result<DecodeBuffer, PreviewError> {
let mut decoder = VideoDecoder::open(&self.path)
.hardware_accel(self.hw_accel)
.build()?;
let (tx, rx) = sync_channel(self.capacity);
let buffered = Arc::new(AtomicUsize::new(0));
let cancel = Arc::new(AtomicBool::new(false));
let buffered_thread = Arc::clone(&buffered);
let cancel_thread = Arc::clone(&cancel);
let (seek_tx, seek_rx) = channel::<SeekEvent>();
let (error_tx, error_rx) = channel::<String>();
let error_tx_thread = error_tx.clone();
let handle = thread::spawn(move || -> VideoDecoder {
decode_loop(
&mut decoder,
&tx,
&buffered_thread,
&cancel_thread,
&error_tx_thread,
);
decoder
});
Ok(DecodeBuffer {
rx: Some(rx),
buffered,
handle: Some(handle),
cancel,
capacity: self.capacity,
seeking: Arc::new(AtomicBool::new(false)),
last_good_frame: None,
seek_tx,
seek_rx,
error_tx,
error_rx,
})
}
}
pub struct DecodeBuffer {
rx: Option<Receiver<VideoFrame>>,
buffered: Arc<AtomicUsize>,
handle: Option<JoinHandle<VideoDecoder>>,
cancel: Arc<AtomicBool>,
capacity: usize,
seeking: Arc<AtomicBool>,
last_good_frame: Option<VideoFrame>,
seek_tx: Sender<SeekEvent>,
seek_rx: Receiver<SeekEvent>,
error_tx: Sender<String>,
error_rx: Receiver<String>,
}
impl DecodeBuffer {
#[must_use]
pub fn open(path: &Path) -> DecodeBufferBuilder {
DecodeBufferBuilder {
path: path.to_path_buf(),
capacity: DEFAULT_DECODE_BUFFER_CAPACITY,
hw_accel: HardwareAccel::Auto,
}
}
#[must_use]
pub fn pop_frame(&mut self) -> FrameResult {
if self.seeking.load(Ordering::Acquire) {
return FrameResult::Seeking(self.last_good_frame.clone());
}
match self.rx.as_ref().and_then(|rx| rx.recv().ok()) {
Some(frame) => {
self.buffered.fetch_sub(1, Ordering::Relaxed);
self.last_good_frame = Some(frame.clone());
FrameResult::Frame(frame)
}
None => FrameResult::Eof,
}
}
#[must_use]
pub fn buffered_frames(&self) -> usize {
self.buffered.load(Ordering::Relaxed)
}
#[must_use]
pub fn seek_events(&self) -> &Receiver<SeekEvent> {
&self.seek_rx
}
#[must_use]
pub fn error_events(&self) -> &Receiver<String> {
&self.error_rx
}
pub fn seek(&mut self, target_pts: Duration) -> Result<(), PreviewError> {
let (mut decoder, tx) = self.stop_and_seek(target_pts)?;
let buffered_thread = Arc::clone(&self.buffered);
let cancel_thread = Arc::clone(&self.cancel);
let error_tx_thread = self.error_tx.clone();
self.handle = Some(thread::spawn(move || -> VideoDecoder {
loop {
if cancel_thread.load(Ordering::Acquire) {
return decoder;
}
match decoder.decode_one() {
Ok(Some(frame)) => {
let pts = if frame.timestamp().is_valid() {
frame.timestamp().as_duration()
} else {
Duration::ZERO
};
if pts >= target_pts {
if tx.send(frame).is_ok() {
buffered_thread.fetch_add(1, Ordering::Relaxed);
} else {
return decoder; }
break; }
}
Ok(None) => return decoder, Err(e) => {
log::warn!("decode error during seek discard error={e}");
let _ = error_tx_thread.send(e.to_string());
return decoder;
}
}
}
decode_loop(
&mut decoder,
&tx,
&buffered_thread,
&cancel_thread,
&error_tx_thread,
);
decoder
}));
Ok(())
}
pub fn seek_coarse(&mut self, target_pts: Duration) -> Result<(), PreviewError> {
log::debug!("coarse seek target_pts={target_pts:?}");
let (mut decoder, tx) = self.stop_and_seek(target_pts)?;
let buffered_thread = Arc::clone(&self.buffered);
let cancel_thread = Arc::clone(&self.cancel);
let error_tx_thread = self.error_tx.clone();
self.handle = Some(thread::spawn(move || -> VideoDecoder {
decode_loop(
&mut decoder,
&tx,
&buffered_thread,
&cancel_thread,
&error_tx_thread,
);
decoder
}));
Ok(())
}
pub fn seek_async(&mut self, target_pts: Duration) {
log::debug!("async seek started target_pts={target_pts:?}");
self.seeking.store(true, Ordering::Release);
self.cancel.store(true, Ordering::Release);
if let Some(rx) = &self.rx {
while rx.try_recv().is_ok() {
self.buffered.fetch_sub(1, Ordering::Relaxed);
}
}
let old_handle = self.handle.take();
drop(self.rx.take());
let (new_tx, new_rx) = sync_channel(self.capacity);
self.rx = Some(new_rx);
let buffered = Arc::clone(&self.buffered);
let cancel = Arc::clone(&self.cancel);
let seeking = Arc::clone(&self.seeking);
let seek_event_tx = self.seek_tx.clone();
let error_tx_async = self.error_tx.clone();
let worker = thread::spawn(move || -> VideoDecoder {
let Some(mut decoder) = old_handle.and_then(|h| h.join().ok()) else {
log::warn!(
"seek_async: failed to recover decoder \
target_pts={target_pts:?}"
);
if !cancel.load(Ordering::Acquire) {
seeking.store(false, Ordering::Release);
}
unreachable!("seek_async: decode thread panicked; cannot recover decoder");
};
if let Err(e) = decoder.seek(target_pts, SeekMode::Backward) {
log::warn!("seek_async seek failed target_pts={target_pts:?} error={e}");
if !cancel.load(Ordering::Acquire) {
seeking.store(false, Ordering::Release);
}
return decoder;
}
buffered.store(0, Ordering::Relaxed);
cancel.store(false, Ordering::Release);
if !cancel.load(Ordering::Acquire) {
seeking.store(false, Ordering::Release);
}
loop {
if cancel.load(Ordering::Acquire) {
return decoder;
}
match decoder.decode_one() {
Ok(Some(frame)) => {
let pts = if frame.timestamp().is_valid() {
frame.timestamp().as_duration()
} else {
Duration::ZERO
};
if pts >= target_pts {
let first_pts = pts;
let _ = seek_event_tx.send(SeekEvent::Completed { pts: first_pts });
if new_tx.send(frame).is_ok() {
buffered.fetch_add(1, Ordering::Relaxed);
} else {
return decoder; }
break;
}
}
Ok(None) => return decoder, Err(e) => {
log::warn!("seek_async discard error error={e}");
let _ = error_tx_async.send(e.to_string());
return decoder;
}
}
}
decode_loop(&mut decoder, &new_tx, &buffered, &cancel, &error_tx_async);
decoder
});
self.handle = Some(worker);
}
fn stop_and_seek(
&mut self,
target_pts: Duration,
) -> Result<(VideoDecoder, SyncSender<VideoFrame>), PreviewError> {
self.cancel.store(true, Ordering::Release);
if let Some(rx) = &self.rx {
while rx.try_recv().is_ok() {
self.buffered.fetch_sub(1, Ordering::Relaxed);
}
}
let mut decoder = self
.handle
.take()
.and_then(|h| h.join().ok())
.ok_or_else(|| PreviewError::SeekFailed {
target: target_pts,
reason: "decode thread unavailable for seek".into(),
})?;
decoder
.seek(target_pts, SeekMode::Backward)
.map_err(|e| PreviewError::SeekFailed {
target: target_pts,
reason: e.to_string(),
})?;
self.buffered.store(0, Ordering::Relaxed);
let (tx, rx) = sync_channel(self.capacity);
self.rx = Some(rx);
self.cancel.store(false, Ordering::Release);
Ok((decoder, tx))
}
}
impl Drop for DecodeBuffer {
fn drop(&mut self) {
self.cancel.store(true, Ordering::Release);
drop(self.rx.take());
if let Some(h) = self.handle.take() {
let _ = h.join();
}
}
}
pub(super) fn decode_loop(
decoder: &mut VideoDecoder,
tx: &SyncSender<VideoFrame>,
buffered: &AtomicUsize,
cancel: &AtomicBool,
error_tx: &Sender<String>,
) {
loop {
if cancel.load(Ordering::Acquire) {
break;
}
match decoder.decode_one() {
Ok(Some(frame)) => {
if tx.send(frame).is_ok() {
buffered.fetch_add(1, Ordering::Relaxed);
} else {
break;
}
}
Ok(None) => break, Err(e) => {
log::warn!("decode error in background thread error={e}");
let _ = error_tx.send(e.to_string());
break;
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::path::Path;
use std::thread;
fn test_video_path() -> std::path::PathBuf {
std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("../../assets/video/gameplay.mp4")
}
#[test]
fn decode_buffer_build_should_fail_for_nonexistent_file() {
let result = DecodeBuffer::open(Path::new("nonexistent_placeholder.mp4")).build();
assert!(
result.is_err(),
"build() must return Err for a non-existent file"
);
}
#[test]
fn decode_buffer_open_should_use_default_capacity() {
let path = test_video_path();
let buf = match DecodeBuffer::open(&path).build() {
Ok(buf) => buf,
Err(e) => {
println!("skipping: video file not available: {e}");
return;
}
};
assert_eq!(
buf.buffered_frames(),
0,
"buffer must report 0 before any frames have been consumed"
);
}
#[test]
fn decode_buffer_pop_frame_should_return_some_then_none_at_eof() {
let path = test_video_path();
let mut buf = match DecodeBuffer::open(&path).capacity(4).build() {
Ok(buf) => buf,
Err(e) => {
println!("skipping: video file not available: {e}");
return;
}
};
assert!(
matches!(buf.pop_frame(), FrameResult::Frame(_)),
"pop_frame() must return Frame for a valid video file"
);
}
#[test]
fn seek_should_reposition_to_target_pts() {
let path = test_video_path();
let mut buf = match DecodeBuffer::open(&path).capacity(4).build() {
Ok(buf) => buf,
Err(e) => {
println!("skipping: video file not available: {e}");
return;
}
};
for _ in 0..5 {
if matches!(buf.pop_frame(), FrameResult::Eof) {
println!("skipping: EOF before seek target");
return;
}
}
let seek_target = Duration::from_secs(1);
match buf.seek(seek_target) {
Ok(()) => {}
Err(e) => {
println!("skipping: seek not supported or failed: {e}");
return;
}
}
let frame = match buf.pop_frame() {
FrameResult::Frame(f) => f,
FrameResult::Eof | FrameResult::Seeking(_) => {
println!("skipping: no frame after seek");
return;
}
};
if frame.timestamp().is_valid() {
let pts = frame.timestamp().as_duration();
assert!(
pts >= seek_target.saturating_sub(Duration::from_secs(1)),
"post-seek frame PTS must be near target; target={seek_target:?} pts={pts:?}"
);
}
}
#[test]
fn seek_should_fail_for_stopped_buffer() {
let result = DecodeBuffer::open(Path::new("nonexistent.mp4")).build();
assert!(
result.is_err(),
"build() must fail for non-existent file (precondition for seek error path)"
);
}
#[test]
fn seek_async_should_send_completed_event_with_first_frame_pts() {
let path = test_video_path();
let mut buf = match DecodeBuffer::open(&path).capacity(4).build() {
Ok(buf) => buf,
Err(e) => {
println!("skipping: video file not available: {e}");
return;
}
};
match buf.pop_frame() {
FrameResult::Frame(_) => {}
_ => {
println!("skipping: no initial frame available");
return;
}
}
let seek_target = Duration::from_secs(1);
buf.seek_async(seek_target);
let deadline = std::time::Instant::now() + Duration::from_secs(10);
loop {
assert!(
std::time::Instant::now() < deadline,
"timed out waiting for seek to complete"
);
match buf.pop_frame() {
FrameResult::Frame(_) => break, FrameResult::Seeking(_) => thread::sleep(Duration::from_millis(10)),
FrameResult::Eof => {
println!("skipping: EOF reached during seek event test");
return;
}
}
}
let event = buf.seek_events().try_recv();
assert!(
event.is_ok(),
"expected SeekEvent::Completed after pop_frame returned Frame; got Err"
);
if let Ok(SeekEvent::Completed { pts }) = event {
assert!(
pts >= Duration::ZERO,
"seek event pts must be non-negative; got pts={pts:?}"
);
}
}
#[test]
fn seek_async_should_deliver_frames_after_completion() {
let path = test_video_path();
let mut buf = match DecodeBuffer::open(&path).capacity(4).build() {
Ok(buf) => buf,
Err(e) => {
println!("skipping: video file not available: {e}");
return;
}
};
match buf.pop_frame() {
FrameResult::Frame(_) => {}
_ => {
println!("skipping: no initial frame available");
return;
}
}
let seek_target = Duration::from_secs(1);
buf.seek_async(seek_target);
let deadline = std::time::Instant::now() + Duration::from_secs(10);
loop {
match buf.pop_frame() {
FrameResult::Frame(_) => break, FrameResult::Seeking(_) => {
thread::sleep(Duration::from_millis(10));
}
FrameResult::Eof => {
println!("skipping: EOF reached during seek_async test");
return;
}
}
assert!(
std::time::Instant::now() < deadline,
"seek_async: timed out waiting for seek to complete"
);
}
}
}