use std::{
num::NonZeroUsize,
ops::Range,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use ringbuf::traits::{Consumer, Observer, Producer, Split};
use crate::Sample;
#[cfg(feature = "resampler")]
use crate::{resampler_type::ResamplerType, FixedResampler, ResampleQuality};
/// Tuning parameters for a resampling channel.
///
/// Passed to the channel constructor functions in this file, which turn the
/// second-based values into frame counts.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct ResamplingChannelConfig {
/// Target latency of the channel in seconds.
///
/// The channel is pre-filled with this much silence (measured at the output
/// sample rate, at least one frame). Must be greater than zero, otherwise
/// channel construction panics.
pub latency_seconds: f64,
/// Capacity of the ring buffer in seconds.
///
/// Must be greater than zero, otherwise channel construction panics. The
/// effective capacity is never smaller than twice the latency padding.
pub capacity_seconds: f64,
/// Fill level at which the consumer discards samples to recover from an
/// overflow, or `None` to disable the correction.
///
/// Expressed as a percentage (clamped to `0..=100`) of the range between
/// the latency padding and the buffer capacity. When the buffer is at or
/// above this level after a successful read, the consumer discards frames
/// until only the latency padding remains.
pub overflow_autocorrect_percent_threshold: Option<f64>,
/// Fill level at which the producer pushes silence to recover from an
/// underflow, or `None` to disable the correction.
///
/// Expressed as a percentage (clamped to `0..=100`) of the latency padding.
/// When the buffer is at or below this level after a successful push, the
/// producer appends zeros until the latency padding is restored.
pub underflow_autocorrect_percent_threshold: Option<f64>,
/// Quality setting handed to the built-in resampler when the input and
/// output sample rates differ. Not consulted when a custom resampler is
/// supplied.
#[cfg(feature = "resampler")]
pub quality: ResampleQuality,
/// When `true` and resampling is active, the resampler's output delay is
/// subtracted from the latency padding (which never drops below one frame).
#[cfg(feature = "resampler")]
pub subtract_resampler_delay: bool,
}
impl Default for ResamplingChannelConfig {
    /// Returns the stock configuration: 0.15 s of latency, 0.4 s of capacity,
    /// overflow correction at 75 % and underflow correction at 25 %. With the
    /// `resampler` feature enabled it also selects high quality resampling
    /// and subtracts the resampler delay from the latency padding.
    fn default() -> Self {
        // Timing defaults, in seconds.
        const LATENCY_SECONDS: f64 = 0.15;
        const CAPACITY_SECONDS: f64 = 0.4;

        // Autocorrection thresholds, in percent.
        const OVERFLOW_THRESHOLD_PERCENT: f64 = 75.0;
        const UNDERFLOW_THRESHOLD_PERCENT: f64 = 25.0;

        Self {
            latency_seconds: LATENCY_SECONDS,
            capacity_seconds: CAPACITY_SECONDS,
            overflow_autocorrect_percent_threshold: Some(OVERFLOW_THRESHOLD_PERCENT),
            underflow_autocorrect_percent_threshold: Some(UNDERFLOW_THRESHOLD_PERCENT),
            #[cfg(feature = "resampler")]
            quality: ResampleQuality::High,
            #[cfg(feature = "resampler")]
            subtract_resampler_delay: true,
        }
    }
}
/// Creates a producer/consumer pair that carries audio from a stream running
/// at `in_sample_rate` to a stream running at `out_sample_rate`.
///
/// With the `resampler` feature enabled, a [`FixedResampler`] using
/// `config.quality` is created when the two rates differ; when they are equal
/// no resampler is created and samples are passed through unchanged.
///
/// # Panics
///
/// Panics if either sample rate is zero, if `config.latency_seconds` or
/// `config.capacity_seconds` is not greater than zero, or if the rates differ
/// while the `resampler` feature is disabled.
pub fn resampling_channel<T: Sample, const MAX_CHANNELS: usize>(
    num_channels: NonZeroUsize,
    in_sample_rate: u32,
    out_sample_rate: u32,
    config: ResamplingChannelConfig,
) -> (ResamplingProd<T, MAX_CHANNELS>, ResamplingCons<T>) {
    // Only build a resampler when the rates actually differ.
    #[cfg(feature = "resampler")]
    let resampler = (in_sample_rate != out_sample_rate).then(|| {
        FixedResampler::<T, MAX_CHANNELS>::new(
            num_channels,
            in_sample_rate,
            out_sample_rate,
            config.quality,
            // NOTE(review): presumably enables interleaved processing support;
            // confirm against `FixedResampler::new`.
            true,
        )
    });

    resampling_channel_inner(
        #[cfg(feature = "resampler")]
        resampler,
        num_channels,
        in_sample_rate,
        out_sample_rate,
        config,
    )
}
/// Like `resampling_channel`, but resamples with a caller-supplied resampler
/// instead of the built-in one.
///
/// The custom resampler is only used when `in_sample_rate` and
/// `out_sample_rate` differ; when they are equal it is ignored and samples are
/// passed through unchanged. `config.quality` is not consulted.
///
/// # Panics
///
/// Panics if the custom resampler's channel count differs from
/// `num_channels`, if either sample rate is zero, or if
/// `config.latency_seconds` or `config.capacity_seconds` is not greater than
/// zero.
#[cfg(feature = "resampler")]
pub fn resampling_channel_custom<T: Sample, const MAX_CHANNELS: usize>(
    resampler: impl Into<ResamplerType<T>>,
    num_channels: NonZeroUsize,
    in_sample_rate: u32,
    out_sample_rate: u32,
    config: ResamplingChannelConfig,
) -> (ResamplingProd<T, MAX_CHANNELS>, ResamplingCons<T>) {
    // Equal rates: nothing to resample, so the custom resampler is unused.
    if in_sample_rate == out_sample_rate {
        return resampling_channel_inner(
            None,
            num_channels,
            in_sample_rate,
            out_sample_rate,
            config,
        );
    }

    let custom = FixedResampler::<T, MAX_CHANNELS>::from_custom(
        resampler,
        in_sample_rate,
        out_sample_rate,
        // NOTE(review): presumably enables interleaved processing support;
        // confirm against `FixedResampler::from_custom`.
        true,
    );
    assert_eq!(custom.num_channels(), num_channels);

    resampling_channel_inner(
        Some(custom),
        num_channels,
        in_sample_rate,
        out_sample_rate,
        config,
    )
}
/// Shared constructor behind the public channel constructors.
///
/// Validates the arguments, sizes the ring buffer, pre-fills it with the
/// latency padding (zeros) and builds the producer and consumer halves around
/// a freshly created shared state.
///
/// `resampler` (only present with the `resampler` feature) is `Some` when the
/// producer has to resample before pushing into the ring buffer.
///
/// # Panics
///
/// Panics if either sample rate is zero, if `config.latency_seconds` or
/// `config.capacity_seconds` is not greater than zero, or (without the
/// `resampler` feature) if the two sample rates differ.
fn resampling_channel_inner<T: Sample, const MAX_CHANNELS: usize>(
    #[cfg(feature = "resampler")] resampler: Option<FixedResampler<T, MAX_CHANNELS>>,
    num_channels: NonZeroUsize,
    in_sample_rate: u32,
    out_sample_rate: u32,
    config: ResamplingChannelConfig,
) -> (ResamplingProd<T, MAX_CHANNELS>, ResamplingCons<T>) {
    #[cfg(not(feature = "resampler"))]
    assert_eq!(
        in_sample_rate, out_sample_rate,
        "Input and output sample rate must be equal when the \"resampler\" feature is disabled"
    );

    assert_ne!(in_sample_rate, 0);
    assert_ne!(out_sample_rate, 0);
    assert!(config.latency_seconds > 0.0);
    assert!(config.capacity_seconds > 0.0);

    #[cfg(feature = "resampler")]
    let is_resampling = resampler.is_some();
    #[cfg(feature = "resampler")]
    let resampler_output_delay = resampler.as_ref().map(|r| r.output_delay()).unwrap_or(0);

    let in_sample_rate_recip = (in_sample_rate as f64).recip();
    let out_sample_rate_recip = (out_sample_rate as f64).recip();

    #[cfg(feature = "resampler")]
    let output_to_input_ratio = in_sample_rate as f64 / out_sample_rate as f64;

    // The requested latency, in output frames (never less than one frame).
    let latency_frames =
        ((out_sample_rate as f64 * config.latency_seconds).round() as usize).max(1);

    // The padding actually kept in the ring buffer. When requested, the delay
    // already introduced by the resampler is deducted from it.
    #[allow(unused_mut)]
    let mut channel_latency_frames = latency_frames;

    #[cfg(feature = "resampler")]
    if resampler.is_some() && config.subtract_resampler_delay {
        if latency_frames > resampler_output_delay {
            channel_latency_frames -= resampler_output_delay;
        } else {
            channel_latency_frames = 1;
        }
    }

    let channel_latency_samples = channel_latency_frames * num_channels.get();

    // The ring buffer holds frames at the *output* rate (the producer
    // resamples before pushing), so the capacity must be derived from the
    // output sample rate. It is always at least twice the latency padding.
    let buffer_capacity_frames = ((out_sample_rate as f64 * config.capacity_seconds).round()
        as usize)
        .max(channel_latency_frames * 2);

    let (mut prod, cons) =
        ringbuf::HeapRb::<T>::new(buffer_capacity_frames * num_channels.get()).split();

    // Pre-fill the channel with silence so the consumer starts out with the
    // configured latency. Pushing from an iterator avoids a temporary `Vec`.
    prod.push_iter((0..channel_latency_samples).map(|_| T::zero()));

    let shared_state = Arc::new(SharedState::new());

    // Occupancy (in samples) at or above which the consumer discards data:
    // the latency padding plus a percentage of the remaining headroom.
    let overflow_autocorrect_threshold_samples =
        config
            .overflow_autocorrect_percent_threshold
            .map(|percent| {
                let range_samples =
                    (buffer_capacity_frames - channel_latency_frames) * num_channels.get();

                ((range_samples as f64 * (percent / 100.0).clamp(0.0, 1.0)).round() as usize)
                    .min(range_samples)
                    + channel_latency_samples
            });

    // Occupancy (in samples) at or below which the producer pushes silence:
    // a percentage of the latency padding.
    let underflow_autocorrect_threshold_samples = config
        .underflow_autocorrect_percent_threshold
        .map(|percent| {
            ((channel_latency_samples as f64 * (percent / 100.0).clamp(0.0, 1.0)).round() as usize)
                .min(channel_latency_samples)
        });

    (
        ResamplingProd {
            prod,
            num_channels,
            latency_seconds: config.latency_seconds,
            channel_latency_samples,
            in_sample_rate,
            in_sample_rate_recip,
            out_sample_rate,
            out_sample_rate_recip,
            shared_state: Arc::clone(&shared_state),
            waiting_for_output_to_reset: false,
            underflow_autocorrect_threshold_samples,
            #[cfg(feature = "resampler")]
            resampler,
            #[cfg(feature = "resampler")]
            output_to_input_ratio,
        },
        ResamplingCons {
            cons,
            num_channels,
            latency_frames,
            latency_seconds: config.latency_seconds,
            channel_latency_samples,
            in_sample_rate,
            out_sample_rate,
            out_sample_rate_recip,
            shared_state,
            waiting_for_input_to_reset: false,
            overflow_autocorrect_threshold_samples,
            #[cfg(feature = "resampler")]
            is_resampling,
            #[cfg(feature = "resampler")]
            resampler_output_delay,
        },
    )
}
/// The producer (input-stream) half of a resampling channel.
///
/// Accepts audio at the input sample rate, resamples it when a resampler is
/// present, and writes the result into the ring buffer shared with the
/// consumer half.
pub struct ResamplingProd<T: Sample, const MAX_CHANNELS: usize> {
/// Producer half of the ring buffer; holds interleaved samples.
prod: ringbuf::HeapProd<T>,
/// Number of channels carried by the channel.
num_channels: NonZeroUsize,
/// The latency from the configuration, in seconds.
latency_seconds: f64,
/// Size of the latency padding in samples (frames times channels). This many
/// zeros are pushed after a reset, and underflow autocorrection refills the
/// buffer up to this level.
channel_latency_samples: usize,
/// Sample rate of the input stream.
in_sample_rate: u32,
/// Reciprocal of `in_sample_rate`, used to convert frames to seconds.
in_sample_rate_recip: f64,
/// Sample rate of the output stream.
out_sample_rate: u32,
/// Reciprocal of `out_sample_rate`, used to convert frames to seconds.
out_sample_rate_recip: f64,
/// Flags shared with the consumer half.
shared_state: Arc<SharedState>,
/// Set by `reset()`; cleared by `poll_reset()`, which then re-pushes the
/// latency padding.
waiting_for_output_to_reset: bool,
/// Occupancy (in samples) at or below which underflow autocorrection kicks
/// in, or `None` when the correction is disabled.
underflow_autocorrect_threshold_samples: Option<usize>,
/// The resampler applied to pushed audio, or `None` when no resampling is
/// performed.
#[cfg(feature = "resampler")]
resampler: Option<FixedResampler<T, MAX_CHANNELS>>,
/// Input sample rate divided by output sample rate; converts a number of
/// output frames into the corresponding number of input frames.
#[cfg(feature = "resampler")]
output_to_input_ratio: f64,
}
impl<T: Sample, const MAX_CHANNELS: usize> ResamplingProd<T, MAX_CHANNELS> {
    /// Pushes de-interleaved audio (one slice per channel) into the channel.
    ///
    /// Only the frames in `input_range` of each channel slice are read. The
    /// input stream is marked as ready as a side effect.
    ///
    /// Returns:
    /// * [`PushStatus::OutputNotReady`] if the consumer is not ready (or a
    ///   reset is pending); nothing is pushed.
    /// * [`PushStatus::OverflowOccurred`] if only part of the input fit.
    /// * [`PushStatus::UnderflowCorrected`] if everything was pushed and
    ///   silence was appended by underflow autocorrection.
    /// * [`PushStatus::Ok`] otherwise.
    ///
    /// # Panics
    ///
    /// Panics if `input` contains fewer slices than the channel count.
    pub fn push<Vin: AsRef<[T]>>(
        &mut self,
        input: &[Vin],
        input_range: Range<usize>,
    ) -> PushStatus {
        assert!(input.len() >= self.num_channels.get());

        self.set_input_stream_ready(true);
        if !self.output_stream_ready() {
            return PushStatus::OutputNotReady;
        }
        self.poll_reset();

        let requested_frames = input_range.end - input_range.start;

        #[cfg(feature = "resampler")]
        if self.resampler.is_some() {
            // Never feed the resampler more than the ring buffer can absorb.
            let accepted_frames = requested_frames.min(self.available_frames());
            let first_frame = input_range.start;

            self.resampler.as_mut().unwrap().process(
                input,
                first_frame..first_frame + accepted_frames,
                |output_packet| {
                    let packet_frames = output_packet[0].len();
                    let written_frames = push_internal(
                        &mut self.prod,
                        &output_packet,
                        0,
                        packet_frames,
                        self.num_channels,
                    );
                    debug_assert_eq!(written_frames, packet_frames);
                },
                None,
                true,
            );

            return self
                .status_after_push((accepted_frames < requested_frames).then(|| accepted_frames));
        }

        let accepted_frames = push_internal(
            &mut self.prod,
            input,
            input_range.start,
            requested_frames,
            self.num_channels,
        );

        self.status_after_push((accepted_frames < requested_frames).then(|| accepted_frames))
    }

    /// Pushes interleaved audio into the channel.
    ///
    /// The input stream is marked as ready as a side effect. The returned
    /// status has the same meaning as for [`ResamplingProd::push`].
    pub fn push_interleaved(&mut self, input: &[T]) -> PushStatus {
        self.set_input_stream_ready(true);
        if !self.output_stream_ready() {
            return PushStatus::OutputNotReady;
        }
        self.poll_reset();

        #[cfg(feature = "resampler")]
        if self.resampler.is_some() {
            let channels = self.num_channels.get();
            let requested_frames = input.len() / channels;
            // Never feed the resampler more than the ring buffer can absorb.
            let accepted_frames = requested_frames.min(self.available_frames());

            self.resampler.as_mut().unwrap().process_interleaved(
                &input[..accepted_frames * channels],
                |output_packet| {
                    let written_samples = self.prod.push_slice(output_packet);
                    debug_assert_eq!(written_samples, output_packet.len());
                },
                None,
                true,
            );

            return self
                .status_after_push((accepted_frames < requested_frames).then(|| accepted_frames));
        }

        let written_samples = self.prod.push_slice(input);
        let channels = self.num_channels.get();

        self.status_after_push((written_samples < input.len()).then(|| written_samples / channels))
    }

    /// Turns the outcome of a push into a [`PushStatus`].
    ///
    /// `overflowed_after` is `Some(frames)` when only `frames` input frames
    /// could be pushed. Otherwise underflow autocorrection is given a chance
    /// to run and its result is reported.
    fn status_after_push(&mut self, overflowed_after: Option<usize>) -> PushStatus {
        if let Some(num_frames_pushed) = overflowed_after {
            return PushStatus::OverflowOccurred { num_frames_pushed };
        }

        match self.autocorrect_underflows() {
            Some(num_zero_frames_pushed) => PushStatus::UnderflowCorrected {
                num_zero_frames_pushed,
            },
            None => PushStatus::Ok,
        }
    }

    /// The number of input frames that can currently be pushed without
    /// overflowing the channel.
    ///
    /// Returns `0` while the output stream is not ready. When resampling, the
    /// result is derived from the vacant space in whole resampler input
    /// blocks, less the resampler's `tmp_input_frames()`.
    pub fn available_frames(&mut self) -> usize {
        if !self.output_stream_ready() {
            return 0;
        }
        self.poll_reset();

        let output_vacant_frames = self.prod.vacant_len() / self.num_channels.get();

        #[cfg(feature = "resampler")]
        if let Some(resampler) = &self.resampler {
            let block_frames = resampler.input_block_frames();

            // Convert the vacant output frames to input frames, keeping one
            // frame of slack.
            let scaled_frames =
                (output_vacant_frames as f64 * self.output_to_input_ratio).floor() as usize;
            let usable_frames = scaled_frames.saturating_sub(1);

            if usable_frames < block_frames {
                return 0;
            }

            // Round down to a whole number of resampler blocks.
            let whole_blocks = usable_frames / block_frames;
            return whole_blocks * block_frames - resampler.tmp_input_frames();
        }

        output_vacant_frames
    }

    /// Same as [`ResamplingProd::available_frames`], expressed in seconds of
    /// input audio.
    pub fn available_seconds(&mut self) -> f64 {
        let frames = self.available_frames();
        frames as f64 * self.in_sample_rate_recip
    }

    /// The number of (output-rate) frames currently stored in the channel.
    pub fn occupied_output_frames(&self) -> usize {
        let occupied_samples = self.prod.occupied_len();
        occupied_samples / self.num_channels.get()
    }

    /// The amount of audio currently stored in the channel, in seconds.
    pub fn occupied_seconds(&self) -> f64 {
        let frames = self.occupied_output_frames();
        frames as f64 * self.out_sample_rate_recip
    }

    /// The number of channels carried by this channel.
    pub fn num_channels(&self) -> NonZeroUsize {
        self.num_channels
    }

    /// The sample rate of the input stream.
    pub fn in_sample_rate(&self) -> u32 {
        self.in_sample_rate
    }

    /// The sample rate of the output stream.
    pub fn out_sample_rate(&self) -> u32 {
        self.out_sample_rate
    }

    /// The configured latency in seconds.
    pub fn latency_seconds(&self) -> f64 {
        self.latency_seconds
    }

    /// Whether this producer resamples the audio pushed into it.
    #[cfg(feature = "resampler")]
    pub fn is_resampling(&self) -> bool {
        self.resampler.is_some()
    }

    /// Requests a reset of the channel.
    ///
    /// Raises the shared reset flag for the consumer, remembers that the
    /// latency padding has to be pushed again, and resets the resampler (if
    /// any).
    pub fn reset(&mut self) {
        self.shared_state.reset.store(true, Ordering::Relaxed);
        self.waiting_for_output_to_reset = true;

        #[cfg(feature = "resampler")]
        if let Some(resampler) = self.resampler.as_mut() {
            resampler.reset();
        }
    }

    /// Tells the consumer whether the input stream is running.
    pub fn set_input_stream_ready(&mut self, ready: bool) {
        let flag = &self.shared_state.input_stream_ready;
        flag.store(ready, Ordering::Relaxed);
    }

    /// Whether the consumer has signalled that it is running and no reset is
    /// pending.
    pub fn output_stream_ready(&self) -> bool {
        let state = &self.shared_state;
        state.output_stream_ready.load(Ordering::Relaxed) && !state.reset.load(Ordering::Relaxed)
    }

    /// Refills the channel with silence up to the latency padding when its
    /// fill level has dropped to the underflow threshold.
    ///
    /// Returns the number of zero frames pushed, or `None` if no correction
    /// was made (output not ready, correction disabled, or fill level high
    /// enough).
    pub fn autocorrect_underflows(&mut self) -> Option<usize> {
        if !self.output_stream_ready() {
            return None;
        }
        self.poll_reset();

        let threshold_samples = self.underflow_autocorrect_threshold_samples?;
        let occupied_samples = self.prod.occupied_len();

        if occupied_samples > threshold_samples || occupied_samples >= self.channel_latency_samples
        {
            return None;
        }

        let correction_samples = self.channel_latency_samples - occupied_samples;
        self.prod
            .push_iter(std::iter::repeat_with(|| T::zero()).take(correction_samples));

        Some(correction_samples / self.num_channels.get())
    }

    /// Completes a pending reset by pushing the latency padding again.
    fn poll_reset(&mut self) {
        if !self.waiting_for_output_to_reset {
            return;
        }
        self.waiting_for_output_to_reset = false;

        let padding_samples = self.channel_latency_samples;
        self.prod
            .push_iter(std::iter::repeat_with(|| T::zero()).take(padding_samples));
    }
}
/// The consumer (output-stream) half of a resampling channel.
///
/// Reads audio at the output sample rate from the ring buffer shared with the
/// producer half.
pub struct ResamplingCons<T: Sample> {
/// Consumer half of the ring buffer; holds interleaved samples.
cons: ringbuf::HeapCons<T>,
/// Number of channels carried by the channel.
num_channels: NonZeroUsize,
/// The latency from the configuration, in seconds.
latency_seconds: f64,
/// The configured latency in output frames, before any resampler delay is
/// subtracted.
latency_frames: usize,
/// Size of the latency padding in samples (frames times channels). Overflow
/// autocorrection discards data down to this level.
channel_latency_samples: usize,
/// Sample rate of the input stream.
in_sample_rate: u32,
/// Sample rate of the output stream.
out_sample_rate: u32,
/// Reciprocal of `out_sample_rate`, used to convert frames to seconds.
out_sample_rate_recip: f64,
/// Flags shared with the producer half.
shared_state: Arc<SharedState>,
/// Set when a reset request has been observed; cleared by the next read
/// that finds the input stream ready again.
waiting_for_input_to_reset: bool,
/// Occupancy (in samples) at or above which overflow autocorrection kicks
/// in, or `None` when the correction is disabled.
overflow_autocorrect_threshold_samples: Option<usize>,
/// The output delay reported by the resampler, or `0` when not resampling.
#[cfg(feature = "resampler")]
resampler_output_delay: usize,
/// Whether the producer half resamples the audio.
#[cfg(feature = "resampler")]
is_resampling: bool,
}
impl<T: Sample> ResamplingCons<T> {
    /// The number of channels carried by this channel.
    pub fn num_channels(&self) -> NonZeroUsize {
        self.num_channels
    }

    /// The sample rate of the input stream.
    pub fn in_sample_rate(&self) -> u32 {
        self.in_sample_rate
    }

    /// The sample rate of the output stream.
    pub fn out_sample_rate(&self) -> u32 {
        self.out_sample_rate
    }

    /// The configured latency in seconds.
    pub fn latency_seconds(&self) -> f64 {
        self.latency_seconds
    }

    /// The configured latency in output frames (before any resampler delay is
    /// subtracted).
    pub fn latency_frames(&self) -> usize {
        self.latency_frames
    }

    /// The number of frames that can currently be read, or `0` while the
    /// input stream is not ready.
    pub fn available_frames(&self) -> usize {
        if self.input_stream_ready() {
            self.cons.occupied_len() / self.num_channels.get()
        } else {
            0
        }
    }

    /// Same as [`ResamplingCons::available_frames`], expressed in seconds.
    pub fn available_seconds(&self) -> f64 {
        self.available_frames() as f64 * self.out_sample_rate_recip
    }

    /// The amount of audio currently stored in the channel, in seconds,
    /// regardless of whether the input stream is ready.
    pub fn occupied_seconds(&self) -> f64 {
        (self.cons.occupied_len() / self.num_channels.get()) as f64 * self.out_sample_rate_recip
    }

    /// Whether the producer half resamples the audio.
    #[cfg(feature = "resampler")]
    pub fn is_resampling(&self) -> bool {
        self.is_resampling
    }

    /// The output delay reported by the resampler, or `0` when not
    /// resampling.
    #[cfg(feature = "resampler")]
    pub fn resampler_output_delay(&self) -> usize {
        self.resampler_output_delay
    }

    /// Discards up to `frames` frames from the channel without reading them.
    ///
    /// Returns the number of frames actually discarded.
    pub fn discard_frames(&mut self, frames: usize) -> usize {
        let num_channels = self.num_channels.get();
        self.cons.skip(frames * num_channels) / num_channels
    }

    /// Reads de-interleaved audio (one slice per channel) from the channel
    /// into the `output_range` frames of each output slice.
    ///
    /// The output stream is marked as ready as a side effect. Output channels
    /// beyond the channel count are filled with zeros.
    ///
    /// Returns:
    /// * [`ReadStatus::InputNotReady`] if the producer is not ready; the
    ///   whole range is filled with zeros.
    /// * [`ReadStatus::UnderflowOccurred`] if fewer frames than requested
    ///   were available; the remainder of the range is filled with zeros.
    /// * [`ReadStatus::OverflowCorrected`] if the read succeeded and overflow
    ///   autocorrection discarded frames afterwards.
    /// * [`ReadStatus::Ok`] otherwise.
    pub fn read<Vout: AsMut<[T]>>(
        &mut self,
        output: &mut [Vout],
        output_range: Range<usize>,
    ) -> ReadStatus {
        self.set_output_stream_ready(true);
        self.poll_reset();

        if !self.input_stream_ready() {
            for ch in output.iter_mut() {
                ch.as_mut()[output_range.clone()].fill(T::zero());
            }
            return ReadStatus::InputNotReady;
        }
        self.waiting_for_input_to_reset = false;

        let num_channels = self.num_channels.get();

        // Silence any output channels the channel has no data for.
        for ch in output.iter_mut().skip(num_channels) {
            ch.as_mut()[output_range.clone()].fill(T::zero());
        }

        let output_frames = output_range.end - output_range.start;

        // The occupied region of a ring buffer may wrap around, so it is
        // exposed as two slices that are copied one after the other.
        let (s1, s2) = self.cons.as_slices();

        let s1_frames = s1.len() / num_channels;
        let s1_copy_frames = s1_frames.min(output_frames);

        fast_interleave::deinterleave_variable(
            s1,
            self.num_channels,
            output,
            output_range.start..output_range.start + s1_copy_frames,
        );

        let mut filled_frames = s1_copy_frames;

        if output_frames > s1_copy_frames {
            let s2_frames = s2.len() / num_channels;
            let s2_copy_frames = s2_frames.min(output_frames - s1_copy_frames);

            fast_interleave::deinterleave_variable(
                s2,
                self.num_channels,
                output,
                output_range.start + s1_copy_frames
                    ..output_range.start + s1_copy_frames + s2_copy_frames,
            );

            filled_frames += s2_copy_frames;
        }

        // SAFETY: `filled_frames * num_channels` never exceeds the combined
        // length of the two occupied slices, because each copy count above is
        // capped by the number of whole frames in its slice.
        unsafe {
            self.cons.advance_read_index(filled_frames * num_channels);
        }

        if filled_frames < output_frames {
            // Zero the frames that could not be filled. The unfilled region
            // starts `filled_frames` after the *start of the range*, not at
            // absolute index `filled_frames`.
            let unfilled = output_range.start + filled_frames..output_range.end;
            for ch in output.iter_mut().take(num_channels) {
                ch.as_mut()[unfilled.clone()].fill(T::zero());
            }

            ReadStatus::UnderflowOccurred {
                num_frames_read: filled_frames,
            }
        } else if let Some(num_frames_discarded) = self.autocorrect_overflows() {
            ReadStatus::OverflowCorrected {
                num_frames_discarded,
            }
        } else {
            ReadStatus::Ok
        }
    }

    /// Reads interleaved audio from the channel into `output`.
    ///
    /// The output stream is marked as ready as a side effect. Only whole
    /// frames are read; if `output.len()` is not a multiple of the channel
    /// count, the trailing partial frame is filled with zeros and does not
    /// count as an underflow. The returned status has the same meaning as
    /// for [`ResamplingCons::read`].
    pub fn read_interleaved(&mut self, output: &mut [T]) -> ReadStatus {
        self.set_output_stream_ready(true);
        self.poll_reset();

        if !self.input_stream_ready() {
            output.fill(T::zero());
            return ReadStatus::InputNotReady;
        }
        self.waiting_for_input_to_reset = false;

        let num_channels = self.num_channels.get();
        let frame_aligned_len = (output.len() / num_channels) * num_channels;
        let (frames_out, partial_frame) = output.split_at_mut(frame_aligned_len);

        let popped_samples = self.cons.pop_slice(frames_out);

        // A trailing partial frame can never be filled with real data.
        partial_frame.fill(T::zero());

        // Compare against the frame-aligned length that was actually
        // requested from the ring buffer, so that a buffer whose length is
        // not a multiple of the channel count is not reported as an
        // underflow on every call.
        if popped_samples < frame_aligned_len {
            frames_out[popped_samples..].fill(T::zero());

            ReadStatus::UnderflowOccurred {
                num_frames_read: popped_samples / num_channels,
            }
        } else if let Some(num_frames_discarded) = self.autocorrect_overflows() {
            ReadStatus::OverflowCorrected {
                num_frames_discarded,
            }
        } else {
            ReadStatus::Ok
        }
    }

    /// Handles a pending reset request from the producer.
    ///
    /// If the shared reset flag is set, it is cleared, the ring buffer is
    /// emptied and the consumer starts waiting for fresh input. Returns
    /// `true` if a reset was performed.
    pub fn poll_reset(&mut self) -> bool {
        if self.shared_state.reset.load(Ordering::Relaxed) {
            self.shared_state.reset.store(false, Ordering::Relaxed);
            self.waiting_for_input_to_reset = true;

            self.cons.clear();

            true
        } else {
            false
        }
    }

    /// Tells the producer whether the output stream is running.
    pub fn set_output_stream_ready(&mut self, ready: bool) {
        self.shared_state
            .output_stream_ready
            .store(ready, Ordering::Relaxed);
    }

    /// Whether the producer has signalled that it is running and, after a
    /// reset, has pushed data again.
    pub fn input_stream_ready(&self) -> bool {
        self.shared_state.input_stream_ready.load(Ordering::Relaxed)
            && !(self.waiting_for_input_to_reset && self.cons.is_empty())
    }

    /// Discards data down to the latency padding when the fill level has
    /// reached the overflow threshold.
    ///
    /// Returns the number of frames chosen for discarding, or `None` if no
    /// correction was made (correction disabled or fill level low enough).
    pub fn autocorrect_overflows(&mut self) -> Option<usize> {
        let threshold_samples = self.overflow_autocorrect_threshold_samples?;
        let len = self.cons.occupied_len();

        if len >= threshold_samples && len > self.channel_latency_samples {
            let correction_frames = (len - self.channel_latency_samples) / self.num_channels.get();

            self.discard_frames(correction_frames);

            Some(correction_frames)
        } else {
            None
        }
    }
}
/// The outcome of pushing audio into the producer half of the channel.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PushStatus {
/// All of the input was pushed.
Ok,
/// The output stream is not ready (or a reset is pending); nothing was
/// pushed.
OutputNotReady,
/// The channel did not have room for all of the input.
OverflowOccurred {
/// The number of input frames that were pushed.
num_frames_pushed: usize,
},
/// All of the input was pushed, after which underflow autocorrection
/// appended silence to restore the latency padding.
UnderflowCorrected {
/// The number of zero frames that were appended.
num_zero_frames_pushed: usize,
},
}
/// The outcome of reading audio from the consumer half of the channel.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReadStatus {
/// The output was completely filled with data from the channel.
Ok,
/// The input stream is not ready; the output was filled with zeros.
InputNotReady,
/// The channel held fewer frames than requested; the rest of the output
/// was filled with zeros.
UnderflowOccurred {
/// The number of frames that were read from the channel.
num_frames_read: usize,
},
/// The output was completely filled, after which overflow autocorrection
/// discarded data to bring the channel back to its latency padding.
OverflowCorrected {
/// The number of frames that were discarded.
num_frames_discarded: usize,
},
}
/// Flags shared between the producer and consumer halves of a channel.
struct SharedState {
/// Set by the producer to request a reset; cleared by the consumer once it
/// has emptied the ring buffer.
reset: AtomicBool,
/// Written by the producer, read by the consumer.
input_stream_ready: AtomicBool,
/// Written by the consumer, read by the producer.
output_stream_ready: AtomicBool,
}
impl SharedState {
    /// Creates the shared flags with no reset pending and neither stream
    /// marked as ready.
    fn new() -> Self {
        let cleared = || AtomicBool::new(false);

        Self {
            reset: cleared(),
            input_stream_ready: cleared(),
            output_stream_ready: cleared(),
        }
    }
}
/// Interleaves up to `frames` frames of de-interleaved `input`, starting at
/// `in_start_frame`, directly into the vacant space of the ring buffer.
///
/// The vacant space may wrap around, so it is handled as a head slice followed
/// by a tail slice. Returns the number of frames pushed, which is smaller than
/// `frames` when the ring buffer does not have enough room.
fn push_internal<T: Sample, Vin: AsRef<[T]>>(
    prod: &mut ringbuf::HeapProd<T>,
    input: &[Vin],
    in_start_frame: usize,
    frames: usize,
    num_channels: NonZeroUsize,
) -> usize {
    let channels = num_channels.get();
    let (head, tail) = prod.vacant_slices_mut();

    if head.is_empty() {
        return 0;
    }

    // Fill as much of the head slice as whole frames allow.
    let head_frames = (head.len() / channels).min(frames);
    {
        // SAFETY: NOTE(review): this reinterprets the vacant (presumably
        // `MaybeUninit<T>`) slice as `[T]` and relies on
        // `interleave_variable` writing every element of it before the write
        // index is advanced; confirm against `ringbuf` and `fast_interleave`.
        let dst: &mut [T] =
            unsafe { std::mem::transmute(&mut head[..head_frames * channels]) };

        fast_interleave::interleave_variable(
            input,
            in_start_frame..in_start_frame + head_frames,
            dst,
            num_channels,
        );
    }

    // Spill whatever is left over into the tail slice, if there is one.
    let remaining_frames = frames - head_frames;
    let tail_frames = if remaining_frames > 0 && !tail.is_empty() {
        let tail_frames = (tail.len() / channels).min(remaining_frames);
        let tail_start = in_start_frame + head_frames;

        // SAFETY: NOTE(review): same reasoning as for the head slice above.
        let dst: &mut [T] =
            unsafe { std::mem::transmute(&mut tail[..tail_frames * channels]) };

        fast_interleave::interleave_variable(
            input,
            tail_start..tail_start + tail_frames,
            dst,
            num_channels,
        );

        tail_frames
    } else {
        0
    };

    let frames_pushed = head_frames + tail_frames;

    // SAFETY: `frames_pushed * channels` samples were written into the vacant
    // slices above, and each count is capped by the length of its slice.
    unsafe {
        prod.advance_write_index(frames_pushed * channels);
    }

    frames_pushed
}