1use core::num::NonZeroU32;
2
3use ringbuf::traits::Producer;
4use thunderdome::Arena;
5
6#[cfg(not(feature = "std"))]
7use bevy_platform::prelude::{Box, Vec};
8
9use firewheel_core::{
10 clock::InstantSamples,
11 dsp::{buffer::ChannelBuffer, declick::DeclickValues},
12 event::{NodeEvent, ProcEventsIndex},
13 node::{AudioNodeProcessor, ProcExtra},
14 StreamInfo,
15};
16
17use crate::{
18 backend::{AudioBackend, BackendProcessInfo},
19 context::ProcessorChannel,
20 graph::ScheduleHeapData,
21 processor::event_scheduler::{EventScheduler, NodeEventSchedulerData},
22};
23
24#[cfg(feature = "scheduled_events")]
25use crate::context::ClearScheduledEventsType;
26#[cfg(feature = "scheduled_events")]
27use firewheel_core::node::NodeID;
28#[cfg(feature = "scheduled_events")]
29use smallvec::SmallVec;
30
31#[cfg(feature = "musical_transport")]
32use firewheel_core::clock::{InstantMusical, TransportState};
33
34mod event_scheduler;
35mod handle_messages;
36mod process;
37
38#[cfg(feature = "musical_transport")]
39mod transport;
40#[cfg(feature = "musical_transport")]
41use transport::ProcTransportState;
42
43pub struct FirewheelProcessor<B: AudioBackend> {
44 inner: Option<FirewheelProcessorInner<B>>,
45 drop_tx: ringbuf::HeapProd<FirewheelProcessorInner<B>>,
46}
47
48impl<B: AudioBackend> Drop for FirewheelProcessor<B> {
49 fn drop(&mut self) {
50 let Some(mut inner) = self.inner.take() else {
51 return;
52 };
53
54 inner.stream_stopped();
55
56 #[cfg(feature = "std")]
58 if std::thread::panicking() {
59 inner.poisoned = true;
60 }
61
62 let _ = self.drop_tx.try_push(inner);
63 }
64}
65
66impl<B: AudioBackend> FirewheelProcessor<B> {
67 pub(crate) fn new(
68 processor: FirewheelProcessorInner<B>,
69 drop_tx: ringbuf::HeapProd<FirewheelProcessorInner<B>>,
70 ) -> Self {
71 Self {
72 inner: Some(processor),
73 drop_tx,
74 }
75 }
76
77 pub fn process_interleaved(
78 &mut self,
79 input: &[f32],
80 output: &mut [f32],
81 info: BackendProcessInfo<B>,
82 ) {
83 if let Some(inner) = &mut self.inner {
84 inner.process_interleaved(input, output, info);
85 }
86 }
87}
88
89pub(crate) struct FirewheelProcessorInner<B: AudioBackend> {
90 nodes: Arena<NodeEntry>,
91 schedule_data: Option<Box<ScheduleHeapData>>,
92
93 from_graph_rx: ringbuf::HeapCons<ContextToProcessorMsg>,
94 to_graph_tx: ringbuf::HeapProd<ProcessorToContextMsg>,
95
96 event_scheduler: EventScheduler,
97 proc_event_queue: Vec<ProcEventsIndex>,
98
99 sample_rate: NonZeroU32,
100 sample_rate_recip: f64,
101 max_block_frames: usize,
102
103 clock_samples: InstantSamples,
104 shared_clock_input: triple_buffer::Input<SharedClock<B::Instant>>,
105
106 #[cfg(feature = "musical_transport")]
107 proc_transport_state: ProcTransportState,
108
109 hard_clip_outputs: bool,
110
111 pub(crate) extra: ProcExtra,
112
113 pub(crate) poisoned: bool,
117 debug_force_clear_buffers: bool,
118}
119
120impl<B: AudioBackend> FirewheelProcessorInner<B> {
121 pub(crate) fn new(
123 proc_channel: ProcessorChannel<B>,
124 immediate_event_buffer_capacity: usize,
125 #[cfg(feature = "scheduled_events")] scheduled_event_buffer_capacity: usize,
126 node_event_buffer_capacity: usize,
127 stream_info: &StreamInfo,
128 hard_clip_outputs: bool,
129 buffer_out_of_space_mode: BufferOutOfSpaceMode,
130 debug_force_clear_buffers: bool,
131 ) -> Self {
132 let ProcessorChannel {
133 from_context_rx,
134 to_context_tx,
135 shared_clock_input,
136 logger,
137 store,
138 } = proc_channel;
139 Self {
140 nodes: Arena::new(),
141 schedule_data: None,
142 from_graph_rx: from_context_rx,
143 to_graph_tx: to_context_tx,
144 event_scheduler: EventScheduler::new(
145 immediate_event_buffer_capacity,
146 #[cfg(feature = "scheduled_events")]
147 scheduled_event_buffer_capacity,
148 buffer_out_of_space_mode,
149 ),
150 proc_event_queue: Vec::with_capacity(node_event_buffer_capacity),
151 sample_rate: stream_info.sample_rate,
152 sample_rate_recip: stream_info.sample_rate_recip,
153 max_block_frames: stream_info.max_block_frames.get() as usize,
154 clock_samples: InstantSamples(0),
155 shared_clock_input,
156 #[cfg(feature = "musical_transport")]
157 proc_transport_state: ProcTransportState::new(),
158 hard_clip_outputs,
159 extra: ProcExtra {
160 scratch_buffers: ChannelBuffer::new(stream_info.max_block_frames.get() as usize),
161 declick_values: DeclickValues::new(stream_info.declick_frames),
162 logger,
163 store,
164 },
165 poisoned: false,
166 debug_force_clear_buffers,
167 }
168 }
169}
170
171pub(crate) struct NodeEntry {
172 pub processor: Box<dyn AudioNodeProcessor>,
173 pub prev_output_was_silent: bool,
174
175 event_data: NodeEventSchedulerData,
176}
177
178pub(crate) enum ContextToProcessorMsg {
179 EventGroup(Vec<NodeEvent>),
180 NewSchedule(Box<ScheduleHeapData>),
181 HardClipOutputs(bool),
182 #[cfg(feature = "musical_transport")]
183 SetTransportState(Box<TransportState>),
184 #[cfg(feature = "scheduled_events")]
185 ClearScheduledEvents(SmallVec<[ClearScheduledEventsEvent; 1]>),
186}
187
188pub(crate) enum ProcessorToContextMsg {
189 ReturnEventGroup(Vec<NodeEvent>),
190 ReturnSchedule(Box<ScheduleHeapData>),
191 #[cfg(feature = "musical_transport")]
192 ReturnTransportState(Box<TransportState>),
193 #[cfg(feature = "scheduled_events")]
194 ReturnClearScheduledEvents(SmallVec<[ClearScheduledEventsEvent; 1]>),
195}
196
197#[cfg(feature = "scheduled_events")]
198pub(crate) struct ClearScheduledEventsEvent {
199 pub node_id: Option<NodeID>,
201 pub event_type: ClearScheduledEventsType,
202}
203
204#[derive(Clone)]
205pub(crate) struct SharedClock<I: Clone> {
206 pub clock_samples: InstantSamples,
207 #[cfg(feature = "musical_transport")]
208 pub current_playhead: Option<InstantMusical>,
209 #[cfg(feature = "musical_transport")]
210 pub speed_multiplier: f64,
211 #[cfg(feature = "musical_transport")]
212 pub transport_is_playing: bool,
213 pub process_timestamp: Option<I>,
214}
215
216impl<I: Clone> Default for SharedClock<I> {
217 fn default() -> Self {
218 Self {
219 clock_samples: InstantSamples(0),
220 #[cfg(feature = "musical_transport")]
221 current_playhead: None,
222 #[cfg(feature = "musical_transport")]
223 speed_multiplier: 1.0,
224 #[cfg(feature = "musical_transport")]
225 transport_is_playing: false,
226 process_timestamp: None,
227 }
228 }
229}
230
231#[derive(Default, Debug, Clone, Copy, PartialEq, PartialOrd)]
233#[cfg_attr(feature = "bevy_reflect", derive(bevy_reflect::Reflect))]
234#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
235pub enum BufferOutOfSpaceMode {
236 #[default]
237 AllocateOnAudioThread,
242 Panic,
245 DropEvents,
251}