1use bevy_platform::time::Instant;
2use core::cell::RefCell;
3use core::num::NonZeroU32;
4use core::time::Duration;
5use core::{any::Any, f64};
6use firewheel_core::clock::DurationSeconds;
7use firewheel_core::log::{RealtimeLogger, RealtimeLoggerConfig, RealtimeLoggerMainThread};
8use firewheel_core::node::ProcStore;
9use firewheel_core::{
10 channel_config::{ChannelConfig, ChannelCount},
11 clock::AudioClock,
12 dsp::declick::DeclickValues,
13 event::{NodeEvent, NodeEventType},
14 node::{AudioNode, DynAudioNode, NodeID},
15 StreamInfo,
16};
17use ringbuf::traits::{Consumer, Producer, Split};
18use smallvec::SmallVec;
19
20#[cfg(not(feature = "std"))]
21use num_traits::Float;
22
23#[cfg(all(not(feature = "std"), feature = "musical_transport"))]
24use bevy_platform::prelude::Box;
25#[cfg(not(feature = "std"))]
26use bevy_platform::prelude::Vec;
27
28use crate::error::RemoveNodeError;
29use crate::processor::BufferOutOfSpaceMode;
30use crate::{
31 backend::AudioBackend,
32 error::{AddEdgeError, StartStreamError, UpdateError},
33 graph::{AudioGraph, Edge, EdgeID, NodeEntry, PortIdx},
34 processor::{
35 ContextToProcessorMsg, FirewheelProcessor, FirewheelProcessorInner, ProcessorToContextMsg,
36 SharedClock,
37 },
38};
39
40#[cfg(feature = "scheduled_events")]
41use crate::processor::ClearScheduledEventsEvent;
42#[cfg(feature = "scheduled_events")]
43use firewheel_core::clock::EventInstant;
44
45#[cfg(feature = "musical_transport")]
46use firewheel_core::clock::TransportState;
47
/// Configuration for a [`FirewheelCtx`].
///
/// The context keeps a copy of this struct. Some values are read once in
/// [`FirewheelCtx::new`], others are read when a stream is started.
#[derive(Debug, Clone, Copy, PartialEq)]
#[cfg_attr(feature = "bevy_reflect", derive(bevy_reflect::Reflect))]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct FirewheelConfig {
    /// Channel count of the graph input.
    ///
    /// Not read in this file; presumably consumed by `AudioGraph::new`.
    pub num_graph_inputs: ChannelCount,
    /// Channel count of the graph output.
    ///
    /// Not read in this file; presumably consumed by `AudioGraph::new`.
    pub num_graph_outputs: ChannelCount,
    /// Flag forwarded to the processor when the first stream is started. It
    /// can be changed later with [`FirewheelCtx::set_hard_clip_outputs`].
    pub hard_clip_outputs: bool,
    /// Not read in this file; presumably the initial node capacity of the
    /// graph (verify against `AudioGraph::new`).
    pub initial_node_capacity: u32,
    /// Not read in this file; presumably the initial edge capacity of the
    /// graph (verify against `AudioGraph::new`).
    pub initial_edge_capacity: u32,
    /// Declick fade length in seconds.
    ///
    /// When a stream starts this is multiplied by the stream's sample rate
    /// and rounded to obtain `StreamInfo::declick_frames` (at least one frame).
    pub declick_seconds: f32,
    /// Capacity each event-group `Vec` is allocated with.
    pub initial_event_group_capacity: u32,
    /// Capacity of the context-to-processor message ring buffer. The
    /// processor-to-context ring buffer is created with twice this capacity.
    pub channel_capacity: u32,
    /// Passed to the processor's constructor when the first stream starts.
    pub event_queue_capacity: usize,
    /// Passed to the processor's constructor when the first stream starts.
    pub immediate_event_capacity: usize,
    /// Passed to the processor's constructor when the first stream starts.
    #[cfg(feature = "scheduled_events")]
    pub scheduled_event_capacity: usize,
    /// Passed to the processor's constructor when the first stream starts.
    pub buffer_out_of_space_mode: BufferOutOfSpaceMode,

    /// Configuration used to create the realtime logger pair.
    pub logger_config: RealtimeLoggerConfig,

    /// Debugging flag passed to the processor's constructor.
    pub debug_force_clear_buffers: bool,

    /// Initial capacity of the [`ProcStore`] created by the context.
    pub proc_store_capacity: usize,
}
133
impl Default for FirewheelConfig {
    /// Returns the default configuration: no graph inputs, stereo graph
    /// outputs, hard clipping disabled, and the capacities listed below.
    fn default() -> Self {
        Self {
            num_graph_inputs: ChannelCount::ZERO,
            num_graph_outputs: ChannelCount::STEREO,
            hard_clip_outputs: false,
            initial_node_capacity: 128,
            initial_edge_capacity: 256,
            declick_seconds: DeclickValues::DEFAULT_FADE_SECONDS,
            initial_event_group_capacity: 128,
            channel_capacity: 64,
            event_queue_capacity: 128,
            immediate_event_capacity: 512,
            #[cfg(feature = "scheduled_events")]
            scheduled_event_capacity: 512,
            buffer_out_of_space_mode: BufferOutOfSpaceMode::AllocateOnAudioThread,
            logger_config: RealtimeLoggerConfig::default(),
            debug_force_clear_buffers: false,
            proc_store_capacity: 8,
        }
    }
}
156
/// State that only exists while an audio stream is running.
struct ActiveState<B: AudioBackend> {
    /// Handle to the running backend stream. Dropping it is how the context
    /// stops the stream (see `stop_stream`).
    backend_handle: B,
    /// Information about the running stream, as returned by the backend and
    /// completed by `start_stream`.
    stream_info: StreamInfo,
}
161
/// The main-thread side of a Firewheel audio engine: owns the audio graph and
/// communicates with the processor that runs on the audio thread.
pub struct FirewheelCtx<B: AudioBackend> {
    /// The audio graph edited through this context.
    graph: AudioGraph,

    /// Producer end of the context-to-processor message ring buffer.
    to_processor_tx: ringbuf::HeapProd<ContextToProcessorMsg>,
    /// Consumer end of the processor-to-context message ring buffer.
    from_processor_rx: ringbuf::HeapCons<ProcessorToContextMsg>,
    /// Main-thread side of the realtime logger; flushed in `update`.
    logger_rx: RealtimeLoggerMainThread,

    /// `Some` while an audio stream is running.
    active_state: Option<ActiveState<B>>,

    /// Processor-side endpoints created in `new`. They are handed to the
    /// processor the first time a stream is started, after which this is `None`.
    processor_channel: Option<(
        ringbuf::HeapCons<ContextToProcessorMsg>,
        ringbuf::HeapProd<ProcessorToContextMsg>,
        triple_buffer::Input<SharedClock<B::Instant>>,
        RealtimeLogger,
        ProcStore,
    )>,
    /// Channel through which the processor of the most recent stream is
    /// handed back to the context. `None` until a stream has been started.
    processor_drop_rx: Option<ringbuf::HeapCons<FirewheelProcessorInner<B>>>,

    /// Reader side of the shared clock. Wrapped in a `RefCell` because the
    /// clock getters take `&self` but mutably borrow the output to read it.
    shared_clock_output: RefCell<triple_buffer::Output<SharedClock<B::Instant>>>,
    /// Sample rate of the most recently started stream (44100 before any
    /// stream has been started).
    sample_rate: NonZeroU32,
    /// Reciprocal of `sample_rate`.
    sample_rate_recip: f64,

    /// The transport state most recently sent to the processor.
    #[cfg(feature = "musical_transport")]
    transport_state: Box<TransportState>,
    /// A box returned by the processor, kept so the next transport message
    /// can reuse its allocation.
    #[cfg(feature = "musical_transport")]
    transport_state_alloc_reuse: Option<Box<TransportState>>,

    /// Spare, empty event-group vectors available for reuse.
    event_group_pool: Vec<Vec<NodeEvent>>,
    /// Events queued since the last successful send in `update`.
    event_group: Vec<NodeEvent>,
    /// Capacity used when a fresh event-group vector must be allocated.
    initial_event_group_capacity: usize,

    /// Clear-scheduled-events requests queued until the next `update`.
    #[cfg(feature = "scheduled_events")]
    queued_clear_scheduled_events: Vec<ClearScheduledEventsEvent>,

    /// The configuration this context was created with. `hard_clip_outputs`
    /// is kept in sync by `set_hard_clip_outputs`.
    config: FirewheelConfig,
}
200
201impl<B: AudioBackend> FirewheelCtx<B> {
202 pub fn new(config: FirewheelConfig) -> Self {
204 let (to_processor_tx, from_context_rx) =
205 ringbuf::HeapRb::<ContextToProcessorMsg>::new(config.channel_capacity as usize).split();
206 let (to_context_tx, from_processor_rx) =
207 ringbuf::HeapRb::<ProcessorToContextMsg>::new(config.channel_capacity as usize * 2)
208 .split();
209
210 let initial_event_group_capacity = config.initial_event_group_capacity as usize;
211 let mut event_group_pool = Vec::with_capacity(16);
212 for _ in 0..3 {
213 event_group_pool.push(Vec::with_capacity(initial_event_group_capacity));
214 }
215
216 let (shared_clock_input, shared_clock_output) =
217 triple_buffer::triple_buffer(&SharedClock::default());
218
219 let (logger, logger_rx) = firewheel_core::log::realtime_logger(config.logger_config);
220
221 let proc_store = ProcStore::with_capacity(config.proc_store_capacity);
222
223 Self {
224 graph: AudioGraph::new(&config),
225 to_processor_tx,
226 from_processor_rx,
227 logger_rx,
228 active_state: None,
229 processor_channel: Some((
230 from_context_rx,
231 to_context_tx,
232 shared_clock_input,
233 logger,
234 proc_store,
235 )),
236 processor_drop_rx: None,
237 shared_clock_output: RefCell::new(shared_clock_output),
238 sample_rate: NonZeroU32::new(44100).unwrap(),
239 sample_rate_recip: 44100.0f64.recip(),
240 #[cfg(feature = "musical_transport")]
241 transport_state: Box::new(TransportState::default()),
242 #[cfg(feature = "musical_transport")]
243 transport_state_alloc_reuse: None,
244 event_group_pool,
245 event_group: Vec::with_capacity(initial_event_group_capacity),
246 initial_event_group_capacity,
247 #[cfg(feature = "scheduled_events")]
248 queued_clear_scheduled_events: Vec::new(),
249 config,
250 }
251 }
252
253 pub fn proc_store(&self) -> Option<&ProcStore> {
257 if let Some((_, _, _, _, proc_store)) = &self.processor_channel {
258 Some(proc_store)
259 } else if let Some(processor) = self.processor_drop_rx.as_ref().unwrap().last() {
260 if processor.poisoned {
261 panic!("The audio thread has panicked!");
262 }
263
264 Some(&processor.extra.store)
265 } else {
266 None
267 }
268 }
269
270 pub fn proc_store_mut(&mut self) -> Option<&mut ProcStore> {
274 if let Some((_, _, _, _, proc_store)) = &mut self.processor_channel {
275 Some(proc_store)
276 } else if let Some(processor) = self.processor_drop_rx.as_mut().unwrap().last_mut() {
277 if processor.poisoned {
278 panic!("The audio thread has panicked!");
279 }
280
281 Some(&mut processor.extra.store)
282 } else {
283 None
284 }
285 }
286
287 pub fn active_backend(&self) -> Option<&B> {
290 self.active_state
291 .as_ref()
292 .map(|state| &state.backend_handle)
293 }
294
295 pub fn active_backend_mut(&mut self) -> Option<&mut B> {
298 self.active_state
299 .as_mut()
300 .map(|state| &mut state.backend_handle)
301 }
302
    /// Return the backend's device enumerator (`B::enumerator()`).
    ///
    /// Does not depend on the state of this context.
    pub fn device_enumerator(&self) -> B::Enumerator {
        B::enumerator()
    }
308
309 pub fn can_start_stream(&self) -> bool {
319 if self.is_audio_stream_running() {
320 false
321 } else if let Some(rx) = &self.processor_drop_rx {
322 rx.try_peek().is_some()
323 } else {
324 true
325 }
326 }
327
328 pub fn start_stream(
339 &mut self,
340 config: B::Config,
341 ) -> Result<(), StartStreamError<B::StartStreamError>> {
342 if self.is_audio_stream_running() {
343 return Err(StartStreamError::AlreadyStarted);
344 }
345
346 if !self.can_start_stream() {
347 return Err(StartStreamError::OldStreamNotFinishedStopping);
348 }
349
350 let (mut backend_handle, mut stream_info) =
351 B::start_stream(config).map_err(|e| StartStreamError::BackendError(e))?;
352
353 stream_info.sample_rate_recip = (stream_info.sample_rate.get() as f64).recip();
354 stream_info.declick_frames = NonZeroU32::new(
355 (self.config.declick_seconds * stream_info.sample_rate.get() as f32).round() as u32,
356 )
357 .unwrap_or(NonZeroU32::MIN);
358
359 let maybe_processor = self.processor_channel.take();
360
361 stream_info.prev_sample_rate = if maybe_processor.is_some() {
362 stream_info.sample_rate
363 } else {
364 self.sample_rate
365 };
366
367 self.sample_rate = stream_info.sample_rate;
368 self.sample_rate_recip = stream_info.sample_rate_recip;
369
370 let schedule = self.graph.compile(&stream_info)?;
371
372 let (drop_tx, drop_rx) = ringbuf::HeapRb::<FirewheelProcessorInner<B>>::new(1).split();
373
374 let processor =
375 if let Some((from_context_rx, to_context_tx, shared_clock_input, logger, proc_store)) =
376 maybe_processor
377 {
378 FirewheelProcessorInner::new(
379 from_context_rx,
380 to_context_tx,
381 shared_clock_input,
382 self.config.immediate_event_capacity,
383 #[cfg(feature = "scheduled_events")]
384 self.config.scheduled_event_capacity,
385 self.config.event_queue_capacity,
386 &stream_info,
387 self.config.hard_clip_outputs,
388 self.config.buffer_out_of_space_mode,
389 logger,
390 self.config.debug_force_clear_buffers,
391 proc_store,
392 )
393 } else {
394 let mut processor = self.processor_drop_rx.as_mut().unwrap().try_pop().unwrap();
395
396 if processor.poisoned {
397 panic!("The audio thread has panicked!");
398 }
399
400 processor.new_stream(&stream_info);
401
402 processor
403 };
404
405 backend_handle.set_processor(FirewheelProcessor::new(processor, drop_tx));
406
407 if let Err(_) = self.send_message_to_processor(ContextToProcessorMsg::NewSchedule(schedule))
408 {
409 panic!("Firewheel message channel is full!");
410 }
411
412 self.active_state = Some(ActiveState {
413 backend_handle,
414 stream_info,
415 });
416 self.processor_drop_rx = Some(drop_rx);
417
418 Ok(())
419 }
420
    /// Stop the running audio stream, if any.
    ///
    /// Drops the active state (and with it the backend handle) and
    /// deactivates the graph. Calling this with no stream running still
    /// deactivates the graph.
    pub fn stop_stream(&mut self) {
        // Dropping the active state drops the backend handle.
        self.active_state = None;
        self.graph.deactivate();
    }
428
    /// Returns `true` while the context holds an active stream.
    pub fn is_audio_stream_running(&self) -> bool {
        self.active_state.is_some()
    }
433
    /// Information about the running audio stream, or `None` if no stream is
    /// running.
    pub fn stream_info(&self) -> Option<&StreamInfo> {
        self.active_state.as_ref().map(|s| &s.stream_info)
    }
440
441 pub fn audio_clock(&self) -> AudioClock {
456 let mut clock_borrowed = self.shared_clock_output.borrow_mut();
462 let clock = clock_borrowed.read();
463
464 let update_instant = audio_clock_update_instant_and_delay(&clock, &self.active_state)
465 .map(|(update_instant, _delay)| update_instant);
466
467 AudioClock {
468 samples: clock.clock_samples,
469 seconds: clock
470 .clock_samples
471 .to_seconds(self.sample_rate, self.sample_rate_recip),
472 #[cfg(feature = "musical_transport")]
473 musical: clock.current_playhead,
474 #[cfg(feature = "musical_transport")]
475 transport_is_playing: clock.transport_is_playing,
476 update_instant,
477 }
478 }
479
480 pub fn audio_clock_corrected(&self) -> AudioClock {
499 let mut clock_borrowed = self.shared_clock_output.borrow_mut();
505 let clock = clock_borrowed.read();
506
507 let Some((update_instant, delay)) =
508 audio_clock_update_instant_and_delay(&clock, &self.active_state)
509 else {
510 return AudioClock {
513 samples: clock.clock_samples,
514 seconds: clock
515 .clock_samples
516 .to_seconds(self.sample_rate, self.sample_rate_recip),
517 #[cfg(feature = "musical_transport")]
518 musical: clock.current_playhead,
519 #[cfg(feature = "musical_transport")]
520 transport_is_playing: clock.transport_is_playing,
521 update_instant: None,
522 };
523 };
524
525 let delta_seconds = DurationSeconds(delay.as_secs_f64());
527
528 let samples = clock.clock_samples + delta_seconds.to_samples(self.sample_rate);
529
530 #[cfg(feature = "musical_transport")]
531 let musical = clock.current_playhead.map(|musical_time| {
532 if clock.transport_is_playing && self.transport_state.transport.is_some() {
533 self.transport_state
534 .transport
535 .as_ref()
536 .unwrap()
537 .delta_seconds_from(musical_time, delta_seconds, clock.speed_multiplier)
538 } else {
539 musical_time
540 }
541 });
542
543 AudioClock {
544 samples,
545 seconds: samples.to_seconds(self.sample_rate, self.sample_rate_recip),
546 #[cfg(feature = "musical_transport")]
547 musical,
548 #[cfg(feature = "musical_transport")]
549 transport_is_playing: clock.transport_is_playing,
550 update_instant: Some(update_instant),
551 }
552 }
553
    /// The instant at which the audio clock was last updated, computed as
    /// "now minus the backend-reported delay since the last process call".
    ///
    /// Returns `None` if no stream is running, if the shared clock has no
    /// process timestamp, if the backend cannot report the delay, or if the
    /// subtraction underflows.
    ///
    /// # Panics
    ///
    /// Panics if the shared clock is already mutably borrowed.
    pub fn audio_clock_instant(&self) -> Option<Instant> {
        let mut clock_borrowed = self.shared_clock_output.borrow_mut();
        let clock = clock_borrowed.read();

        audio_clock_update_instant_and_delay(&clock, &self.active_state)
            .map(|(update_instant, _delay)| update_instant)
    }
577
578 #[cfg(feature = "musical_transport")]
582 pub fn sync_transport(
583 &mut self,
584 transport: &TransportState,
585 ) -> Result<(), UpdateError<B::StreamError>> {
586 if &*self.transport_state != transport {
587 let transport_msg = if let Some(mut t) = self.transport_state_alloc_reuse.take() {
588 *t = transport.clone();
589 t
590 } else {
591 Box::new(transport.clone())
592 };
593
594 self.send_message_to_processor(ContextToProcessorMsg::SetTransportState(transport_msg))
595 .map_err(|(_, e)| e)?;
596
597 *self.transport_state = transport.clone();
598 }
599
600 Ok(())
601 }
602
    /// The transport state most recently sent to the processor via
    /// [`FirewheelCtx::sync_transport`] (the default state if none was sent).
    #[cfg(feature = "musical_transport")]
    pub fn transport_state(&self) -> &TransportState {
        &self.transport_state
    }
608
    /// Returns the same value as [`FirewheelCtx::transport_state`]: the
    /// transport state most recently sent to the processor.
    #[cfg(feature = "musical_transport")]
    pub fn transport(&self) -> &TransportState {
        &self.transport_state
    }
614
    /// The current value of the `hard_clip_outputs` setting stored in the
    /// context's configuration.
    pub fn hard_clip_outputs(&self) -> bool {
        self.config.hard_clip_outputs
    }
619
620 pub fn set_hard_clip_outputs(
629 &mut self,
630 hard_clip_outputs: bool,
631 ) -> Result<(), UpdateError<B::StreamError>> {
632 if self.config.hard_clip_outputs == hard_clip_outputs {
633 return Ok(());
634 }
635 self.config.hard_clip_outputs = hard_clip_outputs;
636
637 self.send_message_to_processor(ContextToProcessorMsg::HardClipOutputs(hard_clip_outputs))
638 .map_err(|(_, e)| e)
639 }
640
    /// Synchronize the context with the audio thread.
    ///
    /// In order, this:
    /// 1. flushes realtime log messages to `tracing`/`log` (when enabled),
    /// 2. runs the global realtime garbage collector,
    /// 3. drains messages returned by the processor,
    /// 4. updates the graph,
    /// 5. checks that the running stream is still alive, and
    /// 6. if a stream is running, sends a recompiled schedule (when needed),
    ///    queued clear-scheduled-events requests, and queued node events.
    ///
    /// # Errors
    ///
    /// * [`UpdateError::StreamStoppedUnexpectedly`] if the backend reports an
    ///   error (`Some(e)`) or the processor was handed back while the stream
    ///   was still considered active (`None`). The stream is stopped and the
    ///   graph deactivated in both cases.
    /// * The graph compilation error, if recompiling fails.
    /// * [`UpdateError::MsgChannelFull`] if a message could not be queued. The
    ///   unsent schedule, clear requests or events are kept for a later call.
    pub fn update(&mut self) -> Result<(), UpdateError<B::StreamError>> {
        // First closure receives error messages; the second (debug builds
        // only) receives debug messages.
        self.logger_rx.flush(
            |msg| {
                #[cfg(feature = "tracing")]
                tracing::error!("{}", msg);

                #[cfg(all(feature = "log", not(feature = "tracing")))]
                log::error!("{}", msg);

                // Avoids an unused-variable warning when no logging feature is enabled.
                let _ = msg;
            },
            #[cfg(debug_assertions)]
            |msg| {
                #[cfg(feature = "tracing")]
                tracing::debug!("{}", msg);

                #[cfg(all(feature = "log", not(feature = "tracing")))]
                log::debug!("{}", msg);

                // Avoids an unused-variable warning when no logging feature is enabled.
                let _ = msg;
            },
        );

        firewheel_core::collector::GlobalRtGc::collect();

        for msg in self.from_processor_rx.pop_iter() {
            match msg {
                ProcessorToContextMsg::ReturnEventGroup(mut event_group) => {
                    // Recycle the vector's allocation.
                    event_group.clear();
                    self.event_group_pool.push(event_group);
                }
                ProcessorToContextMsg::ReturnSchedule(schedule_data) => {
                    // Dropped here, on this thread rather than by the processor.
                    let _ = schedule_data;
                }
                #[cfg(feature = "musical_transport")]
                ProcessorToContextMsg::ReturnTransportState(transport_state) => {
                    // Keep at most one box around for reuse; extras are dropped.
                    if self.transport_state_alloc_reuse.is_none() {
                        self.transport_state_alloc_reuse = Some(transport_state);
                    }
                }
                #[cfg(feature = "scheduled_events")]
                ProcessorToContextMsg::ReturnClearScheduledEvents(msgs) => {
                    // Dropped here, on this thread rather than by the processor.
                    let _ = msgs;
                }
            }
        }

        self.graph.update(
            self.active_state.as_ref().map(|s| &s.stream_info),
            &mut self.event_group,
        );

        if let Some(active_state) = &mut self.active_state {
            if let Err(e) = active_state.backend_handle.poll_status() {
                // Same effect as `stop_stream`.
                self.active_state = None;
                self.graph.deactivate();

                return Err(UpdateError::StreamStoppedUnexpectedly(Some(e)));
            }

            // The processor coming back while we still think the stream is
            // active means the stream ended without us stopping it.
            if self
                .processor_drop_rx
                .as_ref()
                .unwrap()
                .try_peek()
                .is_some()
            {
                self.active_state = None;
                self.graph.deactivate();

                return Err(UpdateError::StreamStoppedUnexpectedly(None));
            }
        }

        if self.is_audio_stream_running() {
            if self.graph.needs_compile() {
                let schedule_data = self
                    .graph
                    .compile(&self.active_state.as_ref().unwrap().stream_info)?;

                if let Err((msg, e)) = self
                    .send_message_to_processor(ContextToProcessorMsg::NewSchedule(schedule_data))
                {
                    // Recover the schedule from the rejected message and hand
                    // it back to the graph.
                    let ContextToProcessorMsg::NewSchedule(schedule) = msg else {
                        unreachable!();
                    };

                    self.graph.on_schedule_send_failed(schedule);

                    return Err(e);
                }
            }

            #[cfg(feature = "scheduled_events")]
            if !self.queued_clear_scheduled_events.is_empty() {
                let msgs: SmallVec<[ClearScheduledEventsEvent; 1]> =
                    self.queued_clear_scheduled_events.drain(..).collect();

                if let Err((msg, e)) = self
                    .send_message_to_processor(ContextToProcessorMsg::ClearScheduledEvents(msgs))
                {
                    let ContextToProcessorMsg::ClearScheduledEvents(mut msgs) = msg else {
                        unreachable!();
                    };

                    // Put the requests back so they are retried on the next update.
                    self.queued_clear_scheduled_events = msgs.drain(..).collect();

                    return Err(e);
                }
            }

            if !self.event_group.is_empty() {
                // Swap the filled event group with an empty one from the pool
                // (or a fresh allocation) and send the filled one.
                let mut next_event_group = self
                    .event_group_pool
                    .pop()
                    .unwrap_or_else(|| Vec::with_capacity(self.initial_event_group_capacity));
                core::mem::swap(&mut next_event_group, &mut self.event_group);

                if let Err((msg, e)) = self
                    .send_message_to_processor(ContextToProcessorMsg::EventGroup(next_event_group))
                {
                    let ContextToProcessorMsg::EventGroup(mut event_group) = msg else {
                        unreachable!();
                    };

                    // Undo the swap: the unsent events become the current
                    // group again and the empty vector returns to the pool.
                    core::mem::swap(&mut event_group, &mut self.event_group);
                    self.event_group_pool.push(event_group);

                    return Err(e);
                }
            }
        }

        Ok(())
    }
779
    /// The ID of the graph input node (delegates to `AudioGraph::graph_in_node`).
    pub fn graph_in_node_id(&self) -> NodeID {
        self.graph.graph_in_node()
    }
784
    /// The ID of the graph output node (delegates to `AudioGraph::graph_out_node`).
    pub fn graph_out_node_id(&self) -> NodeID {
        self.graph.graph_out_node()
    }
789
    /// Add a node to the audio graph and return its ID.
    ///
    /// `config` is the node's optional configuration and is forwarded
    /// unchanged to `AudioGraph::add_node`.
    pub fn add_node<T: AudioNode + 'static>(
        &mut self,
        node: T,
        config: Option<T::Configuration>,
    ) -> NodeID {
        self.graph.add_node(node, config)
    }
798
    /// Add a node implementing [`DynAudioNode`] to the audio graph and return
    /// its ID.
    pub fn add_dyn_node<T: DynAudioNode + 'static>(&mut self, node: T) -> NodeID {
        self.graph.add_dyn_node(node)
    }
803
    /// Remove the node with the given ID from the audio graph.
    ///
    /// On success returns a list of edge IDs — presumably the edges that were
    /// removed along with the node (confirm against `AudioGraph::remove_node`).
    ///
    /// # Errors
    ///
    /// Returns the [`RemoveNodeError`] produced by the graph.
    pub fn remove_node(
        &mut self,
        node_id: NodeID,
    ) -> Result<SmallVec<[EdgeID; 4]>, RemoveNodeError> {
        self.graph.remove_node(node_id)
    }
820
    /// Look up the graph's entry for the node with the given ID, or `None`
    /// if the graph has no such entry.
    pub fn node_info(&self, id: NodeID) -> Option<&NodeEntry> {
        self.graph.node_info(id)
    }
825
    /// Borrow the state of the node with the given ID as type `T`.
    ///
    /// Delegates to `AudioGraph::node_state`; presumably returns `None` when
    /// the node does not exist or its state is not a `T`.
    pub fn node_state<T: 'static>(&self, id: NodeID) -> Option<&T> {
        self.graph.node_state(id)
    }
830
    /// Borrow the type-erased state of the node with the given ID, if any.
    pub fn node_state_dyn(&self, id: NodeID) -> Option<&dyn Any> {
        self.graph.node_state_dyn(id)
    }
835
    /// Mutably borrow the state of the node with the given ID as type `T`.
    ///
    /// Delegates to `AudioGraph::node_state_mut`; presumably returns `None`
    /// when the node does not exist or its state is not a `T`.
    pub fn node_state_mut<T: 'static>(&mut self, id: NodeID) -> Option<&mut T> {
        self.graph.node_state_mut(id)
    }
840
    /// Mutably borrow the type-erased state of the node with the given ID,
    /// if any.
    pub fn node_state_dyn_mut(&mut self, id: NodeID) -> Option<&mut dyn Any> {
        self.graph.node_state_dyn_mut(id)
    }
844
    /// Iterate over the node entries of the audio graph.
    pub fn nodes<'a>(&'a self) -> impl Iterator<Item = &'a NodeEntry> {
        self.graph.nodes()
    }
849
    /// Iterate over the edges of the audio graph.
    pub fn edges<'a>(&'a self) -> impl Iterator<Item = &'a Edge> {
        self.graph.edges()
    }
854
    /// Change the channel configuration of the graph.
    ///
    /// Returns a list of edge IDs — presumably the edges that had to be
    /// removed because of the change (confirm against
    /// `AudioGraph::set_graph_channel_config`).
    pub fn set_graph_channel_config(
        &mut self,
        channel_config: ChannelConfig,
    ) -> SmallVec<[EdgeID; 4]> {
        self.graph.set_graph_channel_config(channel_config)
    }
864
    /// Connect ports of `src_node` to ports of `dst_node`.
    ///
    /// * `ports_src_dst` - pairs of `(source port, destination port)` indices.
    /// * `check_for_cycles` - forwarded to the graph; presumably makes the
    ///   call fail if the new edges would create a cycle.
    ///
    /// On success returns the IDs of the edges for the given port pairs.
    ///
    /// # Errors
    ///
    /// Returns the [`AddEdgeError`] produced by the graph.
    pub fn connect(
        &mut self,
        src_node: NodeID,
        dst_node: NodeID,
        ports_src_dst: &[(PortIdx, PortIdx)],
        check_for_cycles: bool,
    ) -> Result<SmallVec<[EdgeID; 4]>, AddEdgeError> {
        self.graph
            .connect(src_node, dst_node, ports_src_dst, check_for_cycles)
    }
892
    /// Remove the connections between the given port pairs of `src_node` and
    /// `dst_node`.
    ///
    /// Returns the boolean reported by `AudioGraph::disconnect` — presumably
    /// `true` if at least one connection was removed.
    pub fn disconnect(
        &mut self,
        src_node: NodeID,
        dst_node: NodeID,
        ports_src_dst: &[(PortIdx, PortIdx)],
    ) -> bool {
        self.graph.disconnect(src_node, dst_node, ports_src_dst)
    }
911
    /// Remove every connection from `src_node` to `dst_node`.
    ///
    /// Returns a list of edge IDs — presumably the edges that were removed.
    pub fn disconnect_all_between(
        &mut self,
        src_node: NodeID,
        dst_node: NodeID,
    ) -> SmallVec<[EdgeID; 4]> {
        self.graph.disconnect_all_between(src_node, dst_node)
    }
923
    /// Remove the connection with the given edge ID.
    ///
    /// Returns the boolean reported by `AudioGraph::disconnect_by_edge_id` —
    /// presumably `true` if the edge existed and was removed.
    pub fn disconnect_by_edge_id(&mut self, edge_id: EdgeID) -> bool {
        self.graph.disconnect_by_edge_id(edge_id)
    }
930
    /// Look up the edge with the given ID, or `None` if the graph has no
    /// such edge.
    pub fn edge(&self, edge_id: EdgeID) -> Option<&Edge> {
        self.graph.edge(edge_id)
    }
935
    /// Ask the graph whether it contains a cycle.
    ///
    /// Takes `&mut self` because `AudioGraph::cycle_detected` requires mutable
    /// access to the graph.
    pub fn cycle_detected(&mut self) -> bool {
        self.graph.cycle_detected()
    }
942
    /// Queue an event to be sent to the processor.
    ///
    /// The event is only buffered here; it is sent by
    /// [`FirewheelCtx::update`] while a stream is running.
    pub fn queue_event(&mut self, event: NodeEvent) {
        self.event_group.push(event);
    }
950
    /// Queue an event for the node with the given ID.
    ///
    /// With the `scheduled_events` feature the event is queued without a
    /// scheduled time (`time: None`). See [`FirewheelCtx::queue_event`] for
    /// when the event is actually sent.
    pub fn queue_event_for(&mut self, node_id: NodeID, event: NodeEventType) {
        self.queue_event(NodeEvent {
            node_id,
            #[cfg(feature = "scheduled_events")]
            time: None,
            event,
        });
    }
963
    /// Queue an event for the node with the given ID, optionally scheduled
    /// at `time`.
    ///
    /// Passing `None` for `time` queues the same event as
    /// [`FirewheelCtx::queue_event_for`].
    #[cfg(feature = "scheduled_events")]
    pub fn schedule_event_for(
        &mut self,
        node_id: NodeID,
        event: NodeEventType,
        time: Option<EventInstant>,
    ) {
        self.queue_event(NodeEvent {
            node_id,
            time,
            event,
        });
    }
984
    /// Queue a request to clear scheduled events of the given type, not
    /// restricted to a particular node (`node_id: None`).
    ///
    /// The request is sent to the processor by [`FirewheelCtx::update`] while
    /// a stream is running.
    #[cfg(feature = "scheduled_events")]
    pub fn cancel_all_scheduled_events(&mut self, event_type: ClearScheduledEventsType) {
        self.queued_clear_scheduled_events
            .push(ClearScheduledEventsEvent {
                node_id: None,
                event_type,
            });
    }
1000
    /// Queue a request to clear scheduled events of the given type for the
    /// node with the given ID.
    ///
    /// The request is sent to the processor by [`FirewheelCtx::update`] while
    /// a stream is running.
    #[cfg(feature = "scheduled_events")]
    pub fn cancel_scheduled_events_for(
        &mut self,
        node_id: NodeID,
        event_type: ClearScheduledEventsType,
    ) {
        self.queued_clear_scheduled_events
            .push(ClearScheduledEventsEvent {
                node_id: Some(node_id),
                event_type,
            });
    }
1020
    /// Try to push `msg` onto the context-to-processor ring buffer.
    ///
    /// # Errors
    ///
    /// If the buffer is full, returns the rejected message together with
    /// [`UpdateError::MsgChannelFull`] so the caller can recover its payload.
    fn send_message_to_processor(
        &mut self,
        msg: ContextToProcessorMsg,
    ) -> Result<(), (ContextToProcessorMsg, UpdateError<B::StreamError>)> {
        self.to_processor_tx
            .try_push(msg)
            .map_err(|msg| (msg, UpdateError::MsgChannelFull))
    }
1029}
1030
1031impl<B: AudioBackend> Drop for FirewheelCtx<B> {
1032 fn drop(&mut self) {
1033 self.stop_stream();
1034
1035 #[cfg(not(target_family = "wasm"))]
1038 if let Some(drop_rx) = self.processor_drop_rx.take() {
1039 let now = bevy_platform::time::Instant::now();
1040
1041 while drop_rx.try_peek().is_none() {
1042 if now.elapsed() > core::time::Duration::from_secs(2) {
1043 break;
1044 }
1045
1046 bevy_platform::thread::sleep(core::time::Duration::from_millis(2));
1047 }
1048 }
1049
1050 firewheel_core::collector::GlobalRtGc::collect();
1051 }
1052}
1053
impl<B: AudioBackend> FirewheelCtx<B> {
    /// Create a [`ContextQueue`] that queues events for the node `id` on this
    /// context.
    ///
    /// With the `scheduled_events` feature, events pushed through the queue
    /// carry no scheduled time.
    pub fn event_queue(&mut self, id: NodeID) -> ContextQueue<'_, B> {
        ContextQueue {
            context: self,
            id,
            #[cfg(feature = "scheduled_events")]
            time: None,
        }
    }

    /// Create a [`ContextQueue`] that queues events for the node `id` on this
    /// context, attaching `time` to every event pushed through it.
    #[cfg(feature = "scheduled_events")]
    pub fn event_queue_scheduled(
        &mut self,
        id: NodeID,
        time: Option<EventInstant>,
    ) -> ContextQueue<'_, B> {
        ContextQueue {
            context: self,
            id,
            time,
        }
    }
}
1078
/// An event queue bound to a single node of a [`FirewheelCtx`].
///
/// Implements `firewheel_core::diff::EventQueue`; every pushed event is
/// queued on the borrowed context for the stored node ID.
pub struct ContextQueue<'a, B: AudioBackend> {
    /// The context events are queued on.
    context: &'a mut FirewheelCtx<B>,
    /// The node the events are addressed to.
    id: NodeID,
    /// The time attached to every pushed event, if any.
    #[cfg(feature = "scheduled_events")]
    time: Option<EventInstant>,
}
1105
#[cfg(feature = "scheduled_events")]
impl<'a, B: AudioBackend> ContextQueue<'a, B> {
    /// The time that is attached to events pushed through this queue, or
    /// `None` if events are not scheduled.
    pub fn time(&self) -> Option<EventInstant> {
        self.time
    }
}
1112
impl<B: AudioBackend> firewheel_core::diff::EventQueue for ContextQueue<'_, B> {
    /// Wrap `data` in a [`NodeEvent`] addressed to this queue's node (and
    /// carrying this queue's time, when scheduled events are enabled) and
    /// queue it on the context.
    fn push(&mut self, data: NodeEventType) {
        self.context.queue_event(NodeEvent {
            event: data,
            #[cfg(feature = "scheduled_events")]
            time: self.time,
            node_id: self.id,
        });
    }
}
1123
/// Selects which scheduled events a clear request applies to.
///
/// Used by `FirewheelCtx::cancel_all_scheduled_events` and
/// `FirewheelCtx::cancel_scheduled_events_for`.
#[cfg(feature = "scheduled_events")]
#[derive(Default, Debug, Clone, Copy, PartialEq)]
pub enum ClearScheduledEventsType {
    /// Every scheduled event (the default).
    #[default]
    All,
    /// Presumably only events that are not scheduled in musical time —
    /// confirm against the processor's handling.
    NonMusicalOnly,
    /// Presumably only events scheduled in musical time — confirm against
    /// the processor's handling.
    MusicalOnly,
}
1136
1137fn audio_clock_update_instant_and_delay<B: AudioBackend>(
1138 clock: &SharedClock<B::Instant>,
1139 active_state: &Option<ActiveState<B>>,
1140) -> Option<(Instant, Duration)> {
1141 active_state.as_ref().and_then(|active_state| {
1142 clock
1143 .process_timestamp
1144 .clone()
1145 .and_then(|process_timestamp| {
1146 active_state
1147 .backend_handle
1148 .delay_from_last_process(process_timestamp)
1149 .and_then(|delay| {
1150 Instant::now()
1151 .checked_sub(delay)
1152 .map(|instant| (instant, delay))
1153 })
1154 })
1155 })
1156}