Skip to main content

firewheel_graph/
context.rs

1use bevy_platform::time::Instant;
2use core::cell::RefCell;
3use core::num::NonZeroU32;
4use core::time::Duration;
5use core::{any::Any, f64};
6use firewheel_core::clock::DurationSeconds;
7use firewheel_core::log::{RealtimeLogger, RealtimeLoggerConfig, RealtimeLoggerMainThread};
8use firewheel_core::node::ProcStore;
9use firewheel_core::{
10    channel_config::{ChannelConfig, ChannelCount},
11    clock::AudioClock,
12    dsp::declick::DeclickValues,
13    event::{NodeEvent, NodeEventType},
14    node::{AudioNode, DynAudioNode, NodeID},
15    StreamInfo,
16};
17use ringbuf::traits::{Consumer, Producer, Split};
18use smallvec::SmallVec;
19
20#[cfg(not(feature = "std"))]
21use num_traits::Float;
22
23#[cfg(all(not(feature = "std"), feature = "musical_transport"))]
24use bevy_platform::prelude::Box;
25#[cfg(not(feature = "std"))]
26use bevy_platform::prelude::Vec;
27
28use crate::error::RemoveNodeError;
29use crate::processor::BufferOutOfSpaceMode;
30use crate::{
31    backend::AudioBackend,
32    error::{AddEdgeError, StartStreamError, UpdateError},
33    graph::{AudioGraph, Edge, EdgeID, NodeEntry, PortIdx},
34    processor::{
35        ContextToProcessorMsg, FirewheelProcessor, FirewheelProcessorInner, ProcessorToContextMsg,
36        SharedClock,
37    },
38};
39
40#[cfg(feature = "scheduled_events")]
41use crate::processor::ClearScheduledEventsEvent;
42#[cfg(feature = "scheduled_events")]
43use firewheel_core::clock::EventInstant;
44
45#[cfg(feature = "musical_transport")]
46use firewheel_core::clock::TransportState;
47
/// The configuration of a Firewheel context.
///
/// Use [`FirewheelConfig::default`] to obtain the default values listed on
/// each field.
#[derive(Debug, Clone, Copy, PartialEq)]
#[cfg_attr(feature = "bevy_reflect", derive(bevy_reflect::Reflect))]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct FirewheelConfig {
    /// The number of input channels in the audio graph.
    ///
    /// By default this is set to [`ChannelCount::ZERO`].
    pub num_graph_inputs: ChannelCount,
    /// The number of output channels in the audio graph.
    ///
    /// By default this is set to [`ChannelCount::STEREO`].
    pub num_graph_outputs: ChannelCount,
    /// If `true`, then all outputs will be hard clipped at 0dB to help
    /// protect the system's speakers.
    ///
    /// Note that most operating systems already hard clip the output,
    /// so this is usually not needed (TODO: Do research to see if this
    /// assumption is true.)
    ///
    /// By default this is set to `false`.
    pub hard_clip_outputs: bool,
    /// An initial capacity to allocate for the nodes in the audio graph.
    ///
    /// By default this is set to `128`.
    pub initial_node_capacity: u32,
    /// An initial capacity to allocate for the edges in the audio graph.
    ///
    /// By default this is set to `256`.
    pub initial_edge_capacity: u32,
    /// The amount of time in seconds to fade in/out when pausing/resuming
    /// to avoid clicks and pops.
    ///
    /// By default this is set to [`DeclickValues::DEFAULT_FADE_SECONDS`].
    pub declick_seconds: f32,
    /// The initial capacity for a group of events.
    ///
    /// By default this is set to `128`.
    pub initial_event_group_capacity: u32,
    /// The capacity of the engine's internal message channel.
    ///
    /// This is the capacity of the context-to-processor channel. The
    /// processor-to-context channel is allocated with twice this capacity.
    ///
    /// By default this is set to `64`.
    pub channel_capacity: u32,
    /// The maximum number of events that can be sent in a single call
    /// to [`AudioNodeProcessor::process`].
    ///
    /// By default this is set to `128`.
    ///
    /// [`AudioNodeProcessor::process`]: firewheel_core::node::AudioNodeProcessor::process
    pub event_queue_capacity: usize,
    /// The maximum number of immediate events (events that do *NOT* have a
    /// scheduled time component) that can be stored at once in the audio
    /// thread.
    ///
    /// By default this is set to `512`.
    pub immediate_event_capacity: usize,
    /// The maximum number of scheduled events (events that have a scheduled
    /// time component) that can be stored at once in the audio thread.
    ///
    /// This can be set to `0` to save some memory if you do not plan on using
    /// scheduled events.
    ///
    /// By default this is set to `512`.
    #[cfg(feature = "scheduled_events")]
    pub scheduled_event_capacity: usize,
    /// How to handle event buffers on the audio thread running out of space.
    ///
    /// By default this is set to [`BufferOutOfSpaceMode::AllocateOnAudioThread`].
    pub buffer_out_of_space_mode: BufferOutOfSpaceMode,

    /// The configuration of the realtime safe logger.
    ///
    /// By default this is set to `RealtimeLoggerConfig::default()`.
    pub logger_config: RealtimeLoggerConfig,

    /// Force all buffers in the audio graph to be cleared when processing.
    ///
    /// This is only meant to be used when diagnosing a misbehaving audio node.
    /// Enabling this significantly increases processing overhead.
    ///
    /// If enabling this fixes audio glitching issues, then notify the node
    /// author of the misbehavior.
    ///
    /// By default this is set to `false`.
    pub debug_force_clear_buffers: bool,

    /// The initial number of slots to allocate for the [`ProcStore`].
    ///
    /// By default this is set to `8`.
    pub proc_store_capacity: usize,
}
133
impl Default for FirewheelConfig {
    /// Build the default configuration: no graph inputs, stereo graph
    /// outputs, no hard clipping, and the default capacities documented on
    /// each field of [`FirewheelConfig`].
    fn default() -> Self {
        Self {
            num_graph_inputs: ChannelCount::ZERO,
            num_graph_outputs: ChannelCount::STEREO,
            hard_clip_outputs: false,
            initial_node_capacity: 128,
            initial_edge_capacity: 256,
            declick_seconds: DeclickValues::DEFAULT_FADE_SECONDS,
            initial_event_group_capacity: 128,
            channel_capacity: 64,
            event_queue_capacity: 128,
            immediate_event_capacity: 512,
            #[cfg(feature = "scheduled_events")]
            scheduled_event_capacity: 512,
            buffer_out_of_space_mode: BufferOutOfSpaceMode::AllocateOnAudioThread,
            logger_config: RealtimeLoggerConfig::default(),
            debug_force_clear_buffers: false,
            proc_store_capacity: 8,
        }
    }
}
156
/// State that only exists while an audio stream is considered running.
///
/// The context stores this in an `Option`; a stream counts as running
/// exactly when that option is `Some`.
struct ActiveState<B: AudioBackend> {
    /// The handle to the backend's stream. Dropping this handle is what
    /// stops the stream.
    backend_handle: B,
    /// Information about the stream, as returned by the backend when the
    /// stream was started and then completed by the context.
    stream_info: StreamInfo,
}
161
/// A Firewheel context
///
/// Owns the audio graph, the message channels to and from the audio
/// processor, and (while a stream is running) the backend handle.
pub struct FirewheelCtx<B: AudioBackend> {
    /// The audio graph edited through this context.
    graph: AudioGraph,

    /// Producer end of the context-to-processor message channel.
    to_processor_tx: ringbuf::HeapProd<ContextToProcessorMsg>,
    /// Consumer end of the processor-to-context message channel.
    from_processor_rx: ringbuf::HeapCons<ProcessorToContextMsg>,
    /// Main-thread end of the realtime logger.
    logger_rx: RealtimeLoggerMainThread,

    /// `Some` while an audio stream is running.
    active_state: Option<ActiveState<B>>,

    /// The processor-side channel ends, shared-clock input, logger, and
    /// processor store. These are held here until the first stream is
    /// started, at which point they are moved into the processor.
    processor_channel: Option<(
        ringbuf::HeapCons<ContextToProcessorMsg>,
        ringbuf::HeapProd<ProcessorToContextMsg>,
        triple_buffer::Input<SharedClock<B::Instant>>,
        RealtimeLogger,
        ProcStore,
    )>,
    /// Channel created each time a stream is started. A processor appearing
    /// in it is treated as "the stream has been dropped".
    // NOTE(review): presumably the processor pushes itself here when the
    // backend drops it; confirm against `FirewheelProcessor`.
    processor_drop_rx: Option<ringbuf::HeapCons<FirewheelProcessorInner<B>>>,

    /// Reader for the clock shared by the processor. Wrapped in a `RefCell`
    /// so the clock can be read through `&self`.
    shared_clock_output: RefCell<triple_buffer::Output<SharedClock<B::Instant>>>,
    /// Sample rate used to convert clock samples to seconds. Starts at
    /// 44100 and is replaced by the stream's rate when a stream is started.
    sample_rate: NonZeroU32,
    /// The reciprocal of `sample_rate`.
    sample_rate_recip: f64,

    /// The transport state last synced successfully to the processor.
    #[cfg(feature = "musical_transport")]
    transport_state: Box<TransportState>,
    /// A spare boxed transport state kept so its allocation can be reused.
    #[cfg(feature = "musical_transport")]
    transport_state_alloc_reuse: Option<Box<TransportState>>,

    // Re-use the allocations for groups of events.
    event_group_pool: Vec<Vec<NodeEvent>>,
    /// The group of events currently being filled for the next send.
    event_group: Vec<NodeEvent>,
    /// Capacity used when a fresh event group has to be allocated.
    initial_event_group_capacity: usize,

    /// Clear-scheduled-events requests waiting to be sent to the processor.
    #[cfg(feature = "scheduled_events")]
    queued_clear_scheduled_events: Vec<ClearScheduledEventsEvent>,

    /// The configuration this context was created with.
    config: FirewheelConfig,
}
200
201impl<B: AudioBackend> FirewheelCtx<B> {
202    /// Create a new Firewheel context.
203    pub fn new(config: FirewheelConfig) -> Self {
204        let (to_processor_tx, from_context_rx) =
205            ringbuf::HeapRb::<ContextToProcessorMsg>::new(config.channel_capacity as usize).split();
206        let (to_context_tx, from_processor_rx) =
207            ringbuf::HeapRb::<ProcessorToContextMsg>::new(config.channel_capacity as usize * 2)
208                .split();
209
210        let initial_event_group_capacity = config.initial_event_group_capacity as usize;
211        let mut event_group_pool = Vec::with_capacity(16);
212        for _ in 0..3 {
213            event_group_pool.push(Vec::with_capacity(initial_event_group_capacity));
214        }
215
216        let (shared_clock_input, shared_clock_output) =
217            triple_buffer::triple_buffer(&SharedClock::default());
218
219        let (logger, logger_rx) = firewheel_core::log::realtime_logger(config.logger_config);
220
221        let proc_store = ProcStore::with_capacity(config.proc_store_capacity);
222
223        Self {
224            graph: AudioGraph::new(&config),
225            to_processor_tx,
226            from_processor_rx,
227            logger_rx,
228            active_state: None,
229            processor_channel: Some((
230                from_context_rx,
231                to_context_tx,
232                shared_clock_input,
233                logger,
234                proc_store,
235            )),
236            processor_drop_rx: None,
237            shared_clock_output: RefCell::new(shared_clock_output),
238            sample_rate: NonZeroU32::new(44100).unwrap(),
239            sample_rate_recip: 44100.0f64.recip(),
240            #[cfg(feature = "musical_transport")]
241            transport_state: Box::new(TransportState::default()),
242            #[cfg(feature = "musical_transport")]
243            transport_state_alloc_reuse: None,
244            event_group_pool,
245            event_group: Vec::with_capacity(initial_event_group_capacity),
246            initial_event_group_capacity,
247            #[cfg(feature = "scheduled_events")]
248            queued_clear_scheduled_events: Vec::new(),
249            config,
250        }
251    }
252
253    /// Get an immutable reference to the processor store.
254    ///
255    /// If an audio stream is currently running, this will return `None`.
256    pub fn proc_store(&self) -> Option<&ProcStore> {
257        if let Some((_, _, _, _, proc_store)) = &self.processor_channel {
258            Some(proc_store)
259        } else if let Some(processor) = self.processor_drop_rx.as_ref().unwrap().last() {
260            if processor.poisoned {
261                panic!("The audio thread has panicked!");
262            }
263
264            Some(&processor.extra.store)
265        } else {
266            None
267        }
268    }
269
270    /// Get a mutable reference to the processor store.
271    ///
272    /// If an audio stream is currently running, this will return `None`.
273    pub fn proc_store_mut(&mut self) -> Option<&mut ProcStore> {
274        if let Some((_, _, _, _, proc_store)) = &mut self.processor_channel {
275            Some(proc_store)
276        } else if let Some(processor) = self.processor_drop_rx.as_mut().unwrap().last_mut() {
277            if processor.poisoned {
278                panic!("The audio thread has panicked!");
279            }
280
281            Some(&mut processor.extra.store)
282        } else {
283            None
284        }
285    }
286
287    /// Get a reference to the currently active instance of the backend. Returns `None` if the backend has not
288    /// yet been initialized with `start_stream`.
289    pub fn active_backend(&self) -> Option<&B> {
290        self.active_state
291            .as_ref()
292            .map(|state| &state.backend_handle)
293    }
294
295    /// Get a mutable reference to the currently active instance of the backend. Returns `None` if the backend has not
296    /// yet been initialized with `start_stream`.
297    pub fn active_backend_mut(&mut self) -> Option<&mut B> {
298        self.active_state
299            .as_mut()
300            .map(|state| &mut state.backend_handle)
301    }
302
    /// Get a struct used to retrieve the list of available audio devices
    /// on the system and their available configurations.
    ///
    /// This simply forwards to the backend's `enumerator` constructor; it
    /// does not depend on the state of this context.
    pub fn device_enumerator(&self) -> B::Enumerator {
        B::enumerator()
    }
308
309    /// Returns `true` if an audio stream can be started right now.
310    ///
311    /// When calling [`FirewheelCtx::stop_stream()`], it may take some time for the
312    /// old stream to be fully stopped. This method is used to check if it has been
313    /// dropped yet.
314    ///
315    /// Note, in rare cases where the audio thread crashes without cleanly dropping
316    /// its contents, this may never return `true`. Consider adding a timeout to
317    /// avoid deadlocking.
318    pub fn can_start_stream(&self) -> bool {
319        if self.is_audio_stream_running() {
320            false
321        } else if let Some(rx) = &self.processor_drop_rx {
322            rx.try_peek().is_some()
323        } else {
324            true
325        }
326    }
327
328    /// Start an audio stream for this context. Only one audio stream can exist on
329    /// a context at a time.
330    ///
331    /// When calling [`FirewheelCtx::stop_stream()`], it may take some time for the
332    /// old stream to be fully stopped. Use [`FirewheelCtx::can_start_stream`] to
333    /// check if it has been dropped yet.
334    ///
335    /// Note, in rare cases where the audio thread crashes without cleanly dropping
336    /// its contents, this may never succeed. Consider adding a timeout to avoid
337    /// deadlocking.
338    pub fn start_stream(
339        &mut self,
340        config: B::Config,
341    ) -> Result<(), StartStreamError<B::StartStreamError>> {
342        if self.is_audio_stream_running() {
343            return Err(StartStreamError::AlreadyStarted);
344        }
345
346        if !self.can_start_stream() {
347            return Err(StartStreamError::OldStreamNotFinishedStopping);
348        }
349
350        let (mut backend_handle, mut stream_info) =
351            B::start_stream(config).map_err(|e| StartStreamError::BackendError(e))?;
352
353        stream_info.sample_rate_recip = (stream_info.sample_rate.get() as f64).recip();
354        stream_info.declick_frames = NonZeroU32::new(
355            (self.config.declick_seconds * stream_info.sample_rate.get() as f32).round() as u32,
356        )
357        .unwrap_or(NonZeroU32::MIN);
358
359        let maybe_processor = self.processor_channel.take();
360
361        stream_info.prev_sample_rate = if maybe_processor.is_some() {
362            stream_info.sample_rate
363        } else {
364            self.sample_rate
365        };
366
367        self.sample_rate = stream_info.sample_rate;
368        self.sample_rate_recip = stream_info.sample_rate_recip;
369
370        let schedule = self.graph.compile(&stream_info)?;
371
372        let (drop_tx, drop_rx) = ringbuf::HeapRb::<FirewheelProcessorInner<B>>::new(1).split();
373
374        let processor =
375            if let Some((from_context_rx, to_context_tx, shared_clock_input, logger, proc_store)) =
376                maybe_processor
377            {
378                FirewheelProcessorInner::new(
379                    from_context_rx,
380                    to_context_tx,
381                    shared_clock_input,
382                    self.config.immediate_event_capacity,
383                    #[cfg(feature = "scheduled_events")]
384                    self.config.scheduled_event_capacity,
385                    self.config.event_queue_capacity,
386                    &stream_info,
387                    self.config.hard_clip_outputs,
388                    self.config.buffer_out_of_space_mode,
389                    logger,
390                    self.config.debug_force_clear_buffers,
391                    proc_store,
392                )
393            } else {
394                let mut processor = self.processor_drop_rx.as_mut().unwrap().try_pop().unwrap();
395
396                if processor.poisoned {
397                    panic!("The audio thread has panicked!");
398                }
399
400                processor.new_stream(&stream_info);
401
402                processor
403            };
404
405        backend_handle.set_processor(FirewheelProcessor::new(processor, drop_tx));
406
407        if let Err(_) = self.send_message_to_processor(ContextToProcessorMsg::NewSchedule(schedule))
408        {
409            panic!("Firewheel message channel is full!");
410        }
411
412        self.active_state = Some(ActiveState {
413            backend_handle,
414            stream_info,
415        });
416        self.processor_drop_rx = Some(drop_rx);
417
418        Ok(())
419    }
420
    /// Stop the audio stream in this context.
    ///
    /// This drops the backend handle and deactivates the graph. It does
    /// nothing harmful if no stream is running. The old stream may take some
    /// time to be fully dropped; see [`FirewheelCtx::can_start_stream`].
    pub fn stop_stream(&mut self) {
        // When the backend handle is dropped, the backend will automatically
        // stop its stream.
        self.active_state = None;
        self.graph.deactivate();
    }
428
    /// Returns `true` if there is currently a running audio stream.
    ///
    /// A stream counts as running from a successful
    /// [`FirewheelCtx::start_stream`] until it is stopped or found to have
    /// stopped unexpectedly during [`FirewheelCtx::update`].
    pub fn is_audio_stream_running(&self) -> bool {
        self.active_state.is_some()
    }
433
434    /// Information about the running audio stream.
435    ///
436    /// Returns `None` if no audio stream is currently running.
437    pub fn stream_info(&self) -> Option<&StreamInfo> {
438        self.active_state.as_ref().map(|s| &s.stream_info)
439    }
440
441    /// Get the current time of the audio clock, without accounting for the delay
442    /// between when the clock was last updated and now.
443    ///
444    /// For most use cases you probably want to use [`FirewheelCtx::audio_clock_corrected`]
445    /// instead, but this method is provided if needed.
446    ///
447    /// Note, due to the nature of audio processing, this clock is is *NOT* synced with
448    /// the system's time (`Instant::now`). (Instead it is based on the amount of data
449    /// that has been processed.) For applications where the timing of audio events is
450    /// critical (i.e. a rythm game), sync the game to this audio clock instead of the
451    /// OS's clock (`Instant::now()`).
452    ///
453    /// Note, calling this method is not super cheap, so avoid calling it many
454    /// times within the same game loop iteration if possible.
455    pub fn audio_clock(&self) -> AudioClock {
456        // Reading the latest value of the clock doesn't meaningfully mutate
457        // state, so treat it as an immutable operation with interior mutability.
458        //
459        // PANIC SAFETY: This struct is the only place this is ever borrowed, so this
460        // will never panic.
461        let mut clock_borrowed = self.shared_clock_output.borrow_mut();
462        let clock = clock_borrowed.read();
463
464        let update_instant = audio_clock_update_instant_and_delay(&clock, &self.active_state)
465            .map(|(update_instant, _delay)| update_instant);
466
467        AudioClock {
468            samples: clock.clock_samples,
469            seconds: clock
470                .clock_samples
471                .to_seconds(self.sample_rate, self.sample_rate_recip),
472            #[cfg(feature = "musical_transport")]
473            musical: clock.current_playhead,
474            #[cfg(feature = "musical_transport")]
475            transport_is_playing: clock.transport_is_playing,
476            update_instant,
477        }
478    }
479
480    /// Get the current time of the audio clock.
481    ///
482    /// Unlike, [`FirewheelCtx::audio_clock`], this method accounts for the delay
483    /// between when the audio clock was last updated and now, leading to a more
484    /// accurate result for games and other applications.
485    ///
486    /// If the delay could not be determined (i.e. an audio stream is not currently
487    /// running), then this will assume there was no delay between when the audio
488    /// clock was last updated and now.
489    ///
490    /// Note, due to the nature of audio processing, this clock is is *NOT* synced with
491    /// the system's time (`Instant::now`). (Instead it is based on the amount of data
492    /// that has been processed.) For applications where the timing of audio events is
493    /// critical (i.e. a rythm game), sync the game to this audio clock instead of the
494    /// OS's clock (`Instant::now()`).
495    ///
496    /// Note, calling this method is not super cheap, so avoid calling it many
497    /// times within the same game loop iteration if possible.
498    pub fn audio_clock_corrected(&self) -> AudioClock {
499        // Reading the latest value of the clock doesn't meaningfully mutate
500        // state, so treat it as an immutable operation with interior mutability.
501        //
502        // PANIC SAFETY: This struct is the only place this is ever borrowed, so this
503        // will never panic.
504        let mut clock_borrowed = self.shared_clock_output.borrow_mut();
505        let clock = clock_borrowed.read();
506
507        let Some((update_instant, delay)) =
508            audio_clock_update_instant_and_delay(&clock, &self.active_state)
509        else {
510            // The audio thread is not currently running, so just return the
511            // latest value of the clock.
512            return AudioClock {
513                samples: clock.clock_samples,
514                seconds: clock
515                    .clock_samples
516                    .to_seconds(self.sample_rate, self.sample_rate_recip),
517                #[cfg(feature = "musical_transport")]
518                musical: clock.current_playhead,
519                #[cfg(feature = "musical_transport")]
520                transport_is_playing: clock.transport_is_playing,
521                update_instant: None,
522            };
523        };
524
525        // Account for the delay between when the clock was last updated and now.
526        let delta_seconds = DurationSeconds(delay.as_secs_f64());
527
528        let samples = clock.clock_samples + delta_seconds.to_samples(self.sample_rate);
529
530        #[cfg(feature = "musical_transport")]
531        let musical = clock.current_playhead.map(|musical_time| {
532            if clock.transport_is_playing && self.transport_state.transport.is_some() {
533                self.transport_state
534                    .transport
535                    .as_ref()
536                    .unwrap()
537                    .delta_seconds_from(musical_time, delta_seconds, clock.speed_multiplier)
538            } else {
539                musical_time
540            }
541        });
542
543        AudioClock {
544            samples,
545            seconds: samples.to_seconds(self.sample_rate, self.sample_rate_recip),
546            #[cfg(feature = "musical_transport")]
547            musical,
548            #[cfg(feature = "musical_transport")]
549            transport_is_playing: clock.transport_is_playing,
550            update_instant: Some(update_instant),
551        }
552    }
553
554    /// Get the instant the audio clock was last updated.
555    ///
556    /// This method accounts for the delay between when the audio clock was last
557    /// updated and now, leading to a more accurate result for games and other
558    /// applications.
559    ///
560    /// If the audio thread is not currently running, or if the delay could not
561    /// be determined for any other reason, then this will return `None`.
562    ///
563    /// Note, calling this method is not super cheap, so avoid calling it many
564    /// times within the same game loop iteration if possible.
565    pub fn audio_clock_instant(&self) -> Option<Instant> {
566        // Reading the latest value of the clock doesn't meaningfully mutate
567        // state, so treat it as an immutable operation with interior mutability.
568        //
569        // PANIC SAFETY: This struct is the only place this is ever borrowed, so this
570        // will never panic.
571        let mut clock_borrowed = self.shared_clock_output.borrow_mut();
572        let clock = clock_borrowed.read();
573
574        audio_clock_update_instant_and_delay(&clock, &self.active_state)
575            .map(|(update_instant, _delay)| update_instant)
576    }
577
578    /// Sync the state of the musical transport.
579    ///
580    /// If the message channel is full, then this will return an error.
581    #[cfg(feature = "musical_transport")]
582    pub fn sync_transport(
583        &mut self,
584        transport: &TransportState,
585    ) -> Result<(), UpdateError<B::StreamError>> {
586        if &*self.transport_state != transport {
587            let transport_msg = if let Some(mut t) = self.transport_state_alloc_reuse.take() {
588                *t = transport.clone();
589                t
590            } else {
591                Box::new(transport.clone())
592            };
593
594            self.send_message_to_processor(ContextToProcessorMsg::SetTransportState(transport_msg))
595                .map_err(|(_, e)| e)?;
596
597            *self.transport_state = transport.clone();
598        }
599
600        Ok(())
601    }
602
    /// Get the current transport state.
    ///
    /// This is the state stored by the last successful call to
    /// [`FirewheelCtx::sync_transport`], or the default state if it was never
    /// called.
    #[cfg(feature = "musical_transport")]
    pub fn transport_state(&self) -> &TransportState {
        &self.transport_state
    }
608
    /// Get the current transport state.
    ///
    /// This returns the same value as [`FirewheelCtx::transport_state`].
    #[cfg(feature = "musical_transport")]
    pub fn transport(&self) -> &TransportState {
        &self.transport_state
    }
614
    /// Whether or not outputs are being hard clipped at 0dB.
    ///
    /// This reports the value stored in this context's configuration.
    pub fn hard_clip_outputs(&self) -> bool {
        self.config.hard_clip_outputs
    }
619
620    /// Set whether or not outputs should be hard clipped at 0dB to
621    /// help protect the system's speakers.
622    ///
623    /// Note that most operating systems already hard clip the output,
624    /// so this is usually not needed (TODO: Do research to see if this
625    /// assumption is true.)
626    ///
627    /// If the message channel is full, then this will return an error.
628    pub fn set_hard_clip_outputs(
629        &mut self,
630        hard_clip_outputs: bool,
631    ) -> Result<(), UpdateError<B::StreamError>> {
632        if self.config.hard_clip_outputs == hard_clip_outputs {
633            return Ok(());
634        }
635        self.config.hard_clip_outputs = hard_clip_outputs;
636
637        self.send_message_to_processor(ContextToProcessorMsg::HardClipOutputs(hard_clip_outputs))
638            .map_err(|(_, e)| e)
639    }
640
    /// Update the firewheel context.
    ///
    /// This must be called regularly (i.e. once every frame).
    ///
    /// In order, this flushes the realtime logger, runs the global collector,
    /// drains the messages returned by the processor, updates the graph,
    /// checks that the stream is still alive, and finally (if a stream is
    /// running) sends any new schedule, queued clear-scheduled-events
    /// requests, and pending events to the processor.
    ///
    /// # Errors
    ///
    /// * `UpdateError::StreamStoppedUnexpectedly` if the backend reports an
    ///   error or the processor has been dropped while the stream was still
    ///   considered running. The stream is marked as stopped in that case.
    /// * An error converted from the graph compiler if the graph failed to
    ///   compile.
    /// * The error returned by the message channel if a message could not be
    ///   sent. The unsent data is kept so it can be retried on a later call.
    pub fn update(&mut self) -> Result<(), UpdateError<B::StreamError>> {
        // Forward messages logged on the audio thread to the enabled logging
        // facade. `let _ = msg` keeps the closures warning-free when no
        // logging feature is enabled.
        self.logger_rx.flush(
            |msg| {
                #[cfg(feature = "tracing")]
                tracing::error!("{}", msg);

                #[cfg(all(feature = "log", not(feature = "tracing")))]
                log::error!("{}", msg);

                let _ = msg;
            },
            #[cfg(debug_assertions)]
            |msg| {
                #[cfg(feature = "tracing")]
                tracing::debug!("{}", msg);

                #[cfg(all(feature = "log", not(feature = "tracing")))]
                log::debug!("{}", msg);

                let _ = msg;
            },
        );

        firewheel_core::collector::GlobalRtGc::collect();

        // Take back everything the processor has returned. Event groups are
        // recycled; the other payloads are simply dropped here, off the audio
        // thread.
        for msg in self.from_processor_rx.pop_iter() {
            match msg {
                ProcessorToContextMsg::ReturnEventGroup(mut event_group) => {
                    event_group.clear();
                    self.event_group_pool.push(event_group);
                }
                ProcessorToContextMsg::ReturnSchedule(schedule_data) => {
                    let _ = schedule_data;
                }
                #[cfg(feature = "musical_transport")]
                ProcessorToContextMsg::ReturnTransportState(transport_state) => {
                    // Keep at most one spare box for reuse.
                    if self.transport_state_alloc_reuse.is_none() {
                        self.transport_state_alloc_reuse = Some(transport_state);
                    }
                }
                #[cfg(feature = "scheduled_events")]
                ProcessorToContextMsg::ReturnClearScheduledEvents(msgs) => {
                    let _ = msgs;
                }
            }
        }

        self.graph.update(
            self.active_state.as_ref().map(|s| &s.stream_info),
            &mut self.event_group,
        );

        if let Some(active_state) = &mut self.active_state {
            // The backend reported a failure: treat the stream as stopped.
            if let Err(e) = active_state.backend_handle.poll_status() {
                self.active_state = None;
                self.graph.deactivate();

                return Err(UpdateError::StreamStoppedUnexpectedly(Some(e)));
            }

            // A processor in the drop channel while the stream is still
            // marked as active means the stream went away on its own.
            if self
                .processor_drop_rx
                .as_ref()
                .unwrap()
                .try_peek()
                .is_some()
            {
                self.active_state = None;
                self.graph.deactivate();

                return Err(UpdateError::StreamStoppedUnexpectedly(None));
            }
        }

        if self.is_audio_stream_running() {
            if self.graph.needs_compile() {
                let schedule_data = self
                    .graph
                    .compile(&self.active_state.as_ref().unwrap().stream_info)?;

                if let Err((msg, e)) = self
                    .send_message_to_processor(ContextToProcessorMsg::NewSchedule(schedule_data))
                {
                    let ContextToProcessorMsg::NewSchedule(schedule) = msg else {
                        unreachable!();
                    };

                    // Hand the unsent schedule back to the graph.
                    self.graph.on_schedule_send_failed(schedule);

                    return Err(e);
                }
            }

            #[cfg(feature = "scheduled_events")]
            if !self.queued_clear_scheduled_events.is_empty() {
                let msgs: SmallVec<[ClearScheduledEventsEvent; 1]> =
                    self.queued_clear_scheduled_events.drain(..).collect();

                if let Err((msg, e)) = self
                    .send_message_to_processor(ContextToProcessorMsg::ClearScheduledEvents(msgs))
                {
                    let ContextToProcessorMsg::ClearScheduledEvents(mut msgs) = msg else {
                        unreachable!();
                    };

                    // Re-queue the requests so they are retried next time.
                    self.queued_clear_scheduled_events = msgs.drain(..).collect();

                    return Err(e);
                }
            }

            if !self.event_group.is_empty() {
                // Swap the filled group out for an empty one (recycled if
                // possible) and send the filled one to the processor.
                let mut next_event_group = self
                    .event_group_pool
                    .pop()
                    .unwrap_or_else(|| Vec::with_capacity(self.initial_event_group_capacity));
                core::mem::swap(&mut next_event_group, &mut self.event_group);

                if let Err((msg, e)) = self
                    .send_message_to_processor(ContextToProcessorMsg::EventGroup(next_event_group))
                {
                    let ContextToProcessorMsg::EventGroup(mut event_group) = msg else {
                        unreachable!();
                    };

                    // Put the unsent events back as the current group and
                    // return the empty group to the pool.
                    core::mem::swap(&mut event_group, &mut self.event_group);
                    self.event_group_pool.push(event_group);

                    return Err(e);
                }
            }
        }

        Ok(())
    }
779
    /// The ID of the graph input node.
    ///
    /// This ID is obtained from the underlying audio graph.
    pub fn graph_in_node_id(&self) -> NodeID {
        self.graph.graph_in_node()
    }
784
785    /// The ID of the graph output node
786    pub fn graph_out_node_id(&self) -> NodeID {
787        self.graph.graph_out_node()
788    }
789
790    /// Add a node to the audio graph.
791    pub fn add_node<T: AudioNode + 'static>(
792        &mut self,
793        node: T,
794        config: Option<T::Configuration>,
795    ) -> NodeID {
796        self.graph.add_node(node, config)
797    }
798
799    /// Add a node to the audio graph which implements the type-erased [`DynAudioNode`] trait.
800    pub fn add_dyn_node<T: DynAudioNode + 'static>(&mut self, node: T) -> NodeID {
801        self.graph.add_dyn_node(node)
802    }
803
804    /// Remove the given node from the audio graph.
805    ///
806    /// This will automatically remove all edges from the graph that
807    /// were connected to this node.
808    ///
809    /// On success, this returns a list of all edges that were removed
810    /// from the graph as a result of removing this node.
811    ///
812    /// This will return an error if the ID is of the graph input or graph
813    /// output node.
814    pub fn remove_node(
815        &mut self,
816        node_id: NodeID,
817    ) -> Result<SmallVec<[EdgeID; 4]>, RemoveNodeError> {
818        self.graph.remove_node(node_id)
819    }
820
821    /// Get information about a node in the graph.
822    pub fn node_info(&self, id: NodeID) -> Option<&NodeEntry> {
823        self.graph.node_info(id)
824    }
825
826    /// Get an immutable reference to the custom state of a node.
827    pub fn node_state<T: 'static>(&self, id: NodeID) -> Option<&T> {
828        self.graph.node_state(id)
829    }
830
831    /// Get a type-erased, immutable reference to the custom state of a node.
832    pub fn node_state_dyn(&self, id: NodeID) -> Option<&dyn Any> {
833        self.graph.node_state_dyn(id)
834    }
835
836    /// Get a mutable reference to the custom state of a node.
837    pub fn node_state_mut<T: 'static>(&mut self, id: NodeID) -> Option<&mut T> {
838        self.graph.node_state_mut(id)
839    }
840
841    pub fn node_state_dyn_mut(&mut self, id: NodeID) -> Option<&mut dyn Any> {
842        self.graph.node_state_dyn_mut(id)
843    }
844
845    /// Get a list of all the existing nodes in the graph.
846    pub fn nodes<'a>(&'a self) -> impl Iterator<Item = &'a NodeEntry> {
847        self.graph.nodes()
848    }
849
850    /// Get a list of all the existing edges in the graph.
851    pub fn edges<'a>(&'a self) -> impl Iterator<Item = &'a Edge> {
852        self.graph.edges()
853    }
854
855    /// Set the number of input and output channels to and from the audio graph.
856    ///
857    /// Returns the list of edges that were removed.
858    pub fn set_graph_channel_config(
859        &mut self,
860        channel_config: ChannelConfig,
861    ) -> SmallVec<[EdgeID; 4]> {
862        self.graph.set_graph_channel_config(channel_config)
863    }
864
865    /// Add connections (edges) between two nodes to the graph.
866    ///
867    /// * `src_node` - The ID of the source node.
868    /// * `dst_node` - The ID of the destination node.
869    /// * `ports_src_dst` - The port indices for each connection to make,
870    /// where the first value in a tuple is the output port on `src_node`,
871    /// and the second value in that tuple is the input port on `dst_node`.
872    /// * `check_for_cycles` - If `true`, then this will run a check to
873    /// see if adding these edges will create a cycle in the graph, and
874    /// return an error if it does. Note, checking for cycles can be quite
875    /// expensive, so avoid enabling this when calling this method many times
876    /// in a row.
877    ///
878    /// If successful, then this returns a list of edge IDs in order.
879    ///
880    /// If this returns an error, then the audio graph has not been
881    /// modified.
882    pub fn connect(
883        &mut self,
884        src_node: NodeID,
885        dst_node: NodeID,
886        ports_src_dst: &[(PortIdx, PortIdx)],
887        check_for_cycles: bool,
888    ) -> Result<SmallVec<[EdgeID; 4]>, AddEdgeError> {
889        self.graph
890            .connect(src_node, dst_node, ports_src_dst, check_for_cycles)
891    }
892
893    /// Remove connections (edges) between two nodes from the graph.
894    ///
895    /// * `src_node` - The ID of the source node.
896    /// * `dst_node` - The ID of the destination node.
897    /// * `ports_src_dst` - The port indices for each connection to make,
898    /// where the first value in a tuple is the output port on `src_node`,
899    /// and the second value in that tuple is the input port on `dst_node`.
900    ///
901    /// If none of the edges existed in the graph, then `false` will be
902    /// returned.
903    pub fn disconnect(
904        &mut self,
905        src_node: NodeID,
906        dst_node: NodeID,
907        ports_src_dst: &[(PortIdx, PortIdx)],
908    ) -> bool {
909        self.graph.disconnect(src_node, dst_node, ports_src_dst)
910    }
911
912    /// Remove all connections (edges) between two nodes in the graph.
913    ///
914    /// * `src_node` - The ID of the source node.
915    /// * `dst_node` - The ID of the destination node.
916    pub fn disconnect_all_between(
917        &mut self,
918        src_node: NodeID,
919        dst_node: NodeID,
920    ) -> SmallVec<[EdgeID; 4]> {
921        self.graph.disconnect_all_between(src_node, dst_node)
922    }
923
924    /// Remove a connection (edge) via the edge's unique ID.
925    ///
926    /// If the edge did not exist in this graph, then `false` will be returned.
927    pub fn disconnect_by_edge_id(&mut self, edge_id: EdgeID) -> bool {
928        self.graph.disconnect_by_edge_id(edge_id)
929    }
930
931    /// Get information about the given [Edge]
932    pub fn edge(&self, edge_id: EdgeID) -> Option<&Edge> {
933        self.graph.edge(edge_id)
934    }
935
936    /// Runs a check to see if a cycle exists in the audio graph.
937    ///
938    /// Note, this method is expensive.
939    pub fn cycle_detected(&mut self) -> bool {
940        self.graph.cycle_detected()
941    }
942
943    /// Queue an event to be sent to an audio node's processor.
944    ///
945    /// Note, this event will not be sent until the event queue is flushed
946    /// in [`FirewheelCtx::update`].
947    pub fn queue_event(&mut self, event: NodeEvent) {
948        self.event_group.push(event);
949    }
950
951    /// Queue an event to be sent to an audio node's processor.
952    ///
953    /// Note, this event will not be sent until the event queue is flushed
954    /// in [`FirewheelCtx::update`].
955    pub fn queue_event_for(&mut self, node_id: NodeID, event: NodeEventType) {
956        self.queue_event(NodeEvent {
957            node_id,
958            #[cfg(feature = "scheduled_events")]
959            time: None,
960            event,
961        });
962    }
963
964    /// Queue an event at a certain time, to be sent to an audio node's processor.
965    ///
966    /// If `time` is `None`, then the event will occur as soon as the node's
967    /// processor receives the event.
968    ///
969    /// Note, this event will not be sent until the event queue is flushed
970    /// in [`FirewheelCtx::update`].
971    #[cfg(feature = "scheduled_events")]
972    pub fn schedule_event_for(
973        &mut self,
974        node_id: NodeID,
975        event: NodeEventType,
976        time: Option<EventInstant>,
977    ) {
978        self.queue_event(NodeEvent {
979            node_id,
980            time,
981            event,
982        });
983    }
984
985    /// Cancel scheduled events for all nodes.
986    ///
987    /// This will clear all events that have been scheduled since the last call to
988    /// [`FirewheelCtx::update`]. Any events scheduled between then and the next call
989    /// to [`FirewheelCtx::update`] will not be canceled.
990    ///
991    /// This only takes effect once [`FirewheelCtx::update`] is called.
992    #[cfg(feature = "scheduled_events")]
993    pub fn cancel_all_scheduled_events(&mut self, event_type: ClearScheduledEventsType) {
994        self.queued_clear_scheduled_events
995            .push(ClearScheduledEventsEvent {
996                node_id: None,
997                event_type,
998            });
999    }
1000
1001    /// Cancel scheduled events for a specific node.
1002    ///
1003    /// This will clear all events that have been scheduled since the last call to
1004    /// [`FirewheelCtx::update`]. Any events scheduled between then and the next call
1005    /// to [`FirewheelCtx::update`] will not be canceled.
1006    ///
1007    /// This only takes effect once [`FirewheelCtx::update`] is called.
1008    #[cfg(feature = "scheduled_events")]
1009    pub fn cancel_scheduled_events_for(
1010        &mut self,
1011        node_id: NodeID,
1012        event_type: ClearScheduledEventsType,
1013    ) {
1014        self.queued_clear_scheduled_events
1015            .push(ClearScheduledEventsEvent {
1016                node_id: Some(node_id),
1017                event_type,
1018            });
1019    }
1020
1021    fn send_message_to_processor(
1022        &mut self,
1023        msg: ContextToProcessorMsg,
1024    ) -> Result<(), (ContextToProcessorMsg, UpdateError<B::StreamError>)> {
1025        self.to_processor_tx
1026            .try_push(msg)
1027            .map_err(|msg| (msg, UpdateError::MsgChannelFull))
1028    }
1029}
1030
1031impl<B: AudioBackend> Drop for FirewheelCtx<B> {
1032    fn drop(&mut self) {
1033        self.stop_stream();
1034
1035        // Wait for the processor to be drop to avoid deallocating it on
1036        // the audio thread.
1037        #[cfg(not(target_family = "wasm"))]
1038        if let Some(drop_rx) = self.processor_drop_rx.take() {
1039            let now = bevy_platform::time::Instant::now();
1040
1041            while drop_rx.try_peek().is_none() {
1042                if now.elapsed() > core::time::Duration::from_secs(2) {
1043                    break;
1044                }
1045
1046                bevy_platform::thread::sleep(core::time::Duration::from_millis(2));
1047            }
1048        }
1049
1050        firewheel_core::collector::GlobalRtGc::collect();
1051    }
1052}
1053
1054impl<B: AudioBackend> FirewheelCtx<B> {
1055    /// Construct an [`ContextQueue`] for diffing.
1056    pub fn event_queue(&mut self, id: NodeID) -> ContextQueue<'_, B> {
1057        ContextQueue {
1058            context: self,
1059            id,
1060            #[cfg(feature = "scheduled_events")]
1061            time: None,
1062        }
1063    }
1064
1065    #[cfg(feature = "scheduled_events")]
1066    pub fn event_queue_scheduled(
1067        &mut self,
1068        id: NodeID,
1069        time: Option<EventInstant>,
1070    ) -> ContextQueue<'_, B> {
1071        ContextQueue {
1072            context: self,
1073            id,
1074            time,
1075        }
1076    }
1077}
1078
1079/// An event queue acquired from [`FirewheelCtx::event_queue`].
1080///
1081/// This can help reduce event queue allocations
1082/// when you have direct access to the context.
1083///
1084/// ```
1085/// # use firewheel_core::{diff::{Diff, PathBuilder}, node::NodeID};
1086/// # use firewheel_graph::{backend::AudioBackend, FirewheelCtx, ContextQueue};
1087/// # fn context_queue<B: AudioBackend, D: Diff>(
1088/// #     context: &mut FirewheelCtx<B>,
1089/// #     node_id: NodeID,
1090/// #     params: &D,
1091/// #     baseline: &D,
1092/// # ) {
1093/// // Get a queue that will send events directly to the provided node.
1094/// let mut queue = context.event_queue(node_id);
1095/// // Perform diffing using this queue.
1096/// params.diff(baseline, PathBuilder::default(), &mut queue);
1097/// # }
1098/// ```
1099pub struct ContextQueue<'a, B: AudioBackend> {
1100    context: &'a mut FirewheelCtx<B>,
1101    id: NodeID,
1102    #[cfg(feature = "scheduled_events")]
1103    time: Option<EventInstant>,
1104}
1105
#[cfg(feature = "scheduled_events")]
impl<B: AudioBackend> ContextQueue<'_, B> {
    /// The time that is attached to every event pushed through this queue.
    ///
    /// This is the value the queue was constructed with: `None` for queues
    /// from [`FirewheelCtx::event_queue`], or the `time` argument passed to
    /// [`FirewheelCtx::event_queue_scheduled`].
    pub fn time(&self) -> Option<EventInstant> {
        self.time
    }
}
1112
1113impl<B: AudioBackend> firewheel_core::diff::EventQueue for ContextQueue<'_, B> {
1114    fn push(&mut self, data: NodeEventType) {
1115        self.context.queue_event(NodeEvent {
1116            event: data,
1117            #[cfg(feature = "scheduled_events")]
1118            time: self.time,
1119            node_id: self.id,
1120        });
1121    }
1122}
1123
/// The type of scheduled events to clear.
///
/// Defaults to [`ClearScheduledEventsType::All`].
#[cfg(feature = "scheduled_events")]
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ClearScheduledEventsType {
    /// Clear both musical and non-musical scheduled events.
    #[default]
    All,
    /// Clear only non-musical scheduled events.
    NonMusicalOnly,
    /// Clear only musical scheduled events.
    MusicalOnly,
}
1136
1137fn audio_clock_update_instant_and_delay<B: AudioBackend>(
1138    clock: &SharedClock<B::Instant>,
1139    active_state: &Option<ActiveState<B>>,
1140) -> Option<(Instant, Duration)> {
1141    active_state.as_ref().and_then(|active_state| {
1142        clock
1143            .process_timestamp
1144            .clone()
1145            .and_then(|process_timestamp| {
1146                active_state
1147                    .backend_handle
1148                    .delay_from_last_process(process_timestamp)
1149                    .and_then(|delay| {
1150                        Instant::now()
1151                            .checked_sub(delay)
1152                            .map(|instant| (instant, delay))
1153                    })
1154            })
1155    })
1156}