use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::sync::atomic::{AtomicU64, Ordering};
use cpal::Sample;
use crossbeam_channel::Receiver;
use crate::alloc::{Alloc, AudioBuffer};
use crate::buffer::{ChannelConfig, ChannelCountMode};
use crate::message::ControlMessage;
use crate::process::{AudioParamValues, AudioProcessor};
use crate::{SampleRate, BUFFER_SIZE};
/// Rendering-side state: the processing graph together with the bookkeeping
/// needed to render it block by block and to receive control messages.
pub(crate) struct RenderThread {
/// Graph of processors that is rendered once per block.
graph: Graph,
/// Sample rate handed to the graph on every render call; also the divisor used
/// to turn the frame counter into a timestamp.
sample_rate: SampleRate,
/// Number of channels in the output: the channel count of the buffer built by
/// `render_audiobuffer` and the interleaving stride used by `render`.
channels: usize,
/// Count of frames rendered so far; advanced by `BUFFER_SIZE` for every block.
frames_played: AtomicU64,
/// Incoming control messages, drained (non-blocking) before each block is rendered.
receiver: Receiver<ControlMessage>,
}
// SAFETY: NOTE(review): the invariant that makes this sound is not stated anywhere in
// this file. Presumably some field reachable from `RenderThread` (e.g. the boxed
// `AudioProcessor`s stored in the graph) is not `Send` by itself, and the struct is
// only ever moved to the rendering thread once and used exclusively there — TODO confirm
// and record the actual invariant here.
unsafe impl Send for RenderThread {}
impl RenderThread {
pub fn new(
sample_rate: SampleRate,
channels: usize,
receiver: Receiver<ControlMessage>,
) -> Self {
Self {
graph: Graph::new(),
sample_rate,
channels,
frames_played: AtomicU64::new(0),
receiver,
}
}
fn handle_control_messages(&mut self) {
for msg in self.receiver.try_iter() {
use ControlMessage::*;
match msg {
RegisterNode {
id,
node,
inputs,
outputs,
channel_config,
} => {
self.graph
.add_node(NodeIndex(id), node, inputs, outputs, channel_config);
}
ConnectNode {
from,
to,
output,
input,
} => {
self.graph
.add_edge((NodeIndex(from), output), (NodeIndex(to), input));
}
DisconnectNode { from, to } => {
self.graph.remove_edge(NodeIndex(from), NodeIndex(to));
}
DisconnectAll { from } => {
self.graph.remove_edges_from(NodeIndex(from));
}
FreeWhenFinished { id } => {
self.graph.mark_free_when_finished(NodeIndex(id));
}
AudioParamEvent { to, event } => {
to.send(event).expect("Audioparam disappeared unexpectedly")
}
}
}
}
pub fn render_audiobuffer(&mut self, length: usize) -> crate::buffer::AudioBuffer {
debug_assert_eq!(length % BUFFER_SIZE as usize, 0);
let mut buf = crate::buffer::AudioBuffer::new(self.channels, 0, self.sample_rate);
for _ in 0..length / BUFFER_SIZE as usize {
self.handle_control_messages();
let timestamp = self
.frames_played
.fetch_add(BUFFER_SIZE as u64, Ordering::SeqCst) as f64
/ self.sample_rate.0 as f64;
let rendered = self.graph.render(timestamp, self.sample_rate);
buf.extend_alloc(rendered);
}
buf
}
pub fn render<S: Sample>(&mut self, buffer: &mut [S]) {
let chunk_size = BUFFER_SIZE as usize * self.channels as usize;
debug_assert_eq!(buffer.len() % chunk_size, 0);
for data in buffer.chunks_exact_mut(chunk_size) {
self.handle_control_messages();
let timestamp = self
.frames_played
.fetch_add(BUFFER_SIZE as u64, Ordering::SeqCst) as f64
/ self.sample_rate.0 as f64;
let rendered = self.graph.render(timestamp, self.sample_rate);
for i in 0..self.channels {
let output = data.iter_mut().skip(i).step_by(self.channels);
let channel = rendered.channel_data(i).iter();
for (sample, input) in output.zip(channel) {
let value = Sample::from::<f32>(input);
*sample = value;
}
}
}
}
}
/// Identifier of a node inside the render graph.
///
/// Used as the key of the graph's node map and as the node part of both
/// endpoints of an edge.
#[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)]
pub struct NodeIndex(pub u64);
/// A processor stored in the graph together with its input/output buffers and
/// the flags that decide when it may be dropped.
pub struct Node {
/// The processor invoked by `Node::process`.
processor: Box<dyn AudioProcessor>,
/// One buffer per input; silenced and re-summed from the connected outputs on
/// every render pass.
inputs: Vec<AudioBuffer>,
/// One buffer per output, handed mutably to the processor.
outputs: Vec<AudioBuffer>,
/// Channel count, count mode and interpretation used when mixing the inputs.
channel_config: ChannelConfig,
/// Set through `Graph::mark_free_when_finished`; while `false` the node is never freed.
free_when_finished: bool,
/// Whether at least one edge fed an input of this node during the last render pass.
has_inputs_connected: bool,
/// Initialised to `true` when the node is added.
/// NOTE(review): nothing in this file ever updates this flag afterwards — confirm
/// whether that is intended.
has_outputs_connected: bool,
}
impl Node {
    /// Runs the wrapped processor once over this node's input and output buffers.
    fn process(&mut self, params: AudioParamValues, timestamp: f64, sample_rate: SampleRate) {
        self.processor.process(
            self.inputs.as_slice(),
            self.outputs.as_mut_slice(),
            params,
            timestamp,
            sample_rate,
        )
    }

    /// Tells whether the node may be removed from the graph.
    ///
    /// A node is only ever freed once it has been marked `free_when_finished`.
    /// Given that, it can go when nothing consumes its outputs, or when it has no
    /// connected inputs and its processor's `tail_time()` returns `false`.
    fn can_free(&self) -> bool {
        if !self.free_when_finished {
            return false;
        }
        // `tail_time()` is only consulted when outputs are connected and inputs are not.
        !self.has_outputs_connected
            || (!self.has_inputs_connected && !self.processor.tail_time())
    }

    /// Returns the node's first output buffer.
    ///
    /// # Panics
    ///
    /// Panics if the node has no outputs.
    pub fn get_buffer(&self) -> &AudioBuffer {
        self.outputs.first().unwrap()
    }
}
/// Directed graph of audio nodes plus the scratch state used to order them for rendering.
pub(crate) struct Graph {
/// All registered nodes, keyed by their index.
nodes: HashMap<NodeIndex, Node>,
/// Connections, stored as `((source node, output index), (destination node, input index))`.
edges: HashSet<((NodeIndex, u32), (NodeIndex, u32))>,
/// Scratch list for the ordering pass: nodes that have already been visited.
marked: Vec<NodeIndex>,
/// Scratch list for the ordering pass: nodes on the current traversal path (used to detect cycles).
marked_temp: Vec<NodeIndex>,
/// Cached render order; cleared whenever edges change and rebuilt by the next render when empty.
ordered: Vec<NodeIndex>,
/// Scratch list for the ordering pass: nodes found to be part of a cycle; these are
/// excluded from `ordered` and their outputs are silenced.
in_cycle: Vec<NodeIndex>,
/// Allocator that supplies the silent buffers given to newly added nodes.
alloc: Alloc,
}
impl Graph {
    /// Creates an empty graph with no nodes and no edges.
    pub fn new() -> Self {
        Graph {
            nodes: HashMap::new(),
            edges: HashSet::new(),
            ordered: vec![],
            marked: vec![],
            marked_temp: vec![],
            in_cycle: vec![],
            alloc: Alloc::with_capacity(64),
        }
    }

    /// Registers `processor` under `index` with `inputs` input buffers and
    /// `outputs` output buffers, all initially silent.
    ///
    /// A node that already exists under `index` is replaced. The new node starts
    /// with `free_when_finished` unset and both connection flags set.
    pub fn add_node(
        &mut self,
        index: NodeIndex,
        processor: Box<dyn AudioProcessor>,
        inputs: usize,
        outputs: usize,
        channel_config: ChannelConfig,
    ) {
        let inputs = vec![AudioBuffer::new(self.alloc.silence()); inputs];
        let outputs = vec![AudioBuffer::new(self.alloc.silence()); outputs];
        self.nodes.insert(
            index,
            Node {
                processor,
                inputs,
                outputs,
                channel_config,
                free_when_finished: false,
                has_inputs_connected: true,
                has_outputs_connected: true,
            },
        );
    }

    /// Connects output `source.1` of node `source.0` to input `dest.1` of node
    /// `dest.0` and invalidates the cached render order.
    pub fn add_edge(&mut self, source: (NodeIndex, u32), dest: (NodeIndex, u32)) {
        self.edges.insert((source, dest));
        // An empty `ordered` makes the next `render` recompute the order.
        self.ordered.clear();
    }

    /// Removes every edge going from `source` to `dest` (whatever the port
    /// indices) and invalidates the cached render order.
    pub fn remove_edge(&mut self, source: NodeIndex, dest: NodeIndex) {
        self.edges.retain(|&(s, d)| s.0 != source || d.0 != dest);
        self.ordered.clear();
    }

    /// Removes every edge that starts at `source` and invalidates the cached
    /// render order.
    pub fn remove_edges_from(&mut self, source: NodeIndex) {
        self.edges.retain(|&(s, _d)| s.0 != source);
        self.ordered.clear();
    }

    /// Flags the node so that it is dropped once `Node::can_free` allows it.
    ///
    /// # Panics
    ///
    /// Panics if no node is registered under `index`.
    fn mark_free_when_finished(&mut self, index: NodeIndex) {
        self.nodes.get_mut(&index).unwrap().free_when_finished = true;
    }

    /// Yields the source node of every edge whose destination is `node`, i.e. the
    /// nodes that feed into `node`.
    pub fn children(&self, node: NodeIndex) -> impl Iterator<Item = NodeIndex> + '_ {
        self.edges
            .iter()
            .filter(move |&(_s, d)| d.0 == node)
            .map(|&(s, _d)| s.0)
    }

    /// Depth-first step of the ordering pass.
    ///
    /// Visits everything that feeds into `n` first and then appends `n` to
    /// `ordered`, so that producers end up before their consumers. When `n` is
    /// already on the current traversal path (`marked_temp`), the path from its
    /// first occurrence onwards is recorded in `in_cycle` instead.
    fn visit(
        &self,
        n: NodeIndex,
        marked: &mut Vec<NodeIndex>,
        marked_temp: &mut Vec<NodeIndex>,
        ordered: &mut Vec<NodeIndex>,
        in_cycle: &mut Vec<NodeIndex>,
    ) {
        // Reaching a node that is still on the traversal path closes a cycle.
        if let Some(pos) = marked_temp.iter().position(|&m| m == n) {
            in_cycle.extend_from_slice(&marked_temp[pos..]);
            return;
        }
        if marked.contains(&n) {
            return;
        }
        marked.push(n);
        marked_temp.push(n);
        self.children(n)
            .for_each(|c| self.visit(c, marked, marked_temp, ordered, in_cycle));
        // Every nested visit removes exactly what it pushed, so `n` is on top
        // again: popping is enough, no need to scan the whole path.
        let finished = marked_temp.pop();
        debug_assert_eq!(finished, Some(n));
        // Appending in post-order directly yields the render order (inputs first),
        // which avoids front insertions followed by a final reversal.
        ordered.push(n);
    }

    /// Recomputes `ordered`: a render order in which every node comes after the
    /// nodes feeding it.
    ///
    /// Nodes found to be part of a cycle are left out of the order and have all
    /// their output buffers silenced.
    ///
    /// # Panics
    ///
    /// Panics if a node recorded as part of a cycle is not registered in the graph.
    fn order_nodes(&mut self) {
        // Move the scratch vectors out so they can be mutated while `self` is
        // borrowed immutably by `visit`; their allocations are reused.
        let mut ordered = std::mem::take(&mut self.ordered);
        let mut marked = std::mem::take(&mut self.marked);
        let mut marked_temp = std::mem::take(&mut self.marked_temp);
        let mut in_cycle = std::mem::take(&mut self.in_cycle);
        ordered.clear();
        marked.clear();
        marked_temp.clear();
        in_cycle.clear();

        for &index in self.nodes.keys() {
            self.visit(
                index,
                &mut marked,
                &mut marked_temp,
                &mut ordered,
                &mut in_cycle,
            );
        }

        ordered.retain(|o| !in_cycle.contains(o));
        for key in in_cycle.iter() {
            self.nodes
                .get_mut(key)
                .unwrap()
                .outputs
                .iter_mut()
                .for_each(AudioBuffer::make_silent);
        }

        self.ordered = ordered;
        self.marked = marked;
        self.marked_temp = marked_temp;
        self.in_cycle = in_cycle;
    }

    /// Renders one block: processes every ordered node once and returns the first
    /// output buffer of the node registered as `NodeIndex(0)`.
    ///
    /// For each node, the inputs are silenced, the outputs of all connected
    /// sources are summed in, the inputs are mixed to the channel count demanded
    /// by the node's channel configuration, and the processor is run. Nodes whose
    /// `can_free` returns `true` afterwards are removed together with all edges
    /// that touch them.
    ///
    /// # Panics
    ///
    /// Panics if the order refers to a node that is not registered, if an edge
    /// refers to a missing node or to an out-of-range port, or if there is no
    /// node `NodeIndex(0)` with at least one output.
    pub fn render(&mut self, timestamp: f64, sample_rate: SampleRate) -> &AudioBuffer {
        if self.ordered.is_empty() {
            self.order_nodes();
        }
        let ordered = &self.ordered;
        let edges = &self.edges;
        let nodes = &mut self.nodes;
        let mut drop_nodes = vec![];
        ordered.iter().for_each(|index| {
            // Take the node out of the map so the remaining nodes can be borrowed
            // while this one is mutated; it is put back at the end of the pass.
            let mut node = nodes.remove(index).unwrap();
            let mut has_inputs_connected = false;
            node.inputs.iter_mut().for_each(|i| i.make_silent());
            edges
                .iter()
                .filter_map(move |(s, d)| {
                    // Edges targeting input `u32::MAX` are not summed into a regular input.
                    // NOTE(review): presumably these are AudioParam connections — confirm.
                    if d.0 == *index && d.1 != u32::MAX {
                        Some((s, d.1))
                    } else {
                        None
                    }
                })
                .for_each(|(&(node_index, output), input)| {
                    let input_node = nodes.get(&node_index).unwrap();
                    let signal = &input_node.outputs[output as usize];
                    node.inputs[input as usize].add(signal, node.channel_config.interpretation());
                    has_inputs_connected = true;
                });
            let mode = node.channel_config.count_mode();
            let count = node.channel_config.count();
            let interpretation = node.channel_config.interpretation();
            node.inputs.iter_mut().for_each(|input_buf| {
                let cur_channels = input_buf.number_of_channels();
                let new_channels = match mode {
                    ChannelCountMode::Max => cur_channels,
                    ChannelCountMode::Explicit => count,
                    ChannelCountMode::ClampedMax => cur_channels.min(count),
                };
                input_buf.mix(new_channels, interpretation);
            });
            let params = AudioParamValues::from(&*nodes);
            node.process(params, timestamp, sample_rate);
            node.has_inputs_connected = has_inputs_connected;
            if node.can_free() {
                drop_nodes.push(*index);
            }
            nodes.insert(*index, node);
        });

        if !drop_nodes.is_empty() {
            for index in &drop_nodes {
                self.nodes.remove(index);
            }
            // Drop the edges in both directions: leaving the incoming ones behind
            // would keep dead entries that every later render has to scan.
            self.edges
                .retain(|&(s, d)| !drop_nodes.contains(&s.0) && !drop_nodes.contains(&d.0));
            // The topology changed, so force a re-ordering on the next render.
            self.ordered.clear();
        }

        // NOTE(review): presumably `NodeIndex(0)` is the destination node — confirm.
        &self.nodes.get(&NodeIndex(0)).unwrap().outputs[0]
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Processor stub that leaves its buffers untouched.
    #[derive(Debug, Clone)]
    struct TestNode {}

    impl AudioProcessor for TestNode {
        fn process(
            &mut self,
            _inputs: &[AudioBuffer],
            _outputs: &mut [AudioBuffer],
            _params: AudioParamValues,
            _timestamp: f64,
            _sample_rate: SampleRate,
        ) {
        }

        fn tail_time(&self) -> bool {
            false
        }
    }

    /// Channel configuration shared by every test node.
    fn config() -> ChannelConfig {
        let options = crate::buffer::ChannelConfigOptions {
            count: 2,
            mode: crate::buffer::ChannelCountMode::Explicit,
            interpretation: crate::buffer::ChannelInterpretation::Speakers,
        };
        options.into()
    }

    /// Builds a graph holding nodes `0..count`, each with one input and one output.
    fn graph_with_nodes(count: u64) -> Graph {
        let mut graph = Graph::new();
        for id in 0..count {
            graph.add_node(NodeIndex(id), Box::new(TestNode {}), 1, 1, config());
        }
        graph
    }

    /// Connects output 0 of node `from` to input 0 of node `to`.
    fn connect(graph: &mut Graph, from: u64, to: u64) {
        graph.add_edge((NodeIndex(from), 0), (NodeIndex(to), 0));
    }

    /// Position of node `id` in the computed render order, if it is part of it.
    fn position_of(graph: &Graph, id: u64) -> Option<usize> {
        graph.ordered.iter().position(|&n| n == NodeIndex(id))
    }

    #[test]
    fn test_add_remove() {
        let mut graph = graph_with_nodes(4);
        connect(&mut graph, 1, 0);
        connect(&mut graph, 2, 1);
        connect(&mut graph, 3, 0);

        graph.order_nodes();

        // Every node is ordered and node 0, fed by all the others, comes last.
        assert_eq!(graph.ordered.len(), 4);
        assert_eq!(graph.ordered[3], NodeIndex(0));
        let pos1 = position_of(&graph, 1).unwrap();
        let pos2 = position_of(&graph, 2).unwrap();
        assert!(pos2 < pos1);

        graph.remove_edge(NodeIndex(1), NodeIndex(0));
        graph.order_nodes();

        // Detaching 1 from 0 keeps all nodes ordered, with 2 still ahead of 1.
        assert_eq!(graph.ordered.len(), 4);
        let pos1 = position_of(&graph, 1).unwrap();
        let pos2 = position_of(&graph, 2).unwrap();
        assert!(pos2 < pos1);
    }

    #[test]
    fn test_remove_all() {
        let mut graph = graph_with_nodes(3);
        connect(&mut graph, 1, 0);
        connect(&mut graph, 1, 2);
        connect(&mut graph, 2, 0);

        graph.order_nodes();

        assert_eq!(
            graph.ordered,
            vec![NodeIndex(1), NodeIndex(2), NodeIndex(0)]
        );

        graph.remove_edges_from(NodeIndex(1));
        graph.order_nodes();

        // Node 1 is now isolated; the remaining edge still puts 2 ahead of 0.
        assert_eq!(graph.ordered.len(), 3);
        let pos0 = position_of(&graph, 0).unwrap();
        let pos2 = position_of(&graph, 2).unwrap();
        assert!(pos2 < pos0);
    }

    #[test]
    fn test_cycle() {
        let mut graph = graph_with_nodes(5);
        connect(&mut graph, 4, 2);
        connect(&mut graph, 2, 1);
        connect(&mut graph, 1, 0);
        // Closes the loop between nodes 1 and 2.
        connect(&mut graph, 1, 2);
        connect(&mut graph, 3, 0);

        graph.order_nodes();

        let pos0 = position_of(&graph, 0);
        let pos1 = position_of(&graph, 1);
        let pos2 = position_of(&graph, 2);
        let pos3 = position_of(&graph, 3);
        let pos4 = position_of(&graph, 4);

        // Nodes inside the cycle are left out of the render order.
        assert_eq!(pos1, None);
        assert_eq!(pos2, None);
        // Nodes outside the cycle are still ordered, inputs first.
        assert!(pos4.is_some());
        assert!(pos3.unwrap() < pos0.unwrap());
    }
}