1use bevy_platform::time::Instant;
2use core::cell::RefCell;
3use core::num::NonZeroU32;
4use core::time::Duration;
5use core::{any::Any, f64};
6use firewheel_core::clock::DurationSeconds;
7use firewheel_core::log::{RealtimeLogger, RealtimeLoggerConfig, RealtimeLoggerMainThread};
8use firewheel_core::node::ProcStore;
9use firewheel_core::{
10 channel_config::{ChannelConfig, ChannelCount},
11 clock::AudioClock,
12 dsp::declick::DeclickValues,
13 event::{NodeEvent, NodeEventType},
14 node::{AudioNode, DynAudioNode, NodeID},
15 StreamInfo,
16};
17use ringbuf::traits::{Consumer, Producer, Split};
18use smallvec::SmallVec;
19
20#[cfg(all(not(feature = "std"), feature = "musical_transport"))]
21use bevy_platform::prelude::Box;
22#[cfg(not(feature = "std"))]
23use bevy_platform::prelude::Vec;
24
25use crate::error::RemoveNodeError;
26use crate::processor::BufferOutOfSpaceMode;
27use crate::{
28 backend::{AudioBackend, DeviceInfo},
29 error::{AddEdgeError, StartStreamError, UpdateError},
30 graph::{AudioGraph, Edge, EdgeID, NodeEntry, PortIdx},
31 processor::{
32 ContextToProcessorMsg, FirewheelProcessor, FirewheelProcessorInner, ProcessorToContextMsg,
33 SharedClock,
34 },
35};
36
37#[cfg(feature = "scheduled_events")]
38use crate::processor::ClearScheduledEventsEvent;
39#[cfg(feature = "scheduled_events")]
40use firewheel_core::clock::EventInstant;
41
42#[cfg(feature = "musical_transport")]
43use firewheel_core::clock::TransportState;
44
45#[derive(Debug, Clone, Copy, PartialEq)]
47#[cfg_attr(feature = "bevy_reflect", derive(bevy_reflect::Reflect))]
48#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
49pub struct FirewheelConfig {
50 pub num_graph_inputs: ChannelCount,
52 pub num_graph_outputs: ChannelCount,
54 pub hard_clip_outputs: bool,
63 pub initial_node_capacity: u32,
67 pub initial_edge_capacity: u32,
71 pub declick_seconds: f32,
76 pub initial_event_group_capacity: u32,
80 pub channel_capacity: u32,
84 pub event_queue_capacity: usize,
91 pub immediate_event_capacity: usize,
97 #[cfg(feature = "scheduled_events")]
105 pub scheduled_event_capacity: usize,
106 pub buffer_out_of_space_mode: BufferOutOfSpaceMode,
110
111 pub logger_config: RealtimeLoggerConfig,
113
114 pub debug_force_clear_buffers: bool,
124
125 pub proc_store_capacity: usize,
129}
130
131impl Default for FirewheelConfig {
132 fn default() -> Self {
133 Self {
134 num_graph_inputs: ChannelCount::ZERO,
135 num_graph_outputs: ChannelCount::STEREO,
136 hard_clip_outputs: false,
137 initial_node_capacity: 128,
138 initial_edge_capacity: 256,
139 declick_seconds: DeclickValues::DEFAULT_FADE_SECONDS,
140 initial_event_group_capacity: 128,
141 channel_capacity: 64,
142 event_queue_capacity: 128,
143 immediate_event_capacity: 512,
144 #[cfg(feature = "scheduled_events")]
145 scheduled_event_capacity: 512,
146 buffer_out_of_space_mode: BufferOutOfSpaceMode::AllocateOnAudioThread,
147 logger_config: RealtimeLoggerConfig::default(),
148 debug_force_clear_buffers: false,
149 proc_store_capacity: 8,
150 }
151 }
152}
153
154struct ActiveState<B: AudioBackend> {
155 backend_handle: B,
156 stream_info: StreamInfo,
157}
158
159pub struct FirewheelCtx<B: AudioBackend> {
161 graph: AudioGraph,
162
163 to_processor_tx: ringbuf::HeapProd<ContextToProcessorMsg>,
164 from_processor_rx: ringbuf::HeapCons<ProcessorToContextMsg>,
165 logger_rx: RealtimeLoggerMainThread,
166
167 active_state: Option<ActiveState<B>>,
168
169 processor_channel: Option<(
170 ringbuf::HeapCons<ContextToProcessorMsg>,
171 ringbuf::HeapProd<ProcessorToContextMsg>,
172 triple_buffer::Input<SharedClock<B::Instant>>,
173 RealtimeLogger,
174 ProcStore,
175 )>,
176 processor_drop_rx: Option<ringbuf::HeapCons<FirewheelProcessorInner<B>>>,
177
178 shared_clock_output: RefCell<triple_buffer::Output<SharedClock<B::Instant>>>,
179 sample_rate: NonZeroU32,
180 sample_rate_recip: f64,
181
182 #[cfg(feature = "musical_transport")]
183 transport_state: Box<TransportState>,
184 #[cfg(feature = "musical_transport")]
185 transport_state_alloc_reuse: Option<Box<TransportState>>,
186
187 event_group_pool: Vec<Vec<NodeEvent>>,
189 event_group: Vec<NodeEvent>,
190 initial_event_group_capacity: usize,
191
192 #[cfg(feature = "scheduled_events")]
193 queued_clear_scheduled_events: Vec<ClearScheduledEventsEvent>,
194
195 config: FirewheelConfig,
196}
197
198impl<B: AudioBackend> FirewheelCtx<B> {
199 pub fn new(config: FirewheelConfig) -> Self {
201 let (to_processor_tx, from_context_rx) =
202 ringbuf::HeapRb::<ContextToProcessorMsg>::new(config.channel_capacity as usize).split();
203 let (to_context_tx, from_processor_rx) =
204 ringbuf::HeapRb::<ProcessorToContextMsg>::new(config.channel_capacity as usize * 2)
205 .split();
206
207 let initial_event_group_capacity = config.initial_event_group_capacity as usize;
208 let mut event_group_pool = Vec::with_capacity(16);
209 for _ in 0..3 {
210 event_group_pool.push(Vec::with_capacity(initial_event_group_capacity));
211 }
212
213 let (shared_clock_input, shared_clock_output) =
214 triple_buffer::triple_buffer(&SharedClock::default());
215
216 let (logger, logger_rx) = firewheel_core::log::realtime_logger(config.logger_config);
217
218 let proc_store = ProcStore::with_capacity(config.proc_store_capacity);
219
220 Self {
221 graph: AudioGraph::new(&config),
222 to_processor_tx,
223 from_processor_rx,
224 logger_rx,
225 active_state: None,
226 processor_channel: Some((
227 from_context_rx,
228 to_context_tx,
229 shared_clock_input,
230 logger,
231 proc_store,
232 )),
233 processor_drop_rx: None,
234 shared_clock_output: RefCell::new(shared_clock_output),
235 sample_rate: NonZeroU32::new(44100).unwrap(),
236 sample_rate_recip: 44100.0f64.recip(),
237 #[cfg(feature = "musical_transport")]
238 transport_state: Box::new(TransportState::default()),
239 #[cfg(feature = "musical_transport")]
240 transport_state_alloc_reuse: None,
241 event_group_pool,
242 event_group: Vec::with_capacity(initial_event_group_capacity),
243 initial_event_group_capacity,
244 #[cfg(feature = "scheduled_events")]
245 queued_clear_scheduled_events: Vec::new(),
246 config,
247 }
248 }
249
250 pub fn proc_store(&self) -> Option<&ProcStore> {
254 if let Some((_, _, _, _, proc_store)) = &self.processor_channel {
255 Some(proc_store)
256 } else if let Some(processor) = self.processor_drop_rx.as_ref().unwrap().last() {
257 if processor.poisoned {
258 panic!("The audio thread has panicked!");
259 }
260
261 Some(&processor.extra.store)
262 } else {
263 None
264 }
265 }
266
267 pub fn proc_store_mut(&mut self) -> Option<&mut ProcStore> {
271 if let Some((_, _, _, _, proc_store)) = &mut self.processor_channel {
272 Some(proc_store)
273 } else if let Some(processor) = self.processor_drop_rx.as_mut().unwrap().last_mut() {
274 if processor.poisoned {
275 panic!("The audio thread has panicked!");
276 }
277
278 Some(&mut processor.extra.store)
279 } else {
280 None
281 }
282 }
283
284 pub fn active_backend(&self) -> Option<&B> {
287 self.active_state
288 .as_ref()
289 .map(|state| &state.backend_handle)
290 }
291
292 pub fn active_backend_mut(&mut self) -> Option<&mut B> {
295 self.active_state
296 .as_mut()
297 .map(|state| &mut state.backend_handle)
298 }
299
300 pub fn available_input_devices(&self) -> Vec<DeviceInfo> {
302 B::available_input_devices()
303 }
304
305 pub fn available_output_devices(&self) -> Vec<DeviceInfo> {
307 B::available_output_devices()
308 }
309
310 pub fn can_start_stream(&self) -> bool {
320 if self.is_audio_stream_running() {
321 false
322 } else if let Some(rx) = &self.processor_drop_rx {
323 rx.try_peek().is_some()
324 } else {
325 true
326 }
327 }
328
329 pub fn start_stream(
340 &mut self,
341 config: B::Config,
342 ) -> Result<(), StartStreamError<B::StartStreamError>> {
343 if self.is_audio_stream_running() {
344 return Err(StartStreamError::AlreadyStarted);
345 }
346
347 if !self.can_start_stream() {
348 return Err(StartStreamError::OldStreamNotFinishedStopping);
349 }
350
351 let (mut backend_handle, mut stream_info) =
352 B::start_stream(config).map_err(|e| StartStreamError::BackendError(e))?;
353
354 stream_info.sample_rate_recip = (stream_info.sample_rate.get() as f64).recip();
355 stream_info.declick_frames = NonZeroU32::new(
356 (self.config.declick_seconds * stream_info.sample_rate.get() as f32).round() as u32,
357 )
358 .unwrap_or(NonZeroU32::MIN);
359
360 let maybe_processor = self.processor_channel.take();
361
362 stream_info.prev_sample_rate = if maybe_processor.is_some() {
363 stream_info.sample_rate
364 } else {
365 self.sample_rate
366 };
367
368 self.sample_rate = stream_info.sample_rate;
369 self.sample_rate_recip = stream_info.sample_rate_recip;
370
371 let schedule = self.graph.compile(&stream_info)?;
372
373 let (drop_tx, drop_rx) = ringbuf::HeapRb::<FirewheelProcessorInner<B>>::new(1).split();
374
375 let processor =
376 if let Some((from_context_rx, to_context_tx, shared_clock_input, logger, proc_store)) =
377 maybe_processor
378 {
379 FirewheelProcessorInner::new(
380 from_context_rx,
381 to_context_tx,
382 shared_clock_input,
383 self.config.immediate_event_capacity,
384 #[cfg(feature = "scheduled_events")]
385 self.config.scheduled_event_capacity,
386 self.config.event_queue_capacity,
387 &stream_info,
388 self.config.hard_clip_outputs,
389 self.config.buffer_out_of_space_mode,
390 logger,
391 self.config.debug_force_clear_buffers,
392 proc_store,
393 )
394 } else {
395 let mut processor = self.processor_drop_rx.as_mut().unwrap().try_pop().unwrap();
396
397 if processor.poisoned {
398 panic!("The audio thread has panicked!");
399 }
400
401 processor.new_stream(&stream_info);
402
403 processor
404 };
405
406 backend_handle.set_processor(FirewheelProcessor::new(processor, drop_tx));
407
408 if let Err(_) = self.send_message_to_processor(ContextToProcessorMsg::NewSchedule(schedule))
409 {
410 panic!("Firewheel message channel is full!");
411 }
412
413 self.active_state = Some(ActiveState {
414 backend_handle,
415 stream_info,
416 });
417 self.processor_drop_rx = Some(drop_rx);
418
419 Ok(())
420 }
421
422 pub fn stop_stream(&mut self) {
424 self.active_state = None;
427 self.graph.deactivate();
428 }
429
430 pub fn is_audio_stream_running(&self) -> bool {
432 self.active_state.is_some()
433 }
434
435 pub fn stream_info(&self) -> Option<&StreamInfo> {
439 self.active_state.as_ref().map(|s| &s.stream_info)
440 }
441
442 pub fn audio_clock(&self) -> AudioClock {
457 let mut clock_borrowed = self.shared_clock_output.borrow_mut();
463 let clock = clock_borrowed.read();
464
465 let update_instant = audio_clock_update_instant_and_delay(&clock, &self.active_state)
466 .map(|(update_instant, _delay)| update_instant);
467
468 AudioClock {
469 samples: clock.clock_samples,
470 seconds: clock
471 .clock_samples
472 .to_seconds(self.sample_rate, self.sample_rate_recip),
473 #[cfg(feature = "musical_transport")]
474 musical: clock.current_playhead,
475 #[cfg(feature = "musical_transport")]
476 transport_is_playing: clock.transport_is_playing,
477 update_instant,
478 }
479 }
480
481 pub fn audio_clock_corrected(&self) -> AudioClock {
500 let mut clock_borrowed = self.shared_clock_output.borrow_mut();
506 let clock = clock_borrowed.read();
507
508 let Some((update_instant, delay)) =
509 audio_clock_update_instant_and_delay(&clock, &self.active_state)
510 else {
511 return AudioClock {
514 samples: clock.clock_samples,
515 seconds: clock
516 .clock_samples
517 .to_seconds(self.sample_rate, self.sample_rate_recip),
518 #[cfg(feature = "musical_transport")]
519 musical: clock.current_playhead,
520 #[cfg(feature = "musical_transport")]
521 transport_is_playing: clock.transport_is_playing,
522 update_instant: None,
523 };
524 };
525
526 let delta_seconds = DurationSeconds(delay.as_secs_f64());
528
529 let samples = clock.clock_samples + delta_seconds.to_samples(self.sample_rate);
530
531 #[cfg(feature = "musical_transport")]
532 let musical = clock.current_playhead.map(|musical_time| {
533 if clock.transport_is_playing && self.transport_state.transport.is_some() {
534 self.transport_state
535 .transport
536 .as_ref()
537 .unwrap()
538 .delta_seconds_from(musical_time, delta_seconds, clock.speed_multiplier)
539 } else {
540 musical_time
541 }
542 });
543
544 AudioClock {
545 samples,
546 seconds: samples.to_seconds(self.sample_rate, self.sample_rate_recip),
547 #[cfg(feature = "musical_transport")]
548 musical,
549 #[cfg(feature = "musical_transport")]
550 transport_is_playing: clock.transport_is_playing,
551 update_instant: Some(update_instant),
552 }
553 }
554
555 pub fn audio_clock_instant(&self) -> Option<Instant> {
567 let mut clock_borrowed = self.shared_clock_output.borrow_mut();
573 let clock = clock_borrowed.read();
574
575 audio_clock_update_instant_and_delay(&clock, &self.active_state)
576 .map(|(update_instant, _delay)| update_instant)
577 }
578
579 #[cfg(feature = "musical_transport")]
583 pub fn sync_transport(
584 &mut self,
585 transport: &TransportState,
586 ) -> Result<(), UpdateError<B::StreamError>> {
587 if &*self.transport_state != transport {
588 let transport_msg = if let Some(mut t) = self.transport_state_alloc_reuse.take() {
589 *t = transport.clone();
590 t
591 } else {
592 Box::new(transport.clone())
593 };
594
595 self.send_message_to_processor(ContextToProcessorMsg::SetTransportState(transport_msg))
596 .map_err(|(_, e)| e)?;
597
598 *self.transport_state = transport.clone();
599 }
600
601 Ok(())
602 }
603
604 #[cfg(feature = "musical_transport")]
606 pub fn transport_state(&self) -> &TransportState {
607 &self.transport_state
608 }
609
610 #[cfg(feature = "musical_transport")]
612 pub fn transport(&self) -> &TransportState {
613 &self.transport_state
614 }
615
616 pub fn hard_clip_outputs(&self) -> bool {
618 self.config.hard_clip_outputs
619 }
620
621 pub fn set_hard_clip_outputs(
630 &mut self,
631 hard_clip_outputs: bool,
632 ) -> Result<(), UpdateError<B::StreamError>> {
633 if self.config.hard_clip_outputs == hard_clip_outputs {
634 return Ok(());
635 }
636 self.config.hard_clip_outputs = hard_clip_outputs;
637
638 self.send_message_to_processor(ContextToProcessorMsg::HardClipOutputs(hard_clip_outputs))
639 .map_err(|(_, e)| e)
640 }
641
642 pub fn update(&mut self) -> Result<(), UpdateError<B::StreamError>> {
646 self.logger_rx.flush();
647
648 firewheel_core::collector::GlobalRtGc::collect();
649
650 for msg in self.from_processor_rx.pop_iter() {
651 match msg {
652 ProcessorToContextMsg::ReturnEventGroup(mut event_group) => {
653 event_group.clear();
654 self.event_group_pool.push(event_group);
655 }
656 ProcessorToContextMsg::ReturnSchedule(schedule_data) => {
657 let _ = schedule_data;
658 }
659 #[cfg(feature = "musical_transport")]
660 ProcessorToContextMsg::ReturnTransportState(transport_state) => {
661 if self.transport_state_alloc_reuse.is_none() {
662 self.transport_state_alloc_reuse = Some(transport_state);
663 }
664 }
665 #[cfg(feature = "scheduled_events")]
666 ProcessorToContextMsg::ReturnClearScheduledEvents(msgs) => {
667 let _ = msgs;
668 }
669 }
670 }
671
672 self.graph.update(
673 self.active_state.as_ref().map(|s| &s.stream_info),
674 &mut self.event_group,
675 );
676
677 if let Some(active_state) = &mut self.active_state {
678 if let Err(e) = active_state.backend_handle.poll_status() {
679 self.active_state = None;
680 self.graph.deactivate();
681
682 return Err(UpdateError::StreamStoppedUnexpectedly(Some(e)));
683 }
684
685 if self
686 .processor_drop_rx
687 .as_ref()
688 .unwrap()
689 .try_peek()
690 .is_some()
691 {
692 self.active_state = None;
693 self.graph.deactivate();
694
695 return Err(UpdateError::StreamStoppedUnexpectedly(None));
696 }
697 }
698
699 if self.is_audio_stream_running() {
700 if self.graph.needs_compile() {
701 let schedule_data = self
702 .graph
703 .compile(&self.active_state.as_ref().unwrap().stream_info)?;
704
705 if let Err((msg, e)) = self
706 .send_message_to_processor(ContextToProcessorMsg::NewSchedule(schedule_data))
707 {
708 let ContextToProcessorMsg::NewSchedule(schedule) = msg else {
709 unreachable!();
710 };
711
712 self.graph.on_schedule_send_failed(schedule);
713
714 return Err(e);
715 }
716 }
717
718 #[cfg(feature = "scheduled_events")]
719 if !self.queued_clear_scheduled_events.is_empty() {
720 let msgs: SmallVec<[ClearScheduledEventsEvent; 1]> =
721 self.queued_clear_scheduled_events.drain(..).collect();
722
723 if let Err((msg, e)) = self
724 .send_message_to_processor(ContextToProcessorMsg::ClearScheduledEvents(msgs))
725 {
726 let ContextToProcessorMsg::ClearScheduledEvents(mut msgs) = msg else {
727 unreachable!();
728 };
729
730 self.queued_clear_scheduled_events = msgs.drain(..).collect();
731
732 return Err(e);
733 }
734 }
735
736 if !self.event_group.is_empty() {
737 let mut next_event_group = self
738 .event_group_pool
739 .pop()
740 .unwrap_or_else(|| Vec::with_capacity(self.initial_event_group_capacity));
741 core::mem::swap(&mut next_event_group, &mut self.event_group);
742
743 if let Err((msg, e)) = self
744 .send_message_to_processor(ContextToProcessorMsg::EventGroup(next_event_group))
745 {
746 let ContextToProcessorMsg::EventGroup(mut event_group) = msg else {
747 unreachable!();
748 };
749
750 core::mem::swap(&mut event_group, &mut self.event_group);
751 self.event_group_pool.push(event_group);
752
753 return Err(e);
754 }
755 }
756 }
757
758 Ok(())
759 }
760
761 pub fn graph_in_node_id(&self) -> NodeID {
763 self.graph.graph_in_node()
764 }
765
766 pub fn graph_out_node_id(&self) -> NodeID {
768 self.graph.graph_out_node()
769 }
770
771 pub fn add_node<T: AudioNode + 'static>(
773 &mut self,
774 node: T,
775 config: Option<T::Configuration>,
776 ) -> NodeID {
777 self.graph.add_node(node, config)
778 }
779
780 pub fn add_dyn_node<T: DynAudioNode + 'static>(&mut self, node: T) -> NodeID {
782 self.graph.add_dyn_node(node)
783 }
784
785 pub fn remove_node(
796 &mut self,
797 node_id: NodeID,
798 ) -> Result<SmallVec<[EdgeID; 4]>, RemoveNodeError> {
799 self.graph.remove_node(node_id)
800 }
801
802 pub fn node_info(&self, id: NodeID) -> Option<&NodeEntry> {
804 self.graph.node_info(id)
805 }
806
807 pub fn node_state<T: 'static>(&self, id: NodeID) -> Option<&T> {
809 self.graph.node_state(id)
810 }
811
812 pub fn node_state_dyn(&self, id: NodeID) -> Option<&dyn Any> {
814 self.graph.node_state_dyn(id)
815 }
816
817 pub fn node_state_mut<T: 'static>(&mut self, id: NodeID) -> Option<&mut T> {
819 self.graph.node_state_mut(id)
820 }
821
822 pub fn node_state_dyn_mut(&mut self, id: NodeID) -> Option<&mut dyn Any> {
823 self.graph.node_state_dyn_mut(id)
824 }
825
826 pub fn nodes<'a>(&'a self) -> impl Iterator<Item = &'a NodeEntry> {
828 self.graph.nodes()
829 }
830
831 pub fn edges<'a>(&'a self) -> impl Iterator<Item = &'a Edge> {
833 self.graph.edges()
834 }
835
836 pub fn set_graph_channel_config(
840 &mut self,
841 channel_config: ChannelConfig,
842 ) -> SmallVec<[EdgeID; 4]> {
843 self.graph.set_graph_channel_config(channel_config)
844 }
845
846 pub fn connect(
864 &mut self,
865 src_node: NodeID,
866 dst_node: NodeID,
867 ports_src_dst: &[(PortIdx, PortIdx)],
868 check_for_cycles: bool,
869 ) -> Result<SmallVec<[EdgeID; 4]>, AddEdgeError> {
870 self.graph
871 .connect(src_node, dst_node, ports_src_dst, check_for_cycles)
872 }
873
874 pub fn disconnect(
885 &mut self,
886 src_node: NodeID,
887 dst_node: NodeID,
888 ports_src_dst: &[(PortIdx, PortIdx)],
889 ) -> bool {
890 self.graph.disconnect(src_node, dst_node, ports_src_dst)
891 }
892
893 pub fn disconnect_all_between(
898 &mut self,
899 src_node: NodeID,
900 dst_node: NodeID,
901 ) -> SmallVec<[EdgeID; 4]> {
902 self.graph.disconnect_all_between(src_node, dst_node)
903 }
904
905 pub fn disconnect_by_edge_id(&mut self, edge_id: EdgeID) -> bool {
909 self.graph.disconnect_by_edge_id(edge_id)
910 }
911
912 pub fn edge(&self, edge_id: EdgeID) -> Option<&Edge> {
914 self.graph.edge(edge_id)
915 }
916
917 pub fn cycle_detected(&mut self) -> bool {
921 self.graph.cycle_detected()
922 }
923
924 pub fn queue_event(&mut self, event: NodeEvent) {
929 self.event_group.push(event);
930 }
931
932 pub fn queue_event_for(&mut self, node_id: NodeID, event: NodeEventType) {
937 self.queue_event(NodeEvent {
938 node_id,
939 #[cfg(feature = "scheduled_events")]
940 time: None,
941 event,
942 });
943 }
944
945 #[cfg(feature = "scheduled_events")]
953 pub fn schedule_event_for(
954 &mut self,
955 node_id: NodeID,
956 event: NodeEventType,
957 time: Option<EventInstant>,
958 ) {
959 self.queue_event(NodeEvent {
960 node_id,
961 time,
962 event,
963 });
964 }
965
966 #[cfg(feature = "scheduled_events")]
974 pub fn cancel_all_scheduled_events(&mut self, event_type: ClearScheduledEventsType) {
975 self.queued_clear_scheduled_events
976 .push(ClearScheduledEventsEvent {
977 node_id: None,
978 event_type,
979 });
980 }
981
982 #[cfg(feature = "scheduled_events")]
990 pub fn cancel_scheduled_events_for(
991 &mut self,
992 node_id: NodeID,
993 event_type: ClearScheduledEventsType,
994 ) {
995 self.queued_clear_scheduled_events
996 .push(ClearScheduledEventsEvent {
997 node_id: Some(node_id),
998 event_type,
999 });
1000 }
1001
1002 fn send_message_to_processor(
1003 &mut self,
1004 msg: ContextToProcessorMsg,
1005 ) -> Result<(), (ContextToProcessorMsg, UpdateError<B::StreamError>)> {
1006 self.to_processor_tx
1007 .try_push(msg)
1008 .map_err(|msg| (msg, UpdateError::MsgChannelFull))
1009 }
1010}
1011
1012impl<B: AudioBackend> Drop for FirewheelCtx<B> {
1013 fn drop(&mut self) {
1014 self.stop_stream();
1015
1016 #[cfg(not(target_family = "wasm"))]
1019 if let Some(drop_rx) = self.processor_drop_rx.take() {
1020 let now = bevy_platform::time::Instant::now();
1021
1022 while drop_rx.try_peek().is_none() {
1023 if now.elapsed() > core::time::Duration::from_secs(2) {
1024 break;
1025 }
1026
1027 bevy_platform::thread::sleep(core::time::Duration::from_millis(2));
1028 }
1029 }
1030
1031 firewheel_core::collector::GlobalRtGc::collect();
1032 }
1033}
1034
1035impl<B: AudioBackend> FirewheelCtx<B> {
1036 pub fn event_queue(&mut self, id: NodeID) -> ContextQueue<'_, B> {
1038 ContextQueue {
1039 context: self,
1040 id,
1041 #[cfg(feature = "scheduled_events")]
1042 time: None,
1043 }
1044 }
1045
1046 #[cfg(feature = "scheduled_events")]
1047 pub fn event_queue_scheduled(
1048 &mut self,
1049 id: NodeID,
1050 time: Option<EventInstant>,
1051 ) -> ContextQueue<'_, B> {
1052 ContextQueue {
1053 context: self,
1054 id,
1055 time,
1056 }
1057 }
1058}
1059
1060pub struct ContextQueue<'a, B: AudioBackend> {
1081 context: &'a mut FirewheelCtx<B>,
1082 id: NodeID,
1083 #[cfg(feature = "scheduled_events")]
1084 time: Option<EventInstant>,
1085}
1086
1087#[cfg(feature = "scheduled_events")]
1088impl<'a, B: AudioBackend> ContextQueue<'a, B> {
1089 pub fn time(&self) -> Option<EventInstant> {
1090 self.time
1091 }
1092}
1093
1094impl<B: AudioBackend> firewheel_core::diff::EventQueue for ContextQueue<'_, B> {
1095 fn push(&mut self, data: NodeEventType) {
1096 self.context.queue_event(NodeEvent {
1097 event: data,
1098 #[cfg(feature = "scheduled_events")]
1099 time: self.time,
1100 node_id: self.id,
1101 });
1102 }
1103}
1104
1105#[cfg(feature = "scheduled_events")]
1107#[derive(Default, Debug, Clone, Copy, PartialEq)]
1108pub enum ClearScheduledEventsType {
1109 #[default]
1111 All,
1112 NonMusicalOnly,
1114 MusicalOnly,
1116}
1117
1118fn audio_clock_update_instant_and_delay<B: AudioBackend>(
1119 clock: &SharedClock<B::Instant>,
1120 active_state: &Option<ActiveState<B>>,
1121) -> Option<(Instant, Duration)> {
1122 active_state.as_ref().and_then(|active_state| {
1123 clock
1124 .process_timestamp
1125 .clone()
1126 .and_then(|process_timestamp| {
1127 active_state
1128 .backend_handle
1129 .delay_from_last_process(process_timestamp)
1130 .and_then(|delay| {
1131 Instant::now()
1132 .checked_sub(delay)
1133 .map(|instant| (instant, delay))
1134 })
1135 })
1136 })
1137}