firewheel_graph/
context.rs

1use bevy_platform::sync::{
2    atomic::{AtomicI64, Ordering},
3    Arc,
4};
5use core::any::Any;
6use core::num::NonZeroU32;
7use firewheel_core::{
8    atomic_float::AtomicF64,
9    channel_config::{ChannelConfig, ChannelCount},
10    clock::{ClockSamples, ClockSeconds, MusicalTime, MusicalTransport},
11    collector::Collector,
12    dsp::declick::DeclickValues,
13    event::{NodeEvent, NodeEventType},
14    node::{AudioNode, DynAudioNode, NodeID},
15    StreamInfo,
16};
17use ringbuf::traits::{Consumer, Producer, Split};
18use smallvec::SmallVec;
19
20use crate::{
21    backend::{AudioBackend, DeviceInfo},
22    error::{AddEdgeError, StartStreamError, UpdateError},
23    graph::{AudioGraph, Edge, EdgeID, NodeEntry, PortIdx},
24    processor::{
25        ContextToProcessorMsg, FirewheelProcessor, FirewheelProcessorInner, ProcessorToContextMsg,
26    },
27};
28
29/// The configuration of a Firewheel context.
30#[derive(Debug, Clone, Copy, PartialEq)]
31pub struct FirewheelConfig {
32    /// The number of input channels in the audio graph.
33    pub num_graph_inputs: ChannelCount,
34    /// The number of output channels in the audio graph.
35    pub num_graph_outputs: ChannelCount,
36    /// If `true`, then all outputs will be hard clipped at 0db to help
37    /// protect the system's speakers.
38    ///
39    /// Note that most operating systems already hard clip the output,
40    /// so this is usually not needed (TODO: Do research to see if this
41    /// assumption is true.)
42    ///
43    /// By default this is set to `false`.
44    pub hard_clip_outputs: bool,
45    /// An initial capacity to allocate for the nodes in the audio graph.
46    ///
47    /// By default this is set to `64`.
48    pub initial_node_capacity: u32,
49    /// An initial capacity to allocate for the edges in the audio graph.
50    ///
51    /// By default this is set to `256`.
52    pub initial_edge_capacity: u32,
53    /// The amount of time in seconds to fade in/out when pausing/resuming
54    /// to avoid clicks and pops.
55    ///
56    /// By default this is set to `10.0 / 1_000.0`.
57    pub declick_seconds: f32,
58    /// The initial capacity for a group of events.
59    ///
60    /// By default this is set to `128`.
61    pub initial_event_group_capacity: u32,
62    /// The capacity of the engine's internal message channel.
63    ///
64    /// By default this is set to `64`.
65    pub channel_capacity: u32,
66    /// The capacity of an event queue in the engine (one event queue per node).
67    ///
68    /// By default this is set to `128`.
69    pub event_queue_capacity: u32,
70}
71
72impl Default for FirewheelConfig {
73    fn default() -> Self {
74        Self {
75            num_graph_inputs: ChannelCount::ZERO,
76            num_graph_outputs: ChannelCount::STEREO,
77            hard_clip_outputs: false,
78            initial_node_capacity: 128,
79            initial_edge_capacity: 256,
80            declick_seconds: DeclickValues::DEFAULT_FADE_SECONDS,
81            initial_event_group_capacity: 128,
82            channel_capacity: 64,
83            event_queue_capacity: 128,
84        }
85    }
86}
87
88struct ActiveState<B: AudioBackend> {
89    backend_handle: B,
90    stream_info: StreamInfo,
91}
92
93/// A Firewheel context
94pub struct FirewheelCtx<B: AudioBackend> {
95    graph: AudioGraph,
96
97    to_processor_tx: ringbuf::HeapProd<ContextToProcessorMsg>,
98    from_processor_rx: ringbuf::HeapCons<ProcessorToContextMsg>,
99
100    active_state: Option<ActiveState<B>>,
101
102    processor_channel: Option<(
103        ringbuf::HeapCons<ContextToProcessorMsg>,
104        ringbuf::HeapProd<ProcessorToContextMsg>,
105    )>,
106    processor_drop_rx: Option<ringbuf::HeapCons<FirewheelProcessorInner>>,
107
108    clock_shared: Arc<ClockValues>,
109
110    // Re-use the allocations for groups of events.
111    event_group_pool: Vec<Vec<NodeEvent>>,
112    event_group: Vec<NodeEvent>,
113    initial_event_group_capacity: usize,
114
115    config: FirewheelConfig,
116}
117
118impl<B: AudioBackend> FirewheelCtx<B> {
119    /// Create a new Firewheel context.
120    pub fn new(config: FirewheelConfig) -> Self {
121        let clock_shared = Arc::new(ClockValues {
122            seconds: AtomicF64::new(0.0),
123            samples: AtomicI64::new(0),
124            musical: AtomicF64::new(0.0),
125        });
126
127        let (to_processor_tx, from_context_rx) =
128            ringbuf::HeapRb::<ContextToProcessorMsg>::new(config.channel_capacity as usize).split();
129        let (to_context_tx, from_processor_rx) =
130            ringbuf::HeapRb::<ProcessorToContextMsg>::new(config.channel_capacity as usize * 2)
131                .split();
132
133        let initial_event_group_capacity = config.initial_event_group_capacity as usize;
134        let mut event_group_pool = Vec::with_capacity(16);
135        for _ in 0..3 {
136            event_group_pool.push(Vec::with_capacity(initial_event_group_capacity));
137        }
138
139        Self {
140            graph: AudioGraph::new(&config),
141            to_processor_tx,
142            from_processor_rx,
143            active_state: None,
144            processor_channel: Some((from_context_rx, to_context_tx)),
145            processor_drop_rx: None,
146            clock_shared: Arc::clone(&clock_shared),
147            event_group_pool,
148            event_group: Vec::with_capacity(initial_event_group_capacity),
149            initial_event_group_capacity,
150            config,
151        }
152    }
153
154    /// Get a list of the available audio input devices.
155    pub fn available_input_devices(&self) -> Vec<DeviceInfo> {
156        B::available_input_devices()
157    }
158
159    /// Get a list of the available audio output devices.
160    pub fn available_output_devices(&self) -> Vec<DeviceInfo> {
161        B::available_output_devices()
162    }
163
164    /// Returns `true` if an audio stream can be started right now.
165    ///
166    /// When calling [`FirewheelCtx::stop_stream()`], it may take some time for the
167    /// old stream to be fully stopped. This method is used to check if it has been
168    /// dropped yet.
169    ///
170    /// Note, in rare cases where the audio thread crashes without cleanly dropping
171    /// its contents, this may never return `true`. Consider adding a timeout to
172    /// avoid deadlocking.
173    pub fn can_start_stream(&self) -> bool {
174        if self.is_audio_stream_running() {
175            false
176        } else if let Some(rx) = &self.processor_drop_rx {
177            rx.try_peek().is_some()
178        } else {
179            true
180        }
181    }
182
183    /// Start an audio stream for this context. Only one audio stream can exist on
184    /// a context at a time.
185    ///
186    /// When calling [`FirewheelCtx::stop_stream()`], it may take some time for the
187    /// old stream to be fully stopped. Use [`FirewheelCtx::can_start_stream`] to
188    /// check if it has been dropped yet.
189    ///
190    /// Note, in rare cases where the audio thread crashes without cleanly dropping
191    /// its contents, this may never succeed. Consider adding a timeout to avoid
192    /// deadlocking.
193    pub fn start_stream(
194        &mut self,
195        config: B::Config,
196    ) -> Result<(), StartStreamError<B::StartStreamError>> {
197        if self.is_audio_stream_running() {
198            return Err(StartStreamError::AlreadyStarted);
199        }
200
201        if !self.can_start_stream() {
202            return Err(StartStreamError::OldStreamNotFinishedStopping);
203        }
204
205        let (mut backend_handle, mut stream_info) =
206            B::start_stream(config).map_err(|e| StartStreamError::BackendError(e))?;
207
208        stream_info.sample_rate_recip = (stream_info.sample_rate.get() as f64).recip();
209        stream_info.declick_frames = NonZeroU32::new(
210            (self.config.declick_seconds * stream_info.sample_rate.get() as f32).round() as u32,
211        )
212        .unwrap_or(NonZeroU32::MIN);
213
214        let schedule = self.graph.compile(&stream_info)?;
215
216        let (drop_tx, drop_rx) = ringbuf::HeapRb::<FirewheelProcessorInner>::new(1).split();
217
218        let processor =
219            if let Some((from_context_rx, to_context_tx)) = self.processor_channel.take() {
220                FirewheelProcessorInner::new(
221                    from_context_rx,
222                    to_context_tx,
223                    Arc::clone(&self.clock_shared),
224                    self.graph.node_capacity(),
225                    &stream_info,
226                    self.config.hard_clip_outputs,
227                )
228            } else {
229                let mut processor = self.processor_drop_rx.as_mut().unwrap().try_pop().unwrap();
230
231                if processor.poisoned {
232                    panic!("The audio thread has panicked!");
233                }
234
235                processor.new_stream(&stream_info);
236
237                processor
238            };
239
240        backend_handle.set_processor(FirewheelProcessor::new(processor, drop_tx));
241
242        if let Err(_) = self.send_message_to_processor(ContextToProcessorMsg::NewSchedule(schedule))
243        {
244            panic!("Firewheel message channel is full!");
245        }
246
247        self.active_state = Some(ActiveState {
248            backend_handle,
249            stream_info,
250        });
251        self.processor_drop_rx = Some(drop_rx);
252
253        Ok(())
254    }
255
256    /// Stop the audio stream in this context.
257    pub fn stop_stream(&mut self) {
258        // When the backend handle is dropped, the backend will automatically
259        // stop its stream.
260        self.active_state = None;
261        self.graph.deactivate();
262    }
263
264    /// Returns `true` if there is currently a running audio stream.
265    pub fn is_audio_stream_running(&self) -> bool {
266        self.active_state.is_some()
267    }
268
269    /// Information about the running audio stream.
270    ///
271    /// Returns `None` if no audio stream is currently running.
272    pub fn stream_info(&self) -> Option<&StreamInfo> {
273        self.active_state.as_ref().map(|s| &s.stream_info)
274    }
275
276    /// The current time of the clock in the number of seconds since the stream
277    /// was started.
278    ///
279    /// Note, this clock is not perfectly accurate, but it is good enough for
280    /// most use cases. This clock also correctly accounts for any output
281    /// underflows that may occur.
282    pub fn clock_now(&self) -> ClockSeconds {
283        ClockSeconds(self.clock_shared.seconds.load(Ordering::Relaxed))
284    }
285
286    /// The current time of the sample clock in the number of samples (of a single
287    /// channel of audio) that have been processed since the beginning of the
288    /// stream.
289    ///
290    /// This is more accurate than the seconds clock, and is ideal for syncing
291    /// events to a musical transport. Though note that this clock does not
292    /// account for any output underflows that may occur.
293    pub fn clock_samples(&self) -> ClockSamples {
294        ClockSamples(self.clock_shared.samples.load(Ordering::Relaxed))
295    }
296
297    /// The current musical time of the transport.
298    ///
299    /// If no transport is currently active, then this will have a value of `0`.
300    pub fn clock_musical(&self) -> MusicalTime {
301        MusicalTime(self.clock_shared.musical.load(Ordering::Relaxed))
302    }
303
304    /// Set the musical transport to use.
305    ///
306    /// If an existing musical transport is already running, then the new
307    /// transport will pick up where the old one left off. This allows you
308    /// to, for example, change the tempo dynamically at runtime.
309    ///
310    /// If the message channel is full, then this will return an error.
311    pub fn set_transport(
312        &mut self,
313        transport: Option<MusicalTransport>,
314    ) -> Result<(), UpdateError<B::StreamError>> {
315        self.send_message_to_processor(ContextToProcessorMsg::SetTransport(transport))
316            .map_err(|(_, e)| e)
317    }
318
319    /// Start or restart the musical transport.
320    ///
321    /// If the message channel is full, then this will return an error.
322    pub fn start_or_restart_transport(&mut self) -> Result<(), UpdateError<B::StreamError>> {
323        self.send_message_to_processor(ContextToProcessorMsg::StartOrRestartTransport)
324            .map_err(|(_, e)| e)
325    }
326
327    /// Pause the musical transport.
328    ///
329    /// If the message channel is full, then this will return an error.
330    pub fn pause_transport(&mut self) -> Result<(), UpdateError<B::StreamError>> {
331        self.send_message_to_processor(ContextToProcessorMsg::PauseTransport)
332            .map_err(|(_, e)| e)
333    }
334
335    /// Resume the musical transport.
336    ///
337    /// If the message channel is full, then this will return an error.
338    pub fn resume_transport(&mut self) -> Result<(), UpdateError<B::StreamError>> {
339        self.send_message_to_processor(ContextToProcessorMsg::ResumeTransport)
340            .map_err(|(_, e)| e)
341    }
342
343    /// Stop the musical transport.
344    ///
345    /// If the message channel is full, then this will return an error.
346    pub fn stop_transport(&mut self) -> Result<(), UpdateError<B::StreamError>> {
347        self.send_message_to_processor(ContextToProcessorMsg::StopTransport)
348            .map_err(|(_, e)| e)
349    }
350
351    /// Whether or not outputs are being hard clipped at 0dB.
352    pub fn hard_clip_outputs(&self) -> bool {
353        self.config.hard_clip_outputs
354    }
355
356    /// Set whether or not outputs should be hard clipped at 0dB to
357    /// help protect the system's speakers.
358    ///
359    /// Note that most operating systems already hard clip the output,
360    /// so this is usually not needed (TODO: Do research to see if this
361    /// assumption is true.)
362    ///
363    /// If the message channel is full, then this will return an error.
364    pub fn set_hard_clip_outputs(
365        &mut self,
366        hard_clip_outputs: bool,
367    ) -> Result<(), UpdateError<B::StreamError>> {
368        if self.config.hard_clip_outputs == hard_clip_outputs {
369            return Ok(());
370        }
371        self.config.hard_clip_outputs = hard_clip_outputs;
372
373        self.send_message_to_processor(ContextToProcessorMsg::HardClipOutputs(hard_clip_outputs))
374            .map_err(|(_, e)| e)
375    }
376
377    /// Update the firewheel context.
378    ///
379    /// This must be called reguarly (i.e. once every frame).
380    pub fn update(&mut self) -> Result<(), UpdateError<B::StreamError>> {
381        firewheel_core::collector::GlobalCollector.collect();
382
383        for msg in self.from_processor_rx.pop_iter() {
384            match msg {
385                ProcessorToContextMsg::ReturnEventGroup(mut event_group) => {
386                    event_group.clear();
387                    self.event_group_pool.push(event_group);
388                }
389                ProcessorToContextMsg::ReturnSchedule(schedule_data) => {
390                    let _ = schedule_data;
391                }
392            }
393        }
394
395        self.graph.update(
396            self.active_state.as_ref().map(|s| &s.stream_info),
397            &mut self.event_group,
398        );
399
400        if let Some(active_state) = &mut self.active_state {
401            if let Err(e) = active_state.backend_handle.poll_status() {
402                self.active_state = None;
403                self.graph.deactivate();
404
405                return Err(UpdateError::StreamStoppedUnexpectedly(Some(e)));
406            }
407
408            if self
409                .processor_drop_rx
410                .as_ref()
411                .unwrap()
412                .try_peek()
413                .is_some()
414            {
415                self.active_state = None;
416                self.graph.deactivate();
417
418                return Err(UpdateError::StreamStoppedUnexpectedly(None));
419            }
420        }
421
422        if self.is_audio_stream_running() {
423            if self.graph.needs_compile() {
424                let schedule_data = self
425                    .graph
426                    .compile(&self.active_state.as_ref().unwrap().stream_info)?;
427
428                if let Err((msg, e)) = self
429                    .send_message_to_processor(ContextToProcessorMsg::NewSchedule(schedule_data))
430                {
431                    let ContextToProcessorMsg::NewSchedule(schedule) = msg else {
432                        unreachable!();
433                    };
434
435                    self.graph.on_schedule_send_failed(schedule);
436
437                    return Err(e);
438                }
439            }
440
441            if !self.event_group.is_empty() {
442                let mut next_event_group = self
443                    .event_group_pool
444                    .pop()
445                    .unwrap_or_else(|| Vec::with_capacity(self.initial_event_group_capacity));
446                core::mem::swap(&mut next_event_group, &mut self.event_group);
447
448                if let Err((msg, e)) = self
449                    .send_message_to_processor(ContextToProcessorMsg::EventGroup(next_event_group))
450                {
451                    let ContextToProcessorMsg::EventGroup(mut event_group) = msg else {
452                        unreachable!();
453                    };
454
455                    core::mem::swap(&mut event_group, &mut self.event_group);
456                    self.event_group_pool.push(event_group);
457
458                    return Err(e);
459                }
460            }
461        }
462
463        Ok(())
464    }
465
466    /// The ID of the graph input node
467    pub fn graph_in_node_id(&self) -> NodeID {
468        self.graph.graph_in_node()
469    }
470
471    /// The ID of the graph output node
472    pub fn graph_out_node_id(&self) -> NodeID {
473        self.graph.graph_out_node()
474    }
475
476    /// Add a node to the audio graph.
477    pub fn add_node<T: AudioNode + 'static>(
478        &mut self,
479        node: T,
480        config: Option<T::Configuration>,
481    ) -> NodeID {
482        self.graph.add_node(node, config)
483    }
484
485    /// Add a node to the audio graph which implements the type-erased [`DynAudioNode`] trait.
486    pub fn add_dyn_node<T: DynAudioNode + 'static>(&mut self, node: T) -> NodeID {
487        self.graph.add_dyn_node(node)
488    }
489
490    /// Remove the given node from the audio graph.
491    ///
492    /// This will automatically remove all edges from the graph that
493    /// were connected to this node.
494    ///
495    /// On success, this returns a list of all edges that were removed
496    /// from the graph as a result of removing this node.
497    ///
498    /// This will return an error if a node with the given ID does not
499    /// exist in the graph, or if the ID is of the graph input or graph
500    /// output node.
501    pub fn remove_node(&mut self, node_id: NodeID) -> Result<SmallVec<[EdgeID; 4]>, ()> {
502        self.graph.remove_node(node_id)
503    }
504
505    /// Get information about a node in the graph.
506    pub fn node_info(&self, id: NodeID) -> Option<&NodeEntry> {
507        self.graph.node_info(id)
508    }
509
510    /// Get an immutable reference to the custom state of a node.
511    pub fn node_state<T: 'static>(&self, id: NodeID) -> Option<&T> {
512        self.graph.node_state(id)
513    }
514
515    /// Get a type-erased, immutable reference to the custom state of a node.
516    pub fn node_state_dyn(&self, id: NodeID) -> Option<&dyn Any> {
517        self.graph.node_state_dyn(id)
518    }
519
520    /// Get a mutable reference to the custom state of a node.
521    pub fn node_state_mut<T: 'static>(&mut self, id: NodeID) -> Option<&mut T> {
522        self.graph.node_state_mut(id)
523    }
524
525    pub fn node_state_dyn_mut(&mut self, id: NodeID) -> Option<&mut dyn Any> {
526        self.graph.node_state_dyn_mut(id)
527    }
528
529    /// Get a list of all the existing nodes in the graph.
530    pub fn nodes<'a>(&'a self) -> impl Iterator<Item = &'a NodeEntry> {
531        self.graph.nodes()
532    }
533
534    /// Get a list of all the existing edges in the graph.
535    pub fn edges<'a>(&'a self) -> impl Iterator<Item = &'a Edge> {
536        self.graph.edges()
537    }
538
539    /// Set the number of input and output channels to and from the audio graph.
540    ///
541    /// Returns the list of edges that were removed.
542    pub fn set_graph_channel_config(
543        &mut self,
544        channel_config: ChannelConfig,
545    ) -> SmallVec<[EdgeID; 4]> {
546        self.graph.set_graph_channel_config(channel_config)
547    }
548
549    /// Add connections (edges) between two nodes to the graph.
550    ///
551    /// * `src_node` - The ID of the source node.
552    /// * `dst_node` - The ID of the destination node.
553    /// * `ports_src_dst` - The port indices for each connection to make,
554    /// where the first value in a tuple is the output port on `src_node`,
555    /// and the second value in that tuple is the input port on `dst_node`.
556    /// * `check_for_cycles` - If `true`, then this will run a check to
557    /// see if adding these edges will create a cycle in the graph, and
558    /// return an error if it does. Note, checking for cycles can be quite
559    /// expensive, so avoid enabling this when calling this method many times
560    /// in a row.
561    ///
562    /// If successful, then this returns a list of edge IDs in order.
563    ///
564    /// If this returns an error, then the audio graph has not been
565    /// modified.
566    pub fn connect(
567        &mut self,
568        src_node: NodeID,
569        dst_node: NodeID,
570        ports_src_dst: &[(PortIdx, PortIdx)],
571        check_for_cycles: bool,
572    ) -> Result<SmallVec<[EdgeID; 4]>, AddEdgeError> {
573        self.graph
574            .connect(src_node, dst_node, ports_src_dst, check_for_cycles)
575    }
576
577    /// Remove connections (edges) between two nodes from the graph.
578    ///
579    /// * `src_node` - The ID of the source node.
580    /// * `dst_node` - The ID of the destination node.
581    /// * `ports_src_dst` - The port indices for each connection to make,
582    /// where the first value in a tuple is the output port on `src_node`,
583    /// and the second value in that tuple is the input port on `dst_node`.
584    ///
585    /// If none of the edges existed in the graph, then `false` will be
586    /// returned.
587    pub fn disconnect(
588        &mut self,
589        src_node: NodeID,
590        dst_node: NodeID,
591        ports_src_dst: &[(PortIdx, PortIdx)],
592    ) -> bool {
593        self.graph.disconnect(src_node, dst_node, ports_src_dst)
594    }
595
596    /// Remove all connections (edges) between two nodes in the graph.
597    ///
598    /// * `src_node` - The ID of the source node.
599    /// * `dst_node` - The ID of the destination node.
600    pub fn disconnect_all_between(
601        &mut self,
602        src_node: NodeID,
603        dst_node: NodeID,
604    ) -> SmallVec<[EdgeID; 4]> {
605        self.graph.disconnect_all_between(src_node, dst_node)
606    }
607
608    /// Remove a connection (edge) via the edge's unique ID.
609    ///
610    /// If the edge did not exist in this graph, then `false` will be returned.
611    pub fn disconnect_by_edge_id(&mut self, edge_id: EdgeID) -> bool {
612        self.graph.disconnect_by_edge_id(edge_id)
613    }
614
615    /// Get information about the given [Edge]
616    pub fn edge(&self, edge_id: EdgeID) -> Option<&Edge> {
617        self.graph.edge(edge_id)
618    }
619
620    /// Runs a check to see if a cycle exists in the audio graph.
621    ///
622    /// Note, this method is expensive.
623    pub fn cycle_detected(&mut self) -> bool {
624        self.graph.cycle_detected()
625    }
626
627    /// Queue an event to be sent to an audio node's processor.
628    ///
629    /// Note, this event will not be sent until the event queue is flushed
630    /// in [`FirewheelCtx::update`].
631    pub fn queue_event(&mut self, event: NodeEvent) {
632        self.event_group.push(event);
633    }
634
635    /// Queue an event to be sent to an audio node's processor.
636    ///
637    /// Note, this event will not be sent until the event queue is flushed
638    /// in [`FirewheelCtx::update`].
639    pub fn queue_event_for(&mut self, node_id: NodeID, event: NodeEventType) {
640        self.queue_event(NodeEvent { node_id, event });
641    }
642
643    fn send_message_to_processor(
644        &mut self,
645        msg: ContextToProcessorMsg,
646    ) -> Result<(), (ContextToProcessorMsg, UpdateError<B::StreamError>)> {
647        self.to_processor_tx
648            .try_push(msg)
649            .map_err(|msg| (msg, UpdateError::MsgChannelFull))
650    }
651}
652
653impl<B: AudioBackend> Drop for FirewheelCtx<B> {
654    fn drop(&mut self) {
655        self.stop_stream();
656
657        // Wait for the processor to be drop to avoid deallocating it on
658        // the audio thread.
659        #[cfg(not(target_family = "wasm"))]
660        if let Some(drop_rx) = self.processor_drop_rx.take() {
661            let now = bevy_platform::time::Instant::now();
662
663            while drop_rx.try_peek().is_none() {
664                if now.elapsed() > core::time::Duration::from_secs(2) {
665                    break;
666                }
667
668                bevy_platform::thread::sleep(core::time::Duration::from_millis(2));
669            }
670        }
671
672        firewheel_core::collector::GlobalCollector.collect();
673    }
674}
675
676pub(crate) struct ClockValues {
677    pub seconds: AtomicF64,
678    pub samples: AtomicI64,
679    pub musical: AtomicF64,
680}
681
682impl<B: AudioBackend> FirewheelCtx<B> {
683    /// Construct an [`ContextQueue`] for diffing.
684    pub fn event_queue(&mut self, id: NodeID) -> ContextQueue<'_, B> {
685        ContextQueue { context: self, id }
686    }
687}
688
689/// An event queue acquired from [`FirewheelCtx::event_queue`].
690///
691/// This can help reduce event queue allocations
692/// when you have direct access to the context.
693///
694/// ```
695/// # use firewheel_core::{diff::{Diff, PathBuilder}, node::NodeID};
696/// # use firewheel_graph::{backend::AudioBackend, FirewheelCtx, ContextQueue};
697/// # fn context_queue<B: AudioBackend, D: Diff>(
698/// #     context: &mut FirewheelCtx<B>,
699/// #     node_id: NodeID,
700/// #     params: &D,
701/// #     baseline: &D,
702/// # ) {
703/// // Get a queue that will send events directly to the provided node.
704/// let mut queue = context.event_queue(node_id);
705/// // Perform diffing using this queue.
706/// params.diff(baseline, PathBuilder::default(), &mut queue);
707/// # }
708/// ```
709pub struct ContextQueue<'a, B: AudioBackend> {
710    context: &'a mut FirewheelCtx<B>,
711    id: NodeID,
712}
713
714impl<B: AudioBackend> firewheel_core::diff::EventQueue for ContextQueue<'_, B> {
715    fn push(&mut self, data: NodeEventType) {
716        self.context.queue_event(NodeEvent {
717            event: data,
718            node_id: self.id,
719        });
720    }
721}