1use crate::factory::NodeFactory;
2use std::sync::Arc;
3
4use rill_core::buffer::{Buffer, BufferRegistry, FixedBuffer, TapeLoop};
5use rill_core::io::{IoCapture, IoDriver, IoPlayback};
6use rill_core::math::Transcendental;
7use rill_core::queues::CommandEnum;
8use rill_core::time::{ClockTick, RenderContext, SystemClock};
9use rill_core::traits::port::Port;
10use rill_core::traits::processable::Processable;
11use rill_core::traits::{Node, NodeId, NodeVariant, Params, ProcessResult};
12use rill_core_actor::{Actor, ActorRef, ActorSystem};
13use std::cell::UnsafeCell;
14use std::collections::VecDeque;
15use std::rc::Rc;
16use std::sync::atomic::AtomicBool;
17
/// Errors that building a graph can produce.
#[derive(Debug, Clone)]
pub enum BuildError {
    /// The signal edges contain a cycle, so no topological order exists.
    CycleDetected,
    /// A backend failure described by a human-readable message.
    Backend(String),
    /// A registry / node-construction failure described by a human-readable
    /// message (the builder uses this when the node factory reports an error).
    Registry(String),
}

impl std::fmt::Display for BuildError {
    /// Writes a short, user-facing description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::CycleDetected => write!(f, "graph cycle detected"),
            Self::Backend(msg) => write!(f, "backend error: {msg}"),
            Self::Registry(msg) => write!(f, "registry error: {msg}"),
        }
    }
}

// Implementing the standard error trait lets callers use `?` to convert a
// `BuildError` into `Box<dyn std::error::Error>` and similar error wrappers.
impl std::error::Error for BuildError {}
46
/// Deferred description of one node, recorded by the builder and turned into
/// a live node when the graph is built.
struct NodeRecipe<T: Transcendental, const BUF_SIZE: usize> {
    /// Type name handed to the node factory to select the implementation.
    type_name: String,
    /// Identifier handed to the node factory for the constructed node.
    id: NodeId,
    /// Construction parameters (a clone of what the caller supplied).
    params: Params,
    /// `(from, to, gain)` triples applied with `set_connection` at build time
    /// when the constructed node is a `NodeVariant::Router`.
    routing_entries: Vec<(usize, usize, f32)>,
    /// Ties the recipe to `T` and `BUF_SIZE` without storing either.
    _phantom: std::marker::PhantomData<(T, [(); BUF_SIZE])>,
}
63
/// Wrapper around a freshly constructed node, used only while the graph is
/// being built.
struct NodeEntry<T: Transcendental, const BUF_SIZE: usize> {
    /// The constructed node.
    node: NodeVariant<T, BUF_SIZE>,
}
68
/// Declaration of a shared resource that a graph allocates at build time.
///
/// `Debug`, `PartialEq` and `Eq` are derived so declarations can be logged and
/// compared (for example in assertions).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GraphResource {
    /// Name under which the resource is registered in the buffer registry.
    pub name: String,
    /// Resource kind tag. The builder in this file only acts on `"tape"`.
    pub kind: String,
    /// Capacity passed to the resource constructor (for `"tape"`, the argument
    /// of `TapeLoop::new`). NOTE(review): unit presumably samples — confirm.
    pub capacity: usize,
}
83
/// Accumulates node recipes, edges and resources, and assembles them into a
/// [`Graph`] in `build`.
///
/// Every edge tuple is `(from_node, from_port, to_node, to_port)`, where the
/// node values are the indices returned by `add_node` / `add_node_with_id`.
pub struct GraphBuilder<T: Transcendental, const BUF_SIZE: usize> {
    /// One recipe per added node; a node's index is its position in this list.
    recipes: Vec<NodeRecipe<T, BUF_SIZE>>,
    /// Signal edges; these drive the topological sort and the port wiring.
    signal_edges: Vec<(usize, usize, usize, usize)>,
    /// Control edges. Recorded by `connect_control`; nothing in this file
    /// reads them back.
    control_edges: Vec<(usize, usize, usize, usize)>,
    /// Clock edges. Recorded by `connect_clock`; nothing in this file reads
    /// them back.
    clock_edges: Vec<(usize, usize, usize, usize)>,
    /// Feedback edges; wired through dedicated feedback buffers and not part
    /// of the cycle check.
    feedback_edges: Vec<(usize, usize, usize, usize)>,
    /// Shared resources to allocate when the graph is built.
    resources: Vec<GraphResource>,
    /// Factory used to instantiate every recipe.
    factory: Arc<NodeFactory<T, BUF_SIZE>>,
    /// Sample rate for the graph's initial clock tick; `build` falls back to
    /// 44100.0 when this is `None`.
    sample_rate: Option<f32>,
    /// Optional parent actor handle carried over into the built graph.
    parent_ref: Option<ActorRef<CommandEnum>>,
}
111
112impl<T: Transcendental, const BUF_SIZE: usize> GraphBuilder<T, BUF_SIZE> {
    /// Creates an empty builder that instantiates nodes through `factory`.
    ///
    /// No nodes, edges or resources are registered, and neither a sample rate
    /// nor a parent actor is set.
    pub fn new(factory: Arc<NodeFactory<T, BUF_SIZE>>) -> Self {
        Self {
            recipes: Vec::new(),
            signal_edges: Vec::new(),
            control_edges: Vec::new(),
            clock_edges: Vec::new(),
            feedback_edges: Vec::new(),
            resources: Vec::new(),
            factory,
            sample_rate: None,
            parent_ref: None,
        }
    }
127
128 pub fn add_node(&mut self, type_name: &str, params: &Params) -> usize {
135 let id = NodeId(self.recipes.len() as u32);
136 self.add_node_with_id(type_name, params, id)
137 }
138
139 pub fn add_node_with_id(&mut self, type_name: &str, params: &Params, id: NodeId) -> usize {
145 let idx = self.recipes.len();
146 self.recipes.push(NodeRecipe {
147 type_name: type_name.to_string(),
148 id,
149 params: params.clone(),
150 routing_entries: Vec::new(),
151 _phantom: std::marker::PhantomData,
152 });
153 idx
154 }
155
156 pub fn add_routing_entry(&mut self, idx: usize, from: usize, to: usize, gain: f32) {
158 if let Some(recipe) = self.recipes.get_mut(idx) {
159 recipe.routing_entries.push((from, to, gain));
160 }
161 }
162
    /// Registers a shared resource to be allocated when the graph is built.
    pub fn add_resource(&mut self, resource: GraphResource) {
        self.resources.push(resource);
    }
167
    /// Returns the number of nodes registered so far.
    pub fn node_count(&self) -> usize {
        self.recipes.len()
    }
172
    /// Sets the sample rate used for the graph's initial clock tick.
    ///
    /// When this is never called, `build` uses 44100.0.
    pub fn set_sample_rate(&mut self, sr: f32) {
        self.sample_rate = Some(sr);
    }
177
    /// Stores the parent actor handle that the built graph carries along.
    pub fn set_parent_ref(&mut self, parent: ActorRef<CommandEnum>) {
        self.parent_ref = Some(parent);
    }
182
183 pub fn connect_signal(
185 &mut self,
186 from_node: usize,
187 from_port: usize,
188 to_node: usize,
189 to_port: usize,
190 ) {
191 self.signal_edges
192 .push((from_node, from_port, to_node, to_port));
193 }
194
195 pub fn connect_control(
197 &mut self,
198 from_node: usize,
199 from_port: usize,
200 to_node: usize,
201 to_port: usize,
202 ) {
203 self.control_edges
204 .push((from_node, from_port, to_node, to_port));
205 }
206
207 pub fn connect_clock(
209 &mut self,
210 from_node: usize,
211 from_port: usize,
212 to_node: usize,
213 to_port: usize,
214 ) {
215 self.clock_edges
216 .push((from_node, from_port, to_node, to_port));
217 }
218
219 pub fn connect_feedback(
221 &mut self,
222 from_node: usize,
223 from_port: usize,
224 to_node: usize,
225 to_port: usize,
226 ) {
227 self.feedback_edges
228 .push((from_node, from_port, to_node, to_port));
229 }
230
231 pub fn build(self, system: &ActorSystem) -> Result<Graph<T, BUF_SIZE>, BuildError> {
237 let mut node_entries: Vec<NodeEntry<T, BUF_SIZE>> = Vec::with_capacity(self.recipes.len());
239 for recipe in &self.recipes {
240 let node = self
241 .factory
242 .construct(&recipe.type_name, recipe.id, &recipe.params)
243 .map_err(|e| BuildError::Registry(format!("{e}")))?;
244 node_entries.push(NodeEntry { node });
245 }
246
247 for (idx, node) in node_entries.iter_mut().enumerate() {
249 for &(from, to, gain) in &self.recipes[idx].routing_entries {
250 if let NodeVariant::Router(ref mut router) = node.node {
251 router.set_connection(from, to, T::from_f32(gain)).ok();
252 }
253 }
254 }
255
256 let num_nodes = node_entries.len();
257
258 let mut in_degree = vec![0usize; num_nodes];
260 let mut out_edges: Vec<Vec<(usize, usize, usize)>> = vec![Vec::new(); num_nodes];
261
262 for &(from_n, from_p, to_n, to_p) in &self.signal_edges {
263 in_degree[to_n] += 1;
264 out_edges[from_n].push((from_p, to_n, to_p));
265 }
266
267 let mut queue: VecDeque<usize> = in_degree
269 .iter()
270 .enumerate()
271 .filter(|(_, &d)| d == 0)
272 .map(|(i, _)| i)
273 .collect();
274
275 let mut topo = Vec::with_capacity(num_nodes);
276 let mut indeg = in_degree;
277 while let Some(idx) = queue.pop_front() {
278 topo.push(idx);
279 for &(_, to_n, _) in &out_edges[idx] {
280 indeg[to_n] -= 1;
281 if indeg[to_n] == 0 {
282 queue.push_back(to_n);
283 }
284 }
285 }
286
287 if topo.len() != num_nodes {
288 return Err(BuildError::CycleDetected);
289 }
290
291 let mut nodes: Vec<NodeVariant<T, BUF_SIZE>> =
293 node_entries.into_iter().map(|e| e.node).collect();
294
295 for &(from_n, from_p, to_n, to_p) in &self.signal_edges {
297 if let Some(port) = nodes[from_n].output_port_mut(from_p) {
298 port.downstream.push((to_n, to_p));
299 }
300 let in_ptr: *mut Port<T, BUF_SIZE> = nodes[to_n]
301 .input_port_mut(to_p)
302 .map(|p| p as *mut Port<T, BUF_SIZE>)
303 .unwrap_or(std::ptr::null_mut());
304 let parent: *mut NodeVariant<T, BUF_SIZE> = &mut nodes[to_n];
305 let out_ptr: *mut Port<T, BUF_SIZE> = nodes[from_n]
306 .output_port_mut(from_p)
307 .map(|p| p as *mut Port<T, BUF_SIZE>)
308 .unwrap_or(std::ptr::null_mut());
309 if !in_ptr.is_null() && !out_ptr.is_null() {
310 #[allow(unsafe_code)]
311 unsafe {
312 (*in_ptr).parent = parent;
313 (*out_ptr).downstream_input_ptrs.push(in_ptr);
314 }
315 }
316 }
317
318 for &(from_n, from_p, to_n, _) in &self.signal_edges {
320 let parent: *mut NodeVariant<T, BUF_SIZE> = &mut nodes[to_n];
321 if let Some(port) = nodes[from_n].output_port_mut(from_p) {
322 let ptr_val = parent as usize;
323 let already = port.downstream_nodes.iter().any(|&p| p as usize == ptr_val);
324 if !already {
325 port.downstream_nodes.push(parent);
326 }
327 }
328 }
329
330 for &(from_n, from_p, to_n, to_p) in &self.signal_edges {
332 let upstream = nodes[from_n]
333 .output_port(from_p)
334 .map(|p| &p.buffer as *const FixedBuffer<T, BUF_SIZE>);
335 if let Some(port) = nodes[to_n].input_port_mut(to_p) {
336 if port.upstream_buffer.is_none() {
337 port.upstream_buffer = upstream;
338 } else {
339 port.upstream_buffer = None;
340 }
341 }
342 }
343
344 for &(from_n, from_p, to_n, to_p) in &self.feedback_edges {
346 if let Some(port) = nodes[from_n].output_port_mut(from_p) {
347 port.feedback_buffer = Some(FixedBuffer::new());
348 port.feedback_downstream.push((to_n, to_p));
349 }
350 if let Some(port) = nodes[to_n].input_port_mut(to_p) {
351 port.feedback_buffer = Some(FixedBuffer::new());
352 }
353 }
354 for &(from_n, from_p, to_n, to_p) in &self.feedback_edges {
355 let ptr = nodes[to_n]
356 .input_port(to_p)
357 .map(|p| &p.feedback_buffer as *const Option<FixedBuffer<T, BUF_SIZE>>)
358 .map(|r| r as *mut Option<FixedBuffer<T, BUF_SIZE>>);
359 if let Some(port) = nodes[from_n].output_port_mut(from_p) {
360 if let Some(p) = ptr {
361 port.feedback_ptrs.push(p);
362 }
363 }
364 }
365
366 let mut buffers = BufferRegistry::new();
368 for r in &self.resources {
369 if r.kind == "tape" {
370 if let Some(tape) = TapeLoop::<T>::new(r.capacity) {
371 buffers.register(&r.name, Box::new(tape));
372 }
373 }
374 }
375 for entry in nodes.iter_mut() {
376 entry.resolve_resources(&buffers);
377 }
378
379 let source_idx = topo.first().copied().unwrap_or(0);
380
381 let owned_buffers = buffers.into_inner();
382 let allocated = self.resources.clone();
383
384 let nodes: Rc<UnsafeCell<Vec<NodeVariant<T, BUF_SIZE>>>> = Rc::new(UnsafeCell::new(nodes));
386
387 let actor = system.spawn("graph", {
388 let n = nodes.clone();
389 #[allow(unsafe_code)]
390 move |msg: CommandEnum| {
391 if let CommandEnum::SetParameter(param) = msg {
392 let idx = param.port.node_id().inner() as usize;
393 unsafe {
394 let nv = &mut *n.get();
395 if idx < nv.len() {
396 let _ = nv[idx].set_parameter(¶m.parameter, param.value);
397 }
398 }
399 }
400 }
401 });
402
403 let actor_ref = actor.actor_ref();
404
405 Ok(Graph {
406 nodes,
407 topo_order: topo,
408 resources: allocated,
409 current_tick: ClockTick::new(
410 0,
411 BUF_SIZE as u32,
412 self.sample_rate.unwrap_or(44100.0),
413 String::new(),
414 ),
415 buffers: owned_buffers,
416 source_idx,
417 actor: Some(actor),
418 actor_ref,
419 parent_ref: self.parent_ref.clone(),
420 system_clock: None,
421 })
422 }
423}
424
/// Test-only tuple produced by `Graph::into_parts`:
/// `(nodes, topo_order, current_tick, buffers)`.
#[cfg(test)]
type GraphParts<T, const BUF_SIZE: usize> = (
    Vec<NodeVariant<T, BUF_SIZE>>,
    Vec<usize>,
    ClockTick,
    Vec<Box<dyn Buffer<T> + Send>>,
);
437
/// A built processing graph: the nodes, their order, and the actor that
/// receives commands for them.
pub struct Graph<T: Transcendental, const BUF_SIZE: usize> {
    /// All nodes, indexed by builder node index. Shared (via `Rc`) with the
    /// command actor's handler, which mutates nodes through the `UnsafeCell`.
    nodes: Rc<UnsafeCell<Vec<NodeVariant<T, BUF_SIZE>>>>,
    /// Node indices in topological order of the signal edges.
    topo_order: Vec<usize>,
    /// Index of the node that `process_block` drives: the first entry of
    /// `topo_order`, or 0 for an empty graph.
    source_idx: usize,
    /// The tick most recently passed to `process_block` (initially the tick
    /// created when the graph was built).
    current_tick: ClockTick,
    /// Resource declarations copied from the builder.
    pub(crate) resources: Vec<GraphResource>,
    /// Buffers taken out of the build-time registry and owned by the graph.
    /// NOTE(review): presumably kept alive because nodes refer to them after
    /// `resolve_resources` — confirm.
    #[allow(dead_code)]
    buffers: Vec<Box<dyn Buffer<T> + Send>>,
    /// Command actor; `Option` so `into_processing_state` can move it out.
    actor: Option<Actor<CommandEnum>>,
    /// Handle to the command actor, returned by `handle`.
    actor_ref: ActorRef<CommandEnum>,
    /// Optional parent actor handle supplied through the builder.
    parent_ref: Option<ActorRef<CommandEnum>>,
    /// Optional system clock; when set, its `bpm()` is fed into the render
    /// context of every processed block.
    pub system_clock: Option<Arc<SystemClock>>,
}
460
/// The parts of a [`Graph`] needed to render blocks, produced by
/// `Graph::into_processing_state`.
pub struct ProcessingState<T: Transcendental, const BUF_SIZE: usize> {
    /// Command actor, drained before each block is processed.
    actor: Actor<CommandEnum>,
    /// All nodes, shared with the command actor's handler.
    nodes: Rc<UnsafeCell<Vec<NodeVariant<T, BUF_SIZE>>>>,
    /// Index of the node that `process_block` drives.
    source_idx: usize,
    /// Optional parent actor that receives final clock ticks.
    parent_ref: Option<ActorRef<CommandEnum>>,
    /// Optional system clock supplying the tempo for the render context.
    system_clock: Option<Arc<SystemClock>>,
    /// Buffers owned on behalf of the graph; not read in this file.
    #[allow(dead_code)]
    buffers: Vec<Box<dyn Buffer<T> + Send>>,
}
477
478impl<T: Transcendental, const BUF_SIZE: usize> ProcessingState<T, BUF_SIZE> {
479 #[allow(unsafe_code)]
484 pub fn process_block(&mut self, tick: &ClockTick) -> ProcessResult<()> {
485 self.actor.drain();
486 let mut ctx = if let Some(ref clock) = self.system_clock {
487 RenderContext::with_tempo(
488 tick.sample_pos,
489 tick.samples_since_last,
490 tick.sample_rate,
491 clock.bpm() as f32,
492 )
493 } else {
494 RenderContext::new(tick.sample_pos, tick.samples_since_last, tick.sample_rate)
495 };
496 ctx.speed_ratio = tick.speed_ratio;
497 unsafe {
498 let nv = &mut *self.nodes.get();
499 let _ = nv[self.source_idx].process_block(&ctx, tick);
500 for po in 0..nv[self.source_idx].num_signal_outputs() {
501 if let Some(port) = nv[self.source_idx].output_port(po) {
502 let _ = port.propagate(port.buffer(), &ctx, tick);
503 }
504 }
505 }
506 Ok(())
507 }
508
509 pub fn send_clock_tick(&self, tick: &ClockTick) {
515 if tick.is_final {
516 if let Some(ref parent) = self.parent_ref {
517 parent.send(CommandEnum::ClockTick(tick.clone()));
518 }
519 }
520 }
521
522 #[allow(unsafe_code)]
528 pub fn wire_backends(
529 &mut self,
530 capture: Option<Arc<dyn IoCapture>>,
531 playback: Option<Arc<dyn IoPlayback>>,
532 ) {
533 unsafe {
534 let nv = &mut *self.nodes.get();
535 for node in nv.iter_mut() {
536 if let Some(ref c) = capture {
537 if let NodeVariant::Source(src) = node {
538 src.set_capture(c.clone())
539 }
540 }
541 if let Some(ref p) = playback {
542 if let NodeVariant::Sink(sink) = node {
543 sink.set_playback(p.clone())
544 }
545 }
546 }
547 }
548 }
549
550 pub fn run_with_driver(
555 mut self,
556 driver: Arc<dyn IoDriver>,
557 running: Arc<AtomicBool>,
558 ) -> Result<(), String> {
559 self.actor.drain();
560 driver.set_process_callback(Box::new(move |tick: &ClockTick| {
561 let _ = self.process_block(tick);
562 self.send_clock_tick(tick);
563 }));
564 driver.run(running.clone())?;
565 while running.load(std::sync::atomic::Ordering::Acquire) {
566 std::thread::park();
567 }
568 let _ = driver.stop();
569 Ok(())
570 }
571}
572
573impl<T: Transcendental, const BUF_SIZE: usize> Graph<T, BUF_SIZE> {
574 #[allow(unsafe_code)]
580 pub fn nodes(&self) -> &[NodeVariant<T, BUF_SIZE>] {
581 unsafe { &*self.nodes.get() }
582 }
583
    /// Returns a copy of the tick most recently passed to `process_block`, or
    /// of the initial tick if no block has been processed yet.
    pub fn current_tick(&self) -> ClockTick {
        self.current_tick.clone()
    }
588
589 #[allow(unsafe_code)]
591 pub fn node_count(&self) -> usize {
592 unsafe { (*self.nodes.get()).len() }
593 }
594
    /// Returns the node indices in topological order of the signal edges.
    pub fn topo_order(&self) -> &[usize] {
        &self.topo_order
    }
599
    /// Returns the sample rate carried by the current tick.
    #[allow(dead_code)]
    pub(crate) fn sample_rate(&self) -> f32 {
        self.current_tick.sample_rate
    }
604
    /// Returns the resource declarations the graph was built with.
    #[allow(dead_code)]
    pub fn resources(&self) -> &[GraphResource] {
        &self.resources
    }
610
611 #[allow(unsafe_code)]
623 pub fn process_block(&mut self, tick: &ClockTick) -> ProcessResult<()> {
624 if let Some(ref mut actor) = self.actor {
625 actor.drain();
626 }
627 let ctx = if let Some(ref clock) = self.system_clock {
628 RenderContext::with_tempo(
629 tick.sample_pos,
630 tick.samples_since_last,
631 tick.sample_rate,
632 clock.bpm() as f32,
633 )
634 } else {
635 RenderContext::new(tick.sample_pos, tick.samples_since_last, tick.sample_rate)
636 };
637 self.current_tick = tick.clone();
638 unsafe {
639 let nv = &mut *self.nodes.get();
640 let _ = nv[self.source_idx].process_block(&ctx, tick);
641 for po in 0..nv[self.source_idx].num_signal_outputs() {
642 if let Some(port) = nv[self.source_idx].output_port(po) {
643 let _ = port.propagate(port.buffer(), &ctx, tick);
644 }
645 }
646 }
647 Ok(())
648 }
649
650 pub fn into_processing_state(mut self) -> ProcessingState<T, BUF_SIZE> {
656 let actor = self.actor.take().expect("graph actor missing");
657 ProcessingState {
658 actor,
659 nodes: self.nodes,
660 source_idx: self.source_idx,
661 parent_ref: self.parent_ref,
662 system_clock: self.system_clock,
663 buffers: self.buffers,
664 }
665 }
666
    /// Returns a handle to the graph's command actor.
    pub fn handle(&self) -> ActorRef<CommandEnum> {
        self.actor_ref.clone()
    }
671
672 #[cfg(test)]
674 pub fn into_parts(self) -> GraphParts<T, BUF_SIZE> {
675 let Self {
676 nodes,
677 topo_order,
678 current_tick,
679 resources: _,
680 buffers,
681 source_idx: _,
682 actor,
683 actor_ref: _,
684 parent_ref: _,
685 system_clock: _,
686 } = self;
687 drop(actor);
688 let nodes = Rc::try_unwrap(nodes).unwrap().into_inner();
689 (nodes, topo_order, current_tick, buffers)
690 }
691}
692
693#[cfg(test)]
694mod tests {
695 use super::*;
696 use rill_core::math::Transcendental;
697 use rill_core::time::RenderContext;
698
699 use rill_core::traits::{
700 Node, NodeCategory, NodeId, NodeMetadata, NodeState, ParamValue, ParameterId, Port,
701 ProcessResult, Processor, Sink, Source,
702 };
703 use rill_core_actor::ActorSystem;
704 use std::sync::Arc;
705
    /// Creates a fresh actor system for a test.
    fn test_system() -> ActorSystem {
        ActorSystem::new()
    }
709
    /// Builds a factory that knows the three test node types:
    /// `"test/const"` (source), `"test/gain"` (processor) and
    /// `"test/capture"` (sink).
    fn test_factory<const B: usize>() -> Arc<NodeFactory<f32, B>> {
        let mut f = NodeFactory::<f32, B>::new();

        // Constant source; the emitted value comes from the "value" parameter
        // (default 1.0).
        f.register_fn("test/const", |id, params| {
            let value = params.get_f32("value", 1.0);
            let mut node = ConstantSource::<f32, B>::new(id, value, params.sample_rate);
            node.init(params.sample_rate);
            NodeVariant::Source(Box::new(node))
        });

        // Gain processor; the factor comes from the "gain" parameter
        // (default 1.0).
        f.register_fn("test/gain", |id, params| {
            let gain = params.get_f32("gain", 1.0);
            let mut node = GainProcessor::<f32, B>::new(id, params.sample_rate, gain);
            node.init(params.sample_rate);
            NodeVariant::Processor(Box::new(node))
        });

        // Sink with a single input port.
        f.register_fn("test/capture", |id, params| {
            let mut node = CaptureSink::<f32, B>::new(id, params.sample_rate);
            node.init(params.sample_rate);
            NodeVariant::Sink(Box::new(node))
        });

        Arc::new(f)
    }
735
736 fn test_builder<const B: usize>(factory: &Arc<NodeFactory<f32, B>>) -> GraphBuilder<f32, B> {
737 GraphBuilder::new(factory.clone())
738 }
739
    /// Creates parameters for `sample_rate`.
    ///
    /// Note that the `"value"` parameter is set to the sample rate itself, so
    /// a constant source built from these parameters emits a non-zero value
    /// unless the caller overrides `"value"`.
    fn test_params(sample_rate: f32) -> Params {
        let mut p = Params::new(sample_rate);
        p.insert("value".to_string(), ParamValue::Float(sample_rate));
        p
    }
745
    /// Test source that fills its single output buffer with a constant value.
    pub(crate) struct ConstantSource<T: Transcendental, const B: usize> {
        /// Node identifier.
        id: NodeId,
        /// The value written to every sample of the output buffer.
        value: T,
        /// Node state required by the `Node` trait.
        state: NodeState<T, B>,
        /// The only output port (index 0).
        output: Port<T, B>,
    }
756
757 impl<T: Transcendental, const B: usize> ConstantSource<T, B> {
758 pub fn new(id: NodeId, value: T, sample_rate: f32) -> Self {
759 let state = NodeState::new(sample_rate);
760 let mut output = Port::output(id, 0, "out");
761 output.buffer = FixedBuffer::new();
762 Self {
763 id,
764 value,
765 state,
766 output,
767 }
768 }
769 }
770
771 impl<T: Transcendental, const B: usize> Node<T, B> for ConstantSource<T, B> {
772 fn id(&self) -> NodeId {
773 self.id
774 }
775 fn set_id(&mut self, id: NodeId) {
776 self.id = id;
777 }
778 fn metadata(&self) -> NodeMetadata {
779 NodeMetadata {
780 name: "ConstantSource".into(),
781 type_name: Some("test/const".into()),
782 category: NodeCategory::Source,
783 description: String::new(),
784 author: String::new(),
785 version: String::new(),
786 parameters: vec![],
787 signal_inputs: 0,
788 signal_outputs: 1,
789 control_inputs: 0,
790 control_outputs: 0,
791 clock_inputs: 0,
792 clock_outputs: 0,
793 feedback_ports: 0,
794 }
795 }
796 fn init(&mut self, _: f32) {}
797 fn reset(&mut self) {}
798 fn get_parameter(&self, _: &ParameterId) -> Option<ParamValue> {
799 None
800 }
801 fn set_parameter(&mut self, _: &ParameterId, _: ParamValue) -> ProcessResult<()> {
802 Ok(())
803 }
804 fn control_port(&self, _: usize) -> Option<&Port<T, B>> {
805 None
806 }
807 fn control_port_mut(&mut self, _: usize) -> Option<&mut Port<T, B>> {
808 None
809 }
810 fn output_port(&self, i: usize) -> Option<&Port<T, B>> {
811 if i == 0 {
812 Some(&self.output)
813 } else {
814 None
815 }
816 }
817 fn output_port_mut(&mut self, i: usize) -> Option<&mut Port<T, B>> {
818 if i == 0 {
819 Some(&mut self.output)
820 } else {
821 None
822 }
823 }
824 fn input_port(&self, _: usize) -> Option<&Port<T, B>> {
825 None
826 }
827 fn input_port_mut(&mut self, _: usize) -> Option<&mut Port<T, B>> {
828 None
829 }
830 fn state(&self) -> &NodeState<T, B> {
831 &self.state
832 }
833 fn state_mut(&mut self) -> &mut NodeState<T, B> {
834 &mut self.state
835 }
836 }
837
838 impl<T: Transcendental, const B: usize> Source<T, B> for ConstantSource<T, B> {
839 fn generate(
840 &mut self,
841 _: &RenderContext,
842 _: &[T],
843 _: &[RenderContext],
844 _: &ClockTick,
845 ) -> ProcessResult<()> {
846 self.output.buffer.as_mut_array().fill(self.value);
847 Ok(())
848 }
849 }
850
    /// Test processor that multiplies its input buffer by a fixed gain.
    pub(crate) struct GainProcessor<T: Transcendental, const B: usize> {
        /// Node identifier.
        id: NodeId,
        /// Factor applied to every input sample.
        gain: T,
        /// Node state required by the `Node` trait.
        state: NodeState<T, B>,
        /// The only input port (index 0).
        input: Port<T, B>,
        /// The only output port (index 0).
        output: Port<T, B>,
    }
862
863 impl<T: Transcendental, const B: usize> GainProcessor<T, B> {
864 pub fn new(id: NodeId, sample_rate: f32, gain: T) -> Self {
865 let state = NodeState::new(sample_rate);
866 let input = Port::input(id, 0, "in");
867 let output = Port::output(id, 0, "out");
868 Self {
869 id,
870 gain,
871 state,
872 input,
873 output,
874 }
875 }
876 }
877
878 impl<T: Transcendental, const B: usize> Node<T, B> for GainProcessor<T, B> {
879 fn id(&self) -> NodeId {
880 self.id
881 }
882 fn set_id(&mut self, id: NodeId) {
883 self.id = id;
884 }
885 fn metadata(&self) -> NodeMetadata {
886 NodeMetadata {
887 name: "GainProcessor".into(),
888 type_name: Some("test/gain".into()),
889 category: NodeCategory::Processor,
890 description: String::new(),
891 author: String::new(),
892 version: String::new(),
893 parameters: vec![],
894 signal_inputs: 1,
895 signal_outputs: 1,
896 control_inputs: 0,
897 control_outputs: 0,
898 clock_inputs: 0,
899 clock_outputs: 0,
900 feedback_ports: 0,
901 }
902 }
903 fn init(&mut self, _: f32) {}
904 fn reset(&mut self) {}
905 fn get_parameter(&self, _: &ParameterId) -> Option<ParamValue> {
906 None
907 }
908 fn set_parameter(&mut self, _: &ParameterId, _: ParamValue) -> ProcessResult<()> {
909 Ok(())
910 }
911 fn control_port(&self, _: usize) -> Option<&Port<T, B>> {
912 None
913 }
914 fn control_port_mut(&mut self, _: usize) -> Option<&mut Port<T, B>> {
915 None
916 }
917 fn input_port(&self, i: usize) -> Option<&Port<T, B>> {
918 if i == 0 {
919 Some(&self.input)
920 } else {
921 None
922 }
923 }
924 fn input_port_mut(&mut self, i: usize) -> Option<&mut Port<T, B>> {
925 if i == 0 {
926 Some(&mut self.input)
927 } else {
928 None
929 }
930 }
931 fn num_signal_outputs(&self) -> usize {
932 1
933 }
934 fn output_port(&self, i: usize) -> Option<&Port<T, B>> {
935 if i == 0 {
936 Some(&self.output)
937 } else {
938 None
939 }
940 }
941 fn output_port_mut(&mut self, i: usize) -> Option<&mut Port<T, B>> {
942 if i == 0 {
943 Some(&mut self.output)
944 } else {
945 None
946 }
947 }
948 fn state(&self) -> &NodeState<T, B> {
949 &self.state
950 }
951 fn state_mut(&mut self) -> &mut NodeState<T, B> {
952 &mut self.state
953 }
954 }
955
956 impl<T: Transcendental, const B: usize> Processor<T, B> for GainProcessor<T, B> {
957 fn process(
958 &mut self,
959 _: &RenderContext,
960 _: &[&[T; B]],
961 _: &[T],
962 _: &[RenderContext],
963 _: &[&[T; B]],
964 ) -> ProcessResult<()> {
965 let src = self.input.buffer.as_array();
966 let buf = self.output.buffer.as_mut_array();
967 for i in 0..B {
968 buf[i] = src[i] * self.gain;
969 }
970 Ok(())
971 }
972 }
973
    /// Test sink with a single input port; tests read the received samples
    /// from that port's buffer.
    pub(crate) struct CaptureSink<T: Transcendental, const B: usize> {
        /// Node identifier.
        id: NodeId,
        /// Node state required by the `Node` trait.
        state: NodeState<T, B>,
        /// The only input port (index 0).
        input: Port<T, B>,
    }
983
984 impl<T: Transcendental, const B: usize> CaptureSink<T, B> {
985 pub fn new(id: NodeId, sample_rate: f32) -> Self {
986 let state = NodeState::new(sample_rate);
987 let input = Port::input(id, 0, "in");
988 Self { id, state, input }
989 }
990 }
991
992 impl<T: Transcendental, const B: usize> Node<T, B> for CaptureSink<T, B> {
993 fn id(&self) -> NodeId {
994 self.id
995 }
996 fn set_id(&mut self, id: NodeId) {
997 self.id = id;
998 }
999 fn metadata(&self) -> NodeMetadata {
1000 NodeMetadata {
1001 name: "CaptureSink".into(),
1002 type_name: Some("test/capture".into()),
1003 category: NodeCategory::Sink,
1004 description: String::new(),
1005 author: String::new(),
1006 version: String::new(),
1007 parameters: vec![],
1008 signal_inputs: 1,
1009 signal_outputs: 0,
1010 control_inputs: 0,
1011 control_outputs: 0,
1012 clock_inputs: 0,
1013 clock_outputs: 0,
1014 feedback_ports: 0,
1015 }
1016 }
1017 fn init(&mut self, _: f32) {}
1018 fn reset(&mut self) {}
1019 fn get_parameter(&self, _: &ParameterId) -> Option<ParamValue> {
1020 None
1021 }
1022 fn set_parameter(&mut self, _: &ParameterId, _: ParamValue) -> ProcessResult<()> {
1023 Ok(())
1024 }
1025 fn control_port(&self, _: usize) -> Option<&Port<T, B>> {
1026 None
1027 }
1028 fn control_port_mut(&mut self, _: usize) -> Option<&mut Port<T, B>> {
1029 None
1030 }
1031 fn output_port(&self, _: usize) -> Option<&Port<T, B>> {
1032 None
1033 }
1034 fn output_port_mut(&mut self, _: usize) -> Option<&mut Port<T, B>> {
1035 None
1036 }
1037 fn input_port(&self, i: usize) -> Option<&Port<T, B>> {
1038 if i == 0 {
1039 Some(&self.input)
1040 } else {
1041 None
1042 }
1043 }
1044 fn input_port_mut(&mut self, i: usize) -> Option<&mut Port<T, B>> {
1045 if i == 0 {
1046 Some(&mut self.input)
1047 } else {
1048 None
1049 }
1050 }
1051 fn state(&self) -> &NodeState<T, B> {
1052 &self.state
1053 }
1054 fn state_mut(&mut self) -> &mut NodeState<T, B> {
1055 &mut self.state
1056 }
1057 }
1058
    impl<T: Transcendental, const B: usize> Sink<T, B> for CaptureSink<T, B> {
        /// Does nothing and reports success; all arguments are ignored.
        fn consume(
            &mut self,
            _: &RenderContext,
            _: &[&[T; B]],
            _: &[T],
            _: &[RenderContext],
            _: &[&[T; B]],
            _: &ClockTick,
        ) -> ProcessResult<()> {
            Ok(())
        }
    }
1072
    /// Buffer size (the `B` / `BUF_SIZE` const parameter) used by the tests
    /// in this module.
    const BUF: usize = 64;
1078
    /// A constant source wired directly to a sink: after processing the
    /// source and propagating its output, the sink's input buffer must hold a
    /// non-zero sample.
    #[test]
    #[allow(unsafe_code)]
    fn test_graph_source_to_sink() {
        let factory = test_factory::<BUF>();
        let mut builder = test_builder::<BUF>(&factory);
        let system = test_system();

        let src_idx = builder.add_node("test/const", &test_params(44100.0));
        let snk_idx = builder.add_node("test/capture", &test_params(44100.0));
        builder.connect_signal(src_idx, 0, snk_idx, 0);

        let graph = builder.build(&system).unwrap();
        let source_idx = graph.source_idx;

        let ctx = RenderContext::new(0, BUF as u32, 44100.0);
        let tick = ClockTick::new(0, BUF as u32, 44100.0, String::new());
        let nodes = graph.nodes.clone();
        // Drive the source by hand, mirroring what `process_block` does.
        unsafe {
            let nv = &mut *nodes.get();
            nv[source_idx].process_block(&ctx, &tick).unwrap();
            if let Some(port) = nv[source_idx].output_port(0) {
                port.propagate(port.buffer(), &ctx, &tick).unwrap();
            }
        }
        // `test_params` sets "value" to the sample rate, so a propagated
        // sample is non-zero.
        unsafe {
            let nv = &*nodes.get();
            let val = nv[snk_idx].input_port(0).unwrap().buffer.as_array()[0];
            assert!(val != 0.0, "signal should have propagated, got {}", val);
        }
    }
1109
1110 #[test]
1111 #[allow(unsafe_code)]
1112 fn test_graph_source_proc_sink() {
1113 let factory = test_factory::<BUF>();
1114 let mut builder = test_builder::<BUF>(&factory);
1115 let system = test_system();
1116
1117 let mut params = test_params(44100.0);
1118 params.insert("value".to_string(), ParamValue::Float(5.0));
1119 let src_idx = builder.add_node("test/const", ¶ms);
1120
1121 let mut gain_params = test_params(44100.0);
1122 gain_params.insert("gain".to_string(), ParamValue::Float(3.0));
1123 let proc_idx = builder.add_node("test/gain", &gain_params);
1124
1125 let snk_idx = builder.add_node("test/capture", &test_params(44100.0));
1126
1127 builder.connect_signal(src_idx, 0, proc_idx, 0);
1128 builder.connect_signal(proc_idx, 0, snk_idx, 0);
1129
1130 let graph = builder.build(&system).unwrap();
1131 let source_idx = graph.source_idx;
1132
1133 eprintln!("topo: {:?}", graph.topo_order);
1134 eprintln!("source_idx: {source_idx}, src_idx: {src_idx}, proc_idx: {proc_idx}, snk_idx: {snk_idx}");
1135
1136 let ctx = RenderContext::new(0, BUF as u32, 44100.0);
1137 let tick = ClockTick::new(0, BUF as u32, 44100.0, String::new());
1138 let nodes = graph.nodes.clone();
1139 unsafe {
1140 let nv = &mut *nodes.get();
1141 eprintln!(
1142 "node types: src={:?}, proc={:?}, snk={:?}",
1143 std::mem::discriminant(&nv[0]),
1144 std::mem::discriminant(&nv[1]),
1145 std::mem::discriminant(&nv[2]),
1146 );
1147
1148 let _ = nv[source_idx].process_block(&ctx, &tick);
1149 let src_val = nv[source_idx].output_port(0).unwrap().buffer.as_array()[0];
1150 eprintln!("source output: {src_val}");
1151
1152 let out_port = nv[source_idx].output_port(0).unwrap();
1153 eprintln!(
1154 "source output port downstream_nodes: {}",
1155 out_port.downstream_nodes.len()
1156 );
1157 eprintln!(
1158 "source output port downstream_input_ptrs: {}",
1159 out_port.downstream_input_ptrs.len()
1160 );
1161
1162 {
1164 let proc_port = nv[proc_idx].output_port(0).unwrap();
1165 eprintln!(
1166 "PROC OUT port downstream_nodes: {}",
1167 proc_port.downstream_nodes.len()
1168 );
1169 eprintln!(
1170 "PROC OUT port downstream_input_ptrs: {}",
1171 proc_port.downstream_input_ptrs.len()
1172 );
1173 for (i, &dn) in proc_port.downstream.iter().enumerate() {
1174 eprintln!(" downstream[{}]: (node={}, port={})", i, dn.0, dn.1);
1175 }
1176 }
1177
1178 let src_out = nv[source_idx].output_port(0).unwrap();
1180 let proc_in = nv[proc_idx].input_port(0).unwrap();
1181 let proc_out = nv[proc_idx].output_port(0).unwrap();
1182 let snk_in = nv[snk_idx].input_port(0).unwrap();
1183 eprintln!("BUFFER ADDRESSES:");
1184 eprintln!(
1185 " src output buf: {:p}",
1186 src_out.buffer.as_array().as_ptr()
1187 );
1188 eprintln!(
1189 " proc input buf: {:p}",
1190 proc_in.buffer.as_array().as_ptr()
1191 );
1192 eprintln!(
1193 " proc output buf: {:p}",
1194 proc_out.buffer.as_array().as_ptr()
1195 );
1196 eprintln!(" snk input buf: {:p}", snk_in.buffer.as_array().as_ptr());
1197 eprintln!(
1198 " proc_in.upstream_buffer.is_some(): {}",
1199 proc_in.upstream_buffer.is_some()
1200 );
1201 eprintln!(
1202 " snk_in.upstream_buffer.is_some(): {}",
1203 snk_in.upstream_buffer.is_some()
1204 );
1205 out_port.propagate(out_port.buffer(), &ctx, &tick).unwrap();
1208
1209 {
1211 let nv = &*nodes.get();
1212 let snk_in = nv[snk_idx].input_port(0).unwrap();
1213 eprintln!(
1214 "AFTER propagate - snk input buf[0] via .buffer: {}",
1215 snk_in.buffer.as_array()[0]
1216 );
1217 if let Some(up) = snk_in.upstream_buffer {
1218 eprintln!(
1219 "AFTER propagate - snk input via upstream ptr: {}",
1220 (*up).as_array()[0]
1221 );
1222 }
1223 }
1224
1225 let sink_buf = nv[snk_idx].input_port(0).unwrap().buffer.as_array();
1226 eprintln!("SINK input port buffer first sample: {}", sink_buf[0]);
1227
1228 let proc_out_port = nv[proc_idx].output_port(0).unwrap();
1230 eprintln!(
1231 "proc output port downstream_nodes: {}",
1232 proc_out_port.downstream_nodes.len()
1233 );
1234 eprintln!(
1235 "proc output port downstream_input_ptrs: {}",
1236 proc_out_port.downstream_input_ptrs.len()
1237 );
1238
1239 let sink_val = nv[snk_idx].input_port(0).unwrap().buffer.as_array()[0];
1241 eprintln!("sink input AFTER propagate: {sink_val}");
1242
1243 assert!(
1244 (sink_val - 15.0).abs() < 1e-4,
1245 "expected 15.0, got {}",
1246 sink_val
1247 );
1248 }
1249 }
1250}