1use std::{
2 num::NonZeroU32,
3 ops::Range,
4 sync::{atomic::Ordering, Arc},
5};
6
7use ringbuf::traits::{Consumer, Producer};
8use thunderdome::Arena;
9
10use crate::{
11 context::ClockValues,
12 graph::{NodeHeapData, ScheduleHeapData},
13};
14use firewheel_core::{
15 clock::{ClockSamples, ClockSeconds, MusicalTime, MusicalTransport},
16 dsp::{buffer::ChannelBuffer, declick::DeclickValues},
17 event::{NodeEvent, NodeEventList},
18 node::{
19 AudioNodeProcessor, NodeID, ProcBuffers, ProcInfo, ProcessStatus, StreamStatus,
20 TransportInfo, NUM_SCRATCH_BUFFERS,
21 },
22 SilenceMask, StreamInfo,
23};
24
/// The processor half of the Firewheel engine, intended to live on the
/// audio thread.
///
/// The actual state is held in an `Option` so that `Drop` can move it out
/// and push it through `drop_tx` — presumably so heap data is deallocated
/// off the audio thread (TODO confirm against the context side).
pub struct FirewheelProcessor {
    // The processing state. `None` only after `Drop` has taken it.
    inner: Option<FirewheelProcessorInner>,
    // Channel used to hand the inner state back when this is dropped.
    drop_tx: ringbuf::HeapProd<FirewheelProcessorInner>,
}
29
30impl Drop for FirewheelProcessor {
31 fn drop(&mut self) {
32 let Some(mut inner) = self.inner.take() else {
33 return;
34 };
35
36 inner.stream_stopped();
37
38 if std::thread::panicking() {
39 inner.poisoned = true;
40 }
41
42 let _ = self.drop_tx.try_push(inner);
43 }
44}
45
46impl FirewheelProcessor {
47 pub(crate) fn new(
48 processor: FirewheelProcessorInner,
49 drop_tx: ringbuf::HeapProd<FirewheelProcessorInner>,
50 ) -> Self {
51 Self {
52 inner: Some(processor),
53 drop_tx,
54 }
55 }
56
57 pub fn process_interleaved(
58 &mut self,
59 input: &[f32],
60 output: &mut [f32],
61 num_in_channels: usize,
62 num_out_channels: usize,
63 frames: usize,
64 clock_seconds: ClockSeconds,
65 stream_status: StreamStatus,
66 ) {
67 if let Some(inner) = &mut self.inner {
68 inner.process_interleaved(
69 input,
70 output,
71 num_in_channels,
72 num_out_channels,
73 frames,
74 clock_seconds,
75 stream_status,
76 );
77 }
78 }
79}
80
/// The internal state of the audio processor.
pub(crate) struct FirewheelProcessorInner {
    // All live node processors, indexed by `NodeID`.
    nodes: Arena<NodeEntry>,
    // The current compiled schedule plus its heap data. `None` until the
    // first schedule arrives from the context.
    schedule_data: Option<Box<ScheduleHeapData>>,

    // Messages from the context/graph thread.
    from_graph_rx: ringbuf::HeapCons<ContextToProcessorMsg>,
    // Messages (mostly returned heap allocations) back to the context.
    to_graph_tx: ringbuf::HeapProd<ProcessorToContextMsg>,

    // Events received from the context; each `NodeEntry::event_indices`
    // indexes into this buffer.
    event_buffer: Vec<NodeEvent>,

    sample_rate: NonZeroU32,
    // Cached reciprocal of the sample rate, for frame -> seconds math.
    sample_rate_recip: f64,
    // Maximum number of frames processed per sub-block.
    max_block_frames: usize,

    // Running sample-count clock, advanced every processed frame.
    clock_samples: ClockSamples,
    // Clock values shared with the context thread via atomics.
    clock_shared: Arc<ClockValues>,

    // Seconds-clock value at the end of the previous process call.
    last_clock_seconds: ClockSeconds,
    // Offset applied to the stream's seconds clock so our seconds clock
    // stays continuous across stream restarts.
    clock_seconds_offset: f64,
    // Set by `new_stream`, consumed at the start of the next process call.
    is_new_stream: bool,

    // Whether to apply hard clipping to the final interleaved output.
    hard_clip_outputs: bool,

    // Scratch channel buffers handed to the schedule during processing.
    scratch_buffers: ChannelBuffer<f32, NUM_SCRATCH_BUFFERS>,
    // Precomputed declick ramp values, rebuilt when the sample rate
    // changes.
    declick_values: DeclickValues,

    // Musical transport state, if one is currently set.
    transport: Option<TransportState>,

    // Set to `true` when dropped while the thread is panicking.
    pub(crate) poisoned: bool,
}
113
114impl FirewheelProcessorInner {
    /// Construct the processor state from the stream configuration.
    ///
    /// * `node_capacity` - expected number of nodes; the arena reserves
    ///   twice this, presumably to reduce the chance of reallocating on
    ///   the audio thread (TODO confirm intent).
    pub(crate) fn new(
        from_graph_rx: ringbuf::HeapCons<ContextToProcessorMsg>,
        to_graph_tx: ringbuf::HeapProd<ProcessorToContextMsg>,
        clock_shared: Arc<ClockValues>,
        node_capacity: usize,
        stream_info: &StreamInfo,
        hard_clip_outputs: bool,
    ) -> Self {
        Self {
            nodes: Arena::with_capacity(node_capacity * 2),
            schedule_data: None,
            from_graph_rx,
            to_graph_tx,
            event_buffer: Vec::new(),
            sample_rate: stream_info.sample_rate,
            sample_rate_recip: stream_info.sample_rate_recip,
            max_block_frames: stream_info.max_block_frames.get() as usize,
            clock_samples: ClockSamples(0),
            clock_shared,
            last_clock_seconds: ClockSeconds(0.0),
            clock_seconds_offset: 0.0,
            is_new_stream: false,
            hard_clip_outputs,
            scratch_buffers: ChannelBuffer::new(stream_info.max_block_frames.get() as usize),
            declick_values: DeclickValues::new(stream_info.declick_frames),
            transport: None,
            poisoned: false,
        }
    }
145
146 fn stream_stopped(&mut self) {
147 for (_, node) in self.nodes.iter_mut() {
148 node.processor.stream_stopped();
149 }
150 }
151
152 pub fn new_stream(&mut self, stream_info: &StreamInfo) {
156 for (_, node) in self.nodes.iter_mut() {
157 node.processor.new_stream(stream_info);
158 }
159
160 if self.sample_rate != stream_info.sample_rate {
161 self.sample_rate = stream_info.sample_rate;
162 self.sample_rate_recip = stream_info.sample_rate_recip;
163
164 self.declick_values = DeclickValues::new(stream_info.declick_frames);
165 }
166
167 if self.max_block_frames != stream_info.max_block_frames.get() as usize {
168 self.max_block_frames = stream_info.max_block_frames.get() as usize;
169
170 self.scratch_buffers = ChannelBuffer::new(stream_info.max_block_frames.get() as usize);
171 }
172
173 self.is_new_stream = true;
174 }
175
176 pub fn process_interleaved(
180 &mut self,
181 input: &[f32],
182 output: &mut [f32],
183 num_in_channels: usize,
184 num_out_channels: usize,
185 frames: usize,
186 clock_seconds: ClockSeconds,
187 stream_status: StreamStatus,
188 ) {
189 self.poll_messages();
190
191 let mut clock_samples = self.clock_samples;
192 self.clock_samples += ClockSamples(frames as i64);
193 self.clock_shared
194 .samples
195 .store(self.clock_samples.0, Ordering::Relaxed);
196
197 if self.is_new_stream {
198 self.is_new_stream = false;
199
200 self.clock_seconds_offset = self.last_clock_seconds.0 - clock_seconds.0;
202 }
203
204 let mut clock_seconds = ClockSeconds(clock_seconds.0 + self.clock_seconds_offset);
205 self.last_clock_seconds =
206 ClockSeconds(clock_seconds.0 + (frames as f64 * self.sample_rate_recip));
207 self.clock_shared
208 .seconds
209 .store(self.last_clock_seconds.0, Ordering::Relaxed);
210
211 if let Some(transport) = &self.transport {
212 if !transport.stopped && !transport.paused {
213 self.clock_shared.musical.store(
214 transport
215 .transport
216 .sample_to_musical(
217 self.clock_samples - transport.start_frame,
218 self.sample_rate.get(),
219 self.sample_rate_recip,
220 )
221 .0,
222 Ordering::Relaxed,
223 );
224 }
225 }
226
227 if self.schedule_data.is_none() || frames == 0 {
228 output.fill(0.0);
229 return;
231 };
232
233 assert_eq!(input.len(), frames * num_in_channels);
234 assert_eq!(output.len(), frames * num_out_channels);
235
236 let mut frames_processed = 0;
237 while frames_processed < frames {
238 let block_frames = (frames - frames_processed).min(self.max_block_frames);
239
240 self.schedule_data
242 .as_mut()
243 .unwrap()
244 .schedule
245 .prepare_graph_inputs(
246 block_frames,
247 num_in_channels,
248 |channels: &mut [&mut [f32]]| -> SilenceMask {
249 firewheel_core::dsp::interleave::deinterleave(
250 channels,
251 0,
252 &input[frames_processed * num_in_channels
253 ..(frames_processed + block_frames) * num_in_channels],
254 num_in_channels,
255 true,
256 )
257 },
258 );
259
260 let next_clock_seconds =
261 clock_seconds + ClockSeconds(block_frames as f64 * self.sample_rate_recip);
262
263 self.process_block(
264 block_frames,
265 clock_samples,
266 clock_seconds..next_clock_seconds,
267 stream_status,
268 );
269
270 self.schedule_data
272 .as_mut()
273 .unwrap()
274 .schedule
275 .read_graph_outputs(
276 block_frames,
277 num_out_channels,
278 |channels: &[&[f32]], silence_mask| {
279 firewheel_core::dsp::interleave::interleave(
280 channels,
281 0,
282 &mut output[frames_processed * num_out_channels
283 ..(frames_processed + block_frames) * num_out_channels],
284 num_out_channels,
285 Some(silence_mask),
286 );
287 },
288 );
289
290 frames_processed += block_frames;
300 clock_samples += ClockSamples(block_frames as i64);
301 clock_seconds = next_clock_seconds;
302 }
303
304 if self.hard_clip_outputs {
305 for s in output.iter_mut() {
306 *s = s.fract();
307 }
308 }
309
310 if self.event_buffer.capacity() > 0 {
311 let mut event_group = Vec::new();
312 std::mem::swap(&mut self.event_buffer, &mut event_group);
313
314 let _ = self
315 .to_graph_tx
316 .try_push(ProcessorToContextMsg::ReturnEventGroup(event_group));
317 }
318 }
319
    /// Drain and handle all pending messages from the context thread.
    ///
    /// Called once at the start of every `process_interleaved` call.
    /// Heap allocations received here are either kept (swapped in) or
    /// pushed back via `to_graph_tx` — presumably so they are freed off
    /// the audio thread (TODO confirm against the context side).
    fn poll_messages(&mut self) {
        for msg in self.from_graph_rx.pop_iter() {
            match msg {
                ContextToProcessorMsg::EventGroup(mut event_group) => {
                    let num_existing_events = self.event_buffer.len();

                    if self.event_buffer.capacity() == 0 {
                        // Our buffer holds no allocation, so steal the
                        // incoming one wholesale.
                        std::mem::swap(&mut self.event_buffer, &mut event_group);
                    } else {
                        self.event_buffer.append(&mut event_group);

                        // Return the (now empty) group so its allocation
                        // can be reused.
                        let _ = self
                            .to_graph_tx
                            .try_push(ProcessorToContextMsg::ReturnEventGroup(event_group));
                    }

                    // Record, per target node, the indices of the newly
                    // appended events. Events for unknown nodes are
                    // silently ignored.
                    for (i, event) in self.event_buffer[num_existing_events..].iter().enumerate() {
                        if let Some(node_entry) = self.nodes.get_mut(event.node_id.0) {
                            node_entry
                                .event_indices
                                .push((i + num_existing_events) as u32);
                        }
                    }
                }
                ContextToProcessorMsg::NewSchedule(mut new_schedule_data) => {
                    // A schedule compiled for a different maximum block
                    // size would be a logic error in the sender.
                    assert_eq!(
                        new_schedule_data.schedule.max_block_frames(),
                        self.max_block_frames
                    );

                    if let Some(mut old_schedule_data) = self.schedule_data.take() {
                        // Reuse the old schedule's `removed_nodes` Vec as
                        // the container that travels back to the context.
                        std::mem::swap(
                            &mut old_schedule_data.removed_nodes,
                            &mut new_schedule_data.removed_nodes,
                        );

                        // Move the processors of removed nodes into the
                        // returned schedule data.
                        for node_id in new_schedule_data.nodes_to_remove.iter() {
                            if let Some(node_entry) = self.nodes.remove(node_id.0) {
                                old_schedule_data.removed_nodes.push(NodeHeapData {
                                    id: *node_id,
                                    processor: node_entry.processor,
                                    event_buffer_indices: node_entry.event_indices,
                                });
                            }
                        }

                        let _ = self
                            .to_graph_tx
                            .try_push(ProcessorToContextMsg::ReturnSchedule(old_schedule_data));
                    }

                    // Insert processors for newly added nodes. Each slot
                    // must be vacant, otherwise the context sent an
                    // inconsistent schedule.
                    for n in new_schedule_data.new_node_processors.drain(..) {
                        assert!(self
                            .nodes
                            .insert_at(
                                n.id.0,
                                NodeEntry {
                                    processor: n.processor,
                                    event_indices: n.event_buffer_indices,
                                }
                            )
                            .is_none());
                    }

                    self.schedule_data = Some(new_schedule_data);
                }
                ContextToProcessorMsg::HardClipOutputs(hard_clip_outputs) => {
                    self.hard_clip_outputs = hard_clip_outputs;
                }
                ContextToProcessorMsg::SetTransport(transport) => {
                    if let Some(old_transport) = &mut self.transport {
                        if let Some(new_transport) = &transport {
                            if !old_transport.stopped {
                                // Re-anchor `start_frame` so the current
                                // musical position is preserved under the
                                // new transport.
                                let sample_time = if old_transport.paused {
                                    old_transport.paused_at_frame - old_transport.start_frame
                                } else {
                                    self.clock_samples - old_transport.start_frame
                                };

                                let current_musical = old_transport.transport.sample_to_musical(
                                    sample_time,
                                    self.sample_rate.get(),
                                    self.sample_rate_recip,
                                );

                                old_transport.start_frame = self.clock_samples
                                    - new_transport
                                        .musical_to_sample(current_musical, self.sample_rate.get());

                                old_transport.paused_at_frame = self.clock_samples;
                            }

                            old_transport.transport = *new_transport;
                        } else {
                            // Clearing the transport also resets the
                            // shared musical clock.
                            self.transport = None;
                            self.clock_shared.musical.store(0.0, Ordering::Relaxed);
                        }
                    } else {
                        // No previous transport: install the new one in
                        // the stopped state. It only runs once a
                        // `StartOrRestartTransport` message arrives.
                        self.transport = transport.map(|transport| TransportState {
                            transport,
                            start_frame: ClockSamples::default(),
                            paused_at_frame: ClockSamples::default(),
                            paused_at_musical_time: MusicalTime::default(),
                            paused: false,
                            stopped: true,
                        });

                        self.clock_shared.musical.store(0.0, Ordering::Relaxed);
                    }
                }
                ContextToProcessorMsg::StartOrRestartTransport => {
                    if let Some(transport) = &mut self.transport {
                        transport.stopped = false;
                        transport.paused = false;
                        // Anchor musical time zero at the current sample
                        // clock.
                        transport.start_frame = self.clock_samples;
                    }

                    self.clock_shared.musical.store(0.0, Ordering::Relaxed);
                }
                ContextToProcessorMsg::PauseTransport => {
                    if let Some(transport) = &mut self.transport {
                        if !transport.stopped && !transport.paused {
                            transport.paused = true;
                            transport.paused_at_frame = self.clock_samples;
                            // Capture the musical position to report while
                            // paused.
                            transport.paused_at_musical_time =
                                transport.transport.sample_to_musical(
                                    self.clock_samples - transport.start_frame,
                                    self.sample_rate.get(),
                                    self.sample_rate_recip,
                                );
                        }
                    }
                }
                ContextToProcessorMsg::ResumeTransport => {
                    if let Some(transport) = &mut self.transport {
                        if !transport.stopped && transport.paused {
                            transport.paused = false;
                            // Shift the anchor forward by the time spent
                            // paused so musical time resumes seamlessly.
                            transport.start_frame +=
                                ClockSamples(self.clock_samples.0 - transport.paused_at_frame.0);
                        }
                    }
                }
                ContextToProcessorMsg::StopTransport => {
                    if let Some(transport) = &mut self.transport {
                        transport.stopped = true;
                    }

                    self.clock_shared.musical.store(0.0, Ordering::Relaxed);
                }
            }
        }
    }
475
    /// Run the schedule over a single sub-block of `block_frames` frames
    /// (at most `max_block_frames`).
    ///
    /// Builds the `ProcInfo` (including the musical-time range covered by
    /// this block, when a transport is active) and invokes each scheduled
    /// node's processor.
    fn process_block(
        &mut self,
        block_frames: usize,
        clock_samples: ClockSamples,
        clock_seconds: Range<ClockSeconds>,
        stream_status: StreamStatus,
    ) {
        if self.schedule_data.is_none() {
            return;
        }
        let schedule_data = self.schedule_data.as_mut().unwrap();

        let mut scratch_buffers = self.scratch_buffers.get_mut(self.max_block_frames);

        // Musical-time range for this block, if a transport is set and
        // not stopped.
        let transport_info = if let Some(t) = &self.transport {
            if t.stopped {
                None
            } else {
                let (start_beat, end_beat) = if t.paused {
                    // While paused the musical clock does not advance, so
                    // the range is empty at the paused position.
                    (t.paused_at_musical_time, t.paused_at_musical_time)
                } else {
                    (
                        t.transport.sample_to_musical(
                            clock_samples - t.start_frame,
                            self.sample_rate.get(),
                            self.sample_rate_recip,
                        ),
                        t.transport.sample_to_musical(
                            clock_samples - t.start_frame + ClockSamples(block_frames as i64),
                            self.sample_rate.get(),
                            self.sample_rate_recip,
                        ),
                    )
                };

                Some(TransportInfo {
                    musical_clock: start_beat..end_beat,
                    transport: &t.transport,
                    paused: t.paused,
                })
            }
        } else {
            None
        };

        let mut proc_info = ProcInfo {
            frames: block_frames,
            in_silence_mask: SilenceMask::default(),
            out_silence_mask: SilenceMask::default(),
            clock_samples,
            clock_seconds: clock_seconds.clone(),
            transport_info,
            stream_status,
            declick_values: &self.declick_values,
        };

        // The closure is invoked once per scheduled node with that node's
        // buffers and silence masks.
        schedule_data.schedule.process(
            block_frames,
            &mut scratch_buffers,
            |node_id: NodeID,
             in_silence_mask: SilenceMask,
             out_silence_mask: SilenceMask,
             proc_buffers: ProcBuffers|
             -> ProcessStatus {
                // Nodes that are no longer in the arena are bypassed.
                let Some(node_entry) = self.nodes.get_mut(node_id.0) else {
                    return ProcessStatus::Bypass;
                };

                let events = NodeEventList::new(&mut self.event_buffer, &node_entry.event_indices);

                proc_info.in_silence_mask = in_silence_mask;
                proc_info.out_silence_mask = out_silence_mask;

                let status = node_entry
                    .processor
                    .process(proc_buffers, &proc_info, events);

                // Each event is delivered to its node at most once.
                node_entry.event_indices.clear();

                status
            },
        );
    }
559}
560
/// A node's processor as stored in the processor-side arena.
pub(crate) struct NodeEntry {
    // The node's audio processor implementation.
    pub processor: Box<dyn AudioNodeProcessor>,
    // Indices into `FirewheelProcessorInner::event_buffer` of pending
    // events addressed to this node; cleared after each processed block.
    pub event_indices: Vec<u32>,
}
565
/// Processor-side state of the musical transport.
struct TransportState {
    // The transport definition (from `firewheel_core`).
    transport: MusicalTransport,
    // Sample-clock value that corresponds to musical time zero
    // (musical time is computed from `clock_samples - start_frame`).
    start_frame: ClockSamples,
    // Sample-clock value captured when the transport was paused.
    paused_at_frame: ClockSamples,
    // Musical time captured when the transport was paused.
    paused_at_musical_time: MusicalTime,
    // Whether the transport is currently paused (position retained).
    paused: bool,
    // Whether the transport is stopped (no musical time is reported).
    stopped: bool,
}
574
/// Messages sent from the context (graph) thread to the processor.
pub(crate) enum ContextToProcessorMsg {
    /// A group of node events to dispatch to node processors.
    EventGroup(Vec<NodeEvent>),
    /// A newly compiled schedule to swap in, replacing the current one.
    NewSchedule(Box<ScheduleHeapData>),
    /// Enable or disable hard clipping of the final outputs.
    HardClipOutputs(bool),
    /// Set a new musical transport, or clear it with `None`.
    SetTransport(Option<MusicalTransport>),
    /// Start the transport, or restart it from musical time zero.
    StartOrRestartTransport,
    /// Pause the transport, retaining its current position.
    PauseTransport,
    /// Resume a previously paused transport.
    ResumeTransport,
    /// Stop the transport and reset the shared musical clock to zero.
    StopTransport,
}
585
/// Messages sent from the processor back to the context thread —
/// presumably so the returned heap allocations are reused or freed off
/// the audio thread (TODO confirm against the context side).
pub(crate) enum ProcessorToContextMsg {
    /// Returns an event group `Vec` after its events were consumed.
    ReturnEventGroup(Vec<NodeEvent>),
    /// Returns a replaced schedule along with removed node data.
    ReturnSchedule(Box<ScheduleHeapData>),
}