fixed_resample/channel.rs
1use std::{
2 ops::Range,
3 sync::{
4 atomic::{AtomicBool, Ordering},
5 Arc,
6 },
7};
8
9use audioadapter::{Adapter, AdapterMut};
10use audioadapter_buffers::direct::InterleavedSlice;
11use ringbuf::traits::{Consumer, Observer, Producer, Split};
12#[cfg(feature = "resampler")]
13use rubato::Resampler;
14
15use crate::Sample;
16
17#[cfg(feature = "resampler")]
18use crate::{Interleaved, PacketResampler, ResamplerConfig};
19
20const TMP_OUT_BUFFER_FRAMES: usize = 512;
21
22/// Additional options for a resampling channel.
23#[derive(Debug, Clone, Copy, PartialEq)]
24pub struct ResamplingChannelConfig {
25 /// The amount of latency added in seconds between the input stream and the
26 /// output stream. If this value is too small, then underflows may occur.
27 ///
28 /// The default value is `0.15` (150 ms).
29 pub latency_seconds: f64,
30
31 /// The capacity of the channel in seconds. If this is too small, then
32 /// overflows may occur. This should be at least twice as large as
33 /// `latency_seconds`.
34 ///
35 /// Note, the actual capacity may be slightly smaller due to how the internal
36 /// sampler processes in chunks.
37 ///
38 /// The default value is `0.4` (400 ms).
39 pub capacity_seconds: f64,
40
41 /// If the number of occupied samples in the channel is greater than or equal to
42 /// (`latency_seconds + percent * (capacity_seconds - latency_seconds)`), then discard the
43 /// number of samples needed to bring the number of occupied seconds back down to
44 /// [`ResamplingChannelConfig::latency_seconds`]. This is used to avoid excessive
45 /// overflows and reduce the percieved audio glitchiness.
46 ///
47 /// The percentage is a value in the range `[0.0, 100.0]`.
48 ///
49 /// Set to `None` to disable this autocorrecting behavior. If the producer end is being
50 /// used in a non-realtime context, then this should be set to `None`.
51 ///
52 /// By default this is set to `Some(75.0)`.
53 pub overflow_autocorrect_percent_threshold: Option<f64>,
54
55 /// If the number of occupied samples in the channel is below or equal to the given
56 /// percentage of [`ResamplingChannelConfig::latency_seconds`], then insert the number of
57 /// zero frames needed to bring the number of occupied samples back up to
58 /// [`ResamplingChannelConfig::latency_seconds`]. This is used to avoid excessive underflows
59 /// and reduce the percieved audio glitchiness.
60 ///
61 /// The percentage is a value in the range `[0.0, 100.0]`.
62 ///
63 /// Set to `None` to disable this autocorrecting behavior. If the consumer end is being
64 /// used in a non-realtime context, then this should be set to `None`.
65 ///
66 /// By default this is set to `Some(25.0)`.
67 pub underflow_autocorrect_percent_threshold: Option<f64>,
68
69 #[cfg(feature = "resampler")]
70 /// The configuration of the resampler to use.
71 ///
72 /// This is ignored when using [`resampling_channel_custom`].
73 pub resampler_config: ResamplerConfig,
74
75 #[cfg(feature = "resampler")]
76 /// If `true`, then the delay of the internal resampler (if used) will be
77 /// subtracted from the `latency_seconds` value to keep the perceived
78 /// latency consistent.
79 ///
80 /// The default value is `true`.
81 pub subtract_resampler_delay: bool,
82}
83
84impl Default for ResamplingChannelConfig {
85 fn default() -> Self {
86 Self {
87 latency_seconds: 0.15,
88 capacity_seconds: 0.4,
89 overflow_autocorrect_percent_threshold: Some(75.0),
90 underflow_autocorrect_percent_threshold: Some(25.0),
91 #[cfg(feature = "resampler")]
92 resampler_config: ResamplerConfig::default(),
93 #[cfg(feature = "resampler")]
94 subtract_resampler_delay: true,
95 }
96 }
97}
98
99/// Create a new realtime-safe spsc channel for sending samples across streams.
100///
101/// If the input and output samples rates differ, then this will automatically
102/// resample the input stream to match the output stream (unless the "resample"
103/// feature is disabled). If the sample rates match, then no resampling will
104/// occur.
105///
106/// Internally this uses the `rubato` and `ringbuf` crates.
107///
108/// * `in_sample_rate` - The sample rate of the input stream.
109/// * `out_sample_rate` - The sample rate of the output stream.
110/// * `num_channels` - The number of channels in the stream.
111/// * `push_interleave_only` - If you are NOT using [`ResamplingProd::push`],
112/// then you can set this to `true` to save a bit of memory. Otherwise,
113/// set this to `false`.
114/// * `config` - Additional options for the resampling channel.
115///
116/// # Panics
117///
118/// Panics when any of the following are true:
119///
120/// * `in_sample_rate == 0`
121/// * `out_sample_rate == 0`
122/// * `num_channels == 0`
123/// * `config.latency_seconds <= 0.0`
124/// * `config.capacity_seconds <= 0.0`
125/// * `config.output_chunk_size == 0`
126///
127/// If the "resampler" feature is disabled, then this will also panic if
128/// `in_sample_rate != out_sample_rate`.
129pub fn resampling_channel<T: Sample>(
130 num_channels: usize,
131 in_sample_rate: u32,
132 out_sample_rate: u32,
133 push_interleave_only: bool,
134 config: ResamplingChannelConfig,
135) -> (ResamplingProd<T>, ResamplingCons<T>) {
136 #[cfg(not(feature = "resampler"))]
137 assert_eq!(
138 in_sample_rate, out_sample_rate,
139 "Input and output sample rate must be equal when the \"resampler\" feature is disabled"
140 );
141
142 #[cfg(feature = "resampler")]
143 let resampler = if in_sample_rate != out_sample_rate {
144 Some(PacketResampler::<T, Interleaved<T>>::new(
145 num_channels,
146 in_sample_rate,
147 out_sample_rate,
148 config.resampler_config,
149 ))
150 } else {
151 None
152 };
153
154 resampling_channel_inner(
155 #[cfg(feature = "resampler")]
156 resampler,
157 num_channels,
158 in_sample_rate as f64,
159 Some(out_sample_rate),
160 config,
161 push_interleave_only,
162 )
163}
164
165/// Create a new realtime-safe spsc channel for sending samples across streams
166/// using the custom resampler.
167///
168/// Internally this uses the `rubato` and `ringbuf` crates.
169///
170/// * `resampler` - The custom rubato resampler.
171/// * `num_channels` - The number of channels in the stream.
172/// * `in_sample_rate` - The sample rate of the input signal. This does not
173/// have to be an integer.
174/// * `push_interleave_only` - If you are NOT using [`ResamplingProd::push`],
175/// then you can set this to `true` to save a bit of memory. Otherwise,
176/// set this to `false`.
177/// * `config` - Additional options for the resampling channel. Note that
178/// `config.quality` will be ignored.
179///
180/// # Panics
181///
182/// Panics when any of the following are true:
183///
184/// * `num_channels == 0`
185/// * `num_channels > resampler.nbr_channels()`
186/// * `in_sample_rate <= 0.0`
187/// * `config.latency_seconds <= 0.0`
188/// * `config.capacity_seconds <= 0.0`
189#[cfg(feature = "resampler")]
190pub fn resampling_channel_custom<T: Sample>(
191 resampler: Box<dyn Resampler<T>>,
192 num_channels: usize,
193 in_sample_rate: f64,
194 push_interleave_only: bool,
195 config: ResamplingChannelConfig,
196) -> (ResamplingProd<T>, ResamplingCons<T>) {
197 assert_ne!(num_channels, 0);
198 assert!(num_channels <= resampler.nbr_channels());
199 assert!(in_sample_rate.is_finite());
200 assert!(in_sample_rate > 0.0);
201
202 let resampler = Some(PacketResampler::from_custom(resampler));
203
204 resampling_channel_inner(
205 resampler,
206 num_channels,
207 in_sample_rate,
208 None,
209 config,
210 push_interleave_only,
211 )
212}
213
214fn resampling_channel_inner<T: Sample>(
215 #[cfg(feature = "resampler")] resampler: Option<PacketResampler<T, Interleaved<T>>>,
216 num_channels: usize,
217 in_sample_rate: f64,
218 out_sample_rate: Option<u32>,
219 config: ResamplingChannelConfig,
220 push_interleave_only: bool,
221) -> (ResamplingProd<T>, ResamplingCons<T>) {
222 assert!(config.latency_seconds > 0.0);
223 assert!(config.capacity_seconds > 0.0);
224
225 #[cfg(feature = "resampler")]
226 let output_to_input_ratio = resampler.as_ref().map(|r| r.ratio()).unwrap_or(1.0);
227 #[cfg(not(feature = "resampler"))]
228 let output_to_input_ratio = 1.0;
229
230 #[cfg(feature = "resampler")]
231 let is_resampling = resampler.is_some();
232 #[cfg(feature = "resampler")]
233 let resampler_output_delay = resampler.as_ref().map(|r| r.output_delay()).unwrap_or(0);
234
235 let in_sample_rate_recip = in_sample_rate.recip();
236 let out_sample_rate = out_sample_rate
237 .map(|o| o as f64)
238 .unwrap_or_else(|| in_sample_rate * output_to_input_ratio);
239 let out_sample_rate_recip = out_sample_rate.recip();
240
241 let latency_frames = ((out_sample_rate * config.latency_seconds).round() as usize).max(1);
242
243 #[allow(unused_mut)]
244 let mut channel_latency_frames = latency_frames;
245
246 #[cfg(feature = "resampler")]
247 if resampler.is_some() && config.subtract_resampler_delay {
248 if latency_frames > resampler_output_delay {
249 channel_latency_frames -= resampler_output_delay;
250 } else {
251 channel_latency_frames = 1;
252 }
253 }
254
255 let channel_latency_samples = channel_latency_frames * num_channels;
256
257 let buffer_capacity_frames = ((in_sample_rate * config.capacity_seconds).round() as usize)
258 .max(channel_latency_frames * 2);
259
260 let (mut prod, cons) = ringbuf::HeapRb::<T>::new(buffer_capacity_frames * num_channels).split();
261
262 // Fill the channel with initial zeros to create the desired latency.
263 prod.push_slice(&vec![T::zero(); channel_latency_frames * num_channels]);
264
265 let shared_state = Arc::new(SharedState::new());
266
267 let overflow_autocorrect_threshold_samples =
268 config
269 .overflow_autocorrect_percent_threshold
270 .map(|percent| {
271 let range_samples =
272 (buffer_capacity_frames - channel_latency_frames) * num_channels;
273
274 ((range_samples as f64 * (percent / 100.0).clamp(0.0, 1.0)).round() as usize)
275 .min(range_samples)
276 + channel_latency_samples
277 });
278 let underflow_autocorrect_threshold_samples = config
279 .underflow_autocorrect_percent_threshold
280 .map(|percent| {
281 ((channel_latency_samples as f64 * (percent / 100.0).clamp(0.0, 1.0)).round() as usize)
282 .min(channel_latency_samples)
283 });
284
285 #[cfg(feature = "resampler")]
286 let do_create_tmp_out_buffer = !push_interleave_only && resampler.is_none();
287
288 #[cfg(not(feature = "resampler"))]
289 let do_create_tmp_out_buffer = !push_interleave_only;
290
291 let tmp_out_buffer = do_create_tmp_out_buffer.then(|| {
292 let len = TMP_OUT_BUFFER_FRAMES * num_channels;
293 let mut v = Vec::new();
294 v.reserve_exact(len);
295 v.resize(len, T::zero());
296 v
297 });
298
299 (
300 ResamplingProd {
301 prod,
302 num_channels,
303 latency_seconds: config.latency_seconds,
304 channel_latency_samples,
305 in_sample_rate,
306 in_sample_rate_recip,
307 out_sample_rate,
308 out_sample_rate_recip,
309 shared_state: Arc::clone(&shared_state),
310 waiting_for_output_to_reset: false,
311 underflow_autocorrect_threshold_samples,
312 tmp_out_buffer,
313 #[cfg(feature = "resampler")]
314 resampler,
315 #[cfg(feature = "resampler")]
316 output_to_input_ratio,
317 },
318 ResamplingCons {
319 cons,
320 num_channels,
321 latency_frames,
322 latency_seconds: config.latency_seconds,
323 channel_latency_samples,
324 in_sample_rate,
325 out_sample_rate,
326 out_sample_rate_recip,
327 shared_state,
328 waiting_for_input_to_reset: false,
329 overflow_autocorrect_threshold_samples,
330 #[cfg(feature = "resampler")]
331 is_resampling,
332 #[cfg(feature = "resampler")]
333 resampler_output_delay,
334 },
335 )
336}
337
338/// The producer end of a realtime-safe spsc channel for sending samples across
339/// streams.
340///
341/// If the input and output samples rates differ, then this will automatically
342/// resample the input stream to match the output stream. If the sample rates
343/// match, then no resampling will occur.
344///
345/// Internally this uses the `rubato` and `ringbuf` crates.
346pub struct ResamplingProd<T: Sample> {
347 prod: ringbuf::HeapProd<T>,
348 num_channels: usize,
349 latency_seconds: f64,
350 channel_latency_samples: usize,
351 in_sample_rate: f64,
352 in_sample_rate_recip: f64,
353 out_sample_rate: f64,
354 out_sample_rate_recip: f64,
355 shared_state: Arc<SharedState>,
356 waiting_for_output_to_reset: bool,
357 underflow_autocorrect_threshold_samples: Option<usize>,
358 tmp_out_buffer: Option<Vec<T>>,
359
360 #[cfg(feature = "resampler")]
361 resampler: Option<PacketResampler<T, Interleaved<T>>>,
362 #[cfg(feature = "resampler")]
363 output_to_input_ratio: f64,
364}
365
366impl<T: Sample + 'static> ResamplingProd<T> {
367 /// Push the given input data.
368 ///
369 /// * `input` - The input data. You can use one of the types in the
370 /// [`audioadapter_buffers::direct`](crate::audioadapter_buffers::direct) module
371 /// to wrap your input data into a type that implements [`Adapter`].
372 /// * `input_range` - The range in each input channel to read from. If this is
373 /// `None`, then the entire input buffer will be read.
374 /// * `active_channels_mask` - An optional mask that selects which channels in
375 /// `input` to use. Channels marked with `false` will be skipped and that
376 /// output channel filled with zeros. If `None`, then all of the channels will
377 /// be active.
378 ///
379 /// This method is realtime-safe.
380 ///
381 /// # Panics
382 /// Panics if:
383 /// * The `input_range` is out of bounds for any of the input channels.
384 /// * This producer was created with `push_interleave_only` set to `true`.
385 pub fn push(
386 &mut self,
387 input: &dyn Adapter<'_, T>,
388 input_range: Option<Range<usize>>,
389 active_channels_mask: Option<&[bool]>,
390 ) -> PushStatus {
391 self.set_input_stream_ready(true);
392
393 if !self.output_stream_ready() {
394 return PushStatus::OutputNotReady;
395 }
396
397 self.poll_reset();
398
399 let (input_start, total_frames) = if let Some(range) = input_range {
400 (range.start, range.end - range.start)
401 } else {
402 (0, input.frames())
403 };
404
405 let available_frames = self.available_frames();
406 let total_frames_to_copy = total_frames.min(available_frames);
407
408 #[cfg(feature = "resampler")]
409 let process_non_resampled = self.resampler.is_none();
410 #[cfg(not(feature = "resampler"))]
411 let process_non_resampled = true;
412
413 #[cfg(feature = "resampler")]
414 if let Some(resampler) = &mut self.resampler {
415 resampler.process(
416 input,
417 Some(input_start..input_start + total_frames_to_copy),
418 active_channels_mask,
419 |output_packet, _frames| {
420 let pushed_samples = self.prod.push_slice(output_packet);
421 debug_assert_eq!(pushed_samples, output_packet.len());
422 },
423 None,
424 false,
425 );
426 }
427
428 if process_non_resampled {
429 let tmp_out_buffer = self.tmp_out_buffer.as_mut().expect(
430 "ResamplingProd::push was called even though push_interleave_only was set to true",
431 );
432
433 if input.channels() < self.num_channels || active_channels_mask.is_some() {
434 let tmp_out_buf_len = tmp_out_buffer.len();
435 tmp_out_buffer[0..(total_frames_to_copy * self.num_channels).min(tmp_out_buf_len)]
436 .fill(T::zero());
437 }
438
439 let mut frames_left = total_frames_to_copy;
440 while frames_left > 0 {
441 let block_frames_to_copy = frames_left.min(TMP_OUT_BUFFER_FRAMES);
442
443 {
444 let mut out_buf_wrapper = InterleavedSlice::new_mut(
445 tmp_out_buffer,
446 self.num_channels,
447 TMP_OUT_BUFFER_FRAMES,
448 )
449 .unwrap();
450
451 for ch_i in 0..input.channels() {
452 let channel_active = active_channels_mask
453 .as_ref()
454 .map(|m| m.get(ch_i).copied().unwrap_or(false))
455 .unwrap_or(true);
456
457 if channel_active {
458 out_buf_wrapper.copy_from_other_to_channel(
459 &AdapterWrapper { inner: input },
460 ch_i,
461 ch_i,
462 input_start + (total_frames_to_copy - frames_left),
463 0,
464 block_frames_to_copy,
465 );
466 }
467 }
468 }
469
470 let pushed_samples = self
471 .prod
472 .push_slice(&tmp_out_buffer[0..block_frames_to_copy * self.num_channels]);
473 debug_assert_eq!(pushed_samples, block_frames_to_copy * self.num_channels);
474
475 frames_left -= block_frames_to_copy;
476 }
477 }
478
479 if total_frames_to_copy < total_frames {
480 PushStatus::OverflowOccurred {
481 num_frames_pushed: total_frames_to_copy,
482 }
483 } else if let Some(zero_frames_pushed) = self.autocorrect_underflows() {
484 PushStatus::UnderflowCorrected {
485 num_zero_frames_pushed: zero_frames_pushed,
486 }
487 } else {
488 PushStatus::Ok
489 }
490 }
491
492 /// Push the given input data in interleaved format.
493 ///
494 /// The input buffer must have the same number of channels as this producer.
495 ///
496 /// This method is realtime-safe.
497 pub fn push_interleaved(&mut self, input: &[T]) -> PushStatus {
498 self.set_input_stream_ready(true);
499
500 if !self.output_stream_ready() {
501 return PushStatus::OutputNotReady;
502 }
503
504 self.poll_reset();
505
506 let total_frames = input.len() / self.num_channels;
507
508 let available_frames = self.available_frames();
509 let total_frames_to_copy = total_frames.min(available_frames);
510
511 #[cfg(feature = "resampler")]
512 if let Some(resampler) = &mut self.resampler {
513 let input_wrapper =
514 InterleavedSlice::new(input, self.num_channels, total_frames_to_copy).unwrap();
515
516 resampler.process(
517 &input_wrapper,
518 None,
519 None,
520 |output_packet, _frames| {
521 let pushed_samples = self.prod.push_slice(output_packet);
522 debug_assert_eq!(pushed_samples, output_packet.len());
523 },
524 None,
525 false,
526 );
527 } else {
528 let pushed_samples = self
529 .prod
530 .push_slice(&input[0..total_frames_to_copy * self.num_channels]);
531 debug_assert_eq!(pushed_samples, total_frames_to_copy * self.num_channels);
532 }
533
534 if total_frames_to_copy < total_frames {
535 PushStatus::OverflowOccurred {
536 num_frames_pushed: total_frames_to_copy,
537 }
538 } else if let Some(zero_frames_pushed) = self.autocorrect_underflows() {
539 PushStatus::UnderflowCorrected {
540 num_zero_frames_pushed: zero_frames_pushed,
541 }
542 } else {
543 PushStatus::Ok
544 }
545 }
546
547 /// Returns the number of input frames (samples in a single channel of audio)
548 /// that are currently available to be pushed to the channel.
549 ///
550 /// If the output stream is not ready yet, then this will return `0`.
551 ///
552 /// This method is realtime-safe.
553 pub fn available_frames(&mut self) -> usize {
554 if !self.output_stream_ready() {
555 return 0;
556 }
557
558 self.poll_reset();
559
560 let output_vacant_frames = self.prod.vacant_len() / self.num_channels;
561
562 #[cfg(feature = "resampler")]
563 if let Some(resampler) = &self.resampler {
564 let mut input_vacant_frames =
565 (output_vacant_frames as f64 * self.output_to_input_ratio).floor() as usize;
566
567 // Give some leeway to account for floating point inaccuracies.
568 input_vacant_frames = input_vacant_frames.saturating_sub(1);
569
570 if input_vacant_frames < resampler.max_input_block_frames() {
571 return 0;
572 }
573
574 // The resampler processes in chunks.
575 input_vacant_frames = (input_vacant_frames / resampler.max_input_block_frames())
576 * resampler.max_input_block_frames();
577
578 return input_vacant_frames - resampler.tmp_input_frames();
579 }
580
581 output_vacant_frames
582 }
583
584 /// The amount of data in seconds that is available to be pushed to the
585 /// channel.
586 ///
587 /// If the output stream is not ready yet, then this will return `0.0`.
588 ///
589 /// This method is realtime-safe.
590 pub fn available_seconds(&mut self) -> f64 {
591 self.available_frames() as f64 * self.in_sample_rate_recip
592 }
593
594 /// The amount of data that is currently occupied in the channel, in units of
595 /// output frames (samples in a single channel of audio).
596 ///
597 /// Note, this is the number of frames in the *output* audio stream, not the
598 /// input audio stream.
599 ///
600 /// This method is realtime-safe.
601 pub fn occupied_output_frames(&self) -> usize {
602 self.prod.occupied_len() / self.num_channels
603 }
604
605 /// The amount of data that is currently occupied in the channel, in units of
606 /// seconds.
607 ///
608 /// This method is realtime-safe.
609 pub fn occupied_seconds(&self) -> f64 {
610 self.occupied_output_frames() as f64 * self.out_sample_rate_recip
611 }
612
613 /// The number of channels configured for this stream.
614 ///
615 /// This method is realtime-safe.
616 pub fn num_channels(&self) -> usize {
617 self.num_channels
618 }
619
620 /// The sample rate of the input stream.
621 ///
622 /// This method is realtime-safe.
623 pub fn in_sample_rate(&self) -> f64 {
624 self.in_sample_rate
625 }
626
627 /// The sample rate of the output stream.
628 ///
629 /// This method is realtime-safe.
630 pub fn out_sample_rate(&self) -> f64 {
631 self.out_sample_rate
632 }
633
634 /// The latency of the channel in units of seconds.
635 ///
636 /// This method is realtime-safe.
637 pub fn latency_seconds(&self) -> f64 {
638 self.latency_seconds
639 }
640
641 /// Returns `true` if this channel is currently resampling.
642 ///
643 /// This method is realtime-safe.
644 #[cfg(feature = "resampler")]
645 pub fn is_resampling(&self) -> bool {
646 self.resampler.is_some()
647 }
648
649 /// Tell the consumer to clear all queued frames in the buffer.
650 ///
651 /// This method is realtime-safe.
652 pub fn reset(&mut self) {
653 self.shared_state.reset.store(true, Ordering::Relaxed);
654
655 self.waiting_for_output_to_reset = true;
656
657 #[cfg(feature = "resampler")]
658 if let Some(resampler) = &mut self.resampler {
659 resampler.reset();
660 }
661 }
662
663 /// Manually notify the output stream that the input stream is ready/not ready
664 /// to push samples to the channel.
665 ///
666 /// If this producer end is being used in a non-realtime context, then it is
667 /// a good idea to set this to `true` so that the consumer end can start
668 /// reading samples from the channel immediately.
669 ///
670 /// Note, calling [`ResamplingProd::push`] and
671 /// [`ResamplingProd::push_interleaved`] automatically sets the input stream as
672 /// ready.
673 ///
674 /// This method is realtime-safe.
675 pub fn set_input_stream_ready(&mut self, ready: bool) {
676 self.shared_state
677 .input_stream_ready
678 .store(ready, Ordering::Relaxed);
679 }
680
681 /// Whether or not the output stream is ready to read samples from the channel.
682 ///
683 /// This method is realtime-safe.
684 pub fn output_stream_ready(&self) -> bool {
685 self.shared_state
686 .output_stream_ready
687 .load(Ordering::Relaxed)
688 && !self.shared_state.reset.load(Ordering::Relaxed)
689 }
690
691 /// Correct for any underflows.
692 ///
693 /// This returns the number of extra zero frames (samples in a single channel of audio)
694 /// that were added due to an underflow occurring. If no underflow occured, then `None`
695 /// is returned.
696 ///
697 /// Note, this method is already automatically called in [`ResamplingProd::push`] and
698 /// [`ResamplingProd::push_interleaved`].
699 ///
700 /// This will have no effect if [`ResamplingChannelConfig::underflow_autocorrect_percent_threshold`]
701 /// was set to `None`.
702 ///
703 /// This method is realtime-safe.
704 pub fn autocorrect_underflows(&mut self) -> Option<usize> {
705 if !self.output_stream_ready() {
706 return None;
707 }
708
709 self.poll_reset();
710
711 if let Some(underflow_autocorrect_threshold_samples) =
712 self.underflow_autocorrect_threshold_samples
713 {
714 let len = self.prod.occupied_len();
715
716 if len <= underflow_autocorrect_threshold_samples && len < self.channel_latency_samples
717 {
718 let correction_samples = self.channel_latency_samples - len;
719
720 self.prod
721 .push_iter((0..correction_samples).map(|_| T::zero()));
722
723 return Some(correction_samples / self.num_channels);
724 }
725 }
726
727 None
728 }
729
730 fn poll_reset(&mut self) {
731 if self.waiting_for_output_to_reset {
732 self.waiting_for_output_to_reset = false;
733
734 // Fill the channel with initial zeros to create the desired latency.
735 self.prod
736 .push_iter((0..self.channel_latency_samples).map(|_| T::zero()));
737 }
738 }
739}
740
741/// The consumer end of a realtime-safe spsc channel for sending samples across
742/// streams.
743///
744/// If the input and output samples rates differ, then this will automatically
745/// resample the input stream to match the output stream. If the sample rates
746/// match, then no resampling will occur.
747///
748/// Internally this uses the `rubato` and `ringbuf` crates.
749pub struct ResamplingCons<T: Sample> {
750 cons: ringbuf::HeapCons<T>,
751 num_channels: usize,
752 latency_seconds: f64,
753 latency_frames: usize,
754 channel_latency_samples: usize,
755 in_sample_rate: f64,
756 out_sample_rate: f64,
757 out_sample_rate_recip: f64,
758 shared_state: Arc<SharedState>,
759 waiting_for_input_to_reset: bool,
760 overflow_autocorrect_threshold_samples: Option<usize>,
761
762 #[cfg(feature = "resampler")]
763 resampler_output_delay: usize,
764 #[cfg(feature = "resampler")]
765 is_resampling: bool,
766}
767
768impl<T: Sample + 'static> ResamplingCons<T> {
769 /// The number of channels configured for this stream.
770 ///
771 /// This method is realtime-safe.
772 pub fn num_channels(&self) -> usize {
773 self.num_channels
774 }
775
776 /// The sample rate of the input stream.
777 ///
778 /// This method is realtime-safe.
779 pub fn in_sample_rate(&self) -> f64 {
780 self.in_sample_rate
781 }
782
783 /// The sample rate of the output stream.
784 ///
785 /// This method is realtime-safe.
786 pub fn out_sample_rate(&self) -> f64 {
787 self.out_sample_rate
788 }
789
790 /// The latency of the channel in units of seconds.
791 ///
792 /// This method is realtime-safe.
793 pub fn latency_seconds(&self) -> f64 {
794 self.latency_seconds
795 }
796
797 /// The latency of the channel in units of output frames.
798 ///
799 /// This method is realtime-safe.
800 pub fn latency_frames(&self) -> usize {
801 self.latency_frames
802 }
803
804 /// The number of frames (samples in a single channel of audio) that are
805 /// currently available to be read from the channel.
806 ///
807 /// If the input stream is not ready yet, then this will return `0`.
808 ///
809 /// This method is realtime-safe.
810 pub fn available_frames(&self) -> usize {
811 if self.input_stream_ready() {
812 self.cons.occupied_len() / self.num_channels
813 } else {
814 0
815 }
816 }
817
818 /// The amount of data in seconds that is currently available to be read
819 /// from the channel.
820 ///
821 /// If the input stream is not ready yet, then this will return `0.0`.
822 ///
823 /// This method is realtime-safe.
824 pub fn available_seconds(&self) -> f64 {
825 self.available_frames() as f64 * self.out_sample_rate_recip
826 }
827
828 /// The amount of data that is currently occupied in the channel, in units of
829 /// seconds.
830 ///
831 /// This method is realtime-safe.
832 pub fn occupied_seconds(&self) -> f64 {
833 (self.cons.occupied_len() / self.num_channels) as f64 * self.out_sample_rate_recip
834 }
835
836 /// Returns `true` if this channel is currently resampling.
837 ///
838 /// This method is realtime-safe.
839 #[cfg(feature = "resampler")]
840 pub fn is_resampling(&self) -> bool {
841 self.is_resampling
842 }
843
844 /// The delay of the internal resampler in number of output frames (samples in
845 /// a single channel of audio).
846 ///
847 /// If there is no resampler being used for this channel, then this will return
848 /// `0`.
849 ///
850 /// This method is realtime-safe.
851 #[cfg(feature = "resampler")]
852 pub fn resampler_output_delay(&self) -> usize {
853 self.resampler_output_delay
854 }
855
856 /// Discard a certian number of output frames from the buffer. This can be used
857 /// to correct for jitter and avoid excessive overflows and reduce the percieved
858 /// audible glitchiness.
859 ///
860 /// This will discard `frames.min(self.available_frames())` frames.
861 ///
862 /// Returns the number of output frames that were discarded.
863 ///
864 /// This method is realtime-safe.
865 pub fn discard_frames(&mut self, frames: usize) -> usize {
866 self.cons.skip(frames * self.num_channels) / self.num_channels
867 }
868
869 /// Read from the channel and store the results in the given output buffer.
870 ///
871 /// * `output` - The output buffer. You can use one of the types in the
872 /// [`audioadapter_buffers::direct`](crate::audioadapter_buffers::direct) module
873 /// to wrap your input data into a type that implements [`Adapter`].
874 /// * `output_range` - The range in each output channel to write to. If this is
875 /// `None`, then the entire output buffer will be read.
876 /// * `active_channels_mask` - An optional mask that selects which channels in
877 /// `output` to use. Channels marked with `false` will be filled with zeros.
878 /// If `None`, then all of the channels will be active.
879 /// * `output_is_already_cleared` - If `true`, then this will skip the step of
880 /// clearing the output buffer in the range `output_range` to zeros.
881 ///
882 /// This method is realtime-safe.
883 ///
884 /// # Panics
885 /// Panics if:
886 /// * The `input_range` is out of bounds for any of the input channels.
887 pub fn read(
888 &mut self,
889 output: &mut dyn AdapterMut<'_, T>,
890 output_range: Option<Range<usize>>,
891 active_channels_mask: Option<&[bool]>,
892 output_is_already_cleared: bool,
893 ) -> ReadStatus {
894 self.set_output_stream_ready(true);
895
896 self.poll_reset();
897
898 let (output_start, output_frames) = if let Some(range) = output_range {
899 (range.start, range.end - range.start)
900 } else {
901 (0, output.frames())
902 };
903
904 if !output_is_already_cleared {
905 output.fill_frames_with(output_start, output_frames, &T::zero());
906 }
907
908 if !self.input_stream_ready() {
909 return ReadStatus::InputNotReady;
910 }
911
912 self.waiting_for_input_to_reset = false;
913
914 let (s1, s2) = self.cons.as_slices();
915
916 let s1_frames = s1.len() / self.num_channels;
917 let s1_copy_frames = s1_frames.min(output_frames);
918
919 let s1_wrapper = InterleavedSlice::new(s1, self.num_channels, s1_frames).unwrap();
920 let s1_wrapper_2 = AdapterWrapper { inner: &s1_wrapper };
921
922 for ch_i in 0..output.channels().min(self.num_channels) {
923 let channel_active = active_channels_mask
924 .as_ref()
925 .map(|m| m.get(ch_i).copied().unwrap_or(false))
926 .unwrap_or(true);
927
928 if channel_active {
929 output.copy_from_other_to_channel(
930 &s1_wrapper_2,
931 ch_i,
932 ch_i,
933 0,
934 output_start,
935 s1_copy_frames,
936 );
937 }
938 }
939
940 let mut filled_frames = s1_copy_frames;
941
942 if output_frames > s1_copy_frames {
943 let s2_frames = s2.len() / self.num_channels;
944 let s2_copy_frames = s2_frames.min(output_frames - s1_copy_frames);
945
946 let s2_wrapper = InterleavedSlice::new(s2, self.num_channels, s2_frames).unwrap();
947 let s2_wrapper_2 = AdapterWrapper { inner: &s2_wrapper };
948
949 for ch_i in 0..output.channels().min(self.num_channels) {
950 let channel_active = active_channels_mask
951 .as_ref()
952 .map(|m| m.get(ch_i).copied().unwrap_or(false))
953 .unwrap_or(true);
954
955 if channel_active {
956 output.copy_from_other_to_channel(
957 &s2_wrapper_2,
958 ch_i,
959 ch_i,
960 0,
961 output_start + s1_copy_frames,
962 s2_copy_frames,
963 );
964 }
965 }
966
967 filled_frames += s2_copy_frames;
968 }
969
970 self.cons.skip(filled_frames * self.num_channels);
971
972 if filled_frames < output_frames {
973 ReadStatus::UnderflowOccurred {
974 num_frames_read: filled_frames,
975 }
976 } else if let Some(num_frames_discarded) = self.autocorrect_overflows() {
977 ReadStatus::OverflowCorrected {
978 num_frames_discarded,
979 }
980 } else {
981 ReadStatus::Ok
982 }
983 }
984
985 /// Read from the channel and store the results into the output buffer
986 /// in interleaved format.
987 ///
988 /// * `output` - The output buffer to write to. The output buffer must
989 /// have the same number of channels as this consumer.
990 /// * `buffer_out_is_already_cleared` - If `true`, then this will skip the step of
991 /// clearing the output buffer in the range `output_range` to zeros.
992 ///
993 /// This method is realtime-safe.
994 pub fn read_interleaved(
995 &mut self,
996 output: &mut [T],
997 output_already_cleared: bool,
998 ) -> ReadStatus {
999 self.set_output_stream_ready(true);
1000
1001 self.poll_reset();
1002
1003 if !self.input_stream_ready() {
1004 if !output_already_cleared {
1005 output.fill(T::zero());
1006 }
1007
1008 return ReadStatus::InputNotReady;
1009 }
1010
1011 self.waiting_for_input_to_reset = false;
1012
1013 let out_frames = output.len() / self.num_channels;
1014 let out_len = out_frames * self.num_channels;
1015
1016 let pushed_samples = self.cons.pop_slice(&mut output[..out_len]);
1017
1018 if pushed_samples < out_len {
1019 if !output_already_cleared {
1020 output[pushed_samples..].fill(T::zero());
1021 }
1022
1023 ReadStatus::UnderflowOccurred {
1024 num_frames_read: pushed_samples / self.num_channels,
1025 }
1026 } else if let Some(num_frames_discarded) = self.autocorrect_overflows() {
1027 ReadStatus::OverflowCorrected {
1028 num_frames_discarded,
1029 }
1030 } else {
1031 ReadStatus::Ok
1032 }
1033 }
1034
1035 /// Poll the channel to see if it got a command to reset.
1036 ///
1037 /// Returns `true` if the channel was reset.
1038 pub fn poll_reset(&mut self) -> bool {
1039 if self.shared_state.reset.load(Ordering::Relaxed) {
1040 self.shared_state.reset.store(false, Ordering::Relaxed);
1041 self.waiting_for_input_to_reset = true;
1042
1043 self.cons.clear();
1044
1045 true
1046 } else {
1047 false
1048 }
1049 }
1050
1051 /// Manually notify the input stream that the output stream is ready/not ready
1052 /// to read samples from the channel.
1053 ///
1054 /// If this consumer end is being used in a non-realtime context, then it is
1055 /// a good idea to set this to `true` so that the producer end can start
1056 /// pushing samples to the channel immediately.
1057 ///
1058 /// Note, calling [`ResamplingCons::read`] and
1059 /// [`ResamplingCons::read_interleaved`] automatically sets the output stream as
1060 /// ready.
1061 ///
1062 /// This method is realtime-safe.
1063 pub fn set_output_stream_ready(&mut self, ready: bool) {
1064 self.shared_state
1065 .output_stream_ready
1066 .store(ready, Ordering::Relaxed);
1067 }
1068
1069 /// Whether or not the input stream is ready to push samples to the channel.
1070 ///
1071 /// This method is realtime-safe.
1072 pub fn input_stream_ready(&self) -> bool {
1073 self.shared_state.input_stream_ready.load(Ordering::Relaxed)
1074 && !(self.waiting_for_input_to_reset && self.cons.is_empty())
1075 }
1076
1077 /// Correct for any overflows.
1078 ///
1079 /// This returns the number of frames (samples in a single channel of audio) that were
1080 /// discarded due to an overflow occurring. If no overflow occured, then `None`
1081 /// is returned.
1082 ///
1083 /// Note, this method is already automatically called in [`ResamplingCons::read`] and
1084 /// [`ResamplingCons::read_interleaved`].
1085 ///
1086 /// This will have no effect if [`ResamplingChannelConfig::overflow_autocorrect_percent_threshold`]
1087 /// was set to `None`.
1088 ///
1089 /// This method is realtime-safe.
1090 pub fn autocorrect_overflows(&mut self) -> Option<usize> {
1091 if let Some(overflow_autocorrect_threshold_samples) =
1092 self.overflow_autocorrect_threshold_samples
1093 {
1094 let len = self.cons.occupied_len();
1095
1096 if len >= overflow_autocorrect_threshold_samples && len > self.channel_latency_samples {
1097 let correction_frames = (len - self.channel_latency_samples) / self.num_channels;
1098
1099 self.discard_frames(correction_frames);
1100
1101 return Some(correction_frames);
1102 }
1103 }
1104
1105 None
1106 }
1107}
1108
1109/// The status of pushing samples to [`ResamplingProd::push`] and
1110/// [`ResamplingProd::push_interleaved`].
1111#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1112pub enum PushStatus {
1113 /// All samples were successfully pushed to the channel.
1114 Ok,
1115 /// The output stream is not yet ready to read samples from the channel.
1116 ///
1117 /// Note, this can also happen when the channel is reset.
1118 ///
1119 /// No samples were pushed to the channel.
1120 OutputNotReady,
1121 /// An overflow occured due to the input stream running faster than the
1122 /// output stream. Some or all of the samples were not pushed to the channel.
1123 ///
1124 /// If this occurs, then it may mean that [`ResamplingChannelConfig::capacity_seconds`]
1125 /// is too low and should be increased.
1126 OverflowOccurred {
1127 /// The number of frames (samples in a single channel of audio) that were
1128 /// successfully pushed to the channel.
1129 num_frames_pushed: usize,
1130 },
1131 /// An underflow occured due to the output stream running faster than the
1132 /// input stream.
1133 ///
1134 /// All of the samples were successfully pushed to the channel, however extra
1135 /// zero samples were also pushed to the channel to correct for the jitter.
1136 ///
1137 /// If this occurs, then it may mean that [`ResamplingChannelConfig::latency_seconds`]
1138 /// is too low and should be increased.
1139 UnderflowCorrected {
1140 /// The number of zero frames that were pushed after the other samples
1141 /// were pushed.
1142 num_zero_frames_pushed: usize,
1143 },
1144}
1145
1146/// The status of reading data from [`ResamplingCons::read`] and
1147/// [`ResamplingCons::read_interleaved`].
1148#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1149pub enum ReadStatus {
1150 /// The output buffer was fully filled with samples from the channel.
1151 Ok,
1152 /// The input stream is not yet ready to push samples to the channel.
1153 ///
1154 /// Note, this can also happen when the channel is reset.
1155 ///
1156 /// The output buffer was filled with zeros.
1157 InputNotReady,
1158 /// An underflow occured due to the output stream running faster than the input
1159 /// stream. Some or all of the samples in the output buffer have been filled with
1160 /// zeros on the end. This may result in audible audio glitches.
1161 ///
1162 /// If this occurs, then it may mean that [`ResamplingChannelConfig::latency_seconds`]
1163 /// is too low and should be increased.
1164 UnderflowOccurred {
1165 /// The number of frames (samples in a single channel of audio) that were
1166 /// successfully read from the channel. All frames past this have been filled
1167 /// with zeros.
1168 num_frames_read: usize,
1169 },
1170 /// An overflow occured due to the input stream running faster than the output
1171 /// stream
1172 ///
1173 /// All of the samples in the output buffer were successfully filled with samples,
1174 /// however a number of frames have also been discarded to correct for the jitter.
1175 ///
1176 /// If this occurs, then it may mean that [`ResamplingChannelConfig::capacity_seconds`]
1177 /// is too low and should be increased.
1178 OverflowCorrected {
1179 /// The number of frames that were discarded from the channel (after the
1180 /// frames have been read into the output buffer).
1181 num_frames_discarded: usize,
1182 },
1183}
1184
1185struct SharedState {
1186 reset: AtomicBool,
1187 input_stream_ready: AtomicBool,
1188 output_stream_ready: AtomicBool,
1189}
1190
1191impl SharedState {
1192 fn new() -> Self {
1193 Self {
1194 reset: AtomicBool::new(false),
1195 input_stream_ready: AtomicBool::new(false),
1196 output_stream_ready: AtomicBool::new(false),
1197 }
1198 }
1199}
1200
1201/// Needed to get around lifetime nonsense
1202struct AdapterWrapper<'a, 'b, T: Sample> {
1203 inner: &'a dyn Adapter<'b, T>,
1204}
1205
1206impl<'a, T: Sample + 'static> Adapter<'a, T> for AdapterWrapper<'_, '_, T> {
1207 /// Safety: This is just wrapping the inner method
1208 unsafe fn read_sample_unchecked(&self, channel: usize, frame: usize) -> T {
1209 self.inner.read_sample_unchecked(channel, frame)
1210 }
1211
1212 fn read_sample(&self, channel: usize, frame: usize) -> Option<T> {
1213 self.inner.read_sample(channel, frame)
1214 }
1215
1216 fn channels(&self) -> usize {
1217 self.inner.channels()
1218 }
1219
1220 fn frames(&self) -> usize {
1221 self.inner.frames()
1222 }
1223
1224 fn copy_from_channel_to_slice(&self, channel: usize, skip: usize, slice: &mut [T]) -> usize {
1225 self.inner.copy_from_channel_to_slice(channel, skip, slice)
1226 }
1227
1228 fn copy_from_frame_to_slice(&self, frame: usize, skip: usize, slice: &mut [T]) -> usize {
1229 self.inner.copy_from_frame_to_slice(frame, skip, slice)
1230 }
1231}