extern crate alsa;
#[cfg(feature = "realtime")]
extern crate alsa_sys;
extern crate libc;
use std::{
collections::HashMap,
fmt,
sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
},
thread::{self, JoinHandle},
time::Duration,
vec::IntoIter as VecIntoIter,
};
use self::alsa::poll::Descriptors;
pub use self::enumerate::Devices;
use crate::{
host::{
equilibrium::{fill_equilibrium, DSD_EQUILIBRIUM_BYTE, U8_EQUILIBRIUM_BYTE},
frames_to_duration,
latch::Latch,
},
iter::{SupportedInputConfigs, SupportedOutputConfigs},
traits::{DeviceTrait, HostTrait, StreamTrait},
BufferSize, ChannelCount, Data, DeviceDescription, DeviceDescriptionBuilder, DeviceDirection,
DeviceId, Error, ErrorKind, FrameCount, InputCallbackInfo, InputStreamTimestamp,
OutputCallbackInfo, OutputStreamTimestamp, SampleFormat, SampleRate, StreamConfig,
StreamInstant, SupportedBufferSize, SupportedStreamConfig, SupportedStreamConfigRange,
COMMON_SAMPLE_RATES,
};
mod enumerate;
/// PCM identifier used for the host's default device (see `Host::default_device`).
const DEFAULT_DEVICE: &str = "default";
/// Number of periods per buffer whenever this backend chooses the buffer size itself:
/// buffer sizes and the playback start threshold are computed as `DEFAULT_PERIODS * period`.
const DEFAULT_PERIODS: alsa::pcm::Frames = 2;
/// Process-wide lock held by `open_pcm` for the duration of every `PCM::new` call.
// NOTE(review): presumably serializes opens because concurrent PCM opens are not safe
// inside the ALSA library — confirm.
static ALSA_OPEN_MUTEX: std::sync::Mutex<()> = std::sync::Mutex::new(());
fn open_pcm(pcm_id: &str, direction: alsa::Direction) -> Result<alsa::pcm::PCM, Error> {
let _guard = ALSA_OPEN_MUTEX.lock().unwrap_or_else(|e| e.into_inner());
alsa::pcm::PCM::new(pcm_id, direction, true).map_err(|e| {
let e = Error::from(e);
if e.kind() == ErrorKind::UnsupportedConfig {
let dir = match direction {
alsa::Direction::Capture => "input",
alsa::Direction::Playback => "output",
};
Error::with_message(
ErrorKind::UnsupportedOperation,
format!("Device does not support {dir}"),
)
} else {
e
}
})
}
/// Errno value 524; `From<alsa::Error>` maps it, like `ENODEV` and `ENOENT`, to
/// `ErrorKind::DeviceNotAvailable`.
// NOTE(review): presumably the Linux kernel-internal `ENOTSUPP`, defined here because the
// `libc` crate does not export it — confirm.
const LIBC_ENOTSUPP: libc::c_int = 524;
/// The ALSA host.
///
/// Clones share a single reference-counted [`AlsaContext`].
#[derive(Debug, Clone)]
pub struct Host {
/// Shared context; `default_device` hands a clone of it to the device it creates.
inner: Arc<AlsaContext>,
}
impl Host {
pub fn new() -> Result<Self, Error> {
let inner = AlsaContext::new().map_err(|e| {
Error::with_message(
ErrorKind::HostUnavailable,
format!("ALSA is not available: {e}"),
)
})?;
Ok(Self {
inner: Arc::new(inner),
})
}
fn default_device(&self) -> Device {
Device {
pcm_id: DEFAULT_DEVICE.to_owned(),
desc: Some("Default Audio Device".to_owned()),
direction: DeviceDirection::Unknown,
_context: self.inner.clone(),
}
}
}
impl HostTrait for Host {
    type Devices = Devices;
    type Device = Device;

    /// Always reports the host as available.
    fn is_available() -> bool {
        true
    }

    /// Enumerates the devices known to this host.
    fn devices(&self) -> Result<Self::Devices, Error> {
        self.enumerate_devices()
    }

    /// Looks a device up by id, comparing against the canonical form of `id`.
    ///
    /// Returns `None` when enumeration fails or no enumerated device has a matching id.
    fn device_by_id(&self, id: &DeviceId) -> Option<Self::Device> {
        let wanted = DeviceId::new(id.host(), canonical_pcm_id(id.id()));
        let mut candidates = self.devices().ok()?;
        candidates.find(|device| match device.id() {
            Ok(found) => found == wanted,
            Err(_) => false,
        })
    }

    /// The default input device is the `"default"` PCM.
    fn default_input_device(&self) -> Option<Self::Device> {
        let device = self.default_device();
        Some(device)
    }

    /// The default output device is the `"default"` PCM.
    fn default_output_device(&self) -> Option<Self::Device> {
        let device = self.default_device();
        Some(device)
    }
}
/// Number of [`AlsaContext`] values currently alive in this process.
static ALSA_CONTEXT_COUNT: Mutex<usize> = Mutex::new(0);
/// Reference-counting token for ALSA's global configuration: creating the first context
/// calls `alsa::config::update`, dropping the last one calls
/// `alsa::config::update_free_global`.
#[derive(Debug)]
pub(super) struct AlsaContext;
impl AlsaContext {
fn new() -> Result<Self, alsa::Error> {
let mut count = ALSA_CONTEXT_COUNT.lock().unwrap_or_else(|e| e.into_inner());
if *count == 0 {
alsa::config::update()?;
}
*count += 1;
Ok(Self)
}
}
impl Drop for AlsaContext {
    /// Unregisters this context and frees ALSA's global configuration once none remain.
    fn drop(&mut self) {
        let mut live = match ALSA_CONTEXT_COUNT.lock() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        };
        let remaining = live.saturating_sub(1);
        *live = remaining;
        if remaining == 0 {
            // Best effort: nothing useful can be done with a failure while dropping.
            let _ = alsa::config::update_free_global();
        }
    }
}
impl DeviceTrait for Device {
    type SupportedInputConfigs = SupportedInputConfigs;
    type SupportedOutputConfigs = SupportedOutputConfigs;
    type Stream = Stream;

    /// Forwards to the inherent `Device::description`.
    fn description(&self) -> Result<DeviceDescription, Error> {
        Device::description(self)
    }

    /// Forwards to the inherent `Device::id`.
    fn id(&self) -> Result<DeviceId, Error> {
        Device::id(self)
    }

    /// True when the recorded direction is `Input` or `Duplex`.
    fn supports_input(&self) -> bool {
        match self.direction {
            DeviceDirection::Input | DeviceDirection::Duplex => true,
            _ => false,
        }
    }

    /// True when the recorded direction is `Output` or `Duplex`.
    fn supports_output(&self) -> bool {
        match self.direction {
            DeviceDirection::Output | DeviceDirection::Duplex => true,
            _ => false,
        }
    }

    /// Forwards to the inherent `Device::supported_input_configs`.
    fn supported_input_configs(&self) -> Result<Self::SupportedInputConfigs, Error> {
        Device::supported_input_configs(self)
    }

    /// Forwards to the inherent `Device::supported_output_configs`.
    fn supported_output_configs(&self) -> Result<Self::SupportedOutputConfigs, Error> {
        Device::supported_output_configs(self)
    }

    /// Forwards to the inherent `Device::default_input_config`.
    fn default_input_config(&self) -> Result<SupportedStreamConfig, Error> {
        Device::default_input_config(self)
    }

    /// Forwards to the inherent `Device::default_output_config`.
    fn default_output_config(&self) -> Result<SupportedStreamConfig, Error> {
        Device::default_output_config(self)
    }

    /// Opens and configures a capture PCM, then wraps it in an input [`Stream`].
    ///
    /// # Errors
    ///
    /// Propagates any error from `Device::build_stream_inner`.
    fn build_input_stream_raw<D, E>(
        &self,
        conf: StreamConfig,
        sample_format: SampleFormat,
        data_callback: D,
        error_callback: E,
        timeout: Option<Duration>,
    ) -> Result<Self::Stream, Error>
    where
        D: FnMut(&Data, &InputCallbackInfo) + Send + 'static,
        E: FnMut(Error) + Send + 'static,
    {
        let inner = self.build_stream_inner(conf, sample_format, alsa::Direction::Capture)?;
        Ok(Stream::new_input(
            Arc::new(inner),
            data_callback,
            error_callback,
            timeout,
        ))
    }

    /// Opens and configures a playback PCM, then wraps it in an output [`Stream`].
    ///
    /// # Errors
    ///
    /// Propagates any error from `Device::build_stream_inner`.
    fn build_output_stream_raw<D, E>(
        &self,
        conf: StreamConfig,
        sample_format: SampleFormat,
        data_callback: D,
        error_callback: E,
        timeout: Option<Duration>,
    ) -> Result<Self::Stream, Error>
    where
        D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static,
        E: FnMut(Error) + Send + 'static,
    {
        let inner = self.build_stream_inner(conf, sample_format, alsa::Direction::Playback)?;
        Ok(Stream::new_output(
            Arc::new(inner),
            data_callback,
            error_callback,
            timeout,
        ))
    }
}
/// Write end of the wake-up pipe created by `trigger()`; the wrapped value is the raw file
/// descriptor, closed on drop.
#[derive(Debug)]
struct TriggerSender(libc::c_int);
/// Read end of the wake-up pipe created by `trigger()`; the wrapped value is the raw file
/// descriptor, closed on drop. The worker polls it as descriptor 0.
#[derive(Debug)]
struct TriggerReceiver(libc::c_int);
impl TriggerSender {
    /// Writes one 8-byte token to the pipe, retrying when the write is interrupted.
    ///
    /// # Panics
    ///
    /// Panics on a short write or on any OS error other than "interrupted".
    fn wakeup(&self) {
        let buf: u64 = 1;
        loop {
            // SAFETY: `buf` is a live local `u64`, so the pointer is valid for reading the
            // 8 bytes requested.
            let ret = unsafe { libc::write(self.0, &buf as *const u64 as *const _, 8) };
            match ret {
                8 => return,
                _ => assert_eq!(ret, -1, "wakeup: unexpected return value {ret}"),
            }
            let err = std::io::Error::last_os_error();
            if matches!(err.kind(), std::io::ErrorKind::Interrupted) {
                continue;
            }
            panic!("wakeup: {err}");
        }
    }
}
impl TriggerReceiver {
    /// Reads one 8-byte token from the pipe, retrying when the read is interrupted.
    ///
    /// # Panics
    ///
    /// Panics on a short read or on any OS error other than "interrupted".
    fn clear_pipe(&self) {
        let mut out: u64 = 0;
        loop {
            // SAFETY: `out` is a live local `u64`, so the pointer is valid for writing the
            // 8 bytes requested.
            let ret = unsafe { libc::read(self.0, &mut out as *mut u64 as *mut _, 8) };
            match ret {
                8 => return,
                _ => assert_eq!(ret, -1, "clear_pipe: unexpected return value {ret}"),
            }
            let err = std::io::Error::last_os_error();
            if matches!(err.kind(), std::io::ErrorKind::Interrupted) {
                continue;
            }
            panic!("clear_pipe: {err}");
        }
    }
}
/// Creates the wake-up pipe and returns its write end and (shared) read end.
///
/// # Panics
///
/// Panics when `pipe(2)` fails.
fn trigger() -> (TriggerSender, Arc<TriggerReceiver>) {
    let mut fds = [0, 0];
    // SAFETY: `fds` is a two-element array, which is exactly what `pipe` writes into.
    let rc = unsafe { libc::pipe(fds.as_mut_ptr()) };
    if rc != 0 {
        panic!("Could not create pipe");
    }
    let [read_fd, write_fd] = fds;
    (
        TriggerSender(write_fd),
        Arc::new(TriggerReceiver(read_fd)),
    )
}
impl Drop for TriggerSender {
    /// Closes the write end of the pipe; the result of `close` is ignored.
    fn drop(&mut self) {
        let fd = self.0;
        // SAFETY: `close` only takes an integer descriptor; no memory is accessed.
        let _ = unsafe { libc::close(fd) };
    }
}
impl Drop for TriggerReceiver {
    /// Closes the read end of the pipe; the result of `close` is ignored.
    fn drop(&mut self) {
        let fd = self.0;
        // SAFETY: `close` only takes an integer descriptor; no memory is accessed.
        let _ = unsafe { libc::close(fd) };
    }
}
/// An ALSA PCM device, identified by its PCM name.
///
/// Equality and hashing consider `pcm_id` only.
#[derive(Clone, Debug)]
pub struct Device {
/// ALSA PCM name passed to `open_pcm`; also reported as the device id and driver string.
pcm_id: String,
/// Optional description text; its first line is used as the display name and its
/// non-empty trimmed lines as the extended description.
desc: Option<String>,
/// Direction recorded for the device; drives `supports_input` / `supports_output`.
direction: DeviceDirection,
/// Keeps the shared ALSA context alive for as long as the device exists.
_context: Arc<AlsaContext>,
}
impl Device {
/// Opens this device's PCM for `stream_type`, applies hardware and software parameters
/// derived from `conf` / `sample_format`, prepares it, and packages the resulting state
/// in a [`StreamInner`].
///
/// # Errors
///
/// Returns the validation error for an invalid `conf`, any error from opening or
/// configuring the PCM, and `ErrorKind::DeviceNotAvailable` when the negotiated buffer or
/// period size is zero or the PCM exposes no poll descriptors.
fn build_stream_inner(
&self,
conf: StreamConfig,
sample_format: SampleFormat,
stream_type: alsa::Direction,
) -> Result<StreamInner, Error> {
crate::validate_stream_config(&conf)?;
let handle = open_pcm(&self.pcm_id, stream_type)?;
// Hardware parameters must be installed before the software parameters are read back.
let hw_params = set_hw_params_from_format(&handle, conf, sample_format)?;
let (buffer_size, period_size) = set_sw_params_from_format(&handle, stream_type)?;
if buffer_size == 0 || period_size == 0 {
return Err(ErrorKind::DeviceNotAvailable.into());
}
handle.prepare()?;
// The worker polls the PCM's descriptors; without any there is nothing to wait on.
if handle.count() == 0 {
return Err(ErrorKind::DeviceNotAvailable.into());
}
// Pick the timestamp source: an all-zero status timestamp means the status clock is
// unusable, so fall back to a process-side `Instant`; otherwise prefer link-synchronized
// audio timestamps when the hardware parameters report support for them.
let creation_ts = handle.status()?.get_htstamp();
let timestamp_mode = if creation_ts.tv_sec == 0 && creation_ts.tv_nsec == 0 {
TimestampMode::CreationInstant
} else if hw_params.supports_audio_ts_type(alsa::pcm::AudioTstampType::LinkSynchronized) {
TimestampMode::AudioLink
} else {
TimestampMode::SystemClock
};
// `hw_params` borrows `handle`; release it before `handle` is moved into the struct.
drop(hw_params);
let period_size = period_size as usize;
// Bytes per frame: one sample per channel.
let frame_size = sample_format.sample_size() * conf.channels as usize;
let stream_inner = StreamInner {
dropping: AtomicBool::new(false),
direction: stream_type.into(),
handle,
sample_format,
sample_rate: conf.sample_rate,
frame_size,
period_size,
period_samples: period_size * conf.channels as usize,
equilibrium: EquilibriumFill::new(sample_format, period_size * frame_size),
timestamp_mode,
creation_ts,
creation_instant: std::time::Instant::now(),
_context: self._context.clone(),
};
Ok(stream_inner)
}
fn description(&self) -> Result<DeviceDescription, Error> {
let name = self
.desc
.as_ref()
.and_then(|desc| desc.lines().next())
.unwrap_or(self.pcm_id.as_str());
let mut builder = DeviceDescriptionBuilder::new(name)
.driver(self.pcm_id.as_str())
.direction(self.direction);
if let Some(ref desc) = self.desc {
builder = builder.extended(desc.lines().map(|l| l.trim()).filter(|l| !l.is_empty()));
}
Ok(builder.build())
}
/// Returns the device id: the ALSA host id paired with this device's PCM name.
fn id(&self) -> Result<DeviceId, Error> {
    let device_id = DeviceId::new(crate::platform::HostId::Alsa, &self.pcm_id);
    Ok(device_id)
}
/// Probes the PCM (opened for `stream_t`) and lists every supported combination of sample
/// format, channel count and sample-rate range.
///
/// # Errors
///
/// Returns any error from opening the PCM or from querying the rate/channel limits.
fn supported_configs(
&self,
stream_t: alsa::Direction,
) -> Result<VecIntoIter<SupportedStreamConfigRange>, Error> {
let pcm = open_pcm(&self.pcm_id, stream_t)?;
let hw_params = alsa::pcm::HwParams::any(&pcm)?;
// Candidate formats. Both byte orders map to the same `SampleFormat`; the first one the
// device accepts wins (see `seen_formats` below).
const FORMATS: [(SampleFormat, alsa::pcm::Format); 23] = [
(SampleFormat::I8, alsa::pcm::Format::S8),
(SampleFormat::U8, alsa::pcm::Format::U8),
(SampleFormat::I16, alsa::pcm::Format::S16LE),
(SampleFormat::I16, alsa::pcm::Format::S16BE),
(SampleFormat::U16, alsa::pcm::Format::U16LE),
(SampleFormat::U16, alsa::pcm::Format::U16BE),
(SampleFormat::I24, alsa::pcm::Format::S24LE),
(SampleFormat::I24, alsa::pcm::Format::S24BE),
(SampleFormat::U24, alsa::pcm::Format::U24LE),
(SampleFormat::U24, alsa::pcm::Format::U24BE),
(SampleFormat::I32, alsa::pcm::Format::S32LE),
(SampleFormat::I32, alsa::pcm::Format::S32BE),
(SampleFormat::U32, alsa::pcm::Format::U32LE),
(SampleFormat::U32, alsa::pcm::Format::U32BE),
(SampleFormat::F32, alsa::pcm::Format::FloatLE),
(SampleFormat::F32, alsa::pcm::Format::FloatBE),
(SampleFormat::F64, alsa::pcm::Format::Float64LE),
(SampleFormat::F64, alsa::pcm::Format::Float64BE),
(SampleFormat::DsdU8, alsa::pcm::Format::DSDU8),
(SampleFormat::DsdU16, alsa::pcm::Format::DSDU16LE),
(SampleFormat::DsdU16, alsa::pcm::Format::DSDU16BE),
(SampleFormat::DsdU32, alsa::pcm::Format::DSDU32LE),
(SampleFormat::DsdU32, alsa::pcm::Format::DSDU32BE),
];
let min_rate = hw_params.get_rate_min()?;
let max_rate = hw_params.get_rate_max()?;
// If the device accepts `min_rate + 1` the rate range is treated as continuous and
// reported as one span; otherwise the common rates plus both limits are probed one by
// one and each accepted rate is reported as a single-value range.
let sample_rates = if min_rate == max_rate || hw_params.test_rate(min_rate + 1).is_ok() {
vec![(min_rate, max_rate)]
} else {
let mut probe: Vec<SampleRate> = COMMON_SAMPLE_RATES.to_vec();
probe.push(min_rate);
probe.push(max_rate);
probe.sort_unstable();
probe.dedup();
probe
.into_iter()
.filter(|&r| (min_rate..=max_rate).contains(&r) && hw_params.test_rate(r).is_ok())
.map(|r| (r, r))
.collect()
};
let min_channels = hw_params.get_channels_min()?;
// Upper bound on how many channel counts are enumerated.
const CHANNEL_ENUM_CAP: u32 = 64;
let max_channels = hw_params
.get_channels_max()?
.min(CHANNEL_ENUM_CAP)
.min(ChannelCount::MAX as u32);
// Same continuity heuristic as for rates: accept the whole range when `min + 1` works,
// otherwise test every channel count individually.
let supported_channels: Vec<ChannelCount> =
if min_channels == max_channels || hw_params.test_channels(min_channels + 1).is_ok() {
(min_channels..=max_channels)
.map(|c| c as ChannelCount)
.collect()
} else {
(min_channels..=max_channels)
.filter(|&c| hw_params.test_channels(c).is_ok())
.map(|c| c as ChannelCount)
.collect()
};
let mut output =
Vec::with_capacity(FORMATS.len() * supported_channels.len() * sample_rates.len());
let mut seen_formats: Vec<SampleFormat> = Vec::with_capacity(FORMATS.len());
// Period-size ranges are cached per (channel count, physical sample width in bits).
let mut buffer_size_cache: HashMap<(ChannelCount, u32), SupportedBufferSize> =
HashMap::with_capacity(supported_channels.len() * 4);
for &(sample_format, alsa_format) in FORMATS.iter() {
// Skip formats already reported via the other byte order and formats the device rejects.
if seen_formats.contains(&sample_format) || hw_params.test_format(alsa_format).is_err()
{
continue;
}
seen_formats.push(sample_format);
let width = alsa_format.physical_width().unwrap_or(0) as u32;
for &channels in &supported_channels {
let buffer_size =
*buffer_size_cache
.entry((channels, width))
.or_insert_with(|| {
supported_period_size_range(&hw_params, alsa_format, channels)
});
for &(min_rate, max_rate) in sample_rates.iter() {
output.push(SupportedStreamConfigRange {
channels,
min_sample_rate: min_rate,
max_sample_rate: max_rate,
buffer_size,
sample_format,
});
}
}
}
Ok(output.into_iter())
}
fn supported_input_configs(&self) -> Result<SupportedInputConfigs, Error> {
self.supported_configs(alsa::Direction::Capture)
}
fn supported_output_configs(&self) -> Result<SupportedOutputConfigs, Error> {
self.supported_configs(alsa::Direction::Playback)
}
fn default_config(&self, stream_t: alsa::Direction) -> Result<SupportedStreamConfig, Error> {
let mut formats: Vec<_> = match self.supported_configs(stream_t) {
Err(err) => return Err(err),
Ok(fmts) => fmts.collect(),
};
formats.sort_by(|a, b| a.cmp_default_heuristics(b));
match formats.into_iter().next_back() {
Some(f) => Ok(f
.try_with_standard_sample_rate()
.unwrap_or_else(|| f.with_max_sample_rate())),
None => Err(Error::with_message(
ErrorKind::UnsupportedConfig,
"No supported configuration",
)),
}
}
fn default_input_config(&self) -> Result<SupportedStreamConfig, Error> {
self.default_config(alsa::Direction::Capture)
}
fn default_output_config(&self) -> Result<SupportedStreamConfig, Error> {
self.default_config(alsa::Direction::Playback)
}
}
impl PartialEq for Device {
    /// Two devices are equal when they name the same PCM; other fields are ignored.
    fn eq(&self, other: &Self) -> bool {
        let (lhs, rhs) = (self.pcm_id.as_str(), other.pcm_id.as_str());
        lhs == rhs
    }
}
// Equality compares `pcm_id` strings only, which is a total equivalence relation.
impl Eq for Device {}
impl fmt::Display for Device {
    /// Writes the device's display name; a failing description becomes `fmt::Error`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self.description() {
            Ok(desc) => f.write_str(desc.name()),
            Err(_) => Err(fmt::Error),
        }
    }
}
impl std::hash::Hash for Device {
    /// Hashes the PCM name only, matching the `PartialEq` implementation.
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        std::hash::Hash::hash(&self.pcm_id, state);
    }
}
/// How an output period buffer is reset to its equilibrium value before the data callback
/// runs.
#[derive(Debug)]
enum EquilibriumFill {
/// Every byte of the buffer is set to this value.
Byte(u8),
/// The buffer is overwritten with this pre-computed, period-sized byte pattern.
Template(Box<[u8]>),
}
impl EquilibriumFill {
    /// Chooses the fill strategy for `sample_format`.
    ///
    /// Formats whose equilibrium is a single repeated byte use [`Self::Byte`]; the
    /// remaining (unsigned, multi-byte) formats get a `period_bytes`-long template
    /// produced by `fill_equilibrium`.
    fn new(sample_format: SampleFormat, period_bytes: usize) -> Self {
        if sample_format.is_int() || sample_format.is_float() {
            return Self::Byte(0);
        }
        if sample_format == SampleFormat::U8 {
            return Self::Byte(U8_EQUILIBRIUM_BYTE);
        }
        if sample_format.is_dsd() {
            return Self::Byte(DSD_EQUILIBRIUM_BYTE);
        }

        debug_assert!(sample_format.is_uint());
        let mut template = vec![0u8; period_bytes].into_boxed_slice();
        fill_equilibrium(&mut template, sample_format);
        Self::Template(template)
    }

    /// Resets `buffer` to equilibrium.
    ///
    /// # Panics
    ///
    /// With a template, panics if `buffer` and the template differ in length.
    #[inline]
    fn fill(&self, buffer: &mut [u8]) {
        match self {
            Self::Template(template) => buffer.copy_from_slice(template),
            Self::Byte(value) => buffer.fill(*value),
        }
    }
}
/// Source used to produce `StreamInstant`s for a stream (chosen in `build_stream_inner`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TimestampMode {
/// Elapsed `std::time::Instant` time since the stream was created; selected when the
/// PCM status timestamp read at creation was all zero.
CreationInstant,
/// PCM status timestamp, relative to the status timestamp captured at creation.
SystemClock,
/// Link-synchronized audio timestamp offset by the trigger time while the PCM is
/// running; falls back to the `SystemClock` computation otherwise.
AudioLink,
}
/// State shared between a [`Stream`] and its worker thread.
#[derive(Debug)]
struct StreamInner {
/// Set by `Stream::drop`; the worker loop exits once it observes `true`.
dropping: AtomicBool,
/// Stream direction, converted from the ALSA direction the PCM was opened with.
direction: DeviceDirection,
/// The configured PCM handle.
handle: alsa::pcm::PCM,
/// Sample format handed to the data callbacks.
sample_format: SampleFormat,
/// Sample rate taken from the stream configuration.
sample_rate: SampleRate,
/// Bytes per frame (sample size times channel count).
frame_size: usize,
/// Frames per period; also the amount transferred per callback.
period_size: usize,
/// Samples per period (`period_size` times channel count).
period_samples: usize,
/// Fill applied to the output buffer before each output callback.
equilibrium: EquilibriumFill,
/// Which clock `callback_instant` uses.
timestamp_mode: TimestampMode,
/// PCM status timestamp captured at creation; origin for status-based instants.
creation_ts: libc::timespec,
/// `Instant` captured at creation; origin for `TimestampMode::CreationInstant`.
creation_instant: std::time::Instant,
/// Keeps the shared ALSA context alive for as long as the stream exists.
_context: Arc<AlsaContext>,
}
// NOTE(review): SAFETY is not established by anything visible in this file. `StreamInner`
// is shared through an `Arc` between the `Stream` owner and the worker thread, both of
// which call into `handle`; presumably this relies on the ALSA PCM API tolerating that
// concurrent use — confirm.
unsafe impl Sync for StreamInner {}
/// A running (or not yet started) ALSA stream together with its worker thread.
///
/// Dropping the stream stops the worker and joins it.
#[derive(Debug)]
pub struct Stream {
/// Worker thread handle; taken and joined in `Drop`.
thread: Option<JoinHandle<()>>,
/// State shared with the worker thread.
inner: Arc<StreamInner>,
/// Write end of the wake-up pipe, used to interrupt the worker's poll.
trigger: TriggerSender,
/// Read end of the wake-up pipe; the worker holds another clone of this `Arc`.
_rx: Arc<TriggerReceiver>,
/// Released by `signal_ready`; the worker waits on it before entering its loop.
latch: Latch,
}
// NOTE(review): presumably compile-time assertions that `Stream` is `Send` and `Sync`;
// the macros are defined elsewhere in the crate — confirm.
crate::assert_stream_send!(Stream);
crate::assert_stream_sync!(Stream);
impl StreamInner {
/// Computes the stream-relative instant for `status` according to the stream's
/// [`TimestampMode`].
///
/// In `AudioLink` mode the audio timestamp is only used while the PCM is running and its
/// trigger timestamp is not earlier than the creation timestamp; otherwise the status
/// timestamp relative to creation is used, as in `SystemClock` mode.
#[inline]
fn callback_instant(&self, status: &alsa::pcm::Status) -> StreamInstant {
    match self.timestamp_mode {
        TimestampMode::CreationInstant => {
            let elapsed = std::time::Instant::now().duration_since(self.creation_instant);
            StreamInstant::new(elapsed.as_secs(), elapsed.subsec_nanos())
        }
        TimestampMode::SystemClock => htstamp_elapsed(status, self.creation_ts),
        TimestampMode::AudioLink => {
            if status.get_state() != alsa::pcm::State::Running {
                return htstamp_elapsed(status, self.creation_ts);
            }
            let trigger_offset =
                timespec_diff_nanos(status.get_trigger_htstamp(), self.creation_ts);
            if trigger_offset < 0 {
                return htstamp_elapsed(status, self.creation_ts);
            }
            // Audio time counts from the trigger point, so shift it onto the
            // creation-relative timeline.
            let audio_nanos = timespec_to_nanos(status.get_audio_htstamp());
            StreamInstant::from_nanos((audio_nanos + trigger_offset) as u64)
        }
    }
}
/// Reports whether the PCM's type (as returned by `snd_pcm_type`) is one of the types
/// listed below, in which case the worker thread is promoted to real-time priority.
// NOTE(review): presumably these are the PCM plugin types considered safe to drive from a
// real-time thread — confirm the selection criteria.
#[cfg(feature = "realtime")]
fn is_rt_eligible(&self) -> bool {
use alsa_sys::*;
// SAFETY (NOTE(review)): reads the first pointer-sized value of `alsa::pcm::PCM` as the
// raw `snd_pcm_t` pointer. This depends on the field layout of `PCM` in the `alsa` crate,
// which is not visible here — confirm it holds for the pinned crate version.
let raw = unsafe {
(&self.handle as *const alsa::pcm::PCM)
.cast::<*mut snd_pcm_t>()
.read()
};
// SAFETY: `raw` is taken from the live `self.handle`, assuming the layout note above.
let pcm_type = unsafe { snd_pcm_type(raw) };
matches!(
pcm_type,
SND_PCM_TYPE_HW
| SND_PCM_TYPE_LINEAR
| SND_PCM_TYPE_ALAW
| SND_PCM_TYPE_MULAW
| SND_PCM_TYPE_ADPCM
| SND_PCM_TYPE_LINEAR_FLOAT
| SND_PCM_TYPE_IEC958
)
}
}
/// Per-worker scratch state, created once when the worker thread starts.
struct StreamWorkerContext {
/// Poll set: index 0 is the wake-up pipe, the rest are the PCM's descriptors.
descriptors: Box<[libc::pollfd]>,
/// One period worth of bytes, used to move audio between ALSA and the callback.
transfer_buffer: Box<[u8]>,
/// Poll timeout in milliseconds; `-1` when no timeout was requested.
poll_timeout: i32,
}
impl StreamWorkerContext {
    /// Allocates the transfer buffer and the poll set for `stream`.
    ///
    /// `poll_timeout` is converted to whole milliseconds (clamped to `i32::MAX`); `None`
    /// becomes `-1`. Slot 0 of the poll set watches `rx` for readability and the remaining
    /// slots are filled in by the PCM handle.
    ///
    /// # Panics
    ///
    /// Panics when the PCM handle fails to fill its descriptors.
    fn new(poll_timeout: &Option<Duration>, stream: &StreamInner, rx: &TriggerReceiver) -> Self {
        let poll_timeout: i32 = match poll_timeout {
            Some(d) => d.as_millis().min(i32::MAX as u128) as i32,
            None => -1,
        };

        let period_bytes = stream.period_size * stream.frame_size;
        let transfer_buffer = vec![0u8; period_bytes].into_boxed_slice();

        let num_descriptors = stream.handle.count();
        let wake_slot = libc::pollfd {
            fd: rx.0,
            events: libc::POLLIN,
            revents: 0,
        };
        let blank_slot = libc::pollfd {
            fd: 0,
            events: 0,
            revents: 0,
        };
        let mut slots: Vec<libc::pollfd> = Vec::with_capacity(1 + num_descriptors);
        slots.push(wake_slot);
        slots.resize(1 + num_descriptors, blank_slot);
        let mut descriptors = slots.into_boxed_slice();

        let filled = stream
            .handle
            .fill(&mut descriptors[1..])
            .expect("Failed to fill ALSA descriptors");
        debug_assert_eq!(filled, num_descriptors);

        Self {
            descriptors,
            transfer_buffer,
            poll_timeout,
        }
    }
}
/// Body of the capture worker thread.
///
/// Repeatedly waits for a full period, reads it and hands it to `data_callback` until the
/// stream is flagged as dropping or the device becomes unavailable. Every error is passed
/// to `error_callback`; after an xrun the PCM is prepared and started again.
fn input_stream_worker(
    rx: Arc<TriggerReceiver>,
    stream: &StreamInner,
    data_callback: &mut (dyn FnMut(&Data, &InputCallbackInfo) + Send + 'static),
    error_callback: &mut (dyn FnMut(Error) + Send + 'static),
    timeout: Option<Duration>,
) {
    #[cfg(feature = "realtime")]
    if stream.is_rt_eligible() {
        let period_frames = u32::try_from(stream.period_size).unwrap_or(0);
        if let Err(err) = audio_thread_priority::promote_current_thread_to_real_time(
            period_frames,
            stream.sample_rate,
        ) {
            error_callback(err.into());
        }
    }

    let mut ctxt = StreamWorkerContext::new(&timeout, stream, &rx);
    while !stream.dropping.load(Ordering::Acquire) {
        let outcome = match poll_for_period(&rx, stream, &mut ctxt) {
            Ok(Poll::Pending) => continue,
            Ok(Poll::Ready {
                status,
                delay_frames,
            }) => process_input(
                stream,
                &mut ctxt.transfer_buffer,
                status,
                delay_frames,
                data_callback,
            ),
            Err(err) => Err(err),
        };
        let Err(err) = outcome else {
            continue;
        };

        match err.kind() {
            ErrorKind::DeviceNotAvailable => {
                error_callback(err);
                return;
            }
            ErrorKind::Xrun => {
                error_callback(err);
                // Capture has to be restarted explicitly after recovering from an xrun.
                let restarted = stream
                    .handle
                    .prepare()
                    .and_then(|()| stream.handle.start());
                if let Err(err) = restarted {
                    error_callback(err.into());
                }
            }
            _ => error_callback(err),
        }
    }
}
/// Body of the playback worker thread.
///
/// Repeatedly waits until a full period can be written, asks `data_callback` to fill it
/// and writes it out, until the stream is flagged as dropping or the device becomes
/// unavailable. Every error is passed to `error_callback`; after an xrun the PCM is
/// prepared again.
fn output_stream_worker(
    rx: Arc<TriggerReceiver>,
    stream: &StreamInner,
    data_callback: &mut (dyn FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static),
    error_callback: &mut (dyn FnMut(Error) + Send + 'static),
    timeout: Option<Duration>,
) {
    #[cfg(feature = "realtime")]
    if stream.is_rt_eligible() {
        let period_frames = u32::try_from(stream.period_size).unwrap_or(0);
        if let Err(err) = audio_thread_priority::promote_current_thread_to_real_time(
            period_frames,
            stream.sample_rate,
        ) {
            error_callback(err.into());
        }
    }

    let mut ctxt = StreamWorkerContext::new(&timeout, stream, &rx);
    while !stream.dropping.load(Ordering::Acquire) {
        let outcome = match poll_for_period(&rx, stream, &mut ctxt) {
            Ok(Poll::Pending) => continue,
            Ok(Poll::Ready {
                status,
                delay_frames,
            }) => process_output(
                stream,
                &mut ctxt.transfer_buffer,
                status,
                delay_frames,
                data_callback,
            ),
            Err(err) => Err(err),
        };
        let Err(err) = outcome else {
            continue;
        };

        match err.kind() {
            ErrorKind::DeviceNotAvailable => {
                error_callback(err);
                return;
            }
            ErrorKind::Xrun => {
                error_callback(err);
                if let Err(err) = stream.handle.prepare() {
                    error_callback(err.into());
                }
            }
            _ => error_callback(err),
        }
    }
}
/// Attempts to resume a suspended PCM.
///
/// Returns `Ok(Poll::Pending)` when the resume succeeded or must be retried (`EAGAIN`).
/// After a successful resume a capture PCM is started again; `EBUSY` from that start is
/// ignored.
///
/// # Errors
///
/// Returns `ErrorKind::Xrun` when the hardware cannot resume (reported either by the
/// hardware parameters or by `ENOSYS`), and the converted ALSA error otherwise.
fn try_resume(handle: &alsa::PCM) -> Result<Poll, Error> {
    let hw_params = handle.hw_params_current()?;
    if !hw_params.can_resume() {
        return Err(Error::with_message(
            ErrorKind::Xrun,
            "Device does not support suspend/resume",
        ));
    }

    if let Err(e) = handle.resume() {
        return match e.errno() {
            libc::EAGAIN => Ok(Poll::Pending),
            libc::ENOSYS => Err(ErrorKind::Xrun.into()),
            _ => Err(e.into()),
        };
    }

    let is_capture = match handle.info() {
        Ok(info) => info.get_stream() == alsa::Direction::Capture,
        Err(_) => false,
    };
    if is_capture {
        if let Err(e) = handle.start() {
            if e.errno() != libc::EBUSY {
                return Err(e.into());
            }
        }
    }
    Ok(Poll::Pending)
}
/// Outcome of one `poll_for_period` call.
enum Poll {
/// Nothing to process yet; the worker should loop and poll again.
Pending,
/// At least one full period is available.
Ready {
/// PCM status snapshot used to timestamp the callback.
status: alsa::pcm::Status,
/// Delay reported by the PCM, in frames (negative values are clamped to zero).
delay_frames: usize,
},
}
/// Waits (up to the configured timeout) until the PCM has at least one period available
/// or the wake-up pipe fires.
///
/// Returns `Poll::Pending` whenever the worker should simply loop again (timeout with a
/// healthy PCM, wake-up via the pipe, no relevant events, less than a period available, or
/// a resume in progress) and `Poll::Ready` with a status snapshot otherwise.
///
/// # Errors
///
/// Returns `ErrorKind::DeviceNotAvailable` on disconnect (PCM state or `HUP`/`NVAL` poll
/// events), `ErrorKind::Xrun` on an xrun, the result of `try_resume` when the PCM is
/// suspended, and converted ALSA errors from the individual calls.
fn poll_for_period(
rx: &TriggerReceiver,
stream: &StreamInner,
ctxt: &mut StreamWorkerContext,
) -> Result<Poll, Error> {
let StreamWorkerContext {
ref mut descriptors,
ref poll_timeout,
..
} = *ctxt;
let res = alsa::poll::poll(descriptors, *poll_timeout)?;
// Zero ready descriptors means the poll timed out: use the opportunity to check whether
// the PCM fell into a state that will never signal readiness on its own.
if res == 0 {
match stream.handle.state() {
alsa::pcm::State::Disconnected => {
return Err(Error::with_message(
ErrorKind::DeviceNotAvailable,
"Device disconnected",
));
}
alsa::pcm::State::XRun => {
return Err(ErrorKind::Xrun.into());
}
alsa::pcm::State::Suspended => return try_resume(&stream.handle),
_ => {}
}
return Ok(Poll::Pending);
}
// Descriptor 0 is the wake-up pipe: drain the token and let the caller re-check its
// `dropping` flag.
if descriptors[0].revents != 0 {
rx.clear_pipe();
return Ok(Poll::Pending);
}
// Let the PCM translate the raw poll results of its own descriptors.
let revents = stream.handle.revents(&descriptors[1..])?;
if revents.is_empty() {
return Ok(Poll::Pending);
}
if revents.intersects(alsa::poll::Flags::HUP | alsa::poll::Flags::NVAL) {
return Err(Error::with_message(
ErrorKind::DeviceNotAvailable,
"Device disconnected",
));
}
let (avail_frames, delay_frames) = match stream.handle.avail_delay() {
Err(err) if err.errno() == libc::EPIPE => return Err(ErrorKind::Xrun.into()),
Err(err) if err.errno() == libc::ESTRPIPE => return try_resume(&stream.handle),
res => res,
}?;
// Only whole periods are transferred.
if avail_frames < stream.period_size as alsa::pcm::Frames {
return Ok(Poll::Pending);
}
// Request the audio timestamp type matching the stream's timestamp mode.
let audio_ts_type = match stream.timestamp_mode {
TimestampMode::AudioLink => alsa::pcm::AudioTstampType::LinkSynchronized,
TimestampMode::SystemClock | TimestampMode::CreationInstant => {
alsa::pcm::AudioTstampType::Compat
}
};
let status = alsa::pcm::StatusBuilder::new()
.audio_htstamp_config(audio_ts_type, false)
.build(&stream.handle)?;
Ok(Poll::Ready {
status,
delay_frames: delay_frames.max(0) as usize,
})
}
/// Reads one full period from the PCM into `buffer` and passes it to `data_callback`.
///
/// If the very first read would block (`EAGAIN`) nothing is delivered and `Ok(())` is
/// returned. The capture timestamp is the callback instant minus the reported delay,
/// clamped at `StreamInstant::ZERO`.
///
/// # Errors
///
/// Returns `ErrorKind::Xrun` on `EPIPE` or when a read would block after a partial
/// period, the result of `try_resume` on `ESTRPIPE`, and the converted ALSA error
/// otherwise.
fn process_input(
    stream: &StreamInner,
    buffer: &mut [u8],
    status: alsa::pcm::Status,
    delay_frames: usize,
    data_callback: &mut (dyn FnMut(&Data, &InputCallbackInfo) + Send + 'static),
) -> Result<(), Error> {
    let mut frames_read = 0;
    while frames_read < stream.period_size {
        let byte_offset = frames_read * stream.frame_size;
        match stream.handle.io_bytes().readi(&mut buffer[byte_offset..]) {
            Ok(n) => frames_read += n,
            Err(err) => match err.errno() {
                libc::EAGAIN if frames_read == 0 => return Ok(()),
                libc::EAGAIN | libc::EPIPE => return Err(ErrorKind::Xrun.into()),
                libc::ESTRPIPE => return try_resume(&stream.handle).map(|_| ()),
                _ => return Err(err.into()),
            },
        }
    }

    let raw = buffer.as_mut_ptr() as *mut ();
    // NOTE(review): SAFETY — `buffer` is the worker's period-sized transfer buffer and
    // stays borrowed for the whole call; confirm it holds `period_samples` samples of
    // `sample_format` as `Data::from_parts` requires.
    let data = unsafe { Data::from_parts(raw, stream.period_samples, stream.sample_format) };

    let callback = stream.callback_instant(&status);
    let delay = frames_to_duration(delay_frames as FrameCount, stream.sample_rate);
    let capture = match callback.checked_sub(delay) {
        Some(instant) => instant,
        None => StreamInstant::ZERO,
    };
    let info = InputCallbackInfo {
        timestamp: InputStreamTimestamp { callback, capture },
    };
    data_callback(&data, &info);
    Ok(())
}
/// Resets `buffer` to equilibrium, lets `data_callback` fill it and writes the whole
/// period to the PCM.
///
/// The playback timestamp is the callback instant plus the reported delay. If the very
/// first write would block (`EAGAIN`) the period is dropped and `Ok(())` is returned.
///
/// # Errors
///
/// Returns `ErrorKind::Xrun` on `EPIPE` or when a write would block after a partial
/// period, the result of `try_resume` on `ESTRPIPE`, and the converted ALSA error
/// otherwise.
fn process_output(
    stream: &StreamInner,
    buffer: &mut [u8],
    status: alsa::pcm::Status,
    delay_frames: usize,
    data_callback: &mut (dyn FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static),
) -> Result<(), Error> {
    stream.equilibrium.fill(buffer);

    let raw = buffer.as_mut_ptr() as *mut ();
    // NOTE(review): SAFETY — `buffer` is the worker's period-sized transfer buffer and
    // stays borrowed for the whole call; confirm it holds `period_samples` samples of
    // `sample_format` as `Data::from_parts` requires.
    let mut data = unsafe { Data::from_parts(raw, stream.period_samples, stream.sample_format) };

    let callback = stream.callback_instant(&status);
    let delay = frames_to_duration(delay_frames as FrameCount, stream.sample_rate);
    let info = OutputCallbackInfo {
        timestamp: OutputStreamTimestamp {
            callback,
            playback: callback + delay,
        },
    };
    data_callback(&mut data, &info);

    let mut frames_written = 0;
    while frames_written < stream.period_size {
        let byte_offset = frames_written * stream.frame_size;
        match stream.handle.io_bytes().writei(&buffer[byte_offset..]) {
            Ok(n) => frames_written += n,
            Err(err) => match err.errno() {
                libc::EAGAIN if frames_written == 0 => return Ok(()),
                libc::EAGAIN | libc::EPIPE => return Err(ErrorKind::Xrun.into()),
                libc::ESTRPIPE => return try_resume(&stream.handle).map(|_| ()),
                _ => return Err(err.into()),
            },
        }
    }
    Ok(())
}
/// Converts a `timespec` to a total number of nanoseconds.
#[inline]
#[allow(clippy::unnecessary_cast)]
fn timespec_to_nanos(ts: libc::timespec) -> i64 {
    let whole_seconds = ts.tv_sec as i64;
    let sub_second_nanos = ts.tv_nsec as i64;
    whole_seconds * 1_000_000_000 + sub_second_nanos
}
/// Returns `a - b` in nanoseconds; negative when `a` is earlier than `b`.
#[inline]
fn timespec_diff_nanos(a: libc::timespec, b: libc::timespec) -> i64 {
    let (later, earlier) = (timespec_to_nanos(a), timespec_to_nanos(b));
    later - earlier
}
/// Time between `origin` and the status timestamp as a `StreamInstant`, clamped at zero.
#[inline]
fn htstamp_elapsed(status: &alsa::pcm::Status, origin: libc::timespec) -> StreamInstant {
    let elapsed = timespec_diff_nanos(status.get_htstamp(), origin);
    let clamped = if elapsed < 0 { 0 } else { elapsed };
    StreamInstant::from_nanos(clamped as u64)
}
impl Stream {
    /// Releases the latch the worker thread waits on before entering its loop.
    fn signal_ready(&self) {
        self.latch.release();
    }

    /// Spawns the capture worker thread (named `cpal_alsa_in`) for `inner`.
    ///
    /// The worker blocks on the latch until `signal_ready` is called.
    ///
    /// # Panics
    ///
    /// Panics when the wake-up pipe or the thread cannot be created.
    fn new_input<D, E>(
        inner: Arc<StreamInner>,
        mut data_callback: D,
        mut error_callback: E,
        timeout: Option<Duration>,
    ) -> Stream
    where
        D: FnMut(&Data, &InputCallbackInfo) + Send + 'static,
        E: FnMut(Error) + Send + 'static,
    {
        let (wake_tx, wake_rx) = trigger();
        let worker_rx = Arc::clone(&wake_rx);
        let worker_inner = Arc::clone(&inner);
        let mut latch = Latch::new();
        let waiter = latch.waiter();

        let worker = move || {
            waiter.wait();
            input_stream_worker(
                worker_rx,
                &worker_inner,
                &mut data_callback,
                &mut error_callback,
                timeout,
            );
        };
        let thread = thread::Builder::new()
            .name(String::from("cpal_alsa_in"))
            .spawn(worker)
            .unwrap();
        latch.add_thread(thread.thread().clone());

        Self {
            thread: Some(thread),
            inner,
            trigger: wake_tx,
            _rx: wake_rx,
            latch,
        }
    }

    /// Spawns the playback worker thread (named `cpal_alsa_out`) for `inner`.
    ///
    /// The worker blocks on the latch until `signal_ready` is called.
    ///
    /// # Panics
    ///
    /// Panics when the wake-up pipe or the thread cannot be created.
    fn new_output<D, E>(
        inner: Arc<StreamInner>,
        mut data_callback: D,
        mut error_callback: E,
        timeout: Option<Duration>,
    ) -> Stream
    where
        D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static,
        E: FnMut(Error) + Send + 'static,
    {
        let (wake_tx, wake_rx) = trigger();
        let worker_rx = Arc::clone(&wake_rx);
        let worker_inner = Arc::clone(&inner);
        let mut latch = Latch::new();
        let waiter = latch.waiter();

        let worker = move || {
            waiter.wait();
            output_stream_worker(
                worker_rx,
                &worker_inner,
                &mut data_callback,
                &mut error_callback,
                timeout,
            );
        };
        let thread = thread::Builder::new()
            .name(String::from("cpal_alsa_out"))
            .spawn(worker)
            .unwrap();
        latch.add_thread(thread.thread().clone());

        Self {
            thread: Some(thread),
            inner,
            trigger: wake_tx,
            _rx: wake_rx,
            latch,
        }
    }
}
impl Drop for Stream {
    /// Stops the worker thread and waits for it to finish.
    ///
    /// The latch is released first so a worker that was never started can run, then the
    /// `dropping` flag is raised and the worker's poll is interrupted through the pipe.
    fn drop(&mut self) {
        self.signal_ready();
        self.inner.dropping.store(true, Ordering::Release);
        self.trigger.wakeup();
        let Some(worker) = self.thread.take() else {
            return;
        };
        // A panic in the worker is deliberately ignored while dropping.
        let _ = worker.join();
    }
}
impl StreamTrait for Stream {
    /// Releases the worker and starts or un-pauses the PCM where needed.
    ///
    /// A prepared capture PCM is started explicitly and a paused PCM is resumed; nothing
    /// is done in any other state.
    fn play(&self) -> Result<(), Error> {
        self.signal_ready();
        let handle = &self.inner.handle;
        match handle.state() {
            alsa::pcm::State::Prepared if self.inner.direction == DeviceDirection::Input => {
                handle.start()?
            }
            alsa::pcm::State::Paused => handle.pause(false)?,
            _ => {}
        }
        Ok(())
    }

    /// Pauses the PCM unless it is already paused.
    ///
    /// # Errors
    ///
    /// Returns `ErrorKind::UnsupportedOperation` when the hardware cannot pause, and the
    /// converted ALSA error when querying or pausing fails.
    fn pause(&self) -> Result<(), Error> {
        let handle = &self.inner.handle;
        let hw_params = handle.hw_params_current()?;
        if !hw_params.can_pause() {
            return Err(Error::with_message(
                ErrorKind::UnsupportedOperation,
                "Device does not support pausing",
            ));
        }
        let already_paused = handle.state() == alsa::pcm::State::Paused;
        if !already_paused {
            handle.pause(true)?;
        }
        Ok(())
    }

    /// Current stream time.
    ///
    /// Uses a fresh PCM status snapshot when the stream timestamps from the PCM; falls
    /// back to the time elapsed since creation when it does not or the snapshot fails.
    fn now(&self) -> StreamInstant {
        let inner = &*self.inner;
        let audio_ts_type = match inner.timestamp_mode {
            TimestampMode::CreationInstant => None,
            TimestampMode::AudioLink => Some(alsa::pcm::AudioTstampType::LinkSynchronized),
            TimestampMode::SystemClock => Some(alsa::pcm::AudioTstampType::Compat),
        };
        if let Some(ts_type) = audio_ts_type {
            let snapshot = alsa::pcm::StatusBuilder::new()
                .audio_htstamp_config(ts_type, false)
                .build(&inner.handle);
            if let Ok(status) = snapshot {
                return inner.callback_instant(&status);
            }
        }
        let elapsed = std::time::Instant::now().duration_since(inner.creation_instant);
        StreamInstant::new(elapsed.as_secs(), elapsed.subsec_nanos())
    }

    /// Reports the period size, in frames, as the stream's buffer size.
    fn buffer_size(&self) -> Result<FrameCount, Error> {
        let frames = self.inner.period_size as FrameCount;
        Ok(frames)
    }
}
/// Determines the period sizes usable with `alsa_format` and `channels`.
///
/// Works on a clone of `hw_params` restricted to interleaved access, the given channel
/// count and format. The maximum is capped so that `DEFAULT_PERIODS` periods still fit in
/// the largest buffer. Returns `SupportedBufferSize::Unknown` whenever the restriction
/// fails or no sensible range can be derived.
fn supported_period_size_range(
    hw_params: &alsa::pcm::HwParams<'_>,
    alsa_format: alsa::pcm::Format,
    channels: ChannelCount,
) -> SupportedBufferSize {
    let p = hw_params.clone();
    let restricted = p.set_access(alsa::pcm::Access::RWInterleaved).is_ok()
        && p.set_channels(channels as u32).is_ok()
        && p.set_format(alsa_format).is_ok();
    if !restricted {
        return SupportedBufferSize::Unknown;
    }

    let (min, max) = match hw_params_period_size_min_max(&p) {
        Some(range) => range,
        None => return SupportedBufferSize::Unknown,
    };
    let min_frames = min.max(1);
    let effective_max = match p.get_buffer_size_max() {
        Ok(max_buf) if max_buf > 0 => max.min(max_buf / DEFAULT_PERIODS),
        _ => max,
    };
    if effective_max < min_frames {
        return SupportedBufferSize::Unknown;
    }

    match min_frames.try_into() {
        Ok(min) => SupportedBufferSize::Range {
            min,
            max: effective_max.try_into().unwrap_or(FrameCount::MAX),
        },
        Err(_) => SupportedBufferSize::Unknown,
    }
}
/// Reads the period-size limits from `hw_params`.
///
/// Returns `None` when either limit cannot be read or the limits do not describe a usable
/// range (`max` must be positive and not below `min`).
fn hw_params_period_size_min_max(
    hw_params: &alsa::pcm::HwParams,
) -> Option<(alsa::pcm::Frames, alsa::pcm::Frames)> {
    let min = hw_params.get_period_size_min().ok()?;
    let max = hw_params.get_period_size_max().ok()?;
    if max > 0 && max >= min {
        Some((min, max))
    } else {
        None
    }
}
/// Creates a fresh hardware-parameter set for `pcm_handle` restricted to interleaved
/// access and to the format, rate (nearest match) and channel count requested.
///
/// # Errors
///
/// Returns the converted ALSA error of the first restriction that fails, or the error
/// from `sample_format_to_alsa_format`.
fn init_hw_params<'a>(
    pcm_handle: &'a alsa::pcm::PCM,
    config: StreamConfig,
    sample_format: SampleFormat,
) -> Result<alsa::pcm::HwParams<'a>, Error> {
    let params = alsa::pcm::HwParams::any(pcm_handle)?;
    params.set_access(alsa::pcm::Access::RWInterleaved)?;
    params.set_format(sample_format_to_alsa_format(&params, sample_format)?)?;
    params.set_rate(config.sample_rate, alsa::ValueOr::Nearest)?;
    params.set_channels(config.channels as u32)?;
    Ok(params)
}
/// Maps `sample_format` to the ALSA format to configure.
///
/// Single-byte formats map directly. Multi-byte formats prefer the target's native byte
/// order and fall back to the opposite byte order when only that one is accepted by
/// `hw_params`.
///
/// # Errors
///
/// Returns `ErrorKind::UnsupportedConfig` for sample formats without an ALSA counterpart
/// and when neither byte order passes `test_format`.
fn sample_format_to_alsa_format(
hw_params: &alsa::pcm::HwParams,
sample_format: SampleFormat,
) -> Result<alsa::pcm::Format, Error> {
use alsa::pcm::Format;
// Each multi-byte arm yields `(native, opposite)` for the compile-time target endianness.
let (native, opposite) = match sample_format {
SampleFormat::I8 => return Ok(Format::S8), SampleFormat::U8 => return Ok(Format::U8), #[cfg(target_endian = "little")]
SampleFormat::I16 => (Format::S16LE, Format::S16BE),
#[cfg(target_endian = "big")]
SampleFormat::I16 => (Format::S16BE, Format::S16LE),
#[cfg(target_endian = "little")]
SampleFormat::U16 => (Format::U16LE, Format::U16BE),
#[cfg(target_endian = "big")]
SampleFormat::U16 => (Format::U16BE, Format::U16LE),
#[cfg(target_endian = "little")]
SampleFormat::I24 => (Format::S24LE, Format::S24BE),
#[cfg(target_endian = "big")]
SampleFormat::I24 => (Format::S24BE, Format::S24LE),
#[cfg(target_endian = "little")]
SampleFormat::U24 => (Format::U24LE, Format::U24BE),
#[cfg(target_endian = "big")]
SampleFormat::U24 => (Format::U24BE, Format::U24LE),
#[cfg(target_endian = "little")]
SampleFormat::I32 => (Format::S32LE, Format::S32BE),
#[cfg(target_endian = "big")]
SampleFormat::I32 => (Format::S32BE, Format::S32LE),
#[cfg(target_endian = "little")]
SampleFormat::U32 => (Format::U32LE, Format::U32BE),
#[cfg(target_endian = "big")]
SampleFormat::U32 => (Format::U32BE, Format::U32LE),
#[cfg(target_endian = "little")]
SampleFormat::F32 => (Format::FloatLE, Format::FloatBE),
#[cfg(target_endian = "big")]
SampleFormat::F32 => (Format::FloatBE, Format::FloatLE),
#[cfg(target_endian = "little")]
SampleFormat::F64 => (Format::Float64LE, Format::Float64BE),
#[cfg(target_endian = "big")]
SampleFormat::F64 => (Format::Float64BE, Format::Float64LE),
SampleFormat::DsdU8 => return Ok(Format::DSDU8),
#[cfg(target_endian = "little")]
SampleFormat::DsdU16 => (Format::DSDU16LE, Format::DSDU16BE),
#[cfg(target_endian = "big")]
SampleFormat::DsdU16 => (Format::DSDU16BE, Format::DSDU16LE),
#[cfg(target_endian = "little")]
SampleFormat::DsdU32 => (Format::DSDU32LE, Format::DSDU32BE),
#[cfg(target_endian = "big")]
SampleFormat::DsdU32 => (Format::DSDU32BE, Format::DSDU32LE),
_ => {
return Err(Error::with_message(
ErrorKind::UnsupportedConfig,
format!("Sample format {sample_format} is not supported"),
))
}
};
// Prefer the native byte order; accept the opposite one only if native is rejected.
if hw_params.test_format(native).is_ok() {
return Ok(native);
}
if hw_params.test_format(opposite).is_ok() {
return Ok(opposite);
}
Err(Error::with_message(
ErrorKind::UnsupportedConfig,
format!("Sample format {sample_format} is not supported in any byte order"),
))
}
/// Installs hardware parameters for `config` / `sample_format` on `pcm_handle` and returns
/// the parameters that are in effect afterwards.
///
/// With `BufferSize::Fixed(n)` the period is requested as `n` frames and the buffer as
/// `DEFAULT_PERIODS * n`. With `BufferSize::Default` the device's own period choice is
/// kept and the buffer is then re-requested as `DEFAULT_PERIODS` periods.
///
/// # Errors
///
/// Returns `ErrorKind::UnsupportedConfig` when a fixed size lies outside the period range
/// or would exceed the maximum buffer size, plus any converted ALSA error.
fn set_hw_params_from_format(
pcm_handle: &alsa::pcm::PCM,
config: StreamConfig,
sample_format: SampleFormat,
) -> Result<alsa::pcm::HwParams<'_>, Error> {
let hw_params = init_hw_params(pcm_handle, config, sample_format)?;
if let BufferSize::Fixed(period_size) = config.buffer_size {
let period_size = period_size as alsa::pcm::Frames;
// Reject sizes the device cannot honour instead of silently getting a different one.
if let Some((min_period, max_period)) = hw_params_period_size_min_max(&hw_params) {
if !(min_period..=max_period).contains(&period_size) {
return Err(Error::with_message(
ErrorKind::UnsupportedConfig,
format!("Buffer size {period_size} is not in the supported range {min_period}..={max_period}"),
));
}
}
let buffer_size = DEFAULT_PERIODS * period_size;
if let Ok(max_buffer) = hw_params.get_buffer_size_max() {
if max_buffer > 0 && buffer_size > max_buffer {
// Report the limit in the caller's unit (period frames), not total buffer frames.
let effective_max = max_buffer / DEFAULT_PERIODS;
return Err(Error::with_message(
ErrorKind::UnsupportedConfig,
format!("Buffer size {period_size} exceeds the maximum supported value of {effective_max}"),
));
}
}
hw_params.set_buffer_size_near(buffer_size)?;
hw_params.set_period_size_near(period_size, alsa::ValueOr::Nearest)?;
}
pcm_handle.hw_params(&hw_params)?;
if config.buffer_size == BufferSize::Default {
// Second pass: keep the period the device picked, but build a fresh parameter set so
// the buffer can be constrained to `DEFAULT_PERIODS` periods.
if let Ok(period_size) = hw_params.get_period_size() {
let hw_params = init_hw_params(pcm_handle, config, sample_format)?;
hw_params.set_period_size_near(period_size, alsa::ValueOr::Nearest)?;
hw_params.set_buffer_size_near(DEFAULT_PERIODS * period_size)?;
pcm_handle.hw_params(&hw_params)?;
}
}
pcm_handle.hw_params_current().map_err(Into::into)
}
/// Installs software parameters on `pcm_handle` and returns the negotiated
/// `(buffer_size, period_size)` in frames.
///
/// Playback starts once `DEFAULT_PERIODS` periods are queued, capture after one frame.
/// Wake-ups are requested per period and timestamps are enabled, preferring the
/// monotonic-raw clock and falling back to the monotonic clock when that is rejected.
///
/// # Errors
///
/// Returns the converted ALSA error of the first call that fails.
fn set_sw_params_from_format(
    pcm_handle: &alsa::pcm::PCM,
    stream_type: alsa::Direction,
) -> Result<(alsa::pcm::Frames, alsa::pcm::Frames), Error> {
    let sw_params = pcm_handle.sw_params_current()?;
    let (raw_buffer, raw_period) = pcm_handle.get_params()?;
    let buffer_size = raw_buffer as alsa::pcm::Frames;
    let period_size = raw_period as alsa::pcm::Frames;

    let start_threshold = match stream_type {
        alsa::Direction::Capture => 1,
        alsa::Direction::Playback => DEFAULT_PERIODS * period_size,
    };
    sw_params.set_start_threshold(start_threshold)?;
    sw_params.set_avail_min(period_size)?;
    sw_params.set_tstamp_mode(true)?;
    sw_params.set_tstamp_type(alsa::pcm::TstampType::MonotonicRaw)?;

    let raw_clock_rejected = pcm_handle.sw_params(&sw_params).is_err();
    if raw_clock_rejected {
        sw_params.set_tstamp_type(alsa::pcm::TstampType::Monotonic)?;
        pcm_handle.sw_params(&sw_params)?;
    }
    Ok((buffer_size, period_size))
}
/// Rewrites shorthand PCM ids into the `prefix:CARD=<card>,DEV=<device>` form.
///
/// * `prefix:<card>` and `prefix:<card>,<device>` (numeric device) gain explicit `CARD=`
///   and `DEV=` keys; a missing device defaults to `0`.
/// * `prefix:CARD=...` without a device part gets `,DEV=0` appended.
/// * Anything else (no `:`, already keyed with a device, non-numeric device) is returned
///   unchanged.
fn canonical_pcm_id(pcm_id: &str) -> String {
    let Some((prefix, rest)) = pcm_id.split_once(':') else {
        return pcm_id.to_owned();
    };

    let device_part = rest.split_once(',');
    let (card_str, device_str) = match device_part {
        Some((c, d)) => (c.trim(), d.trim()),
        None => (rest.trim(), "0"),
    };

    if card_str.contains('=') {
        // Already in keyed form: only a missing device needs to be filled in.
        return match device_part {
            None => format!("{prefix}:{rest},DEV=0"),
            Some(_) => pcm_id.to_owned(),
        };
    }

    match device_str.parse::<u32>() {
        Ok(device) => format!("{prefix}:CARD={card_str},DEV={device}"),
        Err(_) => pcm_id.to_owned(),
    }
}
impl From<alsa::Error> for Error {
fn from(err: alsa::Error) -> Self {
match err.errno() {
libc::ENODEV | libc::ENOENT | LIBC_ENOTSUPP => ErrorKind::DeviceNotAvailable.into(),
libc::EPERM | libc::EACCES => ErrorKind::PermissionDenied.into(),
libc::EBUSY | libc::EAGAIN => ErrorKind::DeviceBusy.into(),
libc::EINVAL => ErrorKind::UnsupportedConfig.into(),
libc::ENOSYS => ErrorKind::UnsupportedOperation.into(),
libc::EPIPE => ErrorKind::Xrun.into(),
_ => Error::with_message(ErrorKind::BackendError, err.to_string()),
}
}
}