Skip to main content

ff_decode/video/
async_decoder.rs

1//! Async video decoder backed by `tokio::task::spawn_blocking`.
2
3use std::path::{Path, PathBuf};
4
5use ff_format::{PixelFormat, VideoFrame};
6use futures::stream::{self, Stream};
7
8use crate::async_decoder::AsyncDecoder;
9use crate::error::DecodeError;
10use crate::video::builder::{VideoDecoder, VideoDecoderBuilder};
11
12/// Async builder for [`AsyncVideoDecoder`] that mirrors the options available
13/// on the synchronous [`VideoDecoderBuilder`].
14///
15/// Obtain one with [`AsyncVideoDecoder::builder`]. Call [`build`](Self::build)
16/// to open the file asynchronously on a `spawn_blocking` thread.
17///
18/// # Examples
19///
20/// ```ignore
21/// use ff_decode::AsyncVideoDecoder;
22/// use ff_format::PixelFormat;
23///
24/// let decoder = AsyncVideoDecoder::builder("video.mp4")
25///     .output_format(PixelFormat::Rgb24)
26///     .build()
27///     .await?;
28/// ```
29pub struct AsyncVideoDecoderBuilder {
30    inner: VideoDecoderBuilder,
31}
32
33impl AsyncVideoDecoderBuilder {
34    fn new(path: PathBuf) -> Self {
35        Self {
36            inner: VideoDecoderBuilder::new(path),
37        }
38    }
39
40    /// Sets the output pixel format for decoded frames.
41    ///
42    /// Equivalent to [`VideoDecoderBuilder::output_format`].
43    #[must_use]
44    pub fn output_format(mut self, format: PixelFormat) -> Self {
45        self.inner = self.inner.output_format(format);
46        self
47    }
48
49    /// Scales decoded frames to exact dimensions.
50    ///
51    /// Equivalent to [`VideoDecoderBuilder::output_size`].
52    #[must_use]
53    pub fn output_size(mut self, width: u32, height: u32) -> Self {
54        self.inner = self.inner.output_size(width, height);
55        self
56    }
57
58    /// Scales decoded frames to the given width, preserving the aspect ratio.
59    ///
60    /// Equivalent to [`VideoDecoderBuilder::output_width`].
61    #[must_use]
62    pub fn output_width(mut self, width: u32) -> Self {
63        self.inner = self.inner.output_width(width);
64        self
65    }
66
67    /// Scales decoded frames to the given height, preserving the aspect ratio.
68    ///
69    /// Equivalent to [`VideoDecoderBuilder::output_height`].
70    #[must_use]
71    pub fn output_height(mut self, height: u32) -> Self {
72        self.inner = self.inner.output_height(height);
73        self
74    }
75
76    /// Opens the file and builds the async decoder.
77    ///
78    /// File I/O and codec initialisation run on a `spawn_blocking` thread so
79    /// the async executor is not blocked. All errors from
80    /// [`VideoDecoderBuilder::build`] are propagated transparently.
81    ///
82    /// # Errors
83    ///
84    /// Returns [`DecodeError`] if the file is missing, contains no video
85    /// stream, uses an unsupported codec, or has invalid output dimensions.
86    pub async fn build(self) -> Result<AsyncVideoDecoder, DecodeError> {
87        let builder = self.inner;
88        let decoder = tokio::task::spawn_blocking(move || builder.build())
89            .await
90            .map_err(|e| DecodeError::Ffmpeg {
91                code: 0,
92                message: format!("spawn_blocking panicked: {e}"),
93            })??;
94        Ok(AsyncVideoDecoder {
95            inner: AsyncDecoder::new(decoder),
96        })
97    }
98}
99
100/// Async wrapper around [`VideoDecoder`].
101///
102/// `open` and `decode_frame` both execute on a `spawn_blocking` thread so the
103/// Tokio executor is never blocked by `FFmpeg` I/O or decoding work.
104/// Multiple concurrent callers share the inner decoder through `Arc<Mutex<...>>`.
105///
106/// # Examples
107///
108/// ```ignore
109/// use ff_decode::AsyncVideoDecoder;
110/// use futures::StreamExt;
111///
112/// let mut decoder = AsyncVideoDecoder::open("video.mp4").await?;
113/// while let Some(frame) = decoder.decode_frame().await? {
114///     println!("frame at {:?}", frame.timestamp().as_duration());
115/// }
116/// ```
117pub struct AsyncVideoDecoder {
118    inner: AsyncDecoder<VideoDecoder>,
119}
120
121impl AsyncVideoDecoder {
122    /// Returns a builder for configuring the async video decoder.
123    ///
124    /// Use this when you need to control the output pixel format or frame
125    /// scaling. For zero-configuration decoding, prefer [`AsyncVideoDecoder::open`].
126    ///
127    /// # Examples
128    ///
129    /// ```ignore
130    /// use ff_decode::AsyncVideoDecoder;
131    /// use ff_format::PixelFormat;
132    ///
133    /// let decoder = AsyncVideoDecoder::builder("video.mp4")
134    ///     .output_format(PixelFormat::Rgb24)
135    ///     .output_size(640, 360)
136    ///     .build()
137    ///     .await?;
138    /// ```
139    pub fn builder(path: impl AsRef<Path>) -> AsyncVideoDecoderBuilder {
140        AsyncVideoDecoderBuilder::new(path.as_ref().to_path_buf())
141    }
142
143    /// Opens the video file asynchronously.
144    ///
145    /// File I/O and codec initialisation are performed on a `spawn_blocking`
146    /// thread so the async executor is not blocked.
147    ///
148    /// # Errors
149    ///
150    /// Returns [`DecodeError`] if the file is missing, contains no video
151    /// stream, or uses an unsupported codec.
152    pub async fn open(path: impl AsRef<Path> + Send + 'static) -> Result<Self, DecodeError> {
153        let path: PathBuf = path.as_ref().to_path_buf();
154        let decoder = tokio::task::spawn_blocking(move || VideoDecoder::open(&path).build())
155            .await
156            .map_err(|e| DecodeError::Ffmpeg {
157                code: 0,
158                message: format!("spawn_blocking panicked: {e}"),
159            })??;
160        Ok(Self {
161            inner: AsyncDecoder::new(decoder),
162        })
163    }
164
165    /// Decodes the next video frame.
166    ///
167    /// The blocking `FFmpeg` call is offloaded to a `spawn_blocking` thread so
168    /// the Tokio executor is never blocked.
169    ///
170    /// Returns `Ok(None)` at end of stream.
171    ///
172    /// # Errors
173    ///
174    /// Returns [`DecodeError`] on codec or I/O errors.
175    pub async fn decode_frame(&mut self) -> Result<Option<VideoFrame>, DecodeError> {
176        self.inner.with(VideoDecoder::decode_one).await
177    }
178
179    /// Converts this decoder into a [`Stream`] of video frames.
180    ///
181    /// Decoding is offloaded to a `spawn_blocking` thread on each poll via
182    /// [`Self::decode_frame`]. The stream is `Send` and can be used with
183    /// [`tokio::spawn`].
184    ///
185    /// The stream ends when the file is exhausted (`Ok(None)` from
186    /// `decode_frame`). Errors are yielded as `Err` items; the stream
187    /// terminates after the first error.
188    pub fn into_stream(self) -> impl Stream<Item = Result<VideoFrame, DecodeError>> + Send {
189        stream::unfold(Some(self), |state| async move {
190            let mut decoder = state?; // None → stream already ended
191            match decoder.decode_frame().await {
192                Ok(Some(frame)) => Some((Ok(frame), Some(decoder))),
193                Ok(None) => None,               // EOF → end stream
194                Err(e) => Some((Err(e), None)), // error → yield once, then end
195            }
196        })
197    }
198}
199
200#[cfg(test)]
201mod tests {
202    use super::*;
203
204    // Compile-time proof that AsyncVideoDecoder satisfies the Send bound
205    // required by tokio::spawn and other Send-requiring contexts.
206    fn _assert_send() {
207        fn is_send<T: Send>() {}
208        is_send::<AsyncVideoDecoder>();
209    }
210
211    #[tokio::test]
212    async fn async_video_decoder_should_fail_on_missing_file() {
213        let result = AsyncVideoDecoder::open("/nonexistent/path/video.mp4").await;
214        assert!(
215            matches!(result, Err(DecodeError::FileNotFound { .. })),
216            "expected FileNotFound"
217        );
218    }
219
220    #[tokio::test]
221    async fn async_video_decoder_builder_output_format_should_propagate_to_sync_builder() {
222        let result = AsyncVideoDecoder::builder("/nonexistent/path/video.mp4")
223            .output_format(PixelFormat::Rgb24)
224            .build()
225            .await;
226        assert!(
227            matches!(result, Err(DecodeError::FileNotFound { .. })),
228            "builder with output_format must propagate FileNotFound"
229        );
230    }
231
232    #[tokio::test]
233    async fn async_video_decoder_builder_zero_size_should_return_invalid_dimensions() {
234        let result = AsyncVideoDecoder::builder("/nonexistent/path/video.mp4")
235            .output_size(0, 480)
236            .build()
237            .await;
238        assert!(
239            matches!(result, Err(DecodeError::InvalidOutputDimensions { .. })),
240            "output_size(0, 480) must return InvalidOutputDimensions"
241        );
242    }
243
244    /// Verifies the `Option<S>` unfold pattern used by `into_stream`: after the
245    /// error arm sets state to `None`, the stream yields no further items.
246    ///
247    /// Acceptance criterion for issue #1006.
248    #[tokio::test]
249    async fn into_stream_state_machine_should_terminate_after_error() {
250        use futures::StreamExt;
251
252        // Use the exact same stream::unfold(Option<S>, ...) pattern as
253        // into_stream() with a controlled error at position 2.
254        let items: Vec<Result<u32, u32>> = stream::unfold(Some(0u32), |state| async move {
255            let n = state?; // None → stream ends
256            match n {
257                0 | 1 => Some((Ok(n), Some(n + 1))),
258                _ => Some((Err(n), None)), // error → yield once, then end
259            }
260        })
261        .collect()
262        .await;
263
264        assert_eq!(
265            items.len(),
266            3,
267            "stream must stop after the error: expected 2 Ok + 1 Err, got {items:?}"
268        );
269        assert!(items[0].is_ok());
270        assert!(items[1].is_ok());
271        assert!(items[2].is_err());
272    }
273}