Skip to main content

fixed_resample/
channel.rs

1use std::{
2    ops::Range,
3    sync::{
4        atomic::{AtomicBool, Ordering},
5        Arc,
6    },
7};
8
9use audioadapter::{Adapter, AdapterMut};
10use audioadapter_buffers::direct::InterleavedSlice;
11use ringbuf::traits::{Consumer, Observer, Producer, Split};
12#[cfg(feature = "resampler")]
13use rubato::Resampler;
14
15use crate::Sample;
16
17#[cfg(feature = "resampler")]
18use crate::{Interleaved, PacketResampler, ResamplerConfig};
19
20const TMP_OUT_BUFFER_FRAMES: usize = 512;
21
22/// Additional options for a resampling channel.
23#[derive(Debug, Clone, Copy, PartialEq)]
24pub struct ResamplingChannelConfig {
25    /// The amount of latency added in seconds between the input stream and the
26    /// output stream. If this value is too small, then underflows may occur.
27    ///
28    /// The default value is `0.15` (150 ms).
29    pub latency_seconds: f64,
30
31    /// The capacity of the channel in seconds. If this is too small, then
32    /// overflows may occur. This should be at least twice as large as
33    /// `latency_seconds`.
34    ///
35    /// Note, the actual capacity may be slightly smaller due to how the internal
36    /// sampler processes in chunks.
37    ///
38    /// The default value is `0.4` (400 ms).
39    pub capacity_seconds: f64,
40
41    /// If the number of occupied samples in the channel is greater than or equal to
42    /// (`latency_seconds + percent * (capacity_seconds - latency_seconds)`), then discard the
43    /// number of samples needed to bring the number of occupied seconds back down to
44    /// [`ResamplingChannelConfig::latency_seconds`]. This is used to avoid excessive
45    /// overflows and reduce the percieved audio glitchiness.
46    ///
47    /// The percentage is a value in the range `[0.0, 100.0]`.
48    ///
49    /// Set to `None` to disable this autocorrecting behavior. If the producer end is being
50    /// used in a non-realtime context, then this should be set to `None`.
51    ///
52    /// By default this is set to `Some(75.0)`.
53    pub overflow_autocorrect_percent_threshold: Option<f64>,
54
55    /// If the number of occupied samples in the channel is below or equal to the given
56    /// percentage of [`ResamplingChannelConfig::latency_seconds`], then insert the number of
57    /// zero frames needed to bring the number of occupied samples back up to
58    /// [`ResamplingChannelConfig::latency_seconds`]. This is used to avoid excessive underflows
59    /// and reduce the percieved audio glitchiness.
60    ///
61    /// The percentage is a value in the range `[0.0, 100.0]`.
62    ///
63    /// Set to `None` to disable this autocorrecting behavior. If the consumer end is being
64    /// used in a non-realtime context, then this should be set to `None`.
65    ///
66    /// By default this is set to `Some(25.0)`.
67    pub underflow_autocorrect_percent_threshold: Option<f64>,
68
69    #[cfg(feature = "resampler")]
70    /// The configuration of the resampler to use.
71    ///
72    /// This is ignored when using [`resampling_channel_custom`].
73    pub resampler_config: ResamplerConfig,
74
75    #[cfg(feature = "resampler")]
76    /// If `true`, then the delay of the internal resampler (if used) will be
77    /// subtracted from the `latency_seconds` value to keep the perceived
78    /// latency consistent.
79    ///
80    /// The default value is `true`.
81    pub subtract_resampler_delay: bool,
82}
83
84impl Default for ResamplingChannelConfig {
85    fn default() -> Self {
86        Self {
87            latency_seconds: 0.15,
88            capacity_seconds: 0.4,
89            overflow_autocorrect_percent_threshold: Some(75.0),
90            underflow_autocorrect_percent_threshold: Some(25.0),
91            #[cfg(feature = "resampler")]
92            resampler_config: ResamplerConfig::default(),
93            #[cfg(feature = "resampler")]
94            subtract_resampler_delay: true,
95        }
96    }
97}
98
99/// Create a new realtime-safe spsc channel for sending samples across streams.
100///
101/// If the input and output samples rates differ, then this will automatically
102/// resample the input stream to match the output stream (unless the "resample"
103/// feature is disabled). If the sample rates match, then no resampling will
104/// occur.
105///
106/// Internally this uses the `rubato` and `ringbuf` crates.
107///
108/// * `in_sample_rate` - The sample rate of the input stream.
109/// * `out_sample_rate` - The sample rate of the output stream.
110/// * `num_channels` - The number of channels in the stream.
111/// * `push_interleave_only` - If you are NOT using [`ResamplingProd::push`],
112///   then you can set this to `true` to save a bit of memory. Otherwise,
113///   set this to `false`.
114/// * `config` - Additional options for the resampling channel.
115///
116/// # Panics
117///
118/// Panics when any of the following are true:
119///
120/// * `in_sample_rate == 0`
121/// * `out_sample_rate == 0`
122/// * `num_channels == 0`
123/// * `config.latency_seconds <= 0.0`
124/// * `config.capacity_seconds <= 0.0`
125/// * `config.output_chunk_size == 0`
126///
127/// If the "resampler" feature is disabled, then this will also panic if
128/// `in_sample_rate != out_sample_rate`.
129pub fn resampling_channel<T: Sample>(
130    num_channels: usize,
131    in_sample_rate: u32,
132    out_sample_rate: u32,
133    push_interleave_only: bool,
134    config: ResamplingChannelConfig,
135) -> (ResamplingProd<T>, ResamplingCons<T>) {
136    #[cfg(not(feature = "resampler"))]
137    assert_eq!(
138        in_sample_rate, out_sample_rate,
139        "Input and output sample rate must be equal when the \"resampler\" feature is disabled"
140    );
141
142    #[cfg(feature = "resampler")]
143    let resampler = if in_sample_rate != out_sample_rate {
144        Some(PacketResampler::<T, Interleaved<T>>::new(
145            num_channels,
146            in_sample_rate,
147            out_sample_rate,
148            config.resampler_config,
149        ))
150    } else {
151        None
152    };
153
154    resampling_channel_inner(
155        #[cfg(feature = "resampler")]
156        resampler,
157        num_channels,
158        in_sample_rate as f64,
159        Some(out_sample_rate),
160        config,
161        push_interleave_only,
162    )
163}
164
165/// Create a new realtime-safe spsc channel for sending samples across streams
166/// using the custom resampler.
167///
168/// Internally this uses the `rubato` and `ringbuf` crates.
169///
170/// * `resampler` - The custom rubato resampler.
171/// * `num_channels` - The number of channels in the stream.
172/// * `in_sample_rate` - The sample rate of the input signal. This does not
173///   have to be an integer.
174/// * `push_interleave_only` - If you are NOT using [`ResamplingProd::push`],
175///   then you can set this to `true` to save a bit of memory. Otherwise,
176///   set this to `false`.
177/// * `config` - Additional options for the resampling channel. Note that
178///   `config.quality` will be ignored.
179///
180/// # Panics
181///
182/// Panics when any of the following are true:
183///
184/// * `num_channels == 0`
185/// * `num_channels > resampler.nbr_channels()`
186/// * `in_sample_rate <= 0.0`
187/// * `config.latency_seconds <= 0.0`
188/// * `config.capacity_seconds <= 0.0`
189#[cfg(feature = "resampler")]
190pub fn resampling_channel_custom<T: Sample>(
191    resampler: Box<dyn Resampler<T>>,
192    num_channels: usize,
193    in_sample_rate: f64,
194    push_interleave_only: bool,
195    config: ResamplingChannelConfig,
196) -> (ResamplingProd<T>, ResamplingCons<T>) {
197    assert_ne!(num_channels, 0);
198    assert!(num_channels <= resampler.nbr_channels());
199    assert!(in_sample_rate.is_finite());
200    assert!(in_sample_rate > 0.0);
201
202    let resampler = Some(PacketResampler::from_custom(resampler));
203
204    resampling_channel_inner(
205        resampler,
206        num_channels,
207        in_sample_rate,
208        None,
209        config,
210        push_interleave_only,
211    )
212}
213
214fn resampling_channel_inner<T: Sample>(
215    #[cfg(feature = "resampler")] resampler: Option<PacketResampler<T, Interleaved<T>>>,
216    num_channels: usize,
217    in_sample_rate: f64,
218    out_sample_rate: Option<u32>,
219    config: ResamplingChannelConfig,
220    push_interleave_only: bool,
221) -> (ResamplingProd<T>, ResamplingCons<T>) {
222    assert!(config.latency_seconds > 0.0);
223    assert!(config.capacity_seconds > 0.0);
224
225    #[cfg(feature = "resampler")]
226    let output_to_input_ratio = resampler.as_ref().map(|r| r.ratio()).unwrap_or(1.0);
227    #[cfg(not(feature = "resampler"))]
228    let output_to_input_ratio = 1.0;
229
230    #[cfg(feature = "resampler")]
231    let is_resampling = resampler.is_some();
232    #[cfg(feature = "resampler")]
233    let resampler_output_delay = resampler.as_ref().map(|r| r.output_delay()).unwrap_or(0);
234
235    let in_sample_rate_recip = in_sample_rate.recip();
236    let out_sample_rate = out_sample_rate
237        .map(|o| o as f64)
238        .unwrap_or_else(|| in_sample_rate * output_to_input_ratio);
239    let out_sample_rate_recip = out_sample_rate.recip();
240
241    let latency_frames = ((out_sample_rate * config.latency_seconds).round() as usize).max(1);
242
243    #[allow(unused_mut)]
244    let mut channel_latency_frames = latency_frames;
245
246    #[cfg(feature = "resampler")]
247    if resampler.is_some() && config.subtract_resampler_delay {
248        if latency_frames > resampler_output_delay {
249            channel_latency_frames -= resampler_output_delay;
250        } else {
251            channel_latency_frames = 1;
252        }
253    }
254
255    let channel_latency_samples = channel_latency_frames * num_channels;
256
257    let buffer_capacity_frames = ((in_sample_rate * config.capacity_seconds).round() as usize)
258        .max(channel_latency_frames * 2);
259
260    let (mut prod, cons) = ringbuf::HeapRb::<T>::new(buffer_capacity_frames * num_channels).split();
261
262    // Fill the channel with initial zeros to create the desired latency.
263    prod.push_slice(&vec![T::zero(); channel_latency_frames * num_channels]);
264
265    let shared_state = Arc::new(SharedState::new());
266
267    let overflow_autocorrect_threshold_samples =
268        config
269            .overflow_autocorrect_percent_threshold
270            .map(|percent| {
271                let range_samples =
272                    (buffer_capacity_frames - channel_latency_frames) * num_channels;
273
274                ((range_samples as f64 * (percent / 100.0).clamp(0.0, 1.0)).round() as usize)
275                    .min(range_samples)
276                    + channel_latency_samples
277            });
278    let underflow_autocorrect_threshold_samples = config
279        .underflow_autocorrect_percent_threshold
280        .map(|percent| {
281            ((channel_latency_samples as f64 * (percent / 100.0).clamp(0.0, 1.0)).round() as usize)
282                .min(channel_latency_samples)
283        });
284
285    #[cfg(feature = "resampler")]
286    let do_create_tmp_out_buffer = !push_interleave_only && resampler.is_none();
287
288    #[cfg(not(feature = "resampler"))]
289    let do_create_tmp_out_buffer = !push_interleave_only;
290
291    let tmp_out_buffer = do_create_tmp_out_buffer.then(|| {
292        let len = TMP_OUT_BUFFER_FRAMES * num_channels;
293        let mut v = Vec::new();
294        v.reserve_exact(len);
295        v.resize(len, T::zero());
296        v
297    });
298
299    (
300        ResamplingProd {
301            prod,
302            num_channels,
303            latency_seconds: config.latency_seconds,
304            channel_latency_samples,
305            in_sample_rate,
306            in_sample_rate_recip,
307            out_sample_rate,
308            out_sample_rate_recip,
309            shared_state: Arc::clone(&shared_state),
310            waiting_for_output_to_reset: false,
311            underflow_autocorrect_threshold_samples,
312            tmp_out_buffer,
313            #[cfg(feature = "resampler")]
314            resampler,
315            #[cfg(feature = "resampler")]
316            output_to_input_ratio,
317        },
318        ResamplingCons {
319            cons,
320            num_channels,
321            latency_frames,
322            latency_seconds: config.latency_seconds,
323            channel_latency_samples,
324            in_sample_rate,
325            out_sample_rate,
326            out_sample_rate_recip,
327            shared_state,
328            waiting_for_input_to_reset: false,
329            overflow_autocorrect_threshold_samples,
330            #[cfg(feature = "resampler")]
331            is_resampling,
332            #[cfg(feature = "resampler")]
333            resampler_output_delay,
334        },
335    )
336}
337
338/// The producer end of a realtime-safe spsc channel for sending samples across
339/// streams.
340///
341/// If the input and output samples rates differ, then this will automatically
342/// resample the input stream to match the output stream. If the sample rates
343/// match, then no resampling will occur.
344///
345/// Internally this uses the `rubato` and `ringbuf` crates.
346pub struct ResamplingProd<T: Sample> {
347    prod: ringbuf::HeapProd<T>,
348    num_channels: usize,
349    latency_seconds: f64,
350    channel_latency_samples: usize,
351    in_sample_rate: f64,
352    in_sample_rate_recip: f64,
353    out_sample_rate: f64,
354    out_sample_rate_recip: f64,
355    shared_state: Arc<SharedState>,
356    waiting_for_output_to_reset: bool,
357    underflow_autocorrect_threshold_samples: Option<usize>,
358    tmp_out_buffer: Option<Vec<T>>,
359
360    #[cfg(feature = "resampler")]
361    resampler: Option<PacketResampler<T, Interleaved<T>>>,
362    #[cfg(feature = "resampler")]
363    output_to_input_ratio: f64,
364}
365
366impl<T: Sample + 'static> ResamplingProd<T> {
367    /// Push the given input data.
368    ///
369    /// * `input` - The input data. You can use one of the types in the
370    ///   [`audioadapter_buffers::direct`](crate::audioadapter_buffers::direct) module
371    ///   to wrap your input data into a type that implements [`Adapter`].
372    /// * `input_range` - The range in each input channel to read from. If this is
373    ///   `None`, then the entire input buffer will be read.
374    /// * `active_channels_mask` - An optional mask that selects which channels in
375    ///   `input` to use. Channels marked with `false` will be skipped and that
376    ///   output channel filled with zeros. If `None`, then all of the channels will
377    ///   be active.
378    ///
379    /// This method is realtime-safe.
380    ///
381    /// # Panics
382    /// Panics if:
383    /// * The `input_range` is out of bounds for any of the input channels.
384    /// * This producer was created with `push_interleave_only` set to `true`.
385    pub fn push(
386        &mut self,
387        input: &dyn Adapter<'_, T>,
388        input_range: Option<Range<usize>>,
389        active_channels_mask: Option<&[bool]>,
390    ) -> PushStatus {
391        self.set_input_stream_ready(true);
392
393        if !self.output_stream_ready() {
394            return PushStatus::OutputNotReady;
395        }
396
397        self.poll_reset();
398
399        let (input_start, total_frames) = if let Some(range) = input_range {
400            (range.start, range.end - range.start)
401        } else {
402            (0, input.frames())
403        };
404
405        let available_frames = self.available_frames();
406        let total_frames_to_copy = total_frames.min(available_frames);
407
408        #[cfg(feature = "resampler")]
409        let process_non_resampled = self.resampler.is_none();
410        #[cfg(not(feature = "resampler"))]
411        let process_non_resampled = true;
412
413        #[cfg(feature = "resampler")]
414        if let Some(resampler) = &mut self.resampler {
415            resampler.process(
416                input,
417                Some(input_start..input_start + total_frames_to_copy),
418                active_channels_mask,
419                |output_packet, _frames| {
420                    let pushed_samples = self.prod.push_slice(output_packet);
421                    debug_assert_eq!(pushed_samples, output_packet.len());
422                },
423                None,
424                false,
425            );
426        }
427
428        if process_non_resampled {
429            let tmp_out_buffer = self.tmp_out_buffer.as_mut().expect(
430                "ResamplingProd::push was called even though push_interleave_only was set to true",
431            );
432
433            if input.channels() < self.num_channels || active_channels_mask.is_some() {
434                let tmp_out_buf_len = tmp_out_buffer.len();
435                tmp_out_buffer[0..(total_frames_to_copy * self.num_channels).min(tmp_out_buf_len)]
436                    .fill(T::zero());
437            }
438
439            let mut frames_left = total_frames_to_copy;
440            while frames_left > 0 {
441                let block_frames_to_copy = frames_left.min(TMP_OUT_BUFFER_FRAMES);
442
443                {
444                    let mut out_buf_wrapper = InterleavedSlice::new_mut(
445                        tmp_out_buffer,
446                        self.num_channels,
447                        TMP_OUT_BUFFER_FRAMES,
448                    )
449                    .unwrap();
450
451                    for ch_i in 0..input.channels() {
452                        let channel_active = active_channels_mask
453                            .as_ref()
454                            .map(|m| m.get(ch_i).copied().unwrap_or(false))
455                            .unwrap_or(true);
456
457                        if channel_active {
458                            out_buf_wrapper.copy_from_other_to_channel(
459                                &AdapterWrapper { inner: input },
460                                ch_i,
461                                ch_i,
462                                input_start + (total_frames_to_copy - frames_left),
463                                0,
464                                block_frames_to_copy,
465                            );
466                        }
467                    }
468                }
469
470                let pushed_samples = self
471                    .prod
472                    .push_slice(&tmp_out_buffer[0..block_frames_to_copy * self.num_channels]);
473                debug_assert_eq!(pushed_samples, block_frames_to_copy * self.num_channels);
474
475                frames_left -= block_frames_to_copy;
476            }
477        }
478
479        if total_frames_to_copy < total_frames {
480            PushStatus::OverflowOccurred {
481                num_frames_pushed: total_frames_to_copy,
482            }
483        } else if let Some(zero_frames_pushed) = self.autocorrect_underflows() {
484            PushStatus::UnderflowCorrected {
485                num_zero_frames_pushed: zero_frames_pushed,
486            }
487        } else {
488            PushStatus::Ok
489        }
490    }
491
492    /// Push the given input data in interleaved format.
493    ///
494    /// The input buffer must have the same number of channels as this producer.
495    ///
496    /// This method is realtime-safe.
497    pub fn push_interleaved(&mut self, input: &[T]) -> PushStatus {
498        self.set_input_stream_ready(true);
499
500        if !self.output_stream_ready() {
501            return PushStatus::OutputNotReady;
502        }
503
504        self.poll_reset();
505
506        let total_frames = input.len() / self.num_channels;
507
508        let available_frames = self.available_frames();
509        let total_frames_to_copy = total_frames.min(available_frames);
510
511        #[cfg(feature = "resampler")]
512        if let Some(resampler) = &mut self.resampler {
513            let input_wrapper =
514                InterleavedSlice::new(input, self.num_channels, total_frames_to_copy).unwrap();
515
516            resampler.process(
517                &input_wrapper,
518                None,
519                None,
520                |output_packet, _frames| {
521                    let pushed_samples = self.prod.push_slice(output_packet);
522                    debug_assert_eq!(pushed_samples, output_packet.len());
523                },
524                None,
525                false,
526            );
527        } else {
528            let pushed_samples = self
529                .prod
530                .push_slice(&input[0..total_frames_to_copy * self.num_channels]);
531            debug_assert_eq!(pushed_samples, total_frames_to_copy * self.num_channels);
532        }
533
534        if total_frames_to_copy < total_frames {
535            PushStatus::OverflowOccurred {
536                num_frames_pushed: total_frames_to_copy,
537            }
538        } else if let Some(zero_frames_pushed) = self.autocorrect_underflows() {
539            PushStatus::UnderflowCorrected {
540                num_zero_frames_pushed: zero_frames_pushed,
541            }
542        } else {
543            PushStatus::Ok
544        }
545    }
546
547    /// Returns the number of input frames (samples in a single channel of audio)
548    /// that are currently available to be pushed to the channel.
549    ///
550    /// If the output stream is not ready yet, then this will return `0`.
551    ///
552    /// This method is realtime-safe.
553    pub fn available_frames(&mut self) -> usize {
554        if !self.output_stream_ready() {
555            return 0;
556        }
557
558        self.poll_reset();
559
560        let output_vacant_frames = self.prod.vacant_len() / self.num_channels;
561
562        #[cfg(feature = "resampler")]
563        if let Some(resampler) = &self.resampler {
564            let mut input_vacant_frames =
565                (output_vacant_frames as f64 * self.output_to_input_ratio).floor() as usize;
566
567            // Give some leeway to account for floating point inaccuracies.
568            input_vacant_frames = input_vacant_frames.saturating_sub(1);
569
570            if input_vacant_frames < resampler.max_input_block_frames() {
571                return 0;
572            }
573
574            // The resampler processes in chunks.
575            input_vacant_frames = (input_vacant_frames / resampler.max_input_block_frames())
576                * resampler.max_input_block_frames();
577
578            return input_vacant_frames - resampler.tmp_input_frames();
579        }
580
581        output_vacant_frames
582    }
583
584    /// The amount of data in seconds that is available to be pushed to the
585    /// channel.
586    ///
587    /// If the output stream is not ready yet, then this will return `0.0`.
588    ///
589    /// This method is realtime-safe.
590    pub fn available_seconds(&mut self) -> f64 {
591        self.available_frames() as f64 * self.in_sample_rate_recip
592    }
593
594    /// The amount of data that is currently occupied in the channel, in units of
595    /// output frames (samples in a single channel of audio).
596    ///
597    /// Note, this is the number of frames in the *output* audio stream, not the
598    /// input audio stream.
599    ///
600    /// This method is realtime-safe.
601    pub fn occupied_output_frames(&self) -> usize {
602        self.prod.occupied_len() / self.num_channels
603    }
604
605    /// The amount of data that is currently occupied in the channel, in units of
606    /// seconds.
607    ///
608    /// This method is realtime-safe.
609    pub fn occupied_seconds(&self) -> f64 {
610        self.occupied_output_frames() as f64 * self.out_sample_rate_recip
611    }
612
613    /// The number of channels configured for this stream.
614    ///
615    /// This method is realtime-safe.
616    pub fn num_channels(&self) -> usize {
617        self.num_channels
618    }
619
620    /// The sample rate of the input stream.
621    ///
622    /// This method is realtime-safe.
623    pub fn in_sample_rate(&self) -> f64 {
624        self.in_sample_rate
625    }
626
627    /// The sample rate of the output stream.
628    ///
629    /// This method is realtime-safe.
630    pub fn out_sample_rate(&self) -> f64 {
631        self.out_sample_rate
632    }
633
634    /// The latency of the channel in units of seconds.
635    ///
636    /// This method is realtime-safe.
637    pub fn latency_seconds(&self) -> f64 {
638        self.latency_seconds
639    }
640
641    /// Returns `true` if this channel is currently resampling.
642    ///
643    /// This method is realtime-safe.
644    #[cfg(feature = "resampler")]
645    pub fn is_resampling(&self) -> bool {
646        self.resampler.is_some()
647    }
648
649    /// Tell the consumer to clear all queued frames in the buffer.
650    ///
651    /// This method is realtime-safe.
652    pub fn reset(&mut self) {
653        self.shared_state.reset.store(true, Ordering::Relaxed);
654
655        self.waiting_for_output_to_reset = true;
656
657        #[cfg(feature = "resampler")]
658        if let Some(resampler) = &mut self.resampler {
659            resampler.reset();
660        }
661    }
662
663    /// Manually notify the output stream that the input stream is ready/not ready
664    /// to push samples to the channel.
665    ///
666    /// If this producer end is being used in a non-realtime context, then it is
667    /// a good idea to set this to `true` so that the consumer end can start
668    /// reading samples from the channel immediately.
669    ///
670    /// Note, calling [`ResamplingProd::push`] and
671    /// [`ResamplingProd::push_interleaved`] automatically sets the input stream as
672    /// ready.
673    ///
674    /// This method is realtime-safe.
675    pub fn set_input_stream_ready(&mut self, ready: bool) {
676        self.shared_state
677            .input_stream_ready
678            .store(ready, Ordering::Relaxed);
679    }
680
681    /// Whether or not the output stream is ready to read samples from the channel.
682    ///
683    /// This method is realtime-safe.
684    pub fn output_stream_ready(&self) -> bool {
685        self.shared_state
686            .output_stream_ready
687            .load(Ordering::Relaxed)
688            && !self.shared_state.reset.load(Ordering::Relaxed)
689    }
690
691    /// Correct for any underflows.
692    ///
693    /// This returns the number of extra zero frames (samples in a single channel of audio)
694    /// that were added due to an underflow occurring. If no underflow occured, then `None`
695    /// is returned.
696    ///
697    /// Note, this method is already automatically called in [`ResamplingProd::push`] and
698    /// [`ResamplingProd::push_interleaved`].
699    ///
700    /// This will have no effect if [`ResamplingChannelConfig::underflow_autocorrect_percent_threshold`]
701    /// was set to `None`.
702    ///
703    /// This method is realtime-safe.
704    pub fn autocorrect_underflows(&mut self) -> Option<usize> {
705        if !self.output_stream_ready() {
706            return None;
707        }
708
709        self.poll_reset();
710
711        if let Some(underflow_autocorrect_threshold_samples) =
712            self.underflow_autocorrect_threshold_samples
713        {
714            let len = self.prod.occupied_len();
715
716            if len <= underflow_autocorrect_threshold_samples && len < self.channel_latency_samples
717            {
718                let correction_samples = self.channel_latency_samples - len;
719
720                self.prod
721                    .push_iter((0..correction_samples).map(|_| T::zero()));
722
723                return Some(correction_samples / self.num_channels);
724            }
725        }
726
727        None
728    }
729
730    fn poll_reset(&mut self) {
731        if self.waiting_for_output_to_reset {
732            self.waiting_for_output_to_reset = false;
733
734            // Fill the channel with initial zeros to create the desired latency.
735            self.prod
736                .push_iter((0..self.channel_latency_samples).map(|_| T::zero()));
737        }
738    }
739}
740
741/// The consumer end of a realtime-safe spsc channel for sending samples across
742/// streams.
743///
744/// If the input and output samples rates differ, then this will automatically
745/// resample the input stream to match the output stream. If the sample rates
746/// match, then no resampling will occur.
747///
748/// Internally this uses the `rubato` and `ringbuf` crates.
749pub struct ResamplingCons<T: Sample> {
750    cons: ringbuf::HeapCons<T>,
751    num_channels: usize,
752    latency_seconds: f64,
753    latency_frames: usize,
754    channel_latency_samples: usize,
755    in_sample_rate: f64,
756    out_sample_rate: f64,
757    out_sample_rate_recip: f64,
758    shared_state: Arc<SharedState>,
759    waiting_for_input_to_reset: bool,
760    overflow_autocorrect_threshold_samples: Option<usize>,
761
762    #[cfg(feature = "resampler")]
763    resampler_output_delay: usize,
764    #[cfg(feature = "resampler")]
765    is_resampling: bool,
766}
767
768impl<T: Sample + 'static> ResamplingCons<T> {
769    /// The number of channels configured for this stream.
770    ///
771    /// This method is realtime-safe.
772    pub fn num_channels(&self) -> usize {
773        self.num_channels
774    }
775
776    /// The sample rate of the input stream.
777    ///
778    /// This method is realtime-safe.
779    pub fn in_sample_rate(&self) -> f64 {
780        self.in_sample_rate
781    }
782
783    /// The sample rate of the output stream.
784    ///
785    /// This method is realtime-safe.
786    pub fn out_sample_rate(&self) -> f64 {
787        self.out_sample_rate
788    }
789
790    /// The latency of the channel in units of seconds.
791    ///
792    /// This method is realtime-safe.
793    pub fn latency_seconds(&self) -> f64 {
794        self.latency_seconds
795    }
796
797    /// The latency of the channel in units of output frames.
798    ///
799    /// This method is realtime-safe.
800    pub fn latency_frames(&self) -> usize {
801        self.latency_frames
802    }
803
804    /// The number of frames (samples in a single channel of audio) that are
805    /// currently available to be read from the channel.
806    ///
807    /// If the input stream is not ready yet, then this will return `0`.
808    ///
809    /// This method is realtime-safe.
810    pub fn available_frames(&self) -> usize {
811        if self.input_stream_ready() {
812            self.cons.occupied_len() / self.num_channels
813        } else {
814            0
815        }
816    }
817
818    /// The amount of data in seconds that is currently available to be read
819    /// from the channel.
820    ///
821    /// If the input stream is not ready yet, then this will return `0.0`.
822    ///
823    /// This method is realtime-safe.
824    pub fn available_seconds(&self) -> f64 {
825        self.available_frames() as f64 * self.out_sample_rate_recip
826    }
827
828    /// The amount of data that is currently occupied in the channel, in units of
829    /// seconds.
830    ///
831    /// This method is realtime-safe.
832    pub fn occupied_seconds(&self) -> f64 {
833        (self.cons.occupied_len() / self.num_channels) as f64 * self.out_sample_rate_recip
834    }
835
836    /// Returns `true` if this channel is currently resampling.
837    ///
838    /// This method is realtime-safe.
839    #[cfg(feature = "resampler")]
840    pub fn is_resampling(&self) -> bool {
841        self.is_resampling
842    }
843
844    /// The delay of the internal resampler in number of output frames (samples in
845    /// a single channel of audio).
846    ///
847    /// If there is no resampler being used for this channel, then this will return
848    /// `0`.
849    ///
850    /// This method is realtime-safe.
851    #[cfg(feature = "resampler")]
852    pub fn resampler_output_delay(&self) -> usize {
853        self.resampler_output_delay
854    }
855
856    /// Discard a certian number of output frames from the buffer. This can be used
857    /// to correct for jitter and avoid excessive overflows and reduce the percieved
858    /// audible glitchiness.
859    ///
860    /// This will discard `frames.min(self.available_frames())` frames.
861    ///
862    /// Returns the number of output frames that were discarded.
863    ///
864    /// This method is realtime-safe.
865    pub fn discard_frames(&mut self, frames: usize) -> usize {
866        self.cons.skip(frames * self.num_channels) / self.num_channels
867    }
868
869    /// Read from the channel and store the results in the given output buffer.
870    ///
871    /// * `output` - The output buffer. You can use one of the types in the
872    ///   [`audioadapter_buffers::direct`](crate::audioadapter_buffers::direct) module
873    ///   to wrap your input data into a type that implements [`Adapter`].
874    /// * `output_range` - The range in each output channel to write to. If this is
875    ///   `None`, then the entire output buffer will be read.
876    /// * `active_channels_mask` - An optional mask that selects which channels in
877    ///   `output` to use. Channels marked with `false` will be filled with zeros.
878    ///   If `None`, then all of the channels will be active.
879    /// * `output_is_already_cleared` - If `true`, then this will skip the step of
880    ///   clearing the output buffer in the range `output_range` to zeros.
881    ///
882    /// This method is realtime-safe.
883    ///
884    /// # Panics
885    /// Panics if:
886    /// * The `input_range` is out of bounds for any of the input channels.
887    pub fn read(
888        &mut self,
889        output: &mut dyn AdapterMut<'_, T>,
890        output_range: Option<Range<usize>>,
891        active_channels_mask: Option<&[bool]>,
892        output_is_already_cleared: bool,
893    ) -> ReadStatus {
894        self.set_output_stream_ready(true);
895
896        self.poll_reset();
897
898        let (output_start, output_frames) = if let Some(range) = output_range {
899            (range.start, range.end - range.start)
900        } else {
901            (0, output.frames())
902        };
903
904        if !output_is_already_cleared {
905            output.fill_frames_with(output_start, output_frames, &T::zero());
906        }
907
908        if !self.input_stream_ready() {
909            return ReadStatus::InputNotReady;
910        }
911
912        self.waiting_for_input_to_reset = false;
913
914        let (s1, s2) = self.cons.as_slices();
915
916        let s1_frames = s1.len() / self.num_channels;
917        let s1_copy_frames = s1_frames.min(output_frames);
918
919        let s1_wrapper = InterleavedSlice::new(s1, self.num_channels, s1_frames).unwrap();
920        let s1_wrapper_2 = AdapterWrapper { inner: &s1_wrapper };
921
922        for ch_i in 0..output.channels().min(self.num_channels) {
923            let channel_active = active_channels_mask
924                .as_ref()
925                .map(|m| m.get(ch_i).copied().unwrap_or(false))
926                .unwrap_or(true);
927
928            if channel_active {
929                output.copy_from_other_to_channel(
930                    &s1_wrapper_2,
931                    ch_i,
932                    ch_i,
933                    0,
934                    output_start,
935                    s1_copy_frames,
936                );
937            }
938        }
939
940        let mut filled_frames = s1_copy_frames;
941
942        if output_frames > s1_copy_frames {
943            let s2_frames = s2.len() / self.num_channels;
944            let s2_copy_frames = s2_frames.min(output_frames - s1_copy_frames);
945
946            let s2_wrapper = InterleavedSlice::new(s2, self.num_channels, s2_frames).unwrap();
947            let s2_wrapper_2 = AdapterWrapper { inner: &s2_wrapper };
948
949            for ch_i in 0..output.channels().min(self.num_channels) {
950                let channel_active = active_channels_mask
951                    .as_ref()
952                    .map(|m| m.get(ch_i).copied().unwrap_or(false))
953                    .unwrap_or(true);
954
955                if channel_active {
956                    output.copy_from_other_to_channel(
957                        &s2_wrapper_2,
958                        ch_i,
959                        ch_i,
960                        0,
961                        output_start + s1_copy_frames,
962                        s2_copy_frames,
963                    );
964                }
965            }
966
967            filled_frames += s2_copy_frames;
968        }
969
970        self.cons.skip(filled_frames * self.num_channels);
971
972        if filled_frames < output_frames {
973            ReadStatus::UnderflowOccurred {
974                num_frames_read: filled_frames,
975            }
976        } else if let Some(num_frames_discarded) = self.autocorrect_overflows() {
977            ReadStatus::OverflowCorrected {
978                num_frames_discarded,
979            }
980        } else {
981            ReadStatus::Ok
982        }
983    }
984
985    /// Read from the channel and store the results into the output buffer
986    /// in interleaved format.
987    ///
988    /// * `output` - The output buffer to write to. The output buffer must
989    ///   have the same number of channels as this consumer.
990    /// * `buffer_out_is_already_cleared` - If `true`, then this will skip the step of
991    ///   clearing the output buffer in the range `output_range` to zeros.
992    ///
993    /// This method is realtime-safe.
994    pub fn read_interleaved(
995        &mut self,
996        output: &mut [T],
997        output_already_cleared: bool,
998    ) -> ReadStatus {
999        self.set_output_stream_ready(true);
1000
1001        self.poll_reset();
1002
1003        if !self.input_stream_ready() {
1004            if !output_already_cleared {
1005                output.fill(T::zero());
1006            }
1007
1008            return ReadStatus::InputNotReady;
1009        }
1010
1011        self.waiting_for_input_to_reset = false;
1012
1013        let out_frames = output.len() / self.num_channels;
1014        let out_len = out_frames * self.num_channels;
1015
1016        let pushed_samples = self.cons.pop_slice(&mut output[..out_len]);
1017
1018        if pushed_samples < out_len {
1019            if !output_already_cleared {
1020                output[pushed_samples..].fill(T::zero());
1021            }
1022
1023            ReadStatus::UnderflowOccurred {
1024                num_frames_read: pushed_samples / self.num_channels,
1025            }
1026        } else if let Some(num_frames_discarded) = self.autocorrect_overflows() {
1027            ReadStatus::OverflowCorrected {
1028                num_frames_discarded,
1029            }
1030        } else {
1031            ReadStatus::Ok
1032        }
1033    }
1034
1035    /// Poll the channel to see if it got a command to reset.
1036    ///
1037    /// Returns `true` if the channel was reset.
1038    pub fn poll_reset(&mut self) -> bool {
1039        if self.shared_state.reset.load(Ordering::Relaxed) {
1040            self.shared_state.reset.store(false, Ordering::Relaxed);
1041            self.waiting_for_input_to_reset = true;
1042
1043            self.cons.clear();
1044
1045            true
1046        } else {
1047            false
1048        }
1049    }
1050
1051    /// Manually notify the input stream that the output stream is ready/not ready
1052    /// to read samples from the channel.
1053    ///
1054    /// If this consumer end is being used in a non-realtime context, then it is
1055    /// a good idea to set this to `true` so that the producer end can start
1056    /// pushing samples to the channel immediately.
1057    ///
1058    /// Note, calling [`ResamplingCons::read`] and
1059    /// [`ResamplingCons::read_interleaved`] automatically sets the output stream as
1060    /// ready.
1061    ///
1062    /// This method is realtime-safe.
1063    pub fn set_output_stream_ready(&mut self, ready: bool) {
1064        self.shared_state
1065            .output_stream_ready
1066            .store(ready, Ordering::Relaxed);
1067    }
1068
1069    /// Whether or not the input stream is ready to push samples to the channel.
1070    ///
1071    /// This method is realtime-safe.
1072    pub fn input_stream_ready(&self) -> bool {
1073        self.shared_state.input_stream_ready.load(Ordering::Relaxed)
1074            && !(self.waiting_for_input_to_reset && self.cons.is_empty())
1075    }
1076
1077    /// Correct for any overflows.
1078    ///
1079    /// This returns the number of frames (samples in a single channel of audio) that were
1080    /// discarded due to an overflow occurring. If no overflow occured, then `None`
1081    /// is returned.
1082    ///
1083    /// Note, this method is already automatically called in [`ResamplingCons::read`] and
1084    /// [`ResamplingCons::read_interleaved`].
1085    ///
1086    /// This will have no effect if [`ResamplingChannelConfig::overflow_autocorrect_percent_threshold`]
1087    /// was set to `None`.
1088    ///
1089    /// This method is realtime-safe.
1090    pub fn autocorrect_overflows(&mut self) -> Option<usize> {
1091        if let Some(overflow_autocorrect_threshold_samples) =
1092            self.overflow_autocorrect_threshold_samples
1093        {
1094            let len = self.cons.occupied_len();
1095
1096            if len >= overflow_autocorrect_threshold_samples && len > self.channel_latency_samples {
1097                let correction_frames = (len - self.channel_latency_samples) / self.num_channels;
1098
1099                self.discard_frames(correction_frames);
1100
1101                return Some(correction_frames);
1102            }
1103        }
1104
1105        None
1106    }
1107}
1108
1109/// The status of pushing samples to [`ResamplingProd::push`] and
1110/// [`ResamplingProd::push_interleaved`].
1111#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1112pub enum PushStatus {
1113    /// All samples were successfully pushed to the channel.
1114    Ok,
1115    /// The output stream is not yet ready to read samples from the channel.
1116    ///
1117    /// Note, this can also happen when the channel is reset.
1118    ///
1119    /// No samples were pushed to the channel.
1120    OutputNotReady,
1121    /// An overflow occured due to the input stream running faster than the
1122    /// output stream. Some or all of the samples were not pushed to the channel.
1123    ///
1124    /// If this occurs, then it may mean that [`ResamplingChannelConfig::capacity_seconds`]
1125    /// is too low and should be increased.
1126    OverflowOccurred {
1127        /// The number of frames (samples in a single channel of audio) that were
1128        /// successfully pushed to the channel.
1129        num_frames_pushed: usize,
1130    },
1131    /// An underflow occured due to the output stream running faster than the
1132    /// input stream.
1133    ///
1134    /// All of the samples were successfully pushed to the channel, however extra
1135    /// zero samples were also pushed to the channel to correct for the jitter.
1136    ///
1137    /// If this occurs, then it may mean that [`ResamplingChannelConfig::latency_seconds`]
1138    /// is too low and should be increased.
1139    UnderflowCorrected {
1140        /// The number of zero frames that were pushed after the other samples
1141        /// were pushed.
1142        num_zero_frames_pushed: usize,
1143    },
1144}
1145
1146/// The status of reading data from [`ResamplingCons::read`] and
1147/// [`ResamplingCons::read_interleaved`].
1148#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1149pub enum ReadStatus {
1150    /// The output buffer was fully filled with samples from the channel.
1151    Ok,
1152    /// The input stream is not yet ready to push samples to the channel.
1153    ///
1154    /// Note, this can also happen when the channel is reset.
1155    ///
1156    /// The output buffer was filled with zeros.
1157    InputNotReady,
1158    /// An underflow occured due to the output stream running faster than the input
1159    /// stream. Some or all of the samples in the output buffer have been filled with
1160    /// zeros on the end. This may result in audible audio glitches.
1161    ///
1162    /// If this occurs, then it may mean that [`ResamplingChannelConfig::latency_seconds`]
1163    /// is too low and should be increased.
1164    UnderflowOccurred {
1165        /// The number of frames (samples in a single channel of audio) that were
1166        /// successfully read from the channel. All frames past this have been filled
1167        /// with zeros.
1168        num_frames_read: usize,
1169    },
1170    /// An overflow occured due to the input stream running faster than the output
1171    /// stream
1172    ///
1173    /// All of the samples in the output buffer were successfully filled with samples,
1174    /// however a number of frames have also been discarded to correct for the jitter.
1175    ///
1176    /// If this occurs, then it may mean that [`ResamplingChannelConfig::capacity_seconds`]
1177    /// is too low and should be increased.
1178    OverflowCorrected {
1179        /// The number of frames that were discarded from the channel (after the
1180        /// frames have been read into the output buffer).
1181        num_frames_discarded: usize,
1182    },
1183}
1184
1185struct SharedState {
1186    reset: AtomicBool,
1187    input_stream_ready: AtomicBool,
1188    output_stream_ready: AtomicBool,
1189}
1190
1191impl SharedState {
1192    fn new() -> Self {
1193        Self {
1194            reset: AtomicBool::new(false),
1195            input_stream_ready: AtomicBool::new(false),
1196            output_stream_ready: AtomicBool::new(false),
1197        }
1198    }
1199}
1200
1201/// Needed to get around lifetime nonsense
1202struct AdapterWrapper<'a, 'b, T: Sample> {
1203    inner: &'a dyn Adapter<'b, T>,
1204}
1205
1206impl<'a, T: Sample + 'static> Adapter<'a, T> for AdapterWrapper<'_, '_, T> {
1207    /// Safety: This is just wrapping the inner method
1208    unsafe fn read_sample_unchecked(&self, channel: usize, frame: usize) -> T {
1209        self.inner.read_sample_unchecked(channel, frame)
1210    }
1211
1212    fn read_sample(&self, channel: usize, frame: usize) -> Option<T> {
1213        self.inner.read_sample(channel, frame)
1214    }
1215
1216    fn channels(&self) -> usize {
1217        self.inner.channels()
1218    }
1219
1220    fn frames(&self) -> usize {
1221        self.inner.frames()
1222    }
1223
1224    fn copy_from_channel_to_slice(&self, channel: usize, skip: usize, slice: &mut [T]) -> usize {
1225        self.inner.copy_from_channel_to_slice(channel, skip, slice)
1226    }
1227
1228    fn copy_from_frame_to_slice(&self, frame: usize, skip: usize, slice: &mut [T]) -> usize {
1229        self.inner.copy_from_frame_to_slice(frame, skip, slice)
1230    }
1231}