Skip to main content

firewheel_graph/
context.rs

1use bevy_platform::time::Instant;
2use core::cell::RefCell;
3use core::num::NonZeroU32;
4use core::time::Duration;
5use core::{any::Any, f64};
6use firewheel_core::clock::DurationSeconds;
7use firewheel_core::log::{RealtimeLogger, RealtimeLoggerConfig, RealtimeLoggerMainThread};
8use firewheel_core::node::ProcStore;
9use firewheel_core::{
10    channel_config::{ChannelConfig, ChannelCount},
11    clock::AudioClock,
12    dsp::declick::DeclickValues,
13    event::{NodeEvent, NodeEventType},
14    node::{AudioNode, DynAudioNode, NodeID},
15    StreamInfo,
16};
17use ringbuf::traits::{Consumer, Producer, Split};
18use smallvec::SmallVec;
19
20#[cfg(not(feature = "std"))]
21use num_traits::Float;
22
23#[cfg(all(not(feature = "std"), feature = "musical_transport"))]
24use bevy_platform::prelude::Box;
25#[cfg(not(feature = "std"))]
26use bevy_platform::prelude::Vec;
27
28use crate::error::RemoveNodeError;
29use crate::processor::BufferOutOfSpaceMode;
30use crate::{
31    backend::AudioBackend,
32    error::{AddEdgeError, StartStreamError, UpdateError},
33    graph::{AudioGraph, Edge, EdgeID, NodeEntry, PortIdx},
34    processor::{
35        ContextToProcessorMsg, FirewheelProcessor, FirewheelProcessorInner, ProcessorToContextMsg,
36        SharedClock,
37    },
38};
39
40#[cfg(feature = "scheduled_events")]
41use crate::processor::ClearScheduledEventsEvent;
42#[cfg(feature = "scheduled_events")]
43use firewheel_core::clock::EventInstant;
44
45#[cfg(feature = "musical_transport")]
46use firewheel_core::clock::TransportState;
47
48/// The configuration of a Firewheel context.
49#[derive(Debug, Clone, Copy, PartialEq)]
50#[cfg_attr(feature = "bevy_reflect", derive(bevy_reflect::Reflect))]
51#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
52pub struct FirewheelConfig {
53    /// The number of input channels in the audio graph.
54    pub num_graph_inputs: ChannelCount,
55    /// The number of output channels in the audio graph.
56    pub num_graph_outputs: ChannelCount,
57    /// If `true`, then all outputs will be hard clipped at 0db to help
58    /// protect the system's speakers.
59    ///
60    /// Note that most operating systems already hard clip the output,
61    /// so this is usually not needed (TODO: Do research to see if this
62    /// assumption is true.)
63    ///
64    /// By default this is set to `false`.
65    pub hard_clip_outputs: bool,
66    /// An initial capacity to allocate for the nodes in the audio graph.
67    ///
68    /// By default this is set to `64`.
69    pub initial_node_capacity: u32,
70    /// An initial capacity to allocate for the edges in the audio graph.
71    ///
72    /// By default this is set to `256`.
73    pub initial_edge_capacity: u32,
74    /// The amount of time in seconds to fade in/out when pausing/resuming
75    /// to avoid clicks and pops.
76    ///
77    /// By default this is set to `10.0 / 1_000.0`.
78    pub declick_seconds: f32,
79    /// The initial capacity for a group of events.
80    ///
81    /// By default this is set to `128`.
82    pub initial_event_group_capacity: u32,
83    /// The capacity of the engine's internal message channel.
84    ///
85    /// By default this is set to `64`.
86    pub channel_capacity: u32,
87    /// The maximum number of events that can be sent in a single call
88    /// to [`AudioNodeProcessor::process`].
89    ///
90    /// By default this is set to `128`.
91    ///
92    /// [`AudioNodeProcessor::process`]: firewheel_core::node::AudioNodeProcessor::process
93    pub event_queue_capacity: usize,
94    /// The maximum number of immediate events (events that do *NOT* have a
95    /// scheduled time component) that can be stored at once in the audio
96    /// thread.
97    ///
98    /// By default this is set to `512`.
99    pub immediate_event_capacity: usize,
100    /// The maximum number of scheduled events (events that have a scheduled
101    /// time component) that can be stored at once in the audio thread.
102    ///
103    /// This can be set to `0` to save some memory if you do not plan on using
104    /// scheduled events.
105    ///
106    /// By default this is set to `512`.
107    #[cfg(feature = "scheduled_events")]
108    pub scheduled_event_capacity: usize,
109    /// How to handle event buffers on the audio thread running out of space.
110    ///
111    /// By default this is set to [`BufferOutOfSpaceMode::AllocateOnAudioThread`].
112    pub buffer_out_of_space_mode: BufferOutOfSpaceMode,
113
114    /// The configuration of the realtime safe logger.
115    pub logger_config: RealtimeLoggerConfig,
116
117    /// Force all buffers in the audio graph to be cleared when processing.
118    ///
119    /// This is only meant to be used when diagnosing a misbehaving audio node.
120    /// Enabling this significantly increases processing overhead.
121    ///
122    /// If enabling this fixes audio glitching issues, then notify the node
123    /// author of the misbehavior.
124    ///
125    /// By default this is set to `false`.
126    pub debug_force_clear_buffers: bool,
127
128    /// The initial number of slots to allocate for the [`ProcStore`].
129    ///
130    /// By default this is set to `8`.
131    pub proc_store_capacity: usize,
132}
133
134impl Default for FirewheelConfig {
135    fn default() -> Self {
136        Self {
137            num_graph_inputs: ChannelCount::ZERO,
138            num_graph_outputs: ChannelCount::STEREO,
139            hard_clip_outputs: false,
140            initial_node_capacity: 128,
141            initial_edge_capacity: 256,
142            declick_seconds: DeclickValues::DEFAULT_FADE_SECONDS,
143            initial_event_group_capacity: 128,
144            channel_capacity: 64,
145            event_queue_capacity: 128,
146            immediate_event_capacity: 512,
147            #[cfg(feature = "scheduled_events")]
148            scheduled_event_capacity: 512,
149            buffer_out_of_space_mode: BufferOutOfSpaceMode::AllocateOnAudioThread,
150            logger_config: RealtimeLoggerConfig::default(),
151            debug_force_clear_buffers: false,
152            proc_store_capacity: 8,
153        }
154    }
155}
156
157struct ActiveState<B: AudioBackend> {
158    backend_handle: B,
159    stream_info: StreamInfo,
160}
161
162pub(crate) struct ProcessorChannel<B: AudioBackend> {
163    pub(crate) from_context_rx: ringbuf::HeapCons<ContextToProcessorMsg>,
164    pub(crate) to_context_tx: ringbuf::HeapProd<ProcessorToContextMsg>,
165    pub(crate) shared_clock_input: triple_buffer::Input<SharedClock<B::Instant>>,
166    pub(crate) logger: RealtimeLogger,
167    pub(crate) store: ProcStore,
168}
169
170/// A Firewheel context
171pub struct FirewheelCtx<B: AudioBackend> {
172    graph: AudioGraph,
173
174    to_processor_tx: ringbuf::HeapProd<ContextToProcessorMsg>,
175    from_processor_rx: ringbuf::HeapCons<ProcessorToContextMsg>,
176    logger_rx: RealtimeLoggerMainThread,
177
178    active_state: Option<ActiveState<B>>,
179
180    processor_channel: Option<ProcessorChannel<B>>,
181    processor_drop_rx: Option<ringbuf::HeapCons<FirewheelProcessorInner<B>>>,
182
183    shared_clock_output: RefCell<triple_buffer::Output<SharedClock<B::Instant>>>,
184    sample_rate: NonZeroU32,
185    sample_rate_recip: f64,
186
187    #[cfg(feature = "musical_transport")]
188    transport_state: Box<TransportState>,
189    #[cfg(feature = "musical_transport")]
190    transport_state_alloc_reuse: Option<Box<TransportState>>,
191
192    // Re-use the allocations for groups of events.
193    event_group_pool: Vec<Vec<NodeEvent>>,
194    event_group: Vec<NodeEvent>,
195    initial_event_group_capacity: usize,
196
197    #[cfg(feature = "scheduled_events")]
198    queued_clear_scheduled_events: Vec<ClearScheduledEventsEvent>,
199
200    config: FirewheelConfig,
201}
202
203impl<B: AudioBackend> FirewheelCtx<B> {
204    /// Create a new Firewheel context.
205    pub fn new(config: FirewheelConfig) -> Self {
206        let (to_processor_tx, from_context_rx) =
207            ringbuf::HeapRb::<ContextToProcessorMsg>::new(config.channel_capacity as usize).split();
208        let (to_context_tx, from_processor_rx) =
209            ringbuf::HeapRb::<ProcessorToContextMsg>::new(config.channel_capacity as usize * 2)
210                .split();
211
212        let initial_event_group_capacity = config.initial_event_group_capacity as usize;
213        let mut event_group_pool = Vec::with_capacity(16);
214        for _ in 0..3 {
215            event_group_pool.push(Vec::with_capacity(initial_event_group_capacity));
216        }
217
218        let (shared_clock_input, shared_clock_output) =
219            triple_buffer::triple_buffer(&SharedClock::default());
220
221        let (logger, logger_rx) = firewheel_core::log::realtime_logger(config.logger_config);
222
223        let store = ProcStore::with_capacity(config.proc_store_capacity);
224
225        Self {
226            graph: AudioGraph::new(&config),
227            to_processor_tx,
228            from_processor_rx,
229            logger_rx,
230            active_state: None,
231            processor_channel: Some(ProcessorChannel {
232                from_context_rx,
233                to_context_tx,
234                shared_clock_input,
235                logger,
236                store,
237            }),
238            processor_drop_rx: None,
239            shared_clock_output: RefCell::new(shared_clock_output),
240            sample_rate: NonZeroU32::new(44100).unwrap(),
241            sample_rate_recip: 44100.0f64.recip(),
242            #[cfg(feature = "musical_transport")]
243            transport_state: Box::new(TransportState::default()),
244            #[cfg(feature = "musical_transport")]
245            transport_state_alloc_reuse: None,
246            event_group_pool,
247            event_group: Vec::with_capacity(initial_event_group_capacity),
248            initial_event_group_capacity,
249            #[cfg(feature = "scheduled_events")]
250            queued_clear_scheduled_events: Vec::new(),
251            config,
252        }
253    }
254
255    /// Get an immutable reference to the processor store.
256    ///
257    /// If an audio stream is currently running, this will return `None`.
258    pub fn proc_store(&self) -> Option<&ProcStore> {
259        if let Some(proc_channel) = &self.processor_channel {
260            Some(&proc_channel.store)
261        } else if let Some(processor) = self.processor_drop_rx.as_ref().unwrap().last() {
262            if processor.poisoned {
263                panic!("The audio thread has panicked!");
264            }
265
266            Some(&processor.extra.store)
267        } else {
268            None
269        }
270    }
271
272    /// Get a mutable reference to the processor store.
273    ///
274    /// If an audio stream is currently running, this will return `None`.
275    pub fn proc_store_mut(&mut self) -> Option<&mut ProcStore> {
276        if let Some(proc_channel) = &mut self.processor_channel {
277            Some(&mut proc_channel.store)
278        } else if let Some(processor) = self.processor_drop_rx.as_mut().unwrap().last_mut() {
279            if processor.poisoned {
280                panic!("The audio thread has panicked!");
281            }
282
283            Some(&mut processor.extra.store)
284        } else {
285            None
286        }
287    }
288
289    /// Get a reference to the currently active instance of the backend. Returns `None` if the backend has not
290    /// yet been initialized with `start_stream`.
291    pub fn active_backend(&self) -> Option<&B> {
292        self.active_state
293            .as_ref()
294            .map(|state| &state.backend_handle)
295    }
296
297    /// Get a mutable reference to the currently active instance of the backend. Returns `None` if the backend has not
298    /// yet been initialized with `start_stream`.
299    pub fn active_backend_mut(&mut self) -> Option<&mut B> {
300        self.active_state
301            .as_mut()
302            .map(|state| &mut state.backend_handle)
303    }
304
305    /// Get a struct used to retrieve the list of available audio devices
306    /// on the system and their available ocnfigurations.
307    pub fn device_enumerator(&self) -> B::Enumerator {
308        B::enumerator()
309    }
310
311    /// Returns `true` if an audio stream can be started right now.
312    ///
313    /// When calling [`FirewheelCtx::stop_stream()`], it may take some time for the
314    /// old stream to be fully stopped. This method is used to check if it has been
315    /// dropped yet.
316    ///
317    /// Note, in rare cases where the audio thread crashes without cleanly dropping
318    /// its contents, this may never return `true`. Consider adding a timeout to
319    /// avoid deadlocking.
320    pub fn can_start_stream(&self) -> bool {
321        if self.is_audio_stream_running() {
322            false
323        } else if let Some(rx) = &self.processor_drop_rx {
324            rx.try_peek().is_some()
325        } else {
326            true
327        }
328    }
329
330    /// Start an audio stream for this context. Only one audio stream can exist on
331    /// a context at a time.
332    ///
333    /// When calling [`FirewheelCtx::stop_stream()`], it may take some time for the
334    /// old stream to be fully stopped. Use [`FirewheelCtx::can_start_stream`] to
335    /// check if it has been dropped yet.
336    ///
337    /// Note, in rare cases where the audio thread crashes without cleanly dropping
338    /// its contents, this may never succeed. Consider adding a timeout to avoid
339    /// deadlocking.
340    pub fn start_stream(
341        &mut self,
342        config: B::Config,
343    ) -> Result<(), StartStreamError<B::StartStreamError>> {
344        if self.is_audio_stream_running() {
345            return Err(StartStreamError::AlreadyStarted);
346        }
347
348        if !self.can_start_stream() {
349            return Err(StartStreamError::OldStreamNotFinishedStopping);
350        }
351
352        let (mut backend_handle, mut stream_info) =
353            B::start_stream(config).map_err(StartStreamError::BackendError)?;
354
355        stream_info.sample_rate_recip = (stream_info.sample_rate.get() as f64).recip();
356        stream_info.declick_frames = NonZeroU32::new(
357            (self.config.declick_seconds * stream_info.sample_rate.get() as f32).round() as u32,
358        )
359        .unwrap_or(NonZeroU32::MIN);
360
361        let maybe_processor = self.processor_channel.take();
362
363        stream_info.prev_sample_rate = if maybe_processor.is_some() {
364            stream_info.sample_rate
365        } else {
366            self.sample_rate
367        };
368
369        self.sample_rate = stream_info.sample_rate;
370        self.sample_rate_recip = stream_info.sample_rate_recip;
371
372        let schedule = self.graph.compile(&stream_info)?;
373
374        let (drop_tx, drop_rx) = ringbuf::HeapRb::<FirewheelProcessorInner<B>>::new(1).split();
375
376        let processor = if let Some(proc_channel) = maybe_processor {
377            FirewheelProcessorInner::new(
378                proc_channel,
379                self.config.immediate_event_capacity,
380                #[cfg(feature = "scheduled_events")]
381                self.config.scheduled_event_capacity,
382                self.config.event_queue_capacity,
383                &stream_info,
384                self.config.hard_clip_outputs,
385                self.config.buffer_out_of_space_mode,
386                self.config.debug_force_clear_buffers,
387            )
388        } else {
389            let mut processor = self.processor_drop_rx.as_mut().unwrap().try_pop().unwrap();
390
391            if processor.poisoned {
392                panic!("The audio thread has panicked!");
393            }
394
395            processor.new_stream(&stream_info);
396
397            processor
398        };
399
400        backend_handle.set_processor(FirewheelProcessor::new(processor, drop_tx));
401
402        if self
403            .send_message_to_processor(ContextToProcessorMsg::NewSchedule(schedule))
404            .is_err()
405        {
406            panic!("Firewheel message channel is full!");
407        }
408
409        self.active_state = Some(ActiveState {
410            backend_handle,
411            stream_info,
412        });
413        self.processor_drop_rx = Some(drop_rx);
414
415        Ok(())
416    }
417
418    /// Stop the audio stream in this context.
419    pub fn stop_stream(&mut self) {
420        // When the backend handle is dropped, the backend will automatically
421        // stop its stream.
422        self.active_state = None;
423        self.graph.deactivate();
424    }
425
426    /// Returns `true` if there is currently a running audio stream.
427    pub fn is_audio_stream_running(&self) -> bool {
428        self.active_state.is_some()
429    }
430
431    /// Information about the running audio stream.
432    ///
433    /// Returns `None` if no audio stream is currently running.
434    pub fn stream_info(&self) -> Option<&StreamInfo> {
435        self.active_state.as_ref().map(|s| &s.stream_info)
436    }
437
438    /// Get the current time of the audio clock, without accounting for the delay
439    /// between when the clock was last updated and now.
440    ///
441    /// For most use cases you probably want to use [`FirewheelCtx::audio_clock_corrected`]
442    /// instead, but this method is provided if needed.
443    ///
444    /// Note, due to the nature of audio processing, this clock is is *NOT* synced with
445    /// the system's time (`Instant::now`). (Instead it is based on the amount of data
446    /// that has been processed.) For applications where the timing of audio events is
447    /// critical (i.e. a rythm game), sync the game to this audio clock instead of the
448    /// OS's clock (`Instant::now()`).
449    ///
450    /// Note, calling this method is not super cheap, so avoid calling it many
451    /// times within the same game loop iteration if possible.
452    pub fn audio_clock(&self) -> AudioClock {
453        // Reading the latest value of the clock doesn't meaningfully mutate
454        // state, so treat it as an immutable operation with interior mutability.
455        //
456        // PANIC SAFETY: This struct is the only place this is ever borrowed, so this
457        // will never panic.
458        let mut clock_borrowed = self.shared_clock_output.borrow_mut();
459        let clock = clock_borrowed.read();
460
461        let update_instant = audio_clock_update_instant_and_delay(clock, &self.active_state)
462            .map(|(update_instant, _delay)| update_instant);
463
464        AudioClock {
465            samples: clock.clock_samples,
466            seconds: clock
467                .clock_samples
468                .to_seconds(self.sample_rate, self.sample_rate_recip),
469            #[cfg(feature = "musical_transport")]
470            musical: clock.current_playhead,
471            #[cfg(feature = "musical_transport")]
472            transport_is_playing: clock.transport_is_playing,
473            update_instant,
474        }
475    }
476
477    /// Get the current time of the audio clock.
478    ///
479    /// Unlike, [`FirewheelCtx::audio_clock`], this method accounts for the delay
480    /// between when the audio clock was last updated and now, leading to a more
481    /// accurate result for games and other applications.
482    ///
483    /// If the delay could not be determined (i.e. an audio stream is not currently
484    /// running), then this will assume there was no delay between when the audio
485    /// clock was last updated and now.
486    ///
487    /// Note, due to the nature of audio processing, this clock is is *NOT* synced with
488    /// the system's time (`Instant::now`). (Instead it is based on the amount of data
489    /// that has been processed.) For applications where the timing of audio events is
490    /// critical (i.e. a rythm game), sync the game to this audio clock instead of the
491    /// OS's clock (`Instant::now()`).
492    ///
493    /// Note, calling this method is not super cheap, so avoid calling it many
494    /// times within the same game loop iteration if possible.
495    pub fn audio_clock_corrected(&self) -> AudioClock {
496        // Reading the latest value of the clock doesn't meaningfully mutate
497        // state, so treat it as an immutable operation with interior mutability.
498        //
499        // PANIC SAFETY: This struct is the only place this is ever borrowed, so this
500        // will never panic.
501        let mut clock_borrowed = self.shared_clock_output.borrow_mut();
502        let clock = clock_borrowed.read();
503
504        let Some((update_instant, delay)) =
505            audio_clock_update_instant_and_delay(clock, &self.active_state)
506        else {
507            // The audio thread is not currently running, so just return the
508            // latest value of the clock.
509            return AudioClock {
510                samples: clock.clock_samples,
511                seconds: clock
512                    .clock_samples
513                    .to_seconds(self.sample_rate, self.sample_rate_recip),
514                #[cfg(feature = "musical_transport")]
515                musical: clock.current_playhead,
516                #[cfg(feature = "musical_transport")]
517                transport_is_playing: clock.transport_is_playing,
518                update_instant: None,
519            };
520        };
521
522        // Account for the delay between when the clock was last updated and now.
523        let delta_seconds = DurationSeconds(delay.as_secs_f64());
524
525        let samples = clock.clock_samples + delta_seconds.to_samples(self.sample_rate);
526
527        #[cfg(feature = "musical_transport")]
528        let musical = clock.current_playhead.map(|musical_time| {
529            if clock.transport_is_playing && self.transport_state.transport.is_some() {
530                self.transport_state
531                    .transport
532                    .as_ref()
533                    .unwrap()
534                    .delta_seconds_from(musical_time, delta_seconds, clock.speed_multiplier)
535            } else {
536                musical_time
537            }
538        });
539
540        AudioClock {
541            samples,
542            seconds: samples.to_seconds(self.sample_rate, self.sample_rate_recip),
543            #[cfg(feature = "musical_transport")]
544            musical,
545            #[cfg(feature = "musical_transport")]
546            transport_is_playing: clock.transport_is_playing,
547            update_instant: Some(update_instant),
548        }
549    }
550
551    /// Get the instant the audio clock was last updated.
552    ///
553    /// This method accounts for the delay between when the audio clock was last
554    /// updated and now, leading to a more accurate result for games and other
555    /// applications.
556    ///
557    /// If the audio thread is not currently running, or if the delay could not
558    /// be determined for any other reason, then this will return `None`.
559    ///
560    /// Note, calling this method is not super cheap, so avoid calling it many
561    /// times within the same game loop iteration if possible.
562    pub fn audio_clock_instant(&self) -> Option<Instant> {
563        // Reading the latest value of the clock doesn't meaningfully mutate
564        // state, so treat it as an immutable operation with interior mutability.
565        //
566        // PANIC SAFETY: This struct is the only place this is ever borrowed, so this
567        // will never panic.
568        let mut clock_borrowed = self.shared_clock_output.borrow_mut();
569        let clock = clock_borrowed.read();
570
571        audio_clock_update_instant_and_delay(clock, &self.active_state)
572            .map(|(update_instant, _delay)| update_instant)
573    }
574
575    /// Sync the state of the musical transport.
576    ///
577    /// If the message channel is full, then this will return an error.
578    #[cfg(feature = "musical_transport")]
579    pub fn sync_transport(
580        &mut self,
581        transport: &TransportState,
582    ) -> Result<(), UpdateError<B::StreamError>> {
583        if &*self.transport_state != transport {
584            let transport_msg = if let Some(mut t) = self.transport_state_alloc_reuse.take() {
585                *t = transport.clone();
586                t
587            } else {
588                Box::new(transport.clone())
589            };
590
591            self.send_message_to_processor(ContextToProcessorMsg::SetTransportState(transport_msg))
592                .map_err(|(_, e)| e)?;
593
594            *self.transport_state = transport.clone();
595        }
596
597        Ok(())
598    }
599
600    /// Get the current transport state.
601    #[cfg(feature = "musical_transport")]
602    pub fn transport_state(&self) -> &TransportState {
603        &self.transport_state
604    }
605
606    /// Get the current transport state.
607    #[cfg(feature = "musical_transport")]
608    pub fn transport(&self) -> &TransportState {
609        &self.transport_state
610    }
611
612    /// Whether or not outputs are being hard clipped at 0dB.
613    pub fn hard_clip_outputs(&self) -> bool {
614        self.config.hard_clip_outputs
615    }
616
617    /// Set whether or not outputs should be hard clipped at 0dB to
618    /// help protect the system's speakers.
619    ///
620    /// Note that most operating systems already hard clip the output,
621    /// so this is usually not needed (TODO: Do research to see if this
622    /// assumption is true.)
623    ///
624    /// If the message channel is full, then this will return an error.
625    pub fn set_hard_clip_outputs(
626        &mut self,
627        hard_clip_outputs: bool,
628    ) -> Result<(), UpdateError<B::StreamError>> {
629        if self.config.hard_clip_outputs == hard_clip_outputs {
630            return Ok(());
631        }
632        self.config.hard_clip_outputs = hard_clip_outputs;
633
634        self.send_message_to_processor(ContextToProcessorMsg::HardClipOutputs(hard_clip_outputs))
635            .map_err(|(_, e)| e)
636    }
637
638    /// Update the firewheel context.
639    ///
640    /// This must be called reguarly (i.e. once every frame).
641    pub fn update(&mut self) -> Result<(), UpdateError<B::StreamError>> {
642        self.logger_rx.flush(
643            |msg| {
644                #[cfg(feature = "tracing")]
645                tracing::error!("{}", msg);
646
647                #[cfg(all(feature = "log", not(feature = "tracing")))]
648                log::error!("{}", msg);
649
650                let _ = msg;
651            },
652            #[cfg(debug_assertions)]
653            |msg| {
654                #[cfg(feature = "tracing")]
655                tracing::debug!("{}", msg);
656
657                #[cfg(all(feature = "log", not(feature = "tracing")))]
658                log::debug!("{}", msg);
659
660                let _ = msg;
661            },
662        );
663
664        firewheel_core::collector::GlobalRtGc::collect();
665
666        for msg in self.from_processor_rx.pop_iter() {
667            match msg {
668                ProcessorToContextMsg::ReturnEventGroup(mut event_group) => {
669                    event_group.clear();
670                    self.event_group_pool.push(event_group);
671                }
672                ProcessorToContextMsg::ReturnSchedule(schedule_data) => {
673                    let _ = schedule_data;
674                }
675                #[cfg(feature = "musical_transport")]
676                ProcessorToContextMsg::ReturnTransportState(transport_state) => {
677                    if self.transport_state_alloc_reuse.is_none() {
678                        self.transport_state_alloc_reuse = Some(transport_state);
679                    }
680                }
681                #[cfg(feature = "scheduled_events")]
682                ProcessorToContextMsg::ReturnClearScheduledEvents(msgs) => {
683                    let _ = msgs;
684                }
685            }
686        }
687
688        self.graph.update(
689            self.active_state.as_ref().map(|s| &s.stream_info),
690            &mut self.event_group,
691        );
692
693        if let Some(active_state) = &mut self.active_state {
694            if let Err(e) = active_state.backend_handle.poll_status() {
695                self.active_state = None;
696                self.graph.deactivate();
697
698                return Err(UpdateError::StreamStoppedUnexpectedly(Some(e)));
699            }
700
701            if self
702                .processor_drop_rx
703                .as_ref()
704                .unwrap()
705                .try_peek()
706                .is_some()
707            {
708                self.active_state = None;
709                self.graph.deactivate();
710
711                return Err(UpdateError::StreamStoppedUnexpectedly(None));
712            }
713        }
714
715        if self.is_audio_stream_running() {
716            if self.graph.needs_compile() {
717                let schedule_data = self
718                    .graph
719                    .compile(&self.active_state.as_ref().unwrap().stream_info)?;
720
721                if let Err((msg, e)) = self
722                    .send_message_to_processor(ContextToProcessorMsg::NewSchedule(schedule_data))
723                {
724                    let ContextToProcessorMsg::NewSchedule(schedule) = msg else {
725                        unreachable!();
726                    };
727
728                    self.graph.on_schedule_send_failed(schedule);
729
730                    return Err(e);
731                }
732            }
733
734            #[cfg(feature = "scheduled_events")]
735            if !self.queued_clear_scheduled_events.is_empty() {
736                let msgs: SmallVec<[ClearScheduledEventsEvent; 1]> =
737                    self.queued_clear_scheduled_events.drain(..).collect();
738
739                if let Err((msg, e)) = self
740                    .send_message_to_processor(ContextToProcessorMsg::ClearScheduledEvents(msgs))
741                {
742                    let ContextToProcessorMsg::ClearScheduledEvents(mut msgs) = msg else {
743                        unreachable!();
744                    };
745
746                    self.queued_clear_scheduled_events = msgs.drain(..).collect();
747
748                    return Err(e);
749                }
750            }
751
752            if !self.event_group.is_empty() {
753                let mut next_event_group = self
754                    .event_group_pool
755                    .pop()
756                    .unwrap_or_else(|| Vec::with_capacity(self.initial_event_group_capacity));
757                core::mem::swap(&mut next_event_group, &mut self.event_group);
758
759                if let Err((msg, e)) = self
760                    .send_message_to_processor(ContextToProcessorMsg::EventGroup(next_event_group))
761                {
762                    let ContextToProcessorMsg::EventGroup(mut event_group) = msg else {
763                        unreachable!();
764                    };
765
766                    core::mem::swap(&mut event_group, &mut self.event_group);
767                    self.event_group_pool.push(event_group);
768
769                    return Err(e);
770                }
771            }
772        }
773
774        Ok(())
775    }
776
777    /// The ID of the graph input node
778    pub fn graph_in_node_id(&self) -> NodeID {
779        self.graph.graph_in_node()
780    }
781
782    /// The ID of the graph output node
783    pub fn graph_out_node_id(&self) -> NodeID {
784        self.graph.graph_out_node()
785    }
786
787    /// Add a node to the audio graph.
788    pub fn add_node<T: AudioNode + 'static>(
789        &mut self,
790        node: T,
791        config: Option<T::Configuration>,
792    ) -> NodeID {
793        self.graph.add_node(node, config)
794    }
795
796    /// Add a node to the audio graph which implements the type-erased [`DynAudioNode`] trait.
797    pub fn add_dyn_node<T: DynAudioNode + 'static>(&mut self, node: T) -> NodeID {
798        self.graph.add_dyn_node(node)
799    }
800
801    /// Remove the given node from the audio graph.
802    ///
803    /// This will automatically remove all edges from the graph that
804    /// were connected to this node.
805    ///
806    /// On success, this returns a list of all edges that were removed
807    /// from the graph as a result of removing this node.
808    ///
809    /// This will return an error if the ID is of the graph input or graph
810    /// output node.
811    pub fn remove_node(
812        &mut self,
813        node_id: NodeID,
814    ) -> Result<SmallVec<[EdgeID; 4]>, RemoveNodeError> {
815        self.graph.remove_node(node_id)
816    }
817
818    /// Get information about a node in the graph.
819    pub fn node_info(&self, id: NodeID) -> Option<&NodeEntry> {
820        self.graph.node_info(id)
821    }
822
823    /// Get an immutable reference to the custom state of a node.
824    pub fn node_state<T: 'static>(&self, id: NodeID) -> Option<&T> {
825        self.graph.node_state(id)
826    }
827
828    /// Get a type-erased, immutable reference to the custom state of a node.
829    pub fn node_state_dyn(&self, id: NodeID) -> Option<&dyn Any> {
830        self.graph.node_state_dyn(id)
831    }
832
833    /// Get a mutable reference to the custom state of a node.
834    pub fn node_state_mut<T: 'static>(&mut self, id: NodeID) -> Option<&mut T> {
835        self.graph.node_state_mut(id)
836    }
837
838    pub fn node_state_dyn_mut(&mut self, id: NodeID) -> Option<&mut dyn Any> {
839        self.graph.node_state_dyn_mut(id)
840    }
841
842    /// Get a list of all the existing nodes in the graph.
843    pub fn nodes(&self) -> impl Iterator<Item = &NodeEntry> {
844        self.graph.nodes()
845    }
846
847    /// Get a list of all the existing edges in the graph.
848    pub fn edges(&self) -> impl Iterator<Item = &Edge> {
849        self.graph.edges()
850    }
851
852    /// Set the number of input and output channels to and from the audio graph.
853    ///
854    /// Returns the list of edges that were removed.
855    pub fn set_graph_channel_config(
856        &mut self,
857        channel_config: ChannelConfig,
858    ) -> SmallVec<[EdgeID; 4]> {
859        self.graph.set_graph_channel_config(channel_config)
860    }
861
862    /// Add connections (edges) between two nodes to the graph.
863    ///
864    /// * `src_node` - The ID of the source node.
865    /// * `dst_node` - The ID of the destination node.
866    /// * `ports_src_dst` - The port indices for each connection to make,
867    ///   where the first value in a tuple is the output port on `src_node`,
868    ///   and the second value in that tuple is the input port on `dst_node`.
869    /// * `check_for_cycles` - If `true`, then this will run a check to
870    ///   see if adding these edges will create a cycle in the graph, and
871    ///   return an error if it does. Note, checking for cycles can be quite
872    ///   expensive, so avoid enabling this when calling this method many times
873    ///   in a row.
874    ///
875    /// If successful, then this returns a list of edge IDs in order.
876    ///
877    /// If this returns an error, then the audio graph has not been
878    /// modified.
879    pub fn connect(
880        &mut self,
881        src_node: NodeID,
882        dst_node: NodeID,
883        ports_src_dst: &[(PortIdx, PortIdx)],
884        check_for_cycles: bool,
885    ) -> Result<SmallVec<[EdgeID; 4]>, AddEdgeError> {
886        self.graph
887            .connect(src_node, dst_node, ports_src_dst, check_for_cycles)
888    }
889
890    /// Remove connections (edges) between two nodes from the graph.
891    ///
892    /// * `src_node` - The ID of the source node.
893    /// * `dst_node` - The ID of the destination node.
894    /// * `ports_src_dst` - The port indices for each connection to make,
895    ///   where the first value in a tuple is the output port on `src_node`,
896    ///   and the second value in that tuple is the input port on `dst_node`.
897    ///
898    /// If none of the edges existed in the graph, then `false` will be
899    /// returned.
900    pub fn disconnect(
901        &mut self,
902        src_node: NodeID,
903        dst_node: NodeID,
904        ports_src_dst: &[(PortIdx, PortIdx)],
905    ) -> bool {
906        self.graph.disconnect(src_node, dst_node, ports_src_dst)
907    }
908
909    /// Remove all connections (edges) between two nodes in the graph.
910    ///
911    /// * `src_node` - The ID of the source node.
912    /// * `dst_node` - The ID of the destination node.
913    pub fn disconnect_all_between(
914        &mut self,
915        src_node: NodeID,
916        dst_node: NodeID,
917    ) -> SmallVec<[EdgeID; 4]> {
918        self.graph.disconnect_all_between(src_node, dst_node)
919    }
920
921    /// Remove a connection (edge) via the edge's unique ID.
922    ///
923    /// If the edge did not exist in this graph, then `false` will be returned.
924    pub fn disconnect_by_edge_id(&mut self, edge_id: EdgeID) -> bool {
925        self.graph.disconnect_by_edge_id(edge_id)
926    }
927
928    /// Get information about the given [Edge]
929    pub fn edge(&self, edge_id: EdgeID) -> Option<&Edge> {
930        self.graph.edge(edge_id)
931    }
932
933    /// Runs a check to see if a cycle exists in the audio graph.
934    ///
935    /// Note, this method is expensive.
936    pub fn cycle_detected(&mut self) -> bool {
937        self.graph.cycle_detected()
938    }
939
940    /// Queue an event to be sent to an audio node's processor.
941    ///
942    /// Note, this event will not be sent until the event queue is flushed
943    /// in [`FirewheelCtx::update`].
944    pub fn queue_event(&mut self, event: NodeEvent) {
945        self.event_group.push(event);
946    }
947
948    /// Queue an event to be sent to an audio node's processor.
949    ///
950    /// Note, this event will not be sent until the event queue is flushed
951    /// in [`FirewheelCtx::update`].
952    pub fn queue_event_for(&mut self, node_id: NodeID, event: NodeEventType) {
953        self.queue_event(NodeEvent {
954            node_id,
955            #[cfg(feature = "scheduled_events")]
956            time: None,
957            event,
958        });
959    }
960
961    /// Queue an event at a certain time, to be sent to an audio node's processor.
962    ///
963    /// If `time` is `None`, then the event will occur as soon as the node's
964    /// processor receives the event.
965    ///
966    /// Note, this event will not be sent until the event queue is flushed
967    /// in [`FirewheelCtx::update`].
968    #[cfg(feature = "scheduled_events")]
969    pub fn schedule_event_for(
970        &mut self,
971        node_id: NodeID,
972        event: NodeEventType,
973        time: Option<EventInstant>,
974    ) {
975        self.queue_event(NodeEvent {
976            node_id,
977            time,
978            event,
979        });
980    }
981
982    /// Cancel scheduled events for all nodes.
983    ///
984    /// This will clear all events that have been scheduled since the last call to
985    /// [`FirewheelCtx::update`]. Any events scheduled between then and the next call
986    /// to [`FirewheelCtx::update`] will not be canceled.
987    ///
988    /// This only takes effect once [`FirewheelCtx::update`] is called.
989    #[cfg(feature = "scheduled_events")]
990    pub fn cancel_all_scheduled_events(&mut self, event_type: ClearScheduledEventsType) {
991        self.queued_clear_scheduled_events
992            .push(ClearScheduledEventsEvent {
993                node_id: None,
994                event_type,
995            });
996    }
997
998    /// Cancel scheduled events for a specific node.
999    ///
1000    /// This will clear all events that have been scheduled since the last call to
1001    /// [`FirewheelCtx::update`]. Any events scheduled between then and the next call
1002    /// to [`FirewheelCtx::update`] will not be canceled.
1003    ///
1004    /// This only takes effect once [`FirewheelCtx::update`] is called.
1005    #[cfg(feature = "scheduled_events")]
1006    pub fn cancel_scheduled_events_for(
1007        &mut self,
1008        node_id: NodeID,
1009        event_type: ClearScheduledEventsType,
1010    ) {
1011        self.queued_clear_scheduled_events
1012            .push(ClearScheduledEventsEvent {
1013                node_id: Some(node_id),
1014                event_type,
1015            });
1016    }
1017
1018    fn send_message_to_processor(
1019        &mut self,
1020        msg: ContextToProcessorMsg,
1021    ) -> Result<(), (ContextToProcessorMsg, UpdateError<B::StreamError>)> {
1022        self.to_processor_tx
1023            .try_push(msg)
1024            .map_err(|msg| (msg, UpdateError::MsgChannelFull))
1025    }
1026}
1027
1028impl<B: AudioBackend> Drop for FirewheelCtx<B> {
1029    fn drop(&mut self) {
1030        self.stop_stream();
1031
1032        // Wait for the processor to be drop to avoid deallocating it on
1033        // the audio thread.
1034        #[cfg(not(target_family = "wasm"))]
1035        if let Some(drop_rx) = self.processor_drop_rx.take() {
1036            let now = bevy_platform::time::Instant::now();
1037
1038            while drop_rx.try_peek().is_none() {
1039                if now.elapsed() > core::time::Duration::from_secs(2) {
1040                    break;
1041                }
1042
1043                bevy_platform::thread::sleep(core::time::Duration::from_millis(2));
1044            }
1045        }
1046
1047        firewheel_core::collector::GlobalRtGc::collect();
1048    }
1049}
1050
1051impl<B: AudioBackend> FirewheelCtx<B> {
1052    /// Construct an [`ContextQueue`] for diffing.
1053    pub fn event_queue(&mut self, id: NodeID) -> ContextQueue<'_, B> {
1054        ContextQueue {
1055            context: self,
1056            id,
1057            #[cfg(feature = "scheduled_events")]
1058            time: None,
1059        }
1060    }
1061
1062    #[cfg(feature = "scheduled_events")]
1063    pub fn event_queue_scheduled(
1064        &mut self,
1065        id: NodeID,
1066        time: Option<EventInstant>,
1067    ) -> ContextQueue<'_, B> {
1068        ContextQueue {
1069            context: self,
1070            id,
1071            time,
1072        }
1073    }
1074}
1075
1076/// An event queue acquired from [`FirewheelCtx::event_queue`].
1077///
1078/// This can help reduce event queue allocations
1079/// when you have direct access to the context.
1080///
1081/// ```
1082/// # use firewheel_core::{diff::{Diff, PathBuilder}, node::NodeID};
1083/// # use firewheel_graph::{backend::AudioBackend, FirewheelCtx, ContextQueue};
1084/// # fn context_queue<B: AudioBackend, D: Diff>(
1085/// #     context: &mut FirewheelCtx<B>,
1086/// #     node_id: NodeID,
1087/// #     params: &D,
1088/// #     baseline: &D,
1089/// # ) {
1090/// // Get a queue that will send events directly to the provided node.
1091/// let mut queue = context.event_queue(node_id);
1092/// // Perform diffing using this queue.
1093/// params.diff(baseline, PathBuilder::default(), &mut queue);
1094/// # }
1095/// ```
1096pub struct ContextQueue<'a, B: AudioBackend> {
1097    context: &'a mut FirewheelCtx<B>,
1098    id: NodeID,
1099    #[cfg(feature = "scheduled_events")]
1100    time: Option<EventInstant>,
1101}
1102
1103#[cfg(feature = "scheduled_events")]
1104impl<'a, B: AudioBackend> ContextQueue<'a, B> {
1105    pub fn time(&self) -> Option<EventInstant> {
1106        self.time
1107    }
1108}
1109
1110impl<B: AudioBackend> firewheel_core::diff::EventQueue for ContextQueue<'_, B> {
1111    fn push(&mut self, data: NodeEventType) {
1112        self.context.queue_event(NodeEvent {
1113            event: data,
1114            #[cfg(feature = "scheduled_events")]
1115            time: self.time,
1116            node_id: self.id,
1117        });
1118    }
1119}
1120
1121/// The type of scheduled events to clear
1122#[cfg(feature = "scheduled_events")]
1123#[derive(Default, Debug, Clone, Copy, PartialEq)]
1124pub enum ClearScheduledEventsType {
1125    /// Clear both musical and non-musical scheduled events.
1126    #[default]
1127    All,
1128    /// Clear only non-musical scheduled events.
1129    NonMusicalOnly,
1130    /// Clear only musical scheduled events.
1131    MusicalOnly,
1132}
1133
1134fn audio_clock_update_instant_and_delay<B: AudioBackend>(
1135    clock: &SharedClock<B::Instant>,
1136    active_state: &Option<ActiveState<B>>,
1137) -> Option<(Instant, Duration)> {
1138    active_state.as_ref().and_then(|active_state| {
1139        clock
1140            .process_timestamp
1141            .clone()
1142            .and_then(|process_timestamp| {
1143                active_state
1144                    .backend_handle
1145                    .delay_from_last_process(process_timestamp)
1146                    .and_then(|delay| {
1147                        Instant::now()
1148                            .checked_sub(delay)
1149                            .map(|instant| (instant, delay))
1150                    })
1151            })
1152    })
1153}