use anyhow::{Context, Result, anyhow, bail};
use audioadapter_buffers::direct::SequentialSliceOfVecs;
use rubato::{
Async, FixedAsync, Resampler, SincInterpolationParameters, SincInterpolationType,
WindowFunction,
};
use symphonia::core::audio::{AudioBufferRef, SampleBuffer};
/// Output sample rate of the pipeline.
///
/// Decoded audio whose `spec().rate` already equals this value is emitted
/// without resampling; any other rate is resampled by the ratio
/// `TARGET_SAMPLE_RATE / src_rate`. The unit is whatever the decoder reports
/// in `spec().rate` (Hz per symphonia's `SignalSpec`, i.e. 16 kHz).
pub const TARGET_SAMPLE_RATE: u32 = 16_000;
/// Streaming converter from decoded symphonia buffers to mono `f32` samples
/// at [`TARGET_SAMPLE_RATE`].
///
/// All scratch buffers live in the struct so they can be reused from one
/// decoded buffer to the next.
pub struct AudioPipeline {
/// Scratch buffer used to turn a decoded buffer into interleaved `f32`
/// samples. `None` until the first decoded buffer has been seen.
sample_buf_f32: Option<SampleBuffer<f32>>,
/// Resampler, created lazily the first time a buffer with a rate other than
/// [`TARGET_SAMPLE_RATE`] is pushed. `None` until then.
resampler: Option<Async<f32>>,
/// Mono samples at the source rate that have not yet been fed to the
/// resampler because they do not fill a complete input block.
mono_src_acc: Vec<f32>,
/// Resampler input scratch: a single channel holding the current block.
resample_in: Vec<Vec<f32>>,
/// Resampler output scratch: a single channel the resampler writes into.
resample_out: Vec<Vec<f32>>,
}
impl Default for AudioPipeline {
fn default() -> Self {
Self::new()
}
}
impl AudioPipeline {
/// Creates an empty pipeline: no sample buffer, no resampler, nothing
/// accumulated.
pub fn new() -> Self {
    // Both resampler scratch buffers hold exactly one (initially empty)
    // channel, because the pipeline only ever resamples mono audio.
    let resample_in = vec![Vec::new()];
    let resample_out = vec![Vec::new()];
    Self {
        sample_buf_f32: None,
        resampler: None,
        mono_src_acc: Vec::new(),
        resample_in,
        resample_out,
    }
}
pub fn push_decoded_and_emit(
&mut self,
decoded: &AudioBufferRef<'_>,
target_chunk_frames: usize,
mut emit: impl FnMut(&[f32]) -> Result<bool>,
) -> Result<()> {
let (interleaved, src_rate, channels) =
decoded_to_interleaved_f32(decoded, &mut self.sample_buf_f32)?;
let mono_src = downmix_to_mono(&interleaved, channels);
if src_rate == TARGET_SAMPLE_RATE {
emit_mono_chunks(&mono_src, target_chunk_frames, &mut emit)?;
return Ok(());
}
self.ensure_resampler(src_rate)?;
self.push_and_flush_resampler(&mono_src, target_chunk_frames, &mut emit)?;
Ok(())
}
pub fn finalize(
&mut self,
target_chunk_frames: usize,
mut emit: impl FnMut(&[f32]) -> Result<bool>,
) -> Result<()> {
let Some(rs) = self.resampler.as_mut() else {
return Ok(());
};
if self.mono_src_acc.is_empty() {
return Ok(());
}
let in_next = rs.input_frames_next();
let rem = self.mono_src_acc.len() % in_next;
if rem != 0 {
self.mono_src_acc
.resize(self.mono_src_acc.len() + (in_next - rem), 0.0);
}
while !self.mono_src_acc.is_empty() {
let block: Vec<f32> = self.mono_src_acc.drain(..in_next).collect();
let out = self.resample_block_into_out(&block)?;
emit_mono_chunks(out, target_chunk_frames, &mut emit)?;
}
Ok(())
}
fn ensure_resampler(&mut self, src_rate: u32) -> Result<()> {
if self.resampler.is_some() {
return Ok(());
}
let in_chunk_src_frames = 2048;
let ratio = TARGET_SAMPLE_RATE as f64 / src_rate as f64;
let rs = Async::<f32>::new_sinc(
ratio,
2.0,
&SincInterpolationParameters {
sinc_len: 256,
f_cutoff: 0.95,
interpolation: SincInterpolationType::Linear,
oversampling_factor: 256,
window: WindowFunction::BlackmanHarris2,
},
in_chunk_src_frames,
1, FixedAsync::Input,
)
.map_err(anyhow::Error::new)
.context("failed to init resampler")?;
let out_max = rs.output_frames_max();
self.resample_out[0].resize(out_max, 0.0);
self.resampler = Some(rs);
Ok(())
}
/// Appends `mono_src` to the accumulator, then resamples and emits every
/// complete input block that is available.
///
/// Samples that do not fill a whole block remain in the accumulator. Each
/// resampled block is passed to `emit` in chunks of at most
/// `target_chunk_frames` samples; as soon as `emit` returns `Ok(false)` the
/// function returns `Ok(())` without touching further blocks.
///
/// `target_chunk_frames` must be non-zero (`slice::chunks` panics on zero).
///
/// # Errors
///
/// Fails if no resampler has been created, if resampling fails, or if
/// `emit` returns an error.
fn push_and_flush_resampler(
    &mut self,
    mono_src: &[f32],
    target_chunk_frames: usize,
    emit: &mut impl FnMut(&[f32]) -> Result<bool>,
) -> Result<()> {
    self.mono_src_acc.extend_from_slice(mono_src);

    loop {
        // Ask the resampler on every round how many frames it wants next.
        let Some(resampler) = self.resampler.as_ref() else {
            return Err(anyhow!("resampler not initialized"));
        };
        let needed = resampler.input_frames_next();
        if needed > self.mono_src_acc.len() {
            // Not enough for another block; keep the rest for later.
            return Ok(());
        }

        let block: Vec<f32> = self.mono_src_acc.drain(..needed).collect();
        let resampled = self.resample_block_into_out(&block)?;
        for piece in resampled.chunks(target_chunk_frames) {
            let keep_going = emit(piece)?;
            if !keep_going {
                return Ok(());
            }
        }
    }
}
fn resample_block_into_out(&mut self, mono_src_block: &[f32]) -> Result<&[f32]> {
let rs = self
.resampler
.as_mut()
.ok_or_else(|| anyhow!("resampler not initialized"))?;
self.resample_in[0].clear();
self.resample_in[0].extend_from_slice(mono_src_block);
let input = SequentialSliceOfVecs::new(&self.resample_in, 1, mono_src_block.len())
.map_err(|e| anyhow!(e))?;
let out_max = rs.output_frames_max();
if self.resample_out[0].len() < out_max {
self.resample_out[0].resize(out_max, 0.0);
}
let mut output = SequentialSliceOfVecs::new_mut(&mut self.resample_out, 1, out_max)
.map_err(|e| anyhow!(e))?;
let (_frames_in, frames_out) = rs
.process_into_buffer(&input, &mut output, None)
.map_err(|e| anyhow!(e))
.context("resampler process failed")?;
Ok(&self.resample_out[0][..frames_out])
}
}
/// Copies a decoded buffer into an owned vector of interleaved `f32` samples.
///
/// Returns `(samples, sample_rate, channel_count)`, where the rate and the
/// channel count are taken from the decoded buffer's signal spec.
/// `sample_buf_f32` is the reusable conversion buffer; it is created on
/// demand by `ensure_sample_buffer`.
///
/// # Errors
///
/// Fails if the decoded buffer reports zero channels, or if the sample
/// buffer is unexpectedly missing after initialisation.
fn decoded_to_interleaved_f32(
    decoded: &AudioBufferRef<'_>,
    sample_buf_f32: &mut Option<SampleBuffer<f32>>,
) -> Result<(Vec<f32>, u32, usize)> {
    let spec = *decoded.spec();
    let channels = spec.channels.count();
    // Validate before touching the sample buffer, so a malformed zero-channel
    // buffer is rejected before any allocation or copy is attempted.
    if channels == 0 {
        bail!("decoded audio had zero channels");
    }

    ensure_sample_buffer(decoded, sample_buf_f32);
    let buf = sample_buf_f32
        .as_mut()
        .ok_or_else(|| anyhow!("sample buffer not initialized"))?;
    buf.copy_interleaved_ref(decoded.clone());

    Ok((buf.samples().to_vec(), spec.rate, channels))
}
/// Makes sure `sample_buf_f32` holds a buffer large enough for `decoded`.
///
/// A buffer is created when none exists yet, and re-created when the
/// existing one can store fewer samples than `decoded` may contain
/// (its frame capacity times its channel count). Without the second check a
/// later, larger decoded buffer would be copied into a buffer sized for the
/// first one.
fn ensure_sample_buffer(
    decoded: &AudioBufferRef<'_>,
    sample_buf_f32: &mut Option<SampleBuffer<f32>>,
) {
    let spec = *decoded.spec();
    let frame_capacity = decoded.capacity();
    let needed_samples = frame_capacity.saturating_mul(spec.channels.count());

    // Reuse the cached buffer whenever it is already big enough.
    if let Some(existing) = sample_buf_f32.as_ref() {
        if existing.capacity() >= needed_samples {
            return;
        }
    }

    *sample_buf_f32 = Some(SampleBuffer::<f32>::new(frame_capacity as u64, spec));
}
/// Downmixes interleaved samples to mono by averaging the channels of each
/// frame.
///
/// * `channels == 1` returns a copy of the input.
/// * `channels == 0` returns an empty vector (no frame can be formed)
///   instead of dividing by zero.
/// * A trailing partial frame (fewer than `channels` samples) is ignored.
fn downmix_to_mono(interleaved: &[f32], channels: usize) -> Vec<f32> {
    match channels {
        0 => Vec::new(),
        1 => interleaved.to_vec(),
        _ => {
            let divisor = channels as f32;
            interleaved
                .chunks_exact(channels)
                // Explicit left-to-right fold starting at +0.0 keeps the
                // summation order fixed and independent of `Sum`'s seed.
                .map(|frame| frame.iter().fold(0.0_f32, |acc, &s| acc + s) / divisor)
                .collect()
        }
    }
}
fn emit_mono_chunks(
mono_16k: &[f32],
chunk_frames: usize,
emit: &mut impl FnMut(&[f32]) -> Result<bool>,
) -> Result<()> {
for chunk in mono_16k.chunks(chunk_frames) {
if !emit(chunk)? {
break;
}
}
Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh pipeline has no resampler, so `finalize` has nothing to flush.
    #[test]
    fn finalize_is_noop_without_resampler() -> anyhow::Result<()> {
        let mut fresh = AudioPipeline::new();
        fresh.finalize(256, |_| Ok(true))
    }

    /// Mono input must come back unchanged.
    #[test]
    fn downmix_to_mono_single_channel_is_identity() {
        let samples = vec![0.0, 1.0, -1.0];
        assert_eq!(downmix_to_mono(&samples, 1), samples);
    }

    /// Each stereo frame collapses to the mean of its two samples.
    #[test]
    fn downmix_to_mono_averages_channels() {
        let stereo = vec![1.0, 3.0, -1.0, 1.0];
        let expected = vec![2.0, 0.0];
        assert_eq!(downmix_to_mono(&stereo, 2), expected);
    }

    /// Returning `false` from the callback stops after the first chunk.
    #[test]
    fn emit_mono_chunks_respects_early_stop() -> anyhow::Result<()> {
        let samples = vec![1.0; 10];
        let mut chunk_lens = Vec::new();
        emit_mono_chunks(&samples, 4, &mut |chunk| {
            chunk_lens.push(chunk.len());
            Ok(false)
        })?;
        assert_eq!(chunk_lens, vec![4]);
        Ok(())
    }

    /// Resampling without a resampler is reported as an error, not a panic.
    #[test]
    fn resample_block_errors_when_resampler_is_missing() {
        let mut fresh = AudioPipeline::new();
        let message = fresh
            .resample_block_into_out(&[0.0; 16])
            .unwrap_err()
            .to_string();
        assert!(message.contains("resampler not initialized"));
    }

    /// Full blocks are emitted while pushing; the leftover samples are
    /// flushed by `finalize`.
    #[test]
    fn resample_path_emits_and_finalize_flushes_remainder() -> anyhow::Result<()> {
        let mut pipeline = AudioPipeline::new();
        // Called twice on purpose: the second call must be a harmless no-op.
        for _ in 0..2 {
            pipeline.ensure_resampler(8_000)?;
        }

        let block_len = pipeline
            .resampler
            .as_ref()
            .expect("resampler initialized")
            .input_frames_max();

        // Two full blocks plus a few stray samples that cannot fill a third.
        let silence = vec![0.0; (block_len * 2) + 7];
        let mut emitted_samples = 0usize;

        pipeline.push_and_flush_resampler(&silence, 256, &mut |chunk| {
            emitted_samples += chunk.len();
            Ok(true)
        })?;
        assert!(pipeline.mono_src_acc.len() < block_len);

        pipeline.finalize(256, |chunk| {
            emitted_samples += chunk.len();
            Ok(true)
        })?;
        assert!(emitted_samples > 0);
        Ok(())
    }
}