rill_patchbay/
automaton_task.rs1use std::time::Duration;
8
9use tokio::sync::mpsc;
10use tokio::sync::watch;
11
12use crate::engine::{Automaton, Time};
13use rill_core::traits::ParamValue;
14
15pub fn spawn_automaton_task<A>(
27 automaton: A,
28 interval: Duration,
29 value_tx: mpsc::Sender<f64>,
30 cancel_rx: watch::Receiver<bool>,
31) -> tokio::task::JoinHandle<()>
32where
33 A: Automaton + 'static,
34{
35 tokio::spawn(automaton_loop(automaton, interval, value_tx, cancel_rx))
36}
37
38async fn automaton_loop<A>(
39 automaton: A,
40 interval: Duration,
41 value_tx: mpsc::Sender<f64>,
42 mut cancel_rx: watch::Receiver<bool>,
43) where
44 A: Automaton,
45{
46 let mut internal = automaton.initial_internal();
47 let mut current = ParamValue::Float(0.0);
48 let mut time: Time = 0.0;
49 let mut ticker = tokio::time::interval(interval);
50 ticker.tick().await;
52
53 loop {
54 tokio::select! {
55 _ = ticker.tick() => {
56 time += interval.as_secs_f64();
57 current = automaton.step(&mut internal, ¤t, time, &A::Action::default());
58 let value = current.as_f32().unwrap_or(0.0) as f64;
59 if value_tx.send(value).await.is_err() {
60 break;
62 }
63 }
64
65 _ = cancel_rx.changed() => {
66 if *cancel_rx.borrow() {
67 break;
68 }
69 }
70 }
71 }
72}
73
#[cfg(test)]
mod tests {
    use super::*;
    use crate::automaton::{LfoAutomaton, LfoWaveform};
    use tokio::sync::mpsc;

    /// The spawned task must deliver values, each within `[-1.0, 1.0]`.
    #[tokio::test]
    async fn test_lfo_task_produces_values() {
        let (cancel_tx, cancel_rx) = watch::channel(false);
        let (value_tx, mut value_rx) = mpsc::channel::<f64>(16);
        let lfo = LfoAutomaton::new("test", 10.0, 1.0, 0.0, LfoWaveform::Sine);

        let tick = Duration::from_millis(10);
        let _handle = spawn_automaton_task(lfo, tick, value_tx, cancel_rx);

        // Each of the first three values must arrive within this window.
        let patience = Duration::from_millis(50);
        for _ in 0..3 {
            let v = match tokio::time::timeout(patience, value_rx.recv()).await {
                Ok(received) => received.unwrap(),
                Err(_) => panic!("task should produce values"),
            };
            assert!((-1.0..=1.0).contains(&v), "value {} out of range", v);
        }

        // Ask the task to stop; a send failure is deliberately ignored.
        let _ = cancel_tx.send(true);
    }

    /// Signalling `true` on the cancel channel must end the task promptly.
    #[tokio::test]
    async fn test_task_stops_on_cancel() {
        let (cancel_tx, cancel_rx) = watch::channel(false);
        // Keep the receiver alive so the task cannot exit via a failed send.
        let (value_tx, _value_rx) = mpsc::channel::<f64>(16);
        let lfo = LfoAutomaton::new("test", 10.0, 1.0, 0.0, LfoWaveform::Sine);

        let handle = spawn_automaton_task(lfo, Duration::from_millis(10), value_tx, cancel_rx);

        let _ = cancel_tx.send(true);

        let deadline = Duration::from_millis(100);
        let joined = tokio::time::timeout(deadline, handle).await;
        assert!(joined.is_ok(), "task should stop on cancel");
    }
}