Skip to main content

rill_graph/
graph.rs

1use crate::backend_factory::BackendFactory;
2use crate::factory::NodeFactory;
3use rill_core::buffer::{Buffer, BufferRegistry, FixedBuffer, TapeLoop};
4use rill_core::math::Transcendental;
5use rill_core::queues::CommandEnum;
6use rill_core::time::{ClockTick, RenderContext, SystemClock};
7use rill_core::traits::port::Port;
8use rill_core::traits::processable::Processable;
9use rill_core::traits::ParamValue;
10use rill_core::traits::{Node, NodeId, NodeVariant, Params};
11use rill_core_actor::{Actor, ActorRef, ActorSystem};
12use std::cell::UnsafeCell;
13use std::collections::{HashMap, VecDeque};
14use std::rc::Rc;
15use std::sync::atomic::AtomicBool;
16use std::sync::Arc;
17
18// ============================================================================
19// Internal routing metadata
20// ============================================================================
21
22// ============================================================================
23// Build Errors
24// ============================================================================
25
26/// Errors that can occur during graph construction.
27#[derive(Debug, Clone)]
28pub enum BuildError {
29    /// A cycle was detected in the signal edge graph.
30    CycleDetected,
31    /// Backend creation failed.
32    Backend(String),
33    /// Factory registration error (unknown node type).
34    Registry(String),
35}
36
37impl std::fmt::Display for BuildError {
38    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39        match self {
40            Self::CycleDetected => write!(f, "graph cycle detected"),
41            Self::Backend(msg) => write!(f, "backend error: {msg}"),
42            Self::Registry(msg) => write!(f, "registry error: {msg}"),
43        }
44    }
45}
46
47// ============================================================================
48// Graph Builder
49// ============================================================================
50
51// ============================================================================
52// Node Storage
53// ============================================================================
54
55/// A deferred node recipe — constructed at [`build`](GraphBuilder::build) time.
56struct NodeRecipe<T: Transcendental, const BUF_SIZE: usize> {
57    type_name: String,
58    id: NodeId,
59    params: Params,
60    backend: Option<String>,
61    routing_entries: Vec<(usize, usize, f32)>,
62    _phantom: std::marker::PhantomData<(T, [(); BUF_SIZE])>,
63}
64
65/// Temporary holder during build — wraps a constructed node for wiring.
66struct NodeEntry<T: Transcendental, const BUF_SIZE: usize> {
67    node: NodeVariant<T, BUF_SIZE>,
68}
69
70// ============================================================================
71// GraphBuilder (Mutable Construction)
72// ============================================================================
73
74/// A named resource (tape loop) shared between nodes in the graph.
75#[derive(Clone)]
76pub struct GraphResource {
77    /// Unique name referenced by node parameters.
78    pub name: String,
79    /// Resource kind string (`"tape"`).
80    pub kind: String,
81    /// Capacity in samples (for `"tape"` kind).
82    pub capacity: usize,
83}
84
85/// Mutable builder for an immutable signal graph.
86///
87/// Stores deferred node recipes until [`build`](Self::build), which
88/// constructs all nodes, wires connections, and performs topological
89/// sort.  This keeps `GraphBuilder` `Send` — all non‑`Send` data
90/// is constructed inside the target thread.
91///
92/// # Node factory
93///
94/// The builder holds an [`Arc<NodeFactory>`] for constructing nodes by
95/// type name. Nodes registered via [`add_node_with_id`](Self::add_node_with_id)
96/// are only validated and constructed at [`build`](Self::build) time.
97pub struct GraphBuilder<T: Transcendental, const BUF_SIZE: usize> {
98    recipes: Vec<NodeRecipe<T, BUF_SIZE>>,
99    signal_edges: Vec<(usize, usize, usize, usize)>,
100    control_edges: Vec<(usize, usize, usize, usize)>,
101    clock_edges: Vec<(usize, usize, usize, usize)>,
102    feedback_edges: Vec<(usize, usize, usize, usize)>,
103    resources: Vec<GraphResource>,
104    /// Shared node factory (required, from Runtime).
105    factory: Arc<NodeFactory<T, BUF_SIZE>>,
106    /// Shared backend factory (required, from Runtime).
107    backend_factory: Arc<BackendFactory<T>>,
108    /// Default backend name for nodes that don't specify one explicitly.
109    default_backend: Option<String>,
110    /// Default backend parameters (sample_rate, buffer_size, channels).
111    backend_params: HashMap<String, ParamValue>,
112    /// Sample rate override. When set, used in [`build`](Self::build).
113    /// Populated from [`GraphDef::sample_rate`] during deserialization.
114    sample_rate: Option<f32>,
115    /// Parent RackCase ActorRef — Graph sends ClockTick here.
116    parent_ref: Option<ActorRef<CommandEnum>>,
117}
118
119impl<T: Transcendental, const BUF_SIZE: usize> GraphBuilder<T, BUF_SIZE> {
120    /// Create a new empty graph builder without a node factory.
121    pub fn new(
122        factory: Arc<NodeFactory<T, BUF_SIZE>>,
123        backend_factory: Arc<BackendFactory<T>>,
124    ) -> Self {
125        Self {
126            recipes: Vec::new(),
127            signal_edges: Vec::new(),
128            control_edges: Vec::new(),
129            clock_edges: Vec::new(),
130            feedback_edges: Vec::new(),
131            resources: Vec::new(),
132            factory,
133            backend_factory,
134            default_backend: None,
135            backend_params: HashMap::new(),
136            sample_rate: None,
137            parent_ref: None,
138        }
139    }
140
141    /// Add a node by type name using the internal factory.
142    ///
143    /// The type must be registered in the factory before [`build`](Self::build)
144    /// is called.
145    ///
146    /// Returns the index of the newly added node.
147    pub fn add_node(&mut self, type_name: &str, params: &Params) -> usize {
148        let id = NodeId(self.recipes.len() as u32);
149        self.add_node_with_id(type_name, params, id)
150    }
151
152    /// Add a node with an explicit [`NodeId`].
153    ///
154    /// Like [`add_node`](Self::add_node) but uses the provided `id`.
155    /// Important for serialization where external references depend on
156    /// exact IDs.
157    pub fn add_node_with_id(&mut self, type_name: &str, params: &Params, id: NodeId) -> usize {
158        let idx = self.recipes.len();
159        self.recipes.push(NodeRecipe {
160            type_name: type_name.to_string(),
161            id,
162            params: params.clone(),
163            backend: None,
164            routing_entries: Vec::new(),
165            _phantom: std::marker::PhantomData,
166        });
167        idx
168    }
169
170    /// Assign a named backend to the node at the given index.
171    pub fn set_node_backend(&mut self, idx: usize, name: String) {
172        if let Some(recipe) = self.recipes.get_mut(idx) {
173            recipe.backend = Some(name);
174        }
175    }
176
177    /// Store a routing matrix entry to be applied at build time.
178    pub fn add_routing_entry(&mut self, idx: usize, from: usize, to: usize, gain: f32) {
179        if let Some(recipe) = self.recipes.get_mut(idx) {
180            recipe.routing_entries.push((from, to, gain));
181        }
182    }
183
184    /// Register a named resource (tape loop, buffer, etc.).
185    pub fn add_resource(&mut self, resource: GraphResource) {
186        self.resources.push(resource);
187    }
188
189    /// Number of nodes added to the builder so far.
190    pub fn node_count(&self) -> usize {
191        self.recipes.len()
192    }
193
194    /// Set the default backend name and parameters. Nodes without an explicit
195    pub fn set_default_backend(&mut self, name: String, params: HashMap<String, ParamValue>) {
196        self.default_backend = Some(name);
197        self.backend_params = params;
198    }
199
200    /// Get the default backend name, if set.
201    pub fn default_backend_name(&self) -> Option<&String> {
202        self.default_backend.as_ref()
203    }
204
205    /// Set the sample rate for this builder.
206    pub fn set_sample_rate(&mut self, sr: f32) {
207        self.sample_rate = Some(sr);
208    }
209
210    /// Set the parent RackCase actor reference (Graph → parent ClockTick).
211    pub fn set_parent_ref(&mut self, parent: ActorRef<CommandEnum>) {
212        self.parent_ref = Some(parent);
213    }
214
215    /// Access the shared backend factory.
216    pub fn backend_factory(&self) -> &Arc<BackendFactory<T>> {
217        &self.backend_factory
218    }
219
220    /// Connect signal ports.
221    pub fn connect_signal(
222        &mut self,
223        from_node: usize,
224        from_port: usize,
225        to_node: usize,
226        to_port: usize,
227    ) {
228        self.signal_edges
229            .push((from_node, from_port, to_node, to_port));
230    }
231
232    /// Connect control ports (modulation values).
233    pub fn connect_control(
234        &mut self,
235        from_node: usize,
236        from_port: usize,
237        to_node: usize,
238        to_port: usize,
239    ) {
240        self.control_edges
241            .push((from_node, from_port, to_node, to_port));
242    }
243
244    /// Connect clock ports (timing events).
245    pub fn connect_clock(
246        &mut self,
247        from_node: usize,
248        from_port: usize,
249        to_node: usize,
250        to_port: usize,
251    ) {
252        self.clock_edges
253            .push((from_node, from_port, to_node, to_port));
254    }
255
256    /// Connect feedback ports (delay lines, state carryover).
257    pub fn connect_feedback(
258        &mut self,
259        from_node: usize,
260        from_port: usize,
261        to_node: usize,
262        to_port: usize,
263    ) {
264        self.feedback_edges
265            .push((from_node, from_port, to_node, to_port));
266    }
267
268    /// Build the graph.
269    ///
270    /// Creates backends for nodes that have a backend name set (via
271    /// `SourceDef::backend` / `SinkDef::backend` or the builder's default).  Finds the active
272    /// (driver) node and stores its index for [`Graph::run`].
273    pub fn build(self, system: &ActorSystem) -> Result<Graph<T, BUF_SIZE>, BuildError> {
274        // Phase 1: Construct all nodes from recipes
275        let mut node_entries: Vec<NodeEntry<T, BUF_SIZE>> = Vec::with_capacity(self.recipes.len());
276        for recipe in &self.recipes {
277            let node = self
278                .factory
279                .construct(&recipe.type_name, recipe.id, &recipe.params)
280                .map_err(|e| BuildError::Registry(format!("{e}")))?;
281            node_entries.push(NodeEntry { node });
282        }
283
284        // Apply pre-configured routing entries
285        for (idx, node) in node_entries.iter_mut().enumerate() {
286            for &(from, to, gain) in &self.recipes[idx].routing_entries {
287                if let NodeVariant::Router(ref mut router) = node.node {
288                    router.set_connection(from, to, T::from_f32(gain)).ok();
289                }
290            }
291        }
292
293        let num_nodes = node_entries.len();
294
295        // --- Phase 2: Resolve I/O backends for I/O nodes ---
296        let sr = self.sample_rate.unwrap_or(44100.0);
297        for (idx, recipe) in self.recipes.iter().enumerate() {
298            let name = match recipe.backend.as_ref() {
299                Some(n) => Some(n.clone()),
300                None => self.default_backend.clone(),
301            };
302            if let Some(ref name) = name {
303                let mut be_params = HashMap::new();
304                be_params.insert("sample_rate".into(), ParamValue::Float(sr));
305                be_params.insert("buffer_size".into(), ParamValue::Int(BUF_SIZE as i32));
306                if self.default_backend.as_ref() == Some(name) {
307                    for (k, v) in &self.backend_params {
308                        be_params.entry(k.clone()).or_insert_with(|| v.clone());
309                    }
310                }
311                let backend = self
312                    .backend_factory
313                    .create(name, &be_params)
314                    .map_err(BuildError::Backend)?;
315                if let Some(io_node) = node_entries[idx].node.as_io_node_mut() {
316                    io_node.resolve_backend(backend);
317                }
318            }
319        }
320
321        // --- Phase 3: adjacency for Kahn (signal edges only) ---
322        let mut in_degree = vec![0usize; num_nodes];
323        let mut out_edges: Vec<Vec<(usize, usize, usize)>> = vec![Vec::new(); num_nodes];
324
325        for &(from_n, from_p, to_n, to_p) in &self.signal_edges {
326            in_degree[to_n] += 1;
327            out_edges[from_n].push((from_p, to_n, to_p));
328        }
329
330        // --- Kahn's algorithm ---
331        let mut queue: VecDeque<usize> = in_degree
332            .iter()
333            .enumerate()
334            .filter(|(_, &d)| d == 0)
335            .map(|(i, _)| i)
336            .collect();
337
338        let mut topo = Vec::with_capacity(num_nodes);
339        let mut indeg = in_degree;
340        while let Some(idx) = queue.pop_front() {
341            topo.push(idx);
342            for &(_, to_n, _) in &out_edges[idx] {
343                indeg[to_n] -= 1;
344                if indeg[to_n] == 0 {
345                    queue.push_back(to_n);
346                }
347            }
348        }
349
350        if topo.len() != num_nodes {
351            return Err(BuildError::CycleDetected);
352        }
353
354        // --- collect nodes into final Vec ---
355        let mut nodes: Vec<NodeVariant<T, BUF_SIZE>> =
356            node_entries.into_iter().map(|e| e.node).collect();
357
358        // --- Phase 4: port pointer wiring on the final nodes Vec ---
359        for &(from_n, from_p, to_n, to_p) in &self.signal_edges {
360            if let Some(port) = nodes[from_n].output_port_mut(from_p) {
361                port.downstream.push((to_n, to_p));
362            }
363            let in_ptr: *mut Port<T, BUF_SIZE> = nodes[to_n]
364                .input_port_mut(to_p)
365                .map(|p| p as *mut Port<T, BUF_SIZE>)
366                .unwrap_or(std::ptr::null_mut());
367            let parent: *mut NodeVariant<T, BUF_SIZE> = &mut nodes[to_n];
368            let out_ptr: *mut Port<T, BUF_SIZE> = nodes[from_n]
369                .output_port_mut(from_p)
370                .map(|p| p as *mut Port<T, BUF_SIZE>)
371                .unwrap_or(std::ptr::null_mut());
372            if !in_ptr.is_null() && !out_ptr.is_null() {
373                #[allow(unsafe_code)]
374                unsafe {
375                    (*in_ptr).parent = parent;
376                    (*out_ptr).downstream_input_ptrs.push(in_ptr);
377                }
378            }
379        }
380
381        // --- downstream_nodes ---
382        for &(from_n, from_p, to_n, _) in &self.signal_edges {
383            let parent: *mut NodeVariant<T, BUF_SIZE> = &mut nodes[to_n];
384            if let Some(port) = nodes[from_n].output_port_mut(from_p) {
385                let ptr_val = parent as usize;
386                let already = port.downstream_nodes.iter().any(|&p| p as usize == ptr_val);
387                if !already {
388                    port.downstream_nodes.push(parent);
389                }
390            }
391        }
392
393        // --- upstream_buffer ---
394        for &(from_n, from_p, to_n, to_p) in &self.signal_edges {
395            let upstream = nodes[from_n]
396                .output_port(from_p)
397                .map(|p| &p.buffer as *const FixedBuffer<T, BUF_SIZE>);
398            if let Some(port) = nodes[to_n].input_port_mut(to_p) {
399                if port.upstream_buffer.is_none() {
400                    port.upstream_buffer = upstream;
401                } else {
402                    port.upstream_buffer = None;
403                }
404            }
405        }
406
407        // --- feedback buffers ---
408        for &(from_n, from_p, to_n, to_p) in &self.feedback_edges {
409            if let Some(port) = nodes[from_n].output_port_mut(from_p) {
410                port.feedback_buffer = Some(FixedBuffer::new());
411                port.feedback_downstream.push((to_n, to_p));
412            }
413            if let Some(port) = nodes[to_n].input_port_mut(to_p) {
414                port.feedback_buffer = Some(FixedBuffer::new());
415            }
416        }
417        for &(from_n, from_p, to_n, to_p) in &self.feedback_edges {
418            let ptr = nodes[to_n]
419                .input_port(to_p)
420                .map(|p| &p.feedback_buffer as *const Option<FixedBuffer<T, BUF_SIZE>>)
421                .map(|r| r as *mut Option<FixedBuffer<T, BUF_SIZE>>);
422            if let Some(port) = nodes[from_n].output_port_mut(from_p) {
423                if let Some(p) = ptr {
424                    port.feedback_ptrs.push(p);
425                }
426            }
427        }
428
429        // Allocate named buffers
430        let mut buffers = BufferRegistry::new();
431        for r in &self.resources {
432            if r.kind == "tape" {
433                if let Some(tape) = TapeLoop::<T>::new(r.capacity) {
434                    buffers.register(&r.name, Box::new(tape));
435                }
436            }
437        }
438        for entry in nodes.iter_mut() {
439            entry.resolve_resources(&buffers);
440        }
441
442        let source_idx = topo.first().copied().unwrap_or(0);
443        let mut active_node_idx = 0;
444        for (i, n) in nodes.iter_mut().enumerate() {
445            if n.as_active_node_mut().is_some() {
446                active_node_idx = i;
447                break;
448            }
449        }
450
451        let owned_buffers = buffers.into_inner();
452        let allocated = self.resources.clone();
453
454        // Wrap nodes in Rc<UnsafeCell<Vec<>>> — port pointers already valid in this Vec.
455        let nodes: Rc<UnsafeCell<Vec<NodeVariant<T, BUF_SIZE>>>> = Rc::new(UnsafeCell::new(nodes));
456
457        let actor = system.spawn("graph", {
458            let n = nodes.clone();
459            #[allow(unsafe_code)]
460            move |msg: CommandEnum| {
461                if let CommandEnum::SetParameter(param) = msg {
462                    let idx = param.port.node_id().inner() as usize;
463                    unsafe {
464                        let nv = &mut *n.get();
465                        if idx < nv.len() {
466                            let _ = nv[idx].set_parameter(&param.parameter, param.value);
467                        }
468                    }
469                }
470            }
471        });
472
473        let actor_ref = actor.actor_ref();
474
475        Ok(Graph {
476            nodes,
477            topo_order: topo,
478            resources: allocated,
479            current_tick: ClockTick::new(0, BUF_SIZE as u32, sr),
480            buffers: owned_buffers,
481            source_idx,
482            active_node_idx,
483            actor: Some(actor),
484            actor_ref,
485            parent_ref: self.parent_ref.clone(),
486            system_clock: None,
487        })
488    }
489}
490
491// ============================================================================
492// Graph (Static DAG)
493// ============================================================================
494
495/// Immutable signal graph with static DAG topology.
496///
497/// Once built the graph cannot be modified. The graph owns no processing
498/// logic — it is a pure topology description. Processing is driven by
499/// port-level methods (`pre_process`, `snapshot_feedback`, `propagate`)
500/// called from external code (e.g. a real-time signal callback or an
501/// offline renderer).
502pub struct Graph<T: Transcendental, const BUF_SIZE: usize> {
503    nodes: Rc<UnsafeCell<Vec<NodeVariant<T, BUF_SIZE>>>>,
504    topo_order: Vec<usize>,
505    source_idx: usize,
506    active_node_idx: usize,
507    current_tick: ClockTick,
508    pub(crate) resources: Vec<GraphResource>,
509    #[allow(dead_code)]
510    buffers: Vec<Box<dyn Buffer<T> + Send>>,
511    actor: Option<Actor<CommandEnum>>,
512    actor_ref: ActorRef<CommandEnum>,
513    parent_ref: Option<ActorRef<CommandEnum>>,
514    /// Optional shared system clock, updated by external sync sources (MIDI, JACK transport).
515    /// When set, the I/O callback reads BPM from it and creates `ClockTick::with_tempo`.
516    pub system_clock: Option<Arc<SystemClock>>,
517}
518
519impl<T: Transcendental, const BUF_SIZE: usize> Graph<T, BUF_SIZE> {
520    // ========================================================================
521    // Accessors
522    // ========================================================================
523
524    /// Borrow the node array (read-only).
525    #[allow(unsafe_code)]
526    pub fn nodes(&self) -> &[NodeVariant<T, BUF_SIZE>] {
527        unsafe { &*self.nodes.get() }
528    }
529
530    /// Return the current clock tick.
531    pub fn current_tick(&self) -> ClockTick {
532        self.current_tick
533    }
534
535    /// Return the number of nodes in the graph.
536    #[allow(unsafe_code)]
537    pub fn node_count(&self) -> usize {
538        unsafe { (*self.nodes.get()).len() }
539    }
540
541    /// Return the topological ordering of node indices.
542    pub fn topo_order(&self) -> &[usize] {
543        &self.topo_order
544    }
545
546    #[allow(dead_code)]
547    pub(crate) fn sample_rate(&self) -> f32 {
548        self.current_tick.sample_rate
549    }
550
551    /// Access the named resources (tape loops, etc.) allocated for this graph.
552    #[allow(dead_code)]
553    pub fn resources(&self) -> &[GraphResource] {
554        &self.resources
555    }
556
557    /// Run graph processing through the active node.
558    #[allow(unsafe_code)]
559    pub fn run(&mut self, running: Arc<AtomicBool>) -> Result<(), String> {
560        let mut actor = self.actor.take().ok_or("graph already running")?;
561        let source = self.source_idx;
562        let parent = self.parent_ref.clone();
563        let nodes = self.nodes.clone();
564        let idx = self.active_node_idx;
565        let sys_clock = self.system_clock.clone();
566
567        let tick: Box<dyn FnMut(u64, f32)> = Box::new(move |sample_pos, sample_rate| {
568            actor.drain();
569            let ctx = if let Some(ref clock) = sys_clock {
570                RenderContext::with_tempo(
571                    sample_pos,
572                    BUF_SIZE as u32,
573                    sample_rate,
574                    clock.bpm() as f32,
575                )
576            } else {
577                RenderContext::new(sample_pos, BUF_SIZE as u32, sample_rate)
578            };
579            let tick = ClockTick::with_tempo(sample_pos, BUF_SIZE as u32, sample_rate, ctx.bpm());
580            unsafe {
581                let nv = &mut *nodes.get();
582                let _ = nv[source].process_block(&ctx);
583                for po in 0..nv[source].num_signal_outputs() {
584                    if let Some(port) = nv[source].output_port(po) {
585                        let _ = port.propagate(port.buffer(), &ctx);
586                    }
587                }
588            }
589            if let Some(ref parent) = parent {
590                parent.send(CommandEnum::ClockTick(tick));
591            }
592        });
593
594        unsafe {
595            self.nodes.get().as_mut().unwrap()[idx]
596                .as_active_node_mut()
597                .ok_or("no active node")?
598                .run(tick, running)
599        }
600    }
601
602    /// Obtain an [`ActorRef`] for sending commands to this graph.
603    pub fn handle(&self) -> ActorRef<CommandEnum> {
604        self.actor_ref.clone()
605    }
606
607    /// Consume the graph and return its owned parts (test only).
608    #[cfg(test)]
609    pub fn into_parts(
610        self,
611    ) -> (
612        Vec<NodeVariant<T, BUF_SIZE>>,
613        Vec<usize>,
614        ClockTick,
615        Vec<Box<dyn Buffer<T> + Send>>,
616    ) {
617        let Self {
618            nodes,
619            topo_order,
620            current_tick,
621            resources: _,
622            buffers,
623            active_node_idx: _,
624            source_idx: _,
625            actor,
626            actor_ref: _,
627            parent_ref: _,
628            system_clock: _,
629        } = self;
630        drop(actor);
631        let nodes = Rc::try_unwrap(nodes).unwrap().into_inner();
632        (nodes, topo_order, current_tick, buffers)
633    }
634}
635
636#[cfg(test)]
637mod tests {
638    use super::*;
639    use rill_core::math::Transcendental;
640    use rill_core::time::RenderContext;
641    use rill_core::traits::{
642        Node, NodeCategory, NodeId, NodeMetadata, NodeState, ParamValue, ParameterId, Port,
643        ProcessResult, Processor, Sink, Source,
644    };
645    use rill_core_actor::ActorSystem;
646    use std::sync::Arc;
647
648    fn test_system() -> ActorSystem {
649        ActorSystem::new()
650    }
651
652    fn test_factory<const B: usize>() -> Arc<NodeFactory<f32, B>> {
653        let mut f = NodeFactory::<f32, B>::new();
654
655        f.register_fn("test/const", |id, params| {
656            let value = params.get_f32("value", 1.0);
657            let mut node = ConstantSource::<f32, B>::new(id, value, params.sample_rate);
658            node.init(params.sample_rate);
659            NodeVariant::Source(Box::new(node))
660        });
661
662        f.register_fn("test/gain", |id, params| {
663            let gain = params.get_f32("gain", 1.0);
664            let mut node = GainProcessor::<f32, B>::new(id, params.sample_rate, gain);
665            node.init(params.sample_rate);
666            NodeVariant::Processor(Box::new(node))
667        });
668
669        f.register_fn("test/capture", |id, params| {
670            let mut node = CaptureSink::<f32, B>::new(id, params.sample_rate);
671            node.init(params.sample_rate);
672            NodeVariant::Sink(Box::new(node))
673        });
674
675        Arc::new(f)
676    }
677
678    fn test_builder<const B: usize>(factory: &Arc<NodeFactory<f32, B>>) -> GraphBuilder<f32, B> {
679        GraphBuilder::new(factory.clone(), Arc::new(BackendFactory::new()))
680    }
681
682    fn test_params(sample_rate: f32) -> Params {
683        let mut p = Params::new(sample_rate);
684        p.insert("value".to_string(), ParamValue::Float(sample_rate));
685        p
686    }
687
688    // ------------------------------------------------------------------------
689    // Test node implementations
690    // ------------------------------------------------------------------------
691
692    pub(crate) struct ConstantSource<T: Transcendental, const B: usize> {
693        id: NodeId,
694        value: T,
695        state: NodeState<T, B>,
696        output: Port<T, B>,
697    }
698
699    impl<T: Transcendental, const B: usize> ConstantSource<T, B> {
700        pub fn new(id: NodeId, value: T, sample_rate: f32) -> Self {
701            let state = NodeState::new(sample_rate);
702            let mut output = Port::output(id, 0, "out");
703            output.buffer = FixedBuffer::new();
704            Self {
705                id,
706                value,
707                state,
708                output,
709            }
710        }
711    }
712
713    impl<T: Transcendental, const B: usize> Node<T, B> for ConstantSource<T, B> {
714        fn id(&self) -> NodeId {
715            self.id
716        }
717        fn set_id(&mut self, id: NodeId) {
718            self.id = id;
719        }
720        fn metadata(&self) -> NodeMetadata {
721            NodeMetadata {
722                name: "ConstantSource".into(),
723                type_name: Some("test/const".into()),
724                category: NodeCategory::Source,
725                description: String::new(),
726                author: String::new(),
727                version: String::new(),
728                parameters: vec![],
729                signal_inputs: 0,
730                signal_outputs: 1,
731                control_inputs: 0,
732                control_outputs: 0,
733                clock_inputs: 0,
734                clock_outputs: 0,
735                feedback_ports: 0,
736            }
737        }
738        fn init(&mut self, _: f32) {}
739        fn reset(&mut self) {}
740        fn get_parameter(&self, _: &ParameterId) -> Option<ParamValue> {
741            None
742        }
743        fn set_parameter(&mut self, _: &ParameterId, _: ParamValue) -> ProcessResult<()> {
744            Ok(())
745        }
746        fn control_port(&self, _: usize) -> Option<&Port<T, B>> {
747            None
748        }
749        fn control_port_mut(&mut self, _: usize) -> Option<&mut Port<T, B>> {
750            None
751        }
752        fn output_port(&self, i: usize) -> Option<&Port<T, B>> {
753            if i == 0 {
754                Some(&self.output)
755            } else {
756                None
757            }
758        }
759        fn output_port_mut(&mut self, i: usize) -> Option<&mut Port<T, B>> {
760            if i == 0 {
761                Some(&mut self.output)
762            } else {
763                None
764            }
765        }
766        fn input_port(&self, _: usize) -> Option<&Port<T, B>> {
767            None
768        }
769        fn input_port_mut(&mut self, _: usize) -> Option<&mut Port<T, B>> {
770            None
771        }
772        fn state(&self) -> &NodeState<T, B> {
773            &self.state
774        }
775        fn state_mut(&mut self) -> &mut NodeState<T, B> {
776            &mut self.state
777        }
778    }
779
780    impl<T: Transcendental, const B: usize> Source<T, B> for ConstantSource<T, B> {
781        fn generate(
782            &mut self,
783            _: &RenderContext,
784            _: &[T],
785            _: &[RenderContext],
786        ) -> ProcessResult<()> {
787            self.output.buffer.as_mut_array().fill(self.value);
788            Ok(())
789        }
790    }
791
792    // ------------------------------------------------------------------------
793    // GainProcessor
794    // ------------------------------------------------------------------------
795
796    pub(crate) struct GainProcessor<T: Transcendental, const B: usize> {
797        id: NodeId,
798        gain: T,
799        state: NodeState<T, B>,
800        input: Port<T, B>,
801        output: Port<T, B>,
802    }
803
804    impl<T: Transcendental, const B: usize> GainProcessor<T, B> {
805        pub fn new(id: NodeId, sample_rate: f32, gain: T) -> Self {
806            let state = NodeState::new(sample_rate);
807            let input = Port::input(id, 0, "in");
808            let output = Port::output(id, 0, "out");
809            Self {
810                id,
811                gain,
812                state,
813                input,
814                output,
815            }
816        }
817    }
818
819    impl<T: Transcendental, const B: usize> Node<T, B> for GainProcessor<T, B> {
820        fn id(&self) -> NodeId {
821            self.id
822        }
823        fn set_id(&mut self, id: NodeId) {
824            self.id = id;
825        }
826        fn metadata(&self) -> NodeMetadata {
827            NodeMetadata {
828                name: "GainProcessor".into(),
829                type_name: Some("test/gain".into()),
830                category: NodeCategory::Processor,
831                description: String::new(),
832                author: String::new(),
833                version: String::new(),
834                parameters: vec![],
835                signal_inputs: 1,
836                signal_outputs: 1,
837                control_inputs: 0,
838                control_outputs: 0,
839                clock_inputs: 0,
840                clock_outputs: 0,
841                feedback_ports: 0,
842            }
843        }
844        fn init(&mut self, _: f32) {}
845        fn reset(&mut self) {}
846        fn get_parameter(&self, _: &ParameterId) -> Option<ParamValue> {
847            None
848        }
849        fn set_parameter(&mut self, _: &ParameterId, _: ParamValue) -> ProcessResult<()> {
850            Ok(())
851        }
852        fn control_port(&self, _: usize) -> Option<&Port<T, B>> {
853            None
854        }
855        fn control_port_mut(&mut self, _: usize) -> Option<&mut Port<T, B>> {
856            None
857        }
858        fn input_port(&self, i: usize) -> Option<&Port<T, B>> {
859            if i == 0 {
860                Some(&self.input)
861            } else {
862                None
863            }
864        }
865        fn input_port_mut(&mut self, i: usize) -> Option<&mut Port<T, B>> {
866            if i == 0 {
867                Some(&mut self.input)
868            } else {
869                None
870            }
871        }
872        fn num_signal_outputs(&self) -> usize {
873            1
874        }
875        fn output_port(&self, i: usize) -> Option<&Port<T, B>> {
876            if i == 0 {
877                Some(&self.output)
878            } else {
879                None
880            }
881        }
882        fn output_port_mut(&mut self, i: usize) -> Option<&mut Port<T, B>> {
883            if i == 0 {
884                Some(&mut self.output)
885            } else {
886                None
887            }
888        }
889        fn state(&self) -> &NodeState<T, B> {
890            &self.state
891        }
892        fn state_mut(&mut self) -> &mut NodeState<T, B> {
893            &mut self.state
894        }
895    }
896
897    impl<T: Transcendental, const B: usize> Processor<T, B> for GainProcessor<T, B> {
898        fn process(
899            &mut self,
900            _: &RenderContext,
901            _: &[&[T; B]],
902            _: &[T],
903            _: &[RenderContext],
904            _: &[&[T; B]],
905        ) -> ProcessResult<()> {
906            let src = self.input.buffer.as_array();
907            let buf = self.output.buffer.as_mut_array();
908            for i in 0..B {
909                buf[i] = src[i] * self.gain;
910            }
911            Ok(())
912        }
913    }
914
915    // ------------------------------------------------------------------------
916    // CaptureSink — captures first sample of each block
917    // ------------------------------------------------------------------------
918
919    pub(crate) struct CaptureSink<T: Transcendental, const B: usize> {
920        id: NodeId,
921        state: NodeState<T, B>,
922        input: Port<T, B>,
923    }
924
925    impl<T: Transcendental, const B: usize> CaptureSink<T, B> {
926        pub fn new(id: NodeId, sample_rate: f32) -> Self {
927            let state = NodeState::new(sample_rate);
928            let input = Port::input(id, 0, "in");
929            Self { id, state, input }
930        }
931    }
932
933    impl<T: Transcendental, const B: usize> Node<T, B> for CaptureSink<T, B> {
934        fn id(&self) -> NodeId {
935            self.id
936        }
937        fn set_id(&mut self, id: NodeId) {
938            self.id = id;
939        }
940        fn metadata(&self) -> NodeMetadata {
941            NodeMetadata {
942                name: "CaptureSink".into(),
943                type_name: Some("test/capture".into()),
944                category: NodeCategory::Sink,
945                description: String::new(),
946                author: String::new(),
947                version: String::new(),
948                parameters: vec![],
949                signal_inputs: 1,
950                signal_outputs: 0,
951                control_inputs: 0,
952                control_outputs: 0,
953                clock_inputs: 0,
954                clock_outputs: 0,
955                feedback_ports: 0,
956            }
957        }
958        fn init(&mut self, _: f32) {}
959        fn reset(&mut self) {}
960        fn get_parameter(&self, _: &ParameterId) -> Option<ParamValue> {
961            None
962        }
963        fn set_parameter(&mut self, _: &ParameterId, _: ParamValue) -> ProcessResult<()> {
964            Ok(())
965        }
966        fn control_port(&self, _: usize) -> Option<&Port<T, B>> {
967            None
968        }
969        fn control_port_mut(&mut self, _: usize) -> Option<&mut Port<T, B>> {
970            None
971        }
972        fn output_port(&self, _: usize) -> Option<&Port<T, B>> {
973            None
974        }
975        fn output_port_mut(&mut self, _: usize) -> Option<&mut Port<T, B>> {
976            None
977        }
978        fn input_port(&self, i: usize) -> Option<&Port<T, B>> {
979            if i == 0 {
980                Some(&self.input)
981            } else {
982                None
983            }
984        }
985        fn input_port_mut(&mut self, i: usize) -> Option<&mut Port<T, B>> {
986            if i == 0 {
987                Some(&mut self.input)
988            } else {
989                None
990            }
991        }
992        fn state(&self) -> &NodeState<T, B> {
993            &self.state
994        }
995        fn state_mut(&mut self) -> &mut NodeState<T, B> {
996            &mut self.state
997        }
998    }
999
1000    impl<T: Transcendental, const B: usize> Sink<T, B> for CaptureSink<T, B> {
1001        fn consume(
1002            &mut self,
1003            _: &RenderContext,
1004            _: &[&[T; B]],
1005            _: &[T],
1006            _: &[RenderContext],
1007            _: &[&[T; B]],
1008        ) -> ProcessResult<()> {
1009            Ok(())
1010        }
1011    }
1012
1013    // ------------------------------------------------------------------------
1014    // Graph signal flow tests
1015    // ------------------------------------------------------------------------
1016
1017    const BUF: usize = 64;
1018
1019    #[test]
1020    #[allow(unsafe_code)]
1021    fn test_graph_source_to_sink() {
1022        let factory = test_factory::<BUF>();
1023        let mut builder = test_builder::<BUF>(&factory);
1024        let system = test_system();
1025
1026        let src_idx = builder.add_node("test/const", &test_params(44100.0));
1027        let snk_idx = builder.add_node("test/capture", &test_params(44100.0));
1028        builder.connect_signal(src_idx, 0, snk_idx, 0);
1029
1030        let graph = builder.build(&system).unwrap();
1031        let source_idx = graph.source_idx;
1032
1033        let ctx = RenderContext::new(0, BUF as u32, 44100.0);
1034        let nodes = graph.nodes.clone();
1035        unsafe {
1036            let nv = &mut *nodes.get();
1037            nv[source_idx].process_block(&ctx).unwrap();
1038            if let Some(port) = nv[source_idx].output_port(0) {
1039                port.propagate(port.buffer(), &ctx).unwrap();
1040            }
1041        }
1042        unsafe {
1043            let nv = &*nodes.get();
1044            let val = nv[snk_idx].input_port(0).unwrap().buffer.as_array()[0];
1045            assert!(val != 0.0, "signal should have propagated, got {}", val);
1046        }
1047    }
1048
1049    #[test]
1050    #[allow(unsafe_code)]
1051    fn test_graph_source_proc_sink() {
1052        let factory = test_factory::<BUF>();
1053        let mut builder = test_builder::<BUF>(&factory);
1054        let system = test_system();
1055
1056        let mut params = test_params(44100.0);
1057        params.insert("value".to_string(), ParamValue::Float(5.0));
1058        let src_idx = builder.add_node("test/const", &params);
1059
1060        let mut gain_params = test_params(44100.0);
1061        gain_params.insert("gain".to_string(), ParamValue::Float(3.0));
1062        let proc_idx = builder.add_node("test/gain", &gain_params);
1063
1064        let snk_idx = builder.add_node("test/capture", &test_params(44100.0));
1065
1066        builder.connect_signal(src_idx, 0, proc_idx, 0);
1067        builder.connect_signal(proc_idx, 0, snk_idx, 0);
1068
1069        let graph = builder.build(&system).unwrap();
1070        let source_idx = graph.source_idx;
1071
1072        eprintln!("topo: {:?}", graph.topo_order);
1073        eprintln!("source_idx: {source_idx}, src_idx: {src_idx}, proc_idx: {proc_idx}, snk_idx: {snk_idx}");
1074
1075        let ctx = RenderContext::new(0, BUF as u32, 44100.0);
1076        let nodes = graph.nodes.clone();
1077        unsafe {
1078            let nv = &mut *nodes.get();
1079            eprintln!(
1080                "node types: src={:?}, proc={:?}, snk={:?}",
1081                std::mem::discriminant(&nv[0]),
1082                std::mem::discriminant(&nv[1]),
1083                std::mem::discriminant(&nv[2]),
1084            );
1085
1086            let _ = nv[source_idx].process_block(&ctx);
1087            let src_val = nv[source_idx].output_port(0).unwrap().buffer.as_array()[0];
1088            eprintln!("source output: {src_val}");
1089
1090            let out_port = nv[source_idx].output_port(0).unwrap();
1091            eprintln!(
1092                "source output port downstream_nodes: {}",
1093                out_port.downstream_nodes.len()
1094            );
1095            eprintln!(
1096                "source output port downstream_input_ptrs: {}",
1097                out_port.downstream_input_ptrs.len()
1098            );
1099
1100            // Check processor output port connections BEFORE propagate
1101            {
1102                let proc_port = nv[proc_idx].output_port(0).unwrap();
1103                eprintln!(
1104                    "PROC OUT port downstream_nodes: {}",
1105                    proc_port.downstream_nodes.len()
1106                );
1107                eprintln!(
1108                    "PROC OUT port downstream_input_ptrs: {}",
1109                    proc_port.downstream_input_ptrs.len()
1110                );
1111                for (i, &dn) in proc_port.downstream.iter().enumerate() {
1112                    eprintln!("  downstream[{}]: (node={}, port={})", i, dn.0, dn.1);
1113                }
1114            }
1115
1116            // --- BUFFER ADDRESS DEBUG ---
1117            let src_out = nv[source_idx].output_port(0).unwrap();
1118            let proc_in = nv[proc_idx].input_port(0).unwrap();
1119            let proc_out = nv[proc_idx].output_port(0).unwrap();
1120            let snk_in = nv[snk_idx].input_port(0).unwrap();
1121            eprintln!("BUFFER ADDRESSES:");
1122            eprintln!(
1123                "  src output buf:  {:p}",
1124                src_out.buffer.as_array().as_ptr()
1125            );
1126            eprintln!(
1127                "  proc input buf:  {:p}",
1128                proc_in.buffer.as_array().as_ptr()
1129            );
1130            eprintln!(
1131                "  proc output buf: {:p}",
1132                proc_out.buffer.as_array().as_ptr()
1133            );
1134            eprintln!("  snk input buf:   {:p}", snk_in.buffer.as_array().as_ptr());
1135            eprintln!(
1136                "  proc_in.upstream_buffer.is_some(): {}",
1137                proc_in.upstream_buffer.is_some()
1138            );
1139            eprintln!(
1140                "  snk_in.upstream_buffer.is_some(): {}",
1141                snk_in.upstream_buffer.is_some()
1142            );
1143            // --- END DEBUG ---
1144
1145            out_port.propagate(out_port.buffer(), &ctx).unwrap();
1146
1147            // --- AFTER PROPAGATE: debug buffer values ---
1148            {
1149                let nv = &*nodes.get();
1150                let snk_in = nv[snk_idx].input_port(0).unwrap();
1151                eprintln!(
1152                    "AFTER propagate - snk input buf[0] via .buffer: {}",
1153                    snk_in.buffer.as_array()[0]
1154                );
1155                if let Some(up) = snk_in.upstream_buffer {
1156                    eprintln!(
1157                        "AFTER propagate - snk input via upstream ptr: {}",
1158                        (*up).as_array()[0]
1159                    );
1160                }
1161            }
1162
1163            let sink_buf = nv[snk_idx].input_port(0).unwrap().buffer.as_array();
1164            eprintln!("SINK input port buffer first sample: {}", sink_buf[0]);
1165
1166            // Check processor output port propagation
1167            let proc_out_port = nv[proc_idx].output_port(0).unwrap();
1168            eprintln!(
1169                "proc output port downstream_nodes: {}",
1170                proc_out_port.downstream_nodes.len()
1171            );
1172            eprintln!(
1173                "proc output port downstream_input_ptrs: {}",
1174                proc_out_port.downstream_input_ptrs.len()
1175            );
1176
1177            // Sink
1178            let sink_val = nv[snk_idx].input_port(0).unwrap().buffer.as_array()[0];
1179            eprintln!("sink input AFTER propagate: {sink_val}");
1180
1181            assert!(
1182                (sink_val - 15.0).abs() < 1e-4,
1183                "expected 15.0, got {}",
1184                sink_val
1185            );
1186        }
1187    }
1188}