firewheel_graph/
processor.rs

1use std::{
2    num::NonZeroU32,
3    ops::Range,
4    sync::{atomic::Ordering, Arc},
5};
6
7use ringbuf::traits::{Consumer, Producer};
8use thunderdome::Arena;
9
10use crate::{
11    context::ClockValues,
12    graph::{NodeHeapData, ScheduleHeapData},
13};
14use firewheel_core::{
15    clock::{ClockSamples, ClockSeconds, MusicalTime, MusicalTransport},
16    dsp::{buffer::ChannelBuffer, declick::DeclickValues},
17    event::{NodeEvent, NodeEventList},
18    node::{
19        AudioNodeProcessor, NodeID, ProcBuffers, ProcInfo, ProcessStatus, StreamStatus,
20        TransportInfo, NUM_SCRATCH_BUFFERS,
21    },
22    SilenceMask, StreamInfo,
23};
24
25pub struct FirewheelProcessor {
26    inner: Option<FirewheelProcessorInner>,
27    drop_tx: ringbuf::HeapProd<FirewheelProcessorInner>,
28}
29
30impl Drop for FirewheelProcessor {
31    fn drop(&mut self) {
32        let Some(mut inner) = self.inner.take() else {
33            return;
34        };
35
36        inner.stream_stopped();
37
38        if std::thread::panicking() {
39            inner.poisoned = true;
40        }
41
42        let _ = self.drop_tx.try_push(inner);
43    }
44}
45
46impl FirewheelProcessor {
47    pub(crate) fn new(
48        processor: FirewheelProcessorInner,
49        drop_tx: ringbuf::HeapProd<FirewheelProcessorInner>,
50    ) -> Self {
51        Self {
52            inner: Some(processor),
53            drop_tx,
54        }
55    }
56
57    pub fn process_interleaved(
58        &mut self,
59        input: &[f32],
60        output: &mut [f32],
61        num_in_channels: usize,
62        num_out_channels: usize,
63        frames: usize,
64        clock_seconds: ClockSeconds,
65        stream_status: StreamStatus,
66    ) {
67        if let Some(inner) = &mut self.inner {
68            inner.process_interleaved(
69                input,
70                output,
71                num_in_channels,
72                num_out_channels,
73                frames,
74                clock_seconds,
75                stream_status,
76            );
77        }
78    }
79}
80
81pub(crate) struct FirewheelProcessorInner {
82    nodes: Arena<NodeEntry>,
83    schedule_data: Option<Box<ScheduleHeapData>>,
84
85    from_graph_rx: ringbuf::HeapCons<ContextToProcessorMsg>,
86    to_graph_tx: ringbuf::HeapProd<ProcessorToContextMsg>,
87
88    event_buffer: Vec<NodeEvent>,
89
90    sample_rate: NonZeroU32,
91    sample_rate_recip: f64,
92    max_block_frames: usize,
93
94    clock_samples: ClockSamples,
95    clock_shared: Arc<ClockValues>,
96
97    last_clock_seconds: ClockSeconds,
98    clock_seconds_offset: f64,
99    is_new_stream: bool,
100
101    hard_clip_outputs: bool,
102
103    scratch_buffers: ChannelBuffer<f32, NUM_SCRATCH_BUFFERS>,
104    declick_values: DeclickValues,
105
106    transport: Option<TransportState>,
107
108    /// If a panic occurs while processing, this flag is set to let the
109    /// main thread know that it shouldn't try spawning a new audio stream
110    /// with the shared `Arc<AtomicRefCell<FirewheelProcessorInner>>` object.
111    pub(crate) poisoned: bool,
112}
113
114impl FirewheelProcessorInner {
115    /// Note, this method gets called on the main thread, not the audio thread.
116    pub(crate) fn new(
117        from_graph_rx: ringbuf::HeapCons<ContextToProcessorMsg>,
118        to_graph_tx: ringbuf::HeapProd<ProcessorToContextMsg>,
119        clock_shared: Arc<ClockValues>,
120        node_capacity: usize,
121        stream_info: &StreamInfo,
122        hard_clip_outputs: bool,
123    ) -> Self {
124        Self {
125            nodes: Arena::with_capacity(node_capacity * 2),
126            schedule_data: None,
127            from_graph_rx,
128            to_graph_tx,
129            event_buffer: Vec::new(),
130            sample_rate: stream_info.sample_rate,
131            sample_rate_recip: stream_info.sample_rate_recip,
132            max_block_frames: stream_info.max_block_frames.get() as usize,
133            clock_samples: ClockSamples(0),
134            clock_shared,
135            last_clock_seconds: ClockSeconds(0.0),
136            clock_seconds_offset: 0.0,
137            is_new_stream: false,
138            hard_clip_outputs,
139            scratch_buffers: ChannelBuffer::new(stream_info.max_block_frames.get() as usize),
140            declick_values: DeclickValues::new(stream_info.declick_frames),
141            transport: None,
142            poisoned: false,
143        }
144    }
145
146    fn stream_stopped(&mut self) {
147        for (_, node) in self.nodes.iter_mut() {
148            node.processor.stream_stopped();
149        }
150    }
151
152    /// Called when a new audio stream has been started to replace the old one.
153    ///
154    /// Note, this method gets called on the main thread, not the audio thread.
155    pub fn new_stream(&mut self, stream_info: &StreamInfo) {
156        for (_, node) in self.nodes.iter_mut() {
157            node.processor.new_stream(stream_info);
158        }
159
160        if self.sample_rate != stream_info.sample_rate {
161            self.sample_rate = stream_info.sample_rate;
162            self.sample_rate_recip = stream_info.sample_rate_recip;
163
164            self.declick_values = DeclickValues::new(stream_info.declick_frames);
165        }
166
167        if self.max_block_frames != stream_info.max_block_frames.get() as usize {
168            self.max_block_frames = stream_info.max_block_frames.get() as usize;
169
170            self.scratch_buffers = ChannelBuffer::new(stream_info.max_block_frames.get() as usize);
171        }
172
173        self.is_new_stream = true;
174    }
175
176    // TODO: Add a `process_deinterleaved` method.
177
178    /// Process the given buffers of audio data.
179    pub fn process_interleaved(
180        &mut self,
181        input: &[f32],
182        output: &mut [f32],
183        num_in_channels: usize,
184        num_out_channels: usize,
185        frames: usize,
186        clock_seconds: ClockSeconds,
187        stream_status: StreamStatus,
188    ) {
189        self.poll_messages();
190
191        let mut clock_samples = self.clock_samples;
192        self.clock_samples += ClockSamples(frames as i64);
193        self.clock_shared
194            .samples
195            .store(self.clock_samples.0, Ordering::Relaxed);
196
197        if self.is_new_stream {
198            self.is_new_stream = false;
199
200            // Apply an offset so that the clock appears to be steady for nodes.
201            self.clock_seconds_offset = self.last_clock_seconds.0 - clock_seconds.0;
202        }
203
204        let mut clock_seconds = ClockSeconds(clock_seconds.0 + self.clock_seconds_offset);
205        self.last_clock_seconds =
206            ClockSeconds(clock_seconds.0 + (frames as f64 * self.sample_rate_recip));
207        self.clock_shared
208            .seconds
209            .store(self.last_clock_seconds.0, Ordering::Relaxed);
210
211        if let Some(transport) = &self.transport {
212            if !transport.stopped && !transport.paused {
213                self.clock_shared.musical.store(
214                    transport
215                        .transport
216                        .sample_to_musical(
217                            self.clock_samples - transport.start_frame,
218                            self.sample_rate.get(),
219                            self.sample_rate_recip,
220                        )
221                        .0,
222                    Ordering::Relaxed,
223                );
224            }
225        }
226
227        if self.schedule_data.is_none() || frames == 0 {
228            output.fill(0.0);
229            //return FirewheelProcessorInnerStatus::Ok;
230            return;
231        };
232
233        assert_eq!(input.len(), frames * num_in_channels);
234        assert_eq!(output.len(), frames * num_out_channels);
235
236        let mut frames_processed = 0;
237        while frames_processed < frames {
238            let block_frames = (frames - frames_processed).min(self.max_block_frames);
239
240            // Prepare graph input buffers.
241            self.schedule_data
242                .as_mut()
243                .unwrap()
244                .schedule
245                .prepare_graph_inputs(
246                    block_frames,
247                    num_in_channels,
248                    |channels: &mut [&mut [f32]]| -> SilenceMask {
249                        firewheel_core::dsp::interleave::deinterleave(
250                            channels,
251                            0,
252                            &input[frames_processed * num_in_channels
253                                ..(frames_processed + block_frames) * num_in_channels],
254                            num_in_channels,
255                            true,
256                        )
257                    },
258                );
259
260            let next_clock_seconds =
261                clock_seconds + ClockSeconds(block_frames as f64 * self.sample_rate_recip);
262
263            self.process_block(
264                block_frames,
265                clock_samples,
266                clock_seconds..next_clock_seconds,
267                stream_status,
268            );
269
270            // Copy the output of the graph to the output buffer.
271            self.schedule_data
272                .as_mut()
273                .unwrap()
274                .schedule
275                .read_graph_outputs(
276                    block_frames,
277                    num_out_channels,
278                    |channels: &[&[f32]], silence_mask| {
279                        firewheel_core::dsp::interleave::interleave(
280                            channels,
281                            0,
282                            &mut output[frames_processed * num_out_channels
283                                ..(frames_processed + block_frames) * num_out_channels],
284                            num_out_channels,
285                            Some(silence_mask),
286                        );
287                    },
288                );
289
290            /*
291            if !self.running {
292                if frames_processed < frames {
293                    output[frames_processed * num_out_channels..].fill(0.0);
294                }
295                break;
296            }
297            */
298
299            frames_processed += block_frames;
300            clock_samples += ClockSamples(block_frames as i64);
301            clock_seconds = next_clock_seconds;
302        }
303
304        if self.hard_clip_outputs {
305            for s in output.iter_mut() {
306                *s = s.fract();
307            }
308        }
309
310        if self.event_buffer.capacity() > 0 {
311            let mut event_group = Vec::new();
312            std::mem::swap(&mut self.event_buffer, &mut event_group);
313
314            let _ = self
315                .to_graph_tx
316                .try_push(ProcessorToContextMsg::ReturnEventGroup(event_group));
317        }
318    }
319
320    fn poll_messages(&mut self) {
321        for msg in self.from_graph_rx.pop_iter() {
322            match msg {
323                ContextToProcessorMsg::EventGroup(mut event_group) => {
324                    let num_existing_events = self.event_buffer.len();
325
326                    if self.event_buffer.capacity() == 0 {
327                        std::mem::swap(&mut self.event_buffer, &mut event_group);
328                    } else {
329                        self.event_buffer.append(&mut event_group);
330
331                        let _ = self
332                            .to_graph_tx
333                            .try_push(ProcessorToContextMsg::ReturnEventGroup(event_group));
334                    }
335
336                    for (i, event) in self.event_buffer[num_existing_events..].iter().enumerate() {
337                        if let Some(node_entry) = self.nodes.get_mut(event.node_id.0) {
338                            node_entry
339                                .event_indices
340                                .push((i + num_existing_events) as u32);
341                        }
342                    }
343                }
344                ContextToProcessorMsg::NewSchedule(mut new_schedule_data) => {
345                    assert_eq!(
346                        new_schedule_data.schedule.max_block_frames(),
347                        self.max_block_frames
348                    );
349
350                    if let Some(mut old_schedule_data) = self.schedule_data.take() {
351                        std::mem::swap(
352                            &mut old_schedule_data.removed_nodes,
353                            &mut new_schedule_data.removed_nodes,
354                        );
355
356                        for node_id in new_schedule_data.nodes_to_remove.iter() {
357                            if let Some(node_entry) = self.nodes.remove(node_id.0) {
358                                old_schedule_data.removed_nodes.push(NodeHeapData {
359                                    id: *node_id,
360                                    processor: node_entry.processor,
361                                    event_buffer_indices: node_entry.event_indices,
362                                });
363                            }
364                        }
365
366                        let _ = self
367                            .to_graph_tx
368                            .try_push(ProcessorToContextMsg::ReturnSchedule(old_schedule_data));
369                    }
370
371                    for n in new_schedule_data.new_node_processors.drain(..) {
372                        assert!(self
373                            .nodes
374                            .insert_at(
375                                n.id.0,
376                                NodeEntry {
377                                    processor: n.processor,
378                                    event_indices: n.event_buffer_indices,
379                                }
380                            )
381                            .is_none());
382                    }
383
384                    self.schedule_data = Some(new_schedule_data);
385                }
386                ContextToProcessorMsg::HardClipOutputs(hard_clip_outputs) => {
387                    self.hard_clip_outputs = hard_clip_outputs;
388                }
389                ContextToProcessorMsg::SetTransport(transport) => {
390                    if let Some(old_transport) = &mut self.transport {
391                        if let Some(new_transport) = &transport {
392                            if !old_transport.stopped {
393                                // Update the playhead so that the new transport resumes after
394                                // where the previous left off.
395
396                                let sample_time = if old_transport.paused {
397                                    old_transport.paused_at_frame - old_transport.start_frame
398                                } else {
399                                    self.clock_samples - old_transport.start_frame
400                                };
401
402                                let current_musical = old_transport.transport.sample_to_musical(
403                                    sample_time,
404                                    self.sample_rate.get(),
405                                    self.sample_rate_recip,
406                                );
407
408                                old_transport.start_frame = self.clock_samples
409                                    - new_transport
410                                        .musical_to_sample(current_musical, self.sample_rate.get());
411
412                                old_transport.paused_at_frame = self.clock_samples;
413                            }
414
415                            old_transport.transport = *new_transport;
416                        } else {
417                            self.transport = None;
418                            self.clock_shared.musical.store(0.0, Ordering::Relaxed);
419                        }
420                    } else {
421                        self.transport = transport.map(|transport| TransportState {
422                            transport,
423                            start_frame: ClockSamples::default(),
424                            paused_at_frame: ClockSamples::default(),
425                            paused_at_musical_time: MusicalTime::default(),
426                            paused: false,
427                            stopped: true,
428                        });
429
430                        self.clock_shared.musical.store(0.0, Ordering::Relaxed);
431                    }
432                }
433                ContextToProcessorMsg::StartOrRestartTransport => {
434                    if let Some(transport) = &mut self.transport {
435                        transport.stopped = false;
436                        transport.paused = false;
437                        transport.start_frame = self.clock_samples;
438                    }
439
440                    self.clock_shared.musical.store(0.0, Ordering::Relaxed);
441                }
442                ContextToProcessorMsg::PauseTransport => {
443                    if let Some(transport) = &mut self.transport {
444                        if !transport.stopped && !transport.paused {
445                            transport.paused = true;
446                            transport.paused_at_frame = self.clock_samples;
447                            transport.paused_at_musical_time =
448                                transport.transport.sample_to_musical(
449                                    self.clock_samples - transport.start_frame,
450                                    self.sample_rate.get(),
451                                    self.sample_rate_recip,
452                                );
453                        }
454                    }
455                }
456                ContextToProcessorMsg::ResumeTransport => {
457                    if let Some(transport) = &mut self.transport {
458                        if !transport.stopped && transport.paused {
459                            transport.paused = false;
460                            transport.start_frame +=
461                                ClockSamples(self.clock_samples.0 - transport.paused_at_frame.0);
462                        }
463                    }
464                }
465                ContextToProcessorMsg::StopTransport => {
466                    if let Some(transport) = &mut self.transport {
467                        transport.stopped = true;
468                    }
469
470                    self.clock_shared.musical.store(0.0, Ordering::Relaxed);
471                }
472            }
473        }
474    }
475
476    fn process_block(
477        &mut self,
478        block_frames: usize,
479        clock_samples: ClockSamples,
480        clock_seconds: Range<ClockSeconds>,
481        stream_status: StreamStatus,
482    ) {
483        if self.schedule_data.is_none() {
484            return;
485        }
486        let schedule_data = self.schedule_data.as_mut().unwrap();
487
488        let mut scratch_buffers = self.scratch_buffers.get_mut(self.max_block_frames);
489
490        let transport_info = if let Some(t) = &self.transport {
491            if t.stopped {
492                None
493            } else {
494                let (start_beat, end_beat) = if t.paused {
495                    (t.paused_at_musical_time, t.paused_at_musical_time)
496                } else {
497                    (
498                        t.transport.sample_to_musical(
499                            clock_samples - t.start_frame,
500                            self.sample_rate.get(),
501                            self.sample_rate_recip,
502                        ),
503                        t.transport.sample_to_musical(
504                            clock_samples - t.start_frame + ClockSamples(block_frames as i64),
505                            self.sample_rate.get(),
506                            self.sample_rate_recip,
507                        ),
508                    )
509                };
510
511                Some(TransportInfo {
512                    musical_clock: start_beat..end_beat,
513                    transport: &t.transport,
514                    paused: t.paused,
515                })
516            }
517        } else {
518            None
519        };
520
521        let mut proc_info = ProcInfo {
522            frames: block_frames,
523            in_silence_mask: SilenceMask::default(),
524            out_silence_mask: SilenceMask::default(),
525            clock_samples,
526            clock_seconds: clock_seconds.clone(),
527            transport_info,
528            stream_status,
529            declick_values: &self.declick_values,
530        };
531
532        schedule_data.schedule.process(
533            block_frames,
534            &mut scratch_buffers,
535            |node_id: NodeID,
536             in_silence_mask: SilenceMask,
537             out_silence_mask: SilenceMask,
538             proc_buffers: ProcBuffers|
539             -> ProcessStatus {
540                let Some(node_entry) = self.nodes.get_mut(node_id.0) else {
541                    return ProcessStatus::Bypass;
542                };
543
544                let events = NodeEventList::new(&mut self.event_buffer, &node_entry.event_indices);
545
546                proc_info.in_silence_mask = in_silence_mask;
547                proc_info.out_silence_mask = out_silence_mask;
548
549                let status = node_entry
550                    .processor
551                    .process(proc_buffers, &proc_info, events);
552
553                node_entry.event_indices.clear();
554
555                status
556            },
557        );
558    }
559}
560
561pub(crate) struct NodeEntry {
562    pub processor: Box<dyn AudioNodeProcessor>,
563    pub event_indices: Vec<u32>,
564}
565
566struct TransportState {
567    transport: MusicalTransport,
568    start_frame: ClockSamples,
569    paused_at_frame: ClockSamples,
570    paused_at_musical_time: MusicalTime,
571    paused: bool,
572    stopped: bool,
573}
574
575pub(crate) enum ContextToProcessorMsg {
576    EventGroup(Vec<NodeEvent>),
577    NewSchedule(Box<ScheduleHeapData>),
578    HardClipOutputs(bool),
579    SetTransport(Option<MusicalTransport>),
580    StartOrRestartTransport,
581    PauseTransport,
582    ResumeTransport,
583    StopTransport,
584}
585
586pub(crate) enum ProcessorToContextMsg {
587    ReturnEventGroup(Vec<NodeEvent>),
588    ReturnSchedule(Box<ScheduleHeapData>),
589}