Skip to main content

rill_patchbay/
automaton_task.rs

1//! # Automaton Task — wrapping the Automaton trait in a green thread
2//!
3//! Allows running any `Automaton` as an independent tokio task
4//! with its own tick interval. Values are sent via an mpsc channel.
5//! (`PortCombiner` was removed — it duplicated `CommandEnum`.)
6
7use std::time::Duration;
8
9use tokio::sync::mpsc;
10use tokio::sync::watch;
11
12use crate::engine::{Automaton, Time};
13use rill_core::traits::ParamValue;
14
15/// Run an automaton as a green thread (tokio task)
16///
17/// # Arguments
18///
19/// * `automaton` — implementation of the `Automaton` trait
20/// * `interval` — update frequency (e.g. 10 ms for 100 Hz)
21/// * `value_tx` — channel for sending computed values
22/// * `cancel_rx` — cancellation signal
23///
24/// Returns a `JoinHandle`. Dropping the handle does not stop the
25/// task. Use the cancellation signal to stop it.
26pub fn spawn_automaton_task<A>(
27    automaton: A,
28    interval: Duration,
29    value_tx: mpsc::Sender<f64>,
30    cancel_rx: watch::Receiver<bool>,
31) -> tokio::task::JoinHandle<()>
32where
33    A: Automaton + 'static,
34{
35    tokio::spawn(automaton_loop(automaton, interval, value_tx, cancel_rx))
36}
37
38async fn automaton_loop<A>(
39    automaton: A,
40    interval: Duration,
41    value_tx: mpsc::Sender<f64>,
42    mut cancel_rx: watch::Receiver<bool>,
43) where
44    A: Automaton,
45{
46    let mut internal = automaton.initial_internal();
47    let mut current = ParamValue::Float(0.0);
48    let mut time: Time = 0.0;
49    let mut ticker = tokio::time::interval(interval);
50    // Skip the first tick (immediate)
51    ticker.tick().await;
52
53    loop {
54        tokio::select! {
55            _ = ticker.tick() => {
56                time += interval.as_secs_f64();
57                current = automaton.step(&mut internal, &current, time, &A::Action::default());
58                let value = current.as_f32().unwrap_or(0.0) as f64;
59                if value_tx.send(value).await.is_err() {
60                    // Channel closed — receiver dropped
61                    break;
62                }
63            }
64
65            _ = cancel_rx.changed() => {
66                if *cancel_rx.borrow() {
67                    break;
68                }
69            }
70        }
71    }
72}
73
74// =============================================================================
75// Tests
76// =============================================================================
77
78#[cfg(test)]
79mod tests {
80    use super::*;
81    use crate::automaton::LfoAutomaton;
82    use crate::automaton::LfoWaveform;
83    use tokio::sync::mpsc;
84
85    #[tokio::test]
86    async fn test_lfo_task_produces_values() {
87        let lfo = LfoAutomaton::new("test", 10.0, 1.0, 0.0, LfoWaveform::Sine);
88        let (value_tx, mut value_rx) = mpsc::channel::<f64>(16);
89        let (cancel_tx, cancel_rx) = watch::channel(false);
90
91        let _handle = spawn_automaton_task(lfo, Duration::from_millis(10), value_tx, cancel_rx);
92
93        // Should receive several values
94        for _ in 0..3 {
95            let val = tokio::time::timeout(Duration::from_millis(50), value_rx.recv()).await;
96            assert!(val.is_ok(), "task should produce values");
97            let v = val.unwrap().unwrap();
98            assert!(v >= -1.0 && v <= 1.0, "value {} out of range", v);
99        }
100
101        let _ = cancel_tx.send(true);
102    }
103
104    #[tokio::test]
105    async fn test_task_stops_on_cancel() {
106        let lfo = LfoAutomaton::new("test", 10.0, 1.0, 0.0, LfoWaveform::Sine);
107        let (value_tx, _value_rx) = mpsc::channel::<f64>(16);
108        let (cancel_tx, cancel_rx) = watch::channel(false);
109
110        let handle = spawn_automaton_task(lfo, Duration::from_millis(10), value_tx, cancel_rx);
111
112        let _ = cancel_tx.send(true);
113        let result = tokio::time::timeout(Duration::from_millis(100), handle).await;
114        assert!(result.is_ok(), "task should stop on cancel");
115    }
116}