firewheel_nodes/stream/
reader.rs

1use bevy_platform::sync::{
2    atomic::{AtomicBool, Ordering},
3    Arc, Mutex,
4};
5use core::{
6    num::{NonZeroU32, NonZeroUsize},
7    ops::Range,
8};
9
10use firewheel_core::{
11    channel_config::{ChannelConfig, ChannelCount, NonZeroChannelCount},
12    collector::ArcGc,
13    event::{NodeEventType, ProcEvents},
14    log::RealtimeLogger,
15    node::{
16        AudioNode, AudioNodeInfo, AudioNodeProcessor, ConstructProcessorContext, ProcBuffers,
17        ProcExtra, ProcInfo, ProcessStatus,
18    },
19};
20use fixed_resample::{PushStatus, ReadStatus, ResamplingChannelConfig};
21
22pub const MAX_CHANNELS: usize = 16;
23
24#[derive(Debug, Clone, Copy, PartialEq)]
25#[cfg_attr(feature = "bevy", derive(bevy_ecs::prelude::Component))]
26#[cfg_attr(feature = "bevy_reflect", derive(bevy_reflect::Reflect))]
27pub struct StreamReaderConfig {
28    /// The number of channels.
29    pub channels: NonZeroChannelCount,
30}
31
32impl Default for StreamReaderConfig {
33    fn default() -> Self {
34        Self {
35            channels: NonZeroChannelCount::STEREO,
36        }
37    }
38}
39
40#[derive(Default, Debug, Clone, Copy)]
41#[cfg_attr(feature = "bevy", derive(bevy_ecs::prelude::Component))]
42#[cfg_attr(feature = "bevy_reflect", derive(bevy_reflect::Reflect))]
43pub struct StreamReaderNode;
44
45#[derive(Clone)]
46pub struct StreamReaderState {
47    channels: NonZeroChannelCount,
48    active_state: Option<ActiveState>,
49    shared_state: ArcGc<SharedState>,
50}
51
52impl StreamReaderState {
53    pub fn new(channels: NonZeroChannelCount) -> Self {
54        assert!((channels.get().get() as usize) < MAX_CHANNELS);
55
56        Self {
57            channels,
58            active_state: None,
59            shared_state: ArcGc::new(SharedState::new()),
60        }
61    }
62
63    /// Returns `true` if there is there is currently an active stream on this node.
64    pub fn is_active(&self) -> bool {
65        self.active_state.is_some() && self.shared_state.stream_active.load(Ordering::Relaxed)
66    }
67
68    /// Returns `true` if an underflow occured (due to the output stream
69    /// running faster than the input stream).
70    ///
71    /// If this happens excessively in Release mode, you may want to consider
72    /// increasing [`ResamplingChannelConfig::latency_seconds`].
73    ///
74    /// (Calling this will also reset the flag indicating whether an
75    /// underflow occurred.)out
76    pub fn underflow_occurred(&self) -> bool {
77        self.shared_state
78            .underflow_occurred
79            .swap(false, Ordering::Relaxed)
80    }
81
82    /// Returns `true` if an overflow occured (due to the input stream
83    /// running faster than the output stream).
84    ///
85    /// If this happens excessively in Release mode, you may want to consider
86    /// increasing [`ResamplingChannelConfig::capacity_seconds`]. For
87    /// example, if you are streaming data from a network, you may want to
88    /// increase the capacity to several seconds.
89    ///
90    /// (Calling this will also reset the flag indicating whether an
91    /// overflow occurred.)
92    pub fn overflow_occurred(&self) -> bool {
93        self.shared_state
94            .overflow_occurred
95            .swap(false, Ordering::Relaxed)
96    }
97
98    /// Begin the output audio stream on this node.
99    ///
100    /// The returned event must be sent to the node's processor for this to take effect.
101    ///
102    /// * `sample_rate` - The sample rate of this node.
103    /// * `output_stream_sample_rate` - The sample rate of the active output audio stream.
104    /// * `channel_config` - The configuration of the input to output channel.
105    ///
106    /// If there is already an active stream running on this node, then this will return
107    /// an error.
108    pub fn start_stream(
109        &mut self,
110        sample_rate: NonZeroU32,
111        output_stream_sample_rate: NonZeroU32,
112        channel_config: ResamplingChannelConfig,
113    ) -> Result<NewOutputStreamEvent, ()> {
114        if self.is_active() {
115            return Err(());
116        }
117
118        self.shared_state.reset();
119
120        let (prod, cons) = fixed_resample::resampling_channel::<f32, MAX_CHANNELS>(
121            NonZeroUsize::new(self.channels.get().get() as usize).unwrap(),
122            output_stream_sample_rate.get(),
123            sample_rate.get(),
124            channel_config,
125        );
126
127        self.active_state = Some(ActiveState {
128            cons: Arc::new(Mutex::new(cons)),
129            sample_rate,
130        });
131        self.shared_state
132            .stream_active
133            .store(true, Ordering::Relaxed);
134
135        Ok(NewOutputStreamEvent { prod: Some(prod) })
136    }
137
138    /// The total number of frames (not samples) that can currently be read from
139    /// the stream.
140    ///
141    /// If there is no active stream, the stream is paused, or the processor end
142    /// is not ready to receive samples, then this will return `0`.
143    pub fn available_frames(&self) -> usize {
144        if self.is_ready() {
145            self.active_state
146                .as_ref()
147                .map(|s| s.cons.lock().unwrap().available_frames())
148                .unwrap_or(0)
149        } else {
150            0
151        }
152    }
153
154    /// The amount of data in seconds that is currently occupied in the channel.
155    ///
156    /// This value will be in the range `[0.0, ResamplingChannelConfig::capacity_seconds]`.
157    ///
158    /// This can also be used to detect when an extra packet of data should be read or
159    /// discarded to correct for jitter.
160    ///
161    /// If there is no active stream, then this will return `None`.
162    pub fn occupied_seconds(&self) -> Option<f64> {
163        self.active_state
164            .as_ref()
165            .map(|s| s.cons.lock().unwrap().occupied_seconds())
166    }
167
168    /// The number of channels in this node.
169    pub fn num_channels(&self) -> NonZeroChannelCount {
170        self.channels
171    }
172
173    /// The sample rate of the active stream.
174    ///
175    /// Returns `None` if there is no active stream.
176    pub fn sample_rate(&self) -> Option<NonZeroU32> {
177        self.active_state.as_ref().map(|s| s.sample_rate)
178    }
179
180    /// Read from the channel and write the results into the given output buffer
181    /// in interleaved format.
182    ///
183    /// If there is no active stream, the stream is paused, or the processor end
184    /// is not ready to send samples, then the output will be filled with zeros
185    /// and `None` will be returned.
186    pub fn read_interleaved(&mut self, output: &mut [f32]) -> Option<ReadStatus> {
187        if !self.is_ready() {
188            output.fill(0.0);
189            return None;
190        }
191
192        Some(
193            self.active_state
194                .as_mut()
195                .unwrap()
196                .cons
197                .lock()
198                .unwrap()
199                .read_interleaved(output),
200        )
201    }
202
203    /// Read from the channel and write the results into the given output buffer in
204    /// de-interleaved format.
205    ///
206    /// * `output` - The channels to write data to.
207    /// * `range` - The range in each slice in `output` to write to.
208    ///
209    /// If there is no active stream, the stream is paused, or the processor end
210    /// is not ready to send samples, then the output will be filled with zeros
211    /// and `None` will be returned.
212    pub fn read<Vin: AsMut<[f32]>>(
213        &mut self,
214        output: &mut [Vin],
215        range: Range<usize>,
216    ) -> Option<ReadStatus> {
217        if !self.is_ready() {
218            for ch in output.iter_mut() {
219                ch.as_mut()[range.clone()].fill(0.0);
220            }
221            return None;
222        }
223
224        Some(
225            self.active_state
226                .as_mut()
227                .unwrap()
228                .cons
229                .lock()
230                .unwrap()
231                .read(output, range),
232        )
233    }
234
235    /// Discard a certian number of output frames from the buffer. This can be used to
236    /// correct for jitter and avoid excessive overflows and reduce the percieved audible
237    /// glitchiness.
238    ///
239    /// This will discard `frames.min(self.available_frames())` frames.
240    ///
241    /// Returns the number of output frames that were discarded.
242    pub fn discard_frames(&mut self) -> usize {
243        if let Some(state) = &mut self.active_state {
244            state.cons.lock().unwrap().discard_frames(usize::MAX)
245        } else {
246            0
247        }
248    }
249
250    /// Correct for any overflows.
251    ///
252    /// This returns the number of frames (samples in a single channel of audio) that were
253    /// discarded due to an overflow occurring. If no overflow occured, then `None`
254    /// is returned.
255    ///
256    /// Note, this method is already automatically called in [`StreamReaderState::read`] and
257    /// [`StreamReaderState::read_interleaved`].
258    ///
259    /// This will have no effect if [`ResamplingChannelConfig::overflow_autocorrect_percent_threshold`]
260    /// was set to `None`.
261    ///
262    /// This method is realtime-safe.
263    pub fn autocorrect_overflows(&mut self) -> Option<usize> {
264        if let Some(state) = &mut self.active_state {
265            state.cons.lock().unwrap().autocorrect_overflows()
266        } else {
267            None
268        }
269    }
270
271    /// Returns `true` if the processor end of the stream is ready to start sending
272    /// data.
273    pub fn is_ready(&self) -> bool {
274        self.active_state.is_some()
275            && self.shared_state.channel_started.load(Ordering::Relaxed)
276            && !self.shared_state.paused.load(Ordering::Relaxed)
277    }
278
279    /// Pause any active audio streams.
280    pub fn pause_stream(&mut self) {
281        if self.is_active() {
282            self.shared_state.paused.store(true, Ordering::Relaxed);
283        }
284    }
285
286    /// Resume any active audio streams after pausing.
287    pub fn resume(&mut self) {
288        self.shared_state.paused.store(false, Ordering::Relaxed);
289    }
290
291    // Stop any active audio input streams.
292    pub fn stop_stream(&mut self) {
293        self.active_state = None;
294        self.shared_state.reset();
295    }
296
297    pub fn handle(&self) -> Mutex<Self> {
298        Mutex::new((*self).clone())
299    }
300}
301
302impl Drop for StreamReaderState {
303    fn drop(&mut self) {
304        self.stop_stream();
305    }
306}
307
308impl AudioNode for StreamReaderNode {
309    type Configuration = StreamReaderConfig;
310
311    fn info(&self, config: &Self::Configuration) -> AudioNodeInfo {
312        AudioNodeInfo::new()
313            .debug_name("stream_reader")
314            .channel_config(ChannelConfig {
315                num_inputs: config.channels.get(),
316                num_outputs: ChannelCount::ZERO,
317            })
318            .custom_state(StreamReaderState::new(config.channels))
319    }
320
321    fn construct_processor(
322        &self,
323        _config: &Self::Configuration,
324        cx: ConstructProcessorContext,
325    ) -> impl AudioNodeProcessor {
326        Processor {
327            prod: None,
328            shared_state: ArcGc::clone(
329                &cx.custom_state::<StreamReaderState>().unwrap().shared_state,
330            ),
331        }
332    }
333}
334
335#[derive(Clone)]
336struct ActiveState {
337    cons: Arc<Mutex<fixed_resample::ResamplingCons<f32>>>,
338    sample_rate: NonZeroU32,
339}
340
341struct SharedState {
342    stream_active: AtomicBool,
343    channel_started: AtomicBool,
344    paused: AtomicBool,
345    underflow_occurred: AtomicBool,
346    overflow_occurred: AtomicBool,
347}
348
349impl SharedState {
350    fn new() -> Self {
351        Self {
352            stream_active: AtomicBool::new(false),
353            channel_started: AtomicBool::new(false),
354            paused: AtomicBool::new(false),
355            underflow_occurred: AtomicBool::new(false),
356            overflow_occurred: AtomicBool::new(false),
357        }
358    }
359
360    fn reset(&self) {
361        self.stream_active.store(false, Ordering::Relaxed);
362        self.channel_started.store(false, Ordering::Relaxed);
363        self.paused.store(false, Ordering::Relaxed);
364        self.underflow_occurred.store(false, Ordering::Relaxed);
365        self.overflow_occurred.store(false, Ordering::Relaxed);
366    }
367}
368
369struct Processor {
370    prod: Option<fixed_resample::ResamplingProd<f32, MAX_CHANNELS>>,
371    shared_state: ArcGc<SharedState>,
372}
373
374impl AudioNodeProcessor for Processor {
375    fn process(
376        &mut self,
377        info: &ProcInfo,
378        buffers: ProcBuffers,
379        events: &mut ProcEvents,
380        _extra: &mut ProcExtra,
381    ) -> ProcessStatus {
382        for mut event in events.drain() {
383            if let Some(out_stream_event) = event.downcast_mut::<NewOutputStreamEvent>() {
384                // Swap the values so that the old producer gets dropped on
385                // the main thread.
386                core::mem::swap(&mut self.prod, &mut out_stream_event.prod);
387            }
388        }
389
390        if !self.shared_state.stream_active.load(Ordering::Relaxed)
391            || self.shared_state.paused.load(Ordering::Relaxed)
392        {
393            return ProcessStatus::Bypass;
394        }
395
396        let Some(prod) = &mut self.prod else {
397            return ProcessStatus::Bypass;
398        };
399
400        // Notify the input stream that the output stream has begun
401        // reading data.
402        self.shared_state
403            .channel_started
404            .store(true, Ordering::Relaxed);
405
406        let status = prod.push(buffers.inputs, 0..info.frames);
407
408        match status {
409            PushStatus::OverflowOccurred {
410                num_frames_pushed: _,
411            } => {
412                self.shared_state
413                    .overflow_occurred
414                    .store(true, Ordering::Relaxed);
415            }
416            PushStatus::UnderflowCorrected {
417                num_zero_frames_pushed: _,
418            } => {
419                self.shared_state
420                    .underflow_occurred
421                    .store(true, Ordering::Relaxed);
422            }
423            _ => {}
424        }
425
426        ProcessStatus::Bypass
427    }
428
429    fn stream_stopped(&mut self, _logger: &mut RealtimeLogger) {
430        self.shared_state
431            .stream_active
432            .store(false, Ordering::Relaxed);
433        self.prod = None;
434    }
435}
436
437pub struct NewOutputStreamEvent {
438    prod: Option<fixed_resample::ResamplingProd<f32, MAX_CHANNELS>>,
439}
440
441impl From<NewOutputStreamEvent> for NodeEventType {
442    fn from(value: NewOutputStreamEvent) -> Self {
443        NodeEventType::custom(value)
444    }
445}