firewheel_graph/
processor.rs

1use bevy_platform::sync::{atomic::Ordering, Arc};
2use core::{num::NonZeroU32, ops::Range};
3
4use ringbuf::traits::{Consumer, Producer};
5use thunderdome::Arena;
6
7use crate::{
8    context::ClockValues,
9    graph::{NodeHeapData, ScheduleHeapData},
10};
11use firewheel_core::{
12    clock::{ClockSamples, ClockSeconds, MusicalTime, MusicalTransport},
13    dsp::{buffer::ChannelBuffer, declick::DeclickValues},
14    event::{NodeEvent, NodeEventList},
15    node::{
16        AudioNodeProcessor, NodeID, ProcBuffers, ProcInfo, ProcessStatus, StreamStatus,
17        TransportInfo, NUM_SCRATCH_BUFFERS,
18    },
19    SilenceMask, StreamInfo,
20};
21
22pub struct FirewheelProcessor {
23    inner: Option<FirewheelProcessorInner>,
24    drop_tx: ringbuf::HeapProd<FirewheelProcessorInner>,
25}
26
27impl Drop for FirewheelProcessor {
28    fn drop(&mut self) {
29        let Some(mut inner) = self.inner.take() else {
30            return;
31        };
32
33        inner.stream_stopped();
34
35        // TODO: Either wait for `bevy_platform` to implement this method, or
36        // hide this behind a "std" feature flag.
37        if std::thread::panicking() {
38            inner.poisoned = true;
39        }
40
41        let _ = self.drop_tx.try_push(inner);
42    }
43}
44
45impl FirewheelProcessor {
46    pub(crate) fn new(
47        processor: FirewheelProcessorInner,
48        drop_tx: ringbuf::HeapProd<FirewheelProcessorInner>,
49    ) -> Self {
50        Self {
51            inner: Some(processor),
52            drop_tx,
53        }
54    }
55
56    pub fn process_interleaved(
57        &mut self,
58        input: &[f32],
59        output: &mut [f32],
60        num_in_channels: usize,
61        num_out_channels: usize,
62        frames: usize,
63        clock_seconds: ClockSeconds,
64        stream_status: StreamStatus,
65    ) {
66        if let Some(inner) = &mut self.inner {
67            inner.process_interleaved(
68                input,
69                output,
70                num_in_channels,
71                num_out_channels,
72                frames,
73                clock_seconds,
74                stream_status,
75            );
76        }
77    }
78}
79
80pub(crate) struct FirewheelProcessorInner {
81    nodes: Arena<NodeEntry>,
82    schedule_data: Option<Box<ScheduleHeapData>>,
83
84    from_graph_rx: ringbuf::HeapCons<ContextToProcessorMsg>,
85    to_graph_tx: ringbuf::HeapProd<ProcessorToContextMsg>,
86
87    event_buffer: Vec<NodeEvent>,
88
89    sample_rate: NonZeroU32,
90    sample_rate_recip: f64,
91    max_block_frames: usize,
92
93    clock_samples: ClockSamples,
94    clock_shared: Arc<ClockValues>,
95
96    last_clock_seconds: ClockSeconds,
97    clock_seconds_offset: f64,
98    is_new_stream: bool,
99
100    hard_clip_outputs: bool,
101
102    scratch_buffers: ChannelBuffer<f32, NUM_SCRATCH_BUFFERS>,
103    declick_values: DeclickValues,
104
105    transport: Option<TransportState>,
106
107    /// If a panic occurs while processing, this flag is set to let the
108    /// main thread know that it shouldn't try spawning a new audio stream
109    /// with the shared `Arc<AtomicRefCell<FirewheelProcessorInner>>` object.
110    pub(crate) poisoned: bool,
111}
112
113impl FirewheelProcessorInner {
114    /// Note, this method gets called on the main thread, not the audio thread.
115    pub(crate) fn new(
116        from_graph_rx: ringbuf::HeapCons<ContextToProcessorMsg>,
117        to_graph_tx: ringbuf::HeapProd<ProcessorToContextMsg>,
118        clock_shared: Arc<ClockValues>,
119        node_capacity: usize,
120        stream_info: &StreamInfo,
121        hard_clip_outputs: bool,
122    ) -> Self {
123        Self {
124            nodes: Arena::with_capacity(node_capacity * 2),
125            schedule_data: None,
126            from_graph_rx,
127            to_graph_tx,
128            event_buffer: Vec::new(),
129            sample_rate: stream_info.sample_rate,
130            sample_rate_recip: stream_info.sample_rate_recip,
131            max_block_frames: stream_info.max_block_frames.get() as usize,
132            clock_samples: ClockSamples(0),
133            clock_shared,
134            last_clock_seconds: ClockSeconds(0.0),
135            clock_seconds_offset: 0.0,
136            is_new_stream: false,
137            hard_clip_outputs,
138            scratch_buffers: ChannelBuffer::new(stream_info.max_block_frames.get() as usize),
139            declick_values: DeclickValues::new(stream_info.declick_frames),
140            transport: None,
141            poisoned: false,
142        }
143    }
144
145    fn stream_stopped(&mut self) {
146        for (_, node) in self.nodes.iter_mut() {
147            node.processor.stream_stopped();
148        }
149    }
150
151    /// Called when a new audio stream has been started to replace the old one.
152    ///
153    /// Note, this method gets called on the main thread, not the audio thread.
154    pub fn new_stream(&mut self, stream_info: &StreamInfo) {
155        for (_, node) in self.nodes.iter_mut() {
156            node.processor.new_stream(stream_info);
157        }
158
159        if self.sample_rate != stream_info.sample_rate {
160            self.sample_rate = stream_info.sample_rate;
161            self.sample_rate_recip = stream_info.sample_rate_recip;
162
163            self.declick_values = DeclickValues::new(stream_info.declick_frames);
164        }
165
166        if self.max_block_frames != stream_info.max_block_frames.get() as usize {
167            self.max_block_frames = stream_info.max_block_frames.get() as usize;
168
169            self.scratch_buffers = ChannelBuffer::new(stream_info.max_block_frames.get() as usize);
170        }
171
172        self.is_new_stream = true;
173    }
174
175    // TODO: Add a `process_deinterleaved` method.
176
177    /// Process the given buffers of audio data.
178    pub fn process_interleaved(
179        &mut self,
180        input: &[f32],
181        output: &mut [f32],
182        num_in_channels: usize,
183        num_out_channels: usize,
184        frames: usize,
185        clock_seconds: ClockSeconds,
186        stream_status: StreamStatus,
187    ) {
188        self.poll_messages();
189
190        let mut clock_samples = self.clock_samples;
191        self.clock_samples += ClockSamples(frames as i64);
192        self.clock_shared
193            .samples
194            .store(self.clock_samples.0, Ordering::Relaxed);
195
196        if self.is_new_stream {
197            self.is_new_stream = false;
198
199            // Apply an offset so that the clock appears to be steady for nodes.
200            self.clock_seconds_offset = self.last_clock_seconds.0 - clock_seconds.0;
201        }
202
203        let mut clock_seconds = ClockSeconds(clock_seconds.0 + self.clock_seconds_offset);
204        self.last_clock_seconds =
205            ClockSeconds(clock_seconds.0 + (frames as f64 * self.sample_rate_recip));
206        self.clock_shared
207            .seconds
208            .store(self.last_clock_seconds.0, Ordering::Relaxed);
209
210        if let Some(transport) = &self.transport {
211            if !transport.stopped && !transport.paused {
212                self.clock_shared.musical.store(
213                    transport
214                        .transport
215                        .sample_to_musical(
216                            self.clock_samples - transport.start_frame,
217                            self.sample_rate.get(),
218                            self.sample_rate_recip,
219                        )
220                        .0,
221                    Ordering::Relaxed,
222                );
223            }
224        }
225
226        if self.schedule_data.is_none() || frames == 0 {
227            output.fill(0.0);
228            //return FirewheelProcessorInnerStatus::Ok;
229            return;
230        };
231
232        assert_eq!(input.len(), frames * num_in_channels);
233        assert_eq!(output.len(), frames * num_out_channels);
234
235        let mut frames_processed = 0;
236        while frames_processed < frames {
237            let block_frames = (frames - frames_processed).min(self.max_block_frames);
238
239            // Prepare graph input buffers.
240            self.schedule_data
241                .as_mut()
242                .unwrap()
243                .schedule
244                .prepare_graph_inputs(
245                    block_frames,
246                    num_in_channels,
247                    |channels: &mut [&mut [f32]]| -> SilenceMask {
248                        firewheel_core::dsp::interleave::deinterleave(
249                            channels,
250                            0,
251                            &input[frames_processed * num_in_channels
252                                ..(frames_processed + block_frames) * num_in_channels],
253                            num_in_channels,
254                            true,
255                        )
256                    },
257                );
258
259            let next_clock_seconds =
260                clock_seconds + ClockSeconds(block_frames as f64 * self.sample_rate_recip);
261
262            self.process_block(
263                block_frames,
264                clock_samples,
265                clock_seconds..next_clock_seconds,
266                stream_status,
267            );
268
269            // Copy the output of the graph to the output buffer.
270            self.schedule_data
271                .as_mut()
272                .unwrap()
273                .schedule
274                .read_graph_outputs(
275                    block_frames,
276                    num_out_channels,
277                    |channels: &[&[f32]], silence_mask| {
278                        firewheel_core::dsp::interleave::interleave(
279                            channels,
280                            0,
281                            &mut output[frames_processed * num_out_channels
282                                ..(frames_processed + block_frames) * num_out_channels],
283                            num_out_channels,
284                            Some(silence_mask),
285                        );
286                    },
287                );
288
289            /*
290            if !self.running {
291                if frames_processed < frames {
292                    output[frames_processed * num_out_channels..].fill(0.0);
293                }
294                break;
295            }
296            */
297
298            frames_processed += block_frames;
299            clock_samples += ClockSamples(block_frames as i64);
300            clock_seconds = next_clock_seconds;
301        }
302
303        if self.hard_clip_outputs {
304            for s in output.iter_mut() {
305                *s = s.fract();
306            }
307        }
308
309        if self.event_buffer.capacity() > 0 {
310            let mut event_group = Vec::new();
311            core::mem::swap(&mut self.event_buffer, &mut event_group);
312
313            let _ = self
314                .to_graph_tx
315                .try_push(ProcessorToContextMsg::ReturnEventGroup(event_group));
316        }
317    }
318
319    fn poll_messages(&mut self) {
320        for msg in self.from_graph_rx.pop_iter() {
321            match msg {
322                ContextToProcessorMsg::EventGroup(mut event_group) => {
323                    let num_existing_events = self.event_buffer.len();
324
325                    if self.event_buffer.capacity() == 0 {
326                        core::mem::swap(&mut self.event_buffer, &mut event_group);
327                    } else {
328                        self.event_buffer.append(&mut event_group);
329
330                        let _ = self
331                            .to_graph_tx
332                            .try_push(ProcessorToContextMsg::ReturnEventGroup(event_group));
333                    }
334
335                    for (i, event) in self.event_buffer[num_existing_events..].iter().enumerate() {
336                        if let Some(node_entry) = self.nodes.get_mut(event.node_id.0) {
337                            node_entry
338                                .event_indices
339                                .push((i + num_existing_events) as u32);
340                        }
341                    }
342                }
343                ContextToProcessorMsg::NewSchedule(mut new_schedule_data) => {
344                    assert_eq!(
345                        new_schedule_data.schedule.max_block_frames(),
346                        self.max_block_frames
347                    );
348
349                    if let Some(mut old_schedule_data) = self.schedule_data.take() {
350                        core::mem::swap(
351                            &mut old_schedule_data.removed_nodes,
352                            &mut new_schedule_data.removed_nodes,
353                        );
354
355                        for node_id in new_schedule_data.nodes_to_remove.iter() {
356                            if let Some(node_entry) = self.nodes.remove(node_id.0) {
357                                old_schedule_data.removed_nodes.push(NodeHeapData {
358                                    id: *node_id,
359                                    processor: node_entry.processor,
360                                    event_buffer_indices: node_entry.event_indices,
361                                });
362                            }
363                        }
364
365                        let _ = self
366                            .to_graph_tx
367                            .try_push(ProcessorToContextMsg::ReturnSchedule(old_schedule_data));
368                    }
369
370                    for n in new_schedule_data.new_node_processors.drain(..) {
371                        assert!(self
372                            .nodes
373                            .insert_at(
374                                n.id.0,
375                                NodeEntry {
376                                    processor: n.processor,
377                                    event_indices: n.event_buffer_indices,
378                                }
379                            )
380                            .is_none());
381                    }
382
383                    self.schedule_data = Some(new_schedule_data);
384                }
385                ContextToProcessorMsg::HardClipOutputs(hard_clip_outputs) => {
386                    self.hard_clip_outputs = hard_clip_outputs;
387                }
388                ContextToProcessorMsg::SetTransport(transport) => {
389                    if let Some(old_transport) = &mut self.transport {
390                        if let Some(new_transport) = &transport {
391                            if !old_transport.stopped {
392                                // Update the playhead so that the new transport resumes after
393                                // where the previous left off.
394
395                                let sample_time = if old_transport.paused {
396                                    old_transport.paused_at_frame - old_transport.start_frame
397                                } else {
398                                    self.clock_samples - old_transport.start_frame
399                                };
400
401                                let current_musical = old_transport.transport.sample_to_musical(
402                                    sample_time,
403                                    self.sample_rate.get(),
404                                    self.sample_rate_recip,
405                                );
406
407                                old_transport.start_frame = self.clock_samples
408                                    - new_transport
409                                        .musical_to_sample(current_musical, self.sample_rate.get());
410
411                                old_transport.paused_at_frame = self.clock_samples;
412                            }
413
414                            old_transport.transport = *new_transport;
415                        } else {
416                            self.transport = None;
417                            self.clock_shared.musical.store(0.0, Ordering::Relaxed);
418                        }
419                    } else {
420                        self.transport = transport.map(|transport| TransportState {
421                            transport,
422                            start_frame: ClockSamples::default(),
423                            paused_at_frame: ClockSamples::default(),
424                            paused_at_musical_time: MusicalTime::default(),
425                            paused: false,
426                            stopped: true,
427                        });
428
429                        self.clock_shared.musical.store(0.0, Ordering::Relaxed);
430                    }
431                }
432                ContextToProcessorMsg::StartOrRestartTransport => {
433                    if let Some(transport) = &mut self.transport {
434                        transport.stopped = false;
435                        transport.paused = false;
436                        transport.start_frame = self.clock_samples;
437                    }
438
439                    self.clock_shared.musical.store(0.0, Ordering::Relaxed);
440                }
441                ContextToProcessorMsg::PauseTransport => {
442                    if let Some(transport) = &mut self.transport {
443                        if !transport.stopped && !transport.paused {
444                            transport.paused = true;
445                            transport.paused_at_frame = self.clock_samples;
446                            transport.paused_at_musical_time =
447                                transport.transport.sample_to_musical(
448                                    self.clock_samples - transport.start_frame,
449                                    self.sample_rate.get(),
450                                    self.sample_rate_recip,
451                                );
452                        }
453                    }
454                }
455                ContextToProcessorMsg::ResumeTransport => {
456                    if let Some(transport) = &mut self.transport {
457                        if !transport.stopped && transport.paused {
458                            transport.paused = false;
459                            transport.start_frame +=
460                                ClockSamples(self.clock_samples.0 - transport.paused_at_frame.0);
461                        }
462                    }
463                }
464                ContextToProcessorMsg::StopTransport => {
465                    if let Some(transport) = &mut self.transport {
466                        transport.stopped = true;
467                    }
468
469                    self.clock_shared.musical.store(0.0, Ordering::Relaxed);
470                }
471            }
472        }
473    }
474
475    fn process_block(
476        &mut self,
477        block_frames: usize,
478        clock_samples: ClockSamples,
479        clock_seconds: Range<ClockSeconds>,
480        stream_status: StreamStatus,
481    ) {
482        if self.schedule_data.is_none() {
483            return;
484        }
485        let schedule_data = self.schedule_data.as_mut().unwrap();
486
487        let mut scratch_buffers = self.scratch_buffers.get_mut(self.max_block_frames);
488
489        let transport_info = if let Some(t) = &self.transport {
490            if t.stopped {
491                None
492            } else {
493                let (start_beat, end_beat) = if t.paused {
494                    (t.paused_at_musical_time, t.paused_at_musical_time)
495                } else {
496                    (
497                        t.transport.sample_to_musical(
498                            clock_samples - t.start_frame,
499                            self.sample_rate.get(),
500                            self.sample_rate_recip,
501                        ),
502                        t.transport.sample_to_musical(
503                            clock_samples - t.start_frame + ClockSamples(block_frames as i64),
504                            self.sample_rate.get(),
505                            self.sample_rate_recip,
506                        ),
507                    )
508                };
509
510                Some(TransportInfo {
511                    musical_clock: start_beat..end_beat,
512                    transport: &t.transport,
513                    paused: t.paused,
514                })
515            }
516        } else {
517            None
518        };
519
520        let mut proc_info = ProcInfo {
521            frames: block_frames,
522            in_silence_mask: SilenceMask::default(),
523            out_silence_mask: SilenceMask::default(),
524            clock_samples,
525            clock_seconds: clock_seconds.clone(),
526            transport_info,
527            stream_status,
528            declick_values: &self.declick_values,
529        };
530
531        schedule_data.schedule.process(
532            block_frames,
533            &mut scratch_buffers,
534            |node_id: NodeID,
535             in_silence_mask: SilenceMask,
536             out_silence_mask: SilenceMask,
537             proc_buffers: ProcBuffers|
538             -> ProcessStatus {
539                let Some(node_entry) = self.nodes.get_mut(node_id.0) else {
540                    return ProcessStatus::Bypass;
541                };
542
543                let events = NodeEventList::new(&mut self.event_buffer, &node_entry.event_indices);
544
545                proc_info.in_silence_mask = in_silence_mask;
546                proc_info.out_silence_mask = out_silence_mask;
547
548                let status = node_entry
549                    .processor
550                    .process(proc_buffers, &proc_info, events);
551
552                node_entry.event_indices.clear();
553
554                status
555            },
556        );
557    }
558}
559
560pub(crate) struct NodeEntry {
561    pub processor: Box<dyn AudioNodeProcessor>,
562    pub event_indices: Vec<u32>,
563}
564
565struct TransportState {
566    transport: MusicalTransport,
567    start_frame: ClockSamples,
568    paused_at_frame: ClockSamples,
569    paused_at_musical_time: MusicalTime,
570    paused: bool,
571    stopped: bool,
572}
573
574pub(crate) enum ContextToProcessorMsg {
575    EventGroup(Vec<NodeEvent>),
576    NewSchedule(Box<ScheduleHeapData>),
577    HardClipOutputs(bool),
578    SetTransport(Option<MusicalTransport>),
579    StartOrRestartTransport,
580    PauseTransport,
581    ResumeTransport,
582    StopTransport,
583}
584
585pub(crate) enum ProcessorToContextMsg {
586    ReturnEventGroup(Vec<NodeEvent>),
587    ReturnSchedule(Box<ScheduleHeapData>),
588}