Skip to main content

aether_core/
scheduler.rs

1//! Real-time audio scheduler.
2//!
3//! This is the hot path. It:
4//!   1. Drains bounded commands from the SPSC ring.
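//!   2. Executes the topologically sorted node list level by level.
//!      Nodes within the same BFS level are independent; with the `parallel`
//!      feature they run in parallel via Rayon's work-stealing thread pool,
//!      otherwise sequentially on the calling thread.
//!   3. Copies the output node's mono buffer to the DAC output, duplicating
//!      each sample into both channels of the interleaved stereo buffer.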
9//!
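//! HARD RT RULES (goals for the audio thread):
//!   - No allocation
//!   - No locks
//!   - No I/O
//!   - No unbounded loops (command draining is capped at MAX_COMMANDS_PER_TICK)
//!
//! Known exceptions: the parallel path allocates a per-level `Vec<NodeTask>`
//! on every block, and Rayon heap-allocates each spawned job and may
//! synchronize worker threads. Graph-editing commands (e.g. `ClearGraph`,
//! which clones the execution order) may also allocate.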
15
16use ringbuf::traits::Consumer;
17
18use crate::{
19    arena::NodeId,
20    command::Command,
21    graph::DspGraph,
22    node::DspNode,
23    param::ParamBlock,
24    BUFFER_SIZE, MAX_COMMANDS_PER_TICK, MAX_INPUTS,
25};
26
27// ── Parallel dispatch helpers ─────────────────────────────────────────────────
28
/// Per-node data bundle collected before parallel dispatch.
///
/// Holds raw pointers into the graph so a node can be processed without
/// holding a borrow of the `Scheduler` / `DspGraph`:
/// - `output_buf_ptr`: the node's own output buffer, written by `process`
/// - `params_ptr` / `processor_ptr`: the node's `ParamBlock` and `DspNode`
/// - `inputs`: one optional read-only pointer per input slot, pointing at the
///   output buffer of the connected source node (`None` when the slot is
///   unconnected or the source is no longer in the arena)
///
/// SAFETY INVARIANT: Within a single BFS level, every node writes to a distinct
/// `BufferId`, and no node in the level reads a buffer that another node in the
/// same level writes (inputs come from earlier levels). This is expected to
/// follow from how `DspGraph` builds levels and assigns one output buffer per
/// node. The `BufferPool` is described as storing buffers in a flat `Vec`, so
/// distinct `BufferId`s map to non-overlapping index ranges.
/// NOTE(review): both properties are enforced outside this file — confirm in
/// `graph` / `BufferPool`.
///
/// The pointers must not be used after the graph is mutated (e.g. by applying
/// a command), since the pointed-to storage may move or be freed.
struct NodeTask {
    output_buf_ptr: *mut [f32; BUFFER_SIZE],
    params_ptr: *mut ParamBlock,
    processor_ptr: *mut dyn DspNode,
    inputs: [Option<*const [f32; BUFFER_SIZE]>; MAX_INPUTS],
}

/// SAFETY: Within a BFS level each task accesses disjoint memory:
/// - distinct output buffer (different BufferId → different Vec index range)
/// - distinct processor and params (each belongs to exactly one NodeRecord)
/// - input buffers that no task in the same level writes
///
/// No task in a level writes memory that another task in the same level reads
/// or writes, so moving a task (or an exclusive reference to it) to a worker
/// thread is sound as long as the graph is not mutated until the level finishes.
unsafe impl Send for NodeTask {}
// SAFETY: `NodeTask` has no interior mutability; a shared `&NodeTask` only
// exposes the raw pointer values, and dereferencing them is governed by the
// same per-level disjointness invariant documented above.
unsafe impl Sync for NodeTask {}
50
51// ── Scheduler ─────────────────────────────────────────────────────────────────
52
53/// Real-time audio scheduler.
54///
55/// The scheduler owns the DSP graph and processes audio in fixed-size blocks
56/// (64 samples by default). It executes nodes in topologically sorted order,
57/// with nodes at the same BFS level running in parallel via Rayon.
58///
59/// # Real-Time Safety
60///
61/// - ✅ No allocation in audio thread
62/// - ✅ No locks in audio thread
63/// - ✅ Bounded execution time
64/// - ✅ Lock-free command processing via SPSC ring
65///
66/// # Example
67///
68/// ```
69/// use aether_core::scheduler::Scheduler;
70/// use aether_core::node::DspNode;
71/// use aether_core::param::ParamBlock;
72/// use aether_core::{BUFFER_SIZE, MAX_INPUTS};
73///
74/// // Create a simple oscillator node
75/// struct Oscillator {
76///     frequency: f32,
77///     phase: f32,
78/// }
79///
80/// impl DspNode for Oscillator {
81///     fn process(&mut self, _inputs: &[Option<&[f32; BUFFER_SIZE]>; MAX_INPUTS],
82///                output: &mut [f32; BUFFER_SIZE], _params: &mut ParamBlock, sample_rate: f32) {
83///         let phase_inc = self.frequency / sample_rate;
84///         for sample in output.iter_mut() {
85///             *sample = (self.phase * std::f32::consts::TAU).sin() * 0.3;
86///             self.phase = (self.phase + phase_inc).fract();
87///         }
88///     }
89///     fn type_name(&self) -> &'static str { "Oscillator" }
90/// }
91///
92/// // Create scheduler and add node
93/// let mut sched = Scheduler::new(48_000.0);
94/// let osc = Box::new(Oscillator { frequency: 440.0, phase: 0.0 });
95/// let id = sched.graph.add_node(osc).unwrap();
96/// sched.graph.set_output_node(id);
97///
98/// // Process one audio block
99/// let mut output = vec![0.0f32; 128];
100/// sched.process_block_simple(&mut output);
101/// ```
102///
103/// # Performance
104///
105/// - Latency: 1.33ms @ 48kHz (64 samples)
106/// - Throughput: 1000+ nodes @ <100µs processing time
107/// - Memory: Pre-allocated arena + buffer pool
108pub struct Scheduler {
109    pub graph: DspGraph,
110    pub sample_rate: f32,
111    pub muted: bool,
112}
113
114impl Scheduler {
115    /// Creates a new scheduler with the given sample rate.
116    ///
117    /// # Arguments
118    ///
119    /// * `sample_rate` - Sample rate in Hz (typically 44100.0 or 48000.0)
120    ///
121    /// # Example
122    ///
123    /// ```
124    /// use aether_core::scheduler::Scheduler;
125    ///
126    /// let sched = Scheduler::new(48_000.0);
127    /// assert_eq!(sched.sample_rate, 48_000.0);
128    /// ```
129    pub fn new(sample_rate: f32) -> Self {
130        Self {
131            graph: DspGraph::new(),
132            sample_rate,
133            muted: false,
134        }
135    }
136
137    /// Processes one audio block with command draining.
138    ///
139    /// Call this from your audio thread (e.g., CPAL stream callback).
140    /// It drains up to `MAX_COMMANDS_PER_TICK` commands from the ring buffer,
141    /// applies them to the graph, then processes all nodes in topological order.
142    ///
143    /// # Arguments
144    ///
145    /// * `cmd_consumer` - SPSC consumer for control commands from UI/control thread
146    /// * `output` - Interleaved stereo output buffer (length = BUFFER_SIZE * 2)
147    ///
148    /// # Real-Time Safety
149    ///
150    /// This function is real-time safe:
151    /// - No allocations
152    /// - No locks (uses lock-free SPSC ring)
153    /// - Bounded execution time
154    /// - Parallel node execution within BFS levels
155    ///
156    /// # Example
157    ///
158    /// ```no_run
159    /// use aether_core::scheduler::Scheduler;
160    /// use aether_core::command::Command;
161    /// use ringbuf::{HeapRb, traits::Split};
162    ///
163    /// let mut sched = Scheduler::new(48_000.0);
164    /// let (mut producer, mut consumer) = HeapRb::<Command>::new(1024).split();
165    ///
166    /// // In audio thread callback:
167    /// let mut output = vec![0.0f32; 128]; // 64 frames * 2 channels
168    /// sched.process_block(&mut consumer, &mut output);
169    /// ```
170    ///
171    /// # See Also
172    ///
173    /// * [`process_block_simple`](Self::process_block_simple) - Simplified version without command ring
174    pub fn process_block<C>(&mut self, cmd_consumer: &mut C, output: &mut [f32])
175    where
176        C: Consumer<Item = Command>,
177    {
178        let mut processed = 0;
179        while processed < MAX_COMMANDS_PER_TICK {
180            match cmd_consumer.try_pop() {
181                Some(cmd) => { self.apply_command(cmd); processed += 1; }
182                None => break,
183            }
184        }
185        self.process_graph(output);
186    }
187
188    /// Processes one audio block without command draining.
189    ///
190    /// Simplified version of [`process_block`](Self::process_block) that doesn't
191    /// drain commands from a ring buffer. Use this when the scheduler is shared
192    /// via `Arc<Mutex<>>` and the control thread mutates it directly.
193    ///
194    /// # Arguments
195    ///
196    /// * `output` - Interleaved stereo output buffer (length = BUFFER_SIZE * 2)
197    ///
198    /// # Real-Time Safety
199    ///
200    /// This function is real-time safe:
201    /// - No allocations
202    /// - No locks (assumes caller holds lock)
203    /// - Bounded execution time
204    ///
205    /// # Example
206    ///
207    /// ```
208    /// use aether_core::scheduler::Scheduler;
209    /// use aether_core::BUFFER_SIZE;
210    ///
211    /// let mut sched = Scheduler::new(48_000.0);
212    ///
213    /// // Process one block
214    /// let mut output = vec![0.0f32; BUFFER_SIZE * 2];
215    /// sched.process_block_simple(&mut output);
216    /// ```
217    ///
218    /// # See Also
219    ///
220    /// * [`process_block`](Self::process_block) - Version with command ring buffer
221    pub fn process_block_simple(&mut self, output: &mut [f32]) {
222        self.process_graph(output);
223    }
224
225    fn process_graph(&mut self, output: &mut [f32]) {
226        let sr = self.sample_rate;
227        let level_count = self.graph.levels.len();
228
229        for level_idx in 0..level_count {
230            let level_len = self.graph.levels[level_idx].len();
231
232            if level_len == 0 {
233                continue;
234            } else if level_len == 1 {
235                // Zero-overhead path: single node, no Rayon overhead.
236                let node_id = self.graph.levels[level_idx][0];
237                self.process_node(node_id, sr);
238            } else {
239                // Parallel path: collect raw pointers while holding &mut self,
240                // then dispatch DSP work in parallel via rayon::scope.
241                //
242                // SAFETY: Within a BFS level, every node writes to a distinct
243                // output buffer (disjoint BufferId). The BufferPool stores buffers
244                // in a flat Vec; tasks write to non-overlapping index ranges.
245                // Each processor and ParamBlock belongs to exactly one node.
246                let mut tasks: Vec<NodeTask> = Vec::with_capacity(level_len);
247
248                for i in 0..level_len {
249                    let node_id = self.graph.levels[level_idx][i];
250                    let mut input_ptrs: [Option<*const [f32; BUFFER_SIZE]>; MAX_INPUTS] =
251                        [None; MAX_INPUTS];
252
253                    if let Some(record) = self.graph.arena.get(node_id) {
254                        for (slot, maybe_src) in record.inputs.iter().enumerate() {
255                            if let Some(src_id) = maybe_src {
256                                if let Some(src_record) = self.graph.arena.get(*src_id) {
257                                    input_ptrs[slot] = Some(
258                                        self.graph.buffers.get(src_record.output_buffer)
259                                            as *const [f32; BUFFER_SIZE],
260                                    );
261                                }
262                            }
263                        }
264                        let record_mut = self.graph.arena.get_mut(node_id).unwrap();
265                        let output_buf_ptr = self.graph.buffers.get_mut(record_mut.output_buffer)
266                            as *mut [f32; BUFFER_SIZE];
267                        let params_ptr = &mut record_mut.params as *mut ParamBlock;
268                        let processor_ptr = &mut *record_mut.processor as *mut dyn DspNode;
269
270                        tasks.push(NodeTask {
271                            output_buf_ptr,
272                            params_ptr,
273                            processor_ptr,
274                            inputs: input_ptrs,
275                        });
276                    }
277                }
278
279                // SAFETY: each element of `tasks` points to disjoint memory.
280                // We pass a raw pointer per task so each closure captures a
281                // distinct non-aliasing pointer.
282                #[cfg(feature = "parallel")]
283                {
284                    rayon::scope(|s| {
285                        for task in tasks.iter_mut() {
286                            // Capture the raw pointer value (usize) to avoid the
287                            // borrow checker complaining about &mut Vec element borrows.
288                            let ptr = task as *mut NodeTask as usize;
289                            s.spawn(move |_| {
290                                // SAFETY: ptr is a valid, exclusively-owned NodeTask.
291                                let t: &mut NodeTask = unsafe { &mut *(ptr as *mut NodeTask) };
292                                let inputs: [Option<&[f32; BUFFER_SIZE]>; MAX_INPUTS] =
293                                    t.inputs.map(|p| p.map(|raw| unsafe { &*raw }));
294                                unsafe {
295                                    (*t.processor_ptr).process(
296                                        &inputs,
297                                        &mut *t.output_buf_ptr,
298                                        &mut *t.params_ptr,
299                                        sr,
300                                    );
301                                }
302                            });
303                        }
304                    });
305                }
306
307                // Sequential fallback when parallel feature is disabled
308                #[cfg(not(feature = "parallel"))]
309                {
310                    for task in tasks.iter_mut() {
311                        let inputs: [Option<&[f32; BUFFER_SIZE]>; MAX_INPUTS] =
312                            task.inputs.map(|p| p.map(|raw| unsafe { &*raw }));
313                        unsafe {
314                            (*task.processor_ptr).process(
315                                &inputs,
316                                &mut *task.output_buf_ptr,
317                                &mut *task.params_ptr,
318                                sr,
319                            );
320                        }
321                    }
322                }
323            }
324        }
325
326        // Copy output node buffer to DAC
327        if self.muted {
328            output.fill(0.0);
329            return;
330        }
331        if let Some(out_id) = self.graph.output_node {
332            if let Some(record) = self.graph.arena.get(out_id) {
333                let buf = self.graph.buffers.get(record.output_buffer);
334                let frames = output.len() / 2;
335                for i in 0..frames.min(BUFFER_SIZE) {
336                    output[i * 2] = buf[i];
337                    output[i * 2 + 1] = buf[i];
338                }
339            }
340        } else {
341            // INVARIANT: empty graph → silence.
342            output.fill(0.0);
343        }
344    }
345
346    /// Process a single node on the calling thread.
347    fn process_node(&mut self, node_id: NodeId, sample_rate: f32) {
348        let mut input_ptrs: [Option<*const [f32; BUFFER_SIZE]>; MAX_INPUTS] = [None; MAX_INPUTS];
349
350        if let Some(record) = self.graph.arena.get(node_id) {
351            for (slot, maybe_src) in record.inputs.iter().enumerate() {
352                if let Some(src_id) = maybe_src {
353                    if let Some(src_record) = self.graph.arena.get(*src_id) {
354                        input_ptrs[slot] = Some(
355                            self.graph.buffers.get(src_record.output_buffer)
356                                as *const [f32; BUFFER_SIZE],
357                        );
358                    }
359                }
360            }
361        } else {
362            return;
363        }
364
365        let (output_buf_id, params_ptr, processor_ptr) = {
366            let record = self.graph.arena.get_mut(node_id).unwrap();
367            (
368                record.output_buffer,
369                &mut record.params as *mut ParamBlock,
370                &mut *record.processor as *mut dyn crate::node::DspNode,
371            )
372        };
373
374        let output_buf = self.graph.buffers.get_mut(output_buf_id);
375        let inputs: [Option<&[f32; BUFFER_SIZE]>; MAX_INPUTS] =
376            input_ptrs.map(|p| p.map(|ptr| unsafe { &*ptr }));
377
378        unsafe {
379            (*processor_ptr).process(&inputs, output_buf, &mut *params_ptr, sample_rate);
380        }
381    }
382
383    fn apply_command(&mut self, cmd: Command) {
384        match cmd {
385            Command::AddNode { id } => { let _ = id; }
386            Command::RemoveNode { id } => { self.graph.remove_node(id); }
387            Command::Connect { src, dst, slot } => { self.graph.connect(src, dst, slot); }
388            Command::Disconnect { dst, slot } => { self.graph.disconnect(dst, slot); }
389            Command::UpdateParam { node, param_index, new_param } => {
390                if let Some(record) = self.graph.arena.get_mut(node) {
391                    if param_index < record.params.count {
392                        record.params.params[param_index] = new_param;
393                    }
394                }
395            }
396            Command::SetOutputNode { id } => { self.graph.set_output_node(id); }
397            Command::SetMute { muted } => { self.muted = muted; }
398            Command::ClearGraph => {
399                let ids: Vec<_> = self.graph.execution_order.clone();
400                for id in ids { self.graph.remove_node(id); }
401                self.graph.output_node = None;
402            }
403        }
404    }
405
406    /// Reference sequential implementation for testing.
407    /// Processes nodes in flat execution_order without parallelism.
408    #[cfg(test)]
409    fn process_graph_sequential(&mut self, output: &mut [f32]) {
410        let sr = self.sample_rate;
411
412        // Collect execution order into a local Vec to avoid borrow conflict
413        // between the immutable borrow of execution_order and the mutable
414        // borrow inside process_node.
415        let order: Vec<NodeId> = self.graph.execution_order.clone();
416        for &node_id in &order {
417            self.process_node(node_id, sr);
418        }
419
420        // Copy output node buffer to DAC
421        if self.muted {
422            output.fill(0.0);
423            return;
424        }
425        if let Some(out_id) = self.graph.output_node {
426            if let Some(record) = self.graph.arena.get(out_id) {
427                let buf = self.graph.buffers.get(record.output_buffer);
428                let frames = output.len() / 2;
429                for i in 0..frames.min(BUFFER_SIZE) {
430                    output[i * 2] = buf[i];
431                    output[i * 2 + 1] = buf[i];
432                }
433            }
434        } else {
435            output.fill(0.0);
436        }
437    }
438}
439
#[cfg(test)]
mod tests {
    use super::*;
    use crate::node::DspNode;
    use proptest::prelude::*;

    /// Minimal deterministic test node for property testing.
    ///
    /// Writes the sum of all connected inputs, each scaled by `gain`, into its
    /// output buffer. Params and sample rate are ignored.
    struct TestNode {
        gain: f32,
    }

    impl TestNode {
        fn new(gain: f32) -> Self {
            Self { gain }
        }
    }

    impl DspNode for TestNode {
        fn process(
            &mut self,
            inputs: &[Option<&[f32; BUFFER_SIZE]>; MAX_INPUTS],
            output: &mut [f32; BUFFER_SIZE],
            _params: &mut ParamBlock,
            _sample_rate: f32,
        ) {
            output.fill(0.0);
            // Same accumulation order as a slot-by-slot, sample-by-sample loop,
            // so results are bit-reproducible.
            for input in inputs.iter().flatten() {
                for (out, &sample) in output.iter_mut().zip(input.iter()) {
                    *out += sample * self.gain;
                }
            }
        }

        fn type_name(&self) -> &'static str {
            "TestNode"
        }
    }

    // Property 1
    proptest! {
        /// **Validates: Requirements 1.1, 1.4**
        ///
        /// Feature: aether-engine-upgrades, Property 1: parallel execution is output-equivalent
        ///
        /// Property 1: Parallel execution is output-equivalent to sequential execution.
        ///
        /// For any valid DSP patch (any combination of nodes and edges forming a valid DAG),
        /// processing a block with the parallel Rayon scheduler SHALL produce a bit-identical
        /// output buffer to processing the same block with the original sequential scheduler,
        /// given the same initial node state and the same input.
        ///
        /// NaN samples compare equal to each other. Multi-node levels take the Rayon
        /// path only when the `parallel` feature is enabled. Otherwise this compares
        /// level-order against flat execution-order processing.
        #[test]
        fn prop_parallel_equiv_sequential(
            num_nodes in 1usize..=20,
            edges in prop::collection::vec((0usize..20, 0usize..20, 0usize..MAX_INPUTS), 0..50),
            seed in any::<u64>(),
        ) {
            // Create two identical schedulers
            let mut scheduler_parallel = Scheduler::new(48000.0);
            let mut scheduler_sequential = Scheduler::new(48000.0);

            let mut node_ids = Vec::new();

            // Add nodes to both schedulers with deterministic gains based on seed
            for i in 0..num_nodes {
                let gain = ((seed.wrapping_add(i as u64) % 100) as f32) / 100.0;

                let id1 = scheduler_parallel.graph.add_node(Box::new(TestNode::new(gain)));
                let id2 = scheduler_sequential.graph.add_node(Box::new(TestNode::new(gain)));

                // Identical schedulers must accept or reject the same insertions.
                prop_assert_eq!(id1.is_some(), id2.is_some());
                if let (Some(id1), Some(id2)) = (id1, id2) {
                    // Verify both schedulers assigned the same NodeId
                    prop_assert_eq!(id1.index, id2.index);
                    prop_assert_eq!(id1.generation, id2.generation);
                    node_ids.push(id1);
                }
            }

            // Add edges to both schedulers. `src < dst` keeps the graph acyclic.
            // Indices are bounded by the nodes actually created, because
            // `add_node` may return `None`.
            for (src_idx, dst_idx, slot) in edges {
                if src_idx < dst_idx && dst_idx < node_ids.len() {
                    let src = node_ids[src_idx];
                    let dst = node_ids[dst_idx];

                    scheduler_parallel.graph.connect(src, dst, slot);
                    scheduler_sequential.graph.connect(src, dst, slot);
                }
            }

            // Set output node to the last node actually created, if any
            if let Some(&output_node) = node_ids.last() {
                scheduler_parallel.graph.set_output_node(output_node);
                scheduler_sequential.graph.set_output_node(output_node);
            }

            // Prepare output buffers (interleaved stereo, BUFFER_SIZE frames)
            let mut output_parallel = vec![0.0f32; BUFFER_SIZE * 2];
            let mut output_sequential = vec![0.0f32; BUFFER_SIZE * 2];

            // Process one block with both schedulers
            scheduler_parallel.process_graph(&mut output_parallel);
            scheduler_sequential.process_graph_sequential(&mut output_sequential);

            // Assert bit-identical output
            for (i, (&p, &s)) in output_parallel.iter().zip(output_sequential.iter()).enumerate() {
                prop_assert!(
                    p == s || (p.is_nan() && s.is_nan()),
                    "Output mismatch at sample {}: parallel={}, sequential={}",
                    i, p, s
                );
            }
        }
    }
}