1use bevy_platform::time::Instant;
2use core::cell::RefCell;
3use core::num::NonZeroU32;
4use core::time::Duration;
5use core::{any::Any, f64};
6use firewheel_core::clock::DurationSeconds;
7use firewheel_core::log::{RealtimeLogger, RealtimeLoggerConfig, RealtimeLoggerMainThread};
8use firewheel_core::{
9 channel_config::{ChannelConfig, ChannelCount},
10 clock::AudioClock,
11 collector::Collector,
12 dsp::declick::DeclickValues,
13 event::{NodeEvent, NodeEventType},
14 node::{AudioNode, DynAudioNode, NodeID},
15 StreamInfo,
16};
17use ringbuf::traits::{Consumer, Producer, Split};
18use smallvec::SmallVec;
19
20#[cfg(all(not(feature = "std"), feature = "musical_transport"))]
21use bevy_platform::prelude::Box;
22#[cfg(not(feature = "std"))]
23use bevy_platform::prelude::Vec;
24
25use crate::error::RemoveNodeError;
26use crate::processor::BufferOutOfSpaceMode;
27use crate::{
28 backend::{AudioBackend, DeviceInfo},
29 error::{AddEdgeError, StartStreamError, UpdateError},
30 graph::{AudioGraph, Edge, EdgeID, NodeEntry, PortIdx},
31 processor::{
32 ContextToProcessorMsg, FirewheelProcessor, FirewheelProcessorInner, ProcessorToContextMsg,
33 SharedClock,
34 },
35};
36
37#[cfg(feature = "scheduled_events")]
38use crate::processor::ClearScheduledEventsEvent;
39#[cfg(feature = "scheduled_events")]
40use firewheel_core::clock::EventInstant;
41
42#[cfg(feature = "musical_transport")]
43use firewheel_core::clock::TransportState;
44
45#[derive(Debug, Clone, Copy, PartialEq)]
47pub struct FirewheelConfig {
48 pub num_graph_inputs: ChannelCount,
50 pub num_graph_outputs: ChannelCount,
52 pub hard_clip_outputs: bool,
61 pub initial_node_capacity: u32,
65 pub initial_edge_capacity: u32,
69 pub declick_seconds: f32,
74 pub initial_event_group_capacity: u32,
78 pub channel_capacity: u32,
82 pub event_queue_capacity: usize,
89 pub immediate_event_capacity: usize,
95 #[cfg(feature = "scheduled_events")]
103 pub scheduled_event_capacity: usize,
104 pub buffer_out_of_space_mode: BufferOutOfSpaceMode,
108
109 pub logger_config: RealtimeLoggerConfig,
111
112 pub debug_force_clear_buffers: bool,
122}
123
124impl Default for FirewheelConfig {
125 fn default() -> Self {
126 Self {
127 num_graph_inputs: ChannelCount::ZERO,
128 num_graph_outputs: ChannelCount::STEREO,
129 hard_clip_outputs: false,
130 initial_node_capacity: 128,
131 initial_edge_capacity: 256,
132 declick_seconds: DeclickValues::DEFAULT_FADE_SECONDS,
133 initial_event_group_capacity: 128,
134 channel_capacity: 64,
135 event_queue_capacity: 128,
136 immediate_event_capacity: 512,
137 #[cfg(feature = "scheduled_events")]
138 scheduled_event_capacity: 512,
139 buffer_out_of_space_mode: BufferOutOfSpaceMode::AllocateOnAudioThread,
140 logger_config: RealtimeLoggerConfig::default(),
141 debug_force_clear_buffers: false,
142 }
143 }
144}
145
146struct ActiveState<B: AudioBackend> {
147 backend_handle: B,
148 stream_info: StreamInfo,
149}
150
151pub struct FirewheelCtx<B: AudioBackend> {
153 graph: AudioGraph,
154
155 to_processor_tx: ringbuf::HeapProd<ContextToProcessorMsg>,
156 from_processor_rx: ringbuf::HeapCons<ProcessorToContextMsg>,
157 logger_rx: RealtimeLoggerMainThread,
158
159 active_state: Option<ActiveState<B>>,
160
161 processor_channel: Option<(
162 ringbuf::HeapCons<ContextToProcessorMsg>,
163 ringbuf::HeapProd<ProcessorToContextMsg>,
164 triple_buffer::Input<SharedClock<B::Instant>>,
165 RealtimeLogger,
166 )>,
167 processor_drop_rx: Option<ringbuf::HeapCons<FirewheelProcessorInner<B>>>,
168
169 shared_clock_output: RefCell<triple_buffer::Output<SharedClock<B::Instant>>>,
170 sample_rate: NonZeroU32,
171 sample_rate_recip: f64,
172
173 #[cfg(feature = "musical_transport")]
174 transport_state: Box<TransportState>,
175 #[cfg(feature = "musical_transport")]
176 transport_state_alloc_reuse: Option<Box<TransportState>>,
177
178 event_group_pool: Vec<Vec<NodeEvent>>,
180 event_group: Vec<NodeEvent>,
181 initial_event_group_capacity: usize,
182
183 #[cfg(feature = "scheduled_events")]
184 queued_clear_scheduled_events: Vec<ClearScheduledEventsEvent>,
185
186 config: FirewheelConfig,
187}
188
189impl<B: AudioBackend> FirewheelCtx<B> {
190 pub fn new(config: FirewheelConfig) -> Self {
192 let (to_processor_tx, from_context_rx) =
193 ringbuf::HeapRb::<ContextToProcessorMsg>::new(config.channel_capacity as usize).split();
194 let (to_context_tx, from_processor_rx) =
195 ringbuf::HeapRb::<ProcessorToContextMsg>::new(config.channel_capacity as usize * 2)
196 .split();
197
198 let initial_event_group_capacity = config.initial_event_group_capacity as usize;
199 let mut event_group_pool = Vec::with_capacity(16);
200 for _ in 0..3 {
201 event_group_pool.push(Vec::with_capacity(initial_event_group_capacity));
202 }
203
204 let (shared_clock_input, shared_clock_output) =
205 triple_buffer::triple_buffer(&SharedClock::default());
206
207 let (logger, logger_rx) = firewheel_core::log::realtime_logger(config.logger_config);
208
209 Self {
210 graph: AudioGraph::new(&config),
211 to_processor_tx,
212 from_processor_rx,
213 logger_rx,
214 active_state: None,
215 processor_channel: Some((from_context_rx, to_context_tx, shared_clock_input, logger)),
216 processor_drop_rx: None,
217 shared_clock_output: RefCell::new(shared_clock_output),
218 sample_rate: NonZeroU32::new(44100).unwrap(),
219 sample_rate_recip: 44100.0f64.recip(),
220 #[cfg(feature = "musical_transport")]
221 transport_state: Box::new(TransportState::default()),
222 #[cfg(feature = "musical_transport")]
223 transport_state_alloc_reuse: None,
224 event_group_pool,
225 event_group: Vec::with_capacity(initial_event_group_capacity),
226 initial_event_group_capacity,
227 #[cfg(feature = "scheduled_events")]
228 queued_clear_scheduled_events: Vec::new(),
229 config,
230 }
231 }
232
233 pub fn active_backend(&self) -> Option<&B> {
236 self.active_state
237 .as_ref()
238 .map(|state| &state.backend_handle)
239 }
240
241 pub fn active_backend_mut(&mut self) -> Option<&mut B> {
244 self.active_state
245 .as_mut()
246 .map(|state| &mut state.backend_handle)
247 }
248
249 pub fn available_input_devices(&self) -> Vec<DeviceInfo> {
251 B::available_input_devices()
252 }
253
254 pub fn available_output_devices(&self) -> Vec<DeviceInfo> {
256 B::available_output_devices()
257 }
258
259 pub fn can_start_stream(&self) -> bool {
269 if self.is_audio_stream_running() {
270 false
271 } else if let Some(rx) = &self.processor_drop_rx {
272 rx.try_peek().is_some()
273 } else {
274 true
275 }
276 }
277
278 pub fn start_stream(
289 &mut self,
290 config: B::Config,
291 ) -> Result<(), StartStreamError<B::StartStreamError>> {
292 if self.is_audio_stream_running() {
293 return Err(StartStreamError::AlreadyStarted);
294 }
295
296 if !self.can_start_stream() {
297 return Err(StartStreamError::OldStreamNotFinishedStopping);
298 }
299
300 let (mut backend_handle, mut stream_info) =
301 B::start_stream(config).map_err(|e| StartStreamError::BackendError(e))?;
302
303 stream_info.sample_rate_recip = (stream_info.sample_rate.get() as f64).recip();
304 stream_info.declick_frames = NonZeroU32::new(
305 (self.config.declick_seconds * stream_info.sample_rate.get() as f32).round() as u32,
306 )
307 .unwrap_or(NonZeroU32::MIN);
308
309 let maybe_processor = self.processor_channel.take();
310
311 stream_info.prev_sample_rate = if maybe_processor.is_some() {
312 stream_info.sample_rate
313 } else {
314 self.sample_rate
315 };
316
317 self.sample_rate = stream_info.sample_rate;
318 self.sample_rate_recip = stream_info.sample_rate_recip;
319
320 let schedule = self.graph.compile(&stream_info)?;
321
322 let (drop_tx, drop_rx) = ringbuf::HeapRb::<FirewheelProcessorInner<B>>::new(1).split();
323
324 let processor = if let Some((from_context_rx, to_context_tx, shared_clock_input, logger)) =
325 maybe_processor
326 {
327 FirewheelProcessorInner::new(
328 from_context_rx,
329 to_context_tx,
330 shared_clock_input,
331 self.config.immediate_event_capacity,
332 #[cfg(feature = "scheduled_events")]
333 self.config.scheduled_event_capacity,
334 self.config.event_queue_capacity,
335 &stream_info,
336 self.config.hard_clip_outputs,
337 self.config.buffer_out_of_space_mode,
338 logger,
339 self.config.debug_force_clear_buffers,
340 )
341 } else {
342 let mut processor = self.processor_drop_rx.as_mut().unwrap().try_pop().unwrap();
343
344 if processor.poisoned {
345 panic!("The audio thread has panicked!");
346 }
347
348 processor.new_stream(&stream_info);
349
350 processor
351 };
352
353 backend_handle.set_processor(FirewheelProcessor::new(processor, drop_tx));
354
355 if let Err(_) = self.send_message_to_processor(ContextToProcessorMsg::NewSchedule(schedule))
356 {
357 panic!("Firewheel message channel is full!");
358 }
359
360 self.active_state = Some(ActiveState {
361 backend_handle,
362 stream_info,
363 });
364 self.processor_drop_rx = Some(drop_rx);
365
366 Ok(())
367 }
368
369 pub fn stop_stream(&mut self) {
371 self.active_state = None;
374 self.graph.deactivate();
375 }
376
377 pub fn is_audio_stream_running(&self) -> bool {
379 self.active_state.is_some()
380 }
381
382 pub fn stream_info(&self) -> Option<&StreamInfo> {
386 self.active_state.as_ref().map(|s| &s.stream_info)
387 }
388
389 pub fn audio_clock(&self) -> AudioClock {
404 let mut clock_borrowed = self.shared_clock_output.borrow_mut();
410 let clock = clock_borrowed.read();
411
412 let update_instant = audio_clock_update_instant_and_delay(&clock, &self.active_state)
413 .map(|(update_instant, _delay)| update_instant);
414
415 AudioClock {
416 samples: clock.clock_samples,
417 seconds: clock
418 .clock_samples
419 .to_seconds(self.sample_rate, self.sample_rate_recip),
420 #[cfg(feature = "musical_transport")]
421 musical: clock.current_playhead,
422 #[cfg(feature = "musical_transport")]
423 transport_is_playing: clock.transport_is_playing,
424 update_instant,
425 }
426 }
427
428 pub fn audio_clock_corrected(&self) -> AudioClock {
447 let mut clock_borrowed = self.shared_clock_output.borrow_mut();
453 let clock = clock_borrowed.read();
454
455 let Some((update_instant, delay)) =
456 audio_clock_update_instant_and_delay(&clock, &self.active_state)
457 else {
458 return AudioClock {
461 samples: clock.clock_samples,
462 seconds: clock
463 .clock_samples
464 .to_seconds(self.sample_rate, self.sample_rate_recip),
465 #[cfg(feature = "musical_transport")]
466 musical: clock.current_playhead,
467 #[cfg(feature = "musical_transport")]
468 transport_is_playing: clock.transport_is_playing,
469 update_instant: None,
470 };
471 };
472
473 let delta_seconds = DurationSeconds(delay.as_secs_f64());
475
476 let samples = clock.clock_samples + delta_seconds.to_samples(self.sample_rate);
477
478 #[cfg(feature = "musical_transport")]
479 let musical = clock.current_playhead.map(|musical_time| {
480 if clock.transport_is_playing && self.transport_state.transport.is_some() {
481 self.transport_state
482 .transport
483 .as_ref()
484 .unwrap()
485 .delta_seconds_from(musical_time, delta_seconds, clock.speed_multiplier)
486 } else {
487 musical_time
488 }
489 });
490
491 AudioClock {
492 samples,
493 seconds: samples.to_seconds(self.sample_rate, self.sample_rate_recip),
494 #[cfg(feature = "musical_transport")]
495 musical,
496 #[cfg(feature = "musical_transport")]
497 transport_is_playing: clock.transport_is_playing,
498 update_instant: Some(update_instant),
499 }
500 }
501
502 pub fn audio_clock_instant(&self) -> Option<Instant> {
514 let mut clock_borrowed = self.shared_clock_output.borrow_mut();
520 let clock = clock_borrowed.read();
521
522 audio_clock_update_instant_and_delay(&clock, &self.active_state)
523 .map(|(update_instant, _delay)| update_instant)
524 }
525
/// Make the processor's transport state match `transport`.
///
/// Does nothing if `transport` equals the state last sent. Otherwise a
/// boxed copy is sent to the processor (reusing a previously returned box
/// when one is available), and the context's own copy is updated only after
/// the send succeeds.
///
/// # Errors
///
/// Returns the channel error if the message could not be queued; the
/// context's copy is left unchanged in that case.
#[cfg(feature = "musical_transport")]
pub fn sync_transport(
    &mut self,
    transport: &TransportState,
) -> Result<(), UpdateError<B::StreamError>> {
    if *self.transport_state == *transport {
        return Ok(());
    }

    let boxed_state = match self.transport_state_alloc_reuse.take() {
        Some(mut recycled) => {
            *recycled = transport.clone();
            recycled
        }
        None => Box::new(transport.clone()),
    };

    self.send_message_to_processor(ContextToProcessorMsg::SetTransportState(boxed_state))
        .map_err(|(_, e)| e)?;

    *self.transport_state = transport.clone();

    Ok(())
}
550
/// The transport state last successfully sent to the processor.
#[cfg(feature = "musical_transport")]
pub fn transport_state(&self) -> &TransportState {
    self.transport_state.as_ref()
}
556
/// The transport state last successfully sent to the processor.
///
/// Returns the same reference as `transport_state`.
#[cfg(feature = "musical_transport")]
pub fn transport(&self) -> &TransportState {
    self.transport_state.as_ref()
}
562
563 pub fn hard_clip_outputs(&self) -> bool {
565 self.config.hard_clip_outputs
566 }
567
568 pub fn set_hard_clip_outputs(
577 &mut self,
578 hard_clip_outputs: bool,
579 ) -> Result<(), UpdateError<B::StreamError>> {
580 if self.config.hard_clip_outputs == hard_clip_outputs {
581 return Ok(());
582 }
583 self.config.hard_clip_outputs = hard_clip_outputs;
584
585 self.send_message_to_processor(ContextToProcessorMsg::HardClipOutputs(hard_clip_outputs))
586 .map_err(|(_, e)| e)
587 }
588
589 pub fn update(&mut self) -> Result<(), UpdateError<B::StreamError>> {
593 self.logger_rx.flush();
594
595 firewheel_core::collector::GlobalCollector.collect();
596
597 for msg in self.from_processor_rx.pop_iter() {
598 match msg {
599 ProcessorToContextMsg::ReturnEventGroup(mut event_group) => {
600 event_group.clear();
601 self.event_group_pool.push(event_group);
602 }
603 ProcessorToContextMsg::ReturnSchedule(schedule_data) => {
604 let _ = schedule_data;
605 }
606 #[cfg(feature = "musical_transport")]
607 ProcessorToContextMsg::ReturnTransportState(transport_state) => {
608 if self.transport_state_alloc_reuse.is_none() {
609 self.transport_state_alloc_reuse = Some(transport_state);
610 }
611 }
612 #[cfg(feature = "scheduled_events")]
613 ProcessorToContextMsg::ReturnClearScheduledEvents(msgs) => {
614 let _ = msgs;
615 }
616 }
617 }
618
619 self.graph.update(
620 self.active_state.as_ref().map(|s| &s.stream_info),
621 &mut self.event_group,
622 );
623
624 if let Some(active_state) = &mut self.active_state {
625 if let Err(e) = active_state.backend_handle.poll_status() {
626 self.active_state = None;
627 self.graph.deactivate();
628
629 return Err(UpdateError::StreamStoppedUnexpectedly(Some(e)));
630 }
631
632 if self
633 .processor_drop_rx
634 .as_ref()
635 .unwrap()
636 .try_peek()
637 .is_some()
638 {
639 self.active_state = None;
640 self.graph.deactivate();
641
642 return Err(UpdateError::StreamStoppedUnexpectedly(None));
643 }
644 }
645
646 if self.is_audio_stream_running() {
647 if self.graph.needs_compile() {
648 let schedule_data = self
649 .graph
650 .compile(&self.active_state.as_ref().unwrap().stream_info)?;
651
652 if let Err((msg, e)) = self
653 .send_message_to_processor(ContextToProcessorMsg::NewSchedule(schedule_data))
654 {
655 let ContextToProcessorMsg::NewSchedule(schedule) = msg else {
656 unreachable!();
657 };
658
659 self.graph.on_schedule_send_failed(schedule);
660
661 return Err(e);
662 }
663 }
664
665 #[cfg(feature = "scheduled_events")]
666 if !self.queued_clear_scheduled_events.is_empty() {
667 let msgs: SmallVec<[ClearScheduledEventsEvent; 1]> =
668 self.queued_clear_scheduled_events.drain(..).collect();
669
670 if let Err((msg, e)) = self
671 .send_message_to_processor(ContextToProcessorMsg::ClearScheduledEvents(msgs))
672 {
673 let ContextToProcessorMsg::ClearScheduledEvents(mut msgs) = msg else {
674 unreachable!();
675 };
676
677 self.queued_clear_scheduled_events = msgs.drain(..).collect();
678
679 return Err(e);
680 }
681 }
682
683 if !self.event_group.is_empty() {
684 let mut next_event_group = self
685 .event_group_pool
686 .pop()
687 .unwrap_or_else(|| Vec::with_capacity(self.initial_event_group_capacity));
688 core::mem::swap(&mut next_event_group, &mut self.event_group);
689
690 if let Err((msg, e)) = self
691 .send_message_to_processor(ContextToProcessorMsg::EventGroup(next_event_group))
692 {
693 let ContextToProcessorMsg::EventGroup(mut event_group) = msg else {
694 unreachable!();
695 };
696
697 core::mem::swap(&mut event_group, &mut self.event_group);
698 self.event_group_pool.push(event_group);
699
700 return Err(e);
701 }
702 }
703 }
704
705 Ok(())
706 }
707
708 pub fn graph_in_node_id(&self) -> NodeID {
710 self.graph.graph_in_node()
711 }
712
713 pub fn graph_out_node_id(&self) -> NodeID {
715 self.graph.graph_out_node()
716 }
717
718 pub fn add_node<T: AudioNode + 'static>(
720 &mut self,
721 node: T,
722 config: Option<T::Configuration>,
723 ) -> NodeID {
724 self.graph.add_node(node, config)
725 }
726
727 pub fn add_dyn_node<T: DynAudioNode + 'static>(&mut self, node: T) -> NodeID {
729 self.graph.add_dyn_node(node)
730 }
731
732 pub fn remove_node(
743 &mut self,
744 node_id: NodeID,
745 ) -> Result<SmallVec<[EdgeID; 4]>, RemoveNodeError> {
746 self.graph.remove_node(node_id)
747 }
748
749 pub fn node_info(&self, id: NodeID) -> Option<&NodeEntry> {
751 self.graph.node_info(id)
752 }
753
754 pub fn node_state<T: 'static>(&self, id: NodeID) -> Option<&T> {
756 self.graph.node_state(id)
757 }
758
759 pub fn node_state_dyn(&self, id: NodeID) -> Option<&dyn Any> {
761 self.graph.node_state_dyn(id)
762 }
763
764 pub fn node_state_mut<T: 'static>(&mut self, id: NodeID) -> Option<&mut T> {
766 self.graph.node_state_mut(id)
767 }
768
769 pub fn node_state_dyn_mut(&mut self, id: NodeID) -> Option<&mut dyn Any> {
770 self.graph.node_state_dyn_mut(id)
771 }
772
773 pub fn nodes<'a>(&'a self) -> impl Iterator<Item = &'a NodeEntry> {
775 self.graph.nodes()
776 }
777
778 pub fn edges<'a>(&'a self) -> impl Iterator<Item = &'a Edge> {
780 self.graph.edges()
781 }
782
783 pub fn set_graph_channel_config(
787 &mut self,
788 channel_config: ChannelConfig,
789 ) -> SmallVec<[EdgeID; 4]> {
790 self.graph.set_graph_channel_config(channel_config)
791 }
792
793 pub fn connect(
811 &mut self,
812 src_node: NodeID,
813 dst_node: NodeID,
814 ports_src_dst: &[(PortIdx, PortIdx)],
815 check_for_cycles: bool,
816 ) -> Result<SmallVec<[EdgeID; 4]>, AddEdgeError> {
817 self.graph
818 .connect(src_node, dst_node, ports_src_dst, check_for_cycles)
819 }
820
821 pub fn disconnect(
832 &mut self,
833 src_node: NodeID,
834 dst_node: NodeID,
835 ports_src_dst: &[(PortIdx, PortIdx)],
836 ) -> bool {
837 self.graph.disconnect(src_node, dst_node, ports_src_dst)
838 }
839
840 pub fn disconnect_all_between(
845 &mut self,
846 src_node: NodeID,
847 dst_node: NodeID,
848 ) -> SmallVec<[EdgeID; 4]> {
849 self.graph.disconnect_all_between(src_node, dst_node)
850 }
851
852 pub fn disconnect_by_edge_id(&mut self, edge_id: EdgeID) -> bool {
856 self.graph.disconnect_by_edge_id(edge_id)
857 }
858
859 pub fn edge(&self, edge_id: EdgeID) -> Option<&Edge> {
861 self.graph.edge(edge_id)
862 }
863
864 pub fn cycle_detected(&mut self) -> bool {
868 self.graph.cycle_detected()
869 }
870
871 pub fn queue_event(&mut self, event: NodeEvent) {
876 self.event_group.push(event);
877 }
878
879 pub fn queue_event_for(&mut self, node_id: NodeID, event: NodeEventType) {
884 self.queue_event(NodeEvent {
885 node_id,
886 #[cfg(feature = "scheduled_events")]
887 time: None,
888 event,
889 });
890 }
891
/// Queue `event` for the node `node_id`, tagged with an optional scheduled
/// time.
///
/// Passing `None` for `time` queues the event without a scheduled time,
/// exactly like `queue_event_for`.
#[cfg(feature = "scheduled_events")]
pub fn schedule_event_for(
    &mut self,
    node_id: NodeID,
    event: NodeEventType,
    time: Option<EventInstant>,
) {
    let node_event = NodeEvent {
        node_id,
        time,
        event,
    };

    self.queue_event(node_event);
}
912
/// Queue a request to clear scheduled events of the given type, without
/// restricting it to a particular node (`node_id` is `None`).
///
/// The request is sent to the processor by `update` while a stream is
/// running.
#[cfg(feature = "scheduled_events")]
pub fn cancel_all_scheduled_events(&mut self, event_type: ClearScheduledEventsType) {
    let request = ClearScheduledEventsEvent {
        node_id: None,
        event_type,
    };

    self.queued_clear_scheduled_events.push(request);
}
928
/// Queue a request to clear scheduled events of the given type for the node
/// `node_id`.
///
/// The request is sent to the processor by `update` while a stream is
/// running.
#[cfg(feature = "scheduled_events")]
pub fn cancel_scheduled_events_for(
    &mut self,
    node_id: NodeID,
    event_type: ClearScheduledEventsType,
) {
    let request = ClearScheduledEventsEvent {
        node_id: Some(node_id),
        event_type,
    };

    self.queued_clear_scheduled_events.push(request);
}
948
949 fn send_message_to_processor(
950 &mut self,
951 msg: ContextToProcessorMsg,
952 ) -> Result<(), (ContextToProcessorMsg, UpdateError<B::StreamError>)> {
953 self.to_processor_tx
954 .try_push(msg)
955 .map_err(|msg| (msg, UpdateError::MsgChannelFull))
956 }
957}
958
959impl<B: AudioBackend> Drop for FirewheelCtx<B> {
960 fn drop(&mut self) {
961 self.stop_stream();
962
963 #[cfg(not(target_family = "wasm"))]
966 if let Some(drop_rx) = self.processor_drop_rx.take() {
967 let now = bevy_platform::time::Instant::now();
968
969 while drop_rx.try_peek().is_none() {
970 if now.elapsed() > core::time::Duration::from_secs(2) {
971 break;
972 }
973
974 bevy_platform::thread::sleep(core::time::Duration::from_millis(2));
975 }
976 }
977
978 firewheel_core::collector::GlobalCollector.collect();
979 }
980}
981
982impl<B: AudioBackend> FirewheelCtx<B> {
983 pub fn event_queue(&mut self, id: NodeID) -> ContextQueue<'_, B> {
985 ContextQueue {
986 context: self,
987 id,
988 #[cfg(feature = "scheduled_events")]
989 time: None,
990 }
991 }
992
993 #[cfg(feature = "scheduled_events")]
994 pub fn event_queue_scheduled(
995 &mut self,
996 id: NodeID,
997 time: Option<EventInstant>,
998 ) -> ContextQueue<'_, B> {
999 ContextQueue {
1000 context: self,
1001 id,
1002 time,
1003 }
1004 }
1005}
1006
1007pub struct ContextQueue<'a, B: AudioBackend> {
1028 context: &'a mut FirewheelCtx<B>,
1029 id: NodeID,
1030 #[cfg(feature = "scheduled_events")]
1031 time: Option<EventInstant>,
1032}
1033
#[cfg(feature = "scheduled_events")]
impl<'a, B: AudioBackend> ContextQueue<'a, B> {
    /// The scheduled time this queue attaches to its events, if any.
    pub fn time(&self) -> Option<EventInstant> {
        let Self { time, .. } = self;
        *time
    }
}
1040
1041impl<B: AudioBackend> firewheel_core::diff::EventQueue for ContextQueue<'_, B> {
1042 fn push(&mut self, data: NodeEventType) {
1043 self.context.queue_event(NodeEvent {
1044 event: data,
1045 #[cfg(feature = "scheduled_events")]
1046 time: self.time,
1047 node_id: self.id,
1048 });
1049 }
1050}
1051
1052#[cfg(feature = "scheduled_events")]
/// Selects which scheduled events a clear request applies to.
///
/// NOTE(review): the variants are interpreted by the processor, which is not
/// part of this file; the descriptions below follow the variant names —
/// confirm against the processor.
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ClearScheduledEventsType {
    /// Every scheduled event (the default).
    #[default]
    All,
    /// Only scheduled events that are not musical.
    NonMusicalOnly,
    /// Only scheduled events that are musical.
    MusicalOnly,
}
1064
1065fn audio_clock_update_instant_and_delay<B: AudioBackend>(
1066 clock: &SharedClock<B::Instant>,
1067 active_state: &Option<ActiveState<B>>,
1068) -> Option<(Instant, Duration)> {
1069 active_state.as_ref().and_then(|active_state| {
1070 clock
1071 .process_timestamp
1072 .clone()
1073 .and_then(|process_timestamp| {
1074 active_state
1075 .backend_handle
1076 .delay_from_last_process(process_timestamp)
1077 .and_then(|delay| {
1078 Instant::now()
1079 .checked_sub(delay)
1080 .map(|instant| (instant, delay))
1081 })
1082 })
1083 })
1084}