firewheel_graph/graph/compiler.rs

use alloc::{boxed::Box, collections::VecDeque, rc::Rc, vec, vec::Vec};
use firewheel_core::node::{AudioNodeInfoInner, DynAudioNode, NodeID};
use smallvec::SmallVec;
use thunderdome::Arena;

use crate::error::CompileGraphError;

mod schedule;

pub use schedule::{CompiledSchedule, NodeHeapData, ScheduleHeapData};
use schedule::{InBufferAssignment, OutBufferAssignment, ScheduledNode};

pub struct NodeEntry {
    /// The unique ID of this node.
    pub id: NodeID,
    /// Information about this node.
    pub info: AudioNodeInfoInner,
    /// The type-erased node instance.
    pub dyn_node: Box<dyn DynAudioNode>,
    /// Whether this node has been activated.
    pub activated: bool,
    /// The edges connected to this node's input ports.
    incoming: SmallVec<[Edge; 4]>,
    /// The edges connected to this node's output ports.
    outgoing: SmallVec<[Edge; 4]>,
}

impl NodeEntry {
    pub fn new(info: AudioNodeInfoInner, dyn_node: Box<dyn DynAudioNode>) -> Self {
        Self {
            // Placeholder ID; replaced when the entry is inserted into the graph.
            id: NodeID::DANGLING,
            info,
            dyn_node,
            activated: false,
            incoming: SmallVec::new(),
            outgoing: SmallVec::new(),
        }
    }
}

/// The index of an input/output port on a particular node.
pub type PortIdx = u32;

/// A globally unique identifier for an [Edge].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct EdgeID(pub(super) thunderdome::Index);

/// An [Edge] is a connection from a source node and port to a
/// destination node and port.
#[derive(Copy, Clone, Debug, Hash, Eq, PartialEq)]
pub struct Edge {
    pub id: EdgeID,
    /// The ID of the source node used by this edge.
    pub src_node: NodeID,
    /// The index of the source port used by this edge.
    pub src_port: PortIdx,
    /// The ID of the destination node used by this edge.
    pub dst_node: NodeID,
    /// The index of the destination port used by this edge.
    pub dst_port: PortIdx,
}

/// A reference to an abstract buffer during buffer allocation.
#[derive(Debug, Clone, Copy)]
struct BufferRef {
    /// The index of the buffer.
    idx: usize,
    /// The generation: the number of times this buffer has been
    /// reassigned to a different edge in the graph.
    generation: usize,
}

/// An allocator for managing and reusing [BufferRef]s.
#[derive(Debug, Clone)]
struct BufferAllocator {
    /// A list of freed buffers that may be reused.
    free_list: Vec<BufferRef>,
    /// The total number of distinct buffers created so far (the high-water mark).
    count: usize,
}

impl BufferAllocator {
    /// Create a new allocator. `initial_capacity` sets the initial
    /// capacity of the internal free list.
    fn new(initial_capacity: usize) -> Self {
        Self {
            free_list: Vec::with_capacity(initial_capacity),
            count: 0,
        }
    }

    /// Acquire a buffer, reusing a freed one if possible.
    fn acquire(&mut self) -> Rc<BufferRef> {
        // Pop a buffer from the free list, or create a fresh one.
        let entry = self.free_list.pop().unwrap_or_else(|| {
            let idx = self.count;
            self.count += 1;
            BufferRef { idx, generation: 0 }
        });
        Rc::new(BufferRef {
            idx: entry.idx,
            generation: entry.generation,
        })
    }

    /// Release a [BufferRef], returning it to the free list once the
    /// caller holds the only remaining reference to it.
    fn release(&mut self, buffer_ref: Rc<BufferRef>) {
        if Rc::strong_count(&buffer_ref) == 1 {
            self.free_list.push(BufferRef {
                idx: buffer_ref.idx,
                generation: buffer_ref.generation + 1,
            });
        }
    }

    /// Consume the allocator and return the maximum number of buffers used.
    fn num_buffers(self) -> usize {
        self.count
    }
}
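
// A minimal sketch (not from the original test suite) exercising the
// allocator's reuse semantics: releasing a buffer while holding the only
// `Rc` returns it to the free list with a bumped generation, while a
// buffer that is still shared elsewhere stays live.
#[cfg(test)]
mod buffer_allocator_sketch {
    use super::BufferAllocator;
    use alloc::rc::Rc;

    #[test]
    fn reuses_released_buffers() {
        let mut alloc = BufferAllocator::new(4);

        // Two fresh acquisitions get distinct indices.
        let a = alloc.acquire();
        let b = alloc.acquire();
        assert_eq!((a.idx, a.generation), (0, 0));
        assert_eq!((b.idx, b.generation), (1, 0));

        // Releasing the sole `Rc` returns the slot to the free list, so the
        // next acquisition reuses index 0 with its generation bumped.
        alloc.release(a);
        let c = alloc.acquire();
        assert_eq!((c.idx, c.generation), (0, 1));

        // A buffer that is still shared is not freed; the surviving clone
        // keeps it alive, so the next acquisition creates a fresh buffer.
        let b_clone = Rc::clone(&b);
        alloc.release(b_clone);
        let d = alloc.acquire();
        assert_eq!(d.idx, 2);

        // Consuming the allocator reports the high-water mark: three
        // distinct buffers were ever created.
        assert_eq!(alloc.num_buffers(), 3);
    }
}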

/// The main compilation algorithm.
///
/// Runs the compiler passes in order: preprocess the adjacency lists,
/// topologically sort the nodes, solve the buffer requirements, and merge
/// the result into a [CompiledSchedule].
pub fn compile(
    nodes: &mut Arena<NodeEntry>,
    edges: &mut Arena<Edge>,
    graph_in_id: NodeID,
    graph_out_id: NodeID,
    max_block_frames: usize,
) -> Result<CompiledSchedule, CompileGraphError> {
    Ok(
        GraphIR::preprocess(nodes, edges, graph_in_id, graph_out_id, max_block_frames)
            .sort_topologically(true)?
            .solve_buffer_requirements()?
            .merge(),
    )
}

/// Returns `true` if the given graph contains a cycle.
pub fn cycle_detected(
    nodes: &mut Arena<NodeEntry>,
    edges: &mut Arena<Edge>,
    graph_in_id: NodeID,
    graph_out_id: NodeID,
) -> bool {
    matches!(
        GraphIR::preprocess(nodes, edges, graph_in_id, graph_out_id, 0).sort_topologically(false),
        Err(CompileGraphError::CycleDetected)
    )
}

/// Internal IR used by the compiler algorithm, built up incrementally
/// by the compiler passes.
struct GraphIR<'a> {
    nodes: &'a mut Arena<NodeEntry>,
    edges: &'a mut Arena<Edge>,

    /// The topologically sorted schedule of the graph. Built internally.
    schedule: Vec<ScheduledNode>,
    /// The maximum number of buffers used.
    max_num_buffers: usize,

    graph_in_id: NodeID,
    graph_out_id: NodeID,
    /// The maximum number of input buffers used by any single node.
    max_in_buffers: usize,
    /// The maximum number of output buffers used by any single node.
    max_out_buffers: usize,
    max_block_frames: usize,
}

impl<'a> GraphIR<'a> {
    /// Construct a [GraphIR] instance from lists of nodes and edges, building
    /// up the adjacency lists and creating an empty schedule.
    fn preprocess(
        nodes: &'a mut Arena<NodeEntry>,
        edges: &'a mut Arena<Edge>,
        graph_in_id: NodeID,
        graph_out_id: NodeID,
        max_block_frames: usize,
    ) -> Self {
        assert!(nodes.contains(graph_in_id.0));
        assert!(nodes.contains(graph_out_id.0));

        // Rebuild each node's adjacency lists from scratch.
        for (_, node) in nodes.iter_mut() {
            node.incoming.clear();
            node.outgoing.clear();
        }

        for (_, edge) in edges.iter() {
            nodes[edge.src_node.0].outgoing.push(*edge);
            nodes[edge.dst_node.0].incoming.push(*edge);

            // No edge may flow into the graph-in node or out of the
            // graph-out node.
            debug_assert_ne!(edge.src_node, graph_out_id);
            debug_assert_ne!(edge.dst_node, graph_in_id);
        }

        Self {
            nodes,
            edges,
            schedule: vec![],
            max_num_buffers: 0,
            graph_in_id,
            graph_out_id,
            max_in_buffers: 0,
            max_out_buffers: 0,
            max_block_frames,
        }
    }

    /// Sort the nodes topologically using Kahn's algorithm.
    /// <https://www.geeksforgeeks.org/topological-sorting-indegree-based-solution/>
    ///
    /// (A standalone sketch of the same scheme appears in a test module at
    /// the bottom of this file.)
    fn sort_topologically(mut self, build_schedule: bool) -> Result<Self, CompileGraphError> {
        let mut in_degree = vec![0i32; self.nodes.capacity()];
        let mut queue = VecDeque::with_capacity(self.nodes.len());

        if build_schedule {
            self.schedule.reserve(self.nodes.len());
        }

        let mut num_visited = 0;

        // Calculate the in-degree of each vertex.
        for (_, node_entry) in self.nodes.iter() {
            for edge in node_entry.outgoing.iter() {
                in_degree[edge.dst_node.0.slot() as usize] += 1;
            }
        }

        // Make sure that the graph-in node is the first entry in the
        // schedule. Otherwise a different root node could overwrite
        // the buffers assigned to the graph-in node.
        queue.push_back(self.graph_in_id.0.slot());

        // Enqueue all other nodes with an in-degree of 0.
        for (_, node_entry) in self.nodes.iter() {
            if node_entry.incoming.is_empty() && node_entry.id.0.slot() != self.graph_in_id.0.slot()
            {
                queue.push_back(node_entry.id.0.slot());
            }
        }

        // BFS traversal.
        while let Some(node_slot) = queue.pop_front() {
            num_visited += 1;

            let (_, node_entry) = self.nodes.get_by_slot(node_slot).unwrap();

            // Reduce the in-degree of adjacent nodes.
            for edge in node_entry.outgoing.iter() {
                in_degree[edge.dst_node.0.slot() as usize] -= 1;

                // If the in-degree becomes 0, enqueue the node.
                if in_degree[edge.dst_node.0.slot() as usize] == 0 {
                    queue.push_back(edge.dst_node.0.slot());
                }
            }

            if build_schedule && node_slot != self.graph_out_id.0.slot() {
                self.schedule.push(ScheduledNode::new(
                    node_entry.id,
                    node_entry.info.debug_name,
                ));
            }
        }

        if build_schedule {
            // Make sure that the graph-out node is the last entry in the
            // schedule by waiting to push it until after all other nodes
            // have been pushed. Otherwise a different leaf node could
            // overwrite the buffers assigned to the graph-out node.
            self.schedule
                .push(ScheduledNode::new(self.graph_out_id, "graph_out"));
        }

        // If not all vertices were visited, the graph contains a cycle.
        if num_visited != self.nodes.len() {
            return Err(CompileGraphError::CycleDetected);
        }

        Ok(self)
    }

    fn solve_buffer_requirements(mut self) -> Result<Self, CompileGraphError> {
        let mut allocator = BufferAllocator::new(64);
        let mut assignment_table: Arena<Rc<BufferRef>> =
            Arena::with_capacity(self.edges.capacity());
        let mut buffers_to_release: Vec<Rc<BufferRef>> = Vec::with_capacity(64);

        for entry in &mut self.schedule {
            // Collect the inputs to the algorithm: this node's incoming and
            // outgoing edges.
            let node_entry = &self.nodes[entry.id.0];

            let num_inputs = node_entry.info.channel_config.num_inputs.get() as usize;
            let num_outputs = node_entry.info.channel_config.num_outputs.get() as usize;

            // The vector is empty after clearing, so this reserves enough
            // capacity for every buffer this node may need to release.
            buffers_to_release.clear();
            buffers_to_release.reserve(num_inputs + num_outputs);

            entry.input_buffers.reserve_exact(num_inputs);
            entry.output_buffers.reserve_exact(num_outputs);

            for port_idx in 0..num_inputs as u32 {
                let edges: SmallVec<[&Edge; 4]> = node_entry
                    .incoming
                    .iter()
                    .filter(|edge| edge.dst_port == port_idx)
                    .collect();

                if edges.is_empty() {
                    // Case 1: The input port is unconnected. Acquire a buffer
                    //         and assign it. The buffer must be cleared.
                    //         Release the buffer once the node assignments
                    //         are done.
                    let buffer = allocator.acquire();
                    entry.input_buffers.push(InBufferAssignment {
                        buffer_index: buffer.idx,
                        should_clear: true,
                    });
                    buffers_to_release.push(buffer);
                } else if edges.len() == 1 {
                    // Case 2: The input port has exactly one incoming edge.
                    //         Look up the corresponding buffer and assign it.
                    //         The buffer should not be cleared. Release the
                    //         buffer once the node assignments are done.
                    let buffer = assignment_table
                        .remove(edges[0].id.0)
                        .expect("No buffer assigned to edge!");
                    entry.input_buffers.push(InBufferAssignment {
                        buffer_index: buffer.idx,
                        should_clear: false,
                    });
                    buffers_to_release.push(buffer);
                } else {
                    // Case 3: The input port has multiple incoming edges.
                    //         Compute the summing point, and assign this
                    //         node's input buffer to the output of the
                    //         summing point.
                    let sum_buffer = allocator.acquire();
                    let sum_output = OutBufferAssignment {
                        buffer_index: sum_buffer.idx,
                    };

                    // The sum inputs are the corresponding output buffers of
                    // the incoming edges.
                    let sum_inputs = edges
                        .iter()
                        .map(|edge| {
                            let buf = assignment_table
                                .remove(edge.id.0)
                                .expect("No buffer assigned to edge!");
                            let assignment = InBufferAssignment {
                                buffer_index: buf.idx,
                                should_clear: false,
                            };
                            allocator.release(buf);
                            assignment
                        })
                        .collect();

                    entry.sum_inputs.push(InsertedSum {
                        input_buffers: sum_inputs,
                        output_buffer: sum_output,
                    });

                    // This node's input buffer is the sum's output buffer.
                    // Release it once the node assignments are done.
                    entry.input_buffers.push(InBufferAssignment {
                        buffer_index: sum_output.buffer_index,
                        should_clear: false,
                    });

                    buffers_to_release.push(sum_buffer);
                }
            }

            for port_idx in 0..num_outputs as u32 {
                let edges: SmallVec<[&Edge; 4]> = node_entry
                    .outgoing
                    .iter()
                    .filter(|edge| edge.src_port == port_idx)
                    .collect();

                if edges.is_empty() {
                    // Case 1: The output port is unconnected. Acquire a buffer
                    //         and assign it. The buffer does not need to be
                    //         cleared. Release the buffer once the node
                    //         assignments are done.
                    let buffer = allocator.acquire();
                    entry.output_buffers.push(OutBufferAssignment {
                        buffer_index: buffer.idx,
                    });
                    buffers_to_release.push(buffer);
                } else {
                    // Case 2: The output port is connected. Acquire a buffer
                    //         and record it in the assignment table under each
                    //         outgoing edge's ID. The buffer should not be
                    //         cleared or released here; the consuming nodes
                    //         release it.
                    let buffer = allocator.acquire();
                    for edge in &edges {
                        assignment_table.insert_at(edge.id.0, Rc::clone(&buffer));
                    }
                    entry.output_buffers.push(OutBufferAssignment {
                        buffer_index: buffer.idx,
                    });
                }
            }

            // Now that all of this node's assignments are done, return the
            // collected buffers to the allocator for reuse.
            for buffer in buffers_to_release.drain(..) {
                allocator.release(buffer);
            }

            self.max_in_buffers = self.max_in_buffers.max(num_inputs);
            self.max_out_buffers = self.max_out_buffers.max(num_outputs);
        }

        self.max_num_buffers = allocator.num_buffers();
        Ok(self)
    }

    /// Merge the [GraphIR] into a [CompiledSchedule].
    fn merge(self) -> CompiledSchedule {
        CompiledSchedule::new(
            self.schedule,
            self.max_num_buffers,
            self.max_block_frames,
            self.graph_in_id,
        )
    }
}
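
// A standalone sketch of the Kahn's-algorithm scheme used by
// `sort_topologically` above, written against plain adjacency lists rather
// than the arena-backed graph so it can run in isolation: pop nodes with an
// in-degree of zero, decrement their successors, and report a cycle if any
// node is never visited.
#[cfg(test)]
mod kahn_sketch {
    use alloc::{collections::VecDeque, vec, vec::Vec};

    /// Returns the visit order, or `None` if the graph contains a cycle.
    fn topo_sort(adjacency: &[Vec<usize>]) -> Option<Vec<usize>> {
        // Calculate the in-degree of each vertex.
        let mut in_degree = vec![0usize; adjacency.len()];
        for successors in adjacency {
            for &dst in successors {
                in_degree[dst] += 1;
            }
        }

        // Enqueue all root nodes (in-degree 0).
        let mut queue: VecDeque<usize> = (0..adjacency.len())
            .filter(|&n| in_degree[n] == 0)
            .collect();
        let mut order = Vec::with_capacity(adjacency.len());

        // BFS traversal, mirroring the loop in `sort_topologically`.
        while let Some(node) = queue.pop_front() {
            order.push(node);
            for &dst in &adjacency[node] {
                in_degree[dst] -= 1;
                if in_degree[dst] == 0 {
                    queue.push_back(dst);
                }
            }
        }

        // If not every vertex was visited, the remaining nodes form a cycle.
        (order.len() == adjacency.len()).then_some(order)
    }

    #[test]
    fn sorts_dag_and_detects_cycle() {
        // 0 -> 1 -> 2 and 0 -> 2: a DAG scheduled as [0, 1, 2].
        let dag = vec![vec![1, 2], vec![2], vec![]];
        assert_eq!(topo_sort(&dag), Some(vec![0, 1, 2]));

        // 0 -> 1 -> 0: a cycle, so no valid schedule exists.
        let cyclic = vec![vec![1], vec![0]];
        assert_eq!(topo_sort(&cyclic), None);
    }
}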

/// A summing point inserted where multiple edges connect to a single
/// input port. The output of the sum is used as the node's input buffer.
#[derive(Debug, Clone)]
struct InsertedSum {
    input_buffers: SmallVec<[InBufferAssignment; 4]>,
    output_buffer: OutBufferAssignment,
}
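
// A minimal sketch (hypothetical, not part of the original test suite) of the
// reuse pattern in `solve_buffer_requirements`: because each node releases
// its buffers only after all of its own assignments are made, a chain of
// nodes of any length ping-pongs between just two physical buffers.
#[cfg(test)]
mod buffer_reuse_sketch {
    use super::{BufferAllocator, BufferRef};
    use alloc::{rc::Rc, vec::Vec};

    #[test]
    fn chain_only_needs_two_buffers() {
        let mut alloc = BufferAllocator::new(8);
        // The buffer assigned to the edge feeding the next node; a stand-in
        // for `assignment_table` in the single-edge-per-node case.
        let mut pending: Option<Rc<BufferRef>> = None;

        for _node in 0..16 {
            let mut to_release: Vec<Rc<BufferRef>> = Vec::new();

            // Input: take over the buffer written by the upstream node.
            if let Some(input) = pending.take() {
                to_release.push(input);
            }

            // Output: acquire a buffer and assign it to the outgoing edge.
            let output = alloc.acquire();
            pending = Some(Rc::clone(&output));
            drop(output);

            // Deferred release: the input buffer only becomes reusable after
            // this node's output was assigned, so a node's input and output
            // never alias.
            for buf in to_release {
                alloc.release(buf);
            }
        }

        // Sixteen chained nodes never needed more than two buffers.
        assert_eq!(alloc.num_buffers(), 2);
    }
}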