1use bevy_platform::time::Instant;
2use core::cell::RefCell;
3use core::num::NonZeroU32;
4use core::time::Duration;
5use core::{any::Any, f64};
6use firewheel_core::clock::DurationSeconds;
7use firewheel_core::log::{RealtimeLogger, RealtimeLoggerConfig, RealtimeLoggerMainThread};
8use firewheel_core::node::ProcStore;
9use firewheel_core::{
10 channel_config::{ChannelConfig, ChannelCount},
11 clock::AudioClock,
12 dsp::declick::DeclickValues,
13 event::{NodeEvent, NodeEventType},
14 node::{AudioNode, DynAudioNode, NodeID},
15 StreamInfo,
16};
17use ringbuf::traits::{Consumer, Producer, Split};
18use smallvec::SmallVec;
19
20#[cfg(not(feature = "std"))]
21use num_traits::Float;
22
23#[cfg(all(not(feature = "std"), feature = "musical_transport"))]
24use bevy_platform::prelude::Box;
25#[cfg(not(feature = "std"))]
26use bevy_platform::prelude::Vec;
27
28use crate::error::RemoveNodeError;
29use crate::processor::BufferOutOfSpaceMode;
30use crate::{
31 backend::{AudioBackend, DeviceInfo},
32 error::{AddEdgeError, StartStreamError, UpdateError},
33 graph::{AudioGraph, Edge, EdgeID, NodeEntry, PortIdx},
34 processor::{
35 ContextToProcessorMsg, FirewheelProcessor, FirewheelProcessorInner, ProcessorToContextMsg,
36 SharedClock,
37 },
38};
39
40#[cfg(feature = "scheduled_events")]
41use crate::processor::ClearScheduledEventsEvent;
42#[cfg(feature = "scheduled_events")]
43use firewheel_core::clock::EventInstant;
44
45#[cfg(feature = "musical_transport")]
46use firewheel_core::clock::TransportState;
47
48#[derive(Debug, Clone, Copy, PartialEq)]
50#[cfg_attr(feature = "bevy_reflect", derive(bevy_reflect::Reflect))]
51#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
52pub struct FirewheelConfig {
53 pub num_graph_inputs: ChannelCount,
55 pub num_graph_outputs: ChannelCount,
57 pub hard_clip_outputs: bool,
66 pub initial_node_capacity: u32,
70 pub initial_edge_capacity: u32,
74 pub declick_seconds: f32,
79 pub initial_event_group_capacity: u32,
83 pub channel_capacity: u32,
87 pub event_queue_capacity: usize,
94 pub immediate_event_capacity: usize,
100 #[cfg(feature = "scheduled_events")]
108 pub scheduled_event_capacity: usize,
109 pub buffer_out_of_space_mode: BufferOutOfSpaceMode,
113
114 pub logger_config: RealtimeLoggerConfig,
116
117 pub debug_force_clear_buffers: bool,
127
128 pub proc_store_capacity: usize,
132}
133
134impl Default for FirewheelConfig {
135 fn default() -> Self {
136 Self {
137 num_graph_inputs: ChannelCount::ZERO,
138 num_graph_outputs: ChannelCount::STEREO,
139 hard_clip_outputs: false,
140 initial_node_capacity: 128,
141 initial_edge_capacity: 256,
142 declick_seconds: DeclickValues::DEFAULT_FADE_SECONDS,
143 initial_event_group_capacity: 128,
144 channel_capacity: 64,
145 event_queue_capacity: 128,
146 immediate_event_capacity: 512,
147 #[cfg(feature = "scheduled_events")]
148 scheduled_event_capacity: 512,
149 buffer_out_of_space_mode: BufferOutOfSpaceMode::AllocateOnAudioThread,
150 logger_config: RealtimeLoggerConfig::default(),
151 debug_force_clear_buffers: false,
152 proc_store_capacity: 8,
153 }
154 }
155}
156
157struct ActiveState<B: AudioBackend> {
158 backend_handle: B,
159 stream_info: StreamInfo,
160}
161
162pub struct FirewheelCtx<B: AudioBackend> {
164 graph: AudioGraph,
165
166 to_processor_tx: ringbuf::HeapProd<ContextToProcessorMsg>,
167 from_processor_rx: ringbuf::HeapCons<ProcessorToContextMsg>,
168 logger_rx: RealtimeLoggerMainThread,
169
170 active_state: Option<ActiveState<B>>,
171
172 processor_channel: Option<(
173 ringbuf::HeapCons<ContextToProcessorMsg>,
174 ringbuf::HeapProd<ProcessorToContextMsg>,
175 triple_buffer::Input<SharedClock<B::Instant>>,
176 RealtimeLogger,
177 ProcStore,
178 )>,
179 processor_drop_rx: Option<ringbuf::HeapCons<FirewheelProcessorInner<B>>>,
180
181 shared_clock_output: RefCell<triple_buffer::Output<SharedClock<B::Instant>>>,
182 sample_rate: NonZeroU32,
183 sample_rate_recip: f64,
184
185 #[cfg(feature = "musical_transport")]
186 transport_state: Box<TransportState>,
187 #[cfg(feature = "musical_transport")]
188 transport_state_alloc_reuse: Option<Box<TransportState>>,
189
190 event_group_pool: Vec<Vec<NodeEvent>>,
192 event_group: Vec<NodeEvent>,
193 initial_event_group_capacity: usize,
194
195 #[cfg(feature = "scheduled_events")]
196 queued_clear_scheduled_events: Vec<ClearScheduledEventsEvent>,
197
198 config: FirewheelConfig,
199}
200
201impl<B: AudioBackend> FirewheelCtx<B> {
202 pub fn new(config: FirewheelConfig) -> Self {
204 let (to_processor_tx, from_context_rx) =
205 ringbuf::HeapRb::<ContextToProcessorMsg>::new(config.channel_capacity as usize).split();
206 let (to_context_tx, from_processor_rx) =
207 ringbuf::HeapRb::<ProcessorToContextMsg>::new(config.channel_capacity as usize * 2)
208 .split();
209
210 let initial_event_group_capacity = config.initial_event_group_capacity as usize;
211 let mut event_group_pool = Vec::with_capacity(16);
212 for _ in 0..3 {
213 event_group_pool.push(Vec::with_capacity(initial_event_group_capacity));
214 }
215
216 let (shared_clock_input, shared_clock_output) =
217 triple_buffer::triple_buffer(&SharedClock::default());
218
219 let (logger, logger_rx) = firewheel_core::log::realtime_logger(config.logger_config);
220
221 let proc_store = ProcStore::with_capacity(config.proc_store_capacity);
222
223 Self {
224 graph: AudioGraph::new(&config),
225 to_processor_tx,
226 from_processor_rx,
227 logger_rx,
228 active_state: None,
229 processor_channel: Some((
230 from_context_rx,
231 to_context_tx,
232 shared_clock_input,
233 logger,
234 proc_store,
235 )),
236 processor_drop_rx: None,
237 shared_clock_output: RefCell::new(shared_clock_output),
238 sample_rate: NonZeroU32::new(44100).unwrap(),
239 sample_rate_recip: 44100.0f64.recip(),
240 #[cfg(feature = "musical_transport")]
241 transport_state: Box::new(TransportState::default()),
242 #[cfg(feature = "musical_transport")]
243 transport_state_alloc_reuse: None,
244 event_group_pool,
245 event_group: Vec::with_capacity(initial_event_group_capacity),
246 initial_event_group_capacity,
247 #[cfg(feature = "scheduled_events")]
248 queued_clear_scheduled_events: Vec::new(),
249 config,
250 }
251 }
252
253 pub fn proc_store(&self) -> Option<&ProcStore> {
257 if let Some((_, _, _, _, proc_store)) = &self.processor_channel {
258 Some(proc_store)
259 } else if let Some(processor) = self.processor_drop_rx.as_ref().unwrap().last() {
260 if processor.poisoned {
261 panic!("The audio thread has panicked!");
262 }
263
264 Some(&processor.extra.store)
265 } else {
266 None
267 }
268 }
269
270 pub fn proc_store_mut(&mut self) -> Option<&mut ProcStore> {
274 if let Some((_, _, _, _, proc_store)) = &mut self.processor_channel {
275 Some(proc_store)
276 } else if let Some(processor) = self.processor_drop_rx.as_mut().unwrap().last_mut() {
277 if processor.poisoned {
278 panic!("The audio thread has panicked!");
279 }
280
281 Some(&mut processor.extra.store)
282 } else {
283 None
284 }
285 }
286
287 pub fn active_backend(&self) -> Option<&B> {
290 self.active_state
291 .as_ref()
292 .map(|state| &state.backend_handle)
293 }
294
295 pub fn active_backend_mut(&mut self) -> Option<&mut B> {
298 self.active_state
299 .as_mut()
300 .map(|state| &mut state.backend_handle)
301 }
302
303 pub fn available_input_devices(&self) -> Vec<DeviceInfo> {
305 B::available_input_devices()
306 }
307
308 pub fn available_output_devices(&self) -> Vec<DeviceInfo> {
310 B::available_output_devices()
311 }
312
313 pub fn can_start_stream(&self) -> bool {
323 if self.is_audio_stream_running() {
324 false
325 } else if let Some(rx) = &self.processor_drop_rx {
326 rx.try_peek().is_some()
327 } else {
328 true
329 }
330 }
331
332 pub fn start_stream(
343 &mut self,
344 config: B::Config,
345 ) -> Result<(), StartStreamError<B::StartStreamError>> {
346 if self.is_audio_stream_running() {
347 return Err(StartStreamError::AlreadyStarted);
348 }
349
350 if !self.can_start_stream() {
351 return Err(StartStreamError::OldStreamNotFinishedStopping);
352 }
353
354 let (mut backend_handle, mut stream_info) =
355 B::start_stream(config).map_err(|e| StartStreamError::BackendError(e))?;
356
357 stream_info.sample_rate_recip = (stream_info.sample_rate.get() as f64).recip();
358 stream_info.declick_frames = NonZeroU32::new(
359 (self.config.declick_seconds * stream_info.sample_rate.get() as f32).round() as u32,
360 )
361 .unwrap_or(NonZeroU32::MIN);
362
363 let maybe_processor = self.processor_channel.take();
364
365 stream_info.prev_sample_rate = if maybe_processor.is_some() {
366 stream_info.sample_rate
367 } else {
368 self.sample_rate
369 };
370
371 self.sample_rate = stream_info.sample_rate;
372 self.sample_rate_recip = stream_info.sample_rate_recip;
373
374 let schedule = self.graph.compile(&stream_info)?;
375
376 let (drop_tx, drop_rx) = ringbuf::HeapRb::<FirewheelProcessorInner<B>>::new(1).split();
377
378 let processor =
379 if let Some((from_context_rx, to_context_tx, shared_clock_input, logger, proc_store)) =
380 maybe_processor
381 {
382 FirewheelProcessorInner::new(
383 from_context_rx,
384 to_context_tx,
385 shared_clock_input,
386 self.config.immediate_event_capacity,
387 #[cfg(feature = "scheduled_events")]
388 self.config.scheduled_event_capacity,
389 self.config.event_queue_capacity,
390 &stream_info,
391 self.config.hard_clip_outputs,
392 self.config.buffer_out_of_space_mode,
393 logger,
394 self.config.debug_force_clear_buffers,
395 proc_store,
396 )
397 } else {
398 let mut processor = self.processor_drop_rx.as_mut().unwrap().try_pop().unwrap();
399
400 if processor.poisoned {
401 panic!("The audio thread has panicked!");
402 }
403
404 processor.new_stream(&stream_info);
405
406 processor
407 };
408
409 backend_handle.set_processor(FirewheelProcessor::new(processor, drop_tx));
410
411 if let Err(_) = self.send_message_to_processor(ContextToProcessorMsg::NewSchedule(schedule))
412 {
413 panic!("Firewheel message channel is full!");
414 }
415
416 self.active_state = Some(ActiveState {
417 backend_handle,
418 stream_info,
419 });
420 self.processor_drop_rx = Some(drop_rx);
421
422 Ok(())
423 }
424
425 pub fn stop_stream(&mut self) {
427 self.active_state = None;
430 self.graph.deactivate();
431 }
432
433 pub fn is_audio_stream_running(&self) -> bool {
435 self.active_state.is_some()
436 }
437
438 pub fn stream_info(&self) -> Option<&StreamInfo> {
442 self.active_state.as_ref().map(|s| &s.stream_info)
443 }
444
445 pub fn audio_clock(&self) -> AudioClock {
460 let mut clock_borrowed = self.shared_clock_output.borrow_mut();
466 let clock = clock_borrowed.read();
467
468 let update_instant = audio_clock_update_instant_and_delay(&clock, &self.active_state)
469 .map(|(update_instant, _delay)| update_instant);
470
471 AudioClock {
472 samples: clock.clock_samples,
473 seconds: clock
474 .clock_samples
475 .to_seconds(self.sample_rate, self.sample_rate_recip),
476 #[cfg(feature = "musical_transport")]
477 musical: clock.current_playhead,
478 #[cfg(feature = "musical_transport")]
479 transport_is_playing: clock.transport_is_playing,
480 update_instant,
481 }
482 }
483
484 pub fn audio_clock_corrected(&self) -> AudioClock {
503 let mut clock_borrowed = self.shared_clock_output.borrow_mut();
509 let clock = clock_borrowed.read();
510
511 let Some((update_instant, delay)) =
512 audio_clock_update_instant_and_delay(&clock, &self.active_state)
513 else {
514 return AudioClock {
517 samples: clock.clock_samples,
518 seconds: clock
519 .clock_samples
520 .to_seconds(self.sample_rate, self.sample_rate_recip),
521 #[cfg(feature = "musical_transport")]
522 musical: clock.current_playhead,
523 #[cfg(feature = "musical_transport")]
524 transport_is_playing: clock.transport_is_playing,
525 update_instant: None,
526 };
527 };
528
529 let delta_seconds = DurationSeconds(delay.as_secs_f64());
531
532 let samples = clock.clock_samples + delta_seconds.to_samples(self.sample_rate);
533
534 #[cfg(feature = "musical_transport")]
535 let musical = clock.current_playhead.map(|musical_time| {
536 if clock.transport_is_playing && self.transport_state.transport.is_some() {
537 self.transport_state
538 .transport
539 .as_ref()
540 .unwrap()
541 .delta_seconds_from(musical_time, delta_seconds, clock.speed_multiplier)
542 } else {
543 musical_time
544 }
545 });
546
547 AudioClock {
548 samples,
549 seconds: samples.to_seconds(self.sample_rate, self.sample_rate_recip),
550 #[cfg(feature = "musical_transport")]
551 musical,
552 #[cfg(feature = "musical_transport")]
553 transport_is_playing: clock.transport_is_playing,
554 update_instant: Some(update_instant),
555 }
556 }
557
558 pub fn audio_clock_instant(&self) -> Option<Instant> {
570 let mut clock_borrowed = self.shared_clock_output.borrow_mut();
576 let clock = clock_borrowed.read();
577
578 audio_clock_update_instant_and_delay(&clock, &self.active_state)
579 .map(|(update_instant, _delay)| update_instant)
580 }
581
/// Sends `transport` to the processor if it differs from the transport
/// state the context last sent successfully.
///
/// A boxed state previously returned by the processor is reused for the
/// message when one is available. The context's own copy is only updated
/// after the message has been queued.
///
/// # Errors
///
/// Returns `UpdateError::MsgChannelFull` if the message channel is full.
#[cfg(feature = "musical_transport")]
pub fn sync_transport(
    &mut self,
    transport: &TransportState,
) -> Result<(), UpdateError<B::StreamError>> {
    if *self.transport_state == *transport {
        return Ok(());
    }

    let transport_msg = match self.transport_state_alloc_reuse.take() {
        Some(mut reused) => {
            *reused = transport.clone();
            reused
        }
        None => Box::new(transport.clone()),
    };

    self.send_message_to_processor(ContextToProcessorMsg::SetTransportState(transport_msg))
        .map_err(|(_, e)| e)?;

    *self.transport_state = transport.clone();

    Ok(())
}
606
/// Returns the transport state last sent successfully to the processor.
#[cfg(feature = "musical_transport")]
pub fn transport_state(&self) -> &TransportState {
    &self.transport_state
}
612
/// Returns the transport state last sent successfully to the processor.
///
/// Same value as [`FirewheelCtx::transport_state`].
#[cfg(feature = "musical_transport")]
pub fn transport(&self) -> &TransportState {
    &self.transport_state
}
618
619 pub fn hard_clip_outputs(&self) -> bool {
621 self.config.hard_clip_outputs
622 }
623
624 pub fn set_hard_clip_outputs(
633 &mut self,
634 hard_clip_outputs: bool,
635 ) -> Result<(), UpdateError<B::StreamError>> {
636 if self.config.hard_clip_outputs == hard_clip_outputs {
637 return Ok(());
638 }
639 self.config.hard_clip_outputs = hard_clip_outputs;
640
641 self.send_message_to_processor(ContextToProcessorMsg::HardClipOutputs(hard_clip_outputs))
642 .map_err(|(_, e)| e)
643 }
644
645 pub fn update(&mut self) -> Result<(), UpdateError<B::StreamError>> {
649 self.logger_rx.flush();
650
651 firewheel_core::collector::GlobalRtGc::collect();
652
653 for msg in self.from_processor_rx.pop_iter() {
654 match msg {
655 ProcessorToContextMsg::ReturnEventGroup(mut event_group) => {
656 event_group.clear();
657 self.event_group_pool.push(event_group);
658 }
659 ProcessorToContextMsg::ReturnSchedule(schedule_data) => {
660 let _ = schedule_data;
661 }
662 #[cfg(feature = "musical_transport")]
663 ProcessorToContextMsg::ReturnTransportState(transport_state) => {
664 if self.transport_state_alloc_reuse.is_none() {
665 self.transport_state_alloc_reuse = Some(transport_state);
666 }
667 }
668 #[cfg(feature = "scheduled_events")]
669 ProcessorToContextMsg::ReturnClearScheduledEvents(msgs) => {
670 let _ = msgs;
671 }
672 }
673 }
674
675 self.graph.update(
676 self.active_state.as_ref().map(|s| &s.stream_info),
677 &mut self.event_group,
678 );
679
680 if let Some(active_state) = &mut self.active_state {
681 if let Err(e) = active_state.backend_handle.poll_status() {
682 self.active_state = None;
683 self.graph.deactivate();
684
685 return Err(UpdateError::StreamStoppedUnexpectedly(Some(e)));
686 }
687
688 if self
689 .processor_drop_rx
690 .as_ref()
691 .unwrap()
692 .try_peek()
693 .is_some()
694 {
695 self.active_state = None;
696 self.graph.deactivate();
697
698 return Err(UpdateError::StreamStoppedUnexpectedly(None));
699 }
700 }
701
702 if self.is_audio_stream_running() {
703 if self.graph.needs_compile() {
704 let schedule_data = self
705 .graph
706 .compile(&self.active_state.as_ref().unwrap().stream_info)?;
707
708 if let Err((msg, e)) = self
709 .send_message_to_processor(ContextToProcessorMsg::NewSchedule(schedule_data))
710 {
711 let ContextToProcessorMsg::NewSchedule(schedule) = msg else {
712 unreachable!();
713 };
714
715 self.graph.on_schedule_send_failed(schedule);
716
717 return Err(e);
718 }
719 }
720
721 #[cfg(feature = "scheduled_events")]
722 if !self.queued_clear_scheduled_events.is_empty() {
723 let msgs: SmallVec<[ClearScheduledEventsEvent; 1]> =
724 self.queued_clear_scheduled_events.drain(..).collect();
725
726 if let Err((msg, e)) = self
727 .send_message_to_processor(ContextToProcessorMsg::ClearScheduledEvents(msgs))
728 {
729 let ContextToProcessorMsg::ClearScheduledEvents(mut msgs) = msg else {
730 unreachable!();
731 };
732
733 self.queued_clear_scheduled_events = msgs.drain(..).collect();
734
735 return Err(e);
736 }
737 }
738
739 if !self.event_group.is_empty() {
740 let mut next_event_group = self
741 .event_group_pool
742 .pop()
743 .unwrap_or_else(|| Vec::with_capacity(self.initial_event_group_capacity));
744 core::mem::swap(&mut next_event_group, &mut self.event_group);
745
746 if let Err((msg, e)) = self
747 .send_message_to_processor(ContextToProcessorMsg::EventGroup(next_event_group))
748 {
749 let ContextToProcessorMsg::EventGroup(mut event_group) = msg else {
750 unreachable!();
751 };
752
753 core::mem::swap(&mut event_group, &mut self.event_group);
754 self.event_group_pool.push(event_group);
755
756 return Err(e);
757 }
758 }
759 }
760
761 Ok(())
762 }
763
764 pub fn graph_in_node_id(&self) -> NodeID {
766 self.graph.graph_in_node()
767 }
768
769 pub fn graph_out_node_id(&self) -> NodeID {
771 self.graph.graph_out_node()
772 }
773
774 pub fn add_node<T: AudioNode + 'static>(
776 &mut self,
777 node: T,
778 config: Option<T::Configuration>,
779 ) -> NodeID {
780 self.graph.add_node(node, config)
781 }
782
783 pub fn add_dyn_node<T: DynAudioNode + 'static>(&mut self, node: T) -> NodeID {
785 self.graph.add_dyn_node(node)
786 }
787
788 pub fn remove_node(
799 &mut self,
800 node_id: NodeID,
801 ) -> Result<SmallVec<[EdgeID; 4]>, RemoveNodeError> {
802 self.graph.remove_node(node_id)
803 }
804
805 pub fn node_info(&self, id: NodeID) -> Option<&NodeEntry> {
807 self.graph.node_info(id)
808 }
809
810 pub fn node_state<T: 'static>(&self, id: NodeID) -> Option<&T> {
812 self.graph.node_state(id)
813 }
814
815 pub fn node_state_dyn(&self, id: NodeID) -> Option<&dyn Any> {
817 self.graph.node_state_dyn(id)
818 }
819
820 pub fn node_state_mut<T: 'static>(&mut self, id: NodeID) -> Option<&mut T> {
822 self.graph.node_state_mut(id)
823 }
824
825 pub fn node_state_dyn_mut(&mut self, id: NodeID) -> Option<&mut dyn Any> {
826 self.graph.node_state_dyn_mut(id)
827 }
828
829 pub fn nodes<'a>(&'a self) -> impl Iterator<Item = &'a NodeEntry> {
831 self.graph.nodes()
832 }
833
834 pub fn edges<'a>(&'a self) -> impl Iterator<Item = &'a Edge> {
836 self.graph.edges()
837 }
838
839 pub fn set_graph_channel_config(
843 &mut self,
844 channel_config: ChannelConfig,
845 ) -> SmallVec<[EdgeID; 4]> {
846 self.graph.set_graph_channel_config(channel_config)
847 }
848
849 pub fn connect(
867 &mut self,
868 src_node: NodeID,
869 dst_node: NodeID,
870 ports_src_dst: &[(PortIdx, PortIdx)],
871 check_for_cycles: bool,
872 ) -> Result<SmallVec<[EdgeID; 4]>, AddEdgeError> {
873 self.graph
874 .connect(src_node, dst_node, ports_src_dst, check_for_cycles)
875 }
876
877 pub fn disconnect(
888 &mut self,
889 src_node: NodeID,
890 dst_node: NodeID,
891 ports_src_dst: &[(PortIdx, PortIdx)],
892 ) -> bool {
893 self.graph.disconnect(src_node, dst_node, ports_src_dst)
894 }
895
896 pub fn disconnect_all_between(
901 &mut self,
902 src_node: NodeID,
903 dst_node: NodeID,
904 ) -> SmallVec<[EdgeID; 4]> {
905 self.graph.disconnect_all_between(src_node, dst_node)
906 }
907
908 pub fn disconnect_by_edge_id(&mut self, edge_id: EdgeID) -> bool {
912 self.graph.disconnect_by_edge_id(edge_id)
913 }
914
915 pub fn edge(&self, edge_id: EdgeID) -> Option<&Edge> {
917 self.graph.edge(edge_id)
918 }
919
920 pub fn cycle_detected(&mut self) -> bool {
924 self.graph.cycle_detected()
925 }
926
927 pub fn queue_event(&mut self, event: NodeEvent) {
932 self.event_group.push(event);
933 }
934
935 pub fn queue_event_for(&mut self, node_id: NodeID, event: NodeEventType) {
940 self.queue_event(NodeEvent {
941 node_id,
942 #[cfg(feature = "scheduled_events")]
943 time: None,
944 event,
945 });
946 }
947
/// Queues `event` for the node with the given ID, tagged with `time`.
///
/// Passing `None` as `time` queues the same event that
/// [`FirewheelCtx::queue_event_for`] would.
#[cfg(feature = "scheduled_events")]
pub fn schedule_event_for(
    &mut self,
    node_id: NodeID,
    event: NodeEventType,
    time: Option<EventInstant>,
) {
    let node_event = NodeEvent {
        node_id,
        time,
        event,
    };

    self.queue_event(node_event);
}
968
/// Queues a request to clear scheduled events of the given type for every
/// node (the request carries `node_id: None`).
///
/// The request is sent to the processor by [`FirewheelCtx::update`] while
/// a stream is running.
#[cfg(feature = "scheduled_events")]
pub fn cancel_all_scheduled_events(&mut self, event_type: ClearScheduledEventsType) {
    let request = ClearScheduledEventsEvent {
        node_id: None,
        event_type,
    };

    self.queued_clear_scheduled_events.push(request);
}
984
/// Queues a request to clear scheduled events of the given type for the
/// node with the given ID.
///
/// The request is sent to the processor by [`FirewheelCtx::update`] while
/// a stream is running.
#[cfg(feature = "scheduled_events")]
pub fn cancel_scheduled_events_for(
    &mut self,
    node_id: NodeID,
    event_type: ClearScheduledEventsType,
) {
    let request = ClearScheduledEventsEvent {
        node_id: Some(node_id),
        event_type,
    };

    self.queued_clear_scheduled_events.push(request);
}
1004
1005 fn send_message_to_processor(
1006 &mut self,
1007 msg: ContextToProcessorMsg,
1008 ) -> Result<(), (ContextToProcessorMsg, UpdateError<B::StreamError>)> {
1009 self.to_processor_tx
1010 .try_push(msg)
1011 .map_err(|msg| (msg, UpdateError::MsgChannelFull))
1012 }
1013}
1014
1015impl<B: AudioBackend> Drop for FirewheelCtx<B> {
1016 fn drop(&mut self) {
1017 self.stop_stream();
1018
1019 #[cfg(not(target_family = "wasm"))]
1022 if let Some(drop_rx) = self.processor_drop_rx.take() {
1023 let now = bevy_platform::time::Instant::now();
1024
1025 while drop_rx.try_peek().is_none() {
1026 if now.elapsed() > core::time::Duration::from_secs(2) {
1027 break;
1028 }
1029
1030 bevy_platform::thread::sleep(core::time::Duration::from_millis(2));
1031 }
1032 }
1033
1034 firewheel_core::collector::GlobalRtGc::collect();
1035 }
1036}
1037
1038impl<B: AudioBackend> FirewheelCtx<B> {
1039 pub fn event_queue(&mut self, id: NodeID) -> ContextQueue<'_, B> {
1041 ContextQueue {
1042 context: self,
1043 id,
1044 #[cfg(feature = "scheduled_events")]
1045 time: None,
1046 }
1047 }
1048
1049 #[cfg(feature = "scheduled_events")]
1050 pub fn event_queue_scheduled(
1051 &mut self,
1052 id: NodeID,
1053 time: Option<EventInstant>,
1054 ) -> ContextQueue<'_, B> {
1055 ContextQueue {
1056 context: self,
1057 id,
1058 time,
1059 }
1060 }
1061}
1062
1063pub struct ContextQueue<'a, B: AudioBackend> {
1084 context: &'a mut FirewheelCtx<B>,
1085 id: NodeID,
1086 #[cfg(feature = "scheduled_events")]
1087 time: Option<EventInstant>,
1088}
1089
#[cfg(feature = "scheduled_events")]
impl<'a, B: AudioBackend> ContextQueue<'a, B> {
    /// Returns the time that is attached to every event pushed through this
    /// queue.
    pub fn time(&self) -> Option<EventInstant> {
        self.time
    }
}
1096
1097impl<B: AudioBackend> firewheel_core::diff::EventQueue for ContextQueue<'_, B> {
1098 fn push(&mut self, data: NodeEventType) {
1099 self.context.queue_event(NodeEvent {
1100 event: data,
1101 #[cfg(feature = "scheduled_events")]
1102 time: self.time,
1103 node_id: self.id,
1104 });
1105 }
1106}
1107
/// Selects which scheduled events a "clear scheduled events" request
/// applies to.
///
/// NOTE(review): the per-variant descriptions follow the variant names; the
/// exact filtering is done by the processor -- confirm there.
#[cfg(feature = "scheduled_events")]
#[derive(Default, Debug, Clone, Copy, PartialEq)]
pub enum ClearScheduledEventsType {
    /// Every scheduled event (the default).
    #[default]
    All,
    /// Only scheduled events that are not musically timed.
    NonMusicalOnly,
    /// Only scheduled events that are musically timed.
    MusicalOnly,
}
1120
1121fn audio_clock_update_instant_and_delay<B: AudioBackend>(
1122 clock: &SharedClock<B::Instant>,
1123 active_state: &Option<ActiveState<B>>,
1124) -> Option<(Instant, Duration)> {
1125 active_state.as_ref().and_then(|active_state| {
1126 clock
1127 .process_timestamp
1128 .clone()
1129 .and_then(|process_timestamp| {
1130 active_state
1131 .backend_handle
1132 .delay_from_last_process(process_timestamp)
1133 .and_then(|delay| {
1134 Instant::now()
1135 .checked_sub(delay)
1136 .map(|instant| (instant, delay))
1137 })
1138 })
1139 })
1140}