Skip to main content

ff_decode/audio/
async_decoder.rs

1//! Async audio decoder backed by `tokio::task::spawn_blocking`.
2
3use std::path::{Path, PathBuf};
4
5use ff_format::{AudioFrame, SampleFormat};
6use futures::stream::{self, Stream};
7
8use crate::async_decoder::AsyncDecoder;
9use crate::audio::builder::{AudioDecoder, AudioDecoderBuilder};
10use crate::error::DecodeError;
11
12/// Async builder for [`AsyncAudioDecoder`] that mirrors the options available
13/// on the synchronous [`AudioDecoderBuilder`].
14///
15/// Obtain one with [`AsyncAudioDecoder::builder`]. Call [`build`](Self::build)
16/// to open the file asynchronously on a `spawn_blocking` thread.
17///
18/// # Examples
19///
20/// ```ignore
21/// use ff_decode::AsyncAudioDecoder;
22/// use ff_format::SampleFormat;
23///
24/// let decoder = AsyncAudioDecoder::builder("audio.mp3")
25///     .output_format(SampleFormat::F32)
26///     .output_sample_rate(48_000)
27///     .build()
28///     .await?;
29/// ```
30pub struct AsyncAudioDecoderBuilder {
31    inner: AudioDecoderBuilder,
32}
33
34impl AsyncAudioDecoderBuilder {
35    fn new(path: PathBuf) -> Self {
36        Self {
37            inner: AudioDecoderBuilder::new(path),
38        }
39    }
40
41    /// Sets the output sample format for decoded frames.
42    ///
43    /// Equivalent to [`AudioDecoderBuilder::output_format`].
44    #[must_use]
45    pub fn output_format(mut self, format: SampleFormat) -> Self {
46        self.inner = self.inner.output_format(format);
47        self
48    }
49
50    /// Sets the output sample rate in Hz.
51    ///
52    /// Equivalent to [`AudioDecoderBuilder::output_sample_rate`].
53    #[must_use]
54    pub fn output_sample_rate(mut self, sample_rate: u32) -> Self {
55        self.inner = self.inner.output_sample_rate(sample_rate);
56        self
57    }
58
59    /// Sets the output channel count.
60    ///
61    /// Equivalent to [`AudioDecoderBuilder::output_channels`].
62    #[must_use]
63    pub fn output_channels(mut self, channels: u32) -> Self {
64        self.inner = self.inner.output_channels(channels);
65        self
66    }
67
68    /// Opens the file and builds the async decoder.
69    ///
70    /// File I/O and codec initialisation run on a `spawn_blocking` thread so
71    /// the async executor is not blocked. All errors from
72    /// [`AudioDecoderBuilder::build`] are propagated transparently.
73    ///
74    /// # Errors
75    ///
76    /// Returns [`DecodeError`] if the file is missing, contains no audio
77    /// stream, or uses an unsupported codec.
78    pub async fn build(self) -> Result<AsyncAudioDecoder, DecodeError> {
79        let builder = self.inner;
80        let decoder = tokio::task::spawn_blocking(move || builder.build())
81            .await
82            .map_err(|e| DecodeError::Ffmpeg {
83                code: 0,
84                message: format!("spawn_blocking panicked: {e}"),
85            })??;
86        Ok(AsyncAudioDecoder {
87            inner: AsyncDecoder::new(decoder),
88        })
89    }
90}
91
92/// Async wrapper around [`AudioDecoder`].
93///
94/// `open` and `decode_frame` both execute on a `spawn_blocking` thread so the
95/// Tokio executor is never blocked by `FFmpeg` I/O or decoding work.
96/// Multiple concurrent callers share the inner decoder through `Arc<Mutex<...>>`.
97///
98/// # Examples
99///
100/// ```ignore
101/// use ff_decode::AsyncAudioDecoder;
102/// use futures::StreamExt;
103///
104/// let mut decoder = AsyncAudioDecoder::open("audio.mp3").await?;
105/// while let Some(frame) = decoder.decode_frame().await? {
106///     println!("audio frame with {} samples", frame.samples());
107/// }
108/// ```
109pub struct AsyncAudioDecoder {
110    inner: AsyncDecoder<AudioDecoder>,
111}
112
113impl AsyncAudioDecoder {
114    /// Returns a builder for configuring the async audio decoder.
115    ///
116    /// Use this when you need to control the output sample format, sample rate,
117    /// or channel count. For zero-configuration decoding, prefer
118    /// [`AsyncAudioDecoder::open`].
119    ///
120    /// # Examples
121    ///
122    /// ```ignore
123    /// use ff_decode::AsyncAudioDecoder;
124    /// use ff_format::SampleFormat;
125    ///
126    /// let decoder = AsyncAudioDecoder::builder("audio.mp3")
127    ///     .output_format(SampleFormat::F32)
128    ///     .output_sample_rate(48_000)
129    ///     .build()
130    ///     .await?;
131    /// ```
132    pub fn builder(path: impl AsRef<Path>) -> AsyncAudioDecoderBuilder {
133        AsyncAudioDecoderBuilder::new(path.as_ref().to_path_buf())
134    }
135
136    /// Opens the audio file asynchronously.
137    ///
138    /// File I/O and codec initialisation are performed on a `spawn_blocking`
139    /// thread so the async executor is not blocked.
140    ///
141    /// # Errors
142    ///
143    /// Returns [`DecodeError`] if the file is missing, contains no audio
144    /// stream, or uses an unsupported codec.
145    pub async fn open(path: impl AsRef<Path> + Send + 'static) -> Result<Self, DecodeError> {
146        let path: PathBuf = path.as_ref().to_path_buf();
147        let decoder = tokio::task::spawn_blocking(move || AudioDecoder::open(&path).build())
148            .await
149            .map_err(|e| DecodeError::Ffmpeg {
150                code: 0,
151                message: format!("spawn_blocking panicked: {e}"),
152            })??;
153        Ok(Self {
154            inner: AsyncDecoder::new(decoder),
155        })
156    }
157
158    /// Decodes the next audio frame.
159    ///
160    /// The blocking `FFmpeg` call is offloaded to a `spawn_blocking` thread so
161    /// the Tokio executor is never blocked.
162    ///
163    /// Returns `Ok(None)` at end of stream.
164    ///
165    /// # Errors
166    ///
167    /// Returns [`DecodeError`] on codec or I/O errors.
168    pub async fn decode_frame(&mut self) -> Result<Option<AudioFrame>, DecodeError> {
169        self.inner.with(AudioDecoder::decode_one).await
170    }
171
172    /// Converts this decoder into a [`Stream`] of audio frames.
173    ///
174    /// Decoding is offloaded to a `spawn_blocking` thread on each poll via
175    /// [`Self::decode_frame`]. The stream is `Send` and can be used with
176    /// [`tokio::spawn`].
177    ///
178    /// The stream ends when the file is exhausted (`Ok(None)` from
179    /// `decode_frame`). Errors are yielded as `Err` items; the stream
180    /// terminates after the first error.
181    pub fn into_stream(self) -> impl Stream<Item = Result<AudioFrame, DecodeError>> + Send {
182        stream::unfold(Some(self), |state| async move {
183            let mut decoder = state?; // None → stream already ended
184            match decoder.decode_frame().await {
185                Ok(Some(frame)) => Some((Ok(frame), Some(decoder))),
186                Ok(None) => None,               // EOF → end stream
187                Err(e) => Some((Err(e), None)), // error → yield once, then end
188            }
189        })
190    }
191}
192
#[cfg(test)]
mod tests {
    use super::*;

    // Compile-time proof that AsyncAudioDecoder satisfies the Send bound
    // required by tokio::spawn and other Send-requiring contexts. The
    // function is never called; it only has to type-check.
    fn _assert_send() {
        fn is_send<T: Send>() {}
        is_send::<AsyncAudioDecoder>();
    }

    /// `open` must surface the synchronous decoder's `FileNotFound` error.
    #[tokio::test]
    async fn async_audio_decoder_should_fail_on_missing_file() {
        let result = AsyncAudioDecoder::open("/nonexistent/path/audio.mp3").await;
        assert!(
            matches!(result, Err(DecodeError::FileNotFound { .. })),
            "expected FileNotFound"
        );
    }

    /// Setting `output_format` must not prevent the build error from the
    /// synchronous builder reaching the caller.
    #[tokio::test]
    async fn async_audio_decoder_builder_output_format_should_propagate_to_sync_builder() {
        let result = AsyncAudioDecoder::builder("/nonexistent/path/audio.mp3")
            .output_format(SampleFormat::F32)
            .build()
            .await;
        assert!(
            matches!(result, Err(DecodeError::FileNotFound { .. })),
            "builder with output_format must propagate FileNotFound"
        );
    }

    /// Setting `output_sample_rate` must not prevent the build error from the
    /// synchronous builder reaching the caller.
    #[tokio::test]
    async fn async_audio_decoder_builder_output_sample_rate_should_propagate_to_sync_builder() {
        let result = AsyncAudioDecoder::builder("/nonexistent/path/audio.mp3")
            .output_sample_rate(48_000)
            .build()
            .await;
        assert!(
            matches!(result, Err(DecodeError::FileNotFound { .. })),
            "builder with output_sample_rate must propagate FileNotFound"
        );
    }

    /// Setting `output_channels` must not prevent the build error from the
    /// synchronous builder reaching the caller. Mirrors the two sibling
    /// builder-option tests so every forwarded option is covered.
    #[tokio::test]
    async fn async_audio_decoder_builder_output_channels_should_propagate_to_sync_builder() {
        let result = AsyncAudioDecoder::builder("/nonexistent/path/audio.mp3")
            .output_channels(2)
            .build()
            .await;
        assert!(
            matches!(result, Err(DecodeError::FileNotFound { .. })),
            "builder with output_channels must propagate FileNotFound"
        );
    }

    /// Verifies the `Option<S>` unfold pattern used by `into_stream`: after the
    /// error arm sets state to `None`, the stream yields no further items.
    ///
    /// Acceptance criterion for issue #1006.
    #[tokio::test]
    async fn into_stream_state_machine_should_terminate_after_error() {
        use futures::StreamExt;

        // Use the exact same stream::unfold(Option<S>, ...) pattern as
        // into_stream() with a controlled error at position 2.
        let items: Vec<Result<u32, u32>> = stream::unfold(Some(0u32), |state| async move {
            let n = state?; // None → stream ends
            match n {
                0 | 1 => Some((Ok(n), Some(n + 1))),
                _ => Some((Err(n), None)), // error → yield once, then end
            }
        })
        .collect()
        .await;

        assert_eq!(
            items.len(),
            3,
            "stream must stop after the error: expected 2 Ok + 1 Err, got {items:?}"
        );
        assert!(items[0].is_ok());
        assert!(items[1].is_ok());
        assert!(items[2].is_err());
    }
}