Skip to main content

firewheel_nodes/stream/
reader.rs

1use bevy_platform::sync::{
2    atomic::{AtomicBool, Ordering},
3    Arc, Mutex,
4};
5use core::{
6    num::{NonZeroU32, NonZeroUsize},
7    ops::Range,
8};
9
10use firewheel_core::{
11    channel_config::{ChannelConfig, ChannelCount, NonZeroChannelCount},
12    collector::ArcGc,
13    event::{NodeEventType, ProcEvents},
14    node::{
15        AudioNode, AudioNodeInfo, AudioNodeProcessor, ConstructProcessorContext, ProcBuffers,
16        ProcExtra, ProcInfo, ProcStreamCtx, ProcessStatus,
17    },
18};
19use fixed_resample::{PushStatus, ReadStatus, ResamplingChannelConfig};
20
/// The maximum number of channels a stream reader can handle.
///
/// This is the compile-time channel capacity given to the resampling channel
/// created in `StreamReaderState::start_stream`, and the bound checked in
/// `StreamReaderState::new`.
pub const MAX_CHANNELS: usize = 16;
22
/// The configuration of a [`StreamReaderNode`].
///
/// The default configuration uses a stereo channel count.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "bevy", derive(bevy_ecs::prelude::Component))]
#[cfg_attr(feature = "bevy_reflect", derive(bevy_reflect::Reflect))]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct StreamReaderConfig {
    /// The number of channels.
    ///
    /// This is used both as the number of input ports of the node and as the
    /// number of channels of the node's [`StreamReaderState`].
    pub channels: NonZeroChannelCount,
}
32
33impl Default for StreamReaderConfig {
34    fn default() -> Self {
35        Self {
36            channels: NonZeroChannelCount::STEREO,
37        }
38    }
39}
40
/// A node that sends blocks of raw audio data from the audio graph
/// to another thread.
///
/// The node itself carries no data. The audio is read through the node's
/// [`StreamReaderState`], which is registered as the node's custom state.
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "bevy", derive(bevy_ecs::prelude::Component))]
#[cfg_attr(feature = "bevy_reflect", derive(bevy_reflect::Reflect))]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct StreamReaderNode;
48
/// The reading end of a [`StreamReaderNode`].
///
/// This is stored as the node's custom state and is used to start, stop,
/// pause, and read from the stream of audio data sent by the node's processor.
#[derive(Clone)]
pub struct StreamReaderState {
    /// The number of channels in this node.
    channels: NonZeroChannelCount,
    /// `Some` from the moment a stream is started until it is stopped.
    active_state: Option<ActiveState>,
    /// Flags shared with the node's processor.
    shared_state: ArcGc<SharedState>,
}
55
56impl StreamReaderState {
57    pub fn new(channels: NonZeroChannelCount) -> Self {
58        assert!((channels.get().get() as usize) < MAX_CHANNELS);
59
60        Self {
61            channels,
62            active_state: None,
63            shared_state: ArcGc::new(SharedState::new()),
64        }
65    }
66
67    /// Returns `true` if there is there is currently an active stream on this node.
68    pub fn is_active(&self) -> bool {
69        self.active_state.is_some() && self.shared_state.stream_active.load(Ordering::Relaxed)
70    }
71
72    /// Returns `true` if an underflow occured (due to the output stream
73    /// running faster than the input stream).
74    ///
75    /// If this happens excessively in Release mode, you may want to consider
76    /// increasing [`ResamplingChannelConfig::latency_seconds`].
77    ///
78    /// (Calling this will also reset the flag indicating whether an
79    /// underflow occurred.)out
80    pub fn underflow_occurred(&self) -> bool {
81        self.shared_state
82            .underflow_occurred
83            .swap(false, Ordering::Relaxed)
84    }
85
86    /// Returns `true` if an overflow occured (due to the input stream
87    /// running faster than the output stream).
88    ///
89    /// If this happens excessively in Release mode, you may want to consider
90    /// increasing [`ResamplingChannelConfig::capacity_seconds`]. For
91    /// example, if you are streaming data from a network, you may want to
92    /// increase the capacity to several seconds.
93    ///
94    /// (Calling this will also reset the flag indicating whether an
95    /// overflow occurred.)
96    pub fn overflow_occurred(&self) -> bool {
97        self.shared_state
98            .overflow_occurred
99            .swap(false, Ordering::Relaxed)
100    }
101
102    /// Begin the output audio stream on this node.
103    ///
104    /// The returned event must be sent to the node's processor for this to take effect.
105    ///
106    /// * `sample_rate` - The sample rate of this node.
107    /// * `output_stream_sample_rate` - The sample rate of the active output audio stream.
108    /// * `channel_config` - The configuration of the input to output channel.
109    ///
110    /// If there is already an active stream running on this node, then this will return
111    /// an error.
112    pub fn start_stream(
113        &mut self,
114        sample_rate: NonZeroU32,
115        output_stream_sample_rate: NonZeroU32,
116        channel_config: ResamplingChannelConfig,
117    ) -> Result<NewOutputStreamEvent, ()> {
118        if self.is_active() {
119            return Err(());
120        }
121
122        self.shared_state.reset();
123
124        let (prod, cons) = fixed_resample::resampling_channel::<f32, MAX_CHANNELS>(
125            NonZeroUsize::new(self.channels.get().get() as usize).unwrap(),
126            output_stream_sample_rate.get(),
127            sample_rate.get(),
128            channel_config,
129        );
130
131        self.active_state = Some(ActiveState {
132            cons: Arc::new(Mutex::new(cons)),
133            sample_rate,
134        });
135        self.shared_state
136            .stream_active
137            .store(true, Ordering::Relaxed);
138
139        Ok(NewOutputStreamEvent { prod: Some(prod) })
140    }
141
142    /// The total number of frames (not samples) that can currently be read from
143    /// the stream.
144    ///
145    /// If there is no active stream, the stream is paused, or the processor end
146    /// is not ready to receive samples, then this will return `0`.
147    pub fn available_frames(&self) -> usize {
148        if self.is_ready() {
149            self.active_state
150                .as_ref()
151                .map(|s| s.cons.lock().unwrap().available_frames())
152                .unwrap_or(0)
153        } else {
154            0
155        }
156    }
157
158    /// The amount of data in seconds that is currently occupied in the channel.
159    ///
160    /// This value will be in the range `[0.0, ResamplingChannelConfig::capacity_seconds]`.
161    ///
162    /// This can also be used to detect when an extra packet of data should be read or
163    /// discarded to correct for jitter.
164    ///
165    /// If there is no active stream, then this will return `None`.
166    pub fn occupied_seconds(&self) -> Option<f64> {
167        self.active_state
168            .as_ref()
169            .map(|s| s.cons.lock().unwrap().occupied_seconds())
170    }
171
172    /// The number of channels in this node.
173    pub fn num_channels(&self) -> NonZeroChannelCount {
174        self.channels
175    }
176
177    /// The sample rate of the active stream.
178    ///
179    /// Returns `None` if there is no active stream.
180    pub fn sample_rate(&self) -> Option<NonZeroU32> {
181        self.active_state.as_ref().map(|s| s.sample_rate)
182    }
183
184    /// Read from the channel and write the results into the given output buffer
185    /// in interleaved format.
186    ///
187    /// If there is no active stream, the stream is paused, or the processor end
188    /// is not ready to send samples, then the output will be filled with zeros
189    /// and `None` will be returned.
190    pub fn read_interleaved(&mut self, output: &mut [f32]) -> Option<ReadStatus> {
191        if !self.is_ready() {
192            output.fill(0.0);
193            return None;
194        }
195
196        Some(
197            self.active_state
198                .as_mut()
199                .unwrap()
200                .cons
201                .lock()
202                .unwrap()
203                .read_interleaved(output),
204        )
205    }
206
207    /// Read from the channel and write the results into the given output buffer in
208    /// de-interleaved format.
209    ///
210    /// * `output` - The channels to write data to.
211    /// * `range` - The range in each slice in `output` to write to.
212    ///
213    /// If there is no active stream, the stream is paused, or the processor end
214    /// is not ready to send samples, then the output will be filled with zeros
215    /// and `None` will be returned.
216    pub fn read<Vin: AsMut<[f32]>>(
217        &mut self,
218        output: &mut [Vin],
219        range: Range<usize>,
220    ) -> Option<ReadStatus> {
221        if !self.is_ready() {
222            for ch in output.iter_mut() {
223                ch.as_mut()[range.clone()].fill(0.0);
224            }
225            return None;
226        }
227
228        Some(
229            self.active_state
230                .as_mut()
231                .unwrap()
232                .cons
233                .lock()
234                .unwrap()
235                .read(output, range),
236        )
237    }
238
239    /// Discard a certian number of output frames from the buffer. This can be used to
240    /// correct for jitter and avoid excessive overflows and reduce the percieved audible
241    /// glitchiness.
242    ///
243    /// This will discard `frames.min(self.available_frames())` frames.
244    ///
245    /// Returns the number of output frames that were discarded.
246    pub fn discard_frames(&mut self) -> usize {
247        if let Some(state) = &mut self.active_state {
248            state.cons.lock().unwrap().discard_frames(usize::MAX)
249        } else {
250            0
251        }
252    }
253
254    /// Correct for any overflows.
255    ///
256    /// This returns the number of frames (samples in a single channel of audio) that were
257    /// discarded due to an overflow occurring. If no overflow occured, then `None`
258    /// is returned.
259    ///
260    /// Note, this method is already automatically called in [`StreamReaderState::read`] and
261    /// [`StreamReaderState::read_interleaved`].
262    ///
263    /// This will have no effect if [`ResamplingChannelConfig::overflow_autocorrect_percent_threshold`]
264    /// was set to `None`.
265    ///
266    /// This method is realtime-safe.
267    pub fn autocorrect_overflows(&mut self) -> Option<usize> {
268        if let Some(state) = &mut self.active_state {
269            state.cons.lock().unwrap().autocorrect_overflows()
270        } else {
271            None
272        }
273    }
274
275    /// Returns `true` if the processor end of the stream is ready to start sending
276    /// data.
277    pub fn is_ready(&self) -> bool {
278        self.active_state.is_some()
279            && self.shared_state.channel_started.load(Ordering::Relaxed)
280            && !self.shared_state.paused.load(Ordering::Relaxed)
281    }
282
283    /// Pause any active audio streams.
284    pub fn pause_stream(&mut self) {
285        if self.is_active() {
286            self.shared_state.paused.store(true, Ordering::Relaxed);
287        }
288    }
289
290    /// Resume any active audio streams after pausing.
291    pub fn resume(&mut self) {
292        self.shared_state.paused.store(false, Ordering::Relaxed);
293    }
294
295    // Stop any active audio input streams.
296    pub fn stop_stream(&mut self) {
297        self.active_state = None;
298        self.shared_state.reset();
299    }
300
301    pub fn handle(&self) -> Mutex<Self> {
302        Mutex::new((*self).clone())
303    }
304}
305
/// Stops the stream when the state is dropped.
///
/// NOTE(review): `StreamReaderState` is `Clone` and its clones share
/// `shared_state`, so dropping any clone (for example one created by
/// `handle`) also resets the flags seen by the other clones and by the
/// processor. Confirm this is intended.
impl Drop for StreamReaderState {
    fn drop(&mut self) {
        self.stop_stream();
    }
}
311
312impl AudioNode for StreamReaderNode {
313    type Configuration = StreamReaderConfig;
314
315    fn info(&self, config: &Self::Configuration) -> AudioNodeInfo {
316        AudioNodeInfo::new()
317            .debug_name("stream_reader")
318            .channel_config(ChannelConfig {
319                num_inputs: config.channels.get(),
320                num_outputs: ChannelCount::ZERO,
321            })
322            .custom_state(StreamReaderState::new(config.channels))
323    }
324
325    fn construct_processor(
326        &self,
327        _config: &Self::Configuration,
328        cx: ConstructProcessorContext,
329    ) -> impl AudioNodeProcessor {
330        Processor {
331            prod: None,
332            shared_state: ArcGc::clone(
333                &cx.custom_state::<StreamReaderState>().unwrap().shared_state,
334            ),
335        }
336    }
337}
338
/// The per-stream data held by a [`StreamReaderState`] while a stream is
/// started.
#[derive(Clone)]
struct ActiveState {
    /// The consumer end of the resampling channel. It is behind an
    /// `Arc<Mutex<_>>`, so clones of the state share one consumer.
    cons: Arc<Mutex<fixed_resample::ResamplingCons<f32>>>,
    /// The `sample_rate` that was passed to `StreamReaderState::start_stream`.
    sample_rate: NonZeroU32,
}
344
/// Flags shared between a [`StreamReaderState`] and the node's processor.
struct SharedState {
    /// Set when a stream is started; cleared when the stream is stopped or
    /// when the processor is told that the audio stream has stopped.
    stream_active: AtomicBool,
    /// Set by the processor once it has a producer and is pushing data.
    channel_started: AtomicBool,
    /// Set while the stream is paused.
    paused: AtomicBool,
    /// Set by the processor when a push reports a corrected underflow;
    /// cleared when it is read via `StreamReaderState::underflow_occurred`.
    underflow_occurred: AtomicBool,
    /// Set by the processor when a push reports an overflow; cleared when it
    /// is read via `StreamReaderState::overflow_occurred`.
    overflow_occurred: AtomicBool,
}
352
353impl SharedState {
354    fn new() -> Self {
355        Self {
356            stream_active: AtomicBool::new(false),
357            channel_started: AtomicBool::new(false),
358            paused: AtomicBool::new(false),
359            underflow_occurred: AtomicBool::new(false),
360            overflow_occurred: AtomicBool::new(false),
361        }
362    }
363
364    fn reset(&self) {
365        self.stream_active.store(false, Ordering::Relaxed);
366        self.channel_started.store(false, Ordering::Relaxed);
367        self.paused.store(false, Ordering::Relaxed);
368        self.underflow_occurred.store(false, Ordering::Relaxed);
369        self.overflow_occurred.store(false, Ordering::Relaxed);
370    }
371}
372
/// The processor of a [`StreamReaderNode`].
struct Processor {
    /// The producer end of the resampling channel. This is `None` until a
    /// [`NewOutputStreamEvent`] is received, and again after the audio stream
    /// has stopped.
    prod: Option<fixed_resample::ResamplingProd<f32, MAX_CHANNELS>>,
    /// Flags shared with the node's [`StreamReaderState`].
    shared_state: ArcGc<SharedState>,
}
377
378impl AudioNodeProcessor for Processor {
379    fn process(
380        &mut self,
381        info: &ProcInfo,
382        buffers: ProcBuffers,
383        events: &mut ProcEvents,
384        _extra: &mut ProcExtra,
385    ) -> ProcessStatus {
386        for mut event in events.drain() {
387            if let Some(out_stream_event) = event.downcast_mut::<NewOutputStreamEvent>() {
388                // Swap the values so that the old producer gets dropped on
389                // the main thread.
390                core::mem::swap(&mut self.prod, &mut out_stream_event.prod);
391            }
392        }
393
394        if !self.shared_state.stream_active.load(Ordering::Relaxed)
395            || self.shared_state.paused.load(Ordering::Relaxed)
396        {
397            return ProcessStatus::Bypass;
398        }
399
400        let Some(prod) = &mut self.prod else {
401            return ProcessStatus::Bypass;
402        };
403
404        // Notify the input stream that the output stream has begun
405        // reading data.
406        self.shared_state
407            .channel_started
408            .store(true, Ordering::Relaxed);
409
410        let status = prod.push(buffers.inputs, 0..info.frames);
411
412        match status {
413            PushStatus::OverflowOccurred {
414                num_frames_pushed: _,
415            } => {
416                self.shared_state
417                    .overflow_occurred
418                    .store(true, Ordering::Relaxed);
419            }
420            PushStatus::UnderflowCorrected {
421                num_zero_frames_pushed: _,
422            } => {
423                self.shared_state
424                    .underflow_occurred
425                    .store(true, Ordering::Relaxed);
426            }
427            _ => {}
428        }
429
430        ProcessStatus::Bypass
431    }
432
433    fn stream_stopped(&mut self, _context: &mut ProcStreamCtx) {
434        self.shared_state
435            .stream_active
436            .store(false, Ordering::Relaxed);
437        self.prod = None;
438    }
439}
440
/// The event returned by `StreamReaderState::start_stream`.
///
/// It must be sent to the node's processor for the new stream to take effect.
pub struct NewOutputStreamEvent {
    /// The producer end of the new resampling channel. It is an `Option` so
    /// that the processor can swap it with the producer it currently holds.
    prod: Option<fixed_resample::ResamplingProd<f32, MAX_CHANNELS>>,
}
444
445impl From<NewOutputStreamEvent> for NodeEventType {
446    fn from(value: NewOutputStreamEvent) -> Self {
447        NodeEventType::custom(value)
448    }
449}