firewheel_nodes/stream/
writer.rs

1use bevy_platform::sync::{
2    atomic::{AtomicBool, Ordering},
3    Arc, Mutex,
4};
5use core::{
6    num::{NonZeroU32, NonZeroUsize},
7    ops::Range,
8};
9
10use firewheel_core::{
11    channel_config::{ChannelConfig, ChannelCount, NonZeroChannelCount},
12    collector::ArcGc,
13    dsp::declick::{DeclickFadeCurve, Declicker},
14    event::{NodeEventType, ProcEvents},
15    mask::{MaskType, SilenceMask},
16    node::{
17        AudioNode, AudioNodeInfo, AudioNodeProcessor, ConstructProcessorContext, ProcBuffers,
18        ProcExtra, ProcInfo, ProcStreamCtx, ProcessStatus,
19    },
20};
21use fixed_resample::{ReadStatus, ResamplingChannelConfig};
22
23pub use fixed_resample::PushStatus;
24
25pub const MAX_CHANNELS: usize = 16;
26
27/// The configuration of a [`StreamWriterNode`]
28#[derive(Debug, Clone, Copy, PartialEq, Eq)]
29#[cfg_attr(feature = "bevy", derive(bevy_ecs::prelude::Component))]
30#[cfg_attr(feature = "bevy_reflect", derive(bevy_reflect::Reflect))]
31#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
32pub struct StreamWriterConfig {
33    /// The number of channels.
34    pub channels: NonZeroChannelCount,
35
36    /// Whether or not to check for silence in the input stream. Highly
37    /// recommened to set this to `true` to improve audio graph performance
38    /// when there is no input on the microphone.
39    ///
40    /// By default this is set to `true`.
41    pub check_for_silence: bool,
42}
43
44impl Default for StreamWriterConfig {
45    fn default() -> Self {
46        Self {
47            channels: NonZeroChannelCount::STEREO,
48            check_for_silence: true,
49        }
50    }
51}
52
53/// A node that takes blocks of raw audio data a thread and plays
54/// them in the audio graph
55#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
56#[cfg_attr(feature = "bevy", derive(bevy_ecs::prelude::Component))]
57#[cfg_attr(feature = "bevy_reflect", derive(bevy_reflect::Reflect))]
58#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
59pub struct StreamWriterNode;
60
61#[derive(Clone)]
62pub struct StreamWriterState {
63    channels: NonZeroChannelCount,
64    active_state: Option<ActiveState>,
65    shared_state: ArcGc<SharedState>,
66}
67
68impl StreamWriterState {
69    fn new(channels: NonZeroChannelCount) -> Self {
70        Self {
71            channels,
72            active_state: None,
73            shared_state: ArcGc::new(SharedState::new()),
74        }
75    }
76
77    /// Returns `true` if there is there is currently an active stream on this node.
78    pub fn is_active(&self) -> bool {
79        self.active_state.is_some() && self.shared_state.stream_active.load(Ordering::Relaxed)
80    }
81
82    /// Returns `true` if an underflow occured (due to the output stream
83    /// running faster than the input stream).
84    ///
85    /// If this happens excessively in Release mode, you may want to consider
86    /// increasing [`ResamplingChannelConfig::latency_seconds`].
87    ///
88    /// (Calling this will also reset the flag indicating whether an
89    /// underflow occurred.)
90    pub fn underflow_occurred(&self) -> bool {
91        self.shared_state
92            .underflow_occurred
93            .swap(false, Ordering::Relaxed)
94    }
95
96    /// Returns `true` if an overflow occured (due to the input stream
97    /// running faster than the output stream).
98    ///
99    /// If this happens excessively in Release mode, you may want to consider
100    /// increasing [`ResamplingChannelConfig::capacity_seconds`]. For
101    /// example, if you are streaming data from a network, you may want to
102    /// increase the capacity to several seconds.
103    ///
104    /// (Calling this will also reset the flag indicating whether an
105    /// overflow occurred.)
106    pub fn overflow_occurred(&self) -> bool {
107        self.shared_state
108            .overflow_occurred
109            .swap(false, Ordering::Relaxed)
110    }
111
112    /// The total number of frames (not samples) that can currently be pushed to the stream.
113    ///
114    /// If there is no active stream, the stream is paused, or the processor end
115    /// is not ready to receive samples, then this will return `0`.
116    pub fn available_frames(&self) -> usize {
117        if self.is_ready() {
118            self.active_state
119                .as_ref()
120                .map(|s| s.prod.lock().unwrap().available_frames())
121                .unwrap_or(0)
122        } else {
123            0
124        }
125    }
126
127    /// The amount of data in seconds that is currently occupied in the channel.
128    ///
129    /// This value will be in the range `[0.0, ResamplingChannelConfig::capacity_seconds]`.
130    ///
131    /// If there is no active stream, then this will return `None`.
132    pub fn occupied_seconds(&self) -> Option<f64> {
133        self.active_state
134            .as_ref()
135            .map(|s| s.prod.lock().unwrap().occupied_seconds())
136    }
137
138    /// The number of channels in this node.
139    pub fn num_channels(&self) -> NonZeroChannelCount {
140        self.channels
141    }
142
143    /// The sample rate of the active stream.
144    ///
145    /// Returns `None` if there is no active stream.
146    pub fn sample_rate(&self) -> Option<NonZeroU32> {
147        self.active_state.as_ref().map(|s| s.sample_rate)
148    }
149
150    /// Begin the input audio stream on this node.
151    ///
152    /// The returned event must be sent to the node's processor for this to take effect.
153    ///
154    /// * `sample_rate` - The sample rate of this node.
155    /// * `output_stream_sample_rate` - The sample rate of the active output audio stream.
156    /// * `channel_config` - The configuration of the input to output channel.
157    ///
158    /// If there is already an active stream running on this node, then this will return
159    /// an error.
160    pub fn start_stream(
161        &mut self,
162        sample_rate: NonZeroU32,
163        output_stream_sample_rate: NonZeroU32,
164        channel_config: ResamplingChannelConfig,
165    ) -> Result<NewInputStreamEvent, ()> {
166        if self.is_active() {
167            return Err(());
168        }
169
170        self.shared_state.reset();
171
172        let (prod, cons) = fixed_resample::resampling_channel::<f32, MAX_CHANNELS>(
173            NonZeroUsize::new(self.channels.get().get() as usize).unwrap(),
174            sample_rate.get(),
175            output_stream_sample_rate.get(),
176            channel_config,
177        );
178
179        self.active_state = Some(ActiveState {
180            prod: Arc::new(Mutex::new(prod)),
181            sample_rate,
182        });
183        self.shared_state
184            .stream_active
185            .store(true, Ordering::Relaxed);
186
187        Ok(NewInputStreamEvent { cons: Some(cons) })
188    }
189
190    /// Push the given data in interleaved format.
191    ///
192    /// Returns the number of frames (not samples) that were successfully pushed.
193    /// If this number is less than the number of frames in `data`, then it means
194    /// an overflow has occured.
195    ///
196    /// If there is no active stream, the stream is paused, or the processor end
197    /// is not ready to receive samples, then no data will be sent and this will
198    /// return `0`.
199    pub fn push_interleaved(&mut self, data: &[f32]) -> PushStatus {
200        if !self.is_ready() {
201            return PushStatus::OutputNotReady;
202        }
203
204        self.active_state
205            .as_mut()
206            .unwrap()
207            .prod
208            .lock()
209            .unwrap()
210            .push_interleaved(data)
211    }
212
213    /// Push the given data in de-interleaved format.
214    ///
215    /// * `data` - The channels of data to push to the channel.
216    /// * `range` - The range in each slice in `input` to read data from.
217    ///
218    /// Returns the number of frames (not samples) that were successfully pushed.
219    /// If this number is less than the number of frames in `data`, then it means
220    /// an overflow has occured.
221    ///b
222    /// If there is no active stream, the stream is paused, or the processor end
223    /// is not ready to receive samples, then no data will be sent and this will
224    /// return `0`.
225    pub fn push<Vin: AsRef<[f32]>>(&mut self, data: &[Vin], range: Range<usize>) -> PushStatus {
226        if !self.is_ready() {
227            return PushStatus::OutputNotReady;
228        }
229
230        self.active_state
231            .as_mut()
232            .unwrap()
233            .prod
234            .lock()
235            .unwrap()
236            .push(data, range)
237    }
238
239    /// Returns `true` if the processor end of the stream is ready to start receiving
240    /// data.
241    pub fn is_ready(&self) -> bool {
242        self.active_state.is_some()
243            && self.shared_state.channel_started.load(Ordering::Relaxed)
244            && !self.shared_state.paused.load(Ordering::Relaxed)
245    }
246
247    /// Pause any active audio streams.
248    pub fn pause_stream(&mut self) {
249        if self.is_active() {
250            self.shared_state.paused.store(true, Ordering::Relaxed);
251        }
252    }
253
254    /// Correct for any underflows.
255    ///
256    /// This returns the number of extra zero frames (samples in a single channel of audio)
257    /// that were added due to an underflow occurring. If no underflow occured, then `None`
258    /// is returned.
259    ///
260    /// Note, this method is already automatically called in [`StreamWriterState::push`] and
261    /// [`StreamWriterState::push_interleaved`].
262    ///
263    /// This will have no effect if [`ResamplingChannelConfig::underflow_autocorrect_percent_threshold`]
264    /// was set to `None`.
265    ///
266    /// This method is realtime-safe.
267    pub fn autocorrect_underflows(&mut self) -> Option<usize> {
268        if let Some(state) = &mut self.active_state {
269            state.prod.lock().unwrap().autocorrect_underflows()
270        } else {
271            None
272        }
273    }
274
275    /// Resume any active audio streams after pausing.
276    pub fn resume(&mut self) {
277        self.shared_state.paused.store(false, Ordering::Relaxed);
278    }
279
280    // Stop any active audio input streams.
281    pub fn stop_stream(&mut self) {
282        self.active_state = None;
283        self.shared_state.reset();
284    }
285
286    pub fn handle(&self) -> Mutex<Self> {
287        Mutex::new((*self).clone())
288    }
289}
290
291impl Drop for StreamWriterState {
292    fn drop(&mut self) {
293        self.stop_stream();
294    }
295}
296
297impl AudioNode for StreamWriterNode {
298    type Configuration = StreamWriterConfig;
299
300    fn info(&self, config: &Self::Configuration) -> AudioNodeInfo {
301        AudioNodeInfo::new()
302            .debug_name("stream_writer")
303            .channel_config(ChannelConfig {
304                num_inputs: ChannelCount::ZERO,
305                num_outputs: config.channels.get(),
306            })
307            .custom_state(StreamWriterState::new(config.channels))
308    }
309
310    fn construct_processor(
311        &self,
312        config: &Self::Configuration,
313        cx: ConstructProcessorContext,
314    ) -> impl AudioNodeProcessor {
315        Processor {
316            cons: None,
317            shared_state: ArcGc::clone(
318                &cx.custom_state::<StreamWriterState>().unwrap().shared_state,
319            ),
320            check_for_silence: config.check_for_silence,
321            pause_declicker: Declicker::SettledAt0,
322        }
323    }
324}
325
326#[derive(Clone)]
327struct ActiveState {
328    prod: Arc<Mutex<fixed_resample::ResamplingProd<f32, MAX_CHANNELS>>>,
329    sample_rate: NonZeroU32,
330}
331
332struct SharedState {
333    stream_active: AtomicBool,
334    channel_started: AtomicBool,
335    paused: AtomicBool,
336    underflow_occurred: AtomicBool,
337    overflow_occurred: AtomicBool,
338}
339
340impl SharedState {
341    fn new() -> Self {
342        Self {
343            stream_active: AtomicBool::new(false),
344            channel_started: AtomicBool::new(false),
345            paused: AtomicBool::new(false),
346            underflow_occurred: AtomicBool::new(false),
347            overflow_occurred: AtomicBool::new(false),
348        }
349    }
350
351    fn reset(&self) {
352        self.stream_active.store(false, Ordering::Relaxed);
353        self.channel_started.store(false, Ordering::Relaxed);
354        self.paused.store(false, Ordering::Relaxed);
355        self.underflow_occurred.store(false, Ordering::Relaxed);
356        self.overflow_occurred.store(false, Ordering::Relaxed);
357    }
358}
359
360struct Processor {
361    cons: Option<fixed_resample::ResamplingCons<f32>>,
362    shared_state: ArcGc<SharedState>,
363    check_for_silence: bool,
364    pause_declicker: Declicker,
365}
366
367impl AudioNodeProcessor for Processor {
368    fn process(
369        &mut self,
370        info: &ProcInfo,
371        buffers: ProcBuffers,
372        events: &mut ProcEvents,
373        extra: &mut ProcExtra,
374    ) -> ProcessStatus {
375        for mut event in events.drain() {
376            if let Some(in_stream_event) = event.downcast_mut::<NewInputStreamEvent>() {
377                // Swap the values so that the old consumer gets dropped on
378                // the main thread.
379                core::mem::swap(&mut self.cons, &mut in_stream_event.cons);
380            }
381        }
382
383        let enabled = self.shared_state.stream_active.load(Ordering::Relaxed)
384            && !self.shared_state.paused.load(Ordering::Relaxed);
385
386        self.pause_declicker
387            .fade_to_enabled(enabled, &extra.declick_values);
388
389        if self.pause_declicker.disabled() {
390            return ProcessStatus::ClearAllOutputs;
391        }
392
393        let Some(cons) = &mut self.cons else {
394            self.pause_declicker.reset_to_0();
395            return ProcessStatus::ClearAllOutputs;
396        };
397
398        // Notify the input stream that the output stream has begun
399        // reading data.
400        self.shared_state
401            .channel_started
402            .store(true, Ordering::Relaxed);
403
404        let status = cons.read(buffers.outputs, 0..info.frames);
405
406        match status {
407            ReadStatus::UnderflowOccurred { num_frames_read: _ } => {
408                self.shared_state
409                    .underflow_occurred
410                    .store(true, Ordering::Relaxed);
411            }
412            ReadStatus::OverflowCorrected {
413                num_frames_discarded: _,
414            } => {
415                self.shared_state
416                    .overflow_occurred
417                    .store(true, Ordering::Relaxed);
418            }
419            _ => {}
420        }
421
422        if !self.pause_declicker.has_settled() {
423            self.pause_declicker.process(
424                buffers.outputs,
425                0..info.frames,
426                &extra.declick_values,
427                1.0,
428                DeclickFadeCurve::EqualPower3dB,
429            );
430        }
431
432        let mut silence_mask = SilenceMask::NONE_SILENT;
433        if self.check_for_silence {
434            let resampler_channels = cons.num_channels().get();
435
436            for (ch_i, ch) in buffers.outputs.iter().enumerate() {
437                if ch_i >= resampler_channels {
438                    // `cons.read()` clears any extra channels
439                    silence_mask.set_channel(ch_i, true);
440                } else {
441                    let mut all_silent = true;
442                    for &s in ch[..info.frames].iter() {
443                        if s != 0.0 {
444                            all_silent = false;
445                            break;
446                        }
447                    }
448
449                    if all_silent {
450                        silence_mask.set_channel(ch_i, true);
451                    }
452                }
453            }
454        }
455
456        ProcessStatus::OutputsModifiedWithMask(MaskType::Silence(silence_mask))
457    }
458
459    fn stream_stopped(&mut self, _context: &mut ProcStreamCtx) {
460        self.shared_state
461            .stream_active
462            .store(false, Ordering::Relaxed);
463        self.cons = None;
464        self.pause_declicker.reset_to_0();
465    }
466}
467
468pub struct NewInputStreamEvent {
469    cons: Option<fixed_resample::ResamplingCons<f32>>,
470}
471
472impl From<NewInputStreamEvent> for NodeEventType {
473    fn from(value: NewInputStreamEvent) -> Self {
474        NodeEventType::custom(value)
475    }
476}