1use crossbeam_channel::{unbounded, Receiver, Sender};
4use parking_lot::RwLock;
5use std::sync::Arc;
6use std::thread;
7use std::time::Duration;
8
9use crate::backend::AudioBackend;
10use crate::error::{IoError, IoResult};
11
/// A signal-processing unit that is driven one buffer at a time.
///
/// The supertrait bounds (`Send + Sync + 'static`) are required because the
/// engine in this file shares the processor with a spawned thread.
pub trait AudioProcessor: Send + Sync + 'static {
    /// Processes one buffer, reading samples from `input` and writing the
    /// result into `output`.
    ///
    /// The engine in this file always passes two slices of equal length.
    // NOTE(review): samples are presumably interleaved across channels — confirm
    // against the backend implementation.
    fn process(&mut self, input: &[f32], output: &mut [f32]);

    /// Resets the processor.
    // NOTE(review): exact semantics are left to implementors; this method is not
    // called anywhere in the visible part of this file.
    fn reset(&mut self);

    /// Tells the processor which sample rate to operate at.
    // NOTE(review): unit is presumably Hz — confirm. Not called anywhere in the
    // visible part of this file.
    fn set_sample_rate(&mut self, sample_rate: f32);
}
23
/// Lifecycle state of an audio engine.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EngineState {
    /// The engine is not processing audio. This is the state of a newly
    /// constructed engine and the state set by `stop`.
    Stopped,
    /// The engine has been started (or resumed) and is processing audio.
    Running,
    /// Set by `pause` when the engine was running; `resume` switches back to
    /// `Running`.
    Paused,
    /// Failure state.
    // NOTE(review): no code in the visible part of this file assigns this
    // variant — presumably reserved for error reporting; confirm.
    Error,
}
36
/// Message sent over the engine's command channel to the processing thread.
enum ProcessorCommand<P: AudioProcessor> {
    /// A one-shot closure that the processing thread runs with mutable access
    /// to the processor.
    Update(Box<dyn FnOnce(&mut P) + Send>),
}
42
43pub struct AudioEngine<B, P>
45where
46 B: AudioBackend + 'static,
47 P: AudioProcessor + 'static,
48{
49 backend: Option<B>,
50 processor: Arc<RwLock<Option<P>>>,
51 state: Arc<RwLock<EngineState>>,
52 command_tx: Sender<ProcessorCommand<P>>,
53 command_rx: Receiver<ProcessorCommand<P>>,
54 thread_handle: Option<thread::JoinHandle<()>>,
55 sample_rate: f32,
56 buffer_size: usize,
57 channels: usize,
58 xrun_count: Arc<RwLock<u32>>,
59}
60
61impl<B, P> AudioEngine<B, P>
62where
63 B: AudioBackend + Send + Sync + 'static,
64 P: AudioProcessor + Send + Sync + 'static,
65{
66 pub fn new(backend: B, processor: P) -> Self {
68 let sample_rate = backend.config().sample_rate as f32;
69 let buffer_size = backend.config().buffer_size as usize;
70 let channels = backend.config().output_channels as usize;
71
72 let (command_tx, command_rx) = unbounded();
73
74 Self {
75 backend: Some(backend),
76 processor: Arc::new(RwLock::new(Some(processor))),
77 state: Arc::new(RwLock::new(EngineState::Stopped)),
78 command_tx,
79 command_rx,
80 thread_handle: None,
81 sample_rate,
82 buffer_size,
83 channels,
84 xrun_count: Arc::new(RwLock::new(0)),
85 }
86 }
87
88 pub fn start(&mut self) -> IoResult<()> {
90 if *self.state.read() == EngineState::Running {
91 return Ok(());
92 }
93
94 let mut backend = self
96 .backend
97 .take()
98 .ok_or_else(|| IoError::Backend("Backend already taken".to_string()))?;
99
100 backend.init()?;
101 backend.start()?;
102
103 *self.state.write() = EngineState::Running;
104
105 let state = self.state.clone();
107 let xrun_count = self.xrun_count.clone();
108 let processor = self.processor.clone();
109 let buffer_size = self.buffer_size;
110 let channels = self.channels;
111 let command_rx = self.command_rx.clone();
112
113 let handle = thread::spawn(move || {
115 let total_samples = buffer_size * channels;
116 let mut input_buffer = vec![0.0f32; total_samples];
117 let mut output_buffer = vec![0.0f32; total_samples];
118
119 while *state.read() == EngineState::Running {
120 while let Ok(cmd) = command_rx.try_recv() {
122 match cmd {
123 ProcessorCommand::Update(f) => {
124 if let Some(proc) = processor.write().as_mut() {
125 f(proc);
126 }
127 }
128 }
129 }
130
131 match backend.read(&mut input_buffer) {
133 Ok(read) if read > 0 => {
134 if let Some(proc) = processor.write().as_mut() {
136 proc.process(&input_buffer[..read], &mut output_buffer[..read]);
137 }
138
139 if let Err(e) = backend.write(&output_buffer[..read]) {
141 log::error!("Write error: {}", e);
142 *xrun_count.write() += 1;
143 }
144 }
145 Ok(_) => {}
146 Err(e) => {
147 log::error!("Read error: {}", e);
148 *xrun_count.write() += 1;
149 }
150 }
151
152 thread::sleep(Duration::from_micros(100));
154 }
155 });
156
157 self.thread_handle = Some(handle);
158
159 Ok(())
160 }
161
162 pub fn stop(&mut self) -> IoResult<()> {
164 *self.state.write() = EngineState::Stopped;
165
166 if let Some(handle) = self.thread_handle.take() {
167 let _ = handle.join();
168 }
169
170 if let Some(backend) = &mut self.backend {
171 backend.stop()?;
172 }
173
174 Ok(())
175 }
176
177 pub fn pause(&mut self) -> IoResult<()> {
179 if *self.state.read() == EngineState::Running {
180 *self.state.write() = EngineState::Paused;
181 }
182 Ok(())
183 }
184
185 pub fn resume(&mut self) -> IoResult<()> {
187 if *self.state.read() == EngineState::Paused {
188 *self.state.write() = EngineState::Running;
189 }
190 Ok(())
191 }
192
193 pub fn state(&self) -> EngineState {
195 *self.state.read()
196 }
197
198 pub fn xruns(&self) -> u32 {
200 *self.xrun_count.read()
201 }
202
203 pub fn latency(&self) -> Duration {
205 if let Some(backend) = &self.backend {
206 backend.latency()
207 } else {
208 Duration::from_micros(0)
209 }
210 }
211
212 pub fn sample_rate(&self) -> f32 {
214 self.sample_rate
215 }
216
217 pub fn buffer_size(&self) -> usize {
219 self.buffer_size
220 }
221
222 pub fn with_processor<F, R>(&mut self, f: F) -> Option<R>
224 where
225 F: FnOnce(&mut P) -> R,
226 {
227 if *self.state.read() == EngineState::Running {
228 None
229 } else {
230 if let Some(proc) = self.processor.write().as_mut() {
231 Some(f(proc))
232 } else {
233 None
234 }
235 }
236 }
237
238 pub fn update_processor<F>(&self, f: F) -> IoResult<()>
240 where
241 F: FnOnce(&mut P) + Send + 'static,
242 {
243 self.command_tx
244 .send(ProcessorCommand::Update(Box::new(f)))
245 .map_err(|_| IoError::Channel)?;
246 Ok(())
247 }
248}
249
250impl<B, P> Drop for AudioEngine<B, P>
251where
252 B: AudioBackend + 'static,
253 P: AudioProcessor + 'static,
254{
255 fn drop(&mut self) {
256 let _ = self.stop();
257 }
258}