//! `firewheel_graph/graph/compiler.rs` — the audio graph compiler.

1use alloc::{collections::VecDeque, rc::Rc};
2use firewheel_core::node::{AudioNodeInfoInner, DynAudioNode, NodeID};
3use smallvec::SmallVec;
4use thunderdome::Arena;
5
6#[cfg(not(feature = "std"))]
7use bevy_platform::prelude::{vec, Box, Vec};
8
9use crate::error::CompileGraphError;
10
11mod schedule;
12
13pub use schedule::{CompiledSchedule, NodeHeapData, ScheduleHeapData};
14use schedule::{InBufferAssignment, OutBufferAssignment, PreProcNode, ScheduledNode};
15
/// An entry in the audio graph's node arena, pairing a node's static
/// info with its type-erased implementation and cached edge lists.
pub struct NodeEntry {
    /// The unique ID of this node ([NodeID::DANGLING] until one is assigned).
    pub id: NodeID,
    /// Static information about the node (e.g. channel config, debug name).
    pub info: AudioNodeInfoInner,
    /// The type-erased node implementation.
    pub dyn_node: Box<dyn DynAudioNode>,
    /// Whether this node's processor has been constructed yet.
    pub processor_constructed: bool,
    /// The edges connected to this node's input ports.
    /// Rebuilt from the edge arena on every compile pass.
    incoming: SmallVec<[Edge; 4]>,
    /// The edges connected to this node's output ports.
    /// Rebuilt from the edge arena on every compile pass.
    outgoing: SmallVec<[Edge; 4]>,
}
26
27impl NodeEntry {
28    pub fn new(info: AudioNodeInfoInner, dyn_node: Box<dyn DynAudioNode>) -> Self {
29        Self {
30            id: NodeID::DANGLING,
31            info,
32            dyn_node,
33            processor_constructed: false,
34            incoming: SmallVec::new(),
35            outgoing: SmallVec::new(),
36        }
37    }
38}
39
/// The index of an input/output port on a particular node.
///
/// Ports are numbered from `0` within each node; an [Edge] identifies its
/// endpoints by `(NodeID, PortIdx)` pairs.
pub type PortIdx = u32;
42
/// A globally unique identifier for an [Edge].
///
/// Wraps the [thunderdome::Index] of the edge within the graph's edge arena.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct EdgeID(pub(super) thunderdome::Index);
46
/// An [Edge] is a connection from a source node and port to a
/// destination node and port.
#[derive(Copy, Clone, Debug, Hash, Eq, PartialEq)]
pub struct Edge {
    /// The globally unique ID of this edge.
    pub id: EdgeID,
    /// The ID of the source node used by this edge.
    pub src_node: NodeID,
    /// The index of the source port used by this edge.
    pub src_port: PortIdx,
    /// The ID of the destination node used by this edge.
    pub dst_node: NodeID,
    /// The index of the destination port used by this edge.
    pub dst_port: PortIdx,
}
61
/// A reference to an abstract buffer during buffer allocation.
#[derive(Debug, Clone, Copy)]
struct BufferRef {
    /// The index of the buffer.
    idx: usize,
    /// The generation, or the nth time this buffer has
    /// been assigned to a different edge in the graph.
    /// Incremented each time the buffer is released back to the allocator.
    generation: usize,
}
71
/// An allocator for managing and reusing [BufferRef]s.
#[derive(Debug, Clone)]
struct BufferAllocator {
    /// A list of free buffers that may be reallocated.
    free_list: Vec<BufferRef>,
    /// The number of distinct buffer indices handed out so far, i.e. the
    /// maximum number of buffers ever in use simultaneously.
    count: usize,
}
80
81impl BufferAllocator {
82    /// Create a new allocator, `num_types` defines the number
83    /// of buffer types we may allocate.
84    fn new(initial_capacity: usize) -> Self {
85        Self {
86            free_list: Vec::with_capacity(initial_capacity),
87            count: 0,
88        }
89    }
90
91    /// Acquire a new buffer
92    fn acquire(&mut self) -> Rc<BufferRef> {
93        let entry = self.free_list.pop().unwrap_or_else(|| {
94            let idx = self.count;
95            self.count += 1;
96            BufferRef { idx, generation: 0 }
97        });
98        Rc::new(BufferRef {
99            idx: entry.idx,
100            generation: entry.generation,
101        })
102    }
103
104    /// Release a BufferRef
105    fn release(&mut self, buffer_ref: Rc<BufferRef>) {
106        if Rc::strong_count(&buffer_ref) == 1 {
107            self.free_list.push(BufferRef {
108                idx: buffer_ref.idx,
109                generation: buffer_ref.generation + 1,
110            });
111        }
112    }
113
114    /// Consume the allocator to return the maximum number of buffers used
115    fn num_buffers(self) -> usize {
116        self.count
117    }
118}
119
120/// Main compilation algorithm
121pub fn compile(
122    nodes: &mut Arena<NodeEntry>,
123    edges: &mut Arena<Edge>,
124    graph_in_id: NodeID,
125    graph_out_id: NodeID,
126    max_block_frames: usize,
127) -> Result<CompiledSchedule, CompileGraphError> {
128    Ok(
129        GraphIR::preprocess(nodes, edges, graph_in_id, graph_out_id, max_block_frames)
130            .sort_topologically(true)?
131            .solve_buffer_requirements()?
132            .merge(),
133    )
134}
135
136pub fn cycle_detected<'a>(
137    nodes: &'a mut Arena<NodeEntry>,
138    edges: &'a mut Arena<Edge>,
139    graph_in_id: NodeID,
140    graph_out_id: NodeID,
141) -> bool {
142    if let Err(CompileGraphError::CycleDetected) =
143        GraphIR::preprocess(nodes, edges, graph_in_id, graph_out_id, 0).sort_topologically(false)
144    {
145        true
146    } else {
147        false
148    }
149}
150
/// Internal IR used by the compiler algorithm. Built incrementally
/// via the compiler passes.
struct GraphIR<'a> {
    /// The graph's node arena; adjacency lists are rebuilt during preprocessing.
    nodes: &'a mut Arena<NodeEntry>,
    /// The graph's edge arena.
    edges: &'a mut Arena<Edge>,

    /// Nodes with zero inputs and outputs are "pre process nodes" that get
    /// processed before all other nodes.
    pre_proc_nodes: Vec<PreProcNode>,
    /// The topologically sorted schedule of the graph. Built internally.
    schedule: Vec<ScheduledNode>,
    /// The maximum number of buffers used.
    max_num_buffers: usize,

    /// The ID of the graph's input node (always scheduled first).
    graph_in_id: NodeID,
    /// The ID of the graph's output node (always scheduled last).
    graph_out_id: NodeID,
    /// The largest number of input ports seen on any scheduled node.
    max_in_buffers: usize,
    /// The largest number of output ports seen on any scheduled node.
    max_out_buffers: usize,
    /// The maximum block size in frames, forwarded to the compiled schedule.
    max_block_frames: usize,
}
171
172impl<'a> GraphIR<'a> {
173    /// Construct a [GraphIR] instance from lists of nodes and edges, building
174    /// up the adjacency table and creating an empty schedule.
175    fn preprocess(
176        nodes: &'a mut Arena<NodeEntry>,
177        edges: &'a mut Arena<Edge>,
178        graph_in_id: NodeID,
179        graph_out_id: NodeID,
180        max_block_frames: usize,
181    ) -> Self {
182        assert!(nodes.contains(graph_in_id.0));
183        assert!(nodes.contains(graph_out_id.0));
184
185        for (_, node) in nodes.iter_mut() {
186            node.incoming.clear();
187            node.outgoing.clear();
188        }
189
190        for (_, edge) in edges.iter() {
191            nodes[edge.src_node.0].outgoing.push(*edge);
192            nodes[edge.dst_node.0].incoming.push(*edge);
193
194            debug_assert_ne!(edge.src_node, graph_out_id);
195            debug_assert_ne!(edge.dst_node, graph_in_id);
196        }
197
198        Self {
199            nodes,
200            edges,
201            pre_proc_nodes: vec![],
202            schedule: vec![],
203            max_num_buffers: 0,
204            graph_in_id,
205            graph_out_id,
206            max_in_buffers: 0,
207            max_out_buffers: 0,
208            max_block_frames,
209        }
210    }
211
212    /// Sort the nodes topologically using Kahn's algorithm.
213    /// <https://www.geeksforgeeks.org/topological-sorting-indegree-based-solution/>
214    fn sort_topologically(mut self, build_schedule: bool) -> Result<Self, CompileGraphError> {
215        let mut in_degree = vec![0i32; self.nodes.capacity()];
216        let mut queue = VecDeque::with_capacity(self.nodes.len());
217
218        if build_schedule {
219            self.schedule.reserve(self.nodes.len());
220        }
221
222        let mut num_visited = 0;
223
224        // Calculate in-degree of each vertex
225        for (_, node_entry) in self.nodes.iter() {
226            for edge in node_entry.outgoing.iter() {
227                in_degree[edge.dst_node.0.slot() as usize] += 1;
228            }
229        }
230
231        // Make sure that the graph in node is the first entry in the
232        // schedule. Otherwise a different root node could overwrite
233        // the buffers assigned to the graph in node.
234        queue.push_back(self.graph_in_id.0.slot());
235
236        // Enqueue all other nodes with 0 in-degree
237        for (_, node_entry) in self.nodes.iter() {
238            if node_entry.incoming.is_empty() && node_entry.id.0.slot() != self.graph_in_id.0.slot()
239            {
240                // If the number of inputs and outputs on a node is zero, then it
241                // is a "pre process" node.
242                if node_entry.info.channel_config.is_empty() {
243                    self.pre_proc_nodes.push(PreProcNode {
244                        id: node_entry.id,
245                        debug_name: node_entry.info.debug_name,
246                    });
247
248                    num_visited += 1;
249                } else {
250                    queue.push_back(node_entry.id.0.slot());
251                }
252            }
253        }
254
255        // BFS traversal
256        while let Some(node_slot) = queue.pop_front() {
257            num_visited += 1;
258
259            let (_, node_entry) = self.nodes.get_by_slot(node_slot).unwrap();
260
261            // Reduce in-degree of adjacent nodes
262            for edge in node_entry.outgoing.iter() {
263                in_degree[edge.dst_node.0.slot() as usize] -= 1;
264
265                // If in-degree becomes 0, enqueue it
266                if in_degree[edge.dst_node.0.slot() as usize] == 0 {
267                    queue.push_back(edge.dst_node.0.slot());
268                }
269            }
270
271            if build_schedule {
272                if node_slot != self.graph_out_id.0.slot() {
273                    self.schedule.push(ScheduledNode::new(
274                        node_entry.id,
275                        node_entry.info.debug_name,
276                    ));
277                }
278            }
279        }
280
281        if build_schedule {
282            // Make sure that the graph out node is the last entry in the
283            // schedule by waiting to push it after all other nodes have
284            // been pushed. Otherwise a different leaf node could overwrite
285            // the buffers assigned to the graph out node.
286            self.schedule
287                .push(ScheduledNode::new(self.graph_out_id, "graph_out"));
288        }
289
290        // If not all vertices are visited, cycle
291        if num_visited != self.nodes.len() {
292            return Err(CompileGraphError::CycleDetected);
293        }
294
295        Ok(self)
296    }
297
298    fn solve_buffer_requirements(mut self) -> Result<Self, CompileGraphError> {
299        let mut allocator = BufferAllocator::new(64);
300        let mut assignment_table: Arena<Rc<BufferRef>> =
301            Arena::with_capacity(self.edges.capacity());
302        let mut buffers_to_release: Vec<Rc<BufferRef>> = Vec::with_capacity(64);
303
304        for entry in &mut self.schedule {
305            // Collect the inputs to the algorithm, the incoming/outgoing edges of this node.
306
307            let node_entry = &self.nodes[entry.id.0];
308
309            let num_inputs = node_entry.info.channel_config.num_inputs.get() as usize;
310            let num_outputs = node_entry.info.channel_config.num_outputs.get() as usize;
311
312            buffers_to_release.clear();
313            if buffers_to_release.capacity() < num_inputs + num_outputs {
314                buffers_to_release
315                    .reserve(num_inputs + num_outputs - buffers_to_release.capacity());
316            }
317
318            entry.input_buffers.reserve_exact(num_inputs);
319            entry.output_buffers.reserve_exact(num_outputs);
320
321            for port_idx in 0..num_inputs as u32 {
322                let edges: SmallVec<[&Edge; 4]> = node_entry
323                    .incoming
324                    .iter()
325                    .filter(|edge| edge.dst_port == port_idx)
326                    .collect();
327
328                entry
329                    .in_connected_mask
330                    .set_channel(port_idx as usize, !edges.is_empty());
331
332                if edges.is_empty() {
333                    // Case 1: The port is an input and it is unconnected. Acquire a buffer, and
334                    //         assign it. The buffer must be cleared. Release the buffer once the
335                    //         node assignments are done.
336                    let buffer = allocator.acquire();
337                    entry.input_buffers.push(InBufferAssignment {
338                        buffer_index: buffer.idx,
339                        //generation: buffer.generation,
340                        should_clear: true,
341                    });
342                    buffers_to_release.push(buffer);
343                } else if edges.len() == 1 {
344                    // Case 2: The port is an input, and has exactly one incoming edge. Lookup the
345                    //         corresponding buffer and assign it. Buffer should not be cleared.
346                    //         Release the buffer once the node assignments are done.
347                    let buffer = assignment_table
348                        .remove(edges[0].id.0)
349                        .expect("No buffer assigned to edge!");
350                    entry.input_buffers.push(InBufferAssignment {
351                        buffer_index: buffer.idx,
352                        //generation: buffer.generation,
353                        should_clear: false,
354                    });
355                    buffers_to_release.push(buffer);
356                } else {
357                    // Case 3: The port is an input with multiple incoming edges. Compute the
358                    //         summing point, and assign the input buffer assignment to the output
359                    //         of the summing point.
360
361                    let sum_buffer = allocator.acquire();
362                    let sum_output = OutBufferAssignment {
363                        buffer_index: sum_buffer.idx,
364                        //generation: sum_buffer.generation,
365                    };
366
367                    // The sum inputs are the corresponding output buffers of the incoming edges.
368                    let sum_inputs = edges
369                        .iter()
370                        .map(|edge| {
371                            let buf = assignment_table
372                                .remove(edge.id.0)
373                                .expect("No buffer assigned to edge!");
374                            let assignment = InBufferAssignment {
375                                buffer_index: buf.idx,
376                                //generation: buf.generation,
377                                should_clear: false,
378                            };
379                            allocator.release(buf);
380                            assignment
381                        })
382                        .collect();
383
384                    entry.sum_inputs.push(InsertedSum {
385                        input_buffers: sum_inputs,
386                        output_buffer: sum_output,
387                    });
388
389                    // This node's input buffer is the sum output buffer. Release it once the node
390                    // assignments are done.
391                    entry.input_buffers.push(InBufferAssignment {
392                        buffer_index: sum_output.buffer_index,
393                        //generation: sum_output.generation,
394                        should_clear: false,
395                    });
396
397                    buffers_to_release.push(sum_buffer);
398                }
399            }
400
401            for port_idx in 0..num_outputs as u32 {
402                let edges: SmallVec<[&Edge; 4]> = node_entry
403                    .outgoing
404                    .iter()
405                    .filter(|edge| edge.src_port == port_idx)
406                    .collect();
407
408                entry
409                    .out_connected_mask
410                    .set_channel(port_idx as usize, !edges.is_empty());
411
412                if edges.is_empty() {
413                    // Case 1: The port is an output and it is unconnected. Acquire a buffer and
414                    //         assign it. The buffer does not need to be cleared. Release the
415                    //         buffer once the node assignments are done.
416                    let buffer = allocator.acquire();
417                    entry.output_buffers.push(OutBufferAssignment {
418                        buffer_index: buffer.idx,
419                        //generation: buffer.generation,
420                    });
421                    buffers_to_release.push(buffer);
422                } else {
423                    // Case 2: The port is an output. Acquire a buffer, and add to the assignment
424                    //         table with any corresponding edge IDs. For each edge, update the
425                    //         assigned buffer table. Buffer should not be cleared or released.
426                    let buffer = allocator.acquire();
427                    for edge in &edges {
428                        assignment_table.insert_at(edge.id.0, Rc::clone(&buffer));
429                    }
430                    entry.output_buffers.push(OutBufferAssignment {
431                        buffer_index: buffer.idx,
432                        //generation: buffer.generation,
433                    });
434                }
435            }
436
437            for buffer in buffers_to_release.drain(..) {
438                allocator.release(buffer);
439            }
440
441            self.max_in_buffers = self.max_in_buffers.max(num_inputs);
442            self.max_out_buffers = self.max_out_buffers.max(num_outputs);
443        }
444
445        self.max_num_buffers = allocator.num_buffers() as usize;
446        Ok(self)
447    }
448
449    /// Merge the GraphIR into a [CompiledSchedule].
450    fn merge(self) -> CompiledSchedule {
451        CompiledSchedule::new(
452            self.pre_proc_nodes,
453            self.schedule,
454            self.max_num_buffers,
455            self.max_block_frames,
456            self.graph_in_id,
457        )
458    }
459}
460
/// A summing point inserted where multiple edges feed a single input
/// port: the input buffers are summed into a single output buffer, which
/// then serves as the node's input buffer for that port.
#[derive(Debug, Clone)]
struct InsertedSum {
    /// The buffers to be summed together (one per incoming edge).
    input_buffers: SmallVec<[InBufferAssignment; 4]>,
    /// The buffer that receives the sum.
    output_buffer: OutBufferAssignment,
}