Skip to main content

firewheel_nodes/
triple_buffer.rs

1use bevy_platform::sync::{Arc, Mutex, MutexGuard};
2use core::num::NonZeroU32;
3use firewheel_core::{
4    channel_config::{ChannelConfig, ChannelCount, NonZeroChannelCount},
5    diff::{Diff, EventQueue, Patch, PatchError, PathBuilder},
6    event::{ParamData, ProcEvents},
7    node::{
8        AudioNode, AudioNodeInfo, AudioNodeProcessor, ConstructProcessorContext, ProcBuffers,
9        ProcExtra, ProcInfo, ProcStreamCtx, ProcessStatus,
10    },
11    StreamInfo,
12};
13
14#[cfg(not(feature = "std"))]
15use bevy_platform::prelude::Vec;
16#[cfg(not(feature = "std"))]
17use num_traits::Float;
18
19/// The configuration of a [`TripleBufferNode`]
20#[derive(Debug, Clone, Copy, PartialEq)]
21#[cfg_attr(feature = "bevy", derive(bevy_ecs::prelude::Component))]
22#[cfg_attr(feature = "bevy_reflect", derive(bevy_reflect::Reflect))]
23#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
24pub struct TripleBufferConfig {
25    /// The number of channels
26    pub channels: NonZeroChannelCount,
27    /// The maximum window size that can be used
28    pub max_window_size: WindowSize,
29}
30
31impl Default for TripleBufferConfig {
32    fn default() -> Self {
33        Self {
34            channels: NonZeroChannelCount::STEREO,
35            max_window_size: WindowSize::default(),
36        }
37    }
38}
39
/// How large the window of a [`TripleBufferNode`] is, expressed either
/// directly in samples or as a duration.
#[derive(Debug, Clone, Copy, PartialEq)]
#[cfg_attr(feature = "bevy", derive(bevy_ecs::prelude::Component))]
#[cfg_attr(feature = "bevy_reflect", derive(bevy_reflect::Reflect))]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum WindowSize {
    /// The size in samples (counted for a single channel of audio).
    Samples(u32),
    /// The size in seconds.
    Seconds(f64),
}

impl WindowSize {
    /// Converts the window size to a number of frames at `sample_rate`.
    ///
    /// `Samples` is returned unchanged. `Seconds` is multiplied by the sample
    /// rate and rounded to the nearest frame; the float-to-integer cast
    /// saturates, so negative or NaN durations yield `0`.
    pub fn as_frames(&self, sample_rate: NonZeroU32) -> u32 {
        match *self {
            Self::Samples(count) => count,
            Self::Seconds(duration) => {
                let exact_frames = duration * (sample_rate.get() as f64);
                exact_frames.round() as u32
            }
        }
    }
}

impl Default for WindowSize {
    /// A window of 2048 samples.
    fn default() -> Self {
        Self::Samples(2048)
    }
}
67
68impl Diff for WindowSize {
69    fn diff<E: EventQueue>(&self, baseline: &Self, path: PathBuilder, event_queue: &mut E) {
70        if self != baseline {
71            match self {
72                WindowSize::Samples(samples) => event_queue.push_param(*samples, path),
73                WindowSize::Seconds(seconds) => event_queue.push_param(*seconds, path),
74            }
75        }
76    }
77}
78
79impl Patch for WindowSize {
80    type Patch = Self;
81
82    fn patch(data: &ParamData, _: &[u32]) -> Result<Self::Patch, PatchError> {
83        match data {
84            ParamData::U32(samples) => Ok(Self::Samples(*samples)),
85            ParamData::F64(seconds) => Ok(Self::Seconds(*seconds)),
86            _ => Err(PatchError::InvalidData),
87        }
88    }
89
90    fn apply(&mut self, value: Self::Patch) {
91        *self = value;
92    }
93}
94
95/// A node that sends raw audio data from the audio graph to another
96/// thread. Useful for cases where you only care about the latest data
97/// in the buffer, such as for creating visualizers.
98#[derive(Diff, Patch, Debug, Clone, Copy, PartialEq)]
99#[cfg_attr(feature = "bevy", derive(bevy_ecs::prelude::Component))]
100#[cfg_attr(feature = "bevy_reflect", derive(bevy_reflect::Reflect))]
101#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
102pub struct TripleBufferNode {
103    /// The window size (the number of frames in each channel in the output buffer)
104    pub window_size: WindowSize,
105    /// Whether or not the node is enabled.
106    ///
107    /// Disable when not in use to save on CPU resources.
108    pub enabled: bool,
109}
110
111impl Default for TripleBufferNode {
112    fn default() -> Self {
113        Self {
114            window_size: WindowSize::default(),
115            enabled: true,
116        }
117    }
118}
119
120#[derive(Clone)]
121pub struct TripleBufferState {
122    num_channels: NonZeroChannelCount,
123    active_state: Arc<Mutex<Option<ActiveState>>>,
124}
125
126impl TripleBufferState {
127    /// The number of channels in this buffer.
128    pub fn num_channels(&self) -> NonZeroChannelCount {
129        self.num_channels
130    }
131
132    /// Get the latest audio data in the triple buffer.
133    pub fn output<'a>(&'a mut self) -> OutputAudioData<'a> {
134        OutputAudioData {
135            guarded_state: self.active_state.lock().unwrap(),
136        }
137    }
138}
139
/// Reader-side state that exists only while a stream is running. The
/// processor stores it in the shared mutex when it is constructed or a new
/// stream starts, and resets the slot to `None` when the stream stops.
struct ActiveState {
    /// The reading half of the triple buffer created by the processor.
    consumer: triple_buffer::Output<TripleBufferData>,
    /// The sample rate of the stream that `consumer` belongs to.
    sample_rate: NonZeroU32,
}
144
145pub struct OutputAudioData<'a> {
146    guarded_state: MutexGuard<'a, Option<ActiveState>>,
147}
148
149impl<'a> OutputAudioData<'a> {
150    /// Returns `true` if the node is currently active.
151    pub fn is_active(&self) -> bool {
152        self.guarded_state.is_some()
153    }
154
155    /// The sample rate of the audio data.
156    ///
157    /// If the node is not currently active, then this will return `None`.
158    pub fn sample_rate(&self) -> Option<NonZeroU32> {
159        self.guarded_state.as_ref().map(|s| s.sample_rate)
160    }
161
162    /// Get the latest channels of audio data.
163    ///
164    /// The samples are in de-interleaved format (one `Vec` for each channel). The
165    /// length of each `Vec` will be equal to the `window_size` parameter at the
166    /// time the buffer was last updated.
167    ///
168    /// If the node is not currently active, then this will return `None`.
169    pub fn channels<'b>(&'b mut self) -> Option<&'b [Vec<f32>]> {
170        self.guarded_state
171            .as_mut()
172            .map(|s| s.consumer.read().buffers.as_slice())
173    }
174
175    /// Get the latest channels of audio data, along with a "generation" value.
176    ///
177    /// The generation value is equal to how many times the buffer has been updated
178    /// since the node was first created. This can be used to quickly check if the
179    /// buffer differs from the previous read.
180    ///
181    /// The samples are in de-interleaved format (one `Vec` for each channel). The
182    /// length of each `Vec` will be equal to the `window_size` parameter at the
183    /// time the buffer was last updated.
184    ///
185    /// If the node is not currently active, then this will return `None`.
186    pub fn channels_with_generation<'b>(&'b mut self) -> Option<(&'b [Vec<f32>], u64)> {
187        self.guarded_state.as_mut().map(|s| {
188            let data = s.consumer.read();
189            (data.buffers.as_slice(), data.generation)
190        })
191    }
192
193    /// Peek the audio data that is currently in the buffer without checking if
194    /// there is new data.
195    ///
196    /// The samples are in de-interleaved format (one `Vec` for each channel). The
197    /// length of each `Vec` will be equal to the `window_size` parameter at the
198    /// time the buffer was last updated.
199    ///
200    /// If the node is not currently active, then this will return `None`.
201    pub fn peek_channels<'b>(&'b self) -> Option<&'b [Vec<f32>]> {
202        self.guarded_state
203            .as_ref()
204            .map(|s| s.consumer.peek_output_buffer().buffers.as_slice())
205    }
206}
207
208impl AudioNode for TripleBufferNode {
209    type Configuration = TripleBufferConfig;
210
211    fn info(&self, config: &Self::Configuration) -> AudioNodeInfo {
212        AudioNodeInfo::new()
213            .debug_name("triple_buffer")
214            .channel_config(ChannelConfig {
215                num_inputs: config.channels.get(),
216                num_outputs: ChannelCount::ZERO,
217            })
218            .custom_state(TripleBufferState {
219                num_channels: config.channels,
220                active_state: Arc::new(Mutex::new(None)),
221            })
222    }
223
224    fn construct_processor(
225        &self,
226        config: &Self::Configuration,
227        mut cx: ConstructProcessorContext,
228    ) -> impl AudioNodeProcessor {
229        let sample_rate = cx.stream_info.sample_rate;
230        let max_window_size_frames = config.max_window_size.as_frames(sample_rate) as usize;
231
232        let (producer, consumer) =
233            triple_buffer::triple_buffer::<TripleBufferData>(&TripleBufferData::new(
234                config.channels.get().get() as usize,
235                max_window_size_frames,
236                0,
237            ));
238
239        let state = cx.custom_state_mut::<TripleBufferState>().unwrap();
240
241        *state.active_state.lock().unwrap() = Some(ActiveState {
242            consumer,
243            sample_rate,
244        });
245        let active_state = Arc::clone(&state.active_state);
246
247        let window_size_frames =
248            (self.window_size.as_frames(sample_rate) as usize).min(max_window_size_frames);
249
250        let tmp_ring_buffer = (0..config.channels.get().get() as usize)
251            .map(|_| {
252                let mut v = Vec::new();
253                v.reserve_exact(max_window_size_frames);
254                v.resize(window_size_frames, 0.0);
255                v
256            })
257            .collect();
258
259        Processor {
260            producer: Some(producer),
261            config: *config,
262            max_window_size_frames,
263            params: *self,
264            window_size_frames,
265            tmp_ring_buffer,
266            ring_buf_ptr: 0,
267            active_state,
268            generation: 0,
269            prev_publish_was_silent: true,
270            num_silent_frames_in_tmp: window_size_frames,
271            tmp_buffer_needs_cleared: false,
272        }
273    }
274}
275
/// The audio-thread half of a [`TripleBufferNode`]. It accumulates the most
/// recent window of input in a per-channel ring buffer and publishes it,
/// oldest sample first, through the triple buffer.
struct Processor {
    /// Writing half of the triple buffer. Set to `None` when the stream stops
    /// and replaced when a new stream starts.
    producer: Option<triple_buffer::Input<TripleBufferData>>,
    /// Copy of the node configuration the processor was constructed with.
    config: TripleBufferConfig,
    /// `config.max_window_size` converted to frames at the current sample rate.
    max_window_size_frames: usize,

    /// The node parameters, kept up to date by applying incoming patches.
    params: TripleBufferNode,
    /// The requested window size in frames, clamped to `max_window_size_frames`.
    window_size_frames: usize,

    /// One ring buffer per channel holding the most recent window of input.
    tmp_ring_buffer: Vec<Vec<f32>>,
    /// Index in the ring buffers where the next frame is written, which is
    /// also where the oldest frame starts when the window is unrolled.
    ring_buf_ptr: usize,

    // The processor only uses this when a new stream has started.
    active_state: Arc<Mutex<Option<ActiveState>>>,
    /// Incremented before every publish and on every new stream; stored
    /// alongside the published data.
    generation: u64,

    /// Whether the most recently published window consisted only of silence.
    prev_publish_was_silent: bool,
    /// Number of consecutive silent frames received most recently, capped at
    /// `window_size_frames`.
    num_silent_frames_in_tmp: usize,
    /// Set when a process call was skipped because of silence; the ring
    /// buffers are then zeroed before the next partial write.
    tmp_buffer_needs_cleared: bool,
}
295
296impl AudioNodeProcessor for Processor {
297    fn process(
298        &mut self,
299        info: &ProcInfo,
300        buffers: ProcBuffers,
301        events: &mut ProcEvents,
302        _extra: &mut ProcExtra,
303    ) -> ProcessStatus {
304        let was_enabled = self.params.enabled;
305
306        for patch in events.drain_patches::<TripleBufferNode>() {
307            match patch {
308                TripleBufferNodePatch::WindowSize(window_size) => {
309                    self.window_size_frames = (window_size.as_frames(info.sample_rate) as usize)
310                        .min(self.max_window_size_frames);
311                }
312                _ => {}
313            }
314
315            self.params.apply(patch);
316        }
317
318        let producer = self.producer.as_mut().unwrap();
319
320        if !self.params.enabled {
321            if was_enabled {
322                {
323                    let buffer = producer.input_buffer_mut();
324
325                    for buf_ch in buffer.buffers.iter_mut() {
326                        buf_ch.clear();
327                        buf_ch.resize(self.window_size_frames, 0.0);
328                    }
329
330                    self.generation += 1;
331                    buffer.generation = self.generation;
332                }
333
334                producer.publish();
335
336                for tmp_ch in self.tmp_ring_buffer.iter_mut() {
337                    tmp_ch.clear();
338                    tmp_ch.resize(self.window_size_frames, 0.0);
339                }
340
341                self.ring_buf_ptr = 0;
342                self.prev_publish_was_silent = true;
343                self.num_silent_frames_in_tmp = self.window_size_frames;
344                self.tmp_buffer_needs_cleared = false;
345            }
346
347            return ProcessStatus::ClearAllOutputs;
348        }
349
350        let mut resized = false;
351        if self.tmp_ring_buffer[0].len() != self.window_size_frames {
352            let prev_window_size_frames = self.tmp_ring_buffer[0].len();
353
354            // Use the data in the triple buffer as a temporary scratch buffer.
355            let buffer = producer.input_buffer_mut();
356
357            let first_copy_frames = prev_window_size_frames - self.ring_buf_ptr;
358            let second_copy_frames = prev_window_size_frames - first_copy_frames;
359
360            for (buf_ch, tmp_ch) in buffer
361                .buffers
362                .iter_mut()
363                .zip(self.tmp_ring_buffer.iter_mut())
364            {
365                buf_ch.clear();
366
367                if first_copy_frames > 0 {
368                    buf_ch.extend_from_slice(
369                        &tmp_ch[self.ring_buf_ptr..self.ring_buf_ptr + first_copy_frames],
370                    );
371                }
372                if second_copy_frames > 0 {
373                    buf_ch.extend_from_slice(&tmp_ch[0..second_copy_frames]);
374                }
375
376                tmp_ch.clear();
377                if prev_window_size_frames >= self.window_size_frames {
378                    tmp_ch.extend_from_slice(
379                        &buf_ch[prev_window_size_frames - self.window_size_frames
380                            ..prev_window_size_frames],
381                    );
382                } else {
383                    tmp_ch.resize(self.window_size_frames - prev_window_size_frames, 0.0);
384                    tmp_ch.extend_from_slice(&buf_ch[0..prev_window_size_frames]);
385                }
386            }
387
388            self.ring_buf_ptr = 0;
389            self.num_silent_frames_in_tmp = 0;
390            resized = true;
391        }
392
393        let input_is_silent = info
394            .in_silence_mask
395            .all_channels_silent(buffers.inputs.len());
396        if input_is_silent {
397            self.num_silent_frames_in_tmp =
398                (self.num_silent_frames_in_tmp + info.frames).min(self.window_size_frames);
399        } else {
400            self.num_silent_frames_in_tmp = 0;
401        }
402
403        if self.num_silent_frames_in_tmp == self.window_size_frames
404            && self.prev_publish_was_silent
405            && !resized
406        {
407            // The previous publish already contained silence, so no need to publish again.
408            self.tmp_buffer_needs_cleared = true;
409            return ProcessStatus::ClearAllOutputs;
410        }
411
412        if info.frames >= self.window_size_frames {
413            // Just copy all the new data.
414            for (tmp_ch, in_ch) in self.tmp_ring_buffer.iter_mut().zip(buffers.inputs.iter()) {
415                tmp_ch[0..self.window_size_frames]
416                    .copy_from_slice(&in_ch[info.frames - self.window_size_frames..info.frames]);
417            }
418            self.ring_buf_ptr = 0;
419            self.tmp_buffer_needs_cleared = false;
420        } else {
421            if self.tmp_buffer_needs_cleared {
422                self.tmp_buffer_needs_cleared = false;
423
424                for tmp_ch in self.tmp_ring_buffer.iter_mut() {
425                    tmp_ch.clear();
426                    tmp_ch.resize(self.window_size_frames, 0.0);
427                }
428                self.ring_buf_ptr = 0;
429            }
430
431            let first_copy_frames = info.frames.min(self.window_size_frames - self.ring_buf_ptr);
432            let second_copy_frames = info.frames - first_copy_frames;
433
434            for (tmp_ch, in_ch) in self.tmp_ring_buffer.iter_mut().zip(buffers.inputs.iter()) {
435                if first_copy_frames > 0 {
436                    tmp_ch[self.ring_buf_ptr..self.ring_buf_ptr + first_copy_frames]
437                        .copy_from_slice(&in_ch[0..first_copy_frames]);
438                }
439
440                if second_copy_frames > 0 {
441                    tmp_ch[0..second_copy_frames].copy_from_slice(
442                        &in_ch[first_copy_frames..first_copy_frames + second_copy_frames],
443                    );
444                }
445            }
446
447            self.ring_buf_ptr = if second_copy_frames > 0 {
448                second_copy_frames
449            } else {
450                self.ring_buf_ptr + first_copy_frames
451            };
452        }
453
454        {
455            let buffer = producer.input_buffer_mut();
456
457            let first_copy_frames = self.window_size_frames - self.ring_buf_ptr;
458            let second_copy_frames = self.window_size_frames - first_copy_frames;
459
460            for (buf_ch, tmp_ch) in buffer.buffers.iter_mut().zip(self.tmp_ring_buffer.iter()) {
461                buf_ch.clear();
462
463                if first_copy_frames > 0 {
464                    buf_ch.extend_from_slice(
465                        &tmp_ch[self.ring_buf_ptr..self.ring_buf_ptr + first_copy_frames],
466                    );
467                }
468                if second_copy_frames > 0 {
469                    buf_ch.extend_from_slice(&tmp_ch[0..second_copy_frames]);
470                }
471            }
472
473            self.generation += 1;
474            buffer.generation = self.generation;
475        }
476
477        producer.publish();
478
479        self.prev_publish_was_silent = self.num_silent_frames_in_tmp == self.window_size_frames;
480
481        ProcessStatus::ClearAllOutputs
482    }
483
484    fn stream_stopped(&mut self, _context: &mut ProcStreamCtx) {
485        *self.active_state.lock().unwrap() = None;
486        self.producer = None;
487    }
488
489    fn new_stream(&mut self, stream_info: &StreamInfo, _context: &mut ProcStreamCtx) {
490        self.max_window_size_frames = self
491            .config
492            .max_window_size
493            .as_frames(stream_info.sample_rate) as usize;
494
495        self.window_size_frames = (self.params.window_size.as_frames(stream_info.sample_rate)
496            as usize)
497            .min(self.max_window_size_frames);
498
499        self.tmp_ring_buffer = (0..self.config.channels.get().get() as usize)
500            .map(|_| {
501                let mut v = Vec::new();
502                v.reserve_exact(self.max_window_size_frames);
503                v.resize(self.window_size_frames, 0.0);
504                v
505            })
506            .collect();
507        self.ring_buf_ptr = 0;
508        self.num_silent_frames_in_tmp = self.window_size_frames;
509        self.tmp_buffer_needs_cleared = false;
510        self.prev_publish_was_silent = true;
511
512        self.generation += 1;
513
514        let (producer, consumer) =
515            triple_buffer::triple_buffer::<TripleBufferData>(&TripleBufferData::new(
516                self.config.channels.get().get() as usize,
517                self.max_window_size_frames,
518                self.generation,
519            ));
520
521        *self.active_state.lock().unwrap() = Some(ActiveState {
522            consumer,
523            sample_rate: stream_info.sample_rate,
524        });
525
526        self.producer = Some(producer);
527    }
528}
529
// A wrapper to ensure that the triple buffer uses `reserve_exact` when cloning
// the initial buffers.
struct TripleBufferData {
    /// De-interleaved samples, one `Vec` per channel.
    buffers: Vec<Vec<f32>>,
    /// The number of frames each channel is allocated for.
    max_frames: usize,
    /// Update counter stored alongside the samples.
    generation: u64,
}

impl TripleBufferData {
    /// Creates `num_channels` channels, each holding `max_frames` zeroed
    /// samples, tagged with `generation`.
    ///
    /// Both the outer vector and every channel are sized with `reserve_exact`
    /// and then filled in place, so no allocation is made and thrown away.
    fn new(num_channels: usize, max_frames: usize, generation: u64) -> Self {
        let mut buffers: Vec<Vec<f32>> = Vec::new();
        buffers.reserve_exact(num_channels);

        // Extend the reserved vector rather than collecting into a new one,
        // which would discard the reservation made above.
        buffers.extend((0..num_channels).map(|_| {
            let mut channel: Vec<f32> = Vec::new();
            channel.reserve_exact(max_frames);
            channel.resize(max_frames, 0.0);
            channel
        }));

        Self {
            buffers,
            max_frames,
            generation,
        }
    }
}

impl Clone for TripleBufferData {
    /// Builds a fresh, zeroed buffer with the same channel count, capacity and
    /// generation. Sample contents are deliberately not copied.
    fn clone(&self) -> Self {
        Self::new(self.buffers.len(), self.max_frames, self.generation)
    }
}