// firewheel_graph/graph/compiler.rs

1use alloc::{collections::VecDeque, rc::Rc};
2use firewheel_core::node::{AudioNodeInfoInner, DynAudioNode, NodeID};
3use smallvec::SmallVec;
4use thunderdome::Arena;
5
6#[cfg(not(feature = "std"))]
7use bevy_platform::prelude::{vec, Box, Vec};
8
9use crate::error::CompileGraphError;
10
11mod schedule;
12
13pub use schedule::{CompiledSchedule, NodeHeapData, ScheduleHeapData};
14use schedule::{InBufferAssignment, OutBufferAssignment, PreProcNode, ScheduledNode};
15
/// A single node stored in the graph's node arena, together with its
/// cached adjacency (incoming/outgoing edge) lists.
pub struct NodeEntry {
    /// The unique ID of this node.
    pub id: NodeID,
    /// Static node information (channel configuration, debug name, ...).
    pub info: AudioNodeInfoInner,
    /// The type-erased audio node object itself.
    pub dyn_node: Box<dyn DynAudioNode>,
    /// Whether a processor has been constructed for this node yet.
    pub processor_constructed: bool,
    /// The edges connected to this node's input ports.
    incoming: SmallVec<[Edge; 4]>,
    /// The edges connected to this node's output ports.
    outgoing: SmallVec<[Edge; 4]>,
}
26
27impl NodeEntry {
28    pub fn new(info: AudioNodeInfoInner, dyn_node: Box<dyn DynAudioNode>) -> Self {
29        Self {
30            id: NodeID::DANGLING,
31            info,
32            dyn_node,
33            processor_constructed: false,
34            incoming: SmallVec::new(),
35            outgoing: SmallVec::new(),
36        }
37    }
38}
39
/// The index of an input/output port on a particular node.
///
/// Ports are numbered from `0` within each node, separately for
/// inputs and outputs.
pub type PortIdx = u32;
42
/// A globally unique identifier for an [Edge].
///
/// Wraps the [thunderdome::Index] of the edge's slot in the graph's
/// edge arena.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct EdgeID(pub(super) thunderdome::Index);
46
/// An [Edge] is a connection from source node and port to a
/// destination node and port.
///
/// Edges are `Copy` and are cached in the adjacency lists of both
/// endpoint nodes during preprocessing.
#[derive(Copy, Clone, Debug, Hash, Eq, PartialEq)]
pub struct Edge {
    /// The globally unique ID of this edge.
    pub id: EdgeID,
    /// The ID of the source node used by this edge.
    pub src_node: NodeID,
    /// The ID of the source port used by this edge.
    pub src_port: PortIdx,
    /// The ID of the destination node used by this edge.
    pub dst_node: NodeID,
    /// The ID of the destination port used by this edge.
    pub dst_port: PortIdx,
}
61
/// A reference to an abstract buffer during buffer allocation.
#[derive(Debug, Clone, Copy)]
struct BufferRef {
    /// The index of the buffer.
    idx: usize,
    /// The generation, or the nth time this buffer has
    /// been assigned to a different edge in the graph.
    generation: usize,
}

/// An allocator for managing and reusing [BufferRef]s.
#[derive(Debug, Clone)]
struct BufferAllocator {
    /// A list of free buffers that may be reallocated.
    free_list: Vec<BufferRef>,
    /// The total number of distinct buffers minted so far, i.e. the
    /// peak number of buffers in use at once.
    count: usize,
}

impl BufferAllocator {
    /// Create a new allocator.
    ///
    /// `initial_capacity` is only a pre-allocation hint for the internal
    /// free list; it does not bound how many buffers may be acquired.
    fn new(initial_capacity: usize) -> Self {
        Self {
            free_list: Vec::with_capacity(initial_capacity),
            count: 0,
        }
    }

    /// Acquire a buffer, reusing a previously released one when possible.
    ///
    /// The returned [Rc] may be cloned to share the buffer between several
    /// users; the buffer is only recycled once every holder has passed its
    /// clone back to [BufferAllocator::release].
    fn acquire(&mut self) -> Rc<BufferRef> {
        let entry = self.free_list.pop().unwrap_or_else(|| {
            // No free buffer available; mint a brand-new one.
            let idx = self.count;
            self.count += 1;
            BufferRef { idx, generation: 0 }
        });
        Rc::new(BufferRef {
            idx: entry.idx,
            generation: entry.generation,
        })
    }

    /// Release a [BufferRef], returning it to the free list (with its
    /// generation bumped) once this is the last outstanding reference.
    fn release(&mut self, buffer_ref: Rc<BufferRef>) {
        // Only recycle when no other `Rc` clones of this buffer remain
        // alive; `buffer_ref` itself is dropped at the end of this call
        // either way, so an earlier-released clone simply decrements the
        // count and the final holder triggers the recycle.
        if Rc::strong_count(&buffer_ref) == 1 {
            self.free_list.push(BufferRef {
                idx: buffer_ref.idx,
                generation: buffer_ref.generation + 1,
            });
        }
    }

    /// Consume the allocator to return the maximum number of buffers used.
    fn num_buffers(self) -> usize {
        self.count
    }
}
119
120/// Main compilation algorithm
121pub fn compile(
122    nodes: &mut Arena<NodeEntry>,
123    edges: &mut Arena<Edge>,
124    graph_in_id: NodeID,
125    graph_out_id: NodeID,
126    max_block_frames: usize,
127) -> Result<CompiledSchedule, CompileGraphError> {
128    Ok(
129        GraphIR::preprocess(nodes, edges, graph_in_id, graph_out_id, max_block_frames)
130            .sort_topologically(true)?
131            .solve_buffer_requirements()?
132            .merge(),
133    )
134}
135
136pub fn cycle_detected<'a>(
137    nodes: &'a mut Arena<NodeEntry>,
138    edges: &'a mut Arena<Edge>,
139    graph_in_id: NodeID,
140    graph_out_id: NodeID,
141) -> bool {
142    matches!(
143        GraphIR::preprocess(nodes, edges, graph_in_id, graph_out_id, 0).sort_topologically(false),
144        Err(CompileGraphError::CycleDetected)
145    )
146}
147
/// Internal IR used by the compiler algorithm. Built incrementally
/// via the compiler passes.
struct GraphIR<'a> {
    /// All nodes in the graph; adjacency lists are rebuilt by `preprocess`.
    nodes: &'a mut Arena<NodeEntry>,
    /// All edges in the graph.
    edges: &'a mut Arena<Edge>,

    /// Nodes with zero inputs and outputs are "pre process nodes" that get
    /// processed before all other nodes.
    pre_proc_nodes: Vec<PreProcNode>,
    /// The topologically sorted schedule of the graph. Built internally.
    schedule: Vec<ScheduledNode>,
    /// The maximum number of buffers used.
    max_num_buffers: usize,

    /// The ID of the graph's input node (always scheduled first).
    graph_in_id: NodeID,
    /// The ID of the graph's output node (always scheduled last).
    graph_out_id: NodeID,
    /// The largest number of input ports seen on any scheduled node.
    max_in_buffers: usize,
    /// The largest number of output ports seen on any scheduled node.
    max_out_buffers: usize,
    /// The maximum block size in frames, forwarded to the compiled schedule.
    max_block_frames: usize,
}
168
impl<'a> GraphIR<'a> {
    /// Construct a [GraphIR] instance from lists of nodes and edges, building
    /// up the adjacency table and creating an empty schedule.
    ///
    /// Panics (via `assert!`) if `graph_in_id` or `graph_out_id` is not
    /// present in `nodes`.
    fn preprocess(
        nodes: &'a mut Arena<NodeEntry>,
        edges: &'a mut Arena<Edge>,
        graph_in_id: NodeID,
        graph_out_id: NodeID,
        max_block_frames: usize,
    ) -> Self {
        assert!(nodes.contains(graph_in_id.0));
        assert!(nodes.contains(graph_out_id.0));

        // Clear any adjacency state left over from a previous compile pass.
        for (_, node) in nodes.iter_mut() {
            node.incoming.clear();
            node.outgoing.clear();
        }

        // Rebuild the adjacency lists: each edge is cached on both its
        // source node (as outgoing) and its destination node (as incoming).
        for (_, edge) in edges.iter() {
            nodes[edge.src_node.0].outgoing.push(*edge);
            nodes[edge.dst_node.0].incoming.push(*edge);

            // No edge may leave the graph-out node or enter the graph-in node.
            debug_assert_ne!(edge.src_node, graph_out_id);
            debug_assert_ne!(edge.dst_node, graph_in_id);
        }

        Self {
            nodes,
            edges,
            pre_proc_nodes: vec![],
            schedule: vec![],
            max_num_buffers: 0,
            graph_in_id,
            graph_out_id,
            max_in_buffers: 0,
            max_out_buffers: 0,
            max_block_frames,
        }
    }

    /// Sort the nodes topologically using Kahn's algorithm.
    /// <https://www.geeksforgeeks.org/topological-sorting-indegree-based-solution/>
    ///
    /// When `build_schedule` is `false`, only cycle detection is performed
    /// and `self.schedule` is left untouched. Returns
    /// [CompileGraphError::CycleDetected] if not every node was visited.
    fn sort_topologically(mut self, build_schedule: bool) -> Result<Self, CompileGraphError> {
        // In-degree is indexed by arena slot, hence `capacity()` not `len()`.
        let mut in_degree = vec![0i32; self.nodes.capacity()];
        let mut queue = VecDeque::with_capacity(self.nodes.len());

        if build_schedule {
            self.schedule.reserve(self.nodes.len());
        }

        let mut num_visited = 0;

        // Calculate in-degree of each vertex
        for (_, node_entry) in self.nodes.iter() {
            for edge in node_entry.outgoing.iter() {
                in_degree[edge.dst_node.0.slot() as usize] += 1;
            }
        }

        // Make sure that the graph in node is the first entry in the
        // schedule. Otherwise a different root node could overwrite
        // the buffers assigned to the graph in node.
        queue.push_back(self.graph_in_id.0.slot());

        // Enqueue all other nodes with 0 in-degree
        for (_, node_entry) in self.nodes.iter() {
            if node_entry.incoming.is_empty() && node_entry.id.0.slot() != self.graph_in_id.0.slot()
            {
                // If the number of inputs and outputs on a node is zero, then it
                // is a "pre process" node.
                if node_entry.info.channel_config.is_empty() {
                    self.pre_proc_nodes.push(PreProcNode {
                        id: node_entry.id,
                        debug_name: node_entry.info.debug_name,
                    });

                    // Pre-process nodes never enter the BFS queue, so count
                    // them as visited here to keep the cycle check accurate.
                    num_visited += 1;
                } else {
                    queue.push_back(node_entry.id.0.slot());
                }
            }
        }

        // BFS traversal
        while let Some(node_slot) = queue.pop_front() {
            num_visited += 1;

            let (_, node_entry) = self.nodes.get_by_slot(node_slot).unwrap();

            // Reduce in-degree of adjacent nodes
            for edge in node_entry.outgoing.iter() {
                in_degree[edge.dst_node.0.slot() as usize] -= 1;

                // If in-degree becomes 0, enqueue it
                if in_degree[edge.dst_node.0.slot() as usize] == 0 {
                    queue.push_back(edge.dst_node.0.slot());
                }
            }

            // The graph-out node is deliberately skipped here and appended
            // below so that it is always the last scheduled node.
            if build_schedule && node_slot != self.graph_out_id.0.slot() {
                self.schedule.push(ScheduledNode::new(
                    node_entry.id,
                    node_entry.info.debug_name,
                ));
            }
        }

        if build_schedule {
            // Make sure that the graph out node is the last entry in the
            // schedule by waiting to push it after all other nodes have
            // been pushed. Otherwise a different leaf node could overwrite
            // the buffers assigned to the graph out node.
            self.schedule
                .push(ScheduledNode::new(self.graph_out_id, "graph_out"));
        }

        // If not all vertices are visited, cycle
        if num_visited != self.nodes.len() {
            return Err(CompileGraphError::CycleDetected);
        }

        Ok(self)
    }

    /// Assign an abstract buffer to every input and output port of every
    /// scheduled node, greedily reusing released buffers to keep the total
    /// buffer count low.
    ///
    /// Must run after [GraphIR::sort_topologically] has built the schedule,
    /// since buffer lifetimes follow schedule order.
    fn solve_buffer_requirements(mut self) -> Result<Self, CompileGraphError> {
        let mut allocator = BufferAllocator::new(64);
        // Maps an edge (by its arena index) to the buffer carrying its
        // signal: filled when the source port is processed, consumed when
        // the destination port is processed.
        let mut assignment_table: Arena<Rc<BufferRef>> =
            Arena::with_capacity(self.edges.capacity());
        // Buffers whose lifetime ends once the current node's assignments
        // are complete; reused across iterations to avoid reallocating.
        let mut buffers_to_release: Vec<Rc<BufferRef>> = Vec::with_capacity(64);

        for entry in &mut self.schedule {
            // Collect the inputs to the algorithm, the incoming/outgoing edges of this node.

            let node_entry = &self.nodes[entry.id.0];

            let num_inputs = node_entry.info.channel_config.num_inputs.get() as usize;
            let num_outputs = node_entry.info.channel_config.num_outputs.get() as usize;

            buffers_to_release.clear();
            if buffers_to_release.capacity() < num_inputs + num_outputs {
                buffers_to_release
                    .reserve(num_inputs + num_outputs - buffers_to_release.capacity());
            }

            entry.input_buffers.reserve_exact(num_inputs);
            entry.output_buffers.reserve_exact(num_outputs);

            for port_idx in 0..num_inputs as u32 {
                // All edges feeding this particular input port.
                let edges: SmallVec<[&Edge; 4]> = node_entry
                    .incoming
                    .iter()
                    .filter(|edge| edge.dst_port == port_idx)
                    .collect();

                entry
                    .in_connected_mask
                    .set_channel(port_idx as usize, !edges.is_empty());

                if edges.is_empty() {
                    // Case 1: The port is an input and it is unconnected. Acquire a buffer, and
                    //         assign it. The buffer must be cleared. Release the buffer once the
                    //         node assignments are done.
                    let buffer = allocator.acquire();
                    entry.input_buffers.push(InBufferAssignment {
                        buffer_index: buffer.idx,
                        //generation: buffer.generation,
                        should_clear: true,
                    });
                    buffers_to_release.push(buffer);
                } else if edges.len() == 1 {
                    // Case 2: The port is an input, and has exactly one incoming edge. Lookup the
                    //         corresponding buffer and assign it. Buffer should not be cleared.
                    //         Release the buffer once the node assignments are done.
                    let buffer = assignment_table
                        .remove(edges[0].id.0)
                        .expect("No buffer assigned to edge!");
                    entry.input_buffers.push(InBufferAssignment {
                        buffer_index: buffer.idx,
                        //generation: buffer.generation,
                        should_clear: false,
                    });
                    buffers_to_release.push(buffer);
                } else {
                    // Case 3: The port is an input with multiple incoming edges. Compute the
                    //         summing point, and assign the input buffer assignment to the output
                    //         of the summing point.

                    let sum_buffer = allocator.acquire();
                    let sum_output = OutBufferAssignment {
                        buffer_index: sum_buffer.idx,
                        //generation: sum_buffer.generation,
                    };

                    // The sum inputs are the corresponding output buffers of the incoming edges.
                    let sum_inputs = edges
                        .iter()
                        .map(|edge| {
                            let buf = assignment_table
                                .remove(edge.id.0)
                                .expect("No buffer assigned to edge!");
                            let assignment = InBufferAssignment {
                                buffer_index: buf.idx,
                                //generation: buf.generation,
                                should_clear: false,
                            };
                            // Safe to release immediately: the edge buffer's
                            // lifetime ends at the summing point itself.
                            allocator.release(buf);
                            assignment
                        })
                        .collect();

                    entry.sum_inputs.push(InsertedSum {
                        input_buffers: sum_inputs,
                        output_buffer: sum_output,
                    });

                    // This node's input buffer is the sum output buffer. Release it once the node
                    // assignments are done.
                    entry.input_buffers.push(InBufferAssignment {
                        buffer_index: sum_output.buffer_index,
                        //generation: sum_output.generation,
                        should_clear: false,
                    });

                    buffers_to_release.push(sum_buffer);
                }
            }

            for port_idx in 0..num_outputs as u32 {
                // All edges fed by this particular output port.
                let edges: SmallVec<[&Edge; 4]> = node_entry
                    .outgoing
                    .iter()
                    .filter(|edge| edge.src_port == port_idx)
                    .collect();

                entry
                    .out_connected_mask
                    .set_channel(port_idx as usize, !edges.is_empty());

                if edges.is_empty() {
                    // Case 1: The port is an output and it is unconnected. Acquire a buffer and
                    //         assign it. The buffer does not need to be cleared. Release the
                    //         buffer once the node assignments are done.
                    let buffer = allocator.acquire();
                    entry.output_buffers.push(OutBufferAssignment {
                        buffer_index: buffer.idx,
                        //generation: buffer.generation,
                    });
                    buffers_to_release.push(buffer);
                } else {
                    // Case 2: The port is an output. Acquire a buffer, and add to the assignment
                    //         table with any corresponding edge IDs. For each edge, update the
                    //         assigned buffer table. Buffer should not be cleared or released.
                    let buffer = allocator.acquire();
                    for edge in &edges {
                        assignment_table.insert_at(edge.id.0, Rc::clone(&buffer));
                    }
                    entry.output_buffers.push(OutBufferAssignment {
                        buffer_index: buffer.idx,
                        //generation: buffer.generation,
                    });
                }
            }

            // Release every buffer whose lifetime ended with this node; the
            // allocator only recycles those with no clones still alive in
            // the assignment table.
            for buffer in buffers_to_release.drain(..) {
                allocator.release(buffer);
            }

            self.max_in_buffers = self.max_in_buffers.max(num_inputs);
            self.max_out_buffers = self.max_out_buffers.max(num_outputs);
        }

        self.max_num_buffers = allocator.num_buffers();
        Ok(self)
    }

    /// Merge the GraphIR into a [CompiledSchedule].
    ///
    /// NOTE(review): `max_in_buffers`/`max_out_buffers` are computed by
    /// `solve_buffer_requirements` but not passed to `CompiledSchedule::new`
    /// here — confirm whether the compiled schedule needs them.
    fn merge(self) -> CompiledSchedule {
        CompiledSchedule::new(
            self.pre_proc_nodes,
            self.schedule,
            self.max_num_buffers,
            self.max_block_frames,
            self.graph_in_id,
        )
    }
}
455
/// A summing point inserted where multiple edges feed a single input port:
/// every buffer in `input_buffers` is combined into `output_buffer`, which
/// then serves as the destination node's input buffer.
#[derive(Debug, Clone)]
struct InsertedSum {
    /// The buffers carrying the signals of the incoming edges.
    input_buffers: SmallVec<[InBufferAssignment; 4]>,
    /// The buffer that receives the summed result.
    output_buffer: OutBufferAssignment,
}