1use crate::backend_factory::BackendFactory;
2use crate::factory::{NodeFactory, RegistryError};
3use rill_core::buffer::{Buffer, BufferRegistry, FixedBuffer, TapeLoop};
4use rill_core::math::Transcendental;
5use rill_core::queues::{MpscQueue, SetParameter};
6use rill_core::time::ClockTick;
7use rill_core::traits::algorithm::ActionContext;
8use rill_core::traits::port::Port;
9use rill_core::traits::processable::{ProcessContext, Processable};
10use rill_core::traits::ParamValue;
11use rill_core::traits::{Node, NodeId, NodeVariant, Params};
12use rill_core_actor::{ActorCell, ActorRef};
13use std::collections::{HashMap, VecDeque};
14use std::sync::atomic::AtomicBool;
15use std::sync::Arc;
16
17#[derive(Debug, Clone)]
27pub enum BuildError {
28 CycleDetected,
30 Backend(String),
32}
33
34impl std::fmt::Display for BuildError {
35 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36 match self {
37 Self::CycleDetected => write!(f, "graph cycle detected"),
38 Self::Backend(msg) => write!(f, "backend error: {msg}"),
39 }
40 }
41}
42
43pub(crate) struct NodeEntry<T: Transcendental, const BUF_SIZE: usize> {
52 pub(crate) node: NodeVariant<T, BUF_SIZE>,
53}
54
55#[derive(Clone)]
61pub struct GraphResource {
62 pub name: String,
64 pub kind: String,
66 pub capacity: usize,
68}
69
70pub struct GraphBuilder<T: Transcendental, const BUF_SIZE: usize> {
77 nodes: Vec<NodeEntry<T, BUF_SIZE>>,
78 node_backends: Vec<Option<String>>,
79 signal_edges: Vec<(usize, usize, usize, usize)>,
80 control_edges: Vec<(usize, usize, usize, usize)>,
81 clock_edges: Vec<(usize, usize, usize, usize)>,
82 feedback_edges: Vec<(usize, usize, usize, usize)>,
83 resources: Vec<GraphResource>,
84 factory: Arc<NodeFactory<T, BUF_SIZE>>,
86 backend_factory: Arc<BackendFactory<T>>,
88 default_backend: Option<String>,
90 backend_params: HashMap<String, ParamValue>,
92 sample_rate: Option<f32>,
95 clock_tx: Option<ActorRef<ClockTick>>,
97}
98
99impl<T: Transcendental, const BUF_SIZE: usize> GraphBuilder<T, BUF_SIZE> {
100 pub fn new(
102 factory: Arc<NodeFactory<T, BUF_SIZE>>,
103 backend_factory: Arc<BackendFactory<T>>,
104 ) -> Self {
105 Self {
106 nodes: Vec::new(),
107 node_backends: Vec::new(),
108 signal_edges: Vec::new(),
109 control_edges: Vec::new(),
110 clock_edges: Vec::new(),
111 feedback_edges: Vec::new(),
112 resources: Vec::new(),
113 factory,
114 backend_factory,
115 default_backend: None,
116 backend_params: HashMap::new(),
117 sample_rate: None,
118 clock_tx: None,
119 }
120 }
121
122 pub fn add_node(&mut self, type_name: &str, params: &Params) -> Result<usize, RegistryError> {
134 let id = NodeId(self.nodes.len() as u32);
135 self.add_node_with_id(type_name, params, id)
136 }
137
138 pub fn add_node_with_id(
144 &mut self,
145 type_name: &str,
146 params: &Params,
147 id: NodeId,
148 ) -> Result<usize, RegistryError> {
149 let node = self.factory.construct(type_name, id, params)?;
150 let idx = self.nodes.len();
151 self.nodes.push(NodeEntry { node });
152 self.node_backends.push(None);
153 Ok(idx)
154 }
155
156 pub fn set_node_backend(&mut self, idx: usize, name: String) {
162 if idx < self.node_backends.len() {
163 self.node_backends[idx] = Some(name);
164 }
165 }
166
167 pub fn add_resource(&mut self, resource: GraphResource) {
169 self.resources.push(resource);
170 }
171
172 pub fn node_count(&self) -> usize {
174 self.nodes.len()
175 }
176
177 pub fn set_default_backend(&mut self, name: String, params: HashMap<String, ParamValue>) {
180 self.default_backend = Some(name);
181 self.backend_params = params;
182 }
183
184 pub fn default_backend_name(&self) -> Option<&String> {
186 self.default_backend.as_ref()
187 }
188
189 pub fn set_sample_rate(&mut self, sr: f32) {
191 self.sample_rate = Some(sr);
192 }
193
194 pub fn set_clock_tx(&mut self, tx: ActorRef<ClockTick>) {
196 self.clock_tx = Some(tx);
197 }
198
199 pub fn backend_factory(&self) -> &Arc<BackendFactory<T>> {
201 &self.backend_factory
202 }
203
204 pub fn add_source(&mut self, source: Box<dyn rill_core::traits::Source<T, BUF_SIZE>>) -> usize {
206 let idx = self.nodes.len();
207 self.nodes.push(NodeEntry {
208 node: NodeVariant::Source(source),
209 });
210 self.node_backends.push(None);
211 idx
212 }
213
214 pub fn add_processor(
216 &mut self,
217 processor: Box<dyn rill_core::traits::Processor<T, BUF_SIZE>>,
218 ) -> usize {
219 let idx = self.nodes.len();
220 self.nodes.push(NodeEntry {
221 node: NodeVariant::Processor(processor),
222 });
223 self.node_backends.push(None);
224 idx
225 }
226
227 pub fn add_sink(&mut self, sink: Box<dyn rill_core::traits::Sink<T, BUF_SIZE>>) -> usize {
229 let idx = self.nodes.len();
230 self.nodes.push(NodeEntry {
231 node: NodeVariant::Sink(sink),
232 });
233 self.node_backends.push(None);
234 idx
235 }
236
237 pub fn add_router(&mut self, router: Box<dyn rill_core::traits::Router<T, BUF_SIZE>>) -> usize {
239 let idx = self.nodes.len();
240 self.nodes.push(NodeEntry {
241 node: NodeVariant::Router(router),
242 });
243 self.node_backends.push(None);
244 idx
245 }
246
247 pub fn connect_signal(
249 &mut self,
250 from_node: usize,
251 from_port: usize,
252 to_node: usize,
253 to_port: usize,
254 ) {
255 self.signal_edges
256 .push((from_node, from_port, to_node, to_port));
257 }
258
259 pub fn connect_control(
261 &mut self,
262 from_node: usize,
263 from_port: usize,
264 to_node: usize,
265 to_port: usize,
266 ) {
267 self.control_edges
268 .push((from_node, from_port, to_node, to_port));
269 }
270
271 pub fn connect_clock(
273 &mut self,
274 from_node: usize,
275 from_port: usize,
276 to_node: usize,
277 to_port: usize,
278 ) {
279 self.clock_edges
280 .push((from_node, from_port, to_node, to_port));
281 }
282
283 pub fn connect_feedback(
285 &mut self,
286 from_node: usize,
287 from_port: usize,
288 to_node: usize,
289 to_port: usize,
290 ) {
291 self.feedback_edges
292 .push((from_node, from_port, to_node, to_port));
293 }
294
295 pub fn build(mut self) -> Result<Graph<T, BUF_SIZE>, BuildError> {
301 let num_nodes = self.nodes.len();
302
303 let mut in_degree = vec![0usize; num_nodes];
305 let mut out_edges: Vec<Vec<(usize, usize, usize)>> = vec![Vec::new(); num_nodes];
306
307 for &(from_n, from_p, to_n, to_p) in &self.signal_edges {
308 in_degree[to_n] += 1;
309 out_edges[from_n].push((from_p, to_n, to_p));
310 }
311
312 let mut queue: VecDeque<usize> = in_degree
314 .iter()
315 .enumerate()
316 .filter(|(_, &d)| d == 0)
317 .map(|(i, _)| i)
318 .collect();
319
320 let mut topo = Vec::with_capacity(num_nodes);
321 let mut indeg = in_degree;
322 while let Some(idx) = queue.pop_front() {
323 topo.push(idx);
324 for &(_, to_n, _) in &out_edges[idx] {
325 indeg[to_n] -= 1;
326 if indeg[to_n] == 0 {
327 queue.push_back(to_n);
328 }
329 }
330 }
331
332 if topo.len() != num_nodes {
333 return Err(BuildError::CycleDetected);
334 }
335
336 for &(from_n, from_p, to_n, to_p) in &self.signal_edges {
338 if let Some(port) = self.nodes[from_n].node.output_port_mut(from_p) {
340 port.downstream.push((to_n, to_p));
341 }
342 let in_ptr: *mut Port<T, BUF_SIZE> = self.nodes[to_n]
344 .node
345 .input_port_mut(to_p)
346 .map(|p| p as *mut Port<T, BUF_SIZE>)
347 .unwrap_or(std::ptr::null_mut());
348 let parent: *mut NodeVariant<T, BUF_SIZE> = &mut self.nodes[to_n].node;
349 let out_ptr: *mut Port<T, BUF_SIZE> = self.nodes[from_n]
350 .node
351 .output_port_mut(from_p)
352 .map(|p| p as *mut Port<T, BUF_SIZE>)
353 .unwrap_or(std::ptr::null_mut());
354 if !in_ptr.is_null() && !out_ptr.is_null() {
356 #[allow(unsafe_code)]
357 unsafe {
358 (*in_ptr).parent = parent;
359 (*out_ptr).downstream_input_ptrs.push(in_ptr);
360 }
361 }
362 }
363
364 for &(from_n, from_p, to_n, _) in &self.signal_edges {
366 let parent: *mut NodeVariant<T, BUF_SIZE> = &mut self.nodes[to_n].node;
367 if let Some(port) = self.nodes[from_n].node.output_port_mut(from_p) {
368 let ptr_val = parent as usize;
369 let already = port.downstream_nodes.iter().any(|&p| p as usize == ptr_val);
370 if !already {
371 port.downstream_nodes.push(parent);
372 }
373 }
374 }
375
376 for &(from_n, from_p, to_n, to_p) in &self.signal_edges {
378 let upstream = self.nodes[from_n]
379 .node
380 .output_port(from_p)
381 .map(|p| &p.buffer as *const FixedBuffer<T, BUF_SIZE>);
382 if let Some(port) = self.nodes[to_n].node.input_port_mut(to_p) {
383 if port.upstream_buffer.is_none() {
384 port.upstream_buffer = upstream;
386 } else {
387 port.upstream_buffer = None;
389 }
390 }
391 }
392
393 for &(from_n, from_p, to_n, to_p) in &self.feedback_edges {
395 if let Some(port) = self.nodes[from_n].node.output_port_mut(from_p) {
396 port.feedback_buffer = Some(FixedBuffer::new());
397 port.feedback_downstream.push((to_n, to_p));
398 }
399 if let Some(port) = self.nodes[to_n].node.input_port_mut(to_p) {
400 port.feedback_buffer = Some(FixedBuffer::new());
401 }
402 }
403 for &(from_n, from_p, to_n, to_p) in &self.feedback_edges {
405 let ptr = self.nodes[to_n]
406 .node
407 .input_port(to_p)
408 .map(|p| &p.feedback_buffer as *const Option<FixedBuffer<T, BUF_SIZE>>)
409 .map(|r| r as *mut Option<FixedBuffer<T, BUF_SIZE>>);
410 if let Some(port) = self.nodes[from_n].node.output_port_mut(from_p) {
411 if let Some(p) = ptr {
412 port.feedback_ptrs.push(p);
413 }
414 }
415 }
416
417 let sr = self.sample_rate.unwrap_or(44100.0);
418
419 let mut buffers = BufferRegistry::new();
421 for r in &self.resources {
422 if r.kind == "tape" {
423 if let Some(tape) = TapeLoop::<T>::new(r.capacity) {
424 buffers.register(&r.name, Box::new(tape));
425 }
426 }
427 }
428
429 for entry in &mut self.nodes {
433 entry.node.resolve_resources(&buffers);
434 }
435
436 for (idx, be_name) in self.node_backends.iter().enumerate() {
440 let name = match be_name {
441 Some(ref n) => Some(n.clone()),
442 None => self.default_backend.clone(),
443 };
444 if let Some(ref name) = name {
445 let mut be_params = HashMap::new();
446 be_params.insert("sample_rate".into(), ParamValue::Float(sr));
447 be_params.insert("buffer_size".into(), ParamValue::Int(BUF_SIZE as i32));
448 if self.default_backend.as_ref() == Some(name) {
449 for (k, v) in &self.backend_params {
450 be_params.entry(k.clone()).or_insert_with(|| v.clone());
451 }
452 }
453 let backend = self
454 .backend_factory
455 .create(name, &be_params)
456 .map_err(BuildError::Backend)?;
457 if let Some(io_node) = self.nodes[idx].node.as_io_node_mut() {
458 io_node.resolve_backend(backend);
459 }
460 }
461 }
462
463 let mut nodes: Vec<NodeVariant<T, BUF_SIZE>> =
464 self.nodes.into_iter().map(|e| e.node).collect();
465
466 let cmd_queue = Arc::new(MpscQueue::<SetParameter>::with_capacity(64));
468 let mut active_node_idx = None;
469 for (i, n) in nodes.iter_mut().enumerate() {
470 if n.as_active_node_mut().is_some() {
471 active_node_idx = Some(i);
472 break;
473 }
474 }
475 let have_queue = active_node_idx.is_some();
476 let command_queue = if have_queue { Some(cmd_queue) } else { None };
477
478 let owned_buffers = buffers.into_inner();
479
480 let allocated = self.resources.clone();
481
482 Ok(Graph {
483 nodes,
484 topo_order: topo,
485 resources: allocated,
486 current_tick: ClockTick::new(0, BUF_SIZE as u32, sr),
487 buffers: owned_buffers,
488 active_node_idx,
489 command_queue,
490 clock_tx: self.clock_tx.clone(),
491 })
492 }
493}
494
495pub struct Graph<T: Transcendental, const BUF_SIZE: usize> {
507 nodes: Vec<NodeVariant<T, BUF_SIZE>>,
508 topo_order: Vec<usize>,
509 current_tick: ClockTick,
510 pub(crate) resources: Vec<GraphResource>,
512 #[allow(dead_code)]
514 buffers: Vec<Box<dyn Buffer<T> + Send>>,
515 active_node_idx: Option<usize>,
517 command_queue: Option<Arc<MpscQueue<SetParameter>>>,
519 clock_tx: Option<ActorRef<ClockTick>>,
521}
522
523impl<T: Transcendental, const BUF_SIZE: usize> Graph<T, BUF_SIZE> {
524 pub fn nodes(&self) -> &[NodeVariant<T, BUF_SIZE>] {
530 &self.nodes
531 }
532
533 pub fn nodes_mut(&mut self) -> &mut [NodeVariant<T, BUF_SIZE>] {
535 &mut self.nodes
536 }
537
538 pub fn current_tick(&self) -> ClockTick {
540 self.current_tick
541 }
542
543 pub fn node_count(&self) -> usize {
545 self.nodes.len()
546 }
547
548 pub fn topo_order(&self) -> &[usize] {
550 &self.topo_order
551 }
552
553 #[allow(dead_code)]
554 pub(crate) fn sample_rate(&self) -> f32 {
555 self.current_tick.sample_rate
556 }
557
558 #[allow(dead_code)]
560 pub fn resources(&self) -> &[GraphResource] {
561 &self.resources
562 }
563
564 #[allow(unsafe_code)]
569 pub fn run(&mut self, running: Arc<AtomicBool>) -> Result<(), String> {
570 let Some(idx) = self.active_node_idx else {
571 return Ok(());
572 };
573 let source_idx = self.topo_order[0];
574 let cmd_queue = self
575 .command_queue
576 .clone()
577 .unwrap_or_else(|| Arc::new(MpscQueue::new()));
578 let clock_tx = self
579 .clock_tx
580 .clone()
581 .unwrap_or_else(|| ActorRef::new(&Arc::new(MpscQueue::new())));
582
583 let graph_ptr: *mut Graph<T, BUF_SIZE> = self;
584 let tick: Box<dyn FnMut(u64, f32)> = Box::new(move |sample_pos, sample_rate| {
585 let graph = unsafe { &mut *graph_ptr };
586 while let Some(cmd) = cmd_queue.pop() {
588 let i = cmd.port.node_id().inner() as usize;
589 if i < graph.nodes.len() {
590 let _ = graph.nodes[i].set_parameter(&cmd.parameter, cmd.value);
591 }
592 }
593 let tick = ClockTick::new(sample_pos, BUF_SIZE as u32, sample_rate);
595 let mut ctx = ProcessContext { clock: &tick };
596 let _ = graph.nodes[source_idx].process_block(&mut ctx);
597 let action_ctx = ActionContext::new(&tick);
598 for po in 0..graph.nodes[source_idx].num_signal_outputs() {
599 if let Some(port) = graph.nodes[source_idx].output_port(po) {
600 let _ = port.propagate(port.buffer(), &action_ctx);
601 }
602 }
603 clock_tx.send(tick);
605 });
606
607 self.nodes[idx]
608 .as_active_node_mut()
609 .ok_or("no active node")?
610 .run(tick, running)
611 }
612
613 pub fn handle(&self) -> Option<ActorRef<SetParameter>> {
619 let mailbox = self.command_queue.as_ref()?;
620 Some(ActorRef::new(mailbox))
621 }
622
623 #[cfg(test)]
625 pub fn into_parts(
626 self,
627 ) -> (
628 Vec<NodeVariant<T, BUF_SIZE>>,
629 Vec<usize>,
630 ClockTick,
631 Vec<Box<dyn Buffer<T> + Send>>,
632 ) {
633 let Self {
634 nodes,
635 topo_order,
636 current_tick,
637 resources: _,
638 buffers,
639 active_node_idx: _,
640 command_queue: _,
641 clock_tx: _,
642 } = self;
643 (nodes, topo_order, current_tick, buffers)
644 }
645}
646
647impl<T: Transcendental, const BUF_SIZE: usize> ActorCell for Graph<T, BUF_SIZE> {
652 type Msg = SetParameter;
653
654 fn receive(&mut self, msg: SetParameter) {
656 let idx = msg.port.node_id().inner() as usize;
657 if idx < self.nodes.len() {
658 let _ = self.nodes[idx].set_parameter(&msg.parameter, msg.value);
659 }
660 }
661}
662
663#[cfg(test)]
664mod tests {
665 use super::*;
666 use rill_core::math::Transcendental;
667 use rill_core::time::ClockTick;
668 use rill_core::traits::algorithm::ActionContext;
669 use rill_core::traits::processable::{ProcessContext, Processable};
670 use rill_core::traits::{
671 Node, NodeCategory, NodeId, NodeMetadata, NodeState, ParamValue, ParameterId, Port,
672 PortDirection, PortId, ProcessResult, Processor, Sink, Source,
673 };
674 use std::sync::Arc;
675
676 fn test_builder<const B: usize>() -> GraphBuilder<f32, B> {
678 GraphBuilder::new(
679 Arc::new(NodeFactory::new()),
680 Arc::new(BackendFactory::new()),
681 )
682 }
683
684 struct ConstantSource<T: Transcendental, const BUF_SIZE: usize> {
688 value: T,
689 state: NodeState<T, BUF_SIZE>,
690 outputs: Vec<Port<T, BUF_SIZE>>,
691 }
692
693 impl<T: Transcendental, const BUF_SIZE: usize> ConstantSource<T, BUF_SIZE> {
694 fn new(value: T, sample_rate: f32) -> Self {
695 let mut outputs = Vec::with_capacity(1);
696 outputs.push(Port {
697 id: PortId::signal_out(NodeId(0), 0),
698 name: "output".into(),
699 direction: PortDirection::Output,
700 action: None,
701 pending_command: None,
702 buffer: Default::default(),
703 feedback_buffer: None,
704 downstream: Vec::new(),
705 feedback_downstream: Vec::new(),
706 feedback_ptrs: Vec::new(),
707 downstream_input_ptrs: Vec::new(),
708 downstream_nodes: Vec::new(),
709 parent: std::ptr::null_mut(),
710 upstream_buffer: None,
711 });
712 Self {
713 value,
714 state: NodeState::new(sample_rate),
715 outputs,
716 }
717 }
718 }
719
720 impl<T: Transcendental, const BUF_SIZE: usize> Node<T, BUF_SIZE> for ConstantSource<T, BUF_SIZE> {
721 fn metadata(&self) -> NodeMetadata {
722 NodeMetadata {
723 type_name: None,
724 name: "ConstantSource".into(),
725 category: NodeCategory::Source,
726 description: String::new(),
727 author: String::new(),
728 version: "1.0".into(),
729 signal_inputs: 0,
730 signal_outputs: 1,
731 control_inputs: 0,
732 control_outputs: 0,
733 clock_inputs: 0,
734 clock_outputs: 0,
735 feedback_ports: 0,
736 parameters: vec![],
737 }
738 }
739 fn init(&mut self, _sample_rate: f32) {}
740 fn reset(&mut self) {}
741 fn get_parameter(&self, _id: &ParameterId) -> Option<ParamValue> {
742 None
743 }
744 fn set_parameter(&mut self, _id: &ParameterId, _value: ParamValue) -> ProcessResult<()> {
745 Ok(())
746 }
747 fn id(&self) -> NodeId {
748 NodeId(0)
749 }
750 fn set_id(&mut self, _id: NodeId) {}
751 fn input_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
752 None
753 }
754 fn input_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
755 None
756 }
757 fn output_port(&self, index: usize) -> Option<&Port<T, BUF_SIZE>> {
758 self.outputs.get(index)
759 }
760 fn output_port_mut(&mut self, index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
761 self.outputs.get_mut(index)
762 }
763 fn control_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
764 None
765 }
766 fn control_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
767 None
768 }
769 fn state(&self) -> &NodeState<T, BUF_SIZE> {
770 &self.state
771 }
772 fn state_mut(&mut self) -> &mut NodeState<T, BUF_SIZE> {
773 &mut self.state
774 }
775 }
776
777 impl<T: Transcendental, const BUF_SIZE: usize> Source<T, BUF_SIZE> for ConstantSource<T, BUF_SIZE> {
778 fn generate(
779 &mut self,
780 _clock: &ClockTick,
781 _control_inputs: &[T],
782 _clock_inputs: &[ClockTick],
783 ) -> ProcessResult<()> {
784 let out = self.outputs[0].buffer.as_mut_array();
785 for sample in out.iter_mut() {
786 *sample = self.value;
787 }
788 Ok(())
789 }
790 fn num_signal_outputs(&self) -> usize {
791 1
792 }
793 }
794
795 struct NoopProcessor<T: Transcendental, const BUF_SIZE: usize> {
799 state: NodeState<T, BUF_SIZE>,
800 }
801
802 impl<T: Transcendental, const BUF_SIZE: usize> NoopProcessor<T, BUF_SIZE> {
803 fn new(sample_rate: f32) -> Self {
804 Self {
805 state: NodeState::new(sample_rate),
806 }
807 }
808 }
809
810 impl<T: Transcendental, const BUF_SIZE: usize> Node<T, BUF_SIZE> for NoopProcessor<T, BUF_SIZE> {
811 fn metadata(&self) -> NodeMetadata {
812 NodeMetadata {
813 type_name: None,
814 name: "NoopProcessor".into(),
815 category: NodeCategory::Processor,
816 description: String::new(),
817 author: String::new(),
818 version: "1.0".into(),
819 signal_inputs: 0,
820 signal_outputs: 0,
821 control_inputs: 0,
822 control_outputs: 0,
823 clock_inputs: 0,
824 clock_outputs: 0,
825 feedback_ports: 0,
826 parameters: vec![],
827 }
828 }
829 fn init(&mut self, _sample_rate: f32) {}
830 fn reset(&mut self) {}
831 fn get_parameter(&self, _id: &ParameterId) -> Option<ParamValue> {
832 None
833 }
834 fn set_parameter(&mut self, _id: &ParameterId, _value: ParamValue) -> ProcessResult<()> {
835 Ok(())
836 }
837 fn id(&self) -> NodeId {
838 NodeId(1)
839 }
840 fn set_id(&mut self, _id: NodeId) {}
841 fn input_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
842 None
843 }
844 fn input_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
845 None
846 }
847 fn output_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
848 None
849 }
850 fn output_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
851 None
852 }
853 fn control_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
854 None
855 }
856 fn control_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
857 None
858 }
859 fn state(&self) -> &NodeState<T, BUF_SIZE> {
860 &self.state
861 }
862 fn state_mut(&mut self) -> &mut NodeState<T, BUF_SIZE> {
863 &mut self.state
864 }
865 }
866
867 impl<T: Transcendental, const BUF_SIZE: usize> Processor<T, BUF_SIZE>
868 for NoopProcessor<T, BUF_SIZE>
869 {
870 fn process(
871 &mut self,
872 _clock: &ClockTick,
873 _signal_inputs: &[&[T; BUF_SIZE]],
874 _control_inputs: &[T],
875 _clock_inputs: &[ClockTick],
876 _feedback_inputs: &[&[T; BUF_SIZE]],
877 ) -> ProcessResult<()> {
878 Ok(())
879 }
880 }
881
882 struct NoopSink<T: Transcendental, const BUF_SIZE: usize> {
886 state: NodeState<T, BUF_SIZE>,
887 }
888
889 impl<T: Transcendental, const BUF_SIZE: usize> NoopSink<T, BUF_SIZE> {
890 fn new(sample_rate: f32) -> Self {
891 Self {
892 state: NodeState::new(sample_rate),
893 }
894 }
895 }
896
897 impl<T: Transcendental, const BUF_SIZE: usize> Node<T, BUF_SIZE> for NoopSink<T, BUF_SIZE> {
898 fn metadata(&self) -> NodeMetadata {
899 NodeMetadata {
900 type_name: None,
901 name: "NoopSink".into(),
902 category: NodeCategory::Sink,
903 description: String::new(),
904 author: String::new(),
905 version: "1.0".into(),
906 signal_inputs: 0,
907 signal_outputs: 0,
908 control_inputs: 0,
909 control_outputs: 0,
910 clock_inputs: 0,
911 clock_outputs: 0,
912 feedback_ports: 0,
913 parameters: vec![],
914 }
915 }
916 fn init(&mut self, _sample_rate: f32) {}
917 fn reset(&mut self) {}
918 fn get_parameter(&self, _id: &ParameterId) -> Option<ParamValue> {
919 None
920 }
921 fn set_parameter(&mut self, _id: &ParameterId, _value: ParamValue) -> ProcessResult<()> {
922 Ok(())
923 }
924 fn id(&self) -> NodeId {
925 NodeId(2)
926 }
927 fn set_id(&mut self, _id: NodeId) {}
928 fn input_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
929 None
930 }
931 fn input_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
932 None
933 }
934 fn output_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
935 None
936 }
937 fn output_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
938 None
939 }
940 fn control_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
941 None
942 }
943 fn control_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
944 None
945 }
946 fn state(&self) -> &NodeState<T, BUF_SIZE> {
947 &self.state
948 }
949 fn state_mut(&mut self) -> &mut NodeState<T, BUF_SIZE> {
950 &mut self.state
951 }
952 }
953
954 impl<T: Transcendental, const BUF_SIZE: usize> Sink<T, BUF_SIZE> for NoopSink<T, BUF_SIZE> {
955 fn consume(
956 &mut self,
957 _clock: &ClockTick,
958 _signal_inputs: &[&[T; BUF_SIZE]],
959 _control_inputs: &[T],
960 _clock_inputs: &[ClockTick],
961 _feedback_inputs: &[&[T; BUF_SIZE]],
962 ) -> ProcessResult<()> {
963 Ok(())
964 }
965 }
966
967 #[test]
972 fn test_topo_order_correct() {
973 const BUF: usize = 64;
974 let mut builder = test_builder::<BUF>();
975
976 let src = builder.add_source(Box::new(ConstantSource::new(1.0, 44100.0)));
977 let proc = builder.add_processor(Box::new(NoopProcessor::new(44100.0)));
978 let sink = builder.add_sink(Box::new(NoopSink::new(44100.0)));
979
980 builder.connect_signal(src, 0, proc, 0);
981 builder.connect_signal(proc, 0, sink, 0);
982
983 let graph = builder.build().expect("build failed");
984
985 let order = graph.topo_order();
986 let src_pos = order.iter().position(|&i| i == src).unwrap();
987 let proc_pos = order.iter().position(|&i| i == proc).unwrap();
988 let sink_pos = order.iter().position(|&i| i == sink).unwrap();
989 assert!(src_pos < proc_pos);
990 assert!(proc_pos < sink_pos);
991 }
992
993 #[test]
994 fn test_cycle_detection() {
995 const BUF: usize = 64;
996 let mut builder = test_builder::<BUF>();
997
998 let a = builder.add_processor(Box::new(NoopProcessor::new(44100.0)));
999 let b = builder.add_processor(Box::new(NoopProcessor::new(44100.0)));
1000
1001 builder.connect_signal(a, 0, b, 0);
1002 builder.connect_signal(b, 0, a, 0);
1003
1004 let result = builder.build();
1005 assert!(matches!(result, Err(BuildError::CycleDetected)));
1006 }
1007
1008 #[test]
1009 fn test_source_node_create() {
1010 const BUF: usize = 64;
1011 let mut builder = test_builder::<BUF>();
1012 let idx = builder.add_source(Box::new(ConstantSource::new(0.5, 44100.0)));
1013 let graph = builder.build().expect("build failed");
1014 assert_eq!(graph.node_count(), 1);
1015 assert_eq!(graph.topo_order(), &[idx]);
1016 }
1017
1018 pub struct TestSink<T: Transcendental, const BUF_SIZE: usize> {
1024 id: NodeId,
1025 state: NodeState<T, BUF_SIZE>,
1026 pub inputs: Vec<Port<T, BUF_SIZE>>,
1027 last_value: T,
1028 }
1029
1030 impl<T: Transcendental, const BUF_SIZE: usize> TestSink<T, BUF_SIZE> {
1031 fn new(id: NodeId, sample_rate: f32) -> Self {
1032 let mut inputs = Vec::new();
1033 inputs.push(Port::input(id, 0, "in"));
1034 Self {
1035 id,
1036 state: NodeState::new(sample_rate),
1037 inputs,
1038 last_value: T::ZERO,
1039 }
1040 }
1041 }
1042
1043 impl<T: Transcendental, const BUF_SIZE: usize> Node<T, BUF_SIZE> for TestSink<T, BUF_SIZE> {
1044 fn metadata(&self) -> NodeMetadata {
1045 NodeMetadata {
1046 type_name: None,
1047 name: "TestSink".into(),
1048 category: NodeCategory::Sink,
1049 description: String::new(),
1050 author: String::new(),
1051 version: "1.0".into(),
1052 signal_inputs: 1,
1053 signal_outputs: 0,
1054 control_inputs: 0,
1055 control_outputs: 0,
1056 clock_inputs: 0,
1057 clock_outputs: 0,
1058 feedback_ports: 0,
1059 parameters: vec![],
1060 }
1061 }
1062 fn init(&mut self, _: f32) {}
1063 fn reset(&mut self) {
1064 self.state.sample_pos = 0;
1065 self.state.blocks_processed = 0;
1066 }
1067 fn id(&self) -> NodeId {
1068 self.id
1069 }
1070 fn set_id(&mut self, id: NodeId) {
1071 self.id = id;
1072 }
1073 fn get_parameter(&self, _: &ParameterId) -> Option<ParamValue> {
1074 None
1075 }
1076 fn set_parameter(&mut self, _: &ParameterId, _: ParamValue) -> ProcessResult<()> {
1077 Ok(())
1078 }
1079 fn input_port(&self, i: usize) -> Option<&Port<T, BUF_SIZE>> {
1080 self.inputs.get(i)
1081 }
1082 fn input_port_mut(&mut self, i: usize) -> Option<&mut Port<T, BUF_SIZE>> {
1083 self.inputs.get_mut(i)
1084 }
1085 fn output_port(&self, _: usize) -> Option<&Port<T, BUF_SIZE>> {
1086 None
1087 }
1088 fn output_port_mut(&mut self, _: usize) -> Option<&mut Port<T, BUF_SIZE>> {
1089 None
1090 }
1091 fn control_port(&self, _: usize) -> Option<&Port<T, BUF_SIZE>> {
1092 None
1093 }
1094 fn control_port_mut(&mut self, _: usize) -> Option<&mut Port<T, BUF_SIZE>> {
1095 None
1096 }
1097 fn num_signal_inputs(&self) -> usize {
1098 1
1099 }
1100 fn num_signal_outputs(&self) -> usize {
1101 0
1102 }
1103 fn state(&self) -> &NodeState<T, BUF_SIZE> {
1104 &self.state
1105 }
1106 fn state_mut(&mut self) -> &mut NodeState<T, BUF_SIZE> {
1107 &mut self.state
1108 }
1109 }
1110
1111 impl<T: Transcendental, const BUF_SIZE: usize> Sink<T, BUF_SIZE> for TestSink<T, BUF_SIZE> {
1112 fn consume(
1113 &mut self,
1114 _clock: &ClockTick,
1115 _signal_inputs: &[&[T; BUF_SIZE]],
1116 _control_inputs: &[T],
1117 _clock_inputs: &[ClockTick],
1118 _feedback_inputs: &[&[T; BUF_SIZE]],
1119 ) -> ProcessResult<()> {
1120 if let Some(port) = self.inputs.first() {
1121 self.last_value = port.buffer.as_array()[0];
1122 }
1123 self.state.advance();
1124 Ok(())
1125 }
1126 }
1127
1128 pub struct GainProcessor<T: Transcendental, const BUF_SIZE: usize> {
1130 id: NodeId,
1131 state: NodeState<T, BUF_SIZE>,
1132 pub inputs: Vec<Port<T, BUF_SIZE>>,
1133 pub outputs: Vec<Port<T, BUF_SIZE>>,
1134 pub multiplier: T,
1135 }
1136
1137 impl<T: Transcendental, const BUF_SIZE: usize> GainProcessor<T, BUF_SIZE> {
1138 fn new(id: NodeId, sample_rate: f32, multiplier: T) -> Self {
1139 let mut inputs = Vec::new();
1140 inputs.push(Port::input(id, 0, "in"));
1141 let mut outputs = Vec::new();
1142 outputs.push(Port::output(id, 0, "out"));
1143 Self {
1144 id,
1145 state: NodeState::new(sample_rate),
1146 inputs,
1147 outputs,
1148 multiplier,
1149 }
1150 }
1151 }
1152
1153 impl<T: Transcendental, const BUF_SIZE: usize> Node<T, BUF_SIZE> for GainProcessor<T, BUF_SIZE> {
1154 fn metadata(&self) -> NodeMetadata {
1155 NodeMetadata {
1156 type_name: None,
1157 name: "GainProcessor".into(),
1158 category: NodeCategory::Processor,
1159 description: String::new(),
1160 author: String::new(),
1161 version: "1.0".into(),
1162 signal_inputs: 1,
1163 signal_outputs: 1,
1164 control_inputs: 0,
1165 control_outputs: 0,
1166 clock_inputs: 0,
1167 clock_outputs: 0,
1168 feedback_ports: 0,
1169 parameters: vec![],
1170 }
1171 }
1172 fn init(&mut self, _: f32) {}
1173 fn reset(&mut self) {
1174 self.state.sample_pos = 0;
1175 self.state.blocks_processed = 0;
1176 }
1177 fn id(&self) -> NodeId {
1178 self.id
1179 }
1180 fn set_id(&mut self, id: NodeId) {
1181 self.id = id;
1182 }
1183 fn get_parameter(&self, id: &ParameterId) -> Option<ParamValue> {
1184 match id.as_str() {
1185 "multiplier" => Some(ParamValue::Float(self.multiplier.to_f32())),
1186 _ => None,
1187 }
1188 }
1189 fn set_parameter(&mut self, id: &ParameterId, value: ParamValue) -> ProcessResult<()> {
1190 match id.as_str() {
1191 "multiplier" => {
1192 if let Some(v) = value.as_f32() {
1193 self.multiplier = T::from_f32(v);
1194 Ok(())
1195 } else {
1196 Err(rill_core::ProcessError::parameter("expected float"))
1197 }
1198 }
1199 _ => Err(rill_core::ProcessError::parameter("unknown")),
1200 }
1201 }
1202 fn input_port(&self, i: usize) -> Option<&Port<T, BUF_SIZE>> {
1203 self.inputs.get(i)
1204 }
1205 fn input_port_mut(&mut self, i: usize) -> Option<&mut Port<T, BUF_SIZE>> {
1206 self.inputs.get_mut(i)
1207 }
1208 fn output_port(&self, i: usize) -> Option<&Port<T, BUF_SIZE>> {
1209 self.outputs.get(i)
1210 }
1211 fn output_port_mut(&mut self, i: usize) -> Option<&mut Port<T, BUF_SIZE>> {
1212 self.outputs.get_mut(i)
1213 }
1214 fn control_port(&self, _: usize) -> Option<&Port<T, BUF_SIZE>> {
1215 None
1216 }
1217 fn control_port_mut(&mut self, _: usize) -> Option<&mut Port<T, BUF_SIZE>> {
1218 None
1219 }
1220 fn num_signal_inputs(&self) -> usize {
1221 1
1222 }
1223 fn num_signal_outputs(&self) -> usize {
1224 1
1225 }
1226 fn state(&self) -> &NodeState<T, BUF_SIZE> {
1227 &self.state
1228 }
1229 fn state_mut(&mut self) -> &mut NodeState<T, BUF_SIZE> {
1230 &mut self.state
1231 }
1232 }
1233
1234 impl<T: Transcendental, const BUF_SIZE: usize> Processor<T, BUF_SIZE>
1235 for GainProcessor<T, BUF_SIZE>
1236 {
1237 fn process(
1238 &mut self,
1239 _clock: &ClockTick,
1240 _signal_inputs: &[&[T; BUF_SIZE]],
1241 _control_inputs: &[T],
1242 _clock_inputs: &[ClockTick],
1243 _feedback_inputs: &[&[T; BUF_SIZE]],
1244 ) -> ProcessResult<()> {
1245 let inp = *self.inputs[0].buffer.as_array();
1246 let out = self.outputs[0].buffer.as_mut_array();
1247 for i in 0..BUF_SIZE {
1248 out[i] = inp[i] * self.multiplier;
1249 }
1250 self.state.advance();
1251 Ok(())
1252 }
1253 fn latency(&self) -> usize {
1254 0
1255 }
1256 }
1257
1258 #[test]
1261 fn test_graph_source_to_sink() {
1262 use rill_core::traits::algorithm::ActionContext;
1263 use rill_core::traits::processable::{ProcessContext, Processable};
1264 const BUF: usize = 64;
1265 let mut builder = test_builder::<BUF>();
1266 let src = builder.add_source(Box::new(ConstantSource::new(42.0, 44100.0)));
1267 let snk = builder.add_sink(Box::new(TestSink::<f32, BUF>::new(NodeId(1), 44100.0)));
1268 builder.connect_signal(src, 0, snk, 0);
1269 let graph = builder.build().unwrap();
1270 let (mut nodes, topo, _, _bufs) = graph.into_parts();
1271 let tick = ClockTick::new(0, BUF as u32, 44100.0);
1272
1273 let mut ctx = ProcessContext { clock: &tick };
1274 let _ = nodes[topo[0]].process_block(&mut ctx);
1275 let action_ctx = ActionContext::new(&tick);
1276 let out_port = nodes[topo[0]].output_port(0).unwrap();
1277 out_port.propagate(out_port.buffer(), &action_ctx).unwrap();
1278
1279 let sink_val = nodes[topo[1]].input_port(0).unwrap().buffer.as_array()[0];
1280 assert_eq!(sink_val, 42.0, "sink should receive source value");
1281 }
1282
1283 #[test]
1286 fn test_graph_source_proc_sink() {
1287 use rill_core::traits::algorithm::ActionContext;
1288 use rill_core::traits::processable::{ProcessContext, Processable};
1289 const BUF: usize = 64;
1290 let mut builder = test_builder::<BUF>();
1291 let src = builder.add_source(Box::new(ConstantSource::new(10.0, 44100.0)));
1292 let proc = builder.add_processor(Box::new(GainProcessor::<f32, BUF>::new(
1293 NodeId(1),
1294 44100.0,
1295 3.0,
1296 )));
1297 let snk = builder.add_sink(Box::new(TestSink::<f32, BUF>::new(NodeId(2), 44100.0)));
1298 builder.connect_signal(src, 0, proc, 0);
1299 builder.connect_signal(proc, 0, snk, 0);
1300 let graph = builder.build().unwrap();
1301 let (mut nodes, topo, _, _bufs) = graph.into_parts();
1302 let tick = ClockTick::new(0, BUF as u32, 44100.0);
1303
1304 let mut ctx = ProcessContext { clock: &tick };
1305 let _ = nodes[topo[0]].process_block(&mut ctx);
1306 let action_ctx = ActionContext::new(&tick);
1307 let out_port = nodes[topo[0]].output_port(0).unwrap();
1308 out_port.propagate(out_port.buffer(), &action_ctx).unwrap();
1309
1310 let sink_val = nodes[topo[2]].input_port(0).unwrap().buffer.as_array()[0];
1311 assert!(
1312 (sink_val - 30.0).abs() < 1e-6,
1313 "source(10)×gain(3)=30, got {}",
1314 sink_val
1315 );
1316 }
1317
1318 #[test]
1321 fn test_command_queue_drain() {
1322 use rill_core::queues::{MpscQueue, SetParameter, SignalOrigin};
1323 use rill_core::traits::PortId;
1324
1325 const BUF: usize = 64;
1326 let queue: Arc<MpscQueue<SetParameter>> = Arc::new(MpscQueue::new());
1327
1328 let mut builder = test_builder::<BUF>();
1329 builder.add_processor(Box::new(GainProcessor::<f32, BUF>::new(
1330 NodeId(0),
1331 44100.0,
1332 2.0,
1333 )));
1334 let graph = builder.build().unwrap();
1335 let (mut nodes, _, _, _bufs) = graph.into_parts();
1336
1337 let _ = queue.push(SetParameter::new(
1338 PortId::control_in(NodeId(0), 0),
1339 ParameterId::new("multiplier").unwrap(),
1340 ParamValue::Float(5.0),
1341 SignalOrigin::Manual,
1342 ));
1343
1344 while let Some(cmd) = queue.pop() {
1345 let idx = cmd.port.node_id().inner() as usize;
1346 let pid = cmd.parameter.clone();
1347 let _ = nodes[idx].set_parameter(&pid, cmd.value.clone());
1348 }
1349
1350 let pid = ParameterId::new("multiplier").unwrap();
1351 let val = nodes[0].get_parameter(&pid).unwrap().as_f32().unwrap();
1352 assert!(
1353 (val - 5.0).abs() < 1e-6,
1354 "multiplier should be 5.0, got {}",
1355 val
1356 );
1357 }
1358
1359 #[test]
1362 fn test_command_then_propagate() {
1363 use rill_core::queues::{MpscQueue, SetParameter, SignalOrigin};
1364 use rill_core::traits::algorithm::ActionContext;
1365 use rill_core::traits::processable::{ProcessContext, Processable};
1366 use rill_core::traits::PortId;
1367
1368 const BUF: usize = 64;
1369 let queue: Arc<MpscQueue<SetParameter>> = Arc::new(MpscQueue::new());
1370
1371 let mut builder = test_builder::<BUF>();
1372 let src = builder.add_source(Box::new(ConstantSource::new(7.0, 44100.0)));
1373 let proc = builder.add_processor(Box::new(GainProcessor::<f32, BUF>::new(
1374 NodeId(1),
1375 44100.0,
1376 2.0,
1377 )));
1378 let snk = builder.add_sink(Box::new(TestSink::<f32, BUF>::new(NodeId(2), 44100.0)));
1379 builder.connect_signal(src, 0, proc, 0);
1380 builder.connect_signal(proc, 0, snk, 0);
1381 let graph = builder.build().unwrap();
1382 let (mut nodes, topo, _, _bufs) = graph.into_parts();
1383 let tick = ClockTick::new(0, BUF as u32, 44100.0);
1384
1385 let _ = queue.push(SetParameter::new(
1387 PortId::control_in(NodeId(1), 0),
1388 ParameterId::new("multiplier").unwrap(),
1389 ParamValue::Float(4.0),
1390 SignalOrigin::Manual,
1391 ));
1392 while let Some(cmd) = queue.pop() {
1393 let idx = cmd.port.node_id().inner() as usize;
1394 let pid = cmd.parameter.clone();
1395 let _ = nodes[idx].set_parameter(&pid, cmd.value.clone());
1396 }
1397
1398 let pid = ParameterId::new("multiplier").unwrap();
1400 let val = nodes[1].get_parameter(&pid).unwrap().as_f32().unwrap();
1401 assert!((val - 4.0).abs() < 1e-6);
1402
1403 let mut ctx = ProcessContext { clock: &tick };
1405 let _ = nodes[topo[0]].process_block(&mut ctx);
1406 let action_ctx = ActionContext::new(&tick);
1407 let out_port = nodes[topo[0]].output_port(0).unwrap();
1408 out_port.propagate(out_port.buffer(), &action_ctx).unwrap();
1409
1410 let sink_val = nodes[topo[2]].input_port(0).unwrap().buffer.as_array()[0];
1411 assert!(
1412 (sink_val - 28.0).abs() < 1e-6,
1413 "source(7)×gain(4)=28, got {}",
1414 sink_val
1415 );
1416 }
1417
1418 #[test]
1421 fn test_feedback_propagation() {
1422 use rill_core::traits::algorithm::ActionContext;
1423 use rill_core::traits::processable::{ProcessContext, Processable};
1424
1425 const BUF: usize = 64;
1426 let mut builder = test_builder::<BUF>();
1427 let src = builder.add_source(Box::new(ConstantSource::new(1.0, 44100.0)));
1428 let proc = builder.add_processor(Box::new(GainProcessor::<f32, BUF>::new(
1429 NodeId(1),
1430 44100.0,
1431 2.0,
1432 )));
1433 let snk = builder.add_sink(Box::new(TestSink::<f32, BUF>::new(NodeId(2), 44100.0)));
1434 builder.connect_signal(src, 0, proc, 0);
1435 builder.connect_signal(proc, 0, snk, 0);
1436 builder.connect_feedback(proc, 0, proc, 0);
1437 let graph = builder.build().unwrap();
1438 let (mut nodes, topo, _, _bufs) = graph.into_parts();
1439
1440 let tick1 = ClockTick::new(0, BUF as u32, 44100.0);
1442 let mut ctx = ProcessContext { clock: &tick1 };
1443 let _ = nodes[topo[0]].process_block(&mut ctx);
1444 let ctx1 = ActionContext::new(&tick1);
1445 let out_port = nodes[topo[0]].output_port(0).unwrap();
1446 out_port.propagate(out_port.buffer(), &ctx1).unwrap();
1447 let block1 = nodes[topo[2]].input_port(0).unwrap().buffer.as_array()[0];
1448 assert!(
1449 (block1 - 2.0).abs() < 1e-6,
1450 "block1: 1.0×2.0=2.0, got {}",
1451 block1
1452 );
1453
1454 let tick2 = ClockTick::new(BUF as u64, BUF as u32, 44100.0);
1456 let mut ctx = ProcessContext { clock: &tick2 };
1457 let _ = nodes[topo[0]].process_block(&mut ctx);
1458 let ctx2 = ActionContext::new(&tick2);
1459 let out_port = nodes[topo[0]].output_port(0).unwrap();
1460 out_port.propagate(out_port.buffer(), &ctx2).unwrap();
1461 let block2 = nodes[topo[2]].input_port(0).unwrap().buffer.as_array()[0];
1462 assert!(
1463 (block2 - 6.0).abs() < 1e-6,
1464 "block2: (1+2)×2=6.0, got {}",
1465 block2
1466 );
1467 }
1468
1469 #[test]
1472 fn test_drain_fn_before_propagate() {
1473 use rill_core::queues::{MpscQueue, SetParameter, SignalOrigin};
1474 use rill_core::traits::algorithm::ActionContext;
1475 use rill_core::traits::processable::{ProcessContext, Processable};
1476 use rill_core::traits::PortId;
1477
1478 const BUF: usize = 64;
1479 let queue: Arc<MpscQueue<SetParameter>> = Arc::new(MpscQueue::new());
1480
1481 let mut builder = test_builder::<BUF>();
1482 let src = builder.add_source(Box::new(ConstantSource::new(5.0, 44100.0)));
1483 let proc = builder.add_processor(Box::new(GainProcessor::<f32, BUF>::new(
1484 NodeId(1),
1485 44100.0,
1486 1.0,
1487 )));
1488 let snk = builder.add_sink(Box::new(TestSink::<f32, BUF>::new(NodeId(2), 44100.0)));
1489 builder.connect_signal(src, 0, proc, 0);
1490 builder.connect_signal(proc, 0, snk, 0);
1491 let graph = builder.build().unwrap();
1492 let (mut nodes, topo, _, _bufs) = graph.into_parts();
1493 let tick = ClockTick::new(0, BUF as u32, 44100.0);
1494
1495 let _ = queue.push(SetParameter::new(
1497 PortId::control_in(NodeId(1), 0),
1498 ParameterId::new("multiplier").unwrap(),
1499 ParamValue::Float(3.0),
1500 SignalOrigin::Manual,
1501 ));
1502
1503 while let Some(cmd) = queue.pop() {
1505 let idx = cmd.port.node_id().inner() as usize;
1506 let pid = cmd.parameter.clone();
1507 let _ = nodes[idx].set_parameter(&pid, cmd.value.clone());
1508 }
1509
1510 let pid = ParameterId::new("multiplier").unwrap();
1512 let val = nodes[1].get_parameter(&pid).unwrap().as_f32().unwrap();
1513 assert!(
1514 (val - 3.0).abs() < 1e-6,
1515 "multiplier should be 3.0, got {}",
1516 val
1517 );
1518
1519 let mut ctx = ProcessContext { clock: &tick };
1521 let _ = nodes[topo[0]].process_block(&mut ctx).unwrap();
1522
1523 let action_ctx = ActionContext::new(&tick);
1525 let out_port = nodes[topo[0]].output_port(0).unwrap();
1526 out_port.propagate(out_port.buffer(), &action_ctx).unwrap();
1527
1528 let sink_val = nodes[topo[2]].input_port(0).unwrap().buffer.as_array()[0];
1530 assert!(
1531 (sink_val - 15.0).abs() < 1e-6,
1532 "source(5)×gain(3)=15, got {}",
1533 sink_val
1534 );
1535 }
1536}