fixed_resample/
channel.rs

1use std::{
2    num::NonZeroUsize,
3    ops::Range,
4    sync::{
5        atomic::{AtomicBool, Ordering},
6        Arc,
7    },
8};
9
10use ringbuf::traits::{Consumer, Observer, Producer, Split};
11
12use crate::Sample;
13
14#[cfg(feature = "resampler")]
15use crate::{resampler_type::ResamplerType, FixedResampler, ResampleQuality};
16
/// Tuning options for a resampling channel.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct ResamplingChannelConfig {
    /// How much latency, in seconds, is inserted between the input stream and
    /// the output stream. Underflows may occur if this value is too small.
    ///
    /// Defaults to `0.15` (150 ms).
    pub latency_seconds: f64,

    /// How much data, in seconds, the channel can hold. Overflows may occur if
    /// this value is too small. It should be at least twice `latency_seconds`.
    ///
    /// Note that the actual capacity may end up slightly smaller because the
    /// internal resampler processes in chunks.
    ///
    /// Defaults to `0.4` (400 ms).
    pub capacity_seconds: f64,

    /// Overflow autocorrection threshold, as a percentage in the range
    /// `[0.0, 100.0]`.
    ///
    /// Once the amount of data occupied in the channel is greater than or equal
    /// to (`latency_seconds + percent * (capacity_seconds - latency_seconds)`),
    /// enough samples are discarded to bring the occupied amount back down to
    /// [`ResamplingChannelConfig::latency_seconds`]. This avoids excessive
    /// overflows and reduces the perceived audio glitchiness.
    ///
    /// Use `None` to turn this autocorrection off. This is the recommended
    /// setting when the producer end is used in a non-realtime context.
    ///
    /// Defaults to `Some(75.0)`.
    pub overflow_autocorrect_percent_threshold: Option<f64>,

    /// Underflow autocorrection threshold, as a percentage in the range
    /// `[0.0, 100.0]`.
    ///
    /// Once the amount of data occupied in the channel is less than or equal to
    /// this percentage of [`ResamplingChannelConfig::latency_seconds`], enough
    /// zero frames are inserted to bring the occupied amount back up to
    /// [`ResamplingChannelConfig::latency_seconds`]. This avoids excessive
    /// underflows and reduces the perceived audio glitchiness.
    ///
    /// Use `None` to turn this autocorrection off. This is the recommended
    /// setting when the consumer end is used in a non-realtime context.
    ///
    /// Defaults to `Some(25.0)`.
    pub underflow_autocorrect_percent_threshold: Option<f64>,

    /// The quality of the resampling algorithm to use when resampling is
    /// needed.
    ///
    /// The [`Default`] implementation of this struct uses
    /// `ResampleQuality::High`.
    #[cfg(feature = "resampler")]
    pub quality: ResampleQuality,

    /// When `true`, the delay of the internal resampler (if one is used) is
    /// subtracted from `latency_seconds` so that the perceived latency stays
    /// consistent.
    ///
    /// Defaults to `true`.
    #[cfg(feature = "resampler")]
    pub subtract_resampler_delay: bool,
}
78
79impl Default for ResamplingChannelConfig {
80    fn default() -> Self {
81        Self {
82            latency_seconds: 0.15,
83            capacity_seconds: 0.4,
84            overflow_autocorrect_percent_threshold: Some(75.0),
85            underflow_autocorrect_percent_threshold: Some(25.0),
86            #[cfg(feature = "resampler")]
87            quality: ResampleQuality::High,
88            #[cfg(feature = "resampler")]
89            subtract_resampler_delay: true,
90        }
91    }
92}
93
94/// Create a new realtime-safe spsc channel for sending samples across streams.
95///
96/// If the input and output samples rates differ, then this will automatically
97/// resample the input stream to match the output stream (unless the "resample"
98/// feature is disabled). If the sample rates match, then no resampling will
99/// occur.
100///
101/// Internally this uses the `rubato` and `ringbuf` crates.
102///
103/// * `in_sample_rate` - The sample rate of the input stream.
104/// * `out_sample_rate` - The sample rate of the output stream.
105/// * `num_channels` - The number of channels in the stream.
106/// * `config` - Additional options for the resampling channel.
107///
108/// # Panics
109///
110/// Panics when any of the following are true:
111///
112/// * `in_sample_rate == 0`
113/// * `out_sample_rate == 0`
114/// * `num_channels == 0`
115/// * `config.latency_seconds <= 0.0`
116/// * `config.capacity_seconds <= 0.0`
117///
118/// If the "resampler" feature is disabled, then this will also panic if
119/// `in_sample_rate != out_sample_rate`.
120pub fn resampling_channel<T: Sample, const MAX_CHANNELS: usize>(
121    num_channels: NonZeroUsize,
122    in_sample_rate: u32,
123    out_sample_rate: u32,
124    config: ResamplingChannelConfig,
125) -> (ResamplingProd<T, MAX_CHANNELS>, ResamplingCons<T>) {
126    #[cfg(feature = "resampler")]
127    let resampler = if in_sample_rate != out_sample_rate {
128        Some(FixedResampler::<T, MAX_CHANNELS>::new(
129            num_channels,
130            in_sample_rate,
131            out_sample_rate,
132            config.quality,
133            true,
134        ))
135    } else {
136        None
137    };
138
139    resampling_channel_inner(
140        #[cfg(feature = "resampler")]
141        resampler,
142        num_channels,
143        in_sample_rate,
144        out_sample_rate,
145        config,
146    )
147}
148
/// Construct a realtime-safe spsc channel that carries samples from one audio
/// stream to another, resampling with a caller-supplied resampler.
///
/// When the input and output sample rates are different, the input stream is
/// resampled with the given resampler to match the output stream. When the two
/// rates are equal, the given resampler is not used and no resampling takes
/// place.
///
/// Internally this uses the `rubato` and `ringbuf` crates.
///
/// * `resampler` - The custom rubato resampler.
/// * `num_channels` - The number of channels in the stream.
/// * `in_sample_rate` - The sample rate of the input stream.
/// * `out_sample_rate` - The sample rate of the output stream.
/// * `config` - Additional options for the resampling channel. Note that
/// `config.quality` will be ignored.
///
/// # Panics
///
/// Panics when any of the following are true:
///
/// * The sample rates differ and `resampler.num_channels() != num_channels`
/// * `in_sample_rate == 0`
/// * `out_sample_rate == 0`
/// * `config.latency_seconds <= 0.0`
/// * `config.capacity_seconds <= 0.0`
#[cfg(feature = "resampler")]
pub fn resampling_channel_custom<T: Sample, const MAX_CHANNELS: usize>(
    resampler: impl Into<ResamplerType<T>>,
    num_channels: NonZeroUsize,
    in_sample_rate: u32,
    out_sample_rate: u32,
    config: ResamplingChannelConfig,
) -> (ResamplingProd<T, MAX_CHANNELS>, ResamplingCons<T>) {
    let resampler = if in_sample_rate == out_sample_rate {
        // Matching rates: samples pass straight through the ring buffer.
        None
    } else {
        let wrapped = FixedResampler::<T, MAX_CHANNELS>::from_custom(
            resampler,
            in_sample_rate,
            out_sample_rate,
            true,
        );

        // The custom resampler must agree with the requested channel count.
        assert_eq!(wrapped.num_channels(), num_channels);

        Some(wrapped)
    };

    resampling_channel_inner(
        resampler,
        num_channels,
        in_sample_rate,
        out_sample_rate,
        config,
    )
}
205
206fn resampling_channel_inner<T: Sample, const MAX_CHANNELS: usize>(
207    #[cfg(feature = "resampler")] resampler: Option<FixedResampler<T, MAX_CHANNELS>>,
208    num_channels: NonZeroUsize,
209    in_sample_rate: u32,
210    out_sample_rate: u32,
211    config: ResamplingChannelConfig,
212) -> (ResamplingProd<T, MAX_CHANNELS>, ResamplingCons<T>) {
213    #[cfg(not(feature = "resampler"))]
214    assert_eq!(
215        in_sample_rate, out_sample_rate,
216        "Input and output sample rate must be equal when the \"resampler\" feature is disabled"
217    );
218
219    assert_ne!(in_sample_rate, 0);
220    assert_ne!(out_sample_rate, 0);
221    assert!(config.latency_seconds > 0.0);
222    assert!(config.capacity_seconds > 0.0);
223
224    #[cfg(feature = "resampler")]
225    let is_resampling = resampler.is_some();
226    #[cfg(feature = "resampler")]
227    let resampler_output_delay = resampler.as_ref().map(|r| r.output_delay()).unwrap_or(0);
228
229    let in_sample_rate_recip = (in_sample_rate as f64).recip();
230    let out_sample_rate_recip = (out_sample_rate as f64).recip();
231
232    #[cfg(feature = "resampler")]
233    let output_to_input_ratio = in_sample_rate as f64 / out_sample_rate as f64;
234
235    let latency_frames =
236        ((out_sample_rate as f64 * config.latency_seconds).round() as usize).max(1);
237
238    #[allow(unused_mut)]
239    let mut channel_latency_frames = latency_frames;
240
241    #[cfg(feature = "resampler")]
242    if resampler.is_some() && config.subtract_resampler_delay {
243        if latency_frames > resampler_output_delay {
244            channel_latency_frames -= resampler_output_delay;
245        } else {
246            channel_latency_frames = 1;
247        }
248    }
249
250    let channel_latency_samples = channel_latency_frames * num_channels.get();
251
252    let buffer_capacity_frames = ((in_sample_rate as f64 * config.capacity_seconds).round()
253        as usize)
254        .max(channel_latency_frames * 2);
255
256    let (mut prod, cons) =
257        ringbuf::HeapRb::<T>::new(buffer_capacity_frames * num_channels.get()).split();
258
259    // Fill the channel with initial zeros to create the desired latency.
260    prod.push_slice(&vec![
261        T::zero();
262        channel_latency_frames * num_channels.get()
263    ]);
264
265    let shared_state = Arc::new(SharedState::new());
266
267    let overflow_autocorrect_threshold_samples =
268        config
269            .overflow_autocorrect_percent_threshold
270            .map(|percent| {
271                let range_samples =
272                    (buffer_capacity_frames - channel_latency_frames) * num_channels.get();
273
274                ((range_samples as f64 * (percent / 100.0).clamp(0.0, 1.0)).round() as usize)
275                    .min(range_samples)
276                    + channel_latency_samples
277            });
278    let underflow_autocorrect_threshold_samples = config
279        .underflow_autocorrect_percent_threshold
280        .map(|percent| {
281            ((channel_latency_samples as f64 * (percent / 100.0).clamp(0.0, 1.0)).round() as usize)
282                .min(channel_latency_samples)
283        });
284
285    (
286        ResamplingProd {
287            prod,
288            num_channels,
289            latency_seconds: config.latency_seconds,
290            channel_latency_samples,
291            in_sample_rate,
292            in_sample_rate_recip,
293            out_sample_rate,
294            out_sample_rate_recip,
295            shared_state: Arc::clone(&shared_state),
296            waiting_for_output_to_reset: false,
297            underflow_autocorrect_threshold_samples,
298            #[cfg(feature = "resampler")]
299            resampler,
300            #[cfg(feature = "resampler")]
301            output_to_input_ratio,
302        },
303        ResamplingCons {
304            cons,
305            num_channels,
306            latency_frames,
307            latency_seconds: config.latency_seconds,
308            channel_latency_samples,
309            in_sample_rate,
310            out_sample_rate,
311            out_sample_rate_recip,
312            shared_state,
313            waiting_for_input_to_reset: false,
314            overflow_autocorrect_threshold_samples,
315            #[cfg(feature = "resampler")]
316            is_resampling,
317            #[cfg(feature = "resampler")]
318            resampler_output_delay,
319        },
320    )
321}
322
323/// The producer end of a realtime-safe spsc channel for sending samples across
324/// streams.
325///
326/// If the input and output samples rates differ, then this will automatically
327/// resample the input stream to match the output stream. If the sample rates
328/// match, then no resampling will occur.
329///
330/// Internally this uses the `rubato` and `ringbuf` crates.
331pub struct ResamplingProd<T: Sample, const MAX_CHANNELS: usize> {
332    prod: ringbuf::HeapProd<T>,
333    num_channels: NonZeroUsize,
334    latency_seconds: f64,
335    channel_latency_samples: usize,
336    in_sample_rate: u32,
337    in_sample_rate_recip: f64,
338    out_sample_rate: u32,
339    out_sample_rate_recip: f64,
340    shared_state: Arc<SharedState>,
341    waiting_for_output_to_reset: bool,
342    underflow_autocorrect_threshold_samples: Option<usize>,
343
344    #[cfg(feature = "resampler")]
345    resampler: Option<FixedResampler<T, MAX_CHANNELS>>,
346    #[cfg(feature = "resampler")]
347    output_to_input_ratio: f64,
348}
349
350impl<T: Sample, const MAX_CHANNELS: usize> ResamplingProd<T, MAX_CHANNELS> {
351    /// Push the given data in de-interleaved format.
352    ///
353    /// * `input` - The input data in de-interleaved format.
354    /// * `input_range` - The range in each channel in `input` to read from.
355    ///
356    /// This method is realtime-safe.
357    ///
358    /// # Panics
359    /// Panics if:
360    /// * `input.len() < self.num_channels()`.
361    /// * The `input_range` is out of bounds for any of the input channels.
362    pub fn push<Vin: AsRef<[T]>>(
363        &mut self,
364        input: &[Vin],
365        input_range: Range<usize>,
366    ) -> PushStatus {
367        assert!(input.len() >= self.num_channels.get());
368
369        self.set_input_stream_ready(true);
370
371        if !self.output_stream_ready() {
372            return PushStatus::OutputNotReady;
373        }
374
375        self.poll_reset();
376
377        let input_frames = input_range.end - input_range.start;
378
379        #[cfg(feature = "resampler")]
380        if self.resampler.is_some() {
381            let available_frames = self.available_frames();
382
383            let proc_frames = input_frames.min(available_frames);
384
385            self.resampler.as_mut().unwrap().process(
386                input,
387                input_range.start..input_range.start + proc_frames,
388                |output_packet| {
389                    let packet_frames = output_packet[0].len();
390
391                    let pushed_frames = push_internal(
392                        &mut self.prod,
393                        &output_packet,
394                        0,
395                        packet_frames,
396                        self.num_channels,
397                    );
398
399                    debug_assert_eq!(pushed_frames, packet_frames);
400                },
401                None,
402                true,
403            );
404
405            return if proc_frames < input_frames {
406                PushStatus::OverflowOccurred {
407                    num_frames_pushed: proc_frames,
408                }
409            } else if let Some(zero_frames_pushed) = self.autocorrect_underflows() {
410                PushStatus::UnderflowCorrected {
411                    num_zero_frames_pushed: zero_frames_pushed,
412                }
413            } else {
414                PushStatus::Ok
415            };
416        }
417
418        let pushed_frames = push_internal(
419            &mut self.prod,
420            input,
421            input_range.start,
422            input_frames,
423            self.num_channels,
424        );
425
426        if pushed_frames < input_frames {
427            PushStatus::OverflowOccurred {
428                num_frames_pushed: pushed_frames,
429            }
430        } else if let Some(zero_frames_pushed) = self.autocorrect_underflows() {
431            PushStatus::UnderflowCorrected {
432                num_zero_frames_pushed: zero_frames_pushed,
433            }
434        } else {
435            PushStatus::Ok
436        }
437    }
438
439    /// Push the given data in interleaved format.
440    ///
441    /// This method is realtime-safe.
442    pub fn push_interleaved(&mut self, input: &[T]) -> PushStatus {
443        self.set_input_stream_ready(true);
444
445        if !self.output_stream_ready() {
446            return PushStatus::OutputNotReady;
447        }
448
449        self.poll_reset();
450
451        #[cfg(feature = "resampler")]
452        if self.resampler.is_some() {
453            let input_frames = input.len() / self.num_channels.get();
454
455            let available_frames = self.available_frames();
456
457            let proc_frames = input_frames.min(available_frames);
458
459            self.resampler.as_mut().unwrap().process_interleaved(
460                &input[..proc_frames * self.num_channels.get()],
461                |output_packet| {
462                    let pushed_samples = self.prod.push_slice(output_packet);
463
464                    debug_assert_eq!(pushed_samples, output_packet.len());
465                },
466                None,
467                true,
468            );
469
470            return if proc_frames < input_frames {
471                PushStatus::OverflowOccurred {
472                    num_frames_pushed: proc_frames,
473                }
474            } else if let Some(zero_frames_pushed) = self.autocorrect_underflows() {
475                PushStatus::UnderflowCorrected {
476                    num_zero_frames_pushed: zero_frames_pushed,
477                }
478            } else {
479                PushStatus::Ok
480            };
481        }
482
483        let pushed_samples = self.prod.push_slice(input);
484
485        if pushed_samples < input.len() {
486            PushStatus::OverflowOccurred {
487                num_frames_pushed: pushed_samples / self.num_channels.get(),
488            }
489        } else if let Some(zero_frames_pushed) = self.autocorrect_underflows() {
490            PushStatus::UnderflowCorrected {
491                num_zero_frames_pushed: zero_frames_pushed,
492            }
493        } else {
494            PushStatus::Ok
495        }
496    }
497
498    /// Returns the number of input frames (samples in a single channel of audio)
499    /// that are currently available to be pushed to the channel.
500    ///
501    /// If the output stream is not ready yet, then this will return `0`.
502    ///
503    /// This method is realtime-safe.
504    pub fn available_frames(&mut self) -> usize {
505        if !self.output_stream_ready() {
506            return 0;
507        }
508
509        self.poll_reset();
510
511        let output_vacant_frames = self.prod.vacant_len() / self.num_channels.get();
512
513        #[cfg(feature = "resampler")]
514        if let Some(resampler) = &self.resampler {
515            let mut input_vacant_frames =
516                (output_vacant_frames as f64 * self.output_to_input_ratio).floor() as usize;
517
518            // Give some leeway to account for floating point inaccuracies.
519            if input_vacant_frames > 0 {
520                input_vacant_frames -= 1;
521            }
522
523            if input_vacant_frames < resampler.input_block_frames() {
524                return 0;
525            }
526
527            // The resampler processes in chunks.
528            input_vacant_frames = (input_vacant_frames / resampler.input_block_frames())
529                * resampler.input_block_frames();
530
531            return input_vacant_frames - resampler.tmp_input_frames();
532        }
533
534        output_vacant_frames
535    }
536
537    /// The amount of data in seconds that is available to be pushed to the
538    /// channel.
539    ///
540    /// If the output stream is not ready yet, then this will return `0.0`.
541    ///
542    /// This method is realtime-safe.
543    pub fn available_seconds(&mut self) -> f64 {
544        self.available_frames() as f64 * self.in_sample_rate_recip
545    }
546
547    /// The amount of data that is currently occupied in the channel, in units of
548    /// output frames (samples in a single channel of audio).
549    ///
550    /// Note, this is the number of frames in the *output* audio stream, not the
551    /// input audio stream.
552    ///
553    /// This method is realtime-safe.
554    pub fn occupied_output_frames(&self) -> usize {
555        self.prod.occupied_len() / self.num_channels.get()
556    }
557
558    /// The amount of data that is currently occupied in the channel, in units of
559    /// seconds.
560    ///
561    /// This method is realtime-safe.
562    pub fn occupied_seconds(&self) -> f64 {
563        self.occupied_output_frames() as f64 * self.out_sample_rate_recip
564    }
565
566    /// The number of channels configured for this stream.
567    ///
568    /// This method is realtime-safe.
569    pub fn num_channels(&self) -> NonZeroUsize {
570        self.num_channels
571    }
572
573    /// The sample rate of the input stream.
574    ///
575    /// This method is realtime-safe.
576    pub fn in_sample_rate(&self) -> u32 {
577        self.in_sample_rate
578    }
579
580    /// The sample rate of the output stream.
581    ///
582    /// This method is realtime-safe.
583    pub fn out_sample_rate(&self) -> u32 {
584        self.out_sample_rate
585    }
586
587    /// The latency of the channel in units of seconds.
588    ///
589    /// This method is realtime-safe.
590    pub fn latency_seconds(&self) -> f64 {
591        self.latency_seconds
592    }
593
594    /// Returns `true` if this channel is currently resampling.
595    ///
596    /// This method is realtime-safe.
597    #[cfg(feature = "resampler")]
598    pub fn is_resampling(&self) -> bool {
599        self.resampler.is_some()
600    }
601
602    /// Tell the consumer to clear all queued frames in the buffer.
603    ///
604    /// This method is realtime-safe.
605    pub fn reset(&mut self) {
606        self.shared_state.reset.store(true, Ordering::Relaxed);
607
608        self.waiting_for_output_to_reset = true;
609
610        #[cfg(feature = "resampler")]
611        if let Some(resampler) = &mut self.resampler {
612            resampler.reset();
613        }
614    }
615
616    /// Manually notify the output stream that the input stream is ready/not ready
617    /// to push samples to the channel.
618    ///
619    /// If this producer end is being used in a non-realtime context, then it is
620    /// a good idea to set this to `true` so that the consumer end can start
621    /// reading samples from the channel immediately.
622    ///
623    /// Note, calling [`ResamplingProd::push`] and
624    /// [`ResamplingProd::push_interleaved`] automatically sets the input stream as
625    /// ready.
626    ///
627    /// This method is realtime-safe.
628    pub fn set_input_stream_ready(&mut self, ready: bool) {
629        self.shared_state
630            .input_stream_ready
631            .store(ready, Ordering::Relaxed);
632    }
633
634    /// Whether or not the output stream is ready to read samples from the channel.
635    ///
636    /// This method is realtime-safe.
637    pub fn output_stream_ready(&self) -> bool {
638        self.shared_state
639            .output_stream_ready
640            .load(Ordering::Relaxed)
641            && !self.shared_state.reset.load(Ordering::Relaxed)
642    }
643
644    /// Correct for any underflows.
645    ///
646    /// This returns the number of extra zero frames (samples in a single channel of audio)
647    /// that were added due to an underflow occurring. If no underflow occured, then `None`
648    /// is returned.
649    ///
650    /// Note, this method is already automatically called in [`ResamplingProd::push`] and
651    /// [`ResamplingProd::push_interleaved`].
652    ///
653    /// This will have no effect if [`ResamplingChannelConfig::underflow_autocorrect_percent_threshold`]
654    /// was set to `None`.
655    ///
656    /// This method is realtime-safe.
657    pub fn autocorrect_underflows(&mut self) -> Option<usize> {
658        if !self.output_stream_ready() {
659            return None;
660        }
661
662        self.poll_reset();
663
664        if let Some(underflow_autocorrect_threshold_samples) =
665            self.underflow_autocorrect_threshold_samples
666        {
667            let len = self.prod.occupied_len();
668            if len <= underflow_autocorrect_threshold_samples && len < self.channel_latency_samples
669            {
670                let correction_samples = self.channel_latency_samples - len;
671
672                self.prod
673                    .push_iter((0..correction_samples).map(|_| T::zero()));
674
675                return Some(correction_samples / self.num_channels.get());
676            }
677        }
678
679        None
680    }
681
682    fn poll_reset(&mut self) {
683        if self.waiting_for_output_to_reset {
684            self.waiting_for_output_to_reset = false;
685
686            // Fill the channel with initial zeros to create the desired latency.
687            self.prod
688                .push_iter((0..self.channel_latency_samples).map(|_| T::zero()));
689        }
690    }
691}
692
693/// The consumer end of a realtime-safe spsc channel for sending samples across
694/// streams.
695///
696/// If the input and output samples rates differ, then this will automatically
697/// resample the input stream to match the output stream. If the sample rates
698/// match, then no resampling will occur.
699///
700/// Internally this uses the `rubato` and `ringbuf` crates.
701pub struct ResamplingCons<T: Sample> {
702    cons: ringbuf::HeapCons<T>,
703    num_channels: NonZeroUsize,
704    latency_seconds: f64,
705    latency_frames: usize,
706    channel_latency_samples: usize,
707    in_sample_rate: u32,
708    out_sample_rate: u32,
709    out_sample_rate_recip: f64,
710    shared_state: Arc<SharedState>,
711    waiting_for_input_to_reset: bool,
712    overflow_autocorrect_threshold_samples: Option<usize>,
713
714    #[cfg(feature = "resampler")]
715    resampler_output_delay: usize,
716    #[cfg(feature = "resampler")]
717    is_resampling: bool,
718}
719
720impl<T: Sample> ResamplingCons<T> {
721    /// The number of channels configured for this stream.
722    ///
723    /// This method is realtime-safe.
724    pub fn num_channels(&self) -> NonZeroUsize {
725        self.num_channels
726    }
727
728    /// The sample rate of the input stream.
729    ///
730    /// This method is realtime-safe.
731    pub fn in_sample_rate(&self) -> u32 {
732        self.in_sample_rate
733    }
734
735    /// The sample rate of the output stream.
736    ///
737    /// This method is realtime-safe.
738    pub fn out_sample_rate(&self) -> u32 {
739        self.out_sample_rate
740    }
741
742    /// The latency of the channel in units of seconds.
743    ///
744    /// This method is realtime-safe.
745    pub fn latency_seconds(&self) -> f64 {
746        self.latency_seconds
747    }
748
749    /// The latency of the channel in units of output frames.
750    ///
751    /// This method is realtime-safe.
752    pub fn latency_frames(&self) -> usize {
753        self.latency_frames
754    }
755
756    /// The number of frames (samples in a single channel of audio) that are
757    /// currently available to be read from the channel.
758    ///
759    /// If the input stream is not ready yet, then this will return `0`.
760    ///
761    /// This method is realtime-safe.
762    pub fn available_frames(&self) -> usize {
763        if self.input_stream_ready() {
764            self.cons.occupied_len() / self.num_channels.get()
765        } else {
766            0
767        }
768    }
769
770    /// The amount of data in seconds that is currently available to be read
771    /// from the channel.
772    ///
773    /// If the input stream is not ready yet, then this will return `0.0`.
774    ///
775    /// This method is realtime-safe.
776    pub fn available_seconds(&self) -> f64 {
777        self.available_frames() as f64 * self.out_sample_rate_recip
778    }
779
780    /// The amount of data that is currently occupied in the channel, in units of
781    /// seconds.
782    ///
783    /// This method is realtime-safe.
784    pub fn occupied_seconds(&self) -> f64 {
785        (self.cons.occupied_len() / self.num_channels.get()) as f64 * self.out_sample_rate_recip
786    }
787
788    /// Returns `true` if this channel is currently resampling.
789    ///
790    /// This method is realtime-safe.
791    #[cfg(feature = "resampler")]
792    pub fn is_resampling(&self) -> bool {
793        self.is_resampling
794    }
795
796    /// The delay of the internal resampler in number of output frames (samples in
797    /// a single channel of audio).
798    ///
799    /// If there is no resampler being used for this channel, then this will return
800    /// `0`.
801    ///
802    /// This method is realtime-safe.
803    #[cfg(feature = "resampler")]
804    pub fn resampler_output_delay(&self) -> usize {
805        self.resampler_output_delay
806    }
807
808    /// Discard a certian number of output frames from the buffer. This can be used
809    /// to correct for jitter and avoid excessive overflows and reduce the percieved
810    /// audible glitchiness.
811    ///
812    /// This will discard `frames.min(self.available_frames())` frames.
813    ///
814    /// Returns the number of output frames that were discarded.
815    ///
816    /// This method is realtime-safe.
817    pub fn discard_frames(&mut self, frames: usize) -> usize {
818        self.cons.skip(frames * self.num_channels.get()) / self.num_channels.get()
819    }
820
821    /// Read from the channel and store the results in the de-interleaved
822    /// output buffer.
823    ///
824    /// This method is realtime-safe.
825    pub fn read<Vout: AsMut<[T]>>(
826        &mut self,
827        output: &mut [Vout],
828        output_range: Range<usize>,
829    ) -> ReadStatus {
830        self.set_output_stream_ready(true);
831
832        self.poll_reset();
833
834        if !self.input_stream_ready() {
835            for ch in output.iter_mut() {
836                ch.as_mut()[output_range.clone()].fill(T::zero());
837            }
838
839            return ReadStatus::InputNotReady;
840        }
841
842        self.waiting_for_input_to_reset = false;
843
844        if output.len() > self.num_channels.get() {
845            for ch in output.iter_mut().skip(self.num_channels.get()) {
846                ch.as_mut()[output_range.clone()].fill(T::zero());
847            }
848        }
849
850        let output_frames = output_range.end - output_range.start;
851
852        // Simply copy the input stream to the output.
853        let (s1, s2) = self.cons.as_slices();
854
855        let s1_frames = s1.len() / self.num_channels.get();
856        let s1_copy_frames = s1_frames.min(output_frames);
857
858        fast_interleave::deinterleave_variable(
859            s1,
860            self.num_channels,
861            output,
862            output_range.start..output_range.start + s1_copy_frames,
863        );
864
865        let mut filled_frames = s1_copy_frames;
866
867        if output_frames > s1_copy_frames {
868            let s2_frames = s2.len() / self.num_channels.get();
869            let s2_copy_frames = s2_frames.min(output_frames - s1_copy_frames);
870
871            fast_interleave::deinterleave_variable(
872                s2,
873                self.num_channels,
874                output,
875                output_range.start + s1_copy_frames
876                    ..output_range.start + s1_copy_frames + s2_copy_frames,
877            );
878
879            filled_frames += s2_copy_frames;
880        }
881
882        // SAFETY:
883        //
884        // * `T` implements `Copy`, so it does not have a drop method that needs to
885        // be called.
886        // * `self` is borrowed as mutable in this method, ensuring that the consumer
887        // cannot be accessed concurrently.
888        unsafe {
889            self.cons
890                .advance_read_index(filled_frames * self.num_channels.get());
891        }
892
893        if filled_frames < output_frames {
894            for (_, ch) in (0..self.num_channels.get()).zip(output.iter_mut()) {
895                ch.as_mut()[filled_frames..output_range.end].fill(T::zero());
896            }
897
898            ReadStatus::UnderflowOccurred {
899                num_frames_read: filled_frames,
900            }
901        } else if let Some(num_frames_discarded) = self.autocorrect_overflows() {
902            ReadStatus::OverflowCorrected {
903                num_frames_discarded,
904            }
905        } else {
906            ReadStatus::Ok
907        }
908    }
909
910    /// Read from the channel and store the results into the output buffer
911    /// in interleaved format.
912    ///
913    /// This method is realtime-safe.
914    pub fn read_interleaved(&mut self, output: &mut [T]) -> ReadStatus {
915        self.set_output_stream_ready(true);
916
917        self.poll_reset();
918
919        if !self.input_stream_ready() {
920            output.fill(T::zero());
921
922            return ReadStatus::InputNotReady;
923        }
924
925        self.waiting_for_input_to_reset = false;
926
927        let out_frames = output.len() / self.num_channels.get();
928
929        let pushed_samples = self
930            .cons
931            .pop_slice(&mut output[..out_frames * self.num_channels.get()]);
932
933        if pushed_samples < output.len() {
934            output[pushed_samples..].fill(T::zero());
935
936            ReadStatus::UnderflowOccurred {
937                num_frames_read: pushed_samples / self.num_channels.get(),
938            }
939        } else if let Some(num_frames_discarded) = self.autocorrect_overflows() {
940            ReadStatus::OverflowCorrected {
941                num_frames_discarded,
942            }
943        } else {
944            ReadStatus::Ok
945        }
946    }
947
948    /// Poll the channel to see if it got a command to reset.
949    ///
950    /// Returns `true` if the channel was reset.
951    pub fn poll_reset(&mut self) -> bool {
952        if self.shared_state.reset.load(Ordering::Relaxed) {
953            self.shared_state.reset.store(false, Ordering::Relaxed);
954            self.waiting_for_input_to_reset = true;
955
956            self.cons.clear();
957
958            true
959        } else {
960            false
961        }
962    }
963
964    /// Manually notify the input stream that the output stream is ready/not ready
965    /// to read samples from the channel.
966    ///
967    /// If this consumer end is being used in a non-realtime context, then it is
968    /// a good idea to set this to `true` so that the producer end can start
969    /// pushing samples to the channel immediately.
970    ///
971    /// Note, calling [`ResamplingCons::read`] and
972    /// [`ResamplingCons::read_interleaved`] automatically sets the output stream as
973    /// ready.
974    ///
975    /// This method is realtime-safe.
976    pub fn set_output_stream_ready(&mut self, ready: bool) {
977        self.shared_state
978            .output_stream_ready
979            .store(ready, Ordering::Relaxed);
980    }
981
982    /// Whether or not the input stream is ready to push samples to the channel.
983    ///
984    /// This method is realtime-safe.
985    pub fn input_stream_ready(&self) -> bool {
986        self.shared_state.input_stream_ready.load(Ordering::Relaxed)
987            && !(self.waiting_for_input_to_reset && self.cons.is_empty())
988    }
989
990    /// Correct for any overflows.
991    ///
992    /// This returns the number of frames (samples in a single channel of audio) that were
993    /// discarded due to an overflow occurring. If no overflow occured, then `None`
994    /// is returned.
995    ///
996    /// Note, this method is already automatically called in [`ResamplingCons::read`] and
997    /// [`ResamplingCons::read_interleaved`].
998    ///
999    /// This will have no effect if [`ResamplingChannelConfig::overflow_autocorrect_percent_threshold`]
1000    /// was set to `None`.
1001    ///
1002    /// This method is realtime-safe.
1003    pub fn autocorrect_overflows(&mut self) -> Option<usize> {
1004        if let Some(overflow_autocorrect_threshold_samples) =
1005            self.overflow_autocorrect_threshold_samples
1006        {
1007            let len = self.cons.occupied_len();
1008
1009            if len >= overflow_autocorrect_threshold_samples && len > self.channel_latency_samples {
1010                let correction_frames =
1011                    (len - self.channel_latency_samples) / self.num_channels.get();
1012
1013                self.discard_frames(correction_frames);
1014
1015                return Some(correction_frames);
1016            }
1017        }
1018
1019        None
1020    }
1021}
1022
/// The status of pushing samples to [`ResamplingProd::push`] and
/// [`ResamplingProd::push_interleaved`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PushStatus {
    /// All samples were successfully pushed to the channel.
    Ok,
    /// The output stream is not yet ready to read samples from the channel.
    ///
    /// Note, this can also happen when the channel is reset.
    ///
    /// No samples were pushed to the channel.
    OutputNotReady,
    /// An overflow occurred due to the input stream running faster than the
    /// output stream. Some or all of the samples were not pushed to the channel.
    ///
    /// If this occurs, then it may mean that [`ResamplingChannelConfig::capacity_seconds`]
    /// is too low and should be increased.
    OverflowOccurred {
        /// The number of frames (samples in a single channel of audio) that were
        /// successfully pushed to the channel.
        num_frames_pushed: usize,
    },
    /// An underflow occurred due to the output stream running faster than the
    /// input stream.
    ///
    /// All of the samples were successfully pushed to the channel, however extra
    /// zero samples were also pushed to the channel to correct for the jitter.
    ///
    /// If this occurs, then it may mean that [`ResamplingChannelConfig::latency_seconds`]
    /// is too low and should be increased.
    UnderflowCorrected {
        /// The number of zero frames that were pushed after the other samples
        /// were pushed.
        num_zero_frames_pushed: usize,
    },
}
1059
/// The status of reading data from [`ResamplingCons::read`] and
/// [`ResamplingCons::read_interleaved`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReadStatus {
    /// The output buffer was fully filled with samples from the channel.
    Ok,
    /// The input stream is not yet ready to push samples to the channel.
    ///
    /// Note, this can also happen when the channel is reset.
    ///
    /// The output buffer was filled with zeros.
    InputNotReady,
    /// An underflow occurred due to the output stream running faster than the input
    /// stream. Some or all of the samples in the output buffer have been filled with
    /// zeros on the end. This may result in audible audio glitches.
    ///
    /// If this occurs, then it may mean that [`ResamplingChannelConfig::latency_seconds`]
    /// is too low and should be increased.
    UnderflowOccurred {
        /// The number of frames (samples in a single channel of audio) that were
        /// successfully read from the channel. All frames past this have been filled
        /// with zeros.
        num_frames_read: usize,
    },
    /// An overflow occurred due to the input stream running faster than the output
    /// stream.
    ///
    /// All of the samples in the output buffer were successfully filled with samples,
    /// however a number of frames have also been discarded to correct for the jitter.
    ///
    /// If this occurs, then it may mean that [`ResamplingChannelConfig::capacity_seconds`]
    /// is too low and should be increased.
    OverflowCorrected {
        /// The number of frames that were discarded from the channel (after the
        /// frames have been read into the output buffer).
        num_frames_discarded: usize,
    },
}
1098
/// Atomic flags used to coordinate the two ends of a channel.
struct SharedState {
    /// Set when a reset of the channel has been requested. The consumer clears
    /// it again in `poll_reset`.
    reset: AtomicBool,
    /// Whether the input stream is ready to push samples to the channel.
    input_stream_ready: AtomicBool,
    /// Whether the output stream is ready to read samples from the channel.
    output_stream_ready: AtomicBool,
}

impl SharedState {
    /// Creates the shared state with every flag cleared.
    fn new() -> Self {
        let unset = || AtomicBool::new(false);

        Self {
            reset: unset(),
            input_stream_ready: unset(),
            output_stream_ready: unset(),
        }
    }
}
1114
1115fn push_internal<T: Sample, Vin: AsRef<[T]>>(
1116    prod: &mut ringbuf::HeapProd<T>,
1117    input: &[Vin],
1118    in_start_frame: usize,
1119    frames: usize,
1120    num_channels: NonZeroUsize,
1121) -> usize {
1122    let (s1, s2) = prod.vacant_slices_mut();
1123
1124    if s1.len() == 0 {
1125        return 0;
1126    }
1127
1128    let s1_frames = s1.len() / num_channels.get();
1129    let s1_copy_frames = s1_frames.min(frames);
1130
1131    let mut frames_pushed = s1_copy_frames;
1132
1133    {
1134        // SAFETY:
1135        //
1136        // * `&mut [MaybeUninit<T>]` and `&mut [T]` are the same bit-for-bit.
1137        // * All data in the slice is initialized in the `interleave` method below.
1138        //
1139        // TODO: Remove unsafe on `maybe_uninit_write_slice` stabilization.
1140        let s1: &mut [T] =
1141            unsafe { std::mem::transmute(&mut s1[..s1_copy_frames * num_channels.get()]) };
1142
1143        fast_interleave::interleave_variable(
1144            input,
1145            in_start_frame..in_start_frame + s1_copy_frames,
1146            s1,
1147            num_channels,
1148        );
1149    }
1150
1151    if frames > s1_copy_frames && s2.len() > 0 {
1152        let s2_frames = s2.len() / num_channels.get();
1153        let s2_copy_frames = s2_frames.min(frames - s1_copy_frames);
1154
1155        // SAFETY:
1156        //
1157        // * `&mut [MaybeUninit<T>]` and `&mut [T]` are the same bit-for-bit.
1158        // * All data in the slice is initialized in the `interleave` method below.
1159        //
1160        // TODO: Remove unsafe on `maybe_uninit_write_slice` stabilization.
1161        let s2: &mut [T] =
1162            unsafe { std::mem::transmute(&mut s2[..s2_copy_frames * num_channels.get()]) };
1163
1164        fast_interleave::interleave_variable(
1165            input,
1166            in_start_frame + s1_copy_frames..in_start_frame + s1_copy_frames + s2_copy_frames,
1167            s2,
1168            num_channels,
1169        );
1170
1171        frames_pushed += s2_copy_frames;
1172    }
1173
1174    // SAFETY:
1175    //
1176    // * All frames up to `frames_pushed` was filled with data above.
1177    // * `prod` is borrowed as mutable in this method, ensuring that it cannot be
1178    // accessed concurrently.
1179    unsafe {
1180        prod.advance_write_index(frames_pushed * num_channels.get());
1181    }
1182
1183    frames_pushed
1184}