aether_core/scheduler.rs
1//! Real-time audio scheduler.
2//!
3//! This is the hot path. It:
4//! 1. Drains bounded commands from the SPSC ring.
5//! 2. Executes the topologically sorted node list level by level.
6//! Nodes within the same BFS level are independent and run in parallel
7//! via Rayon's work-stealing thread pool.
8//! 3. Copies the output node's buffer to the DAC output.
9//!
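//! REAL-TIME RULES for this module:
//! - No locks and no I/O in the scheduler's own code. NOTE(review): multi-node levels are
//!   dispatched through Rayon, which blocks the calling (audio) thread until the whole
//!   level has finished — confirm this hand-off is acceptable for the RT budget.
//! - No allocation on single-node levels. NOTE(review): multi-node levels currently build a
//!   `Vec<NodeTask>` on every block, and `Command::ClearGraph` clones the execution order;
//!   both should move to pre-allocated storage (bounded by MAX_NODES) to honour this rule.
//! - No unbounded loops: command draining is capped at MAX_COMMANDS_PER_TICK per block.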
15
use rayon::iter::{IntoParallelRefMutIterator, ParallelIterator};
use ringbuf::traits::Consumer;

use crate::{
    arena::NodeId,
    command::Command,
    graph::DspGraph,
    node::DspNode,
    param::ParamBlock,
    BUFFER_SIZE, MAX_COMMANDS_PER_TICK, MAX_INPUTS,
};
26
27// ── Parallel dispatch helpers ─────────────────────────────────────────────────
28
29/// Per-node data bundle collected before parallel dispatch.
30///
31/// SAFETY INVARIANT: Within a single BFS level, every node writes to a distinct
32/// `BufferId` (guaranteed by the DAG structure — no two nodes in the same level
33/// share an output buffer). The `BufferPool` stores buffers in a flat `Vec`, so
34/// tasks writing to different `BufferId`s write to non-overlapping index ranges.
35/// This makes the concurrent writes safe despite using raw pointers.
36struct NodeTask {
37 output_buf_ptr: *mut [f32; BUFFER_SIZE],
38 params_ptr: *mut ParamBlock,
39 processor_ptr: *mut dyn DspNode,
40 inputs: [Option<*const [f32; BUFFER_SIZE]>; MAX_INPUTS],
41}
42
43/// SAFETY: Within a BFS level each task accesses disjoint memory:
44/// - distinct output buffer (different BufferId → different Vec index range)
45/// - distinct processor and params (each belongs to exactly one NodeRecord)
46///
47/// No two tasks in the same level share any pointed-to memory.
48unsafe impl Send for NodeTask {}
49unsafe impl Sync for NodeTask {}
50
51// ── Scheduler ─────────────────────────────────────────────────────────────────
52
53/// Real-time audio scheduler.
54///
55/// The scheduler owns the DSP graph and processes audio in fixed-size blocks
56/// (64 samples by default). It executes nodes in topologically sorted order,
57/// with nodes at the same BFS level running in parallel via Rayon.
58///
59/// # Real-Time Safety
60///
61/// - ✅ No allocation in audio thread
62/// - ✅ No locks in audio thread
63/// - ✅ Bounded execution time
64/// - ✅ Lock-free command processing via SPSC ring
65///
66/// # Example
67///
68/// ```
69/// use aether_core::scheduler::Scheduler;
70/// use aether_core::node::DspNode;
71/// use aether_core::param::ParamBlock;
72/// use aether_core::{BUFFER_SIZE, MAX_INPUTS};
73///
74/// // Create a simple oscillator node
75/// struct Oscillator {
76/// frequency: f32,
77/// phase: f32,
78/// }
79///
80/// impl DspNode for Oscillator {
81/// fn process(&mut self, _inputs: &[Option<&[f32; BUFFER_SIZE]>; MAX_INPUTS],
82/// output: &mut [f32; BUFFER_SIZE], _params: &mut ParamBlock, sample_rate: f32) {
83/// let phase_inc = self.frequency / sample_rate;
84/// for sample in output.iter_mut() {
85/// *sample = (self.phase * std::f32::consts::TAU).sin() * 0.3;
86/// self.phase = (self.phase + phase_inc).fract();
87/// }
88/// }
89/// fn type_name(&self) -> &'static str { "Oscillator" }
90/// }
91///
92/// // Create scheduler and add node
93/// let mut sched = Scheduler::new(48_000.0);
94/// let osc = Box::new(Oscillator { frequency: 440.0, phase: 0.0 });
95/// let id = sched.graph.add_node(osc).unwrap();
96/// sched.graph.set_output_node(id);
97///
98/// // Process one audio block
99/// let mut output = vec![0.0f32; 128];
100/// sched.process_block_simple(&mut output);
101/// ```
102///
103/// # Performance
104///
105/// - Latency: 1.33ms @ 48kHz (64 samples)
106/// - Throughput: 1000+ nodes @ <100µs processing time
107/// - Memory: Pre-allocated arena + buffer pool
108pub struct Scheduler {
109 pub graph: DspGraph,
110 pub sample_rate: f32,
111 pub muted: bool,
112}
113
114impl Scheduler {
115 /// Creates a new scheduler with the given sample rate.
116 ///
117 /// # Arguments
118 ///
119 /// * `sample_rate` - Sample rate in Hz (typically 44100.0 or 48000.0)
120 ///
121 /// # Example
122 ///
123 /// ```
124 /// use aether_core::scheduler::Scheduler;
125 ///
126 /// let sched = Scheduler::new(48_000.0);
127 /// assert_eq!(sched.sample_rate, 48_000.0);
128 /// ```
129 pub fn new(sample_rate: f32) -> Self {
130 Self {
131 graph: DspGraph::new(),
132 sample_rate,
133 muted: false,
134 }
135 }
136
137 /// Processes one audio block with command draining.
138 ///
139 /// Call this from your audio thread (e.g., CPAL stream callback).
140 /// It drains up to `MAX_COMMANDS_PER_TICK` commands from the ring buffer,
141 /// applies them to the graph, then processes all nodes in topological order.
142 ///
143 /// # Arguments
144 ///
145 /// * `cmd_consumer` - SPSC consumer for control commands from UI/control thread
146 /// * `output` - Interleaved stereo output buffer (length = BUFFER_SIZE * 2)
147 ///
148 /// # Real-Time Safety
149 ///
150 /// This function is real-time safe:
151 /// - No allocations
152 /// - No locks (uses lock-free SPSC ring)
153 /// - Bounded execution time
154 /// - Parallel node execution within BFS levels
155 ///
156 /// # Example
157 ///
158 /// ```no_run
159 /// use aether_core::scheduler::Scheduler;
160 /// use aether_core::command::Command;
161 /// use ringbuf::{HeapRb, traits::Split};
162 ///
163 /// let mut sched = Scheduler::new(48_000.0);
164 /// let (mut producer, mut consumer) = HeapRb::<Command>::new(1024).split();
165 ///
166 /// // In audio thread callback:
167 /// let mut output = vec![0.0f32; 128]; // 64 frames * 2 channels
168 /// sched.process_block(&mut consumer, &mut output);
169 /// ```
170 ///
171 /// # See Also
172 ///
173 /// * [`process_block_simple`](Self::process_block_simple) - Simplified version without command ring
174 pub fn process_block<C>(&mut self, cmd_consumer: &mut C, output: &mut [f32])
175 where
176 C: Consumer<Item = Command>,
177 {
178 let mut processed = 0;
179 while processed < MAX_COMMANDS_PER_TICK {
180 match cmd_consumer.try_pop() {
181 Some(cmd) => { self.apply_command(cmd); processed += 1; }
182 None => break,
183 }
184 }
185 self.process_graph(output);
186 }
187
188 /// Processes one audio block without command draining.
189 ///
190 /// Simplified version of [`process_block`](Self::process_block) that doesn't
191 /// drain commands from a ring buffer. Use this when the scheduler is shared
192 /// via `Arc<Mutex<>>` and the control thread mutates it directly.
193 ///
194 /// # Arguments
195 ///
196 /// * `output` - Interleaved stereo output buffer (length = BUFFER_SIZE * 2)
197 ///
198 /// # Real-Time Safety
199 ///
200 /// This function is real-time safe:
201 /// - No allocations
202 /// - No locks (assumes caller holds lock)
203 /// - Bounded execution time
204 ///
205 /// # Example
206 ///
207 /// ```
208 /// use aether_core::scheduler::Scheduler;
209 /// use aether_core::BUFFER_SIZE;
210 ///
211 /// let mut sched = Scheduler::new(48_000.0);
212 ///
213 /// // Process one block
214 /// let mut output = vec![0.0f32; BUFFER_SIZE * 2];
215 /// sched.process_block_simple(&mut output);
216 /// ```
217 ///
218 /// # See Also
219 ///
220 /// * [`process_block`](Self::process_block) - Version with command ring buffer
221 pub fn process_block_simple(&mut self, output: &mut [f32]) {
222 self.process_graph(output);
223 }
224
225 fn process_graph(&mut self, output: &mut [f32]) {
226 let sr = self.sample_rate;
227 let level_count = self.graph.levels.len();
228
229 for level_idx in 0..level_count {
230 let level_len = self.graph.levels[level_idx].len();
231
232 if level_len == 0 {
233 continue;
234 } else if level_len == 1 {
235 // Zero-overhead path: single node, no Rayon overhead.
236 let node_id = self.graph.levels[level_idx][0];
237 self.process_node(node_id, sr);
238 } else {
239 // Parallel path: collect raw pointers while holding &mut self,
240 // then dispatch DSP work in parallel via rayon::scope.
241 //
242 // SAFETY: Within a BFS level, every node writes to a distinct
243 // output buffer (disjoint BufferId). The BufferPool stores buffers
244 // in a flat Vec; tasks write to non-overlapping index ranges.
245 // Each processor and ParamBlock belongs to exactly one node.
246 let mut tasks: Vec<NodeTask> = Vec::with_capacity(level_len);
247
248 for i in 0..level_len {
249 let node_id = self.graph.levels[level_idx][i];
250 let mut input_ptrs: [Option<*const [f32; BUFFER_SIZE]>; MAX_INPUTS] =
251 [None; MAX_INPUTS];
252
253 if let Some(record) = self.graph.arena.get(node_id) {
254 for (slot, maybe_src) in record.inputs.iter().enumerate() {
255 if let Some(src_id) = maybe_src {
256 if let Some(src_record) = self.graph.arena.get(*src_id) {
257 input_ptrs[slot] = Some(
258 self.graph.buffers.get(src_record.output_buffer)
259 as *const [f32; BUFFER_SIZE],
260 );
261 }
262 }
263 }
264 let record_mut = self.graph.arena.get_mut(node_id).unwrap();
265 let output_buf_ptr = self.graph.buffers.get_mut(record_mut.output_buffer)
266 as *mut [f32; BUFFER_SIZE];
267 let params_ptr = &mut record_mut.params as *mut ParamBlock;
268 let processor_ptr = &mut *record_mut.processor as *mut dyn DspNode;
269
270 tasks.push(NodeTask {
271 output_buf_ptr,
272 params_ptr,
273 processor_ptr,
274 inputs: input_ptrs,
275 });
276 }
277 }
278
279 // SAFETY: each element of `tasks` points to disjoint memory.
280 // We pass a raw pointer per task so each closure captures a
281 // distinct non-aliasing pointer.
282 rayon::scope(|s| {
283 for task in tasks.iter_mut() {
284 // Capture the raw pointer value (usize) to avoid the
285 // borrow checker complaining about &mut Vec element borrows.
286 let ptr = task as *mut NodeTask as usize;
287 s.spawn(move |_| {
288 // SAFETY: ptr is a valid, exclusively-owned NodeTask.
289 let t: &mut NodeTask = unsafe { &mut *(ptr as *mut NodeTask) };
290 let inputs: [Option<&[f32; BUFFER_SIZE]>; MAX_INPUTS] =
291 t.inputs.map(|p| p.map(|raw| unsafe { &*raw }));
292 unsafe {
293 (*t.processor_ptr).process(
294 &inputs,
295 &mut *t.output_buf_ptr,
296 &mut *t.params_ptr,
297 sr,
298 );
299 }
300 });
301 }
302 });
303 }
304 }
305
306 // Copy output node buffer to DAC
307 if self.muted {
308 output.fill(0.0);
309 return;
310 }
311 if let Some(out_id) = self.graph.output_node {
312 if let Some(record) = self.graph.arena.get(out_id) {
313 let buf = self.graph.buffers.get(record.output_buffer);
314 let frames = output.len() / 2;
315 for i in 0..frames.min(BUFFER_SIZE) {
316 output[i * 2] = buf[i];
317 output[i * 2 + 1] = buf[i];
318 }
319 }
320 } else {
321 // INVARIANT: empty graph → silence.
322 output.fill(0.0);
323 }
324 }
325
326 /// Process a single node on the calling thread.
327 fn process_node(&mut self, node_id: NodeId, sample_rate: f32) {
328 let mut input_ptrs: [Option<*const [f32; BUFFER_SIZE]>; MAX_INPUTS] = [None; MAX_INPUTS];
329
330 if let Some(record) = self.graph.arena.get(node_id) {
331 for (slot, maybe_src) in record.inputs.iter().enumerate() {
332 if let Some(src_id) = maybe_src {
333 if let Some(src_record) = self.graph.arena.get(*src_id) {
334 input_ptrs[slot] = Some(
335 self.graph.buffers.get(src_record.output_buffer)
336 as *const [f32; BUFFER_SIZE],
337 );
338 }
339 }
340 }
341 } else {
342 return;
343 }
344
345 let (output_buf_id, params_ptr, processor_ptr) = {
346 let record = self.graph.arena.get_mut(node_id).unwrap();
347 (
348 record.output_buffer,
349 &mut record.params as *mut ParamBlock,
350 &mut *record.processor as *mut dyn crate::node::DspNode,
351 )
352 };
353
354 let output_buf = self.graph.buffers.get_mut(output_buf_id);
355 let inputs: [Option<&[f32; BUFFER_SIZE]>; MAX_INPUTS] =
356 input_ptrs.map(|p| p.map(|ptr| unsafe { &*ptr }));
357
358 unsafe {
359 (*processor_ptr).process(&inputs, output_buf, &mut *params_ptr, sample_rate);
360 }
361 }
362
363 fn apply_command(&mut self, cmd: Command) {
364 match cmd {
365 Command::AddNode { id } => { let _ = id; }
366 Command::RemoveNode { id } => { self.graph.remove_node(id); }
367 Command::Connect { src, dst, slot } => { self.graph.connect(src, dst, slot); }
368 Command::Disconnect { dst, slot } => { self.graph.disconnect(dst, slot); }
369 Command::UpdateParam { node, param_index, new_param } => {
370 if let Some(record) = self.graph.arena.get_mut(node) {
371 if param_index < record.params.count {
372 record.params.params[param_index] = new_param;
373 }
374 }
375 }
376 Command::SetOutputNode { id } => { self.graph.set_output_node(id); }
377 Command::SetMute { muted } => { self.muted = muted; }
378 Command::ClearGraph => {
379 let ids: Vec<_> = self.graph.execution_order.clone();
380 for id in ids { self.graph.remove_node(id); }
381 self.graph.output_node = None;
382 }
383 }
384 }
385
386 /// Reference sequential implementation for testing.
387 /// Processes nodes in flat execution_order without parallelism.
388 #[cfg(test)]
389 fn process_graph_sequential(&mut self, output: &mut [f32]) {
390 let sr = self.sample_rate;
391
392 // Collect execution order into a local Vec to avoid borrow conflict
393 // between the immutable borrow of execution_order and the mutable
394 // borrow inside process_node.
395 let order: Vec<NodeId> = self.graph.execution_order.clone();
396 for &node_id in &order {
397 self.process_node(node_id, sr);
398 }
399
400 // Copy output node buffer to DAC
401 if self.muted {
402 output.fill(0.0);
403 return;
404 }
405 if let Some(out_id) = self.graph.output_node {
406 if let Some(record) = self.graph.arena.get(out_id) {
407 let buf = self.graph.buffers.get(record.output_buffer);
408 let frames = output.len() / 2;
409 for i in 0..frames.min(BUFFER_SIZE) {
410 output[i * 2] = buf[i];
411 output[i * 2 + 1] = buf[i];
412 }
413 }
414 } else {
415 output.fill(0.0);
416 }
417 }
418}
419
#[cfg(test)]
mod tests {
    use super::*;
    use crate::node::DspNode;
    use proptest::prelude::*;

    /// Minimal deterministic test node for property testing.
    ///
    /// Writes `bias` to every sample, then adds each connected input scaled by `gain`.
    /// The bias makes source nodes (no inputs) emit a non-zero signal, so the equivalence
    /// property exercises real buffer routing instead of comparing all-zero blocks.
    struct TestNode {
        gain: f32,
        bias: f32,
    }

    impl TestNode {
        fn new(gain: f32, bias: f32) -> Self {
            Self { gain, bias }
        }
    }

    impl DspNode for TestNode {
        fn process(
            &mut self,
            inputs: &[Option<&[f32; BUFFER_SIZE]>; MAX_INPUTS],
            output: &mut [f32; BUFFER_SIZE],
            _params: &mut ParamBlock,
            _sample_rate: f32,
        ) {
            output.fill(self.bias);
            for input in inputs.iter().flatten() {
                for (out, &sample) in output.iter_mut().zip(input.iter()) {
                    *out += sample * self.gain;
                }
            }
        }

        fn type_name(&self) -> &'static str {
            "TestNode"
        }
    }

    // Property 1
    proptest! {
        /// **Validates: Requirements 1.1, 1.4**
        ///
        /// Feature: aether-engine-upgrades, Property 1: parallel execution is output-equivalent
        ///
        /// Property 1: Parallel execution is output-equivalent to sequential execution.
        ///
        /// For any valid DSP patch (any combination of nodes and edges forming a valid DAG),
        /// processing a block with the parallel Rayon scheduler SHALL produce a bit-identical
        /// output buffer to processing the same block with the original sequential scheduler,
        /// given the same initial node state and the same input.
        #[test]
        fn prop_parallel_equiv_sequential(
            num_nodes in 1usize..=20,
            edges in prop::collection::vec((0usize..20, 0usize..20, 0usize..MAX_INPUTS), 0..50),
            seed in any::<u64>(),
        ) {
            // Create two identical schedulers
            let mut scheduler_parallel = Scheduler::new(48000.0);
            let mut scheduler_sequential = Scheduler::new(48000.0);

            let mut node_ids = Vec::new();

            // Add nodes to both schedulers with deterministic, seed-dependent gain and bias
            for i in 0..num_nodes {
                let gain = ((seed.wrapping_add(i as u64) % 100) as f32) / 100.0;
                // DC offset in [-0.5, 0.5) so source nodes produce a non-trivial signal.
                let bias = (((seed >> 32).wrapping_add(i as u64 * 13) % 64) as f32) / 64.0 - 0.5;

                let id1 = scheduler_parallel.graph.add_node(Box::new(TestNode::new(gain, bias)));
                let id2 = scheduler_sequential.graph.add_node(Box::new(TestNode::new(gain, bias)));

                if let (Some(id1), Some(id2)) = (id1, id2) {
                    // Verify both schedulers assigned the same NodeId
                    prop_assert_eq!(id1.index, id2.index);
                    prop_assert_eq!(id1.generation, id2.generation);
                    node_ids.push(id1);
                }
            }

            // Add edges to both schedulers (src < dst keeps the graph a DAG). Bound indices
            // by the nodes actually created so a failed `add_node` cannot cause a panic.
            let created = node_ids.len();
            for (src_idx, dst_idx, slot) in edges {
                if src_idx < dst_idx && dst_idx < created {
                    let src = node_ids[src_idx];
                    let dst = node_ids[dst_idx];

                    scheduler_parallel.graph.connect(src, dst, slot);
                    scheduler_sequential.graph.connect(src, dst, slot);
                }
            }

            // Use the last created node as the output node
            if let Some(&output_node) = node_ids.last() {
                scheduler_parallel.graph.set_output_node(output_node);
                scheduler_sequential.graph.set_output_node(output_node);
            }

            // Prepare output buffers (interleaved stereo, BUFFER_SIZE frames)
            let mut output_parallel = vec![0.0f32; BUFFER_SIZE * 2];
            let mut output_sequential = vec![0.0f32; BUFFER_SIZE * 2];

            // Process one block with both schedulers
            scheduler_parallel.process_graph(&mut output_parallel);
            scheduler_sequential.process_graph_sequential(&mut output_sequential);

            // Assert bit-identical output (to_bits also distinguishes 0.0 from -0.0)
            for (i, (&p, &s)) in output_parallel.iter().zip(output_sequential.iter()).enumerate() {
                prop_assert!(
                    p.to_bits() == s.to_bits(),
                    "Output mismatch at sample {}: parallel={}, sequential={}",
                    i, p, s
                );
            }
        }
    }
}