1use bevy_platform::sync::{
2 atomic::{AtomicBool, Ordering},
3 Arc, Mutex,
4};
5use core::{
6 num::{NonZeroU32, NonZeroUsize},
7 ops::Range,
8};
9
10use firewheel_core::{
11 channel_config::{ChannelConfig, ChannelCount, NonZeroChannelCount},
12 collector::ArcGc,
13 dsp::declick::{DeclickFadeCurve, Declicker},
14 event::{NodeEventType, ProcEvents},
15 mask::{MaskType, SilenceMask},
16 node::{
17 AudioNode, AudioNodeInfo, AudioNodeProcessor, ConstructProcessorContext, ProcBuffers,
18 ProcExtra, ProcInfo, ProcStreamCtx, ProcessStatus,
19 },
20};
21use fixed_resample::{ReadStatus, ResamplingChannelConfig};
22
23pub use fixed_resample::PushStatus;
24
25pub const MAX_CHANNELS: usize = 16;
26
27#[derive(Debug, Clone, Copy, PartialEq, Eq)]
29#[cfg_attr(feature = "bevy", derive(bevy_ecs::prelude::Component))]
30#[cfg_attr(feature = "bevy_reflect", derive(bevy_reflect::Reflect))]
31#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
32pub struct StreamWriterConfig {
33 pub channels: NonZeroChannelCount,
35
36 pub check_for_silence: bool,
42}
43
44impl Default for StreamWriterConfig {
45 fn default() -> Self {
46 Self {
47 channels: NonZeroChannelCount::STEREO,
48 check_for_silence: true,
49 }
50 }
51}
52
53#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
56#[cfg_attr(feature = "bevy", derive(bevy_ecs::prelude::Component))]
57#[cfg_attr(feature = "bevy_reflect", derive(bevy_reflect::Reflect))]
58#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
59pub struct StreamWriterNode;
60
61#[derive(Clone)]
62pub struct StreamWriterState {
63 channels: NonZeroChannelCount,
64 active_state: Option<ActiveState>,
65 shared_state: ArcGc<SharedState>,
66}
67
68impl StreamWriterState {
69 fn new(channels: NonZeroChannelCount) -> Self {
70 Self {
71 channels,
72 active_state: None,
73 shared_state: ArcGc::new(SharedState::new()),
74 }
75 }
76
77 pub fn is_active(&self) -> bool {
79 self.active_state.is_some() && self.shared_state.stream_active.load(Ordering::Relaxed)
80 }
81
82 pub fn underflow_occurred(&self) -> bool {
91 self.shared_state
92 .underflow_occurred
93 .swap(false, Ordering::Relaxed)
94 }
95
96 pub fn overflow_occurred(&self) -> bool {
107 self.shared_state
108 .overflow_occurred
109 .swap(false, Ordering::Relaxed)
110 }
111
112 pub fn available_frames(&self) -> usize {
117 if self.is_ready() {
118 self.active_state
119 .as_ref()
120 .map(|s| s.prod.lock().unwrap().available_frames())
121 .unwrap_or(0)
122 } else {
123 0
124 }
125 }
126
127 pub fn occupied_seconds(&self) -> Option<f64> {
133 self.active_state
134 .as_ref()
135 .map(|s| s.prod.lock().unwrap().occupied_seconds())
136 }
137
138 pub fn num_channels(&self) -> NonZeroChannelCount {
140 self.channels
141 }
142
143 pub fn sample_rate(&self) -> Option<NonZeroU32> {
147 self.active_state.as_ref().map(|s| s.sample_rate)
148 }
149
150 pub fn start_stream(
161 &mut self,
162 sample_rate: NonZeroU32,
163 output_stream_sample_rate: NonZeroU32,
164 channel_config: ResamplingChannelConfig,
165 ) -> Result<NewInputStreamEvent, ()> {
166 if self.is_active() {
167 return Err(());
168 }
169
170 self.shared_state.reset();
171
172 let (prod, cons) = fixed_resample::resampling_channel::<f32, MAX_CHANNELS>(
173 NonZeroUsize::new(self.channels.get().get() as usize).unwrap(),
174 sample_rate.get(),
175 output_stream_sample_rate.get(),
176 channel_config,
177 );
178
179 self.active_state = Some(ActiveState {
180 prod: Arc::new(Mutex::new(prod)),
181 sample_rate,
182 });
183 self.shared_state
184 .stream_active
185 .store(true, Ordering::Relaxed);
186
187 Ok(NewInputStreamEvent { cons: Some(cons) })
188 }
189
190 pub fn push_interleaved(&mut self, data: &[f32]) -> PushStatus {
200 if !self.is_ready() {
201 return PushStatus::OutputNotReady;
202 }
203
204 self.active_state
205 .as_mut()
206 .unwrap()
207 .prod
208 .lock()
209 .unwrap()
210 .push_interleaved(data)
211 }
212
213 pub fn push<Vin: AsRef<[f32]>>(&mut self, data: &[Vin], range: Range<usize>) -> PushStatus {
226 if !self.is_ready() {
227 return PushStatus::OutputNotReady;
228 }
229
230 self.active_state
231 .as_mut()
232 .unwrap()
233 .prod
234 .lock()
235 .unwrap()
236 .push(data, range)
237 }
238
239 pub fn is_ready(&self) -> bool {
242 self.active_state.is_some()
243 && self.shared_state.channel_started.load(Ordering::Relaxed)
244 && !self.shared_state.paused.load(Ordering::Relaxed)
245 }
246
247 pub fn pause_stream(&mut self) {
249 if self.is_active() {
250 self.shared_state.paused.store(true, Ordering::Relaxed);
251 }
252 }
253
254 pub fn autocorrect_underflows(&mut self) -> Option<usize> {
268 if let Some(state) = &mut self.active_state {
269 state.prod.lock().unwrap().autocorrect_underflows()
270 } else {
271 None
272 }
273 }
274
275 pub fn resume(&mut self) {
277 self.shared_state.paused.store(false, Ordering::Relaxed);
278 }
279
280 pub fn stop_stream(&mut self) {
282 self.active_state = None;
283 self.shared_state.reset();
284 }
285
286 pub fn handle(&self) -> Mutex<Self> {
287 Mutex::new((*self).clone())
288 }
289}
290
291impl Drop for StreamWriterState {
292 fn drop(&mut self) {
293 self.stop_stream();
294 }
295}
296
297impl AudioNode for StreamWriterNode {
298 type Configuration = StreamWriterConfig;
299
300 fn info(&self, config: &Self::Configuration) -> AudioNodeInfo {
301 AudioNodeInfo::new()
302 .debug_name("stream_writer")
303 .channel_config(ChannelConfig {
304 num_inputs: ChannelCount::ZERO,
305 num_outputs: config.channels.get(),
306 })
307 .custom_state(StreamWriterState::new(config.channels))
308 }
309
310 fn construct_processor(
311 &self,
312 config: &Self::Configuration,
313 cx: ConstructProcessorContext,
314 ) -> impl AudioNodeProcessor {
315 Processor {
316 cons: None,
317 shared_state: ArcGc::clone(
318 &cx.custom_state::<StreamWriterState>().unwrap().shared_state,
319 ),
320 check_for_silence: config.check_for_silence,
321 pause_declicker: Declicker::SettledAt0,
322 }
323 }
324}
325
326#[derive(Clone)]
327struct ActiveState {
328 prod: Arc<Mutex<fixed_resample::ResamplingProd<f32, MAX_CHANNELS>>>,
329 sample_rate: NonZeroU32,
330}
331
332struct SharedState {
333 stream_active: AtomicBool,
334 channel_started: AtomicBool,
335 paused: AtomicBool,
336 underflow_occurred: AtomicBool,
337 overflow_occurred: AtomicBool,
338}
339
340impl SharedState {
341 fn new() -> Self {
342 Self {
343 stream_active: AtomicBool::new(false),
344 channel_started: AtomicBool::new(false),
345 paused: AtomicBool::new(false),
346 underflow_occurred: AtomicBool::new(false),
347 overflow_occurred: AtomicBool::new(false),
348 }
349 }
350
351 fn reset(&self) {
352 self.stream_active.store(false, Ordering::Relaxed);
353 self.channel_started.store(false, Ordering::Relaxed);
354 self.paused.store(false, Ordering::Relaxed);
355 self.underflow_occurred.store(false, Ordering::Relaxed);
356 self.overflow_occurred.store(false, Ordering::Relaxed);
357 }
358}
359
360struct Processor {
361 cons: Option<fixed_resample::ResamplingCons<f32>>,
362 shared_state: ArcGc<SharedState>,
363 check_for_silence: bool,
364 pause_declicker: Declicker,
365}
366
367impl AudioNodeProcessor for Processor {
368 fn process(
369 &mut self,
370 info: &ProcInfo,
371 buffers: ProcBuffers,
372 events: &mut ProcEvents,
373 extra: &mut ProcExtra,
374 ) -> ProcessStatus {
375 for mut event in events.drain() {
376 if let Some(in_stream_event) = event.downcast_mut::<NewInputStreamEvent>() {
377 core::mem::swap(&mut self.cons, &mut in_stream_event.cons);
380 }
381 }
382
383 let enabled = self.shared_state.stream_active.load(Ordering::Relaxed)
384 && !self.shared_state.paused.load(Ordering::Relaxed);
385
386 self.pause_declicker
387 .fade_to_enabled(enabled, &extra.declick_values);
388
389 if self.pause_declicker.disabled() {
390 return ProcessStatus::ClearAllOutputs;
391 }
392
393 let Some(cons) = &mut self.cons else {
394 self.pause_declicker.reset_to_0();
395 return ProcessStatus::ClearAllOutputs;
396 };
397
398 self.shared_state
401 .channel_started
402 .store(true, Ordering::Relaxed);
403
404 let status = cons.read(buffers.outputs, 0..info.frames);
405
406 match status {
407 ReadStatus::UnderflowOccurred { num_frames_read: _ } => {
408 self.shared_state
409 .underflow_occurred
410 .store(true, Ordering::Relaxed);
411 }
412 ReadStatus::OverflowCorrected {
413 num_frames_discarded: _,
414 } => {
415 self.shared_state
416 .overflow_occurred
417 .store(true, Ordering::Relaxed);
418 }
419 _ => {}
420 }
421
422 if !self.pause_declicker.has_settled() {
423 self.pause_declicker.process(
424 buffers.outputs,
425 0..info.frames,
426 &extra.declick_values,
427 1.0,
428 DeclickFadeCurve::EqualPower3dB,
429 );
430 }
431
432 let mut silence_mask = SilenceMask::NONE_SILENT;
433 if self.check_for_silence {
434 let resampler_channels = cons.num_channels().get();
435
436 for (ch_i, ch) in buffers.outputs.iter().enumerate() {
437 if ch_i >= resampler_channels {
438 silence_mask.set_channel(ch_i, true);
440 } else {
441 let mut all_silent = true;
442 for &s in ch[..info.frames].iter() {
443 if s != 0.0 {
444 all_silent = false;
445 break;
446 }
447 }
448
449 if all_silent {
450 silence_mask.set_channel(ch_i, true);
451 }
452 }
453 }
454 }
455
456 ProcessStatus::OutputsModifiedWithMask(MaskType::Silence(silence_mask))
457 }
458
459 fn stream_stopped(&mut self, _context: &mut ProcStreamCtx) {
460 self.shared_state
461 .stream_active
462 .store(false, Ordering::Relaxed);
463 self.cons = None;
464 self.pause_declicker.reset_to_0();
465 }
466}
467
468pub struct NewInputStreamEvent {
469 cons: Option<fixed_resample::ResamplingCons<f32>>,
470}
471
472impl From<NewInputStreamEvent> for NodeEventType {
473 fn from(value: NewInputStreamEvent) -> Self {
474 NodeEventType::custom(value)
475 }
476}