firewheel_graph/
context.rs

1use bevy_platform::time::Instant;
2use core::cell::RefCell;
3use core::num::NonZeroU32;
4use core::time::Duration;
5use core::{any::Any, f64};
6use firewheel_core::clock::DurationSeconds;
7use firewheel_core::log::{RealtimeLogger, RealtimeLoggerConfig, RealtimeLoggerMainThread};
8use firewheel_core::{
9    channel_config::{ChannelConfig, ChannelCount},
10    clock::AudioClock,
11    collector::Collector,
12    dsp::declick::DeclickValues,
13    event::{NodeEvent, NodeEventType},
14    node::{AudioNode, DynAudioNode, NodeID},
15    StreamInfo,
16};
17use ringbuf::traits::{Consumer, Producer, Split};
18use smallvec::SmallVec;
19
20#[cfg(all(not(feature = "std"), feature = "musical_transport"))]
21use bevy_platform::prelude::Box;
22#[cfg(not(feature = "std"))]
23use bevy_platform::prelude::Vec;
24
25use crate::error::RemoveNodeError;
26use crate::processor::BufferOutOfSpaceMode;
27use crate::{
28    backend::{AudioBackend, DeviceInfo},
29    error::{AddEdgeError, StartStreamError, UpdateError},
30    graph::{AudioGraph, Edge, EdgeID, NodeEntry, PortIdx},
31    processor::{
32        ContextToProcessorMsg, FirewheelProcessor, FirewheelProcessorInner, ProcessorToContextMsg,
33        SharedClock,
34    },
35};
36
37#[cfg(feature = "scheduled_events")]
38use crate::processor::ClearScheduledEventsEvent;
39#[cfg(feature = "scheduled_events")]
40use firewheel_core::clock::EventInstant;
41
42#[cfg(feature = "musical_transport")]
43use firewheel_core::clock::TransportState;
44
/// The configuration of a Firewheel context.
///
/// Every field has a default; see the [`Default`] implementation for the
/// exact values.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct FirewheelConfig {
    /// The number of input channels in the audio graph.
    ///
    /// By default this is set to [`ChannelCount::ZERO`].
    pub num_graph_inputs: ChannelCount,
    /// The number of output channels in the audio graph.
    ///
    /// By default this is set to [`ChannelCount::STEREO`].
    pub num_graph_outputs: ChannelCount,
    /// If `true`, then all outputs will be hard clipped at 0dB to help
    /// protect the system's speakers.
    ///
    /// Note that most operating systems already hard clip the output,
    /// so this is usually not needed (TODO: Do research to see if this
    /// assumption is true.)
    ///
    /// By default this is set to `false`.
    pub hard_clip_outputs: bool,
    /// An initial capacity to allocate for the nodes in the audio graph.
    ///
    /// By default this is set to `128`.
    pub initial_node_capacity: u32,
    /// An initial capacity to allocate for the edges in the audio graph.
    ///
    /// By default this is set to `256`.
    pub initial_edge_capacity: u32,
    /// The amount of time in seconds to fade in/out when pausing/resuming
    /// to avoid clicks and pops.
    ///
    /// By default this is set to [`DeclickValues::DEFAULT_FADE_SECONDS`]
    /// (documented here as `10.0 / 1_000.0` — TODO confirm against the
    /// constant's definition).
    pub declick_seconds: f32,
    /// The initial capacity for a group of events.
    ///
    /// By default this is set to `128`.
    pub initial_event_group_capacity: u32,
    /// The capacity of the engine's internal message channel.
    ///
    /// By default this is set to `64`.
    pub channel_capacity: u32,
    /// The maximum number of events that can be sent in a single call
    /// to [`AudioNodeProcessor::process`].
    ///
    /// By default this is set to `128`.
    ///
    /// [`AudioNodeProcessor::process`]: firewheel_core::node::AudioNodeProcessor::process
    pub event_queue_capacity: usize,
    /// The maximum number of immediate events (events that do *NOT* have a
    /// scheduled time component) that can be stored at once in the audio
    /// thread.
    ///
    /// By default this is set to `512`.
    pub immediate_event_capacity: usize,
    /// The maximum number of scheduled events (events that have a scheduled
    /// time component) that can be stored at once in the audio thread.
    ///
    /// This can be set to `0` to save some memory if you do not plan on using
    /// scheduled events.
    ///
    /// By default this is set to `512`.
    #[cfg(feature = "scheduled_events")]
    pub scheduled_event_capacity: usize,
    /// How to handle event buffers on the audio thread running out of space.
    ///
    /// By default this is set to [`BufferOutOfSpaceMode::AllocateOnAudioThread`].
    pub buffer_out_of_space_mode: BufferOutOfSpaceMode,

    /// The configuration of the realtime safe logger.
    ///
    /// By default this is set to `RealtimeLoggerConfig::default()`.
    pub logger_config: RealtimeLoggerConfig,

    /// Force all buffers in the audio graph to be cleared when processing.
    ///
    /// This is only meant to be used when diagnosing a misbehaving audio node.
    /// Enabling this significantly increases processing overhead.
    ///
    /// If enabling this fixes audio glitching issues, then notify the node
    /// author of the misbehavior.
    ///
    /// By default this is set to `false`.
    pub debug_force_clear_buffers: bool,
}
123
impl Default for FirewheelConfig {
    /// Build the default configuration: no graph inputs, stereo graph
    /// outputs, no hard clipping, and the capacities listed below.
    fn default() -> Self {
        Self {
            num_graph_inputs: ChannelCount::ZERO,
            num_graph_outputs: ChannelCount::STEREO,
            hard_clip_outputs: false,
            initial_node_capacity: 128,
            initial_edge_capacity: 256,
            declick_seconds: DeclickValues::DEFAULT_FADE_SECONDS,
            initial_event_group_capacity: 128,
            channel_capacity: 64,
            event_queue_capacity: 128,
            immediate_event_capacity: 512,
            #[cfg(feature = "scheduled_events")]
            scheduled_event_capacity: 512,
            buffer_out_of_space_mode: BufferOutOfSpaceMode::AllocateOnAudioThread,
            logger_config: RealtimeLoggerConfig::default(),
            debug_force_clear_buffers: false,
        }
    }
}
145
/// State that is only present while an audio stream is running.
struct ActiveState<B: AudioBackend> {
    /// Handle to the running backend. Dropping it is what stops the stream
    /// (see `FirewheelCtx::stop_stream`).
    backend_handle: B,
    /// Information about the running stream, as returned by the backend and
    /// completed in `FirewheelCtx::start_stream`.
    stream_info: StreamInfo,
}
150
/// A Firewheel context
///
/// Owns the audio graph, the currently running backend stream (if any), and
/// the message channels used to talk to the audio processor.
pub struct FirewheelCtx<B: AudioBackend> {
    /// The audio graph edited through this context.
    graph: AudioGraph,

    /// Producer end of the context -> processor message channel.
    to_processor_tx: ringbuf::HeapProd<ContextToProcessorMsg>,
    /// Consumer end of the processor -> context message channel; drained in
    /// `update`.
    from_processor_rx: ringbuf::HeapCons<ProcessorToContextMsg>,
    /// Main-thread side of the realtime logger; flushed in `update`.
    logger_rx: RealtimeLoggerMainThread,

    /// `Some` while an audio stream is running, `None` otherwise.
    active_state: Option<ActiveState<B>>,

    /// The processor-side endpoints created in `new`. They are held here
    /// until `start_stream` takes them to construct the processor.
    processor_channel: Option<(
        ringbuf::HeapCons<ContextToProcessorMsg>,
        ringbuf::HeapProd<ProcessorToContextMsg>,
        triple_buffer::Input<SharedClock<B::Instant>>,
        RealtimeLogger,
    )>,
    /// Receiving end of the one-slot channel created in `start_stream`, from
    /// which the processor of a previous stream is popped for re-use.
    /// Presumably the processor is pushed here when the stream drops it —
    /// verify against `FirewheelProcessor`.
    processor_drop_rx: Option<ringbuf::HeapCons<FirewheelProcessorInner<B>>>,

    /// Reader side of the shared clock. Wrapped in a `RefCell` so the clock
    /// can be read from `&self` methods.
    shared_clock_output: RefCell<triple_buffer::Output<SharedClock<B::Instant>>>,
    /// Sample rate used to convert clock samples to seconds. Initialised to
    /// 44100 in `new` and overwritten by `start_stream`.
    sample_rate: NonZeroU32,
    /// Reciprocal of `sample_rate`.
    sample_rate_recip: f64,

    /// The transport state most recently sent to the processor.
    #[cfg(feature = "musical_transport")]
    transport_state: Box<TransportState>,
    /// A boxed transport state handed back by the processor, kept so its
    /// allocation can be re-used by the next `sync_transport` call.
    #[cfg(feature = "musical_transport")]
    transport_state_alloc_reuse: Option<Box<TransportState>>,

    // Re-use the allocations for groups of events.
    /// Pool of cleared event-group buffers returned by the processor.
    event_group_pool: Vec<Vec<NodeEvent>>,
    /// The event group currently being filled; sent to the processor in
    /// `update`.
    event_group: Vec<NodeEvent>,
    /// Capacity used when a fresh event-group buffer has to be allocated.
    initial_event_group_capacity: usize,

    /// Clear-scheduled-events requests waiting to be sent in `update`.
    #[cfg(feature = "scheduled_events")]
    queued_clear_scheduled_events: Vec<ClearScheduledEventsEvent>,

    /// The configuration this context was created with.
    config: FirewheelConfig,
}
188
189impl<B: AudioBackend> FirewheelCtx<B> {
190    /// Create a new Firewheel context.
191    pub fn new(config: FirewheelConfig) -> Self {
192        let (to_processor_tx, from_context_rx) =
193            ringbuf::HeapRb::<ContextToProcessorMsg>::new(config.channel_capacity as usize).split();
194        let (to_context_tx, from_processor_rx) =
195            ringbuf::HeapRb::<ProcessorToContextMsg>::new(config.channel_capacity as usize * 2)
196                .split();
197
198        let initial_event_group_capacity = config.initial_event_group_capacity as usize;
199        let mut event_group_pool = Vec::with_capacity(16);
200        for _ in 0..3 {
201            event_group_pool.push(Vec::with_capacity(initial_event_group_capacity));
202        }
203
204        let (shared_clock_input, shared_clock_output) =
205            triple_buffer::triple_buffer(&SharedClock::default());
206
207        let (logger, logger_rx) = firewheel_core::log::realtime_logger(config.logger_config);
208
209        Self {
210            graph: AudioGraph::new(&config),
211            to_processor_tx,
212            from_processor_rx,
213            logger_rx,
214            active_state: None,
215            processor_channel: Some((from_context_rx, to_context_tx, shared_clock_input, logger)),
216            processor_drop_rx: None,
217            shared_clock_output: RefCell::new(shared_clock_output),
218            sample_rate: NonZeroU32::new(44100).unwrap(),
219            sample_rate_recip: 44100.0f64.recip(),
220            #[cfg(feature = "musical_transport")]
221            transport_state: Box::new(TransportState::default()),
222            #[cfg(feature = "musical_transport")]
223            transport_state_alloc_reuse: None,
224            event_group_pool,
225            event_group: Vec::with_capacity(initial_event_group_capacity),
226            initial_event_group_capacity,
227            #[cfg(feature = "scheduled_events")]
228            queued_clear_scheduled_events: Vec::new(),
229            config,
230        }
231    }
232
233    /// Get a reference to the currently active instance of the backend. Returns `None` if the backend has not
234    /// yet been initialized with `start_stream`.
235    pub fn active_backend(&self) -> Option<&B> {
236        self.active_state
237            .as_ref()
238            .map(|state| &state.backend_handle)
239    }
240
241    /// Get a mutable reference to the currently active instance of the backend. Returns `None` if the backend has not
242    /// yet been initialized with `start_stream`.
243    pub fn active_backend_mut(&mut self) -> Option<&mut B> {
244        self.active_state
245            .as_mut()
246            .map(|state| &mut state.backend_handle)
247    }
248
    /// Get a list of the available audio input devices.
    ///
    /// This forwards to the backend's `available_input_devices` and does not
    /// require a running stream.
    pub fn available_input_devices(&self) -> Vec<DeviceInfo> {
        B::available_input_devices()
    }
253
    /// Get a list of the available audio output devices.
    ///
    /// This forwards to the backend's `available_output_devices` and does not
    /// require a running stream.
    pub fn available_output_devices(&self) -> Vec<DeviceInfo> {
        B::available_output_devices()
    }
258
259    /// Returns `true` if an audio stream can be started right now.
260    ///
261    /// When calling [`FirewheelCtx::stop_stream()`], it may take some time for the
262    /// old stream to be fully stopped. This method is used to check if it has been
263    /// dropped yet.
264    ///
265    /// Note, in rare cases where the audio thread crashes without cleanly dropping
266    /// its contents, this may never return `true`. Consider adding a timeout to
267    /// avoid deadlocking.
268    pub fn can_start_stream(&self) -> bool {
269        if self.is_audio_stream_running() {
270            false
271        } else if let Some(rx) = &self.processor_drop_rx {
272            rx.try_peek().is_some()
273        } else {
274            true
275        }
276    }
277
278    /// Start an audio stream for this context. Only one audio stream can exist on
279    /// a context at a time.
280    ///
281    /// When calling [`FirewheelCtx::stop_stream()`], it may take some time for the
282    /// old stream to be fully stopped. Use [`FirewheelCtx::can_start_stream`] to
283    /// check if it has been dropped yet.
284    ///
285    /// Note, in rare cases where the audio thread crashes without cleanly dropping
286    /// its contents, this may never succeed. Consider adding a timeout to avoid
287    /// deadlocking.
288    pub fn start_stream(
289        &mut self,
290        config: B::Config,
291    ) -> Result<(), StartStreamError<B::StartStreamError>> {
292        if self.is_audio_stream_running() {
293            return Err(StartStreamError::AlreadyStarted);
294        }
295
296        if !self.can_start_stream() {
297            return Err(StartStreamError::OldStreamNotFinishedStopping);
298        }
299
300        let (mut backend_handle, mut stream_info) =
301            B::start_stream(config).map_err(|e| StartStreamError::BackendError(e))?;
302
303        stream_info.sample_rate_recip = (stream_info.sample_rate.get() as f64).recip();
304        stream_info.declick_frames = NonZeroU32::new(
305            (self.config.declick_seconds * stream_info.sample_rate.get() as f32).round() as u32,
306        )
307        .unwrap_or(NonZeroU32::MIN);
308
309        let maybe_processor = self.processor_channel.take();
310
311        stream_info.prev_sample_rate = if maybe_processor.is_some() {
312            stream_info.sample_rate
313        } else {
314            self.sample_rate
315        };
316
317        self.sample_rate = stream_info.sample_rate;
318        self.sample_rate_recip = stream_info.sample_rate_recip;
319
320        let schedule = self.graph.compile(&stream_info)?;
321
322        let (drop_tx, drop_rx) = ringbuf::HeapRb::<FirewheelProcessorInner<B>>::new(1).split();
323
324        let processor = if let Some((from_context_rx, to_context_tx, shared_clock_input, logger)) =
325            maybe_processor
326        {
327            FirewheelProcessorInner::new(
328                from_context_rx,
329                to_context_tx,
330                shared_clock_input,
331                self.config.immediate_event_capacity,
332                #[cfg(feature = "scheduled_events")]
333                self.config.scheduled_event_capacity,
334                self.config.event_queue_capacity,
335                &stream_info,
336                self.config.hard_clip_outputs,
337                self.config.buffer_out_of_space_mode,
338                logger,
339                self.config.debug_force_clear_buffers,
340            )
341        } else {
342            let mut processor = self.processor_drop_rx.as_mut().unwrap().try_pop().unwrap();
343
344            if processor.poisoned {
345                panic!("The audio thread has panicked!");
346            }
347
348            processor.new_stream(&stream_info);
349
350            processor
351        };
352
353        backend_handle.set_processor(FirewheelProcessor::new(processor, drop_tx));
354
355        if let Err(_) = self.send_message_to_processor(ContextToProcessorMsg::NewSchedule(schedule))
356        {
357            panic!("Firewheel message channel is full!");
358        }
359
360        self.active_state = Some(ActiveState {
361            backend_handle,
362            stream_info,
363        });
364        self.processor_drop_rx = Some(drop_rx);
365
366        Ok(())
367    }
368
    /// Stop the audio stream in this context.
    ///
    /// This drops the active backend handle and deactivates the graph.
    /// Calling it when no stream is running still deactivates the graph.
    pub fn stop_stream(&mut self) {
        // When the backend handle is dropped, the backend will automatically
        // stop its stream.
        self.active_state = None;
        self.graph.deactivate();
    }
376
    /// Returns `true` if there is currently a running audio stream.
    ///
    /// This reflects the context's own bookkeeping (whether an active state
    /// is held); [`FirewheelCtx::update`] clears it when it detects that the
    /// stream stopped unexpectedly.
    pub fn is_audio_stream_running(&self) -> bool {
        self.active_state.is_some()
    }
381
382    /// Information about the running audio stream.
383    ///
384    /// Returns `None` if no audio stream is currently running.
385    pub fn stream_info(&self) -> Option<&StreamInfo> {
386        self.active_state.as_ref().map(|s| &s.stream_info)
387    }
388
389    /// Get the current time of the audio clock, without accounting for the delay
390    /// between when the clock was last updated and now.
391    ///
392    /// For most use cases you probably want to use [`FirewheelCtx::audio_clock_corrected`]
393    /// instead, but this method is provided if needed.
394    ///
395    /// Note, due to the nature of audio processing, this clock is is *NOT* synced with
396    /// the system's time (`Instant::now`). (Instead it is based on the amount of data
397    /// that has been processed.) For applications where the timing of audio events is
398    /// critical (i.e. a rythm game), sync the game to this audio clock instead of the
399    /// OS's clock (`Instant::now()`).
400    ///
401    /// Note, calling this method is not super cheap, so avoid calling it many
402    /// times within the same game loop iteration if possible.
403    pub fn audio_clock(&self) -> AudioClock {
404        // Reading the latest value of the clock doesn't meaningfully mutate
405        // state, so treat it as an immutable operation with interior mutability.
406        //
407        // PANIC SAFETY: This struct is the only place this is ever borrowed, so this
408        // will never panic.
409        let mut clock_borrowed = self.shared_clock_output.borrow_mut();
410        let clock = clock_borrowed.read();
411
412        let update_instant = audio_clock_update_instant_and_delay(&clock, &self.active_state)
413            .map(|(update_instant, _delay)| update_instant);
414
415        AudioClock {
416            samples: clock.clock_samples,
417            seconds: clock
418                .clock_samples
419                .to_seconds(self.sample_rate, self.sample_rate_recip),
420            #[cfg(feature = "musical_transport")]
421            musical: clock.current_playhead,
422            #[cfg(feature = "musical_transport")]
423            transport_is_playing: clock.transport_is_playing,
424            update_instant,
425        }
426    }
427
428    /// Get the current time of the audio clock.
429    ///
430    /// Unlike, [`FirewheelCtx::audio_clock`], this method accounts for the delay
431    /// between when the audio clock was last updated and now, leading to a more
432    /// accurate result for games and other applications.
433    ///
434    /// If the delay could not be determined (i.e. an audio stream is not currently
435    /// running), then this will assume there was no delay between when the audio
436    /// clock was last updated and now.
437    ///
438    /// Note, due to the nature of audio processing, this clock is is *NOT* synced with
439    /// the system's time (`Instant::now`). (Instead it is based on the amount of data
440    /// that has been processed.) For applications where the timing of audio events is
441    /// critical (i.e. a rythm game), sync the game to this audio clock instead of the
442    /// OS's clock (`Instant::now()`).
443    ///
444    /// Note, calling this method is not super cheap, so avoid calling it many
445    /// times within the same game loop iteration if possible.
446    pub fn audio_clock_corrected(&self) -> AudioClock {
447        // Reading the latest value of the clock doesn't meaningfully mutate
448        // state, so treat it as an immutable operation with interior mutability.
449        //
450        // PANIC SAFETY: This struct is the only place this is ever borrowed, so this
451        // will never panic.
452        let mut clock_borrowed = self.shared_clock_output.borrow_mut();
453        let clock = clock_borrowed.read();
454
455        let Some((update_instant, delay)) =
456            audio_clock_update_instant_and_delay(&clock, &self.active_state)
457        else {
458            // The audio thread is not currently running, so just return the
459            // latest value of the clock.
460            return AudioClock {
461                samples: clock.clock_samples,
462                seconds: clock
463                    .clock_samples
464                    .to_seconds(self.sample_rate, self.sample_rate_recip),
465                #[cfg(feature = "musical_transport")]
466                musical: clock.current_playhead,
467                #[cfg(feature = "musical_transport")]
468                transport_is_playing: clock.transport_is_playing,
469                update_instant: None,
470            };
471        };
472
473        // Account for the delay between when the clock was last updated and now.
474        let delta_seconds = DurationSeconds(delay.as_secs_f64());
475
476        let samples = clock.clock_samples + delta_seconds.to_samples(self.sample_rate);
477
478        #[cfg(feature = "musical_transport")]
479        let musical = clock.current_playhead.map(|musical_time| {
480            if clock.transport_is_playing && self.transport_state.transport.is_some() {
481                self.transport_state
482                    .transport
483                    .as_ref()
484                    .unwrap()
485                    .delta_seconds_from(musical_time, delta_seconds, clock.speed_multiplier)
486            } else {
487                musical_time
488            }
489        });
490
491        AudioClock {
492            samples,
493            seconds: samples.to_seconds(self.sample_rate, self.sample_rate_recip),
494            #[cfg(feature = "musical_transport")]
495            musical,
496            #[cfg(feature = "musical_transport")]
497            transport_is_playing: clock.transport_is_playing,
498            update_instant: Some(update_instant),
499        }
500    }
501
502    /// Get the instant the audio clock was last updated.
503    ///
504    /// This method accounts for the delay between when the audio clock was last
505    /// updated and now, leading to a more accurate result for games and other
506    /// applications.
507    ///
508    /// If the audio thread is not currently running, or if the delay could not
509    /// be determined for any other reason, then this will return `None`.
510    ///
511    /// Note, calling this method is not super cheap, so avoid calling it many
512    /// times within the same game loop iteration if possible.
513    pub fn audio_clock_instant(&self) -> Option<Instant> {
514        // Reading the latest value of the clock doesn't meaningfully mutate
515        // state, so treat it as an immutable operation with interior mutability.
516        //
517        // PANIC SAFETY: This struct is the only place this is ever borrowed, so this
518        // will never panic.
519        let mut clock_borrowed = self.shared_clock_output.borrow_mut();
520        let clock = clock_borrowed.read();
521
522        audio_clock_update_instant_and_delay(&clock, &self.active_state)
523            .map(|(update_instant, _delay)| update_instant)
524    }
525
526    /// Sync the state of the musical transport.
527    ///
528    /// If the message channel is full, then this will return an error.
529    #[cfg(feature = "musical_transport")]
530    pub fn sync_transport(
531        &mut self,
532        transport: &TransportState,
533    ) -> Result<(), UpdateError<B::StreamError>> {
534        if &*self.transport_state != transport {
535            let transport_msg = if let Some(mut t) = self.transport_state_alloc_reuse.take() {
536                *t = transport.clone();
537                t
538            } else {
539                Box::new(transport.clone())
540            };
541
542            self.send_message_to_processor(ContextToProcessorMsg::SetTransportState(transport_msg))
543                .map_err(|(_, e)| e)?;
544
545            *self.transport_state = transport.clone();
546        }
547
548        Ok(())
549    }
550
    /// Get the current transport state.
    ///
    /// This is the context's own copy, i.e. the state last accepted by
    /// [`FirewheelCtx::sync_transport`] (or the default if it was never
    /// called).
    #[cfg(feature = "musical_transport")]
    pub fn transport_state(&self) -> &TransportState {
        &self.transport_state
    }
556
    /// Get the current transport state.
    ///
    /// This returns the same value as [`FirewheelCtx::transport_state`].
    #[cfg(feature = "musical_transport")]
    pub fn transport(&self) -> &TransportState {
        &self.transport_state
    }
562
    /// Whether or not outputs are being hard clipped at 0dB.
    ///
    /// This reads the value stored in the context's configuration.
    pub fn hard_clip_outputs(&self) -> bool {
        self.config.hard_clip_outputs
    }
567
568    /// Set whether or not outputs should be hard clipped at 0dB to
569    /// help protect the system's speakers.
570    ///
571    /// Note that most operating systems already hard clip the output,
572    /// so this is usually not needed (TODO: Do research to see if this
573    /// assumption is true.)
574    ///
575    /// If the message channel is full, then this will return an error.
576    pub fn set_hard_clip_outputs(
577        &mut self,
578        hard_clip_outputs: bool,
579    ) -> Result<(), UpdateError<B::StreamError>> {
580        if self.config.hard_clip_outputs == hard_clip_outputs {
581            return Ok(());
582        }
583        self.config.hard_clip_outputs = hard_clip_outputs;
584
585        self.send_message_to_processor(ContextToProcessorMsg::HardClipOutputs(hard_clip_outputs))
586            .map_err(|(_, e)| e)
587    }
588
    /// Update the firewheel context.
    ///
    /// This must be called regularly (i.e. once every frame).
    ///
    /// In order, this flushes the realtime logger, runs the global collector,
    /// drains the messages coming back from the processor, updates the graph,
    /// checks that the stream is still alive, and finally (only while a
    /// stream is running) sends a recompiled schedule, any queued
    /// clear-scheduled-events requests and the pending event group to the
    /// processor.
    ///
    /// # Errors
    ///
    /// * `UpdateError::StreamStoppedUnexpectedly(Some(e))` if the backend's
    ///   `poll_status` reported error `e`.
    /// * `UpdateError::StreamStoppedUnexpectedly(None)` if the processor was
    ///   handed back while the stream was still considered running.
    /// * An error from recompiling the graph.
    /// * An error from sending a message to the processor. The unsent payload
    ///   is kept so a later call can try again.
    pub fn update(&mut self) -> Result<(), UpdateError<B::StreamError>> {
        self.logger_rx.flush();

        firewheel_core::collector::GlobalCollector.collect();

        for msg in self.from_processor_rx.pop_iter() {
            match msg {
                ProcessorToContextMsg::ReturnEventGroup(mut event_group) => {
                    // Recycle the buffer: keep its allocation, drop its events.
                    event_group.clear();
                    self.event_group_pool.push(event_group);
                }
                ProcessorToContextMsg::ReturnSchedule(schedule_data) => {
                    // Nothing to do with it; it is dropped at the end of this
                    // arm, i.e. on the thread calling `update`.
                    let _ = schedule_data;
                }
                #[cfg(feature = "musical_transport")]
                ProcessorToContextMsg::ReturnTransportState(transport_state) => {
                    // Keep at most one box around for re-use by `sync_transport`.
                    if self.transport_state_alloc_reuse.is_none() {
                        self.transport_state_alloc_reuse = Some(transport_state);
                    }
                }
                #[cfg(feature = "scheduled_events")]
                ProcessorToContextMsg::ReturnClearScheduledEvents(msgs) => {
                    // Dropped at the end of this arm, on the calling thread.
                    let _ = msgs;
                }
            }
        }

        self.graph.update(
            self.active_state.as_ref().map(|s| &s.stream_info),
            &mut self.event_group,
        );

        if let Some(active_state) = &mut self.active_state {
            if let Err(e) = active_state.backend_handle.poll_status() {
                // The backend reported a failure: tear the stream down.
                self.active_state = None;
                self.graph.deactivate();

                return Err(UpdateError::StreamStoppedUnexpectedly(Some(e)));
            }

            // The drop channel is always set once a stream has been started.
            // A processor showing up in it while the stream is still marked
            // active means the stream ended without `stop_stream`.
            if self
                .processor_drop_rx
                .as_ref()
                .unwrap()
                .try_peek()
                .is_some()
            {
                self.active_state = None;
                self.graph.deactivate();

                return Err(UpdateError::StreamStoppedUnexpectedly(None));
            }
        }

        if self.is_audio_stream_running() {
            if self.graph.needs_compile() {
                let schedule_data = self
                    .graph
                    .compile(&self.active_state.as_ref().unwrap().stream_info)?;

                if let Err((msg, e)) = self
                    .send_message_to_processor(ContextToProcessorMsg::NewSchedule(schedule_data))
                {
                    let ContextToProcessorMsg::NewSchedule(schedule) = msg else {
                        unreachable!();
                    };

                    // Hand the unsent schedule back to the graph.
                    self.graph.on_schedule_send_failed(schedule);

                    return Err(e);
                }
            }

            #[cfg(feature = "scheduled_events")]
            if !self.queued_clear_scheduled_events.is_empty() {
                let msgs: SmallVec<[ClearScheduledEventsEvent; 1]> =
                    self.queued_clear_scheduled_events.drain(..).collect();

                if let Err((msg, e)) = self
                    .send_message_to_processor(ContextToProcessorMsg::ClearScheduledEvents(msgs))
                {
                    let ContextToProcessorMsg::ClearScheduledEvents(mut msgs) = msg else {
                        unreachable!();
                    };

                    // Put the requests back in the queue for the next call.
                    self.queued_clear_scheduled_events = msgs.drain(..).collect();

                    return Err(e);
                }
            }

            if !self.event_group.is_empty() {
                // Swap the filled group out for an empty (pooled or fresh)
                // buffer, so `self.event_group` is ready for new events.
                let mut next_event_group = self
                    .event_group_pool
                    .pop()
                    .unwrap_or_else(|| Vec::with_capacity(self.initial_event_group_capacity));
                core::mem::swap(&mut next_event_group, &mut self.event_group);

                if let Err((msg, e)) = self
                    .send_message_to_processor(ContextToProcessorMsg::EventGroup(next_event_group))
                {
                    let ContextToProcessorMsg::EventGroup(mut event_group) = msg else {
                        unreachable!();
                    };

                    // Undo the swap: the unsent events become the pending
                    // group again and the empty buffer returns to the pool.
                    core::mem::swap(&mut event_group, &mut self.event_group);
                    self.event_group_pool.push(event_group);

                    return Err(e);
                }
            }
        }

        Ok(())
    }
707
    /// The ID of the graph input node, as reported by the audio graph.
    pub fn graph_in_node_id(&self) -> NodeID {
        self.graph.graph_in_node()
    }
712
    /// The ID of the graph output node, as reported by the audio graph.
    pub fn graph_out_node_id(&self) -> NodeID {
        self.graph.graph_out_node()
    }
717
    /// Add a node to the audio graph.
    ///
    /// * `node` - The node to add.
    /// * `config` - An optional configuration for the node, forwarded to the
    ///   graph as-is.
    ///
    /// Returns the ID assigned to the new node.
    pub fn add_node<T: AudioNode + 'static>(
        &mut self,
        node: T,
        config: Option<T::Configuration>,
    ) -> NodeID {
        self.graph.add_node(node, config)
    }
726
    /// Add a node to the audio graph which implements the type-erased [`DynAudioNode`] trait.
    ///
    /// Returns the ID assigned to the new node.
    pub fn add_dyn_node<T: DynAudioNode + 'static>(&mut self, node: T) -> NodeID {
        self.graph.add_dyn_node(node)
    }
731
    /// Remove the given node from the audio graph.
    ///
    /// This will automatically remove all edges from the graph that
    /// were connected to this node.
    ///
    /// On success, this returns a list of all edges that were removed
    /// from the graph as a result of removing this node.
    ///
    /// # Errors
    ///
    /// This will return an error if the ID is of the graph input or graph
    /// output node.
    pub fn remove_node(
        &mut self,
        node_id: NodeID,
    ) -> Result<SmallVec<[EdgeID; 4]>, RemoveNodeError> {
        self.graph.remove_node(node_id)
    }
748
    /// Get information about a node in the graph.
    ///
    /// Returns `None` if the graph has no entry for `id`.
    pub fn node_info(&self, id: NodeID) -> Option<&NodeEntry> {
        self.graph.node_info(id)
    }
753
    /// Get an immutable reference to the custom state of a node.
    ///
    /// Returns `None` when the graph cannot provide state of type `T` for
    /// `id` (presumably an unknown node or a type mismatch — see
    /// `AudioGraph::node_state`).
    pub fn node_state<T: 'static>(&self, id: NodeID) -> Option<&T> {
        self.graph.node_state(id)
    }
758
    /// Get a type-erased, immutable reference to the custom state of a node.
    ///
    /// Returns `None` when the graph has no state to hand out for `id`.
    pub fn node_state_dyn(&self, id: NodeID) -> Option<&dyn Any> {
        self.graph.node_state_dyn(id)
    }
763
    /// Get a mutable reference to the custom state of a node.
    ///
    /// Returns `None` when the graph cannot provide state of type `T` for
    /// `id` (presumably an unknown node or a type mismatch — see
    /// `AudioGraph::node_state_mut`).
    pub fn node_state_mut<T: 'static>(&mut self, id: NodeID) -> Option<&mut T> {
        self.graph.node_state_mut(id)
    }
768
    /// Get a type-erased, mutable reference to the custom state of a node.
    ///
    /// Returns `None` when the graph has no state to hand out for `id`.
    pub fn node_state_dyn_mut(&mut self, id: NodeID) -> Option<&mut dyn Any> {
        self.graph.node_state_dyn_mut(id)
    }
772
    /// Get an iterator over all the existing nodes in the graph.
    pub fn nodes<'a>(&'a self) -> impl Iterator<Item = &'a NodeEntry> {
        self.graph.nodes()
    }
777
    /// Get an iterator over all the existing edges in the graph.
    pub fn edges<'a>(&'a self) -> impl Iterator<Item = &'a Edge> {
        self.graph.edges()
    }
782
    /// Set the number of input and output channels to and from the audio graph.
    ///
    /// Returns the list of edges that were removed.
    pub fn set_graph_channel_config(
        &mut self,
        channel_config: ChannelConfig,
    ) -> SmallVec<[EdgeID; 4]> {
        self.graph.set_graph_channel_config(channel_config)
    }
792
793    /// Add connections (edges) between two nodes to the graph.
794    ///
795    /// * `src_node` - The ID of the source node.
796    /// * `dst_node` - The ID of the destination node.
797    /// * `ports_src_dst` - The port indices for each connection to make,
798    /// where the first value in a tuple is the output port on `src_node`,
799    /// and the second value in that tuple is the input port on `dst_node`.
800    /// * `check_for_cycles` - If `true`, then this will run a check to
801    /// see if adding these edges will create a cycle in the graph, and
802    /// return an error if it does. Note, checking for cycles can be quite
803    /// expensive, so avoid enabling this when calling this method many times
804    /// in a row.
805    ///
806    /// If successful, then this returns a list of edge IDs in order.
807    ///
808    /// If this returns an error, then the audio graph has not been
809    /// modified.
810    pub fn connect(
811        &mut self,
812        src_node: NodeID,
813        dst_node: NodeID,
814        ports_src_dst: &[(PortIdx, PortIdx)],
815        check_for_cycles: bool,
816    ) -> Result<SmallVec<[EdgeID; 4]>, AddEdgeError> {
817        self.graph
818            .connect(src_node, dst_node, ports_src_dst, check_for_cycles)
819    }
820
821    /// Remove connections (edges) between two nodes from the graph.
822    ///
823    /// * `src_node` - The ID of the source node.
824    /// * `dst_node` - The ID of the destination node.
825    /// * `ports_src_dst` - The port indices for each connection to make,
826    /// where the first value in a tuple is the output port on `src_node`,
827    /// and the second value in that tuple is the input port on `dst_node`.
828    ///
829    /// If none of the edges existed in the graph, then `false` will be
830    /// returned.
831    pub fn disconnect(
832        &mut self,
833        src_node: NodeID,
834        dst_node: NodeID,
835        ports_src_dst: &[(PortIdx, PortIdx)],
836    ) -> bool {
837        self.graph.disconnect(src_node, dst_node, ports_src_dst)
838    }
839
840    /// Remove all connections (edges) between two nodes in the graph.
841    ///
842    /// * `src_node` - The ID of the source node.
843    /// * `dst_node` - The ID of the destination node.
844    pub fn disconnect_all_between(
845        &mut self,
846        src_node: NodeID,
847        dst_node: NodeID,
848    ) -> SmallVec<[EdgeID; 4]> {
849        self.graph.disconnect_all_between(src_node, dst_node)
850    }
851
852    /// Remove a connection (edge) via the edge's unique ID.
853    ///
854    /// If the edge did not exist in this graph, then `false` will be returned.
855    pub fn disconnect_by_edge_id(&mut self, edge_id: EdgeID) -> bool {
856        self.graph.disconnect_by_edge_id(edge_id)
857    }
858
859    /// Get information about the given [Edge]
860    pub fn edge(&self, edge_id: EdgeID) -> Option<&Edge> {
861        self.graph.edge(edge_id)
862    }
863
864    /// Runs a check to see if a cycle exists in the audio graph.
865    ///
866    /// Note, this method is expensive.
867    pub fn cycle_detected(&mut self) -> bool {
868        self.graph.cycle_detected()
869    }
870
871    /// Queue an event to be sent to an audio node's processor.
872    ///
873    /// Note, this event will not be sent until the event queue is flushed
874    /// in [`FirewheelCtx::update`].
875    pub fn queue_event(&mut self, event: NodeEvent) {
876        self.event_group.push(event);
877    }
878
879    /// Queue an event to be sent to an audio node's processor.
880    ///
881    /// Note, this event will not be sent until the event queue is flushed
882    /// in [`FirewheelCtx::update`].
883    pub fn queue_event_for(&mut self, node_id: NodeID, event: NodeEventType) {
884        self.queue_event(NodeEvent {
885            node_id,
886            #[cfg(feature = "scheduled_events")]
887            time: None,
888            event,
889        });
890    }
891
/// Queue an event for a node's processor, to take effect at a given time.
///
/// * `node_id` - The ID of the node that should receive the event.
/// * `event` - The event to deliver.
/// * `time` - When the event should occur. If this is `None`, then the
///   event will occur as soon as the node's processor receives it.
///
/// Note, this event will not be sent until the event queue is flushed
/// in [`FirewheelCtx::update`].
#[cfg(feature = "scheduled_events")]
pub fn schedule_event_for(
    &mut self,
    node_id: NodeID,
    event: NodeEventType,
    time: Option<EventInstant>,
) {
    let node_event = NodeEvent {
        node_id,
        time,
        event,
    };

    self.queue_event(node_event);
}
912
/// Request that scheduled events be canceled for every node.
///
/// This will clear all events that have been scheduled since the last
/// call to [`FirewheelCtx::update`]. Any events scheduled between then
/// and the next call to [`FirewheelCtx::update`] will not be canceled.
///
/// * `event_type` - Which kind of scheduled events to clear.
///
/// The request is only queued here; it takes effect once
/// [`FirewheelCtx::update`] is called.
#[cfg(feature = "scheduled_events")]
pub fn cancel_all_scheduled_events(&mut self, event_type: ClearScheduledEventsType) {
    // `node_id: None` marks the request as applying to all nodes.
    let request = ClearScheduledEventsEvent {
        node_id: None,
        event_type,
    };

    self.queued_clear_scheduled_events.push(request);
}
928
/// Request that scheduled events be canceled for one specific node.
///
/// This will clear all events that have been scheduled since the last
/// call to [`FirewheelCtx::update`]. Any events scheduled between then
/// and the next call to [`FirewheelCtx::update`] will not be canceled.
///
/// * `node_id` - The ID of the node whose scheduled events are cleared.
/// * `event_type` - Which kind of scheduled events to clear.
///
/// The request is only queued here; it takes effect once
/// [`FirewheelCtx::update`] is called.
#[cfg(feature = "scheduled_events")]
pub fn cancel_scheduled_events_for(
    &mut self,
    node_id: NodeID,
    event_type: ClearScheduledEventsType,
) {
    let request = ClearScheduledEventsEvent {
        node_id: Some(node_id),
        event_type,
    };

    self.queued_clear_scheduled_events.push(request);
}
948
949    fn send_message_to_processor(
950        &mut self,
951        msg: ContextToProcessorMsg,
952    ) -> Result<(), (ContextToProcessorMsg, UpdateError<B::StreamError>)> {
953        self.to_processor_tx
954            .try_push(msg)
955            .map_err(|msg| (msg, UpdateError::MsgChannelFull))
956    }
957}
958
959impl<B: AudioBackend> Drop for FirewheelCtx<B> {
960    fn drop(&mut self) {
961        self.stop_stream();
962
963        // Wait for the processor to be drop to avoid deallocating it on
964        // the audio thread.
965        #[cfg(not(target_family = "wasm"))]
966        if let Some(drop_rx) = self.processor_drop_rx.take() {
967            let now = bevy_platform::time::Instant::now();
968
969            while drop_rx.try_peek().is_none() {
970                if now.elapsed() > core::time::Duration::from_secs(2) {
971                    break;
972                }
973
974                bevy_platform::thread::sleep(core::time::Duration::from_millis(2));
975            }
976        }
977
978        firewheel_core::collector::GlobalCollector.collect();
979    }
980}
981
982impl<B: AudioBackend> FirewheelCtx<B> {
983    /// Construct an [`ContextQueue`] for diffing.
984    pub fn event_queue(&mut self, id: NodeID) -> ContextQueue<'_, B> {
985        ContextQueue {
986            context: self,
987            id,
988            #[cfg(feature = "scheduled_events")]
989            time: None,
990        }
991    }
992
993    #[cfg(feature = "scheduled_events")]
994    pub fn event_queue_scheduled(
995        &mut self,
996        id: NodeID,
997        time: Option<EventInstant>,
998    ) -> ContextQueue<'_, B> {
999        ContextQueue {
1000            context: self,
1001            id,
1002            time,
1003        }
1004    }
1005}
1006
1007/// An event queue acquired from [`FirewheelCtx::event_queue`].
1008///
1009/// This can help reduce event queue allocations
1010/// when you have direct access to the context.
1011///
1012/// ```
1013/// # use firewheel_core::{diff::{Diff, PathBuilder}, node::NodeID};
1014/// # use firewheel_graph::{backend::AudioBackend, FirewheelCtx, ContextQueue};
1015/// # fn context_queue<B: AudioBackend, D: Diff>(
1016/// #     context: &mut FirewheelCtx<B>,
1017/// #     node_id: NodeID,
1018/// #     params: &D,
1019/// #     baseline: &D,
1020/// # ) {
1021/// // Get a queue that will send events directly to the provided node.
1022/// let mut queue = context.event_queue(node_id);
1023/// // Perform diffing using this queue.
1024/// params.diff(baseline, PathBuilder::default(), &mut queue);
1025/// # }
1026/// ```
1027pub struct ContextQueue<'a, B: AudioBackend> {
1028    context: &'a mut FirewheelCtx<B>,
1029    id: NodeID,
1030    #[cfg(feature = "scheduled_events")]
1031    time: Option<EventInstant>,
1032}
1033
#[cfg(feature = "scheduled_events")]
impl<B: AudioBackend> ContextQueue<'_, B> {
    /// The time this queue was created with.
    ///
    /// This is `None` for queues acquired from
    /// [`FirewheelCtx::event_queue`], and the `time` argument for queues
    /// acquired from [`FirewheelCtx::event_queue_scheduled`].
    pub fn time(&self) -> Option<EventInstant> {
        self.time
    }
}
1040
1041impl<B: AudioBackend> firewheel_core::diff::EventQueue for ContextQueue<'_, B> {
1042    fn push(&mut self, data: NodeEventType) {
1043        self.context.queue_event(NodeEvent {
1044            event: data,
1045            #[cfg(feature = "scheduled_events")]
1046            time: self.time,
1047            node_id: self.id,
1048        });
1049    }
1050}
1051
/// The type of scheduled events to clear when canceling scheduled events
/// via [`FirewheelCtx::cancel_all_scheduled_events`] or
/// [`FirewheelCtx::cancel_scheduled_events_for`].
///
/// The default is [`ClearScheduledEventsType::All`].
#[cfg(feature = "scheduled_events")]
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
pub enum ClearScheduledEventsType {
    /// Clear both musical and non-musical scheduled events.
    #[default]
    All,
    /// Clear only non-musical scheduled events.
    NonMusicalOnly,
    /// Clear only musical scheduled events.
    MusicalOnly,
}
1064
1065fn audio_clock_update_instant_and_delay<B: AudioBackend>(
1066    clock: &SharedClock<B::Instant>,
1067    active_state: &Option<ActiveState<B>>,
1068) -> Option<(Instant, Duration)> {
1069    active_state.as_ref().and_then(|active_state| {
1070        clock
1071            .process_timestamp
1072            .clone()
1073            .and_then(|process_timestamp| {
1074                active_state
1075                    .backend_handle
1076                    .delay_from_last_process(process_timestamp)
1077                    .and_then(|delay| {
1078                        Instant::now()
1079                            .checked_sub(delay)
1080                            .map(|instant| (instant, delay))
1081                    })
1082            })
1083    })
1084}