// rill_graph/graph.rs

1use crate::backend_factory::BackendFactory;
2use crate::factory::{NodeFactory, RegistryError};
3use rill_core::buffer::{Buffer, BufferRegistry, FixedBuffer, TapeLoop};
4use rill_core::math::Transcendental;
5use rill_core::queues::{MpscQueue, SetParameter};
6use rill_core::time::ClockTick;
7use rill_core::traits::algorithm::ActionContext;
8use rill_core::traits::port::Port;
9use rill_core::traits::processable::{ProcessContext, Processable};
10use rill_core::traits::ParamValue;
11use rill_core::traits::{Node, NodeId, NodeVariant, Params};
12use rill_core_actor::{ActorCell, ActorRef};
13use std::collections::{HashMap, VecDeque};
14use std::sync::atomic::AtomicBool;
15use std::sync::Arc;
16
17// ============================================================================
18// Internal routing metadata
19// ============================================================================
20
21// ============================================================================
22// Build Errors
23// ============================================================================
24
/// Errors that can occur during graph construction.
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`], so it can be
/// boxed as `Box<dyn std::error::Error>` and propagated with `?`.
#[derive(Debug, Clone)]
pub enum BuildError {
    /// A cycle was detected in the signal edge graph.
    CycleDetected,
    /// Backend creation failed; the payload is the factory's error message.
    Backend(String),
}

impl std::fmt::Display for BuildError {
    /// Render a short, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::CycleDetected => write!(f, "graph cycle detected"),
            Self::Backend(msg) => write!(f, "backend error: {msg}"),
        }
    }
}

// Neither variant wraps another error value, so the default `source()`
// (returning `None`) is the right behaviour.
impl std::error::Error for BuildError {}
42
43// ============================================================================
44// Graph Builder
45// ============================================================================
46
47// ============================================================================
48// Node Storage
49// ============================================================================
50
/// Storage slot for one node held by [`GraphBuilder`] while the graph is
/// under construction.
///
/// `GraphBuilder::build` unwraps each entry into its bare [`NodeVariant`]
/// when assembling the final [`Graph`].
pub(crate) struct NodeEntry<T: Transcendental, const BUF_SIZE: usize> {
    /// The wrapped node (source, processor, sink, router, ...).
    pub(crate) node: NodeVariant<T, BUF_SIZE>,
}
54
55// ============================================================================
56// GraphBuilder (Mutable Construction)
57// ============================================================================
58
/// A named resource (tape loop) shared between nodes in the graph.
///
/// Resources are registered with [`GraphBuilder::add_resource`] and allocated
/// during `GraphBuilder::build`. Only the `"tape"` kind is allocated there;
/// entries with any other `kind` are carried along as metadata only.
#[derive(Clone)]
pub struct GraphResource {
    /// Unique name referenced by node parameters.
    pub name: String,
    /// Resource kind string (`"tape"`).
    pub kind: String,
    /// Capacity in samples (for `"tape"` kind).
    pub capacity: usize,
}
69
/// Mutable builder for an immutable signal graph.
///
/// Nodes are appended to `nodes` and referred to by their index. Every edge
/// list stores `(from_node, from_port, to_node, to_port)` tuples of such
/// indices.
///
/// # Node factory
///
/// The builder holds an [`Arc<NodeFactory>`] for constructing nodes by
/// type name, provided at construction via [`GraphBuilder::new`].
pub struct GraphBuilder<T: Transcendental, const BUF_SIZE: usize> {
    /// Nodes in insertion order; the position is the node index.
    nodes: Vec<NodeEntry<T, BUF_SIZE>>,
    /// Per-node backend name, parallel to `nodes` (`None` = use the default).
    node_backends: Vec<Option<String>>,
    /// Signal (audio) edges; these define the topological order.
    signal_edges: Vec<(usize, usize, usize, usize)>,
    /// Control (modulation) edges.
    control_edges: Vec<(usize, usize, usize, usize)>,
    /// Clock (timing) edges.
    clock_edges: Vec<(usize, usize, usize, usize)>,
    /// Feedback edges; not part of the DAG used for cycle detection.
    feedback_edges: Vec<(usize, usize, usize, usize)>,
    /// Named resources to allocate during [`build`](Self::build).
    resources: Vec<GraphResource>,
    /// Shared node factory (required, from Runtime).
    factory: Arc<NodeFactory<T, BUF_SIZE>>,
    /// Shared backend factory (required, from Runtime).
    backend_factory: Arc<BackendFactory<T>>,
    /// Default backend name for nodes that don't specify one explicitly.
    default_backend: Option<String>,
    /// Default backend parameters (sample_rate, buffer_size, channels).
    backend_params: HashMap<String, ParamValue>,
    /// Sample rate override. When set, used in [`build`](Self::build);
    /// otherwise `build` falls back to 44100.0.
    /// Populated from [`GraphDef::sample_rate`] during deserialization.
    sample_rate: Option<f32>,
    /// Telemetry queue for clock ticks (audio → control).
    clock_tx: Option<ActorRef<ClockTick>>,
}
98
99impl<T: Transcendental, const BUF_SIZE: usize> GraphBuilder<T, BUF_SIZE> {
100    /// Create a new empty graph builder without a node factory.
101    pub fn new(
102        factory: Arc<NodeFactory<T, BUF_SIZE>>,
103        backend_factory: Arc<BackendFactory<T>>,
104    ) -> Self {
105        Self {
106            nodes: Vec::new(),
107            node_backends: Vec::new(),
108            signal_edges: Vec::new(),
109            control_edges: Vec::new(),
110            clock_edges: Vec::new(),
111            feedback_edges: Vec::new(),
112            resources: Vec::new(),
113            factory,
114            backend_factory,
115            default_backend: None,
116            backend_params: HashMap::new(),
117            sample_rate: None,
118            clock_tx: None,
119        }
120    }
121
122    /// Add a node by type name using the internal factory.
123    ///
124    /// The type must have been registered in the factory before calling
125    /// this method.
126    ///
127    /// Returns the index of the newly added node.
128    ///
129    /// # Errors
130    ///
131    /// Returns [`RegistryError`] if no factory is set or the type name
132    /// is not registered.
133    pub fn add_node(&mut self, type_name: &str, params: &Params) -> Result<usize, RegistryError> {
134        let id = NodeId(self.nodes.len() as u32);
135        self.add_node_with_id(type_name, params, id)
136    }
137
138    /// Add a node with an explicit [`NodeId`].
139    ///
140    /// Like [`add_node`](Self::add_node) but uses the provided `id`.
141    /// Important for serialization where external references depend on
142    /// exact IDs.
143    pub fn add_node_with_id(
144        &mut self,
145        type_name: &str,
146        params: &Params,
147        id: NodeId,
148    ) -> Result<usize, RegistryError> {
149        let node = self.factory.construct(type_name, id, params)?;
150        let idx = self.nodes.len();
151        self.nodes.push(NodeEntry { node });
152        self.node_backends.push(None);
153        Ok(idx)
154    }
155
156    /// Assign a named backend to the node at the given index.
157    ///
158    /// During [`build`](Self::build), the backend is created via the
159    /// builder's [`BackendFactory`](crate::BackendFactory) and passed to
160    /// the node's [`Node::resolve_backend`](rill_core::Node::resolve_backend).
161    pub fn set_node_backend(&mut self, idx: usize, name: String) {
162        if idx < self.node_backends.len() {
163            self.node_backends[idx] = Some(name);
164        }
165    }
166
    /// Register a named resource (tape loop, buffer, etc.).
    ///
    /// The resource is only recorded here; allocation happens in
    /// [`build`](Self::build). Names are not checked for uniqueness at this
    /// point.
    pub fn add_resource(&mut self, resource: GraphResource) {
        self.resources.push(resource);
    }
171
    /// Number of nodes added to the builder so far.
    ///
    /// This is also the index the next added node will receive.
    pub fn node_count(&self) -> usize {
        self.nodes.len()
    }
176
    /// Set the default backend name and parameters. Nodes without an explicit
    /// backend in [`NodeDef::backend`] will use this during [`build`](Self::build).
    ///
    /// Any previously stored default name and parameters are replaced.
    /// During `build`, `params` are applied only to backends created under
    /// the default name, and they never override the `sample_rate` and
    /// `buffer_size` entries that `build` sets itself.
    pub fn set_default_backend(&mut self, name: String, params: HashMap<String, ParamValue>) {
        self.default_backend = Some(name);
        self.backend_params = params;
    }
183
    /// Get the default backend name, if set.
    ///
    /// Returns `None` until [`set_default_backend`](Self::set_default_backend)
    /// has been called.
    pub fn default_backend_name(&self) -> Option<&String> {
        self.default_backend.as_ref()
    }
188
    /// Set the sample rate for this builder.
    ///
    /// When never called, [`build`](Self::build) falls back to 44100.0.
    pub fn set_sample_rate(&mut self, sr: f32) {
        self.sample_rate = Some(sr);
    }
193
    /// Set the clock tick channel (audio → control).
    ///
    /// The handle is passed on to the built [`Graph`], which sends one
    /// [`ClockTick`] per processed block through it.
    pub fn set_clock_tx(&mut self, tx: ActorRef<ClockTick>) {
        self.clock_tx = Some(tx);
    }
198
    /// Access the shared backend factory.
    ///
    /// Returns a reference to the `Arc`; clone it to obtain another handle.
    pub fn backend_factory(&self) -> &Arc<BackendFactory<T>> {
        &self.backend_factory
    }
203
204    /// Add a source node and return its index.
205    pub fn add_source(&mut self, source: Box<dyn rill_core::traits::Source<T, BUF_SIZE>>) -> usize {
206        let idx = self.nodes.len();
207        self.nodes.push(NodeEntry {
208            node: NodeVariant::Source(source),
209        });
210        self.node_backends.push(None);
211        idx
212    }
213
214    /// Add a processor node and return its index.
215    pub fn add_processor(
216        &mut self,
217        processor: Box<dyn rill_core::traits::Processor<T, BUF_SIZE>>,
218    ) -> usize {
219        let idx = self.nodes.len();
220        self.nodes.push(NodeEntry {
221            node: NodeVariant::Processor(processor),
222        });
223        self.node_backends.push(None);
224        idx
225    }
226
227    /// Add a sink node and return its index.
228    pub fn add_sink(&mut self, sink: Box<dyn rill_core::traits::Sink<T, BUF_SIZE>>) -> usize {
229        let idx = self.nodes.len();
230        self.nodes.push(NodeEntry {
231            node: NodeVariant::Sink(sink),
232        });
233        self.node_backends.push(None);
234        idx
235    }
236
237    /// Add a Router node (N→M configurable routing, no DSP).
238    pub fn add_router(&mut self, router: Box<dyn rill_core::traits::Router<T, BUF_SIZE>>) -> usize {
239        let idx = self.nodes.len();
240        self.nodes.push(NodeEntry {
241            node: NodeVariant::Router(router),
242        });
243        self.node_backends.push(None);
244        idx
245    }
246
247    /// Connect signal ports (audio data).
248    pub fn connect_signal(
249        &mut self,
250        from_node: usize,
251        from_port: usize,
252        to_node: usize,
253        to_port: usize,
254    ) {
255        self.signal_edges
256            .push((from_node, from_port, to_node, to_port));
257    }
258
259    /// Connect control ports (modulation values).
260    pub fn connect_control(
261        &mut self,
262        from_node: usize,
263        from_port: usize,
264        to_node: usize,
265        to_port: usize,
266    ) {
267        self.control_edges
268            .push((from_node, from_port, to_node, to_port));
269    }
270
271    /// Connect clock ports (timing events).
272    pub fn connect_clock(
273        &mut self,
274        from_node: usize,
275        from_port: usize,
276        to_node: usize,
277        to_port: usize,
278    ) {
279        self.clock_edges
280            .push((from_node, from_port, to_node, to_port));
281    }
282
283    /// Connect feedback ports (delay lines, state carryover).
284    pub fn connect_feedback(
285        &mut self,
286        from_node: usize,
287        from_port: usize,
288        to_node: usize,
289        to_port: usize,
290    ) {
291        self.feedback_edges
292            .push((from_node, from_port, to_node, to_port));
293    }
294
295    /// Build the graph.
296    ///
297    /// Creates backends for nodes that have a backend name set (via
298    /// [`NodeDef::backend`] or the builder's default).  Finds the active
299    /// (driver) node and stores its index for [`Graph::run`].
300    pub fn build(mut self) -> Result<Graph<T, BUF_SIZE>, BuildError> {
301        let num_nodes = self.nodes.len();
302
303        // --- adjacency for Kahn (audio edges only; feedback is not a DAG edge) ---
304        let mut in_degree = vec![0usize; num_nodes];
305        let mut out_edges: Vec<Vec<(usize, usize, usize)>> = vec![Vec::new(); num_nodes];
306
307        for &(from_n, from_p, to_n, to_p) in &self.signal_edges {
308            in_degree[to_n] += 1;
309            out_edges[from_n].push((from_p, to_n, to_p));
310        }
311
312        // --- Kahn's algorithm ---
313        let mut queue: VecDeque<usize> = in_degree
314            .iter()
315            .enumerate()
316            .filter(|(_, &d)| d == 0)
317            .map(|(i, _)| i)
318            .collect();
319
320        let mut topo = Vec::with_capacity(num_nodes);
321        let mut indeg = in_degree;
322        while let Some(idx) = queue.pop_front() {
323            topo.push(idx);
324            for &(_, to_n, _) in &out_edges[idx] {
325                indeg[to_n] -= 1;
326                if indeg[to_n] == 0 {
327                    queue.push_back(to_n);
328                }
329            }
330        }
331
332        if topo.len() != num_nodes {
333            return Err(BuildError::CycleDetected);
334        }
335
336        // --- populate Port::downstream, downstream_input_ptrs, parent ---
337        for &(from_n, from_p, to_n, to_p) in &self.signal_edges {
338            // downstream list (serialization)
339            if let Some(port) = self.nodes[from_n].node.output_port_mut(from_p) {
340                port.downstream.push((to_n, to_p));
341            }
342            // Prepare pointers (safe: distinct indices in static DAG).
343            let in_ptr: *mut Port<T, BUF_SIZE> = self.nodes[to_n]
344                .node
345                .input_port_mut(to_p)
346                .map(|p| p as *mut Port<T, BUF_SIZE>)
347                .unwrap_or(std::ptr::null_mut());
348            let parent: *mut NodeVariant<T, BUF_SIZE> = &mut self.nodes[to_n].node;
349            let out_ptr: *mut Port<T, BUF_SIZE> = self.nodes[from_n]
350                .node
351                .output_port_mut(from_p)
352                .map(|p| p as *mut Port<T, BUF_SIZE>)
353                .unwrap_or(std::ptr::null_mut());
354            // Assign (safe: pointers were obtained without overlapping borrows).
355            if !in_ptr.is_null() && !out_ptr.is_null() {
356                #[allow(unsafe_code)]
357                unsafe {
358                    (*in_ptr).parent = parent;
359                    (*out_ptr).downstream_input_ptrs.push(in_ptr);
360                }
361            }
362        }
363
364        // --- downstream_nodes: unique downstream node pointers ---
365        for &(from_n, from_p, to_n, _) in &self.signal_edges {
366            let parent: *mut NodeVariant<T, BUF_SIZE> = &mut self.nodes[to_n].node;
367            if let Some(port) = self.nodes[from_n].node.output_port_mut(from_p) {
368                let ptr_val = parent as usize;
369                let already = port.downstream_nodes.iter().any(|&p| p as usize == ptr_val);
370                if !already {
371                    port.downstream_nodes.push(parent);
372                }
373            }
374        }
375
376        // --- upstream_buffer: zero-copy routing for 1:1 and fan-out ---
377        for &(from_n, from_p, to_n, to_p) in &self.signal_edges {
378            let upstream = self.nodes[from_n]
379                .node
380                .output_port(from_p)
381                .map(|p| &p.buffer as *const FixedBuffer<T, BUF_SIZE>);
382            if let Some(port) = self.nodes[to_n].node.input_port_mut(to_p) {
383                if port.upstream_buffer.is_none() {
384                    // First upstream: set zero-copy pointer
385                    port.upstream_buffer = upstream;
386                } else {
387                    // Fan-in: copy-based fallback
388                    port.upstream_buffer = None;
389                }
390            }
391        }
392
393        // --- enable feedback buffers on both output and input ports ---
394        for &(from_n, from_p, to_n, to_p) in &self.feedback_edges {
395            if let Some(port) = self.nodes[from_n].node.output_port_mut(from_p) {
396                port.feedback_buffer = Some(FixedBuffer::new());
397                port.feedback_downstream.push((to_n, to_p));
398            }
399            if let Some(port) = self.nodes[to_n].node.input_port_mut(to_p) {
400                port.feedback_buffer = Some(FixedBuffer::new());
401            }
402        }
403        // --- populate Port::feedback_ptrs on output ports ---
404        for &(from_n, from_p, to_n, to_p) in &self.feedback_edges {
405            let ptr = self.nodes[to_n]
406                .node
407                .input_port(to_p)
408                .map(|p| &p.feedback_buffer as *const Option<FixedBuffer<T, BUF_SIZE>>)
409                .map(|r| r as *mut Option<FixedBuffer<T, BUF_SIZE>>);
410            if let Some(port) = self.nodes[from_n].node.output_port_mut(from_p) {
411                if let Some(p) = ptr {
412                    port.feedback_ptrs.push(p);
413                }
414            }
415        }
416
417        let sr = self.sample_rate.unwrap_or(44100.0);
418
419        // Allocate named buffers (tape loops, etc.) from resource definitions.
420        let mut buffers = BufferRegistry::new();
421        for r in &self.resources {
422            if r.kind == "tape" {
423                if let Some(tape) = TapeLoop::<T>::new(r.capacity) {
424                    buffers.register(&r.name, Box::new(tape));
425                }
426            }
427        }
428
429        // Resolve resources — each node that needs a shared buffer
430        // (e.g. WriteHead, ReadHead) looks it up by name and caches the
431        // pointer.  This is a single‑threaded, one‑time setup step.
432        for entry in &mut self.nodes {
433            entry.node.resolve_resources(&buffers);
434        }
435
436        // Resolve audio backends for I/O nodes.
437        // Each node with a named backend gets its own instance.
438        // Nodes without explicit backend get the default (if set).
439        for (idx, be_name) in self.node_backends.iter().enumerate() {
440            let name = match be_name {
441                Some(ref n) => Some(n.clone()),
442                None => self.default_backend.clone(),
443            };
444            if let Some(ref name) = name {
445                let mut be_params = HashMap::new();
446                be_params.insert("sample_rate".into(), ParamValue::Float(sr));
447                be_params.insert("buffer_size".into(), ParamValue::Int(BUF_SIZE as i32));
448                if self.default_backend.as_ref() == Some(name) {
449                    for (k, v) in &self.backend_params {
450                        be_params.entry(k.clone()).or_insert_with(|| v.clone());
451                    }
452                }
453                let backend = self
454                    .backend_factory
455                    .create(name, &be_params)
456                    .map_err(BuildError::Backend)?;
457                if let Some(io_node) = self.nodes[idx].node.as_io_node_mut() {
458                    io_node.resolve_backend(backend);
459                }
460            }
461        }
462
463        let mut nodes: Vec<NodeVariant<T, BUF_SIZE>> =
464            self.nodes.into_iter().map(|e| e.node).collect();
465
466        // Find the active (driver) node via ActiveNode trait.
467        let cmd_queue = Arc::new(MpscQueue::<SetParameter>::with_capacity(64));
468        let mut active_node_idx = None;
469        for (i, n) in nodes.iter_mut().enumerate() {
470            if n.as_active_node_mut().is_some() {
471                active_node_idx = Some(i);
472                break;
473            }
474        }
475        let have_queue = active_node_idx.is_some();
476        let command_queue = if have_queue { Some(cmd_queue) } else { None };
477
478        let owned_buffers = buffers.into_inner();
479
480        let allocated = self.resources.clone();
481
482        Ok(Graph {
483            nodes,
484            topo_order: topo,
485            resources: allocated,
486            current_tick: ClockTick::new(0, BUF_SIZE as u32, sr),
487            buffers: owned_buffers,
488            active_node_idx,
489            command_queue,
490            clock_tx: self.clock_tx.clone(),
491        })
492    }
493}
494
495// ============================================================================
496// Graph (Static DAG)
497// ============================================================================
498
/// Immutable signal graph with static DAG topology.
///
/// Once built the topology cannot be modified. Block processing is driven
/// through [`Graph::run`], which hands a per-block tick closure to the
/// active node; port-level methods (`pre_process`, `snapshot_feedback`,
/// `propagate`) do the actual data movement.
pub struct Graph<T: Transcendental, const BUF_SIZE: usize> {
    /// All nodes, indexed by the node indices used in the edges.
    nodes: Vec<NodeVariant<T, BUF_SIZE>>,
    /// Node indices in topological order of the signal edges.
    topo_order: Vec<usize>,
    /// Clock tick created at build time (position 0, `BUF_SIZE` samples,
    /// the graph's sample rate).
    current_tick: ClockTick,
    /// Resource metadata (name, kind, capacity) for serialization.
    pub(crate) resources: Vec<GraphResource>,
    /// Named buffers (tape loops, etc.) shared between nodes.
    // NOTE(review): never read here; presumably kept so the pointers nodes
    // cached while resolving resources stay valid — confirm.
    #[allow(dead_code)]
    buffers: Vec<Box<dyn Buffer<T> + Send>>,
    /// Index of the active node that drives graph processing.
    active_node_idx: Option<usize>,
    /// Command queue for sending parameters from control to audio thread.
    command_queue: Option<Arc<MpscQueue<SetParameter>>>,
    /// Clock tick channel (audio → control sequencer).
    clock_tx: Option<ActorRef<ClockTick>>,
}
522
523impl<T: Transcendental, const BUF_SIZE: usize> Graph<T, BUF_SIZE> {
524    // ========================================================================
525    // Accessors
526    // ========================================================================
527
    /// Borrow the node array.
    ///
    /// Nodes appear in insertion order, i.e. indexed by node index, not in
    /// topological order.
    pub fn nodes(&self) -> &[NodeVariant<T, BUF_SIZE>] {
        &self.nodes
    }
532
    /// Mutably borrow the node array.
    ///
    /// Only a slice is exposed, so nodes can be mutated in place but not
    /// added, removed or reordered.
    pub fn nodes_mut(&mut self) -> &mut [NodeVariant<T, BUF_SIZE>] {
        &mut self.nodes
    }
537
    /// Return the current clock tick.
    ///
    /// Returned by value (a copy of the stored tick).
    // NOTE(review): nothing in this file updates the stored tick after
    // construction — confirm whether it is meant to advance during `run`.
    pub fn current_tick(&self) -> ClockTick {
        self.current_tick
    }
542
    /// Return the number of nodes in the graph.
    ///
    /// Equals the length of [`nodes`](Self::nodes).
    pub fn node_count(&self) -> usize {
        self.nodes.len()
    }
547
    /// Return the topological ordering of node indices.
    ///
    /// The order is computed once at build time from the signal edges;
    /// each node index appears exactly once.
    pub fn topo_order(&self) -> &[usize] {
        &self.topo_order
    }
552
    /// Sample rate of the graph, read from the stored clock tick.
    #[allow(dead_code)]
    pub(crate) fn sample_rate(&self) -> f32 {
        self.current_tick.sample_rate
    }
557
    /// Access the named resources (tape loops, etc.) allocated for this graph.
    ///
    /// This is the metadata (name, kind, capacity) recorded by the builder,
    /// not the underlying buffers.
    #[allow(dead_code)]
    pub fn resources(&self) -> &[GraphResource] {
        &self.resources
    }
563
    /// Run graph processing through the active node.
    ///
    /// Creates a tick closure from the graph's runner and passes it to the
    /// active node's [`Node::run`](rill_core::Node::run), together with the
    /// `running` flag. Returns `Ok(())` immediately when the graph has no
    /// active node.
    ///
    /// Each invocation of the tick closure:
    ///
    /// 1. drains the command queue, applying every [`SetParameter`] to the
    ///    node whose index equals the command's node id (unknown indices
    ///    are skipped, `set_parameter` errors are discarded);
    /// 2. calls `process_block` on the first node of the topological order
    ///    and propagates each of its signal output ports;
    /// 3. sends the block's [`ClockTick`] on the clock channel.
    ///
    /// # Errors
    ///
    /// Returns `"no active node"` if the node at the stored active index
    /// does not expose an active-node interface, otherwise whatever the
    /// active node's `run` returns.
    #[allow(unsafe_code)]
    pub fn run(&mut self, running: Arc<AtomicBool>) -> Result<(), String> {
        let Some(idx) = self.active_node_idx else {
            return Ok(());
        };
        // An active node exists, so the graph has at least one node and the
        // topological order is non-empty.
        let source_idx = self.topo_order[0];
        // Fall back to a private, empty queue when none was created.
        let cmd_queue = self
            .command_queue
            .clone()
            .unwrap_or_else(|| Arc::new(MpscQueue::new()));
        // NOTE(review): the fallback queue is a temporary dropped right
        // after `ActorRef::new`; presumably sends then go to dead letters —
        // confirm against `ActorRef`.
        let clock_tx = self
            .clock_tx
            .clone()
            .unwrap_or_else(|| ActorRef::new(&Arc::new(MpscQueue::new())));

        // The closure needs mutable access to the whole graph while the
        // active node (itself part of the graph) is borrowed below, hence
        // the raw pointer.
        let graph_ptr: *mut Graph<T, BUF_SIZE> = self;
        let tick: Box<dyn FnMut(u64, f32)> = Box::new(move |sample_pos, sample_rate| {
            // SAFETY: NOTE(review): relies on the graph outliving the
            // closure and not being moved while it runs; this reference
            // also overlaps the `&mut` borrow of the active node held by
            // `run` below — confirm this aliasing is acceptable.
            let graph = unsafe { &mut *graph_ptr };
            // drain command queue
            while let Some(cmd) = cmd_queue.pop() {
                let i = cmd.port.node_id().inner() as usize;
                if i < graph.nodes.len() {
                    let _ = graph.nodes[i].set_parameter(&cmd.parameter, cmd.value);
                }
            }
            // process source and propagate
            let tick = ClockTick::new(sample_pos, BUF_SIZE as u32, sample_rate);
            let mut ctx = ProcessContext { clock: &tick };
            let _ = graph.nodes[source_idx].process_block(&mut ctx);
            let action_ctx = ActionContext::new(&tick);
            for po in 0..graph.nodes[source_idx].num_signal_outputs() {
                if let Some(port) = graph.nodes[source_idx].output_port(po) {
                    let _ = port.propagate(port.buffer(), &action_ctx);
                }
            }
            // send clock tick
            clock_tx.send(tick);
        });

        self.nodes[idx]
            .as_active_node_mut()
            .ok_or("no active node")?
            .run(tick, running)
    }
612
613    /// Obtain an [`ActorRef`] for sending commands to this graph.
614    ///
615    /// The returned handle holds a weak reference — when the `Graph` is
616    /// dropped, all subsequent `send` calls route to dead letters.
617    /// Returns `None` if no audio backend was configured (no queue created).
618    pub fn handle(&self) -> Option<ActorRef<SetParameter>> {
619        let mailbox = self.command_queue.as_ref()?;
620        Some(ActorRef::new(mailbox))
621    }
622
623    /// Consume the graph and return its owned parts (test only).
624    #[cfg(test)]
625    pub fn into_parts(
626        self,
627    ) -> (
628        Vec<NodeVariant<T, BUF_SIZE>>,
629        Vec<usize>,
630        ClockTick,
631        Vec<Box<dyn Buffer<T> + Send>>,
632    ) {
633        let Self {
634            nodes,
635            topo_order,
636            current_tick,
637            resources: _,
638            buffers,
639            active_node_idx: _,
640            command_queue: _,
641            clock_tx: _,
642        } = self;
643        (nodes, topo_order, current_tick, buffers)
644    }
645}
646
647// ============================================================================
648// ActorCell implementation
649// ============================================================================
650
651impl<T: Transcendental, const BUF_SIZE: usize> ActorCell for Graph<T, BUF_SIZE> {
652    type Msg = SetParameter;
653
654    /// Process a single parameter command by writing to the target node.
655    fn receive(&mut self, msg: SetParameter) {
656        let idx = msg.port.node_id().inner() as usize;
657        if idx < self.nodes.len() {
658            let _ = self.nodes[idx].set_parameter(&msg.parameter, msg.value);
659        }
660    }
661}
662
663#[cfg(test)]
664mod tests {
665    use super::*;
666    use rill_core::math::Transcendental;
667    use rill_core::time::ClockTick;
668    use rill_core::traits::algorithm::ActionContext;
669    use rill_core::traits::processable::{ProcessContext, Processable};
670    use rill_core::traits::{
671        Node, NodeCategory, NodeId, NodeMetadata, NodeState, ParamValue, ParameterId, Port,
672        PortDirection, PortId, ProcessResult, Processor, Sink, Source,
673    };
674    use std::sync::Arc;
675
676    /// Create a test builder with empty factories.
677    fn test_builder<const B: usize>() -> GraphBuilder<f32, B> {
678        GraphBuilder::new(
679            Arc::new(NodeFactory::new()),
680            Arc::new(BackendFactory::new()),
681        )
682    }
683
684    // ------------------------------------------------------------------------
685    // Mock: ConstantSource — fills output with a constant value
686    // ------------------------------------------------------------------------
    /// Test source that writes the same value to every sample of its single
    /// output port.
    struct ConstantSource<T: Transcendental, const BUF_SIZE: usize> {
        /// Value written to every output sample.
        value: T,
        /// Node state required by the `Node` trait.
        state: NodeState<T, BUF_SIZE>,
        /// Output ports; exactly one is created by `new`.
        outputs: Vec<Port<T, BUF_SIZE>>,
    }
692
693    impl<T: Transcendental, const BUF_SIZE: usize> ConstantSource<T, BUF_SIZE> {
694        fn new(value: T, sample_rate: f32) -> Self {
695            let mut outputs = Vec::with_capacity(1);
696            outputs.push(Port {
697                id: PortId::signal_out(NodeId(0), 0),
698                name: "output".into(),
699                direction: PortDirection::Output,
700                action: None,
701                pending_command: None,
702                buffer: Default::default(),
703                feedback_buffer: None,
704                downstream: Vec::new(),
705                feedback_downstream: Vec::new(),
706                feedback_ptrs: Vec::new(),
707                downstream_input_ptrs: Vec::new(),
708                downstream_nodes: Vec::new(),
709                parent: std::ptr::null_mut(),
710                upstream_buffer: None,
711            });
712            Self {
713                value,
714                state: NodeState::new(sample_rate),
715                outputs,
716            }
717        }
718    }
719
    /// Minimal `Node` implementation for the mock source: one signal
    /// output, no inputs, no control ports and no parameters.
    impl<T: Transcendental, const BUF_SIZE: usize> Node<T, BUF_SIZE> for ConstantSource<T, BUF_SIZE> {
        /// Static description: a source with a single signal output.
        fn metadata(&self) -> NodeMetadata {
            NodeMetadata {
                type_name: None,
                name: "ConstantSource".into(),
                category: NodeCategory::Source,
                description: String::new(),
                author: String::new(),
                version: "1.0".into(),
                signal_inputs: 0,
                signal_outputs: 1,
                control_inputs: 0,
                control_outputs: 0,
                clock_inputs: 0,
                clock_outputs: 0,
                feedback_ports: 0,
                parameters: vec![],
            }
        }
        // Lifecycle hooks are no-ops for the mock.
        fn init(&mut self, _sample_rate: f32) {}
        fn reset(&mut self) {}
        // The mock has no parameters: reads yield `None`, writes are
        // accepted and ignored.
        fn get_parameter(&self, _id: &ParameterId) -> Option<ParamValue> {
            None
        }
        fn set_parameter(&mut self, _id: &ParameterId, _value: ParamValue) -> ProcessResult<()> {
            Ok(())
        }
        // The id is fixed at 0; `set_id` is ignored.
        fn id(&self) -> NodeId {
            NodeId(0)
        }
        fn set_id(&mut self, _id: NodeId) {}
        // A source has no input ports.
        fn input_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
            None
        }
        fn input_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
            None
        }
        // Output ports are looked up by index; only index 0 exists.
        fn output_port(&self, index: usize) -> Option<&Port<T, BUF_SIZE>> {
            self.outputs.get(index)
        }
        fn output_port_mut(&mut self, index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
            self.outputs.get_mut(index)
        }
        // No control ports.
        fn control_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
            None
        }
        fn control_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
            None
        }
        fn state(&self) -> &NodeState<T, BUF_SIZE> {
            &self.state
        }
        fn state_mut(&mut self) -> &mut NodeState<T, BUF_SIZE> {
            &mut self.state
        }
    }
776
777    impl<T: Transcendental, const BUF_SIZE: usize> Source<T, BUF_SIZE> for ConstantSource<T, BUF_SIZE> {
778        fn generate(
779            &mut self,
780            _clock: &ClockTick,
781            _control_inputs: &[T],
782            _clock_inputs: &[ClockTick],
783        ) -> ProcessResult<()> {
784            let out = self.outputs[0].buffer.as_mut_array();
785            for sample in out.iter_mut() {
786                *sample = self.value;
787            }
788            Ok(())
789        }
790        fn num_signal_outputs(&self) -> usize {
791            1
792        }
793    }
794
795    // ------------------------------------------------------------------------
796    // Mock: NoopProcessor — minimal processor for topology tests
797    // ------------------------------------------------------------------------
    /// Test processor with no ports and no behaviour, used for topology
    /// tests.
    struct NoopProcessor<T: Transcendental, const BUF_SIZE: usize> {
        /// Node state required by the `Node` trait.
        state: NodeState<T, BUF_SIZE>,
    }
801
802    impl<T: Transcendental, const BUF_SIZE: usize> NoopProcessor<T, BUF_SIZE> {
803        fn new(sample_rate: f32) -> Self {
804            Self {
805                state: NodeState::new(sample_rate),
806            }
807        }
808    }
809
810    impl<T: Transcendental, const BUF_SIZE: usize> Node<T, BUF_SIZE> for NoopProcessor<T, BUF_SIZE> {
811        fn metadata(&self) -> NodeMetadata {
812            NodeMetadata {
813                type_name: None,
814                name: "NoopProcessor".into(),
815                category: NodeCategory::Processor,
816                description: String::new(),
817                author: String::new(),
818                version: "1.0".into(),
819                signal_inputs: 0,
820                signal_outputs: 0,
821                control_inputs: 0,
822                control_outputs: 0,
823                clock_inputs: 0,
824                clock_outputs: 0,
825                feedback_ports: 0,
826                parameters: vec![],
827            }
828        }
829        fn init(&mut self, _sample_rate: f32) {}
830        fn reset(&mut self) {}
831        fn get_parameter(&self, _id: &ParameterId) -> Option<ParamValue> {
832            None
833        }
834        fn set_parameter(&mut self, _id: &ParameterId, _value: ParamValue) -> ProcessResult<()> {
835            Ok(())
836        }
837        fn id(&self) -> NodeId {
838            NodeId(1)
839        }
840        fn set_id(&mut self, _id: NodeId) {}
841        fn input_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
842            None
843        }
844        fn input_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
845            None
846        }
847        fn output_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
848            None
849        }
850        fn output_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
851            None
852        }
853        fn control_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
854            None
855        }
856        fn control_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
857            None
858        }
859        fn state(&self) -> &NodeState<T, BUF_SIZE> {
860            &self.state
861        }
862        fn state_mut(&mut self) -> &mut NodeState<T, BUF_SIZE> {
863            &mut self.state
864        }
865    }
866
867    impl<T: Transcendental, const BUF_SIZE: usize> Processor<T, BUF_SIZE>
868        for NoopProcessor<T, BUF_SIZE>
869    {
870        fn process(
871            &mut self,
872            _clock: &ClockTick,
873            _signal_inputs: &[&[T; BUF_SIZE]],
874            _control_inputs: &[T],
875            _clock_inputs: &[ClockTick],
876            _feedback_inputs: &[&[T; BUF_SIZE]],
877        ) -> ProcessResult<()> {
878            Ok(())
879        }
880    }
881
882    // ------------------------------------------------------------------------
883    // Mock: NoopSink — minimal sink for topology tests
884    // ------------------------------------------------------------------------
885    struct NoopSink<T: Transcendental, const BUF_SIZE: usize> {
886        state: NodeState<T, BUF_SIZE>,
887    }
888
889    impl<T: Transcendental, const BUF_SIZE: usize> NoopSink<T, BUF_SIZE> {
890        fn new(sample_rate: f32) -> Self {
891            Self {
892                state: NodeState::new(sample_rate),
893            }
894        }
895    }
896
897    impl<T: Transcendental, const BUF_SIZE: usize> Node<T, BUF_SIZE> for NoopSink<T, BUF_SIZE> {
898        fn metadata(&self) -> NodeMetadata {
899            NodeMetadata {
900                type_name: None,
901                name: "NoopSink".into(),
902                category: NodeCategory::Sink,
903                description: String::new(),
904                author: String::new(),
905                version: "1.0".into(),
906                signal_inputs: 0,
907                signal_outputs: 0,
908                control_inputs: 0,
909                control_outputs: 0,
910                clock_inputs: 0,
911                clock_outputs: 0,
912                feedback_ports: 0,
913                parameters: vec![],
914            }
915        }
916        fn init(&mut self, _sample_rate: f32) {}
917        fn reset(&mut self) {}
918        fn get_parameter(&self, _id: &ParameterId) -> Option<ParamValue> {
919            None
920        }
921        fn set_parameter(&mut self, _id: &ParameterId, _value: ParamValue) -> ProcessResult<()> {
922            Ok(())
923        }
924        fn id(&self) -> NodeId {
925            NodeId(2)
926        }
927        fn set_id(&mut self, _id: NodeId) {}
928        fn input_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
929            None
930        }
931        fn input_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
932            None
933        }
934        fn output_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
935            None
936        }
937        fn output_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
938            None
939        }
940        fn control_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
941            None
942        }
943        fn control_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
944            None
945        }
946        fn state(&self) -> &NodeState<T, BUF_SIZE> {
947            &self.state
948        }
949        fn state_mut(&mut self) -> &mut NodeState<T, BUF_SIZE> {
950            &mut self.state
951        }
952    }
953
954    impl<T: Transcendental, const BUF_SIZE: usize> Sink<T, BUF_SIZE> for NoopSink<T, BUF_SIZE> {
955        fn consume(
956            &mut self,
957            _clock: &ClockTick,
958            _signal_inputs: &[&[T; BUF_SIZE]],
959            _control_inputs: &[T],
960            _clock_inputs: &[ClockTick],
961            _feedback_inputs: &[&[T; BUF_SIZE]],
962        ) -> ProcessResult<()> {
963            Ok(())
964        }
965    }
966
967    // ========================================================================
968    // Tests
969    // ========================================================================
970
/// A source → processor → sink chain must come out of `build()` with the
/// source ordered before the processor and the processor before the sink.
#[test]
fn test_topo_order_correct() {
    const BUF: usize = 64;
    let mut builder = test_builder::<BUF>();

    // Register the three stages, then wire them into a straight chain.
    let source_idx = builder.add_source(Box::new(ConstantSource::new(1.0, 44100.0)));
    let processor_idx = builder.add_processor(Box::new(NoopProcessor::new(44100.0)));
    let sink_idx = builder.add_sink(Box::new(NoopSink::new(44100.0)));
    builder.connect_signal(source_idx, 0, processor_idx, 0);
    builder.connect_signal(processor_idx, 0, sink_idx, 0);

    let graph = builder.build().expect("build failed");
    let order = graph.topo_order();

    // Locate each stage inside the computed order.
    let src_pos = order
        .iter()
        .position(|entry| *entry == source_idx)
        .unwrap();
    let proc_pos = order
        .iter()
        .position(|entry| *entry == processor_idx)
        .unwrap();
    let sink_pos = order
        .iter()
        .position(|entry| *entry == sink_idx)
        .unwrap();

    assert!(src_pos < proc_pos);
    assert!(proc_pos < sink_pos);
}
992
/// Two processors wired to each other through signal edges must make
/// `build()` fail with `BuildError::CycleDetected`.
#[test]
fn test_cycle_detection() {
    const BUF: usize = 64;
    let mut builder = test_builder::<BUF>();

    let first = builder.add_processor(Box::new(NoopProcessor::new(44100.0)));
    let second = builder.add_processor(Box::new(NoopProcessor::new(44100.0)));

    // first → second → first closes the loop.
    builder.connect_signal(first, 0, second, 0);
    builder.connect_signal(second, 0, first, 0);

    let result = builder.build();
    assert!(matches!(result, Err(BuildError::CycleDetected)));
}
1007
/// A graph holding a single source reports one node, and its topological
/// order consists of just that node's index.
#[test]
fn test_source_node_create() {
    const BUF: usize = 64;
    let mut builder = test_builder::<BUF>();

    let source_idx = builder.add_source(Box::new(ConstantSource::new(0.5, 44100.0)));
    let graph = builder.build().expect("build failed");

    assert_eq!(graph.node_count(), 1);
    assert_eq!(graph.topo_order(), &[source_idx]);
}
1017
1018    // ========================================================================
1019    // Port-based propagation tests
1020    // ========================================================================
1021
1022    /// Simple Sink that captures its first input port for inspection.
1023    pub struct TestSink<T: Transcendental, const BUF_SIZE: usize> {
1024        id: NodeId,
1025        state: NodeState<T, BUF_SIZE>,
1026        pub inputs: Vec<Port<T, BUF_SIZE>>,
1027        last_value: T,
1028    }
1029
1030    impl<T: Transcendental, const BUF_SIZE: usize> TestSink<T, BUF_SIZE> {
1031        fn new(id: NodeId, sample_rate: f32) -> Self {
1032            let mut inputs = Vec::new();
1033            inputs.push(Port::input(id, 0, "in"));
1034            Self {
1035                id,
1036                state: NodeState::new(sample_rate),
1037                inputs,
1038                last_value: T::ZERO,
1039            }
1040        }
1041    }
1042
1043    impl<T: Transcendental, const BUF_SIZE: usize> Node<T, BUF_SIZE> for TestSink<T, BUF_SIZE> {
1044        fn metadata(&self) -> NodeMetadata {
1045            NodeMetadata {
1046                type_name: None,
1047                name: "TestSink".into(),
1048                category: NodeCategory::Sink,
1049                description: String::new(),
1050                author: String::new(),
1051                version: "1.0".into(),
1052                signal_inputs: 1,
1053                signal_outputs: 0,
1054                control_inputs: 0,
1055                control_outputs: 0,
1056                clock_inputs: 0,
1057                clock_outputs: 0,
1058                feedback_ports: 0,
1059                parameters: vec![],
1060            }
1061        }
1062        fn init(&mut self, _: f32) {}
1063        fn reset(&mut self) {
1064            self.state.sample_pos = 0;
1065            self.state.blocks_processed = 0;
1066        }
1067        fn id(&self) -> NodeId {
1068            self.id
1069        }
1070        fn set_id(&mut self, id: NodeId) {
1071            self.id = id;
1072        }
1073        fn get_parameter(&self, _: &ParameterId) -> Option<ParamValue> {
1074            None
1075        }
1076        fn set_parameter(&mut self, _: &ParameterId, _: ParamValue) -> ProcessResult<()> {
1077            Ok(())
1078        }
1079        fn input_port(&self, i: usize) -> Option<&Port<T, BUF_SIZE>> {
1080            self.inputs.get(i)
1081        }
1082        fn input_port_mut(&mut self, i: usize) -> Option<&mut Port<T, BUF_SIZE>> {
1083            self.inputs.get_mut(i)
1084        }
1085        fn output_port(&self, _: usize) -> Option<&Port<T, BUF_SIZE>> {
1086            None
1087        }
1088        fn output_port_mut(&mut self, _: usize) -> Option<&mut Port<T, BUF_SIZE>> {
1089            None
1090        }
1091        fn control_port(&self, _: usize) -> Option<&Port<T, BUF_SIZE>> {
1092            None
1093        }
1094        fn control_port_mut(&mut self, _: usize) -> Option<&mut Port<T, BUF_SIZE>> {
1095            None
1096        }
1097        fn num_signal_inputs(&self) -> usize {
1098            1
1099        }
1100        fn num_signal_outputs(&self) -> usize {
1101            0
1102        }
1103        fn state(&self) -> &NodeState<T, BUF_SIZE> {
1104            &self.state
1105        }
1106        fn state_mut(&mut self) -> &mut NodeState<T, BUF_SIZE> {
1107            &mut self.state
1108        }
1109    }
1110
1111    impl<T: Transcendental, const BUF_SIZE: usize> Sink<T, BUF_SIZE> for TestSink<T, BUF_SIZE> {
1112        fn consume(
1113            &mut self,
1114            _clock: &ClockTick,
1115            _signal_inputs: &[&[T; BUF_SIZE]],
1116            _control_inputs: &[T],
1117            _clock_inputs: &[ClockTick],
1118            _feedback_inputs: &[&[T; BUF_SIZE]],
1119        ) -> ProcessResult<()> {
1120            if let Some(port) = self.inputs.first() {
1121                self.last_value = port.buffer.as_array()[0];
1122            }
1123            self.state.advance();
1124            Ok(())
1125        }
1126    }
1127
1128    /// Processor with a `multiplier` parameter. Output = input × multiplier.
1129    pub struct GainProcessor<T: Transcendental, const BUF_SIZE: usize> {
1130        id: NodeId,
1131        state: NodeState<T, BUF_SIZE>,
1132        pub inputs: Vec<Port<T, BUF_SIZE>>,
1133        pub outputs: Vec<Port<T, BUF_SIZE>>,
1134        pub multiplier: T,
1135    }
1136
1137    impl<T: Transcendental, const BUF_SIZE: usize> GainProcessor<T, BUF_SIZE> {
1138        fn new(id: NodeId, sample_rate: f32, multiplier: T) -> Self {
1139            let mut inputs = Vec::new();
1140            inputs.push(Port::input(id, 0, "in"));
1141            let mut outputs = Vec::new();
1142            outputs.push(Port::output(id, 0, "out"));
1143            Self {
1144                id,
1145                state: NodeState::new(sample_rate),
1146                inputs,
1147                outputs,
1148                multiplier,
1149            }
1150        }
1151    }
1152
1153    impl<T: Transcendental, const BUF_SIZE: usize> Node<T, BUF_SIZE> for GainProcessor<T, BUF_SIZE> {
1154        fn metadata(&self) -> NodeMetadata {
1155            NodeMetadata {
1156                type_name: None,
1157                name: "GainProcessor".into(),
1158                category: NodeCategory::Processor,
1159                description: String::new(),
1160                author: String::new(),
1161                version: "1.0".into(),
1162                signal_inputs: 1,
1163                signal_outputs: 1,
1164                control_inputs: 0,
1165                control_outputs: 0,
1166                clock_inputs: 0,
1167                clock_outputs: 0,
1168                feedback_ports: 0,
1169                parameters: vec![],
1170            }
1171        }
1172        fn init(&mut self, _: f32) {}
1173        fn reset(&mut self) {
1174            self.state.sample_pos = 0;
1175            self.state.blocks_processed = 0;
1176        }
1177        fn id(&self) -> NodeId {
1178            self.id
1179        }
1180        fn set_id(&mut self, id: NodeId) {
1181            self.id = id;
1182        }
1183        fn get_parameter(&self, id: &ParameterId) -> Option<ParamValue> {
1184            match id.as_str() {
1185                "multiplier" => Some(ParamValue::Float(self.multiplier.to_f32())),
1186                _ => None,
1187            }
1188        }
1189        fn set_parameter(&mut self, id: &ParameterId, value: ParamValue) -> ProcessResult<()> {
1190            match id.as_str() {
1191                "multiplier" => {
1192                    if let Some(v) = value.as_f32() {
1193                        self.multiplier = T::from_f32(v);
1194                        Ok(())
1195                    } else {
1196                        Err(rill_core::ProcessError::parameter("expected float"))
1197                    }
1198                }
1199                _ => Err(rill_core::ProcessError::parameter("unknown")),
1200            }
1201        }
1202        fn input_port(&self, i: usize) -> Option<&Port<T, BUF_SIZE>> {
1203            self.inputs.get(i)
1204        }
1205        fn input_port_mut(&mut self, i: usize) -> Option<&mut Port<T, BUF_SIZE>> {
1206            self.inputs.get_mut(i)
1207        }
1208        fn output_port(&self, i: usize) -> Option<&Port<T, BUF_SIZE>> {
1209            self.outputs.get(i)
1210        }
1211        fn output_port_mut(&mut self, i: usize) -> Option<&mut Port<T, BUF_SIZE>> {
1212            self.outputs.get_mut(i)
1213        }
1214        fn control_port(&self, _: usize) -> Option<&Port<T, BUF_SIZE>> {
1215            None
1216        }
1217        fn control_port_mut(&mut self, _: usize) -> Option<&mut Port<T, BUF_SIZE>> {
1218            None
1219        }
1220        fn num_signal_inputs(&self) -> usize {
1221            1
1222        }
1223        fn num_signal_outputs(&self) -> usize {
1224            1
1225        }
1226        fn state(&self) -> &NodeState<T, BUF_SIZE> {
1227            &self.state
1228        }
1229        fn state_mut(&mut self) -> &mut NodeState<T, BUF_SIZE> {
1230            &mut self.state
1231        }
1232    }
1233
1234    impl<T: Transcendental, const BUF_SIZE: usize> Processor<T, BUF_SIZE>
1235        for GainProcessor<T, BUF_SIZE>
1236    {
1237        fn process(
1238            &mut self,
1239            _clock: &ClockTick,
1240            _signal_inputs: &[&[T; BUF_SIZE]],
1241            _control_inputs: &[T],
1242            _clock_inputs: &[ClockTick],
1243            _feedback_inputs: &[&[T; BUF_SIZE]],
1244        ) -> ProcessResult<()> {
1245            let inp = *self.inputs[0].buffer.as_array();
1246            let out = self.outputs[0].buffer.as_mut_array();
1247            for i in 0..BUF_SIZE {
1248                out[i] = inp[i] * self.multiplier;
1249            }
1250            self.state.advance();
1251            Ok(())
1252        }
1253        fn latency(&self) -> usize {
1254            0
1255        }
1256    }
1257
1258    // ── Test: Source → Sink via GraphBuilder ────────────────────────
1259
/// A constant source wired directly to a sink: after the source processes
/// one block and its output port is propagated, the sink's input buffer
/// must hold the source value.
#[test]
fn test_graph_source_to_sink() {
    use rill_core::traits::algorithm::ActionContext;
    use rill_core::traits::processable::{ProcessContext, Processable};
    const BUF: usize = 64;
    let mut builder = test_builder::<BUF>();
    let src = builder.add_source(Box::new(ConstantSource::new(42.0, 44100.0)));
    let snk = builder.add_sink(Box::new(TestSink::<f32, BUF>::new(NodeId(1), 44100.0)));
    builder.connect_signal(src, 0, snk, 0);
    let graph = builder.build().unwrap();
    let (mut nodes, topo, _, _bufs) = graph.into_parts();
    let tick = ClockTick::new(0, BUF as u32, 44100.0);

    // Fail right here if the source cannot produce its block, rather than
    // letting a stale buffer reach the value assertion below.
    let mut ctx = ProcessContext { clock: &tick };
    let _ = nodes[topo[0]]
        .process_block(&mut ctx)
        .expect("source process_block failed");

    let action_ctx = ActionContext::new(&tick);
    let out_port = nodes[topo[0]].output_port(0).unwrap();
    out_port.propagate(out_port.buffer(), &action_ctx).unwrap();

    let sink_val = nodes[topo[1]].input_port(0).unwrap().buffer.as_array()[0];
    assert_eq!(sink_val, 42.0, "sink should receive source value");
}
1282
1283    // ── Test: Source → Processor → Sink via GraphBuilder ────────────
1284
/// Source(10) → gain(3) → sink: after one source block and one propagation
/// from the source's output port, the sink's input must read 30.
#[test]
fn test_graph_source_proc_sink() {
    use rill_core::traits::algorithm::ActionContext;
    use rill_core::traits::processable::{ProcessContext, Processable};
    const BUF: usize = 64;
    let mut builder = test_builder::<BUF>();
    let src = builder.add_source(Box::new(ConstantSource::new(10.0, 44100.0)));
    let proc = builder.add_processor(Box::new(GainProcessor::<f32, BUF>::new(
        NodeId(1),
        44100.0,
        3.0,
    )));
    let snk = builder.add_sink(Box::new(TestSink::<f32, BUF>::new(NodeId(2), 44100.0)));
    builder.connect_signal(src, 0, proc, 0);
    builder.connect_signal(proc, 0, snk, 0);
    let graph = builder.build().unwrap();
    let (mut nodes, topo, _, _bufs) = graph.into_parts();
    let tick = ClockTick::new(0, BUF as u32, 44100.0);

    // Fail right here if the source cannot produce its block, rather than
    // letting a stale buffer reach the value assertion below.
    let mut ctx = ProcessContext { clock: &tick };
    let _ = nodes[topo[0]]
        .process_block(&mut ctx)
        .expect("source process_block failed");

    let action_ctx = ActionContext::new(&tick);
    let out_port = nodes[topo[0]].output_port(0).unwrap();
    out_port.propagate(out_port.buffer(), &action_ctx).unwrap();

    let sink_val = nodes[topo[2]].input_port(0).unwrap().buffer.as_array()[0];
    assert!(
        (sink_val - 30.0).abs() < 1e-6,
        "source(10)×gain(3)=30, got {}",
        sink_val
    );
}
1317
1318    // ── Test: Command queue drain ───────────────────────────────────
1319
/// Pushes one `SetParameter` command for `"multiplier"`, drains the queue
/// into the node list, and checks that the gain processor now reports 5.0.
#[test]
fn test_command_queue_drain() {
    use rill_core::queues::{MpscQueue, SetParameter, SignalOrigin};
    use rill_core::traits::PortId;

    const BUF: usize = 64;
    let queue: Arc<MpscQueue<SetParameter>> = Arc::new(MpscQueue::new());

    let mut builder = test_builder::<BUF>();
    builder.add_processor(Box::new(GainProcessor::<f32, BUF>::new(
        NodeId(0),
        44100.0,
        2.0,
    )));
    let graph = builder.build().unwrap();
    let (mut nodes, _, _, _bufs) = graph.into_parts();

    let _ = queue.push(SetParameter::new(
        PortId::control_in(NodeId(0), 0),
        ParameterId::new("multiplier").unwrap(),
        ParamValue::Float(5.0),
        SignalOrigin::Manual,
    ));

    while let Some(cmd) = queue.pop() {
        // The command's node id doubles as the index into `nodes`.
        let idx = cmd.port.node_id().inner() as usize;
        // Borrow the parameter id straight from the command; no clone needed.
        let _ = nodes[idx].set_parameter(&cmd.parameter, cmd.value.clone());
    }

    let pid = ParameterId::new("multiplier").unwrap();
    let val = nodes[0].get_parameter(&pid).unwrap().as_f32().unwrap();
    assert!(
        (val - 5.0).abs() < 1e-6,
        "multiplier should be 5.0, got {}",
        val
    );
}
1358
1359    // ── Test: Queue + propagate ─────────────────────────────────────
1360
/// Applies a queued `"multiplier"` change to the gain processor, then
/// processes one source block and checks that the sink reads
/// source(7) × gain(4) = 28.
#[test]
fn test_command_then_propagate() {
    use rill_core::queues::{MpscQueue, SetParameter, SignalOrigin};
    use rill_core::traits::algorithm::ActionContext;
    use rill_core::traits::processable::{ProcessContext, Processable};
    use rill_core::traits::PortId;

    const BUF: usize = 64;
    let queue: Arc<MpscQueue<SetParameter>> = Arc::new(MpscQueue::new());

    let mut builder = test_builder::<BUF>();
    let src = builder.add_source(Box::new(ConstantSource::new(7.0, 44100.0)));
    let proc = builder.add_processor(Box::new(GainProcessor::<f32, BUF>::new(
        NodeId(1),
        44100.0,
        2.0,
    )));
    let snk = builder.add_sink(Box::new(TestSink::<f32, BUF>::new(NodeId(2), 44100.0)));
    builder.connect_signal(src, 0, proc, 0);
    builder.connect_signal(proc, 0, snk, 0);
    let graph = builder.build().unwrap();
    let (mut nodes, topo, _, _bufs) = graph.into_parts();
    let tick = ClockTick::new(0, BUF as u32, 44100.0);

    // Push command and drain
    let _ = queue.push(SetParameter::new(
        PortId::control_in(NodeId(1), 0),
        ParameterId::new("multiplier").unwrap(),
        ParamValue::Float(4.0),
        SignalOrigin::Manual,
    ));
    while let Some(cmd) = queue.pop() {
        // The command's node id doubles as the index into `nodes`.
        let idx = cmd.port.node_id().inner() as usize;
        // Borrow the parameter id straight from the command; no clone needed.
        let _ = nodes[idx].set_parameter(&cmd.parameter, cmd.value.clone());
    }

    // Verify multiplier changed
    let pid = ParameterId::new("multiplier").unwrap();
    let val = nodes[1].get_parameter(&pid).unwrap().as_f32().unwrap();
    assert!((val - 4.0).abs() < 1e-6);

    // Process + propagate. Fail right here if the source cannot produce its
    // block, rather than letting a stale buffer reach the assertion below.
    let mut ctx = ProcessContext { clock: &tick };
    let _ = nodes[topo[0]]
        .process_block(&mut ctx)
        .expect("source process_block failed");
    let action_ctx = ActionContext::new(&tick);
    let out_port = nodes[topo[0]].output_port(0).unwrap();
    out_port.propagate(out_port.buffer(), &action_ctx).unwrap();

    let sink_val = nodes[topo[2]].input_port(0).unwrap().buffer.as_array()[0];
    assert!(
        (sink_val - 28.0).abs() < 1e-6,
        "source(7)×gain(4)=28, got {}",
        sink_val
    );
}
1417
1418    // ── Test: Feedback propagation ──────────────────────────────────
1419
/// Source(1) → gain(2) → sink with a feedback edge from the gain
/// processor's output back to its own input. The first block is expected to
/// yield 2.0 at the sink; the second is expected to yield (1 + 2) × 2 = 6.0
/// once the previous block's output is fed back.
#[test]
fn test_feedback_propagation() {
    use rill_core::traits::algorithm::ActionContext;
    use rill_core::traits::processable::{ProcessContext, Processable};

    const BUF: usize = 64;
    let mut builder = test_builder::<BUF>();
    let src = builder.add_source(Box::new(ConstantSource::new(1.0, 44100.0)));
    let proc = builder.add_processor(Box::new(GainProcessor::<f32, BUF>::new(
        NodeId(1),
        44100.0,
        2.0,
    )));
    let snk = builder.add_sink(Box::new(TestSink::<f32, BUF>::new(NodeId(2), 44100.0)));
    builder.connect_signal(src, 0, proc, 0);
    builder.connect_signal(proc, 0, snk, 0);
    builder.connect_feedback(proc, 0, proc, 0);
    let graph = builder.build().unwrap();
    let (mut nodes, topo, _, _bufs) = graph.into_parts();

    // ── Block 1: no feedback yet ──
    let tick1 = ClockTick::new(0, BUF as u32, 44100.0);
    let mut ctx = ProcessContext { clock: &tick1 };
    // Surface a processing failure directly instead of discarding it.
    let _ = nodes[topo[0]]
        .process_block(&mut ctx)
        .expect("source process_block failed in block 1");
    let ctx1 = ActionContext::new(&tick1);
    let out_port = nodes[topo[0]].output_port(0).unwrap();
    out_port.propagate(out_port.buffer(), &ctx1).unwrap();
    let block1 = nodes[topo[2]].input_port(0).unwrap().buffer.as_array()[0];
    assert!(
        (block1 - 2.0).abs() < 1e-6,
        "block1: 1.0×2.0=2.0, got {}",
        block1
    );

    // ── Block 2: feedback from block1 should be mixed in ──
    let tick2 = ClockTick::new(BUF as u64, BUF as u32, 44100.0);
    let mut ctx = ProcessContext { clock: &tick2 };
    let _ = nodes[topo[0]]
        .process_block(&mut ctx)
        .expect("source process_block failed in block 2");
    let ctx2 = ActionContext::new(&tick2);
    let out_port = nodes[topo[0]].output_port(0).unwrap();
    out_port.propagate(out_port.buffer(), &ctx2).unwrap();
    let block2 = nodes[topo[2]].input_port(0).unwrap().buffer.as_array()[0];
    assert!(
        (block2 - 6.0).abs() < 1e-6,
        "block2: (1+2)×2=6.0, got {}",
        block2
    );
}
1468
1469    // ── Test: drain_fn pattern (as used by AudioInput) ──────────────
1470
/// Queues a `"multiplier"` change before any processing, drains it into the
/// gain processor, then runs one source block and checks that the sink
/// reads source(5) × gain(3) = 15.
#[test]
fn test_drain_fn_before_propagate() {
    use rill_core::queues::{MpscQueue, SetParameter, SignalOrigin};
    use rill_core::traits::algorithm::ActionContext;
    use rill_core::traits::processable::{ProcessContext, Processable};
    use rill_core::traits::PortId;

    const BUF: usize = 64;
    let queue: Arc<MpscQueue<SetParameter>> = Arc::new(MpscQueue::new());

    let mut builder = test_builder::<BUF>();
    let src = builder.add_source(Box::new(ConstantSource::new(5.0, 44100.0)));
    let proc = builder.add_processor(Box::new(GainProcessor::<f32, BUF>::new(
        NodeId(1),
        44100.0,
        1.0,
    )));
    let snk = builder.add_sink(Box::new(TestSink::<f32, BUF>::new(NodeId(2), 44100.0)));
    builder.connect_signal(src, 0, proc, 0);
    builder.connect_signal(proc, 0, snk, 0);
    let graph = builder.build().unwrap();
    let (mut nodes, topo, _, _bufs) = graph.into_parts();
    let tick = ClockTick::new(0, BUF as u32, 44100.0);

    // Push command BEFORE processing
    let _ = queue.push(SetParameter::new(
        PortId::control_in(NodeId(1), 0),
        ParameterId::new("multiplier").unwrap(),
        ParamValue::Float(3.0),
        SignalOrigin::Manual,
    ));

    // Drain
    while let Some(cmd) = queue.pop() {
        // The command's node id doubles as the index into `nodes`.
        let idx = cmd.port.node_id().inner() as usize;
        // Borrow the parameter id straight from the command; no clone needed.
        let _ = nodes[idx].set_parameter(&cmd.parameter, cmd.value.clone());
    }

    // Verify parameter applied
    let pid = ParameterId::new("multiplier").unwrap();
    let val = nodes[1].get_parameter(&pid).unwrap().as_f32().unwrap();
    assert!(
        (val - 3.0).abs() < 1e-6,
        "multiplier should be 3.0, got {}",
        val
    );

    // Source generate
    let mut ctx = ProcessContext { clock: &tick };
    let _ = nodes[topo[0]]
        .process_block(&mut ctx)
        .expect("source process_block failed");

    // Propagate
    let action_ctx = ActionContext::new(&tick);
    let out_port = nodes[topo[0]].output_port(0).unwrap();
    out_port.propagate(out_port.buffer(), &action_ctx).unwrap();

    // Verify: source(5) × gain(3) = 15
    let sink_val = nodes[topo[2]].input_port(0).unwrap().buffer.as_array()[0];
    assert!(
        (sink_val - 15.0).abs() < 1e-6,
        "source(5)×gain(3)=15, got {}",
        sink_val
    );
}
1536}