1use bevy_platform::time::Instant;
2use core::cell::RefCell;
3use core::num::NonZeroU32;
4use core::time::Duration;
5use core::{any::Any, f64};
6use firewheel_core::clock::DurationSeconds;
7use firewheel_core::log::{RealtimeLogger, RealtimeLoggerConfig, RealtimeLoggerMainThread};
8use firewheel_core::{
9 channel_config::{ChannelConfig, ChannelCount},
10 clock::AudioClock,
11 collector::Collector,
12 dsp::declick::DeclickValues,
13 event::{NodeEvent, NodeEventType},
14 node::{AudioNode, DynAudioNode, NodeID},
15 StreamInfo,
16};
17use ringbuf::traits::{Consumer, Producer, Split};
18use smallvec::SmallVec;
19
20use crate::processor::BufferOutOfSpaceMode;
21use crate::{
22 backend::{AudioBackend, DeviceInfo},
23 error::{AddEdgeError, StartStreamError, UpdateError},
24 graph::{AudioGraph, Edge, EdgeID, NodeEntry, PortIdx},
25 processor::{
26 ContextToProcessorMsg, FirewheelProcessor, FirewheelProcessorInner, ProcessorToContextMsg,
27 SharedClock,
28 },
29};
30
31#[cfg(feature = "scheduled_events")]
32use crate::processor::ClearScheduledEventsEvent;
33#[cfg(feature = "scheduled_events")]
34use firewheel_core::clock::EventInstant;
35
36#[cfg(feature = "musical_transport")]
37use firewheel_core::clock::TransportState;
38
39#[derive(Debug, Clone, Copy, PartialEq)]
41pub struct FirewheelConfig {
42 pub num_graph_inputs: ChannelCount,
44 pub num_graph_outputs: ChannelCount,
46 pub hard_clip_outputs: bool,
55 pub initial_node_capacity: u32,
59 pub initial_edge_capacity: u32,
63 pub declick_seconds: f32,
68 pub initial_event_group_capacity: u32,
72 pub channel_capacity: u32,
76 pub event_queue_capacity: usize,
83 pub immediate_event_capacity: usize,
89 #[cfg(feature = "scheduled_events")]
97 pub scheduled_event_capacity: usize,
98 pub buffer_out_of_space_mode: BufferOutOfSpaceMode,
102
103 pub logger_config: RealtimeLoggerConfig,
105}
106
107impl Default for FirewheelConfig {
108 fn default() -> Self {
109 Self {
110 num_graph_inputs: ChannelCount::ZERO,
111 num_graph_outputs: ChannelCount::STEREO,
112 hard_clip_outputs: false,
113 initial_node_capacity: 128,
114 initial_edge_capacity: 256,
115 declick_seconds: DeclickValues::DEFAULT_FADE_SECONDS,
116 initial_event_group_capacity: 128,
117 channel_capacity: 64,
118 event_queue_capacity: 128,
119 immediate_event_capacity: 512,
120 #[cfg(feature = "scheduled_events")]
121 scheduled_event_capacity: 512,
122 buffer_out_of_space_mode: BufferOutOfSpaceMode::AllocateOnAudioThread,
123 logger_config: RealtimeLoggerConfig::default(),
124 }
125 }
126}
127
128struct ActiveState<B: AudioBackend> {
129 backend_handle: B,
130 stream_info: StreamInfo,
131}
132
133pub struct FirewheelCtx<B: AudioBackend> {
135 graph: AudioGraph,
136
137 to_processor_tx: ringbuf::HeapProd<ContextToProcessorMsg>,
138 from_processor_rx: ringbuf::HeapCons<ProcessorToContextMsg>,
139 logger_rx: RealtimeLoggerMainThread,
140
141 active_state: Option<ActiveState<B>>,
142
143 processor_channel: Option<(
144 ringbuf::HeapCons<ContextToProcessorMsg>,
145 ringbuf::HeapProd<ProcessorToContextMsg>,
146 triple_buffer::Input<SharedClock<B::Instant>>,
147 RealtimeLogger,
148 )>,
149 processor_drop_rx: Option<ringbuf::HeapCons<FirewheelProcessorInner<B>>>,
150
151 shared_clock_output: RefCell<triple_buffer::Output<SharedClock<B::Instant>>>,
152 sample_rate: NonZeroU32,
153 sample_rate_recip: f64,
154
155 #[cfg(feature = "musical_transport")]
156 transport_state: Box<TransportState>,
157 #[cfg(feature = "musical_transport")]
158 transport_state_alloc_reuse: Option<Box<TransportState>>,
159
160 event_group_pool: Vec<Vec<NodeEvent>>,
162 event_group: Vec<NodeEvent>,
163 initial_event_group_capacity: usize,
164
165 #[cfg(feature = "scheduled_events")]
166 queued_clear_scheduled_events: Vec<ClearScheduledEventsEvent>,
167
168 config: FirewheelConfig,
169}
170
171impl<B: AudioBackend> FirewheelCtx<B> {
172 pub fn new(config: FirewheelConfig) -> Self {
174 let (to_processor_tx, from_context_rx) =
175 ringbuf::HeapRb::<ContextToProcessorMsg>::new(config.channel_capacity as usize).split();
176 let (to_context_tx, from_processor_rx) =
177 ringbuf::HeapRb::<ProcessorToContextMsg>::new(config.channel_capacity as usize * 2)
178 .split();
179
180 let initial_event_group_capacity = config.initial_event_group_capacity as usize;
181 let mut event_group_pool = Vec::with_capacity(16);
182 for _ in 0..3 {
183 event_group_pool.push(Vec::with_capacity(initial_event_group_capacity));
184 }
185
186 let (shared_clock_input, shared_clock_output) =
187 triple_buffer::triple_buffer(&SharedClock::default());
188
189 let (logger, logger_rx) = firewheel_core::log::realtime_logger(config.logger_config);
190
191 Self {
192 graph: AudioGraph::new(&config),
193 to_processor_tx,
194 from_processor_rx,
195 logger_rx,
196 active_state: None,
197 processor_channel: Some((from_context_rx, to_context_tx, shared_clock_input, logger)),
198 processor_drop_rx: None,
199 shared_clock_output: RefCell::new(shared_clock_output),
200 sample_rate: NonZeroU32::new(44100).unwrap(),
201 sample_rate_recip: 44100.0f64.recip(),
202 #[cfg(feature = "musical_transport")]
203 transport_state: Box::new(TransportState::default()),
204 #[cfg(feature = "musical_transport")]
205 transport_state_alloc_reuse: None,
206 event_group_pool,
207 event_group: Vec::with_capacity(initial_event_group_capacity),
208 initial_event_group_capacity,
209 #[cfg(feature = "scheduled_events")]
210 queued_clear_scheduled_events: Vec::new(),
211 config,
212 }
213 }
214
215 pub fn active_backend(&self) -> Option<&B> {
218 self.active_state
219 .as_ref()
220 .map(|state| &state.backend_handle)
221 }
222
223 pub fn active_backend_mut(&mut self) -> Option<&mut B> {
226 self.active_state
227 .as_mut()
228 .map(|state| &mut state.backend_handle)
229 }
230
231 pub fn available_input_devices(&self) -> Vec<DeviceInfo> {
233 B::available_input_devices()
234 }
235
236 pub fn available_output_devices(&self) -> Vec<DeviceInfo> {
238 B::available_output_devices()
239 }
240
241 pub fn can_start_stream(&self) -> bool {
251 if self.is_audio_stream_running() {
252 false
253 } else if let Some(rx) = &self.processor_drop_rx {
254 rx.try_peek().is_some()
255 } else {
256 true
257 }
258 }
259
260 pub fn start_stream(
271 &mut self,
272 config: B::Config,
273 ) -> Result<(), StartStreamError<B::StartStreamError>> {
274 if self.is_audio_stream_running() {
275 return Err(StartStreamError::AlreadyStarted);
276 }
277
278 if !self.can_start_stream() {
279 return Err(StartStreamError::OldStreamNotFinishedStopping);
280 }
281
282 let (mut backend_handle, mut stream_info) =
283 B::start_stream(config).map_err(|e| StartStreamError::BackendError(e))?;
284
285 stream_info.sample_rate_recip = (stream_info.sample_rate.get() as f64).recip();
286 stream_info.declick_frames = NonZeroU32::new(
287 (self.config.declick_seconds * stream_info.sample_rate.get() as f32).round() as u32,
288 )
289 .unwrap_or(NonZeroU32::MIN);
290
291 let maybe_processor = self.processor_channel.take();
292
293 stream_info.prev_sample_rate = if maybe_processor.is_some() {
294 stream_info.sample_rate
295 } else {
296 self.sample_rate
297 };
298
299 self.sample_rate = stream_info.sample_rate;
300 self.sample_rate_recip = stream_info.sample_rate_recip;
301
302 let schedule = self.graph.compile(&stream_info)?;
303
304 let (drop_tx, drop_rx) = ringbuf::HeapRb::<FirewheelProcessorInner<B>>::new(1).split();
305
306 let processor = if let Some((from_context_rx, to_context_tx, shared_clock_input, logger)) =
307 maybe_processor
308 {
309 FirewheelProcessorInner::new(
310 from_context_rx,
311 to_context_tx,
312 shared_clock_input,
313 self.config.immediate_event_capacity,
314 #[cfg(feature = "scheduled_events")]
315 self.config.scheduled_event_capacity,
316 self.config.event_queue_capacity,
317 &stream_info,
318 self.config.hard_clip_outputs,
319 self.config.buffer_out_of_space_mode,
320 logger,
321 )
322 } else {
323 let mut processor = self.processor_drop_rx.as_mut().unwrap().try_pop().unwrap();
324
325 if processor.poisoned {
326 panic!("The audio thread has panicked!");
327 }
328
329 processor.new_stream(&stream_info);
330
331 processor
332 };
333
334 backend_handle.set_processor(FirewheelProcessor::new(processor, drop_tx));
335
336 if let Err(_) = self.send_message_to_processor(ContextToProcessorMsg::NewSchedule(schedule))
337 {
338 panic!("Firewheel message channel is full!");
339 }
340
341 self.active_state = Some(ActiveState {
342 backend_handle,
343 stream_info,
344 });
345 self.processor_drop_rx = Some(drop_rx);
346
347 Ok(())
348 }
349
350 pub fn stop_stream(&mut self) {
352 self.active_state = None;
355 self.graph.deactivate();
356 }
357
358 pub fn is_audio_stream_running(&self) -> bool {
360 self.active_state.is_some()
361 }
362
363 pub fn stream_info(&self) -> Option<&StreamInfo> {
367 self.active_state.as_ref().map(|s| &s.stream_info)
368 }
369
370 pub fn audio_clock(&self) -> AudioClock {
385 let mut clock_borrowed = self.shared_clock_output.borrow_mut();
391 let clock = clock_borrowed.read();
392
393 let update_instant = audio_clock_update_instant_and_delay(&clock, &self.active_state)
394 .map(|(update_instant, _delay)| update_instant);
395
396 AudioClock {
397 samples: clock.clock_samples,
398 seconds: clock
399 .clock_samples
400 .to_seconds(self.sample_rate, self.sample_rate_recip),
401 #[cfg(feature = "musical_transport")]
402 musical: clock.current_playhead,
403 #[cfg(feature = "musical_transport")]
404 transport_is_playing: clock.transport_is_playing,
405 update_instant,
406 }
407 }
408
409 pub fn audio_clock_corrected(&self) -> AudioClock {
428 let mut clock_borrowed = self.shared_clock_output.borrow_mut();
434 let clock = clock_borrowed.read();
435
436 let Some((update_instant, delay)) =
437 audio_clock_update_instant_and_delay(&clock, &self.active_state)
438 else {
439 return AudioClock {
442 samples: clock.clock_samples,
443 seconds: clock
444 .clock_samples
445 .to_seconds(self.sample_rate, self.sample_rate_recip),
446 #[cfg(feature = "musical_transport")]
447 musical: clock.current_playhead,
448 #[cfg(feature = "musical_transport")]
449 transport_is_playing: clock.transport_is_playing,
450 update_instant: None,
451 };
452 };
453
454 let delta_seconds = DurationSeconds(delay.as_secs_f64());
456
457 let samples = clock.clock_samples + delta_seconds.to_samples(self.sample_rate);
458
459 #[cfg(feature = "musical_transport")]
460 let musical = clock.current_playhead.map(|musical_time| {
461 if clock.transport_is_playing && self.transport_state.transport.is_some() {
462 self.transport_state
463 .transport
464 .as_ref()
465 .unwrap()
466 .delta_seconds_from(musical_time, delta_seconds, clock.speed_multiplier)
467 } else {
468 musical_time
469 }
470 });
471
472 AudioClock {
473 samples,
474 seconds: samples.to_seconds(self.sample_rate, self.sample_rate_recip),
475 #[cfg(feature = "musical_transport")]
476 musical,
477 #[cfg(feature = "musical_transport")]
478 transport_is_playing: clock.transport_is_playing,
479 update_instant: Some(update_instant),
480 }
481 }
482
483 pub fn audio_clock_instant(&self) -> Option<Instant> {
495 let mut clock_borrowed = self.shared_clock_output.borrow_mut();
501 let clock = clock_borrowed.read();
502
503 audio_clock_update_instant_and_delay(&clock, &self.active_state)
504 .map(|(update_instant, _delay)| update_instant)
505 }
506
507 #[cfg(feature = "musical_transport")]
511 pub fn sync_transport(
512 &mut self,
513 transport: &TransportState,
514 ) -> Result<(), UpdateError<B::StreamError>> {
515 if &*self.transport_state != transport {
516 let transport_msg = if let Some(mut t) = self.transport_state_alloc_reuse.take() {
517 *t = transport.clone();
518 t
519 } else {
520 Box::new(transport.clone())
521 };
522
523 self.send_message_to_processor(ContextToProcessorMsg::SetTransportState(transport_msg))
524 .map_err(|(_, e)| e)?;
525
526 *self.transport_state = transport.clone();
527 }
528
529 Ok(())
530 }
531
532 #[cfg(feature = "musical_transport")]
534 pub fn transport_state(&self) -> &TransportState {
535 &self.transport_state
536 }
537
538 #[cfg(feature = "musical_transport")]
540 pub fn transport(&self) -> &TransportState {
541 &self.transport_state
542 }
543
544 pub fn hard_clip_outputs(&self) -> bool {
546 self.config.hard_clip_outputs
547 }
548
549 pub fn set_hard_clip_outputs(
558 &mut self,
559 hard_clip_outputs: bool,
560 ) -> Result<(), UpdateError<B::StreamError>> {
561 if self.config.hard_clip_outputs == hard_clip_outputs {
562 return Ok(());
563 }
564 self.config.hard_clip_outputs = hard_clip_outputs;
565
566 self.send_message_to_processor(ContextToProcessorMsg::HardClipOutputs(hard_clip_outputs))
567 .map_err(|(_, e)| e)
568 }
569
570 pub fn update(&mut self) -> Result<(), UpdateError<B::StreamError>> {
574 self.logger_rx.flush();
575
576 firewheel_core::collector::GlobalCollector.collect();
577
578 for msg in self.from_processor_rx.pop_iter() {
579 match msg {
580 ProcessorToContextMsg::ReturnEventGroup(mut event_group) => {
581 event_group.clear();
582 self.event_group_pool.push(event_group);
583 }
584 ProcessorToContextMsg::ReturnSchedule(schedule_data) => {
585 let _ = schedule_data;
586 }
587 #[cfg(feature = "musical_transport")]
588 ProcessorToContextMsg::ReturnTransportState(transport_state) => {
589 if self.transport_state_alloc_reuse.is_none() {
590 self.transport_state_alloc_reuse = Some(transport_state);
591 }
592 }
593 #[cfg(feature = "scheduled_events")]
594 ProcessorToContextMsg::ReturnClearScheduledEvents(msgs) => {
595 let _ = msgs;
596 }
597 }
598 }
599
600 self.graph.update(
601 self.active_state.as_ref().map(|s| &s.stream_info),
602 &mut self.event_group,
603 );
604
605 if let Some(active_state) = &mut self.active_state {
606 if let Err(e) = active_state.backend_handle.poll_status() {
607 self.active_state = None;
608 self.graph.deactivate();
609
610 return Err(UpdateError::StreamStoppedUnexpectedly(Some(e)));
611 }
612
613 if self
614 .processor_drop_rx
615 .as_ref()
616 .unwrap()
617 .try_peek()
618 .is_some()
619 {
620 self.active_state = None;
621 self.graph.deactivate();
622
623 return Err(UpdateError::StreamStoppedUnexpectedly(None));
624 }
625 }
626
627 if self.is_audio_stream_running() {
628 if self.graph.needs_compile() {
629 let schedule_data = self
630 .graph
631 .compile(&self.active_state.as_ref().unwrap().stream_info)?;
632
633 if let Err((msg, e)) = self
634 .send_message_to_processor(ContextToProcessorMsg::NewSchedule(schedule_data))
635 {
636 let ContextToProcessorMsg::NewSchedule(schedule) = msg else {
637 unreachable!();
638 };
639
640 self.graph.on_schedule_send_failed(schedule);
641
642 return Err(e);
643 }
644 }
645
646 #[cfg(feature = "scheduled_events")]
647 if !self.queued_clear_scheduled_events.is_empty() {
648 let msgs: SmallVec<[ClearScheduledEventsEvent; 1]> =
649 self.queued_clear_scheduled_events.drain(..).collect();
650
651 if let Err((msg, e)) = self
652 .send_message_to_processor(ContextToProcessorMsg::ClearScheduledEvents(msgs))
653 {
654 let ContextToProcessorMsg::ClearScheduledEvents(mut msgs) = msg else {
655 unreachable!();
656 };
657
658 self.queued_clear_scheduled_events = msgs.drain(..).collect();
659
660 return Err(e);
661 }
662 }
663
664 if !self.event_group.is_empty() {
665 let mut next_event_group = self
666 .event_group_pool
667 .pop()
668 .unwrap_or_else(|| Vec::with_capacity(self.initial_event_group_capacity));
669 core::mem::swap(&mut next_event_group, &mut self.event_group);
670
671 if let Err((msg, e)) = self
672 .send_message_to_processor(ContextToProcessorMsg::EventGroup(next_event_group))
673 {
674 let ContextToProcessorMsg::EventGroup(mut event_group) = msg else {
675 unreachable!();
676 };
677
678 core::mem::swap(&mut event_group, &mut self.event_group);
679 self.event_group_pool.push(event_group);
680
681 return Err(e);
682 }
683 }
684 }
685
686 Ok(())
687 }
688
689 pub fn graph_in_node_id(&self) -> NodeID {
691 self.graph.graph_in_node()
692 }
693
694 pub fn graph_out_node_id(&self) -> NodeID {
696 self.graph.graph_out_node()
697 }
698
699 pub fn add_node<T: AudioNode + 'static>(
701 &mut self,
702 node: T,
703 config: Option<T::Configuration>,
704 ) -> NodeID {
705 self.graph.add_node(node, config)
706 }
707
708 pub fn add_dyn_node<T: DynAudioNode + 'static>(&mut self, node: T) -> NodeID {
710 self.graph.add_dyn_node(node)
711 }
712
713 pub fn remove_node(&mut self, node_id: NodeID) -> Result<SmallVec<[EdgeID; 4]>, ()> {
725 self.graph.remove_node(node_id)
726 }
727
728 pub fn node_info(&self, id: NodeID) -> Option<&NodeEntry> {
730 self.graph.node_info(id)
731 }
732
733 pub fn node_state<T: 'static>(&self, id: NodeID) -> Option<&T> {
735 self.graph.node_state(id)
736 }
737
738 pub fn node_state_dyn(&self, id: NodeID) -> Option<&dyn Any> {
740 self.graph.node_state_dyn(id)
741 }
742
743 pub fn node_state_mut<T: 'static>(&mut self, id: NodeID) -> Option<&mut T> {
745 self.graph.node_state_mut(id)
746 }
747
748 pub fn node_state_dyn_mut(&mut self, id: NodeID) -> Option<&mut dyn Any> {
749 self.graph.node_state_dyn_mut(id)
750 }
751
752 pub fn nodes<'a>(&'a self) -> impl Iterator<Item = &'a NodeEntry> {
754 self.graph.nodes()
755 }
756
757 pub fn edges<'a>(&'a self) -> impl Iterator<Item = &'a Edge> {
759 self.graph.edges()
760 }
761
762 pub fn set_graph_channel_config(
766 &mut self,
767 channel_config: ChannelConfig,
768 ) -> SmallVec<[EdgeID; 4]> {
769 self.graph.set_graph_channel_config(channel_config)
770 }
771
772 pub fn connect(
790 &mut self,
791 src_node: NodeID,
792 dst_node: NodeID,
793 ports_src_dst: &[(PortIdx, PortIdx)],
794 check_for_cycles: bool,
795 ) -> Result<SmallVec<[EdgeID; 4]>, AddEdgeError> {
796 self.graph
797 .connect(src_node, dst_node, ports_src_dst, check_for_cycles)
798 }
799
800 pub fn disconnect(
811 &mut self,
812 src_node: NodeID,
813 dst_node: NodeID,
814 ports_src_dst: &[(PortIdx, PortIdx)],
815 ) -> bool {
816 self.graph.disconnect(src_node, dst_node, ports_src_dst)
817 }
818
819 pub fn disconnect_all_between(
824 &mut self,
825 src_node: NodeID,
826 dst_node: NodeID,
827 ) -> SmallVec<[EdgeID; 4]> {
828 self.graph.disconnect_all_between(src_node, dst_node)
829 }
830
831 pub fn disconnect_by_edge_id(&mut self, edge_id: EdgeID) -> bool {
835 self.graph.disconnect_by_edge_id(edge_id)
836 }
837
838 pub fn edge(&self, edge_id: EdgeID) -> Option<&Edge> {
840 self.graph.edge(edge_id)
841 }
842
843 pub fn cycle_detected(&mut self) -> bool {
847 self.graph.cycle_detected()
848 }
849
850 pub fn queue_event(&mut self, event: NodeEvent) {
855 self.event_group.push(event);
856 }
857
858 pub fn queue_event_for(&mut self, node_id: NodeID, event: NodeEventType) {
863 self.queue_event(NodeEvent {
864 node_id,
865 #[cfg(feature = "scheduled_events")]
866 time: None,
867 event,
868 });
869 }
870
871 #[cfg(feature = "scheduled_events")]
876 pub fn schedule_event_for(
877 &mut self,
878 node_id: NodeID,
879 event: NodeEventType,
880 time: EventInstant,
881 ) {
882 self.queue_event(NodeEvent {
883 node_id,
884 time: Some(time),
885 event,
886 });
887 }
888
889 #[cfg(feature = "scheduled_events")]
897 pub fn cancel_all_scheduled_events(&mut self, event_type: ClearScheduledEventsType) {
898 self.queued_clear_scheduled_events
899 .push(ClearScheduledEventsEvent {
900 node_id: None,
901 event_type,
902 });
903 }
904
905 #[cfg(feature = "scheduled_events")]
913 pub fn cancel_scheduled_events_for(
914 &mut self,
915 node_id: NodeID,
916 event_type: ClearScheduledEventsType,
917 ) {
918 self.queued_clear_scheduled_events
919 .push(ClearScheduledEventsEvent {
920 node_id: Some(node_id),
921 event_type,
922 });
923 }
924
925 fn send_message_to_processor(
926 &mut self,
927 msg: ContextToProcessorMsg,
928 ) -> Result<(), (ContextToProcessorMsg, UpdateError<B::StreamError>)> {
929 self.to_processor_tx
930 .try_push(msg)
931 .map_err(|msg| (msg, UpdateError::MsgChannelFull))
932 }
933}
934
935impl<B: AudioBackend> Drop for FirewheelCtx<B> {
936 fn drop(&mut self) {
937 self.stop_stream();
938
939 #[cfg(not(target_family = "wasm"))]
942 if let Some(drop_rx) = self.processor_drop_rx.take() {
943 let now = bevy_platform::time::Instant::now();
944
945 while drop_rx.try_peek().is_none() {
946 if now.elapsed() > core::time::Duration::from_secs(2) {
947 break;
948 }
949
950 bevy_platform::thread::sleep(core::time::Duration::from_millis(2));
951 }
952 }
953
954 firewheel_core::collector::GlobalCollector.collect();
955 }
956}
957
958impl<B: AudioBackend> FirewheelCtx<B> {
959 pub fn event_queue(&mut self, id: NodeID) -> ContextQueue<'_, B> {
961 ContextQueue { context: self, id }
962 }
963}
964
965pub struct ContextQueue<'a, B: AudioBackend> {
986 context: &'a mut FirewheelCtx<B>,
987 id: NodeID,
988}
989
990#[cfg(feature = "scheduled_events")]
991pub struct TimedContextQueue<'a, B: AudioBackend> {
992 time: EventInstant,
993 context_queue: ContextQueue<'a, B>,
994}
995
996impl<'a, B: AudioBackend> ContextQueue<'a, B> {
997 pub fn reborrow<'b>(&'b mut self) -> ContextQueue<'b, B> {
998 ContextQueue {
999 context: &mut *self.context,
1000 id: self.id,
1001 }
1002 }
1003
1004 #[cfg(feature = "scheduled_events")]
1005 pub fn with_time<'b>(&'b mut self, time: EventInstant) -> TimedContextQueue<'b, B> {
1006 TimedContextQueue {
1007 time,
1008 context_queue: self.reborrow(),
1009 }
1010 }
1011}
1012
1013impl<B: AudioBackend> firewheel_core::diff::EventQueue for ContextQueue<'_, B> {
1014 fn push(&mut self, data: NodeEventType) {
1015 self.context.queue_event(NodeEvent {
1016 event: data,
1017 #[cfg(feature = "scheduled_events")]
1018 time: None,
1019 node_id: self.id,
1020 });
1021 }
1022}
1023
1024#[cfg(feature = "scheduled_events")]
1025impl<B: AudioBackend> firewheel_core::diff::EventQueue for TimedContextQueue<'_, B> {
1026 fn push(&mut self, data: NodeEventType) {
1027 self.context_queue.context.queue_event(NodeEvent {
1028 event: data,
1029 time: Some(self.time),
1030 node_id: self.context_queue.id,
1031 });
1032 }
1033}
1034
1035#[cfg(feature = "scheduled_events")]
1037#[derive(Default, Debug, Clone, Copy, PartialEq)]
1038pub enum ClearScheduledEventsType {
1039 #[default]
1041 All,
1042 NonMusicalOnly,
1044 MusicalOnly,
1046}
1047
1048fn audio_clock_update_instant_and_delay<B: AudioBackend>(
1049 clock: &SharedClock<B::Instant>,
1050 active_state: &Option<ActiveState<B>>,
1051) -> Option<(Instant, Duration)> {
1052 active_state.as_ref().and_then(|active_state| {
1053 clock
1054 .process_timestamp
1055 .clone()
1056 .and_then(|process_timestamp| {
1057 active_state
1058 .backend_handle
1059 .delay_from_last_process(process_timestamp)
1060 .and_then(|delay| {
1061 Instant::now()
1062 .checked_sub(delay)
1063 .map(|instant| (instant, delay))
1064 })
1065 })
1066 })
1067}