use std::any::Any;
use std::cell::Cell;
use std::ops::ControlFlow;
use std::sync::atomic::{AtomicU64, AtomicU8, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use crossbeam_channel::{Receiver, Sender};
use dasp_sample::FromSample;
use futures_channel::{mpsc, oneshot};
use futures_util::StreamExt as _;
use super::AudioRenderQuantum;
use crate::buffer::AudioBuffer;
use crate::context::{
AudioContextState, AudioNodeId, OfflineAudioContext, OfflineAudioContextCallback,
};
use crate::events::{EventDispatch, EventLoop};
use crate::message::ControlMessage;
use crate::node::ChannelInterpretation;
use crate::render::AudioWorkletGlobalScope;
use crate::{AudioRenderCapacityLoad, RENDER_QUANTUM_SIZE};
use super::graph::Graph;
/// State of the audio rendering thread.
///
/// Owns the audio graph while rendering and drains [`ControlMessage`]s sent
/// from the control thread between render quanta.
pub(crate) struct RenderThread {
    // The audio graph; `None` until `Startup` and after `CloseAndRecycle`.
    graph: Option<Graph>,
    // Output sample rate in Hz.
    sample_rate: f32,
    // Length (in samples) of the last output buffer handed to `render`.
    buffer_size: usize,
    // Number of interleaved output channels.
    number_of_channels: usize,
    // When true, `render_inner` emits silence instead of rendering the graph.
    suspended: bool,
    // Shared context state, stored as the `u8` discriminant of `AudioContextState`.
    state: Arc<AtomicU8>,
    // Running frame counter, shared with the control thread.
    frames_played: Arc<AtomicU64>,
    // Control message channel; set to `None` when the thread is recycled.
    receiver: Option<Receiver<ControlMessage>>,
    // Leftover of a partially consumed render quantum: (offset, quantum).
    buffer_offset: Option<(usize, AudioRenderQuantum)>,
    // Optional channel for render-capacity load reports.
    load_value_sender: Option<Sender<AudioRenderCapacityLoad>>,
    // Channel for events dispatched to the event loop.
    event_sender: Sender<EventDispatch>,
    // Producer side of the off-thread garbage collection queue, if spawned.
    garbage_collector: Option<llq::Producer<Box<dyn Any + Send>>>,
}
#[allow(clippy::non_send_fields_in_send_ty)]
// SAFETY(review): the Graph appears to be used by only one thread at a time --
// it is handed to the render thread via the `Startup` message and shipped back
// via `CloseAndRecycle` (see `handle_control_message`). TODO confirm that no
// field of `Graph` is ever accessed concurrently from both threads.
unsafe impl Send for Graph {}
unsafe impl Sync for Graph {}
// SAFETY(review): same reasoning -- the RenderThread value itself is moved to
// the render thread once and then only used there; these impls exist to allow
// that initial move. TODO confirm against all call sites.
unsafe impl Send for RenderThread {}
unsafe impl Sync for RenderThread {}
impl std::fmt::Debug for RenderThread {
    // Compact debug view; the graph itself is omitted, hence the
    // non-exhaustive finish. This output is embedded in diagnostics reports.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let frames_played = self.frames_played.load(Ordering::Relaxed);
        let mut dbg = f.debug_struct("RenderThread");
        dbg.field("sample_rate", &self.sample_rate);
        dbg.field("buffer_size", &self.buffer_size);
        dbg.field("frames_played", &frames_played);
        dbg.field("number_of_channels", &self.number_of_channels);
        dbg.finish_non_exhaustive()
    }
}
impl RenderThread {
/// Create a render thread in its pre-startup state.
///
/// The graph is installed later via the `Startup` control message; until
/// then rendering produces silence.
pub fn new(
    sample_rate: f32,
    number_of_channels: usize,
    receiver: Receiver<ControlMessage>,
    state: Arc<AtomicU8>,
    frames_played: Arc<AtomicU64>,
    event_sender: Sender<EventDispatch>,
) -> Self {
    Self {
        sample_rate,
        number_of_channels,
        state,
        frames_played,
        event_sender,
        receiver: Some(receiver),
        // Remaining fields start out empty / inactive.
        graph: None,
        buffer_size: 0,
        suspended: false,
        buffer_offset: None,
        load_value_sender: None,
        garbage_collector: None,
    }
}
/// Install the channel used to report render-capacity load measurements.
pub(crate) fn set_load_value_sender(
    &mut self,
    load_value_sender: Sender<AudioRenderCapacityLoad>,
) {
    // Any previously installed sender is simply dropped.
    let _prev = self.load_value_sender.replace(load_value_sender);
}
/// Spawn the garbage collector thread (at most once per render thread).
pub(crate) fn spawn_garbage_collector_thread(&mut self) {
    // Idempotent: keep the existing collector if one is already running.
    if self.garbage_collector.is_some() {
        return;
    }
    let (producer, consumer) = llq::Queue::new().split();
    spawn_garbage_collector_thread(consumer);
    self.garbage_collector = Some(producer);
}
#[inline(always)]
// Drain all pending control messages, stopping early when a message signals
// that no further handling is allowed (graph shipped back to control thread).
fn handle_control_messages(&mut self) {
    loop {
        // A recycled thread (receiver == None) has nothing to handle.
        let Some(receiver) = self.receiver.as_ref() else {
            return;
        };
        let Ok(msg) = receiver.try_recv() else {
            return;
        };
        if self.handle_control_message(msg).is_break() {
            return;
        }
    }
}
/// Apply a single control message from the control thread to the render state.
///
/// Returns `ControlFlow::Break(())` when message handling must stop for this
/// batch (the graph has been shipped back to the control thread), and
/// `ControlFlow::Continue(())` otherwise.
fn handle_control_message(&mut self, msg: ControlMessage) -> ControlFlow<()> {
    use ControlMessage::*;

    match msg {
        RegisterNode {
            id: node_id,
            reclaim_id,
            node,
            inputs,
            outputs,
            channel_config,
        } => {
            self.graph.as_mut().unwrap().add_node(
                node_id,
                reclaim_id,
                node,
                inputs,
                outputs,
                channel_config,
            );
        }
        ConnectNode {
            from,
            to,
            output,
            input,
        } => {
            self.graph
                .as_mut()
                .unwrap()
                .add_edge((from, output), (to, input));
        }
        DisconnectNode {
            from,
            output,
            to,
            input,
        } => {
            self.graph
                .as_mut()
                .unwrap()
                .remove_edge((from, output), (to, input));
        }
        ControlHandleDropped { id } => {
            // The control-side handle is gone; flag the node so the graph can
            // decide when it may be reclaimed.
            self.graph.as_mut().unwrap().mark_control_handle_dropped(id);
        }
        MarkCycleBreaker { id } => {
            self.graph.as_mut().unwrap().mark_cycle_breaker(id);
        }
        CloseAndRecycle { sender } => {
            // Ship the graph back to the control thread and stop accepting
            // further control messages on this (soon recycled) thread.
            self.set_state(AudioContextState::Suspended);
            let _ = sender.send(self.graph.take().unwrap());
            self.receiver = None;
            return ControlFlow::Break(());
        }
        Startup { graph } => {
            debug_assert!(self.graph.is_none());
            self.graph = Some(graph);
            self.set_state(AudioContextState::Running);
        }
        NodeMessage { id, mut msg } => {
            self.graph.as_mut().unwrap().route_message(id, msg.as_mut());
            // Hand the spent message envelope to the garbage collector thread
            // (if running) so any deallocation happens off the render thread.
            if let Some(gc) = self.garbage_collector.as_mut() {
                gc.push(msg)
            }
        }
        RunDiagnostics { mut buffer } => {
            use std::io::Write;
            writeln!(&mut buffer, "{:#?}", &self).ok();
            writeln!(&mut buffer, "{:?}", &self.graph).ok();
            // NOTE(review): this panics when the event channel is full --
            // verify that is acceptable for a diagnostics request.
            self.event_sender
                .try_send(EventDispatch::diagnostics(buffer))
                .expect("Unable to send diagnostics - channel is full");
        }
        Suspend { notify } => {
            self.suspended = true;
            self.set_state(AudioContextState::Suspended);
            notify.send();
        }
        Resume { notify } => {
            self.suspended = false;
            self.set_state(AudioContextState::Running);
            notify.send();
        }
        Close { notify } => {
            // Closing also suspends rendering; the state becomes `Closed`.
            self.suspended = true;
            self.set_state(AudioContextState::Closed);
            notify.send();
        }
        SetChannelCount { id, count } => {
            self.graph.as_mut().unwrap().set_channel_count(id, count);
        }
        SetChannelCountMode { id, mode } => {
            self.graph
                .as_mut()
                .unwrap()
                .set_channel_count_mode(id, mode);
        }
        SetChannelInterpretation { id, interpretation } => {
            self.graph
                .as_mut()
                .unwrap()
                .set_channel_interpretation(id, interpretation);
        }
    }

    ControlFlow::Continue(())
}
/// Render the full offline context synchronously, running any scheduled
/// suspend callbacks at their quantum boundaries.
///
/// Returns the rendered audio as an [`AudioBuffer`]. Consumes `self`: the
/// graph is unloaded when rendering finishes.
pub fn render_audiobuffer_sync(
    mut self,
    context: &mut OfflineAudioContext,
    mut suspend_callbacks: Vec<(usize, Box<OfflineAudioContextCallback>)>,
    event_loop: &EventLoop,
) -> AudioBuffer {
    let length = context.length();
    let sample_rate = self.sample_rate;

    // Build exactly `number_of_channels` channels. The previous code used
    // `resize_with(buffer.capacity(), ..)`, but `Vec::with_capacity(n)` only
    // guarantees a capacity of *at least* n, so the channel count was not
    // strictly pinned.
    let mut buffer: Vec<Vec<f32>> = (0..self.number_of_channels)
        .map(|_| Vec::with_capacity(length))
        .collect();

    let num_frames = length.div_ceil(RENDER_QUANTUM_SIZE);

    self.handle_control_messages();

    for quantum in 0..num_frames {
        // Run the next suspend callback if it is scheduled for this quantum.
        if suspend_callbacks.first().map(|&(q, _)| q) == Some(quantum) {
            let callback = suspend_callbacks.remove(0).1;
            (callback)(context);
            self.handle_control_messages();
        }

        self.render_offline_quantum(&mut buffer);

        // Event callbacks may emit new control messages; pick them up now.
        let events_were_handled = event_loop.handle_pending_events();
        if events_were_handled {
            self.handle_control_messages();
        }
    }

    // Give nodes a chance to clean up before the thread state is dropped.
    self.unload_graph();
    event_loop.handle_pending_events();

    AudioBuffer::from(buffer, sample_rate)
}
/// Render the full offline context asynchronously.
///
/// At each scheduled suspend point the corresponding oneshot sender is fired
/// and rendering pauses until a message arrives on `resume_receiver`.
/// Returns the rendered audio as an [`AudioBuffer`]. Consumes `self`.
pub async fn render_audiobuffer(
    mut self,
    length: usize,
    mut suspend_callbacks: Vec<(usize, oneshot::Sender<()>)>,
    mut resume_receiver: mpsc::Receiver<()>,
    event_loop: &EventLoop,
) -> AudioBuffer {
    let sample_rate = self.sample_rate;

    // Build exactly `number_of_channels` channels. The previous code used
    // `resize_with(buffer.capacity(), ..)`, but `Vec::with_capacity(n)` only
    // guarantees a capacity of *at least* n, so the channel count was not
    // strictly pinned.
    let mut buffer: Vec<Vec<f32>> = (0..self.number_of_channels)
        .map(|_| Vec::with_capacity(length))
        .collect();

    let num_frames = length.div_ceil(RENDER_QUANTUM_SIZE);

    self.handle_control_messages();

    for quantum in 0..num_frames {
        // Notify the suspend waiter scheduled for this quantum, then park
        // until the control side resumes us.
        if suspend_callbacks.first().map(|&(q, _)| q) == Some(quantum) {
            let sender = suspend_callbacks.remove(0).1;
            sender.send(()).unwrap();
            resume_receiver.next().await;
            self.handle_control_messages();
        }

        self.render_offline_quantum(&mut buffer);

        // Event callbacks may emit new control messages; pick them up now.
        let events_were_handled = event_loop.handle_pending_events();
        if events_were_handled {
            self.handle_control_messages();
        }
    }

    // Give nodes a chance to clean up before the thread state is dropped.
    self.unload_graph();
    event_loop.handle_pending_events();

    AudioBuffer::from(buffer, sample_rate)
}
/// Render one quantum (`RENDER_QUANTUM_SIZE` frames) into the offline buffer.
///
/// `buffer` holds one `Vec` per output channel; each call appends up to
/// `RENDER_QUANTUM_SIZE` samples per channel, clamped to the remaining
/// channel capacity so the final quantum does not overshoot the requested
/// render length.
fn render_offline_quantum(&mut self, buffer: &mut [Vec<f32>]) {
    // `fetch_add` returns the previous value, i.e. the first frame of this
    // quantum.
    let current_frame = self
        .frames_played
        .fetch_add(RENDER_QUANTUM_SIZE as u64, Ordering::Relaxed);
    let current_time = current_frame as f64 / self.sample_rate as f64;

    let scope = AudioWorkletGlobalScope {
        current_frame,
        current_time,
        sample_rate: self.sample_rate,
        event_sender: self.event_sender.clone(),
        node_id: Cell::new(AudioNodeId(0)), };

    let graph = self.graph.as_mut().unwrap();

    // Render with denormal floats flushed to zero on architectures that
    // support it (denormal arithmetic can be dramatically slower).
    #[cfg(any(target_arch = "x86", target_arch = "x86_64", target_arch = "aarch64"))]
    // SAFETY(review): assumes `no_denormals` only toggles the FPU
    // denormal-handling flags for the duration of the closure -- TODO confirm
    // against the crate's documented contract.
    let rendered = unsafe {
        no_denormals::no_denormals(|| graph.render(&scope))
    };
    #[cfg(not(any(target_arch = "x86", target_arch = "x86_64", target_arch = "aarch64")))]
    let rendered = graph.render(&scope);

    // Clamp to the capacity remaining in the output channels (the final
    // quantum may be shorter than RENDER_QUANTUM_SIZE).
    let remaining = (buffer[0].capacity() - buffer[0].len()).min(RENDER_QUANTUM_SIZE);

    let channels = rendered.channels();
    buffer.iter_mut().enumerate().for_each(|(i, b)| {
        // Channels missing from the rendered quantum are treated as silence.
        let c = channels
            .get(i)
            .map(AsRef::as_ref)
            .unwrap_or(&[0.; RENDER_QUANTUM_SIZE]);
        b.extend_from_slice(&c[..remaining]);
    });
}
/// Consume the render thread state and drop the graph, giving nodes a final
/// render scope for any pre-drop cleanup.
fn unload_graph(mut self) {
    let current_frame = self.frames_played.load(Ordering::Relaxed);

    let scope = AudioWorkletGlobalScope {
        current_frame,
        current_time: current_frame as f64 / self.sample_rate as f64,
        sample_rate: self.sample_rate,
        event_sender: self.event_sender.clone(),
        node_id: Cell::new(AudioNodeId(0)),
    };

    let graph = self.graph.take().unwrap();
    graph.before_drop(&scope);
}
/// Render one output buffer of interleaved samples and, when a capacity
/// listener is installed, report the render load for this callback.
pub fn render<S: FromSample<f32> + Clone>(&mut self, output_buffer: &mut [S]) {
    let render_start = Instant::now();

    // Flush denormal floats to zero during rendering on supported targets;
    // denormal arithmetic can be orders of magnitude slower on these CPUs.
    #[cfg(any(target_arch = "x86", target_arch = "x86_64", target_arch = "aarch64"))]
    // SAFETY(review): assumes `no_denormals` only toggles the FPU
    // denormal-handling flags for the duration of the closure -- TODO confirm
    // against the crate's documented contract.
    unsafe {
        no_denormals::no_denormals(|| self.render_inner(output_buffer))
    };
    #[cfg(not(any(target_arch = "x86", target_arch = "x86_64", target_arch = "aarch64")))]
    self.render_inner(output_buffer);

    if let Some(load_value_sender) = &self.load_value_sender {
        // Load = time spent rendering / wall-clock duration of one quantum.
        // `as_secs_f64` keeps full precision instead of truncating to whole
        // microseconds as the previous `as_micros() / 1E6` conversion did.
        let duration = render_start.elapsed().as_secs_f64();
        let max_duration = RENDER_QUANTUM_SIZE as f64 / self.sample_rate as f64;
        let load_value = duration / max_duration;
        let render_timestamp =
            self.frames_played.load(Ordering::Relaxed) as f64 / self.sample_rate as f64;
        let load_value_data = AudioRenderCapacityLoad {
            render_timestamp,
            load_value,
        };
        // Best effort: silently drop the sample if the channel is full.
        let _ = load_value_sender.try_send(load_value_data);
    }
}
/// Fill `output_buffer` with interleaved samples, supporting output buffer
/// sizes that are not multiples of the render quantum by carrying leftover
/// frames across calls in `self.buffer_offset`.
fn render_inner<S: FromSample<f32> + Clone>(&mut self, mut output_buffer: &mut [S]) {
    self.buffer_size = output_buffer.len();

    // First drain any leftover frames of the previous render quantum.
    if let Some((offset, prev_rendered)) = self.buffer_offset.take() {
        let leftover_len = (RENDER_QUANTUM_SIZE - offset) * self.number_of_channels;
        // The output buffer may be smaller than the leftovers; clamp the split.
        let (first, next) = output_buffer.split_at_mut(leftover_len.min(output_buffer.len()));
        for i in 0..self.number_of_channels {
            // Interleaved layout: channel i occupies every
            // `number_of_channels`-th sample starting at index i.
            let output = first.iter_mut().skip(i).step_by(self.number_of_channels);
            let channel = prev_rendered.channel_data(i)[offset..].iter();
            for (sample, input) in output.zip(channel) {
                let value = S::from_sample_(*input);
                *sample = value;
            }
        }

        if next.is_empty() {
            // The output buffer was filled entirely from leftovers; stash the
            // remainder of this quantum for the next call.
            self.buffer_offset = Some((
                offset + first.len() / self.number_of_channels,
                prev_rendered,
            ));
            return;
        }

        output_buffer = next;
    }

    self.handle_control_messages();

    // When suspended, before startup, or once the graph is inactive, emit
    // silence (the equilibrium value of the sample type).
    if self.suspended || !self.graph.as_ref().is_some_and(Graph::is_active) {
        output_buffer.fill(S::from_sample_(0.));
        return;
    }

    // Render quantum by quantum; only the final chunk can be shorter.
    let chunk_size = RENDER_QUANTUM_SIZE * self.number_of_channels;
    for data in output_buffer.chunks_mut(chunk_size) {
        // `fetch_add` returns the previous value: the first frame of this quantum.
        let current_frame = self
            .frames_played
            .fetch_add(RENDER_QUANTUM_SIZE as u64, Ordering::Relaxed);
        let current_time = current_frame as f64 / self.sample_rate as f64;
        let scope = AudioWorkletGlobalScope {
            current_frame,
            current_time,
            sample_rate: self.sample_rate,
            event_sender: self.event_sender.clone(),
            node_id: Cell::new(AudioNodeId(0)), };

        let mut destination_buffer = self.graph.as_mut().unwrap().render(&scope).clone();
        // Up-mix when the graph delivered fewer channels than the output expects.
        if destination_buffer.number_of_channels() < self.number_of_channels {
            destination_buffer.mix(self.number_of_channels, ChannelInterpretation::Discrete);
        }

        // Interleave the rendered quantum into this output chunk.
        for i in 0..self.number_of_channels {
            let output = data.iter_mut().skip(i).step_by(self.number_of_channels);
            let channel = destination_buffer.channel_data(i).iter();
            for (sample, input) in output.zip(channel) {
                let value = S::from_sample_(*input);
                *sample = value;
            }
        }

        // Partial final chunk: remember how far into the quantum we got so
        // the remainder is emitted at the start of the next call.
        if data.len() != chunk_size {
            let channel_offset = data.len() / self.number_of_channels;
            debug_assert!(channel_offset < RENDER_QUANTUM_SIZE);
            self.buffer_offset = Some((channel_offset, destination_buffer));
        }

        self.handle_control_messages();
    }
}
/// Publish the new context state and dispatch a state-change event.
fn set_state(&self, state: AudioContextState) {
    self.state.store(state as u8, Ordering::Relaxed);
    // Best effort: a full or disconnected event channel is ignored.
    let dispatch = EventDispatch::state_change(state);
    self.event_sender.try_send(dispatch).ok();
}
}
impl Drop for RenderThread {
    fn drop(&mut self) {
        // Ask the garbage collector thread (if any) to shut down.
        if let Some(gc) = &mut self.garbage_collector {
            gc.push(llq::Node::new(Box::new(TerminateGarbageCollectorThread)));
        }
        log::info!("Audio render thread has been dropped");
    }
}
// Poll interval of the garbage collector thread when its queue is empty.
const GARBAGE_COLLECTOR_THREAD_TIMEOUT: Duration = Duration::from_millis(100);

/// Marker pushed onto the garbage queue to make the collector thread exit.
#[derive(Debug)]
struct TerminateGarbageCollectorThread;
/// Spawn the background thread that drops render-thread garbage.
fn spawn_garbage_collector_thread(consumer: llq::Consumer<Box<dyn Any + Send>>) {
    // The join handle is intentionally detached: the thread terminates on its
    // own when it receives a `TerminateGarbageCollectorThread` marker.
    std::thread::spawn(move || run_garbage_collector_thread(consumer));
}
/// Body of the garbage collector thread: drop whatever arrives on the queue
/// until the termination marker is received.
fn run_garbage_collector_thread(mut consumer: llq::Consumer<Box<dyn Any + Send>>) {
    log::info!("Entering garbage collector thread");
    loop {
        match consumer.pop() {
            Some(node) => {
                // Dropping `node` at the end of this arm is the actual
                // garbage collection: it happens off the render thread.
                if node.as_ref().is::<TerminateGarbageCollectorThread>() {
                    log::info!("Terminating garbage collector thread");
                    break;
                }
            }
            // Queue empty: sleep briefly before polling again.
            None => std::thread::sleep(GARBAGE_COLLECTOR_THREAD_TIMEOUT),
        }
    }
    log::info!("Exiting garbage collector thread");
}