firewheel_graph/
context.rs

1use bevy_platform::time::Instant;
2use core::cell::RefCell;
3use core::num::NonZeroU32;
4use core::time::Duration;
5use core::{any::Any, f64};
6use firewheel_core::clock::DurationSeconds;
7use firewheel_core::log::{RealtimeLogger, RealtimeLoggerConfig, RealtimeLoggerMainThread};
8use firewheel_core::{
9    channel_config::{ChannelConfig, ChannelCount},
10    clock::AudioClock,
11    collector::Collector,
12    dsp::declick::DeclickValues,
13    event::{NodeEvent, NodeEventType},
14    node::{AudioNode, DynAudioNode, NodeID},
15    StreamInfo,
16};
17use ringbuf::traits::{Consumer, Producer, Split};
18use smallvec::SmallVec;
19
20use crate::processor::BufferOutOfSpaceMode;
21use crate::{
22    backend::{AudioBackend, DeviceInfo},
23    error::{AddEdgeError, StartStreamError, UpdateError},
24    graph::{AudioGraph, Edge, EdgeID, NodeEntry, PortIdx},
25    processor::{
26        ContextToProcessorMsg, FirewheelProcessor, FirewheelProcessorInner, ProcessorToContextMsg,
27        SharedClock,
28    },
29};
30
31#[cfg(feature = "scheduled_events")]
32use crate::processor::ClearScheduledEventsEvent;
33#[cfg(feature = "scheduled_events")]
34use firewheel_core::clock::EventInstant;
35
36#[cfg(feature = "musical_transport")]
37use firewheel_core::clock::TransportState;
38
39/// The configuration of a Firewheel context.
40#[derive(Debug, Clone, Copy, PartialEq)]
41pub struct FirewheelConfig {
42    /// The number of input channels in the audio graph.
43    pub num_graph_inputs: ChannelCount,
44    /// The number of output channels in the audio graph.
45    pub num_graph_outputs: ChannelCount,
46    /// If `true`, then all outputs will be hard clipped at 0db to help
47    /// protect the system's speakers.
48    ///
49    /// Note that most operating systems already hard clip the output,
50    /// so this is usually not needed (TODO: Do research to see if this
51    /// assumption is true.)
52    ///
53    /// By default this is set to `false`.
54    pub hard_clip_outputs: bool,
55    /// An initial capacity to allocate for the nodes in the audio graph.
56    ///
57    /// By default this is set to `64`.
58    pub initial_node_capacity: u32,
59    /// An initial capacity to allocate for the edges in the audio graph.
60    ///
61    /// By default this is set to `256`.
62    pub initial_edge_capacity: u32,
63    /// The amount of time in seconds to fade in/out when pausing/resuming
64    /// to avoid clicks and pops.
65    ///
66    /// By default this is set to `10.0 / 1_000.0`.
67    pub declick_seconds: f32,
68    /// The initial capacity for a group of events.
69    ///
70    /// By default this is set to `128`.
71    pub initial_event_group_capacity: u32,
72    /// The capacity of the engine's internal message channel.
73    ///
74    /// By default this is set to `64`.
75    pub channel_capacity: u32,
76    /// The maximum number of events that can be sent in a single call
77    /// to [`AudioNodeProcessor::process`].
78    ///
79    /// By default this is set to `128`.
80    ///
81    /// [`AudioNodeProcessor::process`]: firewheel_core::node::AudioNodeProcessor::process
82    pub event_queue_capacity: usize,
83    /// The maximum number of immediate events (events that do *NOT* have a
84    /// scheduled time component) that can be stored at once in the audio
85    /// thread.
86    ///
87    /// By default this is set to `512`.
88    pub immediate_event_capacity: usize,
89    /// The maximum number of scheduled events (events that have a scheduled
90    /// time component) that can be stored at once in the audio thread.
91    ///
92    /// This can be set to `0` to save some memory if you do not plan on using
93    /// scheduled events.
94    ///
95    /// By default this is set to `512`.
96    #[cfg(feature = "scheduled_events")]
97    pub scheduled_event_capacity: usize,
98    /// How to handle event buffers on the audio thread running out of space.
99    ///
100    /// By default this is set to [`BufferOutOfSpaceMode::AllocateOnAudioThread`].
101    pub buffer_out_of_space_mode: BufferOutOfSpaceMode,
102
103    /// The configuration of the realtime safe logger.
104    pub logger_config: RealtimeLoggerConfig,
105}
106
107impl Default for FirewheelConfig {
108    fn default() -> Self {
109        Self {
110            num_graph_inputs: ChannelCount::ZERO,
111            num_graph_outputs: ChannelCount::STEREO,
112            hard_clip_outputs: false,
113            initial_node_capacity: 128,
114            initial_edge_capacity: 256,
115            declick_seconds: DeclickValues::DEFAULT_FADE_SECONDS,
116            initial_event_group_capacity: 128,
117            channel_capacity: 64,
118            event_queue_capacity: 128,
119            immediate_event_capacity: 512,
120            #[cfg(feature = "scheduled_events")]
121            scheduled_event_capacity: 512,
122            buffer_out_of_space_mode: BufferOutOfSpaceMode::AllocateOnAudioThread,
123            logger_config: RealtimeLoggerConfig::default(),
124        }
125    }
126}
127
128struct ActiveState<B: AudioBackend> {
129    backend_handle: B,
130    stream_info: StreamInfo,
131}
132
133/// A Firewheel context
134pub struct FirewheelCtx<B: AudioBackend> {
135    graph: AudioGraph,
136
137    to_processor_tx: ringbuf::HeapProd<ContextToProcessorMsg>,
138    from_processor_rx: ringbuf::HeapCons<ProcessorToContextMsg>,
139    logger_rx: RealtimeLoggerMainThread,
140
141    active_state: Option<ActiveState<B>>,
142
143    processor_channel: Option<(
144        ringbuf::HeapCons<ContextToProcessorMsg>,
145        ringbuf::HeapProd<ProcessorToContextMsg>,
146        triple_buffer::Input<SharedClock<B::Instant>>,
147        RealtimeLogger,
148    )>,
149    processor_drop_rx: Option<ringbuf::HeapCons<FirewheelProcessorInner<B>>>,
150
151    shared_clock_output: RefCell<triple_buffer::Output<SharedClock<B::Instant>>>,
152    sample_rate: NonZeroU32,
153    sample_rate_recip: f64,
154
155    #[cfg(feature = "musical_transport")]
156    transport_state: Box<TransportState>,
157    #[cfg(feature = "musical_transport")]
158    transport_state_alloc_reuse: Option<Box<TransportState>>,
159
160    // Re-use the allocations for groups of events.
161    event_group_pool: Vec<Vec<NodeEvent>>,
162    event_group: Vec<NodeEvent>,
163    initial_event_group_capacity: usize,
164
165    #[cfg(feature = "scheduled_events")]
166    queued_clear_scheduled_events: Vec<ClearScheduledEventsEvent>,
167
168    config: FirewheelConfig,
169}
170
171impl<B: AudioBackend> FirewheelCtx<B> {
172    /// Create a new Firewheel context.
173    pub fn new(config: FirewheelConfig) -> Self {
174        let (to_processor_tx, from_context_rx) =
175            ringbuf::HeapRb::<ContextToProcessorMsg>::new(config.channel_capacity as usize).split();
176        let (to_context_tx, from_processor_rx) =
177            ringbuf::HeapRb::<ProcessorToContextMsg>::new(config.channel_capacity as usize * 2)
178                .split();
179
180        let initial_event_group_capacity = config.initial_event_group_capacity as usize;
181        let mut event_group_pool = Vec::with_capacity(16);
182        for _ in 0..3 {
183            event_group_pool.push(Vec::with_capacity(initial_event_group_capacity));
184        }
185
186        let (shared_clock_input, shared_clock_output) =
187            triple_buffer::triple_buffer(&SharedClock::default());
188
189        let (logger, logger_rx) = firewheel_core::log::realtime_logger(config.logger_config);
190
191        Self {
192            graph: AudioGraph::new(&config),
193            to_processor_tx,
194            from_processor_rx,
195            logger_rx,
196            active_state: None,
197            processor_channel: Some((from_context_rx, to_context_tx, shared_clock_input, logger)),
198            processor_drop_rx: None,
199            shared_clock_output: RefCell::new(shared_clock_output),
200            sample_rate: NonZeroU32::new(44100).unwrap(),
201            sample_rate_recip: 44100.0f64.recip(),
202            #[cfg(feature = "musical_transport")]
203            transport_state: Box::new(TransportState::default()),
204            #[cfg(feature = "musical_transport")]
205            transport_state_alloc_reuse: None,
206            event_group_pool,
207            event_group: Vec::with_capacity(initial_event_group_capacity),
208            initial_event_group_capacity,
209            #[cfg(feature = "scheduled_events")]
210            queued_clear_scheduled_events: Vec::new(),
211            config,
212        }
213    }
214
215    /// Get a reference to the currently active instance of the backend. Returns `None` if the backend has not
216    /// yet been initialized with `start_stream`.
217    pub fn active_backend(&self) -> Option<&B> {
218        self.active_state
219            .as_ref()
220            .map(|state| &state.backend_handle)
221    }
222
223    /// Get a mutable reference to the currently active instance of the backend. Returns `None` if the backend has not
224    /// yet been initialized with `start_stream`.
225    pub fn active_backend_mut(&mut self) -> Option<&mut B> {
226        self.active_state
227            .as_mut()
228            .map(|state| &mut state.backend_handle)
229    }
230
231    /// Get a list of the available audio input devices.
232    pub fn available_input_devices(&self) -> Vec<DeviceInfo> {
233        B::available_input_devices()
234    }
235
236    /// Get a list of the available audio output devices.
237    pub fn available_output_devices(&self) -> Vec<DeviceInfo> {
238        B::available_output_devices()
239    }
240
241    /// Returns `true` if an audio stream can be started right now.
242    ///
243    /// When calling [`FirewheelCtx::stop_stream()`], it may take some time for the
244    /// old stream to be fully stopped. This method is used to check if it has been
245    /// dropped yet.
246    ///
247    /// Note, in rare cases where the audio thread crashes without cleanly dropping
248    /// its contents, this may never return `true`. Consider adding a timeout to
249    /// avoid deadlocking.
250    pub fn can_start_stream(&self) -> bool {
251        if self.is_audio_stream_running() {
252            false
253        } else if let Some(rx) = &self.processor_drop_rx {
254            rx.try_peek().is_some()
255        } else {
256            true
257        }
258    }
259
260    /// Start an audio stream for this context. Only one audio stream can exist on
261    /// a context at a time.
262    ///
263    /// When calling [`FirewheelCtx::stop_stream()`], it may take some time for the
264    /// old stream to be fully stopped. Use [`FirewheelCtx::can_start_stream`] to
265    /// check if it has been dropped yet.
266    ///
267    /// Note, in rare cases where the audio thread crashes without cleanly dropping
268    /// its contents, this may never succeed. Consider adding a timeout to avoid
269    /// deadlocking.
270    pub fn start_stream(
271        &mut self,
272        config: B::Config,
273    ) -> Result<(), StartStreamError<B::StartStreamError>> {
274        if self.is_audio_stream_running() {
275            return Err(StartStreamError::AlreadyStarted);
276        }
277
278        if !self.can_start_stream() {
279            return Err(StartStreamError::OldStreamNotFinishedStopping);
280        }
281
282        let (mut backend_handle, mut stream_info) =
283            B::start_stream(config).map_err(|e| StartStreamError::BackendError(e))?;
284
285        stream_info.sample_rate_recip = (stream_info.sample_rate.get() as f64).recip();
286        stream_info.declick_frames = NonZeroU32::new(
287            (self.config.declick_seconds * stream_info.sample_rate.get() as f32).round() as u32,
288        )
289        .unwrap_or(NonZeroU32::MIN);
290
291        let maybe_processor = self.processor_channel.take();
292
293        stream_info.prev_sample_rate = if maybe_processor.is_some() {
294            stream_info.sample_rate
295        } else {
296            self.sample_rate
297        };
298
299        self.sample_rate = stream_info.sample_rate;
300        self.sample_rate_recip = stream_info.sample_rate_recip;
301
302        let schedule = self.graph.compile(&stream_info)?;
303
304        let (drop_tx, drop_rx) = ringbuf::HeapRb::<FirewheelProcessorInner<B>>::new(1).split();
305
306        let processor = if let Some((from_context_rx, to_context_tx, shared_clock_input, logger)) =
307            maybe_processor
308        {
309            FirewheelProcessorInner::new(
310                from_context_rx,
311                to_context_tx,
312                shared_clock_input,
313                self.config.immediate_event_capacity,
314                #[cfg(feature = "scheduled_events")]
315                self.config.scheduled_event_capacity,
316                self.config.event_queue_capacity,
317                &stream_info,
318                self.config.hard_clip_outputs,
319                self.config.buffer_out_of_space_mode,
320                logger,
321            )
322        } else {
323            let mut processor = self.processor_drop_rx.as_mut().unwrap().try_pop().unwrap();
324
325            if processor.poisoned {
326                panic!("The audio thread has panicked!");
327            }
328
329            processor.new_stream(&stream_info);
330
331            processor
332        };
333
334        backend_handle.set_processor(FirewheelProcessor::new(processor, drop_tx));
335
336        if let Err(_) = self.send_message_to_processor(ContextToProcessorMsg::NewSchedule(schedule))
337        {
338            panic!("Firewheel message channel is full!");
339        }
340
341        self.active_state = Some(ActiveState {
342            backend_handle,
343            stream_info,
344        });
345        self.processor_drop_rx = Some(drop_rx);
346
347        Ok(())
348    }
349
350    /// Stop the audio stream in this context.
351    pub fn stop_stream(&mut self) {
352        // When the backend handle is dropped, the backend will automatically
353        // stop its stream.
354        self.active_state = None;
355        self.graph.deactivate();
356    }
357
358    /// Returns `true` if there is currently a running audio stream.
359    pub fn is_audio_stream_running(&self) -> bool {
360        self.active_state.is_some()
361    }
362
363    /// Information about the running audio stream.
364    ///
365    /// Returns `None` if no audio stream is currently running.
366    pub fn stream_info(&self) -> Option<&StreamInfo> {
367        self.active_state.as_ref().map(|s| &s.stream_info)
368    }
369
370    /// Get the current time of the audio clock, without accounting for the delay
371    /// between when the clock was last updated and now.
372    ///
373    /// For most use cases you probably want to use [`FirewheelCtx::audio_clock_corrected`]
374    /// instead, but this method is provided if needed.
375    ///
376    /// Note, due to the nature of audio processing, this clock is is *NOT* synced with
377    /// the system's time (`Instant::now`). (Instead it is based on the amount of data
378    /// that has been processed.) For applications where the timing of audio events is
379    /// critical (i.e. a rythm game), sync the game to this audio clock instead of the
380    /// OS's clock (`Instant::now()`).
381    ///
382    /// Note, calling this method is not super cheap, so avoid calling it many
383    /// times within the same game loop iteration if possible.
384    pub fn audio_clock(&self) -> AudioClock {
385        // Reading the latest value of the clock doesn't meaningfully mutate
386        // state, so treat it as an immutable operation with interior mutability.
387        //
388        // PANIC SAFETY: This struct is the only place this is ever borrowed, so this
389        // will never panic.
390        let mut clock_borrowed = self.shared_clock_output.borrow_mut();
391        let clock = clock_borrowed.read();
392
393        let update_instant = audio_clock_update_instant_and_delay(&clock, &self.active_state)
394            .map(|(update_instant, _delay)| update_instant);
395
396        AudioClock {
397            samples: clock.clock_samples,
398            seconds: clock
399                .clock_samples
400                .to_seconds(self.sample_rate, self.sample_rate_recip),
401            #[cfg(feature = "musical_transport")]
402            musical: clock.current_playhead,
403            #[cfg(feature = "musical_transport")]
404            transport_is_playing: clock.transport_is_playing,
405            update_instant,
406        }
407    }
408
409    /// Get the current time of the audio clock.
410    ///
411    /// Unlike, [`FirewheelCtx::audio_clock`], this method accounts for the delay
412    /// between when the audio clock was last updated and now, leading to a more
413    /// accurate result for games and other applications.
414    ///
415    /// If the delay could not be determined (i.e. an audio stream is not currently
416    /// running), then this will assume there was no delay between when the audio
417    /// clock was last updated and now.
418    ///
419    /// Note, due to the nature of audio processing, this clock is is *NOT* synced with
420    /// the system's time (`Instant::now`). (Instead it is based on the amount of data
421    /// that has been processed.) For applications where the timing of audio events is
422    /// critical (i.e. a rythm game), sync the game to this audio clock instead of the
423    /// OS's clock (`Instant::now()`).
424    ///
425    /// Note, calling this method is not super cheap, so avoid calling it many
426    /// times within the same game loop iteration if possible.
427    pub fn audio_clock_corrected(&self) -> AudioClock {
428        // Reading the latest value of the clock doesn't meaningfully mutate
429        // state, so treat it as an immutable operation with interior mutability.
430        //
431        // PANIC SAFETY: This struct is the only place this is ever borrowed, so this
432        // will never panic.
433        let mut clock_borrowed = self.shared_clock_output.borrow_mut();
434        let clock = clock_borrowed.read();
435
436        let Some((update_instant, delay)) =
437            audio_clock_update_instant_and_delay(&clock, &self.active_state)
438        else {
439            // The audio thread is not currently running, so just return the
440            // latest value of the clock.
441            return AudioClock {
442                samples: clock.clock_samples,
443                seconds: clock
444                    .clock_samples
445                    .to_seconds(self.sample_rate, self.sample_rate_recip),
446                #[cfg(feature = "musical_transport")]
447                musical: clock.current_playhead,
448                #[cfg(feature = "musical_transport")]
449                transport_is_playing: clock.transport_is_playing,
450                update_instant: None,
451            };
452        };
453
454        // Account for the delay between when the clock was last updated and now.
455        let delta_seconds = DurationSeconds(delay.as_secs_f64());
456
457        let samples = clock.clock_samples + delta_seconds.to_samples(self.sample_rate);
458
459        #[cfg(feature = "musical_transport")]
460        let musical = clock.current_playhead.map(|musical_time| {
461            if clock.transport_is_playing && self.transport_state.transport.is_some() {
462                self.transport_state
463                    .transport
464                    .as_ref()
465                    .unwrap()
466                    .delta_seconds_from(musical_time, delta_seconds, clock.speed_multiplier)
467            } else {
468                musical_time
469            }
470        });
471
472        AudioClock {
473            samples,
474            seconds: samples.to_seconds(self.sample_rate, self.sample_rate_recip),
475            #[cfg(feature = "musical_transport")]
476            musical,
477            #[cfg(feature = "musical_transport")]
478            transport_is_playing: clock.transport_is_playing,
479            update_instant: Some(update_instant),
480        }
481    }
482
483    /// Get the instant the audio clock was last updated.
484    ///
485    /// This method accounts for the delay between when the audio clock was last
486    /// updated and now, leading to a more accurate result for games and other
487    /// applications.
488    ///
489    /// If the audio thread is not currently running, or if the delay could not
490    /// be determined for any other reason, then this will return `None`.
491    ///
492    /// Note, calling this method is not super cheap, so avoid calling it many
493    /// times within the same game loop iteration if possible.
494    pub fn audio_clock_instant(&self) -> Option<Instant> {
495        // Reading the latest value of the clock doesn't meaningfully mutate
496        // state, so treat it as an immutable operation with interior mutability.
497        //
498        // PANIC SAFETY: This struct is the only place this is ever borrowed, so this
499        // will never panic.
500        let mut clock_borrowed = self.shared_clock_output.borrow_mut();
501        let clock = clock_borrowed.read();
502
503        audio_clock_update_instant_and_delay(&clock, &self.active_state)
504            .map(|(update_instant, _delay)| update_instant)
505    }
506
507    /// Sync the state of the musical transport.
508    ///
509    /// If the message channel is full, then this will return an error.
510    #[cfg(feature = "musical_transport")]
511    pub fn sync_transport(
512        &mut self,
513        transport: &TransportState,
514    ) -> Result<(), UpdateError<B::StreamError>> {
515        if &*self.transport_state != transport {
516            let transport_msg = if let Some(mut t) = self.transport_state_alloc_reuse.take() {
517                *t = transport.clone();
518                t
519            } else {
520                Box::new(transport.clone())
521            };
522
523            self.send_message_to_processor(ContextToProcessorMsg::SetTransportState(transport_msg))
524                .map_err(|(_, e)| e)?;
525
526            *self.transport_state = transport.clone();
527        }
528
529        Ok(())
530    }
531
532    /// Get the current transport state.
533    #[cfg(feature = "musical_transport")]
534    pub fn transport_state(&self) -> &TransportState {
535        &self.transport_state
536    }
537
538    /// Get the current transport state.
539    #[cfg(feature = "musical_transport")]
540    pub fn transport(&self) -> &TransportState {
541        &self.transport_state
542    }
543
544    /// Whether or not outputs are being hard clipped at 0dB.
545    pub fn hard_clip_outputs(&self) -> bool {
546        self.config.hard_clip_outputs
547    }
548
549    /// Set whether or not outputs should be hard clipped at 0dB to
550    /// help protect the system's speakers.
551    ///
552    /// Note that most operating systems already hard clip the output,
553    /// so this is usually not needed (TODO: Do research to see if this
554    /// assumption is true.)
555    ///
556    /// If the message channel is full, then this will return an error.
557    pub fn set_hard_clip_outputs(
558        &mut self,
559        hard_clip_outputs: bool,
560    ) -> Result<(), UpdateError<B::StreamError>> {
561        if self.config.hard_clip_outputs == hard_clip_outputs {
562            return Ok(());
563        }
564        self.config.hard_clip_outputs = hard_clip_outputs;
565
566        self.send_message_to_processor(ContextToProcessorMsg::HardClipOutputs(hard_clip_outputs))
567            .map_err(|(_, e)| e)
568    }
569
570    /// Update the firewheel context.
571    ///
572    /// This must be called reguarly (i.e. once every frame).
573    pub fn update(&mut self) -> Result<(), UpdateError<B::StreamError>> {
574        self.logger_rx.flush();
575
576        firewheel_core::collector::GlobalCollector.collect();
577
578        for msg in self.from_processor_rx.pop_iter() {
579            match msg {
580                ProcessorToContextMsg::ReturnEventGroup(mut event_group) => {
581                    event_group.clear();
582                    self.event_group_pool.push(event_group);
583                }
584                ProcessorToContextMsg::ReturnSchedule(schedule_data) => {
585                    let _ = schedule_data;
586                }
587                #[cfg(feature = "musical_transport")]
588                ProcessorToContextMsg::ReturnTransportState(transport_state) => {
589                    if self.transport_state_alloc_reuse.is_none() {
590                        self.transport_state_alloc_reuse = Some(transport_state);
591                    }
592                }
593                #[cfg(feature = "scheduled_events")]
594                ProcessorToContextMsg::ReturnClearScheduledEvents(msgs) => {
595                    let _ = msgs;
596                }
597            }
598        }
599
600        self.graph.update(
601            self.active_state.as_ref().map(|s| &s.stream_info),
602            &mut self.event_group,
603        );
604
605        if let Some(active_state) = &mut self.active_state {
606            if let Err(e) = active_state.backend_handle.poll_status() {
607                self.active_state = None;
608                self.graph.deactivate();
609
610                return Err(UpdateError::StreamStoppedUnexpectedly(Some(e)));
611            }
612
613            if self
614                .processor_drop_rx
615                .as_ref()
616                .unwrap()
617                .try_peek()
618                .is_some()
619            {
620                self.active_state = None;
621                self.graph.deactivate();
622
623                return Err(UpdateError::StreamStoppedUnexpectedly(None));
624            }
625        }
626
627        if self.is_audio_stream_running() {
628            if self.graph.needs_compile() {
629                let schedule_data = self
630                    .graph
631                    .compile(&self.active_state.as_ref().unwrap().stream_info)?;
632
633                if let Err((msg, e)) = self
634                    .send_message_to_processor(ContextToProcessorMsg::NewSchedule(schedule_data))
635                {
636                    let ContextToProcessorMsg::NewSchedule(schedule) = msg else {
637                        unreachable!();
638                    };
639
640                    self.graph.on_schedule_send_failed(schedule);
641
642                    return Err(e);
643                }
644            }
645
646            #[cfg(feature = "scheduled_events")]
647            if !self.queued_clear_scheduled_events.is_empty() {
648                let msgs: SmallVec<[ClearScheduledEventsEvent; 1]> =
649                    self.queued_clear_scheduled_events.drain(..).collect();
650
651                if let Err((msg, e)) = self
652                    .send_message_to_processor(ContextToProcessorMsg::ClearScheduledEvents(msgs))
653                {
654                    let ContextToProcessorMsg::ClearScheduledEvents(mut msgs) = msg else {
655                        unreachable!();
656                    };
657
658                    self.queued_clear_scheduled_events = msgs.drain(..).collect();
659
660                    return Err(e);
661                }
662            }
663
664            if !self.event_group.is_empty() {
665                let mut next_event_group = self
666                    .event_group_pool
667                    .pop()
668                    .unwrap_or_else(|| Vec::with_capacity(self.initial_event_group_capacity));
669                core::mem::swap(&mut next_event_group, &mut self.event_group);
670
671                if let Err((msg, e)) = self
672                    .send_message_to_processor(ContextToProcessorMsg::EventGroup(next_event_group))
673                {
674                    let ContextToProcessorMsg::EventGroup(mut event_group) = msg else {
675                        unreachable!();
676                    };
677
678                    core::mem::swap(&mut event_group, &mut self.event_group);
679                    self.event_group_pool.push(event_group);
680
681                    return Err(e);
682                }
683            }
684        }
685
686        Ok(())
687    }
688
689    /// The ID of the graph input node
690    pub fn graph_in_node_id(&self) -> NodeID {
691        self.graph.graph_in_node()
692    }
693
694    /// The ID of the graph output node
695    pub fn graph_out_node_id(&self) -> NodeID {
696        self.graph.graph_out_node()
697    }
698
699    /// Add a node to the audio graph.
700    pub fn add_node<T: AudioNode + 'static>(
701        &mut self,
702        node: T,
703        config: Option<T::Configuration>,
704    ) -> NodeID {
705        self.graph.add_node(node, config)
706    }
707
708    /// Add a node to the audio graph which implements the type-erased [`DynAudioNode`] trait.
709    pub fn add_dyn_node<T: DynAudioNode + 'static>(&mut self, node: T) -> NodeID {
710        self.graph.add_dyn_node(node)
711    }
712
713    /// Remove the given node from the audio graph.
714    ///
715    /// This will automatically remove all edges from the graph that
716    /// were connected to this node.
717    ///
718    /// On success, this returns a list of all edges that were removed
719    /// from the graph as a result of removing this node.
720    ///
721    /// This will return an error if a node with the given ID does not
722    /// exist in the graph, or if the ID is of the graph input or graph
723    /// output node.
724    pub fn remove_node(&mut self, node_id: NodeID) -> Result<SmallVec<[EdgeID; 4]>, ()> {
725        self.graph.remove_node(node_id)
726    }
727
728    /// Get information about a node in the graph.
729    pub fn node_info(&self, id: NodeID) -> Option<&NodeEntry> {
730        self.graph.node_info(id)
731    }
732
733    /// Get an immutable reference to the custom state of a node.
734    pub fn node_state<T: 'static>(&self, id: NodeID) -> Option<&T> {
735        self.graph.node_state(id)
736    }
737
738    /// Get a type-erased, immutable reference to the custom state of a node.
739    pub fn node_state_dyn(&self, id: NodeID) -> Option<&dyn Any> {
740        self.graph.node_state_dyn(id)
741    }
742
743    /// Get a mutable reference to the custom state of a node.
744    pub fn node_state_mut<T: 'static>(&mut self, id: NodeID) -> Option<&mut T> {
745        self.graph.node_state_mut(id)
746    }
747
748    pub fn node_state_dyn_mut(&mut self, id: NodeID) -> Option<&mut dyn Any> {
749        self.graph.node_state_dyn_mut(id)
750    }
751
752    /// Get a list of all the existing nodes in the graph.
753    pub fn nodes<'a>(&'a self) -> impl Iterator<Item = &'a NodeEntry> {
754        self.graph.nodes()
755    }
756
757    /// Get a list of all the existing edges in the graph.
758    pub fn edges<'a>(&'a self) -> impl Iterator<Item = &'a Edge> {
759        self.graph.edges()
760    }
761
762    /// Set the number of input and output channels to and from the audio graph.
763    ///
764    /// Returns the list of edges that were removed.
765    pub fn set_graph_channel_config(
766        &mut self,
767        channel_config: ChannelConfig,
768    ) -> SmallVec<[EdgeID; 4]> {
769        self.graph.set_graph_channel_config(channel_config)
770    }
771
772    /// Add connections (edges) between two nodes to the graph.
773    ///
774    /// * `src_node` - The ID of the source node.
775    /// * `dst_node` - The ID of the destination node.
776    /// * `ports_src_dst` - The port indices for each connection to make,
777    /// where the first value in a tuple is the output port on `src_node`,
778    /// and the second value in that tuple is the input port on `dst_node`.
779    /// * `check_for_cycles` - If `true`, then this will run a check to
780    /// see if adding these edges will create a cycle in the graph, and
781    /// return an error if it does. Note, checking for cycles can be quite
782    /// expensive, so avoid enabling this when calling this method many times
783    /// in a row.
784    ///
785    /// If successful, then this returns a list of edge IDs in order.
786    ///
787    /// If this returns an error, then the audio graph has not been
788    /// modified.
789    pub fn connect(
790        &mut self,
791        src_node: NodeID,
792        dst_node: NodeID,
793        ports_src_dst: &[(PortIdx, PortIdx)],
794        check_for_cycles: bool,
795    ) -> Result<SmallVec<[EdgeID; 4]>, AddEdgeError> {
796        self.graph
797            .connect(src_node, dst_node, ports_src_dst, check_for_cycles)
798    }
799
800    /// Remove connections (edges) between two nodes from the graph.
801    ///
802    /// * `src_node` - The ID of the source node.
803    /// * `dst_node` - The ID of the destination node.
804    /// * `ports_src_dst` - The port indices for each connection to make,
805    /// where the first value in a tuple is the output port on `src_node`,
806    /// and the second value in that tuple is the input port on `dst_node`.
807    ///
808    /// If none of the edges existed in the graph, then `false` will be
809    /// returned.
810    pub fn disconnect(
811        &mut self,
812        src_node: NodeID,
813        dst_node: NodeID,
814        ports_src_dst: &[(PortIdx, PortIdx)],
815    ) -> bool {
816        self.graph.disconnect(src_node, dst_node, ports_src_dst)
817    }
818
819    /// Remove all connections (edges) between two nodes in the graph.
820    ///
821    /// * `src_node` - The ID of the source node.
822    /// * `dst_node` - The ID of the destination node.
823    pub fn disconnect_all_between(
824        &mut self,
825        src_node: NodeID,
826        dst_node: NodeID,
827    ) -> SmallVec<[EdgeID; 4]> {
828        self.graph.disconnect_all_between(src_node, dst_node)
829    }
830
831    /// Remove a connection (edge) via the edge's unique ID.
832    ///
833    /// If the edge did not exist in this graph, then `false` will be returned.
834    pub fn disconnect_by_edge_id(&mut self, edge_id: EdgeID) -> bool {
835        self.graph.disconnect_by_edge_id(edge_id)
836    }
837
838    /// Get information about the given [Edge]
839    pub fn edge(&self, edge_id: EdgeID) -> Option<&Edge> {
840        self.graph.edge(edge_id)
841    }
842
843    /// Runs a check to see if a cycle exists in the audio graph.
844    ///
845    /// Note, this method is expensive.
846    pub fn cycle_detected(&mut self) -> bool {
847        self.graph.cycle_detected()
848    }
849
850    /// Queue an event to be sent to an audio node's processor.
851    ///
852    /// Note, this event will not be sent until the event queue is flushed
853    /// in [`FirewheelCtx::update`].
854    pub fn queue_event(&mut self, event: NodeEvent) {
855        self.event_group.push(event);
856    }
857
858    /// Queue an event to be sent to an audio node's processor.
859    ///
860    /// Note, this event will not be sent until the event queue is flushed
861    /// in [`FirewheelCtx::update`].
862    pub fn queue_event_for(&mut self, node_id: NodeID, event: NodeEventType) {
863        self.queue_event(NodeEvent {
864            node_id,
865            #[cfg(feature = "scheduled_events")]
866            time: None,
867            event,
868        });
869    }
870
871    /// Queue an event at a certain time, to be sent to an audio node's processor.
872    ///
873    /// Note, this event will not be sent until the event queue is flushed
874    /// in [`FirewheelCtx::update`].
875    #[cfg(feature = "scheduled_events")]
876    pub fn schedule_event_for(
877        &mut self,
878        node_id: NodeID,
879        event: NodeEventType,
880        time: EventInstant,
881    ) {
882        self.queue_event(NodeEvent {
883            node_id,
884            time: Some(time),
885            event,
886        });
887    }
888
889    /// Cancel scheduled events for all nodes.
890    ///
891    /// This will clear all events that have been scheduled since the last call to
892    /// [`FirewheelCtx::update`]. Any events scheduled between then and the next call
893    /// to [`FirewheelCtx::update`] will not be canceled.
894    ///
895    /// This only takes effect once [`FirewheelCtx::update`] is called.
896    #[cfg(feature = "scheduled_events")]
897    pub fn cancel_all_scheduled_events(&mut self, event_type: ClearScheduledEventsType) {
898        self.queued_clear_scheduled_events
899            .push(ClearScheduledEventsEvent {
900                node_id: None,
901                event_type,
902            });
903    }
904
905    /// Cancel scheduled events for a specific node.
906    ///
907    /// This will clear all events that have been scheduled since the last call to
908    /// [`FirewheelCtx::update`]. Any events scheduled between then and the next call
909    /// to [`FirewheelCtx::update`] will not be canceled.
910    ///
911    /// This only takes effect once [`FirewheelCtx::update`] is called.
912    #[cfg(feature = "scheduled_events")]
913    pub fn cancel_scheduled_events_for(
914        &mut self,
915        node_id: NodeID,
916        event_type: ClearScheduledEventsType,
917    ) {
918        self.queued_clear_scheduled_events
919            .push(ClearScheduledEventsEvent {
920                node_id: Some(node_id),
921                event_type,
922            });
923    }
924
925    fn send_message_to_processor(
926        &mut self,
927        msg: ContextToProcessorMsg,
928    ) -> Result<(), (ContextToProcessorMsg, UpdateError<B::StreamError>)> {
929        self.to_processor_tx
930            .try_push(msg)
931            .map_err(|msg| (msg, UpdateError::MsgChannelFull))
932    }
933}
934
935impl<B: AudioBackend> Drop for FirewheelCtx<B> {
936    fn drop(&mut self) {
937        self.stop_stream();
938
939        // Wait for the processor to be drop to avoid deallocating it on
940        // the audio thread.
941        #[cfg(not(target_family = "wasm"))]
942        if let Some(drop_rx) = self.processor_drop_rx.take() {
943            let now = bevy_platform::time::Instant::now();
944
945            while drop_rx.try_peek().is_none() {
946                if now.elapsed() > core::time::Duration::from_secs(2) {
947                    break;
948                }
949
950                bevy_platform::thread::sleep(core::time::Duration::from_millis(2));
951            }
952        }
953
954        firewheel_core::collector::GlobalCollector.collect();
955    }
956}
957
958impl<B: AudioBackend> FirewheelCtx<B> {
959    /// Construct an [`ContextQueue`] for diffing.
960    pub fn event_queue(&mut self, id: NodeID) -> ContextQueue<'_, B> {
961        ContextQueue { context: self, id }
962    }
963}
964
965/// An event queue acquired from [`FirewheelCtx::event_queue`].
966///
967/// This can help reduce event queue allocations
968/// when you have direct access to the context.
969///
970/// ```
971/// # use firewheel_core::{diff::{Diff, PathBuilder}, node::NodeID};
972/// # use firewheel_graph::{backend::AudioBackend, FirewheelCtx, ContextQueue};
973/// # fn context_queue<B: AudioBackend, D: Diff>(
974/// #     context: &mut FirewheelCtx<B>,
975/// #     node_id: NodeID,
976/// #     params: &D,
977/// #     baseline: &D,
978/// # ) {
979/// // Get a queue that will send events directly to the provided node.
980/// let mut queue = context.event_queue(node_id);
981/// // Perform diffing using this queue.
982/// params.diff(baseline, PathBuilder::default(), &mut queue);
983/// # }
984/// ```
985pub struct ContextQueue<'a, B: AudioBackend> {
986    context: &'a mut FirewheelCtx<B>,
987    id: NodeID,
988}
989
990#[cfg(feature = "scheduled_events")]
991pub struct TimedContextQueue<'a, B: AudioBackend> {
992    time: EventInstant,
993    context_queue: ContextQueue<'a, B>,
994}
995
996impl<'a, B: AudioBackend> ContextQueue<'a, B> {
997    pub fn reborrow<'b>(&'b mut self) -> ContextQueue<'b, B> {
998        ContextQueue {
999            context: &mut *self.context,
1000            id: self.id,
1001        }
1002    }
1003
1004    #[cfg(feature = "scheduled_events")]
1005    pub fn with_time<'b>(&'b mut self, time: EventInstant) -> TimedContextQueue<'b, B> {
1006        TimedContextQueue {
1007            time,
1008            context_queue: self.reborrow(),
1009        }
1010    }
1011}
1012
1013impl<B: AudioBackend> firewheel_core::diff::EventQueue for ContextQueue<'_, B> {
1014    fn push(&mut self, data: NodeEventType) {
1015        self.context.queue_event(NodeEvent {
1016            event: data,
1017            #[cfg(feature = "scheduled_events")]
1018            time: None,
1019            node_id: self.id,
1020        });
1021    }
1022}
1023
1024#[cfg(feature = "scheduled_events")]
1025impl<B: AudioBackend> firewheel_core::diff::EventQueue for TimedContextQueue<'_, B> {
1026    fn push(&mut self, data: NodeEventType) {
1027        self.context_queue.context.queue_event(NodeEvent {
1028            event: data,
1029            time: Some(self.time),
1030            node_id: self.context_queue.id,
1031        });
1032    }
1033}
1034
1035/// The type of scheduled events to clear in a [`ClearScheduledEvents`] message.
1036#[cfg(feature = "scheduled_events")]
1037#[derive(Default, Debug, Clone, Copy, PartialEq)]
1038pub enum ClearScheduledEventsType {
1039    /// Clear both musical and non-musical scheduled events.
1040    #[default]
1041    All,
1042    /// Clear only non-musical scheduled events.
1043    NonMusicalOnly,
1044    /// Clear only musical scheduled events.
1045    MusicalOnly,
1046}
1047
1048fn audio_clock_update_instant_and_delay<B: AudioBackend>(
1049    clock: &SharedClock<B::Instant>,
1050    active_state: &Option<ActiveState<B>>,
1051) -> Option<(Instant, Duration)> {
1052    active_state.as_ref().and_then(|active_state| {
1053        clock
1054            .process_timestamp
1055            .clone()
1056            .and_then(|process_timestamp| {
1057                active_state
1058                    .backend_handle
1059                    .delay_from_last_process(process_timestamp)
1060                    .and_then(|delay| {
1061                        Instant::now()
1062                            .checked_sub(delay)
1063                            .map(|instant| (instant, delay))
1064                    })
1065            })
1066    })
1067}