#![allow(deprecated)]
use super::{asbd_from_config, check_os_status, frames_to_duration, host_time_to_stream_instant};
use super::OSStatus;
use crate::host::coreaudio::macos::loopback::LoopbackDevice;
use crate::traits::{HostTrait, StreamTrait};
use crate::{BackendSpecificError, DevicesError, PauseStreamError, PlayStreamError};
use coreaudio::audio_unit::AudioUnit;
use objc2_core_audio::AudioDeviceID;
use std::sync::{mpsc, Arc, Mutex, Weak};
pub use self::enumerate::{default_input_device, default_output_device, Devices};
use objc2_core_audio::{
kAudioDevicePropertyDeviceIsAlive, kAudioObjectPropertyElementMain,
kAudioObjectPropertyScopeGlobal, AudioObjectPropertyAddress,
};
use property_listener::AudioObjectPropertyListener;
mod device;
pub mod enumerate;
mod loopback;
mod property_listener;
pub use device::Device;
/// Host handle for the CoreAudio (macOS) backend.
///
/// This is a stateless unit struct.
#[derive(Debug)]
pub struct Host;

impl Host {
    /// Creates a new host handle.
    ///
    /// # Errors
    ///
    /// The signature allows a [`crate::HostUnavailable`] error, but this
    /// implementation always returns `Ok`.
    pub fn new() -> Result<Self, crate::HostUnavailable> {
        Ok(Self)
    }
}
impl HostTrait for Host {
type Devices = Devices;
type Device = Device;
fn is_available() -> bool {
true
}
fn devices(&self) -> Result<Self::Devices, DevicesError> {
Devices::new()
}
fn default_input_device(&self) -> Option<Self::Device> {
default_input_device()
}
fn default_output_device(&self) -> Option<Self::Device> {
default_output_device()
}
}
/// Boxed, type-erased stream error callback.
///
/// It receives a `crate::StreamError` and is `Send`, so it can be handed to
/// another thread (the disconnect handler in this file invokes it from a
/// spawned thread).
type ErrorCallback = Box<dyn FnMut(crate::StreamError) + Send + 'static>;
/// Invokes the shared error callback with `err` without ever blocking.
///
/// Returns `true` when the callback was called. That includes the case where
/// the mutex is poisoned: the guard is recovered and the callback still runs.
/// Returns `false`, dropping `err` unreported, when the mutex is currently
/// held elsewhere.
#[inline]
fn invoke_error_callback<E>(error_callback: &Arc<Mutex<E>>, err: crate::StreamError) -> bool
where
    E: FnMut(crate::StreamError) + Send,
{
    use std::sync::TryLockError;

    // Resolve the lock attempt to a usable guard, or bail out if contended.
    let mut callback = match error_callback.try_lock() {
        Ok(guard) => guard,
        Err(TryLockError::Poisoned(poisoned)) => poisoned.into_inner(),
        Err(TryLockError::WouldBlock) => return false,
    };
    (*callback)(err);
    true
}
/// Monitors a device's "is alive" property and reacts when it fires.
///
/// Two background threads are spawned by [`DisconnectManager::new`]: one owns
/// the property listener, the other pauses the stream and reports
/// `StreamError::DeviceNotAvailable` through the error callback.
struct DisconnectManager {
    /// Nothing is ever sent on this channel. Dropping the sender makes the
    /// listener thread's `recv` return, which ends that thread and drops the
    /// listener it owns.
    _shutdown_tx: mpsc::Sender<()>,
}

impl DisconnectManager {
    /// Registers the listener for `device_id` and starts the handler thread.
    ///
    /// Blocks until the listener thread reports whether registration worked.
    ///
    /// # Errors
    ///
    /// Returns the listener's own creation error (converted into
    /// `BuildStreamError`), or a backend-specific error if the listener
    /// thread ended without reporting a result.
    fn new(
        device_id: AudioDeviceID,
        stream_weak: Weak<Mutex<StreamInner>>,
        error_callback: Arc<Mutex<ErrorCallback>>,
    ) -> Result<Self, crate::BuildStreamError> {
        let (shutdown_tx, shutdown_rx) = mpsc::channel();
        let (disconnect_tx, disconnect_rx) = mpsc::channel();
        let (ready_tx, ready_rx) = mpsc::channel();

        // Listener thread: creates the listener, reports the outcome, then
        // parks on the shutdown channel so the listener stays registered.
        std::thread::spawn(move || {
            let alive_address = AudioObjectPropertyAddress {
                mSelector: kAudioDevicePropertyDeviceIsAlive,
                mScope: kAudioObjectPropertyScopeGlobal,
                mElement: kAudioObjectPropertyElementMain,
            };
            let notify_disconnect = move || {
                let _ = disconnect_tx.send(());
            };

            let listener = match AudioObjectPropertyListener::new(
                device_id,
                alive_address,
                notify_disconnect,
            ) {
                Ok(listener) => listener,
                Err(e) => {
                    let _ = ready_tx.send(Err(e));
                    return;
                }
            };

            let _ = ready_tx.send(Ok(()));
            // Returns once the owning `DisconnectManager` has been dropped.
            let _ = shutdown_rx.recv();
            drop(listener);
        });

        // First `?`: the thread died before reporting. Second `?`: the
        // listener could not be created.
        let listener_ready =
            ready_rx
                .recv()
                .map_err(|_| crate::BuildStreamError::BackendSpecific {
                    err: BackendSpecificError {
                        description: "Disconnect listener thread terminated unexpectedly"
                            .to_string(),
                    },
                })?;
        listener_ready?;

        // Handler thread: runs until every disconnect sender is gone or the
        // stream has been dropped.
        std::thread::spawn(move || {
            for () in disconnect_rx.iter() {
                let stream_arc = match stream_weak.upgrade() {
                    Some(stream_arc) => stream_arc,
                    None => break,
                };

                // Best effort: pausing is skipped if the stream lock is busy.
                if let Ok(mut stream_inner) = stream_arc.try_lock() {
                    let _ = stream_inner.pause();
                }

                invoke_error_callback(&error_callback, crate::StreamError::DeviceNotAvailable);
            }
        });

        Ok(Self {
            _shutdown_tx: shutdown_tx,
        })
    }
}
/// Mutable state behind a [`Stream`]: the audio unit and its play/pause flag.
struct StreamInner {
    /// `true` after a successful `play`, `false` after a successful `pause`.
    playing: bool,
    /// The unit started and stopped by `play` / `pause`.
    audio_unit: AudioUnit,
    /// Device this stream was built on.
    #[allow(dead_code)]
    device_id: AudioDeviceID,
    // Not read in this file; presumably held so the loopback device lives as
    // long as the stream. TODO confirm against `loopback`.
    _loopback_device: Option<LoopbackDevice>,
}

impl StreamInner {
    /// Starts the audio unit unless the stream is already marked as playing.
    ///
    /// # Errors
    ///
    /// A start failure is returned as a backend-specific error carrying the
    /// error's `Display` text; `playing` is left unchanged in that case.
    fn play(&mut self) -> Result<(), PlayStreamError> {
        if self.playing {
            return Ok(());
        }
        self.audio_unit.start().map_err(|e| BackendSpecificError {
            description: e.to_string(),
        })?;
        self.playing = true;
        Ok(())
    }

    /// Stops the audio unit unless the stream is already marked as paused.
    ///
    /// # Errors
    ///
    /// A stop failure is returned as a backend-specific error carrying the
    /// error's `Display` text; `playing` is left unchanged in that case.
    fn pause(&mut self) -> Result<(), PauseStreamError> {
        if !self.playing {
            return Ok(());
        }
        self.audio_unit.stop().map_err(|e| BackendSpecificError {
            description: e.to_string(),
        })?;
        self.playing = false;
        Ok(())
    }
}
/// A stream handle: shared stream state plus its disconnect monitoring.
pub struct Stream {
    /// State shared with the disconnect handler thread, which holds only a
    /// weak reference to it.
    inner: Arc<Mutex<StreamInner>>,
    /// Held only for its lifetime; it is never read.
    _disconnect_manager: DisconnectManager,
}

impl Stream {
    /// Wraps `inner` in shared ownership and starts disconnect monitoring for
    /// its device.
    ///
    /// # Errors
    ///
    /// Propagates any error from `DisconnectManager::new`.
    fn new(
        inner: StreamInner,
        error_callback: ErrorCallback,
    ) -> Result<Self, crate::BuildStreamError> {
        // Read the id before `inner` is moved into the mutex.
        let device_id = inner.device_id;
        let inner = Arc::new(Mutex::new(inner));
        let shared_callback = Arc::new(Mutex::new(error_callback));

        let _disconnect_manager =
            DisconnectManager::new(device_id, Arc::downgrade(&inner), shared_callback)?;

        Ok(Self {
            inner,
            _disconnect_manager,
        })
    }
}
// `StreamTrait` implementation: lock the shared state, then delegate to
// `StreamInner`. A poisoned lock is reported as a backend-specific error.
impl StreamTrait for Stream {
    /// Starts playback; see `StreamInner::play`.
    fn play(&self) -> Result<(), PlayStreamError> {
        match self.inner.lock() {
            Ok(mut stream) => stream.play(),
            Err(_) => Err(PlayStreamError::BackendSpecific {
                err: BackendSpecificError {
                    description: "A cpal stream operation panicked while holding the lock - this is a bug, please report it".to_string(),
                },
            }),
        }
    }

    /// Pauses playback; see `StreamInner::pause`.
    fn pause(&self) -> Result<(), PauseStreamError> {
        match self.inner.lock() {
            Ok(mut stream) => stream.pause(),
            Err(_) => Err(PauseStreamError::BackendSpecific {
                err: BackendSpecificError {
                    description: "A cpal stream operation panicked while holding the lock - this is a bug, please report it".to_string(),
                },
            }),
        }
    }
}
// Smoke tests that open streams on the default devices. They unwrap the
// default input/output device, so they need real audio hardware to pass.
#[cfg(test)]
mod test {
    use crate::{
        default_host,
        traits::{DeviceTrait, HostTrait, StreamTrait},
        Sample,
    };

    /// Maximum number of samples printed per input callback.
    const PREVIEW_LEN: usize = 25;

    /// Returns at most the first `PREVIEW_LEN` samples of `data`.
    ///
    /// The length is clamped so that a callback buffer shorter than
    /// `PREVIEW_LEN` does not panic inside the audio callback.
    fn preview(data: &[f32]) -> &[f32] {
        &data[..data.len().min(PREVIEW_LEN)]
    }

    /// Plays one second of silence on the default output device.
    #[test]
    fn test_play() {
        let host = default_host();
        let device = host.default_output_device().unwrap();
        let mut supported_configs_range = device.supported_output_configs().unwrap();
        let supported_config = supported_configs_range
            .next()
            .unwrap()
            .with_max_sample_rate();
        let config = supported_config.config();
        let stream = device
            .build_output_stream(
                &config,
                write_silence::<f32>,
                move |err| println!("Error: {err}"),
                None,
            )
            .unwrap();
        stream.play().unwrap();
        std::thread::sleep(std::time::Duration::from_secs(1));
    }

    /// Records for one second from the default input device.
    #[test]
    fn test_record() {
        let host = default_host();
        let device = host.default_input_device().unwrap();
        println!("Device: {:?}", device.name());
        let mut supported_configs_range = device.supported_input_configs().unwrap();
        println!("Supported configs:");
        for config in supported_configs_range.clone() {
            println!("{config:?}");
        }
        let supported_config = supported_configs_range
            .next()
            .unwrap()
            .with_max_sample_rate();
        let config = supported_config.config();
        let stream = device
            .build_input_stream(
                &config,
                move |data: &[f32], _: &crate::InputCallbackInfo| {
                    println!("Got data: {:?}", preview(data));
                },
                move |err| println!("Error: {err}"),
                None,
            )
            .unwrap();
        stream.play().unwrap();
        std::thread::sleep(std::time::Duration::from_secs(1));
    }

    /// Builds an input stream on the default *output* device and records for
    /// one second. Skipped when the `CI` environment variable is set.
    #[test]
    fn test_record_output() {
        if std::env::var("CI").is_ok() {
            println!("Skipping test_record_output in CI environment due to permissions");
            return;
        }
        let host = default_host();
        let device = host.default_output_device().unwrap();
        let mut supported_configs_range = device.supported_output_configs().unwrap();
        let supported_config = supported_configs_range
            .next()
            .unwrap()
            .with_max_sample_rate();
        let config = supported_config.config();
        println!("Building input stream");
        let stream = device
            .build_input_stream(
                &config,
                move |data: &[f32], _: &crate::InputCallbackInfo| {
                    println!("Got data: {:?}", preview(data));
                },
                move |err| println!("Error: {err}"),
                None,
            )
            .unwrap();
        stream.play().unwrap();
        std::thread::sleep(std::time::Duration::from_secs(1));
    }

    /// Output callback that fills the whole buffer with the sample type's
    /// equilibrium (silence) value.
    fn write_silence<T: Sample>(data: &mut [T], _: &crate::OutputCallbackInfo) {
        for sample in data.iter_mut() {
            *sample = Sample::EQUILIBRIUM;
        }
    }
}