Skip to main content

rill_patchbay/
automaton_task.rs

1//! # Automaton Task — обёртка Automaton trait в green thread
2//!
3//! Позволяет запустить любой `Automaton` как независимый tokio task
4//! с собственным интервалом тиков. Значения отправляются в `PortCombiner`
5//! через mpsc-канал.
6
7use std::time::Duration;
8
9use tokio::sync::mpsc;
10use tokio::sync::watch;
11
12use crate::control::{Automaton, Time};
13
14/// Запустить автомат как green thread (tokio task)
15///
16/// # Arguments
17///
18/// * `automaton` — реализация `Automaton` trait
19/// * `interval` — частота обновления (например, 10 мс для 100 Hz)
20/// * `value_tx` — канал для отправки значений в PortCombiner
21/// * `cancel_rx` — сигнал отмены (из PortCombinerHandle::cancel_rx)
22///
23/// Возвращает `JoinHandle` задачи. При дропе хэндла задача продолжает
24/// работать. Для остановки используется сигнал отмены.
25pub fn spawn_automaton_task<A>(
26    automaton: A,
27    interval: Duration,
28    value_tx: mpsc::Sender<f64>,
29    cancel_rx: watch::Receiver<bool>,
30) -> tokio::task::JoinHandle<()>
31where
32    A: Automaton + 'static,
33{
34    tokio::spawn(automaton_loop(automaton, interval, value_tx, cancel_rx))
35}
36
37async fn automaton_loop<A>(
38    automaton: A,
39    interval: Duration,
40    value_tx: mpsc::Sender<f64>,
41    mut cancel_rx: watch::Receiver<bool>,
42) where
43    A: Automaton,
44{
45    let mut state = automaton.initial_state();
46    let mut time: Time = 0.0;
47    let mut ticker = tokio::time::interval(interval);
48    // Пропускаем первый тик (немедленный)
49    ticker.tick().await;
50
51    loop {
52        tokio::select! {
53            _ = ticker.tick() => {
54                time += interval.as_secs_f64();
55                let (new_state, value_opt) = automaton.step(time, &A::Action::default(), &state);
56                if let Some(value) = value_opt {
57                    if value_tx.send(value).await.is_err() {
58                        // Канал закрыт — PortCombiner остановлен
59                        break;
60                    }
61                }
62                state = new_state;
63            }
64
65            _ = cancel_rx.changed() => {
66                if *cancel_rx.borrow() {
67                    break;
68                }
69            }
70        }
71    }
72}
73
74// =============================================================================
75// Тесты
76// =============================================================================
77
78#[cfg(test)]
79mod tests {
80    use super::*;
81    use crate::automaton::LfoAutomaton;
82    use crate::automaton::LfoWaveform;
83    use tokio::sync::mpsc;
84
85    #[tokio::test]
86    async fn test_lfo_task_produces_values() {
87        let lfo = LfoAutomaton::new("test", 10.0, 1.0, 0.0, LfoWaveform::Sine);
88        let (value_tx, mut value_rx) = mpsc::channel::<f64>(16);
89        let (cancel_tx, cancel_rx) = watch::channel(false);
90
91        let _handle = spawn_automaton_task(lfo, Duration::from_millis(10), value_tx, cancel_rx);
92
93        // Должны получить несколько значений
94        for _ in 0..3 {
95            let val = tokio::time::timeout(Duration::from_millis(50), value_rx.recv()).await;
96            assert!(val.is_ok(), "task should produce values");
97            let v = val.unwrap().unwrap();
98            assert!(v >= -1.0 && v <= 1.0, "value {} out of range", v);
99        }
100
101        let _ = cancel_tx.send(true);
102    }
103
104    #[tokio::test]
105    async fn test_task_stops_on_cancel() {
106        let lfo = LfoAutomaton::new("test", 10.0, 1.0, 0.0, LfoWaveform::Sine);
107        let (value_tx, _value_rx) = mpsc::channel::<f64>(16);
108        let (cancel_tx, cancel_rx) = watch::channel(false);
109
110        let handle = spawn_automaton_task(lfo, Duration::from_millis(10), value_tx, cancel_rx);
111
112        let _ = cancel_tx.send(true);
113        let result = tokio::time::timeout(Duration::from_millis(100), handle).await;
114        assert!(result.is_ok(), "task should stop on cancel");
115    }
116}