use alloc::{collections::VecDeque, rc::Rc};
use firewheel_core::node::{AudioNodeInfoInner, DynAudioNode, NodeID};
use smallvec::SmallVec;
use thunderdome::Arena;

#[cfg(not(feature = "std"))]
use bevy_platform::prelude::{vec, Box, Vec};

use crate::error::CompileGraphError;

mod schedule;

pub use schedule::{CompiledSchedule, NodeHeapData, ScheduleHeapData};
use schedule::{InBufferAssignment, OutBufferAssignment, PreProcNode, ScheduledNode};

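/// An entry in the audio graph's node arena, pairing a node's metadata with
/// its boxed implementation and cached edge lists.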
pub struct NodeEntry {
    pub id: NodeID,
    pub info: AudioNodeInfoInner,
    pub dyn_node: Box<dyn DynAudioNode>,
    pub processor_constructed: bool,
    /// Cached list of edges that terminate at this node.
    incoming: SmallVec<[Edge; 4]>,
    /// Cached list of edges that originate from this node.
    outgoing: SmallVec<[Edge; 4]>,
}

impl NodeEntry {
    pub fn new(info: AudioNodeInfoInner, dyn_node: Box<dyn DynAudioNode>) -> Self {
        Self {
            id: NodeID::DANGLING,
            info,
            dyn_node,
            processor_constructed: false,
            incoming: SmallVec::new(),
            outgoing: SmallVec::new(),
        }
    }
}

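/// The index of an input or output port on a node.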
pub type PortIdx = u32;

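/// A unique identifier for an [`Edge`] in the audio graph.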
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct EdgeID(pub(super) thunderdome::Index);

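/// A connection from one node's output port to another node's input port.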
#[derive(Copy, Clone, Debug, Hash, Eq, PartialEq)]
pub struct Edge {
    pub id: EdgeID,
    /// The node that this edge originates from.
    pub src_node: NodeID,
    /// The output port on the source node.
    pub src_port: PortIdx,
    /// The node that this edge terminates at.
    pub dst_node: NodeID,
    /// The input port on the destination node.
    pub dst_port: PortIdx,
}

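/// A reference-counted handle to a buffer in the pool.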
#[derive(Debug, Clone, Copy)]
struct BufferRef {
    /// The index of the buffer in the pool.
    idx: usize,
    /// Incremented each time the buffer is returned to the free list.
    generation: usize,
}

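/// A simple allocator that assigns indices into a shared pool of buffers.
///
/// Buffers are handed out as reference-counted [`BufferRef`]s. Once the last
/// outstanding reference to a buffer is released, its index is pushed back
/// onto the free list (with its generation bumped) so that later nodes in the
/// schedule can reuse the same buffer.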
#[derive(Debug, Clone)]
struct BufferAllocator {
    /// Buffers that have been released and are available for reuse.
    free_list: Vec<BufferRef>,
    /// The total number of buffers allocated so far.
    count: usize,
}

impl BufferAllocator {
    fn new(initial_capacity: usize) -> Self {
        Self {
            free_list: Vec::with_capacity(initial_capacity),
            count: 0,
        }
    }

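    /// Acquire a buffer, reusing one from the free list if available or
    /// allocating a new buffer index otherwise.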
    fn acquire(&mut self) -> Rc<BufferRef> {
        let entry = self.free_list.pop().unwrap_or_else(|| {
            // No free buffer is available, so allocate a new index.
            let idx = self.count;
            self.count += 1;
            BufferRef { idx, generation: 0 }
        });
        Rc::new(BufferRef {
            idx: entry.idx,
            generation: entry.generation,
        })
    }

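    /// Release a buffer. It is only returned to the free list once this is
    /// the last outstanding reference to it.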
    fn release(&mut self, buffer_ref: Rc<BufferRef>) {
        if Rc::strong_count(&buffer_ref) == 1 {
            self.free_list.push(BufferRef {
                idx: buffer_ref.idx,
                generation: buffer_ref.generation + 1,
            });
        }
    }

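    /// The total number of buffers that have been allocated.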
    fn num_buffers(&self) -> usize {
        self.count
    }
}

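/// Compile the audio graph into a schedule that is ready to be executed by
/// the processor.
///
/// The pipeline is: build the intermediate representation, topologically sort
/// the nodes, assign buffers to ports, and assemble the final schedule.
///
/// A minimal usage sketch, assuming the `nodes` and `edges` arenas are
/// already populated and the two IDs refer to valid entries (`512` is just an
/// illustrative block size):
///
/// ```ignore
/// let schedule = compile(&mut nodes, &mut edges, graph_in_id, graph_out_id, 512)?;
/// ```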
pub fn compile(
    nodes: &mut Arena<NodeEntry>,
    edges: &mut Arena<Edge>,
    graph_in_id: NodeID,
    graph_out_id: NodeID,
    max_block_frames: usize,
) -> Result<CompiledSchedule, CompileGraphError> {
    Ok(
        GraphIR::preprocess(nodes, edges, graph_in_id, graph_out_id, max_block_frames)
            .sort_topologically(true)?
            .solve_buffer_requirements()?
            .merge(),
    )
}

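/// Returns `true` if the given graph contains a cycle.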
pub fn cycle_detected(
    nodes: &mut Arena<NodeEntry>,
    edges: &mut Arena<Edge>,
    graph_in_id: NodeID,
    graph_out_id: NodeID,
) -> bool {
    matches!(
        GraphIR::preprocess(nodes, edges, graph_in_id, graph_out_id, 0).sort_topologically(false),
        Err(CompileGraphError::CycleDetected)
    )
}

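/// An intermediate representation of the audio graph, used to compile it into
/// a schedule.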
struct GraphIR<'a> {
    nodes: &'a mut Arena<NodeEntry>,
    edges: &'a mut Arena<Edge>,

    /// Nodes with no audio channels, which are handled separately from the
    /// main schedule order.
    pre_proc_nodes: Vec<PreProcNode>,
    /// The nodes in the order they should be processed.
    schedule: Vec<ScheduledNode>,
    /// The total number of buffers needed to execute the schedule.
    max_num_buffers: usize,

    graph_in_id: NodeID,
    graph_out_id: NodeID,
    max_in_buffers: usize,
    max_out_buffers: usize,
    max_block_frames: usize,
}

impl<'a> GraphIR<'a> {
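    /// Construct the intermediate representation from the given graph state,
    /// rebuilding each node's cached lists of incoming and outgoing edges.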
    fn preprocess(
        nodes: &'a mut Arena<NodeEntry>,
        edges: &'a mut Arena<Edge>,
        graph_in_id: NodeID,
        graph_out_id: NodeID,
        max_block_frames: usize,
    ) -> Self {
        assert!(nodes.contains(graph_in_id.0));
        assert!(nodes.contains(graph_out_id.0));

        // Rebuild each node's cached edge lists from the canonical edge arena.
        for (_, node) in nodes.iter_mut() {
            node.incoming.clear();
            node.outgoing.clear();
        }

        for (_, edge) in edges.iter() {
            nodes[edge.src_node.0].outgoing.push(*edge);
            nodes[edge.dst_node.0].incoming.push(*edge);

            // The graph output node must have no outgoing edges, and the
            // graph input node must have no incoming edges.
            debug_assert_ne!(edge.src_node, graph_out_id);
            debug_assert_ne!(edge.dst_node, graph_in_id);
        }

        Self {
            nodes,
            edges,
            pre_proc_nodes: vec![],
            schedule: vec![],
            max_num_buffers: 0,
            graph_in_id,
            graph_out_id,
            max_in_buffers: 0,
            max_out_buffers: 0,
            max_block_frames,
        }
    }

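    /// Topologically sort the nodes in the graph using Kahn's algorithm.
    ///
    /// If `build_schedule` is `true`, the sorted nodes are also pushed to
    /// `self.schedule` in processing order. Returns an error if a cycle is
    /// detected (i.e. not every node could be visited).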
    fn sort_topologically(mut self, build_schedule: bool) -> Result<Self, CompileGraphError> {
        let mut in_degree = vec![0i32; self.nodes.capacity()];
        let mut queue = VecDeque::with_capacity(self.nodes.len());

        if build_schedule {
            self.schedule.reserve(self.nodes.len());
        }

        let mut num_visited = 0;

        // Count the number of incoming edges for each node.
        for (_, node_entry) in self.nodes.iter() {
            for edge in node_entry.outgoing.iter() {
                in_degree[edge.dst_node.0.slot() as usize] += 1;
            }
        }

        // The graph input node is always processed first.
        queue.push_back(self.graph_in_id.0.slot());

        // Seed the queue with every other node that has no incoming edges.
        for (_, node_entry) in self.nodes.iter() {
            if node_entry.incoming.is_empty() && node_entry.id.0.slot() != self.graph_in_id.0.slot()
            {
                if node_entry.info.channel_config.is_empty() {
                    // Nodes with no channels don't take part in the schedule
                    // order, so handle them separately.
                    self.pre_proc_nodes.push(PreProcNode {
                        id: node_entry.id,
                        debug_name: node_entry.info.debug_name,
                    });

                    num_visited += 1;
                } else {
                    queue.push_back(node_entry.id.0.slot());
                }
            }
        }

        while let Some(node_slot) = queue.pop_front() {
            num_visited += 1;

            let (_, node_entry) = self.nodes.get_by_slot(node_slot).unwrap();

            // Decrement the in-degree of each dependent node, and queue any
            // node whose dependencies have now all been visited.
            for edge in node_entry.outgoing.iter() {
                in_degree[edge.dst_node.0.slot() as usize] -= 1;

                if in_degree[edge.dst_node.0.slot() as usize] == 0 {
                    queue.push_back(edge.dst_node.0.slot());
                }
            }

            if build_schedule {
                if node_slot != self.graph_out_id.0.slot() {
                    self.schedule.push(ScheduledNode::new(
                        node_entry.id,
                        node_entry.info.debug_name,
                    ));
                }
            }
        }

        // The graph output node is always processed last.
        if build_schedule {
            self.schedule
                .push(ScheduledNode::new(self.graph_out_id, "graph_out"));
        }

        // If not every node was visited, then the graph must contain a cycle.
        if num_visited != self.nodes.len() {
            return Err(CompileGraphError::CycleDetected);
        }

        Ok(self)
    }

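    /// Assign a buffer to each input and output port of every node in the
    /// schedule.
    ///
    /// Buffers are greedily reused: a buffer is released back to the
    /// allocator as soon as the last edge referencing it has been consumed.
    /// When multiple edges feed a single input port, an [`InsertedSum`] step
    /// is added to mix them into a single buffer first.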
    fn solve_buffer_requirements(mut self) -> Result<Self, CompileGraphError> {
        let mut allocator = BufferAllocator::new(64);
        // Maps each edge to the buffer that its source node writes into.
        let mut assignment_table: Arena<Rc<BufferRef>> =
            Arena::with_capacity(self.edges.capacity());
        let mut buffers_to_release: Vec<Rc<BufferRef>> = Vec::with_capacity(64);

        for entry in &mut self.schedule {
            let node_entry = &self.nodes[entry.id.0];

            let num_inputs = node_entry.info.channel_config.num_inputs.get() as usize;
            let num_outputs = node_entry.info.channel_config.num_outputs.get() as usize;

            buffers_to_release.clear();
            if buffers_to_release.capacity() < num_inputs + num_outputs {
                buffers_to_release
                    .reserve(num_inputs + num_outputs - buffers_to_release.capacity());
            }

            entry.input_buffers.reserve_exact(num_inputs);
            entry.output_buffers.reserve_exact(num_outputs);

            for port_idx in 0..num_inputs as u32 {
                let edges: SmallVec<[&Edge; 4]> = node_entry
                    .incoming
                    .iter()
                    .filter(|edge| edge.dst_port == port_idx)
                    .collect();

                entry
                    .in_connected_mask
                    .set_channel(port_idx as usize, !edges.is_empty());

                if edges.is_empty() {
                    // Unconnected input port: assign a scratch buffer that
                    // must be cleared before the node processes.
                    let buffer = allocator.acquire();
                    entry.input_buffers.push(InBufferAssignment {
                        buffer_index: buffer.idx,
                        should_clear: true,
                    });
                    buffers_to_release.push(buffer);
                } else if edges.len() == 1 {
                    // A single connection: reuse the buffer that the source
                    // node wrote its output into.
                    let buffer = assignment_table
                        .remove(edges[0].id.0)
                        .expect("No buffer assigned to edge!");
                    entry.input_buffers.push(InBufferAssignment {
                        buffer_index: buffer.idx,
                        should_clear: false,
                    });
                    buffers_to_release.push(buffer);
                } else {
                    // Multiple connections: insert a summing step that mixes
                    // all of the source buffers into a single buffer.
                    let sum_buffer = allocator.acquire();
                    let sum_output = OutBufferAssignment {
                        buffer_index: sum_buffer.idx,
                    };

                    let sum_inputs = edges
                        .iter()
                        .map(|edge| {
                            let buf = assignment_table
                                .remove(edge.id.0)
                                .expect("No buffer assigned to edge!");
                            let assignment = InBufferAssignment {
                                buffer_index: buf.idx,
                                should_clear: false,
                            };
                            allocator.release(buf);
                            assignment
                        })
                        .collect();

                    entry.sum_inputs.push(InsertedSum {
                        input_buffers: sum_inputs,
                        output_buffer: sum_output,
                    });

                    // The node reads its input from the summed buffer.
                    entry.input_buffers.push(InBufferAssignment {
                        buffer_index: sum_output.buffer_index,
                        should_clear: false,
                    });

                    buffers_to_release.push(sum_buffer);
                }
            }

            for port_idx in 0..num_outputs as u32 {
                let edges: SmallVec<[&Edge; 4]> = node_entry
                    .outgoing
                    .iter()
                    .filter(|edge| edge.src_port == port_idx)
                    .collect();

                entry
                    .out_connected_mask
                    .set_channel(port_idx as usize, !edges.is_empty());

                if edges.is_empty() {
                    // Unconnected output port: assign a scratch buffer and
                    // release it once this node is done with it.
                    let buffer = allocator.acquire();
                    entry.output_buffers.push(OutBufferAssignment {
                        buffer_index: buffer.idx,
                    });
                    buffers_to_release.push(buffer);
                } else {
                    // Assign the same buffer to every edge connected to this
                    // port. It is released once all consumers have read it.
                    let buffer = allocator.acquire();
                    for edge in &edges {
                        assignment_table.insert_at(edge.id.0, Rc::clone(&buffer));
                    }
                    entry.output_buffers.push(OutBufferAssignment {
                        buffer_index: buffer.idx,
                    });
                }
            }

            // Now that this node's assignments are final, mark its buffers as
            // potentially free for the nodes that follow it.
            for buffer in buffers_to_release.drain(..) {
                allocator.release(buffer);
            }

            self.max_in_buffers = self.max_in_buffers.max(num_inputs);
            self.max_out_buffers = self.max_out_buffers.max(num_outputs);
        }

        self.max_num_buffers = allocator.num_buffers();
        Ok(self)
    }

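    /// Consume the intermediate representation and assemble the final
    /// compiled schedule.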
    fn merge(self) -> CompiledSchedule {
        CompiledSchedule::new(
            self.pre_proc_nodes,
            self.schedule,
            self.max_num_buffers,
            self.max_block_frames,
            self.graph_in_id,
        )
    }
}

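/// A summing step inserted where multiple edges are connected to a single
/// input port, mixing the source buffers into one output buffer.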
#[derive(Debug, Clone)]
struct InsertedSum {
    input_buffers: SmallVec<[InBufferAssignment; 4]>,
    output_buffer: OutBufferAssignment,
}