firewheel_graph/
processor.rs

1use core::{num::NonZeroU32, usize};
2
3use ringbuf::traits::Producer;
4use thunderdome::Arena;
5
6#[cfg(not(feature = "std"))]
7use bevy_platform::prelude::{Box, Vec};
8
9use firewheel_core::{
10    clock::InstantSamples,
11    dsp::{buffer::ChannelBuffer, declick::DeclickValues},
12    event::{NodeEvent, ProcEventsIndex},
13    log::RealtimeLogger,
14    node::{AudioNodeProcessor, ProcExtra, ProcStore},
15    StreamInfo,
16};
17
18use crate::{
19    backend::{AudioBackend, BackendProcessInfo},
20    graph::ScheduleHeapData,
21    processor::event_scheduler::{EventScheduler, NodeEventSchedulerData},
22};
23
24#[cfg(feature = "scheduled_events")]
25use crate::context::ClearScheduledEventsType;
26#[cfg(feature = "scheduled_events")]
27use firewheel_core::node::NodeID;
28#[cfg(feature = "scheduled_events")]
29use smallvec::SmallVec;
30
31#[cfg(feature = "musical_transport")]
32use firewheel_core::clock::{InstantMusical, TransportState};
33
34mod event_scheduler;
35mod handle_messages;
36mod process;
37
38#[cfg(feature = "musical_transport")]
39mod transport;
40#[cfg(feature = "musical_transport")]
41use transport::ProcTransportState;
42
43pub struct FirewheelProcessor<B: AudioBackend> {
44    inner: Option<FirewheelProcessorInner<B>>,
45    drop_tx: ringbuf::HeapProd<FirewheelProcessorInner<B>>,
46}
47
48impl<B: AudioBackend> Drop for FirewheelProcessor<B> {
49    fn drop(&mut self) {
50        let Some(mut inner) = self.inner.take() else {
51            return;
52        };
53
54        inner.stream_stopped();
55
56        // TODO: Remove this feature gate if `bevy_platform` implements this.
57        #[cfg(feature = "std")]
58        if std::thread::panicking() {
59            inner.poisoned = true;
60        }
61
62        let _ = self.drop_tx.try_push(inner);
63    }
64}
65
66impl<B: AudioBackend> FirewheelProcessor<B> {
67    pub(crate) fn new(
68        processor: FirewheelProcessorInner<B>,
69        drop_tx: ringbuf::HeapProd<FirewheelProcessorInner<B>>,
70    ) -> Self {
71        Self {
72            inner: Some(processor),
73            drop_tx,
74        }
75    }
76
77    pub fn process_interleaved(
78        &mut self,
79        input: &[f32],
80        output: &mut [f32],
81        info: BackendProcessInfo<B>,
82    ) {
83        if let Some(inner) = &mut self.inner {
84            inner.process_interleaved(input, output, info);
85        }
86    }
87}
88
89pub(crate) struct FirewheelProcessorInner<B: AudioBackend> {
90    nodes: Arena<NodeEntry>,
91    schedule_data: Option<Box<ScheduleHeapData>>,
92
93    from_graph_rx: ringbuf::HeapCons<ContextToProcessorMsg>,
94    to_graph_tx: ringbuf::HeapProd<ProcessorToContextMsg>,
95
96    event_scheduler: EventScheduler,
97    proc_event_queue: Vec<ProcEventsIndex>,
98
99    sample_rate: NonZeroU32,
100    sample_rate_recip: f64,
101    max_block_frames: usize,
102
103    clock_samples: InstantSamples,
104    shared_clock_input: triple_buffer::Input<SharedClock<B::Instant>>,
105
106    #[cfg(feature = "musical_transport")]
107    proc_transport_state: ProcTransportState,
108
109    hard_clip_outputs: bool,
110
111    pub(crate) extra: ProcExtra,
112
113    /// If a panic occurs while processing, this flag is set to let the
114    /// main thread know that it shouldn't try spawning a new audio stream
115    /// with the shared `Arc<AtomicRefCell<FirewheelProcessorInner>>` object.
116    pub(crate) poisoned: bool,
117    debug_force_clear_buffers: bool,
118}
119
120impl<B: AudioBackend> FirewheelProcessorInner<B> {
121    /// Note, this method gets called on the main thread, not the audio thread.
122    pub(crate) fn new(
123        from_graph_rx: ringbuf::HeapCons<ContextToProcessorMsg>,
124        to_graph_tx: ringbuf::HeapProd<ProcessorToContextMsg>,
125        shared_clock_input: triple_buffer::Input<SharedClock<B::Instant>>,
126        immediate_event_buffer_capacity: usize,
127        #[cfg(feature = "scheduled_events")] scheduled_event_buffer_capacity: usize,
128        node_event_buffer_capacity: usize,
129        stream_info: &StreamInfo,
130        hard_clip_outputs: bool,
131        buffer_out_of_space_mode: BufferOutOfSpaceMode,
132        logger: RealtimeLogger,
133        debug_force_clear_buffers: bool,
134        store: ProcStore,
135    ) -> Self {
136        Self {
137            nodes: Arena::new(),
138            schedule_data: None,
139            from_graph_rx,
140            to_graph_tx,
141            event_scheduler: EventScheduler::new(
142                immediate_event_buffer_capacity,
143                #[cfg(feature = "scheduled_events")]
144                scheduled_event_buffer_capacity,
145                buffer_out_of_space_mode,
146            ),
147            proc_event_queue: Vec::with_capacity(node_event_buffer_capacity),
148            sample_rate: stream_info.sample_rate,
149            sample_rate_recip: stream_info.sample_rate_recip,
150            max_block_frames: stream_info.max_block_frames.get() as usize,
151            clock_samples: InstantSamples(0),
152            shared_clock_input,
153            #[cfg(feature = "musical_transport")]
154            proc_transport_state: ProcTransportState::new(),
155            hard_clip_outputs,
156            extra: ProcExtra {
157                scratch_buffers: ChannelBuffer::new(stream_info.max_block_frames.get() as usize),
158                declick_values: DeclickValues::new(stream_info.declick_frames),
159                logger,
160                store,
161            },
162            poisoned: false,
163            debug_force_clear_buffers,
164        }
165    }
166}
167
168pub(crate) struct NodeEntry {
169    pub processor: Box<dyn AudioNodeProcessor>,
170    pub prev_output_was_silent: bool,
171
172    event_data: NodeEventSchedulerData,
173}
174
175pub(crate) enum ContextToProcessorMsg {
176    EventGroup(Vec<NodeEvent>),
177    NewSchedule(Box<ScheduleHeapData>),
178    HardClipOutputs(bool),
179    #[cfg(feature = "musical_transport")]
180    SetTransportState(Box<TransportState>),
181    #[cfg(feature = "scheduled_events")]
182    ClearScheduledEvents(SmallVec<[ClearScheduledEventsEvent; 1]>),
183}
184
185pub(crate) enum ProcessorToContextMsg {
186    ReturnEventGroup(Vec<NodeEvent>),
187    ReturnSchedule(Box<ScheduleHeapData>),
188    #[cfg(feature = "musical_transport")]
189    ReturnTransportState(Box<TransportState>),
190    #[cfg(feature = "scheduled_events")]
191    ReturnClearScheduledEvents(SmallVec<[ClearScheduledEventsEvent; 1]>),
192}
193
/// A single request to clear scheduled events, as carried by
/// `ContextToProcessorMsg::ClearScheduledEvents`.
#[cfg(feature = "scheduled_events")]
pub(crate) struct ClearScheduledEventsEvent {
    /// The node whose events should be cleared.
    ///
    /// If `None`, then clear events for all nodes.
    pub node_id: Option<NodeID>,
    /// Which kind of scheduled events to clear.
    pub event_type: ClearScheduledEventsType,
}
200
201#[derive(Clone)]
202pub(crate) struct SharedClock<I: Clone> {
203    pub clock_samples: InstantSamples,
204    #[cfg(feature = "musical_transport")]
205    pub current_playhead: Option<InstantMusical>,
206    #[cfg(feature = "musical_transport")]
207    pub speed_multiplier: f64,
208    #[cfg(feature = "musical_transport")]
209    pub transport_is_playing: bool,
210    pub process_timestamp: Option<I>,
211}
212
213impl<I: Clone> Default for SharedClock<I> {
214    fn default() -> Self {
215        Self {
216            clock_samples: InstantSamples(0),
217            #[cfg(feature = "musical_transport")]
218            current_playhead: None,
219            #[cfg(feature = "musical_transport")]
220            speed_multiplier: 1.0,
221            #[cfg(feature = "musical_transport")]
222            transport_is_playing: false,
223            process_timestamp: None,
224        }
225    }
226}
227
/// How to handle event buffers on the audio thread running out of space.
///
/// The default is [`BufferOutOfSpaceMode::AllocateOnAudioThread`].
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, PartialOrd)]
#[cfg_attr(feature = "bevy_reflect", derive(bevy_reflect::Reflect))]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum BufferOutOfSpaceMode {
    /// If an event buffer on the audio thread ran out of space to fit new
    /// events, reallocate on the audio thread to fit the new items. If this
    /// happens, it may cause underruns (audio glitches), and a warning will
    /// be logged.
    #[default]
    AllocateOnAudioThread,
    /// If an event buffer on the audio thread ran out of space to fit new
    /// events, then panic.
    Panic,
    /// If an event buffer on the audio thread ran out of space to fit new
    /// events, drop those events to avoid allocating on the audio thread.
    /// If this happens, a warning will be logged.
    ///
    /// (Not generally recommended, but the option is here if you want it.)
    DropEvents,
}