use std::path::{Path, PathBuf};
use ff_format::{PixelFormat, VideoFrame};
use futures::stream::{self, Stream};
use crate::async_decoder::AsyncDecoder;
use crate::error::DecodeError;
use crate::video::builder::{VideoDecoder, VideoDecoderBuilder};
pub struct AsyncVideoDecoderBuilder {
inner: VideoDecoderBuilder,
}
impl AsyncVideoDecoderBuilder {
fn new(path: PathBuf) -> Self {
Self {
inner: VideoDecoderBuilder::new(path),
}
}
#[must_use]
pub fn output_format(mut self, format: PixelFormat) -> Self {
self.inner = self.inner.output_format(format);
self
}
#[must_use]
pub fn output_size(mut self, width: u32, height: u32) -> Self {
self.inner = self.inner.output_size(width, height);
self
}
#[must_use]
pub fn output_width(mut self, width: u32) -> Self {
self.inner = self.inner.output_width(width);
self
}
#[must_use]
pub fn output_height(mut self, height: u32) -> Self {
self.inner = self.inner.output_height(height);
self
}
pub async fn build(self) -> Result<AsyncVideoDecoder, DecodeError> {
let builder = self.inner;
let decoder = tokio::task::spawn_blocking(move || builder.build())
.await
.map_err(|e| DecodeError::Ffmpeg {
code: 0,
message: format!("spawn_blocking panicked: {e}"),
})??;
Ok(AsyncVideoDecoder {
inner: AsyncDecoder::new(decoder),
})
}
}
pub struct AsyncVideoDecoder {
inner: AsyncDecoder<VideoDecoder>,
}
impl AsyncVideoDecoder {
pub fn builder(path: impl AsRef<Path>) -> AsyncVideoDecoderBuilder {
AsyncVideoDecoderBuilder::new(path.as_ref().to_path_buf())
}
pub async fn open(path: impl AsRef<Path> + Send + 'static) -> Result<Self, DecodeError> {
let path: PathBuf = path.as_ref().to_path_buf();
let decoder = tokio::task::spawn_blocking(move || VideoDecoder::open(&path).build())
.await
.map_err(|e| DecodeError::Ffmpeg {
code: 0,
message: format!("spawn_blocking panicked: {e}"),
})??;
Ok(Self {
inner: AsyncDecoder::new(decoder),
})
}
pub async fn decode_frame(&mut self) -> Result<Option<VideoFrame>, DecodeError> {
self.inner.with(VideoDecoder::decode_one).await
}
pub fn into_stream(self) -> impl Stream<Item = Result<VideoFrame, DecodeError>> + Send {
stream::unfold(Some(self), |state| async move {
let mut decoder = state?; match decoder.decode_frame().await {
Ok(Some(frame)) => Some((Ok(frame), Some(decoder))),
Ok(None) => None, Err(e) => Some((Err(e), None)), }
})
}
}
#[cfg(test)]
mod tests {
use super::*;
fn _assert_send() {
fn is_send<T: Send>() {}
is_send::<AsyncVideoDecoder>();
}
#[tokio::test]
async fn async_video_decoder_should_fail_on_missing_file() {
let result = AsyncVideoDecoder::open("/nonexistent/path/video.mp4").await;
assert!(
matches!(result, Err(DecodeError::FileNotFound { .. })),
"expected FileNotFound"
);
}
#[tokio::test]
async fn async_video_decoder_builder_output_format_should_propagate_to_sync_builder() {
let result = AsyncVideoDecoder::builder("/nonexistent/path/video.mp4")
.output_format(PixelFormat::Rgb24)
.build()
.await;
assert!(
matches!(result, Err(DecodeError::FileNotFound { .. })),
"builder with output_format must propagate FileNotFound"
);
}
#[tokio::test]
async fn async_video_decoder_builder_zero_size_should_return_invalid_dimensions() {
let result = AsyncVideoDecoder::builder("/nonexistent/path/video.mp4")
.output_size(0, 480)
.build()
.await;
assert!(
matches!(result, Err(DecodeError::InvalidOutputDimensions { .. })),
"output_size(0, 480) must return InvalidOutputDimensions"
);
}
#[tokio::test]
async fn into_stream_state_machine_should_terminate_after_error() {
use futures::StreamExt;
let items: Vec<Result<u32, u32>> = stream::unfold(Some(0u32), |state| async move {
let n = state?; match n {
0 | 1 => Some((Ok(n), Some(n + 1))),
_ => Some((Err(n), None)), }
})
.collect()
.await;
assert_eq!(
items.len(),
3,
"stream must stop after the error: expected 2 Ok + 1 Err, got {items:?}"
);
assert!(items[0].is_ok());
assert!(items[1].is_ok());
assert!(items[2].is_err());
}
}