Skip to main content

rill_graph/
graph.rs

1use crate::backend_factory::BackendFactory;
2use crate::factory::{NodeFactory, RegistryError};
3use rill_core::buffer::{Buffer, BufferRegistry, FixedBuffer, TapeLoop};
4use rill_core::math::Transcendental;
5use rill_core::queues::{MpscQueue, SetParameter};
6use rill_core::time::ClockTick;
7use rill_core::traits::active::GraphHandle;
8use rill_core::traits::port::Port;
9use rill_core::traits::ParamValue;
10use rill_core::traits::{Node, NodeId, NodeVariant, Params};
11use rill_core_actor::{ActorCell, ActorRef};
12use std::collections::{HashMap, VecDeque};
13use std::sync::atomic::{AtomicBool, Ordering};
14use std::sync::Arc;
15
16// ============================================================================
17// Internal routing metadata
18// ============================================================================
19
20// ============================================================================
21// Build Errors
22// ============================================================================
23
24/// Errors that can occur during graph construction.
25#[derive(Debug, Clone)]
26pub enum BuildError {
27    /// A cycle was detected in the signal edge graph.
28    CycleDetected,
29    /// Backend creation failed.
30    Backend(String),
31}
32
33impl std::fmt::Display for BuildError {
34    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35        match self {
36            Self::CycleDetected => write!(f, "graph cycle detected"),
37            Self::Backend(msg) => write!(f, "backend error: {msg}"),
38        }
39    }
40}
41
42// ============================================================================
43// Graph Builder
44// ============================================================================
45
46// ============================================================================
47// Node Storage
48// ============================================================================
49
50pub(crate) struct NodeEntry<T: Transcendental, const BUF_SIZE: usize> {
51    pub(crate) node: NodeVariant<T, BUF_SIZE>,
52}
53
54// ============================================================================
55// GraphBuilder (Mutable Construction)
56// ============================================================================
57
58/// A named resource (tape loop) shared between nodes in the graph.
59#[derive(Clone)]
60pub struct GraphResource {
61    /// Unique name referenced by node parameters.
62    pub name: String,
63    /// Resource kind string (`"tape"`).
64    pub kind: String,
65    /// Capacity in samples (for `"tape"` kind).
66    pub capacity: usize,
67}
68
69/// Mutable builder for an immutable signal graph.
70///
71/// # Node factory
72///
73/// The builder holds an [`Arc<NodeFactory>`] for constructing nodes by
74/// type name, provided at construction via [`GraphBuilder::new`].
75pub struct GraphBuilder<T: Transcendental, const BUF_SIZE: usize> {
76    nodes: Vec<NodeEntry<T, BUF_SIZE>>,
77    signal_edges: Vec<(usize, usize, usize, usize)>,
78    control_edges: Vec<(usize, usize, usize, usize)>,
79    clock_edges: Vec<(usize, usize, usize, usize)>,
80    feedback_edges: Vec<(usize, usize, usize, usize)>,
81    resources: Vec<GraphResource>,
82    /// Shared node factory (required, from Runtime).
83    factory: Arc<NodeFactory<T, BUF_SIZE>>,
84    /// Shared backend factory (required, from Runtime).
85    backends: Arc<BackendFactory<T>>,
86    /// Optional backend configuration (set via [`with_backend`](Self::with_backend)).
87    backend_config: Option<BackendConfig>,
88}
89
90struct BackendConfig {
91    name: String,
92    params: HashMap<String, ParamValue>,
93}
94
95impl<T: Transcendental, const BUF_SIZE: usize> GraphBuilder<T, BUF_SIZE> {
96    /// Create a new empty graph builder without a node factory.
97    pub fn new(factory: Arc<NodeFactory<T, BUF_SIZE>>, backends: Arc<BackendFactory<T>>) -> Self {
98        Self {
99            nodes: Vec::new(),
100            signal_edges: Vec::new(),
101            control_edges: Vec::new(),
102            clock_edges: Vec::new(),
103            feedback_edges: Vec::new(),
104            resources: Vec::new(),
105            factory,
106            backends,
107            backend_config: None,
108        }
109    }
110
111    /// Add a node by type name using the internal factory.
112    ///
113    /// The type must have been registered in the factory before calling
114    /// this method.
115    ///
116    /// Returns the index of the newly added node.
117    ///
118    /// # Errors
119    ///
120    /// Returns [`RegistryError`] if no factory is set or the type name
121    /// is not registered.
122    pub fn add_node(&mut self, type_name: &str, params: &Params) -> Result<usize, RegistryError> {
123        let id = NodeId(self.nodes.len() as u32);
124        self.add_node_with_id(type_name, params, id)
125    }
126
127    /// Add a node with an explicit [`NodeId`].
128    ///
129    /// Like [`add_node`](Self::add_node) but uses the provided `id`.
130    /// Important for serialization where external references depend on
131    /// exact IDs.
132    pub fn add_node_with_id(
133        &mut self,
134        type_name: &str,
135        params: &Params,
136        id: NodeId,
137    ) -> Result<usize, RegistryError> {
138        let node = self.factory.construct(type_name, id, params)?;
139        let idx = self.nodes.len();
140        self.nodes.push(NodeEntry { node });
141        Ok(idx)
142    }
143
144    /// Register a named resource (tape loop, buffer, etc.).
145    pub fn add_resource(&mut self, resource: GraphResource) {
146        self.resources.push(resource);
147    }
148
149    /// Number of nodes added to the builder so far.
150    pub fn node_count(&self) -> usize {
151        self.nodes.len()
152    }
153
154    /// Access the shared backend factory (for building with external configs).
155    pub fn backend_factory(&self) -> &Arc<BackendFactory<T>> {
156        &self.backends
157    }
158
159    /// Add a source node and return its index.
160    pub fn add_source(&mut self, source: Box<dyn rill_core::traits::Source<T, BUF_SIZE>>) -> usize {
161        let idx = self.nodes.len();
162        self.nodes.push(NodeEntry {
163            node: NodeVariant::Source(source),
164        });
165        idx
166    }
167
168    /// Add a processor node and return its index.
169    pub fn add_processor(
170        &mut self,
171        processor: Box<dyn rill_core::traits::Processor<T, BUF_SIZE>>,
172    ) -> usize {
173        let idx = self.nodes.len();
174        self.nodes.push(NodeEntry {
175            node: NodeVariant::Processor(processor),
176        });
177        idx
178    }
179
180    /// Add a sink node and return its index.
181    pub fn add_sink(&mut self, sink: Box<dyn rill_core::traits::Sink<T, BUF_SIZE>>) -> usize {
182        let idx = self.nodes.len();
183        self.nodes.push(NodeEntry {
184            node: NodeVariant::Sink(sink),
185        });
186        idx
187    }
188
189    /// Add a Router node (N→M configurable routing, no DSP).
190    pub fn add_router(&mut self, router: Box<dyn rill_core::traits::Router<T, BUF_SIZE>>) -> usize {
191        let idx = self.nodes.len();
192        self.nodes.push(NodeEntry {
193            node: NodeVariant::Router(router),
194        });
195        idx
196    }
197
198    /// Connect signal ports (audio data).
199    pub fn connect_signal(
200        &mut self,
201        from_node: usize,
202        from_port: usize,
203        to_node: usize,
204        to_port: usize,
205    ) {
206        self.signal_edges
207            .push((from_node, from_port, to_node, to_port));
208    }
209
210    /// Connect control ports (modulation values).
211    pub fn connect_control(
212        &mut self,
213        from_node: usize,
214        from_port: usize,
215        to_node: usize,
216        to_port: usize,
217    ) {
218        self.control_edges
219            .push((from_node, from_port, to_node, to_port));
220    }
221
222    /// Connect clock ports (timing events).
223    pub fn connect_clock(
224        &mut self,
225        from_node: usize,
226        from_port: usize,
227        to_node: usize,
228        to_port: usize,
229    ) {
230        self.clock_edges
231            .push((from_node, from_port, to_node, to_port));
232    }
233
234    /// Connect feedback ports (delay lines, state carryover).
235    pub fn connect_feedback(
236        &mut self,
237        from_node: usize,
238        from_port: usize,
239        to_node: usize,
240        to_port: usize,
241    ) {
242        self.feedback_edges
243            .push((from_node, from_port, to_node, to_port));
244    }
245
246    /// Configure an audio backend for this builder.
247    ///
248    /// When set, [`build`](Self::build) looks for a driver node in the graph
249    /// and auto-starts it with a command queue and the given audio backend.
250    /// Without this method, the graph is purely structural (no audio I/O,
251    /// no command queue).
252    /// Configure an audio backend for this builder.
253    ///
254    /// Params are passed blindly to the backend factory — keys like
255    /// `"sample_rate"`, `"buffer_size"`, `"channels"` are interpreted
256    /// by each backend constructor.
257    pub fn with_backend(mut self, backend_name: &str, params: HashMap<String, ParamValue>) -> Self {
258        self.backend_config = Some(BackendConfig {
259            name: backend_name.to_string(),
260            params,
261        });
262        self
263    }
264
265    /// Build the graph.
266    ///
267    /// If [`with_backend`](Self::with_backend) was called before, the builder
268    /// auto-starts a driver node with a command queue and the configured
269    /// audio backend. Otherwise the graph is purely structural (no audio I/O,
270    /// no command queue).
271    pub fn build(mut self) -> Result<Graph<T, BUF_SIZE>, BuildError> {
272        let (backend_name, params) = match self.backend_config.as_ref() {
273            Some(cfg) => (Some(cfg.name.as_str()), &cfg.params),
274            None => (None, &HashMap::new()),
275        };
276        let num_nodes = self.nodes.len();
277
278        // --- adjacency for Kahn (audio edges only; feedback is not a DAG edge) ---
279        let mut in_degree = vec![0usize; num_nodes];
280        let mut out_edges: Vec<Vec<(usize, usize, usize)>> = vec![Vec::new(); num_nodes];
281
282        for &(from_n, from_p, to_n, to_p) in &self.signal_edges {
283            in_degree[to_n] += 1;
284            out_edges[from_n].push((from_p, to_n, to_p));
285        }
286
287        // --- Kahn's algorithm ---
288        let mut queue: VecDeque<usize> = in_degree
289            .iter()
290            .enumerate()
291            .filter(|(_, &d)| d == 0)
292            .map(|(i, _)| i)
293            .collect();
294
295        let mut topo = Vec::with_capacity(num_nodes);
296        let mut indeg = in_degree;
297        while let Some(idx) = queue.pop_front() {
298            topo.push(idx);
299            for &(_, to_n, _) in &out_edges[idx] {
300                indeg[to_n] -= 1;
301                if indeg[to_n] == 0 {
302                    queue.push_back(to_n);
303                }
304            }
305        }
306
307        if topo.len() != num_nodes {
308            return Err(BuildError::CycleDetected);
309        }
310
311        // --- populate Port::downstream, downstream_input_ptrs, parent ---
312        for &(from_n, from_p, to_n, to_p) in &self.signal_edges {
313            // downstream list (serialization)
314            if let Some(port) = self.nodes[from_n].node.output_port_mut(from_p) {
315                port.downstream.push((to_n, to_p));
316            }
317            // Prepare pointers (safe: distinct indices in static DAG).
318            let in_ptr: *mut Port<T, BUF_SIZE> = self.nodes[to_n]
319                .node
320                .input_port_mut(to_p)
321                .map(|p| p as *mut Port<T, BUF_SIZE>)
322                .unwrap_or(std::ptr::null_mut());
323            let parent: *mut NodeVariant<T, BUF_SIZE> = &mut self.nodes[to_n].node;
324            let out_ptr: *mut Port<T, BUF_SIZE> = self.nodes[from_n]
325                .node
326                .output_port_mut(from_p)
327                .map(|p| p as *mut Port<T, BUF_SIZE>)
328                .unwrap_or(std::ptr::null_mut());
329            // Assign (safe: pointers were obtained without overlapping borrows).
330            if !in_ptr.is_null() && !out_ptr.is_null() {
331                #[allow(unsafe_code)]
332                unsafe {
333                    (*in_ptr).parent = parent;
334                    (*out_ptr).downstream_input_ptrs.push(in_ptr);
335                }
336            }
337        }
338
339        // --- downstream_nodes: unique downstream node pointers ---
340        for &(from_n, from_p, to_n, _) in &self.signal_edges {
341            let parent: *mut NodeVariant<T, BUF_SIZE> = &mut self.nodes[to_n].node;
342            if let Some(port) = self.nodes[from_n].node.output_port_mut(from_p) {
343                let ptr_val = parent as usize;
344                let already = port.downstream_nodes.iter().any(|&p| p as usize == ptr_val);
345                if !already {
346                    port.downstream_nodes.push(parent);
347                }
348            }
349        }
350
351        // --- upstream_buffer: zero-copy routing for 1:1 and fan-out ---
352        for &(from_n, from_p, to_n, to_p) in &self.signal_edges {
353            let upstream = self.nodes[from_n]
354                .node
355                .output_port(from_p)
356                .map(|p| &p.buffer as *const FixedBuffer<T, BUF_SIZE>);
357            if let Some(port) = self.nodes[to_n].node.input_port_mut(to_p) {
358                if port.upstream_buffer.is_none() {
359                    // First upstream: set zero-copy pointer
360                    port.upstream_buffer = upstream;
361                } else {
362                    // Fan-in: copy-based fallback
363                    port.upstream_buffer = None;
364                }
365            }
366        }
367
368        // --- enable feedback buffers on both output and input ports ---
369        for &(from_n, from_p, to_n, to_p) in &self.feedback_edges {
370            if let Some(port) = self.nodes[from_n].node.output_port_mut(from_p) {
371                port.feedback_buffer = Some(FixedBuffer::new());
372                port.feedback_downstream.push((to_n, to_p));
373            }
374            if let Some(port) = self.nodes[to_n].node.input_port_mut(to_p) {
375                port.feedback_buffer = Some(FixedBuffer::new());
376            }
377        }
378        // --- populate Port::feedback_ptrs on output ports ---
379        for &(from_n, from_p, to_n, to_p) in &self.feedback_edges {
380            let ptr = self.nodes[to_n]
381                .node
382                .input_port(to_p)
383                .map(|p| &p.feedback_buffer as *const Option<FixedBuffer<T, BUF_SIZE>>)
384                .map(|r| r as *mut Option<FixedBuffer<T, BUF_SIZE>>);
385            if let Some(port) = self.nodes[from_n].node.output_port_mut(from_p) {
386                if let Some(p) = ptr {
387                    port.feedback_ptrs.push(p);
388                }
389            }
390        }
391
392        let sr = params
393            .get("sample_rate")
394            .and_then(|v| v.as_i32())
395            .unwrap_or(44100) as f32;
396
397        // Allocate named buffers (tape loops, etc.) from resource definitions.
398        let mut buffers = BufferRegistry::new();
399        for r in &self.resources {
400            if r.kind == "tape" {
401                if let Some(tape) = TapeLoop::<T>::new(r.capacity) {
402                    buffers.register(&r.name, Box::new(tape));
403                }
404            }
405        }
406
407        // Resolve resources — each node that needs a shared buffer
408        // (e.g. WriteHead, ReadHead) looks it up by name and caches the
409        // pointer.  This is a single‑threaded, one‑time setup step.
410        for entry in &mut self.nodes {
411            entry.node.resolve_resources(&buffers);
412        }
413
414        // Resolve audio backend pointer for I/O nodes.
415        let backend_box = if let Some(name) = backend_name {
416            let b = self
417                .backends
418                .create(name, params)
419                .map_err(BuildError::Backend)?;
420            let ptr: *mut dyn rill_core::io::IoBackend<T> = &*b
421                as *const dyn rill_core::io::IoBackend<T>
422                as *mut dyn rill_core::io::IoBackend<T>;
423            for entry in &mut self.nodes {
424                entry.node.resolve_backend(ptr);
425            }
426            Some(b)
427        } else {
428            None
429        };
430
431        let mut nodes: Vec<NodeVariant<T, BUF_SIZE>> =
432            self.nodes.into_iter().map(|e| e.node).collect();
433
434        // Auto-start driver node (registers process callback on backend).
435        let cmd_queue = Arc::new(MpscQueue::<SetParameter>::with_capacity(64));
436        let have_queue = if let Some(ref _backend) = backend_box {
437            let driver_idx = nodes
438                .iter()
439                .position(|n| {
440                    let name = n.metadata().name;
441                    name == "AudioInput" || name == "Input"
442                })
443                .or_else(|| {
444                    nodes.iter().position(|n| {
445                        let name = n.metadata().name;
446                        name == "AudioOutput" || name == "Output"
447                    })
448                });
449            if let Some(driver_idx) = driver_idx {
450                let nodes_ptr = nodes.as_mut_ptr();
451                let len = nodes.len();
452                let source_idx = topo[0];
453                let queue_ptr: *const MpscQueue<SetParameter> = Arc::as_ptr(&cmd_queue);
454                let handle = GraphHandle {
455                    nodes: nodes_ptr as *mut u8,
456                    len,
457                    source_idx,
458                    sample_rate: sr,
459                    queue: queue_ptr,
460                };
461                nodes[driver_idx].start(handle);
462                true
463            } else {
464                false
465            }
466        } else {
467            false
468        };
469        let command_queue = if have_queue { Some(cmd_queue) } else { None };
470
471        let owned_buffers = buffers.into_inner();
472
473        let allocated = self.resources.clone();
474
475        Ok(Graph {
476            nodes,
477            topo_order: topo,
478            resources: allocated,
479            current_tick: ClockTick::new(0, BUF_SIZE as u32, sr),
480            buffers: owned_buffers,
481            backend: backend_box,
482            command_queue,
483        })
484    }
485}
486
487// ============================================================================
488// Graph (Static DAG)
489// ============================================================================
490
491/// Immutable signal graph with static DAG topology.
492///
493/// Once built the graph cannot be modified. The graph owns no processing
494/// logic — it is a pure topology description. Processing is driven by
495/// port-level methods (`pre_process`, `snapshot_feedback`, `propagate`)
496/// called from external code (e.g. a real-time signal callback or an
497/// offline renderer).
498pub struct Graph<T: Transcendental, const BUF_SIZE: usize> {
499    nodes: Vec<NodeVariant<T, BUF_SIZE>>,
500    topo_order: Vec<usize>,
501    current_tick: ClockTick,
502    /// Resource metadata (name, kind, capacity) for serialization.
503    pub(crate) resources: Vec<GraphResource>,
504    /// Named buffers (tape loops, etc.) shared between nodes.
505    #[allow(dead_code)]
506    buffers: Vec<Box<dyn Buffer<T> + Send>>,
507    /// Shared audio backend (alive for the graph's lifetime).
508    #[allow(dead_code)]
509    backend: Option<Box<dyn rill_core::io::IoBackend<T>>>,
510    /// Command queue for sending parameters from control to audio thread.
511    command_queue: Option<Arc<MpscQueue<SetParameter>>>,
512}
513
514impl<T: Transcendental, const BUF_SIZE: usize> Graph<T, BUF_SIZE> {
515    // ========================================================================
516    // Accessors
517    // ========================================================================
518
519    /// Borrow the node array.
520    pub fn nodes(&self) -> &[NodeVariant<T, BUF_SIZE>] {
521        &self.nodes
522    }
523
524    /// Mutably borrow the node array.
525    pub fn nodes_mut(&mut self) -> &mut [NodeVariant<T, BUF_SIZE>] {
526        &mut self.nodes
527    }
528
529    /// Return the current clock tick.
530    pub fn current_tick(&self) -> ClockTick {
531        self.current_tick
532    }
533
534    /// Return the number of nodes in the graph.
535    pub fn node_count(&self) -> usize {
536        self.nodes.len()
537    }
538
539    /// Return the topological ordering of node indices.
540    pub fn topo_order(&self) -> &[usize] {
541        &self.topo_order
542    }
543
544    #[allow(dead_code)]
545    pub(crate) fn sample_rate(&self) -> f32 {
546        self.current_tick.sample_rate
547    }
548
549    /// Access the named resources (tape loops, etc.) allocated for this graph.
550    #[allow(dead_code)]
551    pub fn resources(&self) -> &[GraphResource] {
552        &self.resources
553    }
554
555    /// Return a reference to the audio backend, if one was configured.
556    #[allow(dead_code)]
557    pub(crate) fn backend_ref(&self) -> Option<&dyn rill_core::io::IoBackend<T>> {
558        self.backend.as_deref()
559    }
560
561    /// Run the audio backend until `running` becomes false.
562    ///
563    /// For blocking backends (ALSA, PipeWire) this blocks inside
564    /// `backend.run()`. For non-blocking backends (CPAL, JACK) it
565    /// parks after setup. An external signal must unpark the thread
566    /// after setting `running` to false.
567    pub fn run(&self, running: Arc<AtomicBool>) -> Result<(), String> {
568        if let Some(ref backend) = self.backend {
569            backend.run(running.clone())?;
570            while running.load(Ordering::Acquire) {
571                std::thread::park();
572            }
573            backend.stop()
574        } else {
575            Ok(())
576        }
577    }
578
579    /// Obtain an [`ActorRef`] for sending commands to this graph.
580    ///
581    /// The returned handle holds a weak reference — when the `Graph` is
582    /// dropped, all subsequent `send` calls route to dead letters.
583    /// Returns `None` if no audio backend was configured (no queue created).
584    pub fn handle(&self) -> Option<ActorRef<SetParameter>> {
585        let mailbox = self.command_queue.as_ref()?;
586        Some(ActorRef::new(mailbox))
587    }
588
589    /// Consume the graph and return its owned parts (test only).
590    #[cfg(test)]
591    pub fn into_parts(
592        self,
593    ) -> (
594        Vec<NodeVariant<T, BUF_SIZE>>,
595        Vec<usize>,
596        ClockTick,
597        Vec<Box<dyn Buffer<T> + Send>>,
598    ) {
599        let Self {
600            nodes,
601            topo_order,
602            current_tick,
603            resources: _,
604            buffers,
605            backend: _,
606            command_queue: _,
607        } = self;
608        (nodes, topo_order, current_tick, buffers)
609    }
610}
611
612// ============================================================================
613// ActorCell implementation
614// ============================================================================
615
616impl<T: Transcendental, const BUF_SIZE: usize> ActorCell for Graph<T, BUF_SIZE> {
617    type Msg = SetParameter;
618
619    /// Process a single parameter command by writing to the target node.
620    fn receive(&mut self, msg: SetParameter) {
621        let idx = msg.port.node_id().inner() as usize;
622        if idx < self.nodes.len() {
623            let _ = self.nodes[idx].set_parameter(&msg.parameter, msg.value);
624        }
625    }
626}
627
628#[cfg(test)]
629mod tests {
630    use super::*;
631    use rill_core::math::Transcendental;
632    use rill_core::time::ClockTick;
633    use rill_core::traits::active::ActiveNode;
634    use rill_core::traits::algorithm::ActionContext;
635    use rill_core::traits::processable::{ProcessContext, Processable};
636    use rill_core::traits::{
637        Node, NodeCategory, NodeId, NodeMetadata, NodeState, ParamValue, ParameterId, Port,
638        PortDirection, PortId, ProcessResult, Processor, Sink, Source,
639    };
640    use std::sync::Arc;
641
642    /// Create a test builder with empty factories.
643    fn test_builder<const B: usize>() -> GraphBuilder<f32, B> {
644        GraphBuilder::new(
645            Arc::new(NodeFactory::new()),
646            Arc::new(BackendFactory::new()),
647        )
648    }
649
650    // ------------------------------------------------------------------------
651    // Mock: ConstantSource — fills output with a constant value
652    // ------------------------------------------------------------------------
653    struct ConstantSource<T: Transcendental, const BUF_SIZE: usize> {
654        value: T,
655        state: NodeState<T, BUF_SIZE>,
656        outputs: Vec<Port<T, BUF_SIZE>>,
657    }
658
659    impl<T: Transcendental, const BUF_SIZE: usize> ConstantSource<T, BUF_SIZE> {
660        fn new(value: T, sample_rate: f32) -> Self {
661            let mut outputs = Vec::with_capacity(1);
662            outputs.push(Port {
663                id: PortId::signal_out(NodeId(0), 0),
664                name: "output".into(),
665                direction: PortDirection::Output,
666                action: None,
667                pending_command: None,
668                buffer: Default::default(),
669                feedback_buffer: None,
670                downstream: Vec::new(),
671                feedback_downstream: Vec::new(),
672                feedback_ptrs: Vec::new(),
673                downstream_input_ptrs: Vec::new(),
674                downstream_nodes: Vec::new(),
675                parent: std::ptr::null_mut(),
676                upstream_buffer: None,
677            });
678            Self {
679                value,
680                state: NodeState::new(sample_rate),
681                outputs,
682            }
683        }
684    }
685
686    impl<T: Transcendental, const BUF_SIZE: usize> Node<T, BUF_SIZE> for ConstantSource<T, BUF_SIZE> {
687        fn metadata(&self) -> NodeMetadata {
688            NodeMetadata {
689                type_name: None,
690                name: "ConstantSource".into(),
691                category: NodeCategory::Source,
692                description: String::new(),
693                author: String::new(),
694                version: "1.0".into(),
695                signal_inputs: 0,
696                signal_outputs: 1,
697                control_inputs: 0,
698                control_outputs: 0,
699                clock_inputs: 0,
700                clock_outputs: 0,
701                feedback_ports: 0,
702                parameters: vec![],
703            }
704        }
705        fn init(&mut self, _sample_rate: f32) {}
706        fn reset(&mut self) {}
707        fn get_parameter(&self, _id: &ParameterId) -> Option<ParamValue> {
708            None
709        }
710        fn set_parameter(&mut self, _id: &ParameterId, _value: ParamValue) -> ProcessResult<()> {
711            Ok(())
712        }
713        fn id(&self) -> NodeId {
714            NodeId(0)
715        }
716        fn set_id(&mut self, _id: NodeId) {}
717        fn input_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
718            None
719        }
720        fn input_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
721            None
722        }
723        fn output_port(&self, index: usize) -> Option<&Port<T, BUF_SIZE>> {
724            self.outputs.get(index)
725        }
726        fn output_port_mut(&mut self, index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
727            self.outputs.get_mut(index)
728        }
729        fn control_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
730            None
731        }
732        fn control_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
733            None
734        }
735        fn state(&self) -> &NodeState<T, BUF_SIZE> {
736            &self.state
737        }
738        fn state_mut(&mut self) -> &mut NodeState<T, BUF_SIZE> {
739            &mut self.state
740        }
741    }
742
743    impl<T: Transcendental, const BUF_SIZE: usize> Source<T, BUF_SIZE> for ConstantSource<T, BUF_SIZE> {
744        fn generate(
745            &mut self,
746            _clock: &ClockTick,
747            _control_inputs: &[T],
748            _clock_inputs: &[ClockTick],
749        ) -> ProcessResult<()> {
750            let out = self.outputs[0].buffer.as_mut_array();
751            for sample in out.iter_mut() {
752                *sample = self.value;
753            }
754            Ok(())
755        }
756        fn num_signal_outputs(&self) -> usize {
757            1
758        }
759    }
760
761    impl<const BUF_SIZE: usize> ActiveNode for ConstantSource<f32, BUF_SIZE> {
762        fn start(&mut self, handle: GraphHandle) {
763            #[allow(unsafe_code)]
764            unsafe {
765                let nodes = std::slice::from_raw_parts_mut(
766                    handle.nodes as *mut NodeVariant<f32, BUF_SIZE>,
767                    handle.len,
768                );
769                let idx = handle.source_idx;
770                let tick = ClockTick::new(0, BUF_SIZE as u32, handle.sample_rate);
771                let mut ctx = ProcessContext { clock: &tick };
772                let _ = nodes[idx].process_block(&mut ctx);
773                let action_ctx = ActionContext::new(&tick);
774                for po in 0..nodes[idx].num_signal_outputs() {
775                    if let Some(port) = nodes[idx].output_port(po) {
776                        let _ = port.propagate(port.buffer(), &action_ctx);
777                    }
778                }
779            }
780        }
781        fn stop(&mut self) {}
782    }
783
784    // ------------------------------------------------------------------------
785    // Mock: NoopProcessor — minimal processor for topology tests
786    // ------------------------------------------------------------------------
787    struct NoopProcessor<T: Transcendental, const BUF_SIZE: usize> {
788        state: NodeState<T, BUF_SIZE>,
789    }
790
791    impl<T: Transcendental, const BUF_SIZE: usize> NoopProcessor<T, BUF_SIZE> {
792        fn new(sample_rate: f32) -> Self {
793            Self {
794                state: NodeState::new(sample_rate),
795            }
796        }
797    }
798
799    impl<T: Transcendental, const BUF_SIZE: usize> Node<T, BUF_SIZE> for NoopProcessor<T, BUF_SIZE> {
800        fn metadata(&self) -> NodeMetadata {
801            NodeMetadata {
802                type_name: None,
803                name: "NoopProcessor".into(),
804                category: NodeCategory::Processor,
805                description: String::new(),
806                author: String::new(),
807                version: "1.0".into(),
808                signal_inputs: 0,
809                signal_outputs: 0,
810                control_inputs: 0,
811                control_outputs: 0,
812                clock_inputs: 0,
813                clock_outputs: 0,
814                feedback_ports: 0,
815                parameters: vec![],
816            }
817        }
818        fn init(&mut self, _sample_rate: f32) {}
819        fn reset(&mut self) {}
820        fn get_parameter(&self, _id: &ParameterId) -> Option<ParamValue> {
821            None
822        }
823        fn set_parameter(&mut self, _id: &ParameterId, _value: ParamValue) -> ProcessResult<()> {
824            Ok(())
825        }
826        fn id(&self) -> NodeId {
827            NodeId(1)
828        }
829        fn set_id(&mut self, _id: NodeId) {}
830        fn input_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
831            None
832        }
833        fn input_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
834            None
835        }
836        fn output_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
837            None
838        }
839        fn output_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
840            None
841        }
842        fn control_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
843            None
844        }
845        fn control_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
846            None
847        }
848        fn state(&self) -> &NodeState<T, BUF_SIZE> {
849            &self.state
850        }
851        fn state_mut(&mut self) -> &mut NodeState<T, BUF_SIZE> {
852            &mut self.state
853        }
854    }
855
856    impl<T: Transcendental, const BUF_SIZE: usize> Processor<T, BUF_SIZE>
857        for NoopProcessor<T, BUF_SIZE>
858    {
859        fn process(
860            &mut self,
861            _clock: &ClockTick,
862            _signal_inputs: &[&[T; BUF_SIZE]],
863            _control_inputs: &[T],
864            _clock_inputs: &[ClockTick],
865            _feedback_inputs: &[&[T; BUF_SIZE]],
866        ) -> ProcessResult<()> {
867            Ok(())
868        }
869    }
870
871    // ------------------------------------------------------------------------
872    // Mock: NoopSink — minimal sink for topology tests
873    // ------------------------------------------------------------------------
874    struct NoopSink<T: Transcendental, const BUF_SIZE: usize> {
875        state: NodeState<T, BUF_SIZE>,
876    }
877
878    impl<T: Transcendental, const BUF_SIZE: usize> NoopSink<T, BUF_SIZE> {
879        fn new(sample_rate: f32) -> Self {
880            Self {
881                state: NodeState::new(sample_rate),
882            }
883        }
884    }
885
886    impl<T: Transcendental, const BUF_SIZE: usize> Node<T, BUF_SIZE> for NoopSink<T, BUF_SIZE> {
887        fn metadata(&self) -> NodeMetadata {
888            NodeMetadata {
889                type_name: None,
890                name: "NoopSink".into(),
891                category: NodeCategory::Sink,
892                description: String::new(),
893                author: String::new(),
894                version: "1.0".into(),
895                signal_inputs: 0,
896                signal_outputs: 0,
897                control_inputs: 0,
898                control_outputs: 0,
899                clock_inputs: 0,
900                clock_outputs: 0,
901                feedback_ports: 0,
902                parameters: vec![],
903            }
904        }
905        fn init(&mut self, _sample_rate: f32) {}
906        fn reset(&mut self) {}
907        fn get_parameter(&self, _id: &ParameterId) -> Option<ParamValue> {
908            None
909        }
910        fn set_parameter(&mut self, _id: &ParameterId, _value: ParamValue) -> ProcessResult<()> {
911            Ok(())
912        }
913        fn id(&self) -> NodeId {
914            NodeId(2)
915        }
916        fn set_id(&mut self, _id: NodeId) {}
917        fn input_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
918            None
919        }
920        fn input_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
921            None
922        }
923        fn output_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
924            None
925        }
926        fn output_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
927            None
928        }
929        fn control_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
930            None
931        }
932        fn control_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
933            None
934        }
935        fn state(&self) -> &NodeState<T, BUF_SIZE> {
936            &self.state
937        }
938        fn state_mut(&mut self) -> &mut NodeState<T, BUF_SIZE> {
939            &mut self.state
940        }
941    }
942
943    impl<T: Transcendental, const BUF_SIZE: usize> Sink<T, BUF_SIZE> for NoopSink<T, BUF_SIZE> {
944        fn consume(
945            &mut self,
946            _clock: &ClockTick,
947            _signal_inputs: &[&[T; BUF_SIZE]],
948            _control_inputs: &[T],
949            _clock_inputs: &[ClockTick],
950            _feedback_inputs: &[&[T; BUF_SIZE]],
951        ) -> ProcessResult<()> {
952            Ok(())
953        }
954    }
955
956    // ========================================================================
957    // Tests
958    // ========================================================================
959
960    #[test]
961    fn test_topo_order_correct() {
962        const BUF: usize = 64;
963        let mut builder = test_builder::<BUF>();
964
965        let src = builder.add_source(Box::new(ConstantSource::new(1.0, 44100.0)));
966        let proc = builder.add_processor(Box::new(NoopProcessor::new(44100.0)));
967        let sink = builder.add_sink(Box::new(NoopSink::new(44100.0)));
968
969        builder.connect_signal(src, 0, proc, 0);
970        builder.connect_signal(proc, 0, sink, 0);
971
972        let graph = builder.build().expect("build failed");
973
974        let order = graph.topo_order();
975        let src_pos = order.iter().position(|&i| i == src).unwrap();
976        let proc_pos = order.iter().position(|&i| i == proc).unwrap();
977        let sink_pos = order.iter().position(|&i| i == sink).unwrap();
978        assert!(src_pos < proc_pos);
979        assert!(proc_pos < sink_pos);
980    }
981
982    #[test]
983    fn test_cycle_detection() {
984        const BUF: usize = 64;
985        let mut builder = test_builder::<BUF>();
986
987        let a = builder.add_processor(Box::new(NoopProcessor::new(44100.0)));
988        let b = builder.add_processor(Box::new(NoopProcessor::new(44100.0)));
989
990        builder.connect_signal(a, 0, b, 0);
991        builder.connect_signal(b, 0, a, 0);
992
993        let result = builder.build();
994        assert!(matches!(result, Err(BuildError::CycleDetected)));
995    }
996
997    #[test]
998    fn test_source_node_create() {
999        const BUF: usize = 64;
1000        let mut builder = test_builder::<BUF>();
1001        let idx = builder.add_source(Box::new(ConstantSource::new(0.5, 44100.0)));
1002        let graph = builder.build().expect("build failed");
1003        assert_eq!(graph.node_count(), 1);
1004        assert_eq!(graph.topo_order(), &[idx]);
1005    }
1006
1007    // ========================================================================
1008    // Port-based propagation tests
1009    // ========================================================================
1010
1011    /// Simple Sink that captures its first input port for inspection.
1012    pub struct TestSink<T: Transcendental, const BUF_SIZE: usize> {
1013        id: NodeId,
1014        state: NodeState<T, BUF_SIZE>,
1015        pub inputs: Vec<Port<T, BUF_SIZE>>,
1016        last_value: T,
1017    }
1018
1019    impl<T: Transcendental, const BUF_SIZE: usize> TestSink<T, BUF_SIZE> {
1020        fn new(id: NodeId, sample_rate: f32) -> Self {
1021            let mut inputs = Vec::new();
1022            inputs.push(Port::input(id, 0, "in"));
1023            Self {
1024                id,
1025                state: NodeState::new(sample_rate),
1026                inputs,
1027                last_value: T::ZERO,
1028            }
1029        }
1030        #[allow(dead_code)]
1031        fn last_value(&self) -> T {
1032            self.last_value
1033        }
1034    }
1035
1036    impl<T: Transcendental, const BUF_SIZE: usize> Node<T, BUF_SIZE> for TestSink<T, BUF_SIZE> {
1037        fn metadata(&self) -> NodeMetadata {
1038            NodeMetadata {
1039                type_name: None,
1040                name: "TestSink".into(),
1041                category: NodeCategory::Sink,
1042                description: String::new(),
1043                author: String::new(),
1044                version: "1.0".into(),
1045                signal_inputs: 1,
1046                signal_outputs: 0,
1047                control_inputs: 0,
1048                control_outputs: 0,
1049                clock_inputs: 0,
1050                clock_outputs: 0,
1051                feedback_ports: 0,
1052                parameters: vec![],
1053            }
1054        }
1055        fn init(&mut self, _: f32) {}
1056        fn reset(&mut self) {
1057            self.state.sample_pos = 0;
1058            self.state.blocks_processed = 0;
1059        }
1060        fn id(&self) -> NodeId {
1061            self.id
1062        }
1063        fn set_id(&mut self, id: NodeId) {
1064            self.id = id;
1065        }
1066        fn get_parameter(&self, _: &ParameterId) -> Option<ParamValue> {
1067            None
1068        }
1069        fn set_parameter(&mut self, _: &ParameterId, _: ParamValue) -> ProcessResult<()> {
1070            Ok(())
1071        }
1072        fn input_port(&self, i: usize) -> Option<&Port<T, BUF_SIZE>> {
1073            self.inputs.get(i)
1074        }
1075        fn input_port_mut(&mut self, i: usize) -> Option<&mut Port<T, BUF_SIZE>> {
1076            self.inputs.get_mut(i)
1077        }
1078        fn output_port(&self, _: usize) -> Option<&Port<T, BUF_SIZE>> {
1079            None
1080        }
1081        fn output_port_mut(&mut self, _: usize) -> Option<&mut Port<T, BUF_SIZE>> {
1082            None
1083        }
1084        fn control_port(&self, _: usize) -> Option<&Port<T, BUF_SIZE>> {
1085            None
1086        }
1087        fn control_port_mut(&mut self, _: usize) -> Option<&mut Port<T, BUF_SIZE>> {
1088            None
1089        }
1090        fn num_signal_inputs(&self) -> usize {
1091            1
1092        }
1093        fn num_signal_outputs(&self) -> usize {
1094            0
1095        }
1096        fn state(&self) -> &NodeState<T, BUF_SIZE> {
1097            &self.state
1098        }
1099        fn state_mut(&mut self) -> &mut NodeState<T, BUF_SIZE> {
1100            &mut self.state
1101        }
1102    }
1103
1104    impl<T: Transcendental, const BUF_SIZE: usize> Sink<T, BUF_SIZE> for TestSink<T, BUF_SIZE> {
1105        fn consume(
1106            &mut self,
1107            _clock: &ClockTick,
1108            _signal_inputs: &[&[T; BUF_SIZE]],
1109            _control_inputs: &[T],
1110            _clock_inputs: &[ClockTick],
1111            _feedback_inputs: &[&[T; BUF_SIZE]],
1112        ) -> ProcessResult<()> {
1113            if let Some(port) = self.inputs.first() {
1114                self.last_value = port.buffer.as_array()[0];
1115            }
1116            self.state.advance();
1117            Ok(())
1118        }
1119    }
1120
1121    /// Processor with a `multiplier` parameter. Output = input × multiplier.
1122    pub struct GainProcessor<T: Transcendental, const BUF_SIZE: usize> {
1123        id: NodeId,
1124        state: NodeState<T, BUF_SIZE>,
1125        pub inputs: Vec<Port<T, BUF_SIZE>>,
1126        pub outputs: Vec<Port<T, BUF_SIZE>>,
1127        pub multiplier: T,
1128    }
1129
1130    impl<T: Transcendental, const BUF_SIZE: usize> GainProcessor<T, BUF_SIZE> {
1131        fn new(id: NodeId, sample_rate: f32, multiplier: T) -> Self {
1132            let mut inputs = Vec::new();
1133            inputs.push(Port::input(id, 0, "in"));
1134            let mut outputs = Vec::new();
1135            outputs.push(Port::output(id, 0, "out"));
1136            Self {
1137                id,
1138                state: NodeState::new(sample_rate),
1139                inputs,
1140                outputs,
1141                multiplier,
1142            }
1143        }
1144    }
1145
1146    impl<T: Transcendental, const BUF_SIZE: usize> Node<T, BUF_SIZE> for GainProcessor<T, BUF_SIZE> {
1147        fn metadata(&self) -> NodeMetadata {
1148            NodeMetadata {
1149                type_name: None,
1150                name: "GainProcessor".into(),
1151                category: NodeCategory::Processor,
1152                description: String::new(),
1153                author: String::new(),
1154                version: "1.0".into(),
1155                signal_inputs: 1,
1156                signal_outputs: 1,
1157                control_inputs: 0,
1158                control_outputs: 0,
1159                clock_inputs: 0,
1160                clock_outputs: 0,
1161                feedback_ports: 0,
1162                parameters: vec![],
1163            }
1164        }
1165        fn init(&mut self, _: f32) {}
1166        fn reset(&mut self) {
1167            self.state.sample_pos = 0;
1168            self.state.blocks_processed = 0;
1169        }
1170        fn id(&self) -> NodeId {
1171            self.id
1172        }
1173        fn set_id(&mut self, id: NodeId) {
1174            self.id = id;
1175        }
1176        fn get_parameter(&self, id: &ParameterId) -> Option<ParamValue> {
1177            match id.as_str() {
1178                "multiplier" => Some(ParamValue::Float(self.multiplier.to_f32())),
1179                _ => None,
1180            }
1181        }
1182        fn set_parameter(&mut self, id: &ParameterId, value: ParamValue) -> ProcessResult<()> {
1183            match id.as_str() {
1184                "multiplier" => {
1185                    if let Some(v) = value.as_f32() {
1186                        self.multiplier = T::from_f32(v);
1187                        Ok(())
1188                    } else {
1189                        Err(rill_core::ProcessError::parameter("expected float"))
1190                    }
1191                }
1192                _ => Err(rill_core::ProcessError::parameter("unknown")),
1193            }
1194        }
1195        fn input_port(&self, i: usize) -> Option<&Port<T, BUF_SIZE>> {
1196            self.inputs.get(i)
1197        }
1198        fn input_port_mut(&mut self, i: usize) -> Option<&mut Port<T, BUF_SIZE>> {
1199            self.inputs.get_mut(i)
1200        }
1201        fn output_port(&self, i: usize) -> Option<&Port<T, BUF_SIZE>> {
1202            self.outputs.get(i)
1203        }
1204        fn output_port_mut(&mut self, i: usize) -> Option<&mut Port<T, BUF_SIZE>> {
1205            self.outputs.get_mut(i)
1206        }
1207        fn control_port(&self, _: usize) -> Option<&Port<T, BUF_SIZE>> {
1208            None
1209        }
1210        fn control_port_mut(&mut self, _: usize) -> Option<&mut Port<T, BUF_SIZE>> {
1211            None
1212        }
1213        fn num_signal_inputs(&self) -> usize {
1214            1
1215        }
1216        fn num_signal_outputs(&self) -> usize {
1217            1
1218        }
1219        fn state(&self) -> &NodeState<T, BUF_SIZE> {
1220            &self.state
1221        }
1222        fn state_mut(&mut self) -> &mut NodeState<T, BUF_SIZE> {
1223            &mut self.state
1224        }
1225    }
1226
1227    impl<T: Transcendental, const BUF_SIZE: usize> Processor<T, BUF_SIZE>
1228        for GainProcessor<T, BUF_SIZE>
1229    {
1230        fn process(
1231            &mut self,
1232            _clock: &ClockTick,
1233            _signal_inputs: &[&[T; BUF_SIZE]],
1234            _control_inputs: &[T],
1235            _clock_inputs: &[ClockTick],
1236            _feedback_inputs: &[&[T; BUF_SIZE]],
1237        ) -> ProcessResult<()> {
1238            let inp = *self.inputs[0].buffer.as_array();
1239            let out = self.outputs[0].buffer.as_mut_array();
1240            for i in 0..BUF_SIZE {
1241                out[i] = inp[i] * self.multiplier;
1242            }
1243            self.state.advance();
1244            Ok(())
1245        }
1246        fn latency(&self) -> usize {
1247            0
1248        }
1249    }
1250
1251    // ── Test: Source → Sink via GraphBuilder ────────────────────────
1252
1253    #[test]
1254    fn test_graph_source_to_sink() {
1255        use rill_core::traits::algorithm::ActionContext;
1256        use rill_core::traits::processable::{ProcessContext, Processable};
1257        const BUF: usize = 64;
1258        let mut builder = test_builder::<BUF>();
1259        let src = builder.add_source(Box::new(ConstantSource::new(42.0, 44100.0)));
1260        let snk = builder.add_sink(Box::new(TestSink::<f32, BUF>::new(NodeId(1), 44100.0)));
1261        builder.connect_signal(src, 0, snk, 0);
1262        let graph = builder.build().unwrap();
1263        let (mut nodes, topo, _, _bufs) = graph.into_parts();
1264        let tick = ClockTick::new(0, BUF as u32, 44100.0);
1265
1266        let mut ctx = ProcessContext { clock: &tick };
1267        let _ = nodes[topo[0]].process_block(&mut ctx);
1268        let action_ctx = ActionContext::new(&tick);
1269        let out_port = nodes[topo[0]].output_port(0).unwrap();
1270        out_port.propagate(out_port.buffer(), &action_ctx).unwrap();
1271
1272        let sink_val = nodes[topo[1]].input_port(0).unwrap().buffer.as_array()[0];
1273        assert_eq!(sink_val, 42.0, "sink should receive source value");
1274    }
1275
1276    // ── Test: Source → Processor → Sink via GraphBuilder ────────────
1277
1278    #[test]
1279    fn test_graph_source_proc_sink() {
1280        use rill_core::traits::algorithm::ActionContext;
1281        use rill_core::traits::processable::{ProcessContext, Processable};
1282        const BUF: usize = 64;
1283        let mut builder = test_builder::<BUF>();
1284        let src = builder.add_source(Box::new(ConstantSource::new(10.0, 44100.0)));
1285        let proc = builder.add_processor(Box::new(GainProcessor::<f32, BUF>::new(
1286            NodeId(1),
1287            44100.0,
1288            3.0,
1289        )));
1290        let snk = builder.add_sink(Box::new(TestSink::<f32, BUF>::new(NodeId(2), 44100.0)));
1291        builder.connect_signal(src, 0, proc, 0);
1292        builder.connect_signal(proc, 0, snk, 0);
1293        let graph = builder.build().unwrap();
1294        let (mut nodes, topo, _, _bufs) = graph.into_parts();
1295        let tick = ClockTick::new(0, BUF as u32, 44100.0);
1296
1297        let mut ctx = ProcessContext { clock: &tick };
1298        let _ = nodes[topo[0]].process_block(&mut ctx);
1299        let action_ctx = ActionContext::new(&tick);
1300        let out_port = nodes[topo[0]].output_port(0).unwrap();
1301        out_port.propagate(out_port.buffer(), &action_ctx).unwrap();
1302
1303        let sink_val = nodes[topo[2]].input_port(0).unwrap().buffer.as_array()[0];
1304        assert!(
1305            (sink_val - 30.0).abs() < 1e-6,
1306            "source(10)×gain(3)=30, got {}",
1307            sink_val
1308        );
1309    }
1310
1311    // ── Test: Command queue drain ───────────────────────────────────
1312
1313    #[test]
1314    fn test_command_queue_drain() {
1315        use rill_core::queues::{MpscQueue, SetParameter, SignalOrigin};
1316        use rill_core::traits::PortId;
1317
1318        const BUF: usize = 64;
1319        let queue: Arc<MpscQueue<SetParameter>> = Arc::new(MpscQueue::new());
1320
1321        let mut builder = test_builder::<BUF>();
1322        builder.add_processor(Box::new(GainProcessor::<f32, BUF>::new(
1323            NodeId(0),
1324            44100.0,
1325            2.0,
1326        )));
1327        let graph = builder.build().unwrap();
1328        let (mut nodes, _, _, _bufs) = graph.into_parts();
1329
1330        let _ = queue.push(SetParameter::new(
1331            PortId::control_in(NodeId(0), 0),
1332            ParameterId::new("multiplier").unwrap(),
1333            ParamValue::Float(5.0),
1334            SignalOrigin::Manual,
1335        ));
1336
1337        while let Some(cmd) = queue.pop() {
1338            let idx = cmd.port.node_id().inner() as usize;
1339            let pid = cmd.parameter.clone();
1340            let _ = nodes[idx].set_parameter(&pid, cmd.value.clone());
1341        }
1342
1343        let pid = ParameterId::new("multiplier").unwrap();
1344        let val = nodes[0].get_parameter(&pid).unwrap().as_f32().unwrap();
1345        assert!(
1346            (val - 5.0).abs() < 1e-6,
1347            "multiplier should be 5.0, got {}",
1348            val
1349        );
1350    }
1351
1352    // ── Test: Queue + propagate ─────────────────────────────────────
1353
1354    #[test]
1355    fn test_command_then_propagate() {
1356        use rill_core::queues::{MpscQueue, SetParameter, SignalOrigin};
1357        use rill_core::traits::algorithm::ActionContext;
1358        use rill_core::traits::processable::{ProcessContext, Processable};
1359        use rill_core::traits::PortId;
1360
1361        const BUF: usize = 64;
1362        let queue: Arc<MpscQueue<SetParameter>> = Arc::new(MpscQueue::new());
1363
1364        let mut builder = test_builder::<BUF>();
1365        let src = builder.add_source(Box::new(ConstantSource::new(7.0, 44100.0)));
1366        let proc = builder.add_processor(Box::new(GainProcessor::<f32, BUF>::new(
1367            NodeId(1),
1368            44100.0,
1369            2.0,
1370        )));
1371        let snk = builder.add_sink(Box::new(TestSink::<f32, BUF>::new(NodeId(2), 44100.0)));
1372        builder.connect_signal(src, 0, proc, 0);
1373        builder.connect_signal(proc, 0, snk, 0);
1374        let graph = builder.build().unwrap();
1375        let (mut nodes, topo, _, _bufs) = graph.into_parts();
1376        let tick = ClockTick::new(0, BUF as u32, 44100.0);
1377
1378        // Push command and drain
1379        let _ = queue.push(SetParameter::new(
1380            PortId::control_in(NodeId(1), 0),
1381            ParameterId::new("multiplier").unwrap(),
1382            ParamValue::Float(4.0),
1383            SignalOrigin::Manual,
1384        ));
1385        while let Some(cmd) = queue.pop() {
1386            let idx = cmd.port.node_id().inner() as usize;
1387            let pid = cmd.parameter.clone();
1388            let _ = nodes[idx].set_parameter(&pid, cmd.value.clone());
1389        }
1390
1391        // Verify multiplier changed
1392        let pid = ParameterId::new("multiplier").unwrap();
1393        let val = nodes[1].get_parameter(&pid).unwrap().as_f32().unwrap();
1394        assert!((val - 4.0).abs() < 1e-6);
1395
1396        // Process + propagate
1397        let mut ctx = ProcessContext { clock: &tick };
1398        let _ = nodes[topo[0]].process_block(&mut ctx);
1399        let action_ctx = ActionContext::new(&tick);
1400        let out_port = nodes[topo[0]].output_port(0).unwrap();
1401        out_port.propagate(out_port.buffer(), &action_ctx).unwrap();
1402
1403        let sink_val = nodes[topo[2]].input_port(0).unwrap().buffer.as_array()[0];
1404        assert!(
1405            (sink_val - 28.0).abs() < 1e-6,
1406            "source(7)×gain(4)=28, got {}",
1407            sink_val
1408        );
1409    }
1410
1411    // ── Test: Feedback propagation ──────────────────────────────────
1412
1413    #[test]
1414    fn test_feedback_propagation() {
1415        use rill_core::traits::algorithm::ActionContext;
1416        use rill_core::traits::processable::{ProcessContext, Processable};
1417
1418        const BUF: usize = 64;
1419        let mut builder = test_builder::<BUF>();
1420        let src = builder.add_source(Box::new(ConstantSource::new(1.0, 44100.0)));
1421        let proc = builder.add_processor(Box::new(GainProcessor::<f32, BUF>::new(
1422            NodeId(1),
1423            44100.0,
1424            2.0,
1425        )));
1426        let snk = builder.add_sink(Box::new(TestSink::<f32, BUF>::new(NodeId(2), 44100.0)));
1427        builder.connect_signal(src, 0, proc, 0);
1428        builder.connect_signal(proc, 0, snk, 0);
1429        builder.connect_feedback(proc, 0, proc, 0);
1430        let graph = builder.build().unwrap();
1431        let (mut nodes, topo, _, _bufs) = graph.into_parts();
1432
1433        // ── Block 1: no feedback yet ──
1434        let tick1 = ClockTick::new(0, BUF as u32, 44100.0);
1435        let mut ctx = ProcessContext { clock: &tick1 };
1436        let _ = nodes[topo[0]].process_block(&mut ctx);
1437        let ctx1 = ActionContext::new(&tick1);
1438        let out_port = nodes[topo[0]].output_port(0).unwrap();
1439        out_port.propagate(out_port.buffer(), &ctx1).unwrap();
1440        let block1 = nodes[topo[2]].input_port(0).unwrap().buffer.as_array()[0];
1441        assert!(
1442            (block1 - 2.0).abs() < 1e-6,
1443            "block1: 1.0×2.0=2.0, got {}",
1444            block1
1445        );
1446
1447        // ── Block 2: feedback from block1 should be mixed in ──
1448        let tick2 = ClockTick::new(BUF as u64, BUF as u32, 44100.0);
1449        let mut ctx = ProcessContext { clock: &tick2 };
1450        let _ = nodes[topo[0]].process_block(&mut ctx);
1451        let ctx2 = ActionContext::new(&tick2);
1452        let out_port = nodes[topo[0]].output_port(0).unwrap();
1453        out_port.propagate(out_port.buffer(), &ctx2).unwrap();
1454        let block2 = nodes[topo[2]].input_port(0).unwrap().buffer.as_array()[0];
1455        assert!(
1456            (block2 - 6.0).abs() < 1e-6,
1457            "block2: (1+2)×2=6.0, got {}",
1458            block2
1459        );
1460    }
1461
1462    // ── Test: drain_fn pattern (as used by AudioInput) ──────────────
1463
1464    #[test]
1465    fn test_drain_fn_before_propagate() {
1466        use rill_core::queues::{MpscQueue, SetParameter, SignalOrigin};
1467        use rill_core::traits::algorithm::ActionContext;
1468        use rill_core::traits::processable::{ProcessContext, Processable};
1469        use rill_core::traits::PortId;
1470
1471        const BUF: usize = 64;
1472        let queue: Arc<MpscQueue<SetParameter>> = Arc::new(MpscQueue::new());
1473
1474        let mut builder = test_builder::<BUF>();
1475        let src = builder.add_source(Box::new(ConstantSource::new(5.0, 44100.0)));
1476        let proc = builder.add_processor(Box::new(GainProcessor::<f32, BUF>::new(
1477            NodeId(1),
1478            44100.0,
1479            1.0,
1480        )));
1481        let snk = builder.add_sink(Box::new(TestSink::<f32, BUF>::new(NodeId(2), 44100.0)));
1482        builder.connect_signal(src, 0, proc, 0);
1483        builder.connect_signal(proc, 0, snk, 0);
1484        let graph = builder.build().unwrap();
1485        let (mut nodes, topo, _, _bufs) = graph.into_parts();
1486        let tick = ClockTick::new(0, BUF as u32, 44100.0);
1487
1488        // Push command BEFORE processing
1489        let _ = queue.push(SetParameter::new(
1490            PortId::control_in(NodeId(1), 0),
1491            ParameterId::new("multiplier").unwrap(),
1492            ParamValue::Float(3.0),
1493            SignalOrigin::Manual,
1494        ));
1495
1496        // Drain
1497        while let Some(cmd) = queue.pop() {
1498            let idx = cmd.port.node_id().inner() as usize;
1499            let pid = cmd.parameter.clone();
1500            let _ = nodes[idx].set_parameter(&pid, cmd.value.clone());
1501        }
1502
1503        // Verify parameter applied
1504        let pid = ParameterId::new("multiplier").unwrap();
1505        let val = nodes[1].get_parameter(&pid).unwrap().as_f32().unwrap();
1506        assert!(
1507            (val - 3.0).abs() < 1e-6,
1508            "multiplier should be 3.0, got {}",
1509            val
1510        );
1511
1512        // Source generate
1513        let mut ctx = ProcessContext { clock: &tick };
1514        let _ = nodes[topo[0]].process_block(&mut ctx).unwrap();
1515
1516        // Propagate
1517        let action_ctx = ActionContext::new(&tick);
1518        let out_port = nodes[topo[0]].output_port(0).unwrap();
1519        out_port.propagate(out_port.buffer(), &action_ctx).unwrap();
1520
1521        // Verify: source(5) × gain(3) = 15
1522        let sink_val = nodes[topo[2]].input_port(0).unwrap().buffer.as_array()[0];
1523        assert!(
1524            (sink_val - 15.0).abs() < 1e-6,
1525            "source(5)×gain(3)=15, got {}",
1526            sink_val
1527        );
1528    }
1529}