ff_decode/video/async_decoder.rs
1//! Async video decoder backed by `tokio::task::spawn_blocking`.
2
3use std::path::{Path, PathBuf};
4
5use ff_format::{PixelFormat, VideoFrame};
6use futures::stream::{self, Stream};
7
8use crate::async_decoder::AsyncDecoder;
9use crate::error::DecodeError;
10use crate::video::builder::{VideoDecoder, VideoDecoderBuilder};
11
12/// Async builder for [`AsyncVideoDecoder`] that mirrors the options available
13/// on the synchronous [`VideoDecoderBuilder`].
14///
15/// Obtain one with [`AsyncVideoDecoder::builder`]. Call [`build`](Self::build)
16/// to open the file asynchronously on a `spawn_blocking` thread.
17///
18/// # Examples
19///
20/// ```ignore
21/// use ff_decode::AsyncVideoDecoder;
22/// use ff_format::PixelFormat;
23///
24/// let decoder = AsyncVideoDecoder::builder("video.mp4")
25/// .output_format(PixelFormat::Rgb24)
26/// .build()
27/// .await?;
28/// ```
29pub struct AsyncVideoDecoderBuilder {
30 inner: VideoDecoderBuilder,
31}
32
33impl AsyncVideoDecoderBuilder {
34 fn new(path: PathBuf) -> Self {
35 Self {
36 inner: VideoDecoderBuilder::new(path),
37 }
38 }
39
40 /// Sets the output pixel format for decoded frames.
41 ///
42 /// Equivalent to [`VideoDecoderBuilder::output_format`].
43 #[must_use]
44 pub fn output_format(mut self, format: PixelFormat) -> Self {
45 self.inner = self.inner.output_format(format);
46 self
47 }
48
49 /// Scales decoded frames to exact dimensions.
50 ///
51 /// Equivalent to [`VideoDecoderBuilder::output_size`].
52 #[must_use]
53 pub fn output_size(mut self, width: u32, height: u32) -> Self {
54 self.inner = self.inner.output_size(width, height);
55 self
56 }
57
58 /// Scales decoded frames to the given width, preserving the aspect ratio.
59 ///
60 /// Equivalent to [`VideoDecoderBuilder::output_width`].
61 #[must_use]
62 pub fn output_width(mut self, width: u32) -> Self {
63 self.inner = self.inner.output_width(width);
64 self
65 }
66
67 /// Scales decoded frames to the given height, preserving the aspect ratio.
68 ///
69 /// Equivalent to [`VideoDecoderBuilder::output_height`].
70 #[must_use]
71 pub fn output_height(mut self, height: u32) -> Self {
72 self.inner = self.inner.output_height(height);
73 self
74 }
75
76 /// Opens the file and builds the async decoder.
77 ///
78 /// File I/O and codec initialisation run on a `spawn_blocking` thread so
79 /// the async executor is not blocked. All errors from
80 /// [`VideoDecoderBuilder::build`] are propagated transparently.
81 ///
82 /// # Errors
83 ///
84 /// Returns [`DecodeError`] if the file is missing, contains no video
85 /// stream, uses an unsupported codec, or has invalid output dimensions.
86 pub async fn build(self) -> Result<AsyncVideoDecoder, DecodeError> {
87 let builder = self.inner;
88 let decoder = tokio::task::spawn_blocking(move || builder.build())
89 .await
90 .map_err(|e| DecodeError::Ffmpeg {
91 code: 0,
92 message: format!("spawn_blocking panicked: {e}"),
93 })??;
94 Ok(AsyncVideoDecoder {
95 inner: AsyncDecoder::new(decoder),
96 })
97 }
98}
99
100/// Async wrapper around [`VideoDecoder`].
101///
102/// `open` and `decode_frame` both execute on a `spawn_blocking` thread so the
103/// Tokio executor is never blocked by `FFmpeg` I/O or decoding work.
104/// Multiple concurrent callers share the inner decoder through `Arc<Mutex<...>>`.
105///
106/// # Examples
107///
108/// ```ignore
109/// use ff_decode::AsyncVideoDecoder;
110/// use futures::StreamExt;
111///
112/// let mut decoder = AsyncVideoDecoder::open("video.mp4").await?;
113/// while let Some(frame) = decoder.decode_frame().await? {
114/// println!("frame at {:?}", frame.timestamp().as_duration());
115/// }
116/// ```
117pub struct AsyncVideoDecoder {
118 inner: AsyncDecoder<VideoDecoder>,
119}
120
121impl AsyncVideoDecoder {
122 /// Returns a builder for configuring the async video decoder.
123 ///
124 /// Use this when you need to control the output pixel format or frame
125 /// scaling. For zero-configuration decoding, prefer [`AsyncVideoDecoder::open`].
126 ///
127 /// # Examples
128 ///
129 /// ```ignore
130 /// use ff_decode::AsyncVideoDecoder;
131 /// use ff_format::PixelFormat;
132 ///
133 /// let decoder = AsyncVideoDecoder::builder("video.mp4")
134 /// .output_format(PixelFormat::Rgb24)
135 /// .output_size(640, 360)
136 /// .build()
137 /// .await?;
138 /// ```
139 pub fn builder(path: impl AsRef<Path>) -> AsyncVideoDecoderBuilder {
140 AsyncVideoDecoderBuilder::new(path.as_ref().to_path_buf())
141 }
142
143 /// Opens the video file asynchronously.
144 ///
145 /// File I/O and codec initialisation are performed on a `spawn_blocking`
146 /// thread so the async executor is not blocked.
147 ///
148 /// # Errors
149 ///
150 /// Returns [`DecodeError`] if the file is missing, contains no video
151 /// stream, or uses an unsupported codec.
152 pub async fn open(path: impl AsRef<Path> + Send + 'static) -> Result<Self, DecodeError> {
153 let path: PathBuf = path.as_ref().to_path_buf();
154 let decoder = tokio::task::spawn_blocking(move || VideoDecoder::open(&path).build())
155 .await
156 .map_err(|e| DecodeError::Ffmpeg {
157 code: 0,
158 message: format!("spawn_blocking panicked: {e}"),
159 })??;
160 Ok(Self {
161 inner: AsyncDecoder::new(decoder),
162 })
163 }
164
165 /// Decodes the next video frame.
166 ///
167 /// The blocking `FFmpeg` call is offloaded to a `spawn_blocking` thread so
168 /// the Tokio executor is never blocked.
169 ///
170 /// Returns `Ok(None)` at end of stream.
171 ///
172 /// # Errors
173 ///
174 /// Returns [`DecodeError`] on codec or I/O errors.
175 pub async fn decode_frame(&mut self) -> Result<Option<VideoFrame>, DecodeError> {
176 self.inner.with(VideoDecoder::decode_one).await
177 }
178
179 /// Converts this decoder into a [`Stream`] of video frames.
180 ///
181 /// Decoding is offloaded to a `spawn_blocking` thread on each poll via
182 /// [`Self::decode_frame`]. The stream is `Send` and can be used with
183 /// [`tokio::spawn`].
184 ///
185 /// The stream ends when the file is exhausted (`Ok(None)` from
186 /// `decode_frame`). Errors are yielded as `Err` items; the stream
187 /// terminates after the first error.
188 pub fn into_stream(self) -> impl Stream<Item = Result<VideoFrame, DecodeError>> + Send {
189 stream::unfold(Some(self), |state| async move {
190 let mut decoder = state?; // None → stream already ended
191 match decoder.decode_frame().await {
192 Ok(Some(frame)) => Some((Ok(frame), Some(decoder))),
193 Ok(None) => None, // EOF → end stream
194 Err(e) => Some((Err(e), None)), // error → yield once, then end
195 }
196 })
197 }
198}
199
#[cfg(test)]
mod tests {
    use super::*;

    // Never called: it only has to type-check, which proves at compile time
    // that `AsyncVideoDecoder` is `Send` (needed by `tokio::spawn` and other
    // contexts with a `Send` bound).
    fn _assert_send() {
        fn require_send<T: Send>() {}
        require_send::<AsyncVideoDecoder>();
    }

    /// `open` on a path that does not exist must report `FileNotFound`.
    #[tokio::test]
    async fn async_video_decoder_should_fail_on_missing_file() {
        let outcome = AsyncVideoDecoder::open("/nonexistent/path/video.mp4").await;
        let is_file_not_found = matches!(outcome, Err(DecodeError::FileNotFound { .. }));
        assert!(is_file_not_found, "expected FileNotFound");
    }

    /// A builder configured with an output format still reports
    /// `FileNotFound` for a missing file.
    #[tokio::test]
    async fn async_video_decoder_builder_output_format_should_propagate_to_sync_builder() {
        let configured =
            AsyncVideoDecoder::builder("/nonexistent/path/video.mp4").output_format(PixelFormat::Rgb24);
        let outcome = configured.build().await;
        let is_file_not_found = matches!(outcome, Err(DecodeError::FileNotFound { .. }));
        assert!(
            is_file_not_found,
            "builder with output_format must propagate FileNotFound"
        );
    }

    /// A zero output width must be rejected with `InvalidOutputDimensions`.
    #[tokio::test]
    async fn async_video_decoder_builder_zero_size_should_return_invalid_dimensions() {
        let configured =
            AsyncVideoDecoder::builder("/nonexistent/path/video.mp4").output_size(0, 480);
        let outcome = configured.build().await;
        let is_invalid_dimensions =
            matches!(outcome, Err(DecodeError::InvalidOutputDimensions { .. }));
        assert!(
            is_invalid_dimensions,
            "output_size(0, 480) must return InvalidOutputDimensions"
        );
    }

    /// Checks the `Option<S>` unfold pattern that `into_stream` relies on:
    /// once the error arm stores `None` as the state, nothing more is yielded.
    ///
    /// Acceptance criterion for issue #1006.
    #[tokio::test]
    async fn into_stream_state_machine_should_terminate_after_error() {
        use futures::StreamExt;

        // Same stream::unfold(Option<S>, ...) shape as into_stream(), with a
        // controlled error injected at position 2.
        let state_machine = stream::unfold(Some(0u32), |state| async move {
            let step = state?; // `None`: the stream is finished.
            if step < 2 {
                Some((Ok(step), Some(step + 1)))
            } else {
                Some((Err(step), None)) // Yield the error once, then end.
            }
        });
        let items = state_machine.collect::<Vec<Result<u32, u32>>>().await;

        assert_eq!(
            items.len(),
            3,
            "stream must stop after the error: expected 2 Ok + 1 Err, got {items:?}"
        );
        assert!(items[..2].iter().all(Result::is_ok));
        assert!(items[2].is_err());
    }
}