use crate::backend_factory::BackendFactory;
use crate::factory::{NodeFactory, RegistryError};
use rill_core::buffer::{Buffer, BufferRegistry, FixedBuffer, TapeLoop};
use rill_core::math::Transcendental;
use rill_core::queues::{MpscQueue, SetParameter};
use rill_core::time::ClockTick;
use rill_core::traits::algorithm::ActionContext;
use rill_core::traits::port::Port;
use rill_core::traits::processable::{ProcessContext, Processable};
use rill_core::traits::ParamValue;
use rill_core::traits::{Node, NodeId, NodeVariant, Params};
use rill_core_actor::{ActorCell, ActorRef};
use std::collections::{HashMap, VecDeque};
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
#[derive(Debug, Clone)]
pub enum BuildError {
CycleDetected,
Backend(String),
}
impl std::fmt::Display for BuildError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::CycleDetected => write!(f, "graph cycle detected"),
Self::Backend(msg) => write!(f, "backend error: {msg}"),
}
}
}
pub(crate) struct NodeEntry<T: Transcendental, const BUF_SIZE: usize> {
pub(crate) node: NodeVariant<T, BUF_SIZE>,
}
#[derive(Clone)]
pub struct GraphResource {
pub name: String,
pub kind: String,
pub capacity: usize,
}
pub struct GraphBuilder<T: Transcendental, const BUF_SIZE: usize> {
nodes: Vec<NodeEntry<T, BUF_SIZE>>,
node_backends: Vec<Option<String>>,
signal_edges: Vec<(usize, usize, usize, usize)>,
control_edges: Vec<(usize, usize, usize, usize)>,
clock_edges: Vec<(usize, usize, usize, usize)>,
feedback_edges: Vec<(usize, usize, usize, usize)>,
resources: Vec<GraphResource>,
factory: Arc<NodeFactory<T, BUF_SIZE>>,
backend_factory: Arc<BackendFactory<T>>,
default_backend: Option<String>,
backend_params: HashMap<String, ParamValue>,
sample_rate: Option<f32>,
clock_tx: Option<ActorRef<ClockTick>>,
}
impl<T: Transcendental, const BUF_SIZE: usize> GraphBuilder<T, BUF_SIZE> {
pub fn new(
factory: Arc<NodeFactory<T, BUF_SIZE>>,
backend_factory: Arc<BackendFactory<T>>,
) -> Self {
Self {
nodes: Vec::new(),
node_backends: Vec::new(),
signal_edges: Vec::new(),
control_edges: Vec::new(),
clock_edges: Vec::new(),
feedback_edges: Vec::new(),
resources: Vec::new(),
factory,
backend_factory,
default_backend: None,
backend_params: HashMap::new(),
sample_rate: None,
clock_tx: None,
}
}
pub fn add_node(&mut self, type_name: &str, params: &Params) -> Result<usize, RegistryError> {
let id = NodeId(self.nodes.len() as u32);
self.add_node_with_id(type_name, params, id)
}
pub fn add_node_with_id(
&mut self,
type_name: &str,
params: &Params,
id: NodeId,
) -> Result<usize, RegistryError> {
let node = self.factory.construct(type_name, id, params)?;
let idx = self.nodes.len();
self.nodes.push(NodeEntry { node });
self.node_backends.push(None);
Ok(idx)
}
pub fn set_node_backend(&mut self, idx: usize, name: String) {
if idx < self.node_backends.len() {
self.node_backends[idx] = Some(name);
}
}
pub fn add_resource(&mut self, resource: GraphResource) {
self.resources.push(resource);
}
pub fn node_count(&self) -> usize {
self.nodes.len()
}
pub fn set_default_backend(&mut self, name: String, params: HashMap<String, ParamValue>) {
self.default_backend = Some(name);
self.backend_params = params;
}
pub fn default_backend_name(&self) -> Option<&String> {
self.default_backend.as_ref()
}
pub fn set_sample_rate(&mut self, sr: f32) {
self.sample_rate = Some(sr);
}
pub fn set_clock_tx(&mut self, tx: ActorRef<ClockTick>) {
self.clock_tx = Some(tx);
}
pub fn backend_factory(&self) -> &Arc<BackendFactory<T>> {
&self.backend_factory
}
pub fn add_source(&mut self, source: Box<dyn rill_core::traits::Source<T, BUF_SIZE>>) -> usize {
let idx = self.nodes.len();
self.nodes.push(NodeEntry {
node: NodeVariant::Source(source),
});
self.node_backends.push(None);
idx
}
pub fn add_processor(
&mut self,
processor: Box<dyn rill_core::traits::Processor<T, BUF_SIZE>>,
) -> usize {
let idx = self.nodes.len();
self.nodes.push(NodeEntry {
node: NodeVariant::Processor(processor),
});
self.node_backends.push(None);
idx
}
pub fn add_sink(&mut self, sink: Box<dyn rill_core::traits::Sink<T, BUF_SIZE>>) -> usize {
let idx = self.nodes.len();
self.nodes.push(NodeEntry {
node: NodeVariant::Sink(sink),
});
self.node_backends.push(None);
idx
}
pub fn add_router(&mut self, router: Box<dyn rill_core::traits::Router<T, BUF_SIZE>>) -> usize {
let idx = self.nodes.len();
self.nodes.push(NodeEntry {
node: NodeVariant::Router(router),
});
self.node_backends.push(None);
idx
}
pub fn connect_signal(
&mut self,
from_node: usize,
from_port: usize,
to_node: usize,
to_port: usize,
) {
self.signal_edges
.push((from_node, from_port, to_node, to_port));
}
pub fn connect_control(
&mut self,
from_node: usize,
from_port: usize,
to_node: usize,
to_port: usize,
) {
self.control_edges
.push((from_node, from_port, to_node, to_port));
}
pub fn connect_clock(
&mut self,
from_node: usize,
from_port: usize,
to_node: usize,
to_port: usize,
) {
self.clock_edges
.push((from_node, from_port, to_node, to_port));
}
pub fn connect_feedback(
&mut self,
from_node: usize,
from_port: usize,
to_node: usize,
to_port: usize,
) {
self.feedback_edges
.push((from_node, from_port, to_node, to_port));
}
pub fn build(mut self) -> Result<Graph<T, BUF_SIZE>, BuildError> {
let num_nodes = self.nodes.len();
let mut in_degree = vec![0usize; num_nodes];
let mut out_edges: Vec<Vec<(usize, usize, usize)>> = vec![Vec::new(); num_nodes];
for &(from_n, from_p, to_n, to_p) in &self.signal_edges {
in_degree[to_n] += 1;
out_edges[from_n].push((from_p, to_n, to_p));
}
let mut queue: VecDeque<usize> = in_degree
.iter()
.enumerate()
.filter(|(_, &d)| d == 0)
.map(|(i, _)| i)
.collect();
let mut topo = Vec::with_capacity(num_nodes);
let mut indeg = in_degree;
while let Some(idx) = queue.pop_front() {
topo.push(idx);
for &(_, to_n, _) in &out_edges[idx] {
indeg[to_n] -= 1;
if indeg[to_n] == 0 {
queue.push_back(to_n);
}
}
}
if topo.len() != num_nodes {
return Err(BuildError::CycleDetected);
}
for &(from_n, from_p, to_n, to_p) in &self.signal_edges {
if let Some(port) = self.nodes[from_n].node.output_port_mut(from_p) {
port.downstream.push((to_n, to_p));
}
let in_ptr: *mut Port<T, BUF_SIZE> = self.nodes[to_n]
.node
.input_port_mut(to_p)
.map(|p| p as *mut Port<T, BUF_SIZE>)
.unwrap_or(std::ptr::null_mut());
let parent: *mut NodeVariant<T, BUF_SIZE> = &mut self.nodes[to_n].node;
let out_ptr: *mut Port<T, BUF_SIZE> = self.nodes[from_n]
.node
.output_port_mut(from_p)
.map(|p| p as *mut Port<T, BUF_SIZE>)
.unwrap_or(std::ptr::null_mut());
if !in_ptr.is_null() && !out_ptr.is_null() {
#[allow(unsafe_code)]
unsafe {
(*in_ptr).parent = parent;
(*out_ptr).downstream_input_ptrs.push(in_ptr);
}
}
}
for &(from_n, from_p, to_n, _) in &self.signal_edges {
let parent: *mut NodeVariant<T, BUF_SIZE> = &mut self.nodes[to_n].node;
if let Some(port) = self.nodes[from_n].node.output_port_mut(from_p) {
let ptr_val = parent as usize;
let already = port.downstream_nodes.iter().any(|&p| p as usize == ptr_val);
if !already {
port.downstream_nodes.push(parent);
}
}
}
for &(from_n, from_p, to_n, to_p) in &self.signal_edges {
let upstream = self.nodes[from_n]
.node
.output_port(from_p)
.map(|p| &p.buffer as *const FixedBuffer<T, BUF_SIZE>);
if let Some(port) = self.nodes[to_n].node.input_port_mut(to_p) {
if port.upstream_buffer.is_none() {
port.upstream_buffer = upstream;
} else {
port.upstream_buffer = None;
}
}
}
for &(from_n, from_p, to_n, to_p) in &self.feedback_edges {
if let Some(port) = self.nodes[from_n].node.output_port_mut(from_p) {
port.feedback_buffer = Some(FixedBuffer::new());
port.feedback_downstream.push((to_n, to_p));
}
if let Some(port) = self.nodes[to_n].node.input_port_mut(to_p) {
port.feedback_buffer = Some(FixedBuffer::new());
}
}
for &(from_n, from_p, to_n, to_p) in &self.feedback_edges {
let ptr = self.nodes[to_n]
.node
.input_port(to_p)
.map(|p| &p.feedback_buffer as *const Option<FixedBuffer<T, BUF_SIZE>>)
.map(|r| r as *mut Option<FixedBuffer<T, BUF_SIZE>>);
if let Some(port) = self.nodes[from_n].node.output_port_mut(from_p) {
if let Some(p) = ptr {
port.feedback_ptrs.push(p);
}
}
}
let sr = self.sample_rate.unwrap_or(44100.0);
let mut buffers = BufferRegistry::new();
for r in &self.resources {
if r.kind == "tape" {
if let Some(tape) = TapeLoop::<T>::new(r.capacity) {
buffers.register(&r.name, Box::new(tape));
}
}
}
for entry in &mut self.nodes {
entry.node.resolve_resources(&buffers);
}
for (idx, be_name) in self.node_backends.iter().enumerate() {
let name = match be_name {
Some(ref n) => Some(n.clone()),
None => self.default_backend.clone(),
};
if let Some(ref name) = name {
let mut be_params = HashMap::new();
be_params.insert("sample_rate".into(), ParamValue::Float(sr));
be_params.insert("buffer_size".into(), ParamValue::Int(BUF_SIZE as i32));
if self.default_backend.as_ref() == Some(name) {
for (k, v) in &self.backend_params {
be_params.entry(k.clone()).or_insert_with(|| v.clone());
}
}
let backend = self
.backend_factory
.create(name, &be_params)
.map_err(BuildError::Backend)?;
if let Some(io_node) = self.nodes[idx].node.as_io_node_mut() {
io_node.resolve_backend(backend);
}
}
}
let mut nodes: Vec<NodeVariant<T, BUF_SIZE>> =
self.nodes.into_iter().map(|e| e.node).collect();
let cmd_queue = Arc::new(MpscQueue::<SetParameter>::with_capacity(64));
let mut active_node_idx = None;
for (i, n) in nodes.iter_mut().enumerate() {
if n.as_active_node_mut().is_some() {
active_node_idx = Some(i);
break;
}
}
let have_queue = active_node_idx.is_some();
let command_queue = if have_queue { Some(cmd_queue) } else { None };
let owned_buffers = buffers.into_inner();
let allocated = self.resources.clone();
Ok(Graph {
nodes,
topo_order: topo,
resources: allocated,
current_tick: ClockTick::new(0, BUF_SIZE as u32, sr),
buffers: owned_buffers,
active_node_idx,
command_queue,
clock_tx: self.clock_tx.clone(),
})
}
}
pub struct Graph<T: Transcendental, const BUF_SIZE: usize> {
nodes: Vec<NodeVariant<T, BUF_SIZE>>,
topo_order: Vec<usize>,
current_tick: ClockTick,
pub(crate) resources: Vec<GraphResource>,
#[allow(dead_code)]
buffers: Vec<Box<dyn Buffer<T> + Send>>,
active_node_idx: Option<usize>,
command_queue: Option<Arc<MpscQueue<SetParameter>>>,
clock_tx: Option<ActorRef<ClockTick>>,
}
impl<T: Transcendental, const BUF_SIZE: usize> Graph<T, BUF_SIZE> {
pub fn nodes(&self) -> &[NodeVariant<T, BUF_SIZE>] {
&self.nodes
}
pub fn nodes_mut(&mut self) -> &mut [NodeVariant<T, BUF_SIZE>] {
&mut self.nodes
}
pub fn current_tick(&self) -> ClockTick {
self.current_tick
}
pub fn node_count(&self) -> usize {
self.nodes.len()
}
pub fn topo_order(&self) -> &[usize] {
&self.topo_order
}
#[allow(dead_code)]
pub(crate) fn sample_rate(&self) -> f32 {
self.current_tick.sample_rate
}
#[allow(dead_code)]
pub fn resources(&self) -> &[GraphResource] {
&self.resources
}
#[allow(unsafe_code)]
pub fn run(&mut self, running: Arc<AtomicBool>) -> Result<(), String> {
let Some(idx) = self.active_node_idx else {
return Ok(());
};
let source_idx = self.topo_order[0];
let cmd_queue = self
.command_queue
.clone()
.unwrap_or_else(|| Arc::new(MpscQueue::new()));
let clock_tx = self
.clock_tx
.clone()
.unwrap_or_else(|| ActorRef::new(&Arc::new(MpscQueue::new())));
let graph_ptr: *mut Graph<T, BUF_SIZE> = self;
let tick: Box<dyn FnMut(u64, f32)> = Box::new(move |sample_pos, sample_rate| {
let graph = unsafe { &mut *graph_ptr };
while let Some(cmd) = cmd_queue.pop() {
let i = cmd.port.node_id().inner() as usize;
if i < graph.nodes.len() {
let _ = graph.nodes[i].set_parameter(&cmd.parameter, cmd.value);
}
}
let tick = ClockTick::new(sample_pos, BUF_SIZE as u32, sample_rate);
let mut ctx = ProcessContext { clock: &tick };
let _ = graph.nodes[source_idx].process_block(&mut ctx);
let action_ctx = ActionContext::new(&tick);
for po in 0..graph.nodes[source_idx].num_signal_outputs() {
if let Some(port) = graph.nodes[source_idx].output_port(po) {
let _ = port.propagate(port.buffer(), &action_ctx);
}
}
clock_tx.send(tick);
});
self.nodes[idx]
.as_active_node_mut()
.ok_or("no active node")?
.run(tick, running)
}
pub fn handle(&self) -> Option<ActorRef<SetParameter>> {
let mailbox = self.command_queue.as_ref()?;
Some(ActorRef::new(mailbox))
}
#[cfg(test)]
pub fn into_parts(
self,
) -> (
Vec<NodeVariant<T, BUF_SIZE>>,
Vec<usize>,
ClockTick,
Vec<Box<dyn Buffer<T> + Send>>,
) {
let Self {
nodes,
topo_order,
current_tick,
resources: _,
buffers,
active_node_idx: _,
command_queue: _,
clock_tx: _,
} = self;
(nodes, topo_order, current_tick, buffers)
}
}
impl<T: Transcendental, const BUF_SIZE: usize> ActorCell for Graph<T, BUF_SIZE> {
type Msg = SetParameter;
fn receive(&mut self, msg: SetParameter) {
let idx = msg.port.node_id().inner() as usize;
if idx < self.nodes.len() {
let _ = self.nodes[idx].set_parameter(&msg.parameter, msg.value);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use rill_core::math::Transcendental;
use rill_core::time::ClockTick;
use rill_core::traits::algorithm::ActionContext;
use rill_core::traits::processable::{ProcessContext, Processable};
use rill_core::traits::{
Node, NodeCategory, NodeId, NodeMetadata, NodeState, ParamValue, ParameterId, Port,
PortDirection, PortId, ProcessResult, Processor, Sink, Source,
};
use std::sync::Arc;
fn test_builder<const B: usize>() -> GraphBuilder<f32, B> {
GraphBuilder::new(
Arc::new(NodeFactory::new()),
Arc::new(BackendFactory::new()),
)
}
struct ConstantSource<T: Transcendental, const BUF_SIZE: usize> {
value: T,
state: NodeState<T, BUF_SIZE>,
outputs: Vec<Port<T, BUF_SIZE>>,
}
impl<T: Transcendental, const BUF_SIZE: usize> ConstantSource<T, BUF_SIZE> {
fn new(value: T, sample_rate: f32) -> Self {
let mut outputs = Vec::with_capacity(1);
outputs.push(Port {
id: PortId::signal_out(NodeId(0), 0),
name: "output".into(),
direction: PortDirection::Output,
action: None,
pending_command: None,
buffer: Default::default(),
feedback_buffer: None,
downstream: Vec::new(),
feedback_downstream: Vec::new(),
feedback_ptrs: Vec::new(),
downstream_input_ptrs: Vec::new(),
downstream_nodes: Vec::new(),
parent: std::ptr::null_mut(),
upstream_buffer: None,
});
Self {
value,
state: NodeState::new(sample_rate),
outputs,
}
}
}
impl<T: Transcendental, const BUF_SIZE: usize> Node<T, BUF_SIZE> for ConstantSource<T, BUF_SIZE> {
fn metadata(&self) -> NodeMetadata {
NodeMetadata {
type_name: None,
name: "ConstantSource".into(),
category: NodeCategory::Source,
description: String::new(),
author: String::new(),
version: "1.0".into(),
signal_inputs: 0,
signal_outputs: 1,
control_inputs: 0,
control_outputs: 0,
clock_inputs: 0,
clock_outputs: 0,
feedback_ports: 0,
parameters: vec![],
}
}
fn init(&mut self, _sample_rate: f32) {}
fn reset(&mut self) {}
fn get_parameter(&self, _id: &ParameterId) -> Option<ParamValue> {
None
}
fn set_parameter(&mut self, _id: &ParameterId, _value: ParamValue) -> ProcessResult<()> {
Ok(())
}
fn id(&self) -> NodeId {
NodeId(0)
}
fn set_id(&mut self, _id: NodeId) {}
fn input_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
None
}
fn input_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
None
}
fn output_port(&self, index: usize) -> Option<&Port<T, BUF_SIZE>> {
self.outputs.get(index)
}
fn output_port_mut(&mut self, index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
self.outputs.get_mut(index)
}
fn control_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
None
}
fn control_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
None
}
fn state(&self) -> &NodeState<T, BUF_SIZE> {
&self.state
}
fn state_mut(&mut self) -> &mut NodeState<T, BUF_SIZE> {
&mut self.state
}
}
impl<T: Transcendental, const BUF_SIZE: usize> Source<T, BUF_SIZE> for ConstantSource<T, BUF_SIZE> {
fn generate(
&mut self,
_clock: &ClockTick,
_control_inputs: &[T],
_clock_inputs: &[ClockTick],
) -> ProcessResult<()> {
let out = self.outputs[0].buffer.as_mut_array();
for sample in out.iter_mut() {
*sample = self.value;
}
Ok(())
}
fn num_signal_outputs(&self) -> usize {
1
}
}
struct NoopProcessor<T: Transcendental, const BUF_SIZE: usize> {
state: NodeState<T, BUF_SIZE>,
}
impl<T: Transcendental, const BUF_SIZE: usize> NoopProcessor<T, BUF_SIZE> {
fn new(sample_rate: f32) -> Self {
Self {
state: NodeState::new(sample_rate),
}
}
}
impl<T: Transcendental, const BUF_SIZE: usize> Node<T, BUF_SIZE> for NoopProcessor<T, BUF_SIZE> {
fn metadata(&self) -> NodeMetadata {
NodeMetadata {
type_name: None,
name: "NoopProcessor".into(),
category: NodeCategory::Processor,
description: String::new(),
author: String::new(),
version: "1.0".into(),
signal_inputs: 0,
signal_outputs: 0,
control_inputs: 0,
control_outputs: 0,
clock_inputs: 0,
clock_outputs: 0,
feedback_ports: 0,
parameters: vec![],
}
}
fn init(&mut self, _sample_rate: f32) {}
fn reset(&mut self) {}
fn get_parameter(&self, _id: &ParameterId) -> Option<ParamValue> {
None
}
fn set_parameter(&mut self, _id: &ParameterId, _value: ParamValue) -> ProcessResult<()> {
Ok(())
}
fn id(&self) -> NodeId {
NodeId(1)
}
fn set_id(&mut self, _id: NodeId) {}
fn input_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
None
}
fn input_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
None
}
fn output_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
None
}
fn output_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
None
}
fn control_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
None
}
fn control_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
None
}
fn state(&self) -> &NodeState<T, BUF_SIZE> {
&self.state
}
fn state_mut(&mut self) -> &mut NodeState<T, BUF_SIZE> {
&mut self.state
}
}
impl<T: Transcendental, const BUF_SIZE: usize> Processor<T, BUF_SIZE>
for NoopProcessor<T, BUF_SIZE>
{
fn process(
&mut self,
_clock: &ClockTick,
_signal_inputs: &[&[T; BUF_SIZE]],
_control_inputs: &[T],
_clock_inputs: &[ClockTick],
_feedback_inputs: &[&[T; BUF_SIZE]],
) -> ProcessResult<()> {
Ok(())
}
}
struct NoopSink<T: Transcendental, const BUF_SIZE: usize> {
state: NodeState<T, BUF_SIZE>,
}
impl<T: Transcendental, const BUF_SIZE: usize> NoopSink<T, BUF_SIZE> {
fn new(sample_rate: f32) -> Self {
Self {
state: NodeState::new(sample_rate),
}
}
}
impl<T: Transcendental, const BUF_SIZE: usize> Node<T, BUF_SIZE> for NoopSink<T, BUF_SIZE> {
fn metadata(&self) -> NodeMetadata {
NodeMetadata {
type_name: None,
name: "NoopSink".into(),
category: NodeCategory::Sink,
description: String::new(),
author: String::new(),
version: "1.0".into(),
signal_inputs: 0,
signal_outputs: 0,
control_inputs: 0,
control_outputs: 0,
clock_inputs: 0,
clock_outputs: 0,
feedback_ports: 0,
parameters: vec![],
}
}
fn init(&mut self, _sample_rate: f32) {}
fn reset(&mut self) {}
fn get_parameter(&self, _id: &ParameterId) -> Option<ParamValue> {
None
}
fn set_parameter(&mut self, _id: &ParameterId, _value: ParamValue) -> ProcessResult<()> {
Ok(())
}
fn id(&self) -> NodeId {
NodeId(2)
}
fn set_id(&mut self, _id: NodeId) {}
fn input_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
None
}
fn input_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
None
}
fn output_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
None
}
fn output_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
None
}
fn control_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
None
}
fn control_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
None
}
fn state(&self) -> &NodeState<T, BUF_SIZE> {
&self.state
}
fn state_mut(&mut self) -> &mut NodeState<T, BUF_SIZE> {
&mut self.state
}
}
impl<T: Transcendental, const BUF_SIZE: usize> Sink<T, BUF_SIZE> for NoopSink<T, BUF_SIZE> {
fn consume(
&mut self,
_clock: &ClockTick,
_signal_inputs: &[&[T; BUF_SIZE]],
_control_inputs: &[T],
_clock_inputs: &[ClockTick],
_feedback_inputs: &[&[T; BUF_SIZE]],
) -> ProcessResult<()> {
Ok(())
}
}
#[test]
fn test_topo_order_correct() {
const BUF: usize = 64;
let mut builder = test_builder::<BUF>();
let src = builder.add_source(Box::new(ConstantSource::new(1.0, 44100.0)));
let proc = builder.add_processor(Box::new(NoopProcessor::new(44100.0)));
let sink = builder.add_sink(Box::new(NoopSink::new(44100.0)));
builder.connect_signal(src, 0, proc, 0);
builder.connect_signal(proc, 0, sink, 0);
let graph = builder.build().expect("build failed");
let order = graph.topo_order();
let src_pos = order.iter().position(|&i| i == src).unwrap();
let proc_pos = order.iter().position(|&i| i == proc).unwrap();
let sink_pos = order.iter().position(|&i| i == sink).unwrap();
assert!(src_pos < proc_pos);
assert!(proc_pos < sink_pos);
}
#[test]
fn test_cycle_detection() {
const BUF: usize = 64;
let mut builder = test_builder::<BUF>();
let a = builder.add_processor(Box::new(NoopProcessor::new(44100.0)));
let b = builder.add_processor(Box::new(NoopProcessor::new(44100.0)));
builder.connect_signal(a, 0, b, 0);
builder.connect_signal(b, 0, a, 0);
let result = builder.build();
assert!(matches!(result, Err(BuildError::CycleDetected)));
}
#[test]
fn test_source_node_create() {
const BUF: usize = 64;
let mut builder = test_builder::<BUF>();
let idx = builder.add_source(Box::new(ConstantSource::new(0.5, 44100.0)));
let graph = builder.build().expect("build failed");
assert_eq!(graph.node_count(), 1);
assert_eq!(graph.topo_order(), &[idx]);
}
pub struct TestSink<T: Transcendental, const BUF_SIZE: usize> {
id: NodeId,
state: NodeState<T, BUF_SIZE>,
pub inputs: Vec<Port<T, BUF_SIZE>>,
last_value: T,
}
impl<T: Transcendental, const BUF_SIZE: usize> TestSink<T, BUF_SIZE> {
fn new(id: NodeId, sample_rate: f32) -> Self {
let mut inputs = Vec::new();
inputs.push(Port::input(id, 0, "in"));
Self {
id,
state: NodeState::new(sample_rate),
inputs,
last_value: T::ZERO,
}
}
}
impl<T: Transcendental, const BUF_SIZE: usize> Node<T, BUF_SIZE> for TestSink<T, BUF_SIZE> {
fn metadata(&self) -> NodeMetadata {
NodeMetadata {
type_name: None,
name: "TestSink".into(),
category: NodeCategory::Sink,
description: String::new(),
author: String::new(),
version: "1.0".into(),
signal_inputs: 1,
signal_outputs: 0,
control_inputs: 0,
control_outputs: 0,
clock_inputs: 0,
clock_outputs: 0,
feedback_ports: 0,
parameters: vec![],
}
}
fn init(&mut self, _: f32) {}
fn reset(&mut self) {
self.state.sample_pos = 0;
self.state.blocks_processed = 0;
}
fn id(&self) -> NodeId {
self.id
}
fn set_id(&mut self, id: NodeId) {
self.id = id;
}
fn get_parameter(&self, _: &ParameterId) -> Option<ParamValue> {
None
}
fn set_parameter(&mut self, _: &ParameterId, _: ParamValue) -> ProcessResult<()> {
Ok(())
}
fn input_port(&self, i: usize) -> Option<&Port<T, BUF_SIZE>> {
self.inputs.get(i)
}
fn input_port_mut(&mut self, i: usize) -> Option<&mut Port<T, BUF_SIZE>> {
self.inputs.get_mut(i)
}
fn output_port(&self, _: usize) -> Option<&Port<T, BUF_SIZE>> {
None
}
fn output_port_mut(&mut self, _: usize) -> Option<&mut Port<T, BUF_SIZE>> {
None
}
fn control_port(&self, _: usize) -> Option<&Port<T, BUF_SIZE>> {
None
}
fn control_port_mut(&mut self, _: usize) -> Option<&mut Port<T, BUF_SIZE>> {
None
}
fn num_signal_inputs(&self) -> usize {
1
}
fn num_signal_outputs(&self) -> usize {
0
}
fn state(&self) -> &NodeState<T, BUF_SIZE> {
&self.state
}
fn state_mut(&mut self) -> &mut NodeState<T, BUF_SIZE> {
&mut self.state
}
}
impl<T: Transcendental, const BUF_SIZE: usize> Sink<T, BUF_SIZE> for TestSink<T, BUF_SIZE> {
fn consume(
&mut self,
_clock: &ClockTick,
_signal_inputs: &[&[T; BUF_SIZE]],
_control_inputs: &[T],
_clock_inputs: &[ClockTick],
_feedback_inputs: &[&[T; BUF_SIZE]],
) -> ProcessResult<()> {
if let Some(port) = self.inputs.first() {
self.last_value = port.buffer.as_array()[0];
}
self.state.advance();
Ok(())
}
}
pub struct GainProcessor<T: Transcendental, const BUF_SIZE: usize> {
id: NodeId,
state: NodeState<T, BUF_SIZE>,
pub inputs: Vec<Port<T, BUF_SIZE>>,
pub outputs: Vec<Port<T, BUF_SIZE>>,
pub multiplier: T,
}
impl<T: Transcendental, const BUF_SIZE: usize> GainProcessor<T, BUF_SIZE> {
fn new(id: NodeId, sample_rate: f32, multiplier: T) -> Self {
let mut inputs = Vec::new();
inputs.push(Port::input(id, 0, "in"));
let mut outputs = Vec::new();
outputs.push(Port::output(id, 0, "out"));
Self {
id,
state: NodeState::new(sample_rate),
inputs,
outputs,
multiplier,
}
}
}
impl<T: Transcendental, const BUF_SIZE: usize> Node<T, BUF_SIZE> for GainProcessor<T, BUF_SIZE> {
fn metadata(&self) -> NodeMetadata {
NodeMetadata {
type_name: None,
name: "GainProcessor".into(),
category: NodeCategory::Processor,
description: String::new(),
author: String::new(),
version: "1.0".into(),
signal_inputs: 1,
signal_outputs: 1,
control_inputs: 0,
control_outputs: 0,
clock_inputs: 0,
clock_outputs: 0,
feedback_ports: 0,
parameters: vec![],
}
}
fn init(&mut self, _: f32) {}
fn reset(&mut self) {
self.state.sample_pos = 0;
self.state.blocks_processed = 0;
}
fn id(&self) -> NodeId {
self.id
}
fn set_id(&mut self, id: NodeId) {
self.id = id;
}
fn get_parameter(&self, id: &ParameterId) -> Option<ParamValue> {
match id.as_str() {
"multiplier" => Some(ParamValue::Float(self.multiplier.to_f32())),
_ => None,
}
}
fn set_parameter(&mut self, id: &ParameterId, value: ParamValue) -> ProcessResult<()> {
match id.as_str() {
"multiplier" => {
if let Some(v) = value.as_f32() {
self.multiplier = T::from_f32(v);
Ok(())
} else {
Err(rill_core::ProcessError::parameter("expected float"))
}
}
_ => Err(rill_core::ProcessError::parameter("unknown")),
}
}
fn input_port(&self, i: usize) -> Option<&Port<T, BUF_SIZE>> {
self.inputs.get(i)
}
fn input_port_mut(&mut self, i: usize) -> Option<&mut Port<T, BUF_SIZE>> {
self.inputs.get_mut(i)
}
fn output_port(&self, i: usize) -> Option<&Port<T, BUF_SIZE>> {
self.outputs.get(i)
}
fn output_port_mut(&mut self, i: usize) -> Option<&mut Port<T, BUF_SIZE>> {
self.outputs.get_mut(i)
}
fn control_port(&self, _: usize) -> Option<&Port<T, BUF_SIZE>> {
None
}
fn control_port_mut(&mut self, _: usize) -> Option<&mut Port<T, BUF_SIZE>> {
None
}
fn num_signal_inputs(&self) -> usize {
1
}
fn num_signal_outputs(&self) -> usize {
1
}
fn state(&self) -> &NodeState<T, BUF_SIZE> {
&self.state
}
fn state_mut(&mut self) -> &mut NodeState<T, BUF_SIZE> {
&mut self.state
}
}
impl<T: Transcendental, const BUF_SIZE: usize> Processor<T, BUF_SIZE>
for GainProcessor<T, BUF_SIZE>
{
fn process(
&mut self,
_clock: &ClockTick,
_signal_inputs: &[&[T; BUF_SIZE]],
_control_inputs: &[T],
_clock_inputs: &[ClockTick],
_feedback_inputs: &[&[T; BUF_SIZE]],
) -> ProcessResult<()> {
let inp = *self.inputs[0].buffer.as_array();
let out = self.outputs[0].buffer.as_mut_array();
for i in 0..BUF_SIZE {
out[i] = inp[i] * self.multiplier;
}
self.state.advance();
Ok(())
}
fn latency(&self) -> usize {
0
}
}
#[test]
fn test_graph_source_to_sink() {
use rill_core::traits::algorithm::ActionContext;
use rill_core::traits::processable::{ProcessContext, Processable};
const BUF: usize = 64;
let mut builder = test_builder::<BUF>();
let src = builder.add_source(Box::new(ConstantSource::new(42.0, 44100.0)));
let snk = builder.add_sink(Box::new(TestSink::<f32, BUF>::new(NodeId(1), 44100.0)));
builder.connect_signal(src, 0, snk, 0);
let graph = builder.build().unwrap();
let (mut nodes, topo, _, _bufs) = graph.into_parts();
let tick = ClockTick::new(0, BUF as u32, 44100.0);
let mut ctx = ProcessContext { clock: &tick };
let _ = nodes[topo[0]].process_block(&mut ctx);
let action_ctx = ActionContext::new(&tick);
let out_port = nodes[topo[0]].output_port(0).unwrap();
out_port.propagate(out_port.buffer(), &action_ctx).unwrap();
let sink_val = nodes[topo[1]].input_port(0).unwrap().buffer.as_array()[0];
assert_eq!(sink_val, 42.0, "sink should receive source value");
}
#[test]
fn test_graph_source_proc_sink() {
use rill_core::traits::algorithm::ActionContext;
use rill_core::traits::processable::{ProcessContext, Processable};
const BUF: usize = 64;
let mut builder = test_builder::<BUF>();
let src = builder.add_source(Box::new(ConstantSource::new(10.0, 44100.0)));
let proc = builder.add_processor(Box::new(GainProcessor::<f32, BUF>::new(
NodeId(1),
44100.0,
3.0,
)));
let snk = builder.add_sink(Box::new(TestSink::<f32, BUF>::new(NodeId(2), 44100.0)));
builder.connect_signal(src, 0, proc, 0);
builder.connect_signal(proc, 0, snk, 0);
let graph = builder.build().unwrap();
let (mut nodes, topo, _, _bufs) = graph.into_parts();
let tick = ClockTick::new(0, BUF as u32, 44100.0);
let mut ctx = ProcessContext { clock: &tick };
let _ = nodes[topo[0]].process_block(&mut ctx);
let action_ctx = ActionContext::new(&tick);
let out_port = nodes[topo[0]].output_port(0).unwrap();
out_port.propagate(out_port.buffer(), &action_ctx).unwrap();
let sink_val = nodes[topo[2]].input_port(0).unwrap().buffer.as_array()[0];
assert!(
(sink_val - 30.0).abs() < 1e-6,
"source(10)×gain(3)=30, got {}",
sink_val
);
}
#[test]
fn test_command_queue_drain() {
use rill_core::queues::{MpscQueue, SetParameter, SignalOrigin};
use rill_core::traits::PortId;
const BUF: usize = 64;
let queue: Arc<MpscQueue<SetParameter>> = Arc::new(MpscQueue::new());
let mut builder = test_builder::<BUF>();
builder.add_processor(Box::new(GainProcessor::<f32, BUF>::new(
NodeId(0),
44100.0,
2.0,
)));
let graph = builder.build().unwrap();
let (mut nodes, _, _, _bufs) = graph.into_parts();
let _ = queue.push(SetParameter::new(
PortId::control_in(NodeId(0), 0),
ParameterId::new("multiplier").unwrap(),
ParamValue::Float(5.0),
SignalOrigin::Manual,
));
while let Some(cmd) = queue.pop() {
let idx = cmd.port.node_id().inner() as usize;
let pid = cmd.parameter.clone();
let _ = nodes[idx].set_parameter(&pid, cmd.value.clone());
}
let pid = ParameterId::new("multiplier").unwrap();
let val = nodes[0].get_parameter(&pid).unwrap().as_f32().unwrap();
assert!(
(val - 5.0).abs() < 1e-6,
"multiplier should be 5.0, got {}",
val
);
}
#[test]
fn test_command_then_propagate() {
use rill_core::queues::{MpscQueue, SetParameter, SignalOrigin};
use rill_core::traits::algorithm::ActionContext;
use rill_core::traits::processable::{ProcessContext, Processable};
use rill_core::traits::PortId;
const BUF: usize = 64;
let queue: Arc<MpscQueue<SetParameter>> = Arc::new(MpscQueue::new());
let mut builder = test_builder::<BUF>();
let src = builder.add_source(Box::new(ConstantSource::new(7.0, 44100.0)));
let proc = builder.add_processor(Box::new(GainProcessor::<f32, BUF>::new(
NodeId(1),
44100.0,
2.0,
)));
let snk = builder.add_sink(Box::new(TestSink::<f32, BUF>::new(NodeId(2), 44100.0)));
builder.connect_signal(src, 0, proc, 0);
builder.connect_signal(proc, 0, snk, 0);
let graph = builder.build().unwrap();
let (mut nodes, topo, _, _bufs) = graph.into_parts();
let tick = ClockTick::new(0, BUF as u32, 44100.0);
let _ = queue.push(SetParameter::new(
PortId::control_in(NodeId(1), 0),
ParameterId::new("multiplier").unwrap(),
ParamValue::Float(4.0),
SignalOrigin::Manual,
));
while let Some(cmd) = queue.pop() {
let idx = cmd.port.node_id().inner() as usize;
let pid = cmd.parameter.clone();
let _ = nodes[idx].set_parameter(&pid, cmd.value.clone());
}
let pid = ParameterId::new("multiplier").unwrap();
let val = nodes[1].get_parameter(&pid).unwrap().as_f32().unwrap();
assert!((val - 4.0).abs() < 1e-6);
let mut ctx = ProcessContext { clock: &tick };
let _ = nodes[topo[0]].process_block(&mut ctx);
let action_ctx = ActionContext::new(&tick);
let out_port = nodes[topo[0]].output_port(0).unwrap();
out_port.propagate(out_port.buffer(), &action_ctx).unwrap();
let sink_val = nodes[topo[2]].input_port(0).unwrap().buffer.as_array()[0];
assert!(
(sink_val - 28.0).abs() < 1e-6,
"source(7)×gain(4)=28, got {}",
sink_val
);
}
#[test]
fn test_feedback_propagation() {
use rill_core::traits::algorithm::ActionContext;
use rill_core::traits::processable::{ProcessContext, Processable};
const BUF: usize = 64;
let mut builder = test_builder::<BUF>();
let src = builder.add_source(Box::new(ConstantSource::new(1.0, 44100.0)));
let proc = builder.add_processor(Box::new(GainProcessor::<f32, BUF>::new(
NodeId(1),
44100.0,
2.0,
)));
let snk = builder.add_sink(Box::new(TestSink::<f32, BUF>::new(NodeId(2), 44100.0)));
builder.connect_signal(src, 0, proc, 0);
builder.connect_signal(proc, 0, snk, 0);
builder.connect_feedback(proc, 0, proc, 0);
let graph = builder.build().unwrap();
let (mut nodes, topo, _, _bufs) = graph.into_parts();
let tick1 = ClockTick::new(0, BUF as u32, 44100.0);
let mut ctx = ProcessContext { clock: &tick1 };
let _ = nodes[topo[0]].process_block(&mut ctx);
let ctx1 = ActionContext::new(&tick1);
let out_port = nodes[topo[0]].output_port(0).unwrap();
out_port.propagate(out_port.buffer(), &ctx1).unwrap();
let block1 = nodes[topo[2]].input_port(0).unwrap().buffer.as_array()[0];
assert!(
(block1 - 2.0).abs() < 1e-6,
"block1: 1.0×2.0=2.0, got {}",
block1
);
let tick2 = ClockTick::new(BUF as u64, BUF as u32, 44100.0);
let mut ctx = ProcessContext { clock: &tick2 };
let _ = nodes[topo[0]].process_block(&mut ctx);
let ctx2 = ActionContext::new(&tick2);
let out_port = nodes[topo[0]].output_port(0).unwrap();
out_port.propagate(out_port.buffer(), &ctx2).unwrap();
let block2 = nodes[topo[2]].input_port(0).unwrap().buffer.as_array()[0];
assert!(
(block2 - 6.0).abs() < 1e-6,
"block2: (1+2)×2=6.0, got {}",
block2
);
}
#[test]
fn test_drain_fn_before_propagate() {
use rill_core::queues::{MpscQueue, SetParameter, SignalOrigin};
use rill_core::traits::algorithm::ActionContext;
use rill_core::traits::processable::{ProcessContext, Processable};
use rill_core::traits::PortId;
const BUF: usize = 64;
let queue: Arc<MpscQueue<SetParameter>> = Arc::new(MpscQueue::new());
let mut builder = test_builder::<BUF>();
let src = builder.add_source(Box::new(ConstantSource::new(5.0, 44100.0)));
let proc = builder.add_processor(Box::new(GainProcessor::<f32, BUF>::new(
NodeId(1),
44100.0,
1.0,
)));
let snk = builder.add_sink(Box::new(TestSink::<f32, BUF>::new(NodeId(2), 44100.0)));
builder.connect_signal(src, 0, proc, 0);
builder.connect_signal(proc, 0, snk, 0);
let graph = builder.build().unwrap();
let (mut nodes, topo, _, _bufs) = graph.into_parts();
let tick = ClockTick::new(0, BUF as u32, 44100.0);
let _ = queue.push(SetParameter::new(
PortId::control_in(NodeId(1), 0),
ParameterId::new("multiplier").unwrap(),
ParamValue::Float(3.0),
SignalOrigin::Manual,
));
while let Some(cmd) = queue.pop() {
let idx = cmd.port.node_id().inner() as usize;
let pid = cmd.parameter.clone();
let _ = nodes[idx].set_parameter(&pid, cmd.value.clone());
}
let pid = ParameterId::new("multiplier").unwrap();
let val = nodes[1].get_parameter(&pid).unwrap().as_f32().unwrap();
assert!(
(val - 3.0).abs() < 1e-6,
"multiplier should be 3.0, got {}",
val
);
let mut ctx = ProcessContext { clock: &tick };
let _ = nodes[topo[0]].process_block(&mut ctx).unwrap();
let action_ctx = ActionContext::new(&tick);
let out_port = nodes[topo[0]].output_port(0).unwrap();
out_port.propagate(out_port.buffer(), &action_ctx).unwrap();
let sink_val = nodes[topo[2]].input_port(0).unwrap().buffer.as_array()[0];
assert!(
(sink_val - 15.0).abs() < 1e-6,
"source(5)×gain(3)=15, got {}",
sink_val
);
}
}