use crate::audio_info::*;
use melodium_core::*;
use melodium_macro::mel_treatment;
#[cfg(feature = "real")]
use {
crate::channels::*,
async_channel::bounded,
std::sync::Arc,
symphonia::core::{
audio::AudioBuffer,
codecs::DecoderOptions,
errors::Error,
formats::FormatOptions,
io::{MediaSourceStream, MediaSourceStreamOptions},
meta::MetadataOptions,
probe::Hint,
},
};
/// Decodes an encoded audio byte stream into a mono `f32` sample stream.
///
/// The bytes received on `data` are handed to a Symphonia probe and decoder running on a
/// blocking thread. When `hint` is set, it is given to the probe as a file-extension hint.
///
/// - `signal` receives one batch of samples per decoded packet; packets with several channels
///   are downmixed by averaging all channels frame by frame.
/// - `info` receives a single `AudioInfo` once the track and its decoder have been set up.
/// - `errors` receives one message per reported problem; skipped packets are reported but do
///   not stop decoding.
/// - `failed` is emitted after the first fatal error.
///
/// Reaching the end of the input is a normal termination and reports nothing.
///
/// With the `mock` feature, the input is drained, a single message is sent on `errors`, and
/// `failed` is emitted.
#[mel_treatment(
    input data Stream<byte>
    output signal Stream<f32>
    output info Block<AudioInfo>
    output failed Block<void>
    output errors Stream<string>
)]
pub async fn decode_mono(hint: Option<string>) {
    #[cfg(feature = "real")]
    {
        // Bounded channels bridging the async treatment and the blocking decoder thread.
        let (byte_sender, byte_receiver) = bounded::<Vec<u8>>(1024);
        let (mono_sender, mono_receiver) = bounded::<Vec<f32>>(64);
        // Error messages are tagged with a `fatal` flag.
        let (err_sender, err_receiver) = bounded::<(bool, String)>(64);
        let (info_sender, info_receiver) = bounded::<AudioInfo>(1);

        let mut symphonia_hint = Hint::new();
        if let Some(ref h) = hint {
            symphonia_hint.with_extension(h.as_str());
        }

        // Probing and decoding are synchronous, so they run on a blocking thread.
        let decode_fut = async move {
            let _ = async_std::task::spawn_blocking(move || {
                // Sends one error report; a closed error channel is deliberately ignored.
                let report = |fatal: bool, message: String| {
                    let _ = async_std::task::block_on(err_sender.send((fatal, message)));
                };

                let mss = MediaSourceStream::new(
                    Box::new(ChannelReaderMediaSource::new(byte_receiver)),
                    MediaSourceStreamOptions::default(),
                );

                let probe_result = match symphonia::default::get_probe().format(
                    &symphonia_hint,
                    mss,
                    &FormatOptions::default(),
                    &MetadataOptions::default(),
                ) {
                    Ok(r) => r,
                    Err(e) => {
                        report(true, format!("format detection failed: {e}"));
                        return;
                    }
                };
                let mut format_reader = probe_result.format;

                let track = match format_reader.default_track() {
                    Some(t) => t.clone(),
                    None => {
                        report(true, "no audio track found".to_string());
                        return;
                    }
                };

                let mut decoder = match symphonia::default::get_codecs()
                    .make(&track.codec_params, &DecoderOptions::default())
                {
                    Ok(d) => d,
                    Err(e) => {
                        report(true, format!("unsupported codec: {e}"));
                        return;
                    }
                };

                // Duration is only known when both the time base and the frame count are.
                let duration_seconds = track
                    .codec_params
                    .time_base
                    .zip(track.codec_params.n_frames)
                    .map(|(tb, n)| {
                        let t = tb.calc_time(n);
                        t.seconds as f64 + t.frac
                    });
                let audio_info = AudioInfo {
                    codec: format!("{}", track.codec_params.codec),
                    channels: track
                        .codec_params
                        .channels
                        .map(|c| c.count() as u32)
                        .unwrap_or(0),
                    sample_rate: track.codec_params.sample_rate.unwrap_or(0),
                    duration_seconds,
                };
                let _ = async_std::task::block_on(info_sender.send(audio_info));

                loop {
                    let packet = match format_reader.next_packet() {
                        Ok(p) => p,
                        // Symphonia reports the regular end of the stream as an
                        // `UnexpectedEof` I/O error; this is not a failure.
                        Err(Error::IoError(e))
                            if e.kind() == std::io::ErrorKind::UnexpectedEof =>
                        {
                            break;
                        }
                        Err(Error::IoError(e)) => {
                            report(true, format!("I/O error: {e}"));
                            break;
                        }
                        Err(Error::ResetRequired) => break,
                        Err(Error::DecodeError(e)) => {
                            report(false, format!("packet skipped: {e}"));
                            continue;
                        }
                        Err(e) => {
                            report(true, format!("demux error: {e}"));
                            break;
                        }
                    };

                    // The decoder was built for `track` only; ignore packets of other tracks.
                    if packet.track_id() != track.id {
                        continue;
                    }

                    let audio_buf = match decoder.decode(&packet) {
                        Ok(buf) => buf,
                        Err(Error::DecodeError(e)) => {
                            report(false, format!("packet skipped: {e}"));
                            continue;
                        }
                        Err(e) => {
                            report(true, format!("decode error: {e}"));
                            break;
                        }
                    };

                    // `convert` expects a destination with the same spec, capacity and frame
                    // count as the decoded buffer, so build an equivalent buffer and reserve
                    // the decoded number of frames before converting.
                    let mut f32_buf: AudioBuffer<f32> = audio_buf.make_equivalent();
                    symphonia::core::audio::Signal::render_reserved(
                        &mut f32_buf,
                        Some(audio_buf.frames()),
                    );
                    audio_buf.convert(&mut f32_buf);

                    let planes = f32_buf.planes();
                    let channel_slices = planes.planes();
                    let num_channels = channel_slices.len();
                    if num_channels == 0 {
                        continue;
                    }
                    let num_frames = channel_slices[0].len();

                    let mono: Vec<f32> = if num_channels == 1 {
                        channel_slices[0].to_vec()
                    } else {
                        // Downmix: arithmetic mean of all channels for every frame.
                        let inv = 1.0_f32 / num_channels as f32;
                        (0..num_frames)
                            .map(|frame| {
                                channel_slices.iter().map(|ch| ch[frame]).sum::<f32>() * inv
                            })
                            .collect()
                    };

                    // A closed sample channel means nobody consumes the output anymore.
                    if async_std::task::block_on(mono_sender.send(mono)).is_err() {
                        break;
                    }
                }
            })
            .await;
        };

        // Forwards incoming bytes to the decoder until the input ends or the decoder is gone.
        let feed_fut = async move {
            while let Ok(chunk) = data
                .recv_many()
                .await
                .map(|values| TryInto::<Vec<u8>>::try_into(values).unwrap())
            {
                if byte_sender.send(chunk).await.is_err() {
                    break;
                }
            }
        };

        // Forwards decoded sample batches to `signal`.
        let forward_fut = async {
            while let Ok(mono) = mono_receiver.recv().await {
                let batch: VecDeque<f32> = mono.into();
                if signal.send_many(batch.into()).await.is_err() {
                    break;
                }
            }
            // The receiver outlives this future; close it explicitly so a decoder thread
            // blocked on a full channel gets an error and terminates instead of hanging.
            mono_receiver.close();
        };

        // Emits the audio information if the decoder got far enough to produce it.
        let info_fut = async {
            if let Ok(audio_info) = info_receiver.recv().await {
                let _ = info.send_one(Value::Data(Arc::new(audio_info))).await;
            }
        };

        // Publishes error messages and signals `failed` on the first fatal one.
        let error_fut = async {
            while let Ok((fatal, msg)) = err_receiver.recv().await {
                let _ = errors.send_one(msg.into()).await;
                if fatal {
                    let _ = failed.send_one(().into()).await;
                    break;
                }
            }
        };

        futures::join!(decode_fut, feed_fut, forward_fut, info_fut, error_fut);
    }
    #[cfg(feature = "mock")]
    {
        // Drain the input so upstream is not blocked, then report the missing capability.
        while data.recv_many().await.is_ok() {}
        let _ = errors
            .send_one(
                "audio decoding is not available in this build"
                    .to_string()
                    .into(),
            )
            .await;
        let _ = failed.send_one(().into()).await;
    }
}