1use crate::backend_factory::BackendFactory;
2use crate::factory::{NodeFactory, RegistryError};
3use rill_core::buffer::{Buffer, BufferRegistry, FixedBuffer, TapeLoop};
4use rill_core::math::Transcendental;
5use rill_core::queues::{MpscQueue, SetParameter};
6use rill_core::time::ClockTick;
7use rill_core::traits::active::GraphHandle;
8use rill_core::traits::port::Port;
9use rill_core::traits::ParamValue;
10use rill_core::traits::{Node, NodeId, NodeVariant, Params};
11use rill_core_actor::{ActorCell, ActorRef};
12use std::collections::{HashMap, VecDeque};
13use std::sync::atomic::{AtomicBool, Ordering};
14use std::sync::Arc;
15
/// Errors reported while turning a `GraphBuilder` into a `Graph`.
#[derive(Debug, Clone)]
pub enum BuildError {
    /// The signal edges contain a cycle, so no topological order exists.
    CycleDetected,
    /// Creating the configured backend failed; carries the factory's message.
    Backend(String),
}

impl std::fmt::Display for BuildError {
    /// Writes a short, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::CycleDetected => f.write_str("graph cycle detected"),
            Self::Backend(msg) => write!(f, "backend error: {msg}"),
        }
    }
}

// Lets callers box the error or propagate it with `?` into `Box<dyn Error>`.
impl std::error::Error for BuildError {}
41
/// Builder-side wrapper around a single graph node.
///
/// It currently carries nothing besides the node itself; the builder stores
/// one entry per added node, in insertion order.
pub(crate) struct NodeEntry<T: Transcendental, const BUF_SIZE: usize> {
    /// The wrapped node.
    pub(crate) node: NodeVariant<T, BUF_SIZE>,
}
53
/// Description of a named resource the graph should allocate at build time.
#[derive(Debug, Clone)]
pub struct GraphResource {
    /// Name under which the resource is registered in the buffer registry.
    pub name: String,
    /// Resource kind. The builder only allocates entries whose kind is `"tape"`.
    pub kind: String,
    /// Capacity handed to the resource constructor (units are defined by the resource type).
    pub capacity: usize,
}
68
/// Incrementally collects nodes, edges, resources and an optional backend
/// configuration, and turns them into a `Graph` via `build`.
///
/// All edge lists store `(from_node, from_port, to_node, to_port)` tuples,
/// where the node values are the indices returned by the `add_*` methods.
pub struct GraphBuilder<T: Transcendental, const BUF_SIZE: usize> {
    /// Nodes in insertion order; a node's index is its position here.
    nodes: Vec<NodeEntry<T, BUF_SIZE>>,
    /// Signal connections; these drive the topological sort and port wiring in `build`.
    signal_edges: Vec<(usize, usize, usize, usize)>,
    /// Control connections. Recorded by `connect_control`; not read by `build` in this file.
    control_edges: Vec<(usize, usize, usize, usize)>,
    /// Clock connections. Recorded by `connect_clock`; not read by `build` in this file.
    clock_edges: Vec<(usize, usize, usize, usize)>,
    /// Feedback connections; excluded from the topological sort.
    feedback_edges: Vec<(usize, usize, usize, usize)>,
    /// Resources to allocate when the graph is built.
    resources: Vec<GraphResource>,
    /// Factory used to construct nodes from a type name and parameters.
    factory: Arc<NodeFactory<T, BUF_SIZE>>,
    /// Factory used to create the I/O backend selected with `with_backend`.
    backends: Arc<BackendFactory<T>>,
    /// Backend selection; `None` means the graph is built without a backend.
    backend_config: Option<BackendConfig>,
}
89
/// Backend selection recorded by `GraphBuilder::with_backend`.
struct BackendConfig {
    /// Name passed to the backend factory.
    name: String,
    /// Parameters passed to the backend factory; `build` also reads
    /// `"sample_rate"` from this map.
    params: HashMap<String, ParamValue>,
}
94
95impl<T: Transcendental, const BUF_SIZE: usize> GraphBuilder<T, BUF_SIZE> {
96 pub fn new(factory: Arc<NodeFactory<T, BUF_SIZE>>, backends: Arc<BackendFactory<T>>) -> Self {
98 Self {
99 nodes: Vec::new(),
100 signal_edges: Vec::new(),
101 control_edges: Vec::new(),
102 clock_edges: Vec::new(),
103 feedback_edges: Vec::new(),
104 resources: Vec::new(),
105 factory,
106 backends,
107 backend_config: None,
108 }
109 }
110
111 pub fn add_node(&mut self, type_name: &str, params: &Params) -> Result<usize, RegistryError> {
123 let id = NodeId(self.nodes.len() as u32);
124 self.add_node_with_id(type_name, params, id)
125 }
126
127 pub fn add_node_with_id(
133 &mut self,
134 type_name: &str,
135 params: &Params,
136 id: NodeId,
137 ) -> Result<usize, RegistryError> {
138 let node = self.factory.construct(type_name, id, params)?;
139 let idx = self.nodes.len();
140 self.nodes.push(NodeEntry { node });
141 Ok(idx)
142 }
143
144 pub fn add_resource(&mut self, resource: GraphResource) {
146 self.resources.push(resource);
147 }
148
149 pub fn node_count(&self) -> usize {
151 self.nodes.len()
152 }
153
154 pub fn backend_factory(&self) -> &Arc<BackendFactory<T>> {
156 &self.backends
157 }
158
159 pub fn add_source(&mut self, source: Box<dyn rill_core::traits::Source<T, BUF_SIZE>>) -> usize {
161 let idx = self.nodes.len();
162 self.nodes.push(NodeEntry {
163 node: NodeVariant::Source(source),
164 });
165 idx
166 }
167
168 pub fn add_processor(
170 &mut self,
171 processor: Box<dyn rill_core::traits::Processor<T, BUF_SIZE>>,
172 ) -> usize {
173 let idx = self.nodes.len();
174 self.nodes.push(NodeEntry {
175 node: NodeVariant::Processor(processor),
176 });
177 idx
178 }
179
180 pub fn add_sink(&mut self, sink: Box<dyn rill_core::traits::Sink<T, BUF_SIZE>>) -> usize {
182 let idx = self.nodes.len();
183 self.nodes.push(NodeEntry {
184 node: NodeVariant::Sink(sink),
185 });
186 idx
187 }
188
189 pub fn add_router(&mut self, router: Box<dyn rill_core::traits::Router<T, BUF_SIZE>>) -> usize {
191 let idx = self.nodes.len();
192 self.nodes.push(NodeEntry {
193 node: NodeVariant::Router(router),
194 });
195 idx
196 }
197
198 pub fn connect_signal(
200 &mut self,
201 from_node: usize,
202 from_port: usize,
203 to_node: usize,
204 to_port: usize,
205 ) {
206 self.signal_edges
207 .push((from_node, from_port, to_node, to_port));
208 }
209
210 pub fn connect_control(
212 &mut self,
213 from_node: usize,
214 from_port: usize,
215 to_node: usize,
216 to_port: usize,
217 ) {
218 self.control_edges
219 .push((from_node, from_port, to_node, to_port));
220 }
221
222 pub fn connect_clock(
224 &mut self,
225 from_node: usize,
226 from_port: usize,
227 to_node: usize,
228 to_port: usize,
229 ) {
230 self.clock_edges
231 .push((from_node, from_port, to_node, to_port));
232 }
233
234 pub fn connect_feedback(
236 &mut self,
237 from_node: usize,
238 from_port: usize,
239 to_node: usize,
240 to_port: usize,
241 ) {
242 self.feedback_edges
243 .push((from_node, from_port, to_node, to_port));
244 }
245
246 pub fn with_backend(mut self, backend_name: &str, params: HashMap<String, ParamValue>) -> Self {
258 self.backend_config = Some(BackendConfig {
259 name: backend_name.to_string(),
260 params,
261 });
262 self
263 }
264
265 pub fn build(mut self) -> Result<Graph<T, BUF_SIZE>, BuildError> {
272 let (backend_name, params) = match self.backend_config.as_ref() {
273 Some(cfg) => (Some(cfg.name.as_str()), &cfg.params),
274 None => (None, &HashMap::new()),
275 };
276 let num_nodes = self.nodes.len();
277
278 let mut in_degree = vec![0usize; num_nodes];
280 let mut out_edges: Vec<Vec<(usize, usize, usize)>> = vec![Vec::new(); num_nodes];
281
282 for &(from_n, from_p, to_n, to_p) in &self.signal_edges {
283 in_degree[to_n] += 1;
284 out_edges[from_n].push((from_p, to_n, to_p));
285 }
286
287 let mut queue: VecDeque<usize> = in_degree
289 .iter()
290 .enumerate()
291 .filter(|(_, &d)| d == 0)
292 .map(|(i, _)| i)
293 .collect();
294
295 let mut topo = Vec::with_capacity(num_nodes);
296 let mut indeg = in_degree;
297 while let Some(idx) = queue.pop_front() {
298 topo.push(idx);
299 for &(_, to_n, _) in &out_edges[idx] {
300 indeg[to_n] -= 1;
301 if indeg[to_n] == 0 {
302 queue.push_back(to_n);
303 }
304 }
305 }
306
307 if topo.len() != num_nodes {
308 return Err(BuildError::CycleDetected);
309 }
310
311 for &(from_n, from_p, to_n, to_p) in &self.signal_edges {
313 if let Some(port) = self.nodes[from_n].node.output_port_mut(from_p) {
315 port.downstream.push((to_n, to_p));
316 }
317 let in_ptr: *mut Port<T, BUF_SIZE> = self.nodes[to_n]
319 .node
320 .input_port_mut(to_p)
321 .map(|p| p as *mut Port<T, BUF_SIZE>)
322 .unwrap_or(std::ptr::null_mut());
323 let parent: *mut NodeVariant<T, BUF_SIZE> = &mut self.nodes[to_n].node;
324 let out_ptr: *mut Port<T, BUF_SIZE> = self.nodes[from_n]
325 .node
326 .output_port_mut(from_p)
327 .map(|p| p as *mut Port<T, BUF_SIZE>)
328 .unwrap_or(std::ptr::null_mut());
329 if !in_ptr.is_null() && !out_ptr.is_null() {
331 #[allow(unsafe_code)]
332 unsafe {
333 (*in_ptr).parent = parent;
334 (*out_ptr).downstream_input_ptrs.push(in_ptr);
335 }
336 }
337 }
338
339 for &(from_n, from_p, to_n, _) in &self.signal_edges {
341 let parent: *mut NodeVariant<T, BUF_SIZE> = &mut self.nodes[to_n].node;
342 if let Some(port) = self.nodes[from_n].node.output_port_mut(from_p) {
343 let ptr_val = parent as usize;
344 let already = port.downstream_nodes.iter().any(|&p| p as usize == ptr_val);
345 if !already {
346 port.downstream_nodes.push(parent);
347 }
348 }
349 }
350
351 for &(from_n, from_p, to_n, to_p) in &self.signal_edges {
353 let upstream = self.nodes[from_n]
354 .node
355 .output_port(from_p)
356 .map(|p| &p.buffer as *const FixedBuffer<T, BUF_SIZE>);
357 if let Some(port) = self.nodes[to_n].node.input_port_mut(to_p) {
358 if port.upstream_buffer.is_none() {
359 port.upstream_buffer = upstream;
361 } else {
362 port.upstream_buffer = None;
364 }
365 }
366 }
367
368 for &(from_n, from_p, to_n, to_p) in &self.feedback_edges {
370 if let Some(port) = self.nodes[from_n].node.output_port_mut(from_p) {
371 port.feedback_buffer = Some(FixedBuffer::new());
372 port.feedback_downstream.push((to_n, to_p));
373 }
374 if let Some(port) = self.nodes[to_n].node.input_port_mut(to_p) {
375 port.feedback_buffer = Some(FixedBuffer::new());
376 }
377 }
378 for &(from_n, from_p, to_n, to_p) in &self.feedback_edges {
380 let ptr = self.nodes[to_n]
381 .node
382 .input_port(to_p)
383 .map(|p| &p.feedback_buffer as *const Option<FixedBuffer<T, BUF_SIZE>>)
384 .map(|r| r as *mut Option<FixedBuffer<T, BUF_SIZE>>);
385 if let Some(port) = self.nodes[from_n].node.output_port_mut(from_p) {
386 if let Some(p) = ptr {
387 port.feedback_ptrs.push(p);
388 }
389 }
390 }
391
392 let sr = params
393 .get("sample_rate")
394 .and_then(|v| v.as_i32())
395 .unwrap_or(44100) as f32;
396
397 let mut buffers = BufferRegistry::new();
399 for r in &self.resources {
400 if r.kind == "tape" {
401 if let Some(tape) = TapeLoop::<T>::new(r.capacity) {
402 buffers.register(&r.name, Box::new(tape));
403 }
404 }
405 }
406
407 for entry in &mut self.nodes {
411 entry.node.resolve_resources(&buffers);
412 }
413
414 let backend_box = if let Some(name) = backend_name {
416 let b = self
417 .backends
418 .create(name, params)
419 .map_err(BuildError::Backend)?;
420 let ptr: *mut dyn rill_core::io::IoBackend<T> = &*b
421 as *const dyn rill_core::io::IoBackend<T>
422 as *mut dyn rill_core::io::IoBackend<T>;
423 for entry in &mut self.nodes {
424 entry.node.resolve_backend(ptr);
425 }
426 Some(b)
427 } else {
428 None
429 };
430
431 let mut nodes: Vec<NodeVariant<T, BUF_SIZE>> =
432 self.nodes.into_iter().map(|e| e.node).collect();
433
434 let cmd_queue = Arc::new(MpscQueue::<SetParameter>::with_capacity(64));
436 let have_queue = if let Some(ref _backend) = backend_box {
437 let driver_idx = nodes
438 .iter()
439 .position(|n| {
440 let name = n.metadata().name;
441 name == "AudioInput" || name == "Input"
442 })
443 .or_else(|| {
444 nodes.iter().position(|n| {
445 let name = n.metadata().name;
446 name == "AudioOutput" || name == "Output"
447 })
448 });
449 if let Some(driver_idx) = driver_idx {
450 let nodes_ptr = nodes.as_mut_ptr();
451 let len = nodes.len();
452 let source_idx = topo[0];
453 let queue_ptr: *const MpscQueue<SetParameter> = Arc::as_ptr(&cmd_queue);
454 let handle = GraphHandle {
455 nodes: nodes_ptr as *mut u8,
456 len,
457 source_idx,
458 sample_rate: sr,
459 queue: queue_ptr,
460 };
461 nodes[driver_idx].start(handle);
462 true
463 } else {
464 false
465 }
466 } else {
467 false
468 };
469 let command_queue = if have_queue { Some(cmd_queue) } else { None };
470
471 let owned_buffers = buffers.into_inner();
472
473 let allocated = self.resources.clone();
474
475 Ok(Graph {
476 nodes,
477 topo_order: topo,
478 resources: allocated,
479 current_tick: ClockTick::new(0, BUF_SIZE as u32, sr),
480 buffers: owned_buffers,
481 backend: backend_box,
482 command_queue,
483 })
484 }
485}
486
/// A built, wired processing graph as produced by `GraphBuilder::build`.
pub struct Graph<T: Transcendental, const BUF_SIZE: usize> {
    /// All nodes, indexed by the indices the builder handed out.
    nodes: Vec<NodeVariant<T, BUF_SIZE>>,
    /// Node indices in topological order of the signal edges.
    topo_order: Vec<usize>,
    /// Clock tick the graph was initialised with; also carries the sample rate.
    current_tick: ClockTick,
    /// The resource descriptions the graph was built from.
    pub(crate) resources: Vec<GraphResource>,
    /// Buffers taken out of the build-time registry and owned by the graph.
    // NOTE(review): presumably kept so buffers resolved by nodes stay alive — confirm.
    #[allow(dead_code)]
    buffers: Vec<Box<dyn Buffer<T> + Send>>,
    /// The I/O backend, if one was configured on the builder.
    #[allow(dead_code)]
    backend: Option<Box<dyn rill_core::io::IoBackend<T>>>,
    /// Parameter-command queue; present only when a driver node was started at build time.
    command_queue: Option<Arc<MpscQueue<SetParameter>>>,
}
513
514impl<T: Transcendental, const BUF_SIZE: usize> Graph<T, BUF_SIZE> {
515 pub fn nodes(&self) -> &[NodeVariant<T, BUF_SIZE>] {
521 &self.nodes
522 }
523
524 pub fn nodes_mut(&mut self) -> &mut [NodeVariant<T, BUF_SIZE>] {
526 &mut self.nodes
527 }
528
529 pub fn current_tick(&self) -> ClockTick {
531 self.current_tick
532 }
533
534 pub fn node_count(&self) -> usize {
536 self.nodes.len()
537 }
538
539 pub fn topo_order(&self) -> &[usize] {
541 &self.topo_order
542 }
543
544 #[allow(dead_code)]
545 pub(crate) fn sample_rate(&self) -> f32 {
546 self.current_tick.sample_rate
547 }
548
549 #[allow(dead_code)]
551 pub fn resources(&self) -> &[GraphResource] {
552 &self.resources
553 }
554
555 #[allow(dead_code)]
557 pub(crate) fn backend_ref(&self) -> Option<&dyn rill_core::io::IoBackend<T>> {
558 self.backend.as_deref()
559 }
560
561 pub fn run(&self, running: Arc<AtomicBool>) -> Result<(), String> {
568 if let Some(ref backend) = self.backend {
569 backend.run(running.clone())?;
570 while running.load(Ordering::Acquire) {
571 std::thread::park();
572 }
573 backend.stop()
574 } else {
575 Ok(())
576 }
577 }
578
579 pub fn handle(&self) -> Option<ActorRef<SetParameter>> {
585 let mailbox = self.command_queue.as_ref()?;
586 Some(ActorRef::new(mailbox))
587 }
588
589 #[cfg(test)]
591 pub fn into_parts(
592 self,
593 ) -> (
594 Vec<NodeVariant<T, BUF_SIZE>>,
595 Vec<usize>,
596 ClockTick,
597 Vec<Box<dyn Buffer<T> + Send>>,
598 ) {
599 let Self {
600 nodes,
601 topo_order,
602 current_tick,
603 resources: _,
604 buffers,
605 backend: _,
606 command_queue: _,
607 } = self;
608 (nodes, topo_order, current_tick, buffers)
609 }
610}
611
612impl<T: Transcendental, const BUF_SIZE: usize> ActorCell for Graph<T, BUF_SIZE> {
617 type Msg = SetParameter;
618
619 fn receive(&mut self, msg: SetParameter) {
621 let idx = msg.port.node_id().inner() as usize;
622 if idx < self.nodes.len() {
623 let _ = self.nodes[idx].set_parameter(&msg.parameter, msg.value);
624 }
625 }
626}
627
628#[cfg(test)]
629mod tests {
630 use super::*;
631 use rill_core::math::Transcendental;
632 use rill_core::time::ClockTick;
633 use rill_core::traits::active::ActiveNode;
634 use rill_core::traits::algorithm::ActionContext;
635 use rill_core::traits::processable::{ProcessContext, Processable};
636 use rill_core::traits::{
637 Node, NodeCategory, NodeId, NodeMetadata, NodeState, ParamValue, ParameterId, Port,
638 PortDirection, PortId, ProcessResult, Processor, Sink, Source,
639 };
640 use std::sync::Arc;
641
642 fn test_builder<const B: usize>() -> GraphBuilder<f32, B> {
644 GraphBuilder::new(
645 Arc::new(NodeFactory::new()),
646 Arc::new(BackendFactory::new()),
647 )
648 }
649
650 struct ConstantSource<T: Transcendental, const BUF_SIZE: usize> {
654 value: T,
655 state: NodeState<T, BUF_SIZE>,
656 outputs: Vec<Port<T, BUF_SIZE>>,
657 }
658
659 impl<T: Transcendental, const BUF_SIZE: usize> ConstantSource<T, BUF_SIZE> {
660 fn new(value: T, sample_rate: f32) -> Self {
661 let mut outputs = Vec::with_capacity(1);
662 outputs.push(Port {
663 id: PortId::signal_out(NodeId(0), 0),
664 name: "output".into(),
665 direction: PortDirection::Output,
666 action: None,
667 pending_command: None,
668 buffer: Default::default(),
669 feedback_buffer: None,
670 downstream: Vec::new(),
671 feedback_downstream: Vec::new(),
672 feedback_ptrs: Vec::new(),
673 downstream_input_ptrs: Vec::new(),
674 downstream_nodes: Vec::new(),
675 parent: std::ptr::null_mut(),
676 upstream_buffer: None,
677 });
678 Self {
679 value,
680 state: NodeState::new(sample_rate),
681 outputs,
682 }
683 }
684 }
685
686 impl<T: Transcendental, const BUF_SIZE: usize> Node<T, BUF_SIZE> for ConstantSource<T, BUF_SIZE> {
687 fn metadata(&self) -> NodeMetadata {
688 NodeMetadata {
689 type_name: None,
690 name: "ConstantSource".into(),
691 category: NodeCategory::Source,
692 description: String::new(),
693 author: String::new(),
694 version: "1.0".into(),
695 signal_inputs: 0,
696 signal_outputs: 1,
697 control_inputs: 0,
698 control_outputs: 0,
699 clock_inputs: 0,
700 clock_outputs: 0,
701 feedback_ports: 0,
702 parameters: vec![],
703 }
704 }
705 fn init(&mut self, _sample_rate: f32) {}
706 fn reset(&mut self) {}
707 fn get_parameter(&self, _id: &ParameterId) -> Option<ParamValue> {
708 None
709 }
710 fn set_parameter(&mut self, _id: &ParameterId, _value: ParamValue) -> ProcessResult<()> {
711 Ok(())
712 }
713 fn id(&self) -> NodeId {
714 NodeId(0)
715 }
716 fn set_id(&mut self, _id: NodeId) {}
717 fn input_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
718 None
719 }
720 fn input_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
721 None
722 }
723 fn output_port(&self, index: usize) -> Option<&Port<T, BUF_SIZE>> {
724 self.outputs.get(index)
725 }
726 fn output_port_mut(&mut self, index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
727 self.outputs.get_mut(index)
728 }
729 fn control_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
730 None
731 }
732 fn control_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
733 None
734 }
735 fn state(&self) -> &NodeState<T, BUF_SIZE> {
736 &self.state
737 }
738 fn state_mut(&mut self) -> &mut NodeState<T, BUF_SIZE> {
739 &mut self.state
740 }
741 }
742
743 impl<T: Transcendental, const BUF_SIZE: usize> Source<T, BUF_SIZE> for ConstantSource<T, BUF_SIZE> {
744 fn generate(
745 &mut self,
746 _clock: &ClockTick,
747 _control_inputs: &[T],
748 _clock_inputs: &[ClockTick],
749 ) -> ProcessResult<()> {
750 let out = self.outputs[0].buffer.as_mut_array();
751 for sample in out.iter_mut() {
752 *sample = self.value;
753 }
754 Ok(())
755 }
756 fn num_signal_outputs(&self) -> usize {
757 1
758 }
759 }
760
    /// Lets the `f32` test source act as a graph driver: `start` runs one
    /// block on the handle's source node and propagates its outputs once.
    impl<const BUF_SIZE: usize> ActiveNode for ConstantSource<f32, BUF_SIZE> {
        /// Processes a single block on `handle.source_idx` and pushes each of
        /// that node's signal outputs downstream. Results are ignored.
        fn start(&mut self, handle: GraphHandle) {
            // SAFETY (review): relies on `handle.nodes`/`handle.len` describing a
            // live slice of `NodeVariant<f32, BUF_SIZE>` for the duration of this
            // call. NOTE(review): the slice may alias the node `self` lives in —
            // confirm this is acceptable for the test-only usage.
            #[allow(unsafe_code)]
            unsafe {
                // Reinterpret the type-erased pointer as the node slice.
                let nodes = std::slice::from_raw_parts_mut(
                    handle.nodes as *mut NodeVariant<f32, BUF_SIZE>,
                    handle.len,
                );
                let idx = handle.source_idx;
                // Tick at position 0 with one block of BUF_SIZE samples.
                let tick = ClockTick::new(0, BUF_SIZE as u32, handle.sample_rate);
                let mut ctx = ProcessContext { clock: &tick };
                let _ = nodes[idx].process_block(&mut ctx);
                let action_ctx = ActionContext::new(&tick);
                // Propagate every signal output of the processed node.
                for po in 0..nodes[idx].num_signal_outputs() {
                    if let Some(port) = nodes[idx].output_port(po) {
                        let _ = port.propagate(port.buffer(), &action_ctx);
                    }
                }
            }
        }
        /// Nothing to stop.
        fn stop(&mut self) {}
    }
783
784 struct NoopProcessor<T: Transcendental, const BUF_SIZE: usize> {
788 state: NodeState<T, BUF_SIZE>,
789 }
790
791 impl<T: Transcendental, const BUF_SIZE: usize> NoopProcessor<T, BUF_SIZE> {
792 fn new(sample_rate: f32) -> Self {
793 Self {
794 state: NodeState::new(sample_rate),
795 }
796 }
797 }
798
799 impl<T: Transcendental, const BUF_SIZE: usize> Node<T, BUF_SIZE> for NoopProcessor<T, BUF_SIZE> {
800 fn metadata(&self) -> NodeMetadata {
801 NodeMetadata {
802 type_name: None,
803 name: "NoopProcessor".into(),
804 category: NodeCategory::Processor,
805 description: String::new(),
806 author: String::new(),
807 version: "1.0".into(),
808 signal_inputs: 0,
809 signal_outputs: 0,
810 control_inputs: 0,
811 control_outputs: 0,
812 clock_inputs: 0,
813 clock_outputs: 0,
814 feedback_ports: 0,
815 parameters: vec![],
816 }
817 }
818 fn init(&mut self, _sample_rate: f32) {}
819 fn reset(&mut self) {}
820 fn get_parameter(&self, _id: &ParameterId) -> Option<ParamValue> {
821 None
822 }
823 fn set_parameter(&mut self, _id: &ParameterId, _value: ParamValue) -> ProcessResult<()> {
824 Ok(())
825 }
826 fn id(&self) -> NodeId {
827 NodeId(1)
828 }
829 fn set_id(&mut self, _id: NodeId) {}
830 fn input_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
831 None
832 }
833 fn input_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
834 None
835 }
836 fn output_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
837 None
838 }
839 fn output_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
840 None
841 }
842 fn control_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
843 None
844 }
845 fn control_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
846 None
847 }
848 fn state(&self) -> &NodeState<T, BUF_SIZE> {
849 &self.state
850 }
851 fn state_mut(&mut self) -> &mut NodeState<T, BUF_SIZE> {
852 &mut self.state
853 }
854 }
855
    /// Processing is a no-op: all inputs are ignored and nothing is written.
    impl<T: Transcendental, const BUF_SIZE: usize> Processor<T, BUF_SIZE>
        for NoopProcessor<T, BUF_SIZE>
    {
        /// Ignores every argument and reports success.
        fn process(
            &mut self,
            _clock: &ClockTick,
            _signal_inputs: &[&[T; BUF_SIZE]],
            _control_inputs: &[T],
            _clock_inputs: &[ClockTick],
            _feedback_inputs: &[&[T; BUF_SIZE]],
        ) -> ProcessResult<()> {
            Ok(())
        }
    }
870
871 struct NoopSink<T: Transcendental, const BUF_SIZE: usize> {
875 state: NodeState<T, BUF_SIZE>,
876 }
877
878 impl<T: Transcendental, const BUF_SIZE: usize> NoopSink<T, BUF_SIZE> {
879 fn new(sample_rate: f32) -> Self {
880 Self {
881 state: NodeState::new(sample_rate),
882 }
883 }
884 }
885
886 impl<T: Transcendental, const BUF_SIZE: usize> Node<T, BUF_SIZE> for NoopSink<T, BUF_SIZE> {
887 fn metadata(&self) -> NodeMetadata {
888 NodeMetadata {
889 type_name: None,
890 name: "NoopSink".into(),
891 category: NodeCategory::Sink,
892 description: String::new(),
893 author: String::new(),
894 version: "1.0".into(),
895 signal_inputs: 0,
896 signal_outputs: 0,
897 control_inputs: 0,
898 control_outputs: 0,
899 clock_inputs: 0,
900 clock_outputs: 0,
901 feedback_ports: 0,
902 parameters: vec![],
903 }
904 }
905 fn init(&mut self, _sample_rate: f32) {}
906 fn reset(&mut self) {}
907 fn get_parameter(&self, _id: &ParameterId) -> Option<ParamValue> {
908 None
909 }
910 fn set_parameter(&mut self, _id: &ParameterId, _value: ParamValue) -> ProcessResult<()> {
911 Ok(())
912 }
913 fn id(&self) -> NodeId {
914 NodeId(2)
915 }
916 fn set_id(&mut self, _id: NodeId) {}
917 fn input_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
918 None
919 }
920 fn input_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
921 None
922 }
923 fn output_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
924 None
925 }
926 fn output_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
927 None
928 }
929 fn control_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
930 None
931 }
932 fn control_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
933 None
934 }
935 fn state(&self) -> &NodeState<T, BUF_SIZE> {
936 &self.state
937 }
938 fn state_mut(&mut self) -> &mut NodeState<T, BUF_SIZE> {
939 &mut self.state
940 }
941 }
942
    /// Consuming is a no-op: all inputs are ignored.
    impl<T: Transcendental, const BUF_SIZE: usize> Sink<T, BUF_SIZE> for NoopSink<T, BUF_SIZE> {
        /// Ignores every argument and reports success.
        fn consume(
            &mut self,
            _clock: &ClockTick,
            _signal_inputs: &[&[T; BUF_SIZE]],
            _control_inputs: &[T],
            _clock_inputs: &[ClockTick],
            _feedback_inputs: &[&[T; BUF_SIZE]],
        ) -> ProcessResult<()> {
            Ok(())
        }
    }
955
956 #[test]
961 fn test_topo_order_correct() {
962 const BUF: usize = 64;
963 let mut builder = test_builder::<BUF>();
964
965 let src = builder.add_source(Box::new(ConstantSource::new(1.0, 44100.0)));
966 let proc = builder.add_processor(Box::new(NoopProcessor::new(44100.0)));
967 let sink = builder.add_sink(Box::new(NoopSink::new(44100.0)));
968
969 builder.connect_signal(src, 0, proc, 0);
970 builder.connect_signal(proc, 0, sink, 0);
971
972 let graph = builder.build().expect("build failed");
973
974 let order = graph.topo_order();
975 let src_pos = order.iter().position(|&i| i == src).unwrap();
976 let proc_pos = order.iter().position(|&i| i == proc).unwrap();
977 let sink_pos = order.iter().position(|&i| i == sink).unwrap();
978 assert!(src_pos < proc_pos);
979 assert!(proc_pos < sink_pos);
980 }
981
982 #[test]
983 fn test_cycle_detection() {
984 const BUF: usize = 64;
985 let mut builder = test_builder::<BUF>();
986
987 let a = builder.add_processor(Box::new(NoopProcessor::new(44100.0)));
988 let b = builder.add_processor(Box::new(NoopProcessor::new(44100.0)));
989
990 builder.connect_signal(a, 0, b, 0);
991 builder.connect_signal(b, 0, a, 0);
992
993 let result = builder.build();
994 assert!(matches!(result, Err(BuildError::CycleDetected)));
995 }
996
997 #[test]
998 fn test_source_node_create() {
999 const BUF: usize = 64;
1000 let mut builder = test_builder::<BUF>();
1001 let idx = builder.add_source(Box::new(ConstantSource::new(0.5, 44100.0)));
1002 let graph = builder.build().expect("build failed");
1003 assert_eq!(graph.node_count(), 1);
1004 assert_eq!(graph.topo_order(), &[idx]);
1005 }
1006
1007 pub struct TestSink<T: Transcendental, const BUF_SIZE: usize> {
1013 id: NodeId,
1014 state: NodeState<T, BUF_SIZE>,
1015 pub inputs: Vec<Port<T, BUF_SIZE>>,
1016 last_value: T,
1017 }
1018
1019 impl<T: Transcendental, const BUF_SIZE: usize> TestSink<T, BUF_SIZE> {
1020 fn new(id: NodeId, sample_rate: f32) -> Self {
1021 let mut inputs = Vec::new();
1022 inputs.push(Port::input(id, 0, "in"));
1023 Self {
1024 id,
1025 state: NodeState::new(sample_rate),
1026 inputs,
1027 last_value: T::ZERO,
1028 }
1029 }
1030 #[allow(dead_code)]
1031 fn last_value(&self) -> T {
1032 self.last_value
1033 }
1034 }
1035
1036 impl<T: Transcendental, const BUF_SIZE: usize> Node<T, BUF_SIZE> for TestSink<T, BUF_SIZE> {
1037 fn metadata(&self) -> NodeMetadata {
1038 NodeMetadata {
1039 type_name: None,
1040 name: "TestSink".into(),
1041 category: NodeCategory::Sink,
1042 description: String::new(),
1043 author: String::new(),
1044 version: "1.0".into(),
1045 signal_inputs: 1,
1046 signal_outputs: 0,
1047 control_inputs: 0,
1048 control_outputs: 0,
1049 clock_inputs: 0,
1050 clock_outputs: 0,
1051 feedback_ports: 0,
1052 parameters: vec![],
1053 }
1054 }
1055 fn init(&mut self, _: f32) {}
1056 fn reset(&mut self) {
1057 self.state.sample_pos = 0;
1058 self.state.blocks_processed = 0;
1059 }
1060 fn id(&self) -> NodeId {
1061 self.id
1062 }
1063 fn set_id(&mut self, id: NodeId) {
1064 self.id = id;
1065 }
1066 fn get_parameter(&self, _: &ParameterId) -> Option<ParamValue> {
1067 None
1068 }
1069 fn set_parameter(&mut self, _: &ParameterId, _: ParamValue) -> ProcessResult<()> {
1070 Ok(())
1071 }
1072 fn input_port(&self, i: usize) -> Option<&Port<T, BUF_SIZE>> {
1073 self.inputs.get(i)
1074 }
1075 fn input_port_mut(&mut self, i: usize) -> Option<&mut Port<T, BUF_SIZE>> {
1076 self.inputs.get_mut(i)
1077 }
1078 fn output_port(&self, _: usize) -> Option<&Port<T, BUF_SIZE>> {
1079 None
1080 }
1081 fn output_port_mut(&mut self, _: usize) -> Option<&mut Port<T, BUF_SIZE>> {
1082 None
1083 }
1084 fn control_port(&self, _: usize) -> Option<&Port<T, BUF_SIZE>> {
1085 None
1086 }
1087 fn control_port_mut(&mut self, _: usize) -> Option<&mut Port<T, BUF_SIZE>> {
1088 None
1089 }
1090 fn num_signal_inputs(&self) -> usize {
1091 1
1092 }
1093 fn num_signal_outputs(&self) -> usize {
1094 0
1095 }
1096 fn state(&self) -> &NodeState<T, BUF_SIZE> {
1097 &self.state
1098 }
1099 fn state_mut(&mut self) -> &mut NodeState<T, BUF_SIZE> {
1100 &mut self.state
1101 }
1102 }
1103
1104 impl<T: Transcendental, const BUF_SIZE: usize> Sink<T, BUF_SIZE> for TestSink<T, BUF_SIZE> {
1105 fn consume(
1106 &mut self,
1107 _clock: &ClockTick,
1108 _signal_inputs: &[&[T; BUF_SIZE]],
1109 _control_inputs: &[T],
1110 _clock_inputs: &[ClockTick],
1111 _feedback_inputs: &[&[T; BUF_SIZE]],
1112 ) -> ProcessResult<()> {
1113 if let Some(port) = self.inputs.first() {
1114 self.last_value = port.buffer.as_array()[0];
1115 }
1116 self.state.advance();
1117 Ok(())
1118 }
1119 }
1120
1121 pub struct GainProcessor<T: Transcendental, const BUF_SIZE: usize> {
1123 id: NodeId,
1124 state: NodeState<T, BUF_SIZE>,
1125 pub inputs: Vec<Port<T, BUF_SIZE>>,
1126 pub outputs: Vec<Port<T, BUF_SIZE>>,
1127 pub multiplier: T,
1128 }
1129
1130 impl<T: Transcendental, const BUF_SIZE: usize> GainProcessor<T, BUF_SIZE> {
1131 fn new(id: NodeId, sample_rate: f32, multiplier: T) -> Self {
1132 let mut inputs = Vec::new();
1133 inputs.push(Port::input(id, 0, "in"));
1134 let mut outputs = Vec::new();
1135 outputs.push(Port::output(id, 0, "out"));
1136 Self {
1137 id,
1138 state: NodeState::new(sample_rate),
1139 inputs,
1140 outputs,
1141 multiplier,
1142 }
1143 }
1144 }
1145
1146 impl<T: Transcendental, const BUF_SIZE: usize> Node<T, BUF_SIZE> for GainProcessor<T, BUF_SIZE> {
1147 fn metadata(&self) -> NodeMetadata {
1148 NodeMetadata {
1149 type_name: None,
1150 name: "GainProcessor".into(),
1151 category: NodeCategory::Processor,
1152 description: String::new(),
1153 author: String::new(),
1154 version: "1.0".into(),
1155 signal_inputs: 1,
1156 signal_outputs: 1,
1157 control_inputs: 0,
1158 control_outputs: 0,
1159 clock_inputs: 0,
1160 clock_outputs: 0,
1161 feedback_ports: 0,
1162 parameters: vec![],
1163 }
1164 }
1165 fn init(&mut self, _: f32) {}
1166 fn reset(&mut self) {
1167 self.state.sample_pos = 0;
1168 self.state.blocks_processed = 0;
1169 }
1170 fn id(&self) -> NodeId {
1171 self.id
1172 }
1173 fn set_id(&mut self, id: NodeId) {
1174 self.id = id;
1175 }
1176 fn get_parameter(&self, id: &ParameterId) -> Option<ParamValue> {
1177 match id.as_str() {
1178 "multiplier" => Some(ParamValue::Float(self.multiplier.to_f32())),
1179 _ => None,
1180 }
1181 }
1182 fn set_parameter(&mut self, id: &ParameterId, value: ParamValue) -> ProcessResult<()> {
1183 match id.as_str() {
1184 "multiplier" => {
1185 if let Some(v) = value.as_f32() {
1186 self.multiplier = T::from_f32(v);
1187 Ok(())
1188 } else {
1189 Err(rill_core::ProcessError::parameter("expected float"))
1190 }
1191 }
1192 _ => Err(rill_core::ProcessError::parameter("unknown")),
1193 }
1194 }
1195 fn input_port(&self, i: usize) -> Option<&Port<T, BUF_SIZE>> {
1196 self.inputs.get(i)
1197 }
1198 fn input_port_mut(&mut self, i: usize) -> Option<&mut Port<T, BUF_SIZE>> {
1199 self.inputs.get_mut(i)
1200 }
1201 fn output_port(&self, i: usize) -> Option<&Port<T, BUF_SIZE>> {
1202 self.outputs.get(i)
1203 }
1204 fn output_port_mut(&mut self, i: usize) -> Option<&mut Port<T, BUF_SIZE>> {
1205 self.outputs.get_mut(i)
1206 }
1207 fn control_port(&self, _: usize) -> Option<&Port<T, BUF_SIZE>> {
1208 None
1209 }
1210 fn control_port_mut(&mut self, _: usize) -> Option<&mut Port<T, BUF_SIZE>> {
1211 None
1212 }
1213 fn num_signal_inputs(&self) -> usize {
1214 1
1215 }
1216 fn num_signal_outputs(&self) -> usize {
1217 1
1218 }
1219 fn state(&self) -> &NodeState<T, BUF_SIZE> {
1220 &self.state
1221 }
1222 fn state_mut(&mut self) -> &mut NodeState<T, BUF_SIZE> {
1223 &mut self.state
1224 }
1225 }
1226
1227 impl<T: Transcendental, const BUF_SIZE: usize> Processor<T, BUF_SIZE>
1228 for GainProcessor<T, BUF_SIZE>
1229 {
1230 fn process(
1231 &mut self,
1232 _clock: &ClockTick,
1233 _signal_inputs: &[&[T; BUF_SIZE]],
1234 _control_inputs: &[T],
1235 _clock_inputs: &[ClockTick],
1236 _feedback_inputs: &[&[T; BUF_SIZE]],
1237 ) -> ProcessResult<()> {
1238 let inp = *self.inputs[0].buffer.as_array();
1239 let out = self.outputs[0].buffer.as_mut_array();
1240 for i in 0..BUF_SIZE {
1241 out[i] = inp[i] * self.multiplier;
1242 }
1243 self.state.advance();
1244 Ok(())
1245 }
1246 fn latency(&self) -> usize {
1247 0
1248 }
1249 }
1250
1251 #[test]
1254 fn test_graph_source_to_sink() {
1255 use rill_core::traits::algorithm::ActionContext;
1256 use rill_core::traits::processable::{ProcessContext, Processable};
1257 const BUF: usize = 64;
1258 let mut builder = test_builder::<BUF>();
1259 let src = builder.add_source(Box::new(ConstantSource::new(42.0, 44100.0)));
1260 let snk = builder.add_sink(Box::new(TestSink::<f32, BUF>::new(NodeId(1), 44100.0)));
1261 builder.connect_signal(src, 0, snk, 0);
1262 let graph = builder.build().unwrap();
1263 let (mut nodes, topo, _, _bufs) = graph.into_parts();
1264 let tick = ClockTick::new(0, BUF as u32, 44100.0);
1265
1266 let mut ctx = ProcessContext { clock: &tick };
1267 let _ = nodes[topo[0]].process_block(&mut ctx);
1268 let action_ctx = ActionContext::new(&tick);
1269 let out_port = nodes[topo[0]].output_port(0).unwrap();
1270 out_port.propagate(out_port.buffer(), &action_ctx).unwrap();
1271
1272 let sink_val = nodes[topo[1]].input_port(0).unwrap().buffer.as_array()[0];
1273 assert_eq!(sink_val, 42.0, "sink should receive source value");
1274 }
1275
/// Three-node chain: constant source (10.0) → gain processor (×3.0) → test sink.
///
/// Processes one block on `nodes[topo[0]]`, propagates that node's first output
/// port, and checks that the first sample arriving at `nodes[topo[2]]` is 30.0.
#[test]
fn test_graph_source_proc_sink() {
    use rill_core::traits::algorithm::ActionContext;
    use rill_core::traits::processable::{ProcessContext, Processable};
    const BUF: usize = 64;

    let mut builder = test_builder::<BUF>();
    let src = builder.add_source(Box::new(ConstantSource::new(10.0, 44100.0)));
    let proc = builder.add_processor(Box::new(GainProcessor::<f32, BUF>::new(
        NodeId(1),
        44100.0,
        3.0,
    )));
    let snk = builder.add_sink(Box::new(TestSink::<f32, BUF>::new(NodeId(2), 44100.0)));
    builder.connect_signal(src, 0, proc, 0);
    builder.connect_signal(proc, 0, snk, 0);
    let graph = builder.build().unwrap();
    let (mut nodes, topo, _, _bufs) = graph.into_parts();
    let tick = ClockTick::new(0, BUF as u32, 44100.0);

    // Surface a processing failure here instead of discarding it; otherwise the
    // only symptom would be a confusing value mismatch in the final assertion.
    let mut ctx = ProcessContext { clock: &tick };
    let _ = nodes[topo[0]]
        .process_block(&mut ctx)
        .expect("process_block failed for nodes[topo[0]]");

    // Push the freshly produced output buffer through the port's connections.
    let action_ctx = ActionContext::new(&tick);
    let out_port = nodes[topo[0]].output_port(0).unwrap();
    out_port.propagate(out_port.buffer(), &action_ctx).unwrap();

    let sink_val = nodes[topo[2]].input_port(0).unwrap().buffer.as_array()[0];
    assert!(
        (sink_val - 30.0).abs() < 1e-6,
        "source(10)×gain(3)=30, got {}",
        sink_val
    );
}
1310
/// Queues one `SetParameter` command for the lone gain processor, drains the
/// queue by hand, and checks that the node's "multiplier" parameter reads 5.0.
#[test]
fn test_command_queue_drain() {
    use rill_core::queues::{MpscQueue, SetParameter, SignalOrigin};
    use rill_core::traits::PortId;

    const BUF: usize = 64;
    let commands: Arc<MpscQueue<SetParameter>> = Arc::new(MpscQueue::new());

    // Single-node graph: one gain processor with an initial multiplier of 2.0.
    let mut builder = test_builder::<BUF>();
    builder.add_processor(Box::new(GainProcessor::<f32, BUF>::new(
        NodeId(0),
        44100.0,
        2.0,
    )));
    let (mut nodes, _, _, _bufs) = builder.build().unwrap().into_parts();

    let request = SetParameter::new(
        PortId::control_in(NodeId(0), 0),
        ParameterId::new("multiplier").unwrap(),
        ParamValue::Float(5.0),
        SignalOrigin::Manual,
    );
    let _ = commands.push(request);

    // Apply every pending command; the node id doubles as the index into `nodes`.
    loop {
        match commands.pop() {
            Some(cmd) => {
                let target = cmd.port.node_id().inner() as usize;
                let _ = nodes[target].set_parameter(&cmd.parameter, cmd.value.clone());
            }
            None => break,
        }
    }

    let multiplier_id = ParameterId::new("multiplier").unwrap();
    let multiplier = nodes[0]
        .get_parameter(&multiplier_id)
        .unwrap()
        .as_f32()
        .unwrap();
    let deviation = (multiplier - 5.0).abs();
    assert!(
        deviation < 1e-6,
        "multiplier should be 5.0, got {}",
        multiplier
    );
}
1351
/// Applies a queued parameter change and then runs the chain
/// source (7.0) → gain → sink.
///
/// The gain starts at 2.0 and a `SetParameter` command raises it to 4.0 before
/// any block is processed, so the first sample reaching `nodes[topo[2]]` must
/// be 28.0.
#[test]
fn test_command_then_propagate() {
    use rill_core::queues::{MpscQueue, SetParameter, SignalOrigin};
    use rill_core::traits::algorithm::ActionContext;
    use rill_core::traits::processable::{ProcessContext, Processable};
    use rill_core::traits::PortId;

    const BUF: usize = 64;
    let queue: Arc<MpscQueue<SetParameter>> = Arc::new(MpscQueue::new());

    let mut builder = test_builder::<BUF>();
    let src = builder.add_source(Box::new(ConstantSource::new(7.0, 44100.0)));
    let proc = builder.add_processor(Box::new(GainProcessor::<f32, BUF>::new(
        NodeId(1),
        44100.0,
        2.0,
    )));
    let snk = builder.add_sink(Box::new(TestSink::<f32, BUF>::new(NodeId(2), 44100.0)));
    builder.connect_signal(src, 0, proc, 0);
    builder.connect_signal(proc, 0, snk, 0);
    let graph = builder.build().unwrap();
    let (mut nodes, topo, _, _bufs) = graph.into_parts();
    let tick = ClockTick::new(0, BUF as u32, 44100.0);

    let _ = queue.push(SetParameter::new(
        PortId::control_in(NodeId(1), 0),
        ParameterId::new("multiplier").unwrap(),
        ParamValue::Float(4.0),
        SignalOrigin::Manual,
    ));
    // Drain the queue by hand; the node id doubles as the index into `nodes`.
    while let Some(cmd) = queue.pop() {
        let idx = cmd.port.node_id().inner() as usize;
        let pid = cmd.parameter.clone();
        let _ = nodes[idx].set_parameter(&pid, cmd.value.clone());
    }

    // The new multiplier must be visible before any audio is processed.
    let pid = ParameterId::new("multiplier").unwrap();
    let val = nodes[1].get_parameter(&pid).unwrap().as_f32().unwrap();
    assert!((val - 4.0).abs() < 1e-6);

    // Surface a processing failure here instead of discarding it; otherwise the
    // only symptom would be a confusing value mismatch in the final assertion.
    let mut ctx = ProcessContext { clock: &tick };
    let _ = nodes[topo[0]]
        .process_block(&mut ctx)
        .expect("process_block failed for nodes[topo[0]]");

    let action_ctx = ActionContext::new(&tick);
    let out_port = nodes[topo[0]].output_port(0).unwrap();
    out_port.propagate(out_port.buffer(), &action_ctx).unwrap();

    let sink_val = nodes[topo[2]].input_port(0).unwrap().buffer.as_array()[0];
    assert!(
        (sink_val - 28.0).abs() < 1e-6,
        "source(7)×gain(4)=28, got {}",
        sink_val
    );
}
1410
/// Chain source (1.0) → gain (×2.0) → sink with a feedback edge from the gain
/// processor's output 0 back to its own input 0.
///
/// Two consecutive blocks are processed and propagated. The first sample at
/// `nodes[topo[2]]` must be 2.0 after the first block and 6.0 after the second,
/// as the assertion messages spell out.
#[test]
fn test_feedback_propagation() {
    use rill_core::traits::algorithm::ActionContext;
    use rill_core::traits::processable::{ProcessContext, Processable};

    const BUF: usize = 64;
    let mut builder = test_builder::<BUF>();
    let src = builder.add_source(Box::new(ConstantSource::new(1.0, 44100.0)));
    let proc = builder.add_processor(Box::new(GainProcessor::<f32, BUF>::new(
        NodeId(1),
        44100.0,
        2.0,
    )));
    let snk = builder.add_sink(Box::new(TestSink::<f32, BUF>::new(NodeId(2), 44100.0)));
    builder.connect_signal(src, 0, proc, 0);
    builder.connect_signal(proc, 0, snk, 0);
    builder.connect_feedback(proc, 0, proc, 0);
    let graph = builder.build().unwrap();
    let (mut nodes, topo, _, _bufs) = graph.into_parts();

    // Block 1. A processing failure is surfaced immediately rather than
    // discarded, so it cannot masquerade as a wrong sample value below.
    let tick1 = ClockTick::new(0, BUF as u32, 44100.0);
    let mut ctx = ProcessContext { clock: &tick1 };
    let _ = nodes[topo[0]]
        .process_block(&mut ctx)
        .expect("process_block failed for nodes[topo[0]] on block 1");
    let ctx1 = ActionContext::new(&tick1);
    let out_port = nodes[topo[0]].output_port(0).unwrap();
    out_port.propagate(out_port.buffer(), &ctx1).unwrap();
    let block1 = nodes[topo[2]].input_port(0).unwrap().buffer.as_array()[0];
    assert!(
        (block1 - 2.0).abs() < 1e-6,
        "block1: 1.0×2.0=2.0, got {}",
        block1
    );

    // Block 2 starts BUF samples later.
    let tick2 = ClockTick::new(BUF as u64, BUF as u32, 44100.0);
    let mut ctx = ProcessContext { clock: &tick2 };
    let _ = nodes[topo[0]]
        .process_block(&mut ctx)
        .expect("process_block failed for nodes[topo[0]] on block 2");
    let ctx2 = ActionContext::new(&tick2);
    let out_port = nodes[topo[0]].output_port(0).unwrap();
    out_port.propagate(out_port.buffer(), &ctx2).unwrap();
    let block2 = nodes[topo[2]].input_port(0).unwrap().buffer.as_array()[0];
    assert!(
        (block2 - 6.0).abs() < 1e-6,
        "block2: (1+2)×2=6.0, got {}",
        block2
    );
}
1461
/// Drains a pending `SetParameter` command before processing, then verifies
/// both the stored parameter and the propagated signal.
///
/// Chain: source (5.0) → gain (initially ×1.0, set to ×3.0 by the command) →
/// sink. The first sample reaching `nodes[topo[2]]` must be 15.0.
#[test]
fn test_drain_fn_before_propagate() {
    use rill_core::queues::{MpscQueue, SetParameter, SignalOrigin};
    use rill_core::traits::algorithm::ActionContext;
    use rill_core::traits::processable::{ProcessContext, Processable};
    use rill_core::traits::PortId;

    const BUF: usize = 64;
    let commands: Arc<MpscQueue<SetParameter>> = Arc::new(MpscQueue::new());

    let mut builder = test_builder::<BUF>();
    let source = builder.add_source(Box::new(ConstantSource::new(5.0, 44100.0)));
    let gain = builder.add_processor(Box::new(GainProcessor::<f32, BUF>::new(
        NodeId(1),
        44100.0,
        1.0,
    )));
    let sink = builder.add_sink(Box::new(TestSink::<f32, BUF>::new(NodeId(2), 44100.0)));
    builder.connect_signal(source, 0, gain, 0);
    builder.connect_signal(gain, 0, sink, 0);
    let (mut nodes, topo, _, _bufs) = builder.build().unwrap().into_parts();
    let tick = ClockTick::new(0, BUF as u32, 44100.0);

    let request = SetParameter::new(
        PortId::control_in(NodeId(1), 0),
        ParameterId::new("multiplier").unwrap(),
        ParamValue::Float(3.0),
        SignalOrigin::Manual,
    );
    let _ = commands.push(request);

    // Apply every pending command; the node id doubles as the index into `nodes`.
    loop {
        match commands.pop() {
            Some(cmd) => {
                let target = cmd.port.node_id().inner() as usize;
                let _ = nodes[target].set_parameter(&cmd.parameter, cmd.value.clone());
            }
            None => break,
        }
    }

    // The parameter change must have landed before any block is processed.
    let multiplier_id = ParameterId::new("multiplier").unwrap();
    let multiplier = nodes[1]
        .get_parameter(&multiplier_id)
        .unwrap()
        .as_f32()
        .unwrap();
    let deviation = (multiplier - 3.0).abs();
    assert!(
        deviation < 1e-6,
        "multiplier should be 3.0, got {}",
        multiplier
    );

    let mut process_ctx = ProcessContext { clock: &tick };
    let _ = nodes[topo[0]].process_block(&mut process_ctx).unwrap();

    // Push the freshly produced output buffer through the port's connections.
    let propagate_ctx = ActionContext::new(&tick);
    let first_out = nodes[topo[0]].output_port(0).unwrap();
    first_out
        .propagate(first_out.buffer(), &propagate_ctx)
        .unwrap();

    let received = nodes[topo[2]].input_port(0).unwrap().buffer.as_array()[0];
    let error = (received - 15.0).abs();
    assert!(error < 1e-6, "source(5)×gain(3)=15, got {}", received);
}
1529}