Skip to main content

rill_io/
engine.rs

1//! Основной аудио движок
2
3use crossbeam_channel::{unbounded, Receiver, Sender};
4use parking_lot::RwLock;
5use std::sync::Arc;
6use std::thread;
7use std::time::Duration;
8
9use crate::backend::AudioBackend;
10use crate::error::{IoError, IoResult};
11
12/// Тип процессора аудио
13pub trait AudioProcessor: Send + Sync + 'static {
14    /// Обработать блок аудио
15    fn process(&mut self, input: &[f32], output: &mut [f32]);
16
17    /// Сбросить состояние
18    fn reset(&mut self);
19
20    /// Установить частоту дискретизации
21    fn set_sample_rate(&mut self, sample_rate: f32);
22}
23
24/// Состояние аудио движка
25#[derive(Debug, Clone, Copy, PartialEq, Eq)]
26pub enum EngineState {
27    /// Остановлен
28    Stopped,
29    /// Запущен
30    Running,
31    /// Приостановлен
32    Paused,
33    /// Ошибка
34    Error,
35}
36
37/// Команды для потока обработки
38enum ProcessorCommand<P: AudioProcessor> {
39    /// Обновить процессор
40    Update(Box<dyn FnOnce(&mut P) + Send>),
41}
42
43/// Основной аудио движок
44pub struct AudioEngine<B, P>
45where
46    B: AudioBackend + 'static,
47    P: AudioProcessor + 'static,
48{
49    backend: Option<B>,
50    processor: Arc<RwLock<Option<P>>>,
51    state: Arc<RwLock<EngineState>>,
52    command_tx: Sender<ProcessorCommand<P>>,
53    command_rx: Receiver<ProcessorCommand<P>>,
54    thread_handle: Option<thread::JoinHandle<()>>,
55    sample_rate: f32,
56    buffer_size: usize,
57    channels: usize,
58    xrun_count: Arc<RwLock<u32>>,
59}
60
61impl<B, P> AudioEngine<B, P>
62where
63    B: AudioBackend + Send + Sync + 'static,
64    P: AudioProcessor + Send + Sync + 'static,
65{
66    /// Создать новый аудио движок
67    pub fn new(backend: B, processor: P) -> Self {
68        let sample_rate = backend.config().sample_rate as f32;
69        let buffer_size = backend.config().buffer_size as usize;
70        let channels = backend.config().output_channels as usize;
71
72        let (command_tx, command_rx) = unbounded();
73
74        Self {
75            backend: Some(backend),
76            processor: Arc::new(RwLock::new(Some(processor))),
77            state: Arc::new(RwLock::new(EngineState::Stopped)),
78            command_tx,
79            command_rx,
80            thread_handle: None,
81            sample_rate,
82            buffer_size,
83            channels,
84            xrun_count: Arc::new(RwLock::new(0)),
85        }
86    }
87
88    /// Запустить движок
89    pub fn start(&mut self) -> IoResult<()> {
90        if *self.state.read() == EngineState::Running {
91            return Ok(());
92        }
93
94        // Забираем backend из Option
95        let mut backend = self
96            .backend
97            .take()
98            .ok_or_else(|| IoError::Backend("Backend already taken".to_string()))?;
99
100        backend.init()?;
101        backend.start()?;
102
103        *self.state.write() = EngineState::Running;
104
105        // Клонируем для потока
106        let state = self.state.clone();
107        let xrun_count = self.xrun_count.clone();
108        let processor = self.processor.clone();
109        let buffer_size = self.buffer_size;
110        let channels = self.channels;
111        let command_rx = self.command_rx.clone();
112
113        // Запускаем поток обработки
114        let handle = thread::spawn(move || {
115            let total_samples = buffer_size * channels;
116            let mut input_buffer = vec![0.0f32; total_samples];
117            let mut output_buffer = vec![0.0f32; total_samples];
118
119            while *state.read() == EngineState::Running {
120                // Обрабатываем команды
121                while let Ok(cmd) = command_rx.try_recv() {
122                    match cmd {
123                        ProcessorCommand::Update(f) => {
124                            if let Some(proc) = processor.write().as_mut() {
125                                f(proc);
126                            }
127                        }
128                    }
129                }
130
131                // Читаем входные данные
132                match backend.read(&mut input_buffer) {
133                    Ok(read) if read > 0 => {
134                        // Получаем процессор и обрабатываем
135                        if let Some(proc) = processor.write().as_mut() {
136                            proc.process(&input_buffer[..read], &mut output_buffer[..read]);
137                        }
138
139                        // Записываем выходные данные
140                        if let Err(e) = backend.write(&output_buffer[..read]) {
141                            log::error!("Write error: {}", e);
142                            *xrun_count.write() += 1;
143                        }
144                    }
145                    Ok(_) => {}
146                    Err(e) => {
147                        log::error!("Read error: {}", e);
148                        *xrun_count.write() += 1;
149                    }
150                }
151
152                // Небольшая пауза для снижения нагрузки на CPU
153                thread::sleep(Duration::from_micros(100));
154            }
155        });
156
157        self.thread_handle = Some(handle);
158
159        Ok(())
160    }
161
162    /// Остановить движок
163    pub fn stop(&mut self) -> IoResult<()> {
164        *self.state.write() = EngineState::Stopped;
165
166        if let Some(handle) = self.thread_handle.take() {
167            let _ = handle.join();
168        }
169
170        if let Some(backend) = &mut self.backend {
171            backend.stop()?;
172        }
173
174        Ok(())
175    }
176
177    /// Приостановить обработку
178    pub fn pause(&mut self) -> IoResult<()> {
179        if *self.state.read() == EngineState::Running {
180            *self.state.write() = EngineState::Paused;
181        }
182        Ok(())
183    }
184
185    /// Возобновить обработку
186    pub fn resume(&mut self) -> IoResult<()> {
187        if *self.state.read() == EngineState::Paused {
188            *self.state.write() = EngineState::Running;
189        }
190        Ok(())
191    }
192
193    /// Получить состояние движка
194    pub fn state(&self) -> EngineState {
195        *self.state.read()
196    }
197
198    /// Получить количество xrun'ов
199    pub fn xruns(&self) -> u32 {
200        *self.xrun_count.read()
201    }
202
203    /// Получить текущую задержку
204    pub fn latency(&self) -> Duration {
205        if let Some(backend) = &self.backend {
206            backend.latency()
207        } else {
208            Duration::from_micros(0)
209        }
210    }
211
212    /// Получить частоту дискретизации
213    pub fn sample_rate(&self) -> f32 {
214        self.sample_rate
215    }
216
217    /// Получить размер буфера
218    pub fn buffer_size(&self) -> usize {
219        self.buffer_size
220    }
221
222    /// Выполнить операцию с процессором (если движок не запущен)
223    pub fn with_processor<F, R>(&mut self, f: F) -> Option<R>
224    where
225        F: FnOnce(&mut P) -> R,
226    {
227        if *self.state.read() == EngineState::Running {
228            None
229        } else {
230            if let Some(proc) = self.processor.write().as_mut() {
231                Some(f(proc))
232            } else {
233                None
234            }
235        }
236    }
237
238    /// Обновить процессор через замыкание (безопасно для многопоточности)
239    pub fn update_processor<F>(&self, f: F) -> IoResult<()>
240    where
241        F: FnOnce(&mut P) + Send + 'static,
242    {
243        self.command_tx
244            .send(ProcessorCommand::Update(Box::new(f)))
245            .map_err(|_| IoError::Channel)?;
246        Ok(())
247    }
248}
249
250impl<B, P> Drop for AudioEngine<B, P>
251where
252    B: AudioBackend + 'static,
253    P: AudioProcessor + 'static,
254{
255    fn drop(&mut self) {
256        let _ = self.stop();
257    }
258}