//! `rill_graph/graph.rs` — graph construction ([`GraphBuilder`]) and the
//! built signal graph ([`Graph`], [`ProcessingState`]).

1use crate::factory::NodeFactory;
2use std::sync::Arc;
3
4use rill_core::buffer::{Buffer, BufferRegistry, FixedBuffer, TapeLoop};
5use rill_core::io::{IoCapture, IoDriver, IoPlayback};
6use rill_core::math::Transcendental;
7use rill_core::queues::CommandEnum;
8use rill_core::time::{ClockTick, RenderContext, SystemClock};
9use rill_core::traits::port::Port;
10use rill_core::traits::processable::Processable;
11use rill_core::traits::{Node, NodeId, NodeVariant, Params, ProcessResult};
12use rill_core_actor::{Actor, ActorRef, ActorSystem};
13use std::cell::UnsafeCell;
14use std::collections::VecDeque;
15use std::rc::Rc;
16use std::sync::atomic::AtomicBool;
17
18// ============================================================================
19// Internal routing metadata
20// ============================================================================
21
22// ============================================================================
23// Build Errors
24// ============================================================================
25
/// Errors that can occur during graph construction.
///
/// Implements [`std::error::Error`] so it composes with `?`,
/// `Box<dyn Error>` and error-reporting crates, and `PartialEq`/`Eq` so
/// callers and tests can compare failures directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BuildError {
    /// A cycle was detected in the signal edge graph.
    CycleDetected,
    /// Backend creation failed.
    Backend(String),
    /// Factory registration error (unknown node type).
    Registry(String),
}

impl std::fmt::Display for BuildError {
    /// Write a short, lowercase, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::CycleDetected => f.write_str("graph cycle detected"),
            Self::Backend(msg) => write!(f, "backend error: {msg}"),
            Self::Registry(msg) => write!(f, "registry error: {msg}"),
        }
    }
}

// No underlying source error is carried, so the default methods suffice.
impl std::error::Error for BuildError {}
46
47// ============================================================================
48// Graph Builder
49// ============================================================================
50
51// ============================================================================
52// Node Storage
53// ============================================================================
54
/// A deferred node recipe — constructed at [`build`](GraphBuilder::build) time.
///
/// Holds only the data needed to ask the factory for a node later; no node
/// is instantiated while the recipe sits in the builder.
struct NodeRecipe<T: Transcendental, const BUF_SIZE: usize> {
    /// Factory type name handed to `NodeFactory::construct`.
    type_name: String,
    /// Identifier the constructed node is created with.
    id: NodeId,
    /// Construction parameters (cloned from the caller's `Params`).
    params: Params,
    /// Pending routing-matrix entries as `(from, to, gain)`; applied at build
    /// time, and only if the constructed node is a `NodeVariant::Router`.
    routing_entries: Vec<(usize, usize, f32)>,
    /// Ties the recipe to the graph's sample type and block size without
    /// storing either.
    _phantom: std::marker::PhantomData<(T, [(); BUF_SIZE])>,
}
63
/// Temporary holder during build — wraps a constructed node for wiring.
///
/// Entries are unwrapped back into a plain `Vec<NodeVariant>` before any
/// port pointers are taken.
struct NodeEntry<T: Transcendental, const BUF_SIZE: usize> {
    /// The node produced by the factory for the recipe at the same index.
    node: NodeVariant<T, BUF_SIZE>,
}
68
69// ============================================================================
70// GraphBuilder (Mutable Construction)
71// ============================================================================
72
/// A named resource (tape loop) shared between nodes in the graph.
///
/// Resources are registered on the builder and allocated when the graph is
/// built; only the `"tape"` kind is allocated there. `Debug`, `PartialEq`
/// and `Eq` are derived so resources can be logged and compared.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GraphResource {
    /// Unique name referenced by node parameters.
    pub name: String,
    /// Resource kind string (`"tape"`).
    pub kind: String,
    /// Capacity in samples (for `"tape"` kind).
    pub capacity: usize,
}
83
/// Mutable builder for an immutable signal graph.
///
/// Stores deferred node recipes until [`build`](Self::build), which
/// constructs all nodes, wires connections, and performs topological
/// sort.  This keeps `GraphBuilder` `Send` — all non-`Send` data
/// is constructed inside the target thread.
///
/// # Node factory
///
/// The builder holds an [`Arc<NodeFactory>`] for constructing nodes by
/// type name. Nodes registered via [`add_node_with_id`](Self::add_node_with_id)
/// are only validated and constructed at [`build`](Self::build) time.
///
/// # Edges
///
/// Every edge list stores `(from_node, from_port, to_node, to_port)`, where
/// the node values are indices into `recipes`.
pub struct GraphBuilder<T: Transcendental, const BUF_SIZE: usize> {
    /// Node recipes in insertion order; a node's index is its position here.
    recipes: Vec<NodeRecipe<T, BUF_SIZE>>,
    /// Signal connections; the only edges used for the topological sort.
    signal_edges: Vec<(usize, usize, usize, usize)>,
    /// Control (modulation) connections.
    control_edges: Vec<(usize, usize, usize, usize)>,
    /// Clock (timing) connections.
    clock_edges: Vec<(usize, usize, usize, usize)>,
    /// Feedback connections; excluded from cycle detection.
    feedback_edges: Vec<(usize, usize, usize, usize)>,
    /// Named resources to allocate at build time.
    resources: Vec<GraphResource>,
    /// Shared node factory (required, from Runtime).
    factory: Arc<NodeFactory<T, BUF_SIZE>>,
    /// Sample rate override. When set, used in [`build`](Self::build).
    /// Populated from [`GraphDef::sample_rate`] during deserialization.
    sample_rate: Option<f32>,
    /// Parent RackCase ActorRef — Graph sends ClockTick here.
    parent_ref: Option<ActorRef<CommandEnum>>,
}
111
112impl<T: Transcendental, const BUF_SIZE: usize> GraphBuilder<T, BUF_SIZE> {
113    /// Create a new empty graph builder.
114    pub fn new(factory: Arc<NodeFactory<T, BUF_SIZE>>) -> Self {
115        Self {
116            recipes: Vec::new(),
117            signal_edges: Vec::new(),
118            control_edges: Vec::new(),
119            clock_edges: Vec::new(),
120            feedback_edges: Vec::new(),
121            resources: Vec::new(),
122            factory,
123            sample_rate: None,
124            parent_ref: None,
125        }
126    }
127
128    /// Add a node by type name using the internal factory.
129    ///
130    /// The type must be registered in the factory before [`build`](Self::build)
131    /// is called.
132    ///
133    /// Returns the index of the newly added node.
134    pub fn add_node(&mut self, type_name: &str, params: &Params) -> usize {
135        let id = NodeId(self.recipes.len() as u32);
136        self.add_node_with_id(type_name, params, id)
137    }
138
139    /// Add a node with an explicit [`NodeId`].
140    ///
141    /// Like [`add_node`](Self::add_node) but uses the provided `id`.
142    /// Important for serialization where external references depend on
143    /// exact IDs.
144    pub fn add_node_with_id(&mut self, type_name: &str, params: &Params, id: NodeId) -> usize {
145        let idx = self.recipes.len();
146        self.recipes.push(NodeRecipe {
147            type_name: type_name.to_string(),
148            id,
149            params: params.clone(),
150            routing_entries: Vec::new(),
151            _phantom: std::marker::PhantomData,
152        });
153        idx
154    }
155
156    /// Store a routing matrix entry to be applied at build time.
157    pub fn add_routing_entry(&mut self, idx: usize, from: usize, to: usize, gain: f32) {
158        if let Some(recipe) = self.recipes.get_mut(idx) {
159            recipe.routing_entries.push((from, to, gain));
160        }
161    }
162
163    /// Register a named resource (tape loop, buffer, etc.).
164    pub fn add_resource(&mut self, resource: GraphResource) {
165        self.resources.push(resource);
166    }
167
168    /// Number of nodes added to the builder so far.
169    pub fn node_count(&self) -> usize {
170        self.recipes.len()
171    }
172
173    /// Set the sample rate for this builder.
174    pub fn set_sample_rate(&mut self, sr: f32) {
175        self.sample_rate = Some(sr);
176    }
177
178    /// Set the parent RackCase actor reference (Graph → parent ClockTick).
179    pub fn set_parent_ref(&mut self, parent: ActorRef<CommandEnum>) {
180        self.parent_ref = Some(parent);
181    }
182
183    /// Connect signal ports.
184    pub fn connect_signal(
185        &mut self,
186        from_node: usize,
187        from_port: usize,
188        to_node: usize,
189        to_port: usize,
190    ) {
191        self.signal_edges
192            .push((from_node, from_port, to_node, to_port));
193    }
194
195    /// Connect control ports (modulation values).
196    pub fn connect_control(
197        &mut self,
198        from_node: usize,
199        from_port: usize,
200        to_node: usize,
201        to_port: usize,
202    ) {
203        self.control_edges
204            .push((from_node, from_port, to_node, to_port));
205    }
206
207    /// Connect clock ports (timing events).
208    pub fn connect_clock(
209        &mut self,
210        from_node: usize,
211        from_port: usize,
212        to_node: usize,
213        to_port: usize,
214    ) {
215        self.clock_edges
216            .push((from_node, from_port, to_node, to_port));
217    }
218
219    /// Connect feedback ports (delay lines, state carryover).
220    pub fn connect_feedback(
221        &mut self,
222        from_node: usize,
223        from_port: usize,
224        to_node: usize,
225        to_port: usize,
226    ) {
227        self.feedback_edges
228            .push((from_node, from_port, to_node, to_port));
229    }
230
231    /// Build the graph.
232    ///
233    /// Creates backends for nodes that have a backend name set (via
234    /// `SourceDef::backend` / `SinkDef::backend` or the builder's default).  Finds the active
235    /// (driver) node and stores its index for [`Graph::run`].
236    pub fn build(self, system: &ActorSystem) -> Result<Graph<T, BUF_SIZE>, BuildError> {
237        // Phase 1: Construct all nodes from recipes
238        let mut node_entries: Vec<NodeEntry<T, BUF_SIZE>> = Vec::with_capacity(self.recipes.len());
239        for recipe in &self.recipes {
240            let node = self
241                .factory
242                .construct(&recipe.type_name, recipe.id, &recipe.params)
243                .map_err(|e| BuildError::Registry(format!("{e}")))?;
244            node_entries.push(NodeEntry { node });
245        }
246
247        // Apply pre-configured routing entries
248        for (idx, node) in node_entries.iter_mut().enumerate() {
249            for &(from, to, gain) in &self.recipes[idx].routing_entries {
250                if let NodeVariant::Router(ref mut router) = node.node {
251                    router.set_connection(from, to, T::from_f32(gain)).ok();
252                }
253            }
254        }
255
256        let num_nodes = node_entries.len();
257
258        // --- Phase 2: adjacency for Kahn (signal edges only) ---
259        let mut in_degree = vec![0usize; num_nodes];
260        let mut out_edges: Vec<Vec<(usize, usize, usize)>> = vec![Vec::new(); num_nodes];
261
262        for &(from_n, from_p, to_n, to_p) in &self.signal_edges {
263            in_degree[to_n] += 1;
264            out_edges[from_n].push((from_p, to_n, to_p));
265        }
266
267        // --- Kahn's algorithm ---
268        let mut queue: VecDeque<usize> = in_degree
269            .iter()
270            .enumerate()
271            .filter(|(_, &d)| d == 0)
272            .map(|(i, _)| i)
273            .collect();
274
275        let mut topo = Vec::with_capacity(num_nodes);
276        let mut indeg = in_degree;
277        while let Some(idx) = queue.pop_front() {
278            topo.push(idx);
279            for &(_, to_n, _) in &out_edges[idx] {
280                indeg[to_n] -= 1;
281                if indeg[to_n] == 0 {
282                    queue.push_back(to_n);
283                }
284            }
285        }
286
287        if topo.len() != num_nodes {
288            return Err(BuildError::CycleDetected);
289        }
290
291        // --- collect nodes into final Vec ---
292        let mut nodes: Vec<NodeVariant<T, BUF_SIZE>> =
293            node_entries.into_iter().map(|e| e.node).collect();
294
295        // --- Phase 5: port pointer wiring on the final nodes Vec ---
296        for &(from_n, from_p, to_n, to_p) in &self.signal_edges {
297            if let Some(port) = nodes[from_n].output_port_mut(from_p) {
298                port.downstream.push((to_n, to_p));
299            }
300            let in_ptr: *mut Port<T, BUF_SIZE> = nodes[to_n]
301                .input_port_mut(to_p)
302                .map(|p| p as *mut Port<T, BUF_SIZE>)
303                .unwrap_or(std::ptr::null_mut());
304            let parent: *mut NodeVariant<T, BUF_SIZE> = &mut nodes[to_n];
305            let out_ptr: *mut Port<T, BUF_SIZE> = nodes[from_n]
306                .output_port_mut(from_p)
307                .map(|p| p as *mut Port<T, BUF_SIZE>)
308                .unwrap_or(std::ptr::null_mut());
309            if !in_ptr.is_null() && !out_ptr.is_null() {
310                #[allow(unsafe_code)]
311                unsafe {
312                    (*in_ptr).parent = parent;
313                    (*out_ptr).downstream_input_ptrs.push(in_ptr);
314                }
315            }
316        }
317
318        // --- downstream_nodes ---
319        for &(from_n, from_p, to_n, _) in &self.signal_edges {
320            let parent: *mut NodeVariant<T, BUF_SIZE> = &mut nodes[to_n];
321            if let Some(port) = nodes[from_n].output_port_mut(from_p) {
322                let ptr_val = parent as usize;
323                let already = port.downstream_nodes.iter().any(|&p| p as usize == ptr_val);
324                if !already {
325                    port.downstream_nodes.push(parent);
326                }
327            }
328        }
329
330        // --- upstream_buffer ---
331        for &(from_n, from_p, to_n, to_p) in &self.signal_edges {
332            let upstream = nodes[from_n]
333                .output_port(from_p)
334                .map(|p| &p.buffer as *const FixedBuffer<T, BUF_SIZE>);
335            if let Some(port) = nodes[to_n].input_port_mut(to_p) {
336                if port.upstream_buffer.is_none() {
337                    port.upstream_buffer = upstream;
338                } else {
339                    port.upstream_buffer = None;
340                }
341            }
342        }
343
344        // --- feedback buffers ---
345        for &(from_n, from_p, to_n, to_p) in &self.feedback_edges {
346            if let Some(port) = nodes[from_n].output_port_mut(from_p) {
347                port.feedback_buffer = Some(FixedBuffer::new());
348                port.feedback_downstream.push((to_n, to_p));
349            }
350            if let Some(port) = nodes[to_n].input_port_mut(to_p) {
351                port.feedback_buffer = Some(FixedBuffer::new());
352            }
353        }
354        for &(from_n, from_p, to_n, to_p) in &self.feedback_edges {
355            let ptr = nodes[to_n]
356                .input_port(to_p)
357                .map(|p| &p.feedback_buffer as *const Option<FixedBuffer<T, BUF_SIZE>>)
358                .map(|r| r as *mut Option<FixedBuffer<T, BUF_SIZE>>);
359            if let Some(port) = nodes[from_n].output_port_mut(from_p) {
360                if let Some(p) = ptr {
361                    port.feedback_ptrs.push(p);
362                }
363            }
364        }
365
366        // Allocate named buffers
367        let mut buffers = BufferRegistry::new();
368        for r in &self.resources {
369            if r.kind == "tape" {
370                if let Some(tape) = TapeLoop::<T>::new(r.capacity) {
371                    buffers.register(&r.name, Box::new(tape));
372                }
373            }
374        }
375        for entry in nodes.iter_mut() {
376            entry.resolve_resources(&buffers);
377        }
378
379        let source_idx = topo.first().copied().unwrap_or(0);
380
381        let owned_buffers = buffers.into_inner();
382        let allocated = self.resources.clone();
383
384        // Wrap nodes in Rc<UnsafeCell<Vec<>>> — port pointers already valid in this Vec.
385        let nodes: Rc<UnsafeCell<Vec<NodeVariant<T, BUF_SIZE>>>> = Rc::new(UnsafeCell::new(nodes));
386
387        let actor = system.spawn("graph", {
388            let n = nodes.clone();
389            #[allow(unsafe_code)]
390            move |msg: CommandEnum| {
391                if let CommandEnum::SetParameter(param) = msg {
392                    let idx = param.port.node_id().inner() as usize;
393                    unsafe {
394                        let nv = &mut *n.get();
395                        if idx < nv.len() {
396                            let _ = nv[idx].set_parameter(&param.parameter, param.value);
397                        }
398                    }
399                }
400            }
401        });
402
403        let actor_ref = actor.actor_ref();
404
405        Ok(Graph {
406            nodes,
407            topo_order: topo,
408            resources: allocated,
409            current_tick: ClockTick::new(
410                0,
411                BUF_SIZE as u32,
412                self.sample_rate.unwrap_or(44100.0),
413                String::new(),
414            ),
415            buffers: owned_buffers,
416            source_idx,
417            actor: Some(actor),
418            actor_ref,
419            parent_ref: self.parent_ref.clone(),
420            system_clock: None,
421        })
422    }
423}
424
425// ============================================================================
426// Graph (Static DAG)
427// ============================================================================
428
/// Owned parts of a [`Graph`] returned by `into_parts` (test only).
///
/// In order: the node array, the topological order, the current clock
/// tick, and the graph's owned buffers.
#[cfg(test)]
type GraphParts<T, const BUF_SIZE: usize> = (
    Vec<NodeVariant<T, BUF_SIZE>>,
    Vec<usize>,
    ClockTick,
    Vec<Box<dyn Buffer<T> + Send>>,
);
437
/// Immutable signal graph with static DAG topology.
///
/// Once built the graph's topology cannot be modified. Processing is
/// driven from external code (e.g. a real-time signal callback or an
/// offline renderer), either through `process_block` or by converting the
/// graph into a [`ProcessingState`].
pub struct Graph<T: Transcendental, const BUF_SIZE: usize> {
    /// Node array, shared with the graph actor's message handler.
    nodes: Rc<UnsafeCell<Vec<NodeVariant<T, BUF_SIZE>>>>,
    /// Node indices in topological order over the signal edges.
    topo_order: Vec<usize>,
    /// Index of the node that drives processing (first in `topo_order`).
    source_idx: usize,
    /// Most recent tick seen by `process_block`.
    current_tick: ClockTick,
    /// Resource descriptions copied from the builder.
    pub(crate) resources: Vec<GraphResource>,
    /// Buffers taken out of the build-time registry; never read directly.
    /// NOTE(review): presumably held so resources resolved by the nodes stay
    /// alive — confirm.
    #[allow(dead_code)]
    buffers: Vec<Box<dyn Buffer<T> + Send>>,
    /// Mailbox-owning actor; `None` once moved into a [`ProcessingState`].
    actor: Option<Actor<CommandEnum>>,
    /// Handle for sending commands to `actor`.
    actor_ref: ActorRef<CommandEnum>,
    /// Parent actor reference copied from the builder, if one was set.
    parent_ref: Option<ActorRef<CommandEnum>>,
    /// Optional shared system clock, updated by external sync sources (MIDI, JACK transport).
    /// When set, the I/O callback reads BPM from it and creates `ClockTick::with_tempo`.
    pub system_clock: Option<Arc<SystemClock>>,
}
460
/// Owned processing state extracted from a [`Graph`].
///
/// Holds the parts needed for the I/O callback loop: the actor mailbox
/// for draining `SetParameter` commands, the node array, and routing
/// metadata.  The state is `!Send + !Sync` — it stays on the I/O thread.
///
/// Created via [`Graph::into_processing_state`].
pub struct ProcessingState<T: Transcendental, const BUF_SIZE: usize> {
    /// Actor whose mailbox is drained at the start of every block.
    actor: Actor<CommandEnum>,
    /// Node array, shared with the actor's message handler.
    nodes: Rc<UnsafeCell<Vec<NodeVariant<T, BUF_SIZE>>>>,
    /// Index of the node that drives processing.
    source_idx: usize,
    /// Parent actor that receives final clock ticks, if any.
    parent_ref: Option<ActorRef<CommandEnum>>,
    /// Optional shared clock; when set, its BPM is fed into the render context.
    system_clock: Option<Arc<SystemClock>>,
    /// Buffers carried over from the graph; never read directly.
    /// NOTE(review): presumably held so resources resolved by the nodes stay
    /// alive — confirm.
    #[allow(dead_code)]
    buffers: Vec<Box<dyn Buffer<T> + Send>>,
}
477
478impl<T: Transcendental, const BUF_SIZE: usize> ProcessingState<T, BUF_SIZE> {
479    /// Process one block of signal data driven by an external [`ClockTick`].
480    ///
481    /// Same logic as [`Graph::process_block`] but operates on independently
482    /// owned state (no borrow of the original `Graph`).
483    #[allow(unsafe_code)]
484    pub fn process_block(&mut self, tick: &ClockTick) -> ProcessResult<()> {
485        self.actor.drain();
486        let mut ctx = if let Some(ref clock) = self.system_clock {
487            RenderContext::with_tempo(
488                tick.sample_pos,
489                tick.samples_since_last,
490                tick.sample_rate,
491                clock.bpm() as f32,
492            )
493        } else {
494            RenderContext::new(tick.sample_pos, tick.samples_since_last, tick.sample_rate)
495        };
496        ctx.speed_ratio = tick.speed_ratio;
497        unsafe {
498            let nv = &mut *self.nodes.get();
499            let _ = nv[self.source_idx].process_block(&ctx, tick);
500            for po in 0..nv[self.source_idx].num_signal_outputs() {
501                if let Some(port) = nv[self.source_idx].output_port(po) {
502                    let _ = port.propagate(port.buffer(), &ctx, tick);
503                }
504            }
505        }
506        Ok(())
507    }
508
509    /// Send a ClockTick to the parent actor (rack fan-out).
510    ///
511    /// Called by the backend's process callback at the appropriate time
512    /// (once per I/O callback for standard backends, once per DMA buffer
513    /// for chunking backends).
514    pub fn send_clock_tick(&self, tick: &ClockTick) {
515        if tick.is_final {
516            if let Some(ref parent) = self.parent_ref {
517                parent.send(CommandEnum::ClockTick(tick.clone()));
518            }
519        }
520    }
521
522    /// Wire capture/playback backends into Source/Sink nodes after graph construction.
523    ///
524    /// Must be called after `into_processing_state()` and before the driver starts.
525    /// Only Source nodes respond to `set_capture`; only Sink nodes respond to
526    /// `set_playback`.  Processor and Router nodes ignore both.
527    #[allow(unsafe_code)]
528    pub fn wire_backends(
529        &mut self,
530        capture: Option<Arc<dyn IoCapture>>,
531        playback: Option<Arc<dyn IoPlayback>>,
532    ) {
533        unsafe {
534            let nv = &mut *self.nodes.get();
535            for node in nv.iter_mut() {
536                if let Some(ref c) = capture {
537                    if let NodeVariant::Source(src) = node {
538                        src.set_capture(c.clone())
539                    }
540                }
541                if let Some(ref p) = playback {
542                    if let NodeVariant::Sink(sink) = node {
543                        sink.set_playback(p.clone())
544                    }
545                }
546            }
547        }
548    }
549
550    /// Run this processing state with a pre-created driver backend.
551    ///
552    /// Consumes `self`, wires the process callback, enters the I/O loop.
553    /// The `running` flag controls shutdown.
554    pub fn run_with_driver(
555        mut self,
556        driver: Arc<dyn IoDriver>,
557        running: Arc<AtomicBool>,
558    ) -> Result<(), String> {
559        self.actor.drain();
560        driver.set_process_callback(Box::new(move |tick: &ClockTick| {
561            let _ = self.process_block(tick);
562            self.send_clock_tick(tick);
563        }));
564        driver.run(running.clone())?;
565        while running.load(std::sync::atomic::Ordering::Acquire) {
566            std::thread::park();
567        }
568        let _ = driver.stop();
569        Ok(())
570    }
571}
572
573impl<T: Transcendental, const BUF_SIZE: usize> Graph<T, BUF_SIZE> {
574    // ========================================================================
575    // Accessors
576    // ========================================================================
577
578    /// Borrow the node array (read-only).
579    #[allow(unsafe_code)]
580    pub fn nodes(&self) -> &[NodeVariant<T, BUF_SIZE>] {
581        unsafe { &*self.nodes.get() }
582    }
583
584    /// Return the current clock tick.
585    pub fn current_tick(&self) -> ClockTick {
586        self.current_tick.clone()
587    }
588
589    /// Return the number of nodes in the graph.
590    #[allow(unsafe_code)]
591    pub fn node_count(&self) -> usize {
592        unsafe { (*self.nodes.get()).len() }
593    }
594
595    /// Return the topological ordering of node indices.
596    pub fn topo_order(&self) -> &[usize] {
597        &self.topo_order
598    }
599
600    #[allow(dead_code)]
601    pub(crate) fn sample_rate(&self) -> f32 {
602        self.current_tick.sample_rate
603    }
604
605    /// Access the named resources (tape loops, etc.) allocated for this graph.
606    #[allow(dead_code)]
607    pub fn resources(&self) -> &[GraphResource] {
608        &self.resources
609    }
610
611    /// Process one block of signal data driven by an external [`ClockTick`].
612    ///
613    /// Called from the backend's process callback. Performs:
614    ///
615    /// 1. Drains the graph's actor mailbox (applies queued `SetParameter`s).
616    /// 2. Creates a [`RenderContext`] from the tick.
617    /// 3. Calls `process_block` on the source node and recursively
618    ///    propagates through the DAG via [`Port::propagate`].
619    /// 4. Sends the tick to the parent [`ActorRef`] (if any).
620    ///
621    /// The graph is `!Send + !Sync` — it stays on the I/O callback thread.
622    #[allow(unsafe_code)]
623    pub fn process_block(&mut self, tick: &ClockTick) -> ProcessResult<()> {
624        if let Some(ref mut actor) = self.actor {
625            actor.drain();
626        }
627        let ctx = if let Some(ref clock) = self.system_clock {
628            RenderContext::with_tempo(
629                tick.sample_pos,
630                tick.samples_since_last,
631                tick.sample_rate,
632                clock.bpm() as f32,
633            )
634        } else {
635            RenderContext::new(tick.sample_pos, tick.samples_since_last, tick.sample_rate)
636        };
637        self.current_tick = tick.clone();
638        unsafe {
639            let nv = &mut *self.nodes.get();
640            let _ = nv[self.source_idx].process_block(&ctx, tick);
641            for po in 0..nv[self.source_idx].num_signal_outputs() {
642                if let Some(port) = nv[self.source_idx].output_port(po) {
643                    let _ = port.propagate(port.buffer(), &ctx, tick);
644                }
645            }
646        }
647        Ok(())
648    }
649
650    /// Consume the graph and return a [`ProcessingState`] that owns all
651    /// parts needed for the I/O callback loop.
652    ///
653    /// `ProcessingState` is `!Send + !Sync` — it stays on the I/O thread
654    /// and is moved into the backend's process callback closure.
655    pub fn into_processing_state(mut self) -> ProcessingState<T, BUF_SIZE> {
656        let actor = self.actor.take().expect("graph actor missing");
657        ProcessingState {
658            actor,
659            nodes: self.nodes,
660            source_idx: self.source_idx,
661            parent_ref: self.parent_ref,
662            system_clock: self.system_clock,
663            buffers: self.buffers,
664        }
665    }
666
667    /// Obtain an [`ActorRef`] for sending commands to this graph.
668    pub fn handle(&self) -> ActorRef<CommandEnum> {
669        self.actor_ref.clone()
670    }
671
672    /// Consume the graph and return its owned parts (test only).
673    #[cfg(test)]
674    pub fn into_parts(self) -> GraphParts<T, BUF_SIZE> {
675        let Self {
676            nodes,
677            topo_order,
678            current_tick,
679            resources: _,
680            buffers,
681            source_idx: _,
682            actor,
683            actor_ref: _,
684            parent_ref: _,
685            system_clock: _,
686        } = self;
687        drop(actor);
688        let nodes = Rc::try_unwrap(nodes).unwrap().into_inner();
689        (nodes, topo_order, current_tick, buffers)
690    }
691}
692
693#[cfg(test)]
694mod tests {
695    use super::*;
696    use rill_core::math::Transcendental;
697    use rill_core::time::RenderContext;
698
699    use rill_core::traits::{
700        Node, NodeCategory, NodeId, NodeMetadata, NodeState, ParamValue, ParameterId, Port,
701        ProcessResult, Processor, Sink, Source,
702    };
703    use rill_core_actor::ActorSystem;
704    use std::sync::Arc;
705
    /// Create a fresh [`ActorSystem`] for a test.
    fn test_system() -> ActorSystem {
        ActorSystem::new()
    }
709
710    fn test_factory<const B: usize>() -> Arc<NodeFactory<f32, B>> {
711        let mut f = NodeFactory::<f32, B>::new();
712
713        f.register_fn("test/const", |id, params| {
714            let value = params.get_f32("value", 1.0);
715            let mut node = ConstantSource::<f32, B>::new(id, value, params.sample_rate);
716            node.init(params.sample_rate);
717            NodeVariant::Source(Box::new(node))
718        });
719
720        f.register_fn("test/gain", |id, params| {
721            let gain = params.get_f32("gain", 1.0);
722            let mut node = GainProcessor::<f32, B>::new(id, params.sample_rate, gain);
723            node.init(params.sample_rate);
724            NodeVariant::Processor(Box::new(node))
725        });
726
727        f.register_fn("test/capture", |id, params| {
728            let mut node = CaptureSink::<f32, B>::new(id, params.sample_rate);
729            node.init(params.sample_rate);
730            NodeVariant::Sink(Box::new(node))
731        });
732
733        Arc::new(f)
734    }
735
736    fn test_builder<const B: usize>(factory: &Arc<NodeFactory<f32, B>>) -> GraphBuilder<f32, B> {
737        GraphBuilder::new(factory.clone())
738    }
739
740    fn test_params(sample_rate: f32) -> Params {
741        let mut p = Params::new(sample_rate);
742        p.insert("value".to_string(), ParamValue::Float(sample_rate));
743        p
744    }
745
746    // ------------------------------------------------------------------------
747    // Test node implementations
748    // ------------------------------------------------------------------------
749
    /// Test source node that fills its single output with one fixed value.
    pub(crate) struct ConstantSource<T: Transcendental, const B: usize> {
        /// Identifier reported through `Node::id`.
        id: NodeId,
        /// Value written to every sample of the output buffer.
        value: T,
        /// Node state exposed through `Node::state`.
        state: NodeState<T, B>,
        /// The only signal output (port index 0, named `"out"`).
        output: Port<T, B>,
    }
756
757    impl<T: Transcendental, const B: usize> ConstantSource<T, B> {
758        pub fn new(id: NodeId, value: T, sample_rate: f32) -> Self {
759            let state = NodeState::new(sample_rate);
760            let mut output = Port::output(id, 0, "out");
761            output.buffer = FixedBuffer::new();
762            Self {
763                id,
764                value,
765                state,
766                output,
767            }
768        }
769    }
770
771    impl<T: Transcendental, const B: usize> Node<T, B> for ConstantSource<T, B> {
772        fn id(&self) -> NodeId {
773            self.id
774        }
775        fn set_id(&mut self, id: NodeId) {
776            self.id = id;
777        }
778        fn metadata(&self) -> NodeMetadata {
779            NodeMetadata {
780                name: "ConstantSource".into(),
781                type_name: Some("test/const".into()),
782                category: NodeCategory::Source,
783                description: String::new(),
784                author: String::new(),
785                version: String::new(),
786                parameters: vec![],
787                signal_inputs: 0,
788                signal_outputs: 1,
789                control_inputs: 0,
790                control_outputs: 0,
791                clock_inputs: 0,
792                clock_outputs: 0,
793                feedback_ports: 0,
794            }
795        }
796        fn init(&mut self, _: f32) {}
797        fn reset(&mut self) {}
798        fn get_parameter(&self, _: &ParameterId) -> Option<ParamValue> {
799            None
800        }
801        fn set_parameter(&mut self, _: &ParameterId, _: ParamValue) -> ProcessResult<()> {
802            Ok(())
803        }
804        fn control_port(&self, _: usize) -> Option<&Port<T, B>> {
805            None
806        }
807        fn control_port_mut(&mut self, _: usize) -> Option<&mut Port<T, B>> {
808            None
809        }
810        fn output_port(&self, i: usize) -> Option<&Port<T, B>> {
811            if i == 0 {
812                Some(&self.output)
813            } else {
814                None
815            }
816        }
817        fn output_port_mut(&mut self, i: usize) -> Option<&mut Port<T, B>> {
818            if i == 0 {
819                Some(&mut self.output)
820            } else {
821                None
822            }
823        }
824        fn input_port(&self, _: usize) -> Option<&Port<T, B>> {
825            None
826        }
827        fn input_port_mut(&mut self, _: usize) -> Option<&mut Port<T, B>> {
828            None
829        }
830        fn state(&self) -> &NodeState<T, B> {
831            &self.state
832        }
833        fn state_mut(&mut self) -> &mut NodeState<T, B> {
834            &mut self.state
835        }
836    }
837
838    impl<T: Transcendental, const B: usize> Source<T, B> for ConstantSource<T, B> {
839        fn generate(
840            &mut self,
841            _: &RenderContext,
842            _: &[T],
843            _: &[RenderContext],
844            _: &ClockTick,
845        ) -> ProcessResult<()> {
846            self.output.buffer.as_mut_array().fill(self.value);
847            Ok(())
848        }
849    }
850
851    // ------------------------------------------------------------------------
852    // GainProcessor
853    // ------------------------------------------------------------------------
854
855    pub(crate) struct GainProcessor<T: Transcendental, const B: usize> {
856        id: NodeId,
857        gain: T,
858        state: NodeState<T, B>,
859        input: Port<T, B>,
860        output: Port<T, B>,
861    }
862
863    impl<T: Transcendental, const B: usize> GainProcessor<T, B> {
864        pub fn new(id: NodeId, sample_rate: f32, gain: T) -> Self {
865            let state = NodeState::new(sample_rate);
866            let input = Port::input(id, 0, "in");
867            let output = Port::output(id, 0, "out");
868            Self {
869                id,
870                gain,
871                state,
872                input,
873                output,
874            }
875        }
876    }
877
878    impl<T: Transcendental, const B: usize> Node<T, B> for GainProcessor<T, B> {
879        fn id(&self) -> NodeId {
880            self.id
881        }
882        fn set_id(&mut self, id: NodeId) {
883            self.id = id;
884        }
885        fn metadata(&self) -> NodeMetadata {
886            NodeMetadata {
887                name: "GainProcessor".into(),
888                type_name: Some("test/gain".into()),
889                category: NodeCategory::Processor,
890                description: String::new(),
891                author: String::new(),
892                version: String::new(),
893                parameters: vec![],
894                signal_inputs: 1,
895                signal_outputs: 1,
896                control_inputs: 0,
897                control_outputs: 0,
898                clock_inputs: 0,
899                clock_outputs: 0,
900                feedback_ports: 0,
901            }
902        }
903        fn init(&mut self, _: f32) {}
904        fn reset(&mut self) {}
905        fn get_parameter(&self, _: &ParameterId) -> Option<ParamValue> {
906            None
907        }
908        fn set_parameter(&mut self, _: &ParameterId, _: ParamValue) -> ProcessResult<()> {
909            Ok(())
910        }
911        fn control_port(&self, _: usize) -> Option<&Port<T, B>> {
912            None
913        }
914        fn control_port_mut(&mut self, _: usize) -> Option<&mut Port<T, B>> {
915            None
916        }
917        fn input_port(&self, i: usize) -> Option<&Port<T, B>> {
918            if i == 0 {
919                Some(&self.input)
920            } else {
921                None
922            }
923        }
924        fn input_port_mut(&mut self, i: usize) -> Option<&mut Port<T, B>> {
925            if i == 0 {
926                Some(&mut self.input)
927            } else {
928                None
929            }
930        }
931        fn num_signal_outputs(&self) -> usize {
932            1
933        }
934        fn output_port(&self, i: usize) -> Option<&Port<T, B>> {
935            if i == 0 {
936                Some(&self.output)
937            } else {
938                None
939            }
940        }
941        fn output_port_mut(&mut self, i: usize) -> Option<&mut Port<T, B>> {
942            if i == 0 {
943                Some(&mut self.output)
944            } else {
945                None
946            }
947        }
948        fn state(&self) -> &NodeState<T, B> {
949            &self.state
950        }
951        fn state_mut(&mut self) -> &mut NodeState<T, B> {
952            &mut self.state
953        }
954    }
955
956    impl<T: Transcendental, const B: usize> Processor<T, B> for GainProcessor<T, B> {
957        fn process(
958            &mut self,
959            _: &RenderContext,
960            _: &[&[T; B]],
961            _: &[T],
962            _: &[RenderContext],
963            _: &[&[T; B]],
964        ) -> ProcessResult<()> {
965            let src = self.input.buffer.as_array();
966            let buf = self.output.buffer.as_mut_array();
967            for i in 0..B {
968                buf[i] = src[i] * self.gain;
969            }
970            Ok(())
971        }
972    }
973
// ------------------------------------------------------------------------
// CaptureSink — no-op sink; tests read samples from its input port buffer
// ------------------------------------------------------------------------
977
978    pub(crate) struct CaptureSink<T: Transcendental, const B: usize> {
979        id: NodeId,
980        state: NodeState<T, B>,
981        input: Port<T, B>,
982    }
983
984    impl<T: Transcendental, const B: usize> CaptureSink<T, B> {
985        pub fn new(id: NodeId, sample_rate: f32) -> Self {
986            let state = NodeState::new(sample_rate);
987            let input = Port::input(id, 0, "in");
988            Self { id, state, input }
989        }
990    }
991
992    impl<T: Transcendental, const B: usize> Node<T, B> for CaptureSink<T, B> {
993        fn id(&self) -> NodeId {
994            self.id
995        }
996        fn set_id(&mut self, id: NodeId) {
997            self.id = id;
998        }
999        fn metadata(&self) -> NodeMetadata {
1000            NodeMetadata {
1001                name: "CaptureSink".into(),
1002                type_name: Some("test/capture".into()),
1003                category: NodeCategory::Sink,
1004                description: String::new(),
1005                author: String::new(),
1006                version: String::new(),
1007                parameters: vec![],
1008                signal_inputs: 1,
1009                signal_outputs: 0,
1010                control_inputs: 0,
1011                control_outputs: 0,
1012                clock_inputs: 0,
1013                clock_outputs: 0,
1014                feedback_ports: 0,
1015            }
1016        }
1017        fn init(&mut self, _: f32) {}
1018        fn reset(&mut self) {}
1019        fn get_parameter(&self, _: &ParameterId) -> Option<ParamValue> {
1020            None
1021        }
1022        fn set_parameter(&mut self, _: &ParameterId, _: ParamValue) -> ProcessResult<()> {
1023            Ok(())
1024        }
1025        fn control_port(&self, _: usize) -> Option<&Port<T, B>> {
1026            None
1027        }
1028        fn control_port_mut(&mut self, _: usize) -> Option<&mut Port<T, B>> {
1029            None
1030        }
1031        fn output_port(&self, _: usize) -> Option<&Port<T, B>> {
1032            None
1033        }
1034        fn output_port_mut(&mut self, _: usize) -> Option<&mut Port<T, B>> {
1035            None
1036        }
1037        fn input_port(&self, i: usize) -> Option<&Port<T, B>> {
1038            if i == 0 {
1039                Some(&self.input)
1040            } else {
1041                None
1042            }
1043        }
1044        fn input_port_mut(&mut self, i: usize) -> Option<&mut Port<T, B>> {
1045            if i == 0 {
1046                Some(&mut self.input)
1047            } else {
1048                None
1049            }
1050        }
1051        fn state(&self) -> &NodeState<T, B> {
1052            &self.state
1053        }
1054        fn state_mut(&mut self) -> &mut NodeState<T, B> {
1055            &mut self.state
1056        }
1057    }
1058
1059    impl<T: Transcendental, const B: usize> Sink<T, B> for CaptureSink<T, B> {
1060        fn consume(
1061            &mut self,
1062            _: &RenderContext,
1063            _: &[&[T; B]],
1064            _: &[T],
1065            _: &[RenderContext],
1066            _: &[&[T; B]],
1067            _: &ClockTick,
1068        ) -> ProcessResult<()> {
1069            Ok(())
1070        }
1071    }
1072
1073    // ------------------------------------------------------------------------
1074    // Graph signal flow tests
1075    // ------------------------------------------------------------------------
1076
/// Const-generic size handed to `test_factory` / `test_builder` by the signal
/// flow tests, and (cast to `u32`) the second argument of
/// `RenderContext::new` / `ClockTick::new` — presumably the block length in
/// samples; confirm against those constructors.
const BUF: usize = 64;
1078
/// Connects a `test/const` node straight to a `test/capture` node, renders
/// one block on the graph's source node, propagates the source's output port,
/// and checks that the first sample in the sink's input buffer is not `0.0`.
#[test]
#[allow(unsafe_code)]
fn test_graph_source_to_sink() {
    let factory = test_factory::<BUF>();
    let mut builder = test_builder::<BUF>(&factory);
    let system = test_system();

    // Two nodes with default test parameters, joined port 0 -> port 0.
    let src_idx = builder.add_node("test/const", &test_params(44100.0));
    let snk_idx = builder.add_node("test/capture", &test_params(44100.0));
    builder.connect_signal(src_idx, 0, snk_idx, 0);

    let graph = builder.build(&system).unwrap();
    let source_idx = graph.source_idx;

    let block_len = BUF as u32;
    let ctx = RenderContext::new(0, block_len, 44100.0);
    let tick = ClockTick::new(0, block_len, 44100.0, String::new());
    let nodes = graph.nodes.clone();

    // Render one block on the source, then push its output downstream.
    // SAFETY: NOTE(review) — assumes nothing else dereferences `graph.nodes`
    // while this reference is alive; confirm against the graph's threading.
    unsafe {
        let table = &mut *nodes.get();
        table[source_idx].process_block(&ctx, &tick).unwrap();
        if let Some(out) = table[source_idx].output_port(0) {
            out.propagate(out.buffer(), &ctx, &tick).unwrap();
        }
    }

    // Read back the first sample that reached the sink's input port.
    // SAFETY: NOTE(review) — same assumption as above; the mutable reference
    // from the previous block is no longer in scope here.
    let first_sample = unsafe {
        let table = &*nodes.get();
        let sample = table[snk_idx].input_port(0).unwrap().buffer.as_array()[0];
        sample
    };
    assert!(
        first_sample != 0.0,
        "signal should have propagated, got {}",
        first_sample
    );
}
1109
/// End-to-end check of a three-node chain: `test/const` (param `value` = 5.0)
/// -> `test/gain` (param `gain` = 3.0) -> `test/capture`.
///
/// Renders one block on the graph's source node, propagates the source's
/// output port, and expects the first sample of the sink's input buffer to be
/// 15.0 (presumably 5.0 * 3.0, assuming the test factory feeds those params
/// into the nodes). Wiring diagnostics are written to stderr.
#[test]
// The node table is reached by dereferencing the raw pointer from `nodes.get()`.
#[allow(unsafe_code)]
fn test_graph_source_proc_sink() {
    let factory = test_factory::<BUF>();
    let mut builder = test_builder::<BUF>(&factory);
    let system = test_system();

    let mut params = test_params(44100.0);
    params.insert("value".to_string(), ParamValue::Float(5.0));
    let src_idx = builder.add_node("test/const", &params);

    let mut gain_params = test_params(44100.0);
    gain_params.insert("gain".to_string(), ParamValue::Float(3.0));
    let proc_idx = builder.add_node("test/gain", &gain_params);

    let snk_idx = builder.add_node("test/capture", &test_params(44100.0));

    builder.connect_signal(src_idx, 0, proc_idx, 0);
    builder.connect_signal(proc_idx, 0, snk_idx, 0);

    let graph = builder.build(&system).unwrap();
    let source_idx = graph.source_idx;

    eprintln!("topo: {:?}", graph.topo_order);
    eprintln!("source_idx: {source_idx}, src_idx: {src_idx}, proc_idx: {proc_idx}, snk_idx: {snk_idx}");

    let ctx = RenderContext::new(0, BUF as u32, 44100.0);
    let tick = ClockTick::new(0, BUF as u32, 44100.0, String::new());
    let nodes = graph.nodes.clone();

    // SAFETY: NOTE(review) — assumes nothing else dereferences `graph.nodes`
    // while `nv` is alive. Only this one reference is created from the raw
    // pointer; no second reference to the table is taken while it is in use.
    unsafe {
        let nv = &mut *nodes.get();

        // Use the indices returned by `add_node` so the labels stay correct
        // regardless of how the builder numbers nodes.
        eprintln!(
            "node types: src={:?}, proc={:?}, snk={:?}",
            std::mem::discriminant(&nv[src_idx]),
            std::mem::discriminant(&nv[proc_idx]),
            std::mem::discriminant(&nv[snk_idx]),
        );

        // A failing render must fail the test instead of being discarded.
        nv[source_idx].process_block(&ctx, &tick).unwrap();

        let out_port = nv[source_idx].output_port(0).unwrap();
        let src_val = out_port.buffer.as_array()[0];
        eprintln!("source output: {src_val}");
        eprintln!(
            "source output port downstream_nodes: {}",
            out_port.downstream_nodes.len()
        );
        eprintln!(
            "source output port downstream_input_ptrs: {}",
            out_port.downstream_input_ptrs.len()
        );

        // Processor output wiring, captured before anything is propagated.
        let proc_port = nv[proc_idx].output_port(0).unwrap();
        eprintln!(
            "PROC OUT port downstream_nodes: {}",
            proc_port.downstream_nodes.len()
        );
        eprintln!(
            "PROC OUT port downstream_input_ptrs: {}",
            proc_port.downstream_input_ptrs.len()
        );
        for (i, &dn) in proc_port.downstream.iter().enumerate() {
            eprintln!("  downstream[{}]: (node={}, port={})", i, dn.0, dn.1);
        }

        // Whether each input port has an upstream buffer attached.
        let proc_in = nv[proc_idx].input_port(0).unwrap();
        let snk_in = nv[snk_idx].input_port(0).unwrap();
        eprintln!(
            "  proc_in.upstream_buffer.is_some(): {}",
            proc_in.upstream_buffer.is_some()
        );
        eprintln!(
            "  snk_in.upstream_buffer.is_some(): {}",
            snk_in.upstream_buffer.is_some()
        );

        out_port.propagate(out_port.buffer(), &ctx, &tick).unwrap();

        let sink_val = nv[snk_idx].input_port(0).unwrap().buffer.as_array()[0];
        eprintln!("sink input AFTER propagate: {sink_val}");

        assert!(
            (sink_val - 15.0).abs() < 1e-4,
            "expected 15.0, got {}",
            sink_val
        );
    }
}
1250}