aether_core/graph.rs
1//! Directed Acyclic Graph (DAG) for DSP routing.
2//!
3//! The graph owns the node arena and buffer pool.
4//! Topological sort produces a flat execution order — no recursion in the RT path.
5
6use crate::{
7 arena::{Arena, NodeId},
8 buffer_pool::BufferPool,
9 node::{DspNode, NodeRecord},
10 MAX_NODES,
11};
12use std::collections::HashMap;
13
14/// Directed Acyclic Graph (DAG) for DSP routing.
15///
16/// The graph owns the node arena and buffer pool. It maintains a topologically
17/// sorted execution order and BFS level structure for parallel processing.
18///
19/// # Structure
20///
21/// - **Arena**: Generational arena storing node records
22/// - **Buffer Pool**: Pre-allocated audio buffers (no RT allocation)
23/// - **Execution Order**: Flat topologically sorted node list
24/// - **BFS Levels**: Nodes grouped by dependency depth for parallel execution
25///
26/// # Example
27///
28/// ```
29/// use aether_core::graph::DspGraph;
30/// use aether_core::node::DspNode;
31/// use aether_core::param::ParamBlock;
32/// use aether_core::{BUFFER_SIZE, MAX_INPUTS};
33///
34/// struct Gain { gain: f32 }
35/// impl DspNode for Gain {
36/// fn process(&mut self, inputs: &[Option<&[f32; BUFFER_SIZE]>; MAX_INPUTS],
37/// output: &mut [f32; BUFFER_SIZE], _params: &mut ParamBlock, _sr: f32) {
38/// if let Some(input) = inputs[0] {
39/// for (i, out) in output.iter_mut().enumerate() {
40/// *out = input[i] * self.gain;
41/// }
42/// }
43/// }
44/// fn type_name(&self) -> &'static str { "Gain" }
45/// }
46///
47/// let mut graph = DspGraph::new();
48/// let gain_id = graph.add_node(Box::new(Gain { gain: 0.5 })).unwrap();
49/// graph.set_output_node(gain_id);
50/// ```
51pub struct DspGraph {
52 pub arena: Arena<NodeRecord>,
53 pub buffers: BufferPool,
54 /// Topologically sorted execution order. Rebuilt on structural mutations.
55 pub execution_order: Vec<NodeId>,
56 /// BFS wave levels: each inner Vec contains nodes that can execute in parallel.
57 /// Level[i] nodes all depend only on nodes in levels 0..i.
58 pub levels: Vec<Vec<NodeId>>,
59 /// The node whose output buffer is sent to the DAC.
60 pub output_node: Option<NodeId>,
61 /// Adjacency list: node index → list of (dst_node, slot) it feeds into.
62 forward_edges: HashMap<u32, Vec<(NodeId, usize)>>,
63 /// Maps slot index → full NodeId (for topo sort without generation scanning).
64 index_to_id: HashMap<u32, NodeId>,
65}
66
67impl DspGraph {
68 /// Creates a new empty DSP graph.
69 ///
70 /// Initializes the arena, buffer pool, and execution structures with
71 /// pre-allocated capacity for `MAX_NODES` nodes.
72 ///
73 /// # Example
74 ///
75 /// ```
76 /// use aether_core::graph::DspGraph;
77 ///
78 /// let graph = DspGraph::new();
79 /// assert_eq!(graph.execution_order.len(), 0);
80 /// ```
81 pub fn new() -> Self {
82 Self {
83 arena: Arena::with_capacity(MAX_NODES),
84 buffers: BufferPool::default(),
85 execution_order: Vec::with_capacity(MAX_NODES),
86 levels: Vec::with_capacity(MAX_NODES),
87 output_node: None,
88 forward_edges: HashMap::new(),
89 index_to_id: HashMap::new(),
90 }
91 }
92
93 /// Adds a node to the graph and returns its ID.
94 ///
95 /// Acquires a buffer from the pool, inserts the node into the arena,
96 /// and rebuilds the topological execution order.
97 ///
98 /// # Arguments
99 ///
100 /// * `processor` - Boxed DSP node implementation
101 ///
102 /// # Returns
103 ///
104 /// * `Some(NodeId)` - The node's unique identifier
105 /// * `None` - If arena is full or buffer pool exhausted
106 ///
107 /// # Example
108 ///
109 /// ```
110 /// use aether_core::graph::DspGraph;
111 /// use aether_core::node::DspNode;
112 /// use aether_core::param::ParamBlock;
113 /// use aether_core::{BUFFER_SIZE, MAX_INPUTS};
114 ///
115 /// struct Oscillator { frequency: f32, phase: f32 }
116 /// impl DspNode for Oscillator {
117 /// fn process(&mut self, _inputs: &[Option<&[f32; BUFFER_SIZE]>; MAX_INPUTS],
118 /// output: &mut [f32; BUFFER_SIZE], _params: &mut ParamBlock, sr: f32) {
119 /// let phase_inc = self.frequency / sr;
120 /// for sample in output.iter_mut() {
121 /// *sample = (self.phase * std::f32::consts::TAU).sin();
122 /// self.phase = (self.phase + phase_inc).fract();
123 /// }
124 /// }
125 /// fn type_name(&self) -> &'static str { "Oscillator" }
126 /// }
127 ///
128 /// let mut graph = DspGraph::new();
129 /// let osc = Box::new(Oscillator { frequency: 440.0, phase: 0.0 });
130 /// let id = graph.add_node(osc).unwrap();
131 /// ```
132 ///
133 /// # See Also
134 ///
135 /// * [`remove_node`](Self::remove_node) - Remove a node from the graph
136 /// * [`connect`](Self::connect) - Connect two nodes
137 pub fn add_node(&mut self, processor: Box<dyn DspNode>) -> Option<NodeId> {
138 let buf = self.buffers.acquire()?;
139 let record = NodeRecord::new(processor, buf);
140 let id = self.arena.insert(record)?;
141 self.forward_edges.insert(id.index, Vec::new());
142 self.index_to_id.insert(id.index, id);
143 self.rebuild_execution_order();
144 Some(id)
145 }
146
147 /// Removes a node from the graph and releases its buffer.
148 ///
149 /// Removes the node from the arena, releases its output buffer back to
150 /// the pool, and removes all edges connected to this node. Rebuilds the
151 /// topological execution order.
152 ///
153 /// # Arguments
154 ///
155 /// * `id` - Node ID to remove
156 ///
157 /// # Returns
158 ///
159 /// * `true` - Node removed successfully
160 /// * `false` - Node doesn't exist (invalid ID or already removed)
161 ///
162 /// # Example
163 ///
164 /// ```
165 /// use aether_core::graph::DspGraph;
166 /// use aether_core::node::DspNode;
167 /// use aether_core::param::ParamBlock;
168 /// use aether_core::{BUFFER_SIZE, MAX_INPUTS};
169 ///
170 /// struct SimpleNode;
171 /// impl DspNode for SimpleNode {
172 /// fn process(&mut self, _: &[Option<&[f32; BUFFER_SIZE]>; MAX_INPUTS],
173 /// output: &mut [f32; BUFFER_SIZE], _: &mut ParamBlock, _: f32) {
174 /// output.fill(0.0);
175 /// }
176 /// fn type_name(&self) -> &'static str { "SimpleNode" }
177 /// }
178 ///
179 /// let mut graph = DspGraph::new();
180 /// let node_id = graph.add_node(Box::new(SimpleNode)).unwrap();
181 ///
182 /// assert!(graph.remove_node(node_id)); // Returns true
183 /// assert!(!graph.remove_node(node_id)); // Returns false (already removed)
184 /// ```
185 ///
186 /// # See Also
187 ///
188 /// * [`add_node`](Self::add_node) - Add a node to the graph
189 pub fn remove_node(&mut self, id: NodeId) -> bool {
190 if let Some(record) = self.arena.remove(id) {
191 self.buffers.release(record.output_buffer);
192 self.forward_edges.remove(&id.index);
193 self.index_to_id.remove(&id.index);
194 for edges in self.forward_edges.values_mut() {
195 edges.retain(|(dst, _)| dst.index != id.index);
196 }
197 self.rebuild_execution_order();
198 true
199 } else {
200 false
201 }
202 }
203
204 /// Connects the output of one node to the input of another.
205 ///
206 /// Creates an edge in the DAG from `src` to `dst`, routing audio from
207 /// the source node's output buffer to the destination node's input slot.
208 /// Rebuilds the topological execution order to maintain DAG invariants.
209 ///
210 /// # Arguments
211 ///
212 /// * `src` - Source node ID (output)
213 /// * `dst` - Destination node ID (input)
214 /// * `slot` - Input slot index on destination node (0 to MAX_INPUTS-1)
215 ///
216 /// # Returns
217 ///
218 /// * `true` - Connection successful
219 /// * `false` - One or both nodes don't exist
220 ///
221 /// # Example
222 ///
223 /// ```
224 /// use aether_core::graph::DspGraph;
225 /// use aether_core::node::DspNode;
226 /// use aether_core::param::ParamBlock;
227 /// use aether_core::{BUFFER_SIZE, MAX_INPUTS};
228 ///
229 /// struct SimpleNode;
230 /// impl DspNode for SimpleNode {
231 /// fn process(&mut self, _: &[Option<&[f32; BUFFER_SIZE]>; MAX_INPUTS],
232 /// output: &mut [f32; BUFFER_SIZE], _: &mut ParamBlock, _: f32) {
233 /// output.fill(0.0);
234 /// }
235 /// fn type_name(&self) -> &'static str { "SimpleNode" }
236 /// }
237 ///
238 /// let mut graph = DspGraph::new();
239 /// let node_a = graph.add_node(Box::new(SimpleNode)).unwrap();
240 /// let node_b = graph.add_node(Box::new(SimpleNode)).unwrap();
241 ///
242 /// // Connect node_a output → node_b input slot 0
243 /// graph.connect(node_a, node_b, 0);
244 /// ```
245 ///
246 /// # See Also
247 ///
248 /// * [`disconnect`](Self::disconnect) - Remove a connection
249 /// * [`add_node`](Self::add_node) - Add nodes to connect
250 pub fn connect(&mut self, src: NodeId, dst: NodeId, slot: usize) -> bool {
251 if self.arena.get(src).is_none() || self.arena.get(dst).is_none() {
252 return false;
253 }
254 // Record forward edge for topo sort.
255 if let Some(edges) = self.forward_edges.get_mut(&src.index) {
256 edges.push((dst, slot));
257 }
258 // Record backward reference in dst node.
259 if let Some(record) = self.arena.get_mut(dst) {
260 record.inputs[slot] = Some(src);
261 }
262 self.rebuild_execution_order();
263 true
264 }
265
266 /// Disconnects an input slot on a destination node.
267 ///
268 /// Removes the connection to the specified input slot, clearing the
269 /// audio routing. The slot will receive silence until reconnected.
270 /// Rebuilds the topological execution order.
271 ///
272 /// # Arguments
273 ///
274 /// * `dst` - Destination node ID
275 /// * `slot` - Input slot index to disconnect (0 to MAX_INPUTS-1)
276 ///
277 /// # Returns
278 ///
279 /// * `true` - Disconnection successful
280 /// * `false` - Node doesn't exist or slot was already empty
281 ///
282 /// # Example
283 ///
284 /// ```
285 /// use aether_core::graph::DspGraph;
286 /// use aether_core::node::DspNode;
287 /// use aether_core::param::ParamBlock;
288 /// use aether_core::{BUFFER_SIZE, MAX_INPUTS};
289 ///
290 /// struct SimpleNode;
291 /// impl DspNode for SimpleNode {
292 /// fn process(&mut self, _: &[Option<&[f32; BUFFER_SIZE]>; MAX_INPUTS],
293 /// output: &mut [f32; BUFFER_SIZE], _: &mut ParamBlock, _: f32) {
294 /// output.fill(0.0);
295 /// }
296 /// fn type_name(&self) -> &'static str { "SimpleNode" }
297 /// }
298 ///
299 /// let mut graph = DspGraph::new();
300 /// let node_a = graph.add_node(Box::new(SimpleNode)).unwrap();
301 /// let node_b = graph.add_node(Box::new(SimpleNode)).unwrap();
302 ///
303 /// graph.connect(node_a, node_b, 0);
304 /// graph.disconnect(node_b, 0); // Disconnect slot 0
305 /// ```
306 ///
307 /// # See Also
308 ///
309 /// * [`connect`](Self::connect) - Create a connection
310 pub fn disconnect(&mut self, dst: NodeId, slot: usize) -> bool {
311 let src_id = self.arena.get(dst).and_then(|r| r.inputs[slot]);
312 if let Some(src) = src_id {
313 if let Some(edges) = self.forward_edges.get_mut(&src.index) {
314 edges.retain(|(d, s)| !(d.index == dst.index && *s == slot));
315 }
316 }
317 if let Some(record) = self.arena.get_mut(dst) {
318 record.inputs[slot] = None;
319 self.rebuild_execution_order();
320 true
321 } else {
322 false
323 }
324 }
325
326 /// Kahn's algorithm topological sort. O(V+E), bounded by MAX_NODES.
327 fn rebuild_execution_order(&mut self) {
328 self.execution_order.clear();
329 self.levels.clear();
330
331 // Compute in-degrees from forward edges.
332 let mut in_degree: HashMap<u32, usize> = self.index_to_id.keys().map(|&k| (k, 0)).collect();
333 for edges in self.forward_edges.values() {
334 for (dst, _) in edges {
335 *in_degree.entry(dst.index).or_insert(0) += 1;
336 }
337 }
338
339 // Seed the first wave: all nodes with in-degree 0.
340 let mut current_wave: Vec<u32> = in_degree
341 .iter()
342 .filter(|(_, °)| deg == 0)
343 .map(|(&idx, _)| idx)
344 .collect();
345
346 while !current_wave.is_empty() {
347 let mut level_ids: Vec<NodeId> = Vec::with_capacity(current_wave.len());
348 let mut next_wave: Vec<u32> = Vec::new();
349
350 for idx in ¤t_wave {
351 if let Some(&id) = self.index_to_id.get(idx) {
352 level_ids.push(id);
353 self.execution_order.push(id);
354 }
355 if let Some(edges) = self.forward_edges.get(idx) {
356 for (dst, _) in edges.clone() {
357 let deg = in_degree.entry(dst.index).or_insert(0);
358 if *deg > 0 {
359 *deg -= 1;
360 if *deg == 0 {
361 next_wave.push(dst.index);
362 }
363 }
364 }
365 }
366 }
367
368 self.levels.push(level_ids);
369 current_wave = next_wave;
370 }
371 }
372
373 /// Sets the output node whose buffer is sent to the DAC.
374 ///
375 /// Designates which node's output buffer should be copied to the
376 /// audio device output. Only one node can be the output node at a time.
377 ///
378 /// # Arguments
379 ///
380 /// * `id` - Node ID to use as output
381 ///
382 /// # Example
383 ///
384 /// ```
385 /// use aether_core::graph::DspGraph;
386 /// use aether_core::node::DspNode;
387 /// use aether_core::param::ParamBlock;
388 /// use aether_core::{BUFFER_SIZE, MAX_INPUTS};
389 ///
390 /// struct Oscillator { phase: f32 }
391 /// impl DspNode for Oscillator {
392 /// fn process(&mut self, _: &[Option<&[f32; BUFFER_SIZE]>; MAX_INPUTS],
393 /// output: &mut [f32; BUFFER_SIZE], _: &mut ParamBlock, _: f32) {
394 /// for sample in output.iter_mut() {
395 /// *sample = (self.phase * std::f32::consts::TAU).sin();
396 /// self.phase = (self.phase + 0.01).fract();
397 /// }
398 /// }
399 /// fn type_name(&self) -> &'static str { "Oscillator" }
400 /// }
401 ///
402 /// let mut graph = DspGraph::new();
403 /// let osc_id = graph.add_node(Box::new(Oscillator { phase: 0.0 })).unwrap();
404 /// graph.set_output_node(osc_id);
405 /// ```
406 pub fn set_output_node(&mut self, id: NodeId) {
407 self.output_node = Some(id);
408 }
409}
410
411impl Default for DspGraph {
412 fn default() -> Self {
413 Self::new()
414 }
415}
416
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{node::DspNode, param::ParamBlock, BUFFER_SIZE, MAX_INPUTS};
    use proptest::prelude::*;

    /// Minimal test node for graph topology testing; it only writes silence.
    struct TestNode;

    impl DspNode for TestNode {
        fn process(
            &mut self,
            _inputs: &[Option<&[f32; BUFFER_SIZE]>; MAX_INPUTS],
            output: &mut [f32; BUFFER_SIZE],
            _params: &mut ParamBlock,
            _sample_rate: f32,
        ) {
            output.fill(0.0);
        }

        fn type_name(&self) -> &'static str {
            "TestNode"
        }
    }

    // Property 2
    proptest! {
        /// **Validates: Requirements 1.2, 1.9**
        ///
        /// Property 2: Topological level assignments satisfy the dependency ordering invariant.
        ///
        /// For any DAG after `rebuild_execution_order`, every node at level L SHALL have all
        /// its input-connected nodes at levels strictly less than L. Equivalently, no node at
        /// level L depends on any other node at level L.
        #[test]
        fn prop_topological_level_ordering_invariant(
            num_nodes in 1usize..=20,
            edges in prop::collection::vec((0usize..20, 0usize..20, 0usize..MAX_INPUTS), 0..50)
        ) {
            let mut graph = DspGraph::new();
            let mut node_ids = Vec::new();

            // Add nodes; `add_node` may return `None` if capacity runs out.
            for _ in 0..num_nodes {
                if let Some(id) = graph.add_node(Box::new(TestNode)) {
                    node_ids.push(id);
                }
            }
            // Bound edge endpoints by the nodes that were actually created so
            // indexing `node_ids` cannot go out of range.
            let created = node_ids.len();

            // Add edges, filtering to maintain DAG invariant (src < dst to prevent cycles)
            for &(src_idx, dst_idx, slot) in &edges {
                if dst_idx < created && src_idx < dst_idx {
                    graph.connect(node_ids[src_idx], node_ids[dst_idx], slot);
                }
            }

            // The generated graph is acyclic, so every node must be scheduled.
            prop_assert_eq!(graph.execution_order.len(), created);

            // Build a map from node index to level index
            let mut node_to_level: HashMap<u32, usize> = HashMap::new();
            for (level_idx, level_nodes) in graph.levels.iter().enumerate() {
                for &node_id in level_nodes {
                    node_to_level.insert(node_id.index, level_idx);
                }
            }

            // Verify the invariant: for every edge (src → dst), level[src] < level[dst]
            for &(src_idx, dst_idx, slot) in &edges {
                if !(dst_idx < created && src_idx < dst_idx) {
                    continue;
                }
                let src = node_ids[src_idx];
                let dst = node_ids[dst_idx];

                // Only the most recent connection to a slot is live: a later
                // `connect` to the same (dst, slot) replaces the earlier source.
                let edge_is_live = graph
                    .arena
                    .get(dst)
                    .map_or(false, |record| record.inputs[slot] == Some(src));
                if !edge_is_live {
                    continue;
                }

                let src_level = node_to_level.get(&src.index).copied();
                let dst_level = node_to_level.get(&dst.index).copied();

                // A node missing from `levels` is itself a failure; do not let
                // the ordering check pass vacuously.
                prop_assert!(
                    src_level.is_some(),
                    "node {} has no level assignment",
                    src.index
                );
                prop_assert!(
                    dst_level.is_some(),
                    "node {} has no level assignment",
                    dst.index
                );

                if let (Some(src_lvl), Some(dst_lvl)) = (src_level, dst_level) {
                    prop_assert!(
                        src_lvl < dst_lvl,
                        "Level ordering violated: node {} at level {} → node {} at level {}",
                        src.index, src_lvl, dst.index, dst_lvl
                    );
                }
            }
        }
    }
}
509}