rill_patchbay/
automaton_task.rs1use std::time::Duration;
8
9use tokio::sync::mpsc;
10use tokio::sync::watch;
11
12use crate::control::{Automaton, Time};
13
14pub fn spawn_automaton_task<A>(
26 automaton: A,
27 interval: Duration,
28 value_tx: mpsc::Sender<f64>,
29 cancel_rx: watch::Receiver<bool>,
30) -> tokio::task::JoinHandle<()>
31where
32 A: Automaton + 'static,
33{
34 tokio::spawn(automaton_loop(
35 automaton,
36 interval,
37 value_tx,
38 cancel_rx,
39 ))
40}
41
42async fn automaton_loop<A>(
43 automaton: A,
44 interval: Duration,
45 value_tx: mpsc::Sender<f64>,
46 mut cancel_rx: watch::Receiver<bool>,
47) where
48 A: Automaton,
49{
50 let mut state = automaton.initial_state();
51 let mut time: Time = 0.0;
52 let mut ticker = tokio::time::interval(interval);
53 ticker.tick().await;
55
56 loop {
57 tokio::select! {
58 _ = ticker.tick() => {
59 time += interval.as_secs_f64();
60 let (new_state, value_opt) = automaton.step(time, &A::Action::default(), &state);
61 if let Some(value) = value_opt {
62 if value_tx.send(value).await.is_err() {
63 break;
65 }
66 }
67 state = new_state;
68 }
69
70 _ = cancel_rx.changed() => {
71 if *cancel_rx.borrow() {
72 break;
73 }
74 }
75 }
76 }
77}
78
#[cfg(test)]
mod tests {
    use super::*;
    use crate::automaton::{LfoAutomaton, LfoWaveform};
    use tokio::sync::mpsc;

    /// A spawned LFO task delivers values on the channel, each within
    /// `[-1.0, 1.0]` and each arriving within 50 ms.
    #[tokio::test]
    async fn test_lfo_task_produces_values() {
        let (value_tx, mut value_rx) = mpsc::channel::<f64>(16);
        let (cancel_tx, cancel_rx) = watch::channel(false);
        let lfo = LfoAutomaton::new("test", 10.0, 1.0, 0.0, LfoWaveform::Sine);

        let tick = Duration::from_millis(10);
        let _handle = spawn_automaton_task(lfo, tick, value_tx, cancel_rx);

        // Pull three consecutive samples, bounding each wait.
        let mut remaining = 3;
        while remaining > 0 {
            let received = tokio::time::timeout(Duration::from_millis(50), value_rx.recv()).await;
            assert!(received.is_ok(), "task should produce values");

            let sample = received.unwrap().unwrap();
            assert!((-1.0..=1.0).contains(&sample), "value {} out of range", sample);

            remaining -= 1;
        }

        // Ask the task to stop; the send result is irrelevant to the test.
        let _ = cancel_tx.send(true);
    }

    /// Signalling `true` on the cancel channel makes the task finish within
    /// 100 ms.
    #[tokio::test]
    async fn test_task_stops_on_cancel() {
        // `_value_rx` is kept alive so the task cannot stop because of a closed
        // value channel.
        let (value_tx, _value_rx) = mpsc::channel::<f64>(16);
        let (cancel_tx, cancel_rx) = watch::channel(false);
        let lfo = LfoAutomaton::new("test", 10.0, 1.0, 0.0, LfoWaveform::Sine);

        let tick = Duration::from_millis(10);
        let handle = spawn_automaton_task(lfo, tick, value_tx, cancel_rx);

        let _ = cancel_tx.send(true);

        let deadline = Duration::from_millis(100);
        let joined = tokio::time::timeout(deadline, handle).await;
        assert!(joined.is_ok(), "task should stop on cancel");
    }
}