firewheel_graph/
context.rs

1use bevy_platform::time::Instant;
2use core::cell::RefCell;
3use core::num::NonZeroU32;
4use core::time::Duration;
5use core::{any::Any, f64};
6use firewheel_core::clock::DurationSeconds;
7use firewheel_core::log::{RealtimeLogger, RealtimeLoggerConfig, RealtimeLoggerMainThread};
8use firewheel_core::node::ProcStore;
9use firewheel_core::{
10    channel_config::{ChannelConfig, ChannelCount},
11    clock::AudioClock,
12    dsp::declick::DeclickValues,
13    event::{NodeEvent, NodeEventType},
14    node::{AudioNode, DynAudioNode, NodeID},
15    StreamInfo,
16};
17use ringbuf::traits::{Consumer, Producer, Split};
18use smallvec::SmallVec;
19
20#[cfg(not(feature = "std"))]
21use num_traits::Float;
22
23#[cfg(all(not(feature = "std"), feature = "musical_transport"))]
24use bevy_platform::prelude::Box;
25#[cfg(not(feature = "std"))]
26use bevy_platform::prelude::Vec;
27
28use crate::error::RemoveNodeError;
29use crate::processor::BufferOutOfSpaceMode;
30use crate::{
31    backend::{AudioBackend, DeviceInfo},
32    error::{AddEdgeError, StartStreamError, UpdateError},
33    graph::{AudioGraph, Edge, EdgeID, NodeEntry, PortIdx},
34    processor::{
35        ContextToProcessorMsg, FirewheelProcessor, FirewheelProcessorInner, ProcessorToContextMsg,
36        SharedClock,
37    },
38};
39
40#[cfg(feature = "scheduled_events")]
41use crate::processor::ClearScheduledEventsEvent;
42#[cfg(feature = "scheduled_events")]
43use firewheel_core::clock::EventInstant;
44
45#[cfg(feature = "musical_transport")]
46use firewheel_core::clock::TransportState;
47
/// The configuration of a Firewheel context.
///
/// Use [`FirewheelConfig::default`] to get the documented default values and
/// override individual fields as needed.
#[derive(Debug, Clone, Copy, PartialEq)]
#[cfg_attr(feature = "bevy_reflect", derive(bevy_reflect::Reflect))]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct FirewheelConfig {
    /// The number of input channels in the audio graph.
    ///
    /// By default this is set to [`ChannelCount::ZERO`].
    pub num_graph_inputs: ChannelCount,
    /// The number of output channels in the audio graph.
    ///
    /// By default this is set to [`ChannelCount::STEREO`].
    pub num_graph_outputs: ChannelCount,
    /// If `true`, then all outputs will be hard clipped at 0dB to help
    /// protect the system's speakers.
    ///
    /// Note that most operating systems already hard clip the output,
    /// so this is usually not needed (TODO: Do research to see if this
    /// assumption is true.)
    ///
    /// By default this is set to `false`.
    pub hard_clip_outputs: bool,
    /// An initial capacity to allocate for the nodes in the audio graph.
    ///
    /// By default this is set to `128`.
    pub initial_node_capacity: u32,
    /// An initial capacity to allocate for the edges in the audio graph.
    ///
    /// By default this is set to `256`.
    pub initial_edge_capacity: u32,
    /// The amount of time in seconds to fade in/out when pausing/resuming
    /// to avoid clicks and pops.
    ///
    /// By default this is set to `10.0 / 1_000.0`
    /// ([`DeclickValues::DEFAULT_FADE_SECONDS`]).
    pub declick_seconds: f32,
    /// The initial capacity for a group of events.
    ///
    /// By default this is set to `128`.
    pub initial_event_group_capacity: u32,
    /// The capacity of the engine's internal message channel.
    ///
    /// The context-to-processor channel is allocated with exactly this
    /// capacity, and the processor-to-context channel with twice this
    /// capacity.
    ///
    /// By default this is set to `64`.
    pub channel_capacity: u32,
    /// The maximum number of events that can be sent in a single call
    /// to [`AudioNodeProcessor::process`].
    ///
    /// By default this is set to `128`.
    ///
    /// [`AudioNodeProcessor::process`]: firewheel_core::node::AudioNodeProcessor::process
    pub event_queue_capacity: usize,
    /// The maximum number of immediate events (events that do *NOT* have a
    /// scheduled time component) that can be stored at once in the audio
    /// thread.
    ///
    /// By default this is set to `512`.
    pub immediate_event_capacity: usize,
    /// The maximum number of scheduled events (events that have a scheduled
    /// time component) that can be stored at once in the audio thread.
    ///
    /// This can be set to `0` to save some memory if you do not plan on using
    /// scheduled events.
    ///
    /// By default this is set to `512`.
    #[cfg(feature = "scheduled_events")]
    pub scheduled_event_capacity: usize,
    /// How to handle event buffers on the audio thread running out of space.
    ///
    /// By default this is set to [`BufferOutOfSpaceMode::AllocateOnAudioThread`].
    pub buffer_out_of_space_mode: BufferOutOfSpaceMode,

    /// The configuration of the realtime safe logger.
    ///
    /// By default this is set to `RealtimeLoggerConfig::default()`.
    pub logger_config: RealtimeLoggerConfig,

    /// Force all buffers in the audio graph to be cleared when processing.
    ///
    /// This is only meant to be used when diagnosing a misbehaving audio node.
    /// Enabling this significantly increases processing overhead.
    ///
    /// If enabling this fixes audio glitching issues, then notify the node
    /// author of the misbehavior.
    ///
    /// By default this is set to `false`.
    pub debug_force_clear_buffers: bool,

    /// The initial number of slots to allocate for the [`ProcStore`].
    ///
    /// By default this is set to `8`.
    pub proc_store_capacity: usize,
}
133
134impl Default for FirewheelConfig {
135    fn default() -> Self {
136        Self {
137            num_graph_inputs: ChannelCount::ZERO,
138            num_graph_outputs: ChannelCount::STEREO,
139            hard_clip_outputs: false,
140            initial_node_capacity: 128,
141            initial_edge_capacity: 256,
142            declick_seconds: DeclickValues::DEFAULT_FADE_SECONDS,
143            initial_event_group_capacity: 128,
144            channel_capacity: 64,
145            event_queue_capacity: 128,
146            immediate_event_capacity: 512,
147            #[cfg(feature = "scheduled_events")]
148            scheduled_event_capacity: 512,
149            buffer_out_of_space_mode: BufferOutOfSpaceMode::AllocateOnAudioThread,
150            logger_config: RealtimeLoggerConfig::default(),
151            debug_force_clear_buffers: false,
152            proc_store_capacity: 8,
153        }
154    }
155}
156
/// State that only exists while an audio stream is running.
struct ActiveState<B: AudioBackend> {
    /// The backend instance driving the stream. Dropping it stops the stream
    /// (see `FirewheelCtx::stop_stream`).
    backend_handle: B,
    /// Information about the running stream, as returned by the backend and
    /// completed in `FirewheelCtx::start_stream`.
    stream_info: StreamInfo,
}
161
/// A Firewheel context
///
/// Owns the audio graph and the main-thread ends of the channels used to
/// talk to the audio-thread processor, and manages the lifetime of the
/// audio stream for the backend `B`.
pub struct FirewheelCtx<B: AudioBackend> {
    /// The audio graph edited through this context.
    graph: AudioGraph,

    /// Producer end of the context-to-processor message channel.
    to_processor_tx: ringbuf::HeapProd<ContextToProcessorMsg>,
    /// Consumer end of the processor-to-context message channel; drained in
    /// `update`.
    from_processor_rx: ringbuf::HeapCons<ProcessorToContextMsg>,
    /// Main-thread side of the realtime logger; flushed in `update`.
    logger_rx: RealtimeLoggerMainThread,

    /// `Some` while an audio stream is running, `None` otherwise.
    active_state: Option<ActiveState<B>>,

    /// The processor-side resources created in `new`: both channel ends, the
    /// shared clock input, the realtime logger and the processor store.
    ///
    /// These are held here until `start_stream` moves them into a new
    /// `FirewheelProcessorInner`; after that this is `None`.
    processor_channel: Option<(
        ringbuf::HeapCons<ContextToProcessorMsg>,
        ringbuf::HeapProd<ProcessorToContextMsg>,
        triple_buffer::Input<SharedClock<B::Instant>>,
        RealtimeLogger,
        ProcStore,
    )>,
    /// Consumer paired with the `drop_tx` handed to the `FirewheelProcessor`
    /// in `start_stream`. `None` until a stream has been started.
    ///
    /// An item in this channel is treated as "the previous processor has been
    /// handed back" by `can_start_stream`, `start_stream` and `update`.
    // NOTE(review): presumably the processor sends itself here when it is
    // dropped on the audio thread; confirm in `processor.rs`.
    processor_drop_rx: Option<ringbuf::HeapCons<FirewheelProcessorInner<B>>>,

    /// Reader for the clock published by the processor. Wrapped in a
    /// `RefCell` so the clock can be read from `&self` methods.
    shared_clock_output: RefCell<triple_buffer::Output<SharedClock<B::Instant>>>,
    /// Sample rate of the most recently started stream (`44100` until the
    /// first stream is started).
    sample_rate: NonZeroU32,
    /// Reciprocal of `sample_rate`.
    sample_rate_recip: f64,

    /// The transport state last sent successfully via `sync_transport`.
    #[cfg(feature = "musical_transport")]
    transport_state: Box<TransportState>,
    /// A spare allocation handed back by the processor, reused by
    /// `sync_transport` for its next message.
    #[cfg(feature = "musical_transport")]
    transport_state_alloc_reuse: Option<Box<TransportState>>,

    // Re-use the allocations for groups of events.
    /// Cleared event-group buffers available for reuse.
    event_group_pool: Vec<Vec<NodeEvent>>,
    /// The event group that is passed to `AudioGraph::update` and then sent
    /// to the processor in `update` when it is not empty.
    event_group: Vec<NodeEvent>,
    /// Capacity used when a fresh event-group buffer has to be allocated.
    initial_event_group_capacity: usize,

    /// Pending "clear scheduled events" requests, sent to the processor in
    /// `update` while a stream is running.
    #[cfg(feature = "scheduled_events")]
    queued_clear_scheduled_events: Vec<ClearScheduledEventsEvent>,

    /// The configuration this context was created with (with
    /// `hard_clip_outputs` kept up to date by `set_hard_clip_outputs`).
    config: FirewheelConfig,
}
200
201impl<B: AudioBackend> FirewheelCtx<B> {
202    /// Create a new Firewheel context.
203    pub fn new(config: FirewheelConfig) -> Self {
204        let (to_processor_tx, from_context_rx) =
205            ringbuf::HeapRb::<ContextToProcessorMsg>::new(config.channel_capacity as usize).split();
206        let (to_context_tx, from_processor_rx) =
207            ringbuf::HeapRb::<ProcessorToContextMsg>::new(config.channel_capacity as usize * 2)
208                .split();
209
210        let initial_event_group_capacity = config.initial_event_group_capacity as usize;
211        let mut event_group_pool = Vec::with_capacity(16);
212        for _ in 0..3 {
213            event_group_pool.push(Vec::with_capacity(initial_event_group_capacity));
214        }
215
216        let (shared_clock_input, shared_clock_output) =
217            triple_buffer::triple_buffer(&SharedClock::default());
218
219        let (logger, logger_rx) = firewheel_core::log::realtime_logger(config.logger_config);
220
221        let proc_store = ProcStore::with_capacity(config.proc_store_capacity);
222
223        Self {
224            graph: AudioGraph::new(&config),
225            to_processor_tx,
226            from_processor_rx,
227            logger_rx,
228            active_state: None,
229            processor_channel: Some((
230                from_context_rx,
231                to_context_tx,
232                shared_clock_input,
233                logger,
234                proc_store,
235            )),
236            processor_drop_rx: None,
237            shared_clock_output: RefCell::new(shared_clock_output),
238            sample_rate: NonZeroU32::new(44100).unwrap(),
239            sample_rate_recip: 44100.0f64.recip(),
240            #[cfg(feature = "musical_transport")]
241            transport_state: Box::new(TransportState::default()),
242            #[cfg(feature = "musical_transport")]
243            transport_state_alloc_reuse: None,
244            event_group_pool,
245            event_group: Vec::with_capacity(initial_event_group_capacity),
246            initial_event_group_capacity,
247            #[cfg(feature = "scheduled_events")]
248            queued_clear_scheduled_events: Vec::new(),
249            config,
250        }
251    }
252
253    /// Get an immutable reference to the processor store.
254    ///
255    /// If an audio stream is currently running, this will return `None`.
256    pub fn proc_store(&self) -> Option<&ProcStore> {
257        if let Some((_, _, _, _, proc_store)) = &self.processor_channel {
258            Some(proc_store)
259        } else if let Some(processor) = self.processor_drop_rx.as_ref().unwrap().last() {
260            if processor.poisoned {
261                panic!("The audio thread has panicked!");
262            }
263
264            Some(&processor.extra.store)
265        } else {
266            None
267        }
268    }
269
270    /// Get a mutable reference to the processor store.
271    ///
272    /// If an audio stream is currently running, this will return `None`.
273    pub fn proc_store_mut(&mut self) -> Option<&mut ProcStore> {
274        if let Some((_, _, _, _, proc_store)) = &mut self.processor_channel {
275            Some(proc_store)
276        } else if let Some(processor) = self.processor_drop_rx.as_mut().unwrap().last_mut() {
277            if processor.poisoned {
278                panic!("The audio thread has panicked!");
279            }
280
281            Some(&mut processor.extra.store)
282        } else {
283            None
284        }
285    }
286
287    /// Get a reference to the currently active instance of the backend. Returns `None` if the backend has not
288    /// yet been initialized with `start_stream`.
289    pub fn active_backend(&self) -> Option<&B> {
290        self.active_state
291            .as_ref()
292            .map(|state| &state.backend_handle)
293    }
294
295    /// Get a mutable reference to the currently active instance of the backend. Returns `None` if the backend has not
296    /// yet been initialized with `start_stream`.
297    pub fn active_backend_mut(&mut self) -> Option<&mut B> {
298        self.active_state
299            .as_mut()
300            .map(|state| &mut state.backend_handle)
301    }
302
    /// Get a list of the available audio input devices.
    ///
    /// This forwards to the backend type `B`; it does not require a stream
    /// to be running.
    pub fn available_input_devices(&self) -> Vec<DeviceInfo> {
        B::available_input_devices()
    }
307
    /// Get a list of the available audio output devices.
    ///
    /// This forwards to the backend type `B`; it does not require a stream
    /// to be running.
    pub fn available_output_devices(&self) -> Vec<DeviceInfo> {
        B::available_output_devices()
    }
312
313    /// Returns `true` if an audio stream can be started right now.
314    ///
315    /// When calling [`FirewheelCtx::stop_stream()`], it may take some time for the
316    /// old stream to be fully stopped. This method is used to check if it has been
317    /// dropped yet.
318    ///
319    /// Note, in rare cases where the audio thread crashes without cleanly dropping
320    /// its contents, this may never return `true`. Consider adding a timeout to
321    /// avoid deadlocking.
322    pub fn can_start_stream(&self) -> bool {
323        if self.is_audio_stream_running() {
324            false
325        } else if let Some(rx) = &self.processor_drop_rx {
326            rx.try_peek().is_some()
327        } else {
328            true
329        }
330    }
331
332    /// Start an audio stream for this context. Only one audio stream can exist on
333    /// a context at a time.
334    ///
335    /// When calling [`FirewheelCtx::stop_stream()`], it may take some time for the
336    /// old stream to be fully stopped. Use [`FirewheelCtx::can_start_stream`] to
337    /// check if it has been dropped yet.
338    ///
339    /// Note, in rare cases where the audio thread crashes without cleanly dropping
340    /// its contents, this may never succeed. Consider adding a timeout to avoid
341    /// deadlocking.
342    pub fn start_stream(
343        &mut self,
344        config: B::Config,
345    ) -> Result<(), StartStreamError<B::StartStreamError>> {
346        if self.is_audio_stream_running() {
347            return Err(StartStreamError::AlreadyStarted);
348        }
349
350        if !self.can_start_stream() {
351            return Err(StartStreamError::OldStreamNotFinishedStopping);
352        }
353
354        let (mut backend_handle, mut stream_info) =
355            B::start_stream(config).map_err(|e| StartStreamError::BackendError(e))?;
356
357        stream_info.sample_rate_recip = (stream_info.sample_rate.get() as f64).recip();
358        stream_info.declick_frames = NonZeroU32::new(
359            (self.config.declick_seconds * stream_info.sample_rate.get() as f32).round() as u32,
360        )
361        .unwrap_or(NonZeroU32::MIN);
362
363        let maybe_processor = self.processor_channel.take();
364
365        stream_info.prev_sample_rate = if maybe_processor.is_some() {
366            stream_info.sample_rate
367        } else {
368            self.sample_rate
369        };
370
371        self.sample_rate = stream_info.sample_rate;
372        self.sample_rate_recip = stream_info.sample_rate_recip;
373
374        let schedule = self.graph.compile(&stream_info)?;
375
376        let (drop_tx, drop_rx) = ringbuf::HeapRb::<FirewheelProcessorInner<B>>::new(1).split();
377
378        let processor =
379            if let Some((from_context_rx, to_context_tx, shared_clock_input, logger, proc_store)) =
380                maybe_processor
381            {
382                FirewheelProcessorInner::new(
383                    from_context_rx,
384                    to_context_tx,
385                    shared_clock_input,
386                    self.config.immediate_event_capacity,
387                    #[cfg(feature = "scheduled_events")]
388                    self.config.scheduled_event_capacity,
389                    self.config.event_queue_capacity,
390                    &stream_info,
391                    self.config.hard_clip_outputs,
392                    self.config.buffer_out_of_space_mode,
393                    logger,
394                    self.config.debug_force_clear_buffers,
395                    proc_store,
396                )
397            } else {
398                let mut processor = self.processor_drop_rx.as_mut().unwrap().try_pop().unwrap();
399
400                if processor.poisoned {
401                    panic!("The audio thread has panicked!");
402                }
403
404                processor.new_stream(&stream_info);
405
406                processor
407            };
408
409        backend_handle.set_processor(FirewheelProcessor::new(processor, drop_tx));
410
411        if let Err(_) = self.send_message_to_processor(ContextToProcessorMsg::NewSchedule(schedule))
412        {
413            panic!("Firewheel message channel is full!");
414        }
415
416        self.active_state = Some(ActiveState {
417            backend_handle,
418            stream_info,
419        });
420        self.processor_drop_rx = Some(drop_rx);
421
422        Ok(())
423    }
424
425    /// Stop the audio stream in this context.
426    pub fn stop_stream(&mut self) {
427        // When the backend handle is dropped, the backend will automatically
428        // stop its stream.
429        self.active_state = None;
430        self.graph.deactivate();
431    }
432
    /// Returns `true` if there is currently a running audio stream.
    ///
    /// This reflects the context's own bookkeeping: it becomes `true` after a
    /// successful `start_stream` and `false` again after `stop_stream` or
    /// after `update` has detected that the stream stopped.
    pub fn is_audio_stream_running(&self) -> bool {
        self.active_state.is_some()
    }
437
438    /// Information about the running audio stream.
439    ///
440    /// Returns `None` if no audio stream is currently running.
441    pub fn stream_info(&self) -> Option<&StreamInfo> {
442        self.active_state.as_ref().map(|s| &s.stream_info)
443    }
444
445    /// Get the current time of the audio clock, without accounting for the delay
446    /// between when the clock was last updated and now.
447    ///
448    /// For most use cases you probably want to use [`FirewheelCtx::audio_clock_corrected`]
449    /// instead, but this method is provided if needed.
450    ///
451    /// Note, due to the nature of audio processing, this clock is is *NOT* synced with
452    /// the system's time (`Instant::now`). (Instead it is based on the amount of data
453    /// that has been processed.) For applications where the timing of audio events is
454    /// critical (i.e. a rythm game), sync the game to this audio clock instead of the
455    /// OS's clock (`Instant::now()`).
456    ///
457    /// Note, calling this method is not super cheap, so avoid calling it many
458    /// times within the same game loop iteration if possible.
459    pub fn audio_clock(&self) -> AudioClock {
460        // Reading the latest value of the clock doesn't meaningfully mutate
461        // state, so treat it as an immutable operation with interior mutability.
462        //
463        // PANIC SAFETY: This struct is the only place this is ever borrowed, so this
464        // will never panic.
465        let mut clock_borrowed = self.shared_clock_output.borrow_mut();
466        let clock = clock_borrowed.read();
467
468        let update_instant = audio_clock_update_instant_and_delay(&clock, &self.active_state)
469            .map(|(update_instant, _delay)| update_instant);
470
471        AudioClock {
472            samples: clock.clock_samples,
473            seconds: clock
474                .clock_samples
475                .to_seconds(self.sample_rate, self.sample_rate_recip),
476            #[cfg(feature = "musical_transport")]
477            musical: clock.current_playhead,
478            #[cfg(feature = "musical_transport")]
479            transport_is_playing: clock.transport_is_playing,
480            update_instant,
481        }
482    }
483
484    /// Get the current time of the audio clock.
485    ///
486    /// Unlike, [`FirewheelCtx::audio_clock`], this method accounts for the delay
487    /// between when the audio clock was last updated and now, leading to a more
488    /// accurate result for games and other applications.
489    ///
490    /// If the delay could not be determined (i.e. an audio stream is not currently
491    /// running), then this will assume there was no delay between when the audio
492    /// clock was last updated and now.
493    ///
494    /// Note, due to the nature of audio processing, this clock is is *NOT* synced with
495    /// the system's time (`Instant::now`). (Instead it is based on the amount of data
496    /// that has been processed.) For applications where the timing of audio events is
497    /// critical (i.e. a rythm game), sync the game to this audio clock instead of the
498    /// OS's clock (`Instant::now()`).
499    ///
500    /// Note, calling this method is not super cheap, so avoid calling it many
501    /// times within the same game loop iteration if possible.
502    pub fn audio_clock_corrected(&self) -> AudioClock {
503        // Reading the latest value of the clock doesn't meaningfully mutate
504        // state, so treat it as an immutable operation with interior mutability.
505        //
506        // PANIC SAFETY: This struct is the only place this is ever borrowed, so this
507        // will never panic.
508        let mut clock_borrowed = self.shared_clock_output.borrow_mut();
509        let clock = clock_borrowed.read();
510
511        let Some((update_instant, delay)) =
512            audio_clock_update_instant_and_delay(&clock, &self.active_state)
513        else {
514            // The audio thread is not currently running, so just return the
515            // latest value of the clock.
516            return AudioClock {
517                samples: clock.clock_samples,
518                seconds: clock
519                    .clock_samples
520                    .to_seconds(self.sample_rate, self.sample_rate_recip),
521                #[cfg(feature = "musical_transport")]
522                musical: clock.current_playhead,
523                #[cfg(feature = "musical_transport")]
524                transport_is_playing: clock.transport_is_playing,
525                update_instant: None,
526            };
527        };
528
529        // Account for the delay between when the clock was last updated and now.
530        let delta_seconds = DurationSeconds(delay.as_secs_f64());
531
532        let samples = clock.clock_samples + delta_seconds.to_samples(self.sample_rate);
533
534        #[cfg(feature = "musical_transport")]
535        let musical = clock.current_playhead.map(|musical_time| {
536            if clock.transport_is_playing && self.transport_state.transport.is_some() {
537                self.transport_state
538                    .transport
539                    .as_ref()
540                    .unwrap()
541                    .delta_seconds_from(musical_time, delta_seconds, clock.speed_multiplier)
542            } else {
543                musical_time
544            }
545        });
546
547        AudioClock {
548            samples,
549            seconds: samples.to_seconds(self.sample_rate, self.sample_rate_recip),
550            #[cfg(feature = "musical_transport")]
551            musical,
552            #[cfg(feature = "musical_transport")]
553            transport_is_playing: clock.transport_is_playing,
554            update_instant: Some(update_instant),
555        }
556    }
557
558    /// Get the instant the audio clock was last updated.
559    ///
560    /// This method accounts for the delay between when the audio clock was last
561    /// updated and now, leading to a more accurate result for games and other
562    /// applications.
563    ///
564    /// If the audio thread is not currently running, or if the delay could not
565    /// be determined for any other reason, then this will return `None`.
566    ///
567    /// Note, calling this method is not super cheap, so avoid calling it many
568    /// times within the same game loop iteration if possible.
569    pub fn audio_clock_instant(&self) -> Option<Instant> {
570        // Reading the latest value of the clock doesn't meaningfully mutate
571        // state, so treat it as an immutable operation with interior mutability.
572        //
573        // PANIC SAFETY: This struct is the only place this is ever borrowed, so this
574        // will never panic.
575        let mut clock_borrowed = self.shared_clock_output.borrow_mut();
576        let clock = clock_borrowed.read();
577
578        audio_clock_update_instant_and_delay(&clock, &self.active_state)
579            .map(|(update_instant, _delay)| update_instant)
580    }
581
582    /// Sync the state of the musical transport.
583    ///
584    /// If the message channel is full, then this will return an error.
585    #[cfg(feature = "musical_transport")]
586    pub fn sync_transport(
587        &mut self,
588        transport: &TransportState,
589    ) -> Result<(), UpdateError<B::StreamError>> {
590        if &*self.transport_state != transport {
591            let transport_msg = if let Some(mut t) = self.transport_state_alloc_reuse.take() {
592                *t = transport.clone();
593                t
594            } else {
595                Box::new(transport.clone())
596            };
597
598            self.send_message_to_processor(ContextToProcessorMsg::SetTransportState(transport_msg))
599                .map_err(|(_, e)| e)?;
600
601            *self.transport_state = transport.clone();
602        }
603
604        Ok(())
605    }
606
    /// Get the current transport state.
    ///
    /// This is the state most recently synced successfully with
    /// `sync_transport` (or the default state if none has been synced yet).
    #[cfg(feature = "musical_transport")]
    pub fn transport_state(&self) -> &TransportState {
        &self.transport_state
    }
612
    /// Get the current transport state.
    ///
    /// This returns the same value as `transport_state`.
    #[cfg(feature = "musical_transport")]
    pub fn transport(&self) -> &TransportState {
        &self.transport_state
    }
618
    /// Whether or not outputs are being hard clipped at 0dB.
    ///
    /// This reads the value stored in the context's configuration.
    pub fn hard_clip_outputs(&self) -> bool {
        self.config.hard_clip_outputs
    }
623
624    /// Set whether or not outputs should be hard clipped at 0dB to
625    /// help protect the system's speakers.
626    ///
627    /// Note that most operating systems already hard clip the output,
628    /// so this is usually not needed (TODO: Do research to see if this
629    /// assumption is true.)
630    ///
631    /// If the message channel is full, then this will return an error.
632    pub fn set_hard_clip_outputs(
633        &mut self,
634        hard_clip_outputs: bool,
635    ) -> Result<(), UpdateError<B::StreamError>> {
636        if self.config.hard_clip_outputs == hard_clip_outputs {
637            return Ok(());
638        }
639        self.config.hard_clip_outputs = hard_clip_outputs;
640
641        self.send_message_to_processor(ContextToProcessorMsg::HardClipOutputs(hard_clip_outputs))
642            .map_err(|(_, e)| e)
643    }
644
    /// Update the firewheel context.
    ///
    /// This must be called regularly (i.e. once every frame).
    ///
    /// Each call flushes the realtime logger, runs the global collector,
    /// drains the messages returned by the processor, updates the graph,
    /// checks whether the stream is still alive, and (while a stream is
    /// running) sends a recompiled schedule, queued "clear scheduled events"
    /// requests and the pending event group to the processor.
    ///
    /// # Errors
    ///
    /// * `UpdateError::StreamStoppedUnexpectedly` if the backend reports an
    ///   error or the processor was handed back while the stream was still
    ///   considered running. The stream is marked as stopped in that case.
    /// * An error converted from the graph compiler if recompiling fails.
    /// * The error returned by `send_message_to_processor` if a message could
    ///   not be queued. The unsent payload is kept so that a later call can
    ///   retry.
    pub fn update(&mut self) -> Result<(), UpdateError<B::StreamError>> {
        self.logger_rx.flush();

        firewheel_core::collector::GlobalRtGc::collect();

        // Take back everything the processor has returned so that it is
        // recycled or dropped on this thread.
        for msg in self.from_processor_rx.pop_iter() {
            match msg {
                ProcessorToContextMsg::ReturnEventGroup(mut event_group) => {
                    // Keep the allocation for reuse.
                    event_group.clear();
                    self.event_group_pool.push(event_group);
                }
                ProcessorToContextMsg::ReturnSchedule(schedule_data) => {
                    // Nothing to recycle; the old schedule is dropped here.
                    let _ = schedule_data;
                }
                #[cfg(feature = "musical_transport")]
                ProcessorToContextMsg::ReturnTransportState(transport_state) => {
                    // Keep at most one spare box for `sync_transport`.
                    if self.transport_state_alloc_reuse.is_none() {
                        self.transport_state_alloc_reuse = Some(transport_state);
                    }
                }
                #[cfg(feature = "scheduled_events")]
                ProcessorToContextMsg::ReturnClearScheduledEvents(msgs) => {
                    // Nothing to recycle; the messages are dropped here.
                    let _ = msgs;
                }
            }
        }

        self.graph.update(
            self.active_state.as_ref().map(|s| &s.stream_info),
            &mut self.event_group,
        );

        if let Some(active_state) = &mut self.active_state {
            // The backend reported a problem with the stream.
            if let Err(e) = active_state.backend_handle.poll_status() {
                self.active_state = None;
                self.graph.deactivate();

                return Err(UpdateError::StreamStoppedUnexpectedly(Some(e)));
            }

            // The processor was handed back even though the stream was not
            // stopped through this context.
            if self
                .processor_drop_rx
                .as_ref()
                .unwrap()
                .try_peek()
                .is_some()
            {
                self.active_state = None;
                self.graph.deactivate();

                return Err(UpdateError::StreamStoppedUnexpectedly(None));
            }
        }

        if self.is_audio_stream_running() {
            if self.graph.needs_compile() {
                let schedule_data = self
                    .graph
                    .compile(&self.active_state.as_ref().unwrap().stream_info)?;

                if let Err((msg, e)) = self
                    .send_message_to_processor(ContextToProcessorMsg::NewSchedule(schedule_data))
                {
                    // Hand the unsent schedule back to the graph.
                    let ContextToProcessorMsg::NewSchedule(schedule) = msg else {
                        unreachable!();
                    };

                    self.graph.on_schedule_send_failed(schedule);

                    return Err(e);
                }
            }

            #[cfg(feature = "scheduled_events")]
            if !self.queued_clear_scheduled_events.is_empty() {
                let msgs: SmallVec<[ClearScheduledEventsEvent; 1]> =
                    self.queued_clear_scheduled_events.drain(..).collect();

                if let Err((msg, e)) = self
                    .send_message_to_processor(ContextToProcessorMsg::ClearScheduledEvents(msgs))
                {
                    // Put the unsent requests back in the queue.
                    let ContextToProcessorMsg::ClearScheduledEvents(mut msgs) = msg else {
                        unreachable!();
                    };

                    self.queued_clear_scheduled_events = msgs.drain(..).collect();

                    return Err(e);
                }
            }

            if !self.event_group.is_empty() {
                // Swap the filled group for an empty buffer (recycled when
                // possible) and send the filled one to the processor.
                let mut next_event_group = self
                    .event_group_pool
                    .pop()
                    .unwrap_or_else(|| Vec::with_capacity(self.initial_event_group_capacity));
                core::mem::swap(&mut next_event_group, &mut self.event_group);

                if let Err((msg, e)) = self
                    .send_message_to_processor(ContextToProcessorMsg::EventGroup(next_event_group))
                {
                    let ContextToProcessorMsg::EventGroup(mut event_group) = msg else {
                        unreachable!();
                    };

                    // Restore the unsent events as the pending group and
                    // return the empty buffer to the pool.
                    core::mem::swap(&mut event_group, &mut self.event_group);
                    self.event_group_pool.push(event_group);

                    return Err(e);
                }
            }
        }

        Ok(())
    }
763
    /// The ID of the graph input node
    ///
    /// This forwards to the underlying audio graph.
    pub fn graph_in_node_id(&self) -> NodeID {
        self.graph.graph_in_node()
    }
768
    /// The ID of the graph output node
    ///
    /// This forwards to the underlying audio graph.
    pub fn graph_out_node_id(&self) -> NodeID {
        self.graph.graph_out_node()
    }
773
    /// Add a node to the audio graph.
    ///
    /// `config` is the node's optional configuration, passed through to the
    /// graph unchanged. Returns the ID assigned to the new node.
    pub fn add_node<T: AudioNode + 'static>(
        &mut self,
        node: T,
        config: Option<T::Configuration>,
    ) -> NodeID {
        self.graph.add_node(node, config)
    }
782
    /// Add a node to the audio graph which implements the type-erased [`DynAudioNode`] trait.
    ///
    /// Returns the ID assigned to the new node.
    pub fn add_dyn_node<T: DynAudioNode + 'static>(&mut self, node: T) -> NodeID {
        self.graph.add_dyn_node(node)
    }
787
788    /// Remove the given node from the audio graph.
789    ///
790    /// This will automatically remove all edges from the graph that
791    /// were connected to this node.
792    ///
793    /// On success, this returns a list of all edges that were removed
794    /// from the graph as a result of removing this node.
795    ///
796    /// This will return an error if the ID is of the graph input or graph
797    /// output node.
798    pub fn remove_node(
799        &mut self,
800        node_id: NodeID,
801    ) -> Result<SmallVec<[EdgeID; 4]>, RemoveNodeError> {
802        self.graph.remove_node(node_id)
803    }
804
805    /// Get information about a node in the graph.
806    pub fn node_info(&self, id: NodeID) -> Option<&NodeEntry> {
807        self.graph.node_info(id)
808    }
809
810    /// Get an immutable reference to the custom state of a node.
811    pub fn node_state<T: 'static>(&self, id: NodeID) -> Option<&T> {
812        self.graph.node_state(id)
813    }
814
815    /// Get a type-erased, immutable reference to the custom state of a node.
816    pub fn node_state_dyn(&self, id: NodeID) -> Option<&dyn Any> {
817        self.graph.node_state_dyn(id)
818    }
819
820    /// Get a mutable reference to the custom state of a node.
821    pub fn node_state_mut<T: 'static>(&mut self, id: NodeID) -> Option<&mut T> {
822        self.graph.node_state_mut(id)
823    }
824
825    pub fn node_state_dyn_mut(&mut self, id: NodeID) -> Option<&mut dyn Any> {
826        self.graph.node_state_dyn_mut(id)
827    }
828
829    /// Get a list of all the existing nodes in the graph.
830    pub fn nodes<'a>(&'a self) -> impl Iterator<Item = &'a NodeEntry> {
831        self.graph.nodes()
832    }
833
834    /// Get a list of all the existing edges in the graph.
835    pub fn edges<'a>(&'a self) -> impl Iterator<Item = &'a Edge> {
836        self.graph.edges()
837    }
838
839    /// Set the number of input and output channels to and from the audio graph.
840    ///
841    /// Returns the list of edges that were removed.
842    pub fn set_graph_channel_config(
843        &mut self,
844        channel_config: ChannelConfig,
845    ) -> SmallVec<[EdgeID; 4]> {
846        self.graph.set_graph_channel_config(channel_config)
847    }
848
849    /// Add connections (edges) between two nodes to the graph.
850    ///
851    /// * `src_node` - The ID of the source node.
852    /// * `dst_node` - The ID of the destination node.
853    /// * `ports_src_dst` - The port indices for each connection to make,
854    /// where the first value in a tuple is the output port on `src_node`,
855    /// and the second value in that tuple is the input port on `dst_node`.
856    /// * `check_for_cycles` - If `true`, then this will run a check to
857    /// see if adding these edges will create a cycle in the graph, and
858    /// return an error if it does. Note, checking for cycles can be quite
859    /// expensive, so avoid enabling this when calling this method many times
860    /// in a row.
861    ///
862    /// If successful, then this returns a list of edge IDs in order.
863    ///
864    /// If this returns an error, then the audio graph has not been
865    /// modified.
866    pub fn connect(
867        &mut self,
868        src_node: NodeID,
869        dst_node: NodeID,
870        ports_src_dst: &[(PortIdx, PortIdx)],
871        check_for_cycles: bool,
872    ) -> Result<SmallVec<[EdgeID; 4]>, AddEdgeError> {
873        self.graph
874            .connect(src_node, dst_node, ports_src_dst, check_for_cycles)
875    }
876
877    /// Remove connections (edges) between two nodes from the graph.
878    ///
879    /// * `src_node` - The ID of the source node.
880    /// * `dst_node` - The ID of the destination node.
881    /// * `ports_src_dst` - The port indices for each connection to make,
882    /// where the first value in a tuple is the output port on `src_node`,
883    /// and the second value in that tuple is the input port on `dst_node`.
884    ///
885    /// If none of the edges existed in the graph, then `false` will be
886    /// returned.
887    pub fn disconnect(
888        &mut self,
889        src_node: NodeID,
890        dst_node: NodeID,
891        ports_src_dst: &[(PortIdx, PortIdx)],
892    ) -> bool {
893        self.graph.disconnect(src_node, dst_node, ports_src_dst)
894    }
895
896    /// Remove all connections (edges) between two nodes in the graph.
897    ///
898    /// * `src_node` - The ID of the source node.
899    /// * `dst_node` - The ID of the destination node.
900    pub fn disconnect_all_between(
901        &mut self,
902        src_node: NodeID,
903        dst_node: NodeID,
904    ) -> SmallVec<[EdgeID; 4]> {
905        self.graph.disconnect_all_between(src_node, dst_node)
906    }
907
908    /// Remove a connection (edge) via the edge's unique ID.
909    ///
910    /// If the edge did not exist in this graph, then `false` will be returned.
911    pub fn disconnect_by_edge_id(&mut self, edge_id: EdgeID) -> bool {
912        self.graph.disconnect_by_edge_id(edge_id)
913    }
914
915    /// Get information about the given [Edge]
916    pub fn edge(&self, edge_id: EdgeID) -> Option<&Edge> {
917        self.graph.edge(edge_id)
918    }
919
920    /// Runs a check to see if a cycle exists in the audio graph.
921    ///
922    /// Note, this method is expensive.
923    pub fn cycle_detected(&mut self) -> bool {
924        self.graph.cycle_detected()
925    }
926
927    /// Queue an event to be sent to an audio node's processor.
928    ///
929    /// Note, this event will not be sent until the event queue is flushed
930    /// in [`FirewheelCtx::update`].
931    pub fn queue_event(&mut self, event: NodeEvent) {
932        self.event_group.push(event);
933    }
934
935    /// Queue an event to be sent to an audio node's processor.
936    ///
937    /// Note, this event will not be sent until the event queue is flushed
938    /// in [`FirewheelCtx::update`].
939    pub fn queue_event_for(&mut self, node_id: NodeID, event: NodeEventType) {
940        self.queue_event(NodeEvent {
941            node_id,
942            #[cfg(feature = "scheduled_events")]
943            time: None,
944            event,
945        });
946    }
947
948    /// Queue an event at a certain time, to be sent to an audio node's processor.
949    ///
950    /// If `time` is `None`, then the event will occur as soon as the node's
951    /// processor receives the event.
952    ///
953    /// Note, this event will not be sent until the event queue is flushed
954    /// in [`FirewheelCtx::update`].
955    #[cfg(feature = "scheduled_events")]
956    pub fn schedule_event_for(
957        &mut self,
958        node_id: NodeID,
959        event: NodeEventType,
960        time: Option<EventInstant>,
961    ) {
962        self.queue_event(NodeEvent {
963            node_id,
964            time,
965            event,
966        });
967    }
968
969    /// Cancel scheduled events for all nodes.
970    ///
971    /// This will clear all events that have been scheduled since the last call to
972    /// [`FirewheelCtx::update`]. Any events scheduled between then and the next call
973    /// to [`FirewheelCtx::update`] will not be canceled.
974    ///
975    /// This only takes effect once [`FirewheelCtx::update`] is called.
976    #[cfg(feature = "scheduled_events")]
977    pub fn cancel_all_scheduled_events(&mut self, event_type: ClearScheduledEventsType) {
978        self.queued_clear_scheduled_events
979            .push(ClearScheduledEventsEvent {
980                node_id: None,
981                event_type,
982            });
983    }
984
985    /// Cancel scheduled events for a specific node.
986    ///
987    /// This will clear all events that have been scheduled since the last call to
988    /// [`FirewheelCtx::update`]. Any events scheduled between then and the next call
989    /// to [`FirewheelCtx::update`] will not be canceled.
990    ///
991    /// This only takes effect once [`FirewheelCtx::update`] is called.
992    #[cfg(feature = "scheduled_events")]
993    pub fn cancel_scheduled_events_for(
994        &mut self,
995        node_id: NodeID,
996        event_type: ClearScheduledEventsType,
997    ) {
998        self.queued_clear_scheduled_events
999            .push(ClearScheduledEventsEvent {
1000                node_id: Some(node_id),
1001                event_type,
1002            });
1003    }
1004
1005    fn send_message_to_processor(
1006        &mut self,
1007        msg: ContextToProcessorMsg,
1008    ) -> Result<(), (ContextToProcessorMsg, UpdateError<B::StreamError>)> {
1009        self.to_processor_tx
1010            .try_push(msg)
1011            .map_err(|msg| (msg, UpdateError::MsgChannelFull))
1012    }
1013}
1014
1015impl<B: AudioBackend> Drop for FirewheelCtx<B> {
1016    fn drop(&mut self) {
1017        self.stop_stream();
1018
1019        // Wait for the processor to be drop to avoid deallocating it on
1020        // the audio thread.
1021        #[cfg(not(target_family = "wasm"))]
1022        if let Some(drop_rx) = self.processor_drop_rx.take() {
1023            let now = bevy_platform::time::Instant::now();
1024
1025            while drop_rx.try_peek().is_none() {
1026                if now.elapsed() > core::time::Duration::from_secs(2) {
1027                    break;
1028                }
1029
1030                bevy_platform::thread::sleep(core::time::Duration::from_millis(2));
1031            }
1032        }
1033
1034        firewheel_core::collector::GlobalRtGc::collect();
1035    }
1036}
1037
1038impl<B: AudioBackend> FirewheelCtx<B> {
1039    /// Construct an [`ContextQueue`] for diffing.
1040    pub fn event_queue(&mut self, id: NodeID) -> ContextQueue<'_, B> {
1041        ContextQueue {
1042            context: self,
1043            id,
1044            #[cfg(feature = "scheduled_events")]
1045            time: None,
1046        }
1047    }
1048
1049    #[cfg(feature = "scheduled_events")]
1050    pub fn event_queue_scheduled(
1051        &mut self,
1052        id: NodeID,
1053        time: Option<EventInstant>,
1054    ) -> ContextQueue<'_, B> {
1055        ContextQueue {
1056            context: self,
1057            id,
1058            time,
1059        }
1060    }
1061}
1062
/// An event queue acquired from [`FirewheelCtx::event_queue`].
///
/// Every event pushed to this queue is queued on the borrowed context
/// and addressed to a single node.
///
/// This can help reduce event queue allocations
/// when you have direct access to the context.
///
/// ```
/// # use firewheel_core::{diff::{Diff, PathBuilder}, node::NodeID};
/// # use firewheel_graph::{backend::AudioBackend, FirewheelCtx, ContextQueue};
/// # fn context_queue<B: AudioBackend, D: Diff>(
/// #     context: &mut FirewheelCtx<B>,
/// #     node_id: NodeID,
/// #     params: &D,
/// #     baseline: &D,
/// # ) {
/// // Get a queue that will send events directly to the provided node.
/// let mut queue = context.event_queue(node_id);
/// // Perform diffing using this queue.
/// params.diff(baseline, PathBuilder::default(), &mut queue);
/// # }
/// ```
pub struct ContextQueue<'a, B: AudioBackend> {
    // The context that pushed events are queued on.
    context: &'a mut FirewheelCtx<B>,
    // The ID of the node that pushed events are addressed to.
    id: NodeID,
    // The scheduled time attached to every pushed event.
    #[cfg(feature = "scheduled_events")]
    time: Option<EventInstant>,
}
1089
#[cfg(feature = "scheduled_events")]
impl<B: AudioBackend> ContextQueue<'_, B> {
    /// The scheduled time that this queue attaches to every event pushed
    /// through it, if any.
    pub fn time(&self) -> Option<EventInstant> {
        let Self { time, .. } = self;
        *time
    }
}
1096
1097impl<B: AudioBackend> firewheel_core::diff::EventQueue for ContextQueue<'_, B> {
1098    fn push(&mut self, data: NodeEventType) {
1099        self.context.queue_event(NodeEvent {
1100            event: data,
1101            #[cfg(feature = "scheduled_events")]
1102            time: self.time,
1103            node_id: self.id,
1104        });
1105    }
1106}
1107
/// The type of scheduled events to clear.
///
/// The default is [`ClearScheduledEventsType::All`].
#[cfg(feature = "scheduled_events")]
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ClearScheduledEventsType {
    /// Clear both musical and non-musical scheduled events.
    #[default]
    All,
    /// Clear only non-musical scheduled events.
    NonMusicalOnly,
    /// Clear only musical scheduled events.
    MusicalOnly,
}
1120
1121fn audio_clock_update_instant_and_delay<B: AudioBackend>(
1122    clock: &SharedClock<B::Instant>,
1123    active_state: &Option<ActiveState<B>>,
1124) -> Option<(Instant, Duration)> {
1125    active_state.as_ref().and_then(|active_state| {
1126        clock
1127            .process_timestamp
1128            .clone()
1129            .and_then(|process_timestamp| {
1130                active_state
1131                    .backend_handle
1132                    .delay_from_last_process(process_timestamp)
1133                    .and_then(|delay| {
1134                        Instant::now()
1135                            .checked_sub(delay)
1136                            .map(|instant| (instant, delay))
1137                    })
1138            })
1139    })
1140}