1use crate::registry::{NodeRegistry, RegistryError};
2use rill_core::buffer::{BufferRegistry, FixedBuffer, TapeLoop};
3use rill_core::math::Transcendental;
4use rill_core::time::{ClockSource, ClockTick, SystemClock};
5use rill_core::traits::port::Port;
6use rill_core::traits::{NodeId, NodeParams, NodeVariant, SignalNode};
7use std::collections::VecDeque;
8
/// Errors that can occur while turning a [`GraphBuilder`] into a
/// [`SignalGraph`].
#[derive(Debug, Clone)]
pub enum BuildError {
    /// The signal edges form a cycle, so no processing order exists.
    CycleDetected,
}

impl std::fmt::Display for BuildError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::CycleDetected => f.write_str("signal graph contains a cycle"),
        }
    }
}

// Lets callers propagate `BuildError` with `?` into `Box<dyn Error>` and
// similar error containers.
impl std::error::Error for BuildError {}
23
/// Storage slot for one node of the graph.
///
/// Currently a thin wrapper around the node itself; the builder pushes one
/// entry per added node, so a node's index is its position in the vector.
pub(crate) struct NodeEntry<T: Transcendental, const BUF_SIZE: usize> {
    /// The wrapped node (source, processor, sink or router).
    pub(crate) node: NodeVariant<T, BUF_SIZE>,
}
35
/// Description of a named resource that the graph should allocate at build
/// time.
///
/// `Debug` is derived so resources can be logged and so containing types can
/// derive `Debug` as well.
#[derive(Debug, Clone)]
pub struct GraphResource {
    /// Name under which the resource is registered in the buffer registry.
    pub name: String,
    /// Resource kind. `GraphBuilder::build` only allocates for `"tape"`;
    /// other kinds are carried through unchanged.
    pub kind: String,
    /// Size passed to the allocator for this kind.
    /// NOTE(review): presumably a length in samples — confirm against `TapeLoop::new`.
    pub capacity: usize,
}
50
/// Incrementally collects nodes, edges and resources, then produces a
/// [`SignalGraph`] via `build`.
///
/// Every edge is stored as `(from_node, from_port, to_node, to_port)`, where
/// the node values are indices into `nodes`.
pub struct GraphBuilder<T: Transcendental, const BUF_SIZE: usize> {
    /// Nodes in insertion order; a node's index is its handle.
    nodes: Vec<NodeEntry<T, BUF_SIZE>>,
    /// Signal connections; these define the topological order.
    signal_edges: Vec<(usize, usize, usize, usize)>,
    /// Control connections (recorded by `connect_control`).
    control_edges: Vec<(usize, usize, usize, usize)>,
    /// Clock connections (recorded by `connect_clock`).
    clock_edges: Vec<(usize, usize, usize, usize)>,
    /// Feedback connections; not part of the topological sort.
    feedback_edges: Vec<(usize, usize, usize, usize)>,
    /// Resources to allocate when the graph is built.
    resources: Vec<GraphResource>,
}
60
61impl<T: Transcendental, const BUF_SIZE: usize> Default for GraphBuilder<T, BUF_SIZE> {
62 fn default() -> Self {
63 Self::new()
64 }
65}
66
67impl<T: Transcendental, const BUF_SIZE: usize> GraphBuilder<T, BUF_SIZE> {
68 pub fn new() -> Self {
70 Self {
71 nodes: Vec::new(),
72 signal_edges: Vec::new(),
73 control_edges: Vec::new(),
74 clock_edges: Vec::new(),
75 feedback_edges: Vec::new(),
76 resources: Vec::new(),
77 }
78 }
79
    /// Queues `resource` for allocation when the graph is built.
    ///
    /// No de-duplication is performed here; resources are kept in the order
    /// they were added.
    pub fn add_resource(&mut self, resource: GraphResource) {
        self.resources.push(resource);
    }
84
85 pub fn add_source(&mut self, source: Box<dyn rill_core::traits::Source<T, BUF_SIZE>>) -> usize {
87 let idx = self.nodes.len();
88 self.nodes.push(NodeEntry {
89 node: NodeVariant::Source(source),
90 });
91 idx
92 }
93
94 pub fn add_processor(
96 &mut self,
97 processor: Box<dyn rill_core::traits::Processor<T, BUF_SIZE>>,
98 ) -> usize {
99 let idx = self.nodes.len();
100 self.nodes.push(NodeEntry {
101 node: NodeVariant::Processor(processor),
102 });
103 idx
104 }
105
106 pub fn add_sink(&mut self, sink: Box<dyn rill_core::traits::Sink<T, BUF_SIZE>>) -> usize {
108 let idx = self.nodes.len();
109 self.nodes.push(NodeEntry {
110 node: NodeVariant::Sink(sink),
111 });
112 idx
113 }
114
115 pub fn add_router(&mut self, router: Box<dyn rill_core::traits::Router<T, BUF_SIZE>>) -> usize {
117 let idx = self.nodes.len();
118 self.nodes.push(NodeEntry {
119 node: NodeVariant::Router(router),
120 });
121 idx
122 }
123
124 pub fn add_node(
138 &mut self,
139 registry: &NodeRegistry<T, BUF_SIZE>,
140 type_name: &str,
141 params: &NodeParams,
142 ) -> Result<usize, RegistryError> {
143 let id = NodeId(self.nodes.len() as u32);
144 self.add_node_with_id(registry, type_name, params, id)
145 }
146
147 pub fn add_node_with_id(
165 &mut self,
166 registry: &NodeRegistry<T, BUF_SIZE>,
167 type_name: &str,
168 params: &NodeParams,
169 id: NodeId,
170 ) -> Result<usize, RegistryError> {
171 let node = registry.construct(type_name, id, params)?;
172 let idx = self.nodes.len();
173 self.nodes.push(NodeEntry { node });
174 Ok(idx)
175 }
176
    /// Number of nodes added to the builder so far.
    pub fn node_count(&self) -> usize {
        self.nodes.len()
    }
181
182 pub fn connect_signal(
185 &mut self,
186 from_node: usize,
187 from_port: usize,
188 to_node: usize,
189 to_port: usize,
190 ) {
191 self.signal_edges
192 .push((from_node, from_port, to_node, to_port));
193 }
194
195 pub fn connect_control(
197 &mut self,
198 from_node: usize,
199 from_port: usize,
200 to_node: usize,
201 to_port: usize,
202 ) {
203 self.control_edges
204 .push((from_node, from_port, to_node, to_port));
205 }
206
207 pub fn connect_clock(
209 &mut self,
210 from_node: usize,
211 from_port: usize,
212 to_node: usize,
213 to_port: usize,
214 ) {
215 self.clock_edges
216 .push((from_node, from_port, to_node, to_port));
217 }
218
219 pub fn connect_feedback(
222 &mut self,
223 from_node: usize,
224 from_port: usize,
225 to_node: usize,
226 to_port: usize,
227 ) {
228 self.feedback_edges
229 .push((from_node, from_port, to_node, to_port));
230 }
231
232 pub fn build(
238 mut self,
239 clock_source: Box<dyn ClockSource>,
240 ) -> Result<SignalGraph<T, BUF_SIZE>, BuildError> {
241 let num_nodes = self.nodes.len();
242
243 let mut in_degree = vec![0usize; num_nodes];
245 let mut out_edges: Vec<Vec<(usize, usize, usize)>> = vec![Vec::new(); num_nodes];
246
247 for &(from_n, from_p, to_n, to_p) in &self.signal_edges {
248 in_degree[to_n] += 1;
249 out_edges[from_n].push((from_p, to_n, to_p));
250 }
251
252 let mut queue: VecDeque<usize> = in_degree
254 .iter()
255 .enumerate()
256 .filter(|(_, &d)| d == 0)
257 .map(|(i, _)| i)
258 .collect();
259
260 let mut topo = Vec::with_capacity(num_nodes);
261 let mut indeg = in_degree;
262 while let Some(idx) = queue.pop_front() {
263 topo.push(idx);
264 for &(_, to_n, _) in &out_edges[idx] {
265 indeg[to_n] -= 1;
266 if indeg[to_n] == 0 {
267 queue.push_back(to_n);
268 }
269 }
270 }
271
272 if topo.len() != num_nodes {
273 return Err(BuildError::CycleDetected);
274 }
275
276 for &(from_n, from_p, to_n, to_p) in &self.signal_edges {
278 if let Some(port) = self.nodes[from_n].node.output_port_mut(from_p) {
280 port.downstream.push((to_n, to_p));
281 }
282 let in_ptr: *mut Port<T, BUF_SIZE> = self.nodes[to_n]
284 .node
285 .input_port_mut(to_p)
286 .map(|p| p as *mut Port<T, BUF_SIZE>)
287 .unwrap_or(std::ptr::null_mut());
288 let parent: *mut NodeVariant<T, BUF_SIZE> = &mut self.nodes[to_n].node;
289 let out_ptr: *mut Port<T, BUF_SIZE> = self.nodes[from_n]
290 .node
291 .output_port_mut(from_p)
292 .map(|p| p as *mut Port<T, BUF_SIZE>)
293 .unwrap_or(std::ptr::null_mut());
294 if !in_ptr.is_null() && !out_ptr.is_null() {
296 #[allow(unsafe_code)]
297 unsafe {
298 (*in_ptr).parent = parent;
299 (*out_ptr).downstream_input_ptrs.push(in_ptr);
300 }
301 }
302 }
303
304 for &(from_n, from_p, to_n, _) in &self.signal_edges {
306 let parent: *mut NodeVariant<T, BUF_SIZE> = &mut self.nodes[to_n].node;
307 if let Some(port) = self.nodes[from_n].node.output_port_mut(from_p) {
308 let ptr_val = parent as usize;
309 let already = port.downstream_nodes.iter().any(|&p| p as usize == ptr_val);
310 if !already {
311 port.downstream_nodes.push(parent);
312 }
313 }
314 }
315
316 for &(from_n, from_p, to_n, to_p) in &self.signal_edges {
318 let upstream = self.nodes[from_n]
319 .node
320 .output_port(from_p)
321 .map(|p| &p.buffer as *const FixedBuffer<T, BUF_SIZE>);
322 if let Some(port) = self.nodes[to_n].node.input_port_mut(to_p) {
323 if port.upstream_buffer.is_none() {
324 port.upstream_buffer = upstream;
326 } else {
327 port.upstream_buffer = None;
329 }
330 }
331 }
332
333 for &(from_n, from_p, to_n, to_p) in &self.feedback_edges {
335 if let Some(port) = self.nodes[from_n].node.output_port_mut(from_p) {
336 port.feedback_buffer = Some(FixedBuffer::new());
337 port.feedback_downstream.push((to_n, to_p));
338 }
339 if let Some(port) = self.nodes[to_n].node.input_port_mut(to_p) {
340 port.feedback_buffer = Some(FixedBuffer::new());
341 }
342 }
343 for &(from_n, from_p, to_n, to_p) in &self.feedback_edges {
345 let ptr = self.nodes[to_n]
346 .node
347 .input_port(to_p)
348 .map(|p| &p.feedback_buffer as *const Option<FixedBuffer<T, BUF_SIZE>>)
349 .map(|r| r as *mut Option<FixedBuffer<T, BUF_SIZE>>);
350 if let Some(port) = self.nodes[from_n].node.output_port_mut(from_p) {
351 if let Some(p) = ptr {
352 port.feedback_ptrs.push(p);
353 }
354 }
355 }
356
357 let sample_rate = clock_source.sample_rate();
358
359 let mut buffers = BufferRegistry::new();
361 for r in &self.resources {
362 if r.kind == "tape" {
363 if let Some(tape) = TapeLoop::<T>::new(r.capacity) {
364 buffers.register(&r.name, Box::new(tape));
365 }
366 }
367 }
368 let allocated = self.resources.clone();
369
370 Ok(SignalGraph {
371 nodes: self.nodes,
372 topo_order: topo,
373 clock_source,
374 resources: allocated,
375 current_tick: ClockTick::new(0, BUF_SIZE as u32, sample_rate),
376 buffers,
377 })
378 }
379}
380
/// A built signal graph: the nodes, their processing order, the clock and the
/// resources allocated for them.
pub struct SignalGraph<T: Transcendental, const BUF_SIZE: usize> {
    /// Nodes, indexed by the handles handed out by the builder.
    nodes: Vec<NodeEntry<T, BUF_SIZE>>,
    /// Node indices in topological (upstream-first) order.
    topo_order: Vec<usize>,
    /// Clock the graph was built with; only its sample rate is read in the
    /// code shown here, hence the `dead_code` allowance.
    #[allow(dead_code)]
    clock_source: Box<dyn ClockSource>,
    /// Current clock tick; initialised at position 0 with `BUF_SIZE` samples.
    current_tick: ClockTick,
    /// Resource descriptions the graph was built with.
    pub(crate) resources: Vec<GraphResource>,
    /// Registry of buffers allocated for the resources.
    pub buffers: BufferRegistry<T>,
}
403
404impl<T: Transcendental, const BUF_SIZE: usize> SignalGraph<T, BUF_SIZE> {
405 pub fn new(clock_source: Box<dyn ClockSource>) -> Self {
407 let sample_rate = clock_source.sample_rate();
408 Self {
409 nodes: Vec::new(),
410 topo_order: Vec::new(),
411 clock_source,
412 current_tick: ClockTick::new(0, BUF_SIZE as u32, sample_rate),
413 resources: Vec::new(),
414 buffers: BufferRegistry::new(),
415 }
416 }
417
418 pub fn with_sample_rate(sample_rate: f32) -> Self {
420 Self::new(Box::new(SystemClock::with_sample_rate(sample_rate)))
421 }
422
423 pub fn output_buffer(&self, node_idx: usize, port_idx: usize) -> Option<&[T; BUF_SIZE]> {
425 self.nodes
426 .get(node_idx)?
427 .node
428 .output_port(port_idx)
429 .map(|p| p.buffer.as_array())
430 }
431
    /// Returns a copy of the graph's current clock tick.
    pub fn current_tick(&self) -> ClockTick {
        self.current_tick
    }
440
    /// Number of nodes in the graph.
    pub fn node_count(&self) -> usize {
        self.nodes.len()
    }
445
446 pub fn topo_order(&self) -> &[usize] {
448 &self.topo_order
449 }
450
451 #[allow(dead_code)]
454 pub(crate) fn node_entries(&self) -> &[NodeEntry<T, BUF_SIZE>] {
455 &self.nodes
456 }
457
    /// Sample rate carried by the graph's current tick.
    #[allow(dead_code)]
    pub(crate) fn sample_rate(&self) -> f32 {
        self.current_tick.sample_rate
    }
462
463 #[allow(dead_code)]
465 pub fn resources(&self) -> &[GraphResource] {
466 &self.resources
467 }
468
469 pub fn dispatch_set_parameters(
476 &mut self,
477 commands: &[rill_core::queues::signal::SetParameter],
478 ) {
479 for cmd in commands {
480 let target = cmd.port.node_id();
481 for entry in self.nodes.iter_mut() {
482 if entry.node.id() == target {
483 let _ = entry.node.apply_set_parameter(cmd);
484 break;
485 }
486 }
487 }
488 }
489
    /// Decomposes the graph into its nodes, the topological order and the
    /// current tick. The clock source, resources and buffer registry are
    /// dropped.
    ///
    /// NOTE(review): `build` stores raw `*mut NodeVariant` pointers into the
    /// node vector (`parent`, `downstream_nodes`). They stay valid only if
    /// the `collect` below reuses the original allocation — confirm this is
    /// guaranteed before changing the shape of this expression.
    pub fn into_parts(self) -> (Vec<NodeVariant<T, BUF_SIZE>>, Vec<usize>, ClockTick) {
        // Unwrap every entry into the bare node.
        let nodes = self.nodes.into_iter().map(|e| e.node).collect();
        (nodes, self.topo_order, self.current_tick)
    }
495}
496
497#[cfg(test)]
502mod tests {
503 use super::*;
504 use rill_core::math::Transcendental;
505 use rill_core::time::ClockTick;
506 use rill_core::traits::{
507 processable::{ProcessContext, Processable},
508 NodeCategory, NodeId, NodeMetadata, NodeState, ParamValue, ParameterId, Port,
509 PortDirection, PortId, ProcessResult, Processor, SignalNode, Sink, Source,
510 };
511 use std::sync::Arc;
512
    /// Test source that fills its single output with a constant value.
    struct ConstantSource<T: Transcendental, const BUF_SIZE: usize> {
        /// Value written to every sample of the output buffer.
        value: T,
        /// Per-node processing state.
        state: NodeState<T, BUF_SIZE>,
        /// Output ports; exactly one is created by `new`.
        outputs: Vec<Port<T, BUF_SIZE>>,
    }
521
522 impl<T: Transcendental, const BUF_SIZE: usize> ConstantSource<T, BUF_SIZE> {
523 fn new(value: T, sample_rate: f32) -> Self {
524 let mut outputs = Vec::with_capacity(1);
525 outputs.push(Port {
526 id: PortId::signal_out(NodeId(0), 0),
527 name: "output".into(),
528 direction: PortDirection::Output,
529 action: None,
530 pending_command: None,
531 buffer: Default::default(),
532 feedback_buffer: None,
533 downstream: Vec::new(),
534 feedback_downstream: Vec::new(),
535 feedback_ptrs: Vec::new(),
536 downstream_input_ptrs: Vec::new(),
537 downstream_nodes: Vec::new(),
538 parent: std::ptr::null_mut(),
539 upstream_buffer: None,
540 });
541 Self {
542 value,
543 state: NodeState::new(sample_rate),
544 outputs,
545 }
546 }
547 }
548
549 impl<T: Transcendental, const BUF_SIZE: usize> SignalNode<T, BUF_SIZE>
550 for ConstantSource<T, BUF_SIZE>
551 {
552 fn metadata(&self) -> NodeMetadata {
553 NodeMetadata {
554 type_name: None,
555 name: "ConstantSource".into(),
556 category: NodeCategory::Source,
557 description: String::new(),
558 author: String::new(),
559 version: "1.0".into(),
560 signal_inputs: 0,
561 signal_outputs: 1,
562 control_inputs: 0,
563 control_outputs: 0,
564 clock_inputs: 0,
565 clock_outputs: 0,
566 feedback_ports: 0,
567 parameters: vec![],
568 }
569 }
570 fn init(&mut self, _sample_rate: f32) {}
571 fn reset(&mut self) {}
572 fn get_parameter(&self, _id: &ParameterId) -> Option<ParamValue> {
573 None
574 }
575 fn set_parameter(&mut self, _id: &ParameterId, _value: ParamValue) -> ProcessResult<()> {
576 Ok(())
577 }
578 fn id(&self) -> NodeId {
579 NodeId(0)
580 }
581 fn set_id(&mut self, _id: NodeId) {}
582 fn input_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
583 None
584 }
585 fn input_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
586 None
587 }
588 fn output_port(&self, index: usize) -> Option<&Port<T, BUF_SIZE>> {
589 self.outputs.get(index)
590 }
591 fn output_port_mut(&mut self, index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
592 self.outputs.get_mut(index)
593 }
594 fn control_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
595 None
596 }
597 fn control_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
598 None
599 }
600 fn state(&self) -> &NodeState<T, BUF_SIZE> {
601 &self.state
602 }
603 fn state_mut(&mut self) -> &mut NodeState<T, BUF_SIZE> {
604 &mut self.state
605 }
606 }
607
608 impl<T: Transcendental, const BUF_SIZE: usize> Source<T, BUF_SIZE> for ConstantSource<T, BUF_SIZE> {
609 fn generate(
610 &mut self,
611 _clock: &ClockTick,
612 _control_inputs: &[T],
613 _clock_inputs: &[ClockTick],
614 ) -> ProcessResult<()> {
615 let out = self.outputs[0].buffer.as_mut_array();
616 for sample in out.iter_mut() {
617 *sample = self.value;
618 }
619 Ok(())
620 }
621 fn num_signal_outputs(&self) -> usize {
622 1
623 }
624 }
625
    /// Test processor with no ports that does nothing when processed.
    struct NoopProcessor<T: Transcendental, const BUF_SIZE: usize> {
        /// Per-node processing state.
        state: NodeState<T, BUF_SIZE>,
    }
632
633 impl<T: Transcendental, const BUF_SIZE: usize> NoopProcessor<T, BUF_SIZE> {
634 fn new(sample_rate: f32) -> Self {
635 Self {
636 state: NodeState::new(sample_rate),
637 }
638 }
639 }
640
641 impl<T: Transcendental, const BUF_SIZE: usize> SignalNode<T, BUF_SIZE>
642 for NoopProcessor<T, BUF_SIZE>
643 {
644 fn metadata(&self) -> NodeMetadata {
645 NodeMetadata {
646 type_name: None,
647 name: "NoopProcessor".into(),
648 category: NodeCategory::Processor,
649 description: String::new(),
650 author: String::new(),
651 version: "1.0".into(),
652 signal_inputs: 0,
653 signal_outputs: 0,
654 control_inputs: 0,
655 control_outputs: 0,
656 clock_inputs: 0,
657 clock_outputs: 0,
658 feedback_ports: 0,
659 parameters: vec![],
660 }
661 }
662 fn init(&mut self, _sample_rate: f32) {}
663 fn reset(&mut self) {}
664 fn get_parameter(&self, _id: &ParameterId) -> Option<ParamValue> {
665 None
666 }
667 fn set_parameter(&mut self, _id: &ParameterId, _value: ParamValue) -> ProcessResult<()> {
668 Ok(())
669 }
670 fn id(&self) -> NodeId {
671 NodeId(1)
672 }
673 fn set_id(&mut self, _id: NodeId) {}
674 fn input_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
675 None
676 }
677 fn input_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
678 None
679 }
680 fn output_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
681 None
682 }
683 fn output_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
684 None
685 }
686 fn control_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
687 None
688 }
689 fn control_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
690 None
691 }
692 fn state(&self) -> &NodeState<T, BUF_SIZE> {
693 &self.state
694 }
695 fn state_mut(&mut self) -> &mut NodeState<T, BUF_SIZE> {
696 &mut self.state
697 }
698 }
699
700 impl<T: Transcendental, const BUF_SIZE: usize> Processor<T, BUF_SIZE>
701 for NoopProcessor<T, BUF_SIZE>
702 {
703 fn process(
704 &mut self,
705 _clock: &ClockTick,
706 _signal_inputs: &[&[T; BUF_SIZE]],
707 _control_inputs: &[T],
708 _clock_inputs: &[ClockTick],
709 _feedback_inputs: &[&[T; BUF_SIZE]],
710 ) -> ProcessResult<()> {
711 Ok(())
712 }
713 }
714
    /// Test sink with no ports that discards everything it is given.
    struct NoopSink<T: Transcendental, const BUF_SIZE: usize> {
        /// Per-node processing state.
        state: NodeState<T, BUF_SIZE>,
    }
721
722 impl<T: Transcendental, const BUF_SIZE: usize> NoopSink<T, BUF_SIZE> {
723 fn new(sample_rate: f32) -> Self {
724 Self {
725 state: NodeState::new(sample_rate),
726 }
727 }
728 }
729
730 impl<T: Transcendental, const BUF_SIZE: usize> SignalNode<T, BUF_SIZE> for NoopSink<T, BUF_SIZE> {
731 fn metadata(&self) -> NodeMetadata {
732 NodeMetadata {
733 type_name: None,
734 name: "NoopSink".into(),
735 category: NodeCategory::Sink,
736 description: String::new(),
737 author: String::new(),
738 version: "1.0".into(),
739 signal_inputs: 0,
740 signal_outputs: 0,
741 control_inputs: 0,
742 control_outputs: 0,
743 clock_inputs: 0,
744 clock_outputs: 0,
745 feedback_ports: 0,
746 parameters: vec![],
747 }
748 }
749 fn init(&mut self, _sample_rate: f32) {}
750 fn reset(&mut self) {}
751 fn get_parameter(&self, _id: &ParameterId) -> Option<ParamValue> {
752 None
753 }
754 fn set_parameter(&mut self, _id: &ParameterId, _value: ParamValue) -> ProcessResult<()> {
755 Ok(())
756 }
757 fn id(&self) -> NodeId {
758 NodeId(2)
759 }
760 fn set_id(&mut self, _id: NodeId) {}
761 fn input_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
762 None
763 }
764 fn input_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
765 None
766 }
767 fn output_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
768 None
769 }
770 fn output_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
771 None
772 }
773 fn control_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
774 None
775 }
776 fn control_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
777 None
778 }
779 fn state(&self) -> &NodeState<T, BUF_SIZE> {
780 &self.state
781 }
782 fn state_mut(&mut self) -> &mut NodeState<T, BUF_SIZE> {
783 &mut self.state
784 }
785 }
786
787 impl<T: Transcendental, const BUF_SIZE: usize> Sink<T, BUF_SIZE> for NoopSink<T, BUF_SIZE> {
788 fn consume(
789 &mut self,
790 _clock: &ClockTick,
791 _signal_inputs: &[&[T; BUF_SIZE]],
792 _control_inputs: &[T],
793 _clock_inputs: &[ClockTick],
794 _feedback_inputs: &[&[T; BUF_SIZE]],
795 ) -> ProcessResult<()> {
796 Ok(())
797 }
798 }
799
800 #[test]
805 fn test_graph_creation() {
806 let graph = SignalGraph::<f32, 64>::with_sample_rate(44100.0);
807 assert_eq!(graph.node_count(), 0);
808 }
809
810 #[test]
811 fn test_topo_order_correct() {
812 const BUF: usize = 64;
813 let mut builder = GraphBuilder::<f32, BUF>::new();
814
815 let src = builder.add_source(Box::new(ConstantSource::new(1.0, 44100.0)));
816 let proc = builder.add_processor(Box::new(NoopProcessor::new(44100.0)));
817 let sink = builder.add_sink(Box::new(NoopSink::new(44100.0)));
818
819 builder.connect_signal(src, 0, proc, 0);
820 builder.connect_signal(proc, 0, sink, 0);
821
822 let graph = builder
823 .build(Box::new(SystemClock::with_sample_rate(44100.0)))
824 .expect("build failed");
825
826 let order = graph.topo_order();
827 let src_pos = order.iter().position(|&i| i == src).unwrap();
828 let proc_pos = order.iter().position(|&i| i == proc).unwrap();
829 let sink_pos = order.iter().position(|&i| i == sink).unwrap();
830 assert!(src_pos < proc_pos);
831 assert!(proc_pos < sink_pos);
832 }
833
834 #[test]
835 fn test_cycle_detection() {
836 const BUF: usize = 64;
837 let mut builder = GraphBuilder::<f32, BUF>::new();
838
839 let a = builder.add_processor(Box::new(NoopProcessor::new(44100.0)));
840 let b = builder.add_processor(Box::new(NoopProcessor::new(44100.0)));
841
842 builder.connect_signal(a, 0, b, 0);
843 builder.connect_signal(b, 0, a, 0);
844
845 let result = builder.build(Box::new(SystemClock::with_sample_rate(44100.0)));
846 assert!(matches!(result, Err(BuildError::CycleDetected)));
847 }
848
849 #[test]
850 fn test_source_node_create() {
851 const BUF: usize = 64;
852 let mut builder = GraphBuilder::<f32, BUF>::new();
853 let idx = builder.add_source(Box::new(ConstantSource::new(0.5, 44100.0)));
854 let graph = builder
855 .build(Box::new(SystemClock::with_sample_rate(44100.0)))
856 .expect("build failed");
857 assert_eq!(graph.node_count(), 1);
858 assert_eq!(graph.topo_order(), &[idx]);
859 }
860
    /// Test sink with one signal input that remembers the first sample of the
    /// last block it consumed.
    pub struct TestSink<T: Transcendental, const BUF_SIZE: usize> {
        /// Node id, settable through `set_id`.
        id: NodeId,
        /// Per-node processing state.
        state: NodeState<T, BUF_SIZE>,
        /// Input ports; exactly one is created by `new`.
        pub inputs: Vec<Port<T, BUF_SIZE>>,
        /// First sample of input 0 as of the most recent `consume` call.
        last_value: T,
    }
872
873 impl<T: Transcendental, const BUF_SIZE: usize> TestSink<T, BUF_SIZE> {
874 fn new(id: NodeId, sample_rate: f32) -> Self {
875 let mut inputs = Vec::new();
876 inputs.push(Port::input(id, 0, "in"));
877 Self {
878 id,
879 state: NodeState::new(sample_rate),
880 inputs,
881 last_value: T::ZERO,
882 }
883 }
884 #[allow(dead_code)]
885 fn last_value(&self) -> T {
886 self.last_value
887 }
888 }
889
890 impl<T: Transcendental, const BUF_SIZE: usize> SignalNode<T, BUF_SIZE> for TestSink<T, BUF_SIZE> {
891 fn metadata(&self) -> NodeMetadata {
892 NodeMetadata {
893 type_name: None,
894 name: "TestSink".into(),
895 category: NodeCategory::Sink,
896 description: String::new(),
897 author: String::new(),
898 version: "1.0".into(),
899 signal_inputs: 1,
900 signal_outputs: 0,
901 control_inputs: 0,
902 control_outputs: 0,
903 clock_inputs: 0,
904 clock_outputs: 0,
905 feedback_ports: 0,
906 parameters: vec![],
907 }
908 }
909 fn init(&mut self, _: f32) {}
910 fn reset(&mut self) {
911 self.state.sample_pos = 0;
912 self.state.blocks_processed = 0;
913 }
914 fn id(&self) -> NodeId {
915 self.id
916 }
917 fn set_id(&mut self, id: NodeId) {
918 self.id = id;
919 }
920 fn get_parameter(&self, _: &ParameterId) -> Option<ParamValue> {
921 None
922 }
923 fn set_parameter(&mut self, _: &ParameterId, _: ParamValue) -> ProcessResult<()> {
924 Ok(())
925 }
926 fn input_port(&self, i: usize) -> Option<&Port<T, BUF_SIZE>> {
927 self.inputs.get(i)
928 }
929 fn input_port_mut(&mut self, i: usize) -> Option<&mut Port<T, BUF_SIZE>> {
930 self.inputs.get_mut(i)
931 }
932 fn output_port(&self, _: usize) -> Option<&Port<T, BUF_SIZE>> {
933 None
934 }
935 fn output_port_mut(&mut self, _: usize) -> Option<&mut Port<T, BUF_SIZE>> {
936 None
937 }
938 fn control_port(&self, _: usize) -> Option<&Port<T, BUF_SIZE>> {
939 None
940 }
941 fn control_port_mut(&mut self, _: usize) -> Option<&mut Port<T, BUF_SIZE>> {
942 None
943 }
944 fn num_signal_inputs(&self) -> usize {
945 1
946 }
947 fn num_signal_outputs(&self) -> usize {
948 0
949 }
950 fn state(&self) -> &NodeState<T, BUF_SIZE> {
951 &self.state
952 }
953 fn state_mut(&mut self) -> &mut NodeState<T, BUF_SIZE> {
954 &mut self.state
955 }
956 }
957
958 impl<T: Transcendental, const BUF_SIZE: usize> Sink<T, BUF_SIZE> for TestSink<T, BUF_SIZE> {
959 fn consume(
960 &mut self,
961 _clock: &ClockTick,
962 _signal_inputs: &[&[T; BUF_SIZE]],
963 _control_inputs: &[T],
964 _clock_inputs: &[ClockTick],
965 _feedback_inputs: &[&[T; BUF_SIZE]],
966 ) -> ProcessResult<()> {
967 if let Some(port) = self.inputs.first() {
968 self.last_value = port.buffer.as_array()[0];
969 }
970 self.state.advance();
971 Ok(())
972 }
973 }
974
    /// Test processor that multiplies its single input by `multiplier`.
    pub struct GainProcessor<T: Transcendental, const BUF_SIZE: usize> {
        /// Node id, settable through `set_id`.
        id: NodeId,
        /// Per-node processing state.
        state: NodeState<T, BUF_SIZE>,
        /// Input ports; exactly one is created by `new`.
        pub inputs: Vec<Port<T, BUF_SIZE>>,
        /// Output ports; exactly one is created by `new`.
        pub outputs: Vec<Port<T, BUF_SIZE>>,
        /// Gain factor, also exposed as the "multiplier" parameter.
        pub multiplier: T,
    }
983
984 impl<T: Transcendental, const BUF_SIZE: usize> GainProcessor<T, BUF_SIZE> {
985 fn new(id: NodeId, sample_rate: f32, multiplier: T) -> Self {
986 let mut inputs = Vec::new();
987 inputs.push(Port::input(id, 0, "in"));
988 let mut outputs = Vec::new();
989 outputs.push(Port::output(id, 0, "out"));
990 Self {
991 id,
992 state: NodeState::new(sample_rate),
993 inputs,
994 outputs,
995 multiplier,
996 }
997 }
998 }
999
1000 impl<T: Transcendental, const BUF_SIZE: usize> SignalNode<T, BUF_SIZE>
1001 for GainProcessor<T, BUF_SIZE>
1002 {
1003 fn metadata(&self) -> NodeMetadata {
1004 NodeMetadata {
1005 type_name: None,
1006 name: "GainProcessor".into(),
1007 category: NodeCategory::Processor,
1008 description: String::new(),
1009 author: String::new(),
1010 version: "1.0".into(),
1011 signal_inputs: 1,
1012 signal_outputs: 1,
1013 control_inputs: 0,
1014 control_outputs: 0,
1015 clock_inputs: 0,
1016 clock_outputs: 0,
1017 feedback_ports: 0,
1018 parameters: vec![],
1019 }
1020 }
1021 fn init(&mut self, _: f32) {}
1022 fn reset(&mut self) {
1023 self.state.sample_pos = 0;
1024 self.state.blocks_processed = 0;
1025 }
1026 fn id(&self) -> NodeId {
1027 self.id
1028 }
1029 fn set_id(&mut self, id: NodeId) {
1030 self.id = id;
1031 }
1032 fn get_parameter(&self, id: &ParameterId) -> Option<ParamValue> {
1033 match id.as_str() {
1034 "multiplier" => Some(ParamValue::Float(self.multiplier.to_f32())),
1035 _ => None,
1036 }
1037 }
1038 fn set_parameter(&mut self, id: &ParameterId, value: ParamValue) -> ProcessResult<()> {
1039 match id.as_str() {
1040 "multiplier" => {
1041 if let Some(v) = value.as_f32() {
1042 self.multiplier = T::from_f32(v);
1043 Ok(())
1044 } else {
1045 Err(rill_core::ProcessError::parameter("expected float"))
1046 }
1047 }
1048 _ => Err(rill_core::ProcessError::parameter("unknown")),
1049 }
1050 }
1051 fn input_port(&self, i: usize) -> Option<&Port<T, BUF_SIZE>> {
1052 self.inputs.get(i)
1053 }
1054 fn input_port_mut(&mut self, i: usize) -> Option<&mut Port<T, BUF_SIZE>> {
1055 self.inputs.get_mut(i)
1056 }
1057 fn output_port(&self, i: usize) -> Option<&Port<T, BUF_SIZE>> {
1058 self.outputs.get(i)
1059 }
1060 fn output_port_mut(&mut self, i: usize) -> Option<&mut Port<T, BUF_SIZE>> {
1061 self.outputs.get_mut(i)
1062 }
1063 fn control_port(&self, _: usize) -> Option<&Port<T, BUF_SIZE>> {
1064 None
1065 }
1066 fn control_port_mut(&mut self, _: usize) -> Option<&mut Port<T, BUF_SIZE>> {
1067 None
1068 }
1069 fn num_signal_inputs(&self) -> usize {
1070 1
1071 }
1072 fn num_signal_outputs(&self) -> usize {
1073 1
1074 }
1075 fn state(&self) -> &NodeState<T, BUF_SIZE> {
1076 &self.state
1077 }
1078 fn state_mut(&mut self) -> &mut NodeState<T, BUF_SIZE> {
1079 &mut self.state
1080 }
1081 }
1082
1083 impl<T: Transcendental, const BUF_SIZE: usize> Processor<T, BUF_SIZE>
1084 for GainProcessor<T, BUF_SIZE>
1085 {
1086 fn process(
1087 &mut self,
1088 _clock: &ClockTick,
1089 _signal_inputs: &[&[T; BUF_SIZE]],
1090 _control_inputs: &[T],
1091 _clock_inputs: &[ClockTick],
1092 _feedback_inputs: &[&[T; BUF_SIZE]],
1093 ) -> ProcessResult<()> {
1094 let inp = *self.inputs[0].buffer.as_array();
1095 let out = self.outputs[0].buffer.as_mut_array();
1096 for i in 0..BUF_SIZE {
1097 out[i] = inp[i] * self.multiplier;
1098 }
1099 self.state.advance();
1100 Ok(())
1101 }
1102 fn latency(&self) -> usize {
1103 0
1104 }
1105 }
1106
1107 #[test]
1110 fn test_graph_source_to_sink() {
1111 const BUF: usize = 64;
1112 let mut builder = GraphBuilder::<f32, BUF>::new();
1113 let src = builder.add_source(Box::new(ConstantSource::new(42.0, 44100.0)));
1114 let snk = builder.add_sink(Box::new(TestSink::<f32, BUF>::new(NodeId(1), 44100.0)));
1115 builder.connect_signal(src, 0, snk, 0);
1116 let graph = builder
1117 .build(Box::new(SystemClock::with_sample_rate(44100.0)))
1118 .unwrap();
1119
1120 let (mut nodes, topo, _) = graph.into_parts();
1121 let tick = ClockTick::new(0, BUF as u32, 44100.0);
1122
1123 let mut ctx = ProcessContext { clock: &tick };
1125 let _ = nodes[topo[0]].process_block(&mut ctx);
1126
1127 let action_ctx = rill_core::traits::algorithm::ActionContext::new(&tick);
1129 let out_port = nodes[topo[0]].output_port(0).unwrap();
1130 out_port.propagate(out_port.buffer(), &action_ctx).unwrap();
1131
1132 let sink_val = nodes[topo[1]].input_port(0).unwrap().buffer.as_array()[0];
1133 assert_eq!(sink_val, 42.0, "sink should receive source value");
1134 }
1135
1136 #[test]
1139 fn test_graph_source_proc_sink() {
1140 const BUF: usize = 64;
1141 let mut builder = GraphBuilder::<f32, BUF>::new();
1142 let src = builder.add_source(Box::new(ConstantSource::new(10.0, 44100.0)));
1143 let proc = builder.add_processor(Box::new(GainProcessor::<f32, BUF>::new(
1144 NodeId(1),
1145 44100.0,
1146 3.0,
1147 )));
1148 let snk = builder.add_sink(Box::new(TestSink::<f32, BUF>::new(NodeId(2), 44100.0)));
1149 builder.connect_signal(src, 0, proc, 0);
1150 builder.connect_signal(proc, 0, snk, 0);
1151 let graph = builder
1152 .build(Box::new(SystemClock::with_sample_rate(44100.0)))
1153 .unwrap();
1154
1155 let (mut nodes, topo, _) = graph.into_parts();
1156 let tick = ClockTick::new(0, BUF as u32, 44100.0);
1157
1158 let mut ctx = ProcessContext { clock: &tick };
1160 let _ = nodes[topo[0]].process_block(&mut ctx);
1161
1162 let action_ctx = rill_core::traits::algorithm::ActionContext::new(&tick);
1164 let out_port = nodes[topo[0]].output_port(0).unwrap();
1165 out_port.propagate(out_port.buffer(), &action_ctx).unwrap();
1166
1167 let sink_val = nodes[topo[2]].input_port(0).unwrap().buffer.as_array()[0];
1168 assert!(
1169 (sink_val - 30.0).abs() < 1e-6,
1170 "source(10)×gain(3)=30, got {}",
1171 sink_val
1172 );
1173 }
1174
1175 #[test]
1178 fn test_command_queue_drain() {
1179 use rill_core::queues::{MpscQueue, SetParameter, SignalSource};
1180 use rill_core::traits::PortId;
1181
1182 const BUF: usize = 64;
1183 let queue: Arc<MpscQueue<SetParameter>> = Arc::new(MpscQueue::new());
1184
1185 let mut builder = GraphBuilder::<f32, BUF>::new();
1186 builder.add_processor(Box::new(GainProcessor::<f32, BUF>::new(
1187 NodeId(0),
1188 44100.0,
1189 2.0,
1190 )));
1191 let graph = builder
1192 .build(Box::new(SystemClock::with_sample_rate(44100.0)))
1193 .unwrap();
1194 let (mut nodes, _, _) = graph.into_parts();
1195
1196 let _ = queue.push(SetParameter::new(
1197 PortId::control_in(NodeId(0), 0),
1198 ParameterId::new("multiplier").unwrap(),
1199 5.0,
1200 SignalSource::Manual,
1201 ));
1202
1203 while let Some(cmd) = queue.pop() {
1204 let idx = cmd.port.node_id().inner() as usize;
1205 let pid = cmd.parameter.clone();
1206 let _ = nodes[idx].set_parameter(&pid, ParamValue::Float(cmd.value));
1207 }
1208
1209 let pid = ParameterId::new("multiplier").unwrap();
1210 let val = nodes[0].get_parameter(&pid).unwrap().as_f32().unwrap();
1211 assert!(
1212 (val - 5.0).abs() < 1e-6,
1213 "multiplier should be 5.0, got {}",
1214 val
1215 );
1216 }
1217
/// Applies a queued `SetParameter` command to the gain node, then checks that
/// the new multiplier is what reaches the sink after one propagated block.
///
/// Graph: `ConstantSource(7.0)` -> `GainProcessor(initial 2.0)` -> `TestSink`.
/// The command sets `"multiplier"` to 4.0, so the sink is expected to see 28.
#[test]
fn test_command_then_propagate() {
    use rill_core::queues::{MpscQueue, SetParameter, SignalSource};
    use rill_core::traits::PortId;

    const BUF: usize = 64;
    let queue: Arc<MpscQueue<SetParameter>> = Arc::new(MpscQueue::new());

    let mut builder = GraphBuilder::<f32, BUF>::new();
    let src = builder.add_source(Box::new(ConstantSource::new(7.0, 44100.0)));
    let proc = builder.add_processor(Box::new(GainProcessor::<f32, BUF>::new(
        NodeId(1),
        44100.0,
        2.0,
    )));
    let snk = builder.add_sink(Box::new(TestSink::<f32, BUF>::new(NodeId(2), 44100.0)));
    builder.connect_signal(src, 0, proc, 0);
    builder.connect_signal(proc, 0, snk, 0);
    let graph = builder
        .build(Box::new(SystemClock::with_sample_rate(44100.0)))
        .unwrap();
    let (mut nodes, topo, _) = graph.into_parts();
    let tick = ClockTick::new(0, BUF as u32, 44100.0);

    // Enqueue the parameter change. The push result is deliberately ignored:
    // a lost command is caught by the multiplier assertion below.
    let _ = queue.push(SetParameter::new(
        PortId::control_in(NodeId(1), 0),
        ParameterId::new("multiplier").unwrap(),
        4.0,
        SignalSource::Manual,
    ));

    // Drain every pending command, using the command's node id as the index
    // into `nodes`.
    while let Some(cmd) = queue.pop() {
        let idx = cmd.port.node_id().inner() as usize;
        let _ = nodes[idx].set_parameter(&cmd.parameter, ParamValue::Float(cmd.value));
    }

    // The command must have been applied before any audio is processed.
    let pid = ParameterId::new("multiplier").unwrap();
    let val = nodes[1].get_parameter(&pid).unwrap().as_f32().unwrap();
    assert!(
        (val - 4.0).abs() < 1e-6,
        "multiplier should be 4.0, got {}",
        val
    );

    // Process the first node in topological order and push its output
    // downstream. A processing failure aborts the test here instead of
    // surfacing later as a misleading value mismatch.
    let mut ctx = ProcessContext { clock: &tick };
    nodes[topo[0]]
        .process_block(&mut ctx)
        .expect("process_block on the first topo node should succeed");
    let action_ctx = rill_core::traits::algorithm::ActionContext::new(&tick);
    let out_port = nodes[topo[0]].output_port(0).unwrap();
    out_port.propagate(out_port.buffer(), &action_ctx).unwrap();

    // First sample seen on input 0 of the last node in topological order.
    let sink_val = nodes[topo[2]].input_port(0).unwrap().buffer.as_array()[0];
    assert!(
        (sink_val - 28.0).abs() < 1e-6,
        "source(7)×gain(4)=28, got {}",
        sink_val
    );
}
1276
/// Checks the value reaching the sink over two consecutive blocks when the
/// gain processor's output 0 is also connected back to its own input 0 with
/// `connect_feedback`.
///
/// Graph: `ConstantSource(1.0)` -> `GainProcessor(2.0)` -> `TestSink`, plus the
/// feedback edge `proc:0 -> proc:0`. Expected sink values are 2.0 for the
/// first block and 6.0 (`(1+2)×2`) for the second.
#[test]
fn test_feedback_propagation() {
    use rill_core::traits::algorithm::ActionContext;

    const BUF: usize = 64;
    let mut builder = GraphBuilder::<f32, BUF>::new();
    let src = builder.add_source(Box::new(ConstantSource::new(1.0, 44100.0)));
    let proc = builder.add_processor(Box::new(GainProcessor::<f32, BUF>::new(
        NodeId(1),
        44100.0,
        2.0,
    )));
    let snk = builder.add_sink(Box::new(TestSink::<f32, BUF>::new(NodeId(2), 44100.0)));
    builder.connect_signal(src, 0, proc, 0);
    builder.connect_signal(proc, 0, snk, 0);
    builder.connect_feedback(proc, 0, proc, 0);
    let graph = builder
        .build(Box::new(SystemClock::with_sample_rate(44100.0)))
        .unwrap();
    let (mut nodes, topo, _) = graph.into_parts();

    // Block 1: process the first topo node and propagate its output.
    let tick1 = ClockTick::new(0, BUF as u32, 44100.0);
    let mut ctx = ProcessContext { clock: &tick1 };
    // Fail loudly on a processing error rather than letting it show up as a
    // wrong sample value further down.
    nodes[topo[0]]
        .process_block(&mut ctx)
        .expect("process_block should succeed for block 1");
    let ctx1 = ActionContext::new(&tick1);
    let out_port = nodes[topo[0]].output_port(0).unwrap();
    out_port.propagate(out_port.buffer(), &ctx1).unwrap();
    let block1 = nodes[topo[2]].input_port(0).unwrap().buffer.as_array()[0];
    assert!(
        (block1 - 2.0).abs() < 1e-6,
        "block1: 1.0×2.0=2.0, got {}",
        block1
    );

    // Block 2: same sequence with a tick that starts one buffer later.
    let tick2 = ClockTick::new(BUF as u64, BUF as u32, 44100.0);
    let mut ctx = ProcessContext { clock: &tick2 };
    nodes[topo[0]]
        .process_block(&mut ctx)
        .expect("process_block should succeed for block 2");
    let ctx2 = ActionContext::new(&tick2);
    let out_port = nodes[topo[0]].output_port(0).unwrap();
    out_port.propagate(out_port.buffer(), &ctx2).unwrap();
    let block2 = nodes[topo[2]].input_port(0).unwrap().buffer.as_array()[0];
    assert!(
        (block2 - 6.0).abs() < 1e-6,
        "block2: (1+2)×2=6.0, got {}",
        block2
    );
}
1332
/// Runs a boxed "drain" closure that applies queued `SetParameter` commands to
/// the node slice, then processes and propagates one block and checks that the
/// sink sees the updated gain.
///
/// Graph: `ConstantSource(5.0)` -> `GainProcessor(initial 1.0)` -> `TestSink`.
/// The queued command sets `"multiplier"` to 3.0, so the sink is expected to
/// see 15.
///
/// `nodes` is owned by this test and never aliased, so every access goes
/// through ordinary borrows; no raw pointer or `unsafe` is needed.
#[test]
fn test_drain_fn_before_propagate() {
    use rill_core::queues::{MpscQueue, SetParameter, SignalSource};
    use rill_core::traits::PortId;

    const BUF: usize = 64;
    let queue: Arc<MpscQueue<SetParameter>> = Arc::new(MpscQueue::new());

    let mut builder = GraphBuilder::<f32, BUF>::new();
    let src = builder.add_source(Box::new(ConstantSource::new(5.0, 44100.0)));
    let proc = builder.add_processor(Box::new(GainProcessor::<f32, BUF>::new(
        NodeId(1),
        44100.0,
        1.0,
    )));
    let snk = builder.add_sink(Box::new(TestSink::<f32, BUF>::new(NodeId(2), 44100.0)));
    builder.connect_signal(src, 0, proc, 0);
    builder.connect_signal(proc, 0, snk, 0);
    let graph = builder
        .build(Box::new(SystemClock::with_sample_rate(44100.0)))
        .unwrap();
    let (mut nodes, topo, _) = graph.into_parts();

    // Closure that empties the queue into the node slice. Commands whose node
    // index is out of range are skipped.
    let drain_fn: Box<dyn Fn(&mut [NodeVariant<f32, BUF>])> = {
        let q = queue.clone();
        Box::new(move |nd: &mut [NodeVariant<f32, BUF>]| {
            while let Some(cmd) = q.pop() {
                let idx = cmd.port.node_id().inner() as usize;
                if let Some(node) = nd.get_mut(idx) {
                    let _ = node.set_parameter(&cmd.parameter, ParamValue::Float(cmd.value));
                }
            }
        })
    };

    // The push result is deliberately ignored: a lost command is caught by
    // the multiplier assertion below.
    let _ = queue.push(SetParameter::new(
        PortId::control_in(NodeId(1), 0),
        ParameterId::new("multiplier").unwrap(),
        3.0,
        SignalSource::Manual,
    ));

    let tick = ClockTick::new(0, BUF as u32, 44100.0);

    // Drain before any processing happens.
    drain_fn(&mut *nodes);

    let pid = ParameterId::new("multiplier").unwrap();
    let val = nodes[1].get_parameter(&pid).unwrap().as_f32().unwrap();
    assert!(
        (val - 3.0).abs() < 1e-6,
        "multiplier should be 3.0, got {}",
        val
    );

    // Process the first node in topological order, then push its output
    // downstream.
    let mut ctx = ProcessContext { clock: &tick };
    nodes[topo[0]].process_block(&mut ctx).unwrap();

    let action_ctx = rill_core::traits::algorithm::ActionContext::new(&tick);
    let out_port = nodes[topo[0]].output_port(0).unwrap();
    out_port.propagate(out_port.buffer(), &action_ctx).unwrap();

    // First sample seen on input 0 of the last node in topological order.
    let sink_val = nodes[topo[2]].input_port(0).unwrap().buffer.as_array()[0];
    assert!(
        (sink_val - 15.0).abs() < 1e-6,
        "source(5)×gain(3)=15, got {}",
        sink_val
    );
}
1434}