// rtp_core/audio_device/cpal_backend.rs

//! cpal-backed audio device implementation.
//!
//! Provides real audio capture and playback through system audio devices.

use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc;

use super::{AudioConfig, AudioDeviceInfo, DeviceSelector, DeviceType};
11
12/// List all available audio devices (input and output).
13pub fn list_devices() -> Vec<AudioDeviceInfo> {
14    let mut devices = list_input_devices();
15    devices.extend(list_output_devices());
16    devices
17}
18
19/// List available input (microphone) devices.
20pub fn list_input_devices() -> Vec<AudioDeviceInfo> {
21    let host = cpal::default_host();
22    let default_name = host
23        .default_input_device()
24        .and_then(|d| d.name().ok());
25
26    let Ok(input_devices) = host.input_devices() else {
27        return Vec::new();
28    };
29
30    input_devices
31        .filter_map(|device| {
32            let name = device.name().ok()?;
33            let configs = device.supported_input_configs().ok()?;
34
35            let mut sample_rates = Vec::new();
36            let mut channels = Vec::new();
37
38            for config in configs {
39                let min = config.min_sample_rate().0;
40                let max = config.max_sample_rate().0;
41                for &rate in &[8000, 16000, 44100, 48000] {
42                    if rate >= min && rate <= max && !sample_rates.contains(&rate) {
43                        sample_rates.push(rate);
44                    }
45                }
46                let ch = config.channels();
47                if !channels.contains(&ch) {
48                    channels.push(ch);
49                }
50            }
51
52            sample_rates.sort();
53            channels.sort();
54
55            Some(AudioDeviceInfo {
56                is_default: default_name.as_deref() == Some(&name),
57                name,
58                device_type: DeviceType::Input,
59                sample_rates,
60                channels,
61            })
62        })
63        .collect()
64}
65
66/// List available output (speaker) devices.
67pub fn list_output_devices() -> Vec<AudioDeviceInfo> {
68    let host = cpal::default_host();
69    let default_name = host
70        .default_output_device()
71        .and_then(|d| d.name().ok());
72
73    let Ok(output_devices) = host.output_devices() else {
74        return Vec::new();
75    };
76
77    output_devices
78        .filter_map(|device| {
79            let name = device.name().ok()?;
80            let configs = device.supported_output_configs().ok()?;
81
82            let mut sample_rates = Vec::new();
83            let mut channels = Vec::new();
84
85            for config in configs {
86                let min = config.min_sample_rate().0;
87                let max = config.max_sample_rate().0;
88                for &rate in &[8000, 16000, 44100, 48000] {
89                    if rate >= min && rate <= max && !sample_rates.contains(&rate) {
90                        sample_rates.push(rate);
91                    }
92                }
93                let ch = config.channels();
94                if !channels.contains(&ch) {
95                    channels.push(ch);
96                }
97            }
98
99            sample_rates.sort();
100            channels.sort();
101
102            Some(AudioDeviceInfo {
103                is_default: default_name.as_deref() == Some(&name),
104                name,
105                device_type: DeviceType::Output,
106                sample_rates,
107                channels,
108            })
109        })
110        .collect()
111}
112
113/// Check if any audio device is available.
114pub fn is_audio_available() -> bool {
115    let host = cpal::default_host();
116    host.default_input_device().is_some() || host.default_output_device().is_some()
117}
118
119/// Returns a reason why audio is unavailable, or empty string if available.
120pub fn audio_unavailable_reason() -> &'static str {
121    if is_audio_available() {
122        ""
123    } else {
124        "No audio devices found. Check that a sound card is installed and drivers are loaded."
125    }
126}
127
128/// Select a cpal device by selector criteria.
129fn select_input_device(selector: &DeviceSelector) -> Option<cpal::Device> {
130    let host = cpal::default_host();
131    match selector {
132        DeviceSelector::Default => host.default_input_device(),
133        DeviceSelector::ByName(name) => {
134            let devices = host.input_devices().ok()?;
135            devices
136                .into_iter()
137                .find(|d| d.name().ok().map(|n| n.contains(name.as_str())).unwrap_or(false))
138        }
139        DeviceSelector::ByIndex(idx) => {
140            let devices: Vec<_> = host.input_devices().ok()?.collect();
141            devices.into_iter().nth(*idx)
142        }
143    }
144}
145
146fn select_output_device(selector: &DeviceSelector) -> Option<cpal::Device> {
147    let host = cpal::default_host();
148    match selector {
149        DeviceSelector::Default => host.default_output_device(),
150        DeviceSelector::ByName(name) => {
151            let devices = host.output_devices().ok()?;
152            devices
153                .into_iter()
154                .find(|d| d.name().ok().map(|n| n.contains(name.as_str())).unwrap_or(false))
155        }
156        DeviceSelector::ByIndex(idx) => {
157            let devices: Vec<_> = host.output_devices().ok()?.collect();
158            devices.into_iter().nth(*idx)
159        }
160    }
161}
162
163/// Audio capture stream that reads from a microphone.
164pub struct AudioCapture {
165    _stream: cpal::Stream,
166    rx: mpsc::Receiver<Vec<i16>>,
167}
168
169impl AudioCapture {
170    /// Start capturing audio from the selected input device.
171    ///
172    /// If the device doesn't support the requested sample rate, we find the
173    /// nearest supported rate and downsample in the capture callback.
174    pub fn start(
175        selector: &DeviceSelector,
176        config: &AudioConfig,
177    ) -> Result<Self, String> {
178        let device = select_input_device(selector)
179            .ok_or_else(|| format!("Input device not found: {}", selector))?;
180
181        let device_rate = find_supported_input_rate(&device, config.sample_rate, config.channels)
182            .ok_or_else(|| "No supported sample rate found for input device".to_string())?;
183
184        let stream_config = cpal::StreamConfig {
185            channels: config.channels,
186            sample_rate: cpal::SampleRate(device_rate),
187            buffer_size: cpal::BufferSize::Default,
188        };
189
190        let need_resample = device_rate != config.sample_rate;
191        let src_rate = device_rate as f64;
192        let dst_rate = config.sample_rate as f64;
193
194        let samples_per_frame = config.samples_per_frame();
195        let (tx, rx) = mpsc::channel::<Vec<i16>>(32);
196        let buffer = Arc::new(Mutex::new(Vec::with_capacity(samples_per_frame * 2)));
197        let buffer_clone = buffer.clone();
198
199        let stream = device
200            .build_input_stream(
201                &stream_config,
202                move |data: &[f32], _: &cpal::InputCallbackInfo| {
203                    // Convert f32 to i16 with clamping
204                    let i16_data: Vec<i16> = data
205                        .iter()
206                        .map(|&s| (s.clamp(-1.0, 1.0) * 32767.0) as i16)
207                        .collect();
208                    let resampled = if need_resample {
209                        resample_linear(&i16_data, src_rate, dst_rate)
210                    } else {
211                        i16_data
212                    };
213
214                    if let Ok(mut buf) = buffer_clone.lock() {
215                        buf.extend_from_slice(&resampled);
216
217                        while buf.len() >= samples_per_frame {
218                            let frame: Vec<i16> = buf.drain(..samples_per_frame).collect();
219                            let _ = tx.try_send(frame);
220                        }
221                    }
222                },
223                |err| {
224                    tracing::error!("Audio capture error: {}", err);
225                },
226                None,
227            )
228            .map_err(|e| format!("Failed to build input stream: {}", e))?;
229
230        stream
231            .play()
232            .map_err(|e| format!("Failed to start capture: {}", e))?;
233
234        Ok(Self {
235            _stream: stream,
236            rx,
237        })
238    }
239
240    /// Receive the next audio frame.
241    pub async fn next_frame(&mut self) -> Option<Vec<i16>> {
242        self.rx.recv().await
243    }
244}
245
246/// Audio playback stream that writes to a speaker.
247pub struct AudioPlayback {
248    _stream: cpal::Stream,
249    tx: mpsc::Sender<Vec<i16>>,
250}
251
252impl AudioPlayback {
253    /// Start playing audio on the selected output device.
254    ///
255    /// If the device doesn't support the requested sample rate, we find the
256    /// nearest supported rate and perform linear interpolation resampling in
257    /// the playback callback.
258    pub fn start(
259        selector: &DeviceSelector,
260        config: &AudioConfig,
261    ) -> Result<Self, String> {
262        let device = select_output_device(selector)
263            .ok_or_else(|| format!("Output device not found: {}", selector))?;
264
265        // Find a supported sample rate, preferring the requested one.
266        // Try with requested channels first, then fall back to any channel count.
267        let (device_rate, device_channels) =
268            find_supported_output_config(&device, config.sample_rate, config.channels)
269                .ok_or_else(|| "No supported configuration found for output device".to_string())?;
270
271        let stream_config = cpal::StreamConfig {
272            channels: device_channels,
273            sample_rate: cpal::SampleRate(device_rate),
274            buffer_size: cpal::BufferSize::Default,
275        };
276
277        let need_resample = device_rate != config.sample_rate;
278        let need_channel_convert = device_channels != config.channels;
279        let src_rate = config.sample_rate as f64;
280        let dst_rate = device_rate as f64;
281        let out_channels = device_channels;
282
283        // Playout buffer thresholds (all in output samples).
284        //
285        // The sender's RTP clock and the hardware audio clock are independent oscillators
286        // that drift apart over time.  Without RTCP Sender Reports there is no external
287        // reference to correct them, so we do adaptive clock recovery entirely from buffer
288        // level:
289        //
290        //  • high-water (3 frames / 60 ms): buffer is growing → sender faster than hardware.
291        //    Drop 1 sample per frame by blending adjacent samples (linear interpolation).
292        //    Effect: ~0.6 % speed-up, nearly inaudible.
293        //
294        //  • low-water  (1 frame / 20 ms): buffer is shrinking → sender slower than hardware.
295        //    Duplicate 1 sample per frame.
296        //    Effect: ~0.6 % slow-down, nearly inaudible.
297        //
298        //  • hard cap   (4 frames / 80 ms): safety net for burst arrivals.
299        //    When exceeded, trim to the midpoint (2 frames / 40 ms) to create headroom before
300        //    the next potential overflow.  This is a last-resort discontinuity; in steady state
301        //    the adaptive logic above should keep the buffer between low- and high-water.
302        let samples_per_frame_out = (dst_rate / 50.0 * out_channels as f64) as usize;
303        let low_water  = samples_per_frame_out;           // 1 frame  (~20 ms)
304        let high_water = samples_per_frame_out * 3;       // 3 frames (~60 ms)
305        let max_buf_samples = samples_per_frame_out * 4;  // 4 frames (~80 ms) hard cap
306
307        let (tx, mut rx) = mpsc::channel::<Vec<i16>>(32);
308        let buffer = Arc::new(Mutex::new(VecDeque::<f32>::new()));
309        let buffer_clone = buffer.clone();
310
311        // Spawn a task to receive i16 frames, resample/channel-convert, and buffer as f32.
312        // Adaptive clock correction is applied here so the real-time audio callback never
313        // needs to do anything but drain the local buffer.
314        tokio::spawn(async move {
315            while let Some(frame) = rx.recv().await {
316                let resampled = if need_resample {
317                    resample_linear(&frame, src_rate, dst_rate)
318                } else {
319                    frame
320                };
321                let converted = if need_channel_convert {
322                    mono_to_multi(&resampled, out_channels)
323                } else {
324                    resampled
325                };
326                // Convert i16 → f32 [-1.0, 1.0]
327                let mut float_samples: Vec<f32> = converted
328                    .iter()
329                    .map(|&s| (s as f32 / 32768.0).clamp(-1.0, 1.0))
330                    .collect();
331
332                if let Ok(mut buf) = buffer_clone.lock() {
333                    // Adaptive rate correction based on current buffer occupancy.
334                    if buf.len() > high_water && float_samples.len() > 2 {
335                        // Buffer growing: drop 1 sample near the middle via linear blend.
336                        // Choosing the midpoint minimises audible discontinuity compared
337                        // with dropping at the edges.
338                        let mid = float_samples.len() / 2;
339                        let blended = (float_samples[mid] + float_samples[mid + 1]) / 2.0;
340                        float_samples[mid] = blended;
341                        float_samples.remove(mid + 1);
342                    } else if buf.len() < low_water && !float_samples.is_empty() {
343                        // Buffer shrinking: duplicate 1 sample near the middle.
344                        let mid = float_samples.len() / 2;
345                        let dup = float_samples[mid];
346                        float_samples.insert(mid + 1, dup);
347                    }
348
349                    buf.extend(float_samples.iter());
350
351                    // Hard cap: burst protection.  Trim to midpoint to leave headroom.
352                    if buf.len() > max_buf_samples {
353                        let target = max_buf_samples / 2;
354                        let excess = buf.len().saturating_sub(target);
355                        buf.drain(..excess);
356                    }
357                }
358            }
359        });
360
361        let buffer_for_stream = buffer.clone();
362
363        // Local buffer owned exclusively by the audio callback (no lock needed to read it).
364        // On each invocation we try to drain the shared Mutex buffer into this local one in a
365        // single short critical section, then serve output from the local copy.  This means the
366        // real-time audio thread never blocks waiting for the tokio producer.
367        //
368        // VecDeque is used so that pop_front() and draining from the front are O(1).
369        // Vec::drain(..n) from the front shifts all remaining elements, which is O(n) and
370        // creates non-deterministic callback timing → occasional underruns → clicks.
371        let mut local_buf: VecDeque<f32> = VecDeque::with_capacity(max_buf_samples);
372
373        let stream = device
374            .build_output_stream(
375                &stream_config,
376                move |data: &mut [f32], _: &cpal::OutputCallbackInfo| {
377                    // Try to grab all pending samples from the shared buffer in one shot.
378                    // try_lock() never blocks: if the producer holds the lock we simply
379                    // serve from whatever is already in local_buf.
380                    if let Ok(mut shared) = buffer_for_stream.try_lock() {
381                        local_buf.extend(shared.drain(..));
382                    }
383
384                    for sample in data.iter_mut() {
385                        *sample = local_buf.pop_front().unwrap_or(0.0);
386                    }
387                },
388                |err| {
389                    tracing::error!("Audio playback error: {}", err);
390                },
391                None,
392            )
393            .map_err(|e| format!("Failed to build output stream: {}", e))?;
394
395        stream
396            .play()
397            .map_err(|e| format!("Failed to start playback: {}", e))?;
398
399        Ok(Self {
400            _stream: stream,
401            tx,
402        })
403    }
404
405    /// Send an audio frame for playback.
406    pub async fn play_frame(&self, frame: Vec<i16>) -> Result<(), String> {
407        self.tx
408            .send(frame)
409            .await
410            .map_err(|_| "Playback channel closed".to_string())
411    }
412}
413
414/// Find a supported output sample rate and channel count for the device.
415/// Prefers the requested rate+channels; falls back to standard rates and any channel count.
416fn find_supported_output_config(device: &cpal::Device, requested: u32, channels: u16) -> Option<(u32, u16)> {
417    let configs: Vec<_> = device.supported_output_configs().ok()?.collect();
418    let standard_rates = [44100u32, 48000, 16000, 8000, 96000];
419
420    // 1. Exact match: requested rate + requested channels
421    for cfg in &configs {
422        if cfg.channels() == channels
423            && cfg.min_sample_rate().0 <= requested
424            && cfg.max_sample_rate().0 >= requested
425        {
426            return Some((requested, channels));
427        }
428    }
429    // 2. Requested rate, any channel count
430    for cfg in &configs {
431        if cfg.min_sample_rate().0 <= requested && cfg.max_sample_rate().0 >= requested {
432            return Some((requested, cfg.channels()));
433        }
434    }
435    // 3. Standard rate + requested channels
436    for &rate in &standard_rates {
437        for cfg in &configs {
438            if cfg.channels() == channels
439                && cfg.min_sample_rate().0 <= rate
440                && cfg.max_sample_rate().0 >= rate
441            {
442                return Some((rate, channels));
443            }
444        }
445    }
446    // 4. Standard rate, any channel count
447    for &rate in &standard_rates {
448        for cfg in &configs {
449            if cfg.min_sample_rate().0 <= rate && cfg.max_sample_rate().0 >= rate {
450                return Some((rate, cfg.channels()));
451            }
452        }
453    }
454    None
455}
456
457/// Find a supported input sample rate for the device.
458fn find_supported_input_rate(device: &cpal::Device, requested: u32, channels: u16) -> Option<u32> {
459    let configs: Vec<_> = device.supported_input_configs().ok()?.collect();
460    for cfg in &configs {
461        if cfg.channels() == channels
462            && cfg.min_sample_rate().0 <= requested
463            && cfg.max_sample_rate().0 >= requested
464        {
465            return Some(requested);
466        }
467    }
468    let standard_rates = [44100u32, 48000, 16000, 8000, 96000];
469    for &rate in &standard_rates {
470        for cfg in &configs {
471            if cfg.channels() == channels
472                && cfg.min_sample_rate().0 <= rate
473                && cfg.max_sample_rate().0 >= rate
474            {
475                return Some(rate);
476            }
477        }
478    }
479    for &rate in &standard_rates {
480        for cfg in &configs {
481            if cfg.min_sample_rate().0 <= rate && cfg.max_sample_rate().0 >= rate {
482                return Some(rate);
483            }
484        }
485    }
486    None
487}
488
/// Expand mono samples into interleaved multi-channel samples.
///
/// Each input sample is repeated `channels` times, e.g. mono → stereo turns
/// `[a, b]` into `[a, a, b, b]`. A `channels` value of 0 yields an empty vector.
fn mono_to_multi(samples: &[i16], channels: u16) -> Vec<i16> {
    let fan_out = usize::from(channels);
    samples
        .iter()
        .flat_map(|&sample| std::iter::repeat(sample).take(fan_out))
        .collect()
}
500
/// Linear interpolation resampling from `src_rate` to `dst_rate`.
///
/// Produces `ceil(samples.len() * dst_rate / src_rate)` samples. Output sample `i` is read at
/// input position `i * src_rate / dst_rate`; positions at or past the last input sample
/// repeat that sample. Interpolated values are rounded to the nearest integer (halves away
/// from zero) instead of being truncated toward zero, which would bias every interpolated
/// sample toward silence.
///
/// Each call starts at phase 0 and rounds its length up, so feeding consecutive chunks of a
/// stream through this function yields slightly more output than the exact ratio; streaming
/// callers must tolerate or compensate for that.
///
/// If either rate is not a finite positive number, the input is returned unchanged instead
/// of computing a meaningless (for a zero `src_rate`, unallocatable) output length.
fn resample_linear(samples: &[i16], src_rate: f64, dst_rate: f64) -> Vec<i16> {
    let valid_rate = |rate: f64| rate.is_finite() && rate > 0.0;
    if samples.is_empty() {
        return Vec::new();
    }
    if !(valid_rate(src_rate) && valid_rate(dst_rate)) {
        return samples.to_vec();
    }

    let ratio = src_rate / dst_rate;
    let out_len = ((samples.len() as f64) / ratio).ceil() as usize;
    let last = samples.len() - 1;

    (0..out_len)
        .map(|i| {
            let src_pos = i as f64 * ratio;
            let idx = src_pos as usize;
            if idx < last {
                let frac = src_pos - idx as f64;
                let a = f64::from(samples[idx]);
                let b = f64::from(samples[idx + 1]);
                (a + frac * (b - a)).round() as i16
            } else {
                samples[last]
            }
        })
        .collect()
}