fixed_resample/channel.rs
1use std::{
2 num::NonZeroUsize,
3 ops::Range,
4 sync::{
5 atomic::{AtomicBool, Ordering},
6 Arc,
7 },
8};
9
10use ringbuf::traits::{Consumer, Observer, Producer, Split};
11
12use crate::Sample;
13
14#[cfg(feature = "resampler")]
15use crate::{resampler_type::ResamplerType, FixedResampler, ResampleQuality};
16
/// Additional options for a resampling channel.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct ResamplingChannelConfig {
    /// The amount of latency added in seconds between the input stream and the
    /// output stream. If this value is too small, then underflows may occur.
    ///
    /// This must be greater than `0.0`, otherwise creating the channel panics.
    ///
    /// The default value is `0.15` (150 ms).
    pub latency_seconds: f64,

    /// The capacity of the channel in seconds. If this is too small, then
    /// overflows may occur. This should be at least twice as large as
    /// `latency_seconds`.
    ///
    /// This must be greater than `0.0`, otherwise creating the channel panics.
    ///
    /// Note, the actual capacity may be slightly smaller due to how the internal
    /// sampler processes in chunks.
    ///
    /// The default value is `0.4` (400 ms).
    pub capacity_seconds: f64,

    /// If the number of occupied samples in the channel is greater than or equal to
    /// (`latency_seconds + (percent / 100) * (capacity_seconds - latency_seconds)`), then
    /// discard the number of samples needed to bring the number of occupied seconds back
    /// down to [`ResamplingChannelConfig::latency_seconds`]. This is used to avoid excessive
    /// overflows and reduce the perceived audio glitchiness.
    ///
    /// The percentage is a value in the range `[0.0, 100.0]`. Values outside of this
    /// range are clamped.
    ///
    /// Set to `None` to disable this autocorrecting behavior. If the producer end is being
    /// used in a non-realtime context, then this should be set to `None`.
    ///
    /// By default this is set to `Some(75.0)`.
    pub overflow_autocorrect_percent_threshold: Option<f64>,

    /// If the number of occupied samples in the channel is below or equal to the given
    /// percentage of [`ResamplingChannelConfig::latency_seconds`], then insert the number of
    /// zero frames needed to bring the number of occupied samples back up to
    /// [`ResamplingChannelConfig::latency_seconds`]. This is used to avoid excessive underflows
    /// and reduce the perceived audio glitchiness.
    ///
    /// The percentage is a value in the range `[0.0, 100.0]`. Values outside of this
    /// range are clamped.
    ///
    /// Set to `None` to disable this autocorrecting behavior. If the consumer end is being
    /// used in a non-realtime context, then this should be set to `None`.
    ///
    /// By default this is set to `Some(25.0)`.
    pub underflow_autocorrect_percent_threshold: Option<f64>,

    #[cfg(feature = "resampler")]
    /// The quality of the resampling algorithm to use if needed.
    ///
    /// The default value is `ResampleQuality::High`.
    pub quality: ResampleQuality,

    #[cfg(feature = "resampler")]
    /// If `true`, then the delay of the internal resampler (if used) will be
    /// subtracted from the `latency_seconds` value to keep the perceived
    /// latency consistent.
    ///
    /// The default value is `true`.
    pub subtract_resampler_delay: bool,
}
78
79impl Default for ResamplingChannelConfig {
80 fn default() -> Self {
81 Self {
82 latency_seconds: 0.15,
83 capacity_seconds: 0.4,
84 overflow_autocorrect_percent_threshold: Some(75.0),
85 underflow_autocorrect_percent_threshold: Some(25.0),
86 #[cfg(feature = "resampler")]
87 quality: ResampleQuality::High,
88 #[cfg(feature = "resampler")]
89 subtract_resampler_delay: true,
90 }
91 }
92}
93
94/// Create a new realtime-safe spsc channel for sending samples across streams.
95///
96/// If the input and output samples rates differ, then this will automatically
97/// resample the input stream to match the output stream (unless the "resample"
98/// feature is disabled). If the sample rates match, then no resampling will
99/// occur.
100///
101/// Internally this uses the `rubato` and `ringbuf` crates.
102///
103/// * `in_sample_rate` - The sample rate of the input stream.
104/// * `out_sample_rate` - The sample rate of the output stream.
105/// * `num_channels` - The number of channels in the stream.
106/// * `config` - Additional options for the resampling channel.
107///
108/// # Panics
109///
110/// Panics when any of the following are true:
111///
112/// * `in_sample_rate == 0`
113/// * `out_sample_rate == 0`
114/// * `num_channels == 0`
115/// * `config.latency_seconds <= 0.0`
116/// * `config.capacity_seconds <= 0.0`
117///
118/// If the "resampler" feature is disabled, then this will also panic if
119/// `in_sample_rate != out_sample_rate`.
120pub fn resampling_channel<T: Sample, const MAX_CHANNELS: usize>(
121 num_channels: NonZeroUsize,
122 in_sample_rate: u32,
123 out_sample_rate: u32,
124 config: ResamplingChannelConfig,
125) -> (ResamplingProd<T, MAX_CHANNELS>, ResamplingCons<T>) {
126 #[cfg(feature = "resampler")]
127 let resampler = if in_sample_rate != out_sample_rate {
128 Some(FixedResampler::<T, MAX_CHANNELS>::new(
129 num_channels,
130 in_sample_rate,
131 out_sample_rate,
132 config.quality,
133 true,
134 ))
135 } else {
136 None
137 };
138
139 resampling_channel_inner(
140 #[cfg(feature = "resampler")]
141 resampler,
142 num_channels,
143 in_sample_rate,
144 out_sample_rate,
145 config,
146 )
147}
148
/// Create a new realtime-safe spsc channel for sending samples across streams
/// using a custom resampler.
///
/// If the input and output sample rates differ, then the input stream is
/// automatically resampled with the given resampler to match the output
/// stream. If the sample rates match, then no resampling occurs and the given
/// resampler is not used.
///
/// Internally this uses the `rubato` and `ringbuf` crates.
///
/// * `resampler` - The custom rubato resampler.
/// * `num_channels` - The number of channels in the stream.
/// * `in_sample_rate` - The sample rate of the input stream.
/// * `out_sample_rate` - The sample rate of the output stream.
/// * `config` - Additional options for the resampling channel. Note that
///   `config.quality` will be ignored.
///
/// # Panics
///
/// Panics when any of the following are true:
///
/// * `in_sample_rate != out_sample_rate` and
///   `resampler.num_channels() != num_channels`
/// * `in_sample_rate == 0`
/// * `out_sample_rate == 0`
/// * `config.latency_seconds` is not greater than `0.0`
/// * `config.capacity_seconds` is not greater than `0.0`
#[cfg(feature = "resampler")]
pub fn resampling_channel_custom<T: Sample, const MAX_CHANNELS: usize>(
    resampler: impl Into<ResamplerType<T>>,
    num_channels: NonZeroUsize,
    in_sample_rate: u32,
    out_sample_rate: u32,
    config: ResamplingChannelConfig,
) -> (ResamplingProd<T, MAX_CHANNELS>, ResamplingCons<T>) {
    let resampler = if in_sample_rate == out_sample_rate {
        // Matching rates: samples are passed through untouched.
        None
    } else {
        let wrapped = FixedResampler::<T, MAX_CHANNELS>::from_custom(
            resampler,
            in_sample_rate,
            out_sample_rate,
            true,
        );
        assert_eq!(wrapped.num_channels(), num_channels);

        Some(wrapped)
    };

    resampling_channel_inner(
        resampler,
        num_channels,
        in_sample_rate,
        out_sample_rate,
        config,
    )
}
205
206fn resampling_channel_inner<T: Sample, const MAX_CHANNELS: usize>(
207 #[cfg(feature = "resampler")] resampler: Option<FixedResampler<T, MAX_CHANNELS>>,
208 num_channels: NonZeroUsize,
209 in_sample_rate: u32,
210 out_sample_rate: u32,
211 config: ResamplingChannelConfig,
212) -> (ResamplingProd<T, MAX_CHANNELS>, ResamplingCons<T>) {
213 #[cfg(not(feature = "resampler"))]
214 assert_eq!(
215 in_sample_rate, out_sample_rate,
216 "Input and output sample rate must be equal when the \"resampler\" feature is disabled"
217 );
218
219 assert_ne!(in_sample_rate, 0);
220 assert_ne!(out_sample_rate, 0);
221 assert!(config.latency_seconds > 0.0);
222 assert!(config.capacity_seconds > 0.0);
223
224 #[cfg(feature = "resampler")]
225 let is_resampling = resampler.is_some();
226 #[cfg(feature = "resampler")]
227 let resampler_output_delay = resampler.as_ref().map(|r| r.output_delay()).unwrap_or(0);
228
229 let in_sample_rate_recip = (in_sample_rate as f64).recip();
230 let out_sample_rate_recip = (out_sample_rate as f64).recip();
231
232 #[cfg(feature = "resampler")]
233 let output_to_input_ratio = in_sample_rate as f64 / out_sample_rate as f64;
234
235 let latency_frames =
236 ((out_sample_rate as f64 * config.latency_seconds).round() as usize).max(1);
237
238 #[allow(unused_mut)]
239 let mut channel_latency_frames = latency_frames;
240
241 #[cfg(feature = "resampler")]
242 if resampler.is_some() && config.subtract_resampler_delay {
243 if latency_frames > resampler_output_delay {
244 channel_latency_frames -= resampler_output_delay;
245 } else {
246 channel_latency_frames = 1;
247 }
248 }
249
250 let channel_latency_samples = channel_latency_frames * num_channels.get();
251
252 let buffer_capacity_frames = ((in_sample_rate as f64 * config.capacity_seconds).round()
253 as usize)
254 .max(channel_latency_frames * 2);
255
256 let (mut prod, cons) =
257 ringbuf::HeapRb::<T>::new(buffer_capacity_frames * num_channels.get()).split();
258
259 // Fill the channel with initial zeros to create the desired latency.
260 prod.push_slice(&vec![
261 T::zero();
262 channel_latency_frames * num_channels.get()
263 ]);
264
265 let shared_state = Arc::new(SharedState::new());
266
267 let overflow_autocorrect_threshold_samples =
268 config
269 .overflow_autocorrect_percent_threshold
270 .map(|percent| {
271 let range_samples =
272 (buffer_capacity_frames - channel_latency_frames) * num_channels.get();
273
274 ((range_samples as f64 * (percent / 100.0).clamp(0.0, 1.0)).round() as usize)
275 .min(range_samples)
276 + channel_latency_samples
277 });
278 let underflow_autocorrect_threshold_samples = config
279 .underflow_autocorrect_percent_threshold
280 .map(|percent| {
281 ((channel_latency_samples as f64 * (percent / 100.0).clamp(0.0, 1.0)).round() as usize)
282 .min(channel_latency_samples)
283 });
284
285 (
286 ResamplingProd {
287 prod,
288 num_channels,
289 latency_seconds: config.latency_seconds,
290 channel_latency_samples,
291 in_sample_rate,
292 in_sample_rate_recip,
293 out_sample_rate,
294 out_sample_rate_recip,
295 shared_state: Arc::clone(&shared_state),
296 waiting_for_output_to_reset: false,
297 underflow_autocorrect_threshold_samples,
298 #[cfg(feature = "resampler")]
299 resampler,
300 #[cfg(feature = "resampler")]
301 output_to_input_ratio,
302 },
303 ResamplingCons {
304 cons,
305 num_channels,
306 latency_frames,
307 latency_seconds: config.latency_seconds,
308 channel_latency_samples,
309 in_sample_rate,
310 out_sample_rate,
311 out_sample_rate_recip,
312 shared_state,
313 waiting_for_input_to_reset: false,
314 overflow_autocorrect_threshold_samples,
315 #[cfg(feature = "resampler")]
316 is_resampling,
317 #[cfg(feature = "resampler")]
318 resampler_output_delay,
319 },
320 )
321}
322
/// The producer end of a realtime-safe spsc channel for sending samples across
/// streams.
///
/// If the input and output samples rates differ, then this will automatically
/// resample the input stream to match the output stream. If the sample rates
/// match, then no resampling will occur.
///
/// Internally this uses the `rubato` and `ringbuf` crates.
pub struct ResamplingProd<T: Sample, const MAX_CHANNELS: usize> {
    // Producer half of the ring buffer holding interleaved samples.
    prod: ringbuf::HeapProd<T>,
    // Number of channels in the stream.
    num_channels: NonZeroUsize,
    // The latency requested in the channel configuration, in seconds.
    latency_seconds: f64,
    // Number of zero samples (frames * channels) used as latency padding.
    channel_latency_samples: usize,
    // Sample rate of the input stream.
    in_sample_rate: u32,
    // `1.0 / in_sample_rate`, cached to convert input frames to seconds.
    in_sample_rate_recip: f64,
    // Sample rate of the output stream.
    out_sample_rate: u32,
    // `1.0 / out_sample_rate`, cached to convert output frames to seconds.
    out_sample_rate_recip: f64,
    // Flags shared with the consumer end.
    shared_state: Arc<SharedState>,
    // Set by `reset()`; cleared by `poll_reset()` once the latency padding has
    // been pushed again.
    waiting_for_output_to_reset: bool,
    // Occupancy (in samples) at or below which zeros are inserted, if enabled.
    underflow_autocorrect_threshold_samples: Option<usize>,

    // The resampler, present only when the input and output rates differ.
    #[cfg(feature = "resampler")]
    resampler: Option<FixedResampler<T, MAX_CHANNELS>>,
    // `in_sample_rate / out_sample_rate`; converts output frames to input frames.
    #[cfg(feature = "resampler")]
    output_to_input_ratio: f64,
}
349
350impl<T: Sample, const MAX_CHANNELS: usize> ResamplingProd<T, MAX_CHANNELS> {
351 /// Push the given data in de-interleaved format.
352 ///
353 /// * `input` - The input data in de-interleaved format.
354 /// * `input_range` - The range in each channel in `input` to read from.
355 ///
356 /// This method is realtime-safe.
357 ///
358 /// # Panics
359 /// Panics if:
360 /// * `input.len() < self.num_channels()`.
361 /// * The `input_range` is out of bounds for any of the input channels.
362 pub fn push<Vin: AsRef<[T]>>(
363 &mut self,
364 input: &[Vin],
365 input_range: Range<usize>,
366 ) -> PushStatus {
367 assert!(input.len() >= self.num_channels.get());
368
369 self.set_input_stream_ready(true);
370
371 if !self.output_stream_ready() {
372 return PushStatus::OutputNotReady;
373 }
374
375 self.poll_reset();
376
377 let input_frames = input_range.end - input_range.start;
378
379 #[cfg(feature = "resampler")]
380 if self.resampler.is_some() {
381 let available_frames = self.available_frames();
382
383 let proc_frames = input_frames.min(available_frames);
384
385 self.resampler.as_mut().unwrap().process(
386 input,
387 input_range.start..input_range.start + proc_frames,
388 |output_packet| {
389 let packet_frames = output_packet[0].len();
390
391 let pushed_frames = push_internal(
392 &mut self.prod,
393 &output_packet,
394 0,
395 packet_frames,
396 self.num_channels,
397 );
398
399 debug_assert_eq!(pushed_frames, packet_frames);
400 },
401 None,
402 true,
403 );
404
405 return if proc_frames < input_frames {
406 PushStatus::OverflowOccurred {
407 num_frames_pushed: proc_frames,
408 }
409 } else if let Some(zero_frames_pushed) = self.autocorrect_underflows() {
410 PushStatus::UnderflowCorrected {
411 num_zero_frames_pushed: zero_frames_pushed,
412 }
413 } else {
414 PushStatus::Ok
415 };
416 }
417
418 let pushed_frames = push_internal(
419 &mut self.prod,
420 input,
421 input_range.start,
422 input_frames,
423 self.num_channels,
424 );
425
426 if pushed_frames < input_frames {
427 PushStatus::OverflowOccurred {
428 num_frames_pushed: pushed_frames,
429 }
430 } else if let Some(zero_frames_pushed) = self.autocorrect_underflows() {
431 PushStatus::UnderflowCorrected {
432 num_zero_frames_pushed: zero_frames_pushed,
433 }
434 } else {
435 PushStatus::Ok
436 }
437 }
438
439 /// Push the given data in interleaved format.
440 ///
441 /// This method is realtime-safe.
442 pub fn push_interleaved(&mut self, input: &[T]) -> PushStatus {
443 self.set_input_stream_ready(true);
444
445 if !self.output_stream_ready() {
446 return PushStatus::OutputNotReady;
447 }
448
449 self.poll_reset();
450
451 #[cfg(feature = "resampler")]
452 if self.resampler.is_some() {
453 let input_frames = input.len() / self.num_channels.get();
454
455 let available_frames = self.available_frames();
456
457 let proc_frames = input_frames.min(available_frames);
458
459 self.resampler.as_mut().unwrap().process_interleaved(
460 &input[..proc_frames * self.num_channels.get()],
461 |output_packet| {
462 let pushed_samples = self.prod.push_slice(output_packet);
463
464 debug_assert_eq!(pushed_samples, output_packet.len());
465 },
466 None,
467 true,
468 );
469
470 return if proc_frames < input_frames {
471 PushStatus::OverflowOccurred {
472 num_frames_pushed: proc_frames,
473 }
474 } else if let Some(zero_frames_pushed) = self.autocorrect_underflows() {
475 PushStatus::UnderflowCorrected {
476 num_zero_frames_pushed: zero_frames_pushed,
477 }
478 } else {
479 PushStatus::Ok
480 };
481 }
482
483 let pushed_samples = self.prod.push_slice(input);
484
485 if pushed_samples < input.len() {
486 PushStatus::OverflowOccurred {
487 num_frames_pushed: pushed_samples / self.num_channels.get(),
488 }
489 } else if let Some(zero_frames_pushed) = self.autocorrect_underflows() {
490 PushStatus::UnderflowCorrected {
491 num_zero_frames_pushed: zero_frames_pushed,
492 }
493 } else {
494 PushStatus::Ok
495 }
496 }
497
498 /// Returns the number of input frames (samples in a single channel of audio)
499 /// that are currently available to be pushed to the channel.
500 ///
501 /// If the output stream is not ready yet, then this will return `0`.
502 ///
503 /// This method is realtime-safe.
504 pub fn available_frames(&mut self) -> usize {
505 if !self.output_stream_ready() {
506 return 0;
507 }
508
509 self.poll_reset();
510
511 let output_vacant_frames = self.prod.vacant_len() / self.num_channels.get();
512
513 #[cfg(feature = "resampler")]
514 if let Some(resampler) = &self.resampler {
515 let mut input_vacant_frames =
516 (output_vacant_frames as f64 * self.output_to_input_ratio).floor() as usize;
517
518 // Give some leeway to account for floating point inaccuracies.
519 if input_vacant_frames > 0 {
520 input_vacant_frames -= 1;
521 }
522
523 if input_vacant_frames < resampler.input_block_frames() {
524 return 0;
525 }
526
527 // The resampler processes in chunks.
528 input_vacant_frames = (input_vacant_frames / resampler.input_block_frames())
529 * resampler.input_block_frames();
530
531 return input_vacant_frames - resampler.tmp_input_frames();
532 }
533
534 output_vacant_frames
535 }
536
537 /// The amount of data in seconds that is available to be pushed to the
538 /// channel.
539 ///
540 /// If the output stream is not ready yet, then this will return `0.0`.
541 ///
542 /// This method is realtime-safe.
543 pub fn available_seconds(&mut self) -> f64 {
544 self.available_frames() as f64 * self.in_sample_rate_recip
545 }
546
547 /// The amount of data that is currently occupied in the channel, in units of
548 /// output frames (samples in a single channel of audio).
549 ///
550 /// Note, this is the number of frames in the *output* audio stream, not the
551 /// input audio stream.
552 ///
553 /// This method is realtime-safe.
554 pub fn occupied_output_frames(&self) -> usize {
555 self.prod.occupied_len() / self.num_channels.get()
556 }
557
558 /// The amount of data that is currently occupied in the channel, in units of
559 /// seconds.
560 ///
561 /// This method is realtime-safe.
562 pub fn occupied_seconds(&self) -> f64 {
563 self.occupied_output_frames() as f64 * self.out_sample_rate_recip
564 }
565
566 /// The number of channels configured for this stream.
567 ///
568 /// This method is realtime-safe.
569 pub fn num_channels(&self) -> NonZeroUsize {
570 self.num_channels
571 }
572
573 /// The sample rate of the input stream.
574 ///
575 /// This method is realtime-safe.
576 pub fn in_sample_rate(&self) -> u32 {
577 self.in_sample_rate
578 }
579
580 /// The sample rate of the output stream.
581 ///
582 /// This method is realtime-safe.
583 pub fn out_sample_rate(&self) -> u32 {
584 self.out_sample_rate
585 }
586
587 /// The latency of the channel in units of seconds.
588 ///
589 /// This method is realtime-safe.
590 pub fn latency_seconds(&self) -> f64 {
591 self.latency_seconds
592 }
593
594 /// Returns `true` if this channel is currently resampling.
595 ///
596 /// This method is realtime-safe.
597 #[cfg(feature = "resampler")]
598 pub fn is_resampling(&self) -> bool {
599 self.resampler.is_some()
600 }
601
602 /// Tell the consumer to clear all queued frames in the buffer.
603 ///
604 /// This method is realtime-safe.
605 pub fn reset(&mut self) {
606 self.shared_state.reset.store(true, Ordering::Relaxed);
607
608 self.waiting_for_output_to_reset = true;
609
610 #[cfg(feature = "resampler")]
611 if let Some(resampler) = &mut self.resampler {
612 resampler.reset();
613 }
614 }
615
616 /// Manually notify the output stream that the input stream is ready/not ready
617 /// to push samples to the channel.
618 ///
619 /// If this producer end is being used in a non-realtime context, then it is
620 /// a good idea to set this to `true` so that the consumer end can start
621 /// reading samples from the channel immediately.
622 ///
623 /// Note, calling [`ResamplingProd::push`] and
624 /// [`ResamplingProd::push_interleaved`] automatically sets the input stream as
625 /// ready.
626 ///
627 /// This method is realtime-safe.
628 pub fn set_input_stream_ready(&mut self, ready: bool) {
629 self.shared_state
630 .input_stream_ready
631 .store(ready, Ordering::Relaxed);
632 }
633
634 /// Whether or not the output stream is ready to read samples from the channel.
635 ///
636 /// This method is realtime-safe.
637 pub fn output_stream_ready(&self) -> bool {
638 self.shared_state
639 .output_stream_ready
640 .load(Ordering::Relaxed)
641 && !self.shared_state.reset.load(Ordering::Relaxed)
642 }
643
644 /// Correct for any underflows.
645 ///
646 /// This returns the number of extra zero frames (samples in a single channel of audio)
647 /// that were added due to an underflow occurring. If no underflow occured, then `None`
648 /// is returned.
649 ///
650 /// Note, this method is already automatically called in [`ResamplingProd::push`] and
651 /// [`ResamplingProd::push_interleaved`].
652 ///
653 /// This will have no effect if [`ResamplingChannelConfig::underflow_autocorrect_percent_threshold`]
654 /// was set to `None`.
655 ///
656 /// This method is realtime-safe.
657 pub fn autocorrect_underflows(&mut self) -> Option<usize> {
658 if !self.output_stream_ready() {
659 return None;
660 }
661
662 self.poll_reset();
663
664 if let Some(underflow_autocorrect_threshold_samples) =
665 self.underflow_autocorrect_threshold_samples
666 {
667 let len = self.prod.occupied_len();
668 if len <= underflow_autocorrect_threshold_samples && len < self.channel_latency_samples
669 {
670 let correction_samples = self.channel_latency_samples - len;
671
672 self.prod
673 .push_iter((0..correction_samples).map(|_| T::zero()));
674
675 return Some(correction_samples / self.num_channels.get());
676 }
677 }
678
679 None
680 }
681
682 fn poll_reset(&mut self) {
683 if self.waiting_for_output_to_reset {
684 self.waiting_for_output_to_reset = false;
685
686 // Fill the channel with initial zeros to create the desired latency.
687 self.prod
688 .push_iter((0..self.channel_latency_samples).map(|_| T::zero()));
689 }
690 }
691}
692
/// The consumer end of a realtime-safe spsc channel for sending samples across
/// streams.
///
/// If the input and output samples rates differ, then this will automatically
/// resample the input stream to match the output stream. If the sample rates
/// match, then no resampling will occur.
///
/// Internally this uses the `rubato` and `ringbuf` crates.
pub struct ResamplingCons<T: Sample> {
    // Consumer half of the ring buffer holding interleaved samples.
    cons: ringbuf::HeapCons<T>,
    // Number of channels in the stream.
    num_channels: NonZeroUsize,
    // The latency requested in the channel configuration, in seconds.
    latency_seconds: f64,
    // The requested latency converted to output frames.
    latency_frames: usize,
    // Number of samples (frames * channels) used as latency padding; overflow
    // autocorrection drains the buffer back down to this level.
    channel_latency_samples: usize,
    // Sample rate of the input stream.
    in_sample_rate: u32,
    // Sample rate of the output stream.
    out_sample_rate: u32,
    // `1.0 / out_sample_rate`, cached to convert output frames to seconds.
    out_sample_rate_recip: f64,
    // Flags shared with the producer end.
    shared_state: Arc<SharedState>,
    // Set when a reset request is processed; cleared by the read methods once
    // the input stream is considered ready again.
    waiting_for_input_to_reset: bool,
    // Occupancy (in samples) at or above which samples are discarded, if enabled.
    overflow_autocorrect_threshold_samples: Option<usize>,

    // Delay of the resampler in output frames (`0` when not resampling).
    #[cfg(feature = "resampler")]
    resampler_output_delay: usize,
    // Whether the producer end resamples the stream.
    #[cfg(feature = "resampler")]
    is_resampling: bool,
}
719
720impl<T: Sample> ResamplingCons<T> {
721 /// The number of channels configured for this stream.
722 ///
723 /// This method is realtime-safe.
724 pub fn num_channels(&self) -> NonZeroUsize {
725 self.num_channels
726 }
727
728 /// The sample rate of the input stream.
729 ///
730 /// This method is realtime-safe.
731 pub fn in_sample_rate(&self) -> u32 {
732 self.in_sample_rate
733 }
734
735 /// The sample rate of the output stream.
736 ///
737 /// This method is realtime-safe.
738 pub fn out_sample_rate(&self) -> u32 {
739 self.out_sample_rate
740 }
741
742 /// The latency of the channel in units of seconds.
743 ///
744 /// This method is realtime-safe.
745 pub fn latency_seconds(&self) -> f64 {
746 self.latency_seconds
747 }
748
749 /// The latency of the channel in units of output frames.
750 ///
751 /// This method is realtime-safe.
752 pub fn latency_frames(&self) -> usize {
753 self.latency_frames
754 }
755
756 /// The number of frames (samples in a single channel of audio) that are
757 /// currently available to be read from the channel.
758 ///
759 /// If the input stream is not ready yet, then this will return `0`.
760 ///
761 /// This method is realtime-safe.
762 pub fn available_frames(&self) -> usize {
763 if self.input_stream_ready() {
764 self.cons.occupied_len() / self.num_channels.get()
765 } else {
766 0
767 }
768 }
769
770 /// The amount of data in seconds that is currently available to be read
771 /// from the channel.
772 ///
773 /// If the input stream is not ready yet, then this will return `0.0`.
774 ///
775 /// This method is realtime-safe.
776 pub fn available_seconds(&self) -> f64 {
777 self.available_frames() as f64 * self.out_sample_rate_recip
778 }
779
780 /// The amount of data that is currently occupied in the channel, in units of
781 /// seconds.
782 ///
783 /// This method is realtime-safe.
784 pub fn occupied_seconds(&self) -> f64 {
785 (self.cons.occupied_len() / self.num_channels.get()) as f64 * self.out_sample_rate_recip
786 }
787
788 /// Returns `true` if this channel is currently resampling.
789 ///
790 /// This method is realtime-safe.
791 #[cfg(feature = "resampler")]
792 pub fn is_resampling(&self) -> bool {
793 self.is_resampling
794 }
795
796 /// The delay of the internal resampler in number of output frames (samples in
797 /// a single channel of audio).
798 ///
799 /// If there is no resampler being used for this channel, then this will return
800 /// `0`.
801 ///
802 /// This method is realtime-safe.
803 #[cfg(feature = "resampler")]
804 pub fn resampler_output_delay(&self) -> usize {
805 self.resampler_output_delay
806 }
807
808 /// Discard a certian number of output frames from the buffer. This can be used
809 /// to correct for jitter and avoid excessive overflows and reduce the percieved
810 /// audible glitchiness.
811 ///
812 /// This will discard `frames.min(self.available_frames())` frames.
813 ///
814 /// Returns the number of output frames that were discarded.
815 ///
816 /// This method is realtime-safe.
817 pub fn discard_frames(&mut self, frames: usize) -> usize {
818 self.cons.skip(frames * self.num_channels.get()) / self.num_channels.get()
819 }
820
821 /// Read from the channel and store the results in the de-interleaved
822 /// output buffer.
823 ///
824 /// This method is realtime-safe.
825 pub fn read<Vout: AsMut<[T]>>(
826 &mut self,
827 output: &mut [Vout],
828 output_range: Range<usize>,
829 ) -> ReadStatus {
830 self.set_output_stream_ready(true);
831
832 self.poll_reset();
833
834 if !self.input_stream_ready() {
835 for ch in output.iter_mut() {
836 ch.as_mut()[output_range.clone()].fill(T::zero());
837 }
838
839 return ReadStatus::InputNotReady;
840 }
841
842 self.waiting_for_input_to_reset = false;
843
844 if output.len() > self.num_channels.get() {
845 for ch in output.iter_mut().skip(self.num_channels.get()) {
846 ch.as_mut()[output_range.clone()].fill(T::zero());
847 }
848 }
849
850 let output_frames = output_range.end - output_range.start;
851
852 // Simply copy the input stream to the output.
853 let (s1, s2) = self.cons.as_slices();
854
855 let s1_frames = s1.len() / self.num_channels.get();
856 let s1_copy_frames = s1_frames.min(output_frames);
857
858 fast_interleave::deinterleave_variable(
859 s1,
860 self.num_channels,
861 output,
862 output_range.start..output_range.start + s1_copy_frames,
863 );
864
865 let mut filled_frames = s1_copy_frames;
866
867 if output_frames > s1_copy_frames {
868 let s2_frames = s2.len() / self.num_channels.get();
869 let s2_copy_frames = s2_frames.min(output_frames - s1_copy_frames);
870
871 fast_interleave::deinterleave_variable(
872 s2,
873 self.num_channels,
874 output,
875 output_range.start + s1_copy_frames
876 ..output_range.start + s1_copy_frames + s2_copy_frames,
877 );
878
879 filled_frames += s2_copy_frames;
880 }
881
882 // SAFETY:
883 //
884 // * `T` implements `Copy`, so it does not have a drop method that needs to
885 // be called.
886 // * `self` is borrowed as mutable in this method, ensuring that the consumer
887 // cannot be accessed concurrently.
888 unsafe {
889 self.cons
890 .advance_read_index(filled_frames * self.num_channels.get());
891 }
892
893 if filled_frames < output_frames {
894 for (_, ch) in (0..self.num_channels.get()).zip(output.iter_mut()) {
895 ch.as_mut()[filled_frames..output_range.end].fill(T::zero());
896 }
897
898 ReadStatus::UnderflowOccurred {
899 num_frames_read: filled_frames,
900 }
901 } else if let Some(num_frames_discarded) = self.autocorrect_overflows() {
902 ReadStatus::OverflowCorrected {
903 num_frames_discarded,
904 }
905 } else {
906 ReadStatus::Ok
907 }
908 }
909
910 /// Read from the channel and store the results into the output buffer
911 /// in interleaved format.
912 ///
913 /// This method is realtime-safe.
914 pub fn read_interleaved(&mut self, output: &mut [T]) -> ReadStatus {
915 self.set_output_stream_ready(true);
916
917 self.poll_reset();
918
919 if !self.input_stream_ready() {
920 output.fill(T::zero());
921
922 return ReadStatus::InputNotReady;
923 }
924
925 self.waiting_for_input_to_reset = false;
926
927 let out_frames = output.len() / self.num_channels.get();
928
929 let pushed_samples = self
930 .cons
931 .pop_slice(&mut output[..out_frames * self.num_channels.get()]);
932
933 if pushed_samples < output.len() {
934 output[pushed_samples..].fill(T::zero());
935
936 ReadStatus::UnderflowOccurred {
937 num_frames_read: pushed_samples / self.num_channels.get(),
938 }
939 } else if let Some(num_frames_discarded) = self.autocorrect_overflows() {
940 ReadStatus::OverflowCorrected {
941 num_frames_discarded,
942 }
943 } else {
944 ReadStatus::Ok
945 }
946 }
947
948 /// Poll the channel to see if it got a command to reset.
949 ///
950 /// Returns `true` if the channel was reset.
951 pub fn poll_reset(&mut self) -> bool {
952 if self.shared_state.reset.load(Ordering::Relaxed) {
953 self.shared_state.reset.store(false, Ordering::Relaxed);
954 self.waiting_for_input_to_reset = true;
955
956 self.cons.clear();
957
958 true
959 } else {
960 false
961 }
962 }
963
964 /// Manually notify the input stream that the output stream is ready/not ready
965 /// to read samples from the channel.
966 ///
967 /// If this consumer end is being used in a non-realtime context, then it is
968 /// a good idea to set this to `true` so that the producer end can start
969 /// pushing samples to the channel immediately.
970 ///
971 /// Note, calling [`ResamplingCons::read`] and
972 /// [`ResamplingCons::read_interleaved`] automatically sets the output stream as
973 /// ready.
974 ///
975 /// This method is realtime-safe.
976 pub fn set_output_stream_ready(&mut self, ready: bool) {
977 self.shared_state
978 .output_stream_ready
979 .store(ready, Ordering::Relaxed);
980 }
981
982 /// Whether or not the input stream is ready to push samples to the channel.
983 ///
984 /// This method is realtime-safe.
985 pub fn input_stream_ready(&self) -> bool {
986 self.shared_state.input_stream_ready.load(Ordering::Relaxed)
987 && !(self.waiting_for_input_to_reset && self.cons.is_empty())
988 }
989
990 /// Correct for any overflows.
991 ///
992 /// This returns the number of frames (samples in a single channel of audio) that were
993 /// discarded due to an overflow occurring. If no overflow occured, then `None`
994 /// is returned.
995 ///
996 /// Note, this method is already automatically called in [`ResamplingCons::read`] and
997 /// [`ResamplingCons::read_interleaved`].
998 ///
999 /// This will have no effect if [`ResamplingChannelConfig::overflow_autocorrect_percent_threshold`]
1000 /// was set to `None`.
1001 ///
1002 /// This method is realtime-safe.
1003 pub fn autocorrect_overflows(&mut self) -> Option<usize> {
1004 if let Some(overflow_autocorrect_threshold_samples) =
1005 self.overflow_autocorrect_threshold_samples
1006 {
1007 let len = self.cons.occupied_len();
1008
1009 if len >= overflow_autocorrect_threshold_samples && len > self.channel_latency_samples {
1010 let correction_frames =
1011 (len - self.channel_latency_samples) / self.num_channels.get();
1012
1013 self.discard_frames(correction_frames);
1014
1015 return Some(correction_frames);
1016 }
1017 }
1018
1019 None
1020 }
1021}
1022
/// The status of pushing samples to [`ResamplingProd::push`] and
/// [`ResamplingProd::push_interleaved`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PushStatus {
    /// All samples were successfully pushed to the channel.
    Ok,
    /// The output stream is not yet ready to read samples from the channel.
    ///
    /// Note, this can also happen when the channel is reset.
    ///
    /// No samples were pushed to the channel.
    OutputNotReady,
    /// An overflow occurred due to the input stream running faster than the
    /// output stream. Some or all of the samples were not pushed to the channel.
    ///
    /// If this occurs, then it may mean that [`ResamplingChannelConfig::capacity_seconds`]
    /// is too low and should be increased.
    OverflowOccurred {
        /// The number of frames (samples in a single channel of audio) that were
        /// successfully pushed to the channel.
        num_frames_pushed: usize,
    },
    /// An underflow occurred due to the output stream running faster than the
    /// input stream.
    ///
    /// All of the samples were successfully pushed to the channel, however extra
    /// zero samples were also pushed to the channel to correct for the jitter.
    ///
    /// If this occurs, then it may mean that [`ResamplingChannelConfig::latency_seconds`]
    /// is too low and should be increased.
    UnderflowCorrected {
        /// The number of zero frames (samples in a single channel of audio) that
        /// were pushed after the other samples were pushed.
        num_zero_frames_pushed: usize,
    },
}
1059
/// The status of reading data from [`ResamplingCons::read`] and
/// [`ResamplingCons::read_interleaved`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReadStatus {
    /// The output buffer was fully filled with samples from the channel.
    Ok,
    /// The input stream is not yet ready to push samples to the channel.
    ///
    /// Note, this can also happen when the channel is reset.
    ///
    /// The output buffer was filled with zeros.
    InputNotReady,
    /// An underflow occurred due to the output stream running faster than the input
    /// stream. Some or all of the samples in the output buffer have been filled with
    /// zeros on the end. This may result in audible audio glitches.
    ///
    /// If this occurs, then it may mean that [`ResamplingChannelConfig::latency_seconds`]
    /// is too low and should be increased.
    UnderflowOccurred {
        /// The number of frames (samples in a single channel of audio) that were
        /// successfully read from the channel. All frames past this have been filled
        /// with zeros.
        num_frames_read: usize,
    },
    /// An overflow occurred due to the input stream running faster than the output
    /// stream.
    ///
    /// All of the samples in the output buffer were successfully filled with samples,
    /// however a number of frames have also been discarded to correct for the jitter.
    ///
    /// If this occurs, then it may mean that [`ResamplingChannelConfig::capacity_seconds`]
    /// is too low and should be increased.
    OverflowCorrected {
        /// The number of frames (samples in a single channel of audio) that were
        /// discarded from the channel (after the frames have been read into the
        /// output buffer).
        num_frames_discarded: usize,
    },
}
1098
/// Atomic status flags for a resampling channel.
///
/// NOTE(review): presumably one instance is shared between the producer and
/// consumer ends of a channel; the sharing itself happens outside this block.
struct SharedState {
    /// Reset flag. NOTE(review): presumably signals a pending channel reset;
    /// its readers and writers are outside this block.
    reset: AtomicBool,
    /// Whether the input stream is ready to push samples to the channel.
    input_stream_ready: AtomicBool,
    /// Whether the output stream is ready to read samples from the channel.
    output_stream_ready: AtomicBool,
}

impl SharedState {
    /// Creates the shared state with every flag initially cleared (`false`).
    fn new() -> Self {
        let cleared = || AtomicBool::new(false);

        Self {
            reset: cleared(),
            input_stream_ready: cleared(),
            output_stream_ready: cleared(),
        }
    }
}
1114
1115fn push_internal<T: Sample, Vin: AsRef<[T]>>(
1116 prod: &mut ringbuf::HeapProd<T>,
1117 input: &[Vin],
1118 in_start_frame: usize,
1119 frames: usize,
1120 num_channels: NonZeroUsize,
1121) -> usize {
1122 let (s1, s2) = prod.vacant_slices_mut();
1123
1124 if s1.len() == 0 {
1125 return 0;
1126 }
1127
1128 let s1_frames = s1.len() / num_channels.get();
1129 let s1_copy_frames = s1_frames.min(frames);
1130
1131 let mut frames_pushed = s1_copy_frames;
1132
1133 {
1134 // SAFETY:
1135 //
1136 // * `&mut [MaybeUninit<T>]` and `&mut [T]` are the same bit-for-bit.
1137 // * All data in the slice is initialized in the `interleave` method below.
1138 //
1139 // TODO: Remove unsafe on `maybe_uninit_write_slice` stabilization.
1140 let s1: &mut [T] =
1141 unsafe { std::mem::transmute(&mut s1[..s1_copy_frames * num_channels.get()]) };
1142
1143 fast_interleave::interleave_variable(
1144 input,
1145 in_start_frame..in_start_frame + s1_copy_frames,
1146 s1,
1147 num_channels,
1148 );
1149 }
1150
1151 if frames > s1_copy_frames && s2.len() > 0 {
1152 let s2_frames = s2.len() / num_channels.get();
1153 let s2_copy_frames = s2_frames.min(frames - s1_copy_frames);
1154
1155 // SAFETY:
1156 //
1157 // * `&mut [MaybeUninit<T>]` and `&mut [T]` are the same bit-for-bit.
1158 // * All data in the slice is initialized in the `interleave` method below.
1159 //
1160 // TODO: Remove unsafe on `maybe_uninit_write_slice` stabilization.
1161 let s2: &mut [T] =
1162 unsafe { std::mem::transmute(&mut s2[..s2_copy_frames * num_channels.get()]) };
1163
1164 fast_interleave::interleave_variable(
1165 input,
1166 in_start_frame + s1_copy_frames..in_start_frame + s1_copy_frames + s2_copy_frames,
1167 s2,
1168 num_channels,
1169 );
1170
1171 frames_pushed += s2_copy_frames;
1172 }
1173
1174 // SAFETY:
1175 //
1176 // * All frames up to `frames_pushed` was filled with data above.
1177 // * `prod` is borrowed as mutable in this method, ensuring that it cannot be
1178 // accessed concurrently.
1179 unsafe {
1180 prod.advance_write_index(frames_pushed * num_channels.get());
1181 }
1182
1183 frames_pushed
1184}