Skip to main content

rill_graph/
graph.rs

1use crate::backend_factory::BackendFactory;
2use crate::factory::NodeFactory;
3use rill_core::buffer::{Buffer, BufferRegistry, FixedBuffer, TapeLoop};
4use rill_core::math::Transcendental;
5use rill_core::queues::CommandEnum;
6use rill_core::time::ClockTick;
7use rill_core::traits::algorithm::ActionContext;
8use rill_core::traits::port::Port;
9use rill_core::traits::processable::{ProcessContext, Processable};
10use rill_core::traits::ParamValue;
11use rill_core::traits::{Node, NodeId, NodeVariant, Params};
12use rill_core_actor::{Actor, ActorRef, ActorSystem};
13use std::cell::UnsafeCell;
14use std::collections::{HashMap, VecDeque};
15use std::rc::Rc;
16use std::sync::atomic::AtomicBool;
17use std::sync::Arc;
18
19// ============================================================================
20// Internal routing metadata
21// ============================================================================
22
23// ============================================================================
24// Build Errors
25// ============================================================================
26
/// Errors that can occur during graph construction.
///
/// Implements [`std::error::Error`], so a `BuildError` can be propagated with
/// `?` into `Box<dyn std::error::Error>` and similar error containers.
#[derive(Debug, Clone)]
pub enum BuildError {
    /// A cycle was detected in the signal edge graph.
    CycleDetected,
    /// Backend creation failed; carries the message reported by the backend factory.
    Backend(String),
    /// Factory registration error (unknown node type).
    Registry(String),
}

impl std::fmt::Display for BuildError {
    /// Render a short, human-readable description of the failure.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::CycleDetected => write!(f, "graph cycle detected"),
            Self::Backend(msg) => write!(f, "backend error: {msg}"),
            Self::Registry(msg) => write!(f, "registry error: {msg}"),
        }
    }
}

// No underlying source error is stored, so the default trait methods suffice.
impl std::error::Error for BuildError {}
47
48// ============================================================================
49// Graph Builder
50// ============================================================================
51
52// ============================================================================
53// Node Storage
54// ============================================================================
55
56/// A deferred node recipe — constructed at [`build`](GraphBuilder::build) time.
57struct NodeRecipe<T: Transcendental, const BUF_SIZE: usize> {
58    type_name: String,
59    id: NodeId,
60    params: Params,
61    backend: Option<String>,
62    routing_entries: Vec<(usize, usize, f32)>,
63    _phantom: std::marker::PhantomData<(T, [(); BUF_SIZE])>,
64}
65
66/// Temporary holder during build — wraps a constructed node for wiring.
67struct NodeEntry<T: Transcendental, const BUF_SIZE: usize> {
68    node: NodeVariant<T, BUF_SIZE>,
69}
70
71// ============================================================================
72// GraphBuilder (Mutable Construction)
73// ============================================================================
74
/// A named resource (tape loop) shared between nodes in the graph.
///
/// Resources are registered on the builder and allocated during
/// [`GraphBuilder::build`]; only the `"tape"` kind is allocated there.
#[derive(Debug, Clone)]
pub struct GraphResource {
    /// Unique name referenced by node parameters.
    pub name: String,
    /// Resource kind string (`"tape"`).
    pub kind: String,
    /// Capacity in samples (for `"tape"` kind).
    pub capacity: usize,
}
85
86/// Mutable builder for an immutable signal graph.
87///
88/// Stores deferred node recipes until [`build`](Self::build), which
89/// constructs all nodes, wires connections, and performs topological
90/// sort.  This keeps `GraphBuilder` `Send` — all non‑`Send` data
91/// is constructed inside the target thread.
92///
93/// # Node factory
94///
95/// The builder holds an [`Arc<NodeFactory>`] for constructing nodes by
96/// type name. Nodes registered via [`add_node_with_id`](Self::add_node_with_id)
97/// are only validated and constructed at [`build`](Self::build) time.
98pub struct GraphBuilder<T: Transcendental, const BUF_SIZE: usize> {
99    recipes: Vec<NodeRecipe<T, BUF_SIZE>>,
100    signal_edges: Vec<(usize, usize, usize, usize)>,
101    control_edges: Vec<(usize, usize, usize, usize)>,
102    clock_edges: Vec<(usize, usize, usize, usize)>,
103    feedback_edges: Vec<(usize, usize, usize, usize)>,
104    resources: Vec<GraphResource>,
105    /// Shared node factory (required, from Runtime).
106    factory: Arc<NodeFactory<T, BUF_SIZE>>,
107    /// Shared backend factory (required, from Runtime).
108    backend_factory: Arc<BackendFactory<T>>,
109    /// Default backend name for nodes that don't specify one explicitly.
110    default_backend: Option<String>,
111    /// Default backend parameters (sample_rate, buffer_size, channels).
112    backend_params: HashMap<String, ParamValue>,
113    /// Sample rate override. When set, used in [`build`](Self::build).
114    /// Populated from [`GraphDef::sample_rate`] during deserialization.
115    sample_rate: Option<f32>,
116    /// Parent RackCase ActorRef — Graph sends ClockTick here.
117    parent_ref: Option<ActorRef<CommandEnum>>,
118}
119
120impl<T: Transcendental, const BUF_SIZE: usize> GraphBuilder<T, BUF_SIZE> {
121    /// Create a new empty graph builder without a node factory.
122    pub fn new(
123        factory: Arc<NodeFactory<T, BUF_SIZE>>,
124        backend_factory: Arc<BackendFactory<T>>,
125    ) -> Self {
126        Self {
127            recipes: Vec::new(),
128            signal_edges: Vec::new(),
129            control_edges: Vec::new(),
130            clock_edges: Vec::new(),
131            feedback_edges: Vec::new(),
132            resources: Vec::new(),
133            factory,
134            backend_factory,
135            default_backend: None,
136            backend_params: HashMap::new(),
137            sample_rate: None,
138            parent_ref: None,
139        }
140    }
141
142    /// Add a node by type name using the internal factory.
143    ///
144    /// The type must be registered in the factory before [`build`](Self::build)
145    /// is called.
146    ///
147    /// Returns the index of the newly added node.
148    pub fn add_node(&mut self, type_name: &str, params: &Params) -> usize {
149        let id = NodeId(self.recipes.len() as u32);
150        self.add_node_with_id(type_name, params, id)
151    }
152
153    /// Add a node with an explicit [`NodeId`].
154    ///
155    /// Like [`add_node`](Self::add_node) but uses the provided `id`.
156    /// Important for serialization where external references depend on
157    /// exact IDs.
158    pub fn add_node_with_id(&mut self, type_name: &str, params: &Params, id: NodeId) -> usize {
159        let idx = self.recipes.len();
160        self.recipes.push(NodeRecipe {
161            type_name: type_name.to_string(),
162            id,
163            params: params.clone(),
164            backend: None,
165            routing_entries: Vec::new(),
166            _phantom: std::marker::PhantomData,
167        });
168        idx
169    }
170
171    /// Assign a named backend to the node at the given index.
172    pub fn set_node_backend(&mut self, idx: usize, name: String) {
173        if let Some(recipe) = self.recipes.get_mut(idx) {
174            recipe.backend = Some(name);
175        }
176    }
177
178    /// Store a routing matrix entry to be applied at build time.
179    pub fn add_routing_entry(&mut self, idx: usize, from: usize, to: usize, gain: f32) {
180        if let Some(recipe) = self.recipes.get_mut(idx) {
181            recipe.routing_entries.push((from, to, gain));
182        }
183    }
184
185    /// Register a named resource (tape loop, buffer, etc.).
186    pub fn add_resource(&mut self, resource: GraphResource) {
187        self.resources.push(resource);
188    }
189
190    /// Number of nodes added to the builder so far.
191    pub fn node_count(&self) -> usize {
192        self.recipes.len()
193    }
194
195    /// Set the default backend name and parameters. Nodes without an explicit
196    pub fn set_default_backend(&mut self, name: String, params: HashMap<String, ParamValue>) {
197        self.default_backend = Some(name);
198        self.backend_params = params;
199    }
200
201    /// Get the default backend name, if set.
202    pub fn default_backend_name(&self) -> Option<&String> {
203        self.default_backend.as_ref()
204    }
205
206    /// Set the sample rate for this builder.
207    pub fn set_sample_rate(&mut self, sr: f32) {
208        self.sample_rate = Some(sr);
209    }
210
211    /// Set the parent RackCase actor reference (Graph → parent ClockTick).
212    pub fn set_parent_ref(&mut self, parent: ActorRef<CommandEnum>) {
213        self.parent_ref = Some(parent);
214    }
215
216    /// Access the shared backend factory.
217    pub fn backend_factory(&self) -> &Arc<BackendFactory<T>> {
218        &self.backend_factory
219    }
220
221    /// Connect signal ports.
222    pub fn connect_signal(
223        &mut self,
224        from_node: usize,
225        from_port: usize,
226        to_node: usize,
227        to_port: usize,
228    ) {
229        self.signal_edges
230            .push((from_node, from_port, to_node, to_port));
231    }
232
233    /// Connect control ports (modulation values).
234    pub fn connect_control(
235        &mut self,
236        from_node: usize,
237        from_port: usize,
238        to_node: usize,
239        to_port: usize,
240    ) {
241        self.control_edges
242            .push((from_node, from_port, to_node, to_port));
243    }
244
245    /// Connect clock ports (timing events).
246    pub fn connect_clock(
247        &mut self,
248        from_node: usize,
249        from_port: usize,
250        to_node: usize,
251        to_port: usize,
252    ) {
253        self.clock_edges
254            .push((from_node, from_port, to_node, to_port));
255    }
256
257    /// Connect feedback ports (delay lines, state carryover).
258    pub fn connect_feedback(
259        &mut self,
260        from_node: usize,
261        from_port: usize,
262        to_node: usize,
263        to_port: usize,
264    ) {
265        self.feedback_edges
266            .push((from_node, from_port, to_node, to_port));
267    }
268
269    /// Build the graph.
270    ///
271    /// Creates backends for nodes that have a backend name set (via
272    /// `SourceDef::backend` / `SinkDef::backend` or the builder's default).  Finds the active
273    /// (driver) node and stores its index for [`Graph::run`].
274    pub fn build(self, system: &ActorSystem) -> Result<Graph<T, BUF_SIZE>, BuildError> {
275        // Phase 1: Construct all nodes from recipes
276        let mut node_entries: Vec<NodeEntry<T, BUF_SIZE>> = Vec::with_capacity(self.recipes.len());
277        for recipe in &self.recipes {
278            let node = self
279                .factory
280                .construct(&recipe.type_name, recipe.id, &recipe.params)
281                .map_err(|e| BuildError::Registry(format!("{e}")))?;
282            node_entries.push(NodeEntry { node });
283        }
284
285        // Apply pre-configured routing entries
286        for (idx, node) in node_entries.iter_mut().enumerate() {
287            for &(from, to, gain) in &self.recipes[idx].routing_entries {
288                if let NodeVariant::Router(ref mut router) = node.node {
289                    router.set_connection(from, to, T::from_f32(gain)).ok();
290                }
291            }
292        }
293
294        let num_nodes = node_entries.len();
295
296        // --- Phase 2: Resolve I/O backends for I/O nodes ---
297        let sr = self.sample_rate.unwrap_or(44100.0);
298        for (idx, recipe) in self.recipes.iter().enumerate() {
299            let name = match recipe.backend.as_ref() {
300                Some(n) => Some(n.clone()),
301                None => self.default_backend.clone(),
302            };
303            if let Some(ref name) = name {
304                let mut be_params = HashMap::new();
305                be_params.insert("sample_rate".into(), ParamValue::Float(sr));
306                be_params.insert("buffer_size".into(), ParamValue::Int(BUF_SIZE as i32));
307                if self.default_backend.as_ref() == Some(name) {
308                    for (k, v) in &self.backend_params {
309                        be_params.entry(k.clone()).or_insert_with(|| v.clone());
310                    }
311                }
312                let backend = self
313                    .backend_factory
314                    .create(name, &be_params)
315                    .map_err(BuildError::Backend)?;
316                if let Some(io_node) = node_entries[idx].node.as_io_node_mut() {
317                    io_node.resolve_backend(backend);
318                }
319            }
320        }
321
322        // --- Phase 3: adjacency for Kahn (signal edges only) ---
323        let mut in_degree = vec![0usize; num_nodes];
324        let mut out_edges: Vec<Vec<(usize, usize, usize)>> = vec![Vec::new(); num_nodes];
325
326        for &(from_n, from_p, to_n, to_p) in &self.signal_edges {
327            in_degree[to_n] += 1;
328            out_edges[from_n].push((from_p, to_n, to_p));
329        }
330
331        // --- Kahn's algorithm ---
332        let mut queue: VecDeque<usize> = in_degree
333            .iter()
334            .enumerate()
335            .filter(|(_, &d)| d == 0)
336            .map(|(i, _)| i)
337            .collect();
338
339        let mut topo = Vec::with_capacity(num_nodes);
340        let mut indeg = in_degree;
341        while let Some(idx) = queue.pop_front() {
342            topo.push(idx);
343            for &(_, to_n, _) in &out_edges[idx] {
344                indeg[to_n] -= 1;
345                if indeg[to_n] == 0 {
346                    queue.push_back(to_n);
347                }
348            }
349        }
350
351        if topo.len() != num_nodes {
352            return Err(BuildError::CycleDetected);
353        }
354
355        // --- collect nodes into final Vec ---
356        let mut nodes: Vec<NodeVariant<T, BUF_SIZE>> =
357            node_entries.into_iter().map(|e| e.node).collect();
358
359        // --- Phase 4: port pointer wiring on the final nodes Vec ---
360        for &(from_n, from_p, to_n, to_p) in &self.signal_edges {
361            if let Some(port) = nodes[from_n].output_port_mut(from_p) {
362                port.downstream.push((to_n, to_p));
363            }
364            let in_ptr: *mut Port<T, BUF_SIZE> = nodes[to_n]
365                .input_port_mut(to_p)
366                .map(|p| p as *mut Port<T, BUF_SIZE>)
367                .unwrap_or(std::ptr::null_mut());
368            let parent: *mut NodeVariant<T, BUF_SIZE> = &mut nodes[to_n];
369            let out_ptr: *mut Port<T, BUF_SIZE> = nodes[from_n]
370                .output_port_mut(from_p)
371                .map(|p| p as *mut Port<T, BUF_SIZE>)
372                .unwrap_or(std::ptr::null_mut());
373            if !in_ptr.is_null() && !out_ptr.is_null() {
374                #[allow(unsafe_code)]
375                unsafe {
376                    (*in_ptr).parent = parent;
377                    (*out_ptr).downstream_input_ptrs.push(in_ptr);
378                }
379            }
380        }
381
382        // --- downstream_nodes ---
383        for &(from_n, from_p, to_n, _) in &self.signal_edges {
384            let parent: *mut NodeVariant<T, BUF_SIZE> = &mut nodes[to_n];
385            if let Some(port) = nodes[from_n].output_port_mut(from_p) {
386                let ptr_val = parent as usize;
387                let already = port.downstream_nodes.iter().any(|&p| p as usize == ptr_val);
388                if !already {
389                    port.downstream_nodes.push(parent);
390                }
391            }
392        }
393
394        // --- upstream_buffer ---
395        for &(from_n, from_p, to_n, to_p) in &self.signal_edges {
396            let upstream = nodes[from_n]
397                .output_port(from_p)
398                .map(|p| &p.buffer as *const FixedBuffer<T, BUF_SIZE>);
399            if let Some(port) = nodes[to_n].input_port_mut(to_p) {
400                if port.upstream_buffer.is_none() {
401                    port.upstream_buffer = upstream;
402                } else {
403                    port.upstream_buffer = None;
404                }
405            }
406        }
407
408        // --- feedback buffers ---
409        for &(from_n, from_p, to_n, to_p) in &self.feedback_edges {
410            if let Some(port) = nodes[from_n].output_port_mut(from_p) {
411                port.feedback_buffer = Some(FixedBuffer::new());
412                port.feedback_downstream.push((to_n, to_p));
413            }
414            if let Some(port) = nodes[to_n].input_port_mut(to_p) {
415                port.feedback_buffer = Some(FixedBuffer::new());
416            }
417        }
418        for &(from_n, from_p, to_n, to_p) in &self.feedback_edges {
419            let ptr = nodes[to_n]
420                .input_port(to_p)
421                .map(|p| &p.feedback_buffer as *const Option<FixedBuffer<T, BUF_SIZE>>)
422                .map(|r| r as *mut Option<FixedBuffer<T, BUF_SIZE>>);
423            if let Some(port) = nodes[from_n].output_port_mut(from_p) {
424                if let Some(p) = ptr {
425                    port.feedback_ptrs.push(p);
426                }
427            }
428        }
429
430        // Allocate named buffers
431        let mut buffers = BufferRegistry::new();
432        for r in &self.resources {
433            if r.kind == "tape" {
434                if let Some(tape) = TapeLoop::<T>::new(r.capacity) {
435                    buffers.register(&r.name, Box::new(tape));
436                }
437            }
438        }
439        for entry in nodes.iter_mut() {
440            entry.resolve_resources(&buffers);
441        }
442
443        let source_idx = topo.first().copied().unwrap_or(0);
444        let mut active_node_idx = 0;
445        for (i, n) in nodes.iter_mut().enumerate() {
446            if n.as_active_node_mut().is_some() {
447                active_node_idx = i;
448                break;
449            }
450        }
451
452        let owned_buffers = buffers.into_inner();
453        let allocated = self.resources.clone();
454
455        // Wrap nodes in Rc<UnsafeCell<Vec<>>> — port pointers already valid in this Vec.
456        let nodes: Rc<UnsafeCell<Vec<NodeVariant<T, BUF_SIZE>>>> = Rc::new(UnsafeCell::new(nodes));
457
458        let actor = system.spawn("graph", {
459            let n = nodes.clone();
460            #[allow(unsafe_code)]
461            move |msg: CommandEnum| {
462                if let CommandEnum::SetParameter(param) = msg {
463                    let idx = param.port.node_id().inner() as usize;
464                    unsafe {
465                        let nv = &mut *n.get();
466                        if idx < nv.len() {
467                            let _ = nv[idx].set_parameter(&param.parameter, param.value);
468                        }
469                    }
470                }
471            }
472        });
473
474        let actor_ref = actor.actor_ref();
475
476        Ok(Graph {
477            nodes,
478            topo_order: topo,
479            resources: allocated,
480            current_tick: ClockTick::new(0, BUF_SIZE as u32, sr),
481            buffers: owned_buffers,
482            source_idx,
483            active_node_idx,
484            actor: Some(actor),
485            actor_ref,
486            parent_ref: self.parent_ref.clone(),
487        })
488    }
489}
490
491// ============================================================================
492// Graph (Static DAG)
493// ============================================================================
494
495/// Immutable signal graph with static DAG topology.
496///
497/// Once built the graph cannot be modified. The graph owns no processing
498/// logic — it is a pure topology description. Processing is driven by
499/// port-level methods (`pre_process`, `snapshot_feedback`, `propagate`)
500/// called from external code (e.g. a real-time signal callback or an
501/// offline renderer).
502pub struct Graph<T: Transcendental, const BUF_SIZE: usize> {
503    nodes: Rc<UnsafeCell<Vec<NodeVariant<T, BUF_SIZE>>>>,
504    topo_order: Vec<usize>,
505    source_idx: usize,
506    active_node_idx: usize,
507    current_tick: ClockTick,
508    pub(crate) resources: Vec<GraphResource>,
509    #[allow(dead_code)]
510    buffers: Vec<Box<dyn Buffer<T> + Send>>,
511    actor: Option<Actor<CommandEnum>>,
512    actor_ref: ActorRef<CommandEnum>,
513    parent_ref: Option<ActorRef<CommandEnum>>,
514}
515
516impl<T: Transcendental, const BUF_SIZE: usize> Graph<T, BUF_SIZE> {
517    // ========================================================================
518    // Accessors
519    // ========================================================================
520
521    /// Borrow the node array (read-only).
522    #[allow(unsafe_code)]
523    pub fn nodes(&self) -> &[NodeVariant<T, BUF_SIZE>] {
524        unsafe { &*self.nodes.get() }
525    }
526
527    /// Return the current clock tick.
528    pub fn current_tick(&self) -> ClockTick {
529        self.current_tick
530    }
531
532    /// Return the number of nodes in the graph.
533    #[allow(unsafe_code)]
534    pub fn node_count(&self) -> usize {
535        unsafe { (*self.nodes.get()).len() }
536    }
537
538    /// Return the topological ordering of node indices.
539    pub fn topo_order(&self) -> &[usize] {
540        &self.topo_order
541    }
542
543    #[allow(dead_code)]
544    pub(crate) fn sample_rate(&self) -> f32 {
545        self.current_tick.sample_rate
546    }
547
548    /// Access the named resources (tape loops, etc.) allocated for this graph.
549    #[allow(dead_code)]
550    pub fn resources(&self) -> &[GraphResource] {
551        &self.resources
552    }
553
554    /// Run graph processing through the active node.
555    #[allow(unsafe_code)]
556    pub fn run(&mut self, running: Arc<AtomicBool>) -> Result<(), String> {
557        let mut actor = self.actor.take().ok_or("graph already running")?;
558        let source = self.source_idx;
559        let parent = self.parent_ref.clone();
560        let nodes = self.nodes.clone();
561        let idx = self.active_node_idx;
562
563        let tick: Box<dyn FnMut(u64, f32)> = Box::new(move |sample_pos, sample_rate| {
564            actor.drain();
565            let tick = ClockTick::new(sample_pos, BUF_SIZE as u32, sample_rate);
566            let mut ctx = ProcessContext { clock: &tick };
567            unsafe {
568                let nv = &mut *nodes.get();
569                let _ = nv[source].process_block(&mut ctx);
570                let action_ctx = ActionContext::new(&tick);
571                for po in 0..nv[source].num_signal_outputs() {
572                    if let Some(port) = nv[source].output_port(po) {
573                        let _ = port.propagate(port.buffer(), &action_ctx);
574                    }
575                }
576            }
577            if let Some(ref parent) = parent {
578                parent.send(CommandEnum::ClockTick(tick));
579            }
580        });
581
582        unsafe {
583            self.nodes.get().as_mut().unwrap()[idx]
584                .as_active_node_mut()
585                .ok_or("no active node")?
586                .run(tick, running)
587        }
588    }
589
590    /// Obtain an [`ActorRef`] for sending commands to this graph.
591    pub fn handle(&self) -> ActorRef<CommandEnum> {
592        self.actor_ref.clone()
593    }
594
595    /// Consume the graph and return its owned parts (test only).
596    #[cfg(test)]
597    pub fn into_parts(
598        self,
599    ) -> (
600        Vec<NodeVariant<T, BUF_SIZE>>,
601        Vec<usize>,
602        ClockTick,
603        Vec<Box<dyn Buffer<T> + Send>>,
604    ) {
605        let Self {
606            nodes,
607            topo_order,
608            current_tick,
609            resources: _,
610            buffers,
611            active_node_idx: _,
612            source_idx: _,
613            actor,
614            actor_ref: _,
615            parent_ref: _,
616        } = self;
617        drop(actor);
618        let nodes = Rc::try_unwrap(nodes).unwrap().into_inner();
619        (nodes, topo_order, current_tick, buffers)
620    }
621}
622
623#[cfg(test)]
624mod tests {
625    use super::*;
626    use rill_core::math::Transcendental;
627    use rill_core::time::ClockTick;
628    use rill_core::traits::{
629        Node, NodeCategory, NodeId, NodeMetadata, NodeState, ParamValue, ParameterId, Port,
630        ProcessResult, Processor, Sink, Source,
631    };
632    use rill_core_actor::ActorSystem;
633    use std::sync::Arc;
634
635    fn test_system() -> ActorSystem {
636        ActorSystem::new()
637    }
638
639    fn test_factory<const B: usize>() -> Arc<NodeFactory<f32, B>> {
640        let mut f = NodeFactory::<f32, B>::new();
641
642        f.register_fn("test/const", |id, params| {
643            let value = params.get_f32("value", 1.0);
644            let mut node = ConstantSource::<f32, B>::new(id, value, params.sample_rate);
645            node.init(params.sample_rate);
646            NodeVariant::Source(Box::new(node))
647        });
648
649        f.register_fn("test/gain", |id, params| {
650            let gain = params.get_f32("gain", 1.0);
651            let mut node = GainProcessor::<f32, B>::new(id, params.sample_rate, gain);
652            node.init(params.sample_rate);
653            NodeVariant::Processor(Box::new(node))
654        });
655
656        f.register_fn("test/capture", |id, params| {
657            let mut node = CaptureSink::<f32, B>::new(id, params.sample_rate);
658            node.init(params.sample_rate);
659            NodeVariant::Sink(Box::new(node))
660        });
661
662        Arc::new(f)
663    }
664
665    fn test_builder<const B: usize>(factory: &Arc<NodeFactory<f32, B>>) -> GraphBuilder<f32, B> {
666        GraphBuilder::new(factory.clone(), Arc::new(BackendFactory::new()))
667    }
668
669    fn test_params(sample_rate: f32) -> Params {
670        let mut p = Params::new(sample_rate);
671        p.insert("value".to_string(), ParamValue::Float(sample_rate));
672        p
673    }
674
675    // ------------------------------------------------------------------------
676    // Test node implementations
677    // ------------------------------------------------------------------------
678
679    pub(crate) struct ConstantSource<T: Transcendental, const B: usize> {
680        id: NodeId,
681        value: T,
682        state: NodeState<T, B>,
683        output: Port<T, B>,
684    }
685
686    impl<T: Transcendental, const B: usize> ConstantSource<T, B> {
687        pub fn new(id: NodeId, value: T, sample_rate: f32) -> Self {
688            let state = NodeState::new(sample_rate);
689            let mut output = Port::output(id, 0, "out");
690            output.buffer = FixedBuffer::new();
691            Self {
692                id,
693                value,
694                state,
695                output,
696            }
697        }
698    }
699
700    impl<T: Transcendental, const B: usize> Node<T, B> for ConstantSource<T, B> {
701        fn id(&self) -> NodeId {
702            self.id
703        }
704        fn set_id(&mut self, id: NodeId) {
705            self.id = id;
706        }
707        fn metadata(&self) -> NodeMetadata {
708            NodeMetadata {
709                name: "ConstantSource".into(),
710                type_name: Some("test/const".into()),
711                category: NodeCategory::Source,
712                description: String::new(),
713                author: String::new(),
714                version: String::new(),
715                parameters: vec![],
716                signal_inputs: 0,
717                signal_outputs: 1,
718                control_inputs: 0,
719                control_outputs: 0,
720                clock_inputs: 0,
721                clock_outputs: 0,
722                feedback_ports: 0,
723            }
724        }
725        fn init(&mut self, _: f32) {}
726        fn reset(&mut self) {}
727        fn get_parameter(&self, _: &ParameterId) -> Option<ParamValue> {
728            None
729        }
730        fn set_parameter(&mut self, _: &ParameterId, _: ParamValue) -> ProcessResult<()> {
731            Ok(())
732        }
733        fn control_port(&self, _: usize) -> Option<&Port<T, B>> {
734            None
735        }
736        fn control_port_mut(&mut self, _: usize) -> Option<&mut Port<T, B>> {
737            None
738        }
739        fn output_port(&self, i: usize) -> Option<&Port<T, B>> {
740            if i == 0 {
741                Some(&self.output)
742            } else {
743                None
744            }
745        }
746        fn output_port_mut(&mut self, i: usize) -> Option<&mut Port<T, B>> {
747            if i == 0 {
748                Some(&mut self.output)
749            } else {
750                None
751            }
752        }
753        fn input_port(&self, _: usize) -> Option<&Port<T, B>> {
754            None
755        }
756        fn input_port_mut(&mut self, _: usize) -> Option<&mut Port<T, B>> {
757            None
758        }
759        fn state(&self) -> &NodeState<T, B> {
760            &self.state
761        }
762        fn state_mut(&mut self) -> &mut NodeState<T, B> {
763            &mut self.state
764        }
765    }
766
767    impl<T: Transcendental, const B: usize> Source<T, B> for ConstantSource<T, B> {
768        fn generate(&mut self, _: &ClockTick, _: &[T], _: &[ClockTick]) -> ProcessResult<()> {
769            self.output.buffer.as_mut_array().fill(self.value);
770            Ok(())
771        }
772    }
773
774    // ------------------------------------------------------------------------
775    // GainProcessor
776    // ------------------------------------------------------------------------
777
778    pub(crate) struct GainProcessor<T: Transcendental, const B: usize> {
779        id: NodeId,
780        gain: T,
781        state: NodeState<T, B>,
782        input: Port<T, B>,
783        output: Port<T, B>,
784    }
785
786    impl<T: Transcendental, const B: usize> GainProcessor<T, B> {
787        pub fn new(id: NodeId, sample_rate: f32, gain: T) -> Self {
788            let state = NodeState::new(sample_rate);
789            let input = Port::input(id, 0, "in");
790            let output = Port::output(id, 0, "out");
791            Self {
792                id,
793                gain,
794                state,
795                input,
796                output,
797            }
798        }
799    }
800
801    impl<T: Transcendental, const B: usize> Node<T, B> for GainProcessor<T, B> {
802        fn id(&self) -> NodeId {
803            self.id
804        }
805        fn set_id(&mut self, id: NodeId) {
806            self.id = id;
807        }
808        fn metadata(&self) -> NodeMetadata {
809            NodeMetadata {
810                name: "GainProcessor".into(),
811                type_name: Some("test/gain".into()),
812                category: NodeCategory::Processor,
813                description: String::new(),
814                author: String::new(),
815                version: String::new(),
816                parameters: vec![],
817                signal_inputs: 1,
818                signal_outputs: 1,
819                control_inputs: 0,
820                control_outputs: 0,
821                clock_inputs: 0,
822                clock_outputs: 0,
823                feedback_ports: 0,
824            }
825        }
826        fn init(&mut self, _: f32) {}
827        fn reset(&mut self) {}
828        fn get_parameter(&self, _: &ParameterId) -> Option<ParamValue> {
829            None
830        }
831        fn set_parameter(&mut self, _: &ParameterId, _: ParamValue) -> ProcessResult<()> {
832            Ok(())
833        }
834        fn control_port(&self, _: usize) -> Option<&Port<T, B>> {
835            None
836        }
837        fn control_port_mut(&mut self, _: usize) -> Option<&mut Port<T, B>> {
838            None
839        }
840        fn input_port(&self, i: usize) -> Option<&Port<T, B>> {
841            if i == 0 {
842                Some(&self.input)
843            } else {
844                None
845            }
846        }
847        fn input_port_mut(&mut self, i: usize) -> Option<&mut Port<T, B>> {
848            if i == 0 {
849                Some(&mut self.input)
850            } else {
851                None
852            }
853        }
854        fn num_signal_outputs(&self) -> usize {
855            1
856        }
857        fn output_port(&self, i: usize) -> Option<&Port<T, B>> {
858            if i == 0 {
859                Some(&self.output)
860            } else {
861                None
862            }
863        }
864        fn output_port_mut(&mut self, i: usize) -> Option<&mut Port<T, B>> {
865            if i == 0 {
866                Some(&mut self.output)
867            } else {
868                None
869            }
870        }
871        fn state(&self) -> &NodeState<T, B> {
872            &self.state
873        }
874        fn state_mut(&mut self) -> &mut NodeState<T, B> {
875            &mut self.state
876        }
877    }
878
879    impl<T: Transcendental, const B: usize> Processor<T, B> for GainProcessor<T, B> {
880        fn process(
881            &mut self,
882            _: &ClockTick,
883            _: &[&[T; B]],
884            _: &[T],
885            _: &[ClockTick],
886            _: &[&[T; B]],
887        ) -> ProcessResult<()> {
888            let src = self.input.buffer.as_array();
889            let buf = self.output.buffer.as_mut_array();
890            for i in 0..B {
891                buf[i] = src[i] * self.gain;
892            }
893            Ok(())
894        }
895    }
896
897    // ------------------------------------------------------------------------
    // CaptureSink — terminal node; tests read its input port buffer directly
899    // ------------------------------------------------------------------------
900
901    pub(crate) struct CaptureSink<T: Transcendental, const B: usize> {
902        id: NodeId,
903        state: NodeState<T, B>,
904        input: Port<T, B>,
905    }
906
907    impl<T: Transcendental, const B: usize> CaptureSink<T, B> {
908        pub fn new(id: NodeId, sample_rate: f32) -> Self {
909            let state = NodeState::new(sample_rate);
910            let input = Port::input(id, 0, "in");
911            Self { id, state, input }
912        }
913    }
914
915    impl<T: Transcendental, const B: usize> Node<T, B> for CaptureSink<T, B> {
916        fn id(&self) -> NodeId {
917            self.id
918        }
919        fn set_id(&mut self, id: NodeId) {
920            self.id = id;
921        }
922        fn metadata(&self) -> NodeMetadata {
923            NodeMetadata {
924                name: "CaptureSink".into(),
925                type_name: Some("test/capture".into()),
926                category: NodeCategory::Sink,
927                description: String::new(),
928                author: String::new(),
929                version: String::new(),
930                parameters: vec![],
931                signal_inputs: 1,
932                signal_outputs: 0,
933                control_inputs: 0,
934                control_outputs: 0,
935                clock_inputs: 0,
936                clock_outputs: 0,
937                feedback_ports: 0,
938            }
939        }
940        fn init(&mut self, _: f32) {}
941        fn reset(&mut self) {}
942        fn get_parameter(&self, _: &ParameterId) -> Option<ParamValue> {
943            None
944        }
945        fn set_parameter(&mut self, _: &ParameterId, _: ParamValue) -> ProcessResult<()> {
946            Ok(())
947        }
948        fn control_port(&self, _: usize) -> Option<&Port<T, B>> {
949            None
950        }
951        fn control_port_mut(&mut self, _: usize) -> Option<&mut Port<T, B>> {
952            None
953        }
954        fn output_port(&self, _: usize) -> Option<&Port<T, B>> {
955            None
956        }
957        fn output_port_mut(&mut self, _: usize) -> Option<&mut Port<T, B>> {
958            None
959        }
960        fn input_port(&self, i: usize) -> Option<&Port<T, B>> {
961            if i == 0 {
962                Some(&self.input)
963            } else {
964                None
965            }
966        }
967        fn input_port_mut(&mut self, i: usize) -> Option<&mut Port<T, B>> {
968            if i == 0 {
969                Some(&mut self.input)
970            } else {
971                None
972            }
973        }
974        fn state(&self) -> &NodeState<T, B> {
975            &self.state
976        }
977        fn state_mut(&mut self) -> &mut NodeState<T, B> {
978            &mut self.state
979        }
980    }
981
982    impl<T: Transcendental, const B: usize> Sink<T, B> for CaptureSink<T, B> {
983        fn consume(
984            &mut self,
985            _: &ClockTick,
986            _: &[&[T; B]],
987            _: &[T],
988            _: &[ClockTick],
989            _: &[&[T; B]],
990        ) -> ProcessResult<()> {
991            Ok(())
992        }
993    }
994
995    // ------------------------------------------------------------------------
996    // Graph signal flow tests
997    // ------------------------------------------------------------------------
998
    /// Block size used by the graph tests below: passed as the const generic
    /// argument to the test factory/builder and as the block length of each
    /// `ClockTick`.
    const BUF: usize = 64;
1000
1001    #[test]
1002    #[allow(unsafe_code)]
1003    fn test_graph_source_to_sink() {
1004        let factory = test_factory::<BUF>();
1005        let mut builder = test_builder::<BUF>(&factory);
1006        let system = test_system();
1007
1008        let src_idx = builder.add_node("test/const", &test_params(44100.0));
1009        let snk_idx = builder.add_node("test/capture", &test_params(44100.0));
1010        builder.connect_signal(src_idx, 0, snk_idx, 0);
1011
1012        let graph = builder.build(&system).unwrap();
1013        let source_idx = graph.source_idx;
1014
1015        let tick = ClockTick::new(0, BUF as u32, 44100.0);
1016        let mut ctx = ProcessContext { clock: &tick };
1017        let nodes = graph.nodes.clone();
1018        unsafe {
1019            let nv = &mut *nodes.get();
1020            nv[source_idx].process_block(&mut ctx).unwrap();
1021            let action_ctx = ActionContext::new(&tick);
1022            if let Some(port) = nv[source_idx].output_port(0) {
1023                port.propagate(port.buffer(), &action_ctx).unwrap();
1024            }
1025        }
1026        unsafe {
1027            let nv = &*nodes.get();
1028            let val = nv[snk_idx].input_port(0).unwrap().buffer.as_array()[0];
1029            assert!(val != 0.0, "signal should have propagated, got {}", val);
1030        }
1031    }
1032
1033    #[test]
1034    #[allow(unsafe_code)]
1035    fn test_graph_source_proc_sink() {
1036        let factory = test_factory::<BUF>();
1037        let mut builder = test_builder::<BUF>(&factory);
1038        let system = test_system();
1039
1040        let mut params = test_params(44100.0);
1041        params.insert("value".to_string(), ParamValue::Float(5.0));
1042        let src_idx = builder.add_node("test/const", &params);
1043
1044        let mut gain_params = test_params(44100.0);
1045        gain_params.insert("gain".to_string(), ParamValue::Float(3.0));
1046        let proc_idx = builder.add_node("test/gain", &gain_params);
1047
1048        let snk_idx = builder.add_node("test/capture", &test_params(44100.0));
1049
1050        builder.connect_signal(src_idx, 0, proc_idx, 0);
1051        builder.connect_signal(proc_idx, 0, snk_idx, 0);
1052
1053        let graph = builder.build(&system).unwrap();
1054        let source_idx = graph.source_idx;
1055
1056        eprintln!("topo: {:?}", graph.topo_order);
1057        eprintln!("source_idx: {source_idx}, src_idx: {src_idx}, proc_idx: {proc_idx}, snk_idx: {snk_idx}");
1058
1059        let tick = ClockTick::new(0, BUF as u32, 44100.0);
1060        let mut ctx = ProcessContext { clock: &tick };
1061        let nodes = graph.nodes.clone();
1062        unsafe {
1063            let nv = &mut *nodes.get();
1064            eprintln!(
1065                "node types: src={:?}, proc={:?}, snk={:?}",
1066                std::mem::discriminant(&nv[0]),
1067                std::mem::discriminant(&nv[1]),
1068                std::mem::discriminant(&nv[2]),
1069            );
1070
1071            let _ = nv[source_idx].process_block(&mut ctx);
1072            let src_val = nv[source_idx].output_port(0).unwrap().buffer.as_array()[0];
1073            eprintln!("source output: {src_val}");
1074
1075            let action_ctx = ActionContext::new(&tick);
1076            let out_port = nv[source_idx].output_port(0).unwrap();
1077            eprintln!(
1078                "source output port downstream_nodes: {}",
1079                out_port.downstream_nodes.len()
1080            );
1081            eprintln!(
1082                "source output port downstream_input_ptrs: {}",
1083                out_port.downstream_input_ptrs.len()
1084            );
1085
1086            // Check processor output port connections BEFORE propagate
1087            {
1088                let proc_port = nv[proc_idx].output_port(0).unwrap();
1089                eprintln!(
1090                    "PROC OUT port downstream_nodes: {}",
1091                    proc_port.downstream_nodes.len()
1092                );
1093                eprintln!(
1094                    "PROC OUT port downstream_input_ptrs: {}",
1095                    proc_port.downstream_input_ptrs.len()
1096                );
1097                for (i, &dn) in proc_port.downstream.iter().enumerate() {
1098                    eprintln!("  downstream[{}]: (node={}, port={})", i, dn.0, dn.1);
1099                }
1100            }
1101
1102            // --- BUFFER ADDRESS DEBUG ---
1103            let src_out = nv[source_idx].output_port(0).unwrap();
1104            let proc_in = nv[proc_idx].input_port(0).unwrap();
1105            let proc_out = nv[proc_idx].output_port(0).unwrap();
1106            let snk_in = nv[snk_idx].input_port(0).unwrap();
1107            eprintln!("BUFFER ADDRESSES:");
1108            eprintln!(
1109                "  src output buf:  {:p}",
1110                src_out.buffer.as_array().as_ptr()
1111            );
1112            eprintln!(
1113                "  proc input buf:  {:p}",
1114                proc_in.buffer.as_array().as_ptr()
1115            );
1116            eprintln!(
1117                "  proc output buf: {:p}",
1118                proc_out.buffer.as_array().as_ptr()
1119            );
1120            eprintln!("  snk input buf:   {:p}", snk_in.buffer.as_array().as_ptr());
1121            eprintln!(
1122                "  proc_in.upstream_buffer.is_some(): {}",
1123                proc_in.upstream_buffer.is_some()
1124            );
1125            eprintln!(
1126                "  snk_in.upstream_buffer.is_some(): {}",
1127                snk_in.upstream_buffer.is_some()
1128            );
1129            // --- END DEBUG ---
1130
1131            out_port.propagate(out_port.buffer(), &action_ctx).unwrap();
1132
1133            // --- AFTER PROPAGATE: debug buffer values ---
1134            {
1135                let nv = &*nodes.get();
1136                let snk_in = nv[snk_idx].input_port(0).unwrap();
1137                eprintln!(
1138                    "AFTER propagate - snk input buf[0] via .buffer: {}",
1139                    snk_in.buffer.as_array()[0]
1140                );
1141                if let Some(up) = snk_in.upstream_buffer {
1142                    eprintln!(
1143                        "AFTER propagate - snk input via upstream ptr: {}",
1144                        (*up).as_array()[0]
1145                    );
1146                }
1147            }
1148
1149            let sink_buf = nv[snk_idx].input_port(0).unwrap().buffer.as_array();
1150            eprintln!("SINK input port buffer first sample: {}", sink_buf[0]);
1151
1152            // Check processor output port propagation
1153            let proc_out_port = nv[proc_idx].output_port(0).unwrap();
1154            eprintln!(
1155                "proc output port downstream_nodes: {}",
1156                proc_out_port.downstream_nodes.len()
1157            );
1158            eprintln!(
1159                "proc output port downstream_input_ptrs: {}",
1160                proc_out_port.downstream_input_ptrs.len()
1161            );
1162
1163            // Sink
1164            let sink_val = nv[snk_idx].input_port(0).unwrap().buffer.as_array()[0];
1165            eprintln!("sink input AFTER propagate: {sink_val}");
1166
1167            assert!(
1168                (sink_val - 15.0).abs() < 1e-4,
1169                "expected 15.0, got {}",
1170                sink_val
1171            );
1172        }
1173    }
1174}