1use bevy_platform::sync::{
2 atomic::{AtomicBool, Ordering},
3 Arc, Mutex,
4};
5use core::{
6 num::{NonZeroU32, NonZeroUsize},
7 ops::Range,
8};
9
10use firewheel_core::{
11 channel_config::{ChannelConfig, ChannelCount, NonZeroChannelCount},
12 collector::ArcGc,
13 event::{NodeEventType, ProcEvents},
14 node::{
15 AudioNode, AudioNodeInfo, AudioNodeProcessor, ConstructProcessorContext, ProcBuffers,
16 ProcExtra, ProcInfo, ProcStreamCtx, ProcessStatus,
17 },
18};
19use fixed_resample::{PushStatus, ReadStatus, ResamplingChannelConfig};
20
/// Upper bound on the number of audio channels handled by the stream reader.
///
/// Used as the const-generic channel capacity of the resampling channel and
/// checked against the requested channel count in `StreamReaderState::new`.
pub const MAX_CHANNELS: usize = 16;
22
23#[derive(Debug, Clone, Copy, PartialEq, Eq)]
25#[cfg_attr(feature = "bevy", derive(bevy_ecs::prelude::Component))]
26#[cfg_attr(feature = "bevy_reflect", derive(bevy_reflect::Reflect))]
27#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
28pub struct StreamReaderConfig {
29 pub channels: NonZeroChannelCount,
31}
32
33impl Default for StreamReaderConfig {
34 fn default() -> Self {
35 Self {
36 channels: NonZeroChannelCount::STEREO,
37 }
38 }
39}
40
/// Audio graph node (debug name `"stream_reader"`) whose processor pushes the
/// node's input buffers into a resampling channel.
///
/// The receiving end of that channel is exposed through the node's custom
/// state, [`StreamReaderState`]. The node itself carries no data; its channel
/// count comes from [`StreamReaderConfig`].
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "bevy", derive(bevy_ecs::prelude::Component))]
#[cfg_attr(feature = "bevy_reflect", derive(bevy_reflect::Reflect))]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct StreamReaderNode;
48
49#[derive(Clone)]
50pub struct StreamReaderState {
51 channels: NonZeroChannelCount,
52 active_state: Option<ActiveState>,
53 shared_state: ArcGc<SharedState>,
54}
55
56impl StreamReaderState {
57 pub fn new(channels: NonZeroChannelCount) -> Self {
58 assert!((channels.get().get() as usize) < MAX_CHANNELS);
59
60 Self {
61 channels,
62 active_state: None,
63 shared_state: ArcGc::new(SharedState::new()),
64 }
65 }
66
67 pub fn is_active(&self) -> bool {
69 self.active_state.is_some() && self.shared_state.stream_active.load(Ordering::Relaxed)
70 }
71
72 pub fn underflow_occurred(&self) -> bool {
81 self.shared_state
82 .underflow_occurred
83 .swap(false, Ordering::Relaxed)
84 }
85
86 pub fn overflow_occurred(&self) -> bool {
97 self.shared_state
98 .overflow_occurred
99 .swap(false, Ordering::Relaxed)
100 }
101
102 pub fn start_stream(
113 &mut self,
114 sample_rate: NonZeroU32,
115 output_stream_sample_rate: NonZeroU32,
116 channel_config: ResamplingChannelConfig,
117 ) -> Result<NewOutputStreamEvent, ()> {
118 if self.is_active() {
119 return Err(());
120 }
121
122 self.shared_state.reset();
123
124 let (prod, cons) = fixed_resample::resampling_channel::<f32, MAX_CHANNELS>(
125 NonZeroUsize::new(self.channels.get().get() as usize).unwrap(),
126 output_stream_sample_rate.get(),
127 sample_rate.get(),
128 channel_config,
129 );
130
131 self.active_state = Some(ActiveState {
132 cons: Arc::new(Mutex::new(cons)),
133 sample_rate,
134 });
135 self.shared_state
136 .stream_active
137 .store(true, Ordering::Relaxed);
138
139 Ok(NewOutputStreamEvent { prod: Some(prod) })
140 }
141
142 pub fn available_frames(&self) -> usize {
148 if self.is_ready() {
149 self.active_state
150 .as_ref()
151 .map(|s| s.cons.lock().unwrap().available_frames())
152 .unwrap_or(0)
153 } else {
154 0
155 }
156 }
157
158 pub fn occupied_seconds(&self) -> Option<f64> {
167 self.active_state
168 .as_ref()
169 .map(|s| s.cons.lock().unwrap().occupied_seconds())
170 }
171
172 pub fn num_channels(&self) -> NonZeroChannelCount {
174 self.channels
175 }
176
177 pub fn sample_rate(&self) -> Option<NonZeroU32> {
181 self.active_state.as_ref().map(|s| s.sample_rate)
182 }
183
184 pub fn read_interleaved(&mut self, output: &mut [f32]) -> Option<ReadStatus> {
191 if !self.is_ready() {
192 output.fill(0.0);
193 return None;
194 }
195
196 Some(
197 self.active_state
198 .as_mut()
199 .unwrap()
200 .cons
201 .lock()
202 .unwrap()
203 .read_interleaved(output),
204 )
205 }
206
207 pub fn read<Vin: AsMut<[f32]>>(
217 &mut self,
218 output: &mut [Vin],
219 range: Range<usize>,
220 ) -> Option<ReadStatus> {
221 if !self.is_ready() {
222 for ch in output.iter_mut() {
223 ch.as_mut()[range.clone()].fill(0.0);
224 }
225 return None;
226 }
227
228 Some(
229 self.active_state
230 .as_mut()
231 .unwrap()
232 .cons
233 .lock()
234 .unwrap()
235 .read(output, range),
236 )
237 }
238
239 pub fn discard_frames(&mut self) -> usize {
247 if let Some(state) = &mut self.active_state {
248 state.cons.lock().unwrap().discard_frames(usize::MAX)
249 } else {
250 0
251 }
252 }
253
254 pub fn autocorrect_overflows(&mut self) -> Option<usize> {
268 if let Some(state) = &mut self.active_state {
269 state.cons.lock().unwrap().autocorrect_overflows()
270 } else {
271 None
272 }
273 }
274
275 pub fn is_ready(&self) -> bool {
278 self.active_state.is_some()
279 && self.shared_state.channel_started.load(Ordering::Relaxed)
280 && !self.shared_state.paused.load(Ordering::Relaxed)
281 }
282
283 pub fn pause_stream(&mut self) {
285 if self.is_active() {
286 self.shared_state.paused.store(true, Ordering::Relaxed);
287 }
288 }
289
290 pub fn resume(&mut self) {
292 self.shared_state.paused.store(false, Ordering::Relaxed);
293 }
294
295 pub fn stop_stream(&mut self) {
297 self.active_state = None;
298 self.shared_state.reset();
299 }
300
301 pub fn handle(&self) -> Mutex<Self> {
302 Mutex::new((*self).clone())
303 }
304}
305
306impl Drop for StreamReaderState {
307 fn drop(&mut self) {
308 self.stop_stream();
309 }
310}
311
312impl AudioNode for StreamReaderNode {
313 type Configuration = StreamReaderConfig;
314
315 fn info(&self, config: &Self::Configuration) -> AudioNodeInfo {
316 AudioNodeInfo::new()
317 .debug_name("stream_reader")
318 .channel_config(ChannelConfig {
319 num_inputs: config.channels.get(),
320 num_outputs: ChannelCount::ZERO,
321 })
322 .custom_state(StreamReaderState::new(config.channels))
323 }
324
325 fn construct_processor(
326 &self,
327 _config: &Self::Configuration,
328 cx: ConstructProcessorContext,
329 ) -> impl AudioNodeProcessor {
330 Processor {
331 prod: None,
332 shared_state: ArcGc::clone(
333 &cx.custom_state::<StreamReaderState>().unwrap().shared_state,
334 ),
335 }
336 }
337}
338
339#[derive(Clone)]
340struct ActiveState {
341 cons: Arc<Mutex<fixed_resample::ResamplingCons<f32>>>,
342 sample_rate: NonZeroU32,
343}
344
/// Flags shared between a [`StreamReaderState`] and the node's processor.
struct SharedState {
    /// Set when a stream is started; cleared on reset or when the processor
    /// is told the stream stopped.
    stream_active: AtomicBool,
    /// Set by the processor just before it pushes data for the first time.
    channel_started: AtomicBool,
    /// Set while the stream is paused.
    paused: AtomicBool,
    /// Set by the processor when a push reports a corrected underflow.
    underflow_occurred: AtomicBool,
    /// Set by the processor when a push reports an overflow.
    overflow_occurred: AtomicBool,
}

impl SharedState {
    /// Creates the state with every flag cleared.
    fn new() -> Self {
        let cleared = || AtomicBool::new(false);

        Self {
            stream_active: cleared(),
            channel_started: cleared(),
            paused: cleared(),
            underflow_occurred: cleared(),
            overflow_occurred: cleared(),
        }
    }

    /// Clears every flag.
    fn reset(&self) {
        let flags = [
            &self.stream_active,
            &self.channel_started,
            &self.paused,
            &self.underflow_occurred,
            &self.overflow_occurred,
        ];

        for flag in flags {
            flag.store(false, Ordering::Relaxed);
        }
    }
}
372
373struct Processor {
374 prod: Option<fixed_resample::ResamplingProd<f32, MAX_CHANNELS>>,
375 shared_state: ArcGc<SharedState>,
376}
377
378impl AudioNodeProcessor for Processor {
379 fn process(
380 &mut self,
381 info: &ProcInfo,
382 buffers: ProcBuffers,
383 events: &mut ProcEvents,
384 _extra: &mut ProcExtra,
385 ) -> ProcessStatus {
386 for mut event in events.drain() {
387 if let Some(out_stream_event) = event.downcast_mut::<NewOutputStreamEvent>() {
388 core::mem::swap(&mut self.prod, &mut out_stream_event.prod);
391 }
392 }
393
394 if !self.shared_state.stream_active.load(Ordering::Relaxed)
395 || self.shared_state.paused.load(Ordering::Relaxed)
396 {
397 return ProcessStatus::Bypass;
398 }
399
400 let Some(prod) = &mut self.prod else {
401 return ProcessStatus::Bypass;
402 };
403
404 self.shared_state
407 .channel_started
408 .store(true, Ordering::Relaxed);
409
410 let status = prod.push(buffers.inputs, 0..info.frames);
411
412 match status {
413 PushStatus::OverflowOccurred {
414 num_frames_pushed: _,
415 } => {
416 self.shared_state
417 .overflow_occurred
418 .store(true, Ordering::Relaxed);
419 }
420 PushStatus::UnderflowCorrected {
421 num_zero_frames_pushed: _,
422 } => {
423 self.shared_state
424 .underflow_occurred
425 .store(true, Ordering::Relaxed);
426 }
427 _ => {}
428 }
429
430 ProcessStatus::Bypass
431 }
432
433 fn stream_stopped(&mut self, _context: &mut ProcStreamCtx) {
434 self.shared_state
435 .stream_active
436 .store(false, Ordering::Relaxed);
437 self.prod = None;
438 }
439}
440
441pub struct NewOutputStreamEvent {
442 prod: Option<fixed_resample::ResamplingProd<f32, MAX_CHANNELS>>,
443}
444
445impl From<NewOutputStreamEvent> for NodeEventType {
446 fn from(value: NewOutputStreamEvent) -> Self {
447 NodeEventType::custom(value)
448 }
449}