use bevy_platform::sync::{atomic::Ordering, Arc};
use core::{num::NonZeroU32, ops::Range};

use ringbuf::traits::{Consumer, Producer};
use thunderdome::Arena;

use crate::{
    context::ClockValues,
    graph::{NodeHeapData, ScheduleHeapData},
};
use firewheel_core::{
    clock::{ClockSamples, ClockSeconds, MusicalTime, MusicalTransport},
    dsp::{buffer::ChannelBuffer, declick::DeclickValues},
    event::{NodeEvent, NodeEventList},
    node::{
        AudioNodeProcessor, NodeID, ProcBuffers, ProcInfo, ProcessStatus, StreamStatus,
        TransportInfo, NUM_SCRATCH_BUFFERS,
    },
    SilenceMask, StreamInfo,
};
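
/// The audio-thread half of a Firewheel context.
///
/// This handle is given to the audio backend. All processing is delegated to the
/// inner [`FirewheelProcessorInner`], which is wrapped in an `Option` so it can be
/// recovered when the processor is dropped.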
pub struct FirewheelProcessor {
    inner: Option<FirewheelProcessorInner>,
    drop_tx: ringbuf::HeapProd<FirewheelProcessorInner>,
}
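
// On drop, the inner processor is not deallocated here. It is handed back over
// `drop_tx` (marked as poisoned if the thread is panicking), presumably so its
// heap data is freed or reused outside the audio callback.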
impl Drop for FirewheelProcessor {
    fn drop(&mut self) {
        let Some(mut inner) = self.inner.take() else {
            return;
        };

        inner.stream_stopped();

        if std::thread::panicking() {
            inner.poisoned = true;
        }

        let _ = self.drop_tx.try_push(inner);
    }
}

impl FirewheelProcessor {
    pub(crate) fn new(
        processor: FirewheelProcessorInner,
        drop_tx: ringbuf::HeapProd<FirewheelProcessorInner>,
    ) -> Self {
        Self {
            inner: Some(processor),
            drop_tx,
        }
    }
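
    /// Process a block of interleaved audio.
    ///
    /// Forwards to [`FirewheelProcessorInner::process_interleaved`] if the inner
    /// processor is still present.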
    pub fn process_interleaved(
        &mut self,
        input: &[f32],
        output: &mut [f32],
        num_in_channels: usize,
        num_out_channels: usize,
        frames: usize,
        clock_seconds: ClockSeconds,
        stream_status: StreamStatus,
    ) {
        if let Some(inner) = &mut self.inner {
            inner.process_interleaved(
                input,
                output,
                num_in_channels,
                num_out_channels,
                frames,
                clock_seconds,
                stream_status,
            );
        }
    }
}
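
/// The state owned and mutated on the audio thread: the node processors, the
/// compiled schedule, clock state, and the message channels shared with the
/// context.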
pub(crate) struct FirewheelProcessorInner {
    nodes: Arena<NodeEntry>,
    schedule_data: Option<Box<ScheduleHeapData>>,

    from_graph_rx: ringbuf::HeapCons<ContextToProcessorMsg>,
    to_graph_tx: ringbuf::HeapProd<ProcessorToContextMsg>,

    event_buffer: Vec<NodeEvent>,

    sample_rate: NonZeroU32,
    sample_rate_recip: f64,
    max_block_frames: usize,

    clock_samples: ClockSamples,
    clock_shared: Arc<ClockValues>,

    last_clock_seconds: ClockSeconds,
    clock_seconds_offset: f64,
    is_new_stream: bool,

    hard_clip_outputs: bool,

    scratch_buffers: ChannelBuffer<f32, NUM_SCRATCH_BUFFERS>,
    declick_values: DeclickValues,

    transport: Option<TransportState>,

    pub(crate) poisoned: bool,
}

impl FirewheelProcessorInner {
    pub(crate) fn new(
        from_graph_rx: ringbuf::HeapCons<ContextToProcessorMsg>,
        to_graph_tx: ringbuf::HeapProd<ProcessorToContextMsg>,
        clock_shared: Arc<ClockValues>,
        node_capacity: usize,
        stream_info: &StreamInfo,
        hard_clip_outputs: bool,
    ) -> Self {
        Self {
            nodes: Arena::with_capacity(node_capacity * 2),
            schedule_data: None,
            from_graph_rx,
            to_graph_tx,
            event_buffer: Vec::new(),
            sample_rate: stream_info.sample_rate,
            sample_rate_recip: stream_info.sample_rate_recip,
            max_block_frames: stream_info.max_block_frames.get() as usize,
            clock_samples: ClockSamples(0),
            clock_shared,
            last_clock_seconds: ClockSeconds(0.0),
            clock_seconds_offset: 0.0,
            is_new_stream: false,
            hard_clip_outputs,
            scratch_buffers: ChannelBuffer::new(stream_info.max_block_frames.get() as usize),
            declick_values: DeclickValues::new(stream_info.declick_frames),
            transport: None,
            poisoned: false,
        }
    }
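
    /// Notify every node processor that the audio stream has stopped.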
    fn stream_stopped(&mut self) {
        for (_, node) in self.nodes.iter_mut() {
            node.processor.stream_stopped();
        }
    }
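
    /// Notify every node processor that a new audio stream has started, and
    /// rebuild any stream-dependent resources (declick values, scratch buffers)
    /// whose parameters have changed.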
    pub fn new_stream(&mut self, stream_info: &StreamInfo) {
        for (_, node) in self.nodes.iter_mut() {
            node.processor.new_stream(stream_info);
        }

        if self.sample_rate != stream_info.sample_rate {
            self.sample_rate = stream_info.sample_rate;
            self.sample_rate_recip = stream_info.sample_rate_recip;

            self.declick_values = DeclickValues::new(stream_info.declick_frames);
        }

        if self.max_block_frames != stream_info.max_block_frames.get() as usize {
            self.max_block_frames = stream_info.max_block_frames.get() as usize;

            self.scratch_buffers = ChannelBuffer::new(stream_info.max_block_frames.get() as usize);
        }

        self.is_new_stream = true;
    }
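
    /// Process a block of interleaved audio: poll messages from the context,
    /// advance the clocks, then run the schedule in chunks of at most
    /// `max_block_frames` frames.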
    pub fn process_interleaved(
        &mut self,
        input: &[f32],
        output: &mut [f32],
        num_in_channels: usize,
        num_out_channels: usize,
        frames: usize,
        clock_seconds: ClockSeconds,
        stream_status: StreamStatus,
    ) {
        self.poll_messages();

        let mut clock_samples = self.clock_samples;
        self.clock_samples += ClockSamples(frames as i64);
        self.clock_shared
            .samples
            .store(self.clock_samples.0, Ordering::Relaxed);
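
        // On the first block of a new stream, recompute the offset that keeps the
        // seconds clock continuous with where the previous stream left off.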
        if self.is_new_stream {
            self.is_new_stream = false;

            self.clock_seconds_offset = self.last_clock_seconds.0 - clock_seconds.0;
        }

        let mut clock_seconds = ClockSeconds(clock_seconds.0 + self.clock_seconds_offset);
        self.last_clock_seconds =
            ClockSeconds(clock_seconds.0 + (frames as f64 * self.sample_rate_recip));
        self.clock_shared
            .seconds
            .store(self.last_clock_seconds.0, Ordering::Relaxed);

        if let Some(transport) = &self.transport {
            if !transport.stopped && !transport.paused {
                self.clock_shared.musical.store(
                    transport
                        .transport
                        .sample_to_musical(
                            self.clock_samples - transport.start_frame,
                            self.sample_rate.get(),
                            self.sample_rate_recip,
                        )
                        .0,
                    Ordering::Relaxed,
                );
            }
        }
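
        // With no schedule (or an empty block), just output silence.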
        if self.schedule_data.is_none() || frames == 0 {
            output.fill(0.0);
            return;
        }

        assert_eq!(input.len(), frames * num_in_channels);
        assert_eq!(output.len(), frames * num_out_channels);
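
        // Process the buffer in blocks of at most `max_block_frames` frames.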
        let mut frames_processed = 0;
        while frames_processed < frames {
            let block_frames = (frames - frames_processed).min(self.max_block_frames);

            self.schedule_data
                .as_mut()
                .unwrap()
                .schedule
                .prepare_graph_inputs(
                    block_frames,
                    num_in_channels,
                    |channels: &mut [&mut [f32]]| -> SilenceMask {
                        firewheel_core::dsp::interleave::deinterleave(
                            channels,
                            0,
                            &input[frames_processed * num_in_channels
                                ..(frames_processed + block_frames) * num_in_channels],
                            num_in_channels,
                            true,
                        )
                    },
                );

            let next_clock_seconds =
                clock_seconds + ClockSeconds(block_frames as f64 * self.sample_rate_recip);

            self.process_block(
                block_frames,
                clock_samples,
                clock_seconds..next_clock_seconds,
                stream_status,
            );

            self.schedule_data
                .as_mut()
                .unwrap()
                .schedule
                .read_graph_outputs(
                    block_frames,
                    num_out_channels,
                    |channels: &[&[f32]], silence_mask| {
                        firewheel_core::dsp::interleave::interleave(
                            channels,
                            0,
                            &mut output[frames_processed * num_out_channels
                                ..(frames_processed + block_frames) * num_out_channels],
                            num_out_channels,
                            Some(silence_mask),
                        );
                    },
                );

            frames_processed += block_frames;
            clock_samples += ClockSamples(block_frames as i64);
            clock_seconds = next_clock_seconds;
        }
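
        // Optionally hard clip the final output to [-1.0, 1.0].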
        if self.hard_clip_outputs {
            for s in output.iter_mut() {
                *s = s.clamp(-1.0, 1.0);
            }
        }
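
        // Hand the spent event group back to the context so its allocation can be
        // reused.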
        if self.event_buffer.capacity() > 0 {
            let mut event_group = Vec::new();
            core::mem::swap(&mut self.event_buffer, &mut event_group);

            let _ = self
                .to_graph_tx
                .try_push(ProcessorToContextMsg::ReturnEventGroup(event_group));
        }
    }
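
    /// Drain and handle all pending messages from the context (new events, new
    /// schedules, and transport/clipping changes).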
    fn poll_messages(&mut self) {
        for msg in self.from_graph_rx.pop_iter() {
            match msg {
                ContextToProcessorMsg::EventGroup(mut event_group) => {
                    let num_existing_events = self.event_buffer.len();

                    if self.event_buffer.capacity() == 0 {
                        core::mem::swap(&mut self.event_buffer, &mut event_group);
                    } else {
                        self.event_buffer.append(&mut event_group);

                        let _ = self
                            .to_graph_tx
                            .try_push(ProcessorToContextMsg::ReturnEventGroup(event_group));
                    }

                    for (i, event) in self.event_buffer[num_existing_events..].iter().enumerate() {
                        if let Some(node_entry) = self.nodes.get_mut(event.node_id.0) {
                            node_entry
                                .event_indices
                                .push((i + num_existing_events) as u32);
                        }
                    }
                }
                ContextToProcessorMsg::NewSchedule(mut new_schedule_data) => {
                    assert_eq!(
                        new_schedule_data.schedule.max_block_frames(),
                        self.max_block_frames
                    );

                    if let Some(mut old_schedule_data) = self.schedule_data.take() {
                        core::mem::swap(
                            &mut old_schedule_data.removed_nodes,
                            &mut new_schedule_data.removed_nodes,
                        );

                        for node_id in new_schedule_data.nodes_to_remove.iter() {
                            if let Some(node_entry) = self.nodes.remove(node_id.0) {
                                old_schedule_data.removed_nodes.push(NodeHeapData {
                                    id: *node_id,
                                    processor: node_entry.processor,
                                    event_buffer_indices: node_entry.event_indices,
                                });
                            }
                        }

                        let _ = self
                            .to_graph_tx
                            .try_push(ProcessorToContextMsg::ReturnSchedule(old_schedule_data));
                    }

                    for n in new_schedule_data.new_node_processors.drain(..) {
                        assert!(self
                            .nodes
                            .insert_at(
                                n.id.0,
                                NodeEntry {
                                    processor: n.processor,
                                    event_indices: n.event_buffer_indices,
                                }
                            )
                            .is_none());
                    }

                    self.schedule_data = Some(new_schedule_data);
                }
                ContextToProcessorMsg::HardClipOutputs(hard_clip_outputs) => {
                    self.hard_clip_outputs = hard_clip_outputs;
                }
                ContextToProcessorMsg::SetTransport(transport) => {
                    if let Some(old_transport) = &mut self.transport {
                        if let Some(new_transport) = &transport {
                            if !old_transport.stopped {
                                let sample_time = if old_transport.paused {
                                    old_transport.paused_at_frame - old_transport.start_frame
                                } else {
                                    self.clock_samples - old_transport.start_frame
                                };

                                let current_musical = old_transport.transport.sample_to_musical(
                                    sample_time,
                                    self.sample_rate.get(),
                                    self.sample_rate_recip,
                                );

                                old_transport.start_frame = self.clock_samples
                                    - new_transport
                                        .musical_to_sample(current_musical, self.sample_rate.get());

                                old_transport.paused_at_frame = self.clock_samples;
                            }

                            old_transport.transport = *new_transport;
                        } else {
                            self.transport = None;
                            self.clock_shared.musical.store(0.0, Ordering::Relaxed);
                        }
                    } else {
                        self.transport = transport.map(|transport| TransportState {
                            transport,
                            start_frame: ClockSamples::default(),
                            paused_at_frame: ClockSamples::default(),
                            paused_at_musical_time: MusicalTime::default(),
                            paused: false,
                            stopped: true,
                        });

                        self.clock_shared.musical.store(0.0, Ordering::Relaxed);
                    }
                }
                ContextToProcessorMsg::StartOrRestartTransport => {
                    if let Some(transport) = &mut self.transport {
                        transport.stopped = false;
                        transport.paused = false;
                        transport.start_frame = self.clock_samples;
                    }

                    self.clock_shared.musical.store(0.0, Ordering::Relaxed);
                }
                ContextToProcessorMsg::PauseTransport => {
                    if let Some(transport) = &mut self.transport {
                        if !transport.stopped && !transport.paused {
                            transport.paused = true;
                            transport.paused_at_frame = self.clock_samples;
                            transport.paused_at_musical_time =
                                transport.transport.sample_to_musical(
                                    self.clock_samples - transport.start_frame,
                                    self.sample_rate.get(),
                                    self.sample_rate_recip,
                                );
                        }
                    }
                }
                ContextToProcessorMsg::ResumeTransport => {
                    if let Some(transport) = &mut self.transport {
                        if !transport.stopped && transport.paused {
                            transport.paused = false;
                            transport.start_frame +=
                                ClockSamples(self.clock_samples.0 - transport.paused_at_frame.0);
                        }
                    }
                }
                ContextToProcessorMsg::StopTransport => {
                    if let Some(transport) = &mut self.transport {
                        transport.stopped = true;
                    }

                    self.clock_shared.musical.store(0.0, Ordering::Relaxed);
                }
            }
        }
    }
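
    /// Run the compiled schedule for a single block, building the [`ProcInfo`]
    /// (clocks, transport, silence masks) passed to each node processor.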
    fn process_block(
        &mut self,
        block_frames: usize,
        clock_samples: ClockSamples,
        clock_seconds: Range<ClockSeconds>,
        stream_status: StreamStatus,
    ) {
        if self.schedule_data.is_none() {
            return;
        }
        let schedule_data = self.schedule_data.as_mut().unwrap();

        let mut scratch_buffers = self.scratch_buffers.get_mut(self.max_block_frames);

        let transport_info = if let Some(t) = &self.transport {
            if t.stopped {
                None
            } else {
                let (start_beat, end_beat) = if t.paused {
                    (t.paused_at_musical_time, t.paused_at_musical_time)
                } else {
                    (
                        t.transport.sample_to_musical(
                            clock_samples - t.start_frame,
                            self.sample_rate.get(),
                            self.sample_rate_recip,
                        ),
                        t.transport.sample_to_musical(
                            clock_samples - t.start_frame + ClockSamples(block_frames as i64),
                            self.sample_rate.get(),
                            self.sample_rate_recip,
                        ),
                    )
                };

                Some(TransportInfo {
                    musical_clock: start_beat..end_beat,
                    transport: &t.transport,
                    paused: t.paused,
                })
            }
        } else {
            None
        };

        let mut proc_info = ProcInfo {
            frames: block_frames,
            in_silence_mask: SilenceMask::default(),
            out_silence_mask: SilenceMask::default(),
            clock_samples,
            clock_seconds: clock_seconds.clone(),
            transport_info,
            stream_status,
            declick_values: &self.declick_values,
        };

        schedule_data.schedule.process(
            block_frames,
            &mut scratch_buffers,
            |node_id: NodeID,
             in_silence_mask: SilenceMask,
             out_silence_mask: SilenceMask,
             proc_buffers: ProcBuffers|
             -> ProcessStatus {
                let Some(node_entry) = self.nodes.get_mut(node_id.0) else {
                    return ProcessStatus::Bypass;
                };

                let events = NodeEventList::new(&mut self.event_buffer, &node_entry.event_indices);

                proc_info.in_silence_mask = in_silence_mask;
                proc_info.out_silence_mask = out_silence_mask;

                let status = node_entry
                    .processor
                    .process(proc_buffers, &proc_info, events);

                node_entry.event_indices.clear();

                status
            },
        );
    }
}
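
/// Per-node state stored on the audio thread: the node's processor and the
/// indices of its pending events in the shared event buffer.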
pub(crate) struct NodeEntry {
    pub processor: Box<dyn AudioNodeProcessor>,
    pub event_indices: Vec<u32>,
}
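
/// The state of the musical transport as tracked by the processor.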
struct TransportState {
    transport: MusicalTransport,
    start_frame: ClockSamples,
    paused_at_frame: ClockSamples,
    paused_at_musical_time: MusicalTime,
    paused: bool,
    stopped: bool,
}
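
/// Messages sent from the context/graph to the audio-thread processor.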
pub(crate) enum ContextToProcessorMsg {
    EventGroup(Vec<NodeEvent>),
    NewSchedule(Box<ScheduleHeapData>),
    HardClipOutputs(bool),
    SetTransport(Option<MusicalTransport>),
    StartOrRestartTransport,
    PauseTransport,
    ResumeTransport,
    StopTransport,
}
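
/// Messages sent from the audio-thread processor back to the context, returning
/// heap-allocated data (event groups and old schedules).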
pub(crate) enum ProcessorToContextMsg {
    ReturnEventGroup(Vec<NodeEvent>),
    ReturnSchedule(Box<ScheduleHeapData>),
}