use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::Mutex;
use cpal::{
traits::{DeviceTrait, HostTrait, StreamTrait},
BuildStreamError, DefaultStreamConfigError, Device, DeviceNameError, DevicesError,
OutputCallbackInfo, PauseStreamError, PlayStreamError, SampleFormat, Stream, StreamConfig,
SupportedBufferSize,
};
use crossbeam_channel::Receiver;
use super::{
AudioBackendError, AudioBackendErrorKind, AudioBackendManager, BackendResult, RenderThreadInit,
};
use crate::buffer::AudioBuffer;
use crate::context::AudioContextLatencyCategory;
use crate::context::AudioContextOptions;
use crate::io::microphone::MicrophoneRender;
use crate::media_devices::{MediaDeviceInfo, MediaDeviceInfoKind};
use crate::render::RenderThread;
use crate::stats::AudioStats;
use crate::{AtomicF64, MAX_CHANNELS};
/// Select the cpal host to use.
///
/// When the `cpal-jack` feature is enabled and the JACK host is available
/// with at least one device, JACK is preferred; in every other case the
/// platform default host is returned.
fn get_host() -> BackendResult<cpal::Host> {
    #[cfg(feature = "cpal-jack")]
    {
        if cpal::available_hosts().contains(&cpal::HostId::Jack) {
            let jack_host = cpal::host_from_id(cpal::HostId::Jack).map_err(|e| {
                map_cpal_backend_error(AudioBackendErrorKind::BackendSpecific, "jack_host", e)
            })?;
            // Only commit to JACK when it actually exposes devices.
            let use_jack = match jack_host.devices() {
                Ok(devices) => {
                    let has_devices = devices.count() > 0;
                    if !has_devices {
                        log::warn!("No jack devices found, fallback to default host");
                    }
                    has_devices
                }
                Err(_) => false,
            };
            return Ok(if use_jack {
                jack_host
            } else {
                cpal::default_host()
            });
        }
    }
    Ok(cpal::default_host())
}
/// Wrap any displayable cpal error into an [`AudioBackendError`] tagged with
/// the `"cpal"` backend name and the failing operation.
fn map_cpal_backend_error(
    kind: AudioBackendErrorKind,
    operation: &'static str,
    err: impl std::fmt::Display,
) -> AudioBackendError {
    let message = err.to_string();
    AudioBackendError::new(kind, "cpal", operation, message)
}
fn map_cpal_build_error(operation: &'static str, err: BuildStreamError) -> AudioBackendError {
let kind = match err {
BuildStreamError::DeviceNotAvailable => AudioBackendErrorKind::DeviceUnavailable,
BuildStreamError::StreamConfigNotSupported => AudioBackendErrorKind::NotSupported,
BuildStreamError::InvalidArgument => AudioBackendErrorKind::InvalidArgument,
BuildStreamError::StreamIdOverflow | BuildStreamError::BackendSpecific { .. } => {
AudioBackendErrorKind::BackendSpecific
}
};
map_cpal_backend_error(kind, operation, err)
}
fn map_cpal_default_config_error(
operation: &'static str,
err: DefaultStreamConfigError,
) -> AudioBackendError {
let kind = match err {
DefaultStreamConfigError::DeviceNotAvailable => AudioBackendErrorKind::DeviceUnavailable,
DefaultStreamConfigError::StreamTypeNotSupported => AudioBackendErrorKind::NotSupported,
DefaultStreamConfigError::BackendSpecific { .. } => AudioBackendErrorKind::BackendSpecific,
};
map_cpal_backend_error(kind, operation, err)
}
/// Classify a [`PlayStreamError`] into the backend-agnostic error kinds.
fn map_cpal_play_error(operation: &'static str, err: PlayStreamError) -> AudioBackendError {
    let kind = if matches!(err, PlayStreamError::DeviceNotAvailable) {
        AudioBackendErrorKind::DeviceUnavailable
    } else {
        // PlayStreamError::BackendSpecific { .. }
        AudioBackendErrorKind::BackendSpecific
    };
    map_cpal_backend_error(kind, operation, err)
}
/// Classify a [`PauseStreamError`] into the backend-agnostic error kinds.
fn map_cpal_pause_error(operation: &'static str, err: PauseStreamError) -> AudioBackendError {
    let kind = if matches!(err, PauseStreamError::DeviceNotAvailable) {
        AudioBackendErrorKind::DeviceUnavailable
    } else {
        // PauseStreamError::BackendSpecific { .. }
        AudioBackendErrorKind::BackendSpecific
    };
    map_cpal_backend_error(kind, operation, err)
}
/// Wrap a device-enumeration failure; cpal offers no finer classification here.
fn map_cpal_devices_error(operation: &'static str, err: DevicesError) -> AudioBackendError {
    let kind = AudioBackendErrorKind::BackendSpecific;
    map_cpal_backend_error(kind, operation, err)
}
/// Wrap a device-name lookup failure; cpal offers no finer classification here.
fn map_cpal_device_name_error(operation: &'static str, err: DeviceNameError) -> AudioBackendError {
    let kind = AudioBackendErrorKind::BackendSpecific;
    map_cpal_backend_error(kind, operation, err)
}
#[derive(Clone)]
#[allow(unused)]
pub(crate) struct CpalBackend {
    // The running cpal stream; becomes `None` once `close()` has been called.
    stream: Arc<Mutex<Option<Stream>>>,
    // Most recent output latency in seconds, written from the audio callback.
    output_latency: Arc<AtomicF64>,
    // Sample rate of the stream that was actually built, in Hz.
    sample_rate: f32,
    // Channel count of the stream that was actually built (capped at MAX_CHANNELS).
    number_of_channels: usize,
    // Requested sink id; an empty string selects the default device.
    sink_id: String,
}
impl AudioBackendManager for CpalBackend {
    /// Build the audio output stream and hand the render thread over to it.
    ///
    /// A preferred configuration derived from `options` (sample rate, latency
    /// hint, channel count) is attempted first; if the build fails, the
    /// device's default configuration is used as a fallback.
    fn build_output(
        options: AudioContextOptions,
        render_thread_init: RenderThreadInit,
    ) -> BackendResult<Self>
    where
        Self: Sized,
    {
        let host = get_host()?;
        log::info!("Audio Output Host: cpal {:?}", host.id());

        let RenderThreadInit {
            state,
            startup_pending,
            frames_played,
            stats,
            ctrl_msg_recv,
            event_send,
        } = render_thread_init;

        // Resolve the output device: an empty sink id selects the default;
        // an unknown sink id also falls back to the default device.
        let device = if options.sink_id.is_empty() {
            host.default_output_device().ok_or_else(|| {
                AudioBackendError::new(
                    AudioBackendErrorKind::DeviceUnavailable,
                    "cpal",
                    "default_output_device",
                    "No output device available",
                )
            })?
        } else {
            Self::enumerate_devices_sync()?
                .into_iter()
                .find(|e| e.device_id() == options.sink_id)
                .and_then(|e| e.device().downcast::<cpal::Device>().ok().map(|e| *e))
                .or_else(|| host.default_output_device())
                .ok_or_else(|| {
                    AudioBackendError::new(
                        AudioBackendErrorKind::DeviceUnavailable,
                        "cpal",
                        "select_output_device",
                        "No output device available",
                    )
                })?
        };

        log::info!(
            "Output device: {:?}",
            device
                .description()
                .map_err(|e| map_cpal_device_name_error("output_device_name", e))?
                .to_string()
        );

        let default_device_config = device
            .default_output_config()
            .map_err(|e| map_cpal_default_config_error("default_output_config", e))?;

        // Never exceed the channel count the rest of the engine supports.
        let number_of_channels = usize::from(default_device_config.channels()).min(MAX_CHANNELS);
        let mut preferred_config: StreamConfig = default_device_config.clone().into();
        preferred_config.channels = number_of_channels as u16;
        if let Some(sample_rate) = options.sample_rate {
            crate::assert_valid_sample_rate(sample_rate);
            preferred_config.sample_rate = sample_rate as u32;
        }

        // Translate the latency hint into a buffer size, clamped to the
        // device's supported range when that range is known.
        let buffer_size = super::buffer_size_for_latency_category(
            options.latency_hint,
            preferred_config.sample_rate as f32,
        ) as u32;
        let clamped_buffer_size: u32 = match default_device_config.buffer_size() {
            SupportedBufferSize::Unknown => buffer_size,
            SupportedBufferSize::Range { min, max } => buffer_size.clamp(*min, *max),
        };
        preferred_config.buffer_size = cpal::BufferSize::Fixed(clamped_buffer_size);

        // On Android, let the platform pick its own buffer size for the
        // low-latency hints instead of forcing a fixed one.
        if cfg!(target_os = "android") {
            if let AudioContextLatencyCategory::Balanced
            | AudioContextLatencyCategory::Interactive = options.latency_hint
            {
                preferred_config.buffer_size = cpal::BufferSize::Default;
            }
        }

        let mut sample_rate = preferred_config.sample_rate as f32;
        let output_latency = Arc::new(AtomicF64::new(0.));

        let mut renderer = RenderThread::new(
            sample_rate,
            preferred_config.channels as usize,
            ctrl_msg_recv.clone(),
            Arc::clone(&state),
            Arc::clone(&frames_played),
            stats.clone(),
            event_send.clone(),
        );
        renderer.set_startup_pending(Arc::clone(&startup_pending));
        renderer.spawn_garbage_collector_thread();

        log::debug!(
            "Attempt output stream with preferred config: {:?}",
            &preferred_config
        );
        let spawned = spawn_output_stream(
            &device,
            default_device_config.sample_format(),
            &preferred_config,
            renderer,
            Arc::clone(&output_latency),
            stats.clone(),
        );

        let stream = match spawned {
            Ok(stream) => {
                log::debug!("Output stream set up successfully");
                stream
            }
            Err(e) => {
                log::warn!("Output stream build failed with preferred config: {}", e);
                // Retry with the device defaults. A fresh render thread is
                // required because the first one was consumed by the failed
                // attempt and the fallback sample rate may differ.
                let mut supported_config: StreamConfig = default_device_config.clone().into();
                supported_config.channels = number_of_channels as u16;
                sample_rate = supported_config.sample_rate as f32;
                log::debug!(
                    "Attempt output stream with fallback config: {:?}",
                    &supported_config
                );
                let mut renderer = RenderThread::new(
                    sample_rate,
                    supported_config.channels as usize,
                    ctrl_msg_recv,
                    state,
                    frames_played,
                    stats.clone(),
                    event_send,
                );
                renderer.set_startup_pending(startup_pending);
                renderer.spawn_garbage_collector_thread();
                let spawned = spawn_output_stream(
                    &device,
                    default_device_config.sample_format(),
                    &supported_config,
                    renderer,
                    Arc::clone(&output_latency),
                    stats.clone(),
                );
                spawned.map_err(|e| map_cpal_build_error("build_fallback_output_stream", e))?
            }
        };

        stream
            .play()
            .map_err(|e| map_cpal_play_error("play_output_stream", e))?;

        Ok(CpalBackend {
            stream: Arc::new(Mutex::new(Some(stream))),
            output_latency,
            sample_rate,
            number_of_channels,
            sink_id: options.sink_id,
        })
    }

    /// Build the audio input (microphone) stream.
    ///
    /// Returns the backend handle together with the receiving end of the
    /// channel onto which captured [`AudioBuffer`]s are delivered.
    fn build_input(
        options: AudioContextOptions,
        number_of_channels: Option<u32>,
    ) -> BackendResult<(Self, Receiver<AudioBuffer>)>
    where
        Self: Sized,
    {
        let host = get_host()?;
        log::info!("Audio Input Host: cpal {:?}", host.id());

        // NOTE(review): input selection keys off `options.sink_id` — confirm
        // callers pass an input-device id here, not an output sink id.
        let device = if options.sink_id.is_empty() {
            host.default_input_device().ok_or_else(|| {
                AudioBackendError::new(
                    AudioBackendErrorKind::DeviceUnavailable,
                    "cpal",
                    "default_input_device",
                    "No input device available",
                )
            })?
        } else {
            Self::enumerate_devices_sync()?
                .into_iter()
                .find(|e| e.device_id() == options.sink_id)
                .and_then(|e| e.device().downcast::<cpal::Device>().ok().map(|e| *e))
                .or_else(|| host.default_input_device())
                .ok_or_else(|| {
                    AudioBackendError::new(
                        AudioBackendErrorKind::DeviceUnavailable,
                        "cpal",
                        "select_input_device",
                        "No input device available",
                    )
                })?
        };

        log::info!(
            "Input device: {:?}",
            device
                .description()
                .map_err(|e| map_cpal_device_name_error("input_device_name", e))?
                .to_string()
        );

        let supported = device
            .default_input_config()
            .map_err(|e| map_cpal_default_config_error("default_input_config", e))?;

        let mut preferred: StreamConfig = supported.clone().into();
        if let Some(number_of_channels) = number_of_channels {
            preferred.channels = number_of_channels as u16;
        }
        if let Some(sample_rate) = options.sample_rate {
            crate::assert_valid_sample_rate(sample_rate);
            preferred.sample_rate = sample_rate as u32;
        }

        let buffer_size = super::buffer_size_for_latency_category(
            options.latency_hint,
            preferred.sample_rate as f32,
        ) as u32;
        let clamped_buffer_size: u32 = match supported.buffer_size() {
            SupportedBufferSize::Unknown => buffer_size,
            SupportedBufferSize::Range { min, max } => buffer_size.clamp(*min, *max),
        };
        preferred.buffer_size = cpal::BufferSize::Fixed(clamped_buffer_size);

        let mut sample_rate = preferred.sample_rate as f32;
        let mut number_of_channels = preferred.channels as usize;

        // A small bounded channel smooths scheduling jitter between the audio
        // callback and the consumer without unbounded memory growth.
        let smoothing = 3;
        let (sender, mut receiver) = crossbeam_channel::bounded(smoothing);
        let renderer = MicrophoneRender::new(number_of_channels, sample_rate, sender);

        log::debug!(
            "Attempt input stream with preferred config: {:?}",
            &preferred
        );
        let spawned = spawn_input_stream(&device, supported.sample_format(), &preferred, renderer);

        let stream = match spawned {
            Ok(stream) => {
                log::debug!("Input stream set up successfully");
                stream
            }
            Err(e) => {
                // Fixed: these log messages previously said "Output stream".
                log::warn!("Input stream build failed with preferred config: {}", e);
                let supported_config: StreamConfig = supported.clone().into();
                number_of_channels = usize::from(supported_config.channels);
                sample_rate = supported_config.sample_rate as f32;
                log::debug!(
                    "Attempt input stream with fallback config: {:?}",
                    &supported_config
                );
                // The first sender was consumed by the failed renderer; build
                // a fresh channel pair for the fallback attempt.
                let (sender, receiver2) = crossbeam_channel::bounded(smoothing);
                receiver = receiver2;
                let renderer = MicrophoneRender::new(number_of_channels, sample_rate, sender);
                let spawned = spawn_input_stream(
                    &device,
                    supported.sample_format(),
                    &supported_config,
                    renderer,
                );
                spawned.map_err(|e| map_cpal_build_error("build_fallback_input_stream", e))?
            }
        };

        stream
            .play()
            .map_err(|e| map_cpal_play_error("play_input_stream", e))?;

        let backend = CpalBackend {
            stream: Arc::new(Mutex::new(Some(stream))),
            output_latency: Arc::new(AtomicF64::new(0.)),
            sample_rate,
            number_of_channels,
            sink_id: options.sink_id,
        };
        Ok((backend, receiver))
    }

    /// Resume a suspended stream. Returns `false` when the stream has been closed.
    fn resume(&self) -> BackendResult<bool> {
        if let Some(s) = self.stream.lock().unwrap().as_ref() {
            s.play().map_err(|e| map_cpal_play_error("resume", e))?;
            return Ok(true);
        }
        Ok(false)
    }

    /// Pause a running stream. Returns `false` when the stream has been closed.
    fn suspend(&self) -> BackendResult<bool> {
        if let Some(s) = self.stream.lock().unwrap().as_ref() {
            s.pause().map_err(|e| map_cpal_pause_error("suspend", e))?;
            return Ok(true);
        }
        Ok(false)
    }

    /// Close the backend by dropping the stream, releasing the audio device.
    fn close(&self) -> BackendResult<()> {
        // Dropping the cpal::Stream tears it down; subsequent resume/suspend
        // calls will observe `None` and return Ok(false).
        self.stream.lock().unwrap().take();
        Ok(())
    }

    /// Sample rate of the stream that was actually built, in Hz.
    fn sample_rate(&self) -> f32 {
        self.sample_rate
    }

    /// Channel count of the stream that was actually built.
    fn number_of_channels(&self) -> usize {
        self.number_of_channels
    }

    /// Latest output latency in seconds, as measured in the audio callback.
    fn output_latency(&self) -> BackendResult<f64> {
        Ok(self.output_latency.load(Ordering::Relaxed))
    }

    /// The sink id this backend was built with ("" means the default device).
    fn sink_id(&self) -> &str {
        self.sink_id.as_str()
    }

    /// List every input and output device the host exposes.
    ///
    /// Device ids are derived from (kind, backend, name, channel count,
    /// index); the index is incremented until the id is unique within the
    /// returned list, disambiguating devices that would otherwise collide.
    fn enumerate_devices_sync() -> BackendResult<Vec<MediaDeviceInfo>>
    where
        Self: Sized,
    {
        let host = get_host()?;
        let input_devices = host
            .input_devices()
            .map_err(|e| map_cpal_devices_error("enumerate_input_devices", e))?
            .filter_map(|d| {
                // Devices without a default input config are skipped.
                let num_channels = d.default_input_config().ok()?.channels();
                Some((d, MediaDeviceInfoKind::AudioInput, num_channels))
            });
        let output_devices = host
            .output_devices()
            .map_err(|e| map_cpal_devices_error("enumerate_output_devices", e))?
            .filter_map(|d| {
                // Devices without a default output config are skipped.
                let num_channels = d.default_output_config().ok()?.channels();
                Some((d, MediaDeviceInfoKind::AudioOutput, num_channels))
            });

        let mut list = Vec::<MediaDeviceInfo>::new();
        for (device, kind, num_channels) in input_devices.chain(output_devices) {
            // Query the device name once instead of on every collision retry.
            let name = device
                .description()
                .map_err(|e| map_cpal_device_name_error("device_name", e))?
                .to_string();
            let mut index = 0;
            loop {
                let device_id = crate::media_devices::DeviceId::as_string(
                    kind,
                    "cpal".to_string(),
                    name.clone(),
                    num_channels,
                    index,
                );
                if !list.iter().any(|d| d.device_id() == device_id) {
                    let device = MediaDeviceInfo::new(device_id, None, kind, name, Box::new(device));
                    list.push(device);
                    break;
                } else {
                    index += 1;
                }
            }
        }
        Ok(list)
    }
}
fn latency_in_seconds(infos: &OutputCallbackInfo) -> f64 {
let timestamp = infos.timestamp();
timestamp
.playback
.duration_since(×tamp.callback)
.map(|delta| delta.as_secs() as f64 + delta.subsec_nanos() as f64 * 1e-9)
.unwrap_or(0.0)
}
fn spawn_output_stream(
device: &Device,
sample_format: SampleFormat,
config: &StreamConfig,
mut render: RenderThread,
output_latency: Arc<AtomicF64>,
stats: AudioStats,
) -> Result<Stream, BuildStreamError> {
let err_fn = |err| log::error!("an error occurred on the output audio stream: {}", err);
match sample_format {
SampleFormat::F32 => device.build_output_stream(
config,
move |d: &mut [f32], i: &OutputCallbackInfo| {
render.render(d);
let latency = latency_in_seconds(i);
output_latency.store(latency, Ordering::Relaxed);
stats.record_latency_seconds(latency);
},
err_fn,
None,
),
SampleFormat::F64 => device.build_output_stream(
config,
move |d: &mut [f64], i: &OutputCallbackInfo| {
render.render(d);
let latency = latency_in_seconds(i);
output_latency.store(latency, Ordering::Relaxed);
stats.record_latency_seconds(latency);
},
err_fn,
None,
),
SampleFormat::U8 => device.build_output_stream(
config,
move |d: &mut [u8], i: &OutputCallbackInfo| {
render.render(d);
let latency = latency_in_seconds(i);
output_latency.store(latency, Ordering::Relaxed);
stats.record_latency_seconds(latency);
},
err_fn,
None,
),
SampleFormat::U16 => device.build_output_stream(
config,
move |d: &mut [u16], i: &OutputCallbackInfo| {
render.render(d);
let latency = latency_in_seconds(i);
output_latency.store(latency, Ordering::Relaxed);
stats.record_latency_seconds(latency);
},
err_fn,
None,
),
SampleFormat::U32 => device.build_output_stream(
config,
move |d: &mut [u32], i: &OutputCallbackInfo| {
render.render(d);
let latency = latency_in_seconds(i);
output_latency.store(latency, Ordering::Relaxed);
stats.record_latency_seconds(latency);
},
err_fn,
None,
),
SampleFormat::U64 => device.build_output_stream(
config,
move |d: &mut [u64], i: &OutputCallbackInfo| {
render.render(d);
let latency = latency_in_seconds(i);
output_latency.store(latency, Ordering::Relaxed);
stats.record_latency_seconds(latency);
},
err_fn,
None,
),
SampleFormat::I8 => device.build_output_stream(
config,
move |d: &mut [i8], i: &OutputCallbackInfo| {
render.render(d);
let latency = latency_in_seconds(i);
output_latency.store(latency, Ordering::Relaxed);
stats.record_latency_seconds(latency);
},
err_fn,
None,
),
SampleFormat::I16 => device.build_output_stream(
config,
move |d: &mut [i16], i: &OutputCallbackInfo| {
render.render(d);
let latency = latency_in_seconds(i);
output_latency.store(latency, Ordering::Relaxed);
stats.record_latency_seconds(latency);
},
err_fn,
None,
),
SampleFormat::I32 => device.build_output_stream(
config,
move |d: &mut [i32], i: &OutputCallbackInfo| {
render.render(d);
let latency = latency_in_seconds(i);
output_latency.store(latency, Ordering::Relaxed);
stats.record_latency_seconds(latency);
},
err_fn,
None,
),
SampleFormat::I64 => device.build_output_stream(
config,
move |d: &mut [i64], i: &OutputCallbackInfo| {
render.render(d);
let latency = latency_in_seconds(i);
output_latency.store(latency, Ordering::Relaxed);
stats.record_latency_seconds(latency);
},
err_fn,
None,
),
_ => Err(BuildStreamError::StreamConfigNotSupported),
}
}
/// Create the cpal input [`Stream`] for the given sample format.
///
/// Captured buffers are forwarded to the [`MicrophoneRender`] sink.
/// Unrecognized sample formats yield `StreamConfigNotSupported`.
fn spawn_input_stream(
    device: &Device,
    sample_format: SampleFormat,
    config: &StreamConfig,
    render: MicrophoneRender,
) -> Result<Stream, BuildStreamError> {
    let err_fn = |err| log::error!("an error occurred on the input audio stream: {}", err);

    // Identical callback for every sample format — expand once per concrete
    // sample type rather than duplicating ten match arms. Only the taken
    // arm's closure is constructed, so moving `render` in each arm is fine.
    macro_rules! build {
        ($sample:ty) => {
            device.build_input_stream(config, move |d: &[$sample], _c| render.render(d), err_fn, None)
        };
    }

    match sample_format {
        SampleFormat::F32 => build!(f32),
        SampleFormat::F64 => build!(f64),
        SampleFormat::U8 => build!(u8),
        SampleFormat::U16 => build!(u16),
        SampleFormat::U32 => build!(u32),
        SampleFormat::U64 => build!(u64),
        SampleFormat::I8 => build!(i8),
        SampleFormat::I16 => build!(i16),
        SampleFormat::I32 => build!(i32),
        SampleFormat::I64 => build!(i64),
        _ => Err(BuildStreamError::StreamConfigNotSupported),
    }
}