1use bevy_platform::time::Instant;
2use core::cell::RefCell;
3use core::num::NonZeroU32;
4use core::time::Duration;
5use core::{any::Any, f64};
6use firewheel_core::clock::DurationSeconds;
7use firewheel_core::log::{RealtimeLogger, RealtimeLoggerConfig, RealtimeLoggerMainThread};
8use firewheel_core::node::ProcStore;
9use firewheel_core::{
10 channel_config::{ChannelConfig, ChannelCount},
11 clock::AudioClock,
12 dsp::declick::DeclickValues,
13 event::{NodeEvent, NodeEventType},
14 node::{AudioNode, DynAudioNode, NodeID},
15 StreamInfo,
16};
17use ringbuf::traits::{Consumer, Producer, Split};
18use smallvec::SmallVec;
19
20#[cfg(not(feature = "std"))]
21use num_traits::Float;
22
23#[cfg(all(not(feature = "std"), feature = "musical_transport"))]
24use bevy_platform::prelude::Box;
25#[cfg(not(feature = "std"))]
26use bevy_platform::prelude::Vec;
27
28use crate::error::RemoveNodeError;
29use crate::processor::BufferOutOfSpaceMode;
30use crate::{
31 backend::AudioBackend,
32 error::{AddEdgeError, StartStreamError, UpdateError},
33 graph::{AudioGraph, Edge, EdgeID, NodeEntry, PortIdx},
34 processor::{
35 ContextToProcessorMsg, FirewheelProcessor, FirewheelProcessorInner, ProcessorToContextMsg,
36 SharedClock,
37 },
38};
39
40#[cfg(feature = "scheduled_events")]
41use crate::processor::ClearScheduledEventsEvent;
42#[cfg(feature = "scheduled_events")]
43use firewheel_core::clock::EventInstant;
44
45#[cfg(feature = "musical_transport")]
46use firewheel_core::clock::TransportState;
47
48#[derive(Debug, Clone, Copy, PartialEq)]
50#[cfg_attr(feature = "bevy_reflect", derive(bevy_reflect::Reflect))]
51#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
52pub struct FirewheelConfig {
53 pub num_graph_inputs: ChannelCount,
55 pub num_graph_outputs: ChannelCount,
57 pub hard_clip_outputs: bool,
66 pub initial_node_capacity: u32,
70 pub initial_edge_capacity: u32,
74 pub declick_seconds: f32,
79 pub initial_event_group_capacity: u32,
83 pub channel_capacity: u32,
87 pub event_queue_capacity: usize,
94 pub immediate_event_capacity: usize,
100 #[cfg(feature = "scheduled_events")]
108 pub scheduled_event_capacity: usize,
109 pub buffer_out_of_space_mode: BufferOutOfSpaceMode,
113
114 pub logger_config: RealtimeLoggerConfig,
116
117 pub debug_force_clear_buffers: bool,
127
128 pub proc_store_capacity: usize,
132}
133
134impl Default for FirewheelConfig {
135 fn default() -> Self {
136 Self {
137 num_graph_inputs: ChannelCount::ZERO,
138 num_graph_outputs: ChannelCount::STEREO,
139 hard_clip_outputs: false,
140 initial_node_capacity: 128,
141 initial_edge_capacity: 256,
142 declick_seconds: DeclickValues::DEFAULT_FADE_SECONDS,
143 initial_event_group_capacity: 128,
144 channel_capacity: 64,
145 event_queue_capacity: 128,
146 immediate_event_capacity: 512,
147 #[cfg(feature = "scheduled_events")]
148 scheduled_event_capacity: 512,
149 buffer_out_of_space_mode: BufferOutOfSpaceMode::AllocateOnAudioThread,
150 logger_config: RealtimeLoggerConfig::default(),
151 debug_force_clear_buffers: false,
152 proc_store_capacity: 8,
153 }
154 }
155}
156
157struct ActiveState<B: AudioBackend> {
158 backend_handle: B,
159 stream_info: StreamInfo,
160}
161
162pub(crate) struct ProcessorChannel<B: AudioBackend> {
163 pub(crate) from_context_rx: ringbuf::HeapCons<ContextToProcessorMsg>,
164 pub(crate) to_context_tx: ringbuf::HeapProd<ProcessorToContextMsg>,
165 pub(crate) shared_clock_input: triple_buffer::Input<SharedClock<B::Instant>>,
166 pub(crate) logger: RealtimeLogger,
167 pub(crate) store: ProcStore,
168}
169
170pub struct FirewheelCtx<B: AudioBackend> {
172 graph: AudioGraph,
173
174 to_processor_tx: ringbuf::HeapProd<ContextToProcessorMsg>,
175 from_processor_rx: ringbuf::HeapCons<ProcessorToContextMsg>,
176 logger_rx: RealtimeLoggerMainThread,
177
178 active_state: Option<ActiveState<B>>,
179
180 processor_channel: Option<ProcessorChannel<B>>,
181 processor_drop_rx: Option<ringbuf::HeapCons<FirewheelProcessorInner<B>>>,
182
183 shared_clock_output: RefCell<triple_buffer::Output<SharedClock<B::Instant>>>,
184 sample_rate: NonZeroU32,
185 sample_rate_recip: f64,
186
187 #[cfg(feature = "musical_transport")]
188 transport_state: Box<TransportState>,
189 #[cfg(feature = "musical_transport")]
190 transport_state_alloc_reuse: Option<Box<TransportState>>,
191
192 event_group_pool: Vec<Vec<NodeEvent>>,
194 event_group: Vec<NodeEvent>,
195 initial_event_group_capacity: usize,
196
197 #[cfg(feature = "scheduled_events")]
198 queued_clear_scheduled_events: Vec<ClearScheduledEventsEvent>,
199
200 config: FirewheelConfig,
201}
202
203impl<B: AudioBackend> FirewheelCtx<B> {
204 pub fn new(config: FirewheelConfig) -> Self {
206 let (to_processor_tx, from_context_rx) =
207 ringbuf::HeapRb::<ContextToProcessorMsg>::new(config.channel_capacity as usize).split();
208 let (to_context_tx, from_processor_rx) =
209 ringbuf::HeapRb::<ProcessorToContextMsg>::new(config.channel_capacity as usize * 2)
210 .split();
211
212 let initial_event_group_capacity = config.initial_event_group_capacity as usize;
213 let mut event_group_pool = Vec::with_capacity(16);
214 for _ in 0..3 {
215 event_group_pool.push(Vec::with_capacity(initial_event_group_capacity));
216 }
217
218 let (shared_clock_input, shared_clock_output) =
219 triple_buffer::triple_buffer(&SharedClock::default());
220
221 let (logger, logger_rx) = firewheel_core::log::realtime_logger(config.logger_config);
222
223 let store = ProcStore::with_capacity(config.proc_store_capacity);
224
225 Self {
226 graph: AudioGraph::new(&config),
227 to_processor_tx,
228 from_processor_rx,
229 logger_rx,
230 active_state: None,
231 processor_channel: Some(ProcessorChannel {
232 from_context_rx,
233 to_context_tx,
234 shared_clock_input,
235 logger,
236 store,
237 }),
238 processor_drop_rx: None,
239 shared_clock_output: RefCell::new(shared_clock_output),
240 sample_rate: NonZeroU32::new(44100).unwrap(),
241 sample_rate_recip: 44100.0f64.recip(),
242 #[cfg(feature = "musical_transport")]
243 transport_state: Box::new(TransportState::default()),
244 #[cfg(feature = "musical_transport")]
245 transport_state_alloc_reuse: None,
246 event_group_pool,
247 event_group: Vec::with_capacity(initial_event_group_capacity),
248 initial_event_group_capacity,
249 #[cfg(feature = "scheduled_events")]
250 queued_clear_scheduled_events: Vec::new(),
251 config,
252 }
253 }
254
255 pub fn proc_store(&self) -> Option<&ProcStore> {
259 if let Some(proc_channel) = &self.processor_channel {
260 Some(&proc_channel.store)
261 } else if let Some(processor) = self.processor_drop_rx.as_ref().unwrap().last() {
262 if processor.poisoned {
263 panic!("The audio thread has panicked!");
264 }
265
266 Some(&processor.extra.store)
267 } else {
268 None
269 }
270 }
271
272 pub fn proc_store_mut(&mut self) -> Option<&mut ProcStore> {
276 if let Some(proc_channel) = &mut self.processor_channel {
277 Some(&mut proc_channel.store)
278 } else if let Some(processor) = self.processor_drop_rx.as_mut().unwrap().last_mut() {
279 if processor.poisoned {
280 panic!("The audio thread has panicked!");
281 }
282
283 Some(&mut processor.extra.store)
284 } else {
285 None
286 }
287 }
288
289 pub fn active_backend(&self) -> Option<&B> {
292 self.active_state
293 .as_ref()
294 .map(|state| &state.backend_handle)
295 }
296
297 pub fn active_backend_mut(&mut self) -> Option<&mut B> {
300 self.active_state
301 .as_mut()
302 .map(|state| &mut state.backend_handle)
303 }
304
305 pub fn device_enumerator(&self) -> B::Enumerator {
308 B::enumerator()
309 }
310
311 pub fn can_start_stream(&self) -> bool {
321 if self.is_audio_stream_running() {
322 false
323 } else if let Some(rx) = &self.processor_drop_rx {
324 rx.try_peek().is_some()
325 } else {
326 true
327 }
328 }
329
330 pub fn start_stream(
341 &mut self,
342 config: B::Config,
343 ) -> Result<(), StartStreamError<B::StartStreamError>> {
344 if self.is_audio_stream_running() {
345 return Err(StartStreamError::AlreadyStarted);
346 }
347
348 if !self.can_start_stream() {
349 return Err(StartStreamError::OldStreamNotFinishedStopping);
350 }
351
352 let (mut backend_handle, mut stream_info) =
353 B::start_stream(config).map_err(StartStreamError::BackendError)?;
354
355 stream_info.sample_rate_recip = (stream_info.sample_rate.get() as f64).recip();
356 stream_info.declick_frames = NonZeroU32::new(
357 (self.config.declick_seconds * stream_info.sample_rate.get() as f32).round() as u32,
358 )
359 .unwrap_or(NonZeroU32::MIN);
360
361 let maybe_processor = self.processor_channel.take();
362
363 stream_info.prev_sample_rate = if maybe_processor.is_some() {
364 stream_info.sample_rate
365 } else {
366 self.sample_rate
367 };
368
369 self.sample_rate = stream_info.sample_rate;
370 self.sample_rate_recip = stream_info.sample_rate_recip;
371
372 let schedule = self.graph.compile(&stream_info)?;
373
374 let (drop_tx, drop_rx) = ringbuf::HeapRb::<FirewheelProcessorInner<B>>::new(1).split();
375
376 let processor = if let Some(proc_channel) = maybe_processor {
377 FirewheelProcessorInner::new(
378 proc_channel,
379 self.config.immediate_event_capacity,
380 #[cfg(feature = "scheduled_events")]
381 self.config.scheduled_event_capacity,
382 self.config.event_queue_capacity,
383 &stream_info,
384 self.config.hard_clip_outputs,
385 self.config.buffer_out_of_space_mode,
386 self.config.debug_force_clear_buffers,
387 )
388 } else {
389 let mut processor = self.processor_drop_rx.as_mut().unwrap().try_pop().unwrap();
390
391 if processor.poisoned {
392 panic!("The audio thread has panicked!");
393 }
394
395 processor.new_stream(&stream_info);
396
397 processor
398 };
399
400 backend_handle.set_processor(FirewheelProcessor::new(processor, drop_tx));
401
402 if self
403 .send_message_to_processor(ContextToProcessorMsg::NewSchedule(schedule))
404 .is_err()
405 {
406 panic!("Firewheel message channel is full!");
407 }
408
409 self.active_state = Some(ActiveState {
410 backend_handle,
411 stream_info,
412 });
413 self.processor_drop_rx = Some(drop_rx);
414
415 Ok(())
416 }
417
418 pub fn stop_stream(&mut self) {
420 self.active_state = None;
423 self.graph.deactivate();
424 }
425
426 pub fn is_audio_stream_running(&self) -> bool {
428 self.active_state.is_some()
429 }
430
431 pub fn stream_info(&self) -> Option<&StreamInfo> {
435 self.active_state.as_ref().map(|s| &s.stream_info)
436 }
437
438 pub fn audio_clock(&self) -> AudioClock {
453 let mut clock_borrowed = self.shared_clock_output.borrow_mut();
459 let clock = clock_borrowed.read();
460
461 let update_instant = audio_clock_update_instant_and_delay(clock, &self.active_state)
462 .map(|(update_instant, _delay)| update_instant);
463
464 AudioClock {
465 samples: clock.clock_samples,
466 seconds: clock
467 .clock_samples
468 .to_seconds(self.sample_rate, self.sample_rate_recip),
469 #[cfg(feature = "musical_transport")]
470 musical: clock.current_playhead,
471 #[cfg(feature = "musical_transport")]
472 transport_is_playing: clock.transport_is_playing,
473 update_instant,
474 }
475 }
476
477 pub fn audio_clock_corrected(&self) -> AudioClock {
496 let mut clock_borrowed = self.shared_clock_output.borrow_mut();
502 let clock = clock_borrowed.read();
503
504 let Some((update_instant, delay)) =
505 audio_clock_update_instant_and_delay(clock, &self.active_state)
506 else {
507 return AudioClock {
510 samples: clock.clock_samples,
511 seconds: clock
512 .clock_samples
513 .to_seconds(self.sample_rate, self.sample_rate_recip),
514 #[cfg(feature = "musical_transport")]
515 musical: clock.current_playhead,
516 #[cfg(feature = "musical_transport")]
517 transport_is_playing: clock.transport_is_playing,
518 update_instant: None,
519 };
520 };
521
522 let delta_seconds = DurationSeconds(delay.as_secs_f64());
524
525 let samples = clock.clock_samples + delta_seconds.to_samples(self.sample_rate);
526
527 #[cfg(feature = "musical_transport")]
528 let musical = clock.current_playhead.map(|musical_time| {
529 if clock.transport_is_playing && self.transport_state.transport.is_some() {
530 self.transport_state
531 .transport
532 .as_ref()
533 .unwrap()
534 .delta_seconds_from(musical_time, delta_seconds, clock.speed_multiplier)
535 } else {
536 musical_time
537 }
538 });
539
540 AudioClock {
541 samples,
542 seconds: samples.to_seconds(self.sample_rate, self.sample_rate_recip),
543 #[cfg(feature = "musical_transport")]
544 musical,
545 #[cfg(feature = "musical_transport")]
546 transport_is_playing: clock.transport_is_playing,
547 update_instant: Some(update_instant),
548 }
549 }
550
551 pub fn audio_clock_instant(&self) -> Option<Instant> {
563 let mut clock_borrowed = self.shared_clock_output.borrow_mut();
569 let clock = clock_borrowed.read();
570
571 audio_clock_update_instant_and_delay(clock, &self.active_state)
572 .map(|(update_instant, _delay)| update_instant)
573 }
574
575 #[cfg(feature = "musical_transport")]
579 pub fn sync_transport(
580 &mut self,
581 transport: &TransportState,
582 ) -> Result<(), UpdateError<B::StreamError>> {
583 if &*self.transport_state != transport {
584 let transport_msg = if let Some(mut t) = self.transport_state_alloc_reuse.take() {
585 *t = transport.clone();
586 t
587 } else {
588 Box::new(transport.clone())
589 };
590
591 self.send_message_to_processor(ContextToProcessorMsg::SetTransportState(transport_msg))
592 .map_err(|(_, e)| e)?;
593
594 *self.transport_state = transport.clone();
595 }
596
597 Ok(())
598 }
599
600 #[cfg(feature = "musical_transport")]
602 pub fn transport_state(&self) -> &TransportState {
603 &self.transport_state
604 }
605
606 #[cfg(feature = "musical_transport")]
608 pub fn transport(&self) -> &TransportState {
609 &self.transport_state
610 }
611
612 pub fn hard_clip_outputs(&self) -> bool {
614 self.config.hard_clip_outputs
615 }
616
617 pub fn set_hard_clip_outputs(
626 &mut self,
627 hard_clip_outputs: bool,
628 ) -> Result<(), UpdateError<B::StreamError>> {
629 if self.config.hard_clip_outputs == hard_clip_outputs {
630 return Ok(());
631 }
632 self.config.hard_clip_outputs = hard_clip_outputs;
633
634 self.send_message_to_processor(ContextToProcessorMsg::HardClipOutputs(hard_clip_outputs))
635 .map_err(|(_, e)| e)
636 }
637
638 pub fn update(&mut self) -> Result<(), UpdateError<B::StreamError>> {
642 self.logger_rx.flush(
643 |msg| {
644 #[cfg(feature = "tracing")]
645 tracing::error!("{}", msg);
646
647 #[cfg(all(feature = "log", not(feature = "tracing")))]
648 log::error!("{}", msg);
649
650 let _ = msg;
651 },
652 #[cfg(debug_assertions)]
653 |msg| {
654 #[cfg(feature = "tracing")]
655 tracing::debug!("{}", msg);
656
657 #[cfg(all(feature = "log", not(feature = "tracing")))]
658 log::debug!("{}", msg);
659
660 let _ = msg;
661 },
662 );
663
664 firewheel_core::collector::GlobalRtGc::collect();
665
666 for msg in self.from_processor_rx.pop_iter() {
667 match msg {
668 ProcessorToContextMsg::ReturnEventGroup(mut event_group) => {
669 event_group.clear();
670 self.event_group_pool.push(event_group);
671 }
672 ProcessorToContextMsg::ReturnSchedule(schedule_data) => {
673 let _ = schedule_data;
674 }
675 #[cfg(feature = "musical_transport")]
676 ProcessorToContextMsg::ReturnTransportState(transport_state) => {
677 if self.transport_state_alloc_reuse.is_none() {
678 self.transport_state_alloc_reuse = Some(transport_state);
679 }
680 }
681 #[cfg(feature = "scheduled_events")]
682 ProcessorToContextMsg::ReturnClearScheduledEvents(msgs) => {
683 let _ = msgs;
684 }
685 }
686 }
687
688 self.graph.update(
689 self.active_state.as_ref().map(|s| &s.stream_info),
690 &mut self.event_group,
691 );
692
693 if let Some(active_state) = &mut self.active_state {
694 if let Err(e) = active_state.backend_handle.poll_status() {
695 self.active_state = None;
696 self.graph.deactivate();
697
698 return Err(UpdateError::StreamStoppedUnexpectedly(Some(e)));
699 }
700
701 if self
702 .processor_drop_rx
703 .as_ref()
704 .unwrap()
705 .try_peek()
706 .is_some()
707 {
708 self.active_state = None;
709 self.graph.deactivate();
710
711 return Err(UpdateError::StreamStoppedUnexpectedly(None));
712 }
713 }
714
715 if self.is_audio_stream_running() {
716 if self.graph.needs_compile() {
717 let schedule_data = self
718 .graph
719 .compile(&self.active_state.as_ref().unwrap().stream_info)?;
720
721 if let Err((msg, e)) = self
722 .send_message_to_processor(ContextToProcessorMsg::NewSchedule(schedule_data))
723 {
724 let ContextToProcessorMsg::NewSchedule(schedule) = msg else {
725 unreachable!();
726 };
727
728 self.graph.on_schedule_send_failed(schedule);
729
730 return Err(e);
731 }
732 }
733
734 #[cfg(feature = "scheduled_events")]
735 if !self.queued_clear_scheduled_events.is_empty() {
736 let msgs: SmallVec<[ClearScheduledEventsEvent; 1]> =
737 self.queued_clear_scheduled_events.drain(..).collect();
738
739 if let Err((msg, e)) = self
740 .send_message_to_processor(ContextToProcessorMsg::ClearScheduledEvents(msgs))
741 {
742 let ContextToProcessorMsg::ClearScheduledEvents(mut msgs) = msg else {
743 unreachable!();
744 };
745
746 self.queued_clear_scheduled_events = msgs.drain(..).collect();
747
748 return Err(e);
749 }
750 }
751
752 if !self.event_group.is_empty() {
753 let mut next_event_group = self
754 .event_group_pool
755 .pop()
756 .unwrap_or_else(|| Vec::with_capacity(self.initial_event_group_capacity));
757 core::mem::swap(&mut next_event_group, &mut self.event_group);
758
759 if let Err((msg, e)) = self
760 .send_message_to_processor(ContextToProcessorMsg::EventGroup(next_event_group))
761 {
762 let ContextToProcessorMsg::EventGroup(mut event_group) = msg else {
763 unreachable!();
764 };
765
766 core::mem::swap(&mut event_group, &mut self.event_group);
767 self.event_group_pool.push(event_group);
768
769 return Err(e);
770 }
771 }
772 }
773
774 Ok(())
775 }
776
777 pub fn graph_in_node_id(&self) -> NodeID {
779 self.graph.graph_in_node()
780 }
781
782 pub fn graph_out_node_id(&self) -> NodeID {
784 self.graph.graph_out_node()
785 }
786
787 pub fn add_node<T: AudioNode + 'static>(
789 &mut self,
790 node: T,
791 config: Option<T::Configuration>,
792 ) -> NodeID {
793 self.graph.add_node(node, config)
794 }
795
796 pub fn add_dyn_node<T: DynAudioNode + 'static>(&mut self, node: T) -> NodeID {
798 self.graph.add_dyn_node(node)
799 }
800
801 pub fn remove_node(
812 &mut self,
813 node_id: NodeID,
814 ) -> Result<SmallVec<[EdgeID; 4]>, RemoveNodeError> {
815 self.graph.remove_node(node_id)
816 }
817
818 pub fn node_info(&self, id: NodeID) -> Option<&NodeEntry> {
820 self.graph.node_info(id)
821 }
822
823 pub fn node_state<T: 'static>(&self, id: NodeID) -> Option<&T> {
825 self.graph.node_state(id)
826 }
827
828 pub fn node_state_dyn(&self, id: NodeID) -> Option<&dyn Any> {
830 self.graph.node_state_dyn(id)
831 }
832
833 pub fn node_state_mut<T: 'static>(&mut self, id: NodeID) -> Option<&mut T> {
835 self.graph.node_state_mut(id)
836 }
837
838 pub fn node_state_dyn_mut(&mut self, id: NodeID) -> Option<&mut dyn Any> {
839 self.graph.node_state_dyn_mut(id)
840 }
841
842 pub fn nodes(&self) -> impl Iterator<Item = &NodeEntry> {
844 self.graph.nodes()
845 }
846
847 pub fn edges(&self) -> impl Iterator<Item = &Edge> {
849 self.graph.edges()
850 }
851
852 pub fn set_graph_channel_config(
856 &mut self,
857 channel_config: ChannelConfig,
858 ) -> SmallVec<[EdgeID; 4]> {
859 self.graph.set_graph_channel_config(channel_config)
860 }
861
862 pub fn connect(
880 &mut self,
881 src_node: NodeID,
882 dst_node: NodeID,
883 ports_src_dst: &[(PortIdx, PortIdx)],
884 check_for_cycles: bool,
885 ) -> Result<SmallVec<[EdgeID; 4]>, AddEdgeError> {
886 self.graph
887 .connect(src_node, dst_node, ports_src_dst, check_for_cycles)
888 }
889
890 pub fn disconnect(
901 &mut self,
902 src_node: NodeID,
903 dst_node: NodeID,
904 ports_src_dst: &[(PortIdx, PortIdx)],
905 ) -> bool {
906 self.graph.disconnect(src_node, dst_node, ports_src_dst)
907 }
908
909 pub fn disconnect_all_between(
914 &mut self,
915 src_node: NodeID,
916 dst_node: NodeID,
917 ) -> SmallVec<[EdgeID; 4]> {
918 self.graph.disconnect_all_between(src_node, dst_node)
919 }
920
921 pub fn disconnect_by_edge_id(&mut self, edge_id: EdgeID) -> bool {
925 self.graph.disconnect_by_edge_id(edge_id)
926 }
927
928 pub fn edge(&self, edge_id: EdgeID) -> Option<&Edge> {
930 self.graph.edge(edge_id)
931 }
932
933 pub fn cycle_detected(&mut self) -> bool {
937 self.graph.cycle_detected()
938 }
939
940 pub fn queue_event(&mut self, event: NodeEvent) {
945 self.event_group.push(event);
946 }
947
948 pub fn queue_event_for(&mut self, node_id: NodeID, event: NodeEventType) {
953 self.queue_event(NodeEvent {
954 node_id,
955 #[cfg(feature = "scheduled_events")]
956 time: None,
957 event,
958 });
959 }
960
961 #[cfg(feature = "scheduled_events")]
969 pub fn schedule_event_for(
970 &mut self,
971 node_id: NodeID,
972 event: NodeEventType,
973 time: Option<EventInstant>,
974 ) {
975 self.queue_event(NodeEvent {
976 node_id,
977 time,
978 event,
979 });
980 }
981
982 #[cfg(feature = "scheduled_events")]
990 pub fn cancel_all_scheduled_events(&mut self, event_type: ClearScheduledEventsType) {
991 self.queued_clear_scheduled_events
992 .push(ClearScheduledEventsEvent {
993 node_id: None,
994 event_type,
995 });
996 }
997
998 #[cfg(feature = "scheduled_events")]
1006 pub fn cancel_scheduled_events_for(
1007 &mut self,
1008 node_id: NodeID,
1009 event_type: ClearScheduledEventsType,
1010 ) {
1011 self.queued_clear_scheduled_events
1012 .push(ClearScheduledEventsEvent {
1013 node_id: Some(node_id),
1014 event_type,
1015 });
1016 }
1017
1018 fn send_message_to_processor(
1019 &mut self,
1020 msg: ContextToProcessorMsg,
1021 ) -> Result<(), (ContextToProcessorMsg, UpdateError<B::StreamError>)> {
1022 self.to_processor_tx
1023 .try_push(msg)
1024 .map_err(|msg| (msg, UpdateError::MsgChannelFull))
1025 }
1026}
1027
1028impl<B: AudioBackend> Drop for FirewheelCtx<B> {
1029 fn drop(&mut self) {
1030 self.stop_stream();
1031
1032 #[cfg(not(target_family = "wasm"))]
1035 if let Some(drop_rx) = self.processor_drop_rx.take() {
1036 let now = bevy_platform::time::Instant::now();
1037
1038 while drop_rx.try_peek().is_none() {
1039 if now.elapsed() > core::time::Duration::from_secs(2) {
1040 break;
1041 }
1042
1043 bevy_platform::thread::sleep(core::time::Duration::from_millis(2));
1044 }
1045 }
1046
1047 firewheel_core::collector::GlobalRtGc::collect();
1048 }
1049}
1050
1051impl<B: AudioBackend> FirewheelCtx<B> {
1052 pub fn event_queue(&mut self, id: NodeID) -> ContextQueue<'_, B> {
1054 ContextQueue {
1055 context: self,
1056 id,
1057 #[cfg(feature = "scheduled_events")]
1058 time: None,
1059 }
1060 }
1061
1062 #[cfg(feature = "scheduled_events")]
1063 pub fn event_queue_scheduled(
1064 &mut self,
1065 id: NodeID,
1066 time: Option<EventInstant>,
1067 ) -> ContextQueue<'_, B> {
1068 ContextQueue {
1069 context: self,
1070 id,
1071 time,
1072 }
1073 }
1074}
1075
1076pub struct ContextQueue<'a, B: AudioBackend> {
1097 context: &'a mut FirewheelCtx<B>,
1098 id: NodeID,
1099 #[cfg(feature = "scheduled_events")]
1100 time: Option<EventInstant>,
1101}
1102
1103#[cfg(feature = "scheduled_events")]
1104impl<'a, B: AudioBackend> ContextQueue<'a, B> {
1105 pub fn time(&self) -> Option<EventInstant> {
1106 self.time
1107 }
1108}
1109
1110impl<B: AudioBackend> firewheel_core::diff::EventQueue for ContextQueue<'_, B> {
1111 fn push(&mut self, data: NodeEventType) {
1112 self.context.queue_event(NodeEvent {
1113 event: data,
1114 #[cfg(feature = "scheduled_events")]
1115 time: self.time,
1116 node_id: self.id,
1117 });
1118 }
1119}
1120
1121#[cfg(feature = "scheduled_events")]
1123#[derive(Default, Debug, Clone, Copy, PartialEq)]
1124pub enum ClearScheduledEventsType {
1125 #[default]
1127 All,
1128 NonMusicalOnly,
1130 MusicalOnly,
1132}
1133
1134fn audio_clock_update_instant_and_delay<B: AudioBackend>(
1135 clock: &SharedClock<B::Instant>,
1136 active_state: &Option<ActiveState<B>>,
1137) -> Option<(Instant, Duration)> {
1138 active_state.as_ref().and_then(|active_state| {
1139 clock
1140 .process_timestamp
1141 .clone()
1142 .and_then(|process_timestamp| {
1143 active_state
1144 .backend_handle
1145 .delay_from_last_process(process_timestamp)
1146 .and_then(|delay| {
1147 Instant::now()
1148 .checked_sub(delay)
1149 .map(|instant| (instant, delay))
1150 })
1151 })
1152 })
1153}