Skip to main content

rill_graph/
graph.rs

1use crate::registry::{NodeRegistry, RegistryError};
2use rill_core::buffer::{BufferRegistry, FixedBuffer, TapeLoop};
3use rill_core::math::Transcendental;
4use rill_core::time::{ClockSource, ClockTick, SystemClock};
5use rill_core::traits::port::Port;
6use rill_core::traits::{NodeId, NodeParams, NodeVariant, SignalNode};
7use std::collections::VecDeque;
8
9// ============================================================================
10// Internal routing metadata
11// ============================================================================
12
13// ============================================================================
14// Build Errors
15// ============================================================================
16
/// Errors that can occur during graph construction.
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`] so the error
/// can be printed and propagated through `Box<dyn Error>` with `?`.
#[derive(Debug, Clone)]
pub enum BuildError {
    /// A cycle was detected in the signal edge graph.
    CycleDetected,
}

impl std::fmt::Display for BuildError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::CycleDetected => f.write_str("cycle detected in signal edges"),
        }
    }
}

impl std::error::Error for BuildError {}
23
24// ============================================================================
25// Graph Builder
26// ============================================================================
27
28// ============================================================================
29// Node Storage
30// ============================================================================
31
32pub(crate) struct NodeEntry<T: Transcendental, const BUF_SIZE: usize> {
33    pub(crate) node: NodeVariant<T, BUF_SIZE>,
34}
35
36// ============================================================================
37// GraphBuilder (Mutable Construction)
38// ============================================================================
39
/// A named resource (tape loop) shared between nodes in the graph.
///
/// Plain metadata: the actual buffer is allocated from this description
/// when the graph is built.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GraphResource {
    /// Unique name referenced by node parameters.
    pub name: String,
    /// Resource kind string (`"tape"`).
    pub kind: String,
    /// Capacity in samples (for `"tape"` kind).
    pub capacity: usize,
}
50
51/// Mutable builder for an immutable signal graph.
52pub struct GraphBuilder<T: Transcendental, const BUF_SIZE: usize> {
53    nodes: Vec<NodeEntry<T, BUF_SIZE>>,
54    signal_edges: Vec<(usize, usize, usize, usize)>,
55    control_edges: Vec<(usize, usize, usize, usize)>,
56    clock_edges: Vec<(usize, usize, usize, usize)>,
57    feedback_edges: Vec<(usize, usize, usize, usize)>,
58    resources: Vec<GraphResource>,
59}
60
61impl<T: Transcendental, const BUF_SIZE: usize> Default for GraphBuilder<T, BUF_SIZE> {
62    fn default() -> Self {
63        Self::new()
64    }
65}
66
67impl<T: Transcendental, const BUF_SIZE: usize> GraphBuilder<T, BUF_SIZE> {
68    /// Create a new empty graph builder.
69    pub fn new() -> Self {
70        Self {
71            nodes: Vec::new(),
72            signal_edges: Vec::new(),
73            control_edges: Vec::new(),
74            clock_edges: Vec::new(),
75            feedback_edges: Vec::new(),
76            resources: Vec::new(),
77        }
78    }
79
80    /// Register a named resource.
81    pub fn add_resource(&mut self, resource: GraphResource) {
82        self.resources.push(resource);
83    }
84
85    /// Add a source node and return its index.
86    pub fn add_source(&mut self, source: Box<dyn rill_core::traits::Source<T, BUF_SIZE>>) -> usize {
87        let idx = self.nodes.len();
88        self.nodes.push(NodeEntry {
89            node: NodeVariant::Source(source),
90        });
91        idx
92    }
93
94    /// Add a processor node and return its index.
95    pub fn add_processor(
96        &mut self,
97        processor: Box<dyn rill_core::traits::Processor<T, BUF_SIZE>>,
98    ) -> usize {
99        let idx = self.nodes.len();
100        self.nodes.push(NodeEntry {
101            node: NodeVariant::Processor(processor),
102        });
103        idx
104    }
105
106    /// Add a sink node and return its index.
107    pub fn add_sink(&mut self, sink: Box<dyn rill_core::traits::Sink<T, BUF_SIZE>>) -> usize {
108        let idx = self.nodes.len();
109        self.nodes.push(NodeEntry {
110            node: NodeVariant::Sink(sink),
111        });
112        idx
113    }
114
115    /// Add a Router node (N→M configurable routing, no DSP).
116    pub fn add_router(&mut self, router: Box<dyn rill_core::traits::Router<T, BUF_SIZE>>) -> usize {
117        let idx = self.nodes.len();
118        self.nodes.push(NodeEntry {
119            node: NodeVariant::Router(router),
120        });
121        idx
122    }
123
124    /// Add a node by type name via the registry.
125    ///
126    /// Looks up the type name in `registry`, calls its
127    /// NodeConstructor::construct, and pushes the resulting
128    /// [`NodeVariant`] into the graph. The node's [`NodeId`] is
129    /// automatically assigned from its position in the graph.
130    ///
131    /// Returns the index of the newly added node.
132    ///
133    /// # Errors
134    ///
135    /// Returns `RegistryError` if the type name is not registered or
136    /// construction fails.
137    pub fn add_node(
138        &mut self,
139        registry: &NodeRegistry<T, BUF_SIZE>,
140        type_name: &str,
141        params: &NodeParams,
142    ) -> Result<usize, RegistryError> {
143        let id = NodeId(self.nodes.len() as u32);
144        self.add_node_with_id(registry, type_name, params, id)
145    }
146
147    /// Add a node with an explicit [`NodeId`].
148    ///
149    /// Unlike [`add_node`](Self::add_node) which auto-assigns IDs, this
150    /// method uses the provided `id` directly. Important for serialization
151    /// where external references (e.g. patchbay bindings) depend on exact IDs.
152    ///
153    /// Returns the index (position) of the newly added node.
154    ///
155    /// # Errors
156    ///
157    /// Returns `RegistryError` if the type name is not registered or
158    /// construction fails.
159    ///
160    /// # Panics
161    ///
162    /// If `id` duplicates a previously registered ID the error is reported
163    /// by the caller — this method does not check for duplicates.
164    pub fn add_node_with_id(
165        &mut self,
166        registry: &NodeRegistry<T, BUF_SIZE>,
167        type_name: &str,
168        params: &NodeParams,
169        id: NodeId,
170    ) -> Result<usize, RegistryError> {
171        let node = registry.construct(type_name, id, params)?;
172        let idx = self.nodes.len();
173        self.nodes.push(NodeEntry { node });
174        Ok(idx)
175    }
176
177    /// Return the number of nodes added so far.
178    pub fn node_count(&self) -> usize {
179        self.nodes.len()
180    }
181
182    /// Connect signal output port `from_port` of node `from_node`
183    /// to signal input port `to_port` of node `to_node`.
184    pub fn connect_signal(
185        &mut self,
186        from_node: usize,
187        from_port: usize,
188        to_node: usize,
189        to_port: usize,
190    ) {
191        self.signal_edges
192            .push((from_node, from_port, to_node, to_port));
193    }
194
195    /// Connect a control output to a control input.
196    pub fn connect_control(
197        &mut self,
198        from_node: usize,
199        from_port: usize,
200        to_node: usize,
201        to_port: usize,
202    ) {
203        self.control_edges
204            .push((from_node, from_port, to_node, to_port));
205    }
206
207    /// Connect a clock output to a clock input.
208    pub fn connect_clock(
209        &mut self,
210        from_node: usize,
211        from_port: usize,
212        to_node: usize,
213        to_port: usize,
214    ) {
215        self.clock_edges
216            .push((from_node, from_port, to_node, to_port));
217    }
218
219    /// Connect a feedback output to a feedback input.
220    /// This creates a feedback path (previous output → current input).
221    pub fn connect_feedback(
222        &mut self,
223        from_node: usize,
224        from_port: usize,
225        to_node: usize,
226        to_port: usize,
227    ) {
228        self.feedback_edges
229            .push((from_node, from_port, to_node, to_port));
230    }
231
232    /// Build the immutable SignalGraph.
233    ///
234    /// # Errors
235    ///
236    /// Returns `BuildError::CycleDetected` if the signal edges contain a cycle.
237    pub fn build(
238        mut self,
239        clock_source: Box<dyn ClockSource>,
240    ) -> Result<SignalGraph<T, BUF_SIZE>, BuildError> {
241        let num_nodes = self.nodes.len();
242
243        // --- adjacency for Kahn (audio edges only; feedback is not a DAG edge) ---
244        let mut in_degree = vec![0usize; num_nodes];
245        let mut out_edges: Vec<Vec<(usize, usize, usize)>> = vec![Vec::new(); num_nodes];
246
247        for &(from_n, from_p, to_n, to_p) in &self.signal_edges {
248            in_degree[to_n] += 1;
249            out_edges[from_n].push((from_p, to_n, to_p));
250        }
251
252        // --- Kahn's algorithm ---
253        let mut queue: VecDeque<usize> = in_degree
254            .iter()
255            .enumerate()
256            .filter(|(_, &d)| d == 0)
257            .map(|(i, _)| i)
258            .collect();
259
260        let mut topo = Vec::with_capacity(num_nodes);
261        let mut indeg = in_degree;
262        while let Some(idx) = queue.pop_front() {
263            topo.push(idx);
264            for &(_, to_n, _) in &out_edges[idx] {
265                indeg[to_n] -= 1;
266                if indeg[to_n] == 0 {
267                    queue.push_back(to_n);
268                }
269            }
270        }
271
272        if topo.len() != num_nodes {
273            return Err(BuildError::CycleDetected);
274        }
275
276        // --- populate Port::downstream, downstream_input_ptrs, parent ---
277        for &(from_n, from_p, to_n, to_p) in &self.signal_edges {
278            // downstream list (serialization)
279            if let Some(port) = self.nodes[from_n].node.output_port_mut(from_p) {
280                port.downstream.push((to_n, to_p));
281            }
282            // Prepare pointers (safe: distinct indices in static DAG).
283            let in_ptr: *mut Port<T, BUF_SIZE> = self.nodes[to_n]
284                .node
285                .input_port_mut(to_p)
286                .map(|p| p as *mut Port<T, BUF_SIZE>)
287                .unwrap_or(std::ptr::null_mut());
288            let parent: *mut NodeVariant<T, BUF_SIZE> = &mut self.nodes[to_n].node;
289            let out_ptr: *mut Port<T, BUF_SIZE> = self.nodes[from_n]
290                .node
291                .output_port_mut(from_p)
292                .map(|p| p as *mut Port<T, BUF_SIZE>)
293                .unwrap_or(std::ptr::null_mut());
294            // Assign (safe: pointers were obtained without overlapping borrows).
295            if !in_ptr.is_null() && !out_ptr.is_null() {
296                #[allow(unsafe_code)]
297                unsafe {
298                    (*in_ptr).parent = parent;
299                    (*out_ptr).downstream_input_ptrs.push(in_ptr);
300                }
301            }
302        }
303
304        // --- downstream_nodes: unique downstream node pointers ---
305        for &(from_n, from_p, to_n, _) in &self.signal_edges {
306            let parent: *mut NodeVariant<T, BUF_SIZE> = &mut self.nodes[to_n].node;
307            if let Some(port) = self.nodes[from_n].node.output_port_mut(from_p) {
308                let ptr_val = parent as usize;
309                let already = port.downstream_nodes.iter().any(|&p| p as usize == ptr_val);
310                if !already {
311                    port.downstream_nodes.push(parent);
312                }
313            }
314        }
315
316        // --- upstream_buffer: zero-copy routing for 1:1 and fan-out ---
317        for &(from_n, from_p, to_n, to_p) in &self.signal_edges {
318            let upstream = self.nodes[from_n]
319                .node
320                .output_port(from_p)
321                .map(|p| &p.buffer as *const FixedBuffer<T, BUF_SIZE>);
322            if let Some(port) = self.nodes[to_n].node.input_port_mut(to_p) {
323                if port.upstream_buffer.is_none() {
324                    // First upstream: set zero-copy pointer
325                    port.upstream_buffer = upstream;
326                } else {
327                    // Fan-in: copy-based fallback
328                    port.upstream_buffer = None;
329                }
330            }
331        }
332
333        // --- enable feedback buffers on both output and input ports ---
334        for &(from_n, from_p, to_n, to_p) in &self.feedback_edges {
335            if let Some(port) = self.nodes[from_n].node.output_port_mut(from_p) {
336                port.feedback_buffer = Some(FixedBuffer::new());
337                port.feedback_downstream.push((to_n, to_p));
338            }
339            if let Some(port) = self.nodes[to_n].node.input_port_mut(to_p) {
340                port.feedback_buffer = Some(FixedBuffer::new());
341            }
342        }
343        // --- populate Port::feedback_ptrs on output ports ---
344        for &(from_n, from_p, to_n, to_p) in &self.feedback_edges {
345            let ptr = self.nodes[to_n]
346                .node
347                .input_port(to_p)
348                .map(|p| &p.feedback_buffer as *const Option<FixedBuffer<T, BUF_SIZE>>)
349                .map(|r| r as *mut Option<FixedBuffer<T, BUF_SIZE>>);
350            if let Some(port) = self.nodes[from_n].node.output_port_mut(from_p) {
351                if let Some(p) = ptr {
352                    port.feedback_ptrs.push(p);
353                }
354            }
355        }
356
357        let sample_rate = clock_source.sample_rate();
358
359        // Allocate named buffers (tape loops, etc.) from resource definitions.
360        let mut buffers = BufferRegistry::new();
361        for r in &self.resources {
362            if r.kind == "tape" {
363                if let Some(tape) = TapeLoop::<T>::new(r.capacity) {
364                    buffers.register(&r.name, Box::new(tape));
365                }
366            }
367        }
368        let allocated = self.resources.clone();
369
370        Ok(SignalGraph {
371            nodes: self.nodes,
372            topo_order: topo,
373            clock_source,
374            resources: allocated,
375            current_tick: ClockTick::new(0, BUF_SIZE as u32, sample_rate),
376            buffers,
377        })
378    }
379}
380
381// ============================================================================
382// SignalGraph (Static DAG)
383// ============================================================================
384
385/// Immutable signal graph with static DAG topology.
386///
387/// Once built the graph cannot be modified. The graph owns no processing
388/// logic — it is a pure topology description. Processing is driven by
389/// port-level methods (`pre_process`, `snapshot_feedback`, `propagate`)
390/// called from external code (e.g. a real-time signal callback or an
391/// offline renderer).
392pub struct SignalGraph<T: Transcendental, const BUF_SIZE: usize> {
393    nodes: Vec<NodeEntry<T, BUF_SIZE>>,
394    topo_order: Vec<usize>,
395    #[allow(dead_code)]
396    clock_source: Box<dyn ClockSource>,
397    current_tick: ClockTick,
398    /// Resource metadata (name, kind, capacity) for serialization.
399    pub(crate) resources: Vec<GraphResource>,
400    /// Allocated buffer registry — named buffers shared between nodes.
401    pub buffers: BufferRegistry<T>,
402}
403
404impl<T: Transcendental, const BUF_SIZE: usize> SignalGraph<T, BUF_SIZE> {
405    /// Create an empty graph with the given clock source.
406    pub fn new(clock_source: Box<dyn ClockSource>) -> Self {
407        let sample_rate = clock_source.sample_rate();
408        Self {
409            nodes: Vec::new(),
410            topo_order: Vec::new(),
411            clock_source,
412            current_tick: ClockTick::new(0, BUF_SIZE as u32, sample_rate),
413            resources: Vec::new(),
414            buffers: BufferRegistry::new(),
415        }
416    }
417
418    /// Create an empty graph with a system clock at the given sample rate.
419    pub fn with_sample_rate(sample_rate: f32) -> Self {
420        Self::new(Box::new(SystemClock::with_sample_rate(sample_rate)))
421    }
422
423    /// Borrow an output port buffer (for inspection in tests).
424    pub fn output_buffer(&self, node_idx: usize, port_idx: usize) -> Option<&[T; BUF_SIZE]> {
425        self.nodes
426            .get(node_idx)?
427            .node
428            .output_port(port_idx)
429            .map(|p| p.buffer.as_array())
430    }
431
432    // ========================================================================
433    // Accessors
434    // ========================================================================
435
436    /// Return the current clock tick.
437    pub fn current_tick(&self) -> ClockTick {
438        self.current_tick
439    }
440
441    /// Return the number of nodes in the graph.
442    pub fn node_count(&self) -> usize {
443        self.nodes.len()
444    }
445
446    /// Return the topological ordering of node indices.
447    pub fn topo_order(&self) -> &[usize] {
448        &self.topo_order
449    }
450
451    // ── pub(crate) accessors for serialization ─────────────────────
452
453    #[allow(dead_code)]
454    pub(crate) fn node_entries(&self) -> &[NodeEntry<T, BUF_SIZE>] {
455        &self.nodes
456    }
457
458    #[allow(dead_code)]
459    pub(crate) fn sample_rate(&self) -> f32 {
460        self.current_tick.sample_rate
461    }
462
463    /// Access the named resources (tape loops, etc.) allocated for this graph.
464    #[allow(dead_code)]
465    pub fn resources(&self) -> &[GraphResource] {
466        &self.resources
467    }
468
469    // ── Dispatch ──────────────────────────────────────────────────
470
471    /// Dispatch `SetParameter` commands to their target nodes.
472    ///
473    /// Each command is routed to the node identified by `cmd.port.node_id()`
474    /// via that node's `apply_set_parameter` method.
475    pub fn dispatch_set_parameters(
476        &mut self,
477        commands: &[rill_core::queues::signal::SetParameter],
478    ) {
479        for cmd in commands {
480            let target = cmd.port.node_id();
481            for entry in self.nodes.iter_mut() {
482                if entry.node.id() == target {
483                    let _ = entry.node.apply_set_parameter(cmd);
484                    break;
485                }
486            }
487        }
488    }
489
490    /// Consume the graph and return its parts for the SignalEngine.
491    pub fn into_parts(self) -> (Vec<NodeVariant<T, BUF_SIZE>>, Vec<usize>, ClockTick) {
492        let nodes = self.nodes.into_iter().map(|e| e.node).collect();
493        (nodes, self.topo_order, self.current_tick)
494    }
495}
496
497// ============================================================================
498// Tests
499// ============================================================================
500
501#[cfg(test)]
502mod tests {
503    use super::*;
504    use rill_core::math::Transcendental;
505    use rill_core::time::ClockTick;
506    use rill_core::traits::{
507        processable::{ProcessContext, Processable},
508        NodeCategory, NodeId, NodeMetadata, NodeState, ParamValue, ParameterId, Port,
509        PortDirection, PortId, ProcessResult, Processor, SignalNode, Sink, Source,
510    };
511    use std::sync::Arc;
512
513    // ------------------------------------------------------------------------
514    // Mock: ConstantSource — fills output with a constant value
515    // ------------------------------------------------------------------------
516    struct ConstantSource<T: Transcendental, const BUF_SIZE: usize> {
517        value: T,
518        state: NodeState<T, BUF_SIZE>,
519        outputs: Vec<Port<T, BUF_SIZE>>,
520    }
521
522    impl<T: Transcendental, const BUF_SIZE: usize> ConstantSource<T, BUF_SIZE> {
523        fn new(value: T, sample_rate: f32) -> Self {
524            let mut outputs = Vec::with_capacity(1);
525            outputs.push(Port {
526                id: PortId::signal_out(NodeId(0), 0),
527                name: "output".into(),
528                direction: PortDirection::Output,
529                action: None,
530                pending_command: None,
531                buffer: Default::default(),
532                feedback_buffer: None,
533                downstream: Vec::new(),
534                feedback_downstream: Vec::new(),
535                feedback_ptrs: Vec::new(),
536                downstream_input_ptrs: Vec::new(),
537                downstream_nodes: Vec::new(),
538                parent: std::ptr::null_mut(),
539                upstream_buffer: None,
540            });
541            Self {
542                value,
543                state: NodeState::new(sample_rate),
544                outputs,
545            }
546        }
547    }
548
549    impl<T: Transcendental, const BUF_SIZE: usize> SignalNode<T, BUF_SIZE>
550        for ConstantSource<T, BUF_SIZE>
551    {
552        fn metadata(&self) -> NodeMetadata {
553            NodeMetadata {
554                type_name: None,
555                name: "ConstantSource".into(),
556                category: NodeCategory::Source,
557                description: String::new(),
558                author: String::new(),
559                version: "1.0".into(),
560                signal_inputs: 0,
561                signal_outputs: 1,
562                control_inputs: 0,
563                control_outputs: 0,
564                clock_inputs: 0,
565                clock_outputs: 0,
566                feedback_ports: 0,
567                parameters: vec![],
568            }
569        }
570        fn init(&mut self, _sample_rate: f32) {}
571        fn reset(&mut self) {}
572        fn get_parameter(&self, _id: &ParameterId) -> Option<ParamValue> {
573            None
574        }
575        fn set_parameter(&mut self, _id: &ParameterId, _value: ParamValue) -> ProcessResult<()> {
576            Ok(())
577        }
578        fn id(&self) -> NodeId {
579            NodeId(0)
580        }
581        fn set_id(&mut self, _id: NodeId) {}
582        fn input_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
583            None
584        }
585        fn input_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
586            None
587        }
588        fn output_port(&self, index: usize) -> Option<&Port<T, BUF_SIZE>> {
589            self.outputs.get(index)
590        }
591        fn output_port_mut(&mut self, index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
592            self.outputs.get_mut(index)
593        }
594        fn control_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
595            None
596        }
597        fn control_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
598            None
599        }
600        fn state(&self) -> &NodeState<T, BUF_SIZE> {
601            &self.state
602        }
603        fn state_mut(&mut self) -> &mut NodeState<T, BUF_SIZE> {
604            &mut self.state
605        }
606    }
607
608    impl<T: Transcendental, const BUF_SIZE: usize> Source<T, BUF_SIZE> for ConstantSource<T, BUF_SIZE> {
609        fn generate(
610            &mut self,
611            _clock: &ClockTick,
612            _control_inputs: &[T],
613            _clock_inputs: &[ClockTick],
614        ) -> ProcessResult<()> {
615            let out = self.outputs[0].buffer.as_mut_array();
616            for sample in out.iter_mut() {
617                *sample = self.value;
618            }
619            Ok(())
620        }
621        fn num_signal_outputs(&self) -> usize {
622            1
623        }
624    }
625
626    // ------------------------------------------------------------------------
627    // Mock: NoopProcessor — minimal processor for topology tests
628    // ------------------------------------------------------------------------
629    struct NoopProcessor<T: Transcendental, const BUF_SIZE: usize> {
630        state: NodeState<T, BUF_SIZE>,
631    }
632
633    impl<T: Transcendental, const BUF_SIZE: usize> NoopProcessor<T, BUF_SIZE> {
634        fn new(sample_rate: f32) -> Self {
635            Self {
636                state: NodeState::new(sample_rate),
637            }
638        }
639    }
640
641    impl<T: Transcendental, const BUF_SIZE: usize> SignalNode<T, BUF_SIZE>
642        for NoopProcessor<T, BUF_SIZE>
643    {
644        fn metadata(&self) -> NodeMetadata {
645            NodeMetadata {
646                type_name: None,
647                name: "NoopProcessor".into(),
648                category: NodeCategory::Processor,
649                description: String::new(),
650                author: String::new(),
651                version: "1.0".into(),
652                signal_inputs: 0,
653                signal_outputs: 0,
654                control_inputs: 0,
655                control_outputs: 0,
656                clock_inputs: 0,
657                clock_outputs: 0,
658                feedback_ports: 0,
659                parameters: vec![],
660            }
661        }
662        fn init(&mut self, _sample_rate: f32) {}
663        fn reset(&mut self) {}
664        fn get_parameter(&self, _id: &ParameterId) -> Option<ParamValue> {
665            None
666        }
667        fn set_parameter(&mut self, _id: &ParameterId, _value: ParamValue) -> ProcessResult<()> {
668            Ok(())
669        }
670        fn id(&self) -> NodeId {
671            NodeId(1)
672        }
673        fn set_id(&mut self, _id: NodeId) {}
674        fn input_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
675            None
676        }
677        fn input_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
678            None
679        }
680        fn output_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
681            None
682        }
683        fn output_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
684            None
685        }
686        fn control_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
687            None
688        }
689        fn control_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
690            None
691        }
692        fn state(&self) -> &NodeState<T, BUF_SIZE> {
693            &self.state
694        }
695        fn state_mut(&mut self) -> &mut NodeState<T, BUF_SIZE> {
696            &mut self.state
697        }
698    }
699
700    impl<T: Transcendental, const BUF_SIZE: usize> Processor<T, BUF_SIZE>
701        for NoopProcessor<T, BUF_SIZE>
702    {
703        fn process(
704            &mut self,
705            _clock: &ClockTick,
706            _signal_inputs: &[&[T; BUF_SIZE]],
707            _control_inputs: &[T],
708            _clock_inputs: &[ClockTick],
709            _feedback_inputs: &[&[T; BUF_SIZE]],
710        ) -> ProcessResult<()> {
711            Ok(())
712        }
713    }
714
715    // ------------------------------------------------------------------------
716    // Mock: NoopSink — minimal sink for topology tests
717    // ------------------------------------------------------------------------
718    struct NoopSink<T: Transcendental, const BUF_SIZE: usize> {
719        state: NodeState<T, BUF_SIZE>,
720    }
721
722    impl<T: Transcendental, const BUF_SIZE: usize> NoopSink<T, BUF_SIZE> {
723        fn new(sample_rate: f32) -> Self {
724            Self {
725                state: NodeState::new(sample_rate),
726            }
727        }
728    }
729
730    impl<T: Transcendental, const BUF_SIZE: usize> SignalNode<T, BUF_SIZE> for NoopSink<T, BUF_SIZE> {
731        fn metadata(&self) -> NodeMetadata {
732            NodeMetadata {
733                type_name: None,
734                name: "NoopSink".into(),
735                category: NodeCategory::Sink,
736                description: String::new(),
737                author: String::new(),
738                version: "1.0".into(),
739                signal_inputs: 0,
740                signal_outputs: 0,
741                control_inputs: 0,
742                control_outputs: 0,
743                clock_inputs: 0,
744                clock_outputs: 0,
745                feedback_ports: 0,
746                parameters: vec![],
747            }
748        }
749        fn init(&mut self, _sample_rate: f32) {}
750        fn reset(&mut self) {}
751        fn get_parameter(&self, _id: &ParameterId) -> Option<ParamValue> {
752            None
753        }
754        fn set_parameter(&mut self, _id: &ParameterId, _value: ParamValue) -> ProcessResult<()> {
755            Ok(())
756        }
757        fn id(&self) -> NodeId {
758            NodeId(2)
759        }
760        fn set_id(&mut self, _id: NodeId) {}
761        fn input_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
762            None
763        }
764        fn input_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
765            None
766        }
767        fn output_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
768            None
769        }
770        fn output_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
771            None
772        }
773        fn control_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
774            None
775        }
776        fn control_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
777            None
778        }
779        fn state(&self) -> &NodeState<T, BUF_SIZE> {
780            &self.state
781        }
782        fn state_mut(&mut self) -> &mut NodeState<T, BUF_SIZE> {
783            &mut self.state
784        }
785    }
786
787    impl<T: Transcendental, const BUF_SIZE: usize> Sink<T, BUF_SIZE> for NoopSink<T, BUF_SIZE> {
788        fn consume(
789            &mut self,
790            _clock: &ClockTick,
791            _signal_inputs: &[&[T; BUF_SIZE]],
792            _control_inputs: &[T],
793            _clock_inputs: &[ClockTick],
794            _feedback_inputs: &[&[T; BUF_SIZE]],
795        ) -> ProcessResult<()> {
796            Ok(())
797        }
798    }
799
800    // ========================================================================
801    // Tests
802    // ========================================================================
803
804    #[test]
805    fn test_graph_creation() {
806        let graph = SignalGraph::<f32, 64>::with_sample_rate(44100.0);
807        assert_eq!(graph.node_count(), 0);
808    }
809
810    #[test]
811    fn test_topo_order_correct() {
812        const BUF: usize = 64;
813        let mut builder = GraphBuilder::<f32, BUF>::new();
814
815        let src = builder.add_source(Box::new(ConstantSource::new(1.0, 44100.0)));
816        let proc = builder.add_processor(Box::new(NoopProcessor::new(44100.0)));
817        let sink = builder.add_sink(Box::new(NoopSink::new(44100.0)));
818
819        builder.connect_signal(src, 0, proc, 0);
820        builder.connect_signal(proc, 0, sink, 0);
821
822        let graph = builder
823            .build(Box::new(SystemClock::with_sample_rate(44100.0)))
824            .expect("build failed");
825
826        let order = graph.topo_order();
827        let src_pos = order.iter().position(|&i| i == src).unwrap();
828        let proc_pos = order.iter().position(|&i| i == proc).unwrap();
829        let sink_pos = order.iter().position(|&i| i == sink).unwrap();
830        assert!(src_pos < proc_pos);
831        assert!(proc_pos < sink_pos);
832    }
833
834    #[test]
835    fn test_cycle_detection() {
836        const BUF: usize = 64;
837        let mut builder = GraphBuilder::<f32, BUF>::new();
838
839        let a = builder.add_processor(Box::new(NoopProcessor::new(44100.0)));
840        let b = builder.add_processor(Box::new(NoopProcessor::new(44100.0)));
841
842        builder.connect_signal(a, 0, b, 0);
843        builder.connect_signal(b, 0, a, 0);
844
845        let result = builder.build(Box::new(SystemClock::with_sample_rate(44100.0)));
846        assert!(matches!(result, Err(BuildError::CycleDetected)));
847    }
848
849    #[test]
850    fn test_source_node_create() {
851        const BUF: usize = 64;
852        let mut builder = GraphBuilder::<f32, BUF>::new();
853        let idx = builder.add_source(Box::new(ConstantSource::new(0.5, 44100.0)));
854        let graph = builder
855            .build(Box::new(SystemClock::with_sample_rate(44100.0)))
856            .expect("build failed");
857        assert_eq!(graph.node_count(), 1);
858        assert_eq!(graph.topo_order(), &[idx]);
859    }
860
861    // ========================================================================
862    // Port-based propagation tests
863    // ========================================================================
864
865    /// Simple Sink that captures its first input port for inspection.
866    pub struct TestSink<T: Transcendental, const BUF_SIZE: usize> {
867        id: NodeId,
868        state: NodeState<T, BUF_SIZE>,
869        pub inputs: Vec<Port<T, BUF_SIZE>>,
870        last_value: T,
871    }
872
873    impl<T: Transcendental, const BUF_SIZE: usize> TestSink<T, BUF_SIZE> {
874        fn new(id: NodeId, sample_rate: f32) -> Self {
875            let mut inputs = Vec::new();
876            inputs.push(Port::input(id, 0, "in"));
877            Self {
878                id,
879                state: NodeState::new(sample_rate),
880                inputs,
881                last_value: T::ZERO,
882            }
883        }
884        #[allow(dead_code)]
885        fn last_value(&self) -> T {
886            self.last_value
887        }
888    }
889
890    impl<T: Transcendental, const BUF_SIZE: usize> SignalNode<T, BUF_SIZE> for TestSink<T, BUF_SIZE> {
891        fn metadata(&self) -> NodeMetadata {
892            NodeMetadata {
893                type_name: None,
894                name: "TestSink".into(),
895                category: NodeCategory::Sink,
896                description: String::new(),
897                author: String::new(),
898                version: "1.0".into(),
899                signal_inputs: 1,
900                signal_outputs: 0,
901                control_inputs: 0,
902                control_outputs: 0,
903                clock_inputs: 0,
904                clock_outputs: 0,
905                feedback_ports: 0,
906                parameters: vec![],
907            }
908        }
909        fn init(&mut self, _: f32) {}
910        fn reset(&mut self) {
911            self.state.sample_pos = 0;
912            self.state.blocks_processed = 0;
913        }
914        fn id(&self) -> NodeId {
915            self.id
916        }
917        fn set_id(&mut self, id: NodeId) {
918            self.id = id;
919        }
920        fn get_parameter(&self, _: &ParameterId) -> Option<ParamValue> {
921            None
922        }
923        fn set_parameter(&mut self, _: &ParameterId, _: ParamValue) -> ProcessResult<()> {
924            Ok(())
925        }
926        fn input_port(&self, i: usize) -> Option<&Port<T, BUF_SIZE>> {
927            self.inputs.get(i)
928        }
929        fn input_port_mut(&mut self, i: usize) -> Option<&mut Port<T, BUF_SIZE>> {
930            self.inputs.get_mut(i)
931        }
932        fn output_port(&self, _: usize) -> Option<&Port<T, BUF_SIZE>> {
933            None
934        }
935        fn output_port_mut(&mut self, _: usize) -> Option<&mut Port<T, BUF_SIZE>> {
936            None
937        }
938        fn control_port(&self, _: usize) -> Option<&Port<T, BUF_SIZE>> {
939            None
940        }
941        fn control_port_mut(&mut self, _: usize) -> Option<&mut Port<T, BUF_SIZE>> {
942            None
943        }
944        fn num_signal_inputs(&self) -> usize {
945            1
946        }
947        fn num_signal_outputs(&self) -> usize {
948            0
949        }
950        fn state(&self) -> &NodeState<T, BUF_SIZE> {
951            &self.state
952        }
953        fn state_mut(&mut self) -> &mut NodeState<T, BUF_SIZE> {
954            &mut self.state
955        }
956    }
957
958    impl<T: Transcendental, const BUF_SIZE: usize> Sink<T, BUF_SIZE> for TestSink<T, BUF_SIZE> {
959        fn consume(
960            &mut self,
961            _clock: &ClockTick,
962            _signal_inputs: &[&[T; BUF_SIZE]],
963            _control_inputs: &[T],
964            _clock_inputs: &[ClockTick],
965            _feedback_inputs: &[&[T; BUF_SIZE]],
966        ) -> ProcessResult<()> {
967            if let Some(port) = self.inputs.first() {
968                self.last_value = port.buffer.as_array()[0];
969            }
970            self.state.advance();
971            Ok(())
972        }
973    }
974
975    /// Processor with a `multiplier` parameter. Output = input × multiplier.
976    pub struct GainProcessor<T: Transcendental, const BUF_SIZE: usize> {
977        id: NodeId,
978        state: NodeState<T, BUF_SIZE>,
979        pub inputs: Vec<Port<T, BUF_SIZE>>,
980        pub outputs: Vec<Port<T, BUF_SIZE>>,
981        pub multiplier: T,
982    }
983
984    impl<T: Transcendental, const BUF_SIZE: usize> GainProcessor<T, BUF_SIZE> {
985        fn new(id: NodeId, sample_rate: f32, multiplier: T) -> Self {
986            let mut inputs = Vec::new();
987            inputs.push(Port::input(id, 0, "in"));
988            let mut outputs = Vec::new();
989            outputs.push(Port::output(id, 0, "out"));
990            Self {
991                id,
992                state: NodeState::new(sample_rate),
993                inputs,
994                outputs,
995                multiplier,
996            }
997        }
998    }
999
1000    impl<T: Transcendental, const BUF_SIZE: usize> SignalNode<T, BUF_SIZE>
1001        for GainProcessor<T, BUF_SIZE>
1002    {
1003        fn metadata(&self) -> NodeMetadata {
1004            NodeMetadata {
1005                type_name: None,
1006                name: "GainProcessor".into(),
1007                category: NodeCategory::Processor,
1008                description: String::new(),
1009                author: String::new(),
1010                version: "1.0".into(),
1011                signal_inputs: 1,
1012                signal_outputs: 1,
1013                control_inputs: 0,
1014                control_outputs: 0,
1015                clock_inputs: 0,
1016                clock_outputs: 0,
1017                feedback_ports: 0,
1018                parameters: vec![],
1019            }
1020        }
1021        fn init(&mut self, _: f32) {}
1022        fn reset(&mut self) {
1023            self.state.sample_pos = 0;
1024            self.state.blocks_processed = 0;
1025        }
1026        fn id(&self) -> NodeId {
1027            self.id
1028        }
1029        fn set_id(&mut self, id: NodeId) {
1030            self.id = id;
1031        }
1032        fn get_parameter(&self, id: &ParameterId) -> Option<ParamValue> {
1033            match id.as_str() {
1034                "multiplier" => Some(ParamValue::Float(self.multiplier.to_f32())),
1035                _ => None,
1036            }
1037        }
1038        fn set_parameter(&mut self, id: &ParameterId, value: ParamValue) -> ProcessResult<()> {
1039            match id.as_str() {
1040                "multiplier" => {
1041                    if let Some(v) = value.as_f32() {
1042                        self.multiplier = T::from_f32(v);
1043                        Ok(())
1044                    } else {
1045                        Err(rill_core::ProcessError::parameter("expected float"))
1046                    }
1047                }
1048                _ => Err(rill_core::ProcessError::parameter("unknown")),
1049            }
1050        }
1051        fn input_port(&self, i: usize) -> Option<&Port<T, BUF_SIZE>> {
1052            self.inputs.get(i)
1053        }
1054        fn input_port_mut(&mut self, i: usize) -> Option<&mut Port<T, BUF_SIZE>> {
1055            self.inputs.get_mut(i)
1056        }
1057        fn output_port(&self, i: usize) -> Option<&Port<T, BUF_SIZE>> {
1058            self.outputs.get(i)
1059        }
1060        fn output_port_mut(&mut self, i: usize) -> Option<&mut Port<T, BUF_SIZE>> {
1061            self.outputs.get_mut(i)
1062        }
1063        fn control_port(&self, _: usize) -> Option<&Port<T, BUF_SIZE>> {
1064            None
1065        }
1066        fn control_port_mut(&mut self, _: usize) -> Option<&mut Port<T, BUF_SIZE>> {
1067            None
1068        }
1069        fn num_signal_inputs(&self) -> usize {
1070            1
1071        }
1072        fn num_signal_outputs(&self) -> usize {
1073            1
1074        }
1075        fn state(&self) -> &NodeState<T, BUF_SIZE> {
1076            &self.state
1077        }
1078        fn state_mut(&mut self) -> &mut NodeState<T, BUF_SIZE> {
1079            &mut self.state
1080        }
1081    }
1082
1083    impl<T: Transcendental, const BUF_SIZE: usize> Processor<T, BUF_SIZE>
1084        for GainProcessor<T, BUF_SIZE>
1085    {
1086        fn process(
1087            &mut self,
1088            _clock: &ClockTick,
1089            _signal_inputs: &[&[T; BUF_SIZE]],
1090            _control_inputs: &[T],
1091            _clock_inputs: &[ClockTick],
1092            _feedback_inputs: &[&[T; BUF_SIZE]],
1093        ) -> ProcessResult<()> {
1094            let inp = *self.inputs[0].buffer.as_array();
1095            let out = self.outputs[0].buffer.as_mut_array();
1096            for i in 0..BUF_SIZE {
1097                out[i] = inp[i] * self.multiplier;
1098            }
1099            self.state.advance();
1100            Ok(())
1101        }
1102        fn latency(&self) -> usize {
1103            0
1104        }
1105    }
1106
1107    // ── Test: Source → Sink via GraphBuilder ────────────────────────
1108
1109    #[test]
1110    fn test_graph_source_to_sink() {
1111        const BUF: usize = 64;
1112        let mut builder = GraphBuilder::<f32, BUF>::new();
1113        let src = builder.add_source(Box::new(ConstantSource::new(42.0, 44100.0)));
1114        let snk = builder.add_sink(Box::new(TestSink::<f32, BUF>::new(NodeId(1), 44100.0)));
1115        builder.connect_signal(src, 0, snk, 0);
1116        let graph = builder
1117            .build(Box::new(SystemClock::with_sample_rate(44100.0)))
1118            .unwrap();
1119
1120        let (mut nodes, topo, _) = graph.into_parts();
1121        let tick = ClockTick::new(0, BUF as u32, 44100.0);
1122
1123        // Process source
1124        let mut ctx = ProcessContext { clock: &tick };
1125        let _ = nodes[topo[0]].process_block(&mut ctx);
1126
1127        // Propagate through builder-wired connections
1128        let action_ctx = rill_core::traits::algorithm::ActionContext::new(&tick);
1129        let out_port = nodes[topo[0]].output_port(0).unwrap();
1130        out_port.propagate(out_port.buffer(), &action_ctx).unwrap();
1131
1132        let sink_val = nodes[topo[1]].input_port(0).unwrap().buffer.as_array()[0];
1133        assert_eq!(sink_val, 42.0, "sink should receive source value");
1134    }
1135
1136    // ── Test: Source → Processor → Sink via GraphBuilder ────────────
1137
1138    #[test]
1139    fn test_graph_source_proc_sink() {
1140        const BUF: usize = 64;
1141        let mut builder = GraphBuilder::<f32, BUF>::new();
1142        let src = builder.add_source(Box::new(ConstantSource::new(10.0, 44100.0)));
1143        let proc = builder.add_processor(Box::new(GainProcessor::<f32, BUF>::new(
1144            NodeId(1),
1145            44100.0,
1146            3.0,
1147        )));
1148        let snk = builder.add_sink(Box::new(TestSink::<f32, BUF>::new(NodeId(2), 44100.0)));
1149        builder.connect_signal(src, 0, proc, 0);
1150        builder.connect_signal(proc, 0, snk, 0);
1151        let graph = builder
1152            .build(Box::new(SystemClock::with_sample_rate(44100.0)))
1153            .unwrap();
1154
1155        let (mut nodes, topo, _) = graph.into_parts();
1156        let tick = ClockTick::new(0, BUF as u32, 44100.0);
1157
1158        // Process source
1159        let mut ctx = ProcessContext { clock: &tick };
1160        let _ = nodes[topo[0]].process_block(&mut ctx);
1161
1162        // Propagate — should traverse source→processor→sink recursively
1163        let action_ctx = rill_core::traits::algorithm::ActionContext::new(&tick);
1164        let out_port = nodes[topo[0]].output_port(0).unwrap();
1165        out_port.propagate(out_port.buffer(), &action_ctx).unwrap();
1166
1167        let sink_val = nodes[topo[2]].input_port(0).unwrap().buffer.as_array()[0];
1168        assert!(
1169            (sink_val - 30.0).abs() < 1e-6,
1170            "source(10)×gain(3)=30, got {}",
1171            sink_val
1172        );
1173    }
1174
1175    //     ── Test: Command queue drain ───────────────────────────────────
1176
1177    #[test]
1178    fn test_command_queue_drain() {
1179        use rill_core::queues::{MpscQueue, SetParameter, SignalSource};
1180        use rill_core::traits::PortId;
1181
1182        const BUF: usize = 64;
1183        let queue: Arc<MpscQueue<SetParameter>> = Arc::new(MpscQueue::new());
1184
1185        let mut builder = GraphBuilder::<f32, BUF>::new();
1186        builder.add_processor(Box::new(GainProcessor::<f32, BUF>::new(
1187            NodeId(0),
1188            44100.0,
1189            2.0,
1190        )));
1191        let graph = builder
1192            .build(Box::new(SystemClock::with_sample_rate(44100.0)))
1193            .unwrap();
1194        let (mut nodes, _, _) = graph.into_parts();
1195
1196        let _ = queue.push(SetParameter::new(
1197            PortId::control_in(NodeId(0), 0),
1198            ParameterId::new("multiplier").unwrap(),
1199            5.0,
1200            SignalSource::Manual,
1201        ));
1202
1203        while let Some(cmd) = queue.pop() {
1204            let idx = cmd.port.node_id().inner() as usize;
1205            let pid = cmd.parameter.clone();
1206            let _ = nodes[idx].set_parameter(&pid, ParamValue::Float(cmd.value));
1207        }
1208
1209        let pid = ParameterId::new("multiplier").unwrap();
1210        let val = nodes[0].get_parameter(&pid).unwrap().as_f32().unwrap();
1211        assert!(
1212            (val - 5.0).abs() < 1e-6,
1213            "multiplier should be 5.0, got {}",
1214            val
1215        );
1216    }
1217
1218    // ── Test: Queue + propagate ─────────────────────────────────────
1219
1220    #[test]
1221    fn test_command_then_propagate() {
1222        use rill_core::queues::{MpscQueue, SetParameter, SignalSource};
1223        use rill_core::traits::PortId;
1224
1225        const BUF: usize = 64;
1226        let queue: Arc<MpscQueue<SetParameter>> = Arc::new(MpscQueue::new());
1227
1228        let mut builder = GraphBuilder::<f32, BUF>::new();
1229        let src = builder.add_source(Box::new(ConstantSource::new(7.0, 44100.0)));
1230        let proc = builder.add_processor(Box::new(GainProcessor::<f32, BUF>::new(
1231            NodeId(1),
1232            44100.0,
1233            2.0,
1234        )));
1235        let snk = builder.add_sink(Box::new(TestSink::<f32, BUF>::new(NodeId(2), 44100.0)));
1236        builder.connect_signal(src, 0, proc, 0);
1237        builder.connect_signal(proc, 0, snk, 0);
1238        let graph = builder
1239            .build(Box::new(SystemClock::with_sample_rate(44100.0)))
1240            .unwrap();
1241        let (mut nodes, topo, _) = graph.into_parts();
1242        let tick = ClockTick::new(0, BUF as u32, 44100.0);
1243
1244        // Push command and drain
1245        let _ = queue.push(SetParameter::new(
1246            PortId::control_in(NodeId(1), 0),
1247            ParameterId::new("multiplier").unwrap(),
1248            4.0,
1249            SignalSource::Manual,
1250        ));
1251        while let Some(cmd) = queue.pop() {
1252            let idx = cmd.port.node_id().inner() as usize;
1253            let pid = cmd.parameter.clone();
1254            let _ = nodes[idx].set_parameter(&pid, ParamValue::Float(cmd.value));
1255        }
1256
1257        // Verify multiplier changed
1258        let pid = ParameterId::new("multiplier").unwrap();
1259        let val = nodes[1].get_parameter(&pid).unwrap().as_f32().unwrap();
1260        assert!((val - 4.0).abs() < 1e-6);
1261
1262        // Process + propagate
1263        let mut ctx = ProcessContext { clock: &tick };
1264        let _ = nodes[topo[0]].process_block(&mut ctx);
1265        let action_ctx = rill_core::traits::algorithm::ActionContext::new(&tick);
1266        let out_port = nodes[topo[0]].output_port(0).unwrap();
1267        out_port.propagate(out_port.buffer(), &action_ctx).unwrap();
1268
1269        let sink_val = nodes[topo[2]].input_port(0).unwrap().buffer.as_array()[0];
1270        assert!(
1271            (sink_val - 28.0).abs() < 1e-6,
1272            "source(7)×gain(4)=28, got {}",
1273            sink_val
1274        );
1275    }
1276
1277    // ── Test: Feedback propagation ──────────────────────────────────
1278
1279    #[test]
1280    fn test_feedback_propagation() {
1281        use rill_core::traits::algorithm::ActionContext;
1282
1283        const BUF: usize = 64;
1284        let mut builder = GraphBuilder::<f32, BUF>::new();
1285        let src = builder.add_source(Box::new(ConstantSource::new(1.0, 44100.0)));
1286        let proc = builder.add_processor(Box::new(GainProcessor::<f32, BUF>::new(
1287            NodeId(1),
1288            44100.0,
1289            2.0,
1290        )));
1291        let snk = builder.add_sink(Box::new(TestSink::<f32, BUF>::new(NodeId(2), 44100.0)));
1292        // Signal path: source → processor → sink
1293        builder.connect_signal(src, 0, proc, 0);
1294        builder.connect_signal(proc, 0, snk, 0);
1295        // Feedback: processor output → processor input
1296        builder.connect_feedback(proc, 0, proc, 0);
1297        let graph = builder
1298            .build(Box::new(SystemClock::with_sample_rate(44100.0)))
1299            .unwrap();
1300        let (mut nodes, topo, _) = graph.into_parts();
1301
1302        // ── Block 1: no feedback yet ──
1303        let tick1 = ClockTick::new(0, BUF as u32, 44100.0);
1304        let mut ctx = ProcessContext { clock: &tick1 };
1305        let _ = nodes[topo[0]].process_block(&mut ctx); // source generates
1306        let ctx1 = ActionContext::new(&tick1);
1307        let out_port = nodes[topo[0]].output_port(0).unwrap();
1308        out_port.propagate(out_port.buffer(), &ctx1).unwrap();
1309        let block1 = nodes[topo[2]].input_port(0).unwrap().buffer.as_array()[0];
1310        assert!(
1311            (block1 - 2.0).abs() < 1e-6,
1312            "block1: 1.0×2.0=2.0, got {}",
1313            block1
1314        );
1315
1316        // ── Block 2: feedback from block1 should be mixed in ──
1317        let tick2 = ClockTick::new(BUF as u64, BUF as u32, 44100.0);
1318        let mut ctx = ProcessContext { clock: &tick2 };
1319        let _ = nodes[topo[0]].process_block(&mut ctx); // source generates again
1320        let ctx2 = ActionContext::new(&tick2);
1321        let out_port = nodes[topo[0]].output_port(0).unwrap();
1322        out_port.propagate(out_port.buffer(), &ctx2).unwrap();
1323        let block2 = nodes[topo[2]].input_port(0).unwrap().buffer.as_array()[0];
1324        // pre_process: input = 1.0 (source) + 2.0 (feedback from block1) = 3.0
1325        // process: 3.0 × 2.0 = 6.0
1326        assert!(
1327            (block2 - 6.0).abs() < 1e-6,
1328            "block2: (1+2)×2=6.0, got {}",
1329            block2
1330        );
1331    }
1332
1333    // ── Test: drain_fn pattern (as used by AudioInput) ──────────────
1334
1335    #[test]
1336    fn test_drain_fn_before_propagate() {
1337        use rill_core::queues::{MpscQueue, SetParameter, SignalSource};
1338        use rill_core::traits::PortId;
1339
1340        const BUF: usize = 64;
1341        let queue: Arc<MpscQueue<SetParameter>> = Arc::new(MpscQueue::new());
1342
1343        let mut builder = GraphBuilder::<f32, BUF>::new();
1344        let src = builder.add_source(Box::new(ConstantSource::new(5.0, 44100.0)));
1345        let proc = builder.add_processor(Box::new(GainProcessor::<f32, BUF>::new(
1346            NodeId(1),
1347            44100.0,
1348            1.0,
1349        )));
1350        let snk = builder.add_sink(Box::new(TestSink::<f32, BUF>::new(NodeId(2), 44100.0)));
1351        builder.connect_signal(src, 0, proc, 0);
1352        builder.connect_signal(proc, 0, snk, 0);
1353        let graph = builder
1354            .build(Box::new(SystemClock::with_sample_rate(44100.0)))
1355            .unwrap();
1356        let (mut nodes, topo, _) = graph.into_parts();
1357        let nodes_ptr: *mut [NodeVariant<f32, BUF>] = &mut *nodes;
1358
1359        // drain_fn — exactly as AudioInput creates it
1360        let drain_fn: Box<dyn Fn(&mut [NodeVariant<f32, BUF>])> = {
1361            let q = queue.clone();
1362            Box::new(move |nd: &mut [NodeVariant<f32, BUF>]| {
1363                while let Some(cmd) = q.pop() {
1364                    let idx = cmd.port.node_id().inner() as usize;
1365                    if idx < nd.len() {
1366                        let pid = cmd.parameter.clone();
1367                        let _ = nd[idx].set_parameter(&pid, ParamValue::Float(cmd.value));
1368                    }
1369                }
1370            })
1371        };
1372
1373        // Push command BEFORE processing
1374        let _ = queue.push(SetParameter::new(
1375            PortId::control_in(NodeId(1), 0),
1376            ParameterId::new("multiplier").unwrap(),
1377            3.0,
1378            SignalSource::Manual,
1379        ));
1380
1381        // Processing cycle exactly as AudioInput callback does:
1382        let tick = ClockTick::new(0, BUF as u32, 44100.0);
1383
1384        // Step 1: drain
1385        #[allow(unsafe_code)]
1386        unsafe {
1387            drain_fn(&mut *nodes_ptr);
1388        }
1389
1390        // Verify parameter applied
1391        let pid = ParameterId::new("multiplier").unwrap();
1392        #[allow(unsafe_code)]
1393        let val = unsafe {
1394            (*nodes_ptr)[1]
1395                .get_parameter(&pid)
1396                .unwrap()
1397                .as_f32()
1398                .unwrap()
1399        };
1400        assert!(
1401            (val - 3.0).abs() < 1e-6,
1402            "multiplier should be 3.0, got {}",
1403            val
1404        );
1405
1406        // Step 2: source generate
1407        let mut ctx = ProcessContext { clock: &tick };
1408        #[allow(unsafe_code)]
1409        unsafe {
1410            (*nodes_ptr)[topo[0]].process_block(&mut ctx).unwrap();
1411        }
1412
1413        // Step 3: propagate
1414        let action_ctx = rill_core::traits::algorithm::ActionContext::new(&tick);
1415        #[allow(unsafe_code)]
1416        let out_port = unsafe { (*nodes_ptr)[topo[0]].output_port(0).unwrap() };
1417        out_port.propagate(out_port.buffer(), &action_ctx).unwrap();
1418
1419        // Verify: source(5) × gain(3) = 15
1420        #[allow(unsafe_code)]
1421        let sink_val = unsafe {
1422            (*nodes_ptr)[topo[2]]
1423                .input_port(0)
1424                .unwrap()
1425                .buffer
1426                .as_array()[0]
1427        };
1428        assert!(
1429            (sink_val - 15.0).abs() < 1e-6,
1430            "source(5)×gain(3)=15, got {}",
1431            sink_val
1432        );
1433    }
1434}