1use std::any::Any;
4use std::cell::Cell;
5use std::ops::ControlFlow;
6use std::sync::atomic::{AtomicU64, AtomicU8, Ordering};
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10use crossbeam_channel::{Receiver, Sender};
11use dasp_sample::FromSample;
12use futures_channel::{mpsc, oneshot};
13use futures_util::StreamExt as _;
14
15use super::AudioRenderQuantum;
16use crate::buffer::AudioBuffer;
17use crate::context::{
18 AudioContextState, AudioNodeId, OfflineAudioContext, OfflineAudioContextCallback,
19};
20use crate::events::{EventDispatch, EventLoop};
21use crate::message::ControlMessage;
22use crate::node::ChannelInterpretation;
23use crate::render::AudioWorkletGlobalScope;
24use crate::{AudioRenderCapacityLoad, RENDER_QUANTUM_SIZE};
25
26use super::graph::Graph;
27
/// State and logic of the audio render thread.
///
/// Owned by the thread that produces audio; driven either by a realtime
/// callback (`render`) or by the offline rendering entry points.
pub(crate) struct RenderThread {
    // The audio graph; `None` until a `ControlMessage::Startup` delivers it,
    // and taken back out on `CloseAndRecycle` / `unload_graph`.
    graph: Option<Graph>,
    // Output sample rate in Hz.
    sample_rate: f32,
    // Length (in interleaved samples) of the last buffer passed to `render`.
    buffer_size: usize,
    // Number of interleaved output channels.
    number_of_channels: usize,
    // When true, `render_inner` emits silence instead of running the graph.
    suspended: bool,
    // Shared context state, stored as the `AudioContextState` discriminant.
    state: Arc<AtomicU8>,
    // Running frame counter, advanced by RENDER_QUANTUM_SIZE per rendered quantum.
    frames_played: Arc<AtomicU64>,
    // Control message channel; set to `None` after `CloseAndRecycle` so no
    // further messages are handled.
    receiver: Option<Receiver<ControlMessage>>,
    // Leftover of a partially-consumed render quantum: (per-channel offset
    // already emitted, the rendered quantum). Used when the output buffer
    // length is not a multiple of the quantum size.
    buffer_offset: Option<(usize, AudioRenderQuantum)>,
    // Optional channel to publish render-load measurements.
    load_value_sender: Option<Sender<AudioRenderCapacityLoad>>,
    // Channel dispatching events (state changes, diagnostics) to the control side.
    event_sender: Sender<EventDispatch>,
    // Producer half of the off-thread garbage collector queue, if spawned.
    garbage_collector: Option<llq::Producer<Box<dyn Any + Send>>>,
}
45
#[allow(clippy::non_send_fields_in_send_ty)]
// SAFETY: NOTE(review): these blanket impls assert that `Graph` and
// `RenderThread` are safe to move across (and reference from) threads even
// though some of their fields are not `Send`/`Sync`. The apparent invariant is
// that ownership is handed to exactly one render thread at a time and all
// cross-thread communication goes through the channels/atomics above — but
// that cannot be verified from this file alone; confirm against `Graph`'s
// internals before relying on it.
unsafe impl Send for Graph {}
unsafe impl Sync for Graph {}
unsafe impl Send for RenderThread {}
unsafe impl Sync for RenderThread {}
56
57impl std::fmt::Debug for RenderThread {
58 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
59 f.debug_struct("RenderThread")
60 .field("sample_rate", &self.sample_rate)
61 .field("buffer_size", &self.buffer_size)
62 .field("frames_played", &self.frames_played.load(Ordering::Relaxed))
63 .field("number_of_channels", &self.number_of_channels)
64 .finish_non_exhaustive()
65 }
66}
67
impl RenderThread {
    /// Creates the render thread handler.
    ///
    /// The audio graph is not available yet at this point; it is delivered
    /// later via a `ControlMessage::Startup`.
    pub fn new(
        sample_rate: f32,
        number_of_channels: usize,
        receiver: Receiver<ControlMessage>,
        state: Arc<AtomicU8>,
        frames_played: Arc<AtomicU64>,
        event_sender: Sender<EventDispatch>,
    ) -> Self {
        Self {
            graph: None,
            sample_rate,
            buffer_size: 0, // overwritten with the actual length on every `render` call
            number_of_channels,
            suspended: false,
            state,
            frames_played,
            receiver: Some(receiver),
            buffer_offset: None,
            load_value_sender: None,
            event_sender,
            garbage_collector: None,
        }
    }

    /// Installs the channel used to publish `AudioRenderCapacityLoad` values
    /// from the realtime `render` path.
    pub(crate) fn set_load_value_sender(
        &mut self,
        load_value_sender: Sender<AudioRenderCapacityLoad>,
    ) {
        self.load_value_sender = Some(load_value_sender);
    }

    /// Spawns the off-thread garbage collector (idempotent).
    ///
    /// Node messages routed through the graph are pushed onto this queue so
    /// their deallocation happens off the audio render thread.
    pub(crate) fn spawn_garbage_collector_thread(&mut self) {
        if self.garbage_collector.is_none() {
            let (gc_producer, gc_consumer) = llq::Queue::new().split();
            spawn_garbage_collector_thread(gc_consumer);
            self.garbage_collector = Some(gc_producer);
        }
    }

    /// Drains and handles all currently pending control messages (non-blocking).
    #[inline(always)]
    fn handle_control_messages(&mut self) {
        // The receiver is dropped after `CloseAndRecycle`; nothing to do then.
        if self.receiver.is_none() {
            return;
        }

        while let Ok(msg) = self.receiver.as_ref().unwrap().try_recv() {
            let result = self.handle_control_message(msg);
            // `Break` means the graph was handed back to the control thread —
            // stop processing immediately.
            if result.is_break() {
                return;
            }
        }
    }

    /// Handles a single control message.
    ///
    /// Returns `ControlFlow::Break` only for `CloseAndRecycle`, after which no
    /// further messages must be processed.
    fn handle_control_message(&mut self, msg: ControlMessage) -> ControlFlow<()> {
        use ControlMessage::*;

        match msg {
            // Insert a new node (with its render processor and channel config)
            // into the graph.
            RegisterNode {
                id: node_id,
                reclaim_id,
                node,
                inputs,
                outputs,
                channel_config,
            } => {
                self.graph.as_mut().unwrap().add_node(
                    node_id,
                    reclaim_id,
                    node,
                    inputs,
                    outputs,
                    channel_config,
                );
            }
            ConnectNode {
                from,
                to,
                output,
                input,
            } => {
                self.graph
                    .as_mut()
                    .unwrap()
                    .add_edge((from, output), (to, input));
            }
            DisconnectNode {
                from,
                output,
                to,
                input,
            } => {
                self.graph
                    .as_mut()
                    .unwrap()
                    .remove_edge((from, output), (to, input));
            }
            // The control-side handle of this node was dropped.
            ControlHandleDropped { id } => {
                self.graph.as_mut().unwrap().mark_control_handle_dropped(id);
            }
            MarkCycleBreaker { id } => {
                self.graph.as_mut().unwrap().mark_cycle_breaker(id);
            }
            CloseAndRecycle { sender } => {
                // Hand the graph back to the control thread for reuse and stop
                // listening for further control messages.
                self.set_state(AudioContextState::Suspended);
                let _ = sender.send(self.graph.take().unwrap());
                self.receiver = None;
                return ControlFlow::Break(());
            }
            // Initial delivery of the audio graph; rendering can start now.
            Startup { graph } => {
                debug_assert!(self.graph.is_none());
                self.graph = Some(graph);
                self.set_state(AudioContextState::Running);
            }
            NodeMessage { id, mut msg } => {
                self.graph.as_mut().unwrap().route_message(id, msg.as_mut());
                // Hand the spent message to the GC thread so its allocation is
                // not freed on the render thread.
                if let Some(gc) = self.garbage_collector.as_mut() {
                    gc.push(msg)
                }
            }
            RunDiagnostics { mut buffer } => {
                use std::io::Write;
                writeln!(&mut buffer, "{:#?}", &self).ok();
                writeln!(&mut buffer, "{:?}", &self.graph).ok();
                // NOTE(review): this panics when the event channel is full,
                // unlike `set_state` which silently drops — confirm intended.
                self.event_sender
                    .try_send(EventDispatch::diagnostics(buffer))
                    .expect("Unable to send diagnostics - channel is full");
            }
            Suspend { notify } => {
                self.suspended = true;
                self.set_state(AudioContextState::Suspended);
                notify.send();
            }
            Resume { notify } => {
                self.suspended = false;
                self.set_state(AudioContextState::Running);
                notify.send();
            }
            Close { notify } => {
                self.suspended = true;
                self.set_state(AudioContextState::Closed);
                notify.send();
            }

            SetChannelCount { id, count } => {
                self.graph.as_mut().unwrap().set_channel_count(id, count);
            }

            SetChannelCountMode { id, mode } => {
                self.graph
                    .as_mut()
                    .unwrap()
                    .set_channel_count_mode(id, mode);
            }

            SetChannelInterpretation { id, interpretation } => {
                self.graph
                    .as_mut()
                    .unwrap()
                    .set_channel_interpretation(id, interpretation);
            }
        }

        ControlFlow::Continue(())
    }

    /// Renders the whole offline context synchronously and returns the result.
    ///
    /// `suspend_callbacks` holds `(quantum_index, callback)` pairs — assumed
    /// ordered ascending, since only the head of the list is inspected. Each
    /// callback runs right before its quantum is rendered and may mutate the
    /// graph via new control messages.
    pub fn render_audiobuffer_sync(
        mut self,
        context: &mut OfflineAudioContext,
        mut suspend_callbacks: Vec<(usize, Box<OfflineAudioContextCallback>)>,
        event_loop: &EventLoop,
    ) -> AudioBuffer {
        let length = context.length();
        let sample_rate = self.sample_rate;

        // One Vec per channel. The capacity (`length`) doubles as the target
        // sample count inside `render_offline_quantum`.
        let mut buffer = Vec::with_capacity(self.number_of_channels);
        buffer.resize_with(buffer.capacity(), || Vec::with_capacity(length));

        let num_frames = length.div_ceil(RENDER_QUANTUM_SIZE);

        // Pick up initial control messages (e.g. `Startup`) before rendering.
        self.handle_control_messages();

        for quantum in 0..num_frames {
            // Run the suspend callback scheduled for this quantum, if any.
            if suspend_callbacks.first().map(|&(q, _)| q) == Some(quantum) {
                let callback = suspend_callbacks.remove(0).1;
                (callback)(context);

                // The callback may have issued new control messages.
                self.handle_control_messages();
            }

            self.render_offline_quantum(&mut buffer);

            // Event handlers may in turn issue control messages; drain again
            // only if something was actually handled.
            let events_were_handled = event_loop.handle_pending_events();
            if events_were_handled {
                self.handle_control_messages();
            }
        }

        // Tear down the graph, then flush any events that emitted.
        self.unload_graph();
        event_loop.handle_pending_events();

        AudioBuffer::from(buffer, sample_rate)
    }

    /// Renders the whole offline context, awaiting at scheduled suspend points.
    ///
    /// For each `(quantum_index, sender)` pair (assumed ordered ascending) the
    /// thread fires the oneshot sender right before rendering that quantum and
    /// then awaits a message on `resume_receiver` before continuing.
    pub async fn render_audiobuffer(
        mut self,
        length: usize,
        mut suspend_callbacks: Vec<(usize, oneshot::Sender<()>)>,
        mut resume_receiver: mpsc::Receiver<()>,
        event_loop: &EventLoop,
    ) -> AudioBuffer {
        let sample_rate = self.sample_rate;

        // One Vec per channel; capacity doubles as the target length (see
        // `render_offline_quantum`).
        let mut buffer = Vec::with_capacity(self.number_of_channels);
        buffer.resize_with(buffer.capacity(), || Vec::with_capacity(length));

        let num_frames = length.div_ceil(RENDER_QUANTUM_SIZE);

        // Pick up initial control messages (e.g. `Startup`) before rendering.
        self.handle_control_messages();

        for quantum in 0..num_frames {
            if suspend_callbacks.first().map(|&(q, _)| q) == Some(quantum) {
                // Signal the suspend point, then park until resumed.
                let sender = suspend_callbacks.remove(0).1;
                sender.send(()).unwrap();
                resume_receiver.next().await;

                // The suspend handler may have issued new control messages.
                self.handle_control_messages();
            }

            self.render_offline_quantum(&mut buffer);

            let events_were_handled = event_loop.handle_pending_events();
            if events_were_handled {
                self.handle_control_messages();
            }
        }

        // Tear down the graph, then flush any events that emitted.
        self.unload_graph();
        event_loop.handle_pending_events();

        AudioBuffer::from(buffer, sample_rate)
    }

    /// Renders a single quantum of the offline graph, appending to `buffer`.
    ///
    /// Appends at most `RENDER_QUANTUM_SIZE` samples per channel, clamped so
    /// the final quantum does not overshoot each channel's capacity (which was
    /// preallocated to the requested total length).
    fn render_offline_quantum(&mut self, buffer: &mut [Vec<f32>]) {
        // `fetch_add` returns the previous value, i.e. the first frame index
        // of this quantum.
        let current_frame = self
            .frames_played
            .fetch_add(RENDER_QUANTUM_SIZE as u64, Ordering::Relaxed);
        let current_time = current_frame as f64 / self.sample_rate as f64;

        let scope = AudioWorkletGlobalScope {
            current_frame,
            current_time,
            sample_rate: self.sample_rate,
            event_sender: self.event_sender.clone(),
            // Dummy initial id — NOTE(review): presumably updated per node
            // inside `Graph::render`; confirm there.
            node_id: Cell::new(AudioNodeId(0)),
        };

        let graph = self.graph.as_mut().unwrap();

        // Render with denormal floats flushed to zero on architectures that
        // support it, avoiding denormal slowdowns in DSP code.
        #[cfg(any(target_arch = "x86", target_arch = "x86_64", target_arch = "aarch64"))]
        // SAFETY: NOTE(review): relies on the `no_denormals` crate contract
        // (temporarily altering the FP environment around the closure).
        let rendered = unsafe {
            no_denormals::no_denormals(|| graph.render(&scope))
        };
        #[cfg(not(any(target_arch = "x86", target_arch = "x86_64", target_arch = "aarch64")))]
        let rendered = graph.render(&scope);

        // Samples still missing from the requested total; the last quantum is
        // usually only partially consumed.
        let remaining = (buffer[0].capacity() - buffer[0].len()).min(RENDER_QUANTUM_SIZE);
        let channels = rendered.channels();
        buffer.iter_mut().enumerate().for_each(|(i, b)| {
            // Channels the graph did not produce are treated as silence.
            let c = channels
                .get(i)
                .map(AsRef::as_ref)
                .unwrap_or(&[0.; RENDER_QUANTUM_SIZE]);
            b.extend_from_slice(&c[..remaining]);
        });
    }

    /// Consumes the graph, letting it run its `before_drop` hook with a final
    /// timestamped scope.
    fn unload_graph(mut self) {
        let current_frame = self.frames_played.load(Ordering::Relaxed);
        let current_time = current_frame as f64 / self.sample_rate as f64;

        let scope = AudioWorkletGlobalScope {
            current_frame,
            current_time,
            sample_rate: self.sample_rate,
            event_sender: self.event_sender.clone(),
            // Dummy initial id, as in `render_offline_quantum`.
            node_id: Cell::new(AudioNodeId(0)),
        };
        self.graph.take().unwrap().before_drop(&scope);
    }

    /// Realtime entry point: fills `output_buffer` with interleaved samples
    /// and, if configured, reports the render load.
    pub fn render<S: FromSample<f32> + Clone>(&mut self, output_buffer: &mut [S]) {
        let render_start = Instant::now();

        // Render with denormals flushed to zero (same rationale as in
        // `render_offline_quantum`).
        #[cfg(any(target_arch = "x86", target_arch = "x86_64", target_arch = "aarch64"))]
        // SAFETY: NOTE(review): relies on the `no_denormals` crate contract.
        unsafe {
            no_denormals::no_denormals(|| self.render_inner(output_buffer))
        };
        #[cfg(not(any(target_arch = "x86", target_arch = "x86_64", target_arch = "aarch64")))]
        self.render_inner(output_buffer);

        if let Some(load_value_sender) = &self.load_value_sender {
            // Load = wall-clock render duration / duration of one quantum of
            // audio. NOTE(review): the budget is a single quantum even when
            // `output_buffer` spans several quanta — confirm intended.
            let duration = render_start.elapsed().as_micros() as f64 / 1E6;
            let max_duration = RENDER_QUANTUM_SIZE as f64 / self.sample_rate as f64;
            let load_value = duration / max_duration;
            let render_timestamp =
                self.frames_played.load(Ordering::Relaxed) as f64 / self.sample_rate as f64;
            let load_value_data = AudioRenderCapacityLoad {
                render_timestamp,
                load_value,
            };
            // Best effort: a full channel simply drops this measurement.
            let _ = load_value_sender.try_send(load_value_data);
        }
    }

    /// Fills `output_buffer` (interleaved) from the graph one render quantum
    /// at a time, carrying partially-consumed quanta across calls via
    /// `self.buffer_offset`.
    fn render_inner<S: FromSample<f32> + Clone>(&mut self, mut output_buffer: &mut [S]) {
        self.buffer_size = output_buffer.len();

        // First, emit any samples left over from the previous call's
        // partially-consumed quantum.
        if let Some((offset, prev_rendered)) = self.buffer_offset.take() {
            let leftover_len = (RENDER_QUANTUM_SIZE - offset) * self.number_of_channels;
            // `first` receives the leftovers; `next` is rendered fresh below.
            let (first, next) = output_buffer.split_at_mut(leftover_len.min(output_buffer.len()));

            // Interleave: channel i occupies every number_of_channels-th slot.
            for i in 0..self.number_of_channels {
                let output = first.iter_mut().skip(i).step_by(self.number_of_channels);
                let channel = prev_rendered.channel_data(i)[offset..].iter();
                for (sample, input) in output.zip(channel) {
                    let value = S::from_sample_(*input);
                    *sample = value;
                }
            }

            // Output fully satisfied by the leftovers: stash the still-unused
            // remainder of the quantum for the next call and stop here.
            if next.is_empty() {
                self.buffer_offset = Some((
                    offset + first.len() / self.number_of_channels,
                    prev_rendered,
                ));
                return;
            }

            output_buffer = next;
        }

        // Apply pending graph mutations before rendering.
        self.handle_control_messages();

        // Suspended, or no (active) graph yet: emit silence.
        if self.suspended || !self.graph.as_ref().is_some_and(Graph::is_active) {
            output_buffer.fill(S::from_sample_(0.));
            return;
        }

        // Interleaved samples per render quantum.
        let chunk_size = RENDER_QUANTUM_SIZE * self.number_of_channels;

        for data in output_buffer.chunks_mut(chunk_size) {
            // `fetch_add` returns the previous value: the first frame index of
            // this quantum.
            let current_frame = self
                .frames_played
                .fetch_add(RENDER_QUANTUM_SIZE as u64, Ordering::Relaxed);
            let current_time = current_frame as f64 / self.sample_rate as f64;

            let scope = AudioWorkletGlobalScope {
                current_frame,
                current_time,
                sample_rate: self.sample_rate,
                event_sender: self.event_sender.clone(),
                // Dummy initial id, as in `render_offline_quantum`.
                node_id: Cell::new(AudioNodeId(0)),
            };

            let mut destination_buffer = self.graph.as_mut().unwrap().render(&scope).clone();

            // Up-mix with discrete interpretation when the graph produced
            // fewer channels than the output expects.
            if destination_buffer.number_of_channels() < self.number_of_channels {
                destination_buffer.mix(self.number_of_channels, ChannelInterpretation::Discrete);
            }

            // Interleave the rendered channels into this output chunk.
            for i in 0..self.number_of_channels {
                let output = data.iter_mut().skip(i).step_by(self.number_of_channels);
                let channel = destination_buffer.channel_data(i).iter();
                for (sample, input) in output.zip(channel) {
                    let value = S::from_sample_(*input);
                    *sample = value;
                }
            }

            // The final chunk may be shorter than a quantum: remember how much
            // of `destination_buffer` was consumed for the next call.
            if data.len() != chunk_size {
                let channel_offset = data.len() / self.number_of_channels;
                debug_assert!(channel_offset < RENDER_QUANTUM_SIZE);
                self.buffer_offset = Some((channel_offset, destination_buffer));
            }

            self.handle_control_messages();
        }
    }

    /// Publishes a new context state: stores the discriminant for readers on
    /// other threads and dispatches a statechange event (best effort — a full
    /// event channel is silently ignored).
    fn set_state(&self, state: AudioContextState) {
        self.state.store(state as u8, Ordering::Relaxed);
        self.event_sender
            .try_send(EventDispatch::state_change(state))
            .ok();
    }
}
525
526impl Drop for RenderThread {
527 fn drop(&mut self) {
528 if let Some(gc) = self.garbage_collector.as_mut() {
529 gc.push(llq::Node::new(Box::new(TerminateGarbageCollectorThread)))
530 }
531 log::info!("Audio render thread has been dropped");
532 }
533}
534
// Polling interval of the garbage collector thread while its queue is empty.
const GARBAGE_COLLECTOR_THREAD_TIMEOUT: Duration = Duration::from_millis(100);

// Sentinel pushed onto the garbage collector queue to make the collector
// thread exit its loop.
#[derive(Debug)]
struct TerminateGarbageCollectorThread;
541
542fn spawn_garbage_collector_thread(consumer: llq::Consumer<Box<dyn Any + Send>>) {
544 let _join_handle = std::thread::spawn(move || run_garbage_collector_thread(consumer));
545}
546
547fn run_garbage_collector_thread(mut consumer: llq::Consumer<Box<dyn Any + Send>>) {
548 log::info!("Entering garbage collector thread");
549 loop {
550 if let Some(node) = consumer.pop() {
551 if node
552 .as_ref()
553 .downcast_ref::<TerminateGarbageCollectorThread>()
554 .is_some()
555 {
556 log::info!("Terminating garbage collector thread");
557 break;
558 }
559 } else {
561 std::thread::sleep(GARBAGE_COLLECTOR_THREAD_TIMEOUT);
562 }
563 }
564 log::info!("Exiting garbage collector thread");
565}