1use core::{num::NonZeroU32, usize};
2
3use ringbuf::traits::Producer;
4use thunderdome::Arena;
5
6#[cfg(not(feature = "std"))]
7use bevy_platform::prelude::{Box, Vec};
8
9use firewheel_core::{
10 clock::InstantSamples,
11 dsp::{buffer::ChannelBuffer, declick::DeclickValues},
12 event::{NodeEvent, ProcEventsIndex},
13 log::RealtimeLogger,
14 node::{AudioNodeProcessor, ProcExtra, ProcStore},
15 StreamInfo,
16};
17
18use crate::{
19 backend::{AudioBackend, BackendProcessInfo},
20 graph::ScheduleHeapData,
21 processor::event_scheduler::{EventScheduler, NodeEventSchedulerData},
22};
23
24#[cfg(feature = "scheduled_events")]
25use crate::context::ClearScheduledEventsType;
26#[cfg(feature = "scheduled_events")]
27use firewheel_core::node::NodeID;
28#[cfg(feature = "scheduled_events")]
29use smallvec::SmallVec;
30
31#[cfg(feature = "musical_transport")]
32use firewheel_core::clock::{InstantMusical, TransportState};
33
34mod event_scheduler;
35mod handle_messages;
36mod process;
37
38#[cfg(feature = "musical_transport")]
39mod transport;
40#[cfg(feature = "musical_transport")]
41use transport::ProcTransportState;
42
/// Public handle to the audio processor.
///
/// This is a thin wrapper around `FirewheelProcessorInner`. The inner
/// processor is stored in an `Option` so that the `Drop` implementation can
/// move it out and attempt to push it onto `drop_tx`.
pub struct FirewheelProcessor<B: AudioBackend> {
    /// The wrapped processor. Set to `Some` by `new`, and taken (leaving
    /// `None`) when this wrapper is dropped.
    inner: Option<FirewheelProcessorInner<B>>,
    /// Producer end of the ring buffer onto which the inner processor is
    /// pushed when this wrapper is dropped.
    drop_tx: ringbuf::HeapProd<FirewheelProcessorInner<B>>,
}
47
48impl<B: AudioBackend> Drop for FirewheelProcessor<B> {
49 fn drop(&mut self) {
50 let Some(mut inner) = self.inner.take() else {
51 return;
52 };
53
54 inner.stream_stopped();
55
56 #[cfg(feature = "std")]
58 if std::thread::panicking() {
59 inner.poisoned = true;
60 }
61
62 let _ = self.drop_tx.try_push(inner);
63 }
64}
65
66impl<B: AudioBackend> FirewheelProcessor<B> {
67 pub(crate) fn new(
68 processor: FirewheelProcessorInner<B>,
69 drop_tx: ringbuf::HeapProd<FirewheelProcessorInner<B>>,
70 ) -> Self {
71 Self {
72 inner: Some(processor),
73 drop_tx,
74 }
75 }
76
77 pub fn process_interleaved(
78 &mut self,
79 input: &[f32],
80 output: &mut [f32],
81 info: BackendProcessInfo<B>,
82 ) {
83 if let Some(inner) = &mut self.inner {
84 inner.process_interleaved(input, output, info);
85 }
86 }
87}
88
/// The state owned by the audio processor.
///
/// Wrapped by `FirewheelProcessor`, which forwards processing calls to this
/// type and pushes it onto a ring buffer when dropped.
pub(crate) struct FirewheelProcessorInner<B: AudioBackend> {
    /// Arena of node entries. Empty at construction.
    nodes: Arena<NodeEntry>,
    /// Heap-allocated schedule data, if any. `None` at construction.
    schedule_data: Option<Box<ScheduleHeapData>>,

    /// Consumer end of the ring buffer carrying `ContextToProcessorMsg`
    /// values.
    from_graph_rx: ringbuf::HeapCons<ContextToProcessorMsg>,
    /// Producer end of the ring buffer carrying `ProcessorToContextMsg`
    /// values.
    to_graph_tx: ringbuf::HeapProd<ProcessorToContextMsg>,

    /// Event scheduler, built in `new` from the event buffer capacities and
    /// the `BufferOutOfSpaceMode`.
    event_scheduler: EventScheduler,
    /// Queue of event indices, pre-allocated in `new` with
    /// `node_event_buffer_capacity` entries.
    proc_event_queue: Vec<ProcEventsIndex>,

    /// Sample rate, copied from `StreamInfo::sample_rate` in `new`.
    sample_rate: NonZeroU32,
    /// Copied from `StreamInfo::sample_rate_recip` in `new`.
    sample_rate_recip: f64,
    /// Copied from `StreamInfo::max_block_frames` (converted to `usize`) in
    /// `new`.
    max_block_frames: usize,

    /// Clock position in samples. Starts at `InstantSamples(0)`.
    clock_samples: InstantSamples,
    /// Input side of the triple buffer holding the `SharedClock`, using the
    /// backend's `Instant` type.
    shared_clock_input: triple_buffer::Input<SharedClock<B::Instant>>,

    /// Transport state (only with the `musical_transport` feature).
    #[cfg(feature = "musical_transport")]
    proc_transport_state: ProcTransportState,

    /// Initial value is the `hard_clip_outputs` argument of `new`.
    // NOTE(review): presumably updated by
    // `ContextToProcessorMsg::HardClipOutputs`; the message handling lives in
    // another module -- confirm there.
    hard_clip_outputs: bool,

    /// Extra processing resources: scratch buffers, declick values, the
    /// realtime logger and the `ProcStore` (see `new`).
    pub(crate) extra: ProcExtra,

    /// Starts as `false`. Set to `true` by `FirewheelProcessor`'s `Drop`
    /// implementation when the wrapper is dropped while the thread is
    /// panicking (only with the `std` feature).
    pub(crate) poisoned: bool,
    /// Value of the `debug_force_clear_buffers` argument of `new`.
    // NOTE(review): how this flag is consumed is not visible in this file.
    debug_force_clear_buffers: bool,
}
119
120impl<B: AudioBackend> FirewheelProcessorInner<B> {
121 pub(crate) fn new(
123 from_graph_rx: ringbuf::HeapCons<ContextToProcessorMsg>,
124 to_graph_tx: ringbuf::HeapProd<ProcessorToContextMsg>,
125 shared_clock_input: triple_buffer::Input<SharedClock<B::Instant>>,
126 immediate_event_buffer_capacity: usize,
127 #[cfg(feature = "scheduled_events")] scheduled_event_buffer_capacity: usize,
128 node_event_buffer_capacity: usize,
129 stream_info: &StreamInfo,
130 hard_clip_outputs: bool,
131 buffer_out_of_space_mode: BufferOutOfSpaceMode,
132 logger: RealtimeLogger,
133 debug_force_clear_buffers: bool,
134 store: ProcStore,
135 ) -> Self {
136 Self {
137 nodes: Arena::new(),
138 schedule_data: None,
139 from_graph_rx,
140 to_graph_tx,
141 event_scheduler: EventScheduler::new(
142 immediate_event_buffer_capacity,
143 #[cfg(feature = "scheduled_events")]
144 scheduled_event_buffer_capacity,
145 buffer_out_of_space_mode,
146 ),
147 proc_event_queue: Vec::with_capacity(node_event_buffer_capacity),
148 sample_rate: stream_info.sample_rate,
149 sample_rate_recip: stream_info.sample_rate_recip,
150 max_block_frames: stream_info.max_block_frames.get() as usize,
151 clock_samples: InstantSamples(0),
152 shared_clock_input,
153 #[cfg(feature = "musical_transport")]
154 proc_transport_state: ProcTransportState::new(),
155 hard_clip_outputs,
156 extra: ProcExtra {
157 scratch_buffers: ChannelBuffer::new(stream_info.max_block_frames.get() as usize),
158 declick_values: DeclickValues::new(stream_info.declick_frames),
159 logger,
160 store,
161 },
162 poisoned: false,
163 debug_force_clear_buffers,
164 }
165 }
166}
167
/// One entry in the processor's node arena.
pub(crate) struct NodeEntry {
    /// The boxed processor for this node.
    pub processor: Box<dyn AudioNodeProcessor>,
    // NOTE(review): presumably records whether this node's most recent
    // output was silent; the code that reads and writes it is not in this
    // file -- confirm.
    pub prev_output_was_silent: bool,

    /// Per-node data used by the event scheduler.
    event_data: NodeEventSchedulerData,
}
174
/// Messages carried by the ring buffer whose consumer end is
/// `FirewheelProcessorInner::from_graph_rx`.
// NOTE(review): the handling of each variant lives in another module; the
// per-variant notes below only describe the payload types.
pub(crate) enum ContextToProcessorMsg {
    /// A group of node events.
    EventGroup(Vec<NodeEvent>),
    /// Heap-allocated data for a new schedule.
    NewSchedule(Box<ScheduleHeapData>),
    /// A new value for the hard-clip-outputs flag.
    HardClipOutputs(bool),
    /// A boxed transport state (only with the `musical_transport` feature).
    #[cfg(feature = "musical_transport")]
    SetTransportState(Box<TransportState>),
    /// One or more requests to clear scheduled events (only with the
    /// `scheduled_events` feature).
    #[cfg(feature = "scheduled_events")]
    ClearScheduledEvents(SmallVec<[ClearScheduledEventsEvent; 1]>),
}
184
/// Messages carried by the ring buffer whose producer end is
/// `FirewheelProcessorInner::to_graph_tx`.
// NOTE(review): every variant mirrors a heap-owning payload of
// `ContextToProcessorMsg`; presumably these hand the allocations back so
// they are not freed by the processor -- confirm against the message
// handling code.
pub(crate) enum ProcessorToContextMsg {
    /// Returns a group of node events.
    ReturnEventGroup(Vec<NodeEvent>),
    /// Returns heap-allocated schedule data.
    ReturnSchedule(Box<ScheduleHeapData>),
    /// Returns a boxed transport state (only with the `musical_transport`
    /// feature).
    #[cfg(feature = "musical_transport")]
    ReturnTransportState(Box<TransportState>),
    /// Returns a list of clear-scheduled-events requests (only with the
    /// `scheduled_events` feature).
    #[cfg(feature = "scheduled_events")]
    ReturnClearScheduledEvents(SmallVec<[ClearScheduledEventsEvent; 1]>),
}
193
/// A single request to clear scheduled events, carried inside
/// `ContextToProcessorMsg::ClearScheduledEvents`.
#[cfg(feature = "scheduled_events")]
pub(crate) struct ClearScheduledEventsEvent {
    /// The node the request targets, if any.
    // NOTE(review): presumably `None` means "all nodes" -- confirm against
    // the code that handles this request.
    pub node_id: Option<NodeID>,
    /// Which kind of scheduled events to clear.
    pub event_type: ClearScheduledEventsType,
}
200
/// Snapshot of clock state shared through a triple buffer (see
/// `FirewheelProcessorInner::shared_clock_input`).
///
/// `I` is the instant type; the processor instantiates it with the audio
/// backend's `Instant` type.
#[derive(Clone)]
pub(crate) struct SharedClock<I: Clone> {
    /// Clock position in samples.
    pub clock_samples: InstantSamples,
    /// Current musical playhead, if any (only with the `musical_transport`
    /// feature).
    #[cfg(feature = "musical_transport")]
    pub current_playhead: Option<InstantMusical>,
    /// Speed multiplier; defaults to `1.0` (only with the
    /// `musical_transport` feature).
    #[cfg(feature = "musical_transport")]
    pub speed_multiplier: f64,
    /// Whether the transport is playing; defaults to `false` (only with the
    /// `musical_transport` feature).
    #[cfg(feature = "musical_transport")]
    pub transport_is_playing: bool,
    /// Timestamp of type `I`, if available.
    // NOTE(review): presumably the backend timestamp of the most recent
    // process call -- confirm against the code that writes this field.
    pub process_timestamp: Option<I>,
}
212
213impl<I: Clone> Default for SharedClock<I> {
214 fn default() -> Self {
215 Self {
216 clock_samples: InstantSamples(0),
217 #[cfg(feature = "musical_transport")]
218 current_playhead: None,
219 #[cfg(feature = "musical_transport")]
220 speed_multiplier: 1.0,
221 #[cfg(feature = "musical_transport")]
222 transport_is_playing: false,
223 process_timestamp: None,
224 }
225 }
226}
227
/// Selects what happens when a buffer runs out of space.
///
/// A value of this type is passed to the processor's constructor, which
/// forwards it to the event scheduler together with the event buffer
/// capacities.
///
/// The enum is fieldless, so in addition to `PartialEq`/`PartialOrd` it also
/// implements `Eq` and `Hash`, which lets it be used as a map/set key and
/// inside types that derive those traits.
// NOTE(review): the per-variant descriptions below are inferred from the
// variant names; the code that acts on the mode lives outside this file, so
// confirm them against the event scheduler.
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Hash)]
#[cfg_attr(feature = "bevy_reflect", derive(bevy_reflect::Reflect))]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum BufferOutOfSpaceMode {
    /// The default mode. Presumably grows the buffer by allocating on the
    /// audio thread.
    #[default]
    AllocateOnAudioThread,
    /// Presumably panics when the buffer is full.
    Panic,
    /// Presumably discards the events that do not fit.
    DropEvents,
}