1use crate::backend_factory::BackendFactory;
2use crate::factory::NodeFactory;
3use rill_core::buffer::{Buffer, BufferRegistry, FixedBuffer, TapeLoop};
4use rill_core::math::Transcendental;
5use rill_core::queues::CommandEnum;
6use rill_core::time::ClockTick;
7use rill_core::traits::algorithm::ActionContext;
8use rill_core::traits::port::Port;
9use rill_core::traits::processable::{ProcessContext, Processable};
10use rill_core::traits::ParamValue;
11use rill_core::traits::{Node, NodeId, NodeVariant, Params};
12use rill_core_actor::{Actor, ActorRef, ActorSystem};
13use std::cell::UnsafeCell;
14use std::collections::{HashMap, VecDeque};
15use std::rc::Rc;
16use std::sync::atomic::AtomicBool;
17use std::sync::Arc;
18
19#[derive(Debug, Clone)]
29pub enum BuildError {
30 CycleDetected,
32 Backend(String),
34 Registry(String),
36}
37
38impl std::fmt::Display for BuildError {
39 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40 match self {
41 Self::CycleDetected => write!(f, "graph cycle detected"),
42 Self::Backend(msg) => write!(f, "backend error: {msg}"),
43 Self::Registry(msg) => write!(f, "registry error: {msg}"),
44 }
45 }
46}
47
48struct NodeRecipe<T: Transcendental, const BUF_SIZE: usize> {
58 type_name: String,
59 id: NodeId,
60 params: Params,
61 backend: Option<String>,
62 routing_entries: Vec<(usize, usize, f32)>,
63 _phantom: std::marker::PhantomData<(T, [(); BUF_SIZE])>,
64}
65
66struct NodeEntry<T: Transcendental, const BUF_SIZE: usize> {
68 node: NodeVariant<T, BUF_SIZE>,
69}
70
71#[derive(Clone)]
77pub struct GraphResource {
78 pub name: String,
80 pub kind: String,
82 pub capacity: usize,
84}
85
86pub struct GraphBuilder<T: Transcendental, const BUF_SIZE: usize> {
99 recipes: Vec<NodeRecipe<T, BUF_SIZE>>,
100 signal_edges: Vec<(usize, usize, usize, usize)>,
101 control_edges: Vec<(usize, usize, usize, usize)>,
102 clock_edges: Vec<(usize, usize, usize, usize)>,
103 feedback_edges: Vec<(usize, usize, usize, usize)>,
104 resources: Vec<GraphResource>,
105 factory: Arc<NodeFactory<T, BUF_SIZE>>,
107 backend_factory: Arc<BackendFactory<T>>,
109 default_backend: Option<String>,
111 backend_params: HashMap<String, ParamValue>,
113 sample_rate: Option<f32>,
116 parent_ref: Option<ActorRef<CommandEnum>>,
118}
119
120impl<T: Transcendental, const BUF_SIZE: usize> GraphBuilder<T, BUF_SIZE> {
121 pub fn new(
123 factory: Arc<NodeFactory<T, BUF_SIZE>>,
124 backend_factory: Arc<BackendFactory<T>>,
125 ) -> Self {
126 Self {
127 recipes: Vec::new(),
128 signal_edges: Vec::new(),
129 control_edges: Vec::new(),
130 clock_edges: Vec::new(),
131 feedback_edges: Vec::new(),
132 resources: Vec::new(),
133 factory,
134 backend_factory,
135 default_backend: None,
136 backend_params: HashMap::new(),
137 sample_rate: None,
138 parent_ref: None,
139 }
140 }
141
142 pub fn add_node(&mut self, type_name: &str, params: &Params) -> usize {
149 let id = NodeId(self.recipes.len() as u32);
150 self.add_node_with_id(type_name, params, id)
151 }
152
153 pub fn add_node_with_id(&mut self, type_name: &str, params: &Params, id: NodeId) -> usize {
159 let idx = self.recipes.len();
160 self.recipes.push(NodeRecipe {
161 type_name: type_name.to_string(),
162 id,
163 params: params.clone(),
164 backend: None,
165 routing_entries: Vec::new(),
166 _phantom: std::marker::PhantomData,
167 });
168 idx
169 }
170
171 pub fn set_node_backend(&mut self, idx: usize, name: String) {
173 if let Some(recipe) = self.recipes.get_mut(idx) {
174 recipe.backend = Some(name);
175 }
176 }
177
178 pub fn add_routing_entry(&mut self, idx: usize, from: usize, to: usize, gain: f32) {
180 if let Some(recipe) = self.recipes.get_mut(idx) {
181 recipe.routing_entries.push((from, to, gain));
182 }
183 }
184
185 pub fn add_resource(&mut self, resource: GraphResource) {
187 self.resources.push(resource);
188 }
189
190 pub fn node_count(&self) -> usize {
192 self.recipes.len()
193 }
194
195 pub fn set_default_backend(&mut self, name: String, params: HashMap<String, ParamValue>) {
197 self.default_backend = Some(name);
198 self.backend_params = params;
199 }
200
201 pub fn default_backend_name(&self) -> Option<&String> {
203 self.default_backend.as_ref()
204 }
205
206 pub fn set_sample_rate(&mut self, sr: f32) {
208 self.sample_rate = Some(sr);
209 }
210
211 pub fn set_parent_ref(&mut self, parent: ActorRef<CommandEnum>) {
213 self.parent_ref = Some(parent);
214 }
215
216 pub fn backend_factory(&self) -> &Arc<BackendFactory<T>> {
218 &self.backend_factory
219 }
220
221 pub fn connect_signal(
223 &mut self,
224 from_node: usize,
225 from_port: usize,
226 to_node: usize,
227 to_port: usize,
228 ) {
229 self.signal_edges
230 .push((from_node, from_port, to_node, to_port));
231 }
232
233 pub fn connect_control(
235 &mut self,
236 from_node: usize,
237 from_port: usize,
238 to_node: usize,
239 to_port: usize,
240 ) {
241 self.control_edges
242 .push((from_node, from_port, to_node, to_port));
243 }
244
245 pub fn connect_clock(
247 &mut self,
248 from_node: usize,
249 from_port: usize,
250 to_node: usize,
251 to_port: usize,
252 ) {
253 self.clock_edges
254 .push((from_node, from_port, to_node, to_port));
255 }
256
257 pub fn connect_feedback(
259 &mut self,
260 from_node: usize,
261 from_port: usize,
262 to_node: usize,
263 to_port: usize,
264 ) {
265 self.feedback_edges
266 .push((from_node, from_port, to_node, to_port));
267 }
268
269 pub fn build(self, system: &ActorSystem) -> Result<Graph<T, BUF_SIZE>, BuildError> {
275 let mut node_entries: Vec<NodeEntry<T, BUF_SIZE>> = Vec::with_capacity(self.recipes.len());
277 for recipe in &self.recipes {
278 let node = self
279 .factory
280 .construct(&recipe.type_name, recipe.id, &recipe.params)
281 .map_err(|e| BuildError::Registry(format!("{e}")))?;
282 node_entries.push(NodeEntry { node });
283 }
284
285 for (idx, node) in node_entries.iter_mut().enumerate() {
287 for &(from, to, gain) in &self.recipes[idx].routing_entries {
288 if let NodeVariant::Router(ref mut router) = node.node {
289 router.set_connection(from, to, T::from_f32(gain)).ok();
290 }
291 }
292 }
293
294 let num_nodes = node_entries.len();
295
296 let sr = self.sample_rate.unwrap_or(44100.0);
298 for (idx, recipe) in self.recipes.iter().enumerate() {
299 let name = match recipe.backend.as_ref() {
300 Some(n) => Some(n.clone()),
301 None => self.default_backend.clone(),
302 };
303 if let Some(ref name) = name {
304 let mut be_params = HashMap::new();
305 be_params.insert("sample_rate".into(), ParamValue::Float(sr));
306 be_params.insert("buffer_size".into(), ParamValue::Int(BUF_SIZE as i32));
307 if self.default_backend.as_ref() == Some(name) {
308 for (k, v) in &self.backend_params {
309 be_params.entry(k.clone()).or_insert_with(|| v.clone());
310 }
311 }
312 let backend = self
313 .backend_factory
314 .create(name, &be_params)
315 .map_err(BuildError::Backend)?;
316 if let Some(io_node) = node_entries[idx].node.as_io_node_mut() {
317 io_node.resolve_backend(backend);
318 }
319 }
320 }
321
322 let mut in_degree = vec![0usize; num_nodes];
324 let mut out_edges: Vec<Vec<(usize, usize, usize)>> = vec![Vec::new(); num_nodes];
325
326 for &(from_n, from_p, to_n, to_p) in &self.signal_edges {
327 in_degree[to_n] += 1;
328 out_edges[from_n].push((from_p, to_n, to_p));
329 }
330
331 let mut queue: VecDeque<usize> = in_degree
333 .iter()
334 .enumerate()
335 .filter(|(_, &d)| d == 0)
336 .map(|(i, _)| i)
337 .collect();
338
339 let mut topo = Vec::with_capacity(num_nodes);
340 let mut indeg = in_degree;
341 while let Some(idx) = queue.pop_front() {
342 topo.push(idx);
343 for &(_, to_n, _) in &out_edges[idx] {
344 indeg[to_n] -= 1;
345 if indeg[to_n] == 0 {
346 queue.push_back(to_n);
347 }
348 }
349 }
350
351 if topo.len() != num_nodes {
352 return Err(BuildError::CycleDetected);
353 }
354
355 let mut nodes: Vec<NodeVariant<T, BUF_SIZE>> =
357 node_entries.into_iter().map(|e| e.node).collect();
358
359 for &(from_n, from_p, to_n, to_p) in &self.signal_edges {
361 if let Some(port) = nodes[from_n].output_port_mut(from_p) {
362 port.downstream.push((to_n, to_p));
363 }
364 let in_ptr: *mut Port<T, BUF_SIZE> = nodes[to_n]
365 .input_port_mut(to_p)
366 .map(|p| p as *mut Port<T, BUF_SIZE>)
367 .unwrap_or(std::ptr::null_mut());
368 let parent: *mut NodeVariant<T, BUF_SIZE> = &mut nodes[to_n];
369 let out_ptr: *mut Port<T, BUF_SIZE> = nodes[from_n]
370 .output_port_mut(from_p)
371 .map(|p| p as *mut Port<T, BUF_SIZE>)
372 .unwrap_or(std::ptr::null_mut());
373 if !in_ptr.is_null() && !out_ptr.is_null() {
374 #[allow(unsafe_code)]
375 unsafe {
376 (*in_ptr).parent = parent;
377 (*out_ptr).downstream_input_ptrs.push(in_ptr);
378 }
379 }
380 }
381
382 for &(from_n, from_p, to_n, _) in &self.signal_edges {
384 let parent: *mut NodeVariant<T, BUF_SIZE> = &mut nodes[to_n];
385 if let Some(port) = nodes[from_n].output_port_mut(from_p) {
386 let ptr_val = parent as usize;
387 let already = port.downstream_nodes.iter().any(|&p| p as usize == ptr_val);
388 if !already {
389 port.downstream_nodes.push(parent);
390 }
391 }
392 }
393
394 for &(from_n, from_p, to_n, to_p) in &self.signal_edges {
396 let upstream = nodes[from_n]
397 .output_port(from_p)
398 .map(|p| &p.buffer as *const FixedBuffer<T, BUF_SIZE>);
399 if let Some(port) = nodes[to_n].input_port_mut(to_p) {
400 if port.upstream_buffer.is_none() {
401 port.upstream_buffer = upstream;
402 } else {
403 port.upstream_buffer = None;
404 }
405 }
406 }
407
408 for &(from_n, from_p, to_n, to_p) in &self.feedback_edges {
410 if let Some(port) = nodes[from_n].output_port_mut(from_p) {
411 port.feedback_buffer = Some(FixedBuffer::new());
412 port.feedback_downstream.push((to_n, to_p));
413 }
414 if let Some(port) = nodes[to_n].input_port_mut(to_p) {
415 port.feedback_buffer = Some(FixedBuffer::new());
416 }
417 }
418 for &(from_n, from_p, to_n, to_p) in &self.feedback_edges {
419 let ptr = nodes[to_n]
420 .input_port(to_p)
421 .map(|p| &p.feedback_buffer as *const Option<FixedBuffer<T, BUF_SIZE>>)
422 .map(|r| r as *mut Option<FixedBuffer<T, BUF_SIZE>>);
423 if let Some(port) = nodes[from_n].output_port_mut(from_p) {
424 if let Some(p) = ptr {
425 port.feedback_ptrs.push(p);
426 }
427 }
428 }
429
430 let mut buffers = BufferRegistry::new();
432 for r in &self.resources {
433 if r.kind == "tape" {
434 if let Some(tape) = TapeLoop::<T>::new(r.capacity) {
435 buffers.register(&r.name, Box::new(tape));
436 }
437 }
438 }
439 for entry in nodes.iter_mut() {
440 entry.resolve_resources(&buffers);
441 }
442
443 let source_idx = topo.first().copied().unwrap_or(0);
444 let mut active_node_idx = 0;
445 for (i, n) in nodes.iter_mut().enumerate() {
446 if n.as_active_node_mut().is_some() {
447 active_node_idx = i;
448 break;
449 }
450 }
451
452 let owned_buffers = buffers.into_inner();
453 let allocated = self.resources.clone();
454
455 let nodes: Rc<UnsafeCell<Vec<NodeVariant<T, BUF_SIZE>>>> = Rc::new(UnsafeCell::new(nodes));
457
458 let actor = system.spawn("graph", {
459 let n = nodes.clone();
460 #[allow(unsafe_code)]
461 move |msg: CommandEnum| {
462 if let CommandEnum::SetParameter(param) = msg {
463 let idx = param.port.node_id().inner() as usize;
464 unsafe {
465 let nv = &mut *n.get();
466 if idx < nv.len() {
467 let _ = nv[idx].set_parameter(¶m.parameter, param.value);
468 }
469 }
470 }
471 }
472 });
473
474 let actor_ref = actor.actor_ref();
475
476 Ok(Graph {
477 nodes,
478 topo_order: topo,
479 resources: allocated,
480 current_tick: ClockTick::new(0, BUF_SIZE as u32, sr),
481 buffers: owned_buffers,
482 source_idx,
483 active_node_idx,
484 actor: Some(actor),
485 actor_ref,
486 parent_ref: self.parent_ref.clone(),
487 })
488 }
489}
490
491pub struct Graph<T: Transcendental, const BUF_SIZE: usize> {
503 nodes: Rc<UnsafeCell<Vec<NodeVariant<T, BUF_SIZE>>>>,
504 topo_order: Vec<usize>,
505 source_idx: usize,
506 active_node_idx: usize,
507 current_tick: ClockTick,
508 pub(crate) resources: Vec<GraphResource>,
509 #[allow(dead_code)]
510 buffers: Vec<Box<dyn Buffer<T> + Send>>,
511 actor: Option<Actor<CommandEnum>>,
512 actor_ref: ActorRef<CommandEnum>,
513 parent_ref: Option<ActorRef<CommandEnum>>,
514}
515
516impl<T: Transcendental, const BUF_SIZE: usize> Graph<T, BUF_SIZE> {
517 #[allow(unsafe_code)]
523 pub fn nodes(&self) -> &[NodeVariant<T, BUF_SIZE>] {
524 unsafe { &*self.nodes.get() }
525 }
526
527 pub fn current_tick(&self) -> ClockTick {
529 self.current_tick
530 }
531
532 #[allow(unsafe_code)]
534 pub fn node_count(&self) -> usize {
535 unsafe { (*self.nodes.get()).len() }
536 }
537
538 pub fn topo_order(&self) -> &[usize] {
540 &self.topo_order
541 }
542
543 #[allow(dead_code)]
544 pub(crate) fn sample_rate(&self) -> f32 {
545 self.current_tick.sample_rate
546 }
547
548 #[allow(dead_code)]
550 pub fn resources(&self) -> &[GraphResource] {
551 &self.resources
552 }
553
554 #[allow(unsafe_code)]
556 pub fn run(&mut self, running: Arc<AtomicBool>) -> Result<(), String> {
557 let mut actor = self.actor.take().ok_or("graph already running")?;
558 let source = self.source_idx;
559 let parent = self.parent_ref.clone();
560 let nodes = self.nodes.clone();
561 let idx = self.active_node_idx;
562
563 let tick: Box<dyn FnMut(u64, f32)> = Box::new(move |sample_pos, sample_rate| {
564 actor.drain();
565 let tick = ClockTick::new(sample_pos, BUF_SIZE as u32, sample_rate);
566 let mut ctx = ProcessContext { clock: &tick };
567 unsafe {
568 let nv = &mut *nodes.get();
569 let _ = nv[source].process_block(&mut ctx);
570 let action_ctx = ActionContext::new(&tick);
571 for po in 0..nv[source].num_signal_outputs() {
572 if let Some(port) = nv[source].output_port(po) {
573 let _ = port.propagate(port.buffer(), &action_ctx);
574 }
575 }
576 }
577 if let Some(ref parent) = parent {
578 parent.send(CommandEnum::ClockTick(tick));
579 }
580 });
581
582 unsafe {
583 self.nodes.get().as_mut().unwrap()[idx]
584 .as_active_node_mut()
585 .ok_or("no active node")?
586 .run(tick, running)
587 }
588 }
589
590 pub fn handle(&self) -> ActorRef<CommandEnum> {
592 self.actor_ref.clone()
593 }
594
595 #[cfg(test)]
597 pub fn into_parts(
598 self,
599 ) -> (
600 Vec<NodeVariant<T, BUF_SIZE>>,
601 Vec<usize>,
602 ClockTick,
603 Vec<Box<dyn Buffer<T> + Send>>,
604 ) {
605 let Self {
606 nodes,
607 topo_order,
608 current_tick,
609 resources: _,
610 buffers,
611 active_node_idx: _,
612 source_idx: _,
613 actor,
614 actor_ref: _,
615 parent_ref: _,
616 } = self;
617 drop(actor);
618 let nodes = Rc::try_unwrap(nodes).unwrap().into_inner();
619 (nodes, topo_order, current_tick, buffers)
620 }
621}
622
623#[cfg(test)]
624mod tests {
625 use super::*;
626 use rill_core::math::Transcendental;
627 use rill_core::time::ClockTick;
628 use rill_core::traits::{
629 Node, NodeCategory, NodeId, NodeMetadata, NodeState, ParamValue, ParameterId, Port,
630 ProcessResult, Processor, Sink, Source,
631 };
632 use rill_core_actor::ActorSystem;
633 use std::sync::Arc;
634
635 fn test_system() -> ActorSystem {
636 ActorSystem::new()
637 }
638
639 fn test_factory<const B: usize>() -> Arc<NodeFactory<f32, B>> {
640 let mut f = NodeFactory::<f32, B>::new();
641
642 f.register_fn("test/const", |id, params| {
643 let value = params.get_f32("value", 1.0);
644 let mut node = ConstantSource::<f32, B>::new(id, value, params.sample_rate);
645 node.init(params.sample_rate);
646 NodeVariant::Source(Box::new(node))
647 });
648
649 f.register_fn("test/gain", |id, params| {
650 let gain = params.get_f32("gain", 1.0);
651 let mut node = GainProcessor::<f32, B>::new(id, params.sample_rate, gain);
652 node.init(params.sample_rate);
653 NodeVariant::Processor(Box::new(node))
654 });
655
656 f.register_fn("test/capture", |id, params| {
657 let mut node = CaptureSink::<f32, B>::new(id, params.sample_rate);
658 node.init(params.sample_rate);
659 NodeVariant::Sink(Box::new(node))
660 });
661
662 Arc::new(f)
663 }
664
665 fn test_builder<const B: usize>(factory: &Arc<NodeFactory<f32, B>>) -> GraphBuilder<f32, B> {
666 GraphBuilder::new(factory.clone(), Arc::new(BackendFactory::new()))
667 }
668
669 fn test_params(sample_rate: f32) -> Params {
670 let mut p = Params::new(sample_rate);
671 p.insert("value".to_string(), ParamValue::Float(sample_rate));
672 p
673 }
674
675 pub(crate) struct ConstantSource<T: Transcendental, const B: usize> {
680 id: NodeId,
681 value: T,
682 state: NodeState<T, B>,
683 output: Port<T, B>,
684 }
685
686 impl<T: Transcendental, const B: usize> ConstantSource<T, B> {
687 pub fn new(id: NodeId, value: T, sample_rate: f32) -> Self {
688 let state = NodeState::new(sample_rate);
689 let mut output = Port::output(id, 0, "out");
690 output.buffer = FixedBuffer::new();
691 Self {
692 id,
693 value,
694 state,
695 output,
696 }
697 }
698 }
699
700 impl<T: Transcendental, const B: usize> Node<T, B> for ConstantSource<T, B> {
701 fn id(&self) -> NodeId {
702 self.id
703 }
704 fn set_id(&mut self, id: NodeId) {
705 self.id = id;
706 }
707 fn metadata(&self) -> NodeMetadata {
708 NodeMetadata {
709 name: "ConstantSource".into(),
710 type_name: Some("test/const".into()),
711 category: NodeCategory::Source,
712 description: String::new(),
713 author: String::new(),
714 version: String::new(),
715 parameters: vec![],
716 signal_inputs: 0,
717 signal_outputs: 1,
718 control_inputs: 0,
719 control_outputs: 0,
720 clock_inputs: 0,
721 clock_outputs: 0,
722 feedback_ports: 0,
723 }
724 }
725 fn init(&mut self, _: f32) {}
726 fn reset(&mut self) {}
727 fn get_parameter(&self, _: &ParameterId) -> Option<ParamValue> {
728 None
729 }
730 fn set_parameter(&mut self, _: &ParameterId, _: ParamValue) -> ProcessResult<()> {
731 Ok(())
732 }
733 fn control_port(&self, _: usize) -> Option<&Port<T, B>> {
734 None
735 }
736 fn control_port_mut(&mut self, _: usize) -> Option<&mut Port<T, B>> {
737 None
738 }
739 fn output_port(&self, i: usize) -> Option<&Port<T, B>> {
740 if i == 0 {
741 Some(&self.output)
742 } else {
743 None
744 }
745 }
746 fn output_port_mut(&mut self, i: usize) -> Option<&mut Port<T, B>> {
747 if i == 0 {
748 Some(&mut self.output)
749 } else {
750 None
751 }
752 }
753 fn input_port(&self, _: usize) -> Option<&Port<T, B>> {
754 None
755 }
756 fn input_port_mut(&mut self, _: usize) -> Option<&mut Port<T, B>> {
757 None
758 }
759 fn state(&self) -> &NodeState<T, B> {
760 &self.state
761 }
762 fn state_mut(&mut self) -> &mut NodeState<T, B> {
763 &mut self.state
764 }
765 }
766
767 impl<T: Transcendental, const B: usize> Source<T, B> for ConstantSource<T, B> {
768 fn generate(&mut self, _: &ClockTick, _: &[T], _: &[ClockTick]) -> ProcessResult<()> {
769 self.output.buffer.as_mut_array().fill(self.value);
770 Ok(())
771 }
772 }
773
774 pub(crate) struct GainProcessor<T: Transcendental, const B: usize> {
779 id: NodeId,
780 gain: T,
781 state: NodeState<T, B>,
782 input: Port<T, B>,
783 output: Port<T, B>,
784 }
785
786 impl<T: Transcendental, const B: usize> GainProcessor<T, B> {
787 pub fn new(id: NodeId, sample_rate: f32, gain: T) -> Self {
788 let state = NodeState::new(sample_rate);
789 let input = Port::input(id, 0, "in");
790 let output = Port::output(id, 0, "out");
791 Self {
792 id,
793 gain,
794 state,
795 input,
796 output,
797 }
798 }
799 }
800
801 impl<T: Transcendental, const B: usize> Node<T, B> for GainProcessor<T, B> {
802 fn id(&self) -> NodeId {
803 self.id
804 }
805 fn set_id(&mut self, id: NodeId) {
806 self.id = id;
807 }
808 fn metadata(&self) -> NodeMetadata {
809 NodeMetadata {
810 name: "GainProcessor".into(),
811 type_name: Some("test/gain".into()),
812 category: NodeCategory::Processor,
813 description: String::new(),
814 author: String::new(),
815 version: String::new(),
816 parameters: vec![],
817 signal_inputs: 1,
818 signal_outputs: 1,
819 control_inputs: 0,
820 control_outputs: 0,
821 clock_inputs: 0,
822 clock_outputs: 0,
823 feedback_ports: 0,
824 }
825 }
826 fn init(&mut self, _: f32) {}
827 fn reset(&mut self) {}
828 fn get_parameter(&self, _: &ParameterId) -> Option<ParamValue> {
829 None
830 }
831 fn set_parameter(&mut self, _: &ParameterId, _: ParamValue) -> ProcessResult<()> {
832 Ok(())
833 }
834 fn control_port(&self, _: usize) -> Option<&Port<T, B>> {
835 None
836 }
837 fn control_port_mut(&mut self, _: usize) -> Option<&mut Port<T, B>> {
838 None
839 }
840 fn input_port(&self, i: usize) -> Option<&Port<T, B>> {
841 if i == 0 {
842 Some(&self.input)
843 } else {
844 None
845 }
846 }
847 fn input_port_mut(&mut self, i: usize) -> Option<&mut Port<T, B>> {
848 if i == 0 {
849 Some(&mut self.input)
850 } else {
851 None
852 }
853 }
854 fn num_signal_outputs(&self) -> usize {
855 1
856 }
857 fn output_port(&self, i: usize) -> Option<&Port<T, B>> {
858 if i == 0 {
859 Some(&self.output)
860 } else {
861 None
862 }
863 }
864 fn output_port_mut(&mut self, i: usize) -> Option<&mut Port<T, B>> {
865 if i == 0 {
866 Some(&mut self.output)
867 } else {
868 None
869 }
870 }
871 fn state(&self) -> &NodeState<T, B> {
872 &self.state
873 }
874 fn state_mut(&mut self) -> &mut NodeState<T, B> {
875 &mut self.state
876 }
877 }
878
879 impl<T: Transcendental, const B: usize> Processor<T, B> for GainProcessor<T, B> {
880 fn process(
881 &mut self,
882 _: &ClockTick,
883 _: &[&[T; B]],
884 _: &[T],
885 _: &[ClockTick],
886 _: &[&[T; B]],
887 ) -> ProcessResult<()> {
888 let src = self.input.buffer.as_array();
889 let buf = self.output.buffer.as_mut_array();
890 for i in 0..B {
891 buf[i] = src[i] * self.gain;
892 }
893 Ok(())
894 }
895 }
896
897 pub(crate) struct CaptureSink<T: Transcendental, const B: usize> {
902 id: NodeId,
903 state: NodeState<T, B>,
904 input: Port<T, B>,
905 }
906
907 impl<T: Transcendental, const B: usize> CaptureSink<T, B> {
908 pub fn new(id: NodeId, sample_rate: f32) -> Self {
909 let state = NodeState::new(sample_rate);
910 let input = Port::input(id, 0, "in");
911 Self { id, state, input }
912 }
913 }
914
915 impl<T: Transcendental, const B: usize> Node<T, B> for CaptureSink<T, B> {
916 fn id(&self) -> NodeId {
917 self.id
918 }
919 fn set_id(&mut self, id: NodeId) {
920 self.id = id;
921 }
922 fn metadata(&self) -> NodeMetadata {
923 NodeMetadata {
924 name: "CaptureSink".into(),
925 type_name: Some("test/capture".into()),
926 category: NodeCategory::Sink,
927 description: String::new(),
928 author: String::new(),
929 version: String::new(),
930 parameters: vec![],
931 signal_inputs: 1,
932 signal_outputs: 0,
933 control_inputs: 0,
934 control_outputs: 0,
935 clock_inputs: 0,
936 clock_outputs: 0,
937 feedback_ports: 0,
938 }
939 }
940 fn init(&mut self, _: f32) {}
941 fn reset(&mut self) {}
942 fn get_parameter(&self, _: &ParameterId) -> Option<ParamValue> {
943 None
944 }
945 fn set_parameter(&mut self, _: &ParameterId, _: ParamValue) -> ProcessResult<()> {
946 Ok(())
947 }
948 fn control_port(&self, _: usize) -> Option<&Port<T, B>> {
949 None
950 }
951 fn control_port_mut(&mut self, _: usize) -> Option<&mut Port<T, B>> {
952 None
953 }
954 fn output_port(&self, _: usize) -> Option<&Port<T, B>> {
955 None
956 }
957 fn output_port_mut(&mut self, _: usize) -> Option<&mut Port<T, B>> {
958 None
959 }
960 fn input_port(&self, i: usize) -> Option<&Port<T, B>> {
961 if i == 0 {
962 Some(&self.input)
963 } else {
964 None
965 }
966 }
967 fn input_port_mut(&mut self, i: usize) -> Option<&mut Port<T, B>> {
968 if i == 0 {
969 Some(&mut self.input)
970 } else {
971 None
972 }
973 }
974 fn state(&self) -> &NodeState<T, B> {
975 &self.state
976 }
977 fn state_mut(&mut self) -> &mut NodeState<T, B> {
978 &mut self.state
979 }
980 }
981
982 impl<T: Transcendental, const B: usize> Sink<T, B> for CaptureSink<T, B> {
983 fn consume(
984 &mut self,
985 _: &ClockTick,
986 _: &[&[T; B]],
987 _: &[T],
988 _: &[ClockTick],
989 _: &[&[T; B]],
990 ) -> ProcessResult<()> {
991 Ok(())
992 }
993 }
994
995 const BUF: usize = 64;
1000
1001 #[test]
1002 #[allow(unsafe_code)]
1003 fn test_graph_source_to_sink() {
1004 let factory = test_factory::<BUF>();
1005 let mut builder = test_builder::<BUF>(&factory);
1006 let system = test_system();
1007
1008 let src_idx = builder.add_node("test/const", &test_params(44100.0));
1009 let snk_idx = builder.add_node("test/capture", &test_params(44100.0));
1010 builder.connect_signal(src_idx, 0, snk_idx, 0);
1011
1012 let graph = builder.build(&system).unwrap();
1013 let source_idx = graph.source_idx;
1014
1015 let tick = ClockTick::new(0, BUF as u32, 44100.0);
1016 let mut ctx = ProcessContext { clock: &tick };
1017 let nodes = graph.nodes.clone();
1018 unsafe {
1019 let nv = &mut *nodes.get();
1020 nv[source_idx].process_block(&mut ctx).unwrap();
1021 let action_ctx = ActionContext::new(&tick);
1022 if let Some(port) = nv[source_idx].output_port(0) {
1023 port.propagate(port.buffer(), &action_ctx).unwrap();
1024 }
1025 }
1026 unsafe {
1027 let nv = &*nodes.get();
1028 let val = nv[snk_idx].input_port(0).unwrap().buffer.as_array()[0];
1029 assert!(val != 0.0, "signal should have propagated, got {}", val);
1030 }
1031 }
1032
1033 #[test]
1034 #[allow(unsafe_code)]
1035 fn test_graph_source_proc_sink() {
1036 let factory = test_factory::<BUF>();
1037 let mut builder = test_builder::<BUF>(&factory);
1038 let system = test_system();
1039
1040 let mut params = test_params(44100.0);
1041 params.insert("value".to_string(), ParamValue::Float(5.0));
1042 let src_idx = builder.add_node("test/const", ¶ms);
1043
1044 let mut gain_params = test_params(44100.0);
1045 gain_params.insert("gain".to_string(), ParamValue::Float(3.0));
1046 let proc_idx = builder.add_node("test/gain", &gain_params);
1047
1048 let snk_idx = builder.add_node("test/capture", &test_params(44100.0));
1049
1050 builder.connect_signal(src_idx, 0, proc_idx, 0);
1051 builder.connect_signal(proc_idx, 0, snk_idx, 0);
1052
1053 let graph = builder.build(&system).unwrap();
1054 let source_idx = graph.source_idx;
1055
1056 eprintln!("topo: {:?}", graph.topo_order);
1057 eprintln!("source_idx: {source_idx}, src_idx: {src_idx}, proc_idx: {proc_idx}, snk_idx: {snk_idx}");
1058
1059 let tick = ClockTick::new(0, BUF as u32, 44100.0);
1060 let mut ctx = ProcessContext { clock: &tick };
1061 let nodes = graph.nodes.clone();
1062 unsafe {
1063 let nv = &mut *nodes.get();
1064 eprintln!(
1065 "node types: src={:?}, proc={:?}, snk={:?}",
1066 std::mem::discriminant(&nv[0]),
1067 std::mem::discriminant(&nv[1]),
1068 std::mem::discriminant(&nv[2]),
1069 );
1070
1071 let _ = nv[source_idx].process_block(&mut ctx);
1072 let src_val = nv[source_idx].output_port(0).unwrap().buffer.as_array()[0];
1073 eprintln!("source output: {src_val}");
1074
1075 let action_ctx = ActionContext::new(&tick);
1076 let out_port = nv[source_idx].output_port(0).unwrap();
1077 eprintln!(
1078 "source output port downstream_nodes: {}",
1079 out_port.downstream_nodes.len()
1080 );
1081 eprintln!(
1082 "source output port downstream_input_ptrs: {}",
1083 out_port.downstream_input_ptrs.len()
1084 );
1085
1086 {
1088 let proc_port = nv[proc_idx].output_port(0).unwrap();
1089 eprintln!(
1090 "PROC OUT port downstream_nodes: {}",
1091 proc_port.downstream_nodes.len()
1092 );
1093 eprintln!(
1094 "PROC OUT port downstream_input_ptrs: {}",
1095 proc_port.downstream_input_ptrs.len()
1096 );
1097 for (i, &dn) in proc_port.downstream.iter().enumerate() {
1098 eprintln!(" downstream[{}]: (node={}, port={})", i, dn.0, dn.1);
1099 }
1100 }
1101
1102 let src_out = nv[source_idx].output_port(0).unwrap();
1104 let proc_in = nv[proc_idx].input_port(0).unwrap();
1105 let proc_out = nv[proc_idx].output_port(0).unwrap();
1106 let snk_in = nv[snk_idx].input_port(0).unwrap();
1107 eprintln!("BUFFER ADDRESSES:");
1108 eprintln!(
1109 " src output buf: {:p}",
1110 src_out.buffer.as_array().as_ptr()
1111 );
1112 eprintln!(
1113 " proc input buf: {:p}",
1114 proc_in.buffer.as_array().as_ptr()
1115 );
1116 eprintln!(
1117 " proc output buf: {:p}",
1118 proc_out.buffer.as_array().as_ptr()
1119 );
1120 eprintln!(" snk input buf: {:p}", snk_in.buffer.as_array().as_ptr());
1121 eprintln!(
1122 " proc_in.upstream_buffer.is_some(): {}",
1123 proc_in.upstream_buffer.is_some()
1124 );
1125 eprintln!(
1126 " snk_in.upstream_buffer.is_some(): {}",
1127 snk_in.upstream_buffer.is_some()
1128 );
1129 out_port.propagate(out_port.buffer(), &action_ctx).unwrap();
1132
1133 {
1135 let nv = &*nodes.get();
1136 let snk_in = nv[snk_idx].input_port(0).unwrap();
1137 eprintln!(
1138 "AFTER propagate - snk input buf[0] via .buffer: {}",
1139 snk_in.buffer.as_array()[0]
1140 );
1141 if let Some(up) = snk_in.upstream_buffer {
1142 eprintln!(
1143 "AFTER propagate - snk input via upstream ptr: {}",
1144 (*up).as_array()[0]
1145 );
1146 }
1147 }
1148
1149 let sink_buf = nv[snk_idx].input_port(0).unwrap().buffer.as_array();
1150 eprintln!("SINK input port buffer first sample: {}", sink_buf[0]);
1151
1152 let proc_out_port = nv[proc_idx].output_port(0).unwrap();
1154 eprintln!(
1155 "proc output port downstream_nodes: {}",
1156 proc_out_port.downstream_nodes.len()
1157 );
1158 eprintln!(
1159 "proc output port downstream_input_ptrs: {}",
1160 proc_out_port.downstream_input_ptrs.len()
1161 );
1162
1163 let sink_val = nv[snk_idx].input_port(0).unwrap().buffer.as_array()[0];
1165 eprintln!("sink input AFTER propagate: {sink_val}");
1166
1167 assert!(
1168 (sink_val - 15.0).abs() < 1e-4,
1169 "expected 15.0, got {}",
1170 sink_val
1171 );
1172 }
1173 }
1174}