use std::io::Read;
use std::sync::Mutex;
use anyhow::{Context, Result};
use symphonia::core::io::{MediaSource, ReadOnlySource};
use crate::audio_pipeline::AudioPipeline;
use crate::decode::{decode_packet_and_then, make_decoder_for_track};
use crate::demux::{next_packet, probe_source_and_pick_default_track};
pub trait SamplesSink {
fn on_samples(&mut self, samples_16k_mono: &[f32]) -> Result<bool>;
}
#[derive(Debug, Clone)]
pub struct StreamDecodeOpts {
pub target_chunk_frames: usize,
pub hint_extension: Option<String>,
}
impl Default for StreamDecodeOpts {
fn default() -> Self {
Self {
target_chunk_frames: 1024,
hint_extension: None,
}
}
}
pub fn decode_to_stream_from_read<R>(
reader: R,
opts: StreamDecodeOpts,
sink: &mut dyn SamplesSink,
) -> Result<()>
where
R: Read + Send + 'static,
{
let source = ReadOnlySource::new(LockedRead::new(reader));
decode_impl(Box::new(source), opts, sink)
}
fn decode_impl(
source: Box<dyn MediaSource>,
opts: StreamDecodeOpts,
sink: &mut dyn SamplesSink,
) -> Result<()> {
let (mut format, track) =
probe_source_and_pick_default_track(source, opts.hint_extension.as_deref())?;
let mut decoder = make_decoder_for_track(&track)?;
let mut pipeline = AudioPipeline::new();
loop {
let Some(packet) = next_packet(&mut format)? else {
break;
};
if packet.track_id() != track.id {
continue;
}
decode_packet_and_then(&mut decoder, &packet, |decoded| {
pipeline
.push_decoded_and_emit(&decoded, opts.target_chunk_frames, |chunk| {
sink.on_samples(chunk)
})
.context("audio pipeline failed while processing decoded samples")
})?;
}
pipeline
.finalize(opts.target_chunk_frames, |chunk| sink.on_samples(chunk))
.context("audio pipeline failed during finalize")?;
Ok(())
}
struct LockedRead<R> {
inner: Mutex<R>,
}
impl<R> LockedRead<R> {
fn new(inner: R) -> Self {
Self {
inner: Mutex::new(inner),
}
}
}
impl<R: Read> Read for LockedRead<R> {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
self.inner
.lock()
.map_err(|_| std::io::Error::other("decoder input mutex poisoned"))?
.read(buf)
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::cell::Cell;
#[test]
fn decoder_accepts_send_non_sync_readers() {
struct NotSyncReader {
inner: std::io::Cursor<Vec<u8>>,
_marker: Cell<u8>,
}
impl Read for NotSyncReader {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
self.inner.read(buf)
}
}
struct NoopSink;
impl SamplesSink for NoopSink {
fn on_samples(&mut self, _samples_16k_mono: &[f32]) -> Result<bool> {
Ok(true)
}
}
let reader = NotSyncReader {
inner: std::io::Cursor::new(Vec::new()),
_marker: Cell::new(0),
};
let res = decode_to_stream_from_read(reader, StreamDecodeOpts::default(), &mut NoopSink);
assert!(res.is_err());
}
}