firewheel_graph/graph/compiler.rs

1use alloc::{collections::VecDeque, rc::Rc};
2use firewheel_core::node::{AudioNodeInfoInner, DynAudioNode, NodeID};
3use smallvec::SmallVec;
4use thunderdome::Arena;
5
6#[cfg(not(feature = "std"))]
7use bevy_platform::prelude::{vec, Box, Vec};
8
9use crate::error::CompileGraphError;
10
11mod schedule;
12
13pub use schedule::{CompiledSchedule, NodeHeapData, ScheduleHeapData};
14use schedule::{InBufferAssignment, OutBufferAssignment, ScheduledNode};
15
/// An entry in the audio graph: a node's metadata, its type-erased node
/// object, and the adjacency lists of edges touching its ports.
pub struct NodeEntry {
    /// The ID of this node. Set to [`NodeID::DANGLING`] by [`NodeEntry::new`]
    /// until a real ID is assigned.
    pub id: NodeID,
    /// Node metadata (channel configuration, debug name, etc.).
    pub info: AudioNodeInfoInner,
    /// The type-erased audio node object.
    pub dyn_node: Box<dyn DynAudioNode>,
    /// Whether a processor has already been constructed for this node.
    pub processor_constructed: bool,
    /// The edges connected to this node's input ports.
    incoming: SmallVec<[Edge; 4]>,
    /// The edges connected to this node's output ports.
    outgoing: SmallVec<[Edge; 4]>,
}
26
27impl NodeEntry {
28    pub fn new(info: AudioNodeInfoInner, dyn_node: Box<dyn DynAudioNode>) -> Self {
29        Self {
30            id: NodeID::DANGLING,
31            info,
32            dyn_node,
33            processor_constructed: false,
34            incoming: SmallVec::new(),
35            outgoing: SmallVec::new(),
36        }
37    }
38}
39
/// The index of an input/output port on a particular node.
pub type PortIdx = u32;

/// A globally unique identifier for an [Edge].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct EdgeID(pub(super) thunderdome::Index);

/// An [Edge] is a connection from source node and port to a
/// destination node and port.
#[derive(Copy, Clone, Debug, Hash, Eq, PartialEq)]
pub struct Edge {
    /// The globally unique identifier of this edge.
    pub id: EdgeID,
    /// The ID of the source node used by this edge.
    pub src_node: NodeID,
    /// The index of the source port used by this edge.
    pub src_port: PortIdx,
    /// The ID of the destination node used by this edge.
    pub dst_node: NodeID,
    /// The index of the destination port used by this edge.
    pub dst_port: PortIdx,
}
61
/// A reference to an abstract buffer during buffer allocation.
///
/// Handed out as `Rc<BufferRef>` by [BufferAllocator], which uses the
/// `Rc` strong count at release time to tell whether other handles to
/// the same buffer are still live.
#[derive(Debug, Clone, Copy)]
struct BufferRef {
    /// The index of the buffer.
    idx: usize,
    /// The generation, or the nth time this buffer has
    /// been assigned to a different edge in the graph.
    generation: usize,
}
71
/// An allocator for managing and reusing [BufferRef]s.
#[derive(Debug, Clone)]
struct BufferAllocator {
    /// A list of freed buffers that may be reallocated.
    free_list: Vec<BufferRef>,
    /// The total number of distinct buffers allocated so far,
    /// i.e. the maximum number of buffers in use at once.
    count: usize,
}
80
81impl BufferAllocator {
82    /// Create a new allocator, `num_types` defines the number
83    /// of buffer types we may allocate.
84    fn new(initial_capacity: usize) -> Self {
85        Self {
86            free_list: Vec::with_capacity(initial_capacity),
87            count: 0,
88        }
89    }
90
91    /// Acquire a new buffer
92    fn acquire(&mut self) -> Rc<BufferRef> {
93        let entry = self.free_list.pop().unwrap_or_else(|| {
94            let idx = self.count;
95            self.count += 1;
96            BufferRef { idx, generation: 0 }
97        });
98        Rc::new(BufferRef {
99            idx: entry.idx,
100            generation: entry.generation,
101        })
102    }
103
104    /// Release a BufferRef
105    fn release(&mut self, buffer_ref: Rc<BufferRef>) {
106        if Rc::strong_count(&buffer_ref) == 1 {
107            self.free_list.push(BufferRef {
108                idx: buffer_ref.idx,
109                generation: buffer_ref.generation + 1,
110            });
111        }
112    }
113
114    /// Consume the allocator to return the maximum number of buffers used
115    fn num_buffers(self) -> usize {
116        self.count
117    }
118}
119
120/// Main compilation algorithm
121pub fn compile(
122    nodes: &mut Arena<NodeEntry>,
123    edges: &mut Arena<Edge>,
124    graph_in_id: NodeID,
125    graph_out_id: NodeID,
126    max_block_frames: usize,
127) -> Result<CompiledSchedule, CompileGraphError> {
128    Ok(
129        GraphIR::preprocess(nodes, edges, graph_in_id, graph_out_id, max_block_frames)
130            .sort_topologically(true)?
131            .solve_buffer_requirements()?
132            .merge(),
133    )
134}
135
136pub fn cycle_detected<'a>(
137    nodes: &'a mut Arena<NodeEntry>,
138    edges: &'a mut Arena<Edge>,
139    graph_in_id: NodeID,
140    graph_out_id: NodeID,
141) -> bool {
142    if let Err(CompileGraphError::CycleDetected) =
143        GraphIR::preprocess(nodes, edges, graph_in_id, graph_out_id, 0).sort_topologically(false)
144    {
145        true
146    } else {
147        false
148    }
149}
150
/// Internal IR used by the compiler algorithm. Built incrementally
/// via the compiler passes.
struct GraphIR<'a> {
    nodes: &'a mut Arena<NodeEntry>,
    edges: &'a mut Arena<Edge>,

    /// The topologically sorted schedule of the graph. Built internally.
    schedule: Vec<ScheduledNode>,
    /// The maximum number of buffers used.
    max_num_buffers: usize,

    /// The ID of the graph-in node (always first in the schedule).
    graph_in_id: NodeID,
    /// The ID of the graph-out node (always last in the schedule).
    graph_out_id: NodeID,
    /// The largest number of input buffers used by any single node.
    max_in_buffers: usize,
    /// The largest number of output buffers used by any single node.
    max_out_buffers: usize,
    /// The maximum number of frames in a processing block.
    max_block_frames: usize,
}
168
169impl<'a> GraphIR<'a> {
170    /// Construct a [GraphIR] instance from lists of nodes and edges, building
171    /// up the adjacency table and creating an empty schedule.
172    fn preprocess(
173        nodes: &'a mut Arena<NodeEntry>,
174        edges: &'a mut Arena<Edge>,
175        graph_in_id: NodeID,
176        graph_out_id: NodeID,
177        max_block_frames: usize,
178    ) -> Self {
179        assert!(nodes.contains(graph_in_id.0));
180        assert!(nodes.contains(graph_out_id.0));
181
182        for (_, node) in nodes.iter_mut() {
183            node.incoming.clear();
184            node.outgoing.clear();
185        }
186
187        for (_, edge) in edges.iter() {
188            nodes[edge.src_node.0].outgoing.push(*edge);
189            nodes[edge.dst_node.0].incoming.push(*edge);
190
191            debug_assert_ne!(edge.src_node, graph_out_id);
192            debug_assert_ne!(edge.dst_node, graph_in_id);
193        }
194
195        Self {
196            nodes,
197            edges,
198            schedule: vec![],
199            max_num_buffers: 0,
200            graph_in_id,
201            graph_out_id,
202            max_in_buffers: 0,
203            max_out_buffers: 0,
204            max_block_frames,
205        }
206    }
207
    /// Sort the nodes topologically using Kahn's algorithm.
    /// <https://www.geeksforgeeks.org/topological-sorting-indegree-based-solution/>
    ///
    /// When `build_schedule` is `true`, also fills `self.schedule` with one
    /// [ScheduledNode] per node, ordered so that the graph-in node is first
    /// and the graph-out node is last. When `false`, only cycle detection
    /// is performed.
    ///
    /// Returns [CompileGraphError::CycleDetected] if not every node could
    /// be visited, i.e. the graph contains a cycle.
    fn sort_topologically(mut self, build_schedule: bool) -> Result<Self, CompileGraphError> {
        // In-degrees are indexed by arena slot, so the table is sized by
        // the arena's capacity rather than its current length.
        let mut in_degree = vec![0i32; self.nodes.capacity()];
        let mut queue = VecDeque::with_capacity(self.nodes.len());

        if build_schedule {
            self.schedule.reserve(self.nodes.len());
        }

        let mut num_visited = 0;

        // Calculate in-degree of each vertex
        for (_, node_entry) in self.nodes.iter() {
            for edge in node_entry.outgoing.iter() {
                in_degree[edge.dst_node.0.slot() as usize] += 1;
            }
        }

        // Make sure that the graph in node is the first entry in the
        // schedule. Otherwise a different root node could overwrite
        // the buffers assigned to the graph in node.
        queue.push_back(self.graph_in_id.0.slot());

        // Enqueue all other nodes with 0 in-degree
        for (_, node_entry) in self.nodes.iter() {
            if node_entry.incoming.is_empty() && node_entry.id.0.slot() != self.graph_in_id.0.slot()
            {
                queue.push_back(node_entry.id.0.slot());
            }
        }

        // BFS traversal
        while let Some(node_slot) = queue.pop_front() {
            num_visited += 1;

            let (_, node_entry) = self.nodes.get_by_slot(node_slot).unwrap();

            // Reduce in-degree of adjacent nodes
            for edge in node_entry.outgoing.iter() {
                in_degree[edge.dst_node.0.slot() as usize] -= 1;

                // If in-degree becomes 0, enqueue it
                if in_degree[edge.dst_node.0.slot() as usize] == 0 {
                    queue.push_back(edge.dst_node.0.slot());
                }
            }

            if build_schedule {
                // The graph-out node is deliberately skipped here; it is
                // appended after the loop so that it is always last.
                if node_slot != self.graph_out_id.0.slot() {
                    self.schedule.push(ScheduledNode::new(
                        node_entry.id,
                        node_entry.info.debug_name,
                    ));
                }
            }
        }

        if build_schedule {
            // Make sure that the graph out node is the last entry in the
            // schedule by waiting to push it after all other nodes have
            // been pushed. Otherwise a different leaf node could overwrite
            // the buffers assigned to the graph out node.
            self.schedule
                .push(ScheduledNode::new(self.graph_out_id, "graph_out"));
        }

        // If not all vertices are visited, cycle
        if num_visited != self.nodes.len() {
            return Err(CompileGraphError::CycleDetected);
        }

        Ok(self)
    }
282
283    fn solve_buffer_requirements(mut self) -> Result<Self, CompileGraphError> {
284        let mut allocator = BufferAllocator::new(64);
285        let mut assignment_table: Arena<Rc<BufferRef>> =
286            Arena::with_capacity(self.edges.capacity());
287        let mut buffers_to_release: Vec<Rc<BufferRef>> = Vec::with_capacity(64);
288
289        for entry in &mut self.schedule {
290            // Collect the inputs to the algorithm, the incoming/outgoing edges of this node.
291
292            let node_entry = &self.nodes[entry.id.0];
293
294            let num_inputs = node_entry.info.channel_config.num_inputs.get() as usize;
295            let num_outputs = node_entry.info.channel_config.num_outputs.get() as usize;
296
297            buffers_to_release.clear();
298            if buffers_to_release.capacity() < num_inputs + num_outputs {
299                buffers_to_release
300                    .reserve(num_inputs + num_outputs - buffers_to_release.capacity());
301            }
302
303            entry.input_buffers.reserve_exact(num_inputs);
304            entry.output_buffers.reserve_exact(num_outputs);
305
306            for port_idx in 0..num_inputs as u32 {
307                let edges: SmallVec<[&Edge; 4]> = node_entry
308                    .incoming
309                    .iter()
310                    .filter(|edge| edge.dst_port == port_idx)
311                    .collect();
312
313                entry
314                    .in_connected_mask
315                    .set_channel(port_idx as usize, !edges.is_empty());
316
317                if edges.is_empty() {
318                    // Case 1: The port is an input and it is unconnected. Acquire a buffer, and
319                    //         assign it. The buffer must be cleared. Release the buffer once the
320                    //         node assignments are done.
321                    let buffer = allocator.acquire();
322                    entry.input_buffers.push(InBufferAssignment {
323                        buffer_index: buffer.idx,
324                        //generation: buffer.generation,
325                        should_clear: true,
326                    });
327                    buffers_to_release.push(buffer);
328                } else if edges.len() == 1 {
329                    // Case 2: The port is an input, and has exactly one incoming edge. Lookup the
330                    //         corresponding buffer and assign it. Buffer should not be cleared.
331                    //         Release the buffer once the node assignments are done.
332                    let buffer = assignment_table
333                        .remove(edges[0].id.0)
334                        .expect("No buffer assigned to edge!");
335                    entry.input_buffers.push(InBufferAssignment {
336                        buffer_index: buffer.idx,
337                        //generation: buffer.generation,
338                        should_clear: false,
339                    });
340                    buffers_to_release.push(buffer);
341                } else {
342                    // Case 3: The port is an input with multiple incoming edges. Compute the
343                    //         summing point, and assign the input buffer assignment to the output
344                    //         of the summing point.
345
346                    let sum_buffer = allocator.acquire();
347                    let sum_output = OutBufferAssignment {
348                        buffer_index: sum_buffer.idx,
349                        //generation: sum_buffer.generation,
350                    };
351
352                    // The sum inputs are the corresponding output buffers of the incoming edges.
353                    let sum_inputs = edges
354                        .iter()
355                        .map(|edge| {
356                            let buf = assignment_table
357                                .remove(edge.id.0)
358                                .expect("No buffer assigned to edge!");
359                            let assignment = InBufferAssignment {
360                                buffer_index: buf.idx,
361                                //generation: buf.generation,
362                                should_clear: false,
363                            };
364                            allocator.release(buf);
365                            assignment
366                        })
367                        .collect();
368
369                    entry.sum_inputs.push(InsertedSum {
370                        input_buffers: sum_inputs,
371                        output_buffer: sum_output,
372                    });
373
374                    // This node's input buffer is the sum output buffer. Release it once the node
375                    // assignments are done.
376                    entry.input_buffers.push(InBufferAssignment {
377                        buffer_index: sum_output.buffer_index,
378                        //generation: sum_output.generation,
379                        should_clear: false,
380                    });
381
382                    buffers_to_release.push(sum_buffer);
383                }
384            }
385
386            for port_idx in 0..num_outputs as u32 {
387                let edges: SmallVec<[&Edge; 4]> = node_entry
388                    .outgoing
389                    .iter()
390                    .filter(|edge| edge.src_port == port_idx)
391                    .collect();
392
393                entry
394                    .out_connected_mask
395                    .set_channel(port_idx as usize, !edges.is_empty());
396
397                if edges.is_empty() {
398                    // Case 1: The port is an output and it is unconnected. Acquire a buffer and
399                    //         assign it. The buffer does not need to be cleared. Release the
400                    //         buffer once the node assignments are done.
401                    let buffer = allocator.acquire();
402                    entry.output_buffers.push(OutBufferAssignment {
403                        buffer_index: buffer.idx,
404                        //generation: buffer.generation,
405                    });
406                    buffers_to_release.push(buffer);
407                } else {
408                    // Case 2: The port is an output. Acquire a buffer, and add to the assignment
409                    //         table with any corresponding edge IDs. For each edge, update the
410                    //         assigned buffer table. Buffer should not be cleared or released.
411                    let buffer = allocator.acquire();
412                    for edge in &edges {
413                        assignment_table.insert_at(edge.id.0, Rc::clone(&buffer));
414                    }
415                    entry.output_buffers.push(OutBufferAssignment {
416                        buffer_index: buffer.idx,
417                        //generation: buffer.generation,
418                    });
419                }
420            }
421
422            for buffer in buffers_to_release.drain(..) {
423                allocator.release(buffer);
424            }
425
426            self.max_in_buffers = self.max_in_buffers.max(num_inputs);
427            self.max_out_buffers = self.max_out_buffers.max(num_outputs);
428        }
429
430        self.max_num_buffers = allocator.num_buffers() as usize;
431        Ok(self)
432    }
433
434    /// Merge the GraphIR into a [CompiledSchedule].
435    fn merge(self) -> CompiledSchedule {
436        CompiledSchedule::new(
437            self.schedule,
438            self.max_num_buffers,
439            self.max_block_frames,
440            self.graph_in_id,
441        )
442    }
443}
444
/// A summing point inserted where a single input port has multiple
/// incoming edges.
#[derive(Debug, Clone)]
struct InsertedSum {
    /// The buffers feeding the summing point (the output buffers of the
    /// port's incoming edges).
    input_buffers: SmallVec<[InBufferAssignment; 4]>,
    /// The buffer holding the summed result; assigned as the destination
    /// node's input buffer for that port.
    output_buffer: OutBufferAssignment,
}