firewheel_graph/
context.rs

1use bevy_platform::time::Instant;
2use core::cell::RefCell;
3use core::num::NonZeroU32;
4use core::time::Duration;
5use core::{any::Any, f64};
6use firewheel_core::clock::DurationSeconds;
7use firewheel_core::log::{RealtimeLogger, RealtimeLoggerConfig, RealtimeLoggerMainThread};
8use firewheel_core::node::ProcStore;
9use firewheel_core::{
10    channel_config::{ChannelConfig, ChannelCount},
11    clock::AudioClock,
12    dsp::declick::DeclickValues,
13    event::{NodeEvent, NodeEventType},
14    node::{AudioNode, DynAudioNode, NodeID},
15    StreamInfo,
16};
17use ringbuf::traits::{Consumer, Producer, Split};
18use smallvec::SmallVec;
19
20#[cfg(all(not(feature = "std"), feature = "musical_transport"))]
21use bevy_platform::prelude::Box;
22#[cfg(not(feature = "std"))]
23use bevy_platform::prelude::Vec;
24
25use crate::error::RemoveNodeError;
26use crate::processor::BufferOutOfSpaceMode;
27use crate::{
28    backend::{AudioBackend, DeviceInfo},
29    error::{AddEdgeError, StartStreamError, UpdateError},
30    graph::{AudioGraph, Edge, EdgeID, NodeEntry, PortIdx},
31    processor::{
32        ContextToProcessorMsg, FirewheelProcessor, FirewheelProcessorInner, ProcessorToContextMsg,
33        SharedClock,
34    },
35};
36
37#[cfg(feature = "scheduled_events")]
38use crate::processor::ClearScheduledEventsEvent;
39#[cfg(feature = "scheduled_events")]
40use firewheel_core::clock::EventInstant;
41
42#[cfg(feature = "musical_transport")]
43use firewheel_core::clock::TransportState;
44
45/// The configuration of a Firewheel context.
46#[derive(Debug, Clone, Copy, PartialEq)]
47#[cfg_attr(feature = "bevy_reflect", derive(bevy_reflect::Reflect))]
48#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
49pub struct FirewheelConfig {
50    /// The number of input channels in the audio graph.
51    pub num_graph_inputs: ChannelCount,
52    /// The number of output channels in the audio graph.
53    pub num_graph_outputs: ChannelCount,
54    /// If `true`, then all outputs will be hard clipped at 0db to help
55    /// protect the system's speakers.
56    ///
57    /// Note that most operating systems already hard clip the output,
58    /// so this is usually not needed (TODO: Do research to see if this
59    /// assumption is true.)
60    ///
61    /// By default this is set to `false`.
62    pub hard_clip_outputs: bool,
63    /// An initial capacity to allocate for the nodes in the audio graph.
64    ///
65    /// By default this is set to `64`.
66    pub initial_node_capacity: u32,
67    /// An initial capacity to allocate for the edges in the audio graph.
68    ///
69    /// By default this is set to `256`.
70    pub initial_edge_capacity: u32,
71    /// The amount of time in seconds to fade in/out when pausing/resuming
72    /// to avoid clicks and pops.
73    ///
74    /// By default this is set to `10.0 / 1_000.0`.
75    pub declick_seconds: f32,
76    /// The initial capacity for a group of events.
77    ///
78    /// By default this is set to `128`.
79    pub initial_event_group_capacity: u32,
80    /// The capacity of the engine's internal message channel.
81    ///
82    /// By default this is set to `64`.
83    pub channel_capacity: u32,
84    /// The maximum number of events that can be sent in a single call
85    /// to [`AudioNodeProcessor::process`].
86    ///
87    /// By default this is set to `128`.
88    ///
89    /// [`AudioNodeProcessor::process`]: firewheel_core::node::AudioNodeProcessor::process
90    pub event_queue_capacity: usize,
91    /// The maximum number of immediate events (events that do *NOT* have a
92    /// scheduled time component) that can be stored at once in the audio
93    /// thread.
94    ///
95    /// By default this is set to `512`.
96    pub immediate_event_capacity: usize,
97    /// The maximum number of scheduled events (events that have a scheduled
98    /// time component) that can be stored at once in the audio thread.
99    ///
100    /// This can be set to `0` to save some memory if you do not plan on using
101    /// scheduled events.
102    ///
103    /// By default this is set to `512`.
104    #[cfg(feature = "scheduled_events")]
105    pub scheduled_event_capacity: usize,
106    /// How to handle event buffers on the audio thread running out of space.
107    ///
108    /// By default this is set to [`BufferOutOfSpaceMode::AllocateOnAudioThread`].
109    pub buffer_out_of_space_mode: BufferOutOfSpaceMode,
110
111    /// The configuration of the realtime safe logger.
112    pub logger_config: RealtimeLoggerConfig,
113
114    /// Force all buffers in the audio graph to be cleared when processing.
115    ///
116    /// This is only meant to be used when diagnosing a misbehaving audio node.
117    /// Enabling this significantly increases processing overhead.
118    ///
119    /// If enabling this fixes audio glitching issues, then notify the node
120    /// author of the misbehavior.
121    ///
122    /// By default this is set to `false`.
123    pub debug_force_clear_buffers: bool,
124
125    /// The initial number of slots to allocate for the [`ProcStore`].
126    ///
127    /// By default this is set to `8`.
128    pub proc_store_capacity: usize,
129}
130
131impl Default for FirewheelConfig {
132    fn default() -> Self {
133        Self {
134            num_graph_inputs: ChannelCount::ZERO,
135            num_graph_outputs: ChannelCount::STEREO,
136            hard_clip_outputs: false,
137            initial_node_capacity: 128,
138            initial_edge_capacity: 256,
139            declick_seconds: DeclickValues::DEFAULT_FADE_SECONDS,
140            initial_event_group_capacity: 128,
141            channel_capacity: 64,
142            event_queue_capacity: 128,
143            immediate_event_capacity: 512,
144            #[cfg(feature = "scheduled_events")]
145            scheduled_event_capacity: 512,
146            buffer_out_of_space_mode: BufferOutOfSpaceMode::AllocateOnAudioThread,
147            logger_config: RealtimeLoggerConfig::default(),
148            debug_force_clear_buffers: false,
149            proc_store_capacity: 8,
150        }
151    }
152}
153
154struct ActiveState<B: AudioBackend> {
155    backend_handle: B,
156    stream_info: StreamInfo,
157}
158
159/// A Firewheel context
160pub struct FirewheelCtx<B: AudioBackend> {
161    graph: AudioGraph,
162
163    to_processor_tx: ringbuf::HeapProd<ContextToProcessorMsg>,
164    from_processor_rx: ringbuf::HeapCons<ProcessorToContextMsg>,
165    logger_rx: RealtimeLoggerMainThread,
166
167    active_state: Option<ActiveState<B>>,
168
169    processor_channel: Option<(
170        ringbuf::HeapCons<ContextToProcessorMsg>,
171        ringbuf::HeapProd<ProcessorToContextMsg>,
172        triple_buffer::Input<SharedClock<B::Instant>>,
173        RealtimeLogger,
174        ProcStore,
175    )>,
176    processor_drop_rx: Option<ringbuf::HeapCons<FirewheelProcessorInner<B>>>,
177
178    shared_clock_output: RefCell<triple_buffer::Output<SharedClock<B::Instant>>>,
179    sample_rate: NonZeroU32,
180    sample_rate_recip: f64,
181
182    #[cfg(feature = "musical_transport")]
183    transport_state: Box<TransportState>,
184    #[cfg(feature = "musical_transport")]
185    transport_state_alloc_reuse: Option<Box<TransportState>>,
186
187    // Re-use the allocations for groups of events.
188    event_group_pool: Vec<Vec<NodeEvent>>,
189    event_group: Vec<NodeEvent>,
190    initial_event_group_capacity: usize,
191
192    #[cfg(feature = "scheduled_events")]
193    queued_clear_scheduled_events: Vec<ClearScheduledEventsEvent>,
194
195    config: FirewheelConfig,
196}
197
198impl<B: AudioBackend> FirewheelCtx<B> {
199    /// Create a new Firewheel context.
200    pub fn new(config: FirewheelConfig) -> Self {
201        let (to_processor_tx, from_context_rx) =
202            ringbuf::HeapRb::<ContextToProcessorMsg>::new(config.channel_capacity as usize).split();
203        let (to_context_tx, from_processor_rx) =
204            ringbuf::HeapRb::<ProcessorToContextMsg>::new(config.channel_capacity as usize * 2)
205                .split();
206
207        let initial_event_group_capacity = config.initial_event_group_capacity as usize;
208        let mut event_group_pool = Vec::with_capacity(16);
209        for _ in 0..3 {
210            event_group_pool.push(Vec::with_capacity(initial_event_group_capacity));
211        }
212
213        let (shared_clock_input, shared_clock_output) =
214            triple_buffer::triple_buffer(&SharedClock::default());
215
216        let (logger, logger_rx) = firewheel_core::log::realtime_logger(config.logger_config);
217
218        let proc_store = ProcStore::with_capacity(config.proc_store_capacity);
219
220        Self {
221            graph: AudioGraph::new(&config),
222            to_processor_tx,
223            from_processor_rx,
224            logger_rx,
225            active_state: None,
226            processor_channel: Some((
227                from_context_rx,
228                to_context_tx,
229                shared_clock_input,
230                logger,
231                proc_store,
232            )),
233            processor_drop_rx: None,
234            shared_clock_output: RefCell::new(shared_clock_output),
235            sample_rate: NonZeroU32::new(44100).unwrap(),
236            sample_rate_recip: 44100.0f64.recip(),
237            #[cfg(feature = "musical_transport")]
238            transport_state: Box::new(TransportState::default()),
239            #[cfg(feature = "musical_transport")]
240            transport_state_alloc_reuse: None,
241            event_group_pool,
242            event_group: Vec::with_capacity(initial_event_group_capacity),
243            initial_event_group_capacity,
244            #[cfg(feature = "scheduled_events")]
245            queued_clear_scheduled_events: Vec::new(),
246            config,
247        }
248    }
249
250    /// Get an immutable reference to the processor store.
251    ///
252    /// If an audio stream is currently running, this will return `None`.
253    pub fn proc_store(&self) -> Option<&ProcStore> {
254        if let Some((_, _, _, _, proc_store)) = &self.processor_channel {
255            Some(proc_store)
256        } else if let Some(processor) = self.processor_drop_rx.as_ref().unwrap().last() {
257            if processor.poisoned {
258                panic!("The audio thread has panicked!");
259            }
260
261            Some(&processor.extra.store)
262        } else {
263            None
264        }
265    }
266
267    /// Get a mutable reference to the processor store.
268    ///
269    /// If an audio stream is currently running, this will return `None`.
270    pub fn proc_store_mut(&mut self) -> Option<&mut ProcStore> {
271        if let Some((_, _, _, _, proc_store)) = &mut self.processor_channel {
272            Some(proc_store)
273        } else if let Some(processor) = self.processor_drop_rx.as_mut().unwrap().last_mut() {
274            if processor.poisoned {
275                panic!("The audio thread has panicked!");
276            }
277
278            Some(&mut processor.extra.store)
279        } else {
280            None
281        }
282    }
283
284    /// Get a reference to the currently active instance of the backend. Returns `None` if the backend has not
285    /// yet been initialized with `start_stream`.
286    pub fn active_backend(&self) -> Option<&B> {
287        self.active_state
288            .as_ref()
289            .map(|state| &state.backend_handle)
290    }
291
292    /// Get a mutable reference to the currently active instance of the backend. Returns `None` if the backend has not
293    /// yet been initialized with `start_stream`.
294    pub fn active_backend_mut(&mut self) -> Option<&mut B> {
295        self.active_state
296            .as_mut()
297            .map(|state| &mut state.backend_handle)
298    }
299
300    /// Get a list of the available audio input devices.
301    pub fn available_input_devices(&self) -> Vec<DeviceInfo> {
302        B::available_input_devices()
303    }
304
305    /// Get a list of the available audio output devices.
306    pub fn available_output_devices(&self) -> Vec<DeviceInfo> {
307        B::available_output_devices()
308    }
309
310    /// Returns `true` if an audio stream can be started right now.
311    ///
312    /// When calling [`FirewheelCtx::stop_stream()`], it may take some time for the
313    /// old stream to be fully stopped. This method is used to check if it has been
314    /// dropped yet.
315    ///
316    /// Note, in rare cases where the audio thread crashes without cleanly dropping
317    /// its contents, this may never return `true`. Consider adding a timeout to
318    /// avoid deadlocking.
319    pub fn can_start_stream(&self) -> bool {
320        if self.is_audio_stream_running() {
321            false
322        } else if let Some(rx) = &self.processor_drop_rx {
323            rx.try_peek().is_some()
324        } else {
325            true
326        }
327    }
328
329    /// Start an audio stream for this context. Only one audio stream can exist on
330    /// a context at a time.
331    ///
332    /// When calling [`FirewheelCtx::stop_stream()`], it may take some time for the
333    /// old stream to be fully stopped. Use [`FirewheelCtx::can_start_stream`] to
334    /// check if it has been dropped yet.
335    ///
336    /// Note, in rare cases where the audio thread crashes without cleanly dropping
337    /// its contents, this may never succeed. Consider adding a timeout to avoid
338    /// deadlocking.
339    pub fn start_stream(
340        &mut self,
341        config: B::Config,
342    ) -> Result<(), StartStreamError<B::StartStreamError>> {
343        if self.is_audio_stream_running() {
344            return Err(StartStreamError::AlreadyStarted);
345        }
346
347        if !self.can_start_stream() {
348            return Err(StartStreamError::OldStreamNotFinishedStopping);
349        }
350
351        let (mut backend_handle, mut stream_info) =
352            B::start_stream(config).map_err(|e| StartStreamError::BackendError(e))?;
353
354        stream_info.sample_rate_recip = (stream_info.sample_rate.get() as f64).recip();
355        stream_info.declick_frames = NonZeroU32::new(
356            (self.config.declick_seconds * stream_info.sample_rate.get() as f32).round() as u32,
357        )
358        .unwrap_or(NonZeroU32::MIN);
359
360        let maybe_processor = self.processor_channel.take();
361
362        stream_info.prev_sample_rate = if maybe_processor.is_some() {
363            stream_info.sample_rate
364        } else {
365            self.sample_rate
366        };
367
368        self.sample_rate = stream_info.sample_rate;
369        self.sample_rate_recip = stream_info.sample_rate_recip;
370
371        let schedule = self.graph.compile(&stream_info)?;
372
373        let (drop_tx, drop_rx) = ringbuf::HeapRb::<FirewheelProcessorInner<B>>::new(1).split();
374
375        let processor =
376            if let Some((from_context_rx, to_context_tx, shared_clock_input, logger, proc_store)) =
377                maybe_processor
378            {
379                FirewheelProcessorInner::new(
380                    from_context_rx,
381                    to_context_tx,
382                    shared_clock_input,
383                    self.config.immediate_event_capacity,
384                    #[cfg(feature = "scheduled_events")]
385                    self.config.scheduled_event_capacity,
386                    self.config.event_queue_capacity,
387                    &stream_info,
388                    self.config.hard_clip_outputs,
389                    self.config.buffer_out_of_space_mode,
390                    logger,
391                    self.config.debug_force_clear_buffers,
392                    proc_store,
393                )
394            } else {
395                let mut processor = self.processor_drop_rx.as_mut().unwrap().try_pop().unwrap();
396
397                if processor.poisoned {
398                    panic!("The audio thread has panicked!");
399                }
400
401                processor.new_stream(&stream_info);
402
403                processor
404            };
405
406        backend_handle.set_processor(FirewheelProcessor::new(processor, drop_tx));
407
408        if let Err(_) = self.send_message_to_processor(ContextToProcessorMsg::NewSchedule(schedule))
409        {
410            panic!("Firewheel message channel is full!");
411        }
412
413        self.active_state = Some(ActiveState {
414            backend_handle,
415            stream_info,
416        });
417        self.processor_drop_rx = Some(drop_rx);
418
419        Ok(())
420    }
421
422    /// Stop the audio stream in this context.
423    pub fn stop_stream(&mut self) {
424        // When the backend handle is dropped, the backend will automatically
425        // stop its stream.
426        self.active_state = None;
427        self.graph.deactivate();
428    }
429
430    /// Returns `true` if there is currently a running audio stream.
431    pub fn is_audio_stream_running(&self) -> bool {
432        self.active_state.is_some()
433    }
434
435    /// Information about the running audio stream.
436    ///
437    /// Returns `None` if no audio stream is currently running.
438    pub fn stream_info(&self) -> Option<&StreamInfo> {
439        self.active_state.as_ref().map(|s| &s.stream_info)
440    }
441
442    /// Get the current time of the audio clock, without accounting for the delay
443    /// between when the clock was last updated and now.
444    ///
445    /// For most use cases you probably want to use [`FirewheelCtx::audio_clock_corrected`]
446    /// instead, but this method is provided if needed.
447    ///
448    /// Note, due to the nature of audio processing, this clock is is *NOT* synced with
449    /// the system's time (`Instant::now`). (Instead it is based on the amount of data
450    /// that has been processed.) For applications where the timing of audio events is
451    /// critical (i.e. a rythm game), sync the game to this audio clock instead of the
452    /// OS's clock (`Instant::now()`).
453    ///
454    /// Note, calling this method is not super cheap, so avoid calling it many
455    /// times within the same game loop iteration if possible.
456    pub fn audio_clock(&self) -> AudioClock {
457        // Reading the latest value of the clock doesn't meaningfully mutate
458        // state, so treat it as an immutable operation with interior mutability.
459        //
460        // PANIC SAFETY: This struct is the only place this is ever borrowed, so this
461        // will never panic.
462        let mut clock_borrowed = self.shared_clock_output.borrow_mut();
463        let clock = clock_borrowed.read();
464
465        let update_instant = audio_clock_update_instant_and_delay(&clock, &self.active_state)
466            .map(|(update_instant, _delay)| update_instant);
467
468        AudioClock {
469            samples: clock.clock_samples,
470            seconds: clock
471                .clock_samples
472                .to_seconds(self.sample_rate, self.sample_rate_recip),
473            #[cfg(feature = "musical_transport")]
474            musical: clock.current_playhead,
475            #[cfg(feature = "musical_transport")]
476            transport_is_playing: clock.transport_is_playing,
477            update_instant,
478        }
479    }
480
481    /// Get the current time of the audio clock.
482    ///
483    /// Unlike, [`FirewheelCtx::audio_clock`], this method accounts for the delay
484    /// between when the audio clock was last updated and now, leading to a more
485    /// accurate result for games and other applications.
486    ///
487    /// If the delay could not be determined (i.e. an audio stream is not currently
488    /// running), then this will assume there was no delay between when the audio
489    /// clock was last updated and now.
490    ///
491    /// Note, due to the nature of audio processing, this clock is is *NOT* synced with
492    /// the system's time (`Instant::now`). (Instead it is based on the amount of data
493    /// that has been processed.) For applications where the timing of audio events is
494    /// critical (i.e. a rythm game), sync the game to this audio clock instead of the
495    /// OS's clock (`Instant::now()`).
496    ///
497    /// Note, calling this method is not super cheap, so avoid calling it many
498    /// times within the same game loop iteration if possible.
499    pub fn audio_clock_corrected(&self) -> AudioClock {
500        // Reading the latest value of the clock doesn't meaningfully mutate
501        // state, so treat it as an immutable operation with interior mutability.
502        //
503        // PANIC SAFETY: This struct is the only place this is ever borrowed, so this
504        // will never panic.
505        let mut clock_borrowed = self.shared_clock_output.borrow_mut();
506        let clock = clock_borrowed.read();
507
508        let Some((update_instant, delay)) =
509            audio_clock_update_instant_and_delay(&clock, &self.active_state)
510        else {
511            // The audio thread is not currently running, so just return the
512            // latest value of the clock.
513            return AudioClock {
514                samples: clock.clock_samples,
515                seconds: clock
516                    .clock_samples
517                    .to_seconds(self.sample_rate, self.sample_rate_recip),
518                #[cfg(feature = "musical_transport")]
519                musical: clock.current_playhead,
520                #[cfg(feature = "musical_transport")]
521                transport_is_playing: clock.transport_is_playing,
522                update_instant: None,
523            };
524        };
525
526        // Account for the delay between when the clock was last updated and now.
527        let delta_seconds = DurationSeconds(delay.as_secs_f64());
528
529        let samples = clock.clock_samples + delta_seconds.to_samples(self.sample_rate);
530
531        #[cfg(feature = "musical_transport")]
532        let musical = clock.current_playhead.map(|musical_time| {
533            if clock.transport_is_playing && self.transport_state.transport.is_some() {
534                self.transport_state
535                    .transport
536                    .as_ref()
537                    .unwrap()
538                    .delta_seconds_from(musical_time, delta_seconds, clock.speed_multiplier)
539            } else {
540                musical_time
541            }
542        });
543
544        AudioClock {
545            samples,
546            seconds: samples.to_seconds(self.sample_rate, self.sample_rate_recip),
547            #[cfg(feature = "musical_transport")]
548            musical,
549            #[cfg(feature = "musical_transport")]
550            transport_is_playing: clock.transport_is_playing,
551            update_instant: Some(update_instant),
552        }
553    }
554
555    /// Get the instant the audio clock was last updated.
556    ///
557    /// This method accounts for the delay between when the audio clock was last
558    /// updated and now, leading to a more accurate result for games and other
559    /// applications.
560    ///
561    /// If the audio thread is not currently running, or if the delay could not
562    /// be determined for any other reason, then this will return `None`.
563    ///
564    /// Note, calling this method is not super cheap, so avoid calling it many
565    /// times within the same game loop iteration if possible.
566    pub fn audio_clock_instant(&self) -> Option<Instant> {
567        // Reading the latest value of the clock doesn't meaningfully mutate
568        // state, so treat it as an immutable operation with interior mutability.
569        //
570        // PANIC SAFETY: This struct is the only place this is ever borrowed, so this
571        // will never panic.
572        let mut clock_borrowed = self.shared_clock_output.borrow_mut();
573        let clock = clock_borrowed.read();
574
575        audio_clock_update_instant_and_delay(&clock, &self.active_state)
576            .map(|(update_instant, _delay)| update_instant)
577    }
578
579    /// Sync the state of the musical transport.
580    ///
581    /// If the message channel is full, then this will return an error.
582    #[cfg(feature = "musical_transport")]
583    pub fn sync_transport(
584        &mut self,
585        transport: &TransportState,
586    ) -> Result<(), UpdateError<B::StreamError>> {
587        if &*self.transport_state != transport {
588            let transport_msg = if let Some(mut t) = self.transport_state_alloc_reuse.take() {
589                *t = transport.clone();
590                t
591            } else {
592                Box::new(transport.clone())
593            };
594
595            self.send_message_to_processor(ContextToProcessorMsg::SetTransportState(transport_msg))
596                .map_err(|(_, e)| e)?;
597
598            *self.transport_state = transport.clone();
599        }
600
601        Ok(())
602    }
603
604    /// Get the current transport state.
605    #[cfg(feature = "musical_transport")]
606    pub fn transport_state(&self) -> &TransportState {
607        &self.transport_state
608    }
609
610    /// Get the current transport state.
611    #[cfg(feature = "musical_transport")]
612    pub fn transport(&self) -> &TransportState {
613        &self.transport_state
614    }
615
616    /// Whether or not outputs are being hard clipped at 0dB.
617    pub fn hard_clip_outputs(&self) -> bool {
618        self.config.hard_clip_outputs
619    }
620
621    /// Set whether or not outputs should be hard clipped at 0dB to
622    /// help protect the system's speakers.
623    ///
624    /// Note that most operating systems already hard clip the output,
625    /// so this is usually not needed (TODO: Do research to see if this
626    /// assumption is true.)
627    ///
628    /// If the message channel is full, then this will return an error.
629    pub fn set_hard_clip_outputs(
630        &mut self,
631        hard_clip_outputs: bool,
632    ) -> Result<(), UpdateError<B::StreamError>> {
633        if self.config.hard_clip_outputs == hard_clip_outputs {
634            return Ok(());
635        }
636        self.config.hard_clip_outputs = hard_clip_outputs;
637
638        self.send_message_to_processor(ContextToProcessorMsg::HardClipOutputs(hard_clip_outputs))
639            .map_err(|(_, e)| e)
640    }
641
642    /// Update the firewheel context.
643    ///
644    /// This must be called reguarly (i.e. once every frame).
645    pub fn update(&mut self) -> Result<(), UpdateError<B::StreamError>> {
646        self.logger_rx.flush();
647
648        firewheel_core::collector::GlobalRtGc::collect();
649
650        for msg in self.from_processor_rx.pop_iter() {
651            match msg {
652                ProcessorToContextMsg::ReturnEventGroup(mut event_group) => {
653                    event_group.clear();
654                    self.event_group_pool.push(event_group);
655                }
656                ProcessorToContextMsg::ReturnSchedule(schedule_data) => {
657                    let _ = schedule_data;
658                }
659                #[cfg(feature = "musical_transport")]
660                ProcessorToContextMsg::ReturnTransportState(transport_state) => {
661                    if self.transport_state_alloc_reuse.is_none() {
662                        self.transport_state_alloc_reuse = Some(transport_state);
663                    }
664                }
665                #[cfg(feature = "scheduled_events")]
666                ProcessorToContextMsg::ReturnClearScheduledEvents(msgs) => {
667                    let _ = msgs;
668                }
669            }
670        }
671
672        self.graph.update(
673            self.active_state.as_ref().map(|s| &s.stream_info),
674            &mut self.event_group,
675        );
676
677        if let Some(active_state) = &mut self.active_state {
678            if let Err(e) = active_state.backend_handle.poll_status() {
679                self.active_state = None;
680                self.graph.deactivate();
681
682                return Err(UpdateError::StreamStoppedUnexpectedly(Some(e)));
683            }
684
685            if self
686                .processor_drop_rx
687                .as_ref()
688                .unwrap()
689                .try_peek()
690                .is_some()
691            {
692                self.active_state = None;
693                self.graph.deactivate();
694
695                return Err(UpdateError::StreamStoppedUnexpectedly(None));
696            }
697        }
698
699        if self.is_audio_stream_running() {
700            if self.graph.needs_compile() {
701                let schedule_data = self
702                    .graph
703                    .compile(&self.active_state.as_ref().unwrap().stream_info)?;
704
705                if let Err((msg, e)) = self
706                    .send_message_to_processor(ContextToProcessorMsg::NewSchedule(schedule_data))
707                {
708                    let ContextToProcessorMsg::NewSchedule(schedule) = msg else {
709                        unreachable!();
710                    };
711
712                    self.graph.on_schedule_send_failed(schedule);
713
714                    return Err(e);
715                }
716            }
717
718            #[cfg(feature = "scheduled_events")]
719            if !self.queued_clear_scheduled_events.is_empty() {
720                let msgs: SmallVec<[ClearScheduledEventsEvent; 1]> =
721                    self.queued_clear_scheduled_events.drain(..).collect();
722
723                if let Err((msg, e)) = self
724                    .send_message_to_processor(ContextToProcessorMsg::ClearScheduledEvents(msgs))
725                {
726                    let ContextToProcessorMsg::ClearScheduledEvents(mut msgs) = msg else {
727                        unreachable!();
728                    };
729
730                    self.queued_clear_scheduled_events = msgs.drain(..).collect();
731
732                    return Err(e);
733                }
734            }
735
736            if !self.event_group.is_empty() {
737                let mut next_event_group = self
738                    .event_group_pool
739                    .pop()
740                    .unwrap_or_else(|| Vec::with_capacity(self.initial_event_group_capacity));
741                core::mem::swap(&mut next_event_group, &mut self.event_group);
742
743                if let Err((msg, e)) = self
744                    .send_message_to_processor(ContextToProcessorMsg::EventGroup(next_event_group))
745                {
746                    let ContextToProcessorMsg::EventGroup(mut event_group) = msg else {
747                        unreachable!();
748                    };
749
750                    core::mem::swap(&mut event_group, &mut self.event_group);
751                    self.event_group_pool.push(event_group);
752
753                    return Err(e);
754                }
755            }
756        }
757
758        Ok(())
759    }
760
761    /// The ID of the graph input node
762    pub fn graph_in_node_id(&self) -> NodeID {
763        self.graph.graph_in_node()
764    }
765
766    /// The ID of the graph output node
767    pub fn graph_out_node_id(&self) -> NodeID {
768        self.graph.graph_out_node()
769    }
770
771    /// Add a node to the audio graph.
772    pub fn add_node<T: AudioNode + 'static>(
773        &mut self,
774        node: T,
775        config: Option<T::Configuration>,
776    ) -> NodeID {
777        self.graph.add_node(node, config)
778    }
779
780    /// Add a node to the audio graph which implements the type-erased [`DynAudioNode`] trait.
781    pub fn add_dyn_node<T: DynAudioNode + 'static>(&mut self, node: T) -> NodeID {
782        self.graph.add_dyn_node(node)
783    }
784
785    /// Remove the given node from the audio graph.
786    ///
787    /// This will automatically remove all edges from the graph that
788    /// were connected to this node.
789    ///
790    /// On success, this returns a list of all edges that were removed
791    /// from the graph as a result of removing this node.
792    ///
793    /// This will return an error if the ID is of the graph input or graph
794    /// output node.
795    pub fn remove_node(
796        &mut self,
797        node_id: NodeID,
798    ) -> Result<SmallVec<[EdgeID; 4]>, RemoveNodeError> {
799        self.graph.remove_node(node_id)
800    }
801
802    /// Get information about a node in the graph.
803    pub fn node_info(&self, id: NodeID) -> Option<&NodeEntry> {
804        self.graph.node_info(id)
805    }
806
807    /// Get an immutable reference to the custom state of a node.
808    pub fn node_state<T: 'static>(&self, id: NodeID) -> Option<&T> {
809        self.graph.node_state(id)
810    }
811
812    /// Get a type-erased, immutable reference to the custom state of a node.
813    pub fn node_state_dyn(&self, id: NodeID) -> Option<&dyn Any> {
814        self.graph.node_state_dyn(id)
815    }
816
817    /// Get a mutable reference to the custom state of a node.
818    pub fn node_state_mut<T: 'static>(&mut self, id: NodeID) -> Option<&mut T> {
819        self.graph.node_state_mut(id)
820    }
821
822    pub fn node_state_dyn_mut(&mut self, id: NodeID) -> Option<&mut dyn Any> {
823        self.graph.node_state_dyn_mut(id)
824    }
825
826    /// Get a list of all the existing nodes in the graph.
827    pub fn nodes<'a>(&'a self) -> impl Iterator<Item = &'a NodeEntry> {
828        self.graph.nodes()
829    }
830
831    /// Get a list of all the existing edges in the graph.
832    pub fn edges<'a>(&'a self) -> impl Iterator<Item = &'a Edge> {
833        self.graph.edges()
834    }
835
836    /// Set the number of input and output channels to and from the audio graph.
837    ///
838    /// Returns the list of edges that were removed.
839    pub fn set_graph_channel_config(
840        &mut self,
841        channel_config: ChannelConfig,
842    ) -> SmallVec<[EdgeID; 4]> {
843        self.graph.set_graph_channel_config(channel_config)
844    }
845
846    /// Add connections (edges) between two nodes to the graph.
847    ///
848    /// * `src_node` - The ID of the source node.
849    /// * `dst_node` - The ID of the destination node.
850    /// * `ports_src_dst` - The port indices for each connection to make,
851    /// where the first value in a tuple is the output port on `src_node`,
852    /// and the second value in that tuple is the input port on `dst_node`.
853    /// * `check_for_cycles` - If `true`, then this will run a check to
854    /// see if adding these edges will create a cycle in the graph, and
855    /// return an error if it does. Note, checking for cycles can be quite
856    /// expensive, so avoid enabling this when calling this method many times
857    /// in a row.
858    ///
859    /// If successful, then this returns a list of edge IDs in order.
860    ///
861    /// If this returns an error, then the audio graph has not been
862    /// modified.
863    pub fn connect(
864        &mut self,
865        src_node: NodeID,
866        dst_node: NodeID,
867        ports_src_dst: &[(PortIdx, PortIdx)],
868        check_for_cycles: bool,
869    ) -> Result<SmallVec<[EdgeID; 4]>, AddEdgeError> {
870        self.graph
871            .connect(src_node, dst_node, ports_src_dst, check_for_cycles)
872    }
873
874    /// Remove connections (edges) between two nodes from the graph.
875    ///
876    /// * `src_node` - The ID of the source node.
877    /// * `dst_node` - The ID of the destination node.
878    /// * `ports_src_dst` - The port indices for each connection to make,
879    /// where the first value in a tuple is the output port on `src_node`,
880    /// and the second value in that tuple is the input port on `dst_node`.
881    ///
882    /// If none of the edges existed in the graph, then `false` will be
883    /// returned.
884    pub fn disconnect(
885        &mut self,
886        src_node: NodeID,
887        dst_node: NodeID,
888        ports_src_dst: &[(PortIdx, PortIdx)],
889    ) -> bool {
890        self.graph.disconnect(src_node, dst_node, ports_src_dst)
891    }
892
893    /// Remove all connections (edges) between two nodes in the graph.
894    ///
895    /// * `src_node` - The ID of the source node.
896    /// * `dst_node` - The ID of the destination node.
897    pub fn disconnect_all_between(
898        &mut self,
899        src_node: NodeID,
900        dst_node: NodeID,
901    ) -> SmallVec<[EdgeID; 4]> {
902        self.graph.disconnect_all_between(src_node, dst_node)
903    }
904
905    /// Remove a connection (edge) via the edge's unique ID.
906    ///
907    /// If the edge did not exist in this graph, then `false` will be returned.
908    pub fn disconnect_by_edge_id(&mut self, edge_id: EdgeID) -> bool {
909        self.graph.disconnect_by_edge_id(edge_id)
910    }
911
912    /// Get information about the given [Edge]
913    pub fn edge(&self, edge_id: EdgeID) -> Option<&Edge> {
914        self.graph.edge(edge_id)
915    }
916
917    /// Runs a check to see if a cycle exists in the audio graph.
918    ///
919    /// Note, this method is expensive.
920    pub fn cycle_detected(&mut self) -> bool {
921        self.graph.cycle_detected()
922    }
923
924    /// Queue an event to be sent to an audio node's processor.
925    ///
926    /// Note, this event will not be sent until the event queue is flushed
927    /// in [`FirewheelCtx::update`].
928    pub fn queue_event(&mut self, event: NodeEvent) {
929        self.event_group.push(event);
930    }
931
932    /// Queue an event to be sent to an audio node's processor.
933    ///
934    /// Note, this event will not be sent until the event queue is flushed
935    /// in [`FirewheelCtx::update`].
936    pub fn queue_event_for(&mut self, node_id: NodeID, event: NodeEventType) {
937        self.queue_event(NodeEvent {
938            node_id,
939            #[cfg(feature = "scheduled_events")]
940            time: None,
941            event,
942        });
943    }
944
945    /// Queue an event at a certain time, to be sent to an audio node's processor.
946    ///
947    /// If `time` is `None`, then the event will occur as soon as the node's
948    /// processor receives the event.
949    ///
950    /// Note, this event will not be sent until the event queue is flushed
951    /// in [`FirewheelCtx::update`].
952    #[cfg(feature = "scheduled_events")]
953    pub fn schedule_event_for(
954        &mut self,
955        node_id: NodeID,
956        event: NodeEventType,
957        time: Option<EventInstant>,
958    ) {
959        self.queue_event(NodeEvent {
960            node_id,
961            time,
962            event,
963        });
964    }
965
966    /// Cancel scheduled events for all nodes.
967    ///
968    /// This will clear all events that have been scheduled since the last call to
969    /// [`FirewheelCtx::update`]. Any events scheduled between then and the next call
970    /// to [`FirewheelCtx::update`] will not be canceled.
971    ///
972    /// This only takes effect once [`FirewheelCtx::update`] is called.
973    #[cfg(feature = "scheduled_events")]
974    pub fn cancel_all_scheduled_events(&mut self, event_type: ClearScheduledEventsType) {
975        self.queued_clear_scheduled_events
976            .push(ClearScheduledEventsEvent {
977                node_id: None,
978                event_type,
979            });
980    }
981
982    /// Cancel scheduled events for a specific node.
983    ///
984    /// This will clear all events that have been scheduled since the last call to
985    /// [`FirewheelCtx::update`]. Any events scheduled between then and the next call
986    /// to [`FirewheelCtx::update`] will not be canceled.
987    ///
988    /// This only takes effect once [`FirewheelCtx::update`] is called.
989    #[cfg(feature = "scheduled_events")]
990    pub fn cancel_scheduled_events_for(
991        &mut self,
992        node_id: NodeID,
993        event_type: ClearScheduledEventsType,
994    ) {
995        self.queued_clear_scheduled_events
996            .push(ClearScheduledEventsEvent {
997                node_id: Some(node_id),
998                event_type,
999            });
1000    }
1001
1002    fn send_message_to_processor(
1003        &mut self,
1004        msg: ContextToProcessorMsg,
1005    ) -> Result<(), (ContextToProcessorMsg, UpdateError<B::StreamError>)> {
1006        self.to_processor_tx
1007            .try_push(msg)
1008            .map_err(|msg| (msg, UpdateError::MsgChannelFull))
1009    }
1010}
1011
1012impl<B: AudioBackend> Drop for FirewheelCtx<B> {
1013    fn drop(&mut self) {
1014        self.stop_stream();
1015
1016        // Wait for the processor to be drop to avoid deallocating it on
1017        // the audio thread.
1018        #[cfg(not(target_family = "wasm"))]
1019        if let Some(drop_rx) = self.processor_drop_rx.take() {
1020            let now = bevy_platform::time::Instant::now();
1021
1022            while drop_rx.try_peek().is_none() {
1023                if now.elapsed() > core::time::Duration::from_secs(2) {
1024                    break;
1025                }
1026
1027                bevy_platform::thread::sleep(core::time::Duration::from_millis(2));
1028            }
1029        }
1030
1031        firewheel_core::collector::GlobalRtGc::collect();
1032    }
1033}
1034
1035impl<B: AudioBackend> FirewheelCtx<B> {
1036    /// Construct an [`ContextQueue`] for diffing.
1037    pub fn event_queue(&mut self, id: NodeID) -> ContextQueue<'_, B> {
1038        ContextQueue {
1039            context: self,
1040            id,
1041            #[cfg(feature = "scheduled_events")]
1042            time: None,
1043        }
1044    }
1045
1046    #[cfg(feature = "scheduled_events")]
1047    pub fn event_queue_scheduled(
1048        &mut self,
1049        id: NodeID,
1050        time: Option<EventInstant>,
1051    ) -> ContextQueue<'_, B> {
1052        ContextQueue {
1053            context: self,
1054            id,
1055            time,
1056        }
1057    }
1058}
1059
1060/// An event queue acquired from [`FirewheelCtx::event_queue`].
1061///
1062/// This can help reduce event queue allocations
1063/// when you have direct access to the context.
1064///
1065/// ```
1066/// # use firewheel_core::{diff::{Diff, PathBuilder}, node::NodeID};
1067/// # use firewheel_graph::{backend::AudioBackend, FirewheelCtx, ContextQueue};
1068/// # fn context_queue<B: AudioBackend, D: Diff>(
1069/// #     context: &mut FirewheelCtx<B>,
1070/// #     node_id: NodeID,
1071/// #     params: &D,
1072/// #     baseline: &D,
1073/// # ) {
1074/// // Get a queue that will send events directly to the provided node.
1075/// let mut queue = context.event_queue(node_id);
1076/// // Perform diffing using this queue.
1077/// params.diff(baseline, PathBuilder::default(), &mut queue);
1078/// # }
1079/// ```
1080pub struct ContextQueue<'a, B: AudioBackend> {
1081    context: &'a mut FirewheelCtx<B>,
1082    id: NodeID,
1083    #[cfg(feature = "scheduled_events")]
1084    time: Option<EventInstant>,
1085}
1086
1087#[cfg(feature = "scheduled_events")]
1088impl<'a, B: AudioBackend> ContextQueue<'a, B> {
1089    pub fn time(&self) -> Option<EventInstant> {
1090        self.time
1091    }
1092}
1093
1094impl<B: AudioBackend> firewheel_core::diff::EventQueue for ContextQueue<'_, B> {
1095    fn push(&mut self, data: NodeEventType) {
1096        self.context.queue_event(NodeEvent {
1097            event: data,
1098            #[cfg(feature = "scheduled_events")]
1099            time: self.time,
1100            node_id: self.id,
1101        });
1102    }
1103}
1104
1105/// The type of scheduled events to clear in a [`ClearScheduledEvents`] message.
1106#[cfg(feature = "scheduled_events")]
1107#[derive(Default, Debug, Clone, Copy, PartialEq)]
1108pub enum ClearScheduledEventsType {
1109    /// Clear both musical and non-musical scheduled events.
1110    #[default]
1111    All,
1112    /// Clear only non-musical scheduled events.
1113    NonMusicalOnly,
1114    /// Clear only musical scheduled events.
1115    MusicalOnly,
1116}
1117
1118fn audio_clock_update_instant_and_delay<B: AudioBackend>(
1119    clock: &SharedClock<B::Instant>,
1120    active_state: &Option<ActiveState<B>>,
1121) -> Option<(Instant, Duration)> {
1122    active_state.as_ref().and_then(|active_state| {
1123        clock
1124            .process_timestamp
1125            .clone()
1126            .and_then(|process_timestamp| {
1127                active_state
1128                    .backend_handle
1129                    .delay_from_last_process(process_timestamp)
1130                    .and_then(|delay| {
1131                        Instant::now()
1132                            .checked_sub(delay)
1133                            .map(|instant| (instant, delay))
1134                    })
1135            })
1136    })
1137}