1use bevy_platform::sync::{
2 atomic::{AtomicBool, Ordering},
3 Arc, Mutex,
4};
5use core::{
6 num::{NonZeroU32, NonZeroUsize},
7 ops::Range,
8};
9
10use firewheel_core::{
11 channel_config::{ChannelConfig, ChannelCount, NonZeroChannelCount},
12 collector::ArcGc,
13 event::{NodeEventType, ProcEvents},
14 log::RealtimeLogger,
15 node::{
16 AudioNode, AudioNodeInfo, AudioNodeProcessor, ConstructProcessorContext, ProcBuffers,
17 ProcExtra, ProcInfo, ProcessStatus,
18 },
19};
20use fixed_resample::{PushStatus, ReadStatus, ResamplingChannelConfig};
21
/// The const channel capacity used when instantiating the resampling channel
/// (`fixed_resample::resampling_channel::<f32, MAX_CHANNELS>`) that carries
/// audio from the node's processor to the reader.
pub const MAX_CHANNELS: usize = 16;
23
24#[derive(Debug, Clone, Copy, PartialEq)]
25#[cfg_attr(feature = "bevy", derive(bevy_ecs::prelude::Component))]
26#[cfg_attr(feature = "bevy_reflect", derive(bevy_reflect::Reflect))]
27pub struct StreamReaderConfig {
28 pub channels: NonZeroChannelCount,
30}
31
32impl Default for StreamReaderConfig {
33 fn default() -> Self {
34 Self {
35 channels: NonZeroChannelCount::STEREO,
36 }
37 }
38}
39
/// A node that takes the audio arriving on its inputs and pushes it into a
/// resampling channel, where it can be read back through a
/// [`StreamReaderState`].
///
/// The node itself carries no data; its channel count comes from
/// [`StreamReaderConfig`].
#[derive(Default, Debug, Clone, Copy)]
#[cfg_attr(feature = "bevy", derive(bevy_ecs::prelude::Component))]
#[cfg_attr(feature = "bevy_reflect", derive(bevy_reflect::Reflect))]
pub struct StreamReaderNode;
44
45#[derive(Clone)]
46pub struct StreamReaderState {
47 channels: NonZeroChannelCount,
48 active_state: Option<ActiveState>,
49 shared_state: ArcGc<SharedState>,
50}
51
52impl StreamReaderState {
53 pub fn new(channels: NonZeroChannelCount) -> Self {
54 assert!((channels.get().get() as usize) < MAX_CHANNELS);
55
56 Self {
57 channels,
58 active_state: None,
59 shared_state: ArcGc::new(SharedState::new()),
60 }
61 }
62
63 pub fn is_active(&self) -> bool {
65 self.active_state.is_some() && self.shared_state.stream_active.load(Ordering::Relaxed)
66 }
67
68 pub fn underflow_occurred(&self) -> bool {
77 self.shared_state
78 .underflow_occurred
79 .swap(false, Ordering::Relaxed)
80 }
81
82 pub fn overflow_occurred(&self) -> bool {
93 self.shared_state
94 .overflow_occurred
95 .swap(false, Ordering::Relaxed)
96 }
97
98 pub fn start_stream(
109 &mut self,
110 sample_rate: NonZeroU32,
111 output_stream_sample_rate: NonZeroU32,
112 channel_config: ResamplingChannelConfig,
113 ) -> Result<NewOutputStreamEvent, ()> {
114 if self.is_active() {
115 return Err(());
116 }
117
118 self.shared_state.reset();
119
120 let (prod, cons) = fixed_resample::resampling_channel::<f32, MAX_CHANNELS>(
121 NonZeroUsize::new(self.channels.get().get() as usize).unwrap(),
122 output_stream_sample_rate.get(),
123 sample_rate.get(),
124 channel_config,
125 );
126
127 self.active_state = Some(ActiveState {
128 cons: Arc::new(Mutex::new(cons)),
129 sample_rate,
130 });
131 self.shared_state
132 .stream_active
133 .store(true, Ordering::Relaxed);
134
135 Ok(NewOutputStreamEvent { prod: Some(prod) })
136 }
137
138 pub fn available_frames(&self) -> usize {
144 if self.is_ready() {
145 self.active_state
146 .as_ref()
147 .map(|s| s.cons.lock().unwrap().available_frames())
148 .unwrap_or(0)
149 } else {
150 0
151 }
152 }
153
154 pub fn occupied_seconds(&self) -> Option<f64> {
163 self.active_state
164 .as_ref()
165 .map(|s| s.cons.lock().unwrap().occupied_seconds())
166 }
167
168 pub fn num_channels(&self) -> NonZeroChannelCount {
170 self.channels
171 }
172
173 pub fn sample_rate(&self) -> Option<NonZeroU32> {
177 self.active_state.as_ref().map(|s| s.sample_rate)
178 }
179
180 pub fn read_interleaved(&mut self, output: &mut [f32]) -> Option<ReadStatus> {
187 if !self.is_ready() {
188 output.fill(0.0);
189 return None;
190 }
191
192 Some(
193 self.active_state
194 .as_mut()
195 .unwrap()
196 .cons
197 .lock()
198 .unwrap()
199 .read_interleaved(output),
200 )
201 }
202
203 pub fn read<Vin: AsMut<[f32]>>(
213 &mut self,
214 output: &mut [Vin],
215 range: Range<usize>,
216 ) -> Option<ReadStatus> {
217 if !self.is_ready() {
218 for ch in output.iter_mut() {
219 ch.as_mut()[range.clone()].fill(0.0);
220 }
221 return None;
222 }
223
224 Some(
225 self.active_state
226 .as_mut()
227 .unwrap()
228 .cons
229 .lock()
230 .unwrap()
231 .read(output, range),
232 )
233 }
234
235 pub fn discard_frames(&mut self) -> usize {
243 if let Some(state) = &mut self.active_state {
244 state.cons.lock().unwrap().discard_frames(usize::MAX)
245 } else {
246 0
247 }
248 }
249
250 pub fn autocorrect_overflows(&mut self) -> Option<usize> {
264 if let Some(state) = &mut self.active_state {
265 state.cons.lock().unwrap().autocorrect_overflows()
266 } else {
267 None
268 }
269 }
270
271 pub fn is_ready(&self) -> bool {
274 self.active_state.is_some()
275 && self.shared_state.channel_started.load(Ordering::Relaxed)
276 && !self.shared_state.paused.load(Ordering::Relaxed)
277 }
278
279 pub fn pause_stream(&mut self) {
281 if self.is_active() {
282 self.shared_state.paused.store(true, Ordering::Relaxed);
283 }
284 }
285
286 pub fn resume(&mut self) {
288 self.shared_state.paused.store(false, Ordering::Relaxed);
289 }
290
291 pub fn stop_stream(&mut self) {
293 self.active_state = None;
294 self.shared_state.reset();
295 }
296
297 pub fn handle(&self) -> Mutex<Self> {
298 Mutex::new((*self).clone())
299 }
300}
301
302impl Drop for StreamReaderState {
303 fn drop(&mut self) {
304 self.stop_stream();
305 }
306}
307
308impl AudioNode for StreamReaderNode {
309 type Configuration = StreamReaderConfig;
310
311 fn info(&self, config: &Self::Configuration) -> AudioNodeInfo {
312 AudioNodeInfo::new()
313 .debug_name("stream_reader")
314 .channel_config(ChannelConfig {
315 num_inputs: config.channels.get(),
316 num_outputs: ChannelCount::ZERO,
317 })
318 .custom_state(StreamReaderState::new(config.channels))
319 }
320
321 fn construct_processor(
322 &self,
323 _config: &Self::Configuration,
324 cx: ConstructProcessorContext,
325 ) -> impl AudioNodeProcessor {
326 Processor {
327 prod: None,
328 shared_state: ArcGc::clone(
329 &cx.custom_state::<StreamReaderState>().unwrap().shared_state,
330 ),
331 }
332 }
333}
334
335#[derive(Clone)]
336struct ActiveState {
337 cons: Arc<Mutex<fixed_resample::ResamplingCons<f32>>>,
338 sample_rate: NonZeroU32,
339}
340
/// Flags shared between a `StreamReaderState` and the node's processor.
struct SharedState {
    /// Set when a stream is started; cleared when it is stopped.
    stream_active: AtomicBool,
    /// Set by the processor once it is about to push audio into the channel.
    channel_started: AtomicBool,
    /// Set while the stream is paused.
    paused: AtomicBool,
    /// Set by the processor when a push reports a corrected underflow.
    underflow_occurred: AtomicBool,
    /// Set by the processor when a push reports an overflow.
    overflow_occurred: AtomicBool,
}

impl SharedState {
    /// Creates the shared state with every flag cleared.
    fn new() -> Self {
        let cleared = || AtomicBool::new(false);

        Self {
            stream_active: cleared(),
            channel_started: cleared(),
            paused: cleared(),
            underflow_occurred: cleared(),
            overflow_occurred: cleared(),
        }
    }

    /// Clears every flag.
    fn reset(&self) {
        let flags = [
            &self.stream_active,
            &self.channel_started,
            &self.paused,
            &self.underflow_occurred,
            &self.overflow_occurred,
        ];

        for flag in flags {
            flag.store(false, Ordering::Relaxed);
        }
    }
}
368
369struct Processor {
370 prod: Option<fixed_resample::ResamplingProd<f32, MAX_CHANNELS>>,
371 shared_state: ArcGc<SharedState>,
372}
373
374impl AudioNodeProcessor for Processor {
375 fn process(
376 &mut self,
377 info: &ProcInfo,
378 buffers: ProcBuffers,
379 events: &mut ProcEvents,
380 _extra: &mut ProcExtra,
381 ) -> ProcessStatus {
382 for mut event in events.drain() {
383 if let Some(out_stream_event) = event.downcast_mut::<NewOutputStreamEvent>() {
384 core::mem::swap(&mut self.prod, &mut out_stream_event.prod);
387 }
388 }
389
390 if !self.shared_state.stream_active.load(Ordering::Relaxed)
391 || self.shared_state.paused.load(Ordering::Relaxed)
392 {
393 return ProcessStatus::Bypass;
394 }
395
396 let Some(prod) = &mut self.prod else {
397 return ProcessStatus::Bypass;
398 };
399
400 self.shared_state
403 .channel_started
404 .store(true, Ordering::Relaxed);
405
406 let status = prod.push(buffers.inputs, 0..info.frames);
407
408 match status {
409 PushStatus::OverflowOccurred {
410 num_frames_pushed: _,
411 } => {
412 self.shared_state
413 .overflow_occurred
414 .store(true, Ordering::Relaxed);
415 }
416 PushStatus::UnderflowCorrected {
417 num_zero_frames_pushed: _,
418 } => {
419 self.shared_state
420 .underflow_occurred
421 .store(true, Ordering::Relaxed);
422 }
423 _ => {}
424 }
425
426 ProcessStatus::Bypass
427 }
428
429 fn stream_stopped(&mut self, _logger: &mut RealtimeLogger) {
430 self.shared_state
431 .stream_active
432 .store(false, Ordering::Relaxed);
433 self.prod = None;
434 }
435}
436
437pub struct NewOutputStreamEvent {
438 prod: Option<fixed_resample::ResamplingProd<f32, MAX_CHANNELS>>,
439}
440
441impl From<NewOutputStreamEvent> for NodeEventType {
442 fn from(value: NewOutputStreamEvent) -> Self {
443 NodeEventType::custom(value)
444 }
445}