Skip to main content

aether_core/
graph.rs

1//! Directed Acyclic Graph (DAG) for DSP routing.
2//!
3//! The graph owns the node arena and buffer pool.
4//! Topological sort produces a flat execution order — no recursion in the RT path.
5
6use crate::{
7    arena::{Arena, NodeId},
8    buffer_pool::BufferPool,
9    node::{DspNode, NodeRecord},
10    MAX_NODES,
11};
12use std::collections::HashMap;
13
14/// Directed Acyclic Graph (DAG) for DSP routing.
15///
16/// The graph owns the node arena and buffer pool. It maintains a topologically
17/// sorted execution order and BFS level structure for parallel processing.
18///
19/// # Structure
20///
21/// - **Arena**: Generational arena storing node records
22/// - **Buffer Pool**: Pre-allocated audio buffers (no RT allocation)
23/// - **Execution Order**: Flat topologically sorted node list
24/// - **BFS Levels**: Nodes grouped by dependency depth for parallel execution
25///
26/// # Example
27///
28/// ```
29/// use aether_core::graph::DspGraph;
30/// use aether_core::node::DspNode;
31/// use aether_core::param::ParamBlock;
32/// use aether_core::{BUFFER_SIZE, MAX_INPUTS};
33///
34/// struct Gain { gain: f32 }
35/// impl DspNode for Gain {
36///     fn process(&mut self, inputs: &[Option<&[f32; BUFFER_SIZE]>; MAX_INPUTS],
37///                output: &mut [f32; BUFFER_SIZE], _params: &mut ParamBlock, _sr: f32) {
38///         if let Some(input) = inputs[0] {
39///             for (i, out) in output.iter_mut().enumerate() {
40///                 *out = input[i] * self.gain;
41///             }
42///         }
43///     }
44///     fn type_name(&self) -> &'static str { "Gain" }
45/// }
46///
47/// let mut graph = DspGraph::new();
48/// let gain_id = graph.add_node(Box::new(Gain { gain: 0.5 })).unwrap();
49/// graph.set_output_node(gain_id);
50/// ```
51pub struct DspGraph {
52    pub arena: Arena<NodeRecord>,
53    pub buffers: BufferPool,
54    /// Topologically sorted execution order. Rebuilt on structural mutations.
55    pub execution_order: Vec<NodeId>,
56    /// BFS wave levels: each inner Vec contains nodes that can execute in parallel.
57    /// Level[i] nodes all depend only on nodes in levels 0..i.
58    pub levels: Vec<Vec<NodeId>>,
59    /// The node whose output buffer is sent to the DAC.
60    pub output_node: Option<NodeId>,
61    /// Adjacency list: node index → list of (dst_node, slot) it feeds into.
62    forward_edges: HashMap<u32, Vec<(NodeId, usize)>>,
63    /// Maps slot index → full NodeId (for topo sort without generation scanning).
64    index_to_id: HashMap<u32, NodeId>,
65}
66
67impl DspGraph {
68    /// Creates a new empty DSP graph.
69    ///
70    /// Initializes the arena, buffer pool, and execution structures with
71    /// pre-allocated capacity for `MAX_NODES` nodes.
72    ///
73    /// # Example
74    ///
75    /// ```
76    /// use aether_core::graph::DspGraph;
77    ///
78    /// let graph = DspGraph::new();
79    /// assert_eq!(graph.execution_order.len(), 0);
80    /// ```
81    pub fn new() -> Self {
82        Self {
83            arena: Arena::with_capacity(MAX_NODES),
84            buffers: BufferPool::default(),
85            execution_order: Vec::with_capacity(MAX_NODES),
86            levels: Vec::with_capacity(MAX_NODES),
87            output_node: None,
88            forward_edges: HashMap::new(),
89            index_to_id: HashMap::new(),
90        }
91    }
92
93    /// Adds a node to the graph and returns its ID.
94    ///
95    /// Acquires a buffer from the pool, inserts the node into the arena,
96    /// and rebuilds the topological execution order.
97    ///
98    /// # Arguments
99    ///
100    /// * `processor` - Boxed DSP node implementation
101    ///
102    /// # Returns
103    ///
104    /// * `Some(NodeId)` - The node's unique identifier
105    /// * `None` - If arena is full or buffer pool exhausted
106    ///
107    /// # Example
108    ///
109    /// ```
110    /// use aether_core::graph::DspGraph;
111    /// use aether_core::node::DspNode;
112    /// use aether_core::param::ParamBlock;
113    /// use aether_core::{BUFFER_SIZE, MAX_INPUTS};
114    ///
115    /// struct Oscillator { frequency: f32, phase: f32 }
116    /// impl DspNode for Oscillator {
117    ///     fn process(&mut self, _inputs: &[Option<&[f32; BUFFER_SIZE]>; MAX_INPUTS],
118    ///                output: &mut [f32; BUFFER_SIZE], _params: &mut ParamBlock, sr: f32) {
119    ///         let phase_inc = self.frequency / sr;
120    ///         for sample in output.iter_mut() {
121    ///             *sample = (self.phase * std::f32::consts::TAU).sin();
122    ///             self.phase = (self.phase + phase_inc).fract();
123    ///         }
124    ///     }
125    ///     fn type_name(&self) -> &'static str { "Oscillator" }
126    /// }
127    ///
128    /// let mut graph = DspGraph::new();
129    /// let osc = Box::new(Oscillator { frequency: 440.0, phase: 0.0 });
130    /// let id = graph.add_node(osc).unwrap();
131    /// ```
132    ///
133    /// # See Also
134    ///
135    /// * [`remove_node`](Self::remove_node) - Remove a node from the graph
136    /// * [`connect`](Self::connect) - Connect two nodes
137    pub fn add_node(&mut self, processor: Box<dyn DspNode>) -> Option<NodeId> {
138        let buf = self.buffers.acquire()?;
139        let record = NodeRecord::new(processor, buf);
140        let id = self.arena.insert(record)?;
141        self.forward_edges.insert(id.index, Vec::new());
142        self.index_to_id.insert(id.index, id);
143        self.rebuild_execution_order();
144        Some(id)
145    }
146
147    /// Removes a node from the graph and releases its buffer.
148    ///
149    /// Removes the node from the arena, releases its output buffer back to
150    /// the pool, and removes all edges connected to this node. Rebuilds the
151    /// topological execution order.
152    ///
153    /// # Arguments
154    ///
155    /// * `id` - Node ID to remove
156    ///
157    /// # Returns
158    ///
159    /// * `true` - Node removed successfully
160    /// * `false` - Node doesn't exist (invalid ID or already removed)
161    ///
162    /// # Example
163    ///
164    /// ```
165    /// use aether_core::graph::DspGraph;
166    /// use aether_core::node::DspNode;
167    /// use aether_core::param::ParamBlock;
168    /// use aether_core::{BUFFER_SIZE, MAX_INPUTS};
169    ///
170    /// struct SimpleNode;
171    /// impl DspNode for SimpleNode {
172    ///     fn process(&mut self, _: &[Option<&[f32; BUFFER_SIZE]>; MAX_INPUTS],
173    ///                output: &mut [f32; BUFFER_SIZE], _: &mut ParamBlock, _: f32) {
174    ///         output.fill(0.0);
175    ///     }
176    ///     fn type_name(&self) -> &'static str { "SimpleNode" }
177    /// }
178    ///
179    /// let mut graph = DspGraph::new();
180    /// let node_id = graph.add_node(Box::new(SimpleNode)).unwrap();
181    ///
182    /// assert!(graph.remove_node(node_id)); // Returns true
183    /// assert!(!graph.remove_node(node_id)); // Returns false (already removed)
184    /// ```
185    ///
186    /// # See Also
187    ///
188    /// * [`add_node`](Self::add_node) - Add a node to the graph
189    pub fn remove_node(&mut self, id: NodeId) -> bool {
190        if let Some(record) = self.arena.remove(id) {
191            self.buffers.release(record.output_buffer);
192            self.forward_edges.remove(&id.index);
193            self.index_to_id.remove(&id.index);
194            for edges in self.forward_edges.values_mut() {
195                edges.retain(|(dst, _)| dst.index != id.index);
196            }
197            self.rebuild_execution_order();
198            true
199        } else {
200            false
201        }
202    }
203
204    /// Connects the output of one node to the input of another.
205    ///
206    /// Creates an edge in the DAG from `src` to `dst`, routing audio from
207    /// the source node's output buffer to the destination node's input slot.
208    /// Rebuilds the topological execution order to maintain DAG invariants.
209    ///
210    /// # Arguments
211    ///
212    /// * `src` - Source node ID (output)
213    /// * `dst` - Destination node ID (input)
214    /// * `slot` - Input slot index on destination node (0 to MAX_INPUTS-1)
215    ///
216    /// # Returns
217    ///
218    /// * `true` - Connection successful
219    /// * `false` - One or both nodes don't exist
220    ///
221    /// # Example
222    ///
223    /// ```
224    /// use aether_core::graph::DspGraph;
225    /// use aether_core::node::DspNode;
226    /// use aether_core::param::ParamBlock;
227    /// use aether_core::{BUFFER_SIZE, MAX_INPUTS};
228    ///
229    /// struct SimpleNode;
230    /// impl DspNode for SimpleNode {
231    ///     fn process(&mut self, _: &[Option<&[f32; BUFFER_SIZE]>; MAX_INPUTS],
232    ///                output: &mut [f32; BUFFER_SIZE], _: &mut ParamBlock, _: f32) {
233    ///         output.fill(0.0);
234    ///     }
235    ///     fn type_name(&self) -> &'static str { "SimpleNode" }
236    /// }
237    ///
238    /// let mut graph = DspGraph::new();
239    /// let node_a = graph.add_node(Box::new(SimpleNode)).unwrap();
240    /// let node_b = graph.add_node(Box::new(SimpleNode)).unwrap();
241    ///
242    /// // Connect node_a output → node_b input slot 0
243    /// graph.connect(node_a, node_b, 0);
244    /// ```
245    ///
246    /// # See Also
247    ///
248    /// * [`disconnect`](Self::disconnect) - Remove a connection
249    /// * [`add_node`](Self::add_node) - Add nodes to connect
250    pub fn connect(&mut self, src: NodeId, dst: NodeId, slot: usize) -> bool {
251        if self.arena.get(src).is_none() || self.arena.get(dst).is_none() {
252            return false;
253        }
254        // Record forward edge for topo sort.
255        if let Some(edges) = self.forward_edges.get_mut(&src.index) {
256            edges.push((dst, slot));
257        }
258        // Record backward reference in dst node.
259        if let Some(record) = self.arena.get_mut(dst) {
260            record.inputs[slot] = Some(src);
261        }
262        self.rebuild_execution_order();
263        true
264    }
265
266    /// Disconnects an input slot on a destination node.
267    ///
268    /// Removes the connection to the specified input slot, clearing the
269    /// audio routing. The slot will receive silence until reconnected.
270    /// Rebuilds the topological execution order.
271    ///
272    /// # Arguments
273    ///
274    /// * `dst` - Destination node ID
275    /// * `slot` - Input slot index to disconnect (0 to MAX_INPUTS-1)
276    ///
277    /// # Returns
278    ///
279    /// * `true` - Disconnection successful
280    /// * `false` - Node doesn't exist or slot was already empty
281    ///
282    /// # Example
283    ///
284    /// ```
285    /// use aether_core::graph::DspGraph;
286    /// use aether_core::node::DspNode;
287    /// use aether_core::param::ParamBlock;
288    /// use aether_core::{BUFFER_SIZE, MAX_INPUTS};
289    ///
290    /// struct SimpleNode;
291    /// impl DspNode for SimpleNode {
292    ///     fn process(&mut self, _: &[Option<&[f32; BUFFER_SIZE]>; MAX_INPUTS],
293    ///                output: &mut [f32; BUFFER_SIZE], _: &mut ParamBlock, _: f32) {
294    ///         output.fill(0.0);
295    ///     }
296    ///     fn type_name(&self) -> &'static str { "SimpleNode" }
297    /// }
298    ///
299    /// let mut graph = DspGraph::new();
300    /// let node_a = graph.add_node(Box::new(SimpleNode)).unwrap();
301    /// let node_b = graph.add_node(Box::new(SimpleNode)).unwrap();
302    ///
303    /// graph.connect(node_a, node_b, 0);
304    /// graph.disconnect(node_b, 0); // Disconnect slot 0
305    /// ```
306    ///
307    /// # See Also
308    ///
309    /// * [`connect`](Self::connect) - Create a connection
310    pub fn disconnect(&mut self, dst: NodeId, slot: usize) -> bool {
311        let src_id = self.arena.get(dst).and_then(|r| r.inputs[slot]);
312        if let Some(src) = src_id {
313            if let Some(edges) = self.forward_edges.get_mut(&src.index) {
314                edges.retain(|(d, s)| !(d.index == dst.index && *s == slot));
315            }
316        }
317        if let Some(record) = self.arena.get_mut(dst) {
318            record.inputs[slot] = None;
319            self.rebuild_execution_order();
320            true
321        } else {
322            false
323        }
324    }
325
326    /// Kahn's algorithm topological sort. O(V+E), bounded by MAX_NODES.
327    fn rebuild_execution_order(&mut self) {
328        self.execution_order.clear();
329        self.levels.clear();
330
331        // Compute in-degrees from forward edges.
332        let mut in_degree: HashMap<u32, usize> = self.index_to_id.keys().map(|&k| (k, 0)).collect();
333        for edges in self.forward_edges.values() {
334            for (dst, _) in edges {
335                *in_degree.entry(dst.index).or_insert(0) += 1;
336            }
337        }
338
339        // Seed the first wave: all nodes with in-degree 0.
340        let mut current_wave: Vec<u32> = in_degree
341            .iter()
342            .filter(|(_, &deg)| deg == 0)
343            .map(|(&idx, _)| idx)
344            .collect();
345
346        while !current_wave.is_empty() {
347            let mut level_ids: Vec<NodeId> = Vec::with_capacity(current_wave.len());
348            let mut next_wave: Vec<u32> = Vec::new();
349
350            for idx in &current_wave {
351                if let Some(&id) = self.index_to_id.get(idx) {
352                    level_ids.push(id);
353                    self.execution_order.push(id);
354                }
355                if let Some(edges) = self.forward_edges.get(idx) {
356                    for (dst, _) in edges.clone() {
357                        let deg = in_degree.entry(dst.index).or_insert(0);
358                        if *deg > 0 {
359                            *deg -= 1;
360                            if *deg == 0 {
361                                next_wave.push(dst.index);
362                            }
363                        }
364                    }
365                }
366            }
367
368            self.levels.push(level_ids);
369            current_wave = next_wave;
370        }
371    }
372
373    /// Sets the output node whose buffer is sent to the DAC.
374    ///
375    /// Designates which node's output buffer should be copied to the
376    /// audio device output. Only one node can be the output node at a time.
377    ///
378    /// # Arguments
379    ///
380    /// * `id` - Node ID to use as output
381    ///
382    /// # Example
383    ///
384    /// ```
385    /// use aether_core::graph::DspGraph;
386    /// use aether_core::node::DspNode;
387    /// use aether_core::param::ParamBlock;
388    /// use aether_core::{BUFFER_SIZE, MAX_INPUTS};
389    ///
390    /// struct Oscillator { phase: f32 }
391    /// impl DspNode for Oscillator {
392    ///     fn process(&mut self, _: &[Option<&[f32; BUFFER_SIZE]>; MAX_INPUTS],
393    ///                output: &mut [f32; BUFFER_SIZE], _: &mut ParamBlock, _: f32) {
394    ///         for sample in output.iter_mut() {
395    ///             *sample = (self.phase * std::f32::consts::TAU).sin();
396    ///             self.phase = (self.phase + 0.01).fract();
397    ///         }
398    ///     }
399    ///     fn type_name(&self) -> &'static str { "Oscillator" }
400    /// }
401    ///
402    /// let mut graph = DspGraph::new();
403    /// let osc_id = graph.add_node(Box::new(Oscillator { phase: 0.0 })).unwrap();
404    /// graph.set_output_node(osc_id);
405    /// ```
406    pub fn set_output_node(&mut self, id: NodeId) {
407        self.output_node = Some(id);
408    }
409}
410
411impl Default for DspGraph {
412    fn default() -> Self {
413        Self::new()
414    }
415}
416
417#[cfg(test)]
418mod tests {
419    use super::*;
420    use crate::{node::DspNode, param::ParamBlock, BUFFER_SIZE, MAX_INPUTS};
421    use proptest::prelude::*;
422
423    /// Minimal test node for graph topology testing.
424    struct TestNode;
425
426    impl DspNode for TestNode {
427        fn process(
428            &mut self,
429            _inputs: &[Option<&[f32; BUFFER_SIZE]>; MAX_INPUTS],
430            output: &mut [f32; BUFFER_SIZE],
431            _params: &mut ParamBlock,
432            _sample_rate: f32,
433        ) {
434            output.fill(0.0);
435        }
436
437        fn type_name(&self) -> &'static str {
438            "TestNode"
439        }
440    }
441
442    // Property 2
443    proptest! {
444        /// **Validates: Requirements 1.2, 1.9**
445        ///
446        /// Property 2: Topological level assignments satisfy the dependency ordering invariant.
447        ///
448        /// For any DAG after `rebuild_execution_order`, every node at level L SHALL have all
449        /// its input-connected nodes at levels strictly less than L. Equivalently, no node at
450        /// level L depends on any other node at level L.
451        #[test]
452        fn prop_topological_level_ordering_invariant(
453            num_nodes in 1usize..=20,
454            edges in prop::collection::vec((0usize..20, 0usize..20, 0usize..MAX_INPUTS), 0..50)
455        ) {
456            let mut graph = DspGraph::new();
457            let mut node_ids = Vec::new();
458
459            // Add nodes
460            for _ in 0..num_nodes {
461                if let Some(id) = graph.add_node(Box::new(TestNode)) {
462                    node_ids.push(id);
463                }
464            }
465
466            // Add edges, filtering to maintain DAG invariant (src < dst to prevent cycles)
467            for &(src_idx, dst_idx, slot) in &edges {
468                if src_idx < num_nodes && dst_idx < num_nodes && src_idx < dst_idx {
469                    let src = node_ids[src_idx];
470                    let dst = node_ids[dst_idx];
471                    graph.connect(src, dst, slot);
472                }
473            }
474
475            // Build a map from NodeId to level index
476            let mut node_to_level: HashMap<u32, usize> = HashMap::new();
477            for (level_idx, level_nodes) in graph.levels.iter().enumerate() {
478                for &node_id in level_nodes {
479                    node_to_level.insert(node_id.index, level_idx);
480                }
481            }
482
483            // Verify the invariant: for every edge (src → dst), level[src] < level[dst]
484            for &(src_idx, dst_idx, slot) in &edges {
485                if src_idx < num_nodes && dst_idx < num_nodes && src_idx < dst_idx {
486                    let src = node_ids[src_idx];
487                    let dst = node_ids[dst_idx];
488
489                    // Check if the edge was actually added (connect may fail if slot already occupied)
490                    if let Some(record) = graph.arena.get(dst) {
491                        if record.inputs[slot] == Some(src) {
492                            // Edge exists, verify level ordering
493                            let src_level = node_to_level.get(&src.index).copied();
494                            let dst_level = node_to_level.get(&dst.index).copied();
495
496                            if let (Some(src_lvl), Some(dst_lvl)) = (src_level, dst_level) {
497                                prop_assert!(
498                                    src_lvl < dst_lvl,
499                                    "Level ordering violated: node {} at level {} → node {} at level {}",
500                                    src.index, src_lvl, dst.index, dst_lvl
501                                );
502                            }
503                        }
504                    }
505                }
506            }
507        }
508    }
509}