web_audio_api/render/
thread.rs

1//! Communicates with the control thread and ships audio samples to the hardware
2
3use std::any::Any;
4use std::cell::Cell;
5use std::ops::ControlFlow;
6use std::sync::atomic::{AtomicU64, AtomicU8, Ordering};
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10use crossbeam_channel::{Receiver, Sender};
11use dasp_sample::FromSample;
12use futures_channel::{mpsc, oneshot};
13use futures_util::StreamExt as _;
14
15use super::AudioRenderQuantum;
16use crate::buffer::AudioBuffer;
17use crate::context::{
18    AudioContextState, AudioNodeId, OfflineAudioContext, OfflineAudioContextCallback,
19};
20use crate::events::{EventDispatch, EventLoop};
21use crate::message::ControlMessage;
22use crate::node::ChannelInterpretation;
23use crate::render::AudioWorkletGlobalScope;
24use crate::{AudioRenderCapacityLoad, RENDER_QUANTUM_SIZE};
25
26use super::graph::Graph;
27
/// Operations running off the system-level audio callback
///
/// Bundles the audio graph with the channels and shared atomics through which the render
/// thread receives control messages and reports state, timing, load and events.
pub(crate) struct RenderThread {
    /// audio graph to render; `None` until a `Startup` message has been handled, and again
    /// once the graph has been handed back (`CloseAndRecycle`) or unloaded
    graph: Option<Graph>,
    /// sample rate, used to convert the frame counter into a time in seconds
    sample_rate: f32,
    /// length (in samples) of the output slice passed to the most recent render call,
    /// reported by the `Debug` impl
    buffer_size: usize,
    /// number of channels of the backend stream, i.e. sound card number of
    /// channels clamped to MAX_CHANNELS
    number_of_channels: usize,
    /// when `true`, the online render path emits silence instead of rendering the graph
    suspended: bool,
    /// shared storage for the current `AudioContextState`, stored as `u8`
    state: Arc<AtomicU8>,
    /// shared frame counter, advanced by `RENDER_QUANTUM_SIZE` for every rendered quantum
    frames_played: Arc<AtomicU64>,
    /// incoming control messages; set to `None` after `CloseAndRecycle` was handled
    receiver: Option<Receiver<ControlMessage>>,
    /// rendered quantum that did not fit entirely in the previous output buffer, together
    /// with the number of frames of it that were already written out
    buffer_offset: Option<(usize, AudioRenderQuantum)>,
    /// optional channel for shipping render load measurements
    load_value_sender: Option<Sender<AudioRenderCapacityLoad>>,
    /// channel for dispatching events (state changes, diagnostics, ...)
    event_sender: Sender<EventDispatch>,
    /// producer side of the queue towards the garbage collector thread, if one was spawned
    garbage_collector: Option<llq::Producer<Box<dyn Any + Send>>>,
}
45
// SAFETY:
// The RenderThread is not Send/Sync since it contains `AudioRenderQuantum`s (which use Rc), but
// these are only accessed within the same thread (the render thread). Due to the cpal constraints
// we can neither move the RenderThread object into the render thread, nor can we initialize the
// Rc's in that thread.
//
// NOTE(review): the justification above is written for `RenderThread`; the `Graph` impls
// presumably rely on the same "only touched from the render thread" argument - confirm.
//
// NOTE: the `allow` attribute below is attached to a single item, so it only covers the
// `Send` impl for `Graph`, not the three impls that follow it.
#[allow(clippy::non_send_fields_in_send_ty)]
unsafe impl Send for Graph {}
unsafe impl Sync for Graph {}
unsafe impl Send for RenderThread {}
unsafe impl Sync for RenderThread {}
56
57impl std::fmt::Debug for RenderThread {
58    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
59        f.debug_struct("RenderThread")
60            .field("sample_rate", &self.sample_rate)
61            .field("buffer_size", &self.buffer_size)
62            .field("frames_played", &self.frames_played.load(Ordering::Relaxed))
63            .field("number_of_channels", &self.number_of_channels)
64            .finish_non_exhaustive()
65    }
66}
67
68impl RenderThread {
69    pub fn new(
70        sample_rate: f32,
71        number_of_channels: usize,
72        receiver: Receiver<ControlMessage>,
73        state: Arc<AtomicU8>,
74        frames_played: Arc<AtomicU64>,
75        event_sender: Sender<EventDispatch>,
76    ) -> Self {
77        Self {
78            graph: None,
79            sample_rate,
80            buffer_size: 0,
81            number_of_channels,
82            suspended: false,
83            state,
84            frames_played,
85            receiver: Some(receiver),
86            buffer_offset: None,
87            load_value_sender: None,
88            event_sender,
89            garbage_collector: None,
90        }
91    }
92
93    pub(crate) fn set_load_value_sender(
94        &mut self,
95        load_value_sender: Sender<AudioRenderCapacityLoad>,
96    ) {
97        self.load_value_sender = Some(load_value_sender);
98    }
99
100    pub(crate) fn spawn_garbage_collector_thread(&mut self) {
101        if self.garbage_collector.is_none() {
102            let (gc_producer, gc_consumer) = llq::Queue::new().split();
103            spawn_garbage_collector_thread(gc_consumer);
104            self.garbage_collector = Some(gc_producer);
105        }
106    }
107
108    #[inline(always)]
109    fn handle_control_messages(&mut self) {
110        if self.receiver.is_none() {
111            return;
112        }
113
114        while let Ok(msg) = self.receiver.as_ref().unwrap().try_recv() {
115            let result = self.handle_control_message(msg);
116            if result.is_break() {
117                return; // stop processing
118            }
119        }
120    }
121
122    fn handle_control_message(&mut self, msg: ControlMessage) -> ControlFlow<()> {
123        use ControlMessage::*;
124
125        match msg {
126            RegisterNode {
127                id: node_id,
128                reclaim_id,
129                node,
130                inputs,
131                outputs,
132                channel_config,
133            } => {
134                self.graph.as_mut().unwrap().add_node(
135                    node_id,
136                    reclaim_id,
137                    node,
138                    inputs,
139                    outputs,
140                    channel_config,
141                );
142            }
143            ConnectNode {
144                from,
145                to,
146                output,
147                input,
148            } => {
149                self.graph
150                    .as_mut()
151                    .unwrap()
152                    .add_edge((from, output), (to, input));
153            }
154            DisconnectNode {
155                from,
156                output,
157                to,
158                input,
159            } => {
160                self.graph
161                    .as_mut()
162                    .unwrap()
163                    .remove_edge((from, output), (to, input));
164            }
165            ControlHandleDropped { id } => {
166                self.graph.as_mut().unwrap().mark_control_handle_dropped(id);
167            }
168            MarkCycleBreaker { id } => {
169                self.graph.as_mut().unwrap().mark_cycle_breaker(id);
170            }
171            CloseAndRecycle { sender } => {
172                self.set_state(AudioContextState::Suspended);
173                let _ = sender.send(self.graph.take().unwrap());
174                self.receiver = None;
175                return ControlFlow::Break(()); // no further handling of ctrl msgs
176            }
177            Startup { graph } => {
178                debug_assert!(self.graph.is_none());
179                self.graph = Some(graph);
180                self.set_state(AudioContextState::Running);
181            }
182            NodeMessage { id, mut msg } => {
183                self.graph.as_mut().unwrap().route_message(id, msg.as_mut());
184                if let Some(gc) = self.garbage_collector.as_mut() {
185                    gc.push(msg)
186                }
187            }
188            RunDiagnostics { mut buffer } => {
189                use std::io::Write;
190                writeln!(&mut buffer, "{:#?}", &self).ok();
191                writeln!(&mut buffer, "{:?}", &self.graph).ok();
192                self.event_sender
193                    .try_send(EventDispatch::diagnostics(buffer))
194                    .expect("Unable to send diagnostics - channel is full");
195            }
196            Suspend { notify } => {
197                self.suspended = true;
198                self.set_state(AudioContextState::Suspended);
199                notify.send();
200            }
201            Resume { notify } => {
202                self.suspended = false;
203                self.set_state(AudioContextState::Running);
204                notify.send();
205            }
206            Close { notify } => {
207                self.suspended = true;
208                self.set_state(AudioContextState::Closed);
209                notify.send();
210            }
211
212            SetChannelCount { id, count } => {
213                self.graph.as_mut().unwrap().set_channel_count(id, count);
214            }
215
216            SetChannelCountMode { id, mode } => {
217                self.graph
218                    .as_mut()
219                    .unwrap()
220                    .set_channel_count_mode(id, mode);
221            }
222
223            SetChannelInterpretation { id, interpretation } => {
224                self.graph
225                    .as_mut()
226                    .unwrap()
227                    .set_channel_interpretation(id, interpretation);
228            }
229        }
230
231        ControlFlow::Continue(()) // continue handling more messages
232    }
233
234    // Render method of the `OfflineAudioContext::start_rendering_sync`
235    //
236    // This method is not spec compliant and obviously marked as synchronous, so we
237    // don't launch a thread.
238    //
239    // cf. https://webaudio.github.io/web-audio-api/#dom-offlineaudiocontext-startrendering
240    pub fn render_audiobuffer_sync(
241        mut self,
242        context: &mut OfflineAudioContext,
243        mut suspend_callbacks: Vec<(usize, Box<OfflineAudioContextCallback>)>,
244        event_loop: &EventLoop,
245    ) -> AudioBuffer {
246        let length = context.length();
247        let sample_rate = self.sample_rate;
248
249        // construct a properly sized output buffer
250        let mut buffer = Vec::with_capacity(self.number_of_channels);
251        buffer.resize_with(buffer.capacity(), || Vec::with_capacity(length));
252
253        let num_frames = length.div_ceil(RENDER_QUANTUM_SIZE);
254
255        // Handle initial control messages
256        self.handle_control_messages();
257
258        for quantum in 0..num_frames {
259            // Suspend at given times and run callbacks
260            if suspend_callbacks.first().map(|&(q, _)| q) == Some(quantum) {
261                let callback = suspend_callbacks.remove(0).1;
262                (callback)(context);
263
264                // Handle any control messages that may have been submitted by the callback
265                self.handle_control_messages();
266            }
267
268            self.render_offline_quantum(&mut buffer);
269
270            let events_were_handled = event_loop.handle_pending_events();
271            if events_were_handled {
272                // Handle any control messages that may have been submitted by the handler
273                self.handle_control_messages();
274            }
275        }
276
277        // call destructors of all alive nodes and handle any resulting events
278        self.unload_graph();
279        event_loop.handle_pending_events();
280
281        AudioBuffer::from(buffer, sample_rate)
282    }
283
284    // Render method of the `OfflineAudioContext::start_rendering`
285    //
286    // This is the async interface, as compared to render_audiobuffer_sync
287    //
288    // cf. https://webaudio.github.io/web-audio-api/#dom-offlineaudiocontext-startrendering
289    pub async fn render_audiobuffer(
290        mut self,
291        length: usize,
292        mut suspend_callbacks: Vec<(usize, oneshot::Sender<()>)>,
293        mut resume_receiver: mpsc::Receiver<()>,
294        event_loop: &EventLoop,
295    ) -> AudioBuffer {
296        let sample_rate = self.sample_rate;
297
298        // construct a properly sized output buffer
299        let mut buffer = Vec::with_capacity(self.number_of_channels);
300        buffer.resize_with(buffer.capacity(), || Vec::with_capacity(length));
301
302        let num_frames = length.div_ceil(RENDER_QUANTUM_SIZE);
303
304        // Handle addition/removal of nodes/edges
305        self.handle_control_messages();
306
307        for quantum in 0..num_frames {
308            // Suspend at given times and run callbacks
309            if suspend_callbacks.first().map(|&(q, _)| q) == Some(quantum) {
310                let sender = suspend_callbacks.remove(0).1;
311                sender.send(()).unwrap();
312                resume_receiver.next().await;
313
314                // Handle addition/removal of nodes/edges
315                self.handle_control_messages();
316            }
317
318            self.render_offline_quantum(&mut buffer);
319
320            let events_were_handled = event_loop.handle_pending_events();
321            if events_were_handled {
322                // Handle any control messages that may have been submitted by the handler
323                self.handle_control_messages();
324            }
325        }
326
327        // call destructors of all alive nodes and handle any resulting events
328        self.unload_graph();
329        event_loop.handle_pending_events();
330
331        AudioBuffer::from(buffer, sample_rate)
332    }
333
334    /// Render a single quantum into an AudioBuffer
335    fn render_offline_quantum(&mut self, buffer: &mut [Vec<f32>]) {
336        // Update time
337        let current_frame = self
338            .frames_played
339            .fetch_add(RENDER_QUANTUM_SIZE as u64, Ordering::Relaxed);
340        let current_time = current_frame as f64 / self.sample_rate as f64;
341
342        let scope = AudioWorkletGlobalScope {
343            current_frame,
344            current_time,
345            sample_rate: self.sample_rate,
346            event_sender: self.event_sender.clone(),
347            node_id: Cell::new(AudioNodeId(0)), // placeholder value
348        };
349
350        // Render audio graph
351        let graph = self.graph.as_mut().unwrap();
352
353        // For x64 and aarch, process with denormal floats disabled (for performance, #194)
354        #[cfg(any(target_arch = "x86", target_arch = "x86_64", target_arch = "aarch64"))]
355        let rendered = unsafe {
356            // SAFETY: potentially risky - "modifying the masking flags, rounding mode, or
357            // denormals-are-zero mode flags leads to immediate Undefined Behavior: Rust assumes
358            // that these are always in their default state and will optimize accordingly."
359            no_denormals::no_denormals(|| graph.render(&scope))
360        };
361        #[cfg(not(any(target_arch = "x86", target_arch = "x86_64", target_arch = "aarch64")))]
362        let rendered = graph.render(&scope);
363
364        // Use a specialized copyToChannel implementation for performance
365        let remaining = (buffer[0].capacity() - buffer[0].len()).min(RENDER_QUANTUM_SIZE);
366        let channels = rendered.channels();
367        buffer.iter_mut().enumerate().for_each(|(i, b)| {
368            let c = channels
369                .get(i)
370                .map(AsRef::as_ref)
371                // When there are no input nodes for the destination, only a single silent channel
372                // is emitted. So manually pad the missing channels with silence
373                .unwrap_or(&[0.; RENDER_QUANTUM_SIZE]);
374            b.extend_from_slice(&c[..remaining]);
375        });
376    }
377
378    /// Run destructors of all alive nodes in the audio graph
379    fn unload_graph(mut self) {
380        let current_frame = self.frames_played.load(Ordering::Relaxed);
381        let current_time = current_frame as f64 / self.sample_rate as f64;
382
383        let scope = AudioWorkletGlobalScope {
384            current_frame,
385            current_time,
386            sample_rate: self.sample_rate,
387            event_sender: self.event_sender.clone(),
388            node_id: Cell::new(AudioNodeId(0)), // placeholder value
389        };
390        self.graph.take().unwrap().before_drop(&scope);
391    }
392
393    pub fn render<S: FromSample<f32> + Clone>(&mut self, output_buffer: &mut [S]) {
394        // Collect timing information
395        let render_start = Instant::now();
396
397        // Perform actual rendering
398
399        // For x64 and aarch, process with denormal floats disabled (for performance, #194)
400        #[cfg(any(target_arch = "x86", target_arch = "x86_64", target_arch = "aarch64"))]
401        unsafe {
402            // SAFETY: potentially risky - "modifying the masking flags, rounding mode, or
403            // denormals-are-zero mode flags leads to immediate Undefined Behavior: Rust assumes
404            // that these are always in their default state and will optimize accordingly."
405            no_denormals::no_denormals(|| self.render_inner(output_buffer))
406        };
407        #[cfg(not(any(target_arch = "x86", target_arch = "x86_64", target_arch = "aarch64")))]
408        self.render_inner(output_buffer);
409
410        // calculate load value and ship to control thread
411        if let Some(load_value_sender) = &self.load_value_sender {
412            let duration = render_start.elapsed().as_micros() as f64 / 1E6;
413            let max_duration = RENDER_QUANTUM_SIZE as f64 / self.sample_rate as f64;
414            let load_value = duration / max_duration;
415            let render_timestamp =
416                self.frames_played.load(Ordering::Relaxed) as f64 / self.sample_rate as f64;
417            let load_value_data = AudioRenderCapacityLoad {
418                render_timestamp,
419                load_value,
420            };
421            let _ = load_value_sender.try_send(load_value_data);
422        }
423    }
424
425    fn render_inner<S: FromSample<f32> + Clone>(&mut self, mut output_buffer: &mut [S]) {
426        self.buffer_size = output_buffer.len();
427
428        // There may be audio frames left over from the previous render call,
429        // if the cpal buffer size did not align with our internal RENDER_QUANTUM_SIZE
430        if let Some((offset, prev_rendered)) = self.buffer_offset.take() {
431            let leftover_len = (RENDER_QUANTUM_SIZE - offset) * self.number_of_channels;
432            // split the leftover frames slice, to fit in `buffer`
433            let (first, next) = output_buffer.split_at_mut(leftover_len.min(output_buffer.len()));
434
435            // copy rendered audio into output slice
436            for i in 0..self.number_of_channels {
437                let output = first.iter_mut().skip(i).step_by(self.number_of_channels);
438                let channel = prev_rendered.channel_data(i)[offset..].iter();
439                for (sample, input) in output.zip(channel) {
440                    let value = S::from_sample_(*input);
441                    *sample = value;
442                }
443            }
444
445            // exit early if we are done filling the buffer with the previously rendered data
446            if next.is_empty() {
447                self.buffer_offset = Some((
448                    offset + first.len() / self.number_of_channels,
449                    prev_rendered,
450                ));
451                return;
452            }
453
454            // if there's still space left in the buffer, continue rendering
455            output_buffer = next;
456        }
457
458        // handle addition/removal of nodes/edges
459        self.handle_control_messages();
460
461        // if the thread is still booting, suspended, or shutting down, fill with silence
462        if self.suspended || !self.graph.as_ref().is_some_and(Graph::is_active) {
463            output_buffer.fill(S::from_sample_(0.));
464            return;
465        }
466
467        // The audio graph is rendered in chunks of RENDER_QUANTUM_SIZE frames.  But some audio backends
468        // may not be able to emit chunks of this size.
469        let chunk_size = RENDER_QUANTUM_SIZE * self.number_of_channels;
470
471        for data in output_buffer.chunks_mut(chunk_size) {
472            // update time
473            let current_frame = self
474                .frames_played
475                .fetch_add(RENDER_QUANTUM_SIZE as u64, Ordering::Relaxed);
476            let current_time = current_frame as f64 / self.sample_rate as f64;
477
478            let scope = AudioWorkletGlobalScope {
479                current_frame,
480                current_time,
481                sample_rate: self.sample_rate,
482                event_sender: self.event_sender.clone(),
483                node_id: Cell::new(AudioNodeId(0)), // placeholder value
484            };
485
486            // render audio graph, clone it in case we need to mutate/store the value later
487            let mut destination_buffer = self.graph.as_mut().unwrap().render(&scope).clone();
488
489            // online AudioContext allows channel count to be less than the number
490            // of channels of the backend stream, i.e. number of channels of the
491            // soundcard clamped to MAX_CHANNELS.
492            if destination_buffer.number_of_channels() < self.number_of_channels {
493                destination_buffer.mix(self.number_of_channels, ChannelInterpretation::Discrete);
494            }
495
496            // copy rendered audio into output slice
497            for i in 0..self.number_of_channels {
498                let output = data.iter_mut().skip(i).step_by(self.number_of_channels);
499                let channel = destination_buffer.channel_data(i).iter();
500                for (sample, input) in output.zip(channel) {
501                    let value = S::from_sample_(*input);
502                    *sample = value;
503                }
504            }
505
506            if data.len() != chunk_size {
507                // this is the last chunk, and it contained less than RENDER_QUANTUM_SIZE samples
508                let channel_offset = data.len() / self.number_of_channels;
509                debug_assert!(channel_offset < RENDER_QUANTUM_SIZE);
510                self.buffer_offset = Some((channel_offset, destination_buffer));
511            }
512
513            // handle addition/removal of nodes/edges
514            self.handle_control_messages();
515        }
516    }
517
518    fn set_state(&self, state: AudioContextState) {
519        self.state.store(state as u8, Ordering::Relaxed);
520        self.event_sender
521            .try_send(EventDispatch::state_change(state))
522            .ok();
523    }
524}
525
526impl Drop for RenderThread {
527    fn drop(&mut self) {
528        if let Some(gc) = self.garbage_collector.as_mut() {
529            gc.push(llq::Node::new(Box::new(TerminateGarbageCollectorThread)))
530        }
531        log::info!("Audio render thread has been dropped");
532    }
533}
534
// Controls the polling frequency of the garbage collector thread: this is how long the
// thread sleeps whenever it finds its queue empty.
const GARBAGE_COLLECTOR_THREAD_TIMEOUT: Duration = Duration::from_millis(100);
537
// Poison pill that terminates the garbage collector thread. It is pushed onto the garbage
// collector queue when the `RenderThread` is dropped; the collector loop exits as soon as
// it pops a value of this type.
#[derive(Debug)]
struct TerminateGarbageCollectorThread;
541
542// Spawns a sidecar thread of the `RenderThread` for dropping resources.
543fn spawn_garbage_collector_thread(consumer: llq::Consumer<Box<dyn Any + Send>>) {
544    let _join_handle = std::thread::spawn(move || run_garbage_collector_thread(consumer));
545}
546
547fn run_garbage_collector_thread(mut consumer: llq::Consumer<Box<dyn Any + Send>>) {
548    log::info!("Entering garbage collector thread");
549    loop {
550        if let Some(node) = consumer.pop() {
551            if node
552                .as_ref()
553                .downcast_ref::<TerminateGarbageCollectorThread>()
554                .is_some()
555            {
556                log::info!("Terminating garbage collector thread");
557                break;
558            }
559            // Implicitly drop the received node.
560        } else {
561            std::thread::sleep(GARBAGE_COLLECTOR_THREAD_TIMEOUT);
562        }
563    }
564    log::info!("Exiting garbage collector thread");
565}