fixed_resample/channel.rs
1use std::{
2 ops::Range,
3 sync::{
4 atomic::{AtomicBool, Ordering},
5 Arc,
6 },
7};
8
9use audioadapter::{Adapter, AdapterMut};
10use audioadapter_buffers::direct::InterleavedSlice;
11use ringbuf::traits::{Consumer, Observer, Producer, Split};
12#[cfg(feature = "resampler")]
13use rubato::Resampler;
14
15use crate::Sample;
16
17#[cfg(feature = "resampler")]
18use crate::{Interleaved, PacketResampler, ResamplerConfig};
19
/// Length, in frames, of the scratch buffer that [`ResamplingProd::push`]
/// uses to interleave input blocks before writing them to the ring buffer
/// when no resampler is in use. The buffer itself holds
/// `TMP_OUT_BUFFER_FRAMES * num_channels` samples.
const TMP_OUT_BUFFER_FRAMES: usize = 512;
21
/// Tuning options for a resampling channel.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct ResamplingChannelConfig {
    /// How much latency, in seconds, is inserted between the input stream and
    /// the output stream. Underflows may occur if this value is too small.
    ///
    /// Defaults to `0.15` (150 ms).
    pub latency_seconds: f64,

    /// How much audio, in seconds, the channel can hold. Overflows may occur
    /// if this is too small. It should be at least twice `latency_seconds`.
    ///
    /// The effective capacity may end up slightly smaller because the
    /// internal resampler processes its input in chunks.
    ///
    /// Defaults to `0.4` (400 ms).
    pub capacity_seconds: f64,

    /// Overflow autocorrection threshold, as a percentage in `[0.0, 100.0]`.
    ///
    /// When the amount of queued audio reaches
    /// `latency_seconds + percent * (capacity_seconds - latency_seconds)`,
    /// enough samples are discarded to bring the queue back down to
    /// [`ResamplingChannelConfig::latency_seconds`]. This avoids excessive
    /// overflows and reduces the perceived audio glitchiness.
    ///
    /// `None` disables the behavior. Use `None` when the producer end runs
    /// in a non-realtime context.
    ///
    /// Defaults to `Some(75.0)`.
    pub overflow_autocorrect_percent_threshold: Option<f64>,

    /// Underflow autocorrection threshold, as a percentage in `[0.0, 100.0]`.
    ///
    /// When the amount of queued audio drops to or below this percentage of
    /// [`ResamplingChannelConfig::latency_seconds`], enough zero frames are
    /// inserted to bring the queue back up to
    /// [`ResamplingChannelConfig::latency_seconds`]. This avoids excessive
    /// underflows and reduces the perceived audio glitchiness.
    ///
    /// `None` disables the behavior. Use `None` when the consumer end runs
    /// in a non-realtime context.
    ///
    /// Defaults to `Some(25.0)`.
    pub underflow_autocorrect_percent_threshold: Option<f64>,

    /// Settings for the built-in resampler.
    ///
    /// Ignored by [`resampling_channel_custom`].
    #[cfg(feature = "resampler")]
    pub resampler_config: ResamplerConfig,

    /// When `true`, the delay introduced by the internal resampler (if one is
    /// used) is subtracted from `latency_seconds` so that the perceived
    /// latency stays consistent.
    ///
    /// Defaults to `true`.
    #[cfg(feature = "resampler")]
    pub subtract_resampler_delay: bool,
}

impl Default for ResamplingChannelConfig {
    /// Returns the documented defaults: 150 ms latency, 400 ms capacity,
    /// overflow autocorrection at 75 % and underflow autocorrection at 25 %.
    fn default() -> Self {
        Self {
            latency_seconds: 0.15,
            capacity_seconds: 0.4,
            overflow_autocorrect_percent_threshold: Some(75.0),
            underflow_autocorrect_percent_threshold: Some(25.0),
            #[cfg(feature = "resampler")]
            resampler_config: ResamplerConfig::default(),
            #[cfg(feature = "resampler")]
            subtract_resampler_delay: true,
        }
    }
}
98
99/// Create a new realtime-safe spsc channel for sending samples across streams.
100///
101/// If the input and output samples rates differ, then this will automatically
102/// resample the input stream to match the output stream (unless the "resample"
103/// feature is disabled). If the sample rates match, then no resampling will
104/// occur.
105///
106/// Internally this uses the `rubato` and `ringbuf` crates.
107///
108/// * `in_sample_rate` - The sample rate of the input stream.
109/// * `out_sample_rate` - The sample rate of the output stream.
110/// * `num_channels` - The number of channels in the stream.
111/// * `push_interleave_only` - If you are NOT using [`ResamplingProd::push`],
112/// then you can set this to `true` to save a bit of memory. Otherwise,
113/// set this to `false`.
114/// * `config` - Additional options for the resampling channel.
115///
116/// # Panics
117///
118/// Panics when any of the following are true:
119///
120/// * `in_sample_rate == 0`
121/// * `out_sample_rate == 0`
122/// * `num_channels == 0`
123/// * `config.latency_seconds <= 0.0`
124/// * `config.capacity_seconds <= 0.0`
125/// * `config.output_chunk_size == 0`
126///
127/// If the "resampler" feature is disabled, then this will also panic if
128/// `in_sample_rate != out_sample_rate`.
129pub fn resampling_channel<T: Sample>(
130 num_channels: usize,
131 in_sample_rate: u32,
132 out_sample_rate: u32,
133 push_interleave_only: bool,
134 config: ResamplingChannelConfig,
135) -> (ResamplingProd<T>, ResamplingCons<T>) {
136 #[cfg(not(feature = "resampler"))]
137 assert_eq!(
138 in_sample_rate, out_sample_rate,
139 "Input and output sample rate must be equal when the \"resampler\" feature is disabled"
140 );
141
142 #[cfg(feature = "resampler")]
143 let resampler = if in_sample_rate != out_sample_rate {
144 Some(PacketResampler::<T, Interleaved<T>>::new(
145 num_channels,
146 in_sample_rate,
147 out_sample_rate,
148 config.resampler_config,
149 ))
150 } else {
151 None
152 };
153
154 resampling_channel_inner(
155 #[cfg(feature = "resampler")]
156 resampler,
157 num_channels,
158 in_sample_rate as f64,
159 Some(out_sample_rate),
160 config,
161 push_interleave_only,
162 )
163}
164
/// Create a new realtime-safe spsc channel for sending samples across streams,
/// driven by a caller-supplied resampler.
///
/// Internally this uses the `rubato` and `ringbuf` crates.
///
/// * `resampler` - The custom rubato resampler.
/// * `num_channels` - The number of channels in the stream.
/// * `in_sample_rate` - The sample rate of the input signal. This does not
///   have to be an integer.
/// * `push_interleave_only` - If you are NOT using [`ResamplingProd::push`],
///   then you can set this to `true` to save a bit of memory. Otherwise,
///   set this to `false`.
/// * `config` - Additional options for the resampling channel. Note that
///   `config.resampler_config` is ignored because the resampler is supplied
///   directly.
///
/// # Panics
///
/// Panics when any of the following are true:
///
/// * `num_channels == 0`
/// * `num_channels > resampler.nbr_channels()`
/// * `in_sample_rate` is not finite
/// * `in_sample_rate <= 0.0`
/// * `config.latency_seconds <= 0.0`
/// * `config.capacity_seconds <= 0.0`
#[cfg(feature = "resampler")]
pub fn resampling_channel_custom<T: Sample>(
    resampler: Box<dyn Resampler<T>>,
    num_channels: usize,
    in_sample_rate: f64,
    push_interleave_only: bool,
    config: ResamplingChannelConfig,
) -> (ResamplingProd<T>, ResamplingCons<T>) {
    assert_ne!(num_channels, 0);
    assert!(num_channels <= resampler.nbr_channels());
    assert!(in_sample_rate.is_finite());
    assert!(in_sample_rate > 0.0);

    let packet_resampler = PacketResampler::from_custom(resampler);

    // The output rate is passed as `None`; the inner constructor then derives
    // it from the input rate and the resampler's ratio.
    resampling_channel_inner(
        Some(packet_resampler),
        num_channels,
        in_sample_rate,
        None,
        config,
        push_interleave_only,
    )
}
213
214fn resampling_channel_inner<T: Sample>(
215 #[cfg(feature = "resampler")] resampler: Option<PacketResampler<T, Interleaved<T>>>,
216 num_channels: usize,
217 in_sample_rate: f64,
218 out_sample_rate: Option<u32>,
219 config: ResamplingChannelConfig,
220 push_interleave_only: bool,
221) -> (ResamplingProd<T>, ResamplingCons<T>) {
222 assert!(config.latency_seconds > 0.0);
223 assert!(config.capacity_seconds > 0.0);
224
225 #[cfg(feature = "resampler")]
226 let output_to_input_ratio = resampler.as_ref().map(|r| r.ratio()).unwrap_or(1.0);
227 #[cfg(not(feature = "resampler"))]
228 let output_to_input_ratio = 1.0;
229
230 #[cfg(feature = "resampler")]
231 let is_resampling = resampler.is_some();
232 #[cfg(feature = "resampler")]
233 let resampler_output_delay = resampler.as_ref().map(|r| r.output_delay()).unwrap_or(0);
234
235 let in_sample_rate_recip = in_sample_rate.recip();
236 let out_sample_rate = out_sample_rate
237 .map(|o| o as f64)
238 .unwrap_or_else(|| in_sample_rate * output_to_input_ratio);
239 let out_sample_rate_recip = out_sample_rate.recip();
240
241 let latency_frames = ((out_sample_rate * config.latency_seconds).round() as usize).max(1);
242
243 #[allow(unused_mut)]
244 let mut channel_latency_frames = latency_frames;
245
246 #[cfg(feature = "resampler")]
247 if resampler.is_some() && config.subtract_resampler_delay {
248 if latency_frames > resampler_output_delay {
249 channel_latency_frames -= resampler_output_delay;
250 } else {
251 channel_latency_frames = 1;
252 }
253 }
254
255 let channel_latency_samples = channel_latency_frames * num_channels;
256
257 let buffer_capacity_frames = ((in_sample_rate * config.capacity_seconds).round() as usize)
258 .max(channel_latency_frames * 2);
259
260 let (mut prod, cons) = ringbuf::HeapRb::<T>::new(buffer_capacity_frames * num_channels).split();
261
262 // Fill the channel with initial zeros to create the desired latency.
263 prod.push_slice(&vec![T::zero(); channel_latency_frames * num_channels]);
264
265 let shared_state = Arc::new(SharedState::new());
266
267 let overflow_autocorrect_threshold_samples =
268 config
269 .overflow_autocorrect_percent_threshold
270 .map(|percent| {
271 let range_samples =
272 (buffer_capacity_frames - channel_latency_frames) * num_channels;
273
274 ((range_samples as f64 * (percent / 100.0).clamp(0.0, 1.0)).round() as usize)
275 .min(range_samples)
276 + channel_latency_samples
277 });
278 let underflow_autocorrect_threshold_samples = config
279 .underflow_autocorrect_percent_threshold
280 .map(|percent| {
281 ((channel_latency_samples as f64 * (percent / 100.0).clamp(0.0, 1.0)).round() as usize)
282 .min(channel_latency_samples)
283 });
284
285 #[cfg(feature = "resampler")]
286 let do_create_tmp_out_buffer = !push_interleave_only && resampler.is_none();
287
288 #[cfg(not(feature = "resampler"))]
289 let do_create_tmp_out_buffer = !push_interleave_only;
290
291 let tmp_out_buffer = do_create_tmp_out_buffer.then(|| {
292 let len = TMP_OUT_BUFFER_FRAMES * num_channels;
293 let mut v = Vec::new();
294 v.reserve_exact(len);
295 v.resize(len, T::zero());
296 v
297 });
298
299 (
300 ResamplingProd {
301 prod,
302 num_channels,
303 latency_seconds: config.latency_seconds,
304 channel_latency_samples,
305 in_sample_rate,
306 in_sample_rate_recip,
307 out_sample_rate,
308 out_sample_rate_recip,
309 shared_state: Arc::clone(&shared_state),
310 waiting_for_output_to_reset: false,
311 underflow_autocorrect_threshold_samples,
312 tmp_out_buffer,
313 #[cfg(feature = "resampler")]
314 resampler,
315 #[cfg(feature = "resampler")]
316 output_to_input_ratio,
317 },
318 ResamplingCons {
319 cons,
320 num_channels,
321 latency_frames,
322 latency_seconds: config.latency_seconds,
323 channel_latency_samples,
324 in_sample_rate,
325 out_sample_rate,
326 out_sample_rate_recip,
327 shared_state,
328 waiting_for_input_to_reset: false,
329 overflow_autocorrect_threshold_samples,
330 #[cfg(feature = "resampler")]
331 is_resampling,
332 #[cfg(feature = "resampler")]
333 resampler_output_delay,
334 },
335 )
336}
337
/// The producer end of a realtime-safe spsc channel for sending samples across
/// streams.
///
/// If the input and output sample rates differ, then this will automatically
/// resample the input stream to match the output stream. If the sample rates
/// match, then no resampling will occur.
///
/// Internally this uses the `rubato` and `ringbuf` crates.
pub struct ResamplingProd<T: Sample> {
    /// Producer half of the ring buffer that carries interleaved samples to
    /// the consumer.
    prod: ringbuf::HeapProd<T>,
    /// Number of channels in the stream.
    num_channels: usize,
    /// The latency requested in the channel configuration, in seconds.
    latency_seconds: f64,
    /// Number of zero samples pushed to establish the channel latency (at
    /// creation and after a reset); also the fill level that underflow
    /// autocorrection restores.
    channel_latency_samples: usize,
    /// Sample rate of the input stream.
    in_sample_rate: f64,
    /// Reciprocal of `in_sample_rate`, used to convert frames to seconds.
    in_sample_rate_recip: f64,
    /// Sample rate of the output stream.
    out_sample_rate: f64,
    /// Reciprocal of `out_sample_rate`, used to convert frames to seconds.
    out_sample_rate_recip: f64,
    /// Flags shared with the consumer end.
    shared_state: Arc<SharedState>,
    /// Set by `reset`; cleared by `poll_reset`, which then re-inserts the
    /// latency padding.
    waiting_for_output_to_reset: bool,
    /// Occupied-sample count at or below which underflow autocorrection
    /// kicks in, or `None` if autocorrection is disabled.
    underflow_autocorrect_threshold_samples: Option<usize>,
    /// Scratch buffer used by `push` to interleave input when no resampler
    /// is in use. `None` if it was not allocated at construction time.
    tmp_out_buffer: Option<Vec<T>>,

    /// The resampler, or `None` when the channel passes samples through
    /// unchanged.
    #[cfg(feature = "resampler")]
    resampler: Option<PacketResampler<T, Interleaved<T>>>,
    /// The value reported by the resampler's `ratio()` at construction time,
    /// or `1.0` when there is no resampler.
    #[cfg(feature = "resampler")]
    output_to_input_ratio: f64,
}
365
366impl<T: Sample + 'static> ResamplingProd<T> {
367 /// Push the given input data.
368 ///
369 /// * `input` - The input data. You can use one of the types in the
370 /// [`audioadapter_buffers::direct`](crate::audioadapter_buffers::direct) module
371 /// to wrap your input data into a type that implements [`Adapter`].
372 /// * `input_range` - The range in each input channel to read from. If this is
373 /// `None`, then the entire input buffer will be read.
374 /// * `active_channels_mask` - An optional mask that selects which channels in
375 /// `input` to use. Channels marked with `false` will be skipped and that
376 /// output channel filled with zeros. If `None`, then all of the channels will
377 /// be active.
378 ///
379 /// This method is realtime-safe.
380 ///
381 /// # Panics
382 /// Panics if:
383 /// * The `input_range` is out of bounds for any of the input channels.
384 /// * This producer was created with `push_interleave_only` set to `true`.
385 pub fn push(
386 &mut self,
387 input: &dyn Adapter<'_, T>,
388 input_range: Option<Range<usize>>,
389 active_channels_mask: Option<&[bool]>,
390 ) -> PushStatus {
391 self.set_input_stream_ready(true);
392
393 if !self.output_stream_ready() {
394 return PushStatus::OutputNotReady;
395 }
396
397 self.poll_reset();
398
399 let (input_start, total_frames) = if let Some(range) = input_range {
400 (range.start, range.end - range.start)
401 } else {
402 (0, input.frames())
403 };
404
405 let available_frames = self.available_frames();
406 let total_frames_to_copy = total_frames.min(available_frames);
407
408 #[cfg(feature = "resampler")]
409 let process_non_resampled = self.resampler.is_none();
410 #[cfg(not(feature = "resampler"))]
411 let process_non_resampled = true;
412
413 #[cfg(feature = "resampler")]
414 if let Some(resampler) = &mut self.resampler {
415 resampler.process(
416 input,
417 Some(input_start..input_start + total_frames_to_copy),
418 active_channels_mask,
419 |output_packet, _frames| {
420 let pushed_samples = self.prod.push_slice(output_packet);
421 debug_assert_eq!(pushed_samples, output_packet.len());
422 },
423 None,
424 false,
425 );
426 }
427
428 if process_non_resampled {
429 let tmp_out_buffer = self.tmp_out_buffer.as_mut().expect(
430 "ResamplingProd::push was called even though push_interleave_only was set to true",
431 );
432
433 if input.channels() < self.num_channels || active_channels_mask.is_some() {
434 let tmp_out_buf_len = tmp_out_buffer.len();
435 tmp_out_buffer[0..(total_frames_to_copy * self.num_channels).min(tmp_out_buf_len)]
436 .fill(T::zero());
437 }
438
439 let mut frames_left = total_frames_to_copy;
440 while frames_left > 0 {
441 let block_frames_to_copy = frames_left.min(TMP_OUT_BUFFER_FRAMES);
442
443 {
444 let mut out_buf_wrapper = InterleavedSlice::new_mut(
445 tmp_out_buffer,
446 self.num_channels,
447 TMP_OUT_BUFFER_FRAMES,
448 )
449 .unwrap();
450
451 for ch_i in 0..input.channels() {
452 let channel_active = active_channels_mask
453 .as_ref()
454 .map(|m| m.get(ch_i).copied().unwrap_or(false))
455 .unwrap_or(true);
456
457 if channel_active {
458 out_buf_wrapper.copy_from_other_to_channel(
459 &AdapterWrapper { inner: input },
460 ch_i,
461 ch_i,
462 input_start + (total_frames_to_copy - frames_left),
463 0,
464 block_frames_to_copy,
465 );
466 }
467 }
468 }
469
470 let pushed_samples = self
471 .prod
472 .push_slice(&tmp_out_buffer[0..block_frames_to_copy * self.num_channels]);
473 debug_assert_eq!(pushed_samples, block_frames_to_copy * self.num_channels);
474
475 frames_left -= block_frames_to_copy;
476 }
477 }
478
479 if total_frames_to_copy < total_frames {
480 PushStatus::OverflowOccurred {
481 num_frames_pushed: total_frames_to_copy,
482 }
483 } else if let Some(zero_frames_pushed) = self.autocorrect_underflows() {
484 PushStatus::UnderflowCorrected {
485 num_zero_frames_pushed: zero_frames_pushed,
486 }
487 } else {
488 PushStatus::Ok
489 }
490 }
491
492 /// Push the given input data in interleaved format.
493 ///
494 /// The input buffer must have the same number of channels as this producer.
495 ///
496 /// This method is realtime-safe.
497 pub fn push_interleaved(&mut self, input: &[T]) -> PushStatus {
498 self.set_input_stream_ready(true);
499
500 if !self.output_stream_ready() {
501 return PushStatus::OutputNotReady;
502 }
503
504 self.poll_reset();
505
506 let total_frames = input.len() / self.num_channels;
507
508 let available_frames = self.available_frames();
509 let total_frames_to_copy = total_frames.min(available_frames);
510
511 #[cfg(feature = "resampler")]
512 let process_non_resampled = self.resampler.is_none();
513 #[cfg(not(feature = "resampler"))]
514 let process_non_resampled = true;
515
516 #[cfg(feature = "resampler")]
517 if let Some(resampler) = &mut self.resampler {
518 let input_wrapper =
519 InterleavedSlice::new(input, self.num_channels, total_frames_to_copy).unwrap();
520
521 resampler.process(
522 &input_wrapper,
523 None,
524 None,
525 |output_packet, _frames| {
526 let pushed_samples = self.prod.push_slice(output_packet);
527 debug_assert_eq!(pushed_samples, output_packet.len());
528 },
529 None,
530 false,
531 );
532 }
533
534 if process_non_resampled {
535 let pushed_samples = self
536 .prod
537 .push_slice(&input[0..total_frames_to_copy * self.num_channels]);
538 debug_assert_eq!(pushed_samples, total_frames_to_copy * self.num_channels);
539 }
540
541 if total_frames_to_copy < total_frames {
542 PushStatus::OverflowOccurred {
543 num_frames_pushed: total_frames_to_copy,
544 }
545 } else if let Some(zero_frames_pushed) = self.autocorrect_underflows() {
546 PushStatus::UnderflowCorrected {
547 num_zero_frames_pushed: zero_frames_pushed,
548 }
549 } else {
550 PushStatus::Ok
551 }
552 }
553
554 /// Returns the number of input frames (samples in a single channel of audio)
555 /// that are currently available to be pushed to the channel.
556 ///
557 /// If the output stream is not ready yet, then this will return `0`.
558 ///
559 /// This method is realtime-safe.
560 pub fn available_frames(&mut self) -> usize {
561 if !self.output_stream_ready() {
562 return 0;
563 }
564
565 self.poll_reset();
566
567 let output_vacant_frames = self.prod.vacant_len() / self.num_channels;
568
569 #[cfg(feature = "resampler")]
570 if let Some(resampler) = &self.resampler {
571 let mut input_vacant_frames =
572 (output_vacant_frames as f64 * self.output_to_input_ratio).floor() as usize;
573
574 // Give some leeway to account for floating point inaccuracies.
575 input_vacant_frames = input_vacant_frames.saturating_sub(1);
576
577 if input_vacant_frames < resampler.max_input_block_frames() {
578 return 0;
579 }
580
581 // The resampler processes in chunks.
582 input_vacant_frames = (input_vacant_frames / resampler.max_input_block_frames())
583 * resampler.max_input_block_frames();
584
585 return input_vacant_frames - resampler.tmp_input_frames();
586 }
587
588 output_vacant_frames
589 }
590
591 /// The amount of data in seconds that is available to be pushed to the
592 /// channel.
593 ///
594 /// If the output stream is not ready yet, then this will return `0.0`.
595 ///
596 /// This method is realtime-safe.
597 pub fn available_seconds(&mut self) -> f64 {
598 self.available_frames() as f64 * self.in_sample_rate_recip
599 }
600
601 /// The amount of data that is currently occupied in the channel, in units of
602 /// output frames (samples in a single channel of audio).
603 ///
604 /// Note, this is the number of frames in the *output* audio stream, not the
605 /// input audio stream.
606 ///
607 /// This method is realtime-safe.
608 pub fn occupied_output_frames(&self) -> usize {
609 self.prod.occupied_len() / self.num_channels
610 }
611
612 /// The amount of data that is currently occupied in the channel, in units of
613 /// seconds.
614 ///
615 /// This method is realtime-safe.
616 pub fn occupied_seconds(&self) -> f64 {
617 self.occupied_output_frames() as f64 * self.out_sample_rate_recip
618 }
619
620 /// The number of channels configured for this stream.
621 ///
622 /// This method is realtime-safe.
623 pub fn num_channels(&self) -> usize {
624 self.num_channels
625 }
626
627 /// The sample rate of the input stream.
628 ///
629 /// This method is realtime-safe.
630 pub fn in_sample_rate(&self) -> f64 {
631 self.in_sample_rate
632 }
633
634 /// The sample rate of the output stream.
635 ///
636 /// This method is realtime-safe.
637 pub fn out_sample_rate(&self) -> f64 {
638 self.out_sample_rate
639 }
640
641 /// The latency of the channel in units of seconds.
642 ///
643 /// This method is realtime-safe.
644 pub fn latency_seconds(&self) -> f64 {
645 self.latency_seconds
646 }
647
/// Reports whether this channel has a resampler, i.e. whether pushed audio
/// is being resampled.
///
/// This method is realtime-safe.
#[cfg(feature = "resampler")]
pub fn is_resampling(&self) -> bool {
    self.resampler.is_some()
}
655
656 /// Tell the consumer to clear all queued frames in the buffer.
657 ///
658 /// This method is realtime-safe.
659 pub fn reset(&mut self) {
660 self.shared_state.reset.store(true, Ordering::Relaxed);
661
662 self.waiting_for_output_to_reset = true;
663
664 #[cfg(feature = "resampler")]
665 if let Some(resampler) = &mut self.resampler {
666 resampler.reset();
667 }
668 }
669
670 /// Manually notify the output stream that the input stream is ready/not ready
671 /// to push samples to the channel.
672 ///
673 /// If this producer end is being used in a non-realtime context, then it is
674 /// a good idea to set this to `true` so that the consumer end can start
675 /// reading samples from the channel immediately.
676 ///
677 /// Note, calling [`ResamplingProd::push`] and
678 /// [`ResamplingProd::push_interleaved`] automatically sets the input stream as
679 /// ready.
680 ///
681 /// This method is realtime-safe.
682 pub fn set_input_stream_ready(&mut self, ready: bool) {
683 self.shared_state
684 .input_stream_ready
685 .store(ready, Ordering::Relaxed);
686 }
687
688 /// Whether or not the output stream is ready to read samples from the channel.
689 ///
690 /// This method is realtime-safe.
691 pub fn output_stream_ready(&self) -> bool {
692 self.shared_state
693 .output_stream_ready
694 .load(Ordering::Relaxed)
695 && !self.shared_state.reset.load(Ordering::Relaxed)
696 }
697
698 /// Correct for any underflows.
699 ///
700 /// This returns the number of extra zero frames (samples in a single channel of audio)
701 /// that were added due to an underflow occurring. If no underflow occured, then `None`
702 /// is returned.
703 ///
704 /// Note, this method is already automatically called in [`ResamplingProd::push`] and
705 /// [`ResamplingProd::push_interleaved`].
706 ///
707 /// This will have no effect if [`ResamplingChannelConfig::underflow_autocorrect_percent_threshold`]
708 /// was set to `None`.
709 ///
710 /// This method is realtime-safe.
711 pub fn autocorrect_underflows(&mut self) -> Option<usize> {
712 if !self.output_stream_ready() {
713 return None;
714 }
715
716 self.poll_reset();
717
718 if let Some(underflow_autocorrect_threshold_samples) =
719 self.underflow_autocorrect_threshold_samples
720 {
721 let len = self.prod.occupied_len();
722
723 if len <= underflow_autocorrect_threshold_samples && len < self.channel_latency_samples
724 {
725 let correction_samples = self.channel_latency_samples - len;
726
727 self.prod
728 .push_iter((0..correction_samples).map(|_| T::zero()));
729
730 return Some(correction_samples / self.num_channels);
731 }
732 }
733
734 None
735 }
736
737 fn poll_reset(&mut self) {
738 if self.waiting_for_output_to_reset {
739 self.waiting_for_output_to_reset = false;
740
741 // Fill the channel with initial zeros to create the desired latency.
742 self.prod
743 .push_iter((0..self.channel_latency_samples).map(|_| T::zero()));
744 }
745 }
746}
747
/// The consumer end of a realtime-safe spsc channel for sending samples across
/// streams.
///
/// If the input and output sample rates differ, then this will automatically
/// resample the input stream to match the output stream. If the sample rates
/// match, then no resampling will occur.
///
/// Internally this uses the `rubato` and `ringbuf` crates.
pub struct ResamplingCons<T: Sample> {
    /// Consumer half of the ring buffer that carries interleaved samples
    /// from the producer.
    cons: ringbuf::HeapCons<T>,
    /// Number of channels in the stream.
    num_channels: usize,
    /// The latency requested in the channel configuration, in seconds.
    latency_seconds: f64,
    /// The requested latency in output frames, before any resampler delay is
    /// subtracted.
    latency_frames: usize,
    /// Number of zero samples the producer pushes to establish the channel
    /// latency.
    channel_latency_samples: usize,
    /// Sample rate of the input stream.
    in_sample_rate: f64,
    /// Sample rate of the output stream.
    out_sample_rate: f64,
    /// Reciprocal of `out_sample_rate`, used to convert frames to seconds.
    out_sample_rate_recip: f64,
    /// Flags shared with the producer end.
    shared_state: Arc<SharedState>,
    /// Set when `poll_reset` handles a reset request; cleared by the read
    /// methods once the input stream is ready.
    waiting_for_input_to_reset: bool,
    /// Occupied-sample threshold computed from
    /// `overflow_autocorrect_percent_threshold`, or `None` if overflow
    /// autocorrection is disabled.
    overflow_autocorrect_threshold_samples: Option<usize>,

    /// Output delay reported by the resampler at construction time, or `0`
    /// when there is no resampler.
    #[cfg(feature = "resampler")]
    resampler_output_delay: usize,
    /// Whether the channel was created with a resampler.
    #[cfg(feature = "resampler")]
    is_resampling: bool,
}
774
775impl<T: Sample + 'static> ResamplingCons<T> {
776 /// The number of channels configured for this stream.
777 ///
778 /// This method is realtime-safe.
779 pub fn num_channels(&self) -> usize {
780 self.num_channels
781 }
782
783 /// The sample rate of the input stream.
784 ///
785 /// This method is realtime-safe.
786 pub fn in_sample_rate(&self) -> f64 {
787 self.in_sample_rate
788 }
789
790 /// The sample rate of the output stream.
791 ///
792 /// This method is realtime-safe.
793 pub fn out_sample_rate(&self) -> f64 {
794 self.out_sample_rate
795 }
796
797 /// The latency of the channel in units of seconds.
798 ///
799 /// This method is realtime-safe.
800 pub fn latency_seconds(&self) -> f64 {
801 self.latency_seconds
802 }
803
804 /// The latency of the channel in units of output frames.
805 ///
806 /// This method is realtime-safe.
807 pub fn latency_frames(&self) -> usize {
808 self.latency_frames
809 }
810
811 /// The number of frames (samples in a single channel of audio) that are
812 /// currently available to be read from the channel.
813 ///
814 /// If the input stream is not ready yet, then this will return `0`.
815 ///
816 /// This method is realtime-safe.
817 pub fn available_frames(&self) -> usize {
818 if self.input_stream_ready() {
819 self.cons.occupied_len() / self.num_channels
820 } else {
821 0
822 }
823 }
824
825 /// The amount of data in seconds that is currently available to be read
826 /// from the channel.
827 ///
828 /// If the input stream is not ready yet, then this will return `0.0`.
829 ///
830 /// This method is realtime-safe.
831 pub fn available_seconds(&self) -> f64 {
832 self.available_frames() as f64 * self.out_sample_rate_recip
833 }
834
835 /// The amount of data that is currently occupied in the channel, in units of
836 /// seconds.
837 ///
838 /// This method is realtime-safe.
839 pub fn occupied_seconds(&self) -> f64 {
840 (self.cons.occupied_len() / self.num_channels) as f64 * self.out_sample_rate_recip
841 }
842
/// Reports whether this channel was created with a resampler, i.e. whether
/// the audio passing through it is being resampled.
///
/// This method is realtime-safe.
#[cfg(feature = "resampler")]
pub fn is_resampling(&self) -> bool {
    self.is_resampling
}
850
/// Returns the delay of the internal resampler, in output frames (samples
/// in a single channel of audio).
///
/// Returns `0` if this channel does not use a resampler.
///
/// This method is realtime-safe.
#[cfg(feature = "resampler")]
pub fn resampler_output_delay(&self) -> usize {
    self.resampler_output_delay
}
862
863 /// Discard a certian number of output frames from the buffer. This can be used
864 /// to correct for jitter and avoid excessive overflows and reduce the percieved
865 /// audible glitchiness.
866 ///
867 /// This will discard `frames.min(self.available_frames())` frames.
868 ///
869 /// Returns the number of output frames that were discarded.
870 ///
871 /// This method is realtime-safe.
872 pub fn discard_frames(&mut self, frames: usize) -> usize {
873 self.cons.skip(frames * self.num_channels) / self.num_channels
874 }
875
876 /// Read from the channel and store the results in the given output buffer.
877 ///
878 /// * `output` - The output buffer. You can use one of the types in the
879 /// [`audioadapter_buffers::direct`](crate::audioadapter_buffers::direct) module
880 /// to wrap your input data into a type that implements [`Adapter`].
881 /// * `output_range` - The range in each output channel to write to. If this is
882 /// `None`, then the entire output buffer will be read.
883 /// * `active_channels_mask` - An optional mask that selects which channels in
884 /// `output` to use. Channels marked with `false` will be filled with zeros.
885 /// If `None`, then all of the channels will be active.
886 /// * `output_is_already_cleared` - If `true`, then this will skip the step of
887 /// clearing the output buffer in the range `output_range` to zeros.
888 ///
889 /// This method is realtime-safe.
890 ///
891 /// # Panics
892 /// Panics if:
893 /// * The `input_range` is out of bounds for any of the input channels.
894 pub fn read(
895 &mut self,
896 output: &mut dyn AdapterMut<'_, T>,
897 output_range: Option<Range<usize>>,
898 active_channels_mask: Option<&[bool]>,
899 output_is_already_cleared: bool,
900 ) -> ReadStatus {
901 self.set_output_stream_ready(true);
902
903 self.poll_reset();
904
905 let (output_start, output_frames) = if let Some(range) = output_range {
906 (range.start, range.end - range.start)
907 } else {
908 (0, output.frames())
909 };
910
911 if !output_is_already_cleared {
912 output.fill_frames_with(output_start, output_frames, &T::zero());
913 }
914
915 if !self.input_stream_ready() {
916 return ReadStatus::InputNotReady;
917 }
918
919 self.waiting_for_input_to_reset = false;
920
921 let (s1, s2) = self.cons.as_slices();
922
923 let s1_frames = s1.len() / self.num_channels;
924 let s1_copy_frames = s1_frames.min(output_frames);
925
926 let s1_wrapper = InterleavedSlice::new(s1, self.num_channels, s1_frames).unwrap();
927 let s1_wrapper_2 = AdapterWrapper { inner: &s1_wrapper };
928
929 for ch_i in 0..output.channels().min(self.num_channels) {
930 let channel_active = active_channels_mask
931 .as_ref()
932 .map(|m| m.get(ch_i).copied().unwrap_or(false))
933 .unwrap_or(true);
934
935 if channel_active {
936 output.copy_from_other_to_channel(
937 &s1_wrapper_2,
938 ch_i,
939 ch_i,
940 0,
941 output_start,
942 s1_copy_frames,
943 );
944 }
945 }
946
947 let mut filled_frames = s1_copy_frames;
948
949 if output_frames > s1_copy_frames {
950 let s2_frames = s2.len() / self.num_channels;
951 let s2_copy_frames = s2_frames.min(output_frames - s1_copy_frames);
952
953 let s2_wrapper = InterleavedSlice::new(s2, self.num_channels, s2_frames).unwrap();
954 let s2_wrapper_2 = AdapterWrapper { inner: &s2_wrapper };
955
956 for ch_i in 0..output.channels().min(self.num_channels) {
957 let channel_active = active_channels_mask
958 .as_ref()
959 .map(|m| m.get(ch_i).copied().unwrap_or(false))
960 .unwrap_or(true);
961
962 if channel_active {
963 output.copy_from_other_to_channel(
964 &s2_wrapper_2,
965 ch_i,
966 ch_i,
967 0,
968 output_start + s1_copy_frames,
969 s2_copy_frames,
970 );
971 }
972 }
973
974 filled_frames += s2_copy_frames;
975 }
976
977 self.cons.skip(filled_frames * self.num_channels);
978
979 if filled_frames < output_frames {
980 ReadStatus::UnderflowOccurred {
981 num_frames_read: filled_frames,
982 }
983 } else if let Some(num_frames_discarded) = self.autocorrect_overflows() {
984 ReadStatus::OverflowCorrected {
985 num_frames_discarded,
986 }
987 } else {
988 ReadStatus::Ok
989 }
990 }
991
992 /// Read from the channel and store the results into the output buffer
993 /// in interleaved format.
994 ///
995 /// * `output` - The output buffer to write to. The output buffer must
996 /// have the same number of channels as this consumer.
997 /// * `buffer_out_is_already_cleared` - If `true`, then this will skip the step of
998 /// clearing the output buffer in the range `output_range` to zeros.
999 ///
1000 /// This method is realtime-safe.
1001 pub fn read_interleaved(
1002 &mut self,
1003 output: &mut [T],
1004 output_already_cleared: bool,
1005 ) -> ReadStatus {
1006 self.set_output_stream_ready(true);
1007
1008 self.poll_reset();
1009
1010 if !self.input_stream_ready() {
1011 if !output_already_cleared {
1012 output.fill(T::zero());
1013 }
1014
1015 return ReadStatus::InputNotReady;
1016 }
1017
1018 self.waiting_for_input_to_reset = false;
1019
1020 let out_frames = output.len() / self.num_channels;
1021 let out_len = out_frames * self.num_channels;
1022
1023 let pushed_samples = self.cons.pop_slice(&mut output[..out_len]);
1024
1025 if pushed_samples < out_len {
1026 if !output_already_cleared {
1027 output[pushed_samples..].fill(T::zero());
1028 }
1029
1030 ReadStatus::UnderflowOccurred {
1031 num_frames_read: pushed_samples / self.num_channels,
1032 }
1033 } else if let Some(num_frames_discarded) = self.autocorrect_overflows() {
1034 ReadStatus::OverflowCorrected {
1035 num_frames_discarded,
1036 }
1037 } else {
1038 ReadStatus::Ok
1039 }
1040 }
1041
1042 /// Poll the channel to see if it got a command to reset.
1043 ///
1044 /// Returns `true` if the channel was reset.
1045 pub fn poll_reset(&mut self) -> bool {
1046 if self.shared_state.reset.load(Ordering::Relaxed) {
1047 self.shared_state.reset.store(false, Ordering::Relaxed);
1048 self.waiting_for_input_to_reset = true;
1049
1050 self.cons.clear();
1051
1052 true
1053 } else {
1054 false
1055 }
1056 }
1057
1058 /// Manually notify the input stream that the output stream is ready/not ready
1059 /// to read samples from the channel.
1060 ///
1061 /// If this consumer end is being used in a non-realtime context, then it is
1062 /// a good idea to set this to `true` so that the producer end can start
1063 /// pushing samples to the channel immediately.
1064 ///
1065 /// Note, calling [`ResamplingCons::read`] and
1066 /// [`ResamplingCons::read_interleaved`] automatically sets the output stream as
1067 /// ready.
1068 ///
1069 /// This method is realtime-safe.
1070 pub fn set_output_stream_ready(&mut self, ready: bool) {
1071 self.shared_state
1072 .output_stream_ready
1073 .store(ready, Ordering::Relaxed);
1074 }
1075
1076 /// Whether or not the input stream is ready to push samples to the channel.
1077 ///
1078 /// This method is realtime-safe.
1079 pub fn input_stream_ready(&self) -> bool {
1080 self.shared_state.input_stream_ready.load(Ordering::Relaxed)
1081 && !(self.waiting_for_input_to_reset && self.cons.is_empty())
1082 }
1083
1084 /// Correct for any overflows.
1085 ///
1086 /// This returns the number of frames (samples in a single channel of audio) that were
1087 /// discarded due to an overflow occurring. If no overflow occured, then `None`
1088 /// is returned.
1089 ///
1090 /// Note, this method is already automatically called in [`ResamplingCons::read`] and
1091 /// [`ResamplingCons::read_interleaved`].
1092 ///
1093 /// This will have no effect if [`ResamplingChannelConfig::overflow_autocorrect_percent_threshold`]
1094 /// was set to `None`.
1095 ///
1096 /// This method is realtime-safe.
1097 pub fn autocorrect_overflows(&mut self) -> Option<usize> {
1098 if let Some(overflow_autocorrect_threshold_samples) =
1099 self.overflow_autocorrect_threshold_samples
1100 {
1101 let len = self.cons.occupied_len();
1102
1103 if len >= overflow_autocorrect_threshold_samples && len > self.channel_latency_samples {
1104 let correction_frames = (len - self.channel_latency_samples) / self.num_channels;
1105
1106 self.discard_frames(correction_frames);
1107
1108 return Some(correction_frames);
1109 }
1110 }
1111
1112 None
1113 }
1114}
1115
/// The status returned when pushing samples with [`ResamplingProd::push`] and
/// [`ResamplingProd::push_interleaved`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PushStatus {
    /// All samples were successfully pushed to the channel.
    Ok,
    /// The output stream is not yet ready to read samples from the channel.
    ///
    /// Note, this can also happen when the channel is reset.
    ///
    /// No samples were pushed to the channel.
    OutputNotReady,
    /// An overflow occurred due to the input stream running faster than the
    /// output stream. Some or all of the samples were not pushed to the channel.
    ///
    /// If this occurs, then it may mean that [`ResamplingChannelConfig::capacity_seconds`]
    /// is too low and should be increased.
    OverflowOccurred {
        /// The number of frames (samples in a single channel of audio) that were
        /// successfully pushed to the channel.
        num_frames_pushed: usize,
    },
    /// An underflow occurred due to the output stream running faster than the
    /// input stream.
    ///
    /// All of the samples were successfully pushed to the channel, however extra
    /// zero samples were also pushed to the channel to correct for the jitter.
    ///
    /// If this occurs, then it may mean that [`ResamplingChannelConfig::latency_seconds`]
    /// is too low and should be increased.
    UnderflowCorrected {
        /// The number of zero frames that were pushed after the other samples
        /// were pushed.
        num_zero_frames_pushed: usize,
    },
}
1152
/// The status returned when reading data with [`ResamplingCons::read`] and
/// [`ResamplingCons::read_interleaved`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReadStatus {
    /// The output buffer was fully filled with samples from the channel.
    Ok,
    /// The input stream is not yet ready to push samples to the channel.
    ///
    /// Note, this can also happen when the channel is reset.
    ///
    /// The output buffer was filled with zeros.
    InputNotReady,
    /// An underflow occurred due to the output stream running faster than the input
    /// stream. Some or all of the samples in the output buffer have been filled with
    /// zeros on the end. This may result in audible audio glitches.
    ///
    /// If this occurs, then it may mean that [`ResamplingChannelConfig::latency_seconds`]
    /// is too low and should be increased.
    UnderflowOccurred {
        /// The number of frames (samples in a single channel of audio) that were
        /// successfully read from the channel. All frames past this have been filled
        /// with zeros.
        num_frames_read: usize,
    },
    /// An overflow occurred due to the input stream running faster than the output
    /// stream.
    ///
    /// All of the samples in the output buffer were successfully filled with samples,
    /// however a number of frames have also been discarded to correct for the jitter.
    ///
    /// If this occurs, then it may mean that [`ResamplingChannelConfig::capacity_seconds`]
    /// is too low and should be increased.
    OverflowCorrected {
        /// The number of frames that were discarded from the channel (after the
        /// frames have been read into the output buffer).
        num_frames_discarded: usize,
    },
}
1191
/// Atomic flags accessed through the channel's `shared_state`.
struct SharedState {
    /// Set when a reset has been requested; cleared again by `poll_reset`.
    reset: AtomicBool,
    /// Whether the input stream has been marked as ready.
    input_stream_ready: AtomicBool,
    /// Whether the output stream has been marked as ready.
    output_stream_ready: AtomicBool,
}

impl SharedState {
    /// Creates the shared state with every flag initialized to `false`.
    fn new() -> Self {
        let cleared = || AtomicBool::new(false);

        Self {
            reset: cleared(),
            input_stream_ready: cleared(),
            output_stream_ready: cleared(),
        }
    }
}
1207
1208/// Needed to get around lifetime nonsense
1209struct AdapterWrapper<'a, 'b, T: Sample> {
1210 inner: &'a dyn Adapter<'b, T>,
1211}
1212
1213// # Safety: This simply wraps each of the trait methods.
1214unsafe impl<'a, T: Sample + 'static> Adapter<'a, T> for AdapterWrapper<'_, '_, T> {
1215 /// Safety: This is just wrapping the inner method
1216 unsafe fn read_sample_unchecked(&self, channel: usize, frame: usize) -> T {
1217 self.inner.read_sample_unchecked(channel, frame)
1218 }
1219
1220 fn read_sample(&self, channel: usize, frame: usize) -> Option<T> {
1221 self.inner.read_sample(channel, frame)
1222 }
1223
1224 fn channels(&self) -> usize {
1225 self.inner.channels()
1226 }
1227
1228 fn frames(&self) -> usize {
1229 self.inner.frames()
1230 }
1231
1232 fn copy_from_channel_to_slice(&self, channel: usize, skip: usize, slice: &mut [T]) -> usize {
1233 self.inner.copy_from_channel_to_slice(channel, skip, slice)
1234 }
1235
1236 fn copy_from_frame_to_slice(&self, frame: usize, skip: usize, slice: &mut [T]) -> usize {
1237 self.inner.copy_from_frame_to_slice(frame, skip, slice)
1238 }
1239}