1use bevy_platform::sync::{
2 atomic::{AtomicI64, Ordering},
3 Arc,
4};
5use core::any::Any;
6use core::num::NonZeroU32;
7use firewheel_core::{
8 atomic_float::AtomicF64,
9 channel_config::{ChannelConfig, ChannelCount},
10 clock::{ClockSamples, ClockSeconds, MusicalTime, MusicalTransport},
11 collector::Collector,
12 dsp::declick::DeclickValues,
13 event::{NodeEvent, NodeEventType},
14 node::{AudioNode, DynAudioNode, NodeID},
15 StreamInfo,
16};
17use ringbuf::traits::{Consumer, Producer, Split};
18use smallvec::SmallVec;
19
20use crate::{
21 backend::{AudioBackend, DeviceInfo},
22 error::{AddEdgeError, StartStreamError, UpdateError},
23 graph::{AudioGraph, Edge, EdgeID, NodeEntry, PortIdx},
24 processor::{
25 ContextToProcessorMsg, FirewheelProcessor, FirewheelProcessorInner, ProcessorToContextMsg,
26 },
27};
28
29#[derive(Debug, Clone, Copy, PartialEq)]
31pub struct FirewheelConfig {
32 pub num_graph_inputs: ChannelCount,
34 pub num_graph_outputs: ChannelCount,
36 pub hard_clip_outputs: bool,
45 pub initial_node_capacity: u32,
49 pub initial_edge_capacity: u32,
53 pub declick_seconds: f32,
58 pub initial_event_group_capacity: u32,
62 pub channel_capacity: u32,
66 pub event_queue_capacity: u32,
70}
71
72impl Default for FirewheelConfig {
73 fn default() -> Self {
74 Self {
75 num_graph_inputs: ChannelCount::ZERO,
76 num_graph_outputs: ChannelCount::STEREO,
77 hard_clip_outputs: false,
78 initial_node_capacity: 128,
79 initial_edge_capacity: 256,
80 declick_seconds: DeclickValues::DEFAULT_FADE_SECONDS,
81 initial_event_group_capacity: 128,
82 channel_capacity: 64,
83 event_queue_capacity: 128,
84 }
85 }
86}
87
88struct ActiveState<B: AudioBackend> {
89 backend_handle: B,
90 stream_info: StreamInfo,
91}
92
93pub struct FirewheelCtx<B: AudioBackend> {
95 graph: AudioGraph,
96
97 to_processor_tx: ringbuf::HeapProd<ContextToProcessorMsg>,
98 from_processor_rx: ringbuf::HeapCons<ProcessorToContextMsg>,
99
100 active_state: Option<ActiveState<B>>,
101
102 processor_channel: Option<(
103 ringbuf::HeapCons<ContextToProcessorMsg>,
104 ringbuf::HeapProd<ProcessorToContextMsg>,
105 )>,
106 processor_drop_rx: Option<ringbuf::HeapCons<FirewheelProcessorInner>>,
107
108 clock_shared: Arc<ClockValues>,
109
110 event_group_pool: Vec<Vec<NodeEvent>>,
112 event_group: Vec<NodeEvent>,
113 initial_event_group_capacity: usize,
114
115 config: FirewheelConfig,
116}
117
118impl<B: AudioBackend> FirewheelCtx<B> {
119 pub fn new(config: FirewheelConfig) -> Self {
121 let clock_shared = Arc::new(ClockValues {
122 seconds: AtomicF64::new(0.0),
123 samples: AtomicI64::new(0),
124 musical: AtomicF64::new(0.0),
125 });
126
127 let (to_processor_tx, from_context_rx) =
128 ringbuf::HeapRb::<ContextToProcessorMsg>::new(config.channel_capacity as usize).split();
129 let (to_context_tx, from_processor_rx) =
130 ringbuf::HeapRb::<ProcessorToContextMsg>::new(config.channel_capacity as usize * 2)
131 .split();
132
133 let initial_event_group_capacity = config.initial_event_group_capacity as usize;
134 let mut event_group_pool = Vec::with_capacity(16);
135 for _ in 0..3 {
136 event_group_pool.push(Vec::with_capacity(initial_event_group_capacity));
137 }
138
139 Self {
140 graph: AudioGraph::new(&config),
141 to_processor_tx,
142 from_processor_rx,
143 active_state: None,
144 processor_channel: Some((from_context_rx, to_context_tx)),
145 processor_drop_rx: None,
146 clock_shared: Arc::clone(&clock_shared),
147 event_group_pool,
148 event_group: Vec::with_capacity(initial_event_group_capacity),
149 initial_event_group_capacity,
150 config,
151 }
152 }
153
154 pub fn available_input_devices(&self) -> Vec<DeviceInfo> {
156 B::available_input_devices()
157 }
158
159 pub fn available_output_devices(&self) -> Vec<DeviceInfo> {
161 B::available_output_devices()
162 }
163
164 pub fn can_start_stream(&self) -> bool {
174 if self.is_audio_stream_running() {
175 false
176 } else if let Some(rx) = &self.processor_drop_rx {
177 rx.try_peek().is_some()
178 } else {
179 true
180 }
181 }
182
183 pub fn start_stream(
194 &mut self,
195 config: B::Config,
196 ) -> Result<(), StartStreamError<B::StartStreamError>> {
197 if self.is_audio_stream_running() {
198 return Err(StartStreamError::AlreadyStarted);
199 }
200
201 if !self.can_start_stream() {
202 return Err(StartStreamError::OldStreamNotFinishedStopping);
203 }
204
205 let (mut backend_handle, mut stream_info) =
206 B::start_stream(config).map_err(|e| StartStreamError::BackendError(e))?;
207
208 stream_info.sample_rate_recip = (stream_info.sample_rate.get() as f64).recip();
209 stream_info.declick_frames = NonZeroU32::new(
210 (self.config.declick_seconds * stream_info.sample_rate.get() as f32).round() as u32,
211 )
212 .unwrap_or(NonZeroU32::MIN);
213
214 let schedule = self.graph.compile(&stream_info)?;
215
216 let (drop_tx, drop_rx) = ringbuf::HeapRb::<FirewheelProcessorInner>::new(1).split();
217
218 let processor =
219 if let Some((from_context_rx, to_context_tx)) = self.processor_channel.take() {
220 FirewheelProcessorInner::new(
221 from_context_rx,
222 to_context_tx,
223 Arc::clone(&self.clock_shared),
224 self.graph.node_capacity(),
225 &stream_info,
226 self.config.hard_clip_outputs,
227 )
228 } else {
229 let mut processor = self.processor_drop_rx.as_mut().unwrap().try_pop().unwrap();
230
231 if processor.poisoned {
232 panic!("The audio thread has panicked!");
233 }
234
235 processor.new_stream(&stream_info);
236
237 processor
238 };
239
240 backend_handle.set_processor(FirewheelProcessor::new(processor, drop_tx));
241
242 if let Err(_) = self.send_message_to_processor(ContextToProcessorMsg::NewSchedule(schedule))
243 {
244 panic!("Firewheel message channel is full!");
245 }
246
247 self.active_state = Some(ActiveState {
248 backend_handle,
249 stream_info,
250 });
251 self.processor_drop_rx = Some(drop_rx);
252
253 Ok(())
254 }
255
256 pub fn stop_stream(&mut self) {
258 self.active_state = None;
261 self.graph.deactivate();
262 }
263
264 pub fn is_audio_stream_running(&self) -> bool {
266 self.active_state.is_some()
267 }
268
269 pub fn stream_info(&self) -> Option<&StreamInfo> {
273 self.active_state.as_ref().map(|s| &s.stream_info)
274 }
275
276 pub fn clock_now(&self) -> ClockSeconds {
283 ClockSeconds(self.clock_shared.seconds.load(Ordering::Relaxed))
284 }
285
286 pub fn clock_samples(&self) -> ClockSamples {
294 ClockSamples(self.clock_shared.samples.load(Ordering::Relaxed))
295 }
296
297 pub fn clock_musical(&self) -> MusicalTime {
301 MusicalTime(self.clock_shared.musical.load(Ordering::Relaxed))
302 }
303
304 pub fn set_transport(
312 &mut self,
313 transport: Option<MusicalTransport>,
314 ) -> Result<(), UpdateError<B::StreamError>> {
315 self.send_message_to_processor(ContextToProcessorMsg::SetTransport(transport))
316 .map_err(|(_, e)| e)
317 }
318
319 pub fn start_or_restart_transport(&mut self) -> Result<(), UpdateError<B::StreamError>> {
323 self.send_message_to_processor(ContextToProcessorMsg::StartOrRestartTransport)
324 .map_err(|(_, e)| e)
325 }
326
327 pub fn pause_transport(&mut self) -> Result<(), UpdateError<B::StreamError>> {
331 self.send_message_to_processor(ContextToProcessorMsg::PauseTransport)
332 .map_err(|(_, e)| e)
333 }
334
335 pub fn resume_transport(&mut self) -> Result<(), UpdateError<B::StreamError>> {
339 self.send_message_to_processor(ContextToProcessorMsg::ResumeTransport)
340 .map_err(|(_, e)| e)
341 }
342
343 pub fn stop_transport(&mut self) -> Result<(), UpdateError<B::StreamError>> {
347 self.send_message_to_processor(ContextToProcessorMsg::StopTransport)
348 .map_err(|(_, e)| e)
349 }
350
351 pub fn hard_clip_outputs(&self) -> bool {
353 self.config.hard_clip_outputs
354 }
355
356 pub fn set_hard_clip_outputs(
365 &mut self,
366 hard_clip_outputs: bool,
367 ) -> Result<(), UpdateError<B::StreamError>> {
368 if self.config.hard_clip_outputs == hard_clip_outputs {
369 return Ok(());
370 }
371 self.config.hard_clip_outputs = hard_clip_outputs;
372
373 self.send_message_to_processor(ContextToProcessorMsg::HardClipOutputs(hard_clip_outputs))
374 .map_err(|(_, e)| e)
375 }
376
377 pub fn update(&mut self) -> Result<(), UpdateError<B::StreamError>> {
381 firewheel_core::collector::GlobalCollector.collect();
382
383 for msg in self.from_processor_rx.pop_iter() {
384 match msg {
385 ProcessorToContextMsg::ReturnEventGroup(mut event_group) => {
386 event_group.clear();
387 self.event_group_pool.push(event_group);
388 }
389 ProcessorToContextMsg::ReturnSchedule(schedule_data) => {
390 let _ = schedule_data;
391 }
392 }
393 }
394
395 self.graph.update(
396 self.active_state.as_ref().map(|s| &s.stream_info),
397 &mut self.event_group,
398 );
399
400 if let Some(active_state) = &mut self.active_state {
401 if let Err(e) = active_state.backend_handle.poll_status() {
402 self.active_state = None;
403 self.graph.deactivate();
404
405 return Err(UpdateError::StreamStoppedUnexpectedly(Some(e)));
406 }
407
408 if self
409 .processor_drop_rx
410 .as_ref()
411 .unwrap()
412 .try_peek()
413 .is_some()
414 {
415 self.active_state = None;
416 self.graph.deactivate();
417
418 return Err(UpdateError::StreamStoppedUnexpectedly(None));
419 }
420 }
421
422 if self.is_audio_stream_running() {
423 if self.graph.needs_compile() {
424 let schedule_data = self
425 .graph
426 .compile(&self.active_state.as_ref().unwrap().stream_info)?;
427
428 if let Err((msg, e)) = self
429 .send_message_to_processor(ContextToProcessorMsg::NewSchedule(schedule_data))
430 {
431 let ContextToProcessorMsg::NewSchedule(schedule) = msg else {
432 unreachable!();
433 };
434
435 self.graph.on_schedule_send_failed(schedule);
436
437 return Err(e);
438 }
439 }
440
441 if !self.event_group.is_empty() {
442 let mut next_event_group = self
443 .event_group_pool
444 .pop()
445 .unwrap_or_else(|| Vec::with_capacity(self.initial_event_group_capacity));
446 core::mem::swap(&mut next_event_group, &mut self.event_group);
447
448 if let Err((msg, e)) = self
449 .send_message_to_processor(ContextToProcessorMsg::EventGroup(next_event_group))
450 {
451 let ContextToProcessorMsg::EventGroup(mut event_group) = msg else {
452 unreachable!();
453 };
454
455 core::mem::swap(&mut event_group, &mut self.event_group);
456 self.event_group_pool.push(event_group);
457
458 return Err(e);
459 }
460 }
461 }
462
463 Ok(())
464 }
465
466 pub fn graph_in_node_id(&self) -> NodeID {
468 self.graph.graph_in_node()
469 }
470
471 pub fn graph_out_node_id(&self) -> NodeID {
473 self.graph.graph_out_node()
474 }
475
476 pub fn add_node<T: AudioNode + 'static>(
478 &mut self,
479 node: T,
480 config: Option<T::Configuration>,
481 ) -> NodeID {
482 self.graph.add_node(node, config)
483 }
484
485 pub fn add_dyn_node<T: DynAudioNode + 'static>(&mut self, node: T) -> NodeID {
487 self.graph.add_dyn_node(node)
488 }
489
490 pub fn remove_node(&mut self, node_id: NodeID) -> Result<SmallVec<[EdgeID; 4]>, ()> {
502 self.graph.remove_node(node_id)
503 }
504
505 pub fn node_info(&self, id: NodeID) -> Option<&NodeEntry> {
507 self.graph.node_info(id)
508 }
509
510 pub fn node_state<T: 'static>(&self, id: NodeID) -> Option<&T> {
512 self.graph.node_state(id)
513 }
514
515 pub fn node_state_dyn(&self, id: NodeID) -> Option<&dyn Any> {
517 self.graph.node_state_dyn(id)
518 }
519
520 pub fn node_state_mut<T: 'static>(&mut self, id: NodeID) -> Option<&mut T> {
522 self.graph.node_state_mut(id)
523 }
524
525 pub fn node_state_dyn_mut(&mut self, id: NodeID) -> Option<&mut dyn Any> {
526 self.graph.node_state_dyn_mut(id)
527 }
528
529 pub fn nodes<'a>(&'a self) -> impl Iterator<Item = &'a NodeEntry> {
531 self.graph.nodes()
532 }
533
534 pub fn edges<'a>(&'a self) -> impl Iterator<Item = &'a Edge> {
536 self.graph.edges()
537 }
538
539 pub fn set_graph_channel_config(
543 &mut self,
544 channel_config: ChannelConfig,
545 ) -> SmallVec<[EdgeID; 4]> {
546 self.graph.set_graph_channel_config(channel_config)
547 }
548
549 pub fn connect(
567 &mut self,
568 src_node: NodeID,
569 dst_node: NodeID,
570 ports_src_dst: &[(PortIdx, PortIdx)],
571 check_for_cycles: bool,
572 ) -> Result<SmallVec<[EdgeID; 4]>, AddEdgeError> {
573 self.graph
574 .connect(src_node, dst_node, ports_src_dst, check_for_cycles)
575 }
576
577 pub fn disconnect(
588 &mut self,
589 src_node: NodeID,
590 dst_node: NodeID,
591 ports_src_dst: &[(PortIdx, PortIdx)],
592 ) -> bool {
593 self.graph.disconnect(src_node, dst_node, ports_src_dst)
594 }
595
596 pub fn disconnect_all_between(
601 &mut self,
602 src_node: NodeID,
603 dst_node: NodeID,
604 ) -> SmallVec<[EdgeID; 4]> {
605 self.graph.disconnect_all_between(src_node, dst_node)
606 }
607
608 pub fn disconnect_by_edge_id(&mut self, edge_id: EdgeID) -> bool {
612 self.graph.disconnect_by_edge_id(edge_id)
613 }
614
615 pub fn edge(&self, edge_id: EdgeID) -> Option<&Edge> {
617 self.graph.edge(edge_id)
618 }
619
620 pub fn cycle_detected(&mut self) -> bool {
624 self.graph.cycle_detected()
625 }
626
627 pub fn queue_event(&mut self, event: NodeEvent) {
632 self.event_group.push(event);
633 }
634
635 pub fn queue_event_for(&mut self, node_id: NodeID, event: NodeEventType) {
640 self.queue_event(NodeEvent { node_id, event });
641 }
642
643 fn send_message_to_processor(
644 &mut self,
645 msg: ContextToProcessorMsg,
646 ) -> Result<(), (ContextToProcessorMsg, UpdateError<B::StreamError>)> {
647 self.to_processor_tx
648 .try_push(msg)
649 .map_err(|msg| (msg, UpdateError::MsgChannelFull))
650 }
651}
652
653impl<B: AudioBackend> Drop for FirewheelCtx<B> {
654 fn drop(&mut self) {
655 self.stop_stream();
656
657 #[cfg(not(target_family = "wasm"))]
660 if let Some(drop_rx) = self.processor_drop_rx.take() {
661 let now = bevy_platform::time::Instant::now();
662
663 while drop_rx.try_peek().is_none() {
664 if now.elapsed() > core::time::Duration::from_secs(2) {
665 break;
666 }
667
668 bevy_platform::thread::sleep(core::time::Duration::from_millis(2));
669 }
670 }
671
672 firewheel_core::collector::GlobalCollector.collect();
673 }
674}
675
676pub(crate) struct ClockValues {
677 pub seconds: AtomicF64,
678 pub samples: AtomicI64,
679 pub musical: AtomicF64,
680}
681
682impl<B: AudioBackend> FirewheelCtx<B> {
683 pub fn event_queue(&mut self, id: NodeID) -> ContextQueue<'_, B> {
685 ContextQueue { context: self, id }
686 }
687}
688
689pub struct ContextQueue<'a, B: AudioBackend> {
710 context: &'a mut FirewheelCtx<B>,
711 id: NodeID,
712}
713
714impl<B: AudioBackend> firewheel_core::diff::EventQueue for ContextQueue<'_, B> {
715 fn push(&mut self, data: NodeEventType) {
716 self.context.queue_event(NodeEvent {
717 event: data,
718 node_id: self.id,
719 });
720 }
721}