Skip to main content

rill_io/backends/
cpal.rs

1//! CPAL бэкенд (кросс-платформенный)
2
3use crossbeam_channel::{unbounded, Receiver, Sender};
4use parking_lot::RwLock;
5use std::fmt;
6use std::sync::Arc;
7use std::thread;
8use std::time::Duration;
9
10use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
11
12use crate::buffer::IoRingBuffer;
13
14use crate::backend::{AudioBackend, BackendType};
15use crate::config::AudioConfig;
16use crate::error::{IoError, IoResult};
17
18// Команды для потока
19#[derive(Debug)]
20enum Command {
21    Init {
22        input_device: Option<String>,
23        output_device: Option<String>,
24    },
25    Start,
26    Stop,
27}
28
29// Сообщения о состоянии
30#[derive(Debug, PartialEq)]
31enum Status {
32    Initialized,
33    Started,
34    Stopped,
35    Error(String),
36}
37
38/// CPAL бэкенд
39pub struct CpalBackend {
40    config: AudioConfig,
41    host: Arc<cpal::Host>,
42    command_tx: Sender<Command>,
43    status_rx: Receiver<Status>,
44    xruns: Arc<RwLock<u32>>,
45    input_buffer: Arc<RwLock<IoRingBuffer>>,
46    output_buffer: Arc<RwLock<IoRingBuffer>>,
47    thread_handle: Option<thread::JoinHandle<()>>,
48}
49
50impl fmt::Debug for CpalBackend {
51    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
52        f.debug_struct("CpalBackend")
53            .field("config", &self.config)
54            .field("xruns", &self.xruns)
55            .field("thread_handle", &self.thread_handle.is_some())
56            .finish()
57    }
58}
59
60impl CpalBackend {
61    /// Создать новый CPAL бэкенд
62    pub fn new(config: AudioConfig) -> IoResult<Self> {
63        let host = Arc::new(cpal::default_host());
64        let buffer_size = (config.buffer_size * config.output_channels * 4) as usize;
65
66        let (command_tx, command_rx) = unbounded();
67        let (status_tx, status_rx) = unbounded();
68
69        let xruns = Arc::new(RwLock::new(0));
70        let input_buffer = Arc::new(RwLock::new(IoRingBuffer::new(buffer_size)));
71        let output_buffer = Arc::new(RwLock::new(IoRingBuffer::new(buffer_size)));
72
73        let thread_host = host.clone();
74        let thread_input = input_buffer.clone();
75        let thread_output = output_buffer.clone();
76        let thread_xruns = xruns.clone();
77        let thread_config = config.clone();
78
79        // Запускаем поток для работы с CPAL
80        let handle = thread::spawn(move || {
81            run_cpal_thread(
82                command_rx,
83                status_tx,
84                thread_host,
85                thread_config,
86                thread_input,
87                thread_output,
88                thread_xruns,
89            );
90        });
91
92        Ok(Self {
93            config,
94            host,
95            command_tx,
96            status_rx,
97            xruns,
98            input_buffer,
99            output_buffer,
100            thread_handle: Some(handle),
101        })
102    }
103
104    fn wait_for_status(&self, expected: Status) -> IoResult<()> {
105        while let Ok(status) = self.status_rx.recv_timeout(Duration::from_millis(1000)) {
106            match status {
107                Status::Error(e) => return Err(IoError::Backend(e)),
108                s if s == expected => return Ok(()),
109                _ => continue,
110            }
111        }
112        Err(IoError::Timeout)
113    }
114}
115
116// Функция, выполняющаяся в отдельном потоке
117fn run_cpal_thread(
118    command_rx: Receiver<Command>,
119    status_tx: Sender<Status>,
120    host: Arc<cpal::Host>,
121    config: AudioConfig,
122    input_buffer: Arc<RwLock<IoRingBuffer>>,
123    output_buffer: Arc<RwLock<IoRingBuffer>>,
124    xruns: Arc<RwLock<u32>>,
125) {
126    let mut _input_device: Option<cpal::Device> = None;
127    let mut output_device: Option<cpal::Device> = None;
128    let mut stream: Option<cpal::Stream> = None;
129
130    while let Ok(cmd) = command_rx.recv() {
131        match cmd {
132            Command::Init {
133                input_device: in_name,
134                output_device: out_name,
135            } => {
136                _input_device = find_device(&host, in_name.as_deref(), true).ok().flatten();
137                output_device = find_device(&host, out_name.as_deref(), false)
138                    .ok()
139                    .flatten();
140
141                // Очищаем буферы
142                let cap = input_buffer.read().capacity();
143                let zeros = vec![0.0f32; cap];
144                input_buffer.write().write(&zeros);
145                output_buffer.write().write(&zeros);
146
147                let _ = status_tx.send(Status::Initialized);
148            }
149
150            Command::Start => {
151                if let Some(dev) = &output_device {
152                    let out_buf = output_buffer.clone();
153                    let xruns_clone = xruns.clone();
154
155                    let stream_config = cpal::StreamConfig {
156                        channels: config.output_channels as u16,
157                        sample_rate: cpal::SampleRate(config.sample_rate),
158                        buffer_size: cpal::BufferSize::Fixed(config.buffer_size),
159                    };
160
161                    match dev.build_output_stream(
162                        &stream_config,
163                        move |data: &mut [f32], _: &cpal::OutputCallbackInfo| {
164                            let mut out_buf_lock = out_buf.write();
165                            let mut temp = vec![0.0f32; data.len()];
166                            out_buf_lock.read(&mut temp);
167                            data.copy_from_slice(&temp[..data.len()]);
168                        },
169                        move |err| {
170                            eprintln!("Stream error: {}", err);
171                            *xruns_clone.write() += 1;
172                        },
173                        None,
174                    ) {
175                        Ok(s) => {
176                            if s.play().is_ok() {
177                                stream = Some(s);
178                                let _ = status_tx.send(Status::Started);
179                            }
180                        }
181                        Err(e) => {
182                            let _ = status_tx.send(Status::Error(e.to_string()));
183                        }
184                    }
185                }
186            }
187
188            Command::Stop => {
189                if let Some(s) = stream.take() {
190                    let _ = s.pause();
191                }
192                let _ = status_tx.send(Status::Stopped);
193            }
194        }
195    }
196}
197
198fn find_device(
199    host: &cpal::Host,
200    name: Option<&str>,
201    is_input: bool,
202) -> IoResult<Option<cpal::Device>> {
203    let devices = if is_input {
204        host.input_devices()
205    } else {
206        host.output_devices()
207    }
208    .map_err(|e| IoError::DeviceNotFound(e.to_string()))?;
209
210    if let Some(name) = name {
211        for device in devices {
212            if let Ok(dev_name) = device.name() {
213                if dev_name.contains(name) {
214                    return Ok(Some(device));
215                }
216            }
217        }
218        Ok(None)
219    } else {
220        if is_input {
221            Ok(host.default_input_device())
222        } else {
223            Ok(host.default_output_device())
224        }
225    }
226}
227
228impl AudioBackend for CpalBackend {
229    fn backend_type(&self) -> BackendType {
230        BackendType::Cpal
231    }
232
233    fn config(&self) -> &AudioConfig {
234        &self.config
235    }
236
237    fn config_mut(&mut self) -> &mut AudioConfig {
238        &mut self.config
239    }
240
241    fn init(&mut self) -> IoResult<()> {
242        self.command_tx
243            .send(Command::Init {
244                input_device: self.config.input_device.clone(),
245                output_device: self.config.output_device.clone(),
246            })
247            .map_err(|e| IoError::Backend(e.to_string()))?;
248
249        self.wait_for_status(Status::Initialized)
250    }
251
252    fn start(&mut self) -> IoResult<()> {
253        self.command_tx
254            .send(Command::Start)
255            .map_err(|e| IoError::Backend(e.to_string()))?;
256
257        self.wait_for_status(Status::Started)
258    }
259
260    fn stop(&mut self) -> IoResult<()> {
261        self.command_tx
262            .send(Command::Stop)
263            .map_err(|e| IoError::Backend(e.to_string()))?;
264
265        self.wait_for_status(Status::Stopped)
266    }
267
268    fn read(&mut self, buffer: &mut [f32]) -> IoResult<usize> {
269        let mut input_buf = self.input_buffer.write();
270        input_buf.read(buffer);
271        Ok(buffer.len())
272    }
273
274    fn write(&mut self, buffer: &[f32]) -> IoResult<usize> {
275        let mut output_buf = self.output_buffer.write();
276        output_buf.write(buffer);
277        Ok(buffer.len())
278    }
279
280    fn xruns(&self) -> u32 {
281        *self.xruns.read()
282    }
283
284    fn latency(&self) -> Duration {
285        Duration::from_micros(
286            (1_000_000.0 * self.config.buffer_size as f64 / self.config.sample_rate as f64) as u64,
287        )
288    }
289
290    fn list_input_devices(&self) -> Vec<String> {
291        self.host
292            .input_devices()
293            .map(|devices| devices.filter_map(|d| d.name().ok()).collect())
294            .unwrap_or_default()
295    }
296
297    fn list_output_devices(&self) -> Vec<String> {
298        self.host
299            .output_devices()
300            .map(|devices| devices.filter_map(|d| d.name().ok()).collect())
301            .unwrap_or_default()
302    }
303}
304
305impl Drop for CpalBackend {
306    fn drop(&mut self) {
307        if let Some(handle) = self.thread_handle.take() {
308            let _ = self.command_tx.send(Command::Stop);
309            let _ = handle.join();
310        }
311    }
312}