firewheel_graph/
context.rs

1use bevy_platform::time::Instant;
2use core::cell::RefCell;
3use core::num::NonZeroU32;
4use core::time::Duration;
5use core::{any::Any, f64};
6use firewheel_core::clock::DurationSeconds;
7use firewheel_core::log::{RealtimeLogger, RealtimeLoggerConfig, RealtimeLoggerMainThread};
8use firewheel_core::{
9    channel_config::{ChannelConfig, ChannelCount},
10    clock::AudioClock,
11    collector::Collector,
12    dsp::declick::DeclickValues,
13    event::{NodeEvent, NodeEventType},
14    node::{AudioNode, DynAudioNode, NodeID},
15    StreamInfo,
16};
17use ringbuf::traits::{Consumer, Producer, Split};
18use smallvec::SmallVec;
19
20#[cfg(all(not(feature = "std"), feature = "musical_transport"))]
21use bevy_platform::prelude::Box;
22#[cfg(not(feature = "std"))]
23use bevy_platform::prelude::Vec;
24
25use crate::error::RemoveNodeError;
26use crate::processor::BufferOutOfSpaceMode;
27use crate::{
28    backend::{AudioBackend, DeviceInfo},
29    error::{AddEdgeError, StartStreamError, UpdateError},
30    graph::{AudioGraph, Edge, EdgeID, NodeEntry, PortIdx},
31    processor::{
32        ContextToProcessorMsg, FirewheelProcessor, FirewheelProcessorInner, ProcessorToContextMsg,
33        SharedClock,
34    },
35};
36
37#[cfg(feature = "scheduled_events")]
38use crate::processor::ClearScheduledEventsEvent;
39#[cfg(feature = "scheduled_events")]
40use firewheel_core::clock::EventInstant;
41
42#[cfg(feature = "musical_transport")]
43use firewheel_core::clock::TransportState;
44
/// The configuration of a Firewheel context.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct FirewheelConfig {
    /// The number of input channels in the audio graph.
    pub num_graph_inputs: ChannelCount,
    /// The number of output channels in the audio graph.
    pub num_graph_outputs: ChannelCount,
    /// If `true`, then all outputs will be hard clipped at 0dB to help
    /// protect the system's speakers.
    ///
    /// Note that most operating systems already hard clip the output,
    /// so this is usually not needed (TODO: Do research to see if this
    /// assumption is true.)
    ///
    /// By default this is set to `false`.
    pub hard_clip_outputs: bool,
    /// An initial capacity to allocate for the nodes in the audio graph.
    ///
    /// By default this is set to `128`.
    pub initial_node_capacity: u32,
    /// An initial capacity to allocate for the edges in the audio graph.
    ///
    /// By default this is set to `256`.
    pub initial_edge_capacity: u32,
    /// The amount of time in seconds to fade in/out when pausing/resuming
    /// to avoid clicks and pops.
    ///
    /// By default this is set to [`DeclickValues::DEFAULT_FADE_SECONDS`].
    pub declick_seconds: f32,
    /// The initial capacity for a group of events.
    ///
    /// By default this is set to `128`.
    pub initial_event_group_capacity: u32,
    /// The capacity of the engine's internal message channel.
    ///
    /// By default this is set to `64`.
    pub channel_capacity: u32,
    /// The maximum number of events that can be sent in a single call
    /// to [`AudioNodeProcessor::process`].
    ///
    /// By default this is set to `128`.
    ///
    /// [`AudioNodeProcessor::process`]: firewheel_core::node::AudioNodeProcessor::process
    pub event_queue_capacity: usize,
    /// The maximum number of immediate events (events that do *NOT* have a
    /// scheduled time component) that can be stored at once in the audio
    /// thread.
    ///
    /// By default this is set to `512`.
    pub immediate_event_capacity: usize,
    /// The maximum number of scheduled events (events that have a scheduled
    /// time component) that can be stored at once in the audio thread.
    ///
    /// This can be set to `0` to save some memory if you do not plan on using
    /// scheduled events.
    ///
    /// By default this is set to `512`.
    #[cfg(feature = "scheduled_events")]
    pub scheduled_event_capacity: usize,
    /// How to handle event buffers on the audio thread running out of space.
    ///
    /// By default this is set to [`BufferOutOfSpaceMode::AllocateOnAudioThread`].
    pub buffer_out_of_space_mode: BufferOutOfSpaceMode,

    /// The configuration of the realtime safe logger.
    ///
    /// By default this is set to `RealtimeLoggerConfig::default()`.
    pub logger_config: RealtimeLoggerConfig,
}
112
impl Default for FirewheelConfig {
    /// Build the default configuration: `ChannelCount::ZERO` graph inputs,
    /// `ChannelCount::STEREO` graph outputs, hard clipping disabled, and the
    /// capacities listed below.
    fn default() -> Self {
        Self {
            num_graph_inputs: ChannelCount::ZERO,
            num_graph_outputs: ChannelCount::STEREO,
            hard_clip_outputs: false,
            initial_node_capacity: 128,
            initial_edge_capacity: 256,
            declick_seconds: DeclickValues::DEFAULT_FADE_SECONDS,
            initial_event_group_capacity: 128,
            channel_capacity: 64,
            event_queue_capacity: 128,
            immediate_event_capacity: 512,
            #[cfg(feature = "scheduled_events")]
            scheduled_event_capacity: 512,
            buffer_out_of_space_mode: BufferOutOfSpaceMode::AllocateOnAudioThread,
            logger_config: RealtimeLoggerConfig::default(),
        }
    }
}
133
/// State that only exists while an audio stream is running.
///
/// `FirewheelCtx` stores this in an `Option`; `Some` means a stream is
/// currently running.
struct ActiveState<B: AudioBackend> {
    /// Handle to the running backend stream. Dropping this handle is what
    /// stops the stream (see `FirewheelCtx::stop_stream`).
    backend_handle: B,
    /// Information about the running stream, as returned by the backend when
    /// the stream was started and then filled in by `FirewheelCtx::start_stream`.
    stream_info: StreamInfo,
}
138
/// A Firewheel context.
///
/// Owns the audio graph and the main-thread ends of the channels used to
/// communicate with the audio processor.
pub struct FirewheelCtx<B: AudioBackend> {
    /// The audio graph edited through this context.
    graph: AudioGraph,

    /// Producer end of the context -> processor message channel.
    to_processor_tx: ringbuf::HeapProd<ContextToProcessorMsg>,
    /// Consumer end of the processor -> context message channel (drained in
    /// `update`).
    from_processor_rx: ringbuf::HeapCons<ProcessorToContextMsg>,
    /// Main-thread half of the realtime logger (flushed in `update`).
    logger_rx: RealtimeLoggerMainThread,

    /// `Some` while an audio stream is running.
    active_state: Option<ActiveState<B>>,

    /// The processor-side channel ends created in `new`. They are held here
    /// until the first stream is started, at which point they are moved into
    /// the newly created processor.
    processor_channel: Option<(
        ringbuf::HeapCons<ContextToProcessorMsg>,
        ringbuf::HeapProd<ProcessorToContextMsg>,
        triple_buffer::Input<SharedClock<B::Instant>>,
        RealtimeLogger,
    )>,
    /// Channel over which the processor of a stopped stream is handed back so
    /// it can be reused by the next stream.
    // NOTE(review): presumably `FirewheelProcessor` pushes its inner state here
    // when it is dropped by the backend — confirm in `processor.rs`.
    processor_drop_rx: Option<ringbuf::HeapCons<FirewheelProcessorInner<B>>>,

    /// Reader for the clock shared by the processor. Wrapped in a `RefCell`
    /// so the `&self` clock getters can read the latest value.
    shared_clock_output: RefCell<triple_buffer::Output<SharedClock<B::Instant>>>,
    /// Sample rate used to convert clock samples to seconds. Initialized to
    /// 44100 and replaced by the stream's sample rate in `start_stream`.
    sample_rate: NonZeroU32,
    /// Reciprocal of `sample_rate`.
    sample_rate_recip: f64,

    /// The transport state last successfully sent to the processor.
    #[cfg(feature = "musical_transport")]
    transport_state: Box<TransportState>,
    /// A spare boxed transport state returned by the processor, kept so the
    /// next `sync_transport` call can reuse the allocation.
    #[cfg(feature = "musical_transport")]
    transport_state_alloc_reuse: Option<Box<TransportState>>,

    // Re-use the allocations for groups of events.
    /// Pool of empty event-group buffers.
    event_group_pool: Vec<Vec<NodeEvent>>,
    /// The event group currently being filled; sent to the processor in
    /// `update`.
    event_group: Vec<NodeEvent>,
    /// Capacity used when a fresh event-group buffer has to be allocated.
    initial_event_group_capacity: usize,

    /// Clear-scheduled-events requests waiting to be sent to the processor in
    /// `update`.
    #[cfg(feature = "scheduled_events")]
    queued_clear_scheduled_events: Vec<ClearScheduledEventsEvent>,

    /// The configuration this context was created with (the hard-clip flag may
    /// be changed later through `set_hard_clip_outputs`).
    config: FirewheelConfig,
}
176
177impl<B: AudioBackend> FirewheelCtx<B> {
178    /// Create a new Firewheel context.
179    pub fn new(config: FirewheelConfig) -> Self {
180        let (to_processor_tx, from_context_rx) =
181            ringbuf::HeapRb::<ContextToProcessorMsg>::new(config.channel_capacity as usize).split();
182        let (to_context_tx, from_processor_rx) =
183            ringbuf::HeapRb::<ProcessorToContextMsg>::new(config.channel_capacity as usize * 2)
184                .split();
185
186        let initial_event_group_capacity = config.initial_event_group_capacity as usize;
187        let mut event_group_pool = Vec::with_capacity(16);
188        for _ in 0..3 {
189            event_group_pool.push(Vec::with_capacity(initial_event_group_capacity));
190        }
191
192        let (shared_clock_input, shared_clock_output) =
193            triple_buffer::triple_buffer(&SharedClock::default());
194
195        let (logger, logger_rx) = firewheel_core::log::realtime_logger(config.logger_config);
196
197        Self {
198            graph: AudioGraph::new(&config),
199            to_processor_tx,
200            from_processor_rx,
201            logger_rx,
202            active_state: None,
203            processor_channel: Some((from_context_rx, to_context_tx, shared_clock_input, logger)),
204            processor_drop_rx: None,
205            shared_clock_output: RefCell::new(shared_clock_output),
206            sample_rate: NonZeroU32::new(44100).unwrap(),
207            sample_rate_recip: 44100.0f64.recip(),
208            #[cfg(feature = "musical_transport")]
209            transport_state: Box::new(TransportState::default()),
210            #[cfg(feature = "musical_transport")]
211            transport_state_alloc_reuse: None,
212            event_group_pool,
213            event_group: Vec::with_capacity(initial_event_group_capacity),
214            initial_event_group_capacity,
215            #[cfg(feature = "scheduled_events")]
216            queued_clear_scheduled_events: Vec::new(),
217            config,
218        }
219    }
220
221    /// Get a reference to the currently active instance of the backend. Returns `None` if the backend has not
222    /// yet been initialized with `start_stream`.
223    pub fn active_backend(&self) -> Option<&B> {
224        self.active_state
225            .as_ref()
226            .map(|state| &state.backend_handle)
227    }
228
229    /// Get a mutable reference to the currently active instance of the backend. Returns `None` if the backend has not
230    /// yet been initialized with `start_stream`.
231    pub fn active_backend_mut(&mut self) -> Option<&mut B> {
232        self.active_state
233            .as_mut()
234            .map(|state| &mut state.backend_handle)
235    }
236
    /// Get a list of the available audio input devices.
    ///
    /// This forwards to the backend's `B::available_input_devices()` and does
    /// not use any state of this context.
    pub fn available_input_devices(&self) -> Vec<DeviceInfo> {
        B::available_input_devices()
    }
241
    /// Get a list of the available audio output devices.
    ///
    /// This forwards to the backend's `B::available_output_devices()` and does
    /// not use any state of this context.
    pub fn available_output_devices(&self) -> Vec<DeviceInfo> {
        B::available_output_devices()
    }
246
247    /// Returns `true` if an audio stream can be started right now.
248    ///
249    /// When calling [`FirewheelCtx::stop_stream()`], it may take some time for the
250    /// old stream to be fully stopped. This method is used to check if it has been
251    /// dropped yet.
252    ///
253    /// Note, in rare cases where the audio thread crashes without cleanly dropping
254    /// its contents, this may never return `true`. Consider adding a timeout to
255    /// avoid deadlocking.
256    pub fn can_start_stream(&self) -> bool {
257        if self.is_audio_stream_running() {
258            false
259        } else if let Some(rx) = &self.processor_drop_rx {
260            rx.try_peek().is_some()
261        } else {
262            true
263        }
264    }
265
266    /// Start an audio stream for this context. Only one audio stream can exist on
267    /// a context at a time.
268    ///
269    /// When calling [`FirewheelCtx::stop_stream()`], it may take some time for the
270    /// old stream to be fully stopped. Use [`FirewheelCtx::can_start_stream`] to
271    /// check if it has been dropped yet.
272    ///
273    /// Note, in rare cases where the audio thread crashes without cleanly dropping
274    /// its contents, this may never succeed. Consider adding a timeout to avoid
275    /// deadlocking.
276    pub fn start_stream(
277        &mut self,
278        config: B::Config,
279    ) -> Result<(), StartStreamError<B::StartStreamError>> {
280        if self.is_audio_stream_running() {
281            return Err(StartStreamError::AlreadyStarted);
282        }
283
284        if !self.can_start_stream() {
285            return Err(StartStreamError::OldStreamNotFinishedStopping);
286        }
287
288        let (mut backend_handle, mut stream_info) =
289            B::start_stream(config).map_err(|e| StartStreamError::BackendError(e))?;
290
291        stream_info.sample_rate_recip = (stream_info.sample_rate.get() as f64).recip();
292        stream_info.declick_frames = NonZeroU32::new(
293            (self.config.declick_seconds * stream_info.sample_rate.get() as f32).round() as u32,
294        )
295        .unwrap_or(NonZeroU32::MIN);
296
297        let maybe_processor = self.processor_channel.take();
298
299        stream_info.prev_sample_rate = if maybe_processor.is_some() {
300            stream_info.sample_rate
301        } else {
302            self.sample_rate
303        };
304
305        self.sample_rate = stream_info.sample_rate;
306        self.sample_rate_recip = stream_info.sample_rate_recip;
307
308        let schedule = self.graph.compile(&stream_info)?;
309
310        let (drop_tx, drop_rx) = ringbuf::HeapRb::<FirewheelProcessorInner<B>>::new(1).split();
311
312        let processor = if let Some((from_context_rx, to_context_tx, shared_clock_input, logger)) =
313            maybe_processor
314        {
315            FirewheelProcessorInner::new(
316                from_context_rx,
317                to_context_tx,
318                shared_clock_input,
319                self.config.immediate_event_capacity,
320                #[cfg(feature = "scheduled_events")]
321                self.config.scheduled_event_capacity,
322                self.config.event_queue_capacity,
323                &stream_info,
324                self.config.hard_clip_outputs,
325                self.config.buffer_out_of_space_mode,
326                logger,
327            )
328        } else {
329            let mut processor = self.processor_drop_rx.as_mut().unwrap().try_pop().unwrap();
330
331            if processor.poisoned {
332                panic!("The audio thread has panicked!");
333            }
334
335            processor.new_stream(&stream_info);
336
337            processor
338        };
339
340        backend_handle.set_processor(FirewheelProcessor::new(processor, drop_tx));
341
342        if let Err(_) = self.send_message_to_processor(ContextToProcessorMsg::NewSchedule(schedule))
343        {
344            panic!("Firewheel message channel is full!");
345        }
346
347        self.active_state = Some(ActiveState {
348            backend_handle,
349            stream_info,
350        });
351        self.processor_drop_rx = Some(drop_rx);
352
353        Ok(())
354    }
355
    /// Stop the audio stream in this context.
    ///
    /// This drops the active backend handle and deactivates the graph. Calling
    /// it when no stream is running still calls `graph.deactivate()`.
    pub fn stop_stream(&mut self) {
        // When the backend handle is dropped, the backend will automatically
        // stop its stream.
        self.active_state = None;
        self.graph.deactivate();
    }
363
    /// Returns `true` if there is currently a running audio stream.
    ///
    /// This is tracked by whether the context currently holds an active
    /// backend handle.
    pub fn is_audio_stream_running(&self) -> bool {
        self.active_state.is_some()
    }
368
    /// Information about the running audio stream.
    ///
    /// Returns `None` if no audio stream is currently running.
    pub fn stream_info(&self) -> Option<&StreamInfo> {
        // The stream info lives alongside the backend handle in the active state.
        self.active_state.as_ref().map(|s| &s.stream_info)
    }
375
376    /// Get the current time of the audio clock, without accounting for the delay
377    /// between when the clock was last updated and now.
378    ///
379    /// For most use cases you probably want to use [`FirewheelCtx::audio_clock_corrected`]
380    /// instead, but this method is provided if needed.
381    ///
382    /// Note, due to the nature of audio processing, this clock is is *NOT* synced with
383    /// the system's time (`Instant::now`). (Instead it is based on the amount of data
384    /// that has been processed.) For applications where the timing of audio events is
385    /// critical (i.e. a rythm game), sync the game to this audio clock instead of the
386    /// OS's clock (`Instant::now()`).
387    ///
388    /// Note, calling this method is not super cheap, so avoid calling it many
389    /// times within the same game loop iteration if possible.
390    pub fn audio_clock(&self) -> AudioClock {
391        // Reading the latest value of the clock doesn't meaningfully mutate
392        // state, so treat it as an immutable operation with interior mutability.
393        //
394        // PANIC SAFETY: This struct is the only place this is ever borrowed, so this
395        // will never panic.
396        let mut clock_borrowed = self.shared_clock_output.borrow_mut();
397        let clock = clock_borrowed.read();
398
399        let update_instant = audio_clock_update_instant_and_delay(&clock, &self.active_state)
400            .map(|(update_instant, _delay)| update_instant);
401
402        AudioClock {
403            samples: clock.clock_samples,
404            seconds: clock
405                .clock_samples
406                .to_seconds(self.sample_rate, self.sample_rate_recip),
407            #[cfg(feature = "musical_transport")]
408            musical: clock.current_playhead,
409            #[cfg(feature = "musical_transport")]
410            transport_is_playing: clock.transport_is_playing,
411            update_instant,
412        }
413    }
414
415    /// Get the current time of the audio clock.
416    ///
417    /// Unlike, [`FirewheelCtx::audio_clock`], this method accounts for the delay
418    /// between when the audio clock was last updated and now, leading to a more
419    /// accurate result for games and other applications.
420    ///
421    /// If the delay could not be determined (i.e. an audio stream is not currently
422    /// running), then this will assume there was no delay between when the audio
423    /// clock was last updated and now.
424    ///
425    /// Note, due to the nature of audio processing, this clock is is *NOT* synced with
426    /// the system's time (`Instant::now`). (Instead it is based on the amount of data
427    /// that has been processed.) For applications where the timing of audio events is
428    /// critical (i.e. a rythm game), sync the game to this audio clock instead of the
429    /// OS's clock (`Instant::now()`).
430    ///
431    /// Note, calling this method is not super cheap, so avoid calling it many
432    /// times within the same game loop iteration if possible.
433    pub fn audio_clock_corrected(&self) -> AudioClock {
434        // Reading the latest value of the clock doesn't meaningfully mutate
435        // state, so treat it as an immutable operation with interior mutability.
436        //
437        // PANIC SAFETY: This struct is the only place this is ever borrowed, so this
438        // will never panic.
439        let mut clock_borrowed = self.shared_clock_output.borrow_mut();
440        let clock = clock_borrowed.read();
441
442        let Some((update_instant, delay)) =
443            audio_clock_update_instant_and_delay(&clock, &self.active_state)
444        else {
445            // The audio thread is not currently running, so just return the
446            // latest value of the clock.
447            return AudioClock {
448                samples: clock.clock_samples,
449                seconds: clock
450                    .clock_samples
451                    .to_seconds(self.sample_rate, self.sample_rate_recip),
452                #[cfg(feature = "musical_transport")]
453                musical: clock.current_playhead,
454                #[cfg(feature = "musical_transport")]
455                transport_is_playing: clock.transport_is_playing,
456                update_instant: None,
457            };
458        };
459
460        // Account for the delay between when the clock was last updated and now.
461        let delta_seconds = DurationSeconds(delay.as_secs_f64());
462
463        let samples = clock.clock_samples + delta_seconds.to_samples(self.sample_rate);
464
465        #[cfg(feature = "musical_transport")]
466        let musical = clock.current_playhead.map(|musical_time| {
467            if clock.transport_is_playing && self.transport_state.transport.is_some() {
468                self.transport_state
469                    .transport
470                    .as_ref()
471                    .unwrap()
472                    .delta_seconds_from(musical_time, delta_seconds, clock.speed_multiplier)
473            } else {
474                musical_time
475            }
476        });
477
478        AudioClock {
479            samples,
480            seconds: samples.to_seconds(self.sample_rate, self.sample_rate_recip),
481            #[cfg(feature = "musical_transport")]
482            musical,
483            #[cfg(feature = "musical_transport")]
484            transport_is_playing: clock.transport_is_playing,
485            update_instant: Some(update_instant),
486        }
487    }
488
489    /// Get the instant the audio clock was last updated.
490    ///
491    /// This method accounts for the delay between when the audio clock was last
492    /// updated and now, leading to a more accurate result for games and other
493    /// applications.
494    ///
495    /// If the audio thread is not currently running, or if the delay could not
496    /// be determined for any other reason, then this will return `None`.
497    ///
498    /// Note, calling this method is not super cheap, so avoid calling it many
499    /// times within the same game loop iteration if possible.
500    pub fn audio_clock_instant(&self) -> Option<Instant> {
501        // Reading the latest value of the clock doesn't meaningfully mutate
502        // state, so treat it as an immutable operation with interior mutability.
503        //
504        // PANIC SAFETY: This struct is the only place this is ever borrowed, so this
505        // will never panic.
506        let mut clock_borrowed = self.shared_clock_output.borrow_mut();
507        let clock = clock_borrowed.read();
508
509        audio_clock_update_instant_and_delay(&clock, &self.active_state)
510            .map(|(update_instant, _delay)| update_instant)
511    }
512
513    /// Sync the state of the musical transport.
514    ///
515    /// If the message channel is full, then this will return an error.
516    #[cfg(feature = "musical_transport")]
517    pub fn sync_transport(
518        &mut self,
519        transport: &TransportState,
520    ) -> Result<(), UpdateError<B::StreamError>> {
521        if &*self.transport_state != transport {
522            let transport_msg = if let Some(mut t) = self.transport_state_alloc_reuse.take() {
523                *t = transport.clone();
524                t
525            } else {
526                Box::new(transport.clone())
527            };
528
529            self.send_message_to_processor(ContextToProcessorMsg::SetTransportState(transport_msg))
530                .map_err(|(_, e)| e)?;
531
532            *self.transport_state = transport.clone();
533        }
534
535        Ok(())
536    }
537
    /// Get the current transport state.
    ///
    /// This is the state cached by the context, i.e. the one most recently
    /// accepted by [`FirewheelCtx::sync_transport`] (or the default state if
    /// it has never been synced).
    #[cfg(feature = "musical_transport")]
    pub fn transport_state(&self) -> &TransportState {
        &self.transport_state
    }
543
    /// Get the current transport state.
    ///
    /// This returns the same reference as [`FirewheelCtx::transport_state`].
    #[cfg(feature = "musical_transport")]
    pub fn transport(&self) -> &TransportState {
        &self.transport_state
    }
549
    /// Whether or not outputs are being hard clipped at 0dB.
    ///
    /// This reads the flag stored in the context's configuration.
    pub fn hard_clip_outputs(&self) -> bool {
        self.config.hard_clip_outputs
    }
554
555    /// Set whether or not outputs should be hard clipped at 0dB to
556    /// help protect the system's speakers.
557    ///
558    /// Note that most operating systems already hard clip the output,
559    /// so this is usually not needed (TODO: Do research to see if this
560    /// assumption is true.)
561    ///
562    /// If the message channel is full, then this will return an error.
563    pub fn set_hard_clip_outputs(
564        &mut self,
565        hard_clip_outputs: bool,
566    ) -> Result<(), UpdateError<B::StreamError>> {
567        if self.config.hard_clip_outputs == hard_clip_outputs {
568            return Ok(());
569        }
570        self.config.hard_clip_outputs = hard_clip_outputs;
571
572        self.send_message_to_processor(ContextToProcessorMsg::HardClipOutputs(hard_clip_outputs))
573            .map_err(|(_, e)| e)
574    }
575
    /// Update the firewheel context.
    ///
    /// This must be called regularly (i.e. once every frame).
    ///
    /// In order, this flushes the realtime logger, runs the global collector,
    /// drains messages returned by the processor, updates the graph, checks
    /// whether the stream has stopped, and (while a stream is running) sends
    /// any new schedule, queued clear-scheduled-events requests and pending
    /// events to the processor.
    ///
    /// # Errors
    ///
    /// * `UpdateError::StreamStoppedUnexpectedly` if the backend reports an
    ///   error or the processor was handed back while the stream was still
    ///   considered active. The stream is torn down before returning.
    /// * A graph compilation error (converted via `?`).
    /// * The error from `send_message_to_processor` if a message could not be
    ///   sent. The unsent payload is kept so it can be retried on a later call.
    pub fn update(&mut self) -> Result<(), UpdateError<B::StreamError>> {
        self.logger_rx.flush();

        firewheel_core::collector::GlobalCollector.collect();

        // Drain everything the processor has handed back to the main thread.
        for msg in self.from_processor_rx.pop_iter() {
            match msg {
                ProcessorToContextMsg::ReturnEventGroup(mut event_group) => {
                    // Recycle the allocation for a future event group.
                    event_group.clear();
                    self.event_group_pool.push(event_group);
                }
                ProcessorToContextMsg::ReturnSchedule(schedule_data) => {
                    // Dropped here on the main thread.
                    let _ = schedule_data;
                }
                #[cfg(feature = "musical_transport")]
                ProcessorToContextMsg::ReturnTransportState(transport_state) => {
                    // Keep at most one spare box for `sync_transport` to reuse.
                    if self.transport_state_alloc_reuse.is_none() {
                        self.transport_state_alloc_reuse = Some(transport_state);
                    }
                }
                #[cfg(feature = "scheduled_events")]
                ProcessorToContextMsg::ReturnClearScheduledEvents(msgs) => {
                    // Dropped here on the main thread.
                    let _ = msgs;
                }
            }
        }

        self.graph.update(
            self.active_state.as_ref().map(|s| &s.stream_info),
            &mut self.event_group,
        );

        if let Some(active_state) = &mut self.active_state {
            // The backend reported an error: tear the stream down.
            if let Err(e) = active_state.backend_handle.poll_status() {
                self.active_state = None;
                self.graph.deactivate();

                return Err(UpdateError::StreamStoppedUnexpectedly(Some(e)));
            }

            // The processor was handed back even though the stream was not
            // stopped through this context: treat it as an unexpected stop.
            if self
                .processor_drop_rx
                .as_ref()
                .unwrap()
                .try_peek()
                .is_some()
            {
                self.active_state = None;
                self.graph.deactivate();

                return Err(UpdateError::StreamStoppedUnexpectedly(None));
            }
        }

        if self.is_audio_stream_running() {
            if self.graph.needs_compile() {
                let schedule_data = self
                    .graph
                    .compile(&self.active_state.as_ref().unwrap().stream_info)?;

                if let Err((msg, e)) = self
                    .send_message_to_processor(ContextToProcessorMsg::NewSchedule(schedule_data))
                {
                    // Hand the unsent schedule back to the graph.
                    let ContextToProcessorMsg::NewSchedule(schedule) = msg else {
                        unreachable!();
                    };

                    self.graph.on_schedule_send_failed(schedule);

                    return Err(e);
                }
            }

            #[cfg(feature = "scheduled_events")]
            if !self.queued_clear_scheduled_events.is_empty() {
                let msgs: SmallVec<[ClearScheduledEventsEvent; 1]> =
                    self.queued_clear_scheduled_events.drain(..).collect();

                if let Err((msg, e)) = self
                    .send_message_to_processor(ContextToProcessorMsg::ClearScheduledEvents(msgs))
                {
                    // Put the unsent requests back in the queue for a later retry.
                    let ContextToProcessorMsg::ClearScheduledEvents(mut msgs) = msg else {
                        unreachable!();
                    };

                    self.queued_clear_scheduled_events = msgs.drain(..).collect();

                    return Err(e);
                }
            }

            if !self.event_group.is_empty() {
                // Swap the filled group out for an empty one (from the pool if
                // possible) and send the filled one to the processor.
                let mut next_event_group = self
                    .event_group_pool
                    .pop()
                    .unwrap_or_else(|| Vec::with_capacity(self.initial_event_group_capacity));
                core::mem::swap(&mut next_event_group, &mut self.event_group);

                if let Err((msg, e)) = self
                    .send_message_to_processor(ContextToProcessorMsg::EventGroup(next_event_group))
                {
                    // Restore the unsent events as the current group and return
                    // the empty buffer to the pool.
                    let ContextToProcessorMsg::EventGroup(mut event_group) = msg else {
                        unreachable!();
                    };

                    core::mem::swap(&mut event_group, &mut self.event_group);
                    self.event_group_pool.push(event_group);

                    return Err(e);
                }
            }
        }

        Ok(())
    }
694
    /// The ID of the graph input node.
    ///
    /// This forwards to `AudioGraph::graph_in_node`.
    pub fn graph_in_node_id(&self) -> NodeID {
        self.graph.graph_in_node()
    }
699
    /// The ID of the graph output node.
    ///
    /// This forwards to `AudioGraph::graph_out_node`.
    pub fn graph_out_node_id(&self) -> NodeID {
        self.graph.graph_out_node()
    }
704
    /// Add a node to the audio graph.
    ///
    /// * `node` - The node to add.
    /// * `config` - An optional configuration for the node, passed through to
    ///   the graph unchanged.
    ///
    /// Returns the ID assigned to the new node.
    pub fn add_node<T: AudioNode + 'static>(
        &mut self,
        node: T,
        config: Option<T::Configuration>,
    ) -> NodeID {
        self.graph.add_node(node, config)
    }
713
    /// Add a node to the audio graph which implements the type-erased [`DynAudioNode`] trait.
    ///
    /// Returns the ID assigned to the new node.
    pub fn add_dyn_node<T: DynAudioNode + 'static>(&mut self, node: T) -> NodeID {
        self.graph.add_dyn_node(node)
    }
718
    /// Remove the given node from the audio graph.
    ///
    /// This will automatically remove all edges from the graph that
    /// were connected to this node.
    ///
    /// On success, this returns a list of all edges that were removed
    /// from the graph as a result of removing this node.
    ///
    /// # Errors
    ///
    /// This will return an error if the ID is of the graph input or graph
    /// output node.
    pub fn remove_node(
        &mut self,
        node_id: NodeID,
    ) -> Result<SmallVec<[EdgeID; 4]>, RemoveNodeError> {
        self.graph.remove_node(node_id)
    }
735
    /// Get information about a node in the graph.
    ///
    /// Returns whatever `AudioGraph::node_info` returns for `id`.
    // NOTE(review): presumably `None` means `id` is not in the graph — confirm
    // against `AudioGraph::node_info`.
    pub fn node_info(&self, id: NodeID) -> Option<&NodeEntry> {
        self.graph.node_info(id)
    }
740
    /// Get an immutable reference to the custom state of a node.
    ///
    /// `T` is the type the caller expects the state to have.
    // NOTE(review): presumably `None` covers both an unknown `id` and a state
    // of a different type — confirm against `AudioGraph::node_state`.
    pub fn node_state<T: 'static>(&self, id: NodeID) -> Option<&T> {
        self.graph.node_state(id)
    }
745
    /// Get a type-erased, immutable reference to the custom state of a node.
    ///
    /// Use [`FirewheelCtx::node_state`] instead when the concrete type is known.
    pub fn node_state_dyn(&self, id: NodeID) -> Option<&dyn Any> {
        self.graph.node_state_dyn(id)
    }
750
    /// Get a mutable reference to the custom state of a node.
    ///
    /// `T` is the type the caller expects the state to have.
    // NOTE(review): presumably `None` covers both an unknown `id` and a state
    // of a different type — confirm against `AudioGraph::node_state_mut`.
    pub fn node_state_mut<T: 'static>(&mut self, id: NodeID) -> Option<&mut T> {
        self.graph.node_state_mut(id)
    }
755
    /// Get a type-erased, mutable reference to the custom state of a node.
    ///
    /// Use [`FirewheelCtx::node_state_mut`] instead when the concrete type is known.
    pub fn node_state_dyn_mut(&mut self, id: NodeID) -> Option<&mut dyn Any> {
        self.graph.node_state_dyn_mut(id)
    }
759
    /// Get an iterator over all the existing nodes in the graph.
    ///
    /// The iterator borrows the context for as long as it is alive.
    pub fn nodes<'a>(&'a self) -> impl Iterator<Item = &'a NodeEntry> {
        self.graph.nodes()
    }
764
    /// Get an iterator over all the existing edges in the graph.
    ///
    /// The iterator borrows the context for as long as it is alive.
    pub fn edges<'a>(&'a self) -> impl Iterator<Item = &'a Edge> {
        self.graph.edges()
    }
769
    /// Set the number of input and output channels to and from the audio graph.
    ///
    /// * `channel_config` - The new channel configuration of the graph.
    ///
    /// Returns the list of edges that were removed.
    pub fn set_graph_channel_config(
        &mut self,
        channel_config: ChannelConfig,
    ) -> SmallVec<[EdgeID; 4]> {
        self.graph.set_graph_channel_config(channel_config)
    }
779
    /// Add connections (edges) between two nodes to the graph.
    ///
    /// * `src_node` - The ID of the source node.
    /// * `dst_node` - The ID of the destination node.
    /// * `ports_src_dst` - The port indices for each connection to make,
    ///   where the first value in a tuple is the output port on `src_node`,
    ///   and the second value in that tuple is the input port on `dst_node`.
    /// * `check_for_cycles` - If `true`, then this will run a check to
    ///   see if adding these edges will create a cycle in the graph, and
    ///   return an error if it does. Note, checking for cycles can be quite
    ///   expensive, so avoid enabling this when calling this method many times
    ///   in a row.
    ///
    /// If successful, then this returns a list of edge IDs in order.
    ///
    /// # Errors
    ///
    /// If this returns an error, then the audio graph has not been
    /// modified.
    pub fn connect(
        &mut self,
        src_node: NodeID,
        dst_node: NodeID,
        ports_src_dst: &[(PortIdx, PortIdx)],
        check_for_cycles: bool,
    ) -> Result<SmallVec<[EdgeID; 4]>, AddEdgeError> {
        self.graph
            .connect(src_node, dst_node, ports_src_dst, check_for_cycles)
    }
807
    /// Remove connections (edges) between two nodes from the graph.
    ///
    /// * `src_node` - The ID of the source node.
    /// * `dst_node` - The ID of the destination node.
    /// * `ports_src_dst` - The port indices for each connection to remove,
    ///   where the first value in a tuple is the output port on `src_node`,
    ///   and the second value in that tuple is the input port on `dst_node`.
    ///
    /// If none of the edges existed in the graph, then `false` will be
    /// returned.
    pub fn disconnect(
        &mut self,
        src_node: NodeID,
        dst_node: NodeID,
        ports_src_dst: &[(PortIdx, PortIdx)],
    ) -> bool {
        self.graph.disconnect(src_node, dst_node, ports_src_dst)
    }
826
    /// Remove all connections (edges) between two nodes in the graph.
    ///
    /// * `src_node` - The ID of the source node.
    /// * `dst_node` - The ID of the destination node.
    ///
    /// Returns the edge IDs reported by the underlying graph for this
    /// operation.
    ///
    /// NOTE(review): presumably these are the IDs of the edges that were
    /// removed - confirm against `AudioGraph::disconnect_all_between`.
    pub fn disconnect_all_between(
        &mut self,
        src_node: NodeID,
        dst_node: NodeID,
    ) -> SmallVec<[EdgeID; 4]> {
        self.graph.disconnect_all_between(src_node, dst_node)
    }
838
    /// Remove a connection (edge) via the edge's unique ID.
    ///
    /// * `edge_id` - The ID of the edge to remove.
    ///
    /// If the edge did not exist in this graph, then `false` will be returned.
    pub fn disconnect_by_edge_id(&mut self, edge_id: EdgeID) -> bool {
        self.graph.disconnect_by_edge_id(edge_id)
    }
845
    /// Get information about the [`Edge`] with the given ID.
    ///
    /// This forwards the lookup to the underlying audio graph and returns
    /// whatever edge it reports for `edge_id`, if any.
    pub fn edge(&self, edge_id: EdgeID) -> Option<&Edge> {
        self.graph.edge(edge_id)
    }
850
    /// Runs a check to see if a cycle exists in the audio graph.
    ///
    /// Returns `true` if the underlying graph reports a cycle.
    ///
    /// Note, this method is expensive.
    pub fn cycle_detected(&mut self) -> bool {
        self.graph.cycle_detected()
    }
857
    /// Queue an event to be sent to an audio node's processor.
    ///
    /// The event is appended to the context's pending event group as-is; the
    /// target node is the one named by the event itself.
    ///
    /// Note, this event will not be sent until the event queue is flushed
    /// in [`FirewheelCtx::update`].
    pub fn queue_event(&mut self, event: NodeEvent) {
        self.event_group.push(event);
    }
865
866    /// Queue an event to be sent to an audio node's processor.
867    ///
868    /// Note, this event will not be sent until the event queue is flushed
869    /// in [`FirewheelCtx::update`].
870    pub fn queue_event_for(&mut self, node_id: NodeID, event: NodeEventType) {
871        self.queue_event(NodeEvent {
872            node_id,
873            #[cfg(feature = "scheduled_events")]
874            time: None,
875            event,
876        });
877    }
878
879    /// Queue an event at a certain time, to be sent to an audio node's processor.
880    ///
881    /// If `time` is `None`, then the event will occur as soon as the node's
882    /// processor receives the event.
883    ///
884    /// Note, this event will not be sent until the event queue is flushed
885    /// in [`FirewheelCtx::update`].
886    #[cfg(feature = "scheduled_events")]
887    pub fn schedule_event_for(
888        &mut self,
889        node_id: NodeID,
890        event: NodeEventType,
891        time: Option<EventInstant>,
892    ) {
893        self.queue_event(NodeEvent {
894            node_id,
895            time,
896            event,
897        });
898    }
899
900    /// Cancel scheduled events for all nodes.
901    ///
902    /// This will clear all events that have been scheduled since the last call to
903    /// [`FirewheelCtx::update`]. Any events scheduled between then and the next call
904    /// to [`FirewheelCtx::update`] will not be canceled.
905    ///
906    /// This only takes effect once [`FirewheelCtx::update`] is called.
907    #[cfg(feature = "scheduled_events")]
908    pub fn cancel_all_scheduled_events(&mut self, event_type: ClearScheduledEventsType) {
909        self.queued_clear_scheduled_events
910            .push(ClearScheduledEventsEvent {
911                node_id: None,
912                event_type,
913            });
914    }
915
916    /// Cancel scheduled events for a specific node.
917    ///
918    /// This will clear all events that have been scheduled since the last call to
919    /// [`FirewheelCtx::update`]. Any events scheduled between then and the next call
920    /// to [`FirewheelCtx::update`] will not be canceled.
921    ///
922    /// This only takes effect once [`FirewheelCtx::update`] is called.
923    #[cfg(feature = "scheduled_events")]
924    pub fn cancel_scheduled_events_for(
925        &mut self,
926        node_id: NodeID,
927        event_type: ClearScheduledEventsType,
928    ) {
929        self.queued_clear_scheduled_events
930            .push(ClearScheduledEventsEvent {
931                node_id: Some(node_id),
932                event_type,
933            });
934    }
935
936    fn send_message_to_processor(
937        &mut self,
938        msg: ContextToProcessorMsg,
939    ) -> Result<(), (ContextToProcessorMsg, UpdateError<B::StreamError>)> {
940        self.to_processor_tx
941            .try_push(msg)
942            .map_err(|msg| (msg, UpdateError::MsgChannelFull))
943    }
944}
945
946impl<B: AudioBackend> Drop for FirewheelCtx<B> {
947    fn drop(&mut self) {
948        self.stop_stream();
949
950        // Wait for the processor to be drop to avoid deallocating it on
951        // the audio thread.
952        #[cfg(not(target_family = "wasm"))]
953        if let Some(drop_rx) = self.processor_drop_rx.take() {
954            let now = bevy_platform::time::Instant::now();
955
956            while drop_rx.try_peek().is_none() {
957                if now.elapsed() > core::time::Duration::from_secs(2) {
958                    break;
959                }
960
961                bevy_platform::thread::sleep(core::time::Duration::from_millis(2));
962            }
963        }
964
965        firewheel_core::collector::GlobalCollector.collect();
966    }
967}
968
impl<B: AudioBackend> FirewheelCtx<B> {
    /// Construct a [`ContextQueue`] for diffing.
    ///
    /// Every event pushed to the returned queue is addressed to the node
    /// with the ID `id` and queued on this context. When the
    /// `scheduled_events` feature is enabled, the events carry no time
    /// (`None`).
    pub fn event_queue(&mut self, id: NodeID) -> ContextQueue<'_, B> {
        ContextQueue {
            context: self,
            id,
            #[cfg(feature = "scheduled_events")]
            time: None,
        }
    }

    /// Construct a [`ContextQueue`] for diffing whose events carry the
    /// given `time`.
    ///
    /// Every event pushed to the returned queue is addressed to the node
    /// with the ID `id`, is stamped with `time`, and is queued on this
    /// context.
    #[cfg(feature = "scheduled_events")]
    pub fn event_queue_scheduled(
        &mut self,
        id: NodeID,
        time: Option<EventInstant>,
    ) -> ContextQueue<'_, B> {
        ContextQueue {
            context: self,
            id,
            time,
        }
    }
}
993
/// An event queue acquired from [`FirewheelCtx::event_queue`] (or from
/// `FirewheelCtx::event_queue_scheduled` when the `scheduled_events`
/// feature is enabled).
///
/// This can help reduce event queue allocations
/// when you have direct access to the context.
///
/// ```
/// # use firewheel_core::{diff::{Diff, PathBuilder}, node::NodeID};
/// # use firewheel_graph::{backend::AudioBackend, FirewheelCtx, ContextQueue};
/// # fn context_queue<B: AudioBackend, D: Diff>(
/// #     context: &mut FirewheelCtx<B>,
/// #     node_id: NodeID,
/// #     params: &D,
/// #     baseline: &D,
/// # ) {
/// // Get a queue that will send events directly to the provided node.
/// let mut queue = context.event_queue(node_id);
/// // Perform diffing using this queue.
/// params.diff(baseline, PathBuilder::default(), &mut queue);
/// # }
/// ```
pub struct ContextQueue<'a, B: AudioBackend> {
    /// The context on which pushed events are queued.
    context: &'a mut FirewheelCtx<B>,
    /// The ID of the node that every pushed event is addressed to.
    id: NodeID,
    /// The time attached to every pushed event.
    #[cfg(feature = "scheduled_events")]
    time: Option<EventInstant>,
}
1020
#[cfg(feature = "scheduled_events")]
impl<B: AudioBackend> ContextQueue<'_, B> {
    /// The time that this queue attaches to every event pushed through it.
    ///
    /// This is the `time` the queue was constructed with.
    pub fn time(&self) -> Option<EventInstant> {
        self.time
    }
}
1027
1028impl<B: AudioBackend> firewheel_core::diff::EventQueue for ContextQueue<'_, B> {
1029    fn push(&mut self, data: NodeEventType) {
1030        self.context.queue_event(NodeEvent {
1031            event: data,
1032            #[cfg(feature = "scheduled_events")]
1033            time: self.time,
1034            node_id: self.id,
1035        });
1036    }
1037}
1038
/// The type of scheduled events to clear.
///
/// This is passed to [`FirewheelCtx::cancel_all_scheduled_events`] and
/// [`FirewheelCtx::cancel_scheduled_events_for`], which store it as the
/// `event_type` of the queued clear request.
#[cfg(feature = "scheduled_events")]
#[derive(Default, Debug, Clone, Copy, PartialEq)]
pub enum ClearScheduledEventsType {
    /// Clear both musical and non-musical scheduled events.
    ///
    /// This is the default.
    #[default]
    All,
    /// Clear only non-musical scheduled events.
    NonMusicalOnly,
    /// Clear only musical scheduled events.
    MusicalOnly,
}
1051
1052fn audio_clock_update_instant_and_delay<B: AudioBackend>(
1053    clock: &SharedClock<B::Instant>,
1054    active_state: &Option<ActiveState<B>>,
1055) -> Option<(Instant, Duration)> {
1056    active_state.as_ref().and_then(|active_state| {
1057        clock
1058            .process_timestamp
1059            .clone()
1060            .and_then(|process_timestamp| {
1061                active_state
1062                    .backend_handle
1063                    .delay_from_last_process(process_timestamp)
1064                    .and_then(|delay| {
1065                        Instant::now()
1066                            .checked_sub(delay)
1067                            .map(|instant| (instant, delay))
1068                    })
1069            })
1070    })
1071}