1use crate::backend_factory::BackendFactory;
2use crate::factory::NodeFactory;
3use rill_core::buffer::{Buffer, BufferRegistry, FixedBuffer, TapeLoop};
4use rill_core::math::Transcendental;
5use rill_core::queues::CommandEnum;
6use rill_core::time::{ClockTick, RenderContext, SystemClock};
7use rill_core::traits::port::Port;
8use rill_core::traits::processable::Processable;
9use rill_core::traits::ParamValue;
10use rill_core::traits::{Node, NodeId, NodeVariant, Params};
11use rill_core_actor::{Actor, ActorRef, ActorSystem};
12use std::cell::UnsafeCell;
13use std::collections::{HashMap, VecDeque};
14use std::rc::Rc;
15use std::sync::atomic::AtomicBool;
16use std::sync::Arc;
17
/// Errors returned while turning a [`GraphBuilder`] into a [`Graph`].
#[derive(Debug, Clone)]
pub enum BuildError {
    /// The signal edges contain a cycle, so no topological order exists.
    CycleDetected,
    /// The backend factory failed to create a backend; carries its message.
    Backend(String),
    /// A node-registry failure, e.g. the node factory could not construct a
    /// requested node type; carries a description.
    Registry(String),
}

impl std::fmt::Display for BuildError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::CycleDetected => write!(f, "graph cycle detected"),
            Self::Backend(msg) => write!(f, "backend error: {msg}"),
            Self::Registry(msg) => write!(f, "registry error: {msg}"),
        }
    }
}

// Lets callers propagate `BuildError` with `?` into `Box<dyn Error>` and
// similar error containers.
impl std::error::Error for BuildError {}
46
47struct NodeRecipe<T: Transcendental, const BUF_SIZE: usize> {
57 type_name: String,
58 id: NodeId,
59 params: Params,
60 backend: Option<String>,
61 routing_entries: Vec<(usize, usize, f32)>,
62 _phantom: std::marker::PhantomData<(T, [(); BUF_SIZE])>,
63}
64
65struct NodeEntry<T: Transcendental, const BUF_SIZE: usize> {
67 node: NodeVariant<T, BUF_SIZE>,
68}
69
/// Declaration of a named shared resource that the graph allocates at build
/// time.
#[derive(Debug, Clone)]
pub struct GraphResource {
    /// Name under which the resource is registered.
    pub name: String,
    /// Resource kind; the builder currently allocates only `"tape"`.
    pub kind: String,
    /// Requested capacity, passed to the resource constructor.
    pub capacity: usize,
}
84
85pub struct GraphBuilder<T: Transcendental, const BUF_SIZE: usize> {
98 recipes: Vec<NodeRecipe<T, BUF_SIZE>>,
99 signal_edges: Vec<(usize, usize, usize, usize)>,
100 control_edges: Vec<(usize, usize, usize, usize)>,
101 clock_edges: Vec<(usize, usize, usize, usize)>,
102 feedback_edges: Vec<(usize, usize, usize, usize)>,
103 resources: Vec<GraphResource>,
104 factory: Arc<NodeFactory<T, BUF_SIZE>>,
106 backend_factory: Arc<BackendFactory<T>>,
108 default_backend: Option<String>,
110 backend_params: HashMap<String, ParamValue>,
112 sample_rate: Option<f32>,
115 parent_ref: Option<ActorRef<CommandEnum>>,
117}
118
119impl<T: Transcendental, const BUF_SIZE: usize> GraphBuilder<T, BUF_SIZE> {
120 pub fn new(
122 factory: Arc<NodeFactory<T, BUF_SIZE>>,
123 backend_factory: Arc<BackendFactory<T>>,
124 ) -> Self {
125 Self {
126 recipes: Vec::new(),
127 signal_edges: Vec::new(),
128 control_edges: Vec::new(),
129 clock_edges: Vec::new(),
130 feedback_edges: Vec::new(),
131 resources: Vec::new(),
132 factory,
133 backend_factory,
134 default_backend: None,
135 backend_params: HashMap::new(),
136 sample_rate: None,
137 parent_ref: None,
138 }
139 }
140
141 pub fn add_node(&mut self, type_name: &str, params: &Params) -> usize {
148 let id = NodeId(self.recipes.len() as u32);
149 self.add_node_with_id(type_name, params, id)
150 }
151
152 pub fn add_node_with_id(&mut self, type_name: &str, params: &Params, id: NodeId) -> usize {
158 let idx = self.recipes.len();
159 self.recipes.push(NodeRecipe {
160 type_name: type_name.to_string(),
161 id,
162 params: params.clone(),
163 backend: None,
164 routing_entries: Vec::new(),
165 _phantom: std::marker::PhantomData,
166 });
167 idx
168 }
169
170 pub fn set_node_backend(&mut self, idx: usize, name: String) {
172 if let Some(recipe) = self.recipes.get_mut(idx) {
173 recipe.backend = Some(name);
174 }
175 }
176
177 pub fn add_routing_entry(&mut self, idx: usize, from: usize, to: usize, gain: f32) {
179 if let Some(recipe) = self.recipes.get_mut(idx) {
180 recipe.routing_entries.push((from, to, gain));
181 }
182 }
183
184 pub fn add_resource(&mut self, resource: GraphResource) {
186 self.resources.push(resource);
187 }
188
189 pub fn node_count(&self) -> usize {
191 self.recipes.len()
192 }
193
194 pub fn set_default_backend(&mut self, name: String, params: HashMap<String, ParamValue>) {
196 self.default_backend = Some(name);
197 self.backend_params = params;
198 }
199
200 pub fn default_backend_name(&self) -> Option<&String> {
202 self.default_backend.as_ref()
203 }
204
205 pub fn set_sample_rate(&mut self, sr: f32) {
207 self.sample_rate = Some(sr);
208 }
209
210 pub fn set_parent_ref(&mut self, parent: ActorRef<CommandEnum>) {
212 self.parent_ref = Some(parent);
213 }
214
215 pub fn backend_factory(&self) -> &Arc<BackendFactory<T>> {
217 &self.backend_factory
218 }
219
220 pub fn connect_signal(
222 &mut self,
223 from_node: usize,
224 from_port: usize,
225 to_node: usize,
226 to_port: usize,
227 ) {
228 self.signal_edges
229 .push((from_node, from_port, to_node, to_port));
230 }
231
232 pub fn connect_control(
234 &mut self,
235 from_node: usize,
236 from_port: usize,
237 to_node: usize,
238 to_port: usize,
239 ) {
240 self.control_edges
241 .push((from_node, from_port, to_node, to_port));
242 }
243
244 pub fn connect_clock(
246 &mut self,
247 from_node: usize,
248 from_port: usize,
249 to_node: usize,
250 to_port: usize,
251 ) {
252 self.clock_edges
253 .push((from_node, from_port, to_node, to_port));
254 }
255
256 pub fn connect_feedback(
258 &mut self,
259 from_node: usize,
260 from_port: usize,
261 to_node: usize,
262 to_port: usize,
263 ) {
264 self.feedback_edges
265 .push((from_node, from_port, to_node, to_port));
266 }
267
268 pub fn build(self, system: &ActorSystem) -> Result<Graph<T, BUF_SIZE>, BuildError> {
274 let mut node_entries: Vec<NodeEntry<T, BUF_SIZE>> = Vec::with_capacity(self.recipes.len());
276 for recipe in &self.recipes {
277 let node = self
278 .factory
279 .construct(&recipe.type_name, recipe.id, &recipe.params)
280 .map_err(|e| BuildError::Registry(format!("{e}")))?;
281 node_entries.push(NodeEntry { node });
282 }
283
284 for (idx, node) in node_entries.iter_mut().enumerate() {
286 for &(from, to, gain) in &self.recipes[idx].routing_entries {
287 if let NodeVariant::Router(ref mut router) = node.node {
288 router.set_connection(from, to, T::from_f32(gain)).ok();
289 }
290 }
291 }
292
293 let num_nodes = node_entries.len();
294
295 let sr = self.sample_rate.unwrap_or(44100.0);
297 for (idx, recipe) in self.recipes.iter().enumerate() {
298 let name = match recipe.backend.as_ref() {
299 Some(n) => Some(n.clone()),
300 None => self.default_backend.clone(),
301 };
302 if let Some(ref name) = name {
303 let mut be_params = HashMap::new();
304 be_params.insert("sample_rate".into(), ParamValue::Float(sr));
305 be_params.insert("buffer_size".into(), ParamValue::Int(BUF_SIZE as i32));
306 if self.default_backend.as_ref() == Some(name) {
307 for (k, v) in &self.backend_params {
308 be_params.entry(k.clone()).or_insert_with(|| v.clone());
309 }
310 }
311 let backend = self
312 .backend_factory
313 .create(name, &be_params)
314 .map_err(BuildError::Backend)?;
315 if let Some(io_node) = node_entries[idx].node.as_io_node_mut() {
316 io_node.resolve_backend(backend);
317 }
318 }
319 }
320
321 let mut in_degree = vec![0usize; num_nodes];
323 let mut out_edges: Vec<Vec<(usize, usize, usize)>> = vec![Vec::new(); num_nodes];
324
325 for &(from_n, from_p, to_n, to_p) in &self.signal_edges {
326 in_degree[to_n] += 1;
327 out_edges[from_n].push((from_p, to_n, to_p));
328 }
329
330 let mut queue: VecDeque<usize> = in_degree
332 .iter()
333 .enumerate()
334 .filter(|(_, &d)| d == 0)
335 .map(|(i, _)| i)
336 .collect();
337
338 let mut topo = Vec::with_capacity(num_nodes);
339 let mut indeg = in_degree;
340 while let Some(idx) = queue.pop_front() {
341 topo.push(idx);
342 for &(_, to_n, _) in &out_edges[idx] {
343 indeg[to_n] -= 1;
344 if indeg[to_n] == 0 {
345 queue.push_back(to_n);
346 }
347 }
348 }
349
350 if topo.len() != num_nodes {
351 return Err(BuildError::CycleDetected);
352 }
353
354 let mut nodes: Vec<NodeVariant<T, BUF_SIZE>> =
356 node_entries.into_iter().map(|e| e.node).collect();
357
358 for &(from_n, from_p, to_n, to_p) in &self.signal_edges {
360 if let Some(port) = nodes[from_n].output_port_mut(from_p) {
361 port.downstream.push((to_n, to_p));
362 }
363 let in_ptr: *mut Port<T, BUF_SIZE> = nodes[to_n]
364 .input_port_mut(to_p)
365 .map(|p| p as *mut Port<T, BUF_SIZE>)
366 .unwrap_or(std::ptr::null_mut());
367 let parent: *mut NodeVariant<T, BUF_SIZE> = &mut nodes[to_n];
368 let out_ptr: *mut Port<T, BUF_SIZE> = nodes[from_n]
369 .output_port_mut(from_p)
370 .map(|p| p as *mut Port<T, BUF_SIZE>)
371 .unwrap_or(std::ptr::null_mut());
372 if !in_ptr.is_null() && !out_ptr.is_null() {
373 #[allow(unsafe_code)]
374 unsafe {
375 (*in_ptr).parent = parent;
376 (*out_ptr).downstream_input_ptrs.push(in_ptr);
377 }
378 }
379 }
380
381 for &(from_n, from_p, to_n, _) in &self.signal_edges {
383 let parent: *mut NodeVariant<T, BUF_SIZE> = &mut nodes[to_n];
384 if let Some(port) = nodes[from_n].output_port_mut(from_p) {
385 let ptr_val = parent as usize;
386 let already = port.downstream_nodes.iter().any(|&p| p as usize == ptr_val);
387 if !already {
388 port.downstream_nodes.push(parent);
389 }
390 }
391 }
392
393 for &(from_n, from_p, to_n, to_p) in &self.signal_edges {
395 let upstream = nodes[from_n]
396 .output_port(from_p)
397 .map(|p| &p.buffer as *const FixedBuffer<T, BUF_SIZE>);
398 if let Some(port) = nodes[to_n].input_port_mut(to_p) {
399 if port.upstream_buffer.is_none() {
400 port.upstream_buffer = upstream;
401 } else {
402 port.upstream_buffer = None;
403 }
404 }
405 }
406
407 for &(from_n, from_p, to_n, to_p) in &self.feedback_edges {
409 if let Some(port) = nodes[from_n].output_port_mut(from_p) {
410 port.feedback_buffer = Some(FixedBuffer::new());
411 port.feedback_downstream.push((to_n, to_p));
412 }
413 if let Some(port) = nodes[to_n].input_port_mut(to_p) {
414 port.feedback_buffer = Some(FixedBuffer::new());
415 }
416 }
417 for &(from_n, from_p, to_n, to_p) in &self.feedback_edges {
418 let ptr = nodes[to_n]
419 .input_port(to_p)
420 .map(|p| &p.feedback_buffer as *const Option<FixedBuffer<T, BUF_SIZE>>)
421 .map(|r| r as *mut Option<FixedBuffer<T, BUF_SIZE>>);
422 if let Some(port) = nodes[from_n].output_port_mut(from_p) {
423 if let Some(p) = ptr {
424 port.feedback_ptrs.push(p);
425 }
426 }
427 }
428
429 let mut buffers = BufferRegistry::new();
431 for r in &self.resources {
432 if r.kind == "tape" {
433 if let Some(tape) = TapeLoop::<T>::new(r.capacity) {
434 buffers.register(&r.name, Box::new(tape));
435 }
436 }
437 }
438 for entry in nodes.iter_mut() {
439 entry.resolve_resources(&buffers);
440 }
441
442 let source_idx = topo.first().copied().unwrap_or(0);
443 let mut active_node_idx = 0;
444 for (i, n) in nodes.iter_mut().enumerate() {
445 if n.as_active_node_mut().is_some() {
446 active_node_idx = i;
447 break;
448 }
449 }
450
451 let owned_buffers = buffers.into_inner();
452 let allocated = self.resources.clone();
453
454 let nodes: Rc<UnsafeCell<Vec<NodeVariant<T, BUF_SIZE>>>> = Rc::new(UnsafeCell::new(nodes));
456
457 let actor = system.spawn("graph", {
458 let n = nodes.clone();
459 #[allow(unsafe_code)]
460 move |msg: CommandEnum| {
461 if let CommandEnum::SetParameter(param) = msg {
462 let idx = param.port.node_id().inner() as usize;
463 unsafe {
464 let nv = &mut *n.get();
465 if idx < nv.len() {
466 let _ = nv[idx].set_parameter(¶m.parameter, param.value);
467 }
468 }
469 }
470 }
471 });
472
473 let actor_ref = actor.actor_ref();
474
475 Ok(Graph {
476 nodes,
477 topo_order: topo,
478 resources: allocated,
479 current_tick: ClockTick::new(0, BUF_SIZE as u32, sr),
480 buffers: owned_buffers,
481 source_idx,
482 active_node_idx,
483 actor: Some(actor),
484 actor_ref,
485 parent_ref: self.parent_ref.clone(),
486 system_clock: None,
487 })
488 }
489}
490
491pub struct Graph<T: Transcendental, const BUF_SIZE: usize> {
503 nodes: Rc<UnsafeCell<Vec<NodeVariant<T, BUF_SIZE>>>>,
504 topo_order: Vec<usize>,
505 source_idx: usize,
506 active_node_idx: usize,
507 current_tick: ClockTick,
508 pub(crate) resources: Vec<GraphResource>,
509 #[allow(dead_code)]
510 buffers: Vec<Box<dyn Buffer<T> + Send>>,
511 actor: Option<Actor<CommandEnum>>,
512 actor_ref: ActorRef<CommandEnum>,
513 parent_ref: Option<ActorRef<CommandEnum>>,
514 pub system_clock: Option<Arc<SystemClock>>,
517}
518
519impl<T: Transcendental, const BUF_SIZE: usize> Graph<T, BUF_SIZE> {
520 #[allow(unsafe_code)]
526 pub fn nodes(&self) -> &[NodeVariant<T, BUF_SIZE>] {
527 unsafe { &*self.nodes.get() }
528 }
529
530 pub fn current_tick(&self) -> ClockTick {
532 self.current_tick
533 }
534
535 #[allow(unsafe_code)]
537 pub fn node_count(&self) -> usize {
538 unsafe { (*self.nodes.get()).len() }
539 }
540
541 pub fn topo_order(&self) -> &[usize] {
543 &self.topo_order
544 }
545
546 #[allow(dead_code)]
547 pub(crate) fn sample_rate(&self) -> f32 {
548 self.current_tick.sample_rate
549 }
550
551 #[allow(dead_code)]
553 pub fn resources(&self) -> &[GraphResource] {
554 &self.resources
555 }
556
557 #[allow(unsafe_code)]
559 pub fn run(&mut self, running: Arc<AtomicBool>) -> Result<(), String> {
560 let mut actor = self.actor.take().ok_or("graph already running")?;
561 let source = self.source_idx;
562 let parent = self.parent_ref.clone();
563 let nodes = self.nodes.clone();
564 let idx = self.active_node_idx;
565 let sys_clock = self.system_clock.clone();
566
567 let tick: Box<dyn FnMut(u64, f32)> = Box::new(move |sample_pos, sample_rate| {
568 actor.drain();
569 let ctx = if let Some(ref clock) = sys_clock {
570 RenderContext::with_tempo(
571 sample_pos,
572 BUF_SIZE as u32,
573 sample_rate,
574 clock.bpm() as f32,
575 )
576 } else {
577 RenderContext::new(sample_pos, BUF_SIZE as u32, sample_rate)
578 };
579 let tick = ClockTick::with_tempo(sample_pos, BUF_SIZE as u32, sample_rate, ctx.bpm());
580 unsafe {
581 let nv = &mut *nodes.get();
582 let _ = nv[source].process_block(&ctx);
583 for po in 0..nv[source].num_signal_outputs() {
584 if let Some(port) = nv[source].output_port(po) {
585 let _ = port.propagate(port.buffer(), &ctx);
586 }
587 }
588 }
589 if let Some(ref parent) = parent {
590 parent.send(CommandEnum::ClockTick(tick));
591 }
592 });
593
594 unsafe {
595 self.nodes.get().as_mut().unwrap()[idx]
596 .as_active_node_mut()
597 .ok_or("no active node")?
598 .run(tick, running)
599 }
600 }
601
602 pub fn handle(&self) -> ActorRef<CommandEnum> {
604 self.actor_ref.clone()
605 }
606
/// Test-only: takes the graph apart into nodes, topological order, tick and
/// owned buffers.
///
/// # Panics
///
/// Panics if the node vector is still shared after the actor is dropped.
#[cfg(test)]
pub fn into_parts(
    self,
) -> (
    Vec<NodeVariant<T, BUF_SIZE>>,
    Vec<usize>,
    ClockTick,
    Vec<Box<dyn Buffer<T> + Send>>,
) {
    let Self {
        nodes,
        topo_order,
        current_tick,
        buffers,
        actor,
        ..
    } = self;
    // Drop the actor before unwrapping the `Rc`: its handler closure holds
    // a clone of `nodes`.
    drop(actor);
    let cell = Rc::try_unwrap(nodes).unwrap();
    (cell.into_inner(), topo_order, current_tick, buffers)
}
634}
635
636#[cfg(test)]
637mod tests {
638 use super::*;
639 use rill_core::math::Transcendental;
640 use rill_core::time::RenderContext;
641 use rill_core::traits::{
642 Node, NodeCategory, NodeId, NodeMetadata, NodeState, ParamValue, ParameterId, Port,
643 ProcessResult, Processor, Sink, Source,
644 };
645 use rill_core_actor::ActorSystem;
646 use std::sync::Arc;
647
648 fn test_system() -> ActorSystem {
649 ActorSystem::new()
650 }
651
652 fn test_factory<const B: usize>() -> Arc<NodeFactory<f32, B>> {
653 let mut f = NodeFactory::<f32, B>::new();
654
655 f.register_fn("test/const", |id, params| {
656 let value = params.get_f32("value", 1.0);
657 let mut node = ConstantSource::<f32, B>::new(id, value, params.sample_rate);
658 node.init(params.sample_rate);
659 NodeVariant::Source(Box::new(node))
660 });
661
662 f.register_fn("test/gain", |id, params| {
663 let gain = params.get_f32("gain", 1.0);
664 let mut node = GainProcessor::<f32, B>::new(id, params.sample_rate, gain);
665 node.init(params.sample_rate);
666 NodeVariant::Processor(Box::new(node))
667 });
668
669 f.register_fn("test/capture", |id, params| {
670 let mut node = CaptureSink::<f32, B>::new(id, params.sample_rate);
671 node.init(params.sample_rate);
672 NodeVariant::Sink(Box::new(node))
673 });
674
675 Arc::new(f)
676 }
677
678 fn test_builder<const B: usize>(factory: &Arc<NodeFactory<f32, B>>) -> GraphBuilder<f32, B> {
679 GraphBuilder::new(factory.clone(), Arc::new(BackendFactory::new()))
680 }
681
682 fn test_params(sample_rate: f32) -> Params {
683 let mut p = Params::new(sample_rate);
684 p.insert("value".to_string(), ParamValue::Float(sample_rate));
685 p
686 }
687
688 pub(crate) struct ConstantSource<T: Transcendental, const B: usize> {
693 id: NodeId,
694 value: T,
695 state: NodeState<T, B>,
696 output: Port<T, B>,
697 }
698
699 impl<T: Transcendental, const B: usize> ConstantSource<T, B> {
700 pub fn new(id: NodeId, value: T, sample_rate: f32) -> Self {
701 let state = NodeState::new(sample_rate);
702 let mut output = Port::output(id, 0, "out");
703 output.buffer = FixedBuffer::new();
704 Self {
705 id,
706 value,
707 state,
708 output,
709 }
710 }
711 }
712
713 impl<T: Transcendental, const B: usize> Node<T, B> for ConstantSource<T, B> {
714 fn id(&self) -> NodeId {
715 self.id
716 }
717 fn set_id(&mut self, id: NodeId) {
718 self.id = id;
719 }
720 fn metadata(&self) -> NodeMetadata {
721 NodeMetadata {
722 name: "ConstantSource".into(),
723 type_name: Some("test/const".into()),
724 category: NodeCategory::Source,
725 description: String::new(),
726 author: String::new(),
727 version: String::new(),
728 parameters: vec![],
729 signal_inputs: 0,
730 signal_outputs: 1,
731 control_inputs: 0,
732 control_outputs: 0,
733 clock_inputs: 0,
734 clock_outputs: 0,
735 feedback_ports: 0,
736 }
737 }
738 fn init(&mut self, _: f32) {}
739 fn reset(&mut self) {}
740 fn get_parameter(&self, _: &ParameterId) -> Option<ParamValue> {
741 None
742 }
743 fn set_parameter(&mut self, _: &ParameterId, _: ParamValue) -> ProcessResult<()> {
744 Ok(())
745 }
746 fn control_port(&self, _: usize) -> Option<&Port<T, B>> {
747 None
748 }
749 fn control_port_mut(&mut self, _: usize) -> Option<&mut Port<T, B>> {
750 None
751 }
752 fn output_port(&self, i: usize) -> Option<&Port<T, B>> {
753 if i == 0 {
754 Some(&self.output)
755 } else {
756 None
757 }
758 }
759 fn output_port_mut(&mut self, i: usize) -> Option<&mut Port<T, B>> {
760 if i == 0 {
761 Some(&mut self.output)
762 } else {
763 None
764 }
765 }
766 fn input_port(&self, _: usize) -> Option<&Port<T, B>> {
767 None
768 }
769 fn input_port_mut(&mut self, _: usize) -> Option<&mut Port<T, B>> {
770 None
771 }
772 fn state(&self) -> &NodeState<T, B> {
773 &self.state
774 }
775 fn state_mut(&mut self) -> &mut NodeState<T, B> {
776 &mut self.state
777 }
778 }
779
780 impl<T: Transcendental, const B: usize> Source<T, B> for ConstantSource<T, B> {
781 fn generate(
782 &mut self,
783 _: &RenderContext,
784 _: &[T],
785 _: &[RenderContext],
786 ) -> ProcessResult<()> {
787 self.output.buffer.as_mut_array().fill(self.value);
788 Ok(())
789 }
790 }
791
792 pub(crate) struct GainProcessor<T: Transcendental, const B: usize> {
797 id: NodeId,
798 gain: T,
799 state: NodeState<T, B>,
800 input: Port<T, B>,
801 output: Port<T, B>,
802 }
803
804 impl<T: Transcendental, const B: usize> GainProcessor<T, B> {
805 pub fn new(id: NodeId, sample_rate: f32, gain: T) -> Self {
806 let state = NodeState::new(sample_rate);
807 let input = Port::input(id, 0, "in");
808 let output = Port::output(id, 0, "out");
809 Self {
810 id,
811 gain,
812 state,
813 input,
814 output,
815 }
816 }
817 }
818
819 impl<T: Transcendental, const B: usize> Node<T, B> for GainProcessor<T, B> {
820 fn id(&self) -> NodeId {
821 self.id
822 }
823 fn set_id(&mut self, id: NodeId) {
824 self.id = id;
825 }
826 fn metadata(&self) -> NodeMetadata {
827 NodeMetadata {
828 name: "GainProcessor".into(),
829 type_name: Some("test/gain".into()),
830 category: NodeCategory::Processor,
831 description: String::new(),
832 author: String::new(),
833 version: String::new(),
834 parameters: vec![],
835 signal_inputs: 1,
836 signal_outputs: 1,
837 control_inputs: 0,
838 control_outputs: 0,
839 clock_inputs: 0,
840 clock_outputs: 0,
841 feedback_ports: 0,
842 }
843 }
844 fn init(&mut self, _: f32) {}
845 fn reset(&mut self) {}
846 fn get_parameter(&self, _: &ParameterId) -> Option<ParamValue> {
847 None
848 }
849 fn set_parameter(&mut self, _: &ParameterId, _: ParamValue) -> ProcessResult<()> {
850 Ok(())
851 }
852 fn control_port(&self, _: usize) -> Option<&Port<T, B>> {
853 None
854 }
855 fn control_port_mut(&mut self, _: usize) -> Option<&mut Port<T, B>> {
856 None
857 }
858 fn input_port(&self, i: usize) -> Option<&Port<T, B>> {
859 if i == 0 {
860 Some(&self.input)
861 } else {
862 None
863 }
864 }
865 fn input_port_mut(&mut self, i: usize) -> Option<&mut Port<T, B>> {
866 if i == 0 {
867 Some(&mut self.input)
868 } else {
869 None
870 }
871 }
872 fn num_signal_outputs(&self) -> usize {
873 1
874 }
875 fn output_port(&self, i: usize) -> Option<&Port<T, B>> {
876 if i == 0 {
877 Some(&self.output)
878 } else {
879 None
880 }
881 }
882 fn output_port_mut(&mut self, i: usize) -> Option<&mut Port<T, B>> {
883 if i == 0 {
884 Some(&mut self.output)
885 } else {
886 None
887 }
888 }
889 fn state(&self) -> &NodeState<T, B> {
890 &self.state
891 }
892 fn state_mut(&mut self) -> &mut NodeState<T, B> {
893 &mut self.state
894 }
895 }
896
897 impl<T: Transcendental, const B: usize> Processor<T, B> for GainProcessor<T, B> {
898 fn process(
899 &mut self,
900 _: &RenderContext,
901 _: &[&[T; B]],
902 _: &[T],
903 _: &[RenderContext],
904 _: &[&[T; B]],
905 ) -> ProcessResult<()> {
906 let src = self.input.buffer.as_array();
907 let buf = self.output.buffer.as_mut_array();
908 for i in 0..B {
909 buf[i] = src[i] * self.gain;
910 }
911 Ok(())
912 }
913 }
914
915 pub(crate) struct CaptureSink<T: Transcendental, const B: usize> {
920 id: NodeId,
921 state: NodeState<T, B>,
922 input: Port<T, B>,
923 }
924
925 impl<T: Transcendental, const B: usize> CaptureSink<T, B> {
926 pub fn new(id: NodeId, sample_rate: f32) -> Self {
927 let state = NodeState::new(sample_rate);
928 let input = Port::input(id, 0, "in");
929 Self { id, state, input }
930 }
931 }
932
933 impl<T: Transcendental, const B: usize> Node<T, B> for CaptureSink<T, B> {
934 fn id(&self) -> NodeId {
935 self.id
936 }
937 fn set_id(&mut self, id: NodeId) {
938 self.id = id;
939 }
940 fn metadata(&self) -> NodeMetadata {
941 NodeMetadata {
942 name: "CaptureSink".into(),
943 type_name: Some("test/capture".into()),
944 category: NodeCategory::Sink,
945 description: String::new(),
946 author: String::new(),
947 version: String::new(),
948 parameters: vec![],
949 signal_inputs: 1,
950 signal_outputs: 0,
951 control_inputs: 0,
952 control_outputs: 0,
953 clock_inputs: 0,
954 clock_outputs: 0,
955 feedback_ports: 0,
956 }
957 }
958 fn init(&mut self, _: f32) {}
959 fn reset(&mut self) {}
960 fn get_parameter(&self, _: &ParameterId) -> Option<ParamValue> {
961 None
962 }
963 fn set_parameter(&mut self, _: &ParameterId, _: ParamValue) -> ProcessResult<()> {
964 Ok(())
965 }
966 fn control_port(&self, _: usize) -> Option<&Port<T, B>> {
967 None
968 }
969 fn control_port_mut(&mut self, _: usize) -> Option<&mut Port<T, B>> {
970 None
971 }
972 fn output_port(&self, _: usize) -> Option<&Port<T, B>> {
973 None
974 }
975 fn output_port_mut(&mut self, _: usize) -> Option<&mut Port<T, B>> {
976 None
977 }
978 fn input_port(&self, i: usize) -> Option<&Port<T, B>> {
979 if i == 0 {
980 Some(&self.input)
981 } else {
982 None
983 }
984 }
985 fn input_port_mut(&mut self, i: usize) -> Option<&mut Port<T, B>> {
986 if i == 0 {
987 Some(&mut self.input)
988 } else {
989 None
990 }
991 }
992 fn state(&self) -> &NodeState<T, B> {
993 &self.state
994 }
995 fn state_mut(&mut self) -> &mut NodeState<T, B> {
996 &mut self.state
997 }
998 }
999
1000 impl<T: Transcendental, const B: usize> Sink<T, B> for CaptureSink<T, B> {
1001 fn consume(
1002 &mut self,
1003 _: &RenderContext,
1004 _: &[&[T; B]],
1005 _: &[T],
1006 _: &[RenderContext],
1007 _: &[&[T; B]],
1008 ) -> ProcessResult<()> {
1009 Ok(())
1010 }
1011 }
1012
// Block size, in samples, used by every graph in these tests.
const BUF: usize = 64;
1018
// A source wired straight into a sink must deliver a non-zero sample to the
// sink's input buffer after one processed block.
#[test]
#[allow(unsafe_code)]
fn test_graph_source_to_sink() {
    let factory = test_factory::<BUF>();
    let system = test_system();
    let mut builder = test_builder::<BUF>(&factory);

    let src_idx = builder.add_node("test/const", &test_params(44100.0));
    let snk_idx = builder.add_node("test/capture", &test_params(44100.0));
    builder.connect_signal(src_idx, 0, snk_idx, 0);

    let graph = builder.build(&system).unwrap();
    let source_idx = graph.source_idx;
    let shared = graph.nodes.clone();
    let ctx = RenderContext::new(0, BUF as u32, 44100.0);

    // Drive one block by hand: process the source, then push its output.
    unsafe {
        let nv = &mut *shared.get();
        nv[source_idx].process_block(&ctx).unwrap();
        if let Some(port) = nv[source_idx].output_port(0) {
            port.propagate(port.buffer(), &ctx).unwrap();
        }
    }

    // Read back what arrived at the sink.
    let val = unsafe {
        let nv = &*shared.get();
        nv[snk_idx].input_port(0).unwrap().buffer.as_array()[0]
    };
    assert!(val != 0.0, "signal should have propagated, got {}", val);
}
1048
// A constant 5.0 source through a gain of 3.0 must arrive at the sink as 15.0.
//
// The topology is asserted rather than printed, so a wiring regression fails
// the test instead of only showing up in stderr.
#[test]
#[allow(unsafe_code)]
fn test_graph_source_proc_sink() {
    let factory = test_factory::<BUF>();
    let mut builder = test_builder::<BUF>(&factory);
    let system = test_system();

    let mut params = test_params(44100.0);
    params.insert("value".to_string(), ParamValue::Float(5.0));
    let src_idx = builder.add_node("test/const", &params);

    let mut gain_params = test_params(44100.0);
    gain_params.insert("gain".to_string(), ParamValue::Float(3.0));
    let proc_idx = builder.add_node("test/gain", &gain_params);

    let snk_idx = builder.add_node("test/capture", &test_params(44100.0));

    builder.connect_signal(src_idx, 0, proc_idx, 0);
    builder.connect_signal(proc_idx, 0, snk_idx, 0);

    let graph = builder.build(&system).unwrap();
    let source_idx = graph.source_idx;

    // A linear chain has exactly one topological order.
    assert_eq!(graph.topo_order, vec![src_idx, proc_idx, snk_idx]);
    assert_eq!(source_idx, src_idx);

    let ctx = RenderContext::new(0, BUF as u32, 44100.0);
    let nodes = graph.nodes.clone();
    unsafe {
        let nv = &mut *nodes.get();

        // Drive one block by hand: process the source, then push its output
        // through the chain.
        let _ = nv[source_idx].process_block(&ctx);
        let out_port = nv[source_idx].output_port(0).unwrap();
        out_port.propagate(out_port.buffer(), &ctx).unwrap();

        let sink_val = nv[snk_idx].input_port(0).unwrap().buffer.as_array()[0];
        assert!(
            (sink_val - 15.0).abs() < 1e-4,
            "expected 15.0, got {}",
            sink_val
        );
    }
}
1188}