use std::path::{Path, PathBuf};
use ff_format::{AudioFrame, SampleFormat};
use futures::stream::{self, Stream};
use crate::async_decoder::AsyncDecoder;
use crate::audio::builder::{AudioDecoder, AudioDecoderBuilder};
use crate::error::DecodeError;
pub struct AsyncAudioDecoderBuilder {
inner: AudioDecoderBuilder,
}
impl AsyncAudioDecoderBuilder {
fn new(path: PathBuf) -> Self {
Self {
inner: AudioDecoderBuilder::new(path),
}
}
#[must_use]
pub fn output_format(mut self, format: SampleFormat) -> Self {
self.inner = self.inner.output_format(format);
self
}
#[must_use]
pub fn output_sample_rate(mut self, sample_rate: u32) -> Self {
self.inner = self.inner.output_sample_rate(sample_rate);
self
}
#[must_use]
pub fn output_channels(mut self, channels: u32) -> Self {
self.inner = self.inner.output_channels(channels);
self
}
pub async fn build(self) -> Result<AsyncAudioDecoder, DecodeError> {
let builder = self.inner;
let decoder = tokio::task::spawn_blocking(move || builder.build())
.await
.map_err(|e| DecodeError::Ffmpeg {
code: 0,
message: format!("spawn_blocking panicked: {e}"),
})??;
Ok(AsyncAudioDecoder {
inner: AsyncDecoder::new(decoder),
})
}
}
pub struct AsyncAudioDecoder {
inner: AsyncDecoder<AudioDecoder>,
}
impl AsyncAudioDecoder {
pub fn builder(path: impl AsRef<Path>) -> AsyncAudioDecoderBuilder {
AsyncAudioDecoderBuilder::new(path.as_ref().to_path_buf())
}
pub async fn open(path: impl AsRef<Path> + Send + 'static) -> Result<Self, DecodeError> {
let path: PathBuf = path.as_ref().to_path_buf();
let decoder = tokio::task::spawn_blocking(move || AudioDecoder::open(&path).build())
.await
.map_err(|e| DecodeError::Ffmpeg {
code: 0,
message: format!("spawn_blocking panicked: {e}"),
})??;
Ok(Self {
inner: AsyncDecoder::new(decoder),
})
}
pub async fn decode_frame(&mut self) -> Result<Option<AudioFrame>, DecodeError> {
self.inner.with(AudioDecoder::decode_one).await
}
pub fn into_stream(self) -> impl Stream<Item = Result<AudioFrame, DecodeError>> + Send {
stream::unfold(Some(self), |state| async move {
let mut decoder = state?; match decoder.decode_frame().await {
Ok(Some(frame)) => Some((Ok(frame), Some(decoder))),
Ok(None) => None, Err(e) => Some((Err(e), None)), }
})
}
}
#[cfg(test)]
mod tests {
use super::*;
fn _assert_send() {
fn is_send<T: Send>() {}
is_send::<AsyncAudioDecoder>();
}
#[tokio::test]
async fn async_audio_decoder_should_fail_on_missing_file() {
let result = AsyncAudioDecoder::open("/nonexistent/path/audio.mp3").await;
assert!(
matches!(result, Err(DecodeError::FileNotFound { .. })),
"expected FileNotFound"
);
}
#[tokio::test]
async fn async_audio_decoder_builder_output_format_should_propagate_to_sync_builder() {
let result = AsyncAudioDecoder::builder("/nonexistent/path/audio.mp3")
.output_format(SampleFormat::F32)
.build()
.await;
assert!(
matches!(result, Err(DecodeError::FileNotFound { .. })),
"builder with output_format must propagate FileNotFound"
);
}
#[tokio::test]
async fn async_audio_decoder_builder_output_sample_rate_should_propagate_to_sync_builder() {
let result = AsyncAudioDecoder::builder("/nonexistent/path/audio.mp3")
.output_sample_rate(48_000)
.build()
.await;
assert!(
matches!(result, Err(DecodeError::FileNotFound { .. })),
"builder with output_sample_rate must propagate FileNotFound"
);
}
#[tokio::test]
async fn into_stream_state_machine_should_terminate_after_error() {
use futures::StreamExt;
let items: Vec<Result<u32, u32>> = stream::unfold(Some(0u32), |state| async move {
let n = state?; match n {
0 | 1 => Some((Ok(n), Some(n + 1))),
_ => Some((Err(n), None)), }
})
.collect()
.await;
assert_eq!(
items.len(),
3,
"stream must stop after the error: expected 2 Ok + 1 Err, got {items:?}"
);
assert!(items[0].is_ok());
assert!(items[1].is_ok());
assert!(items[2].is_err());
}
}