1use atomic_float::AtomicF64;
2use core::any::Any;
3use firewheel_core::{
4 channel_config::{ChannelConfig, ChannelCount},
5 clock::{ClockSamples, ClockSeconds, MusicalTime, MusicalTransport},
6 collector::Collector,
7 dsp::declick::DeclickValues,
8 event::{NodeEvent, NodeEventType},
9 node::{AudioNode, DynAudioNode, NodeID},
10 StreamInfo,
11};
12use ringbuf::traits::{Consumer, Producer, Split};
13use smallvec::SmallVec;
14use std::{
15 num::NonZeroU32,
16 sync::{
17 atomic::{AtomicI64, Ordering},
18 Arc,
19 },
20};
21
22use crate::{
23 backend::{AudioBackend, DeviceInfo},
24 error::{AddEdgeError, StartStreamError, UpdateError},
25 graph::{AudioGraph, Edge, EdgeID, NodeEntry, PortIdx},
26 processor::{
27 ContextToProcessorMsg, FirewheelProcessor, FirewheelProcessorInner, ProcessorToContextMsg,
28 },
29};
30
31#[derive(Debug, Clone, Copy, PartialEq)]
33pub struct FirewheelConfig {
34 pub num_graph_inputs: ChannelCount,
36 pub num_graph_outputs: ChannelCount,
38 pub hard_clip_outputs: bool,
47 pub initial_node_capacity: u32,
51 pub initial_edge_capacity: u32,
55 pub declick_seconds: f32,
60 pub initial_event_group_capacity: u32,
64 pub channel_capacity: u32,
68 pub event_queue_capacity: u32,
72}
73
74impl Default for FirewheelConfig {
75 fn default() -> Self {
76 Self {
77 num_graph_inputs: ChannelCount::ZERO,
78 num_graph_outputs: ChannelCount::STEREO,
79 hard_clip_outputs: false,
80 initial_node_capacity: 128,
81 initial_edge_capacity: 256,
82 declick_seconds: DeclickValues::DEFAULT_FADE_SECONDS,
83 initial_event_group_capacity: 128,
84 channel_capacity: 64,
85 event_queue_capacity: 128,
86 }
87 }
88}
89
90struct ActiveState<B: AudioBackend> {
91 backend_handle: B,
92 stream_info: StreamInfo,
93}
94
95pub struct FirewheelCtx<B: AudioBackend> {
97 graph: AudioGraph,
98
99 to_processor_tx: ringbuf::HeapProd<ContextToProcessorMsg>,
100 from_processor_rx: ringbuf::HeapCons<ProcessorToContextMsg>,
101
102 active_state: Option<ActiveState<B>>,
103
104 processor_channel: Option<(
105 ringbuf::HeapCons<ContextToProcessorMsg>,
106 ringbuf::HeapProd<ProcessorToContextMsg>,
107 )>,
108 processor_drop_rx: Option<ringbuf::HeapCons<FirewheelProcessorInner>>,
109
110 clock_shared: Arc<ClockValues>,
111
112 event_group_pool: Vec<Vec<NodeEvent>>,
114 event_group: Vec<NodeEvent>,
115 initial_event_group_capacity: usize,
116
117 config: FirewheelConfig,
118}
119
120impl<B: AudioBackend> FirewheelCtx<B> {
121 pub fn new(config: FirewheelConfig) -> Self {
123 let clock_shared = Arc::new(ClockValues {
124 seconds: AtomicF64::new(0.0),
125 samples: AtomicI64::new(0),
126 musical: AtomicF64::new(0.0),
127 });
128
129 let (to_processor_tx, from_context_rx) =
130 ringbuf::HeapRb::<ContextToProcessorMsg>::new(config.channel_capacity as usize).split();
131 let (to_context_tx, from_processor_rx) =
132 ringbuf::HeapRb::<ProcessorToContextMsg>::new(config.channel_capacity as usize * 2)
133 .split();
134
135 let initial_event_group_capacity = config.initial_event_group_capacity as usize;
136 let mut event_group_pool = Vec::with_capacity(16);
137 for _ in 0..3 {
138 event_group_pool.push(Vec::with_capacity(initial_event_group_capacity));
139 }
140
141 Self {
142 graph: AudioGraph::new(&config),
143 to_processor_tx,
144 from_processor_rx,
145 active_state: None,
146 processor_channel: Some((from_context_rx, to_context_tx)),
147 processor_drop_rx: None,
148 clock_shared: Arc::clone(&clock_shared),
149 event_group_pool,
150 event_group: Vec::with_capacity(initial_event_group_capacity),
151 initial_event_group_capacity,
152 config,
153 }
154 }
155
156 pub fn available_input_devices(&self) -> Vec<DeviceInfo> {
158 B::available_input_devices()
159 }
160
161 pub fn available_output_devices(&self) -> Vec<DeviceInfo> {
163 B::available_output_devices()
164 }
165
166 pub fn can_start_stream(&self) -> bool {
176 if self.is_audio_stream_running() {
177 false
178 } else if let Some(rx) = &self.processor_drop_rx {
179 rx.try_peek().is_some()
180 } else {
181 true
182 }
183 }
184
185 pub fn start_stream(
196 &mut self,
197 config: B::Config,
198 ) -> Result<(), StartStreamError<B::StartStreamError>> {
199 if self.is_audio_stream_running() {
200 return Err(StartStreamError::AlreadyStarted);
201 }
202
203 if !self.can_start_stream() {
204 return Err(StartStreamError::OldStreamNotFinishedStopping);
205 }
206
207 let (mut backend_handle, mut stream_info) =
208 B::start_stream(config).map_err(|e| StartStreamError::BackendError(e))?;
209
210 stream_info.sample_rate_recip = (stream_info.sample_rate.get() as f64).recip();
211 stream_info.declick_frames = NonZeroU32::new(
212 (self.config.declick_seconds * stream_info.sample_rate.get() as f32).round() as u32,
213 )
214 .unwrap_or(NonZeroU32::MIN);
215
216 let schedule = self.graph.compile(&stream_info)?;
217
218 let (drop_tx, drop_rx) = ringbuf::HeapRb::<FirewheelProcessorInner>::new(1).split();
219
220 let processor =
221 if let Some((from_context_rx, to_context_tx)) = self.processor_channel.take() {
222 FirewheelProcessorInner::new(
223 from_context_rx,
224 to_context_tx,
225 Arc::clone(&self.clock_shared),
226 self.graph.node_capacity(),
227 &stream_info,
228 self.config.hard_clip_outputs,
229 )
230 } else {
231 let mut processor = self.processor_drop_rx.as_mut().unwrap().try_pop().unwrap();
232
233 if processor.poisoned {
234 panic!("The audio thread has panicked!");
235 }
236
237 processor.new_stream(&stream_info);
238
239 processor
240 };
241
242 backend_handle.set_processor(FirewheelProcessor::new(processor, drop_tx));
243
244 if let Err(_) = self.send_message_to_processor(ContextToProcessorMsg::NewSchedule(schedule))
245 {
246 panic!("Firewheel message channel is full!");
247 }
248
249 self.active_state = Some(ActiveState {
250 backend_handle,
251 stream_info,
252 });
253 self.processor_drop_rx = Some(drop_rx);
254
255 Ok(())
256 }
257
258 pub fn stop_stream(&mut self) {
260 self.active_state = None;
263 self.graph.deactivate();
264 }
265
266 pub fn is_audio_stream_running(&self) -> bool {
268 self.active_state.is_some()
269 }
270
271 pub fn stream_info(&self) -> Option<&StreamInfo> {
275 self.active_state.as_ref().map(|s| &s.stream_info)
276 }
277
278 pub fn clock_now(&self) -> ClockSeconds {
285 ClockSeconds(self.clock_shared.seconds.load(Ordering::Relaxed))
286 }
287
288 pub fn clock_samples(&self) -> ClockSamples {
296 ClockSamples(self.clock_shared.samples.load(Ordering::Relaxed))
297 }
298
299 pub fn clock_musical(&self) -> MusicalTime {
303 MusicalTime(self.clock_shared.musical.load(Ordering::Relaxed))
304 }
305
306 pub fn set_transport(
314 &mut self,
315 transport: Option<MusicalTransport>,
316 ) -> Result<(), UpdateError<B::StreamError>> {
317 self.send_message_to_processor(ContextToProcessorMsg::SetTransport(transport))
318 .map_err(|(_, e)| e)
319 }
320
321 pub fn start_or_restart_transport(&mut self) -> Result<(), UpdateError<B::StreamError>> {
325 self.send_message_to_processor(ContextToProcessorMsg::StartOrRestartTransport)
326 .map_err(|(_, e)| e)
327 }
328
329 pub fn pause_transport(&mut self) -> Result<(), UpdateError<B::StreamError>> {
333 self.send_message_to_processor(ContextToProcessorMsg::PauseTransport)
334 .map_err(|(_, e)| e)
335 }
336
337 pub fn resume_transport(&mut self) -> Result<(), UpdateError<B::StreamError>> {
341 self.send_message_to_processor(ContextToProcessorMsg::ResumeTransport)
342 .map_err(|(_, e)| e)
343 }
344
345 pub fn stop_transport(&mut self) -> Result<(), UpdateError<B::StreamError>> {
349 self.send_message_to_processor(ContextToProcessorMsg::StopTransport)
350 .map_err(|(_, e)| e)
351 }
352
353 pub fn hard_clip_outputs(&self) -> bool {
355 self.config.hard_clip_outputs
356 }
357
358 pub fn set_hard_clip_outputs(
367 &mut self,
368 hard_clip_outputs: bool,
369 ) -> Result<(), UpdateError<B::StreamError>> {
370 if self.config.hard_clip_outputs == hard_clip_outputs {
371 return Ok(());
372 }
373 self.config.hard_clip_outputs = hard_clip_outputs;
374
375 self.send_message_to_processor(ContextToProcessorMsg::HardClipOutputs(hard_clip_outputs))
376 .map_err(|(_, e)| e)
377 }
378
379 pub fn update(&mut self) -> Result<(), UpdateError<B::StreamError>> {
383 firewheel_core::collector::GlobalCollector.collect();
384
385 for msg in self.from_processor_rx.pop_iter() {
386 match msg {
387 ProcessorToContextMsg::ReturnEventGroup(mut event_group) => {
388 event_group.clear();
389 self.event_group_pool.push(event_group);
390 }
391 ProcessorToContextMsg::ReturnSchedule(schedule_data) => {
392 let _ = schedule_data;
393 }
394 }
395 }
396
397 self.graph.update(
398 self.active_state.as_ref().map(|s| &s.stream_info),
399 &mut self.event_group,
400 );
401
402 if let Some(active_state) = &mut self.active_state {
403 if let Err(e) = active_state.backend_handle.poll_status() {
404 self.active_state = None;
405 self.graph.deactivate();
406
407 return Err(UpdateError::StreamStoppedUnexpectedly(Some(e)));
408 }
409
410 if self
411 .processor_drop_rx
412 .as_ref()
413 .unwrap()
414 .try_peek()
415 .is_some()
416 {
417 self.active_state = None;
418 self.graph.deactivate();
419
420 return Err(UpdateError::StreamStoppedUnexpectedly(None));
421 }
422 }
423
424 if self.is_audio_stream_running() {
425 if self.graph.needs_compile() {
426 let schedule_data = self
427 .graph
428 .compile(&self.active_state.as_ref().unwrap().stream_info)?;
429
430 if let Err((msg, e)) = self
431 .send_message_to_processor(ContextToProcessorMsg::NewSchedule(schedule_data))
432 {
433 let ContextToProcessorMsg::NewSchedule(schedule) = msg else {
434 unreachable!();
435 };
436
437 self.graph.on_schedule_send_failed(schedule);
438
439 return Err(e);
440 }
441 }
442
443 if !self.event_group.is_empty() {
444 let mut next_event_group = self
445 .event_group_pool
446 .pop()
447 .unwrap_or_else(|| Vec::with_capacity(self.initial_event_group_capacity));
448 std::mem::swap(&mut next_event_group, &mut self.event_group);
449
450 if let Err((msg, e)) = self
451 .send_message_to_processor(ContextToProcessorMsg::EventGroup(next_event_group))
452 {
453 let ContextToProcessorMsg::EventGroup(mut event_group) = msg else {
454 unreachable!();
455 };
456
457 std::mem::swap(&mut event_group, &mut self.event_group);
458 self.event_group_pool.push(event_group);
459
460 return Err(e);
461 }
462 }
463 }
464
465 Ok(())
466 }
467
468 pub fn graph_in_node_id(&self) -> NodeID {
470 self.graph.graph_in_node()
471 }
472
473 pub fn graph_out_node_id(&self) -> NodeID {
475 self.graph.graph_out_node()
476 }
477
478 pub fn add_node<T: AudioNode + 'static>(
480 &mut self,
481 node: T,
482 config: Option<T::Configuration>,
483 ) -> NodeID {
484 self.graph.add_node(node, config)
485 }
486
487 pub fn add_dyn_node<T: DynAudioNode + 'static>(&mut self, node: T) -> NodeID {
489 self.graph.add_dyn_node(node)
490 }
491
492 pub fn remove_node(&mut self, node_id: NodeID) -> Result<SmallVec<[EdgeID; 4]>, ()> {
504 self.graph.remove_node(node_id)
505 }
506
507 pub fn node_info(&self, id: NodeID) -> Option<&NodeEntry> {
509 self.graph.node_info(id)
510 }
511
512 pub fn node_state<T: 'static>(&self, id: NodeID) -> Option<&T> {
514 self.graph.node_state(id)
515 }
516
517 pub fn node_state_dyn(&self, id: NodeID) -> Option<&dyn Any> {
519 self.graph.node_state_dyn(id)
520 }
521
522 pub fn node_state_mut<T: 'static>(&mut self, id: NodeID) -> Option<&mut T> {
524 self.graph.node_state_mut(id)
525 }
526
527 pub fn node_state_dyn_mut(&mut self, id: NodeID) -> Option<&mut dyn Any> {
528 self.graph.node_state_dyn_mut(id)
529 }
530
531 pub fn nodes<'a>(&'a self) -> impl Iterator<Item = &'a NodeEntry> {
533 self.graph.nodes()
534 }
535
536 pub fn edges<'a>(&'a self) -> impl Iterator<Item = &'a Edge> {
538 self.graph.edges()
539 }
540
541 pub fn set_graph_channel_config(
545 &mut self,
546 channel_config: ChannelConfig,
547 ) -> SmallVec<[EdgeID; 4]> {
548 self.graph.set_graph_channel_config(channel_config)
549 }
550
551 pub fn connect(
569 &mut self,
570 src_node: NodeID,
571 dst_node: NodeID,
572 ports_src_dst: &[(PortIdx, PortIdx)],
573 check_for_cycles: bool,
574 ) -> Result<SmallVec<[EdgeID; 4]>, AddEdgeError> {
575 self.graph
576 .connect(src_node, dst_node, ports_src_dst, check_for_cycles)
577 }
578
579 pub fn disconnect(
590 &mut self,
591 src_node: NodeID,
592 dst_node: NodeID,
593 ports_src_dst: &[(PortIdx, PortIdx)],
594 ) -> bool {
595 self.graph.disconnect(src_node, dst_node, ports_src_dst)
596 }
597
598 pub fn disconnect_all_between(
603 &mut self,
604 src_node: NodeID,
605 dst_node: NodeID,
606 ) -> SmallVec<[EdgeID; 4]> {
607 self.graph.disconnect_all_between(src_node, dst_node)
608 }
609
610 pub fn disconnect_by_edge_id(&mut self, edge_id: EdgeID) -> bool {
614 self.graph.disconnect_by_edge_id(edge_id)
615 }
616
617 pub fn edge(&self, edge_id: EdgeID) -> Option<&Edge> {
619 self.graph.edge(edge_id)
620 }
621
622 pub fn cycle_detected(&mut self) -> bool {
626 self.graph.cycle_detected()
627 }
628
629 pub fn queue_event(&mut self, event: NodeEvent) {
634 self.event_group.push(event);
635 }
636
637 pub fn queue_event_for(&mut self, node_id: NodeID, event: NodeEventType) {
642 self.queue_event(NodeEvent { node_id, event });
643 }
644
645 fn send_message_to_processor(
646 &mut self,
647 msg: ContextToProcessorMsg,
648 ) -> Result<(), (ContextToProcessorMsg, UpdateError<B::StreamError>)> {
649 self.to_processor_tx
650 .try_push(msg)
651 .map_err(|msg| (msg, UpdateError::MsgChannelFull))
652 }
653}
654
655impl<B: AudioBackend> Drop for FirewheelCtx<B> {
656 fn drop(&mut self) {
657 self.stop_stream();
658
659 #[cfg(not(target_family = "wasm"))]
662 if let Some(drop_rx) = self.processor_drop_rx.take() {
663 let now = std::time::Instant::now();
664
665 while drop_rx.try_peek().is_none() {
666 if now.elapsed() > std::time::Duration::from_secs(2) {
667 break;
668 }
669
670 std::thread::sleep(std::time::Duration::from_millis(2));
671 }
672 }
673
674 firewheel_core::collector::GlobalCollector.collect();
675 }
676}
677
678pub(crate) struct ClockValues {
679 pub seconds: AtomicF64,
680 pub samples: AtomicI64,
681 pub musical: AtomicF64,
682}
683
684impl<B: AudioBackend> FirewheelCtx<B> {
685 pub fn event_queue(&mut self, id: NodeID) -> ContextQueue<'_, B> {
687 ContextQueue { context: self, id }
688 }
689}
690
691pub struct ContextQueue<'a, B: AudioBackend> {
712 context: &'a mut FirewheelCtx<B>,
713 id: NodeID,
714}
715
716impl<B: AudioBackend> firewheel_core::diff::EventQueue for ContextQueue<'_, B> {
717 fn push(&mut self, data: NodeEventType) {
718 self.context.queue_event(NodeEvent {
719 event: data,
720 node_id: self.id,
721 });
722 }
723}