Skip to main content

rill_patchbay/
automaton_task.rs

1//! # Automaton Task — обёртка Automaton trait в green thread
2//!
3//! Позволяет запустить любой `Automaton` как независимый tokio task
4//! с собственным интервалом тиков. Значения отправляются в `PortCombiner`
5//! через mpsc-канал.
6
7use std::time::Duration;
8
9use tokio::sync::mpsc;
10use tokio::sync::watch;
11
12use crate::control::{Automaton, Time};
13
14/// Запустить автомат как green thread (tokio task)
15///
16/// # Arguments
17///
18/// * `automaton` — реализация `Automaton` trait
19/// * `interval` — частота обновления (например, 10 мс для 100 Hz)
20/// * `value_tx` — канал для отправки значений в PortCombiner
21/// * `cancel_rx` — сигнал отмены (из PortCombinerHandle::cancel_rx)
22///
23/// Возвращает `JoinHandle` задачи. При дропе хэндла задача продолжает
24/// работать. Для остановки используется сигнал отмены.
25pub fn spawn_automaton_task<A>(
26    automaton: A,
27    interval: Duration,
28    value_tx: mpsc::Sender<f64>,
29    cancel_rx: watch::Receiver<bool>,
30) -> tokio::task::JoinHandle<()>
31where
32    A: Automaton + 'static,
33{
34    tokio::spawn(automaton_loop(
35        automaton,
36        interval,
37        value_tx,
38        cancel_rx,
39    ))
40}
41
42async fn automaton_loop<A>(
43    automaton: A,
44    interval: Duration,
45    value_tx: mpsc::Sender<f64>,
46    mut cancel_rx: watch::Receiver<bool>,
47) where
48    A: Automaton,
49{
50    let mut state = automaton.initial_state();
51    let mut time: Time = 0.0;
52    let mut ticker = tokio::time::interval(interval);
53    // Пропускаем первый тик (немедленный)
54    ticker.tick().await;
55
56    loop {
57        tokio::select! {
58            _ = ticker.tick() => {
59                time += interval.as_secs_f64();
60                let (new_state, value_opt) = automaton.step(time, &A::Action::default(), &state);
61                if let Some(value) = value_opt {
62                    if value_tx.send(value).await.is_err() {
63                        // Канал закрыт — PortCombiner остановлен
64                        break;
65                    }
66                }
67                state = new_state;
68            }
69
70            _ = cancel_rx.changed() => {
71                if *cancel_rx.borrow() {
72                    break;
73                }
74            }
75        }
76    }
77}
78
79// =============================================================================
80// Тесты
81// =============================================================================
82
83#[cfg(test)]
84mod tests {
85    use super::*;
86    use crate::automaton::LfoAutomaton;
87    use crate::automaton::LfoWaveform;
88    use tokio::sync::mpsc;
89
90    #[tokio::test]
91    async fn test_lfo_task_produces_values() {
92        let lfo = LfoAutomaton::new("test", 10.0, 1.0, 0.0, LfoWaveform::Sine);
93        let (value_tx, mut value_rx) = mpsc::channel::<f64>(16);
94        let (cancel_tx, cancel_rx) = watch::channel(false);
95
96        let _handle = spawn_automaton_task(
97            lfo,
98            Duration::from_millis(10),
99            value_tx,
100            cancel_rx,
101        );
102
103        // Должны получить несколько значений
104        for _ in 0..3 {
105            let val = tokio::time::timeout(
106                Duration::from_millis(50),
107                value_rx.recv(),
108            )
109            .await;
110            assert!(val.is_ok(), "task should produce values");
111            let v = val.unwrap().unwrap();
112            assert!(v >= -1.0 && v <= 1.0, "value {} out of range", v);
113        }
114
115        let _ = cancel_tx.send(true);
116    }
117
118    #[tokio::test]
119    async fn test_task_stops_on_cancel() {
120        let lfo = LfoAutomaton::new("test", 10.0, 1.0, 0.0, LfoWaveform::Sine);
121        let (value_tx, _value_rx) = mpsc::channel::<f64>(16);
122        let (cancel_tx, cancel_rx) = watch::channel(false);
123
124        let handle = spawn_automaton_task(
125            lfo,
126            Duration::from_millis(10),
127            value_tx,
128            cancel_rx,
129        );
130
131        let _ = cancel_tx.send(true);
132        let result = tokio::time::timeout(Duration::from_millis(100), handle)
133            .await;
134        assert!(result.is_ok(), "task should stop on cancel");
135    }
136}