1use crossbeam_channel::{Receiver, Sender, TryRecvError};
2use rill_core::math::Transcendental;
3use rill_core::queues::signal::CommandEnum;
4use rill_core::queues::telemetry::Telemetry;
5use rill_core::time::ClockTick;
6use rill_core::traits::processable::{NodeVariant, Processable};
7use rill_core::traits::port::Port;
8use rill_core::traits::{SignalNode, PortId, ProcessResult};
9use std::sync::atomic::{AtomicBool, Ordering};
10use std::sync::Arc;
11use std::thread;
12
13pub struct SignalEngine<T: Transcendental, const BUF_SIZE: usize> {
33 nodes: Vec<NodeVariant<T, BUF_SIZE>>,
34 topo_order: Vec<usize>,
35 cmd_slots: Vec<Option<CommandEnum>>,
36 cmd_rx: Option<Receiver<CommandEnum>>,
37 tel_tx: Option<Sender<Telemetry>>,
38 running: Arc<AtomicBool>,
39}
40
41impl<T: Transcendental, const BUF_SIZE: usize> SignalEngine<T, BUF_SIZE> {
42 pub fn new(
44 nodes: Vec<NodeVariant<T, BUF_SIZE>>,
45 topo_order: Vec<usize>,
46 cmd_rx: Option<Receiver<CommandEnum>>,
47 tel_tx: Option<Sender<Telemetry>>,
48 ) -> Self {
49 let node_count = nodes.len();
50 Self {
51 nodes,
52 topo_order,
53 cmd_slots: vec![None; node_count.max(1)],
54 cmd_rx,
55 tel_tx,
56 running: Arc::new(AtomicBool::new(false)),
57 }
58 }
59
60 pub fn process_tick(&mut self, tick: &ClockTick) -> usize {
68 let mut applied = 0usize;
69
70 if let Some(ref rx) = self.cmd_rx {
72 loop {
73 let cmd = match rx.try_recv() {
74 Ok(cmd) => cmd,
75 Err(TryRecvError::Empty) => break,
76 Err(TryRecvError::Disconnected) => {
77 self.cmd_rx = None;
78 break;
79 }
80 };
81
82 let nid = match cmd.target_node_id() {
83 Some(id) => id.inner() as usize,
84 None => continue,
85 };
86
87 if nid >= self.cmd_slots.len() {
88 continue;
89 }
90
91 if self.cmd_slots[nid].is_some() {
93 let _ = self.tel_tx.as_ref().map(|tx| {
94 let _ = tx.try_send(Telemetry::event(
95 "engine",
96 "command_dropped",
97 vec![nid as f32],
98 ));
99 });
100 }
101
102 self.cmd_slots[nid] = Some(cmd);
103 }
104 }
105
106 for &idx in &self.topo_order {
108 let num_inputs = self.nodes[idx].num_signal_inputs();
109 for pi in 0..num_inputs {
110 if let Some(port) = self.nodes[idx].input_port_mut(pi) {
111 port.pre_process(tick);
112 }
113 }
114 }
115
116 for &idx in &self.topo_order {
118 if let Some(cmd) = self.cmd_slots[idx].take() {
119 if let Some(sp) = cmd.as_set_parameter() {
120 let _ = self.nodes[idx].apply_set_parameter(sp);
121 applied += 1;
122 }
123 }
124 }
125
126 applied
127 }
128
129 #[allow(unsafe_code)]
139 pub fn process_block(&mut self, tick: &ClockTick) -> ProcessResult<usize> {
140 let applied = self.process_tick(tick);
141
142 for &idx in &self.topo_order {
143 let mut copy_bufs: Vec<[T; BUF_SIZE]> = Vec::new();
148 let mut audio_refs: Vec<&[T; BUF_SIZE]> = Vec::new();
149 for pi in 0..self.nodes[idx].num_signal_inputs() {
150 if let Some(port) = self.nodes[idx].input_port(pi) {
151 match port.upstream_buffer {
152 Some(ptr) => {
153 let buf = unsafe { Port::upstream_ref(ptr) };
156 audio_refs.push(buf.as_array());
157 }
158 None => {
159 copy_bufs.push(*port.buffer.as_array());
160 }
161 }
162 }
163 }
164 for buf in ©_bufs {
166 audio_refs.push(buf);
167 }
168
169 let owned_control: Vec<T> = (0..self.nodes[idx].num_control_inputs())
170 .filter_map(|ci| self.nodes[idx].control_port(ci))
171 .map(|p| *p.buffer.as_array().first().unwrap_or(&T::ZERO))
172 .collect();
173
174 let owned_clock: Vec<ClockTick> = (0..self.nodes[idx].num_clock_inputs())
175 .map(|_| *tick)
176 .collect();
177
178 let owned_feedback: Vec<[T; BUF_SIZE]> = (0..self.nodes[idx].num_feedback_ports())
179 .filter_map(|fi| self.nodes[idx].input_port(fi))
180 .map(|p| *p.buffer.as_array())
181 .collect();
182 let feedback_refs: Vec<&[T; BUF_SIZE]> = owned_feedback.iter().collect();
183
184 let mut ctx = rill_core::traits::processable::ProcessContext {
185 clock: tick,
186 signal_inputs: &audio_refs,
187 control_inputs: &owned_control,
188 clock_inputs: &owned_clock,
189 feedback_inputs: &feedback_refs,
190 };
191
192 self.nodes[idx].process_block(&mut ctx)?;
193
194 let num_outputs = self.nodes[idx].num_signal_outputs();
195 for po in 0..num_outputs {
196 if let Some(port) = self.nodes[idx].output_port_mut(po) {
197 port.snapshot_feedback();
198 }
199 }
200
201 for po in 0..num_outputs {
204 let (downstream, data) = match self.nodes[idx].output_port(po) {
205 Some(port) if !port.downstream.is_empty() => {
206 (port.downstream.clone(), *port.buffer.as_array())
207 }
208 _ => continue,
209 };
210 for &(to_n, to_p) in &downstream {
211 if let Some(port) = self.nodes[to_n].input_port_mut(to_p) {
212 if port.upstream_buffer.is_some() {
213 continue;
216 }
217 let buf = port.buffer.as_mut_array();
218 *buf = data;
219 }
220 }
221 }
222 }
223
224 Ok(applied)
225 }
226
227 pub fn nodes(&self) -> &[NodeVariant<T, BUF_SIZE>] {
230 &self.nodes
231 }
232
233 pub fn nodes_mut(&mut self) -> &mut [NodeVariant<T, BUF_SIZE>] {
235 &mut self.nodes
236 }
237
238 pub fn topo_order(&self) -> &[usize] {
240 &self.topo_order
241 }
242
243 pub fn start(&mut self) {
245 self.running.store(true, Ordering::SeqCst);
246 }
247
248 pub fn stop(&self) {
250 self.running.store(false, Ordering::SeqCst);
251 }
252
253 pub fn is_running(&self) -> bool {
255 self.running.load(Ordering::SeqCst)
256 }
257
258 pub fn spawn(mut self) -> std::thread::JoinHandle<()> {
265 let running = self.running.clone();
266 running.store(true, Ordering::SeqCst);
267
268 thread::Builder::new()
269 .name("rill-signal".into())
270 .spawn(move || {
271 let mut tick = ClockTick::new(0, BUF_SIZE as u32, 44100.0);
272 while running.load(Ordering::SeqCst) {
273 if let Err(e) = self.process_block(&tick) {
274 log::error!("SignalEngine error: {:?}", e);
275 break;
276 }
277 tick.advance(BUF_SIZE as u32);
278 }
279 })
280 .expect("failed to spawn rill-signal thread")
281 }
282
283 pub fn running_flag(&self) -> Arc<AtomicBool> {
285 self.running.clone()
286 }
287
288 pub fn attach_command_rx(&mut self, rx: Receiver<CommandEnum>) {
290 self.cmd_rx = Some(rx);
291 }
292
293 pub fn attach_telemetry_tx(&mut self, tx: Sender<Telemetry>) {
295 self.tel_tx = Some(tx);
296 }
297
298 pub fn cmd_slots(&self) -> &[Option<CommandEnum>] {
301 &self.cmd_slots
302 }
303
304 pub fn cmd_slots_mut(&mut self) -> &mut [Option<CommandEnum>] {
306 &mut self.cmd_slots
307 }
308}
309
310#[cfg(test)]
315mod tests {
316 use super::*;
317 use rill_core::math::Transcendental;
318 use rill_core::queues::signal::{AutomatonCommand, SetParameter, SignalSource};
319 use rill_core::queues::CommandQueue;
320 use rill_core::traits::{
321 SignalNode, NodeCategory, NodeId, NodeMetadata, NodeState, ParamValue, ParameterId, Port,
322 PortDirection, PortId, ProcessResult, Processor, Sink, Source,
323 };
324
325 struct ConstantSource<T: Transcendental, const BUF_SIZE: usize> {
329 id: NodeId,
330 value: T,
331 state: NodeState<T, BUF_SIZE>,
332 outputs: Vec<Port<T, BUF_SIZE>>,
333 }
334
335 impl<T: Transcendental, const BUF_SIZE: usize> ConstantSource<T, BUF_SIZE> {
336 fn new(id: NodeId, value: T, sample_rate: f32) -> Self {
337 let mut outputs = Vec::with_capacity(1);
338 outputs.push(Port {
339 id: PortId::audio_out(id, 0),
340 name: "output".into(),
341 direction: PortDirection::Output,
342 action: None,
343 pending_command: None,
344 buffer: Default::default(),
345 feedback_buffer: None,
346 downstream: Vec::new(),
347 feedback_downstream: Vec::new(),
348 upstream_buffer: None,
349 });
350 Self {
351 id,
352 value,
353 state: NodeState::new(sample_rate),
354 outputs,
355 }
356 }
357 }
358
359 impl<T: Transcendental, const BUF_SIZE: usize> SignalNode<T, BUF_SIZE>
360 for ConstantSource<T, BUF_SIZE>
361 {
362 fn metadata(&self) -> NodeMetadata {
363 NodeMetadata { type_name: None,
364 name: "ConstantSource".into(),
365 category: NodeCategory::Source,
366 description: String::new(),
367 author: String::new(),
368 version: "1.0".into(),
369 signal_inputs: 0,
370 signal_outputs: 1,
371 control_inputs: 0,
372 control_outputs: 0,
373 clock_inputs: 0,
374 clock_outputs: 0,
375 feedback_ports: 0,
376 parameters: vec![],
377 }
378 }
379 fn init(&mut self, _sample_rate: f32) {}
380 fn reset(&mut self) {}
381 fn get_parameter(&self, _id: &ParameterId) -> Option<ParamValue> {
382 None
383 }
384 fn set_parameter(&mut self, _id: &ParameterId, _value: ParamValue) -> ProcessResult<()> {
385 Ok(())
386 }
387 fn id(&self) -> NodeId {
388 self.id
389 }
390 fn set_id(&mut self, _id: NodeId) {}
391 fn num_signal_outputs(&self) -> usize {
392 1
393 }
394 fn input_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
395 None
396 }
397 fn input_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
398 None
399 }
400 fn output_port(&self, index: usize) -> Option<&Port<T, BUF_SIZE>> {
401 self.outputs.get(index)
402 }
403 fn output_port_mut(&mut self, index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
404 self.outputs.get_mut(index)
405 }
406 fn control_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
407 None
408 }
409 fn control_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
410 None
411 }
412 fn state(&self) -> &NodeState<T, BUF_SIZE> {
413 &self.state
414 }
415 fn state_mut(&mut self) -> &mut NodeState<T, BUF_SIZE> {
416 &mut self.state
417 }
418 }
419
420 impl<T: Transcendental, const BUF_SIZE: usize> Source<T, BUF_SIZE>
421 for ConstantSource<T, BUF_SIZE>
422 {
423 fn generate(
424 &mut self,
425 _clock: &ClockTick,
426 _control_inputs: &[T],
427 _clock_inputs: &[ClockTick],
428 ) -> ProcessResult<()> {
429 let buf = self.outputs[0].buffer.as_mut_array();
430 for sample in buf.iter_mut() {
431 *sample = self.value;
432 }
433 Ok(())
434 }
435 }
436
437 struct NoopProcessor<T: Transcendental, const BUF_SIZE: usize> {
441 id: NodeId,
442 state: NodeState<T, BUF_SIZE>,
443 inputs: Vec<Port<T, BUF_SIZE>>,
444 outputs: Vec<Port<T, BUF_SIZE>>,
445 }
446
447 impl<T: Transcendental, const BUF_SIZE: usize> NoopProcessor<T, BUF_SIZE> {
448 fn new(id: NodeId, sample_rate: f32) -> Self {
449 let mut inputs = Vec::with_capacity(1);
450 inputs.push(Port {
451 id: PortId::audio_in(id, 0),
452 name: "input".into(),
453 direction: PortDirection::Input,
454 action: None,
455 pending_command: None,
456 buffer: Default::default(),
457 feedback_buffer: None,
458 downstream: Vec::new(),
459 feedback_downstream: Vec::new(),
460 upstream_buffer: None,
461 });
462 let mut outputs = Vec::with_capacity(1);
463 outputs.push(Port {
464 id: PortId::audio_out(id, 0),
465 name: "output".into(),
466 direction: PortDirection::Output,
467 action: None,
468 pending_command: None,
469 buffer: Default::default(),
470 feedback_buffer: None,
471 downstream: Vec::new(),
472 feedback_downstream: Vec::new(),
473 upstream_buffer: None,
474 });
475 Self {
476 id,
477 state: NodeState::new(sample_rate),
478 inputs,
479 outputs,
480 }
481 }
482 }
483
484 impl<T: Transcendental, const BUF_SIZE: usize> SignalNode<T, BUF_SIZE>
485 for NoopProcessor<T, BUF_SIZE>
486 {
487 fn metadata(&self) -> NodeMetadata {
488 NodeMetadata { type_name: None,
489 name: "NoopProcessor".into(),
490 category: NodeCategory::Processor,
491 description: String::new(),
492 author: String::new(),
493 version: "1.0".into(),
494 signal_inputs: 1,
495 signal_outputs: 1,
496 control_inputs: 0,
497 control_outputs: 0,
498 clock_inputs: 0,
499 clock_outputs: 0,
500 feedback_ports: 0,
501 parameters: vec![],
502 }
503 }
504 fn init(&mut self, _sample_rate: f32) {}
505 fn reset(&mut self) {}
506 fn get_parameter(&self, _id: &ParameterId) -> Option<ParamValue> {
507 None
508 }
509 fn set_parameter(&mut self, _id: &ParameterId, _value: ParamValue) -> ProcessResult<()> {
510 Ok(())
511 }
512 fn id(&self) -> NodeId {
513 self.id
514 }
515 fn set_id(&mut self, _id: NodeId) {}
516 fn num_signal_inputs(&self) -> usize {
517 1
518 }
519 fn num_signal_outputs(&self) -> usize {
520 1
521 }
522 fn input_port(&self, index: usize) -> Option<&Port<T, BUF_SIZE>> {
523 self.inputs.get(index)
524 }
525 fn input_port_mut(&mut self, index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
526 self.inputs.get_mut(index)
527 }
528 fn output_port(&self, index: usize) -> Option<&Port<T, BUF_SIZE>> {
529 self.outputs.get(index)
530 }
531 fn output_port_mut(&mut self, index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
532 self.outputs.get_mut(index)
533 }
534 fn control_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
535 None
536 }
537 fn control_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
538 None
539 }
540 fn state(&self) -> &NodeState<T, BUF_SIZE> {
541 &self.state
542 }
543 fn state_mut(&mut self) -> &mut NodeState<T, BUF_SIZE> {
544 &mut self.state
545 }
546 }
547
548 impl<T: Transcendental, const BUF_SIZE: usize> Processor<T, BUF_SIZE>
549 for NoopProcessor<T, BUF_SIZE>
550 {
551 fn process(
552 &mut self,
553 _clock: &ClockTick,
554 signal_inputs: &[&[T; BUF_SIZE]],
555 _control_inputs: &[T],
556 _clock_inputs: &[ClockTick],
557 _feedback_inputs: &[&[T; BUF_SIZE]],
558 ) -> ProcessResult<()> {
559 let output = self.outputs[0].buffer.as_mut_array();
560 if let Some(input) = signal_inputs.first() {
561 output.copy_from_slice(*input);
562 }
563 Ok(())
564 }
565 }
566
567 struct CaptureSink<T: Transcendental, const BUF_SIZE: usize> {
571 id: NodeId,
572 state: NodeState<T, BUF_SIZE>,
573 inputs: Vec<Port<T, BUF_SIZE>>,
574 captured: Vec<T>,
575 }
576
577 impl<T: Transcendental, const BUF_SIZE: usize> CaptureSink<T, BUF_SIZE> {
578 fn new(id: NodeId, sample_rate: f32) -> Self {
579 let mut inputs = Vec::with_capacity(1);
580 inputs.push(Port {
581 id: PortId::audio_in(id, 0),
582 name: "input".into(),
583 direction: PortDirection::Input,
584 action: None,
585 pending_command: None,
586 buffer: Default::default(),
587 feedback_buffer: None,
588 downstream: Vec::new(),
589 feedback_downstream: Vec::new(),
590 upstream_buffer: None,
591 });
592 Self {
593 id,
594 state: NodeState::new(sample_rate),
595 inputs,
596 captured: Vec::new(),
597 }
598 }
599
600 #[allow(dead_code)]
601 fn captured(&self) -> &[T] {
602 &self.captured
603 }
604 }
605
606 impl<T: Transcendental, const BUF_SIZE: usize> SignalNode<T, BUF_SIZE>
607 for CaptureSink<T, BUF_SIZE>
608 {
609 fn metadata(&self) -> NodeMetadata {
610 NodeMetadata { type_name: None,
611 name: "CaptureSink".into(),
612 category: NodeCategory::Sink,
613 description: String::new(),
614 author: String::new(),
615 version: "1.0".into(),
616 signal_inputs: 1,
617 signal_outputs: 0,
618 control_inputs: 0,
619 control_outputs: 0,
620 clock_inputs: 0,
621 clock_outputs: 0,
622 feedback_ports: 0,
623 parameters: vec![],
624 }
625 }
626 fn init(&mut self, _sample_rate: f32) {}
627 fn reset(&mut self) {}
628 fn get_parameter(&self, _id: &ParameterId) -> Option<ParamValue> {
629 None
630 }
631 fn set_parameter(&mut self, _id: &ParameterId, _value: ParamValue) -> ProcessResult<()> {
632 Ok(())
633 }
634 fn id(&self) -> NodeId {
635 self.id
636 }
637 fn set_id(&mut self, _id: NodeId) {}
638 fn num_signal_inputs(&self) -> usize {
639 1
640 }
641 fn input_port(&self, index: usize) -> Option<&Port<T, BUF_SIZE>> {
642 self.inputs.get(index)
643 }
644 fn input_port_mut(&mut self, index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
645 self.inputs.get_mut(index)
646 }
647 fn output_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
648 None
649 }
650 fn output_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
651 None
652 }
653 fn control_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> {
654 None
655 }
656 fn control_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> {
657 None
658 }
659 fn state(&self) -> &NodeState<T, BUF_SIZE> {
660 &self.state
661 }
662 fn state_mut(&mut self) -> &mut NodeState<T, BUF_SIZE> {
663 &mut self.state
664 }
665 }
666
667 impl<T: Transcendental, const BUF_SIZE: usize> Sink<T, BUF_SIZE>
668 for CaptureSink<T, BUF_SIZE>
669 {
670 fn consume(
671 &mut self,
672 _clock: &ClockTick,
673 signal_inputs: &[&[T; BUF_SIZE]],
674 _control_inputs: &[T],
675 _clock_inputs: &[ClockTick],
676 _feedback_inputs: &[&[T; BUF_SIZE]],
677 ) -> ProcessResult<()> {
678 if let Some(input) = signal_inputs.first() {
679 self.captured = input.to_vec();
680 }
681 Ok(())
682 }
683 }
684
685 #[test]
690 fn test_engine_process_tick_drains_commands() {
691 const BUF: usize = 64;
692 let cmd_queue = CommandQueue::<CommandEnum>::new("test", 16);
693 let cmd_rx = cmd_queue.receiver();
694 let tick = ClockTick::new(0, BUF as u32, 44100.0);
695
696 let nodes: Vec<NodeVariant<f32, BUF>> = vec![NodeVariant::Source(Box::new(
697 ConstantSource::new(NodeId(0), 1.0, 44100.0),
698 ))];
699
700 let mut engine = SignalEngine::<f32, BUF>::new(nodes, vec![0], Some(cmd_rx), None);
701
702 let cmd = CommandEnum::SetParameter(SetParameter::new(
703 PortId::param(NodeId(0), 0),
704 ParameterId::new("gain").unwrap(),
705 0.5,
706 SignalSource::Manual,
707 ));
708 cmd_queue.send(cmd).unwrap();
709
710 let applied = engine.process_tick(&tick);
711 assert_eq!(applied, 1);
712 }
713
714 #[test]
715 fn test_engine_process_tick_anti_ack() {
716 const BUF: usize = 64;
717 let cmd_queue = CommandQueue::<CommandEnum>::new("test", 16);
718 let cmd_rx = cmd_queue.receiver();
719 let (tel_tx, tel_rx) = crossbeam_channel::unbounded();
720 let tick = ClockTick::new(0, BUF as u32, 44100.0);
721
722 let nodes: Vec<NodeVariant<f32, BUF>> = vec![NodeVariant::Source(Box::new(
723 ConstantSource::new(NodeId(0), 1.0, 44100.0),
724 ))];
725
726 let mut engine = SignalEngine::<f32, BUF>::new(
727 nodes,
728 vec![0],
729 Some(cmd_rx),
730 Some(tel_tx),
731 );
732
733 let pid = ParameterId::new("gain").unwrap();
734 let cmd1 = CommandEnum::SetParameter(SetParameter::new(
735 PortId::param(NodeId(0), 0),
736 pid.clone(),
737 0.3,
738 SignalSource::Manual,
739 ));
740 let cmd2 = CommandEnum::SetParameter(SetParameter::new(
741 PortId::param(NodeId(0), 0),
742 pid,
743 0.8,
744 SignalSource::Manual,
745 ));
746 cmd_queue.send(cmd1).unwrap();
747 cmd_queue.send(cmd2).unwrap();
748
749 let applied = engine.process_tick(&tick);
750
751 assert_eq!(applied, 1);
753
754 let tel = tel_rx.try_recv().unwrap();
755 match tel {
756 Telemetry::Event { kind, data, .. } => {
757 assert_eq!(kind, "command_dropped");
758 assert_eq!(data, vec![0.0]);
759 }
760 _ => panic!("expected Event telemetry"),
761 }
762 }
763
764 #[test]
765 fn test_engine_process_tick_skips_non_set_parameter() {
766 const BUF: usize = 64;
767 let cmd_queue = CommandQueue::<CommandEnum>::new("test", 16);
768 let cmd_rx = cmd_queue.receiver();
769 let tick = ClockTick::new(0, BUF as u32, 44100.0);
770
771 let nodes: Vec<NodeVariant<f32, BUF>> = vec![NodeVariant::Source(Box::new(
772 ConstantSource::new(NodeId(0), 1.0, 44100.0),
773 ))];
774
775 let mut engine = SignalEngine::<f32, BUF>::new(nodes, vec![0], Some(cmd_rx), None);
776
777 let cmd = CommandEnum::Automaton(AutomatonCommand::SetEnabled {
778 id: "test".into(),
779 enabled: true,
780 });
781 cmd_queue.send(cmd).unwrap();
782
783 let applied = engine.process_tick(&tick);
784 assert_eq!(applied, 0);
785 }
786
787 #[test]
788 fn test_engine_process_block_is_convenience_method() {
789 const BUF: usize = 64;
790 let tick = ClockTick::new(0, BUF as u32, 44100.0);
791
792 let nodes: Vec<NodeVariant<f32, BUF>> = vec![NodeVariant::Source(Box::new(
793 ConstantSource::new(NodeId(0), 1.0, 44100.0),
794 ))];
795
796 let mut engine = SignalEngine::<f32, BUF>::new(nodes, vec![0], None, None);
797
798 let result = engine.process_block(&tick).unwrap();
799 assert_eq!(result, 0);
800 }
801
802 #[test]
803 fn test_engine_data_flows_source_to_sink() {
804 const BUF: usize = 64;
805
806 let mut nodes: Vec<NodeVariant<f32, BUF>> = Vec::new();
808
809 let mut source = Box::new(ConstantSource::new(NodeId(0), 42.0, 44100.0));
810 source.outputs[0].downstream.push((1, 0));
811 nodes.push(NodeVariant::Source(source));
812
813 let mut processor = Box::new(NoopProcessor::new(NodeId(1), 44100.0));
814 processor.outputs[0].downstream.push((2, 0));
815 nodes.push(NodeVariant::Processor(processor));
816
817 let sink_node = Box::new(CaptureSink::new(NodeId(2), 44100.0));
818 nodes.push(NodeVariant::Sink(sink_node));
819
820 let tick = ClockTick::new(0, BUF as u32, 44100.0);
821 let mut engine = SignalEngine::<f32, BUF>::new(nodes, vec![0, 1, 2], None, None);
822
823 engine.process_block(&tick).unwrap();
824
825 let sink_port = engine.nodes[2].input_port(0).expect("sink input port");
827 let buf = sink_port.buffer.as_array();
828 assert!(buf.iter().all(|&x| x == 42.0),
829 "sink input should be all 42.0, got {:?}", &buf[..5]);
830 }
831
832 struct AdcSource<T: Transcendental, const BUF_SIZE: usize> {
837 id: NodeId,
838 block_count: u64,
839 state: NodeState<T, BUF_SIZE>,
840 outputs: Vec<Port<T, BUF_SIZE>>,
841 }
842
843 impl<T: Transcendental, const BUF_SIZE: usize> AdcSource<T, BUF_SIZE> {
844 fn new(id: NodeId, sample_rate: f32) -> Self {
845 let mut outputs = Vec::with_capacity(1);
846 outputs.push(Port {
847 id: PortId::audio_out(id, 0),
848 name: "adc_out".into(),
849 direction: PortDirection::Output,
850 action: None,
851 pending_command: None,
852 buffer: Default::default(),
853 feedback_buffer: None,
854 downstream: Vec::new(),
855 feedback_downstream: Vec::new(),
856 upstream_buffer: None,
857 });
858 Self {
859 id,
860 block_count: 0,
861 state: NodeState::new(sample_rate),
862 outputs,
863 }
864 }
865 }
866
867 impl<T: Transcendental, const BUF_SIZE: usize> SignalNode<T, BUF_SIZE>
868 for AdcSource<T, BUF_SIZE>
869 {
870 fn metadata(&self) -> NodeMetadata {
871 NodeMetadata { type_name: None,
872 name: "AdcSource".into(),
873 category: NodeCategory::Source,
874 description: String::new(),
875 author: String::new(),
876 version: "1.0".into(),
877 signal_inputs: 0,
878 signal_outputs: 1,
879 control_inputs: 0,
880 control_outputs: 0,
881 clock_inputs: 0,
882 clock_outputs: 0,
883 feedback_ports: 0,
884 parameters: vec![],
885 }
886 }
887 fn init(&mut self, _sample_rate: f32) {}
888 fn reset(&mut self) { self.block_count = 0; }
889 fn get_parameter(&self, _id: &ParameterId) -> Option<ParamValue> { None }
890 fn set_parameter(&mut self, _id: &ParameterId, _value: ParamValue) -> ProcessResult<()> { Ok(()) }
891 fn id(&self) -> NodeId { self.id }
892 fn set_id(&mut self, _id: NodeId) {}
893 fn num_signal_outputs(&self) -> usize { 1 }
894 fn output_port(&self, index: usize) -> Option<&Port<T, BUF_SIZE>> { self.outputs.get(index) }
895 fn output_port_mut(&mut self, index: usize) -> Option<&mut Port<T, BUF_SIZE>> { self.outputs.get_mut(index) }
896 fn input_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> { None }
897 fn input_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> { None }
898 fn control_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> { None }
899 fn control_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> { None }
900 fn state(&self) -> &NodeState<T, BUF_SIZE> { &self.state }
901 fn state_mut(&mut self) -> &mut NodeState<T, BUF_SIZE> { &mut self.state }
902 }
903
904 impl<T: Transcendental, const BUF_SIZE: usize> Source<T, BUF_SIZE>
905 for AdcSource<T, BUF_SIZE>
906 {
907 fn generate(
908 &mut self,
909 _clock: &ClockTick,
910 _control_inputs: &[T],
911 _clock_inputs: &[ClockTick],
912 ) -> ProcessResult<()> {
913 let buf = self.outputs[0].buffer.as_mut_array();
914 let count = self.block_count;
915 for (i, sample) in buf.iter_mut().enumerate() {
916 *sample = T::from_f32(count as f32) + T::from_f32(i as f32);
917 }
918 self.block_count += 1;
919 Ok(())
920 }
921 }
922
923 struct DacSink<T: Transcendental, const BUF_SIZE: usize> {
928 id: NodeId,
929 state: NodeState<T, BUF_SIZE>,
930 inputs: Vec<Port<T, BUF_SIZE>>,
931 captured: Vec<T>,
932 }
933
934 impl<T: Transcendental, const BUF_SIZE: usize> DacSink<T, BUF_SIZE> {
935 fn new(id: NodeId, sample_rate: f32) -> Self {
936 let mut inputs = Vec::with_capacity(1);
937 inputs.push(Port {
938 id: PortId::audio_in(id, 0),
939 name: "dac_in".into(),
940 direction: PortDirection::Input,
941 action: None,
942 pending_command: None,
943 buffer: Default::default(),
944 feedback_buffer: None,
945 downstream: Vec::new(),
946 feedback_downstream: Vec::new(),
947 upstream_buffer: None,
948 });
949 Self {
950 id,
951 state: NodeState::new(sample_rate),
952 inputs,
953 captured: Vec::new(),
954 }
955 }
956 fn captured(&self) -> &[T] { &self.captured }
957 }
958
959 impl<T: Transcendental, const BUF_SIZE: usize> SignalNode<T, BUF_SIZE>
960 for DacSink<T, BUF_SIZE>
961 {
962 fn metadata(&self) -> NodeMetadata {
963 NodeMetadata { type_name: None,
964 name: "DacSink".into(),
965 category: NodeCategory::Sink,
966 description: String::new(),
967 author: String::new(),
968 version: "1.0".into(),
969 signal_inputs: 1,
970 signal_outputs: 0,
971 control_inputs: 0,
972 control_outputs: 0,
973 clock_inputs: 0,
974 clock_outputs: 0,
975 feedback_ports: 0,
976 parameters: vec![],
977 }
978 }
979 fn init(&mut self, _sample_rate: f32) {}
980 fn reset(&mut self) { self.captured.clear(); }
981 fn get_parameter(&self, _id: &ParameterId) -> Option<ParamValue> { None }
982 fn set_parameter(&mut self, _id: &ParameterId, _value: ParamValue) -> ProcessResult<()> { Ok(()) }
983 fn id(&self) -> NodeId { self.id }
984 fn set_id(&mut self, _id: NodeId) {}
985 fn num_signal_inputs(&self) -> usize { 1 }
986 fn input_port(&self, index: usize) -> Option<&Port<T, BUF_SIZE>> { self.inputs.get(index) }
987 fn input_port_mut(&mut self, index: usize) -> Option<&mut Port<T, BUF_SIZE>> { self.inputs.get_mut(index) }
988 fn output_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> { None }
989 fn output_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> { None }
990 fn control_port(&self, _index: usize) -> Option<&Port<T, BUF_SIZE>> { None }
991 fn control_port_mut(&mut self, _index: usize) -> Option<&mut Port<T, BUF_SIZE>> { None }
992 fn state(&self) -> &NodeState<T, BUF_SIZE> { &self.state }
993 fn state_mut(&mut self) -> &mut NodeState<T, BUF_SIZE> { &mut self.state }
994 }
995
996 impl<T: Transcendental, const BUF_SIZE: usize> Sink<T, BUF_SIZE>
997 for DacSink<T, BUF_SIZE>
998 {
999 fn consume(
1000 &mut self,
1001 _clock: &ClockTick,
1002 signal_inputs: &[&[T; BUF_SIZE]],
1003 _control_inputs: &[T],
1004 _clock_inputs: &[ClockTick],
1005 _feedback_inputs: &[&[T; BUF_SIZE]],
1006 ) -> ProcessResult<()> {
1007 if let Some(input) = signal_inputs.first() {
1008 self.captured.extend_from_slice(*input);
1009 }
1010 Ok(())
1011 }
1012 }
1013
1014 #[test]
1024 fn test_engine_hardware_clock_simulation() {
1025 const BUF: usize = 64;
1026 const NUM_BLOCKS: usize = 10;
1027
1028 let mut nodes: Vec<NodeVariant<f32, BUF>> = Vec::new();
1030
1031 let mut adc = Box::new(AdcSource::new(NodeId(0), 44100.0));
1032 adc.outputs[0].downstream.push((1, 0));
1033 nodes.push(NodeVariant::Source(adc));
1034
1035 let mut proc = Box::new(NoopProcessor::new(NodeId(1), 44100.0));
1036 proc.outputs[0].downstream.push((2, 0));
1037 nodes.push(NodeVariant::Processor(proc));
1038
1039 let dac = Box::new(DacSink::new(NodeId(2), 44100.0));
1040 nodes.push(NodeVariant::Sink(dac));
1041
1042 let mut tick = ClockTick::new(0, BUF as u32, 44100.0);
1043 let mut engine = SignalEngine::<f32, BUF>::new(nodes, vec![0, 1, 2], None, None);
1044
1045 for _ in 0..NUM_BLOCKS {
1047 engine.process_block(&tick).unwrap();
1048 tick.advance(BUF as u32);
1049 }
1050
1051 let dac_port = engine.nodes[2].input_port(0).expect("dac input");
1053 let last_block = dac_port.buffer.as_array();
1054
1055 assert_eq!(last_block[0], (NUM_BLOCKS - 1) as f32,
1058 "first sample of block {} should be {}", NUM_BLOCKS - 1, NUM_BLOCKS - 1);
1059 assert_eq!(last_block[BUF - 1], (NUM_BLOCKS - 1 + BUF - 1) as f32);
1060 }
1061
1062 #[test]
1076 fn test_engine_pull_model_active_sink() {
1077 const BUF: usize = 64;
1078 const NUM_BLOCKS: usize = 5;
1079
1080 let mut nodes: Vec<NodeVariant<f32, BUF>> = Vec::new();
1082
1083 let mut src = Box::new(ConstantSource::new(NodeId(0), 1.0, 44100.0));
1084 src.outputs[0].downstream.push((1, 0));
1085 nodes.push(NodeVariant::Source(src));
1086
1087 let mut proc = Box::new(NoopProcessor::new(NodeId(1), 44100.0));
1088 proc.outputs[0].downstream.push((2, 0));
1089 nodes.push(NodeVariant::Processor(proc));
1090
1091 let dac = Box::new(DacSink::new(NodeId(2), 44100.0));
1093 nodes.push(NodeVariant::Sink(dac));
1094
1095 let mut tick = ClockTick::new(0, BUF as u32, 44100.0);
1096 let mut engine = SignalEngine::<f32, BUF>::new(nodes, vec![0, 1, 2], None, None);
1097
1098 for _ in 0..NUM_BLOCKS {
1100 engine.process_block(&tick).unwrap();
1101 tick.advance(BUF as u32);
1102 }
1103
1104 let dac_port = engine.nodes[2].input_port(0).expect("dac input");
1106 let last_block = dac_port.buffer.as_array();
1107 assert!(last_block.iter().all(|&x| x == 1.0),
1108 "pull model: sink should receive 1.0, got {:?}", &last_block[..3]);
1109 }
1110}