aether_core/scheduler.rs
1//! Real-time audio scheduler.
2//!
3//! This is the hot path. It:
4//! 1. Drains bounded commands from the SPSC ring.
5//! 2. Executes the topologically sorted node list level by level.
6//! Nodes within the same BFS level are independent and run in parallel
7//! via Rayon's work-stealing thread pool.
8//! 3. Copies the output node's buffer to the DAC output.
9//!
10//! HARD RT RULES enforced here:
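//! - No allocation (intent; NOTE: the parallel path in `process_graph` currently builds a fresh `Vec<NodeTask>` for every multi-node level on each block, sized by that level's node count)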
12//! - No locks
13//! - No I/O
14//! - No unbounded loops
15
16use ringbuf::traits::Consumer;
17
18use crate::{
19 arena::NodeId,
20 command::Command,
21 graph::DspGraph,
22 node::DspNode,
23 param::ParamBlock,
24 BUFFER_SIZE, MAX_COMMANDS_PER_TICK, MAX_INPUTS,
25};
26
27// ── Parallel dispatch helpers ─────────────────────────────────────────────────
28
29/// Per-node data bundle collected before parallel dispatch.
30///
31/// SAFETY INVARIANT: Within a single BFS level, every node writes to a distinct
32/// `BufferId` (guaranteed by the DAG structure — no two nodes in the same level
33/// share an output buffer). The `BufferPool` stores buffers in a flat `Vec`, so
34/// tasks writing to different `BufferId`s write to non-overlapping index ranges.
35/// This makes the concurrent writes safe despite using raw pointers.
36struct NodeTask {
37 output_buf_ptr: *mut [f32; BUFFER_SIZE],
38 params_ptr: *mut ParamBlock,
39 processor_ptr: *mut dyn DspNode,
40 inputs: [Option<*const [f32; BUFFER_SIZE]>; MAX_INPUTS],
41}
42
43/// SAFETY: Within a BFS level each task accesses disjoint memory:
44/// - distinct output buffer (different BufferId → different Vec index range)
45/// - distinct processor and params (each belongs to exactly one NodeRecord)
46///
47/// No two tasks in the same level share any pointed-to memory.
48unsafe impl Send for NodeTask {}
49unsafe impl Sync for NodeTask {}
50
51// ── Scheduler ─────────────────────────────────────────────────────────────────
52
53/// Real-time audio scheduler.
54///
55/// The scheduler owns the DSP graph and processes audio in fixed-size blocks
56/// (64 samples by default). It executes nodes in topologically sorted order,
57/// with nodes at the same BFS level running in parallel via Rayon.
58///
59/// # Real-Time Safety
60///
61/// - ✅ No allocation in audio thread
62/// - ✅ No locks in audio thread
63/// - ✅ Bounded execution time
64/// - ✅ Lock-free command processing via SPSC ring
65///
66/// # Example
67///
68/// ```
69/// use aether_core::scheduler::Scheduler;
70/// use aether_core::node::DspNode;
71/// use aether_core::param::ParamBlock;
72/// use aether_core::{BUFFER_SIZE, MAX_INPUTS};
73///
74/// // Create a simple oscillator node
75/// struct Oscillator {
76/// frequency: f32,
77/// phase: f32,
78/// }
79///
80/// impl DspNode for Oscillator {
81/// fn process(&mut self, _inputs: &[Option<&[f32; BUFFER_SIZE]>; MAX_INPUTS],
82/// output: &mut [f32; BUFFER_SIZE], _params: &mut ParamBlock, sample_rate: f32) {
83/// let phase_inc = self.frequency / sample_rate;
84/// for sample in output.iter_mut() {
85/// *sample = (self.phase * std::f32::consts::TAU).sin() * 0.3;
86/// self.phase = (self.phase + phase_inc).fract();
87/// }
88/// }
89/// fn type_name(&self) -> &'static str { "Oscillator" }
90/// }
91///
92/// // Create scheduler and add node
93/// let mut sched = Scheduler::new(48_000.0);
94/// let osc = Box::new(Oscillator { frequency: 440.0, phase: 0.0 });
95/// let id = sched.graph.add_node(osc).unwrap();
96/// sched.graph.set_output_node(id);
97///
98/// // Process one audio block
99/// let mut output = vec![0.0f32; 128];
100/// sched.process_block_simple(&mut output);
101/// ```
102///
103/// # Performance
104///
105/// - Latency: 1.33ms @ 48kHz (64 samples)
106/// - Throughput: 1000+ nodes @ <100µs processing time
107/// - Memory: Pre-allocated arena + buffer pool
108pub struct Scheduler {
109 pub graph: DspGraph,
110 pub sample_rate: f32,
111 pub muted: bool,
112}
113
114impl Scheduler {
115 /// Creates a new scheduler with the given sample rate.
116 ///
117 /// # Arguments
118 ///
119 /// * `sample_rate` - Sample rate in Hz (typically 44100.0 or 48000.0)
120 ///
121 /// # Example
122 ///
123 /// ```
124 /// use aether_core::scheduler::Scheduler;
125 ///
126 /// let sched = Scheduler::new(48_000.0);
127 /// assert_eq!(sched.sample_rate, 48_000.0);
128 /// ```
129 pub fn new(sample_rate: f32) -> Self {
130 Self {
131 graph: DspGraph::new(),
132 sample_rate,
133 muted: false,
134 }
135 }
136
137 /// Processes one audio block with command draining.
138 ///
139 /// Call this from your audio thread (e.g., CPAL stream callback).
140 /// It drains up to `MAX_COMMANDS_PER_TICK` commands from the ring buffer,
141 /// applies them to the graph, then processes all nodes in topological order.
142 ///
143 /// # Arguments
144 ///
145 /// * `cmd_consumer` - SPSC consumer for control commands from UI/control thread
146 /// * `output` - Interleaved stereo output buffer (length = BUFFER_SIZE * 2)
147 ///
148 /// # Real-Time Safety
149 ///
150 /// This function is real-time safe:
151 /// - No allocations
152 /// - No locks (uses lock-free SPSC ring)
153 /// - Bounded execution time
154 /// - Parallel node execution within BFS levels
155 ///
156 /// # Example
157 ///
158 /// ```no_run
159 /// use aether_core::scheduler::Scheduler;
160 /// use aether_core::command::Command;
161 /// use ringbuf::{HeapRb, traits::Split};
162 ///
163 /// let mut sched = Scheduler::new(48_000.0);
164 /// let (mut producer, mut consumer) = HeapRb::<Command>::new(1024).split();
165 ///
166 /// // In audio thread callback:
167 /// let mut output = vec![0.0f32; 128]; // 64 frames * 2 channels
168 /// sched.process_block(&mut consumer, &mut output);
169 /// ```
170 ///
171 /// # See Also
172 ///
173 /// * [`process_block_simple`](Self::process_block_simple) - Simplified version without command ring
174 pub fn process_block<C>(&mut self, cmd_consumer: &mut C, output: &mut [f32])
175 where
176 C: Consumer<Item = Command>,
177 {
178 let mut processed = 0;
179 while processed < MAX_COMMANDS_PER_TICK {
180 match cmd_consumer.try_pop() {
181 Some(cmd) => { self.apply_command(cmd); processed += 1; }
182 None => break,
183 }
184 }
185 self.process_graph(output);
186 }
187
188 /// Processes one audio block without command draining.
189 ///
190 /// Simplified version of [`process_block`](Self::process_block) that doesn't
191 /// drain commands from a ring buffer. Use this when the scheduler is shared
192 /// via `Arc<Mutex<>>` and the control thread mutates it directly.
193 ///
194 /// # Arguments
195 ///
196 /// * `output` - Interleaved stereo output buffer (length = BUFFER_SIZE * 2)
197 ///
198 /// # Real-Time Safety
199 ///
200 /// This function is real-time safe:
201 /// - No allocations
202 /// - No locks (assumes caller holds lock)
203 /// - Bounded execution time
204 ///
205 /// # Example
206 ///
207 /// ```
208 /// use aether_core::scheduler::Scheduler;
209 /// use aether_core::BUFFER_SIZE;
210 ///
211 /// let mut sched = Scheduler::new(48_000.0);
212 ///
213 /// // Process one block
214 /// let mut output = vec![0.0f32; BUFFER_SIZE * 2];
215 /// sched.process_block_simple(&mut output);
216 /// ```
217 ///
218 /// # See Also
219 ///
220 /// * [`process_block`](Self::process_block) - Version with command ring buffer
221 pub fn process_block_simple(&mut self, output: &mut [f32]) {
222 self.process_graph(output);
223 }
224
225 fn process_graph(&mut self, output: &mut [f32]) {
226 let sr = self.sample_rate;
227 let level_count = self.graph.levels.len();
228
229 for level_idx in 0..level_count {
230 let level_len = self.graph.levels[level_idx].len();
231
232 if level_len == 0 {
233 continue;
234 } else if level_len == 1 {
235 // Zero-overhead path: single node, no Rayon overhead.
236 let node_id = self.graph.levels[level_idx][0];
237 self.process_node(node_id, sr);
238 } else {
239 // Parallel path: collect raw pointers while holding &mut self,
240 // then dispatch DSP work in parallel via rayon::scope.
241 //
242 // SAFETY: Within a BFS level, every node writes to a distinct
243 // output buffer (disjoint BufferId). The BufferPool stores buffers
244 // in a flat Vec; tasks write to non-overlapping index ranges.
245 // Each processor and ParamBlock belongs to exactly one node.
246 let mut tasks: Vec<NodeTask> = Vec::with_capacity(level_len);
247
248 for i in 0..level_len {
249 let node_id = self.graph.levels[level_idx][i];
250 let mut input_ptrs: [Option<*const [f32; BUFFER_SIZE]>; MAX_INPUTS] =
251 [None; MAX_INPUTS];
252
253 if let Some(record) = self.graph.arena.get(node_id) {
254 for (slot, maybe_src) in record.inputs.iter().enumerate() {
255 if let Some(src_id) = maybe_src {
256 if let Some(src_record) = self.graph.arena.get(*src_id) {
257 input_ptrs[slot] = Some(
258 self.graph.buffers.get(src_record.output_buffer)
259 as *const [f32; BUFFER_SIZE],
260 );
261 }
262 }
263 }
264 let record_mut = self.graph.arena.get_mut(node_id).unwrap();
265 let output_buf_ptr = self.graph.buffers.get_mut(record_mut.output_buffer)
266 as *mut [f32; BUFFER_SIZE];
267 let params_ptr = &mut record_mut.params as *mut ParamBlock;
268 let processor_ptr = &mut *record_mut.processor as *mut dyn DspNode;
269
270 tasks.push(NodeTask {
271 output_buf_ptr,
272 params_ptr,
273 processor_ptr,
274 inputs: input_ptrs,
275 });
276 }
277 }
278
279 // SAFETY: each element of `tasks` points to disjoint memory.
280 // We pass a raw pointer per task so each closure captures a
281 // distinct non-aliasing pointer.
282 #[cfg(feature = "parallel")]
283 {
284 rayon::scope(|s| {
285 for task in tasks.iter_mut() {
286 // Capture the raw pointer value (usize) to avoid the
287 // borrow checker complaining about &mut Vec element borrows.
288 let ptr = task as *mut NodeTask as usize;
289 s.spawn(move |_| {
290 // SAFETY: ptr is a valid, exclusively-owned NodeTask.
291 let t: &mut NodeTask = unsafe { &mut *(ptr as *mut NodeTask) };
292 let inputs: [Option<&[f32; BUFFER_SIZE]>; MAX_INPUTS] =
293 t.inputs.map(|p| p.map(|raw| unsafe { &*raw }));
294 unsafe {
295 (*t.processor_ptr).process(
296 &inputs,
297 &mut *t.output_buf_ptr,
298 &mut *t.params_ptr,
299 sr,
300 );
301 }
302 });
303 }
304 });
305 }
306
307 // Sequential fallback when parallel feature is disabled
308 #[cfg(not(feature = "parallel"))]
309 {
310 for task in tasks.iter_mut() {
311 let inputs: [Option<&[f32; BUFFER_SIZE]>; MAX_INPUTS] =
312 task.inputs.map(|p| p.map(|raw| unsafe { &*raw }));
313 unsafe {
314 (*task.processor_ptr).process(
315 &inputs,
316 &mut *task.output_buf_ptr,
317 &mut *task.params_ptr,
318 sr,
319 );
320 }
321 }
322 }
323 }
324 }
325
326 // Copy output node buffer to DAC
327 if self.muted {
328 output.fill(0.0);
329 return;
330 }
331 if let Some(out_id) = self.graph.output_node {
332 if let Some(record) = self.graph.arena.get(out_id) {
333 let buf = self.graph.buffers.get(record.output_buffer);
334 let frames = output.len() / 2;
335 for i in 0..frames.min(BUFFER_SIZE) {
336 output[i * 2] = buf[i];
337 output[i * 2 + 1] = buf[i];
338 }
339 }
340 } else {
341 // INVARIANT: empty graph → silence.
342 output.fill(0.0);
343 }
344 }
345
346 /// Process a single node on the calling thread.
347 fn process_node(&mut self, node_id: NodeId, sample_rate: f32) {
348 let mut input_ptrs: [Option<*const [f32; BUFFER_SIZE]>; MAX_INPUTS] = [None; MAX_INPUTS];
349
350 if let Some(record) = self.graph.arena.get(node_id) {
351 for (slot, maybe_src) in record.inputs.iter().enumerate() {
352 if let Some(src_id) = maybe_src {
353 if let Some(src_record) = self.graph.arena.get(*src_id) {
354 input_ptrs[slot] = Some(
355 self.graph.buffers.get(src_record.output_buffer)
356 as *const [f32; BUFFER_SIZE],
357 );
358 }
359 }
360 }
361 } else {
362 return;
363 }
364
365 let (output_buf_id, params_ptr, processor_ptr) = {
366 let record = self.graph.arena.get_mut(node_id).unwrap();
367 (
368 record.output_buffer,
369 &mut record.params as *mut ParamBlock,
370 &mut *record.processor as *mut dyn crate::node::DspNode,
371 )
372 };
373
374 let output_buf = self.graph.buffers.get_mut(output_buf_id);
375 let inputs: [Option<&[f32; BUFFER_SIZE]>; MAX_INPUTS] =
376 input_ptrs.map(|p| p.map(|ptr| unsafe { &*ptr }));
377
378 unsafe {
379 (*processor_ptr).process(&inputs, output_buf, &mut *params_ptr, sample_rate);
380 }
381 }
382
383 fn apply_command(&mut self, cmd: Command) {
384 match cmd {
385 Command::AddNode { id } => { let _ = id; }
386 Command::RemoveNode { id } => { self.graph.remove_node(id); }
387 Command::Connect { src, dst, slot } => { self.graph.connect(src, dst, slot); }
388 Command::Disconnect { dst, slot } => { self.graph.disconnect(dst, slot); }
389 Command::UpdateParam { node, param_index, new_param } => {
390 if let Some(record) = self.graph.arena.get_mut(node) {
391 if param_index < record.params.count {
392 record.params.params[param_index] = new_param;
393 }
394 }
395 }
396 Command::SetOutputNode { id } => { self.graph.set_output_node(id); }
397 Command::SetMute { muted } => { self.muted = muted; }
398 Command::ClearGraph => {
399 let ids: Vec<_> = self.graph.execution_order.clone();
400 for id in ids { self.graph.remove_node(id); }
401 self.graph.output_node = None;
402 }
403 }
404 }
405
/// Reference sequential implementation for testing.
///
/// Processes every node in flat `execution_order` on the calling thread, then
/// writes the output node's buffer to `output` as interleaved stereo.
///
/// Output-stage contract:
/// * muted -> `output` is all zeros;
/// * no output node, or an output node id that no longer resolves in the
///   arena -> `output` is all zeros;
/// * otherwise at most `BUFFER_SIZE` frames are written (mono sample
///   duplicated to both channels) and the rest of `output` is zeroed.
#[cfg(test)]
fn process_graph_sequential(&mut self, output: &mut [f32]) {
    let sr = self.sample_rate;

    // Snapshot the order: `process_node` needs `&mut self`, which would
    // conflict with a live borrow of `execution_order`.
    let order: Vec<NodeId> = self.graph.execution_order.clone();
    for &node_id in &order {
        self.process_node(node_id, sr);
    }

    // Output stage.
    if self.muted {
        output.fill(0.0);
        return;
    }

    // `None` covers both "no output node" and "stale output node id".
    let rendered = self
        .graph
        .output_node
        .and_then(|out_id| self.graph.arena.get(out_id))
        .map(|record| self.graph.buffers.get(record.output_buffer));

    match rendered {
        Some(buf) => {
            let frames = (output.len() / 2).min(BUFFER_SIZE);
            let (head, tail) = output.split_at_mut(frames * 2);
            for (frame, &sample) in head.chunks_exact_mut(2).zip(buf.iter()) {
                frame[0] = sample;
                frame[1] = sample;
            }
            // Never leave stale data behind the rendered frames.
            tail.fill(0.0);
        }
        None => output.fill(0.0),
    }
}
438}
439
#[cfg(test)]
mod tests {
    use super::*;
    use crate::node::DspNode;
    use proptest::prelude::*;

    /// Minimal deterministic test node for property testing.
    ///
    /// Emits `gain * (ramp + sum of connected inputs)` where
    /// `ramp[i] = (i + 1) / BUFFER_SIZE`. The built-in ramp matters: without
    /// a signal source every buffer in the graph would stay at zero and a
    /// parallel-vs-sequential comparison could never fail.
    struct TestNode {
        gain: f32,
    }

    impl TestNode {
        fn new(gain: f32) -> Self {
            Self { gain }
        }
    }

    impl DspNode for TestNode {
        fn process(
            &mut self,
            inputs: &[Option<&[f32; BUFFER_SIZE]>; MAX_INPUTS],
            output: &mut [f32; BUFFER_SIZE],
            _params: &mut ParamBlock,
            _sample_rate: f32,
        ) {
            // Own signal: a strictly positive ramp.
            for (i, sample) in output.iter_mut().enumerate() {
                *sample = (i + 1) as f32 / BUFFER_SIZE as f32;
            }
            // Mix in every connected input, in slot order (deterministic).
            for input in inputs.iter().flatten() {
                for (acc, x) in output.iter_mut().zip(input.iter()) {
                    *acc += *x;
                }
            }
            for sample in output.iter_mut() {
                *sample *= self.gain;
            }
        }

        fn type_name(&self) -> &'static str {
            "TestNode"
        }
    }

    // Property 1
    proptest! {
        /// **Validates: Requirements 1.1, 1.4**
        ///
        /// Feature: aether-engine-upgrades, Property 1: parallel execution is output-equivalent
        ///
        /// Property 1: Parallel execution is output-equivalent to sequential execution.
        ///
        /// For any valid DSP patch (any combination of nodes and edges forming a valid DAG),
        /// processing a block with the level-based scheduler SHALL produce the same output
        /// buffer as processing the same block with the flat sequential reference, given the
        /// same initial node state and the same input.
        #[test]
        fn prop_parallel_equiv_sequential(
            num_nodes in 1usize..=20,
            edges in prop::collection::vec((0usize..20, 0usize..20, 0usize..MAX_INPUTS), 0..50),
            seed in any::<u64>(),
        ) {
            // Two schedulers built identically.
            let mut scheduler_parallel = Scheduler::new(48000.0);
            let mut scheduler_sequential = Scheduler::new(48000.0);

            let mut node_ids = Vec::new();

            // Deterministic, strictly positive gains in (0.0, 1.0] derived from the seed,
            // so every node contributes a non-zero signal.
            for i in 0..num_nodes {
                let gain = ((seed.wrapping_add(i as u64) % 100) as f32 + 1.0) / 100.0;

                let id1 = scheduler_parallel.graph.add_node(Box::new(TestNode::new(gain)));
                let id2 = scheduler_sequential.graph.add_node(Box::new(TestNode::new(gain)));

                if let (Some(id1), Some(id2)) = (id1, id2) {
                    // Both schedulers must have assigned the same NodeId.
                    prop_assert_eq!(id1.index, id2.index);
                    prop_assert_eq!(id1.generation, id2.generation);
                    node_ids.push(id1);
                }
            }

            // Add edges to both schedulers. `src < dst` keeps the graph acyclic; bounds are
            // checked against the nodes actually created so indexing cannot panic.
            for (src_idx, dst_idx, slot) in edges {
                if src_idx < dst_idx && dst_idx < node_ids.len() {
                    let src = node_ids[src_idx];
                    let dst = node_ids[dst_idx];

                    scheduler_parallel.graph.connect(src, dst, slot);
                    scheduler_sequential.graph.connect(src, dst, slot);
                }
            }

            // The last created node (if any) is the output node.
            if let Some(&output_node) = node_ids.last() {
                scheduler_parallel.graph.set_output_node(output_node);
                scheduler_sequential.graph.set_output_node(output_node);
            }

            // Interleaved stereo output, one block long.
            let mut output_parallel = vec![0.0f32; BUFFER_SIZE * 2];
            let mut output_sequential = vec![0.0f32; BUFFER_SIZE * 2];

            scheduler_parallel.process_graph(&mut output_parallel);
            scheduler_sequential.process_graph_sequential(&mut output_sequential);

            // Guard against a vacuous comparison: with an output node present the
            // reference output must contain signal.
            if !node_ids.is_empty() {
                prop_assert!(
                    output_sequential.iter().any(|&s| s != 0.0),
                    "Reference output is silent; the equivalence check would be vacuous"
                );
            }

            // Sample-for-sample equality (NaN is treated as equal to NaN).
            for (i, (&p, &s)) in output_parallel.iter().zip(output_sequential.iter()).enumerate() {
                prop_assert!(
                    p == s || (p.is_nan() && s.is_nan()),
                    "Output mismatch at sample {}: parallel={}, sequential={}",
                    i, p, s
                );
            }
        }
    }
}