rill_patchbay/
automaton_task.rs1use std::time::Duration;
8
9use tokio::sync::mpsc;
10use tokio::sync::watch;
11
12use crate::control::{Automaton, Time};
13
14pub fn spawn_automaton_task<A>(
26 automaton: A,
27 interval: Duration,
28 value_tx: mpsc::Sender<f64>,
29 cancel_rx: watch::Receiver<bool>,
30) -> tokio::task::JoinHandle<()>
31where
32 A: Automaton + 'static,
33{
34 tokio::spawn(automaton_loop(automaton, interval, value_tx, cancel_rx))
35}
36
37async fn automaton_loop<A>(
38 automaton: A,
39 interval: Duration,
40 value_tx: mpsc::Sender<f64>,
41 mut cancel_rx: watch::Receiver<bool>,
42) where
43 A: Automaton,
44{
45 let mut state = automaton.initial_state();
46 let mut time: Time = 0.0;
47 let mut ticker = tokio::time::interval(interval);
48 ticker.tick().await;
50
51 loop {
52 tokio::select! {
53 _ = ticker.tick() => {
54 time += interval.as_secs_f64();
55 let (new_state, value_opt) = automaton.step(time, &A::Action::default(), &state);
56 if let Some(value) = value_opt {
57 if value_tx.send(value).await.is_err() {
58 break;
60 }
61 }
62 state = new_state;
63 }
64
65 _ = cancel_rx.changed() => {
66 if *cancel_rx.borrow() {
67 break;
68 }
69 }
70 }
71 }
72}
73
#[cfg(test)]
mod tests {
    use super::*;
    use crate::automaton::{LfoAutomaton, LfoWaveform};
    use tokio::sync::mpsc;

    /// A spawned LFO task must deliver three values, each within 50 ms and
    /// each inside `[-1.0, 1.0]`.
    #[tokio::test]
    async fn test_lfo_task_produces_values() {
        let lfo = LfoAutomaton::new("test", 10.0, 1.0, 0.0, LfoWaveform::Sine);
        let (value_tx, mut value_rx) = mpsc::channel::<f64>(16);
        let (cancel_tx, cancel_rx) = watch::channel(false);
        let period = Duration::from_millis(10);

        let _handle = spawn_automaton_task(lfo, period, value_tx, cancel_rx);

        for _ in 0..3 {
            let received = tokio::time::timeout(Duration::from_millis(50), value_rx.recv()).await;
            assert!(received.is_ok(), "task should produce values");
            let sample = received.unwrap().unwrap();
            assert!((-1.0..=1.0).contains(&sample), "value {} out of range", sample);
        }

        // Best-effort shutdown; the result of the send is irrelevant here.
        cancel_tx.send(true).ok();
    }

    /// Signalling `true` on the cancel channel must make the task finish
    /// within 100 ms.
    #[tokio::test]
    async fn test_task_stops_on_cancel() {
        let lfo = LfoAutomaton::new("test", 10.0, 1.0, 0.0, LfoWaveform::Sine);
        // The receiver is kept alive so the task cannot end because of a closed
        // value channel.
        let (value_tx, _value_rx) = mpsc::channel::<f64>(16);
        let (cancel_tx, cancel_rx) = watch::channel(false);
        let period = Duration::from_millis(10);

        let handle = spawn_automaton_task(lfo, period, value_tx, cancel_rx);

        cancel_tx.send(true).ok();
        let outcome = tokio::time::timeout(Duration::from_millis(100), handle).await;
        assert!(outcome.is_ok(), "task should stop on cancel");
    }
}