Skip to main content

rill_graph/
graph.rs

1use crate::registry::{NodeRegistry, RegistryError};
2use rill_core::buffer::{BufferRegistry, FixedBuffer, TapeLoop};
3use rill_core::math::Transcendental;
4use rill_core::time::{ClockSource, ClockTick, SystemClock};
5use rill_core::traits::port::Port;
6use rill_core::traits::{SignalNode, NodeId, NodeParams, NodeVariant};
7use std::collections::VecDeque;
8
9// ============================================================================
10// Internal routing metadata
11// ============================================================================
12
13// ============================================================================
14// Build Errors
15// ============================================================================
16
17/// Errors that can occur during graph construction.
18#[derive(Debug, Clone)]
19pub enum BuildError {
20    /// A cycle was detected in the signal edge graph.
21    CycleDetected,
22}
23
24// ============================================================================
25// Graph Builder
26// ============================================================================
27
28// ============================================================================
29// Node Storage
30// ============================================================================
31
32pub(crate) struct NodeEntry<T: Transcendental, const BUF_SIZE: usize> {
33    pub(crate) node: NodeVariant<T, BUF_SIZE>,
34}
35
36// ============================================================================
37// GraphBuilder (Mutable Construction)
38// ============================================================================
39
40/// A named resource (tape loop) shared between nodes in the graph.
41#[derive(Clone)]
42pub struct GraphResource {
43    /// Unique name referenced by node parameters.
44    pub name: String,
45    /// Resource kind string (`"tape"`).
46    pub kind: String,
47    /// Capacity in samples (for `"tape"` kind).
48    pub capacity: usize,
49}
50
51/// Mutable builder for an immutable signal graph.
52pub struct GraphBuilder<T: Transcendental, const BUF_SIZE: usize> {
53    nodes: Vec<NodeEntry<T, BUF_SIZE>>,
54    signal_edges: Vec<(usize, usize, usize, usize)>,
55    control_edges: Vec<(usize, usize, usize, usize)>,
56    clock_edges: Vec<(usize, usize, usize, usize)>,
57    feedback_edges: Vec<(usize, usize, usize, usize)>,
58    resources: Vec<GraphResource>,
59}
60
61impl<T: Transcendental, const BUF_SIZE: usize> Default for GraphBuilder<T, BUF_SIZE> {
62    fn default() -> Self {
63        Self::new()
64    }
65}
66
67impl<T: Transcendental, const BUF_SIZE: usize> GraphBuilder<T, BUF_SIZE> {
68    /// Create a new empty graph builder.
69    pub fn new() -> Self {
70        Self {
71            nodes: Vec::new(),
72            signal_edges: Vec::new(),
73            control_edges: Vec::new(),
74            clock_edges: Vec::new(),
75            feedback_edges: Vec::new(),
76            resources: Vec::new(),
77        }
78    }
79
80    /// Register a named resource.
81    pub fn add_resource(&mut self, resource: GraphResource) {
82        self.resources.push(resource);
83    }
84
85    /// Add a source node and return its index.
86    pub fn add_source(&mut self, source: Box<dyn rill_core::traits::Source<T, BUF_SIZE>>) -> usize {
87        let idx = self.nodes.len();
88        self.nodes.push(NodeEntry {
89            node: NodeVariant::Source(source),
90        });
91        idx
92    }
93
94    /// Add a processor node and return its index.
95    pub fn add_processor(
96        &mut self,
97        processor: Box<dyn rill_core::traits::Processor<T, BUF_SIZE>>,
98    ) -> usize {
99        let idx = self.nodes.len();
100        self.nodes.push(NodeEntry {
101            node: NodeVariant::Processor(processor),
102        });
103        idx
104    }
105
106    /// Add a sink node and return its index.
107    pub fn add_sink(&mut self, sink: Box<dyn rill_core::traits::Sink<T, BUF_SIZE>>) -> usize {
108        let idx = self.nodes.len();
109        self.nodes.push(NodeEntry {
110            node: NodeVariant::Sink(sink),
111        });
112        idx
113    }
114
115    /// Add a Router node (N→M configurable routing, no DSP).
116    pub fn add_router(
117        &mut self,
118        router: Box<dyn rill_core::traits::Router<T, BUF_SIZE>>,
119    ) -> usize {
120        let idx = self.nodes.len();
121        self.nodes.push(NodeEntry {
122            node: NodeVariant::Router(router),
123        });
124        idx
125    }
126
127    /// Add a node by type name via the registry.
128    ///
129    /// Looks up the type name in `registry`, calls its
130    /// NodeConstructor::construct, and pushes the resulting
131    /// [`NodeVariant`] into the graph. The node's [`NodeId`] is
132    /// automatically assigned from its position in the graph.
133    ///
134    /// Returns the index of the newly added node.
135    ///
136    /// # Errors
137    ///
138    /// Returns `RegistryError` if the type name is not registered or
139    /// construction fails.
140    pub fn add_node(
141        &mut self,
142        registry: &NodeRegistry<T, BUF_SIZE>,
143        type_name: &str,
144        params: &NodeParams,
145    ) -> Result<usize, RegistryError> {
146        let id = NodeId(self.nodes.len() as u32);
147        self.add_node_with_id(registry, type_name, params, id)
148    }
149
150    /// Add a node with an explicit [`NodeId`].
151    ///
152    /// Unlike [`add_node`](Self::add_node) which auto-assigns IDs, this
153    /// method uses the provided `id` directly. Important for serialization
154    /// where external references (e.g. patchbay bindings) depend on exact IDs.
155    ///
156    /// Returns the index (position) of the newly added node.
157    ///
158    /// # Errors
159    ///
160    /// Returns `RegistryError` if the type name is not registered or
161    /// construction fails.
162    ///
163    /// # Panics
164    ///
165    /// If `id` duplicates a previously registered ID the error is reported
166    /// by the caller — this method does not check for duplicates.
167    pub fn add_node_with_id(
168        &mut self,
169        registry: &NodeRegistry<T, BUF_SIZE>,
170        type_name: &str,
171        params: &NodeParams,
172        id: NodeId,
173    ) -> Result<usize, RegistryError> {
174        let node = registry.construct(type_name, id, params)?;
175        let idx = self.nodes.len();
176        self.nodes.push(NodeEntry { node });
177        Ok(idx)
178    }
179
180    /// Return the number of nodes added so far.
181    pub fn node_count(&self) -> usize {
182        self.nodes.len()
183    }
184
185    /// Connect signal output port `from_port` of node `from_node`
186    /// to signal input port `to_port` of node `to_node`.
187    pub fn connect_signal(
188        &mut self,
189        from_node: usize,
190        from_port: usize,
191        to_node: usize,
192        to_port: usize,
193    ) {
194        self.signal_edges
195            .push((from_node, from_port, to_node, to_port));
196    }
197
198    /// Connect a control output to a control input.
199    pub fn connect_control(
200        &mut self,
201        from_node: usize,
202        from_port: usize,
203        to_node: usize,
204        to_port: usize,
205    ) {
206        self.control_edges
207            .push((from_node, from_port, to_node, to_port));
208    }
209
210    /// Connect a clock output to a clock input.
211    pub fn connect_clock(
212        &mut self,
213        from_node: usize,
214        from_port: usize,
215        to_node: usize,
216        to_port: usize,
217    ) {
218        self.clock_edges
219            .push((from_node, from_port, to_node, to_port));
220    }
221
222    /// Connect a feedback output to a feedback input.
223    /// This creates a feedback path (previous output → current input).
224    pub fn connect_feedback(
225        &mut self,
226        from_node: usize,
227        from_port: usize,
228        to_node: usize,
229        to_port: usize,
230    ) {
231        self.feedback_edges
232            .push((from_node, from_port, to_node, to_port));
233    }
234
235    /// Build the immutable SignalGraph.
236    ///
237    /// # Errors
238    ///
239    /// Returns `BuildError::CycleDetected` if the signal edges contain a cycle.
240    pub fn build(
241        mut self,
242        clock_source: Box<dyn ClockSource>,
243    ) -> Result<SignalGraph<T, BUF_SIZE>, BuildError> {
244        let num_nodes = self.nodes.len();
245
246        // --- adjacency for Kahn (audio edges only; feedback is not a DAG edge) ---
247        let mut in_degree = vec![0usize; num_nodes];
248        let mut out_edges: Vec<Vec<(usize, usize, usize)>> = vec![Vec::new(); num_nodes];
249
250        for &(from_n, from_p, to_n, to_p) in &self.signal_edges {
251            in_degree[to_n] += 1;
252            out_edges[from_n].push((from_p, to_n, to_p));
253        }
254
255        // --- Kahn's algorithm ---
256        let mut queue: VecDeque<usize> = in_degree
257            .iter()
258            .enumerate()
259            .filter(|(_, &d)| d == 0)
260            .map(|(i, _)| i)
261            .collect();
262
263        let mut topo = Vec::with_capacity(num_nodes);
264        let mut indeg = in_degree;
265        while let Some(idx) = queue.pop_front() {
266            topo.push(idx);
267            for &(_, to_n, _) in &out_edges[idx] {
268                indeg[to_n] -= 1;
269                if indeg[to_n] == 0 {
270                    queue.push_back(to_n);
271                }
272            }
273        }
274
275        if topo.len() != num_nodes {
276            return Err(BuildError::CycleDetected);
277        }
278
279        // --- populate Port::downstream, downstream_input_ptrs, parent ---
280        for &(from_n, from_p, to_n, to_p) in &self.signal_edges {
281            // downstream list (serialization)
282            if let Some(port) = self.nodes[from_n].node.output_port_mut(from_p) {
283                port.downstream.push((to_n, to_p));
284            }
285            // Prepare pointers (safe: distinct indices in static DAG).
286            let in_ptr: *mut Port<T, BUF_SIZE> = self.nodes[to_n]
287                .node.input_port_mut(to_p)
288                .map(|p| p as *mut Port<T, BUF_SIZE>)
289                .unwrap_or(std::ptr::null_mut());
290            let parent: *mut NodeVariant<T, BUF_SIZE> = &mut self.nodes[to_n].node;
291            let out_ptr: *mut Port<T, BUF_SIZE> = self.nodes[from_n]
292                .node.output_port_mut(from_p)
293                .map(|p| p as *mut Port<T, BUF_SIZE>)
294                .unwrap_or(std::ptr::null_mut());
295            // Assign (safe: pointers were obtained without overlapping borrows).
296            if !in_ptr.is_null() && !out_ptr.is_null() {
297                #[allow(unsafe_code)]
298                unsafe {
299                    (*in_ptr).parent = parent;
300                    (*out_ptr).downstream_input_ptrs.push(in_ptr);
301                }
302            }
303        }
304
305        // --- downstream_nodes: unique downstream node pointers ---
306        for &(from_n, from_p, to_n, _) in &self.signal_edges {
307            let parent: *mut NodeVariant<T, BUF_SIZE> = &mut self.nodes[to_n].node;
308            if let Some(port) = self.nodes[from_n].node.output_port_mut(from_p) {
309                let ptr_val = parent as usize;
310                let already = port.downstream_nodes.iter().any(|&p| p as usize == ptr_val);
311                if !already {
312                    port.downstream_nodes.push(parent);
313                }
314            }
315        }
316
317        // --- upstream_buffer: zero-copy routing for 1:1 and fan-out ---
318        for &(from_n, from_p, to_n, to_p) in &self.signal_edges {
319            let upstream = self.nodes[from_n]
320                .node.output_port(from_p)
321                .map(|p| &p.buffer as *const FixedBuffer<T, BUF_SIZE>);
322            if let Some(port) = self.nodes[to_n].node.input_port_mut(to_p) {
323                if port.upstream_buffer.is_none() {
324                    // First upstream: set zero-copy pointer
325                    port.upstream_buffer = upstream;
326                } else {
327                    // Fan-in: copy-based fallback
328                    port.upstream_buffer = None;
329                }
330            }
331        }
332
333        // --- enable feedback buffers on both output and input ports ---
334        for &(from_n, from_p, to_n, to_p) in &self.feedback_edges {
335            if let Some(port) = self.nodes[from_n].node.output_port_mut(from_p) {
336                port.feedback_buffer = Some(FixedBuffer::new());
337                port.feedback_downstream.push((to_n, to_p));
338            }
339            if let Some(port) = self.nodes[to_n].node.input_port_mut(to_p) {
340                port.feedback_buffer = Some(FixedBuffer::new());
341            }
342        }
343        // --- populate Port::feedback_ptrs on output ports ---
344        for &(from_n, from_p, to_n, to_p) in &self.feedback_edges {
345            let ptr = self.nodes[to_n]
346                .node
347                .input_port(to_p)
348                .map(|p| &p.feedback_buffer as *const Option<FixedBuffer<T, BUF_SIZE>>)
349                .map(|r| r as *mut Option<FixedBuffer<T, BUF_SIZE>>);
350            if let Some(port) = self.nodes[from_n].node.output_port_mut(from_p) {
351                if let Some(p) = ptr {
352                    port.feedback_ptrs.push(p);
353                }
354            }
355        }
356
357        let sample_rate = clock_source.sample_rate();
358
359        // Allocate named buffers (tape loops, etc.) from resource definitions.
360        let mut buffers = BufferRegistry::new();
361        for r in &self.resources {
362            if r.kind == "tape" {
363                if let Some(tape) = TapeLoop::<T>::new(r.capacity) {
364                    buffers.register(&r.name, Box::new(tape));
365                }
366            }
367        }
368        let allocated = self.resources.clone();
369
370        Ok(SignalGraph {
371            nodes: self.nodes,
372            topo_order: topo,
373            clock_source,
374            resources: allocated,
375            current_tick: ClockTick::new(0, BUF_SIZE as u32, sample_rate),
376            buffers,
377        })
378    }
379}
380
381// ============================================================================
382// SignalGraph (Static DAG)
383// ============================================================================
384
385/// Immutable signal graph with static DAG topology.
386///
387/// Once built the graph cannot be modified. The graph owns no processing
388/// logic — it is a pure topology description. Processing is driven by
389/// port-level methods (`pre_process`, `snapshot_feedback`, `propagate`)
390/// called from external code (e.g. a real-time signal callback or an
391/// offline renderer).
392pub struct SignalGraph<T: Transcendental, const BUF_SIZE: usize> {
393    nodes: Vec<NodeEntry<T, BUF_SIZE>>,
394    topo_order: Vec<usize>,
395    #[allow(dead_code)]
396    clock_source: Box<dyn ClockSource>,
397    current_tick: ClockTick,
398    /// Resource metadata (name, kind, capacity) for serialization.
399    pub(crate) resources: Vec<GraphResource>,
400    /// Allocated buffer registry — named buffers shared between nodes.
401    pub buffers: BufferRegistry<T>,
402}
403
404impl<T: Transcendental, const BUF_SIZE: usize> SignalGraph<T, BUF_SIZE> {
405    /// Create an empty graph with the given clock source.
406    pub fn new(clock_source: Box<dyn ClockSource>) -> Self {
407        let sample_rate = clock_source.sample_rate();
408        Self {
409            nodes: Vec::new(),
410            topo_order: Vec::new(),
411            clock_source,
412            current_tick: ClockTick::new(0, BUF_SIZE as u32, sample_rate),
413            resources: Vec::new(),
414            buffers: BufferRegistry::new(),
415        }
416    }
417
418    /// Create an empty graph with a system clock at the given sample rate.
419    pub fn with_sample_rate(sample_rate: f32) -> Self {
420        Self::new(Box::new(SystemClock::with_sample_rate(sample_rate)))
421    }
422
423    /// Borrow an output port buffer (for inspection in tests).
424    pub fn output_buffer(&self, node_idx: usize, port_idx: usize) -> Option<&[T; BUF_SIZE]> {
425        self.nodes
426            .get(node_idx)?
427            .node
428            .output_port(port_idx)
429            .map(|p| p.buffer.as_array())
430    }
431
432    // ========================================================================
433    // Accessors
434    // ========================================================================
435
436    /// Return the current clock tick.
437    pub fn current_tick(&self) -> ClockTick {
438        self.current_tick
439    }
440
441    /// Return the number of nodes in the graph.
442    pub fn node_count(&self) -> usize {
443        self.nodes.len()
444    }
445
446    /// Return the topological ordering of node indices.
447    pub fn topo_order(&self) -> &[usize] {
448        &self.topo_order
449    }
450
451    // ── pub(crate) accessors for serialization ─────────────────────
452
453    #[allow(dead_code)]
454    pub(crate) fn node_entries(&self) -> &[NodeEntry<T, BUF_SIZE>] {
455        &self.nodes
456    }
457
458    #[allow(dead_code)]
459    pub(crate) fn sample_rate(&self) -> f32 {
460        self.current_tick.sample_rate
461    }
462
463    /// Access the named resources (tape loops, etc.) allocated for this graph.
464    #[allow(dead_code)]
465    pub fn resources(&self) -> &[GraphResource] {
466        &self.resources
467    }
468
469    // ── Dispatch ──────────────────────────────────────────────────
470
471    /// Dispatch `SetParameter` commands to their target nodes.
472    ///
473    /// Each command is routed to the node identified by `cmd.port.node_id()`
474    /// via that node's `apply_set_parameter` method.
475    pub fn dispatch_set_parameters(
476        &mut self,
477        commands: &[rill_core::queues::signal::SetParameter],
478    ) {
479        for cmd in commands {
480            let target = cmd.port.node_id();
481            for entry in self.nodes.iter_mut() {
482                if entry.node.id() == target {
483                    let _ = entry.node.apply_set_parameter(cmd);
484                    break;
485                }
486            }
487        }
488    }
489
490    /// Consume the graph and return its parts for the SignalEngine.
491    pub fn into_parts(
492        self,
493    ) -> (Vec<NodeVariant<T, BUF_SIZE>>, Vec<usize>, ClockTick) {
494        let nodes = self.nodes.into_iter().map(|e| e.node).collect();
495        (nodes, self.topo_order, self.current_tick)
496    }
497}
498
499// ============================================================================
500// Tests
501// ============================================================================
502
503#[cfg(test)]
504mod tests {
505    use super::*;
506    use std::sync::Arc;
507    use rill_core::math::Transcendental;
508    use rill_core::time::ClockTick;
509    use rill_core::traits::{
510        algorithm::ActionContext,
511        processable::{ProcessContext, Processable},
512        SignalNode, NodeCategory, NodeId, NodeMetadata, NodeState, ParamValue, ParameterId, Port,
513        PortDirection, PortId, ProcessResult, Processor, Sink, Source,
514    };
515
516    // ------------------------------------------------------------------------
517    // Mock: ConstantSource — fills output with a constant value
518    // ------------------------------------------------------------------------
519    struct ConstantSource<T: Transcendental, const BUF_SIZE: usize> {
520        value: T,
521        state: NodeState<T, BUF_SIZE>,
522        outputs: Vec<Port<T, BUF_SIZE>>,
523    }
524
525    impl<T: Transcendental, const BUF_SIZE: usize> ConstantSource<T, BUF_SIZE> {
526        fn new(value: T, sample_rate: f32) -> Self {
527            let mut outputs = Vec::with_capacity(1);
528            outputs.push(Port {
529                id: PortId::signal_out(NodeId(0), 0),
530                name: "output".into(),
531                direction: PortDirection::Output,
532                action: None,
533                pending_command: None,
534                buffer: Default::default(),
535                feedback_buffer: None,
536                downstream: Vec::new(),
537                feedback_downstream: Vec::new(),
538                feedback_ptrs: Vec::new(),
539                downstream_input_ptrs: Vec::new(),
540                downstream_nodes: Vec::new(),
541                parent: std::ptr::null_mut(),
542                upstream_buffer: None,
543            });
544            Self {
545                value,
546                state: NodeState::new(sample_rate),
547                outputs,
548            }
549        }
550    }
551
552    impl<T: Transcendental, const BUF_SIZE: usize> SignalNode<T, BUF_SIZE> for ConstantSource<T, BUF_SIZE> {
553        fn metadata(&self) -> NodeMetadata {
554            NodeMetadata {
555                type_name: None,
556                name: "ConstantSource".into(),
557                category: NodeCategory::Source,
558                description: String::new(),
559                author: String::new(),
560                version: "1.0".into(),
561                signal_inputs: 0,
562                signal_outputs: 1,
563                control_inputs: 0,
564                control_outputs: 0,
565                clock_inputs: 0,
566                clock_outputs: 0,
567                feedback_ports: 0,
568                parameters: vec![],
569            }
570        }
571        fn init(&mut self, _sample_rate: f32) {}
572        fn reset(&mut self) {}
573        fn get_parameter(&self, _id: &ParameterId) -> Option<ParamValue> {
574            None
575        }
576        fn set_parameter(&mut self, _id: &ParameterId, _value: ParamValue) -> ProcessResult<()> {
577            Ok(())
578        }
579        fn id(&self) -> NodeId {
580            NodeId(0)
581        }
582        fn set_id(&mut self, _id: NodeId) {}
583        fn input_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
584            None
585        }
586        fn input_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
587            None
588        }
589        fn output_port(&self, index: usize) -> Option<&Port<T, BUF_SIZE>> {
590            self.outputs.get(index)
591        }
592        fn output_port_mut(&mut self, index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
593            self.outputs.get_mut(index)
594        }
595        fn control_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
596            None
597        }
598        fn control_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
599            None
600        }
601        fn state(&self) -> &NodeState<T, BUF_SIZE> {
602            &self.state
603        }
604        fn state_mut(&mut self) -> &mut NodeState<T, BUF_SIZE> {
605            &mut self.state
606        }
607    }
608
609    impl<T: Transcendental, const BUF_SIZE: usize> Source<T, BUF_SIZE> for ConstantSource<T, BUF_SIZE> {
610        fn generate(
611            &mut self,
612            _clock: &ClockTick,
613            _control_inputs: &[T],
614            _clock_inputs: &[ClockTick],
615        ) -> ProcessResult<()> {
616            let out = self.outputs[0].buffer.as_mut_array();
617            for sample in out.iter_mut() {
618                *sample = self.value;
619            }
620            Ok(())
621        }
622        fn num_signal_outputs(&self) -> usize {
623            1
624        }
625    }
626
627    // ------------------------------------------------------------------------
628    // Mock: NoopProcessor — minimal processor for topology tests
629    // ------------------------------------------------------------------------
630    struct NoopProcessor<T: Transcendental, const BUF_SIZE: usize> {
631        state: NodeState<T, BUF_SIZE>,
632    }
633
634    impl<T: Transcendental, const BUF_SIZE: usize> NoopProcessor<T, BUF_SIZE> {
635        fn new(sample_rate: f32) -> Self {
636            Self {
637                state: NodeState::new(sample_rate),
638            }
639        }
640    }
641
642    impl<T: Transcendental, const BUF_SIZE: usize> SignalNode<T, BUF_SIZE> for NoopProcessor<T, BUF_SIZE> {
643        fn metadata(&self) -> NodeMetadata {
644            NodeMetadata {
645                type_name: None,
646                name: "NoopProcessor".into(),
647                category: NodeCategory::Processor,
648                description: String::new(),
649                author: String::new(),
650                version: "1.0".into(),
651                signal_inputs: 0,
652                signal_outputs: 0,
653                control_inputs: 0,
654                control_outputs: 0,
655                clock_inputs: 0,
656                clock_outputs: 0,
657                feedback_ports: 0,
658                parameters: vec![],
659            }
660        }
661        fn init(&mut self, _sample_rate: f32) {}
662        fn reset(&mut self) {}
663        fn get_parameter(&self, _id: &ParameterId) -> Option<ParamValue> {
664            None
665        }
666        fn set_parameter(&mut self, _id: &ParameterId, _value: ParamValue) -> ProcessResult<()> {
667            Ok(())
668        }
669        fn id(&self) -> NodeId {
670            NodeId(1)
671        }
672        fn set_id(&mut self, _id: NodeId) {}
673        fn input_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
674            None
675        }
676        fn input_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
677            None
678        }
679        fn output_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
680            None
681        }
682        fn output_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
683            None
684        }
685        fn control_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
686            None
687        }
688        fn control_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
689            None
690        }
691        fn state(&self) -> &NodeState<T, BUF_SIZE> {
692            &self.state
693        }
694        fn state_mut(&mut self) -> &mut NodeState<T, BUF_SIZE> {
695            &mut self.state
696        }
697    }
698
699    impl<T: Transcendental, const BUF_SIZE: usize> Processor<T, BUF_SIZE> for NoopProcessor<T, BUF_SIZE> {
700        fn process(
701            &mut self,
702            _clock: &ClockTick,
703            _signal_inputs: &[&[T; BUF_SIZE]],
704            _control_inputs: &[T],
705            _clock_inputs: &[ClockTick],
706            _feedback_inputs: &[&[T; BUF_SIZE]],
707        ) -> ProcessResult<()> {
708            Ok(())
709        }
710    }
711
712    // ------------------------------------------------------------------------
713    // Mock: NoopSink — minimal sink for topology tests
714    // ------------------------------------------------------------------------
715    struct NoopSink<T: Transcendental, const BUF_SIZE: usize> {
716        state: NodeState<T, BUF_SIZE>,
717    }
718
719    impl<T: Transcendental, const BUF_SIZE: usize> NoopSink<T, BUF_SIZE> {
720        fn new(sample_rate: f32) -> Self {
721            Self {
722                state: NodeState::new(sample_rate),
723            }
724        }
725    }
726
727    impl<T: Transcendental, const BUF_SIZE: usize> SignalNode<T, BUF_SIZE> for NoopSink<T, BUF_SIZE> {
728        fn metadata(&self) -> NodeMetadata {
729            NodeMetadata {
730                type_name: None,
731                name: "NoopSink".into(),
732                category: NodeCategory::Sink,
733                description: String::new(),
734                author: String::new(),
735                version: "1.0".into(),
736                signal_inputs: 0,
737                signal_outputs: 0,
738                control_inputs: 0,
739                control_outputs: 0,
740                clock_inputs: 0,
741                clock_outputs: 0,
742                feedback_ports: 0,
743                parameters: vec![],
744            }
745        }
746        fn init(&mut self, _sample_rate: f32) {}
747        fn reset(&mut self) {}
748        fn get_parameter(&self, _id: &ParameterId) -> Option<ParamValue> {
749            None
750        }
751        fn set_parameter(&mut self, _id: &ParameterId, _value: ParamValue) -> ProcessResult<()> {
752            Ok(())
753        }
754        fn id(&self) -> NodeId {
755            NodeId(2)
756        }
757        fn set_id(&mut self, _id: NodeId) {}
758        fn input_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
759            None
760        }
761        fn input_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
762            None
763        }
764        fn output_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
765            None
766        }
767        fn output_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
768            None
769        }
770        fn control_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
771            None
772        }
773        fn control_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
774            None
775        }
776        fn state(&self) -> &NodeState<T, BUF_SIZE> {
777            &self.state
778        }
779        fn state_mut(&mut self) -> &mut NodeState<T, BUF_SIZE> {
780            &mut self.state
781        }
782    }
783
784    impl<T: Transcendental, const BUF_SIZE: usize> Sink<T, BUF_SIZE> for NoopSink<T, BUF_SIZE> {
785        fn consume(
786            &mut self,
787            _clock: &ClockTick,
788            _signal_inputs: &[&[T; BUF_SIZE]],
789            _control_inputs: &[T],
790            _clock_inputs: &[ClockTick],
791            _feedback_inputs: &[&[T; BUF_SIZE]],
792        ) -> ProcessResult<()> {
793            Ok(())
794        }
795    }
796
797    // ========================================================================
798    // Tests
799    // ========================================================================
800
801    #[test]
802    fn test_graph_creation() {
803        let graph = SignalGraph::<f32, 64>::with_sample_rate(44100.0);
804        assert_eq!(graph.node_count(), 0);
805    }
806
807    #[test]
808    fn test_topo_order_correct() {
809        const BUF: usize = 64;
810        let mut builder = GraphBuilder::<f32, BUF>::new();
811
812        let src = builder.add_source(Box::new(ConstantSource::new(1.0, 44100.0)));
813        let proc = builder.add_processor(Box::new(NoopProcessor::new(44100.0)));
814        let sink = builder.add_sink(Box::new(NoopSink::new(44100.0)));
815
816        builder.connect_signal(src, 0, proc, 0);
817        builder.connect_signal(proc, 0, sink, 0);
818
819        let graph = builder
820            .build(Box::new(SystemClock::with_sample_rate(44100.0)))
821            .expect("build failed");
822
823        let order = graph.topo_order();
824        let src_pos = order.iter().position(|&i| i == src).unwrap();
825        let proc_pos = order.iter().position(|&i| i == proc).unwrap();
826        let sink_pos = order.iter().position(|&i| i == sink).unwrap();
827        assert!(src_pos < proc_pos);
828        assert!(proc_pos < sink_pos);
829    }
830
831    #[test]
832    fn test_cycle_detection() {
833        const BUF: usize = 64;
834        let mut builder = GraphBuilder::<f32, BUF>::new();
835
836        let a = builder.add_processor(Box::new(NoopProcessor::new(44100.0)));
837        let b = builder.add_processor(Box::new(NoopProcessor::new(44100.0)));
838
839        builder.connect_signal(a, 0, b, 0);
840        builder.connect_signal(b, 0, a, 0);
841
842        let result = builder.build(Box::new(SystemClock::with_sample_rate(44100.0)));
843        assert!(matches!(result, Err(BuildError::CycleDetected)));
844    }
845
846    #[test]
847    fn test_source_node_create() {
848        const BUF: usize = 64;
849        let mut builder = GraphBuilder::<f32, BUF>::new();
850        let idx = builder.add_source(Box::new(ConstantSource::new(0.5, 44100.0)));
851        let graph = builder
852            .build(Box::new(SystemClock::with_sample_rate(44100.0)))
853            .expect("build failed");
854        assert_eq!(graph.node_count(), 1);
855        assert_eq!(graph.topo_order(), &[idx]);
856    }
857
858    // ========================================================================
859    // Port-based propagation tests
860    // ========================================================================
861
862    /// Simple Sink that captures its first input port for inspection.
863    pub struct TestSink<T: Transcendental, const BUF_SIZE: usize> {
864        id: NodeId,
865        state: NodeState<T, BUF_SIZE>,
866        pub inputs: Vec<Port<T, BUF_SIZE>>,
867        last_value: T,
868    }
869
870    impl<T: Transcendental, const BUF_SIZE: usize> TestSink<T, BUF_SIZE> {
871        fn new(id: NodeId, sample_rate: f32) -> Self {
872            let mut inputs = Vec::new();
873            inputs.push(Port::input(id, 0, "in"));
874            Self {
875                id,
876                state: NodeState::new(sample_rate),
877                inputs,
878                last_value: T::ZERO,
879            }
880        }
881        fn last_value(&self) -> T { self.last_value }
882    }
883
884    impl<T: Transcendental, const BUF_SIZE: usize> SignalNode<T, BUF_SIZE> for TestSink<T, BUF_SIZE> {
885        fn metadata(&self) -> NodeMetadata {
886            NodeMetadata {
887                type_name: None, name: "TestSink".into(), category: NodeCategory::Sink,
888                description: String::new(), author: String::new(), version: "1.0".into(),
889                signal_inputs: 1, signal_outputs: 0, control_inputs: 0, control_outputs: 0,
890                clock_inputs: 0, clock_outputs: 0, feedback_ports: 0, parameters: vec![],
891            }
892        }
893        fn init(&mut self, _: f32) {}
894        fn reset(&mut self) { self.state.sample_pos = 0; self.state.blocks_processed = 0; }
895        fn id(&self) -> NodeId { self.id } fn set_id(&mut self, id: NodeId) { self.id = id; }
896        fn get_parameter(&self, _: &ParameterId) -> Option<ParamValue> { None }
897        fn set_parameter(&mut self, _: &ParameterId, _: ParamValue) -> ProcessResult<()> { Ok(()) }
898        fn input_port(&self, i: usize) -> Option<&Port<T, BUF_SIZE>> { self.inputs.get(i) }
899        fn input_port_mut(&mut self, i: usize) -> Option<&mut Port<T, BUF_SIZE>> { self.inputs.get_mut(i) }
900        fn output_port(&self, _: usize) -> Option<&Port<T, BUF_SIZE>> { None }
901        fn output_port_mut(&mut self, _: usize) -> Option<&mut Port<T, BUF_SIZE>> { None }
902        fn control_port(&self, _: usize) -> Option<&Port<T, BUF_SIZE>> { None }
903        fn control_port_mut(&mut self, _: usize) -> Option<&mut Port<T, BUF_SIZE>> { None }
904        fn num_signal_inputs(&self) -> usize { 1 } fn num_signal_outputs(&self) -> usize { 0 }
905        fn state(&self) -> &NodeState<T, BUF_SIZE> { &self.state }
906        fn state_mut(&mut self) -> &mut NodeState<T, BUF_SIZE> { &mut self.state }
907    }
908
909    impl<T: Transcendental, const BUF_SIZE: usize> Sink<T, BUF_SIZE> for TestSink<T, BUF_SIZE> {
910        fn consume(
911            &mut self, _clock: &ClockTick,
912            _signal_inputs: &[&[T; BUF_SIZE]], _control_inputs: &[T],
913            _clock_inputs: &[ClockTick], _feedback_inputs: &[&[T; BUF_SIZE]],
914        ) -> ProcessResult<()> {
915            if let Some(port) = self.inputs.first() {
916                self.last_value = port.buffer.as_array()[0];
917            }
918            self.state.advance();
919            Ok(())
920        }
921    }
922
923    /// Processor with a `multiplier` parameter. Output = input × multiplier.
924    pub struct GainProcessor<T: Transcendental, const BUF_SIZE: usize> {
925        id: NodeId,
926        state: NodeState<T, BUF_SIZE>,
927        pub inputs: Vec<Port<T, BUF_SIZE>>,
928        pub outputs: Vec<Port<T, BUF_SIZE>>,
929        pub multiplier: T,
930    }
931
932    impl<T: Transcendental, const BUF_SIZE: usize> GainProcessor<T, BUF_SIZE> {
933        fn new(id: NodeId, sample_rate: f32, multiplier: T) -> Self {
934            let mut inputs = Vec::new();
935            inputs.push(Port::input(id, 0, "in"));
936            let mut outputs = Vec::new();
937            outputs.push(Port::output(id, 0, "out"));
938            Self {
939                id, state: NodeState::new(sample_rate),
940                inputs, outputs, multiplier,
941            }
942        }
943    }
944
945    impl<T: Transcendental, const BUF_SIZE: usize> SignalNode<T, BUF_SIZE> for GainProcessor<T, BUF_SIZE> {
946        fn metadata(&self) -> NodeMetadata {
947            NodeMetadata {
948                type_name: None, name: "GainProcessor".into(), category: NodeCategory::Processor,
949                description: String::new(), author: String::new(), version: "1.0".into(),
950                signal_inputs: 1, signal_outputs: 1, control_inputs: 0, control_outputs: 0,
951                clock_inputs: 0, clock_outputs: 0, feedback_ports: 0, parameters: vec![],
952            }
953        }
954        fn init(&mut self, _: f32) {}
955        fn reset(&mut self) { self.state.sample_pos = 0; self.state.blocks_processed = 0; }
956        fn id(&self) -> NodeId { self.id } fn set_id(&mut self, id: NodeId) { self.id = id; }
957        fn get_parameter(&self, id: &ParameterId) -> Option<ParamValue> {
958            match id.as_str() {
959                "multiplier" => Some(ParamValue::Float(self.multiplier.to_f32())),
960                _ => None,
961            }
962        }
963        fn set_parameter(&mut self, id: &ParameterId, value: ParamValue) -> ProcessResult<()> {
964            match id.as_str() {
965                "multiplier" => {
966                    if let Some(v) = value.as_f32() {
967                        self.multiplier = T::from_f32(v);
968                        Ok(())
969                    } else { Err(rill_core::ProcessError::parameter("expected float")) }
970                }
971                _ => Err(rill_core::ProcessError::parameter("unknown")),
972            }
973        }
974        fn input_port(&self, i: usize) -> Option<&Port<T, BUF_SIZE>> { self.inputs.get(i) }
975        fn input_port_mut(&mut self, i: usize) -> Option<&mut Port<T, BUF_SIZE>> { self.inputs.get_mut(i) }
976        fn output_port(&self, i: usize) -> Option<&Port<T, BUF_SIZE>> { self.outputs.get(i) }
977        fn output_port_mut(&mut self, i: usize) -> Option<&mut Port<T, BUF_SIZE>> { self.outputs.get_mut(i) }
978        fn control_port(&self, _: usize) -> Option<&Port<T, BUF_SIZE>> { None }
979        fn control_port_mut(&mut self, _: usize) -> Option<&mut Port<T, BUF_SIZE>> { None }
980        fn num_signal_inputs(&self) -> usize { 1 } fn num_signal_outputs(&self) -> usize { 1 }
981        fn state(&self) -> &NodeState<T, BUF_SIZE> { &self.state }
982        fn state_mut(&mut self) -> &mut NodeState<T, BUF_SIZE> { &mut self.state }
983    }
984
985    impl<T: Transcendental, const BUF_SIZE: usize> Processor<T, BUF_SIZE> for GainProcessor<T, BUF_SIZE> {
986        fn process(
987            &mut self, _clock: &ClockTick,
988            _signal_inputs: &[&[T; BUF_SIZE]], _control_inputs: &[T],
989            _clock_inputs: &[ClockTick], _feedback_inputs: &[&[T; BUF_SIZE]],
990        ) -> ProcessResult<()> {
991            let inp = *self.inputs[0].buffer.as_array();
992            let out = self.outputs[0].buffer.as_mut_array();
993            for i in 0..BUF_SIZE {
994                out[i] = inp[i] * self.multiplier;
995            }
996            self.state.advance();
997            Ok(())
998        }
999        fn latency(&self) -> usize { 0 }
1000    }
1001
1002    // ── Test: Source → Sink via GraphBuilder ────────────────────────
1003
1004    #[test]
1005    fn test_graph_source_to_sink() {
1006        const BUF: usize = 64;
1007        let mut builder = GraphBuilder::<f32, BUF>::new();
1008        let src = builder.add_source(Box::new(ConstantSource::new(42.0, 44100.0)));
1009        let snk = builder.add_sink(Box::new(TestSink::<f32, BUF>::new(NodeId(1), 44100.0)));
1010        builder.connect_signal(src, 0, snk, 0);
1011        let graph = builder.build(Box::new(SystemClock::with_sample_rate(44100.0))).unwrap();
1012
1013        let (mut nodes, topo, _) = graph.into_parts();
1014        let tick = ClockTick::new(0, BUF as u32, 44100.0);
1015
1016        // Process source
1017        let mut ctx = ProcessContext { clock: &tick };
1018        let _ = nodes[topo[0]].process_block(&mut ctx);
1019
1020        // Propagate through builder-wired connections
1021        let action_ctx = rill_core::traits::algorithm::ActionContext::new(&tick);
1022        let out_port = nodes[topo[0]].output_port(0).unwrap();
1023        out_port.propagate(out_port.buffer(), &action_ctx).unwrap();
1024
1025        let sink_val = nodes[topo[1]].input_port(0).unwrap().buffer.as_array()[0];
1026        assert_eq!(sink_val, 42.0, "sink should receive source value");
1027    }
1028
1029    // ── Test: Source → Processor → Sink via GraphBuilder ────────────
1030
1031    #[test]
1032    fn test_graph_source_proc_sink() {
1033        const BUF: usize = 64;
1034        let mut builder = GraphBuilder::<f32, BUF>::new();
1035        let src = builder.add_source(Box::new(ConstantSource::new(10.0, 44100.0)));
1036        let proc = builder.add_processor(Box::new(
1037            GainProcessor::<f32, BUF>::new(NodeId(1), 44100.0, 3.0)));
1038        let snk = builder.add_sink(Box::new(
1039            TestSink::<f32, BUF>::new(NodeId(2), 44100.0)));
1040        builder.connect_signal(src, 0, proc, 0);
1041        builder.connect_signal(proc, 0, snk, 0);
1042        let graph = builder.build(Box::new(SystemClock::with_sample_rate(44100.0))).unwrap();
1043
1044        let (mut nodes, topo, _) = graph.into_parts();
1045        let tick = ClockTick::new(0, BUF as u32, 44100.0);
1046
1047        // Process source
1048        let mut ctx = ProcessContext { clock: &tick };
1049        let _ = nodes[topo[0]].process_block(&mut ctx);
1050
1051        // Propagate — should traverse source→processor→sink recursively
1052        let action_ctx = rill_core::traits::algorithm::ActionContext::new(&tick);
1053        let out_port = nodes[topo[0]].output_port(0).unwrap();
1054        out_port.propagate(out_port.buffer(), &action_ctx).unwrap();
1055
1056        let sink_val = nodes[topo[2]].input_port(0).unwrap().buffer.as_array()[0];
1057        assert!((sink_val - 30.0).abs() < 1e-6,
1058            "source(10)×gain(3)=30, got {}", sink_val);
1059    }
1060
1061    //     ── Test: Command queue drain ───────────────────────────────────
1062
1063    #[test]
1064    fn test_command_queue_drain() {
1065        use rill_core::queues::MpscQueue;
1066        use rill_patchbay::control::ParameterCommand;
1067
1068        const BUF: usize = 64;
1069        let queue: Arc<MpscQueue<ParameterCommand>> = Arc::new(MpscQueue::new());
1070
1071        let mut builder = GraphBuilder::<f32, BUF>::new();
1072        builder.add_processor(Box::new(
1073            GainProcessor::<f32, BUF>::new(NodeId(0), 44100.0, 2.0)));
1074        let graph = builder.build(Box::new(SystemClock::with_sample_rate(44100.0))).unwrap();
1075        let (mut nodes, _, _) = graph.into_parts();
1076
1077        queue.push(ParameterCommand::new(NodeId(0), "multiplier", 5.0));
1078
1079        while let Some(cmd) = queue.pop() {
1080            let idx = cmd.node_id.inner() as usize;
1081            let pid = ParameterId::new(&cmd.param).unwrap();
1082            nodes[idx].set_parameter(&pid, ParamValue::Float(cmd.value)).unwrap();
1083        }
1084
1085        let pid = ParameterId::new("multiplier").unwrap();
1086        let val = nodes[0].get_parameter(&pid).unwrap().as_f32().unwrap();
1087        assert!((val - 5.0).abs() < 1e-6, "multiplier should be 5.0, got {}", val);
1088    }
1089
1090    // ── Test: Queue + propagate ─────────────────────────────────────
1091
1092    #[test]
1093    fn test_command_then_propagate() {
1094        use rill_core::queues::MpscQueue;
1095        use rill_patchbay::control::ParameterCommand;
1096
1097        const BUF: usize = 64;
1098        let queue: Arc<MpscQueue<ParameterCommand>> = Arc::new(MpscQueue::new());
1099
1100        let mut builder = GraphBuilder::<f32, BUF>::new();
1101        let src = builder.add_source(Box::new(ConstantSource::new(7.0, 44100.0)));
1102        let proc = builder.add_processor(Box::new(
1103            GainProcessor::<f32, BUF>::new(NodeId(1), 44100.0, 2.0)));
1104        let snk = builder.add_sink(Box::new(
1105            TestSink::<f32, BUF>::new(NodeId(2), 44100.0)));
1106        builder.connect_signal(src, 0, proc, 0);
1107        builder.connect_signal(proc, 0, snk, 0);
1108        let graph = builder.build(Box::new(SystemClock::with_sample_rate(44100.0))).unwrap();
1109        let (mut nodes, topo, _) = graph.into_parts();
1110        let tick = ClockTick::new(0, BUF as u32, 44100.0);
1111
1112        // Push command and drain
1113        queue.push(ParameterCommand::new(NodeId(1), "multiplier", 4.0));
1114        while let Some(cmd) = queue.pop() {
1115            let idx = cmd.node_id.inner() as usize;
1116            let pid = ParameterId::new(&cmd.param).unwrap();
1117            nodes[idx].set_parameter(&pid, ParamValue::Float(cmd.value)).unwrap();
1118        }
1119
1120        // Verify multiplier changed
1121        let pid = ParameterId::new("multiplier").unwrap();
1122        let val = nodes[1].get_parameter(&pid).unwrap().as_f32().unwrap();
1123        assert!((val - 4.0).abs() < 1e-6);
1124
1125        // Process + propagate
1126        let mut ctx = ProcessContext { clock: &tick };
1127        let _ = nodes[topo[0]].process_block(&mut ctx);
1128        let action_ctx = rill_core::traits::algorithm::ActionContext::new(&tick);
1129        let out_port = nodes[topo[0]].output_port(0).unwrap();
1130        out_port.propagate(out_port.buffer(), &action_ctx).unwrap();
1131
1132        let sink_val = nodes[topo[2]].input_port(0).unwrap().buffer.as_array()[0];
1133        assert!((sink_val - 28.0).abs() < 1e-6,
1134            "source(7)×gain(4)=28, got {}", sink_val);
1135    }
1136
1137    // ── Test: Feedback propagation ──────────────────────────────────
1138
1139    #[test]
1140    fn test_feedback_propagation() {
1141        use rill_core::traits::algorithm::ActionContext;
1142
1143        const BUF: usize = 64;
1144        let mut builder = GraphBuilder::<f32, BUF>::new();
1145        let src = builder.add_source(Box::new(ConstantSource::new(1.0, 44100.0)));
1146        let proc = builder.add_processor(Box::new(
1147            GainProcessor::<f32, BUF>::new(NodeId(1), 44100.0, 2.0)));
1148        let snk = builder.add_sink(Box::new(
1149            TestSink::<f32, BUF>::new(NodeId(2), 44100.0)));
1150        // Signal path: source → processor → sink
1151        builder.connect_signal(src, 0, proc, 0);
1152        builder.connect_signal(proc, 0, snk, 0);
1153        // Feedback: processor output → processor input
1154        builder.connect_feedback(proc, 0, proc, 0);
1155        let graph = builder.build(Box::new(SystemClock::with_sample_rate(44100.0))).unwrap();
1156        let (mut nodes, topo, _) = graph.into_parts();
1157
1158        // ── Block 1: no feedback yet ──
1159        let tick1 = ClockTick::new(0, BUF as u32, 44100.0);
1160        let mut ctx = ProcessContext { clock: &tick1 };
1161        let _ = nodes[topo[0]].process_block(&mut ctx);  // source generates
1162        let ctx1 = ActionContext::new(&tick1);
1163        let out_port = nodes[topo[0]].output_port(0).unwrap();
1164        out_port.propagate(out_port.buffer(), &ctx1).unwrap();
1165        let block1 = nodes[topo[2]].input_port(0).unwrap().buffer.as_array()[0];
1166        assert!((block1 - 2.0).abs() < 1e-6, "block1: 1.0×2.0=2.0, got {}", block1);
1167
1168        // ── Block 2: feedback from block1 should be mixed in ──
1169        let tick2 = ClockTick::new(BUF as u64, BUF as u32, 44100.0);
1170        let mut ctx = ProcessContext { clock: &tick2 };
1171        let _ = nodes[topo[0]].process_block(&mut ctx);  // source generates again
1172        let ctx2 = ActionContext::new(&tick2);
1173        let out_port = nodes[topo[0]].output_port(0).unwrap();
1174        out_port.propagate(out_port.buffer(), &ctx2).unwrap();
1175        let block2 = nodes[topo[2]].input_port(0).unwrap().buffer.as_array()[0];
1176        // pre_process: input = 1.0 (source) + 2.0 (feedback from block1) = 3.0
1177        // process: 3.0 × 2.0 = 6.0
1178        assert!((block2 - 6.0).abs() < 1e-6, "block2: (1+2)×2=6.0, got {}", block2);
1179    }
1180
1181    // ── Test: drain_fn pattern (as used by AudioInput) ──────────────
1182
1183    #[test]
1184    fn test_drain_fn_before_propagate() {
1185        use rill_core::queues::MpscQueue;
1186        use rill_patchbay::control::ParameterCommand;
1187
1188        const BUF: usize = 64;
1189        let queue: Arc<MpscQueue<ParameterCommand>> = Arc::new(MpscQueue::new());
1190
1191        let mut builder = GraphBuilder::<f32, BUF>::new();
1192        let src = builder.add_source(Box::new(ConstantSource::new(5.0, 44100.0)));
1193        let proc = builder.add_processor(Box::new(
1194            GainProcessor::<f32, BUF>::new(NodeId(1), 44100.0, 1.0)));
1195        let snk = builder.add_sink(Box::new(
1196            TestSink::<f32, BUF>::new(NodeId(2), 44100.0)));
1197        builder.connect_signal(src, 0, proc, 0);
1198        builder.connect_signal(proc, 0, snk, 0);
1199        let graph = builder.build(Box::new(SystemClock::with_sample_rate(44100.0))).unwrap();
1200        let (mut nodes, topo, _) = graph.into_parts();
1201        let nodes_ptr: *mut [NodeVariant<f32, BUF>] = &mut *nodes;
1202
1203        // drain_fn — exactly as AudioInput creates it
1204        let drain_fn: Box<dyn Fn(&mut [NodeVariant<f32, BUF>])> = {
1205            let q = queue.clone();
1206            Box::new(move |nd: &mut [NodeVariant<f32, BUF>]| {
1207                while let Some(cmd) = q.pop() {
1208                    let idx = cmd.node_id.inner() as usize;
1209                    if idx < nd.len() {
1210                        if let Ok(pid) = ParameterId::new(&cmd.param) {
1211                            let _ = nd[idx].set_parameter(&pid, ParamValue::Float(cmd.value));
1212                        }
1213                    }
1214                }
1215            })
1216        };
1217
1218        // Push command BEFORE processing
1219        queue.push(ParameterCommand::new(NodeId(1), "multiplier", 3.0));
1220
1221        // Processing cycle exactly as AudioInput callback does:
1222        let tick = ClockTick::new(0, BUF as u32, 44100.0);
1223
1224        // Step 1: drain
1225        #[allow(unsafe_code)]
1226        unsafe { drain_fn(&mut *nodes_ptr); }
1227
1228        // Verify parameter applied
1229        let pid = ParameterId::new("multiplier").unwrap();
1230        #[allow(unsafe_code)]
1231        let val = unsafe { (*nodes_ptr)[1].get_parameter(&pid).unwrap().as_f32().unwrap() };
1232        assert!((val - 3.0).abs() < 1e-6, "multiplier should be 3.0, got {}", val);
1233
1234        // Step 2: source generate
1235        let mut ctx = ProcessContext { clock: &tick };
1236        #[allow(unsafe_code)]
1237        unsafe { (*nodes_ptr)[topo[0]].process_block(&mut ctx).unwrap(); }
1238
1239        // Step 3: propagate
1240        let action_ctx = rill_core::traits::algorithm::ActionContext::new(&tick);
1241        #[allow(unsafe_code)]
1242        let out_port = unsafe { (*nodes_ptr)[topo[0]].output_port(0).unwrap() };
1243        out_port.propagate(out_port.buffer(), &action_ctx).unwrap();
1244
1245        // Verify: source(5) × gain(3) = 15
1246        #[allow(unsafe_code)]
1247        let sink_val = unsafe { (*nodes_ptr)[topo[2]].input_port(0).unwrap().buffer.as_array()[0] };
1248        assert!((sink_val - 15.0).abs() < 1e-6,
1249            "source(5)×gain(3)=15, got {}", sink_val);
1250    }
1251}