1use alloc::{collections::VecDeque, rc::Rc};
2use firewheel_core::node::{AudioNodeInfoInner, DynAudioNode, NodeID};
3use smallvec::SmallVec;
4use thunderdome::Arena;
5
6#[cfg(not(feature = "std"))]
7use bevy_platform::prelude::{vec, Box, Vec};
8
9use crate::error::CompileGraphError;
10
11mod schedule;
12
13pub use schedule::{CompiledSchedule, NodeHeapData, ScheduleHeapData};
14use schedule::{InBufferAssignment, OutBufferAssignment, PreProcNode, ScheduledNode};
15
/// A node stored in the audio graph, together with its cached connectivity.
pub struct NodeEntry {
    /// The unique ID assigned to this node in the graph.
    pub id: NodeID,
    /// The node's static information (debug name, channel configuration, ...).
    pub info: AudioNodeInfoInner,
    /// The type-erased user node implementation.
    pub dyn_node: Box<dyn DynAudioNode>,
    // Tracks whether this node's processor has been constructed (managed
    // outside this module; not read here).
    pub processor_constructed: bool,
    // Cached adjacency lists. Rebuilt from the edge arena by
    // `GraphIR::preprocess` on every compile; not a source of truth.
    incoming: SmallVec<[Edge; 4]>,
    outgoing: SmallVec<[Edge; 4]>,
}
26
27impl NodeEntry {
28 pub fn new(info: AudioNodeInfoInner, dyn_node: Box<dyn DynAudioNode>) -> Self {
29 Self {
30 id: NodeID::DANGLING,
31 info,
32 dyn_node,
33 processor_constructed: false,
34 incoming: SmallVec::new(),
35 outgoing: SmallVec::new(),
36 }
37 }
38}
39
/// Index of an input or output port on a node.
pub type PortIdx = u32;

/// A unique identifier for an [`Edge`], backed by the edge's index in the
/// edge arena.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct EdgeID(pub(super) thunderdome::Index);
46
/// A connection from an output port of one node to an input port of another.
#[derive(Copy, Clone, Debug, Hash, Eq, PartialEq)]
pub struct Edge {
    /// The unique ID of this edge.
    pub id: EdgeID,
    /// The node producing data on this edge.
    pub src_node: NodeID,
    /// The output port index on the source node.
    pub src_port: PortIdx,
    /// The node consuming data on this edge.
    pub dst_node: NodeID,
    /// The input port index on the destination node.
    pub dst_port: PortIdx,
}
61
/// A reference to a single scratch-buffer slot.
#[derive(Debug, Clone, Copy)]
struct BufferRef {
    /// Index of the buffer in the schedule's buffer pool.
    idx: usize,
    /// Bumped each time the slot is recycled, distinguishing separate uses
    /// of the same buffer index.
    generation: usize,
}

/// A simple free-list allocator handing out buffer indices, reusing
/// released indices before creating new ones.
#[derive(Debug, Clone)]
struct BufferAllocator {
    /// Slots that have been released and may be handed out again.
    free_list: Vec<BufferRef>,
    /// Total number of distinct buffer indices created so far.
    count: usize,
}

impl BufferAllocator {
    /// Creates an allocator with room for `initial_capacity` free-list
    /// entries before reallocating.
    fn new(initial_capacity: usize) -> Self {
        Self {
            free_list: Vec::with_capacity(initial_capacity),
            count: 0,
        }
    }

    /// Acquires a buffer, reusing a free slot if one exists or minting a
    /// new index otherwise.
    ///
    /// The returned `Rc` doubles as a reader count: clones of it represent
    /// outstanding users of the buffer (see [`BufferAllocator::release`]).
    fn acquire(&mut self) -> Rc<BufferRef> {
        let entry = self.free_list.pop().unwrap_or_else(|| {
            let idx = self.count;
            self.count += 1;
            BufferRef { idx, generation: 0 }
        });
        Rc::new(BufferRef {
            idx: entry.idx,
            generation: entry.generation,
        })
    }

    /// Releases one reference to a buffer. The slot is returned to the free
    /// list (with its generation bumped) only when this was the last
    /// outstanding reference.
    fn release(&mut self, buffer_ref: Rc<BufferRef>) {
        if Rc::strong_count(&buffer_ref) == 1 {
            self.free_list.push(BufferRef {
                idx: buffer_ref.idx,
                generation: buffer_ref.generation + 1,
            });
        }
    }

    /// The total number of distinct buffers allocated so far.
    // Fix: takes `&self` instead of `self` — a getter should not consume
    // the allocator. Callers using method syntax are unaffected.
    fn num_buffers(&self) -> usize {
        self.count
    }
}
119
120pub fn compile(
122 nodes: &mut Arena<NodeEntry>,
123 edges: &mut Arena<Edge>,
124 graph_in_id: NodeID,
125 graph_out_id: NodeID,
126 max_block_frames: usize,
127) -> Result<CompiledSchedule, CompileGraphError> {
128 Ok(
129 GraphIR::preprocess(nodes, edges, graph_in_id, graph_out_id, max_block_frames)
130 .sort_topologically(true)?
131 .solve_buffer_requirements()?
132 .merge(),
133 )
134}
135
136pub fn cycle_detected<'a>(
137 nodes: &'a mut Arena<NodeEntry>,
138 edges: &'a mut Arena<Edge>,
139 graph_in_id: NodeID,
140 graph_out_id: NodeID,
141) -> bool {
142 matches!(
143 GraphIR::preprocess(nodes, edges, graph_in_id, graph_out_id, 0).sort_topologically(false),
144 Err(CompileGraphError::CycleDetected)
145 )
146}
147
/// Intermediate state threaded through the stages of a graph compile
/// (`preprocess` -> `sort_topologically` -> `solve_buffer_requirements` ->
/// `merge`).
struct GraphIR<'a> {
    nodes: &'a mut Arena<NodeEntry>,
    edges: &'a mut Arena<Edge>,

    /// Channel-less source nodes that are handed to the schedule separately
    /// instead of being topologically ordered.
    pre_proc_nodes: Vec<PreProcNode>,
    /// Scheduled nodes in topological order (graph output node last).
    schedule: Vec<ScheduledNode>,
    /// Total number of distinct scratch buffers required by the schedule.
    max_num_buffers: usize,

    graph_in_id: NodeID,
    graph_out_id: NodeID,
    /// Largest number of input buffers used by any single node.
    max_in_buffers: usize,
    /// Largest number of output buffers used by any single node.
    max_out_buffers: usize,
    /// Maximum block size in frames, passed through to the compiled schedule.
    max_block_frames: usize,
}
168
impl<'a> GraphIR<'a> {
    /// Constructs the IR: rebuilds each node's cached `incoming`/`outgoing`
    /// edge lists from the edge arena and resets all bookkeeping state.
    ///
    /// # Panics
    ///
    /// Panics if either `graph_in_id` or `graph_out_id` is not in `nodes`.
    fn preprocess(
        nodes: &'a mut Arena<NodeEntry>,
        edges: &'a mut Arena<Edge>,
        graph_in_id: NodeID,
        graph_out_id: NodeID,
        max_block_frames: usize,
    ) -> Self {
        assert!(nodes.contains(graph_in_id.0));
        assert!(nodes.contains(graph_out_id.0));

        // Drop adjacency data left over from a previous compile pass.
        for (_, node) in nodes.iter_mut() {
            node.incoming.clear();
            node.outgoing.clear();
        }

        // Re-derive adjacency from the authoritative edge arena.
        for (_, edge) in edges.iter() {
            nodes[edge.src_node.0].outgoing.push(*edge);
            nodes[edge.dst_node.0].incoming.push(*edge);

            // The graph output node must never act as a source, and the
            // graph input node must never act as a destination.
            debug_assert_ne!(edge.src_node, graph_out_id);
            debug_assert_ne!(edge.dst_node, graph_in_id);
        }

        Self {
            nodes,
            edges,
            pre_proc_nodes: vec![],
            schedule: vec![],
            max_num_buffers: 0,
            graph_in_id,
            graph_out_id,
            max_in_buffers: 0,
            max_out_buffers: 0,
            max_block_frames,
        }
    }

    /// Topologically orders the nodes using Kahn's algorithm. When
    /// `build_schedule` is `true`, a [`ScheduledNode`] is pushed for each
    /// node in visitation order, with the graph output node forced last.
    ///
    /// Source nodes with an empty channel config are diverted into
    /// `pre_proc_nodes` instead of the schedule.
    ///
    /// # Errors
    ///
    /// Returns [`CompileGraphError::CycleDetected`] if some node was never
    /// visited, which with Kahn's algorithm implies a cycle.
    fn sort_topologically(mut self, build_schedule: bool) -> Result<Self, CompileGraphError> {
        // In-degree per node, indexed by arena slot. `capacity()` is an
        // upper bound on the highest occupied slot.
        let mut in_degree = vec![0i32; self.nodes.capacity()];
        let mut queue = VecDeque::with_capacity(self.nodes.len());

        if build_schedule {
            self.schedule.reserve(self.nodes.len());
        }

        let mut num_visited = 0;

        // Count incoming edges for every node.
        for (_, node_entry) in self.nodes.iter() {
            for edge in node_entry.outgoing.iter() {
                in_degree[edge.dst_node.0.slot() as usize] += 1;
            }
        }

        // Seed the queue with the graph input so it is scheduled first.
        queue.push_back(self.graph_in_id.0.slot());

        // Seed the remaining source nodes (those with no incoming edges).
        for (_, node_entry) in self.nodes.iter() {
            if node_entry.incoming.is_empty() && node_entry.id.0.slot() != self.graph_in_id.0.slot()
            {
                if node_entry.info.channel_config.is_empty() {
                    // A node with no audio channels cannot take part in
                    // buffer scheduling; hand it off as a pre-process node
                    // and count it as visited.
                    self.pre_proc_nodes.push(PreProcNode {
                        id: node_entry.id,
                        debug_name: node_entry.info.debug_name,
                    });

                    num_visited += 1;
                } else {
                    queue.push_back(node_entry.id.0.slot());
                }
            }
        }

        // Standard Kahn's algorithm: pop a ready node, decrement the
        // in-degree of its successors, and enqueue any that become ready.
        while let Some(node_slot) = queue.pop_front() {
            num_visited += 1;

            let (_, node_entry) = self.nodes.get_by_slot(node_slot).unwrap();

            for edge in node_entry.outgoing.iter() {
                in_degree[edge.dst_node.0.slot() as usize] -= 1;

                if in_degree[edge.dst_node.0.slot() as usize] == 0 {
                    queue.push_back(edge.dst_node.0.slot());
                }
            }

            // The graph output is deliberately skipped here so it can be
            // appended as the final schedule entry below.
            if build_schedule && node_slot != self.graph_out_id.0.slot() {
                self.schedule.push(ScheduledNode::new(
                    node_entry.id,
                    node_entry.info.debug_name,
                ));
            }
        }

        if build_schedule {
            self.schedule
                .push(ScheduledNode::new(self.graph_out_id, "graph_out"));
        }

        // Any unvisited node implies a cycle: in an acyclic graph every
        // node's in-degree eventually reaches zero.
        if num_visited != self.nodes.len() {
            return Err(CompileGraphError::CycleDetected);
        }

        Ok(self)
    }

    /// Walks the schedule in topological order and assigns a scratch buffer
    /// to every input and output port, greedily reusing buffers once all of
    /// their readers have been processed.
    ///
    /// The `Rc` count on each [`BufferRef`] tracks outstanding readers: a
    /// producing port registers one clone per outgoing edge, and each
    /// consuming port removes its clone from `assignment_table` and
    /// releases it.
    ///
    /// Must run after [`GraphIR::sort_topologically`] with
    /// `build_schedule == true`, since it walks `self.schedule`.
    fn solve_buffer_requirements(mut self) -> Result<Self, CompileGraphError> {
        let mut allocator = BufferAllocator::new(64);
        // Maps each edge (by its arena index) to the buffer carrying its
        // data from producer to consumer.
        let mut assignment_table: Arena<Rc<BufferRef>> =
            Arena::with_capacity(self.edges.capacity());
        // Buffers touched by the current node. Releases are deferred until
        // the node is fully assigned so that a node's input buffers are
        // never handed out as its own outputs.
        let mut buffers_to_release: Vec<Rc<BufferRef>> = Vec::with_capacity(64);

        for entry in &mut self.schedule {
            let node_entry = &self.nodes[entry.id.0];

            let num_inputs = node_entry.info.channel_config.num_inputs.get() as usize;
            let num_outputs = node_entry.info.channel_config.num_outputs.get() as usize;

            buffers_to_release.clear();
            if buffers_to_release.capacity() < num_inputs + num_outputs {
                buffers_to_release
                    .reserve(num_inputs + num_outputs - buffers_to_release.capacity());
            }

            entry.input_buffers.reserve_exact(num_inputs);
            entry.output_buffers.reserve_exact(num_outputs);

            for port_idx in 0..num_inputs as u32 {
                // All edges feeding this input port.
                let edges: SmallVec<[&Edge; 4]> = node_entry
                    .incoming
                    .iter()
                    .filter(|edge| edge.dst_port == port_idx)
                    .collect();

                entry
                    .in_connected_mask
                    .set_channel(port_idx as usize, !edges.is_empty());

                if edges.is_empty() {
                    // Unconnected input: assign a scratch buffer that must
                    // be cleared to silence before the node processes.
                    let buffer = allocator.acquire();
                    entry.input_buffers.push(InBufferAssignment {
                        buffer_index: buffer.idx,
                        should_clear: true,
                    });
                    buffers_to_release.push(buffer);
                } else if edges.len() == 1 {
                    // Exactly one producer: read directly from the buffer
                    // the producing port wrote to.
                    let buffer = assignment_table
                        .remove(edges[0].id.0)
                        .expect("No buffer assigned to edge!");
                    entry.input_buffers.push(InBufferAssignment {
                        buffer_index: buffer.idx,
                        should_clear: false,
                    });
                    buffers_to_release.push(buffer);
                } else {
                    // Multiple producers: insert a summing step that mixes
                    // all source buffers into a fresh buffer, which then
                    // serves as this port's input.
                    let sum_buffer = allocator.acquire();
                    let sum_output = OutBufferAssignment {
                        buffer_index: sum_buffer.idx,
                    };

                    let sum_inputs = edges
                        .iter()
                        .map(|edge| {
                            let buf = assignment_table
                                .remove(edge.id.0)
                                .expect("No buffer assigned to edge!");
                            let assignment = InBufferAssignment {
                                buffer_index: buf.idx,
                                should_clear: false,
                            };
                            // NOTE(review): sum sources are released here,
                            // before the node's remaining ports are
                            // assigned — assumes the runtime performs
                            // inserted sums before anything that could
                            // overwrite these buffers; confirm against
                            // schedule.rs.
                            allocator.release(buf);
                            assignment
                        })
                        .collect();

                    entry.sum_inputs.push(InsertedSum {
                        input_buffers: sum_inputs,
                        output_buffer: sum_output,
                    });

                    entry.input_buffers.push(InBufferAssignment {
                        buffer_index: sum_output.buffer_index,
                        should_clear: false,
                    });

                    buffers_to_release.push(sum_buffer);
                }
            }

            for port_idx in 0..num_outputs as u32 {
                // All edges fed by this output port.
                let edges: SmallVec<[&Edge; 4]> = node_entry
                    .outgoing
                    .iter()
                    .filter(|edge| edge.src_port == port_idx)
                    .collect();

                entry
                    .out_connected_mask
                    .set_channel(port_idx as usize, !edges.is_empty());

                if edges.is_empty() {
                    // Unconnected output: the node still needs somewhere to
                    // write, so assign a throwaway buffer.
                    let buffer = allocator.acquire();
                    entry.output_buffers.push(OutBufferAssignment {
                        buffer_index: buffer.idx,
                    });
                    buffers_to_release.push(buffer);
                } else {
                    // Connected output: allocate a buffer and register one
                    // Rc clone per outgoing edge so each consumer can find
                    // (and later release) it. Not pushed to
                    // `buffers_to_release`: the clones keep it alive until
                    // every consumer has released its reference.
                    let buffer = allocator.acquire();
                    for edge in &edges {
                        assignment_table.insert_at(edge.id.0, Rc::clone(&buffer));
                    }
                    entry.output_buffers.push(OutBufferAssignment {
                        buffer_index: buffer.idx,
                    });
                }
            }

            // The node is fully assigned; buffers with no remaining readers
            // return to the allocator's free list for downstream reuse.
            for buffer in buffers_to_release.drain(..) {
                allocator.release(buffer);
            }

            self.max_in_buffers = self.max_in_buffers.max(num_inputs);
            self.max_out_buffers = self.max_out_buffers.max(num_outputs);
        }

        self.max_num_buffers = allocator.num_buffers();
        Ok(self)
    }

    /// Consumes the IR and assembles the final [`CompiledSchedule`].
    fn merge(self) -> CompiledSchedule {
        CompiledSchedule::new(
            self.pre_proc_nodes,
            self.schedule,
            self.max_num_buffers,
            self.max_block_frames,
            self.graph_in_id,
        )
    }
}
455
/// A summing step inserted where multiple edges terminate at the same input
/// port: the source buffers are mixed into a single output buffer, which
/// then serves as that port's input.
#[derive(Debug, Clone)]
struct InsertedSum {
    /// Buffers holding the signals to be summed.
    input_buffers: SmallVec<[InBufferAssignment; 4]>,
    /// Buffer receiving the mixed result.
    output_buffer: OutBufferAssignment,
}