firewheel_graph/
context.rs

1use atomic_float::AtomicF64;
2use core::any::Any;
3use firewheel_core::{
4    channel_config::{ChannelConfig, ChannelCount},
5    clock::{ClockSamples, ClockSeconds, MusicalTime, MusicalTransport},
6    collector::Collector,
7    dsp::declick::DeclickValues,
8    event::{NodeEvent, NodeEventType},
9    node::{AudioNode, DynAudioNode, NodeID},
10    StreamInfo,
11};
12use ringbuf::traits::{Consumer, Producer, Split};
13use smallvec::SmallVec;
14use std::{
15    num::NonZeroU32,
16    sync::{
17        atomic::{AtomicI64, Ordering},
18        Arc,
19    },
20};
21
22use crate::{
23    backend::{AudioBackend, DeviceInfo},
24    error::{AddEdgeError, StartStreamError, UpdateError},
25    graph::{AudioGraph, Edge, EdgeID, NodeEntry, PortIdx},
26    processor::{
27        ContextToProcessorMsg, FirewheelProcessor, FirewheelProcessorInner, ProcessorToContextMsg,
28    },
29};
30
31/// The configuration of a Firewheel context.
32#[derive(Debug, Clone, Copy, PartialEq)]
33pub struct FirewheelConfig {
34    /// The number of input channels in the audio graph.
35    pub num_graph_inputs: ChannelCount,
36    /// The number of output channels in the audio graph.
37    pub num_graph_outputs: ChannelCount,
38    /// If `true`, then all outputs will be hard clipped at 0db to help
39    /// protect the system's speakers.
40    ///
41    /// Note that most operating systems already hard clip the output,
42    /// so this is usually not needed (TODO: Do research to see if this
43    /// assumption is true.)
44    ///
45    /// By default this is set to `false`.
46    pub hard_clip_outputs: bool,
47    /// An initial capacity to allocate for the nodes in the audio graph.
48    ///
49    /// By default this is set to `64`.
50    pub initial_node_capacity: u32,
51    /// An initial capacity to allocate for the edges in the audio graph.
52    ///
53    /// By default this is set to `256`.
54    pub initial_edge_capacity: u32,
55    /// The amount of time in seconds to fade in/out when pausing/resuming
56    /// to avoid clicks and pops.
57    ///
58    /// By default this is set to `10.0 / 1_000.0`.
59    pub declick_seconds: f32,
60    /// The initial capacity for a group of events.
61    ///
62    /// By default this is set to `128`.
63    pub initial_event_group_capacity: u32,
64    /// The capacity of the engine's internal message channel.
65    ///
66    /// By default this is set to `64`.
67    pub channel_capacity: u32,
68    /// The capacity of an event queue in the engine (one event queue per node).
69    ///
70    /// By default this is set to `128`.
71    pub event_queue_capacity: u32,
72}
73
74impl Default for FirewheelConfig {
75    fn default() -> Self {
76        Self {
77            num_graph_inputs: ChannelCount::ZERO,
78            num_graph_outputs: ChannelCount::STEREO,
79            hard_clip_outputs: false,
80            initial_node_capacity: 128,
81            initial_edge_capacity: 256,
82            declick_seconds: DeclickValues::DEFAULT_FADE_SECONDS,
83            initial_event_group_capacity: 128,
84            channel_capacity: 64,
85            event_queue_capacity: 128,
86        }
87    }
88}
89
90struct ActiveState<B: AudioBackend> {
91    backend_handle: B,
92    stream_info: StreamInfo,
93}
94
95/// A Firewheel context
96pub struct FirewheelCtx<B: AudioBackend> {
97    graph: AudioGraph,
98
99    to_processor_tx: ringbuf::HeapProd<ContextToProcessorMsg>,
100    from_processor_rx: ringbuf::HeapCons<ProcessorToContextMsg>,
101
102    active_state: Option<ActiveState<B>>,
103
104    processor_channel: Option<(
105        ringbuf::HeapCons<ContextToProcessorMsg>,
106        ringbuf::HeapProd<ProcessorToContextMsg>,
107    )>,
108    processor_drop_rx: Option<ringbuf::HeapCons<FirewheelProcessorInner>>,
109
110    clock_shared: Arc<ClockValues>,
111
112    // Re-use the allocations for groups of events.
113    event_group_pool: Vec<Vec<NodeEvent>>,
114    event_group: Vec<NodeEvent>,
115    initial_event_group_capacity: usize,
116
117    config: FirewheelConfig,
118}
119
120impl<B: AudioBackend> FirewheelCtx<B> {
121    /// Create a new Firewheel context.
122    pub fn new(config: FirewheelConfig) -> Self {
123        let clock_shared = Arc::new(ClockValues {
124            seconds: AtomicF64::new(0.0),
125            samples: AtomicI64::new(0),
126            musical: AtomicF64::new(0.0),
127        });
128
129        let (to_processor_tx, from_context_rx) =
130            ringbuf::HeapRb::<ContextToProcessorMsg>::new(config.channel_capacity as usize).split();
131        let (to_context_tx, from_processor_rx) =
132            ringbuf::HeapRb::<ProcessorToContextMsg>::new(config.channel_capacity as usize * 2)
133                .split();
134
135        let initial_event_group_capacity = config.initial_event_group_capacity as usize;
136        let mut event_group_pool = Vec::with_capacity(16);
137        for _ in 0..3 {
138            event_group_pool.push(Vec::with_capacity(initial_event_group_capacity));
139        }
140
141        Self {
142            graph: AudioGraph::new(&config),
143            to_processor_tx,
144            from_processor_rx,
145            active_state: None,
146            processor_channel: Some((from_context_rx, to_context_tx)),
147            processor_drop_rx: None,
148            clock_shared: Arc::clone(&clock_shared),
149            event_group_pool,
150            event_group: Vec::with_capacity(initial_event_group_capacity),
151            initial_event_group_capacity,
152            config,
153        }
154    }
155
156    /// Get a list of the available audio input devices.
157    pub fn available_input_devices(&self) -> Vec<DeviceInfo> {
158        B::available_input_devices()
159    }
160
161    /// Get a list of the available audio output devices.
162    pub fn available_output_devices(&self) -> Vec<DeviceInfo> {
163        B::available_output_devices()
164    }
165
166    /// Returns `true` if an audio stream can be started right now.
167    ///
168    /// When calling [`FirewheelCtx::stop_stream()`], it may take some time for the
169    /// old stream to be fully stopped. This method is used to check if it has been
170    /// dropped yet.
171    ///
172    /// Note, in rare cases where the audio thread crashes without cleanly dropping
173    /// its contents, this may never return `true`. Consider adding a timeout to
174    /// avoid deadlocking.
175    pub fn can_start_stream(&self) -> bool {
176        if self.is_audio_stream_running() {
177            false
178        } else if let Some(rx) = &self.processor_drop_rx {
179            rx.try_peek().is_some()
180        } else {
181            true
182        }
183    }
184
185    /// Start an audio stream for this context. Only one audio stream can exist on
186    /// a context at a time.
187    ///
188    /// When calling [`FirewheelCtx::stop_stream()`], it may take some time for the
189    /// old stream to be fully stopped. Use [`FirewheelCtx::can_start_stream`] to
190    /// check if it has been dropped yet.
191    ///
192    /// Note, in rare cases where the audio thread crashes without cleanly dropping
193    /// its contents, this may never succeed. Consider adding a timeout to avoid
194    /// deadlocking.
195    pub fn start_stream(
196        &mut self,
197        config: B::Config,
198    ) -> Result<(), StartStreamError<B::StartStreamError>> {
199        if self.is_audio_stream_running() {
200            return Err(StartStreamError::AlreadyStarted);
201        }
202
203        if !self.can_start_stream() {
204            return Err(StartStreamError::OldStreamNotFinishedStopping);
205        }
206
207        let (mut backend_handle, mut stream_info) =
208            B::start_stream(config).map_err(|e| StartStreamError::BackendError(e))?;
209
210        stream_info.sample_rate_recip = (stream_info.sample_rate.get() as f64).recip();
211        stream_info.declick_frames = NonZeroU32::new(
212            (self.config.declick_seconds * stream_info.sample_rate.get() as f32).round() as u32,
213        )
214        .unwrap_or(NonZeroU32::MIN);
215
216        let schedule = self.graph.compile(&stream_info)?;
217
218        let (drop_tx, drop_rx) = ringbuf::HeapRb::<FirewheelProcessorInner>::new(1).split();
219
220        let processor =
221            if let Some((from_context_rx, to_context_tx)) = self.processor_channel.take() {
222                FirewheelProcessorInner::new(
223                    from_context_rx,
224                    to_context_tx,
225                    Arc::clone(&self.clock_shared),
226                    self.graph.node_capacity(),
227                    &stream_info,
228                    self.config.hard_clip_outputs,
229                )
230            } else {
231                let mut processor = self.processor_drop_rx.as_mut().unwrap().try_pop().unwrap();
232
233                if processor.poisoned {
234                    panic!("The audio thread has panicked!");
235                }
236
237                processor.new_stream(&stream_info);
238
239                processor
240            };
241
242        backend_handle.set_processor(FirewheelProcessor::new(processor, drop_tx));
243
244        if let Err(_) = self.send_message_to_processor(ContextToProcessorMsg::NewSchedule(schedule))
245        {
246            panic!("Firewheel message channel is full!");
247        }
248
249        self.active_state = Some(ActiveState {
250            backend_handle,
251            stream_info,
252        });
253        self.processor_drop_rx = Some(drop_rx);
254
255        Ok(())
256    }
257
258    /// Stop the audio stream in this context.
259    pub fn stop_stream(&mut self) {
260        // When the backend handle is dropped, the backend will automatically
261        // stop its stream.
262        self.active_state = None;
263        self.graph.deactivate();
264    }
265
266    /// Returns `true` if there is currently a running audio stream.
267    pub fn is_audio_stream_running(&self) -> bool {
268        self.active_state.is_some()
269    }
270
271    /// Information about the running audio stream.
272    ///
273    /// Returns `None` if no audio stream is currently running.
274    pub fn stream_info(&self) -> Option<&StreamInfo> {
275        self.active_state.as_ref().map(|s| &s.stream_info)
276    }
277
278    /// The current time of the clock in the number of seconds since the stream
279    /// was started.
280    ///
281    /// Note, this clock is not perfectly accurate, but it is good enough for
282    /// most use cases. This clock also correctly accounts for any output
283    /// underflows that may occur.
284    pub fn clock_now(&self) -> ClockSeconds {
285        ClockSeconds(self.clock_shared.seconds.load(Ordering::Relaxed))
286    }
287
288    /// The current time of the sample clock in the number of samples (of a single
289    /// channel of audio) that have been processed since the beginning of the
290    /// stream.
291    ///
292    /// This is more accurate than the seconds clock, and is ideal for syncing
293    /// events to a musical transport. Though note that this clock does not
294    /// account for any output underflows that may occur.
295    pub fn clock_samples(&self) -> ClockSamples {
296        ClockSamples(self.clock_shared.samples.load(Ordering::Relaxed))
297    }
298
299    /// The current musical time of the transport.
300    ///
301    /// If no transport is currently active, then this will have a value of `0`.
302    pub fn clock_musical(&self) -> MusicalTime {
303        MusicalTime(self.clock_shared.musical.load(Ordering::Relaxed))
304    }
305
306    /// Set the musical transport to use.
307    ///
308    /// If an existing musical transport is already running, then the new
309    /// transport will pick up where the old one left off. This allows you
310    /// to, for example, change the tempo dynamically at runtime.
311    ///
312    /// If the message channel is full, then this will return an error.
313    pub fn set_transport(
314        &mut self,
315        transport: Option<MusicalTransport>,
316    ) -> Result<(), UpdateError<B::StreamError>> {
317        self.send_message_to_processor(ContextToProcessorMsg::SetTransport(transport))
318            .map_err(|(_, e)| e)
319    }
320
321    /// Start or restart the musical transport.
322    ///
323    /// If the message channel is full, then this will return an error.
324    pub fn start_or_restart_transport(&mut self) -> Result<(), UpdateError<B::StreamError>> {
325        self.send_message_to_processor(ContextToProcessorMsg::StartOrRestartTransport)
326            .map_err(|(_, e)| e)
327    }
328
329    /// Pause the musical transport.
330    ///
331    /// If the message channel is full, then this will return an error.
332    pub fn pause_transport(&mut self) -> Result<(), UpdateError<B::StreamError>> {
333        self.send_message_to_processor(ContextToProcessorMsg::PauseTransport)
334            .map_err(|(_, e)| e)
335    }
336
337    /// Resume the musical transport.
338    ///
339    /// If the message channel is full, then this will return an error.
340    pub fn resume_transport(&mut self) -> Result<(), UpdateError<B::StreamError>> {
341        self.send_message_to_processor(ContextToProcessorMsg::ResumeTransport)
342            .map_err(|(_, e)| e)
343    }
344
345    /// Stop the musical transport.
346    ///
347    /// If the message channel is full, then this will return an error.
348    pub fn stop_transport(&mut self) -> Result<(), UpdateError<B::StreamError>> {
349        self.send_message_to_processor(ContextToProcessorMsg::StopTransport)
350            .map_err(|(_, e)| e)
351    }
352
353    /// Whether or not outputs are being hard clipped at 0dB.
354    pub fn hard_clip_outputs(&self) -> bool {
355        self.config.hard_clip_outputs
356    }
357
358    /// Set whether or not outputs should be hard clipped at 0dB to
359    /// help protect the system's speakers.
360    ///
361    /// Note that most operating systems already hard clip the output,
362    /// so this is usually not needed (TODO: Do research to see if this
363    /// assumption is true.)
364    ///
365    /// If the message channel is full, then this will return an error.
366    pub fn set_hard_clip_outputs(
367        &mut self,
368        hard_clip_outputs: bool,
369    ) -> Result<(), UpdateError<B::StreamError>> {
370        if self.config.hard_clip_outputs == hard_clip_outputs {
371            return Ok(());
372        }
373        self.config.hard_clip_outputs = hard_clip_outputs;
374
375        self.send_message_to_processor(ContextToProcessorMsg::HardClipOutputs(hard_clip_outputs))
376            .map_err(|(_, e)| e)
377    }
378
379    /// Update the firewheel context.
380    ///
381    /// This must be called reguarly (i.e. once every frame).
382    pub fn update(&mut self) -> Result<(), UpdateError<B::StreamError>> {
383        firewheel_core::collector::GlobalCollector.collect();
384
385        for msg in self.from_processor_rx.pop_iter() {
386            match msg {
387                ProcessorToContextMsg::ReturnEventGroup(mut event_group) => {
388                    event_group.clear();
389                    self.event_group_pool.push(event_group);
390                }
391                ProcessorToContextMsg::ReturnSchedule(schedule_data) => {
392                    let _ = schedule_data;
393                }
394            }
395        }
396
397        self.graph.update(
398            self.active_state.as_ref().map(|s| &s.stream_info),
399            &mut self.event_group,
400        );
401
402        if let Some(active_state) = &mut self.active_state {
403            if let Err(e) = active_state.backend_handle.poll_status() {
404                self.active_state = None;
405                self.graph.deactivate();
406
407                return Err(UpdateError::StreamStoppedUnexpectedly(Some(e)));
408            }
409
410            if self
411                .processor_drop_rx
412                .as_ref()
413                .unwrap()
414                .try_peek()
415                .is_some()
416            {
417                self.active_state = None;
418                self.graph.deactivate();
419
420                return Err(UpdateError::StreamStoppedUnexpectedly(None));
421            }
422        }
423
424        if self.is_audio_stream_running() {
425            if self.graph.needs_compile() {
426                let schedule_data = self
427                    .graph
428                    .compile(&self.active_state.as_ref().unwrap().stream_info)?;
429
430                if let Err((msg, e)) = self
431                    .send_message_to_processor(ContextToProcessorMsg::NewSchedule(schedule_data))
432                {
433                    let ContextToProcessorMsg::NewSchedule(schedule) = msg else {
434                        unreachable!();
435                    };
436
437                    self.graph.on_schedule_send_failed(schedule);
438
439                    return Err(e);
440                }
441            }
442
443            if !self.event_group.is_empty() {
444                let mut next_event_group = self
445                    .event_group_pool
446                    .pop()
447                    .unwrap_or_else(|| Vec::with_capacity(self.initial_event_group_capacity));
448                std::mem::swap(&mut next_event_group, &mut self.event_group);
449
450                if let Err((msg, e)) = self
451                    .send_message_to_processor(ContextToProcessorMsg::EventGroup(next_event_group))
452                {
453                    let ContextToProcessorMsg::EventGroup(mut event_group) = msg else {
454                        unreachable!();
455                    };
456
457                    std::mem::swap(&mut event_group, &mut self.event_group);
458                    self.event_group_pool.push(event_group);
459
460                    return Err(e);
461                }
462            }
463        }
464
465        Ok(())
466    }
467
468    /// The ID of the graph input node
469    pub fn graph_in_node_id(&self) -> NodeID {
470        self.graph.graph_in_node()
471    }
472
473    /// The ID of the graph output node
474    pub fn graph_out_node_id(&self) -> NodeID {
475        self.graph.graph_out_node()
476    }
477
478    /// Add a node to the audio graph.
479    pub fn add_node<T: AudioNode + 'static>(
480        &mut self,
481        node: T,
482        config: Option<T::Configuration>,
483    ) -> NodeID {
484        self.graph.add_node(node, config)
485    }
486
487    /// Add a node to the audio graph which implements the type-erased [`DynAudioNode`] trait.
488    pub fn add_dyn_node<T: DynAudioNode + 'static>(&mut self, node: T) -> NodeID {
489        self.graph.add_dyn_node(node)
490    }
491
492    /// Remove the given node from the audio graph.
493    ///
494    /// This will automatically remove all edges from the graph that
495    /// were connected to this node.
496    ///
497    /// On success, this returns a list of all edges that were removed
498    /// from the graph as a result of removing this node.
499    ///
500    /// This will return an error if a node with the given ID does not
501    /// exist in the graph, or if the ID is of the graph input or graph
502    /// output node.
503    pub fn remove_node(&mut self, node_id: NodeID) -> Result<SmallVec<[EdgeID; 4]>, ()> {
504        self.graph.remove_node(node_id)
505    }
506
507    /// Get information about a node in the graph.
508    pub fn node_info(&self, id: NodeID) -> Option<&NodeEntry> {
509        self.graph.node_info(id)
510    }
511
512    /// Get an immutable reference to the custom state of a node.
513    pub fn node_state<T: 'static>(&self, id: NodeID) -> Option<&T> {
514        self.graph.node_state(id)
515    }
516
517    /// Get a type-erased, immutable reference to the custom state of a node.
518    pub fn node_state_dyn(&self, id: NodeID) -> Option<&dyn Any> {
519        self.graph.node_state_dyn(id)
520    }
521
522    /// Get a mutable reference to the custom state of a node.
523    pub fn node_state_mut<T: 'static>(&mut self, id: NodeID) -> Option<&mut T> {
524        self.graph.node_state_mut(id)
525    }
526
527    pub fn node_state_dyn_mut(&mut self, id: NodeID) -> Option<&mut dyn Any> {
528        self.graph.node_state_dyn_mut(id)
529    }
530
531    /// Get a list of all the existing nodes in the graph.
532    pub fn nodes<'a>(&'a self) -> impl Iterator<Item = &'a NodeEntry> {
533        self.graph.nodes()
534    }
535
536    /// Get a list of all the existing edges in the graph.
537    pub fn edges<'a>(&'a self) -> impl Iterator<Item = &'a Edge> {
538        self.graph.edges()
539    }
540
541    /// Set the number of input and output channels to and from the audio graph.
542    ///
543    /// Returns the list of edges that were removed.
544    pub fn set_graph_channel_config(
545        &mut self,
546        channel_config: ChannelConfig,
547    ) -> SmallVec<[EdgeID; 4]> {
548        self.graph.set_graph_channel_config(channel_config)
549    }
550
551    /// Add connections (edges) between two nodes to the graph.
552    ///
553    /// * `src_node` - The ID of the source node.
554    /// * `dst_node` - The ID of the destination node.
555    /// * `ports_src_dst` - The port indices for each connection to make,
556    /// where the first value in a tuple is the output port on `src_node`,
557    /// and the second value in that tuple is the input port on `dst_node`.
558    /// * `check_for_cycles` - If `true`, then this will run a check to
559    /// see if adding these edges will create a cycle in the graph, and
560    /// return an error if it does. Note, checking for cycles can be quite
561    /// expensive, so avoid enabling this when calling this method many times
562    /// in a row.
563    ///
564    /// If successful, then this returns a list of edge IDs in order.
565    ///
566    /// If this returns an error, then the audio graph has not been
567    /// modified.
568    pub fn connect(
569        &mut self,
570        src_node: NodeID,
571        dst_node: NodeID,
572        ports_src_dst: &[(PortIdx, PortIdx)],
573        check_for_cycles: bool,
574    ) -> Result<SmallVec<[EdgeID; 4]>, AddEdgeError> {
575        self.graph
576            .connect(src_node, dst_node, ports_src_dst, check_for_cycles)
577    }
578
579    /// Remove connections (edges) between two nodes from the graph.
580    ///
581    /// * `src_node` - The ID of the source node.
582    /// * `dst_node` - The ID of the destination node.
583    /// * `ports_src_dst` - The port indices for each connection to make,
584    /// where the first value in a tuple is the output port on `src_node`,
585    /// and the second value in that tuple is the input port on `dst_node`.
586    ///
587    /// If none of the edges existed in the graph, then `false` will be
588    /// returned.
589    pub fn disconnect(
590        &mut self,
591        src_node: NodeID,
592        dst_node: NodeID,
593        ports_src_dst: &[(PortIdx, PortIdx)],
594    ) -> bool {
595        self.graph.disconnect(src_node, dst_node, ports_src_dst)
596    }
597
598    /// Remove all connections (edges) between two nodes in the graph.
599    ///
600    /// * `src_node` - The ID of the source node.
601    /// * `dst_node` - The ID of the destination node.
602    pub fn disconnect_all_between(
603        &mut self,
604        src_node: NodeID,
605        dst_node: NodeID,
606    ) -> SmallVec<[EdgeID; 4]> {
607        self.graph.disconnect_all_between(src_node, dst_node)
608    }
609
610    /// Remove a connection (edge) via the edge's unique ID.
611    ///
612    /// If the edge did not exist in this graph, then `false` will be returned.
613    pub fn disconnect_by_edge_id(&mut self, edge_id: EdgeID) -> bool {
614        self.graph.disconnect_by_edge_id(edge_id)
615    }
616
617    /// Get information about the given [Edge]
618    pub fn edge(&self, edge_id: EdgeID) -> Option<&Edge> {
619        self.graph.edge(edge_id)
620    }
621
622    /// Runs a check to see if a cycle exists in the audio graph.
623    ///
624    /// Note, this method is expensive.
625    pub fn cycle_detected(&mut self) -> bool {
626        self.graph.cycle_detected()
627    }
628
629    /// Queue an event to be sent to an audio node's processor.
630    ///
631    /// Note, this event will not be sent until the event queue is flushed
632    /// in [`FirewheelCtx::update`].
633    pub fn queue_event(&mut self, event: NodeEvent) {
634        self.event_group.push(event);
635    }
636
637    /// Queue an event to be sent to an audio node's processor.
638    ///
639    /// Note, this event will not be sent until the event queue is flushed
640    /// in [`FirewheelCtx::update`].
641    pub fn queue_event_for(&mut self, node_id: NodeID, event: NodeEventType) {
642        self.queue_event(NodeEvent { node_id, event });
643    }
644
645    fn send_message_to_processor(
646        &mut self,
647        msg: ContextToProcessorMsg,
648    ) -> Result<(), (ContextToProcessorMsg, UpdateError<B::StreamError>)> {
649        self.to_processor_tx
650            .try_push(msg)
651            .map_err(|msg| (msg, UpdateError::MsgChannelFull))
652    }
653}
654
655impl<B: AudioBackend> Drop for FirewheelCtx<B> {
656    fn drop(&mut self) {
657        self.stop_stream();
658
659        // Wait for the processor to be drop to avoid deallocating it on
660        // the audio thread.
661        #[cfg(not(target_family = "wasm"))]
662        if let Some(drop_rx) = self.processor_drop_rx.take() {
663            let now = std::time::Instant::now();
664
665            while drop_rx.try_peek().is_none() {
666                if now.elapsed() > std::time::Duration::from_secs(2) {
667                    break;
668                }
669
670                std::thread::sleep(std::time::Duration::from_millis(2));
671            }
672        }
673
674        firewheel_core::collector::GlobalCollector.collect();
675    }
676}
677
678pub(crate) struct ClockValues {
679    pub seconds: AtomicF64,
680    pub samples: AtomicI64,
681    pub musical: AtomicF64,
682}
683
684impl<B: AudioBackend> FirewheelCtx<B> {
685    /// Construct an [`ContextQueue`] for diffing.
686    pub fn event_queue(&mut self, id: NodeID) -> ContextQueue<'_, B> {
687        ContextQueue { context: self, id }
688    }
689}
690
691/// An event queue acquired from [`FirewheelCtx::event_queue`].
692///
693/// This can help reduce event queue allocations
694/// when you have direct access to the context.
695///
696/// ```
697/// # use firewheel_core::{diff::{Diff, PathBuilder}, node::NodeID};
698/// # use firewheel_graph::{backend::AudioBackend, FirewheelCtx, ContextQueue};
699/// # fn context_queue<B: AudioBackend, D: Diff>(
700/// #     context: &mut FirewheelCtx<B>,
701/// #     node_id: NodeID,
702/// #     params: &D,
703/// #     baseline: &D,
704/// # ) {
705/// // Get a queue that will send events directly to the provided node.
706/// let mut queue = context.event_queue(node_id);
707/// // Perform diffing using this queue.
708/// params.diff(baseline, PathBuilder::default(), &mut queue);
709/// # }
710/// ```
711pub struct ContextQueue<'a, B: AudioBackend> {
712    context: &'a mut FirewheelCtx<B>,
713    id: NodeID,
714}
715
716impl<B: AudioBackend> firewheel_core::diff::EventQueue for ContextQueue<'_, B> {
717    fn push(&mut self, data: NodeEventType) {
718        self.context.queue_event(NodeEvent {
719            event: data,
720            node_id: self.id,
721        });
722    }
723}