Skip to main content

rill_graph/
engine.rs

1use crossbeam_channel::{Receiver, Sender, TryRecvError};
2use rill_core::math::Transcendental;
3use rill_core::queues::signal::CommandEnum;
4use rill_core::queues::telemetry::Telemetry;
5use rill_core::time::ClockTick;
6use rill_core::traits::processable::{NodeVariant, Processable};
7use rill_core::traits::port::Port;
8use rill_core::traits::{SignalNode, PortId, ProcessResult};
9use std::sync::atomic::{AtomicBool, Ordering};
10use std::sync::Arc;
11use std::thread;
12
13/// Real-time safe signal engine for a static signal graph.
14///
15/// Owns the mutable node state and provides:
16///
17/// 1. **Clock boundary** — [`process_tick`](Self::process_tick): drains
18///    commands (with anti-ack) and runs `pre_process` (feedback mix).
19/// 2. **Block processing** — [`process_block`](Self::process_block):
20///    iterates nodes in topological order: for each node, calls
21///    `process_block`, then `snapshot_feedback`, then `propagate` to
22///    downstream nodes via pre-established port connections.
23/// 3. **Thread management** — [`start`](Self::start)/[`stop`](Self::stop)
24///    manage a cooperative running flag; [`spawn`](Self::spawn) consumes
25///    the engine and runs it in a dedicated signal thread.
26///
27/// A separate control thread communicates via command/telemetry queues.
28///
29/// # Type Parameters
30/// - `T` — floating-point type (`f32` or `f64`)
31/// - `BUF_SIZE` — block size (must match the graph)
32pub struct SignalEngine<T: Transcendental, const BUF_SIZE: usize> {
33    nodes: Vec<NodeVariant<T, BUF_SIZE>>,
34    topo_order: Vec<usize>,
35    cmd_slots: Vec<Option<CommandEnum>>,
36    cmd_rx: Option<Receiver<CommandEnum>>,
37    tel_tx: Option<Sender<Telemetry>>,
38    running: Arc<AtomicBool>,
39}
40
41impl<T: Transcendental, const BUF_SIZE: usize> SignalEngine<T, BUF_SIZE> {
42    /// Create a new engine from graph parts.
43    pub fn new(
44        nodes: Vec<NodeVariant<T, BUF_SIZE>>,
45        topo_order: Vec<usize>,
46        cmd_rx: Option<Receiver<CommandEnum>>,
47        tel_tx: Option<Sender<Telemetry>>,
48    ) -> Self {
49        let node_count = nodes.len();
50        Self {
51            nodes,
52            topo_order,
53            cmd_slots: vec![None; node_count.max(1)],
54            cmd_rx,
55            tel_tx,
56            running: Arc::new(AtomicBool::new(false)),
57        }
58    }
59
60    /// Process a clock tick — called from the I/O callback when hardware fires.
61    ///
62    /// Drains pending commands (with anti-ack telemetry on overwrite), then
63    /// runs `pre_process` on all input ports (mixes feedback from previous block),
64    /// then applies any pending `SetParameter` commands to their target nodes.
65    ///
66    /// Returns the number of commands applied this tick.
67    pub fn process_tick(&mut self, tick: &ClockTick) -> usize {
68        let mut applied = 0usize;
69
70        // === 1. Drain command queue into sparse slots ===
71        if let Some(ref rx) = self.cmd_rx {
72            loop {
73                let cmd = match rx.try_recv() {
74                    Ok(cmd) => cmd,
75                    Err(TryRecvError::Empty) => break,
76                    Err(TryRecvError::Disconnected) => {
77                        self.cmd_rx = None;
78                        break;
79                    }
80                };
81
82                let nid = match cmd.target_node_id() {
83                    Some(id) => id.inner() as usize,
84                    None => continue,
85                };
86
87                if nid >= self.cmd_slots.len() {
88                    continue;
89                }
90
91                // Anti-ack: if slot is occupied, notify control
92                if self.cmd_slots[nid].is_some() {
93                    let _ = self.tel_tx.as_ref().map(|tx| {
94                        let _ = tx.try_send(Telemetry::event(
95                            "engine",
96                            "command_dropped",
97                            vec![nid as f32],
98                        ));
99                    });
100                }
101
102                self.cmd_slots[nid] = Some(cmd);
103            }
104        }
105
106        // === 2. pre_process on all nodes (feedback mix — block boundary) ===
107        for &idx in &self.topo_order {
108            let num_inputs = self.nodes[idx].num_signal_inputs();
109            for pi in 0..num_inputs {
110                if let Some(port) = self.nodes[idx].input_port_mut(pi) {
111                    port.pre_process(tick);
112                }
113            }
114        }
115
116        // === 3. Apply pending commands ===
117        for &idx in &self.topo_order {
118            if let Some(cmd) = self.cmd_slots[idx].take() {
119                if let Some(sp) = cmd.as_set_parameter() {
120                    let _ = self.nodes[idx].apply_set_parameter(sp);
121                    applied += 1;
122                }
123            }
124        }
125
126        applied
127    }
128
129    /// Convenience: run a full processing cycle for one block.
130    ///
131    /// Calls `process_tick`, then iterates nodes in topological order:
132    /// **process → snapshot_feedback → propagate** for each node.
133    /// This ensures data flows naturally: Source → propagate → Processor → Sink.
134    ///
135    /// Note: copies port buffers to build input slices for `ProcessContext`.
136    /// Production I/O callbacks should call `process_tick` and handle data
137    /// propagation themselves for zero-copy processing.
138    #[allow(unsafe_code)]
139    pub fn process_block(&mut self, tick: &ClockTick) -> ProcessResult<usize> {
140        let applied = self.process_tick(tick);
141
142        for &idx in &self.topo_order {
143            // Build audio input references:
144            // - Zero-copy ports: deref upstream_buffer directly via `Port::upstream_ref`
145            //   (no borrow of self.nodes[idx], no lifetime conflict)
146            // - Copy-based ports: copy buffer into local storage
147            let mut copy_bufs: Vec<[T; BUF_SIZE]> = Vec::new();
148            let mut audio_refs: Vec<&[T; BUF_SIZE]> = Vec::new();
149            for pi in 0..self.nodes[idx].num_signal_inputs() {
150                if let Some(port) = self.nodes[idx].input_port(pi) {
151                    match port.upstream_buffer {
152                        Some(ptr) => {
153                            // SAFETY: the graph is static, single-threaded,
154                            // upstream is processed before downstream.
155                            let buf = unsafe { Port::upstream_ref(ptr) };
156                            audio_refs.push(buf.as_array());
157                        }
158                        None => {
159                            copy_bufs.push(*port.buffer.as_array());
160                        }
161                    }
162                }
163            }
164            // Extend audio_refs with owned copies (no borrow of self.nodes)
165            for buf in &copy_bufs {
166                audio_refs.push(buf);
167            }
168
169            let owned_control: Vec<T> = (0..self.nodes[idx].num_control_inputs())
170                .filter_map(|ci| self.nodes[idx].control_port(ci))
171                .map(|p| *p.buffer.as_array().first().unwrap_or(&T::ZERO))
172                .collect();
173
174            let owned_clock: Vec<ClockTick> = (0..self.nodes[idx].num_clock_inputs())
175                .map(|_| *tick)
176                .collect();
177
178            let owned_feedback: Vec<[T; BUF_SIZE]> = (0..self.nodes[idx].num_feedback_ports())
179                .filter_map(|fi| self.nodes[idx].input_port(fi))
180                .map(|p| *p.buffer.as_array())
181                .collect();
182            let feedback_refs: Vec<&[T; BUF_SIZE]> = owned_feedback.iter().collect();
183
184            let mut ctx = rill_core::traits::processable::ProcessContext {
185                clock: tick,
186                signal_inputs: &audio_refs,
187                control_inputs: &owned_control,
188                clock_inputs: &owned_clock,
189                feedback_inputs: &feedback_refs,
190            };
191
192            self.nodes[idx].process_block(&mut ctx)?;
193
194            let num_outputs = self.nodes[idx].num_signal_outputs();
195            for po in 0..num_outputs {
196                if let Some(port) = self.nodes[idx].output_port_mut(po) {
197                    port.snapshot_feedback();
198                }
199            }
200
201            // Propagate outputs to downstream nodes (only for ports
202            // without upstream — upstream ports read zero-copy directly)
203            for po in 0..num_outputs {
204                let (downstream, data) = match self.nodes[idx].output_port(po) {
205                    Some(port) if !port.downstream.is_empty() => {
206                        (port.downstream.clone(), *port.buffer.as_array())
207                    }
208                    _ => continue,
209                };
210                for &(to_n, to_p) in &downstream {
211                    if let Some(port) = self.nodes[to_n].input_port_mut(to_p) {
212                        if port.upstream_buffer.is_some() {
213                            // Zero-copy: skip propagate, node reads directly
214                            // from upstream output buffer.
215                            continue;
216                        }
217                        let buf = port.buffer.as_mut_array();
218                        *buf = data;
219                    }
220                }
221            }
222        }
223
224        Ok(applied)
225    }
226
227    /// Access nodes for external processing (I/O layer).
228    /// The caller should iterate in `topo_order`.
229    pub fn nodes(&self) -> &[NodeVariant<T, BUF_SIZE>] {
230        &self.nodes
231    }
232
233    /// Mutable access to nodes for external processing.
234    pub fn nodes_mut(&mut self) -> &mut [NodeVariant<T, BUF_SIZE>] {
235        &mut self.nodes
236    }
237
238    /// Topological order — indices into `nodes()`.
239    pub fn topo_order(&self) -> &[usize] {
240        &self.topo_order
241    }
242
243    /// Set the running flag for cooperative thread shutdown.
244    pub fn start(&mut self) {
245        self.running.store(true, Ordering::SeqCst);
246    }
247
248    /// Clear the running flag.
249    pub fn stop(&self) {
250        self.running.store(false, Ordering::SeqCst);
251    }
252
253    /// Check if the engine is running.
254    pub fn is_running(&self) -> bool {
255        self.running.load(Ordering::SeqCst)
256    }
257
258    /// Spawn a dedicated signal thread that runs `process_block` in a loop.
259    /// The engine is moved into the thread; communication happens through
260    /// command/telemetry queues.
261    ///
262    /// Returns a handle for joining the thread. Signal shutdown by setting
263    /// the running flag to `false` via [`running_flag`](Self::running_flag).
264    pub fn spawn(mut self) -> std::thread::JoinHandle<()> {
265        let running = self.running.clone();
266        running.store(true, Ordering::SeqCst);
267
268        thread::Builder::new()
269            .name("rill-signal".into())
270            .spawn(move || {
271                let mut tick = ClockTick::new(0, BUF_SIZE as u32, 44100.0);
272                while running.load(Ordering::SeqCst) {
273                    if let Err(e) = self.process_block(&tick) {
274                        log::error!("SignalEngine error: {:?}", e);
275                        break;
276                    }
277                    tick.advance(BUF_SIZE as u32);
278                }
279            })
280            .expect("failed to spawn rill-signal thread")
281    }
282
283    /// Clone of the running flag, for signaling shutdown from another thread.
284    pub fn running_flag(&self) -> Arc<AtomicBool> {
285        self.running.clone()
286    }
287
288    /// Attach a command receiver after construction.
289    pub fn attach_command_rx(&mut self, rx: Receiver<CommandEnum>) {
290        self.cmd_rx = Some(rx);
291    }
292
293    /// Attach a telemetry sender after construction.
294    pub fn attach_telemetry_tx(&mut self, tx: Sender<Telemetry>) {
295        self.tel_tx = Some(tx);
296    }
297
298    /// Borrow the command slots (for external processing that needs to check
299    /// or consume pending commands).
300    pub fn cmd_slots(&self) -> &[Option<CommandEnum>] {
301        &self.cmd_slots
302    }
303
304    /// Mutably borrow the command slots.
305    pub fn cmd_slots_mut(&mut self) -> &mut [Option<CommandEnum>] {
306        &mut self.cmd_slots
307    }
308}
309
310// ============================================================================
311// Tests
312// ============================================================================
313
314#[cfg(test)]
315mod tests {
316    use super::*;
317    use rill_core::math::Transcendental;
318    use rill_core::queues::signal::{AutomatonCommand, SetParameter, SignalSource};
319    use rill_core::queues::CommandQueue;
320    use rill_core::traits::{
321        SignalNode, NodeCategory, NodeId, NodeMetadata, NodeState, ParamValue, ParameterId, Port,
322        PortDirection, PortId, ProcessResult, Processor, Sink, Source,
323    };
324
325    // ------------------------------------------------------------------
326    // Mock: ConstantSource
327    // ------------------------------------------------------------------
328    struct ConstantSource<T: Transcendental, const BUF_SIZE: usize> {
329        id: NodeId,
330        value: T,
331        state: NodeState<T, BUF_SIZE>,
332        outputs: Vec<Port<T, BUF_SIZE>>,
333    }
334
335    impl<T: Transcendental, const BUF_SIZE: usize> ConstantSource<T, BUF_SIZE> {
336        fn new(id: NodeId, value: T, sample_rate: f32) -> Self {
337            let mut outputs = Vec::with_capacity(1);
338            outputs.push(Port {
339                id: PortId::audio_out(id, 0),
340                name: "output".into(),
341                direction: PortDirection::Output,
342                action: None,
343                pending_command: None,
344                buffer: Default::default(),
345                feedback_buffer: None,
346                downstream: Vec::new(),
347                feedback_downstream: Vec::new(),
348            upstream_buffer: None,
349            });
350            Self {
351                id,
352                value,
353                state: NodeState::new(sample_rate),
354                outputs,
355            }
356        }
357    }
358
359    impl<T: Transcendental, const BUF_SIZE: usize> SignalNode<T, BUF_SIZE>
360        for ConstantSource<T, BUF_SIZE>
361    {
362        fn metadata(&self) -> NodeMetadata {
363            NodeMetadata { type_name: None,
364                name: "ConstantSource".into(),
365                category: NodeCategory::Source,
366                description: String::new(),
367                author: String::new(),
368                version: "1.0".into(),
369                signal_inputs: 0,
370                signal_outputs: 1,
371                control_inputs: 0,
372                control_outputs: 0,
373                clock_inputs: 0,
374                clock_outputs: 0,
375                feedback_ports: 0,
376                parameters: vec![],
377            }
378        }
379        fn init(&mut self, _sample_rate: f32) {}
380        fn reset(&mut self) {}
381        fn get_parameter(&self, _id: &ParameterId) -> Option<ParamValue> {
382            None
383        }
384        fn set_parameter(&mut self, _id: &ParameterId, _value: ParamValue) -> ProcessResult<()> {
385            Ok(())
386        }
387        fn id(&self) -> NodeId {
388            self.id
389        }
390        fn set_id(&mut self, _id: NodeId) {}
391        fn num_signal_outputs(&self) -> usize {
392            1
393        }
394        fn input_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
395            None
396        }
397        fn input_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
398            None
399        }
400        fn output_port(&self, index: usize) -> Option<&Port<T, BUF_SIZE>> {
401            self.outputs.get(index)
402        }
403        fn output_port_mut(&mut self, index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
404            self.outputs.get_mut(index)
405        }
406        fn control_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
407            None
408        }
409        fn control_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
410            None
411        }
412        fn state(&self) -> &NodeState<T, BUF_SIZE> {
413            &self.state
414        }
415        fn state_mut(&mut self) -> &mut NodeState<T, BUF_SIZE> {
416            &mut self.state
417        }
418    }
419
420    impl<T: Transcendental, const BUF_SIZE: usize> Source<T, BUF_SIZE>
421        for ConstantSource<T, BUF_SIZE>
422    {
423        fn generate(
424            &mut self,
425            _clock: &ClockTick,
426            _control_inputs: &[T],
427            _clock_inputs: &[ClockTick],
428        ) -> ProcessResult<()> {
429            let buf = self.outputs[0].buffer.as_mut_array();
430            for sample in buf.iter_mut() {
431                *sample = self.value;
432            }
433            Ok(())
434        }
435    }
436
437    // ------------------------------------------------------------------
438    // Mock: NoopProcessor
439    // ------------------------------------------------------------------
440    struct NoopProcessor<T: Transcendental, const BUF_SIZE: usize> {
441        id: NodeId,
442        state: NodeState<T, BUF_SIZE>,
443        inputs: Vec<Port<T, BUF_SIZE>>,
444        outputs: Vec<Port<T, BUF_SIZE>>,
445    }
446
447    impl<T: Transcendental, const BUF_SIZE: usize> NoopProcessor<T, BUF_SIZE> {
448        fn new(id: NodeId, sample_rate: f32) -> Self {
449            let mut inputs = Vec::with_capacity(1);
450            inputs.push(Port {
451                id: PortId::audio_in(id, 0),
452                name: "input".into(),
453                direction: PortDirection::Input,
454                action: None,
455                pending_command: None,
456                buffer: Default::default(),
457                feedback_buffer: None,
458                downstream: Vec::new(),
459                feedback_downstream: Vec::new(),
460            upstream_buffer: None,
461            });
462            let mut outputs = Vec::with_capacity(1);
463            outputs.push(Port {
464                id: PortId::audio_out(id, 0),
465                name: "output".into(),
466                direction: PortDirection::Output,
467                action: None,
468                pending_command: None,
469                buffer: Default::default(),
470                feedback_buffer: None,
471                downstream: Vec::new(),
472                feedback_downstream: Vec::new(),
473            upstream_buffer: None,
474            });
475            Self {
476                id,
477                state: NodeState::new(sample_rate),
478                inputs,
479                outputs,
480            }
481        }
482    }
483
484    impl<T: Transcendental, const BUF_SIZE: usize> SignalNode<T, BUF_SIZE>
485        for NoopProcessor<T, BUF_SIZE>
486    {
487        fn metadata(&self) -> NodeMetadata {
488            NodeMetadata { type_name: None,
489                name: "NoopProcessor".into(),
490                category: NodeCategory::Processor,
491                description: String::new(),
492                author: String::new(),
493                version: "1.0".into(),
494                signal_inputs: 1,
495                signal_outputs: 1,
496                control_inputs: 0,
497                control_outputs: 0,
498                clock_inputs: 0,
499                clock_outputs: 0,
500                feedback_ports: 0,
501                parameters: vec![],
502            }
503        }
504        fn init(&mut self, _sample_rate: f32) {}
505        fn reset(&mut self) {}
506        fn get_parameter(&self, _id: &ParameterId) -> Option<ParamValue> {
507            None
508        }
509        fn set_parameter(&mut self, _id: &ParameterId, _value: ParamValue) -> ProcessResult<()> {
510            Ok(())
511        }
512        fn id(&self) -> NodeId {
513            self.id
514        }
515        fn set_id(&mut self, _id: NodeId) {}
516        fn num_signal_inputs(&self) -> usize {
517            1
518        }
519        fn num_signal_outputs(&self) -> usize {
520            1
521        }
522        fn input_port(&self, index: usize) -> Option<&Port<T, BUF_SIZE>> {
523            self.inputs.get(index)
524        }
525        fn input_port_mut(&mut self, index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
526            self.inputs.get_mut(index)
527        }
528        fn output_port(&self, index: usize) -> Option<&Port<T, BUF_SIZE>> {
529            self.outputs.get(index)
530        }
531        fn output_port_mut(&mut self, index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
532            self.outputs.get_mut(index)
533        }
534        fn control_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
535            None
536        }
537        fn control_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
538            None
539        }
540        fn state(&self) -> &NodeState<T, BUF_SIZE> {
541            &self.state
542        }
543        fn state_mut(&mut self) -> &mut NodeState<T, BUF_SIZE> {
544            &mut self.state
545        }
546    }
547
548    impl<T: Transcendental, const BUF_SIZE: usize> Processor<T, BUF_SIZE>
549        for NoopProcessor<T, BUF_SIZE>
550    {
551        fn process(
552            &mut self,
553            _clock: &ClockTick,
554            signal_inputs: &[&[T; BUF_SIZE]],
555            _control_inputs: &[T],
556            _clock_inputs: &[ClockTick],
557            _feedback_inputs: &[&[T; BUF_SIZE]],
558        ) -> ProcessResult<()> {
559            let output = self.outputs[0].buffer.as_mut_array();
560            if let Some(input) = signal_inputs.first() {
561                output.copy_from_slice(*input);
562            }
563            Ok(())
564        }
565    }
566
567    // ------------------------------------------------------------------
568    // Mock: CaptureSink
569    // ------------------------------------------------------------------
570    struct CaptureSink<T: Transcendental, const BUF_SIZE: usize> {
571        id: NodeId,
572        state: NodeState<T, BUF_SIZE>,
573        inputs: Vec<Port<T, BUF_SIZE>>,
574        captured: Vec<T>,
575    }
576
577    impl<T: Transcendental, const BUF_SIZE: usize> CaptureSink<T, BUF_SIZE> {
578        fn new(id: NodeId, sample_rate: f32) -> Self {
579            let mut inputs = Vec::with_capacity(1);
580            inputs.push(Port {
581                id: PortId::audio_in(id, 0),
582                name: "input".into(),
583                direction: PortDirection::Input,
584                action: None,
585                pending_command: None,
586                buffer: Default::default(),
587                feedback_buffer: None,
588                downstream: Vec::new(),
589                feedback_downstream: Vec::new(),
590            upstream_buffer: None,
591            });
592            Self {
593                id,
594                state: NodeState::new(sample_rate),
595                inputs,
596                captured: Vec::new(),
597            }
598        }
599
600        #[allow(dead_code)]
601        fn captured(&self) -> &[T] {
602            &self.captured
603        }
604    }
605
606    impl<T: Transcendental, const BUF_SIZE: usize> SignalNode<T, BUF_SIZE>
607        for CaptureSink<T, BUF_SIZE>
608    {
609        fn metadata(&self) -> NodeMetadata {
610            NodeMetadata { type_name: None,
611                name: "CaptureSink".into(),
612                category: NodeCategory::Sink,
613                description: String::new(),
614                author: String::new(),
615                version: "1.0".into(),
616                signal_inputs: 1,
617                signal_outputs: 0,
618                control_inputs: 0,
619                control_outputs: 0,
620                clock_inputs: 0,
621                clock_outputs: 0,
622                feedback_ports: 0,
623                parameters: vec![],
624            }
625        }
626        fn init(&mut self, _sample_rate: f32) {}
627        fn reset(&mut self) {}
628        fn get_parameter(&self, _id: &ParameterId) -> Option<ParamValue> {
629            None
630        }
631        fn set_parameter(&mut self, _id: &ParameterId, _value: ParamValue) -> ProcessResult<()> {
632            Ok(())
633        }
634        fn id(&self) -> NodeId {
635            self.id
636        }
637        fn set_id(&mut self, _id: NodeId) {}
638        fn num_signal_inputs(&self) -> usize {
639            1
640        }
641        fn input_port(&self, index: usize) -> Option<&Port<T, BUF_SIZE>> {
642            self.inputs.get(index)
643        }
644        fn input_port_mut(&mut self, index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
645            self.inputs.get_mut(index)
646        }
647        fn output_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
648            None
649        }
650        fn output_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
651            None
652        }
653        fn control_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
654            None
655        }
656        fn control_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
657            None
658        }
659        fn state(&self) -> &NodeState<T, BUF_SIZE> {
660            &self.state
661        }
662        fn state_mut(&mut self) -> &mut NodeState<T, BUF_SIZE> {
663            &mut self.state
664        }
665    }
666
667    impl<T: Transcendental, const BUF_SIZE: usize> Sink<T, BUF_SIZE>
668        for CaptureSink<T, BUF_SIZE>
669    {
670        fn consume(
671            &mut self,
672            _clock: &ClockTick,
673            signal_inputs: &[&[T; BUF_SIZE]],
674            _control_inputs: &[T],
675            _clock_inputs: &[ClockTick],
676            _feedback_inputs: &[&[T; BUF_SIZE]],
677        ) -> ProcessResult<()> {
678            if let Some(input) = signal_inputs.first() {
679                self.captured = input.to_vec();
680            }
681            Ok(())
682        }
683    }
684
685    // ==================================================================
686    // Tests
687    // ==================================================================
688
689    #[test]
690    fn test_engine_process_tick_drains_commands() {
691        const BUF: usize = 64;
692        let cmd_queue = CommandQueue::<CommandEnum>::new("test", 16);
693        let cmd_rx = cmd_queue.receiver();
694        let tick = ClockTick::new(0, BUF as u32, 44100.0);
695
696        let nodes: Vec<NodeVariant<f32, BUF>> = vec![NodeVariant::Source(Box::new(
697            ConstantSource::new(NodeId(0), 1.0, 44100.0),
698        ))];
699
700        let mut engine = SignalEngine::<f32, BUF>::new(nodes, vec![0], Some(cmd_rx), None);
701
702        let cmd = CommandEnum::SetParameter(SetParameter::new(
703            PortId::param(NodeId(0), 0),
704            ParameterId::new("gain").unwrap(),
705            0.5,
706            SignalSource::Manual,
707        ));
708        cmd_queue.send(cmd).unwrap();
709
710        let applied = engine.process_tick(&tick);
711        assert_eq!(applied, 1);
712    }
713
714    #[test]
715    fn test_engine_process_tick_anti_ack() {
716        const BUF: usize = 64;
717        let cmd_queue = CommandQueue::<CommandEnum>::new("test", 16);
718        let cmd_rx = cmd_queue.receiver();
719        let (tel_tx, tel_rx) = crossbeam_channel::unbounded();
720        let tick = ClockTick::new(0, BUF as u32, 44100.0);
721
722        let nodes: Vec<NodeVariant<f32, BUF>> = vec![NodeVariant::Source(Box::new(
723            ConstantSource::new(NodeId(0), 1.0, 44100.0),
724        ))];
725
726        let mut engine = SignalEngine::<f32, BUF>::new(
727            nodes,
728            vec![0],
729            Some(cmd_rx),
730            Some(tel_tx),
731        );
732
733        let pid = ParameterId::new("gain").unwrap();
734        let cmd1 = CommandEnum::SetParameter(SetParameter::new(
735            PortId::param(NodeId(0), 0),
736            pid.clone(),
737            0.3,
738            SignalSource::Manual,
739        ));
740        let cmd2 = CommandEnum::SetParameter(SetParameter::new(
741            PortId::param(NodeId(0), 0),
742            pid,
743            0.8,
744            SignalSource::Manual,
745        ));
746        cmd_queue.send(cmd1).unwrap();
747        cmd_queue.send(cmd2).unwrap();
748
749        let applied = engine.process_tick(&tick);
750
751        // First command was overwritten (anti-ack), second applied
752        assert_eq!(applied, 1);
753
754        let tel = tel_rx.try_recv().unwrap();
755        match tel {
756            Telemetry::Event { kind, data, .. } => {
757                assert_eq!(kind, "command_dropped");
758                assert_eq!(data, vec![0.0]);
759            }
760            _ => panic!("expected Event telemetry"),
761        }
762    }
763
764    #[test]
765    fn test_engine_process_tick_skips_non_set_parameter() {
766        const BUF: usize = 64;
767        let cmd_queue = CommandQueue::<CommandEnum>::new("test", 16);
768        let cmd_rx = cmd_queue.receiver();
769        let tick = ClockTick::new(0, BUF as u32, 44100.0);
770
771        let nodes: Vec<NodeVariant<f32, BUF>> = vec![NodeVariant::Source(Box::new(
772            ConstantSource::new(NodeId(0), 1.0, 44100.0),
773        ))];
774
775        let mut engine = SignalEngine::<f32, BUF>::new(nodes, vec![0], Some(cmd_rx), None);
776
777        let cmd = CommandEnum::Automaton(AutomatonCommand::SetEnabled {
778            id: "test".into(),
779            enabled: true,
780        });
781        cmd_queue.send(cmd).unwrap();
782
783        let applied = engine.process_tick(&tick);
784        assert_eq!(applied, 0);
785    }
786
787    #[test]
788    fn test_engine_process_block_is_convenience_method() {
789        const BUF: usize = 64;
790        let tick = ClockTick::new(0, BUF as u32, 44100.0);
791
792        let nodes: Vec<NodeVariant<f32, BUF>> = vec![NodeVariant::Source(Box::new(
793            ConstantSource::new(NodeId(0), 1.0, 44100.0),
794        ))];
795
796        let mut engine = SignalEngine::<f32, BUF>::new(nodes, vec![0], None, None);
797
798        let result = engine.process_block(&tick).unwrap();
799        assert_eq!(result, 0);
800    }
801
802    #[test]
803    fn test_engine_data_flows_source_to_sink() {
804        const BUF: usize = 64;
805
806        // Source → Processor → Sink with manual wiring
807        let mut nodes: Vec<NodeVariant<f32, BUF>> = Vec::new();
808
809        let mut source = Box::new(ConstantSource::new(NodeId(0), 42.0, 44100.0));
810        source.outputs[0].downstream.push((1, 0));
811        nodes.push(NodeVariant::Source(source));
812
813        let mut processor = Box::new(NoopProcessor::new(NodeId(1), 44100.0));
814        processor.outputs[0].downstream.push((2, 0));
815        nodes.push(NodeVariant::Processor(processor));
816
817        let sink_node = Box::new(CaptureSink::new(NodeId(2), 44100.0));
818        nodes.push(NodeVariant::Sink(sink_node));
819
820        let tick = ClockTick::new(0, BUF as u32, 44100.0);
821        let mut engine = SignalEngine::<f32, BUF>::new(nodes, vec![0, 1, 2], None, None);
822
823        engine.process_block(&tick).unwrap();
824
825        // Check data flowed through Source → Processor → Sink
826        let sink_port = engine.nodes[2].input_port(0).expect("sink input port");
827        let buf = sink_port.buffer.as_array();
828        assert!(buf.iter().all(|&x| x == 42.0),
829            "sink input should be all 42.0, got {:?}", &buf[..5]);
830    }
831
832    // ------------------------------------------------------------------
833    // ADC Source — simulates hardware ADC by filling output with a
834    // block counter. Each block's first sample = block_index.
835    // ------------------------------------------------------------------
836    struct AdcSource<T: Transcendental, const BUF_SIZE: usize> {
837        id: NodeId,
838        block_count: u64,
839        state: NodeState<T, BUF_SIZE>,
840        outputs: Vec<Port<T, BUF_SIZE>>,
841    }
842
843    impl<T: Transcendental, const BUF_SIZE: usize> AdcSource<T, BUF_SIZE> {
844        fn new(id: NodeId, sample_rate: f32) -> Self {
845            let mut outputs = Vec::with_capacity(1);
846            outputs.push(Port {
847                id: PortId::audio_out(id, 0),
848                name: "adc_out".into(),
849                direction: PortDirection::Output,
850                action: None,
851                pending_command: None,
852                buffer: Default::default(),
853                feedback_buffer: None,
854                downstream: Vec::new(),
855                feedback_downstream: Vec::new(),
856            upstream_buffer: None,
857            });
858            Self {
859                id,
860                block_count: 0,
861                state: NodeState::new(sample_rate),
862                outputs,
863            }
864        }
865    }
866
867    impl<T: Transcendental, const BUF_SIZE: usize> SignalNode<T, BUF_SIZE>
868        for AdcSource<T, BUF_SIZE>
869    {
870        fn metadata(&self) -> NodeMetadata {
871            NodeMetadata { type_name: None,
872                name: "AdcSource".into(),
873                category: NodeCategory::Source,
874                description: String::new(),
875                author: String::new(),
876                version: "1.0".into(),
877                signal_inputs: 0,
878                signal_outputs: 1,
879                control_inputs: 0,
880                control_outputs: 0,
881                clock_inputs: 0,
882                clock_outputs: 0,
883                feedback_ports: 0,
884                parameters: vec![],
885            }
886        }
887        fn init(&mut self, _sample_rate: f32) {}
888        fn reset(&mut self) { self.block_count = 0; }
889        fn get_parameter(&self, _id: &ParameterId) -> Option<ParamValue> { None }
890        fn set_parameter(&mut self, _id: &ParameterId, _value: ParamValue) -> ProcessResult<()> { Ok(()) }
891        fn id(&self) -> NodeId { self.id }
892        fn set_id(&mut self, _id: NodeId) {}
893        fn num_signal_outputs(&self) -> usize { 1 }
894        fn output_port(&self, index: usize) -> Option<&Port<T, BUF_SIZE>> { self.outputs.get(index) }
895        fn output_port_mut(&mut self, index: usize) -> Option<&mut Port<T, BUF_SIZE>> { self.outputs.get_mut(index) }
896        fn input_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> { None }
897        fn input_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> { None }
898        fn control_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> { None }
899        fn control_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> { None }
900        fn state(&self) -> &NodeState<T, BUF_SIZE> { &self.state }
901        fn state_mut(&mut self) -> &mut NodeState<T, BUF_SIZE> { &mut self.state }
902    }
903
904    impl<T: Transcendental, const BUF_SIZE: usize> Source<T, BUF_SIZE>
905        for AdcSource<T, BUF_SIZE>
906    {
907        fn generate(
908            &mut self,
909            _clock: &ClockTick,
910            _control_inputs: &[T],
911            _clock_inputs: &[ClockTick],
912        ) -> ProcessResult<()> {
913            let buf = self.outputs[0].buffer.as_mut_array();
914            let count = self.block_count;
915            for (i, sample) in buf.iter_mut().enumerate() {
916                *sample = T::from_f32(count as f32) + T::from_f32(i as f32);
917            }
918            self.block_count += 1;
919            Ok(())
920        }
921    }
922
923    // ------------------------------------------------------------------
924    // DAC Sink — simulates hardware DAC by capturing the last
925    // `capture_depth` blocks into a shared buffer.
926    // ------------------------------------------------------------------
927    struct DacSink<T: Transcendental, const BUF_SIZE: usize> {
928        id: NodeId,
929        state: NodeState<T, BUF_SIZE>,
930        inputs: Vec<Port<T, BUF_SIZE>>,
931        captured: Vec<T>,
932    }
933
934    impl<T: Transcendental, const BUF_SIZE: usize> DacSink<T, BUF_SIZE> {
935        fn new(id: NodeId, sample_rate: f32) -> Self {
936            let mut inputs = Vec::with_capacity(1);
937            inputs.push(Port {
938                id: PortId::audio_in(id, 0),
939                name: "dac_in".into(),
940                direction: PortDirection::Input,
941                action: None,
942                pending_command: None,
943                buffer: Default::default(),
944                feedback_buffer: None,
945                downstream: Vec::new(),
946                feedback_downstream: Vec::new(),
947            upstream_buffer: None,
948            });
949            Self {
950                id,
951                state: NodeState::new(sample_rate),
952                inputs,
953                captured: Vec::new(),
954            }
955        }
956        fn captured(&self) -> &[T] { &self.captured }
957    }
958
959    impl<T: Transcendental, const BUF_SIZE: usize> SignalNode<T, BUF_SIZE>
960        for DacSink<T, BUF_SIZE>
961    {
962        fn metadata(&self) -> NodeMetadata {
963            NodeMetadata { type_name: None,
964                name: "DacSink".into(),
965                category: NodeCategory::Sink,
966                description: String::new(),
967                author: String::new(),
968                version: "1.0".into(),
969                signal_inputs: 1,
970                signal_outputs: 0,
971                control_inputs: 0,
972                control_outputs: 0,
973                clock_inputs: 0,
974                clock_outputs: 0,
975                feedback_ports: 0,
976                parameters: vec![],
977            }
978        }
979        fn init(&mut self, _sample_rate: f32) {}
980        fn reset(&mut self) { self.captured.clear(); }
981        fn get_parameter(&self, _id: &ParameterId) -> Option<ParamValue> { None }
982        fn set_parameter(&mut self, _id: &ParameterId, _value: ParamValue) -> ProcessResult<()> { Ok(()) }
983        fn id(&self) -> NodeId { self.id }
984        fn set_id(&mut self, _id: NodeId) {}
985        fn num_signal_inputs(&self) -> usize { 1 }
986        fn input_port(&self, index: usize) -> Option<&Port<T, BUF_SIZE>> { self.inputs.get(index) }
987        fn input_port_mut(&mut self, index: usize) -> Option<&mut Port<T, BUF_SIZE>> { self.inputs.get_mut(index) }
988        fn output_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> { None }
989        fn output_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> { None }
990        fn control_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> { None }
991        fn control_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> { None }
992        fn state(&self) -> &NodeState<T, BUF_SIZE> { &self.state }
993        fn state_mut(&mut self) -> &mut NodeState<T, BUF_SIZE> { &mut self.state }
994    }
995
996    impl<T: Transcendental, const BUF_SIZE: usize> Sink<T, BUF_SIZE>
997        for DacSink<T, BUF_SIZE>
998    {
999        fn consume(
1000            &mut self,
1001            _clock: &ClockTick,
1002            signal_inputs: &[&[T; BUF_SIZE]],
1003            _control_inputs: &[T],
1004            _clock_inputs: &[ClockTick],
1005            _feedback_inputs: &[&[T; BUF_SIZE]],
1006        ) -> ProcessResult<()> {
1007            if let Some(input) = signal_inputs.first() {
1008                self.captured.extend_from_slice(*input);
1009            }
1010            Ok(())
1011        }
1012    }
1013
1014    // ==================================================================
1015    // Hardware clock simulation test
1016    //
1017    // Simulates a real ADC → DSP → DAC chain where the hardware clock
1018    // fires at regular intervals (block_size / sample_rate seconds).
1019    // Each call to process_block = one hardware clock tick.
1020    // No sleeps — pure deterministic processing loop.
1021    // ==================================================================
1022
1023    #[test]
1024    fn test_engine_hardware_clock_simulation() {
1025        const BUF: usize = 64;
1026        const NUM_BLOCKS: usize = 10;
1027
1028        // Build ADC → NoopProcessor → DAC chain
1029        let mut nodes: Vec<NodeVariant<f32, BUF>> = Vec::new();
1030
1031        let mut adc = Box::new(AdcSource::new(NodeId(0), 44100.0));
1032        adc.outputs[0].downstream.push((1, 0));
1033        nodes.push(NodeVariant::Source(adc));
1034
1035        let mut proc = Box::new(NoopProcessor::new(NodeId(1), 44100.0));
1036        proc.outputs[0].downstream.push((2, 0));
1037        nodes.push(NodeVariant::Processor(proc));
1038
1039        let dac = Box::new(DacSink::new(NodeId(2), 44100.0));
1040        nodes.push(NodeVariant::Sink(dac));
1041
1042        let mut tick = ClockTick::new(0, BUF as u32, 44100.0);
1043        let mut engine = SignalEngine::<f32, BUF>::new(nodes, vec![0, 1, 2], None, None);
1044
1045        // Process N blocks — each call is one hardware clock tick
1046        for _ in 0..NUM_BLOCKS {
1047            engine.process_block(&tick).unwrap();
1048            tick.advance(BUF as u32);
1049        }
1050
1051        // Verify last block propagated to DAC sink
1052        let dac_port = engine.nodes[2].input_port(0).expect("dac input");
1053        let last_block = dac_port.buffer.as_array();
1054
1055        // Block 9 (0-indexed): ADC writes block_count to first sample,
1056        // then block_count + sample_index for subsequent samples
1057        assert_eq!(last_block[0], (NUM_BLOCKS - 1) as f32,
1058            "first sample of block {} should be {}", NUM_BLOCKS - 1, NUM_BLOCKS - 1);
1059        assert_eq!(last_block[BUF - 1], (NUM_BLOCKS - 1 + BUF - 1) as f32);
1060    }
1061
1062    // ==================================================================
1063    // Pull model test — active Sink, passive Source.
1064    //
1065    // In the pull model, the hardware DAC clock drives processing:
1066    // the Sink receives the clock tick and data flows from Source
1067    // through the graph to the Sink.
1068    //
1069    // `process_tick` handles the clock boundary (feedback, commands),
1070    // then `process_block` runs the topo-order processing.
1071    // The Sink is semantically "active" — it pulls data by virtue
1072    // of the clock reaching it through the graph.
1073    // ==================================================================
1074
1075    #[test]
1076    fn test_engine_pull_model_active_sink() {
1077        const BUF: usize = 64;
1078        const NUM_BLOCKS: usize = 5;
1079
1080        // Passive Source — generates on each tick
1081        let mut nodes: Vec<NodeVariant<f32, BUF>> = Vec::new();
1082
1083        let mut src = Box::new(ConstantSource::new(NodeId(0), 1.0, 44100.0));
1084        src.outputs[0].downstream.push((1, 0));
1085        nodes.push(NodeVariant::Source(src));
1086
1087        let mut proc = Box::new(NoopProcessor::new(NodeId(1), 44100.0));
1088        proc.outputs[0].downstream.push((2, 0));
1089        nodes.push(NodeVariant::Processor(proc));
1090
1091        // Active Sink — simulates DAC pulling data
1092        let dac = Box::new(DacSink::new(NodeId(2), 44100.0));
1093        nodes.push(NodeVariant::Sink(dac));
1094
1095        let mut tick = ClockTick::new(0, BUF as u32, 44100.0);
1096        let mut engine = SignalEngine::<f32, BUF>::new(nodes, vec![0, 1, 2], None, None);
1097
1098        // The clock fires — Sink is the active node in the pull model
1099        for _ in 0..NUM_BLOCKS {
1100            engine.process_block(&tick).unwrap();
1101            tick.advance(BUF as u32);
1102        }
1103
1104        // Verify data reached the Sink — constant 1.0 from source
1105        let dac_port = engine.nodes[2].input_port(0).expect("dac input");
1106        let last_block = dac_port.buffer.as_array();
1107        assert!(last_block.iter().all(|&x| x == 1.0),
1108            "pull model: sink should receive 1.0, got {:?}", &last_block[..3]);
1109    }
1110}