use crate::audio::sample_source::ChannelMappedSampleSource;
use parking_lot::{Mutex, RwLock};
use std::cell::RefCell;
use std::collections::HashMap;
#[cfg(test)]
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
#[cfg(test)]
use std::time::Instant;
use tracing::debug;
thread_local! {
static BATCH_READ_SCRATCH: RefCell<Vec<f32>> = RefCell::new(vec![0.0; 8192]);
static SOURCES_SCRATCH: RefCell<Vec<Arc<Mutex<ActiveSource>>>> = const { RefCell::new(Vec::new()) };
}
#[derive(Clone)]
pub struct AudioMixer {
active_sources: Arc<RwLock<Vec<Arc<Mutex<ActiveSource>>>>>,
num_channels: u16,
sample_rate: u32,
sample_counter: Arc<AtomicU64>,
#[cfg(test)]
frame_count: Arc<AtomicUsize>,
#[cfg(test)]
total_frame_time: Arc<AtomicUsize>, #[cfg(test)]
max_frame_time: Arc<AtomicUsize>, }
pub struct ActiveSource {
pub id: u64,
pub source: Box<dyn ChannelMappedSampleSource + Send + Sync>,
pub track_mappings: HashMap<String, Vec<u16>>,
pub channel_mappings: Vec<Vec<usize>>,
pub cached_source_channel_count: u16,
pub is_finished: Arc<AtomicBool>,
pub cancel_handle: crate::playsync::CancelHandle,
pub start_at_sample: Option<u64>,
pub cancel_at_sample: Option<Arc<std::sync::atomic::AtomicU64>>,
pub gain_envelope: Option<Arc<crate::audio::crossfade::GainEnvelope>>,
}
impl AudioMixer {
pub fn new(num_channels: u16, sample_rate: u32) -> Self {
Self {
active_sources: Arc::new(RwLock::new(Vec::new())),
num_channels,
sample_rate,
sample_counter: Arc::new(AtomicU64::new(0)),
#[cfg(test)]
frame_count: Arc::new(AtomicUsize::new(0)),
#[cfg(test)]
total_frame_time: Arc::new(AtomicUsize::new(0)),
#[cfg(test)]
max_frame_time: Arc::new(AtomicUsize::new(0)),
}
}
pub fn current_sample(&self) -> u64 {
self.sample_counter.load(Ordering::Relaxed)
}
pub fn sample_counter(&self) -> Arc<AtomicU64> {
self.sample_counter.clone()
}
fn precompute_channel_mappings(
source: &dyn ChannelMappedSampleSource,
track_mappings: &HashMap<String, Vec<u16>>,
) -> Vec<Vec<usize>> {
let source_channel_count = source.source_channel_count() as usize;
let mut channel_mappings = Vec::with_capacity(source_channel_count);
for source_channel in 0..source_channel_count {
let mut output_channels = Vec::new();
if let Some(labels) = source.channel_mappings().get(source_channel) {
for label in labels {
if let Some(track_channels) = track_mappings.get(label) {
for &track_channel in track_channels {
let output_index = (track_channel - 1) as usize;
output_channels.push(output_index);
}
}
}
}
channel_mappings.push(output_channels);
}
channel_mappings
}
pub fn add_source(&self, mut source: ActiveSource) {
if source.cached_source_channel_count == 0 {
source.cached_source_channel_count = source.source.source_channel_count();
}
let channel_mappings =
Self::precompute_channel_mappings(source.source.as_ref(), &source.track_mappings);
source.channel_mappings = channel_mappings;
let mut sources = self.active_sources.write();
sources.push(Arc::new(Mutex::new(source)));
}
pub fn remove_sources(&self, source_ids: &[u64]) {
let mut sources = self.active_sources.write();
sources.retain(|source| {
let source_guard = source.lock();
!source_ids.contains(&source_guard.id)
});
}
pub fn set_gain_envelope(
&self,
source_ids: &[u64],
envelope: Arc<crate::audio::crossfade::GainEnvelope>,
) {
let sources = self.active_sources.read();
for source_arc in sources.iter() {
let mut source = source_arc.lock();
if source_ids.contains(&source.id) {
let own = Arc::new(crate::audio::crossfade::GainEnvelope::new(
envelope.start_gain(),
envelope.end_gain(),
envelope.duration_samples(),
envelope.curve(),
));
source.gain_envelope = Some(own);
}
}
}
#[cfg(test)]
pub fn process_frame(&self) -> Vec<f32> {
#[cfg(test)]
let start_time = Instant::now();
let mut frame = vec![0.0f32; self.num_channels as usize];
let sources_to_process = {
let sources = self.active_sources.read();
sources.clone()
};
let mut finished_source_ids = Vec::new();
let mut source_frame_buffer = vec![0.0f32; 64];
for active_source_arc in sources_to_process {
let mut active_source = active_source_arc.lock();
if active_source.is_finished.load(Ordering::Relaxed)
|| active_source.cancel_handle.is_cancelled()
{
finished_source_ids.push(active_source.id);
continue;
}
let source_channel_count = active_source.cached_source_channel_count as usize;
if source_frame_buffer.len() < source_channel_count {
source_frame_buffer.resize(source_channel_count, 0.0);
}
match active_source
.source
.next_frame(&mut source_frame_buffer[..source_channel_count])
{
Ok(Some(_count)) => {
for (source_channel, &sample) in source_frame_buffer[..source_channel_count]
.iter()
.enumerate()
{
if let Some(output_channels) =
active_source.channel_mappings.get(source_channel)
{
for &output_index in output_channels {
if output_index < frame.len() {
frame[output_index] += sample;
}
}
}
}
}
Ok(None) => {
if active_source.source.is_exhausted().unwrap_or(true) {
active_source.is_finished.store(true, Ordering::Relaxed);
finished_source_ids.push(active_source.id);
}
}
Err(_) => {
active_source.is_finished.store(true, Ordering::Relaxed);
finished_source_ids.push(active_source.id);
}
}
}
if !finished_source_ids.is_empty() {
self.remove_sources(&finished_source_ids);
}
#[cfg(test)]
{
let frame_time = start_time.elapsed();
let frame_time_us = frame_time.as_micros() as usize;
self.frame_count.fetch_add(1, Ordering::Relaxed);
self.total_frame_time
.fetch_add(frame_time_us, Ordering::Relaxed);
let mut current_max = self.max_frame_time.load(Ordering::Relaxed);
while frame_time_us > current_max {
match self.max_frame_time.compare_exchange_weak(
current_max,
frame_time_us,
Ordering::Relaxed,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(x) => current_max = x,
}
}
}
frame
}
#[cfg(test)]
pub fn process_frames(&self, num_frames: usize) -> Vec<f32> {
let mut frames = Vec::with_capacity(num_frames * self.num_channels as usize);
for _ in 0..num_frames {
let frame = self.process_frame();
frames.extend(frame);
}
frames
}
pub fn process_into_output(&self, output: &mut [f32], num_frames: usize) {
let channels = self.num_channels as usize;
debug_assert_eq!(output.len(), num_frames * channels);
let current_sample = self.sample_counter.load(Ordering::Relaxed);
let buffer_end_sample = current_sample + num_frames as u64;
output.fill(0.0);
let mut sources_to_process =
SOURCES_SCRATCH.with(|cell| std::mem::take(&mut *cell.borrow_mut()));
sources_to_process.clear();
{
let sources = self.active_sources.read();
sources_to_process.extend(sources.iter().cloned());
}
let mut finished_source_ids: Vec<u64> = Vec::new();
for active_source_arc in sources_to_process.iter() {
let mut active_source = active_source_arc.lock();
let has_active_fade_out = active_source
.gain_envelope
.as_ref()
.is_some_and(|env| env.end_gain() == 0.0 && !env.is_finished());
if active_source.is_finished.load(Ordering::Relaxed)
|| (active_source.cancel_handle.is_cancelled() && !has_active_fade_out)
{
debug!(
source_id = active_source.id,
reason = if active_source.is_finished.load(Ordering::Relaxed) {
"already_finished"
} else {
"cancel_handle_cancelled"
},
"mixer: source marked finished (skip)"
);
finished_source_ids.push(active_source.id);
continue;
}
if let Some(ref cancel_at) = active_source.cancel_at_sample {
let cancel_sample = cancel_at.load(Ordering::Relaxed);
if cancel_sample > 0 && current_sample >= cancel_sample {
debug!(
source_id = active_source.id,
cancel_sample,
current_sample,
"mixer: source marked finished (cancel_at_sample reached)"
);
active_source.is_finished.store(true, Ordering::Relaxed);
finished_source_ids.push(active_source.id);
continue;
}
}
let start_frame = if let Some(start_at) = active_source.start_at_sample {
if start_at >= buffer_end_sample {
continue;
}
if start_at > current_sample {
(start_at - current_sample) as usize
} else {
0 }
} else {
0 };
let end_frame = if let Some(ref cancel_at) = active_source.cancel_at_sample {
let cancel_sample = cancel_at.load(Ordering::Relaxed);
if cancel_sample > 0
&& cancel_sample > current_sample
&& cancel_sample < buffer_end_sample
{
(cancel_sample - current_sample) as usize
} else {
num_frames
}
} else {
num_frames
};
if end_frame <= start_frame {
continue;
}
let source_channel_count = active_source.cached_source_channel_count as usize;
let frames_needed = end_frame - start_frame;
BATCH_READ_SCRATCH.with(|cell| {
let mut buf = cell.borrow_mut();
let batch_samples = frames_needed * source_channel_count;
if buf.len() < batch_samples {
buf.resize(batch_samples, 0.0);
}
let batch_buf = &mut buf[..batch_samples];
match active_source.source.read_frames(batch_buf, frames_needed) {
Ok(frames_got) => {
let gain = active_source
.gain_envelope
.as_ref()
.map(|env| env.advance(frames_got as u64))
.unwrap_or(1.0);
for frame_idx in 0..frames_got {
let src_offset = frame_idx * source_channel_count;
let dst_base = (start_frame + frame_idx) * channels;
for (source_channel, &sample) in batch_buf
[src_offset..src_offset + source_channel_count]
.iter()
.enumerate()
{
if let Some(output_channels) =
active_source.channel_mappings.get(source_channel)
{
for &output_index in output_channels {
if output_index < channels {
output[dst_base + output_index] += sample * gain;
}
}
}
}
}
if let Some(ref env) = active_source.gain_envelope {
if env.is_finished() && env.end_gain() == 0.0 {
debug!(
source_id = active_source.id,
"mixer: source marked finished (fade-out envelope completed)"
);
active_source.is_finished.store(true, Ordering::Relaxed);
finished_source_ids.push(active_source.id);
}
}
if frames_got < frames_needed {
if active_source.source.is_exhausted().unwrap_or(true) {
debug!(
source_id = active_source.id,
"mixer: source marked finished (read_frames returned fewer frames)"
);
active_source.is_finished.store(true, Ordering::Relaxed);
finished_source_ids.push(active_source.id);
}
}
}
Err(_) => {
debug!(
source_id = active_source.id,
"mixer: source marked finished (read_frames error)"
);
active_source.is_finished.store(true, Ordering::Relaxed);
finished_source_ids.push(active_source.id);
}
}
});
}
self.sample_counter
.fetch_add(num_frames as u64, Ordering::Relaxed);
if !finished_source_ids.is_empty() {
debug!(
source_ids = ?finished_source_ids,
remaining_before = self.active_sources.read().len(),
"mixer: removing finished sources"
);
self.remove_sources(&finished_source_ids);
}
sources_to_process.clear();
SOURCES_SCRATCH.with(|cell| {
*cell.borrow_mut() = sources_to_process;
});
}
pub fn num_channels(&self) -> u16 {
self.num_channels
}
pub fn sample_rate(&self) -> u32 {
self.sample_rate
}
pub fn get_active_sources(&self) -> Arc<RwLock<Vec<Arc<Mutex<ActiveSource>>>>> {
self.active_sources.clone()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::audio::sample_source::ChannelMappedSampleSource;
use crate::playsync::CancelHandle;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
fn create_test_source(
samples: Vec<f32>,
channel_count: u16,
mappings: Vec<Vec<String>>,
) -> Box<dyn ChannelMappedSampleSource> {
let memory_source =
crate::audio::sample_source::MemorySampleSource::new(samples, channel_count, 44100);
Box::new(crate::audio::sample_source::ChannelMappedSource::new(
Box::new(memory_source),
mappings,
channel_count,
))
}
fn make_active_source(
id: u64,
source: Box<dyn ChannelMappedSampleSource>,
track_mappings: HashMap<String, Vec<u16>>,
) -> ActiveSource {
ActiveSource {
id,
source,
track_mappings,
channel_mappings: Vec::new(),
cached_source_channel_count: 0,
is_finished: Arc::new(AtomicBool::new(false)),
cancel_handle: CancelHandle::new(),
start_at_sample: None,
cancel_at_sample: None,
gain_envelope: None,
}
}
#[test]
fn test_basic_mixing() {
let mixer = AudioMixer::new(2, 44100);
let samples = vec![0.5, 0.8]; let source = create_test_source(samples, 1, vec![vec!["test".to_string()]]);
let active_source = ActiveSource {
id: 1,
source,
track_mappings: {
let mut map = HashMap::new();
map.insert("test".to_string(), vec![1]); map
},
channel_mappings: Vec::new(), cached_source_channel_count: 1,
is_finished: Arc::new(AtomicBool::new(false)),
cancel_handle: CancelHandle::new(),
start_at_sample: None,
cancel_at_sample: None,
gain_envelope: None,
};
mixer.add_source(active_source);
let frames = mixer.process_frames(2);
assert_eq!(frames.len(), 4); assert_eq!(frames[0], 0.5); assert_eq!(frames[1], 0.0); assert_eq!(frames[2], 0.8); assert_eq!(frames[3], 0.0); }
#[test]
fn test_multiple_source_mixing() {
let mixer = AudioMixer::new(2, 44100);
let source1 = create_test_source(
vec![0.5, 0.3],
2,
vec![vec!["ch0".to_string()], vec!["ch1".to_string()]],
);
let source2 = create_test_source(
vec![0.2, 0.1],
2,
vec![vec!["ch0".to_string()], vec!["ch1".to_string()]],
);
let active_source1 = ActiveSource {
id: 1,
source: source1,
track_mappings: {
let mut map = HashMap::new();
map.insert("ch0".to_string(), vec![1]);
map.insert("ch1".to_string(), vec![2]);
map
},
channel_mappings: Vec::new(), cached_source_channel_count: 2,
is_finished: Arc::new(AtomicBool::new(false)),
cancel_handle: CancelHandle::new(),
start_at_sample: None,
cancel_at_sample: None,
gain_envelope: None,
};
let active_source2 = ActiveSource {
id: 2,
source: source2,
track_mappings: {
let mut map = HashMap::new();
map.insert("ch0".to_string(), vec![1]);
map.insert("ch1".to_string(), vec![2]);
map
},
channel_mappings: Vec::new(), cached_source_channel_count: 2,
is_finished: Arc::new(AtomicBool::new(false)),
cancel_handle: CancelHandle::new(),
start_at_sample: None,
cancel_at_sample: None,
gain_envelope: None,
};
mixer.add_source(active_source1);
mixer.add_source(active_source2);
let frame = mixer.process_frame();
assert_eq!(frame.len(), 2);
assert_eq!(frame[0], 0.7); assert_eq!(frame[1], 0.4); }
#[test]
fn test_32_channel_mixing() {
let mixer = AudioMixer::new(32, 44100);
let mut samples = vec![0.0; 64]; samples[0] = 0.5; samples[1] = 0.3; samples[32] = 0.8; samples[33] = 0.2;
let source = create_test_source(samples, 32, {
let mut mappings = vec![vec![]; 32];
mappings[0] = vec!["ch0".to_string()];
mappings[1] = vec!["ch1".to_string()];
mappings
});
let active_source = ActiveSource {
id: 1,
source,
track_mappings: {
let mut map = HashMap::new();
map.insert("ch0".to_string(), vec![1]); map.insert("ch1".to_string(), vec![2]); map
},
channel_mappings: Vec::new(), cached_source_channel_count: 32,
is_finished: Arc::new(AtomicBool::new(false)),
cancel_handle: CancelHandle::new(),
start_at_sample: None,
cancel_at_sample: None,
gain_envelope: None,
};
mixer.add_source(active_source);
let frames = mixer.process_frames(2);
assert_eq!(frames.len(), 64); assert_eq!(frames[0], 0.5); assert_eq!(frames[1], 0.3); assert_eq!(frames[32], 0.8); assert_eq!(frames[33], 0.2);
for frame in frames.iter().take(32).skip(2) {
assert_eq!(*frame, 0.0);
}
for frame in frames.iter().take(64).skip(34) {
assert_eq!(*frame, 0.0);
}
}
#[test]
fn test_cancel_before_start_does_not_panic() {
let mixer = AudioMixer::new(2, 44100);
let samples = vec![0.5; 1024]; let source = create_test_source(samples, 1, vec![vec!["test".to_string()]]);
let cancel_at = Arc::new(AtomicU64::new(200));
let active_source = ActiveSource {
id: 1,
source,
track_mappings: {
let mut map = HashMap::new();
map.insert("test".to_string(), vec![1]);
map
},
channel_mappings: Vec::new(),
cached_source_channel_count: 1,
is_finished: Arc::new(AtomicBool::new(false)),
cancel_handle: CancelHandle::new(),
start_at_sample: Some(400),
cancel_at_sample: Some(cancel_at),
gain_envelope: None,
};
mixer.add_source(active_source);
let mut output = vec![0.0f32; 512 * 2];
mixer.process_into_output(&mut output, 512);
for &sample in &output {
assert_eq!(sample, 0.0);
}
let mut output2 = vec![0.0f32; 512 * 2];
mixer.process_into_output(&mut output2, 512);
for &sample in &output2 {
assert_eq!(sample, 0.0);
}
assert_eq!(mixer.active_sources.read().len(), 0);
}
#[test]
fn test_process_into_output_basic() {
let mixer = AudioMixer::new(2, 44100);
let samples = vec![0.5, 0.8, 0.3, 0.6]; let source = create_test_source(samples, 1, vec![vec!["test".to_string()]]);
let active_source = ActiveSource {
id: 1,
source,
track_mappings: {
let mut map = HashMap::new();
map.insert("test".to_string(), vec![1]);
map
},
channel_mappings: Vec::new(),
cached_source_channel_count: 1,
is_finished: Arc::new(AtomicBool::new(false)),
cancel_handle: CancelHandle::new(),
start_at_sample: None,
cancel_at_sample: None,
gain_envelope: None,
};
mixer.add_source(active_source);
let mut output = vec![0.0f32; 4 * 2]; mixer.process_into_output(&mut output, 4);
assert_eq!(output[0], 0.5); assert_eq!(output[1], 0.0); assert_eq!(output[2], 0.8); assert_eq!(output[3], 0.0); assert_eq!(output[4], 0.3); assert_eq!(output[5], 0.0); assert_eq!(output[6], 0.6); assert_eq!(output[7], 0.0); }
#[test]
fn test_process_into_output_start_at_sample_mid_buffer() {
let mixer = AudioMixer::new(2, 44100);
let samples = vec![0.1, 0.2, 0.3, 0.4];
let source = create_test_source(samples, 1, vec![vec!["test".to_string()]]);
let active_source = ActiveSource {
id: 1,
source,
track_mappings: {
let mut map = HashMap::new();
map.insert("test".to_string(), vec![1]);
map
},
channel_mappings: Vec::new(),
cached_source_channel_count: 1,
is_finished: Arc::new(AtomicBool::new(false)),
cancel_handle: CancelHandle::new(),
start_at_sample: Some(2), cancel_at_sample: None,
gain_envelope: None,
};
mixer.add_source(active_source);
let mut output = vec![0.0f32; 4 * 2];
mixer.process_into_output(&mut output, 4);
assert_eq!(output[0], 0.0); assert_eq!(output[1], 0.0); assert_eq!(output[2], 0.0); assert_eq!(output[3], 0.0); assert_eq!(output[4], 0.1); assert_eq!(output[5], 0.0); assert_eq!(output[6], 0.2); assert_eq!(output[7], 0.0); }
#[test]
fn test_process_into_output_cancel_at_sample_mid_buffer() {
let mixer = AudioMixer::new(2, 44100);
let samples = vec![0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8];
let source = create_test_source(samples, 1, vec![vec!["test".to_string()]]);
let cancel_at = Arc::new(AtomicU64::new(3)); let active_source = ActiveSource {
id: 1,
source,
track_mappings: {
let mut map = HashMap::new();
map.insert("test".to_string(), vec![1]);
map
},
channel_mappings: Vec::new(),
cached_source_channel_count: 1,
is_finished: Arc::new(AtomicBool::new(false)),
cancel_handle: CancelHandle::new(),
start_at_sample: None,
cancel_at_sample: Some(cancel_at),
gain_envelope: None,
};
mixer.add_source(active_source);
let mut output = vec![0.0f32; 8 * 2];
mixer.process_into_output(&mut output, 8);
assert_eq!(output[0], 0.1); assert_eq!(output[2], 0.2); assert_eq!(output[4], 0.3); assert_eq!(output[6], 0.0); assert_eq!(output[8], 0.0); assert_eq!(output[10], 0.0); }
#[test]
fn test_process_into_output_source_finishes_before_buffer_end() {
let mixer = AudioMixer::new(2, 44100);
let samples = vec![0.7, 0.9]; let source = create_test_source(samples, 1, vec![vec!["test".to_string()]]);
let active_source = ActiveSource {
id: 1,
source,
track_mappings: {
let mut map = HashMap::new();
map.insert("test".to_string(), vec![1]);
map
},
channel_mappings: Vec::new(),
cached_source_channel_count: 1,
is_finished: Arc::new(AtomicBool::new(false)),
cancel_handle: CancelHandle::new(),
start_at_sample: None,
cancel_at_sample: None,
gain_envelope: None,
};
mixer.add_source(active_source);
let mut output = vec![0.0f32; 8 * 2];
mixer.process_into_output(&mut output, 8);
assert_eq!(output[0], 0.7); assert_eq!(output[1], 0.0); assert_eq!(output[2], 0.9); assert_eq!(output[3], 0.0); for (i, val) in output.iter().enumerate().skip(4) {
assert_eq!(*val, 0.0, "output[{i}] should be silence");
}
assert_eq!(mixer.active_sources.read().len(), 0);
}
#[test]
fn test_process_into_output_multiple_sources() {
let mixer = AudioMixer::new(2, 44100);
let source1 = create_test_source(
vec![0.5, 0.3],
2,
vec![vec!["ch0".to_string()], vec!["ch1".to_string()]],
);
let source2 = create_test_source(
vec![0.2, 0.1],
2,
vec![vec!["ch0".to_string()], vec!["ch1".to_string()]],
);
let active_source1 = ActiveSource {
id: 1,
source: source1,
track_mappings: {
let mut map = HashMap::new();
map.insert("ch0".to_string(), vec![1]);
map.insert("ch1".to_string(), vec![2]);
map
},
channel_mappings: Vec::new(),
cached_source_channel_count: 2,
is_finished: Arc::new(AtomicBool::new(false)),
cancel_handle: CancelHandle::new(),
start_at_sample: None,
cancel_at_sample: None,
gain_envelope: None,
};
let active_source2 = ActiveSource {
id: 2,
source: source2,
track_mappings: {
let mut map = HashMap::new();
map.insert("ch0".to_string(), vec![1]);
map.insert("ch1".to_string(), vec![2]);
map
},
channel_mappings: Vec::new(),
cached_source_channel_count: 2,
is_finished: Arc::new(AtomicBool::new(false)),
cancel_handle: CancelHandle::new(),
start_at_sample: None,
cancel_at_sample: None,
gain_envelope: None,
};
mixer.add_source(active_source1);
mixer.add_source(active_source2);
let mut output = vec![0.0f32; 2]; mixer.process_into_output(&mut output, 1);
assert!((output[0] - 0.7).abs() < 1e-6); assert!((output[1] - 0.4).abs() < 1e-6); }
#[test]
fn test_process_into_output_start_and_cancel_mid_buffer() {
let mixer = AudioMixer::new(2, 44100);
let samples = vec![0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8];
let source = create_test_source(samples, 1, vec![vec!["test".to_string()]]);
let cancel_at = Arc::new(AtomicU64::new(6)); let active_source = ActiveSource {
id: 1,
source,
track_mappings: {
let mut map = HashMap::new();
map.insert("test".to_string(), vec![1]);
map
},
channel_mappings: Vec::new(),
cached_source_channel_count: 1,
is_finished: Arc::new(AtomicBool::new(false)),
cancel_handle: CancelHandle::new(),
start_at_sample: Some(2), cancel_at_sample: Some(cancel_at),
gain_envelope: None,
};
mixer.add_source(active_source);
let mut output = vec![0.0f32; 8 * 2];
mixer.process_into_output(&mut output, 8);
assert_eq!(output[0], 0.0);
assert_eq!(output[2], 0.0);
assert_eq!(output[4], 0.1); assert_eq!(output[6], 0.2); assert_eq!(output[8], 0.3); assert_eq!(output[10], 0.4); assert_eq!(output[12], 0.0);
assert_eq!(output[14], 0.0);
}
struct UnderrunSimSource {
inner: Box<dyn ChannelMappedSampleSource>,
underrun_calls_remaining: usize,
exhausted: AtomicBool,
}
impl UnderrunSimSource {
fn new(inner: Box<dyn ChannelMappedSampleSource>, underrun_calls: usize) -> Self {
Self {
inner,
underrun_calls_remaining: underrun_calls,
exhausted: AtomicBool::new(false),
}
}
}
impl ChannelMappedSampleSource for UnderrunSimSource {
fn next_sample(
&mut self,
) -> Result<Option<f32>, crate::audio::sample_source::error::SampleSourceError> {
self.inner.next_sample()
}
fn next_frame(
&mut self,
output: &mut [f32],
) -> Result<Option<usize>, crate::audio::sample_source::error::SampleSourceError> {
self.inner.next_frame(output)
}
fn read_frames(
&mut self,
output: &mut [f32],
max_frames: usize,
) -> Result<usize, crate::audio::sample_source::error::SampleSourceError> {
if self.underrun_calls_remaining > 0 {
self.underrun_calls_remaining -= 1;
return Ok(0); }
let got = self.inner.read_frames(output, max_frames)?;
if got < max_frames {
self.exhausted.store(true, Ordering::Relaxed);
}
Ok(got)
}
fn channel_mappings(&self) -> &[Vec<String>] {
self.inner.channel_mappings()
}
fn source_channel_count(&self) -> u16 {
self.inner.source_channel_count()
}
fn is_exhausted(&self) -> Option<bool> {
Some(self.exhausted.load(Ordering::Relaxed))
}
}
#[test]
fn test_mixer_keeps_source_alive_during_transient_underrun() {
let mixer = AudioMixer::new(2, 44100);
let samples = vec![0.5, 0.8, 0.3, 0.6]; let inner = create_test_source(samples, 1, vec![vec!["test".to_string()]]);
let source: Box<dyn ChannelMappedSampleSource + Send + Sync> =
Box::new(UnderrunSimSource::new(inner, 1));
let is_finished = Arc::new(AtomicBool::new(false));
let active_source = ActiveSource {
id: 1,
source,
track_mappings: {
let mut map = HashMap::new();
map.insert("test".to_string(), vec![1]);
map
},
channel_mappings: Vec::new(),
cached_source_channel_count: 1,
is_finished: is_finished.clone(),
cancel_handle: CancelHandle::new(),
start_at_sample: None,
cancel_at_sample: None,
gain_envelope: None,
};
mixer.add_source(active_source);
let mut output = vec![0.0f32; 4 * 2];
mixer.process_into_output(&mut output, 4);
for &sample in &output {
assert_eq!(sample, 0.0, "underrun callback should produce silence");
}
assert!(
!is_finished.load(Ordering::Relaxed),
"source must not be marked finished on transient underrun"
);
assert_eq!(
mixer.active_sources.read().len(),
1,
"source must remain active"
);
let mut output2 = vec![0.0f32; 4 * 2];
mixer.process_into_output(&mut output2, 4);
assert_eq!(output2[0], 0.5); assert_eq!(output2[2], 0.8); assert_eq!(output2[4], 0.3); assert_eq!(output2[6], 0.6); }
#[test]
fn test_mixer_finishes_source_when_truly_exhausted() {
let mixer = AudioMixer::new(2, 44100);
let samples = vec![0.5, 0.8]; let inner = create_test_source(samples, 1, vec![vec!["test".to_string()]]);
let source: Box<dyn ChannelMappedSampleSource + Send + Sync> =
Box::new(UnderrunSimSource::new(inner, 0));
let active_source = ActiveSource {
id: 1,
source,
track_mappings: {
let mut map = HashMap::new();
map.insert("test".to_string(), vec![1]);
map
},
channel_mappings: Vec::new(),
cached_source_channel_count: 1,
is_finished: Arc::new(AtomicBool::new(false)),
cancel_handle: CancelHandle::new(),
start_at_sample: None,
cancel_at_sample: None,
gain_envelope: None,
};
mixer.add_source(active_source);
let mut output = vec![0.0f32; 8 * 2];
mixer.process_into_output(&mut output, 8);
assert_eq!(output[0], 0.5);
assert_eq!(output[2], 0.8);
for (i, val) in output.iter().enumerate().skip(4) {
assert_eq!(*val, 0.0, "output[{i}] should be silence");
}
assert_eq!(
mixer.active_sources.read().len(),
0,
"exhausted source must be removed"
);
}
mod precompute_channel_mappings_tests {
use super::*;
#[test]
fn single_channel_single_output() {
let source = create_test_source(vec![0.0], 1, vec![vec!["vocals".to_string()]]);
let mut track_mappings = HashMap::new();
track_mappings.insert("vocals".to_string(), vec![1]);
let mappings =
AudioMixer::precompute_channel_mappings(source.as_ref(), &track_mappings);
assert_eq!(mappings.len(), 1);
assert_eq!(mappings[0], vec![0]); }
#[test]
fn one_source_to_multiple_outputs() {
let source = create_test_source(vec![0.0], 1, vec![vec!["mono".to_string()]]);
let mut track_mappings = HashMap::new();
track_mappings.insert("mono".to_string(), vec![1, 2]);
let mappings =
AudioMixer::precompute_channel_mappings(source.as_ref(), &track_mappings);
assert_eq!(mappings.len(), 1);
assert_eq!(mappings[0], vec![0, 1]); }
#[test]
fn unmapped_label_produces_empty_outputs() {
let source = create_test_source(vec![0.0], 1, vec![vec!["not_in_config".to_string()]]);
let track_mappings = HashMap::new();
let mappings =
AudioMixer::precompute_channel_mappings(source.as_ref(), &track_mappings);
assert_eq!(mappings.len(), 1);
assert!(mappings[0].is_empty());
}
#[test]
fn multi_channel_source() {
let source = create_test_source(
vec![0.0; 2],
2,
vec![vec!["left".to_string()], vec!["right".to_string()]],
);
let mut track_mappings = HashMap::new();
track_mappings.insert("left".to_string(), vec![1]);
track_mappings.insert("right".to_string(), vec![2]);
let mappings =
AudioMixer::precompute_channel_mappings(source.as_ref(), &track_mappings);
assert_eq!(mappings.len(), 2);
assert_eq!(mappings[0], vec![0]); assert_eq!(mappings[1], vec![1]); }
#[test]
fn source_channel_with_multiple_labels() {
let source = create_test_source(
vec![0.0],
1,
vec![vec!["main".to_string(), "monitor".to_string()]],
);
let mut track_mappings = HashMap::new();
track_mappings.insert("main".to_string(), vec![1]);
track_mappings.insert("monitor".to_string(), vec![3]);
let mappings =
AudioMixer::precompute_channel_mappings(source.as_ref(), &track_mappings);
assert_eq!(mappings.len(), 1);
assert_eq!(mappings[0], vec![0, 2]); }
#[test]
fn no_labels_on_source_channel() {
let source = create_test_source(vec![0.0], 1, vec![vec![]]);
let mut track_mappings = HashMap::new();
track_mappings.insert("anything".to_string(), vec![1]);
let mappings =
AudioMixer::precompute_channel_mappings(source.as_ref(), &track_mappings);
assert_eq!(mappings.len(), 1);
assert!(mappings[0].is_empty());
}
}
mod remove_sources_tests {
use super::*;
#[test]
fn remove_single_source() {
let mixer = AudioMixer::new(2, 44100);
let source = create_test_source(vec![0.0; 100], 1, vec![vec!["t".to_string()]]);
let mut mappings = HashMap::new();
mappings.insert("t".to_string(), vec![1]);
mixer.add_source(make_active_source(42, source, mappings));
assert_eq!(mixer.active_sources.read().len(), 1);
mixer.remove_sources(&[42]);
assert_eq!(mixer.active_sources.read().len(), 0);
}
#[test]
fn remove_subset_of_sources() {
let mixer = AudioMixer::new(2, 44100);
for id in 1..=3 {
let source = create_test_source(vec![0.0; 100], 1, vec![vec!["t".to_string()]]);
let mut mappings = HashMap::new();
mappings.insert("t".to_string(), vec![1]);
mixer.add_source(make_active_source(id, source, mappings));
}
assert_eq!(mixer.active_sources.read().len(), 3);
mixer.remove_sources(&[1, 3]);
let sources = mixer.active_sources.read();
assert_eq!(sources.len(), 1);
assert_eq!(sources[0].lock().id, 2);
}
#[test]
fn remove_nonexistent_id_is_noop() {
let mixer = AudioMixer::new(2, 44100);
let source = create_test_source(vec![0.0; 100], 1, vec![vec!["t".to_string()]]);
let mut mappings = HashMap::new();
mappings.insert("t".to_string(), vec![1]);
mixer.add_source(make_active_source(1, source, mappings));
mixer.remove_sources(&[99]);
assert_eq!(mixer.active_sources.read().len(), 1);
}
#[test]
fn remove_from_empty_mixer() {
let mixer = AudioMixer::new(2, 44100);
mixer.remove_sources(&[1, 2, 3]); assert_eq!(mixer.active_sources.read().len(), 0);
}
}
mod sample_counter_tests {
use super::*;
#[test]
fn starts_at_zero() {
let mixer = AudioMixer::new(2, 44100);
assert_eq!(mixer.current_sample(), 0);
}
#[test]
fn advances_by_frame_count() {
let mixer = AudioMixer::new(2, 44100);
let mut output = vec![0.0f32; 256 * 2];
mixer.process_into_output(&mut output, 256);
assert_eq!(mixer.current_sample(), 256);
}
#[test]
fn accumulates_across_calls() {
let mixer = AudioMixer::new(2, 44100);
let mut output = vec![0.0f32; 128 * 2];
mixer.process_into_output(&mut output, 128);
mixer.process_into_output(&mut output, 128);
mixer.process_into_output(&mut output, 128);
assert_eq!(mixer.current_sample(), 384);
}
}
mod cancel_handle_tests {
use super::*;
#[test]
fn cancel_handle_removes_source() {
let mixer = AudioMixer::new(2, 44100);
let samples = vec![0.5; 1000];
let source = create_test_source(samples, 1, vec![vec!["t".to_string()]]);
let mut mappings = HashMap::new();
mappings.insert("t".to_string(), vec![1]);
let active = make_active_source(1, source, mappings);
let cancel = active.cancel_handle.clone();
mixer.add_source(active);
let mut output = vec![0.0f32; 64 * 2];
mixer.process_into_output(&mut output, 64);
assert_eq!(mixer.active_sources.read().len(), 1);
assert!(output[0] > 0.0);
cancel.cancel();
let mut output2 = vec![0.0f32; 64 * 2];
mixer.process_into_output(&mut output2, 64);
assert_eq!(mixer.active_sources.read().len(), 0);
for &sample in &output2 {
assert_eq!(sample, 0.0);
}
}
#[test]
fn already_finished_source_skipped() {
let mixer = AudioMixer::new(2, 44100);
let samples = vec![0.5; 1000];
let source = create_test_source(samples, 1, vec![vec!["t".to_string()]]);
let mut mappings = HashMap::new();
mappings.insert("t".to_string(), vec![1]);
let active = make_active_source(1, source, mappings);
active.is_finished.store(true, Ordering::Relaxed);
mixer.add_source(active);
let mut output = vec![0.0f32; 64 * 2];
mixer.process_into_output(&mut output, 64);
for &sample in &output {
assert_eq!(sample, 0.0);
}
assert_eq!(mixer.active_sources.read().len(), 0);
}
}
mod channel_mapping_mixing_tests {
use super::*;
#[test]
fn one_to_many_channel_mapping() {
let mixer = AudioMixer::new(2, 44100);
let samples = vec![0.5, 0.8]; let source = create_test_source(samples, 1, vec![vec!["mono".to_string()]]);
let mut mappings = HashMap::new();
mappings.insert("mono".to_string(), vec![1, 2]); mixer.add_source(make_active_source(1, source, mappings));
let mut output = vec![0.0f32; 2 * 2];
mixer.process_into_output(&mut output, 2);
assert_eq!(output[0], 0.5); assert_eq!(output[1], 0.5); assert_eq!(output[2], 0.8); assert_eq!(output[3], 0.8); }
#[test]
fn unmapped_source_produces_silence() {
let mixer = AudioMixer::new(2, 44100);
let samples = vec![0.5, 0.8];
let source = create_test_source(samples, 1, vec![vec!["not_configured".to_string()]]);
let mappings = HashMap::new(); mixer.add_source(make_active_source(1, source, mappings));
let mut output = vec![0.0f32; 2 * 2];
mixer.process_into_output(&mut output, 2);
for &sample in &output {
assert_eq!(sample, 0.0);
}
}
#[test]
fn source_with_out_of_range_output_channel() {
let mixer = AudioMixer::new(2, 44100);
let samples = vec![0.5];
let source = create_test_source(samples, 1, vec![vec!["test".to_string()]]);
let mut mappings = HashMap::new();
mappings.insert("test".to_string(), vec![5]); mixer.add_source(make_active_source(1, source, mappings));
let mut output = vec![0.0f32; 2];
mixer.process_into_output(&mut output, 1);
for &sample in &output {
assert_eq!(sample, 0.0);
}
}
}
mod mixer_properties_tests {
use super::*;
#[test]
fn num_channels_returns_configured_value() {
assert_eq!(AudioMixer::new(2, 44100).num_channels(), 2);
assert_eq!(AudioMixer::new(32, 48000).num_channels(), 32);
}
#[test]
fn sample_rate_returns_configured_value() {
assert_eq!(AudioMixer::new(2, 44100).sample_rate(), 44100);
assert_eq!(AudioMixer::new(2, 48000).sample_rate(), 48000);
}
#[test]
fn add_source_precomputes_channel_count() {
let mixer = AudioMixer::new(2, 44100);
let source = create_test_source(
vec![0.0; 4],
2,
vec![vec!["a".to_string()], vec!["b".to_string()]],
);
let active = make_active_source(1, source, HashMap::new());
assert_eq!(active.cached_source_channel_count, 0); mixer.add_source(active);
let sources = mixer.active_sources.read();
let source = sources[0].lock();
assert_eq!(source.cached_source_channel_count, 2);
}
#[test]
fn empty_mixer_produces_silence() {
let mixer = AudioMixer::new(4, 44100);
let mut output = vec![1.0f32; 256 * 4]; mixer.process_into_output(&mut output, 256);
for &sample in &output {
assert_eq!(sample, 0.0);
}
}
}
mod process_frame_tests {
use super::*;
#[test]
fn process_frame_source_exhausts() {
let mixer = AudioMixer::new(2, 44100);
let source = create_test_source(vec![0.5, 0.8], 1, vec![vec!["t".to_string()]]);
let mut mappings = HashMap::new();
mappings.insert("t".to_string(), vec![1]);
mixer.add_source(make_active_source(1, source, mappings));
let f1 = mixer.process_frame();
assert_eq!(f1[0], 0.5);
let f2 = mixer.process_frame();
assert_eq!(f2[0], 0.8);
let f3 = mixer.process_frame();
assert_eq!(f3[0], 0.0);
assert_eq!(mixer.active_sources.read().len(), 0);
}
#[test]
fn process_frame_cancelled_source() {
let mixer = AudioMixer::new(2, 44100);
let source = create_test_source(vec![0.5; 100], 1, vec![vec!["t".to_string()]]);
let mut mappings = HashMap::new();
mappings.insert("t".to_string(), vec![1]);
let active = make_active_source(1, source, mappings);
let cancel = active.cancel_handle.clone();
mixer.add_source(active);
cancel.cancel();
let frame = mixer.process_frame();
assert_eq!(frame[0], 0.0); assert_eq!(mixer.active_sources.read().len(), 0);
}
#[test]
fn process_frame_erroring_source() {
let mixer = AudioMixer::new(2, 44100);
let error_source = ErroringSource;
let source: Box<dyn ChannelMappedSampleSource> =
Box::new(crate::audio::sample_source::ChannelMappedSource::new(
Box::new(error_source),
vec![vec!["t".to_string()]],
1,
));
let mut mappings = HashMap::new();
mappings.insert("t".to_string(), vec![1]);
mixer.add_source(make_active_source(1, source, mappings));
let frame = mixer.process_frame();
assert_eq!(frame[0], 0.0); assert_eq!(mixer.active_sources.read().len(), 0);
}
#[test]
fn process_into_output_start_at_already_passed() {
let mixer = AudioMixer::new(2, 44100);
let source = create_test_source(vec![0.5; 20], 1, vec![vec!["t".to_string()]]);
let mut mappings = HashMap::new();
mappings.insert("t".to_string(), vec![1]);
let mut active = make_active_source(1, source, mappings);
active.start_at_sample = Some(0);
mixer.add_source(active);
let mut warmup = vec![0.0f32; 10 * 2];
mixer.process_into_output(&mut warmup, 10);
let mut output = vec![0.0f32; 4 * 2];
mixer.process_into_output(&mut output, 4);
assert!(
output[0] > 0.0,
"source should produce audio when start_at already passed"
);
}
#[test]
fn process_into_output_erroring_source() {
let mixer = AudioMixer::new(2, 44100);
let error_source = ErroringSource;
let source: Box<dyn ChannelMappedSampleSource> =
Box::new(crate::audio::sample_source::ChannelMappedSource::new(
Box::new(error_source),
vec![vec!["t".to_string()]],
1,
));
let mut mappings = HashMap::new();
mappings.insert("t".to_string(), vec![1]);
mixer.add_source(make_active_source(1, source, mappings));
let mut output = vec![0.0f32; 4 * 2];
mixer.process_into_output(&mut output, 4);
assert_eq!(mixer.active_sources.read().len(), 0);
}
#[test]
fn process_into_output_cancel_at_beyond_buffer() {
let mixer = AudioMixer::new(2, 44100);
let source = create_test_source(vec![0.5; 100], 1, vec![vec!["t".to_string()]]);
let mut mappings = HashMap::new();
mappings.insert("t".to_string(), vec![1]);
let mut active = make_active_source(1, source, mappings);
active.cancel_at_sample = Some(Arc::new(AtomicU64::new(99999)));
mixer.add_source(active);
let mut output = vec![0.0f32; 8 * 2];
mixer.process_into_output(&mut output, 8);
for i in 0..8 {
assert_eq!(output[i * 2], 0.5, "frame {i} should have audio");
}
}
}
struct ErroringSource;
impl crate::audio::sample_source::traits::SampleSource for ErroringSource {
fn next_sample(
&mut self,
) -> Result<Option<f32>, crate::audio::sample_source::error::SampleSourceError> {
Err(
crate::audio::sample_source::error::SampleSourceError::SampleConversionFailed(
"test error".into(),
),
)
}
fn channel_count(&self) -> u16 {
1
}
fn sample_rate(&self) -> u32 {
44100
}
fn bits_per_sample(&self) -> u16 {
32
}
fn sample_format(&self) -> crate::audio::SampleFormat {
crate::audio::SampleFormat::Float
}
fn duration(&self) -> Option<std::time::Duration> {
None
}
}
mod process_frame_resize_buffer {
use super::*;
#[test]
fn process_frame_handles_large_source_channel_count() {
let mixer = AudioMixer::new(2, 44100);
let channel_count = 70;
let mut samples = vec![0.0f32; channel_count];
samples[0] = 0.5;
let mut mappings_vec: Vec<Vec<String>> = Vec::new();
for i in 0..channel_count {
mappings_vec.push(vec![format!("ch{}", i)]);
}
let source = create_test_source(samples, channel_count as u16, mappings_vec);
let mut track_mappings = HashMap::new();
track_mappings.insert("ch0".to_string(), vec![1]);
mixer.add_source(make_active_source(1, source, track_mappings));
let frame = mixer.process_frame();
assert_eq!(frame[0], 0.5);
}
}
mod cancel_at_sample_zero_tests {
use super::*;
#[test]
fn cancel_at_sample_zero_means_no_cancel() {
let mixer = AudioMixer::new(2, 44100);
let samples = vec![0.5; 100];
let source = create_test_source(samples, 1, vec![vec!["t".to_string()]]);
let mut mappings = HashMap::new();
mappings.insert("t".to_string(), vec![1]);
let mut active = make_active_source(1, source, mappings);
active.cancel_at_sample = Some(Arc::new(AtomicU64::new(0)));
mixer.add_source(active);
let mut output = vec![0.0f32; 8 * 2];
mixer.process_into_output(&mut output, 8);
assert!(output[0] > 0.0, "source should play when cancel_at is 0");
}
}
}