1use bevy_platform::sync::{Arc, Mutex, MutexGuard};
2use core::num::NonZeroU32;
3use firewheel_core::{
4 channel_config::{ChannelConfig, ChannelCount, NonZeroChannelCount},
5 diff::{Diff, EventQueue, Patch, PatchError, PathBuilder},
6 event::{ParamData, ProcEvents},
7 node::{
8 AudioNode, AudioNodeInfo, AudioNodeProcessor, ConstructProcessorContext, ProcBuffers,
9 ProcExtra, ProcInfo, ProcStreamCtx, ProcessStatus,
10 },
11 StreamInfo,
12};
13
14#[cfg(not(feature = "std"))]
15use bevy_platform::prelude::Vec;
16#[cfg(not(feature = "std"))]
17use num_traits::Float;
18
19#[derive(Debug, Clone, Copy, PartialEq)]
21#[cfg_attr(feature = "bevy", derive(bevy_ecs::prelude::Component))]
22#[cfg_attr(feature = "bevy_reflect", derive(bevy_reflect::Reflect))]
23#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
24pub struct TripleBufferConfig {
25 pub channels: NonZeroChannelCount,
27 pub max_window_size: WindowSize,
29}
30
31impl Default for TripleBufferConfig {
32 fn default() -> Self {
33 Self {
34 channels: NonZeroChannelCount::STEREO,
35 max_window_size: WindowSize::default(),
36 }
37 }
38}
39
40#[derive(Debug, Clone, Copy, PartialEq)]
42#[cfg_attr(feature = "bevy", derive(bevy_ecs::prelude::Component))]
43#[cfg_attr(feature = "bevy_reflect", derive(bevy_reflect::Reflect))]
44#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
45pub enum WindowSize {
46 Samples(u32),
49 Seconds(f64),
51}
52
53impl WindowSize {
54 pub fn as_frames(&self, sample_rate: NonZeroU32) -> u32 {
55 match self {
56 Self::Samples(samples) => *samples,
57 Self::Seconds(seconds) => (seconds * (sample_rate.get() as f64)).round() as u32,
58 }
59 }
60}
61
62impl Default for WindowSize {
63 fn default() -> Self {
64 Self::Samples(2048)
65 }
66}
67
68impl Diff for WindowSize {
69 fn diff<E: EventQueue>(&self, baseline: &Self, path: PathBuilder, event_queue: &mut E) {
70 if self != baseline {
71 match self {
72 WindowSize::Samples(samples) => event_queue.push_param(*samples, path),
73 WindowSize::Seconds(seconds) => event_queue.push_param(*seconds, path),
74 }
75 }
76 }
77}
78
79impl Patch for WindowSize {
80 type Patch = Self;
81
82 fn patch(data: &ParamData, _: &[u32]) -> Result<Self::Patch, PatchError> {
83 match data {
84 ParamData::U32(samples) => Ok(Self::Samples(*samples)),
85 ParamData::F64(seconds) => Ok(Self::Seconds(*seconds)),
86 _ => Err(PatchError::InvalidData),
87 }
88 }
89
90 fn apply(&mut self, value: Self::Patch) {
91 *self = value;
92 }
93}
94
95#[derive(Diff, Patch, Debug, Clone, Copy, PartialEq)]
99#[cfg_attr(feature = "bevy", derive(bevy_ecs::prelude::Component))]
100#[cfg_attr(feature = "bevy_reflect", derive(bevy_reflect::Reflect))]
101#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
102pub struct TripleBufferNode {
103 pub window_size: WindowSize,
105 pub enabled: bool,
109}
110
111impl Default for TripleBufferNode {
112 fn default() -> Self {
113 Self {
114 window_size: WindowSize::default(),
115 enabled: true,
116 }
117 }
118}
119
120#[derive(Clone)]
121pub struct TripleBufferState {
122 num_channels: NonZeroChannelCount,
123 active_state: Arc<Mutex<Option<ActiveState>>>,
124}
125
126impl TripleBufferState {
127 pub fn num_channels(&self) -> NonZeroChannelCount {
129 self.num_channels
130 }
131
132 pub fn output<'a>(&'a mut self) -> OutputAudioData<'a> {
134 OutputAudioData {
135 guarded_state: self.active_state.lock().unwrap(),
136 }
137 }
138}
139
140struct ActiveState {
141 consumer: triple_buffer::Output<TripleBufferData>,
142 sample_rate: NonZeroU32,
143}
144
145pub struct OutputAudioData<'a> {
146 guarded_state: MutexGuard<'a, Option<ActiveState>>,
147}
148
149impl<'a> OutputAudioData<'a> {
150 pub fn is_active(&self) -> bool {
152 self.guarded_state.is_some()
153 }
154
155 pub fn sample_rate(&self) -> Option<NonZeroU32> {
159 self.guarded_state.as_ref().map(|s| s.sample_rate)
160 }
161
162 pub fn channels<'b>(&'b mut self) -> Option<&'b [Vec<f32>]> {
170 self.guarded_state
171 .as_mut()
172 .map(|s| s.consumer.read().buffers.as_slice())
173 }
174
175 pub fn channels_with_generation<'b>(&'b mut self) -> Option<(&'b [Vec<f32>], u64)> {
187 self.guarded_state.as_mut().map(|s| {
188 let data = s.consumer.read();
189 (data.buffers.as_slice(), data.generation)
190 })
191 }
192
193 pub fn peek_channels<'b>(&'b self) -> Option<&'b [Vec<f32>]> {
202 self.guarded_state
203 .as_ref()
204 .map(|s| s.consumer.peek_output_buffer().buffers.as_slice())
205 }
206}
207
208impl AudioNode for TripleBufferNode {
209 type Configuration = TripleBufferConfig;
210
211 fn info(&self, config: &Self::Configuration) -> AudioNodeInfo {
212 AudioNodeInfo::new()
213 .debug_name("triple_buffer")
214 .channel_config(ChannelConfig {
215 num_inputs: config.channels.get(),
216 num_outputs: ChannelCount::ZERO,
217 })
218 .custom_state(TripleBufferState {
219 num_channels: config.channels,
220 active_state: Arc::new(Mutex::new(None)),
221 })
222 }
223
224 fn construct_processor(
225 &self,
226 config: &Self::Configuration,
227 mut cx: ConstructProcessorContext,
228 ) -> impl AudioNodeProcessor {
229 let sample_rate = cx.stream_info.sample_rate;
230 let max_window_size_frames = config.max_window_size.as_frames(sample_rate) as usize;
231
232 let (producer, consumer) =
233 triple_buffer::triple_buffer::<TripleBufferData>(&TripleBufferData::new(
234 config.channels.get().get() as usize,
235 max_window_size_frames,
236 0,
237 ));
238
239 let state = cx.custom_state_mut::<TripleBufferState>().unwrap();
240
241 *state.active_state.lock().unwrap() = Some(ActiveState {
242 consumer,
243 sample_rate,
244 });
245 let active_state = Arc::clone(&state.active_state);
246
247 let window_size_frames =
248 (self.window_size.as_frames(sample_rate) as usize).min(max_window_size_frames);
249
250 let tmp_ring_buffer = (0..config.channels.get().get() as usize)
251 .map(|_| {
252 let mut v = Vec::new();
253 v.reserve_exact(max_window_size_frames);
254 v.resize(window_size_frames, 0.0);
255 v
256 })
257 .collect();
258
259 Processor {
260 producer: Some(producer),
261 config: *config,
262 max_window_size_frames,
263 params: *self,
264 window_size_frames,
265 tmp_ring_buffer,
266 ring_buf_ptr: 0,
267 active_state,
268 generation: 0,
269 prev_publish_was_silent: true,
270 num_silent_frames_in_tmp: window_size_frames,
271 tmp_buffer_needs_cleared: false,
272 }
273 }
274}
275
276struct Processor {
277 producer: Option<triple_buffer::Input<TripleBufferData>>,
278 config: TripleBufferConfig,
279 max_window_size_frames: usize,
280
281 params: TripleBufferNode,
282 window_size_frames: usize,
283
284 tmp_ring_buffer: Vec<Vec<f32>>,
285 ring_buf_ptr: usize,
286
287 active_state: Arc<Mutex<Option<ActiveState>>>,
289 generation: u64,
290
291 prev_publish_was_silent: bool,
292 num_silent_frames_in_tmp: usize,
293 tmp_buffer_needs_cleared: bool,
294}
295
296impl AudioNodeProcessor for Processor {
297 fn process(
298 &mut self,
299 info: &ProcInfo,
300 buffers: ProcBuffers,
301 events: &mut ProcEvents,
302 _extra: &mut ProcExtra,
303 ) -> ProcessStatus {
304 let was_enabled = self.params.enabled;
305
306 for patch in events.drain_patches::<TripleBufferNode>() {
307 match patch {
308 TripleBufferNodePatch::WindowSize(window_size) => {
309 self.window_size_frames = (window_size.as_frames(info.sample_rate) as usize)
310 .min(self.max_window_size_frames);
311 }
312 _ => {}
313 }
314
315 self.params.apply(patch);
316 }
317
318 let producer = self.producer.as_mut().unwrap();
319
320 if !self.params.enabled {
321 if was_enabled {
322 {
323 let buffer = producer.input_buffer_mut();
324
325 for buf_ch in buffer.buffers.iter_mut() {
326 buf_ch.clear();
327 buf_ch.resize(self.window_size_frames, 0.0);
328 }
329
330 self.generation += 1;
331 buffer.generation = self.generation;
332 }
333
334 producer.publish();
335
336 for tmp_ch in self.tmp_ring_buffer.iter_mut() {
337 tmp_ch.clear();
338 tmp_ch.resize(self.window_size_frames, 0.0);
339 }
340
341 self.ring_buf_ptr = 0;
342 self.prev_publish_was_silent = true;
343 self.num_silent_frames_in_tmp = self.window_size_frames;
344 self.tmp_buffer_needs_cleared = false;
345 }
346
347 return ProcessStatus::ClearAllOutputs;
348 }
349
350 let mut resized = false;
351 if self.tmp_ring_buffer[0].len() != self.window_size_frames {
352 let prev_window_size_frames = self.tmp_ring_buffer[0].len();
353
354 let buffer = producer.input_buffer_mut();
356
357 let first_copy_frames = prev_window_size_frames - self.ring_buf_ptr;
358 let second_copy_frames = prev_window_size_frames - first_copy_frames;
359
360 for (buf_ch, tmp_ch) in buffer
361 .buffers
362 .iter_mut()
363 .zip(self.tmp_ring_buffer.iter_mut())
364 {
365 buf_ch.clear();
366
367 if first_copy_frames > 0 {
368 buf_ch.extend_from_slice(
369 &tmp_ch[self.ring_buf_ptr..self.ring_buf_ptr + first_copy_frames],
370 );
371 }
372 if second_copy_frames > 0 {
373 buf_ch.extend_from_slice(&tmp_ch[0..second_copy_frames]);
374 }
375
376 tmp_ch.clear();
377 if prev_window_size_frames >= self.window_size_frames {
378 tmp_ch.extend_from_slice(
379 &buf_ch[prev_window_size_frames - self.window_size_frames
380 ..prev_window_size_frames],
381 );
382 } else {
383 tmp_ch.resize(self.window_size_frames - prev_window_size_frames, 0.0);
384 tmp_ch.extend_from_slice(&buf_ch[0..prev_window_size_frames]);
385 }
386 }
387
388 self.ring_buf_ptr = 0;
389 self.num_silent_frames_in_tmp = 0;
390 resized = true;
391 }
392
393 let input_is_silent = info
394 .in_silence_mask
395 .all_channels_silent(buffers.inputs.len());
396 if input_is_silent {
397 self.num_silent_frames_in_tmp =
398 (self.num_silent_frames_in_tmp + info.frames).min(self.window_size_frames);
399 } else {
400 self.num_silent_frames_in_tmp = 0;
401 }
402
403 if self.num_silent_frames_in_tmp == self.window_size_frames
404 && self.prev_publish_was_silent
405 && !resized
406 {
407 self.tmp_buffer_needs_cleared = true;
409 return ProcessStatus::ClearAllOutputs;
410 }
411
412 if info.frames >= self.window_size_frames {
413 for (tmp_ch, in_ch) in self.tmp_ring_buffer.iter_mut().zip(buffers.inputs.iter()) {
415 tmp_ch[0..self.window_size_frames]
416 .copy_from_slice(&in_ch[info.frames - self.window_size_frames..info.frames]);
417 }
418 self.ring_buf_ptr = 0;
419 self.tmp_buffer_needs_cleared = false;
420 } else {
421 if self.tmp_buffer_needs_cleared {
422 self.tmp_buffer_needs_cleared = false;
423
424 for tmp_ch in self.tmp_ring_buffer.iter_mut() {
425 tmp_ch.clear();
426 tmp_ch.resize(self.window_size_frames, 0.0);
427 }
428 self.ring_buf_ptr = 0;
429 }
430
431 let first_copy_frames = info.frames.min(self.window_size_frames - self.ring_buf_ptr);
432 let second_copy_frames = info.frames - first_copy_frames;
433
434 for (tmp_ch, in_ch) in self.tmp_ring_buffer.iter_mut().zip(buffers.inputs.iter()) {
435 if first_copy_frames > 0 {
436 tmp_ch[self.ring_buf_ptr..self.ring_buf_ptr + first_copy_frames]
437 .copy_from_slice(&in_ch[0..first_copy_frames]);
438 }
439
440 if second_copy_frames > 0 {
441 tmp_ch[0..second_copy_frames].copy_from_slice(
442 &in_ch[first_copy_frames..first_copy_frames + second_copy_frames],
443 );
444 }
445 }
446
447 self.ring_buf_ptr = if second_copy_frames > 0 {
448 second_copy_frames
449 } else {
450 self.ring_buf_ptr + first_copy_frames
451 };
452 }
453
454 {
455 let buffer = producer.input_buffer_mut();
456
457 let first_copy_frames = self.window_size_frames - self.ring_buf_ptr;
458 let second_copy_frames = self.window_size_frames - first_copy_frames;
459
460 for (buf_ch, tmp_ch) in buffer.buffers.iter_mut().zip(self.tmp_ring_buffer.iter()) {
461 buf_ch.clear();
462
463 if first_copy_frames > 0 {
464 buf_ch.extend_from_slice(
465 &tmp_ch[self.ring_buf_ptr..self.ring_buf_ptr + first_copy_frames],
466 );
467 }
468 if second_copy_frames > 0 {
469 buf_ch.extend_from_slice(&tmp_ch[0..second_copy_frames]);
470 }
471 }
472
473 self.generation += 1;
474 buffer.generation = self.generation;
475 }
476
477 producer.publish();
478
479 self.prev_publish_was_silent = self.num_silent_frames_in_tmp == self.window_size_frames;
480
481 ProcessStatus::ClearAllOutputs
482 }
483
484 fn stream_stopped(&mut self, _context: &mut ProcStreamCtx) {
485 *self.active_state.lock().unwrap() = None;
486 self.producer = None;
487 }
488
489 fn new_stream(&mut self, stream_info: &StreamInfo, _context: &mut ProcStreamCtx) {
490 self.max_window_size_frames = self
491 .config
492 .max_window_size
493 .as_frames(stream_info.sample_rate) as usize;
494
495 self.window_size_frames = (self.params.window_size.as_frames(stream_info.sample_rate)
496 as usize)
497 .min(self.max_window_size_frames);
498
499 self.tmp_ring_buffer = (0..self.config.channels.get().get() as usize)
500 .map(|_| {
501 let mut v = Vec::new();
502 v.reserve_exact(self.max_window_size_frames);
503 v.resize(self.window_size_frames, 0.0);
504 v
505 })
506 .collect();
507 self.ring_buf_ptr = 0;
508 self.num_silent_frames_in_tmp = self.window_size_frames;
509 self.tmp_buffer_needs_cleared = false;
510 self.prev_publish_was_silent = true;
511
512 self.generation += 1;
513
514 let (producer, consumer) =
515 triple_buffer::triple_buffer::<TripleBufferData>(&TripleBufferData::new(
516 self.config.channels.get().get() as usize,
517 self.max_window_size_frames,
518 self.generation,
519 ));
520
521 *self.active_state.lock().unwrap() = Some(ActiveState {
522 consumer,
523 sample_rate: stream_info.sample_rate,
524 });
525
526 self.producer = Some(producer);
527 }
528}
529
530struct TripleBufferData {
533 buffers: Vec<Vec<f32>>,
534 max_frames: usize,
535 generation: u64,
536}
537
538impl TripleBufferData {
539 fn new(num_channels: usize, max_frames: usize, generation: u64) -> Self {
540 let mut buffers = Vec::new();
541 buffers.reserve_exact(num_channels);
542
543 buffers = (0..num_channels)
544 .map(|_| {
545 let mut v = Vec::new();
546 v.reserve_exact(max_frames);
547 v.resize(max_frames, 0.0);
548 v
549 })
550 .collect();
551
552 Self {
553 buffers,
554 max_frames,
555 generation,
556 }
557 }
558}
559
560impl Clone for TripleBufferData {
561 fn clone(&self) -> Self {
562 Self::new(self.buffers.len(), self.max_frames, self.generation)
563 }
564}