1use crossbeam_channel::{unbounded, Receiver, Sender};
4use parking_lot::RwLock;
5use std::fmt;
6use std::sync::Arc;
7use std::thread;
8use std::time::Duration;
9
10use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
11
12use crate::buffer::IoRingBuffer;
13
14use crate::backend::{AudioBackend, BackendType};
15use crate::config::AudioConfig;
16use crate::error::{IoError, IoResult};
17
/// Requests sent from the backend handle to its worker thread over the
/// command channel.
#[derive(Debug)]
enum Command {
    /// Select the devices to use. A `None` name asks for the host's default
    /// device; a `Some` name is matched as a substring of the device name.
    Init {
        /// Requested input device name, if any.
        input_device: Option<String>,
        /// Requested output device name, if any.
        output_device: Option<String>,
    },
    /// Build the output stream and start playback.
    Start,
    /// Pause and release the current stream, if there is one.
    Stop,
}
28
/// Replies sent by the worker thread back to the backend handle over the
/// status channel.
#[derive(Debug, PartialEq)]
enum Status {
    /// An `Init` command has been processed.
    Initialized,
    /// The output stream was built and playback has begun.
    Started,
    /// A `Stop` command has been processed.
    Stopped,
    /// A command failed; the payload is the error text.
    Error(String),
}
37
38pub struct CpalBackend {
40 config: AudioConfig,
41 host: Arc<cpal::Host>,
42 command_tx: Sender<Command>,
43 status_rx: Receiver<Status>,
44 xruns: Arc<RwLock<u32>>,
45 input_buffer: Arc<RwLock<IoRingBuffer>>,
46 output_buffer: Arc<RwLock<IoRingBuffer>>,
47 thread_handle: Option<thread::JoinHandle<()>>,
48}
49
50impl fmt::Debug for CpalBackend {
51 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
52 f.debug_struct("CpalBackend")
53 .field("config", &self.config)
54 .field("xruns", &self.xruns)
55 .field("thread_handle", &self.thread_handle.is_some())
56 .finish()
57 }
58}
59
60impl CpalBackend {
61 pub fn new(config: AudioConfig) -> IoResult<Self> {
63 let host = Arc::new(cpal::default_host());
64 let buffer_size = (config.buffer_size * config.output_channels * 4) as usize;
65
66 let (command_tx, command_rx) = unbounded();
67 let (status_tx, status_rx) = unbounded();
68
69 let xruns = Arc::new(RwLock::new(0));
70 let input_buffer = Arc::new(RwLock::new(IoRingBuffer::new(buffer_size)));
71 let output_buffer = Arc::new(RwLock::new(IoRingBuffer::new(buffer_size)));
72
73 let thread_host = host.clone();
74 let thread_input = input_buffer.clone();
75 let thread_output = output_buffer.clone();
76 let thread_xruns = xruns.clone();
77 let thread_config = config.clone();
78
79 let handle = thread::spawn(move || {
81 run_cpal_thread(
82 command_rx,
83 status_tx,
84 thread_host,
85 thread_config,
86 thread_input,
87 thread_output,
88 thread_xruns,
89 );
90 });
91
92 Ok(Self {
93 config,
94 host,
95 command_tx,
96 status_rx,
97 xruns,
98 input_buffer,
99 output_buffer,
100 thread_handle: Some(handle),
101 })
102 }
103
104 fn wait_for_status(&self, expected: Status) -> IoResult<()> {
105 while let Ok(status) = self.status_rx.recv_timeout(Duration::from_millis(1000)) {
106 match status {
107 Status::Error(e) => return Err(IoError::Backend(e)),
108 s if s == expected => return Ok(()),
109 _ => continue,
110 }
111 }
112 Err(IoError::Timeout)
113 }
114}
115
116fn run_cpal_thread(
118 command_rx: Receiver<Command>,
119 status_tx: Sender<Status>,
120 host: Arc<cpal::Host>,
121 config: AudioConfig,
122 input_buffer: Arc<RwLock<IoRingBuffer>>,
123 output_buffer: Arc<RwLock<IoRingBuffer>>,
124 xruns: Arc<RwLock<u32>>,
125) {
126 let mut _input_device: Option<cpal::Device> = None;
127 let mut output_device: Option<cpal::Device> = None;
128 let mut stream: Option<cpal::Stream> = None;
129
130 while let Ok(cmd) = command_rx.recv() {
131 match cmd {
132 Command::Init {
133 input_device: in_name,
134 output_device: out_name,
135 } => {
136 _input_device = find_device(&host, in_name.as_deref(), true).ok().flatten();
137 output_device = find_device(&host, out_name.as_deref(), false)
138 .ok()
139 .flatten();
140
141 let cap = input_buffer.read().capacity();
143 let zeros = vec![0.0f32; cap];
144 input_buffer.write().write(&zeros);
145 output_buffer.write().write(&zeros);
146
147 let _ = status_tx.send(Status::Initialized);
148 }
149
150 Command::Start => {
151 if let Some(dev) = &output_device {
152 let out_buf = output_buffer.clone();
153 let xruns_clone = xruns.clone();
154
155 let stream_config = cpal::StreamConfig {
156 channels: config.output_channels as u16,
157 sample_rate: cpal::SampleRate(config.sample_rate),
158 buffer_size: cpal::BufferSize::Fixed(config.buffer_size),
159 };
160
161 match dev.build_output_stream(
162 &stream_config,
163 move |data: &mut [f32], _: &cpal::OutputCallbackInfo| {
164 let mut out_buf_lock = out_buf.write();
165 let mut temp = vec![0.0f32; data.len()];
166 out_buf_lock.read(&mut temp);
167 data.copy_from_slice(&temp[..data.len()]);
168 },
169 move |err| {
170 eprintln!("Stream error: {}", err);
171 *xruns_clone.write() += 1;
172 },
173 None,
174 ) {
175 Ok(s) => {
176 if s.play().is_ok() {
177 stream = Some(s);
178 let _ = status_tx.send(Status::Started);
179 }
180 }
181 Err(e) => {
182 let _ = status_tx.send(Status::Error(e.to_string()));
183 }
184 }
185 }
186 }
187
188 Command::Stop => {
189 if let Some(s) = stream.take() {
190 let _ = s.pause();
191 }
192 let _ = status_tx.send(Status::Stopped);
193 }
194 }
195 }
196}
197
198fn find_device(
199 host: &cpal::Host,
200 name: Option<&str>,
201 is_input: bool,
202) -> IoResult<Option<cpal::Device>> {
203 let devices = if is_input {
204 host.input_devices()
205 } else {
206 host.output_devices()
207 }
208 .map_err(|e| IoError::DeviceNotFound(e.to_string()))?;
209
210 if let Some(name) = name {
211 for device in devices {
212 if let Ok(dev_name) = device.name() {
213 if dev_name.contains(name) {
214 return Ok(Some(device));
215 }
216 }
217 }
218 Ok(None)
219 } else {
220 if is_input {
221 Ok(host.default_input_device())
222 } else {
223 Ok(host.default_output_device())
224 }
225 }
226}
227
228impl AudioBackend for CpalBackend {
229 fn backend_type(&self) -> BackendType {
230 BackendType::Cpal
231 }
232
233 fn config(&self) -> &AudioConfig {
234 &self.config
235 }
236
237 fn config_mut(&mut self) -> &mut AudioConfig {
238 &mut self.config
239 }
240
241 fn init(&mut self) -> IoResult<()> {
242 self.command_tx
243 .send(Command::Init {
244 input_device: self.config.input_device.clone(),
245 output_device: self.config.output_device.clone(),
246 })
247 .map_err(|e| IoError::Backend(e.to_string()))?;
248
249 self.wait_for_status(Status::Initialized)
250 }
251
252 fn start(&mut self) -> IoResult<()> {
253 self.command_tx
254 .send(Command::Start)
255 .map_err(|e| IoError::Backend(e.to_string()))?;
256
257 self.wait_for_status(Status::Started)
258 }
259
260 fn stop(&mut self) -> IoResult<()> {
261 self.command_tx
262 .send(Command::Stop)
263 .map_err(|e| IoError::Backend(e.to_string()))?;
264
265 self.wait_for_status(Status::Stopped)
266 }
267
268 fn read(&mut self, buffer: &mut [f32]) -> IoResult<usize> {
269 let mut input_buf = self.input_buffer.write();
270 input_buf.read(buffer);
271 Ok(buffer.len())
272 }
273
274 fn write(&mut self, buffer: &[f32]) -> IoResult<usize> {
275 let mut output_buf = self.output_buffer.write();
276 output_buf.write(buffer);
277 Ok(buffer.len())
278 }
279
280 fn xruns(&self) -> u32 {
281 *self.xruns.read()
282 }
283
284 fn latency(&self) -> Duration {
285 Duration::from_micros(
286 (1_000_000.0 * self.config.buffer_size as f64 / self.config.sample_rate as f64) as u64,
287 )
288 }
289
290 fn list_input_devices(&self) -> Vec<String> {
291 self.host
292 .input_devices()
293 .map(|devices| devices.filter_map(|d| d.name().ok()).collect())
294 .unwrap_or_default()
295 }
296
297 fn list_output_devices(&self) -> Vec<String> {
298 self.host
299 .output_devices()
300 .map(|devices| devices.filter_map(|d| d.name().ok()).collect())
301 .unwrap_or_default()
302 }
303}
304
305impl Drop for CpalBackend {
306 fn drop(&mut self) {
307 if let Some(handle) = self.thread_handle.take() {
308 let _ = self.command_tx.send(Command::Stop);
309 let _ = handle.join();
310 }
311 }
312}