1use bevy_platform::time::Instant;
2use core::cell::RefCell;
3use core::num::NonZeroU32;
4use core::time::Duration;
5use core::{any::Any, f64};
6use firewheel_core::clock::DurationSeconds;
7use firewheel_core::log::{RealtimeLogger, RealtimeLoggerConfig, RealtimeLoggerMainThread};
8use firewheel_core::{
9 channel_config::{ChannelConfig, ChannelCount},
10 clock::AudioClock,
11 collector::Collector,
12 dsp::declick::DeclickValues,
13 event::{NodeEvent, NodeEventType},
14 node::{AudioNode, DynAudioNode, NodeID},
15 StreamInfo,
16};
17use ringbuf::traits::{Consumer, Producer, Split};
18use smallvec::SmallVec;
19
20#[cfg(all(not(feature = "std"), feature = "musical_transport"))]
21use bevy_platform::prelude::Box;
22#[cfg(not(feature = "std"))]
23use bevy_platform::prelude::Vec;
24
25use crate::error::RemoveNodeError;
26use crate::processor::BufferOutOfSpaceMode;
27use crate::{
28 backend::{AudioBackend, DeviceInfo},
29 error::{AddEdgeError, StartStreamError, UpdateError},
30 graph::{AudioGraph, Edge, EdgeID, NodeEntry, PortIdx},
31 processor::{
32 ContextToProcessorMsg, FirewheelProcessor, FirewheelProcessorInner, ProcessorToContextMsg,
33 SharedClock,
34 },
35};
36
37#[cfg(feature = "scheduled_events")]
38use crate::processor::ClearScheduledEventsEvent;
39#[cfg(feature = "scheduled_events")]
40use firewheel_core::clock::EventInstant;
41
42#[cfg(feature = "musical_transport")]
43use firewheel_core::clock::TransportState;
44
45#[derive(Debug, Clone, Copy, PartialEq)]
47pub struct FirewheelConfig {
48 pub num_graph_inputs: ChannelCount,
50 pub num_graph_outputs: ChannelCount,
52 pub hard_clip_outputs: bool,
61 pub initial_node_capacity: u32,
65 pub initial_edge_capacity: u32,
69 pub declick_seconds: f32,
74 pub initial_event_group_capacity: u32,
78 pub channel_capacity: u32,
82 pub event_queue_capacity: usize,
89 pub immediate_event_capacity: usize,
95 #[cfg(feature = "scheduled_events")]
103 pub scheduled_event_capacity: usize,
104 pub buffer_out_of_space_mode: BufferOutOfSpaceMode,
108
109 pub logger_config: RealtimeLoggerConfig,
111}
112
113impl Default for FirewheelConfig {
114 fn default() -> Self {
115 Self {
116 num_graph_inputs: ChannelCount::ZERO,
117 num_graph_outputs: ChannelCount::STEREO,
118 hard_clip_outputs: false,
119 initial_node_capacity: 128,
120 initial_edge_capacity: 256,
121 declick_seconds: DeclickValues::DEFAULT_FADE_SECONDS,
122 initial_event_group_capacity: 128,
123 channel_capacity: 64,
124 event_queue_capacity: 128,
125 immediate_event_capacity: 512,
126 #[cfg(feature = "scheduled_events")]
127 scheduled_event_capacity: 512,
128 buffer_out_of_space_mode: BufferOutOfSpaceMode::AllocateOnAudioThread,
129 logger_config: RealtimeLoggerConfig::default(),
130 }
131 }
132}
133
134struct ActiveState<B: AudioBackend> {
135 backend_handle: B,
136 stream_info: StreamInfo,
137}
138
139pub struct FirewheelCtx<B: AudioBackend> {
141 graph: AudioGraph,
142
143 to_processor_tx: ringbuf::HeapProd<ContextToProcessorMsg>,
144 from_processor_rx: ringbuf::HeapCons<ProcessorToContextMsg>,
145 logger_rx: RealtimeLoggerMainThread,
146
147 active_state: Option<ActiveState<B>>,
148
149 processor_channel: Option<(
150 ringbuf::HeapCons<ContextToProcessorMsg>,
151 ringbuf::HeapProd<ProcessorToContextMsg>,
152 triple_buffer::Input<SharedClock<B::Instant>>,
153 RealtimeLogger,
154 )>,
155 processor_drop_rx: Option<ringbuf::HeapCons<FirewheelProcessorInner<B>>>,
156
157 shared_clock_output: RefCell<triple_buffer::Output<SharedClock<B::Instant>>>,
158 sample_rate: NonZeroU32,
159 sample_rate_recip: f64,
160
161 #[cfg(feature = "musical_transport")]
162 transport_state: Box<TransportState>,
163 #[cfg(feature = "musical_transport")]
164 transport_state_alloc_reuse: Option<Box<TransportState>>,
165
166 event_group_pool: Vec<Vec<NodeEvent>>,
168 event_group: Vec<NodeEvent>,
169 initial_event_group_capacity: usize,
170
171 #[cfg(feature = "scheduled_events")]
172 queued_clear_scheduled_events: Vec<ClearScheduledEventsEvent>,
173
174 config: FirewheelConfig,
175}
176
177impl<B: AudioBackend> FirewheelCtx<B> {
178 pub fn new(config: FirewheelConfig) -> Self {
180 let (to_processor_tx, from_context_rx) =
181 ringbuf::HeapRb::<ContextToProcessorMsg>::new(config.channel_capacity as usize).split();
182 let (to_context_tx, from_processor_rx) =
183 ringbuf::HeapRb::<ProcessorToContextMsg>::new(config.channel_capacity as usize * 2)
184 .split();
185
186 let initial_event_group_capacity = config.initial_event_group_capacity as usize;
187 let mut event_group_pool = Vec::with_capacity(16);
188 for _ in 0..3 {
189 event_group_pool.push(Vec::with_capacity(initial_event_group_capacity));
190 }
191
192 let (shared_clock_input, shared_clock_output) =
193 triple_buffer::triple_buffer(&SharedClock::default());
194
195 let (logger, logger_rx) = firewheel_core::log::realtime_logger(config.logger_config);
196
197 Self {
198 graph: AudioGraph::new(&config),
199 to_processor_tx,
200 from_processor_rx,
201 logger_rx,
202 active_state: None,
203 processor_channel: Some((from_context_rx, to_context_tx, shared_clock_input, logger)),
204 processor_drop_rx: None,
205 shared_clock_output: RefCell::new(shared_clock_output),
206 sample_rate: NonZeroU32::new(44100).unwrap(),
207 sample_rate_recip: 44100.0f64.recip(),
208 #[cfg(feature = "musical_transport")]
209 transport_state: Box::new(TransportState::default()),
210 #[cfg(feature = "musical_transport")]
211 transport_state_alloc_reuse: None,
212 event_group_pool,
213 event_group: Vec::with_capacity(initial_event_group_capacity),
214 initial_event_group_capacity,
215 #[cfg(feature = "scheduled_events")]
216 queued_clear_scheduled_events: Vec::new(),
217 config,
218 }
219 }
220
221 pub fn active_backend(&self) -> Option<&B> {
224 self.active_state
225 .as_ref()
226 .map(|state| &state.backend_handle)
227 }
228
229 pub fn active_backend_mut(&mut self) -> Option<&mut B> {
232 self.active_state
233 .as_mut()
234 .map(|state| &mut state.backend_handle)
235 }
236
237 pub fn available_input_devices(&self) -> Vec<DeviceInfo> {
239 B::available_input_devices()
240 }
241
242 pub fn available_output_devices(&self) -> Vec<DeviceInfo> {
244 B::available_output_devices()
245 }
246
247 pub fn can_start_stream(&self) -> bool {
257 if self.is_audio_stream_running() {
258 false
259 } else if let Some(rx) = &self.processor_drop_rx {
260 rx.try_peek().is_some()
261 } else {
262 true
263 }
264 }
265
266 pub fn start_stream(
277 &mut self,
278 config: B::Config,
279 ) -> Result<(), StartStreamError<B::StartStreamError>> {
280 if self.is_audio_stream_running() {
281 return Err(StartStreamError::AlreadyStarted);
282 }
283
284 if !self.can_start_stream() {
285 return Err(StartStreamError::OldStreamNotFinishedStopping);
286 }
287
288 let (mut backend_handle, mut stream_info) =
289 B::start_stream(config).map_err(|e| StartStreamError::BackendError(e))?;
290
291 stream_info.sample_rate_recip = (stream_info.sample_rate.get() as f64).recip();
292 stream_info.declick_frames = NonZeroU32::new(
293 (self.config.declick_seconds * stream_info.sample_rate.get() as f32).round() as u32,
294 )
295 .unwrap_or(NonZeroU32::MIN);
296
297 let maybe_processor = self.processor_channel.take();
298
299 stream_info.prev_sample_rate = if maybe_processor.is_some() {
300 stream_info.sample_rate
301 } else {
302 self.sample_rate
303 };
304
305 self.sample_rate = stream_info.sample_rate;
306 self.sample_rate_recip = stream_info.sample_rate_recip;
307
308 let schedule = self.graph.compile(&stream_info)?;
309
310 let (drop_tx, drop_rx) = ringbuf::HeapRb::<FirewheelProcessorInner<B>>::new(1).split();
311
312 let processor = if let Some((from_context_rx, to_context_tx, shared_clock_input, logger)) =
313 maybe_processor
314 {
315 FirewheelProcessorInner::new(
316 from_context_rx,
317 to_context_tx,
318 shared_clock_input,
319 self.config.immediate_event_capacity,
320 #[cfg(feature = "scheduled_events")]
321 self.config.scheduled_event_capacity,
322 self.config.event_queue_capacity,
323 &stream_info,
324 self.config.hard_clip_outputs,
325 self.config.buffer_out_of_space_mode,
326 logger,
327 )
328 } else {
329 let mut processor = self.processor_drop_rx.as_mut().unwrap().try_pop().unwrap();
330
331 if processor.poisoned {
332 panic!("The audio thread has panicked!");
333 }
334
335 processor.new_stream(&stream_info);
336
337 processor
338 };
339
340 backend_handle.set_processor(FirewheelProcessor::new(processor, drop_tx));
341
342 if let Err(_) = self.send_message_to_processor(ContextToProcessorMsg::NewSchedule(schedule))
343 {
344 panic!("Firewheel message channel is full!");
345 }
346
347 self.active_state = Some(ActiveState {
348 backend_handle,
349 stream_info,
350 });
351 self.processor_drop_rx = Some(drop_rx);
352
353 Ok(())
354 }
355
356 pub fn stop_stream(&mut self) {
358 self.active_state = None;
361 self.graph.deactivate();
362 }
363
364 pub fn is_audio_stream_running(&self) -> bool {
366 self.active_state.is_some()
367 }
368
369 pub fn stream_info(&self) -> Option<&StreamInfo> {
373 self.active_state.as_ref().map(|s| &s.stream_info)
374 }
375
376 pub fn audio_clock(&self) -> AudioClock {
391 let mut clock_borrowed = self.shared_clock_output.borrow_mut();
397 let clock = clock_borrowed.read();
398
399 let update_instant = audio_clock_update_instant_and_delay(&clock, &self.active_state)
400 .map(|(update_instant, _delay)| update_instant);
401
402 AudioClock {
403 samples: clock.clock_samples,
404 seconds: clock
405 .clock_samples
406 .to_seconds(self.sample_rate, self.sample_rate_recip),
407 #[cfg(feature = "musical_transport")]
408 musical: clock.current_playhead,
409 #[cfg(feature = "musical_transport")]
410 transport_is_playing: clock.transport_is_playing,
411 update_instant,
412 }
413 }
414
415 pub fn audio_clock_corrected(&self) -> AudioClock {
434 let mut clock_borrowed = self.shared_clock_output.borrow_mut();
440 let clock = clock_borrowed.read();
441
442 let Some((update_instant, delay)) =
443 audio_clock_update_instant_and_delay(&clock, &self.active_state)
444 else {
445 return AudioClock {
448 samples: clock.clock_samples,
449 seconds: clock
450 .clock_samples
451 .to_seconds(self.sample_rate, self.sample_rate_recip),
452 #[cfg(feature = "musical_transport")]
453 musical: clock.current_playhead,
454 #[cfg(feature = "musical_transport")]
455 transport_is_playing: clock.transport_is_playing,
456 update_instant: None,
457 };
458 };
459
460 let delta_seconds = DurationSeconds(delay.as_secs_f64());
462
463 let samples = clock.clock_samples + delta_seconds.to_samples(self.sample_rate);
464
465 #[cfg(feature = "musical_transport")]
466 let musical = clock.current_playhead.map(|musical_time| {
467 if clock.transport_is_playing && self.transport_state.transport.is_some() {
468 self.transport_state
469 .transport
470 .as_ref()
471 .unwrap()
472 .delta_seconds_from(musical_time, delta_seconds, clock.speed_multiplier)
473 } else {
474 musical_time
475 }
476 });
477
478 AudioClock {
479 samples,
480 seconds: samples.to_seconds(self.sample_rate, self.sample_rate_recip),
481 #[cfg(feature = "musical_transport")]
482 musical,
483 #[cfg(feature = "musical_transport")]
484 transport_is_playing: clock.transport_is_playing,
485 update_instant: Some(update_instant),
486 }
487 }
488
489 pub fn audio_clock_instant(&self) -> Option<Instant> {
501 let mut clock_borrowed = self.shared_clock_output.borrow_mut();
507 let clock = clock_borrowed.read();
508
509 audio_clock_update_instant_and_delay(&clock, &self.active_state)
510 .map(|(update_instant, _delay)| update_instant)
511 }
512
513 #[cfg(feature = "musical_transport")]
517 pub fn sync_transport(
518 &mut self,
519 transport: &TransportState,
520 ) -> Result<(), UpdateError<B::StreamError>> {
521 if &*self.transport_state != transport {
522 let transport_msg = if let Some(mut t) = self.transport_state_alloc_reuse.take() {
523 *t = transport.clone();
524 t
525 } else {
526 Box::new(transport.clone())
527 };
528
529 self.send_message_to_processor(ContextToProcessorMsg::SetTransportState(transport_msg))
530 .map_err(|(_, e)| e)?;
531
532 *self.transport_state = transport.clone();
533 }
534
535 Ok(())
536 }
537
538 #[cfg(feature = "musical_transport")]
540 pub fn transport_state(&self) -> &TransportState {
541 &self.transport_state
542 }
543
544 #[cfg(feature = "musical_transport")]
546 pub fn transport(&self) -> &TransportState {
547 &self.transport_state
548 }
549
550 pub fn hard_clip_outputs(&self) -> bool {
552 self.config.hard_clip_outputs
553 }
554
555 pub fn set_hard_clip_outputs(
564 &mut self,
565 hard_clip_outputs: bool,
566 ) -> Result<(), UpdateError<B::StreamError>> {
567 if self.config.hard_clip_outputs == hard_clip_outputs {
568 return Ok(());
569 }
570 self.config.hard_clip_outputs = hard_clip_outputs;
571
572 self.send_message_to_processor(ContextToProcessorMsg::HardClipOutputs(hard_clip_outputs))
573 .map_err(|(_, e)| e)
574 }
575
576 pub fn update(&mut self) -> Result<(), UpdateError<B::StreamError>> {
580 self.logger_rx.flush();
581
582 firewheel_core::collector::GlobalCollector.collect();
583
584 for msg in self.from_processor_rx.pop_iter() {
585 match msg {
586 ProcessorToContextMsg::ReturnEventGroup(mut event_group) => {
587 event_group.clear();
588 self.event_group_pool.push(event_group);
589 }
590 ProcessorToContextMsg::ReturnSchedule(schedule_data) => {
591 let _ = schedule_data;
592 }
593 #[cfg(feature = "musical_transport")]
594 ProcessorToContextMsg::ReturnTransportState(transport_state) => {
595 if self.transport_state_alloc_reuse.is_none() {
596 self.transport_state_alloc_reuse = Some(transport_state);
597 }
598 }
599 #[cfg(feature = "scheduled_events")]
600 ProcessorToContextMsg::ReturnClearScheduledEvents(msgs) => {
601 let _ = msgs;
602 }
603 }
604 }
605
606 self.graph.update(
607 self.active_state.as_ref().map(|s| &s.stream_info),
608 &mut self.event_group,
609 );
610
611 if let Some(active_state) = &mut self.active_state {
612 if let Err(e) = active_state.backend_handle.poll_status() {
613 self.active_state = None;
614 self.graph.deactivate();
615
616 return Err(UpdateError::StreamStoppedUnexpectedly(Some(e)));
617 }
618
619 if self
620 .processor_drop_rx
621 .as_ref()
622 .unwrap()
623 .try_peek()
624 .is_some()
625 {
626 self.active_state = None;
627 self.graph.deactivate();
628
629 return Err(UpdateError::StreamStoppedUnexpectedly(None));
630 }
631 }
632
633 if self.is_audio_stream_running() {
634 if self.graph.needs_compile() {
635 let schedule_data = self
636 .graph
637 .compile(&self.active_state.as_ref().unwrap().stream_info)?;
638
639 if let Err((msg, e)) = self
640 .send_message_to_processor(ContextToProcessorMsg::NewSchedule(schedule_data))
641 {
642 let ContextToProcessorMsg::NewSchedule(schedule) = msg else {
643 unreachable!();
644 };
645
646 self.graph.on_schedule_send_failed(schedule);
647
648 return Err(e);
649 }
650 }
651
652 #[cfg(feature = "scheduled_events")]
653 if !self.queued_clear_scheduled_events.is_empty() {
654 let msgs: SmallVec<[ClearScheduledEventsEvent; 1]> =
655 self.queued_clear_scheduled_events.drain(..).collect();
656
657 if let Err((msg, e)) = self
658 .send_message_to_processor(ContextToProcessorMsg::ClearScheduledEvents(msgs))
659 {
660 let ContextToProcessorMsg::ClearScheduledEvents(mut msgs) = msg else {
661 unreachable!();
662 };
663
664 self.queued_clear_scheduled_events = msgs.drain(..).collect();
665
666 return Err(e);
667 }
668 }
669
670 if !self.event_group.is_empty() {
671 let mut next_event_group = self
672 .event_group_pool
673 .pop()
674 .unwrap_or_else(|| Vec::with_capacity(self.initial_event_group_capacity));
675 core::mem::swap(&mut next_event_group, &mut self.event_group);
676
677 if let Err((msg, e)) = self
678 .send_message_to_processor(ContextToProcessorMsg::EventGroup(next_event_group))
679 {
680 let ContextToProcessorMsg::EventGroup(mut event_group) = msg else {
681 unreachable!();
682 };
683
684 core::mem::swap(&mut event_group, &mut self.event_group);
685 self.event_group_pool.push(event_group);
686
687 return Err(e);
688 }
689 }
690 }
691
692 Ok(())
693 }
694
695 pub fn graph_in_node_id(&self) -> NodeID {
697 self.graph.graph_in_node()
698 }
699
700 pub fn graph_out_node_id(&self) -> NodeID {
702 self.graph.graph_out_node()
703 }
704
705 pub fn add_node<T: AudioNode + 'static>(
707 &mut self,
708 node: T,
709 config: Option<T::Configuration>,
710 ) -> NodeID {
711 self.graph.add_node(node, config)
712 }
713
714 pub fn add_dyn_node<T: DynAudioNode + 'static>(&mut self, node: T) -> NodeID {
716 self.graph.add_dyn_node(node)
717 }
718
719 pub fn remove_node(
730 &mut self,
731 node_id: NodeID,
732 ) -> Result<SmallVec<[EdgeID; 4]>, RemoveNodeError> {
733 self.graph.remove_node(node_id)
734 }
735
736 pub fn node_info(&self, id: NodeID) -> Option<&NodeEntry> {
738 self.graph.node_info(id)
739 }
740
741 pub fn node_state<T: 'static>(&self, id: NodeID) -> Option<&T> {
743 self.graph.node_state(id)
744 }
745
746 pub fn node_state_dyn(&self, id: NodeID) -> Option<&dyn Any> {
748 self.graph.node_state_dyn(id)
749 }
750
751 pub fn node_state_mut<T: 'static>(&mut self, id: NodeID) -> Option<&mut T> {
753 self.graph.node_state_mut(id)
754 }
755
756 pub fn node_state_dyn_mut(&mut self, id: NodeID) -> Option<&mut dyn Any> {
757 self.graph.node_state_dyn_mut(id)
758 }
759
760 pub fn nodes<'a>(&'a self) -> impl Iterator<Item = &'a NodeEntry> {
762 self.graph.nodes()
763 }
764
765 pub fn edges<'a>(&'a self) -> impl Iterator<Item = &'a Edge> {
767 self.graph.edges()
768 }
769
770 pub fn set_graph_channel_config(
774 &mut self,
775 channel_config: ChannelConfig,
776 ) -> SmallVec<[EdgeID; 4]> {
777 self.graph.set_graph_channel_config(channel_config)
778 }
779
780 pub fn connect(
798 &mut self,
799 src_node: NodeID,
800 dst_node: NodeID,
801 ports_src_dst: &[(PortIdx, PortIdx)],
802 check_for_cycles: bool,
803 ) -> Result<SmallVec<[EdgeID; 4]>, AddEdgeError> {
804 self.graph
805 .connect(src_node, dst_node, ports_src_dst, check_for_cycles)
806 }
807
808 pub fn disconnect(
819 &mut self,
820 src_node: NodeID,
821 dst_node: NodeID,
822 ports_src_dst: &[(PortIdx, PortIdx)],
823 ) -> bool {
824 self.graph.disconnect(src_node, dst_node, ports_src_dst)
825 }
826
827 pub fn disconnect_all_between(
832 &mut self,
833 src_node: NodeID,
834 dst_node: NodeID,
835 ) -> SmallVec<[EdgeID; 4]> {
836 self.graph.disconnect_all_between(src_node, dst_node)
837 }
838
839 pub fn disconnect_by_edge_id(&mut self, edge_id: EdgeID) -> bool {
843 self.graph.disconnect_by_edge_id(edge_id)
844 }
845
846 pub fn edge(&self, edge_id: EdgeID) -> Option<&Edge> {
848 self.graph.edge(edge_id)
849 }
850
851 pub fn cycle_detected(&mut self) -> bool {
855 self.graph.cycle_detected()
856 }
857
858 pub fn queue_event(&mut self, event: NodeEvent) {
863 self.event_group.push(event);
864 }
865
866 pub fn queue_event_for(&mut self, node_id: NodeID, event: NodeEventType) {
871 self.queue_event(NodeEvent {
872 node_id,
873 #[cfg(feature = "scheduled_events")]
874 time: None,
875 event,
876 });
877 }
878
879 #[cfg(feature = "scheduled_events")]
887 pub fn schedule_event_for(
888 &mut self,
889 node_id: NodeID,
890 event: NodeEventType,
891 time: Option<EventInstant>,
892 ) {
893 self.queue_event(NodeEvent {
894 node_id,
895 time,
896 event,
897 });
898 }
899
900 #[cfg(feature = "scheduled_events")]
908 pub fn cancel_all_scheduled_events(&mut self, event_type: ClearScheduledEventsType) {
909 self.queued_clear_scheduled_events
910 .push(ClearScheduledEventsEvent {
911 node_id: None,
912 event_type,
913 });
914 }
915
916 #[cfg(feature = "scheduled_events")]
924 pub fn cancel_scheduled_events_for(
925 &mut self,
926 node_id: NodeID,
927 event_type: ClearScheduledEventsType,
928 ) {
929 self.queued_clear_scheduled_events
930 .push(ClearScheduledEventsEvent {
931 node_id: Some(node_id),
932 event_type,
933 });
934 }
935
936 fn send_message_to_processor(
937 &mut self,
938 msg: ContextToProcessorMsg,
939 ) -> Result<(), (ContextToProcessorMsg, UpdateError<B::StreamError>)> {
940 self.to_processor_tx
941 .try_push(msg)
942 .map_err(|msg| (msg, UpdateError::MsgChannelFull))
943 }
944}
945
946impl<B: AudioBackend> Drop for FirewheelCtx<B> {
947 fn drop(&mut self) {
948 self.stop_stream();
949
950 #[cfg(not(target_family = "wasm"))]
953 if let Some(drop_rx) = self.processor_drop_rx.take() {
954 let now = bevy_platform::time::Instant::now();
955
956 while drop_rx.try_peek().is_none() {
957 if now.elapsed() > core::time::Duration::from_secs(2) {
958 break;
959 }
960
961 bevy_platform::thread::sleep(core::time::Duration::from_millis(2));
962 }
963 }
964
965 firewheel_core::collector::GlobalCollector.collect();
966 }
967}
968
969impl<B: AudioBackend> FirewheelCtx<B> {
970 pub fn event_queue(&mut self, id: NodeID) -> ContextQueue<'_, B> {
972 ContextQueue {
973 context: self,
974 id,
975 #[cfg(feature = "scheduled_events")]
976 time: None,
977 }
978 }
979
980 #[cfg(feature = "scheduled_events")]
981 pub fn event_queue_scheduled(
982 &mut self,
983 id: NodeID,
984 time: Option<EventInstant>,
985 ) -> ContextQueue<'_, B> {
986 ContextQueue {
987 context: self,
988 id,
989 time,
990 }
991 }
992}
993
994pub struct ContextQueue<'a, B: AudioBackend> {
1015 context: &'a mut FirewheelCtx<B>,
1016 id: NodeID,
1017 #[cfg(feature = "scheduled_events")]
1018 time: Option<EventInstant>,
1019}
1020
1021#[cfg(feature = "scheduled_events")]
1022impl<'a, B: AudioBackend> ContextQueue<'a, B> {
1023 pub fn time(&self) -> Option<EventInstant> {
1024 self.time
1025 }
1026}
1027
1028impl<B: AudioBackend> firewheel_core::diff::EventQueue for ContextQueue<'_, B> {
1029 fn push(&mut self, data: NodeEventType) {
1030 self.context.queue_event(NodeEvent {
1031 event: data,
1032 #[cfg(feature = "scheduled_events")]
1033 time: self.time,
1034 node_id: self.id,
1035 });
1036 }
1037}
1038
1039#[cfg(feature = "scheduled_events")]
1041#[derive(Default, Debug, Clone, Copy, PartialEq)]
1042pub enum ClearScheduledEventsType {
1043 #[default]
1045 All,
1046 NonMusicalOnly,
1048 MusicalOnly,
1050}
1051
1052fn audio_clock_update_instant_and_delay<B: AudioBackend>(
1053 clock: &SharedClock<B::Instant>,
1054 active_state: &Option<ActiveState<B>>,
1055) -> Option<(Instant, Duration)> {
1056 active_state.as_ref().and_then(|active_state| {
1057 clock
1058 .process_timestamp
1059 .clone()
1060 .and_then(|process_timestamp| {
1061 active_state
1062 .backend_handle
1063 .delay_from_last_process(process_timestamp)
1064 .and_then(|delay| {
1065 Instant::now()
1066 .checked_sub(delay)
1067 .map(|instant| (instant, delay))
1068 })
1069 })
1070 })
1071}