1use core::{num::NonZeroU32, usize};
2
3use ringbuf::traits::Producer;
4use thunderdome::Arena;
5
6#[cfg(not(feature = "std"))]
7use bevy_platform::prelude::{Box, Vec};
8
9use firewheel_core::{
10 clock::InstantSamples,
11 dsp::{buffer::ChannelBuffer, declick::DeclickValues},
12 event::{NodeEvent, ProcEventsIndex},
13 log::RealtimeLogger,
14 node::{AudioNodeProcessor, ProcExtra},
15 StreamInfo,
16};
17
18use crate::{
19 backend::{AudioBackend, BackendProcessInfo},
20 graph::ScheduleHeapData,
21 processor::event_scheduler::{EventScheduler, NodeEventSchedulerData},
22};
23
24#[cfg(feature = "scheduled_events")]
25use crate::context::ClearScheduledEventsType;
26#[cfg(feature = "scheduled_events")]
27use firewheel_core::node::NodeID;
28#[cfg(feature = "scheduled_events")]
29use smallvec::SmallVec;
30
31#[cfg(feature = "musical_transport")]
32use firewheel_core::clock::{InstantMusical, TransportState};
33
34mod event_scheduler;
35mod handle_messages;
36mod process;
37
38#[cfg(feature = "musical_transport")]
39mod transport;
40#[cfg(feature = "musical_transport")]
41use transport::ProcTransportState;
42
/// Public handle around the processor state.
///
/// Processing calls are forwarded to the wrapped `FirewheelProcessorInner`.
/// When this handle is dropped, the `Drop` impl takes the inner state out and
/// attempts to push it onto `drop_tx` rather than just discarding it.
pub struct FirewheelProcessor<B: AudioBackend> {
    /// The wrapped processor state. Set to `Some` by `new` and taken (left as
    /// `None`) by `Drop::drop`.
    inner: Option<FirewheelProcessorInner<B>>,
    /// Producer half of the ring buffer that `inner` is pushed onto on drop.
    drop_tx: ringbuf::HeapProd<FirewheelProcessorInner<B>>,
}
47
48impl<B: AudioBackend> Drop for FirewheelProcessor<B> {
49 fn drop(&mut self) {
50 let Some(mut inner) = self.inner.take() else {
51 return;
52 };
53
54 inner.stream_stopped();
55
56 #[cfg(feature = "std")]
58 if std::thread::panicking() {
59 inner.poisoned = true;
60 }
61
62 let _ = self.drop_tx.try_push(inner);
63 }
64}
65
66impl<B: AudioBackend> FirewheelProcessor<B> {
67 pub(crate) fn new(
68 processor: FirewheelProcessorInner<B>,
69 drop_tx: ringbuf::HeapProd<FirewheelProcessorInner<B>>,
70 ) -> Self {
71 Self {
72 inner: Some(processor),
73 drop_tx,
74 }
75 }
76
77 pub fn process_interleaved(
78 &mut self,
79 input: &[f32],
80 output: &mut [f32],
81 info: BackendProcessInfo<B>,
82 ) {
83 if let Some(inner) = &mut self.inner {
84 inner.process_interleaved(input, output, info);
85 }
86 }
87}
88
/// The actual processor state wrapped by `FirewheelProcessor`.
///
/// Only the constructor lives in this file; the remaining methods are
/// presumably implemented in the sibling modules (`process`,
/// `handle_messages`, ...) — NOTE(review): confirm.
pub(crate) struct FirewheelProcessorInner<B: AudioBackend> {
    /// The node entries (processor plus per-node event data).
    nodes: Arena<NodeEntry>,
    /// The current schedule data. `None` right after construction.
    schedule_data: Option<Box<ScheduleHeapData>>,

    /// Consumer half of the channel carrying `ContextToProcessorMsg`.
    from_graph_rx: ringbuf::HeapCons<ContextToProcessorMsg>,
    /// Producer half of the channel carrying `ProcessorToContextMsg`.
    to_graph_tx: ringbuf::HeapProd<ProcessorToContextMsg>,

    /// Scheduler for node events.
    event_scheduler: EventScheduler,
    /// Queue of event indices, preallocated in `new` with the
    /// `node_event_buffer_capacity` argument.
    proc_event_queue: Vec<ProcEventsIndex>,

    /// Copied from `StreamInfo::sample_rate`.
    sample_rate: NonZeroU32,
    /// Copied from `StreamInfo::sample_rate_recip` (presumably the reciprocal
    /// of the sample rate — confirm against `StreamInfo`).
    sample_rate_recip: f64,
    /// `StreamInfo::max_block_frames` converted to `usize`.
    max_block_frames: usize,

    /// The processor's sample clock. Starts at `InstantSamples(0)`.
    clock_samples: InstantSamples,
    /// Input side of the triple buffer used to publish `SharedClock` values.
    shared_clock_input: triple_buffer::Input<SharedClock<B::Instant>>,

    /// Transport state (only with the `musical_transport` feature).
    #[cfg(feature = "musical_transport")]
    proc_transport_state: ProcTransportState,

    /// Flag supplied to `new`; presumably enables hard clipping of the
    /// outputs — NOTE(review): the code acting on it is not in this file.
    hard_clip_outputs: bool,

    /// Scratch buffers, declick values and the realtime logger.
    extra: ProcExtra,

    /// Starts as `false`; set to `true` by `FirewheelProcessor`'s `Drop` impl
    /// when the processor is dropped while the thread is panicking (requires
    /// the `std` feature).
    pub(crate) poisoned: bool,
    /// Debug flag supplied to `new`; presumably forces buffers to be cleared
    /// — NOTE(review): the code acting on it is not in this file.
    debug_force_clear_buffers: bool,
}
119
120impl<B: AudioBackend> FirewheelProcessorInner<B> {
121 pub(crate) fn new(
123 from_graph_rx: ringbuf::HeapCons<ContextToProcessorMsg>,
124 to_graph_tx: ringbuf::HeapProd<ProcessorToContextMsg>,
125 shared_clock_input: triple_buffer::Input<SharedClock<B::Instant>>,
126 immediate_event_buffer_capacity: usize,
127 #[cfg(feature = "scheduled_events")] scheduled_event_buffer_capacity: usize,
128 node_event_buffer_capacity: usize,
129 stream_info: &StreamInfo,
130 hard_clip_outputs: bool,
131 buffer_out_of_space_mode: BufferOutOfSpaceMode,
132 logger: RealtimeLogger,
133 debug_force_clear_buffers: bool,
134 ) -> Self {
135 Self {
136 nodes: Arena::new(),
137 schedule_data: None,
138 from_graph_rx,
139 to_graph_tx,
140 event_scheduler: EventScheduler::new(
141 immediate_event_buffer_capacity,
142 #[cfg(feature = "scheduled_events")]
143 scheduled_event_buffer_capacity,
144 buffer_out_of_space_mode,
145 ),
146 proc_event_queue: Vec::with_capacity(node_event_buffer_capacity),
147 sample_rate: stream_info.sample_rate,
148 sample_rate_recip: stream_info.sample_rate_recip,
149 max_block_frames: stream_info.max_block_frames.get() as usize,
150 clock_samples: InstantSamples(0),
151 shared_clock_input,
152 #[cfg(feature = "musical_transport")]
153 proc_transport_state: ProcTransportState::new(),
154 hard_clip_outputs,
155 extra: ProcExtra {
156 scratch_buffers: ChannelBuffer::new(stream_info.max_block_frames.get() as usize),
157 declick_values: DeclickValues::new(stream_info.declick_frames),
158 logger,
159 },
160 poisoned: false,
161 debug_force_clear_buffers,
162 }
163 }
164}
165
/// One entry of the processor's node arena.
pub(crate) struct NodeEntry {
    /// The boxed processor implementation for this node.
    pub processor: Box<dyn AudioNodeProcessor>,

    /// Per-node data for the event scheduler.
    event_data: NodeEventSchedulerData,
}
171
/// Messages the processor receives over its `from_graph_rx` channel.
///
/// NOTE(review): the handling of each variant is not in this file (presumably
/// in `handle_messages`); the variant notes below only restate the payloads.
pub(crate) enum ContextToProcessorMsg {
    /// A group of node events.
    EventGroup(Vec<NodeEvent>),
    /// A new schedule.
    NewSchedule(Box<ScheduleHeapData>),
    /// A new value for the hard-clip-outputs flag.
    HardClipOutputs(bool),
    /// A new transport state (only with the `musical_transport` feature).
    #[cfg(feature = "musical_transport")]
    SetTransportState(Box<TransportState>),
    /// Requests to clear scheduled events (only with the `scheduled_events`
    /// feature).
    #[cfg(feature = "scheduled_events")]
    ClearScheduledEvents(SmallVec<[ClearScheduledEventsEvent; 1]>),
}
181
/// Messages the processor sends over its `to_graph_tx` channel.
///
/// Every variant carries the same payload type as a `ContextToProcessorMsg`
/// variant. Presumably these hand heap allocations back to the sender so they
/// are not freed by the processor — NOTE(review): confirm against the sending
/// code, which is not in this file.
pub(crate) enum ProcessorToContextMsg {
    /// Returns an event group.
    ReturnEventGroup(Vec<NodeEvent>),
    /// Returns schedule data.
    ReturnSchedule(Box<ScheduleHeapData>),
    /// Returns a transport state (only with the `musical_transport` feature).
    #[cfg(feature = "musical_transport")]
    ReturnTransportState(Box<TransportState>),
    /// Returns a list of clear-scheduled-events requests (only with the
    /// `scheduled_events` feature).
    #[cfg(feature = "scheduled_events")]
    ReturnClearScheduledEvents(SmallVec<[ClearScheduledEventsEvent; 1]>),
}
190
/// A single request to clear scheduled events, carried by
/// `ContextToProcessorMsg::ClearScheduledEvents`.
#[cfg(feature = "scheduled_events")]
pub(crate) struct ClearScheduledEventsEvent {
    /// The node the request targets. NOTE(review): `None` presumably means
    /// "all nodes" — confirm against the code that consumes this.
    pub node_id: Option<NodeID>,
    /// Which kind of scheduled events to clear.
    pub event_type: ClearScheduledEventsType,
}
197
/// Snapshot of the clock state that the processor publishes through a
/// `triple_buffer::Input`.
///
/// `I` is the timestamp type; the processor instantiates it with the audio
/// backend's `Instant` type.
#[derive(Clone)]
pub(crate) struct SharedClock<I: Clone> {
    /// The clock position in samples.
    pub clock_samples: InstantSamples,
    /// The current musical playhead, if any (only with the
    /// `musical_transport` feature).
    #[cfg(feature = "musical_transport")]
    pub current_playhead: Option<InstantMusical>,
    /// The speed multiplier (only with the `musical_transport` feature).
    /// Defaults to `1.0`.
    #[cfg(feature = "musical_transport")]
    pub speed_multiplier: f64,
    /// Whether the transport is playing (only with the `musical_transport`
    /// feature). Defaults to `false`.
    #[cfg(feature = "musical_transport")]
    pub transport_is_playing: bool,
    /// Timestamp associated with the process call, if one is available.
    pub process_timestamp: Option<I>,
}
209
210impl<I: Clone> Default for SharedClock<I> {
211 fn default() -> Self {
212 Self {
213 clock_samples: InstantSamples(0),
214 #[cfg(feature = "musical_transport")]
215 current_playhead: None,
216 #[cfg(feature = "musical_transport")]
217 speed_multiplier: 1.0,
218 #[cfg(feature = "musical_transport")]
219 transport_is_playing: false,
220 process_timestamp: None,
221 }
222 }
223}
224
/// Selects what happens when an event buffer runs out of space.
///
/// The value is handed to the event scheduler when the processor is created;
/// the scheduler implements the actual behavior. The variant notes below are
/// derived from the variant names — NOTE(review): confirm against the event
/// scheduler.
///
/// Being a fieldless enum, equality and ordering are total, so `Eq`, `Ord`
/// and `Hash` are derived alongside `PartialEq`/`PartialOrd`. This lets the
/// mode be embedded in settings types that derive `Eq` and be used as a map
/// or set key. Ordering follows declaration order.
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum BufferOutOfSpaceMode {
    /// Presumably: grow the buffer by allocating on the audio thread.
    ///
    /// This is the default.
    #[default]
    AllocateOnAudioThread,
    /// Presumably: panic when the buffer is out of space.
    Panic,
    /// Presumably: drop the events that do not fit.
    DropEvents,
}