Skip to main content

aether_core/
scheduler.rs

1//! Real-time audio scheduler.
2//!
3//! This is the hot path. It:
4//!   1. Drains bounded commands from the SPSC ring.
5//!   2. Executes the topologically sorted node list level by level.
6//!      Nodes within the same BFS level are independent and run in parallel
7//!      via Rayon's work-stealing thread pool.
8//!   3. Copies the output node's buffer to the DAC output.
9//!
10//! HARD RT RULES enforced here:
11//!   - No allocation (Vec<NodeTask> is pre-allocated per level, bounded by MAX_NODES)
12//!   - No locks
13//!   - No I/O
14//!   - No unbounded loops
15
16use ringbuf::traits::Consumer;
17
18use crate::{
19    arena::NodeId,
20    command::Command,
21    graph::DspGraph,
22    node::DspNode,
23    param::ParamBlock,
24    BUFFER_SIZE, MAX_COMMANDS_PER_TICK, MAX_INPUTS,
25};
26
27// ── Parallel dispatch helpers ─────────────────────────────────────────────────
28
29/// Per-node data bundle collected before parallel dispatch.
30///
31/// SAFETY INVARIANT: Within a single BFS level, every node writes to a distinct
32/// `BufferId` (guaranteed by the DAG structure — no two nodes in the same level
33/// share an output buffer). The `BufferPool` stores buffers in a flat `Vec`, so
34/// tasks writing to different `BufferId`s write to non-overlapping index ranges.
35/// This makes the concurrent writes safe despite using raw pointers.
36struct NodeTask {
37    output_buf_ptr: *mut [f32; BUFFER_SIZE],
38    params_ptr: *mut ParamBlock,
39    processor_ptr: *mut dyn DspNode,
40    inputs: [Option<*const [f32; BUFFER_SIZE]>; MAX_INPUTS],
41}
42
43/// SAFETY: Within a BFS level each task accesses disjoint memory:
44/// - distinct output buffer (different BufferId → different Vec index range)
45/// - distinct processor and params (each belongs to exactly one NodeRecord)
46///
47/// No two tasks in the same level share any pointed-to memory.
48unsafe impl Send for NodeTask {}
49unsafe impl Sync for NodeTask {}
50
51// ── Scheduler ─────────────────────────────────────────────────────────────────
52
53/// Real-time audio scheduler.
54///
55/// The scheduler owns the DSP graph and processes audio in fixed-size blocks
56/// (64 samples by default). It executes nodes in topologically sorted order,
57/// with nodes at the same BFS level running in parallel via Rayon.
58///
59/// # Real-Time Safety
60///
61/// - ✅ No allocation in audio thread
62/// - ✅ No locks in audio thread
63/// - ✅ Bounded execution time
64/// - ✅ Lock-free command processing via SPSC ring
65///
66/// # Example
67///
68/// ```
69/// use aether_core::scheduler::Scheduler;
70/// use aether_core::node::DspNode;
71/// use aether_core::param::ParamBlock;
72/// use aether_core::{BUFFER_SIZE, MAX_INPUTS};
73///
74/// // Create a simple oscillator node
75/// struct Oscillator {
76///     frequency: f32,
77///     phase: f32,
78/// }
79///
80/// impl DspNode for Oscillator {
81///     fn process(&mut self, _inputs: &[Option<&[f32; BUFFER_SIZE]>; MAX_INPUTS],
82///                output: &mut [f32; BUFFER_SIZE], _params: &mut ParamBlock, sample_rate: f32) {
83///         let phase_inc = self.frequency / sample_rate;
84///         for sample in output.iter_mut() {
85///             *sample = (self.phase * std::f32::consts::TAU).sin() * 0.3;
86///             self.phase = (self.phase + phase_inc).fract();
87///         }
88///     }
89///     fn type_name(&self) -> &'static str { "Oscillator" }
90/// }
91///
92/// // Create scheduler and add node
93/// let mut sched = Scheduler::new(48_000.0);
94/// let osc = Box::new(Oscillator { frequency: 440.0, phase: 0.0 });
95/// let id = sched.graph.add_node(osc).unwrap();
96/// sched.graph.set_output_node(id);
97///
98/// // Process one audio block
99/// let mut output = vec![0.0f32; 128];
100/// sched.process_block_simple(&mut output);
101/// ```
102///
103/// # Performance
104///
105/// - Latency: 1.33ms @ 48kHz (64 samples)
106/// - Throughput: 1000+ nodes @ <100µs processing time
107/// - Memory: Pre-allocated arena + buffer pool
108pub struct Scheduler {
109    pub graph: DspGraph,
110    pub sample_rate: f32,
111    pub muted: bool,
112}
113
114impl Scheduler {
115    /// Creates a new scheduler with the given sample rate.
116    ///
117    /// # Arguments
118    ///
119    /// * `sample_rate` - Sample rate in Hz (typically 44100.0 or 48000.0)
120    ///
121    /// # Example
122    ///
123    /// ```
124    /// use aether_core::scheduler::Scheduler;
125    ///
126    /// let sched = Scheduler::new(48_000.0);
127    /// assert_eq!(sched.sample_rate, 48_000.0);
128    /// ```
129    pub fn new(sample_rate: f32) -> Self {
130        Self {
131            graph: DspGraph::new(),
132            sample_rate,
133            muted: false,
134        }
135    }
136
137    /// Processes one audio block with command draining.
138    ///
139    /// Call this from your audio thread (e.g., CPAL stream callback).
140    /// It drains up to `MAX_COMMANDS_PER_TICK` commands from the ring buffer,
141    /// applies them to the graph, then processes all nodes in topological order.
142    ///
143    /// # Arguments
144    ///
145    /// * `cmd_consumer` - SPSC consumer for control commands from UI/control thread
146    /// * `output` - Interleaved stereo output buffer (length = BUFFER_SIZE * 2)
147    ///
148    /// # Real-Time Safety
149    ///
150    /// This function is real-time safe:
151    /// - No allocations
152    /// - No locks (uses lock-free SPSC ring)
153    /// - Bounded execution time
154    /// - Parallel node execution within BFS levels
155    ///
156    /// # Example
157    ///
158    /// ```no_run
159    /// use aether_core::scheduler::Scheduler;
160    /// use aether_core::command::Command;
161    /// use ringbuf::{HeapRb, traits::Split};
162    ///
163    /// let mut sched = Scheduler::new(48_000.0);
164    /// let (mut producer, mut consumer) = HeapRb::<Command>::new(1024).split();
165    ///
166    /// // In audio thread callback:
167    /// let mut output = vec![0.0f32; 128]; // 64 frames * 2 channels
168    /// sched.process_block(&mut consumer, &mut output);
169    /// ```
170    ///
171    /// # See Also
172    ///
173    /// * [`process_block_simple`](Self::process_block_simple) - Simplified version without command ring
174    pub fn process_block<C>(&mut self, cmd_consumer: &mut C, output: &mut [f32])
175    where
176        C: Consumer<Item = Command>,
177    {
178        let mut processed = 0;
179        while processed < MAX_COMMANDS_PER_TICK {
180            match cmd_consumer.try_pop() {
181                Some(cmd) => { self.apply_command(cmd); processed += 1; }
182                None => break,
183            }
184        }
185        self.process_graph(output);
186    }
187
188    /// Processes one audio block without command draining.
189    ///
190    /// Simplified version of [`process_block`](Self::process_block) that doesn't
191    /// drain commands from a ring buffer. Use this when the scheduler is shared
192    /// via `Arc<Mutex<>>` and the control thread mutates it directly.
193    ///
194    /// # Arguments
195    ///
196    /// * `output` - Interleaved stereo output buffer (length = BUFFER_SIZE * 2)
197    ///
198    /// # Real-Time Safety
199    ///
200    /// This function is real-time safe:
201    /// - No allocations
202    /// - No locks (assumes caller holds lock)
203    /// - Bounded execution time
204    ///
205    /// # Example
206    ///
207    /// ```
208    /// use aether_core::scheduler::Scheduler;
209    /// use aether_core::BUFFER_SIZE;
210    ///
211    /// let mut sched = Scheduler::new(48_000.0);
212    ///
213    /// // Process one block
214    /// let mut output = vec![0.0f32; BUFFER_SIZE * 2];
215    /// sched.process_block_simple(&mut output);
216    /// ```
217    ///
218    /// # See Also
219    ///
220    /// * [`process_block`](Self::process_block) - Version with command ring buffer
221    pub fn process_block_simple(&mut self, output: &mut [f32]) {
222        self.process_graph(output);
223    }
224
225    fn process_graph(&mut self, output: &mut [f32]) {
226        let sr = self.sample_rate;
227        let level_count = self.graph.levels.len();
228
229        for level_idx in 0..level_count {
230            let level_len = self.graph.levels[level_idx].len();
231
232            if level_len == 0 {
233                continue;
234            } else if level_len == 1 {
235                // Zero-overhead path: single node, no Rayon overhead.
236                let node_id = self.graph.levels[level_idx][0];
237                self.process_node(node_id, sr);
238            } else {
239                // Parallel path: collect raw pointers while holding &mut self,
240                // then dispatch DSP work in parallel via rayon::scope.
241                //
242                // SAFETY: Within a BFS level, every node writes to a distinct
243                // output buffer (disjoint BufferId). The BufferPool stores buffers
244                // in a flat Vec; tasks write to non-overlapping index ranges.
245                // Each processor and ParamBlock belongs to exactly one node.
246                let mut tasks: Vec<NodeTask> = Vec::with_capacity(level_len);
247
248                for i in 0..level_len {
249                    let node_id = self.graph.levels[level_idx][i];
250                    let mut input_ptrs: [Option<*const [f32; BUFFER_SIZE]>; MAX_INPUTS] =
251                        [None; MAX_INPUTS];
252
253                    if let Some(record) = self.graph.arena.get(node_id) {
254                        for (slot, maybe_src) in record.inputs.iter().enumerate() {
255                            if let Some(src_id) = maybe_src {
256                                if let Some(src_record) = self.graph.arena.get(*src_id) {
257                                    input_ptrs[slot] = Some(
258                                        self.graph.buffers.get(src_record.output_buffer)
259                                            as *const [f32; BUFFER_SIZE],
260                                    );
261                                }
262                            }
263                        }
264                        let record_mut = self.graph.arena.get_mut(node_id).unwrap();
265                        let output_buf_ptr = self.graph.buffers.get_mut(record_mut.output_buffer)
266                            as *mut [f32; BUFFER_SIZE];
267                        let params_ptr = &mut record_mut.params as *mut ParamBlock;
268                        let processor_ptr = &mut *record_mut.processor as *mut dyn DspNode;
269
270                        tasks.push(NodeTask {
271                            output_buf_ptr,
272                            params_ptr,
273                            processor_ptr,
274                            inputs: input_ptrs,
275                        });
276                    }
277                }
278
279                // SAFETY: each element of `tasks` points to disjoint memory.
280                // We pass a raw pointer per task so each closure captures a
281                // distinct non-aliasing pointer.
282                rayon::scope(|s| {
283                    for task in tasks.iter_mut() {
284                        // Capture the raw pointer value (usize) to avoid the
285                        // borrow checker complaining about &mut Vec element borrows.
286                        let ptr = task as *mut NodeTask as usize;
287                        s.spawn(move |_| {
288                            // SAFETY: ptr is a valid, exclusively-owned NodeTask.
289                            let t: &mut NodeTask = unsafe { &mut *(ptr as *mut NodeTask) };
290                            let inputs: [Option<&[f32; BUFFER_SIZE]>; MAX_INPUTS] =
291                                t.inputs.map(|p| p.map(|raw| unsafe { &*raw }));
292                            unsafe {
293                                (*t.processor_ptr).process(
294                                    &inputs,
295                                    &mut *t.output_buf_ptr,
296                                    &mut *t.params_ptr,
297                                    sr,
298                                );
299                            }
300                        });
301                    }
302                });
303            }
304        }
305
306        // Copy output node buffer to DAC
307        if self.muted {
308            output.fill(0.0);
309            return;
310        }
311        if let Some(out_id) = self.graph.output_node {
312            if let Some(record) = self.graph.arena.get(out_id) {
313                let buf = self.graph.buffers.get(record.output_buffer);
314                let frames = output.len() / 2;
315                for i in 0..frames.min(BUFFER_SIZE) {
316                    output[i * 2] = buf[i];
317                    output[i * 2 + 1] = buf[i];
318                }
319            }
320        } else {
321            // INVARIANT: empty graph → silence.
322            output.fill(0.0);
323        }
324    }
325
326    /// Process a single node on the calling thread.
327    fn process_node(&mut self, node_id: NodeId, sample_rate: f32) {
328        let mut input_ptrs: [Option<*const [f32; BUFFER_SIZE]>; MAX_INPUTS] = [None; MAX_INPUTS];
329
330        if let Some(record) = self.graph.arena.get(node_id) {
331            for (slot, maybe_src) in record.inputs.iter().enumerate() {
332                if let Some(src_id) = maybe_src {
333                    if let Some(src_record) = self.graph.arena.get(*src_id) {
334                        input_ptrs[slot] = Some(
335                            self.graph.buffers.get(src_record.output_buffer)
336                                as *const [f32; BUFFER_SIZE],
337                        );
338                    }
339                }
340            }
341        } else {
342            return;
343        }
344
345        let (output_buf_id, params_ptr, processor_ptr) = {
346            let record = self.graph.arena.get_mut(node_id).unwrap();
347            (
348                record.output_buffer,
349                &mut record.params as *mut ParamBlock,
350                &mut *record.processor as *mut dyn crate::node::DspNode,
351            )
352        };
353
354        let output_buf = self.graph.buffers.get_mut(output_buf_id);
355        let inputs: [Option<&[f32; BUFFER_SIZE]>; MAX_INPUTS] =
356            input_ptrs.map(|p| p.map(|ptr| unsafe { &*ptr }));
357
358        unsafe {
359            (*processor_ptr).process(&inputs, output_buf, &mut *params_ptr, sample_rate);
360        }
361    }
362
363    fn apply_command(&mut self, cmd: Command) {
364        match cmd {
365            Command::AddNode { id } => { let _ = id; }
366            Command::RemoveNode { id } => { self.graph.remove_node(id); }
367            Command::Connect { src, dst, slot } => { self.graph.connect(src, dst, slot); }
368            Command::Disconnect { dst, slot } => { self.graph.disconnect(dst, slot); }
369            Command::UpdateParam { node, param_index, new_param } => {
370                if let Some(record) = self.graph.arena.get_mut(node) {
371                    if param_index < record.params.count {
372                        record.params.params[param_index] = new_param;
373                    }
374                }
375            }
376            Command::SetOutputNode { id } => { self.graph.set_output_node(id); }
377            Command::SetMute { muted } => { self.muted = muted; }
378            Command::ClearGraph => {
379                let ids: Vec<_> = self.graph.execution_order.clone();
380                for id in ids { self.graph.remove_node(id); }
381                self.graph.output_node = None;
382            }
383        }
384    }
385
386    /// Reference sequential implementation for testing.
387    /// Processes nodes in flat execution_order without parallelism.
388    #[cfg(test)]
389    fn process_graph_sequential(&mut self, output: &mut [f32]) {
390        let sr = self.sample_rate;
391
392        // Collect execution order into a local Vec to avoid borrow conflict
393        // between the immutable borrow of execution_order and the mutable
394        // borrow inside process_node.
395        let order: Vec<NodeId> = self.graph.execution_order.clone();
396        for &node_id in &order {
397            self.process_node(node_id, sr);
398        }
399
400        // Copy output node buffer to DAC
401        if self.muted {
402            output.fill(0.0);
403            return;
404        }
405        if let Some(out_id) = self.graph.output_node {
406            if let Some(record) = self.graph.arena.get(out_id) {
407                let buf = self.graph.buffers.get(record.output_buffer);
408                let frames = output.len() / 2;
409                for i in 0..frames.min(BUFFER_SIZE) {
410                    output[i * 2] = buf[i];
411                    output[i * 2 + 1] = buf[i];
412                }
413            }
414        } else {
415            output.fill(0.0);
416        }
417    }
418}
419
420#[cfg(test)]
421mod tests {
422    use super::*;
423    use crate::node::DspNode;
424    use proptest::prelude::*;
425
426    /// Minimal deterministic test node for property testing.
427    /// Sums all inputs and multiplies by a fixed gain.
428    struct TestNode {
429        gain: f32,
430    }
431
432    impl TestNode {
433        fn new(gain: f32) -> Self {
434            Self { gain }
435        }
436    }
437
438    impl DspNode for TestNode {
439        fn process(
440            &mut self,
441            inputs: &[Option<&[f32; BUFFER_SIZE]>; MAX_INPUTS],
442            output: &mut [f32; BUFFER_SIZE],
443            _params: &mut ParamBlock,
444            _sample_rate: f32,
445        ) {
446            output.fill(0.0);
447            for input_opt in inputs.iter() {
448                if let Some(input) = input_opt {
449                    for i in 0..BUFFER_SIZE {
450                        output[i] += input[i] * self.gain;
451                    }
452                }
453            }
454        }
455
456        fn type_name(&self) -> &'static str {
457            "TestNode"
458        }
459    }
460
461    // Property 1
462    proptest! {
463        /// **Validates: Requirements 1.1, 1.4**
464        ///
465        /// Feature: aether-engine-upgrades, Property 1: parallel execution is output-equivalent
466        ///
467        /// Property 1: Parallel execution is output-equivalent to sequential execution.
468        ///
469        /// For any valid DSP patch (any combination of nodes and edges forming a valid DAG),
470        /// processing a block with the parallel Rayon scheduler SHALL produce a bit-identical
471        /// output buffer to processing the same block with the original sequential scheduler,
472        /// given the same initial node state and the same input.
473        #[test]
474        fn prop_parallel_equiv_sequential(
475            num_nodes in 1usize..=20,
476            edges in prop::collection::vec((0usize..20, 0usize..20, 0usize..MAX_INPUTS), 0..50),
477            seed in any::<u64>(),
478        ) {
479            // Create two identical schedulers
480            let mut scheduler_parallel = Scheduler::new(48000.0);
481            let mut scheduler_sequential = Scheduler::new(48000.0);
482
483            let mut node_ids = Vec::new();
484
485            // Add nodes to both schedulers with deterministic gains based on seed
486            for i in 0..num_nodes {
487                let gain = ((seed.wrapping_add(i as u64) % 100) as f32) / 100.0;
488                
489                let id1 = scheduler_parallel.graph.add_node(Box::new(TestNode::new(gain)));
490                let id2 = scheduler_sequential.graph.add_node(Box::new(TestNode::new(gain)));
491                
492                if let (Some(id1), Some(id2)) = (id1, id2) {
493                    // Verify both schedulers assigned the same NodeId
494                    prop_assert_eq!(id1.index, id2.index);
495                    prop_assert_eq!(id1.generation, id2.generation);
496                    node_ids.push(id1);
497                }
498            }
499
500            // Add edges to both schedulers (filter to maintain DAG invariant: src < dst)
501            for (src_idx, dst_idx, slot) in edges {
502                if src_idx < num_nodes && dst_idx < num_nodes && src_idx < dst_idx {
503                    let src = node_ids[src_idx];
504                    let dst = node_ids[dst_idx];
505                    
506                    scheduler_parallel.graph.connect(src, dst, slot);
507                    scheduler_sequential.graph.connect(src, dst, slot);
508                }
509            }
510
511            // Set output node to the last node if we have any nodes
512            if !node_ids.is_empty() {
513                let output_node = node_ids[num_nodes - 1];
514                scheduler_parallel.graph.set_output_node(output_node);
515                scheduler_sequential.graph.set_output_node(output_node);
516            }
517
518            // Prepare output buffers (stereo, 64 frames = 128 samples)
519            let mut output_parallel = vec![0.0f32; BUFFER_SIZE * 2];
520            let mut output_sequential = vec![0.0f32; BUFFER_SIZE * 2];
521
522            // Process one block with both schedulers
523            scheduler_parallel.process_graph(&mut output_parallel);
524            scheduler_sequential.process_graph_sequential(&mut output_sequential);
525
526            // Assert bit-identical output
527            for (i, (&p, &s)) in output_parallel.iter().zip(output_sequential.iter()).enumerate() {
528                prop_assert!(
529                    p == s || (p.is_nan() && s.is_nan()),
530                    "Output mismatch at sample {}: parallel={}, sequential={}",
531                    i, p, s
532                );
533            }
534        }
535    }
536}