use std::{
mem,
ops::ControlFlow,
ptr,
sync::{
atomic::{AtomicBool, Ordering},
mpsc::{channel, Receiver, SendError, Sender},
Arc,
},
thread::{self, JoinHandle},
time::Duration,
};
use windows::Win32::{
Foundation::{self, PROPERTYKEY, WAIT_OBJECT_0},
Media::Audio,
System::{Performance, SystemServices, Threading},
};
use crate::{
host::{
emit_error, equilibrium::fill_equilibrium, frames_to_duration, latch::Latch,
ErrorCallbackArc,
},
traits::StreamTrait,
Data, Error, ErrorKind, FrameCount, InputCallbackInfo, InputStreamTimestamp,
OutputCallbackInfo, OutputStreamTimestamp, ResultExt, SampleFormat, SampleRate, StreamConfig,
StreamInstant,
};
/// Returns the endpoint that is currently the default for `flow`, if any.
///
/// This is a thin wrapper around `super::device::current_default_endpoint`; the
/// result is passed through unchanged.
fn get_current_default(flow: Audio::EDataFlow) -> Option<Audio::IMMDevice> {
    use super::device::current_default_endpoint;

    current_default_endpoint(flow)
}
/// Watches the system for default-endpoint changes on behalf of one stream.
///
/// The monitor owns the notification client registered on `enumerator` and exposes two
/// ways for a stream thread to learn about a change: the Win32 `event` and the atomic
/// `pending_device_changed` fallback flag.
pub(crate) struct DefaultDeviceMonitor {
/// Enumerator the notification client is registered on; used again in `Drop` to
/// unregister it.
enumerator: Audio::IMMDeviceEnumerator,
/// The registered notification client (a `DefaultDeviceNotificationImpl`).
client: Audio::IMMNotificationClient,
/// Event the notification client sets when it detects a relevant change.
event: Foundation::HANDLE,
/// Set by the notification client when setting `event` fails.
pub(crate) pending_device_changed: Arc<AtomicBool>,
}
// NOTE(review): these impls assert thread-safety by hand for the COM interfaces and the
// raw handle held above — confirm the threading-model assumptions they rely on.
unsafe impl Send for DefaultDeviceMonitor {}
unsafe impl Sync for DefaultDeviceMonitor {}
impl DefaultDeviceMonitor {
pub fn new(
enumerator: Audio::IMMDeviceEnumerator,
flow: Audio::EDataFlow,
) -> Result<Self, Error> {
let event =
unsafe { Threading::CreateEventW(None, false, false, None).map_err(Error::from)? };
let pending_device_changed = Arc::new(AtomicBool::new(false));
let client: Audio::IMMNotificationClient = DefaultDeviceNotificationImpl {
flow,
event,
pending_device_changed: pending_device_changed.clone(),
}
.into();
unsafe {
enumerator
.RegisterEndpointNotificationCallback(&client)
.map_err(Error::from)?;
}
Ok(Self {
enumerator,
client,
event,
pending_device_changed,
})
}
}
impl Drop for DefaultDeviceMonitor {
    /// Unregisters the notification client and, only if that succeeds, closes the event.
    ///
    /// The event is left open when unregistration fails, because the still-registered
    /// client keeps a copy of the handle and may try to set it.
    fn drop(&mut self) {
        // NOTE(review): presumably ensures COM is initialised on the dropping thread
        // before the enumerator is used — confirm against `host::com`.
        crate::host::com::com_initialized();

        let unregistered = unsafe {
            self.enumerator
                .UnregisterEndpointNotificationCallback(&self.client)
        };
        if unregistered.is_ok() {
            unsafe {
                let _ = Foundation::CloseHandle(self.event);
            }
        }
    }
}
// COM notification client that reports default-device changes for one data flow.
// It signals `event`, and falls back to raising `pending_device_changed` when the
// event cannot be set.
#[windows::core::implement(Audio::IMMNotificationClient)]
struct DefaultDeviceNotificationImpl {
// Data-flow direction this client cares about; notifications for other flows are ignored.
flow: Audio::EDataFlow,
// Copy of the monitor's event handle; set from the notification callbacks.
event: Foundation::HANDLE,
// Fallback flag shared with the monitor, stored as `true` when `SetEvent` fails.
pending_device_changed: Arc<AtomicBool>,
}
impl Audio::IMMNotificationClient_Impl for DefaultDeviceNotificationImpl_Impl {
fn OnDefaultDeviceChanged(
&self,
flow: Audio::EDataFlow,
role: Audio::ERole,
_pwstrdefaultdeviceid: &windows::core::PCWSTR,
) -> windows::core::Result<()> {
if flow == self.flow && role == Audio::eConsole {
unsafe {
if Threading::SetEvent(self.event).is_err() {
self.pending_device_changed.store(true, Ordering::Relaxed);
}
}
}
Ok(())
}
fn OnDeviceStateChanged(
&self,
_pwstrdeviceid: &windows::core::PCWSTR,
dwnewstate: Audio::DEVICE_STATE,
) -> windows::core::Result<()> {
let is_unavailable = dwnewstate == Audio::DEVICE_STATE_DISABLED
|| dwnewstate == Audio::DEVICE_STATE_NOTPRESENT
|| dwnewstate == Audio::DEVICE_STATE_UNPLUGGED;
if is_unavailable && get_current_default(self.flow).is_none() {
unsafe {
if Threading::SetEvent(self.event).is_err() {
self.pending_device_changed.store(true, Ordering::Relaxed);
}
}
}
Ok(())
}
fn OnDeviceAdded(&self, _pwstrdeviceid: &windows::core::PCWSTR) -> windows::core::Result<()> {
Ok(())
}
fn OnDeviceRemoved(&self, _pwstrdeviceid: &windows::core::PCWSTR) -> windows::core::Result<()> {
if get_current_default(self.flow).is_none() {
unsafe {
if Threading::SetEvent(self.event).is_err() {
self.pending_device_changed.store(true, Ordering::Relaxed);
}
}
}
Ok(())
}
fn OnPropertyValueChanged(
&self,
_pwstrdeviceid: &windows::core::PCWSTR,
_key: &PROPERTYKEY,
) -> windows::core::Result<()> {
Ok(())
}
}
/// Caller-facing handle to a WASAPI stream that is serviced by a dedicated worker thread.
///
/// Commands are sent to the worker over a channel; the worker is woken through
/// `pending_scheduled_event`. Dropping the handle asks the worker to terminate and joins it.
pub struct Stream {
/// Join handle of the worker thread; taken and joined in `Drop`.
thread: Option<JoinHandle<()>>,
/// Sending half of the command channel read by the worker thread.
commands: Sender<Command>,
/// Event set after each command has been queued (see `push_command`).
pending_scheduled_event: Foundation::HANDLE,
/// Value reported by `buffer_size`.
period_frames: FrameCount,
/// Result of `QueryPerformanceFrequency`, used by `now` to scale counter readings.
qpc_frequency: u64,
/// Held only to keep the monitor alive for the lifetime of the stream; never read.
_default_device_monitor: Option<DefaultDeviceMonitor>,
/// Gate the worker thread waits on before running; released by `signal_ready`.
latch: Latch,
}
// NOTE(review): thread-safety is asserted by hand here because the struct holds a raw
// handle and COM-backed members — confirm the invariants these impls rely on.
unsafe impl Send for Stream {}
unsafe impl Sync for Stream {}
// Assertions provided by crate-level macros for `Stream` being `Send` and `Sync`.
crate::assert_stream_send!(Stream);
crate::assert_stream_sync!(Stream);
/// State owned by the worker thread for the duration of its run loop.
struct RunContext {
/// The audio client state being serviced.
stream: StreamInner,
/// Handles waited on each iteration. Index 0 is the command wake-up event, index 1 the
/// stream's own event, and index 2 (when present) the default-device monitor's event.
handles: Vec<Foundation::HANDLE>,
/// Receiving half of the command channel.
commands: Receiver<Command>,
/// Fallback device-change flag shared with the monitor, if a monitor is attached.
pending_device_changed: Option<Arc<AtomicBool>>,
/// Command wake-up event; closed when the context is dropped.
pending_scheduled_event: Foundation::HANDLE,
}
impl Drop for RunContext {
    /// Closes the command wake-up event; a failure to close is ignored.
    fn drop(&mut self) {
        let _ = unsafe { Foundation::CloseHandle(self.pending_scheduled_event) };
    }
}

// NOTE(review): asserted by hand so the context can be moved into the worker thread —
// confirm that every member is safe to transfer across threads.
unsafe impl Send for RunContext {}
/// Requests sent from the `Stream` handle to its worker thread.
pub enum Command {
/// Start the audio client if it is not already playing.
PlayStream,
/// Stop the audio client if it is currently playing.
PauseStream,
/// Leave the run loop and end the worker thread.
Terminate,
}
/// Direction-specific client interface of a stream.
pub enum AudioClientFlow {
/// Output stream: buffers are obtained from and released to the render client.
Render {
render_client: Audio::IAudioRenderClient,
},
/// Input stream: packets are read from the capture client.
Capture {
capture_client: Audio::IAudioCaptureClient,
},
}
/// Audio-client state handed to the worker thread and serviced by its run loop.
pub struct StreamInner {
/// Client used to start/stop the stream and to query the current padding.
pub audio_client: Audio::IAudioClient,
/// Clock queried for the position used in callback timestamps.
pub audio_clock: Audio::IAudioClock,
/// Render or capture interface, depending on the stream direction.
pub client_flow: AudioClientFlow,
/// Event waited on by the worker thread (wait index 1); closed when this struct is dropped.
pub event: Foundation::HANDLE,
/// Whether the audio client has been started and not stopped since.
pub playing: bool,
/// Total buffer size in frames; the writable frame count is this minus the current padding.
pub max_frames_in_buffer: FrameCount,
/// Period size in frames, reported to callers as the stream's buffer size.
pub period_frames: FrameCount,
/// Size of one frame in bytes.
pub bytes_per_frame: u16,
/// Stream configuration; its sample rate is used for timestamp computation.
pub config: StreamConfig,
/// Sample format of the buffers passed to the data callbacks.
pub sample_format: SampleFormat,
/// Latency added on top of the buffered duration when computing output playback time.
pub stream_latency: Duration,
}
impl Stream {
pub(crate) fn new_input<D>(
stream_inner: StreamInner,
mut data_callback: D,
error_callback: ErrorCallbackArc,
default_device_monitor: Option<DefaultDeviceMonitor>,
) -> Result<Stream, Error>
where
D: FnMut(&Data, &InputCallbackInfo) + Send + 'static,
{
let pending_scheduled_event = unsafe {
Threading::CreateEventA(None, false, false, windows::core::PCSTR(ptr::null()))
}
.expect("cpal: could not create input stream event");
let (tx, rx) = channel();
let period_frames = stream_inner.period_frames;
let mut qpc_frequency: i64 = 0;
unsafe {
Performance::QueryPerformanceFrequency(&mut qpc_frequency)
.expect("QueryPerformanceFrequency failed");
debug_assert_ne!(qpc_frequency, 0, "QueryPerformanceFrequency returned zero");
}
let mut handles = vec![pending_scheduled_event, stream_inner.event];
if let Some(ref monitor) = default_device_monitor {
handles.push(monitor.event);
}
let pending_device_changed = default_device_monitor
.as_ref()
.map(|m| m.pending_device_changed.clone());
let run_context = RunContext {
handles,
stream: stream_inner,
commands: rx,
pending_device_changed,
pending_scheduled_event,
};
let mut latch = Latch::new();
let waiter = latch.waiter();
let thread = thread::Builder::new()
.name("cpal_wasapi_in".to_owned())
.spawn(move || {
waiter.wait();
run_input(run_context, &mut data_callback, &error_callback)
})
.map_err(|e| {
Error::with_message(
ErrorKind::ResourceExhausted,
format!("Failed to create audio thread: {e}"),
)
})?;
latch.add_thread(thread.thread().clone());
let stream = Stream {
thread: Some(thread),
commands: tx,
pending_scheduled_event,
period_frames,
qpc_frequency: qpc_frequency as u64,
_default_device_monitor: default_device_monitor,
latch,
};
Ok(stream)
}
pub(crate) fn new_output<D>(
stream_inner: StreamInner,
mut data_callback: D,
error_callback: ErrorCallbackArc,
default_device_monitor: Option<DefaultDeviceMonitor>,
) -> Result<Stream, Error>
where
D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static,
{
let pending_scheduled_event = unsafe {
Threading::CreateEventA(None, false, false, windows::core::PCSTR(ptr::null()))
}
.expect("cpal: could not create output stream event");
let (tx, rx) = channel();
let period_frames = stream_inner.period_frames;
let mut qpc_frequency: i64 = 0;
unsafe {
Performance::QueryPerformanceFrequency(&mut qpc_frequency)
.expect("QueryPerformanceFrequency failed");
debug_assert_ne!(qpc_frequency, 0, "QueryPerformanceFrequency returned zero");
}
let mut handles = vec![pending_scheduled_event, stream_inner.event];
if let Some(ref monitor) = default_device_monitor {
handles.push(monitor.event);
}
let pending_device_changed = default_device_monitor
.as_ref()
.map(|m| m.pending_device_changed.clone());
let run_context = RunContext {
handles,
stream: stream_inner,
commands: rx,
pending_device_changed,
pending_scheduled_event,
};
let mut latch = Latch::new();
let waiter = latch.waiter();
let thread = thread::Builder::new()
.name("cpal_wasapi_out".to_owned())
.spawn(move || {
waiter.wait();
run_output(run_context, &mut data_callback, &error_callback)
})
.map_err(|e| {
Error::with_message(
ErrorKind::ResourceExhausted,
format!("Failed to create audio thread: {e}"),
)
})?;
latch.add_thread(thread.thread().clone());
let stream = Stream {
thread: Some(thread),
commands: tx,
pending_scheduled_event,
period_frames,
qpc_frequency: qpc_frequency as u64,
_default_device_monitor: default_device_monitor,
latch,
};
Ok(stream)
}
pub(crate) fn signal_ready(&self) {
self.latch.release();
}
fn push_command(&self, command: Command) -> Result<(), SendError<Command>> {
self.commands.send(command)?;
unsafe {
Threading::SetEvent(self.pending_scheduled_event).unwrap();
}
Ok(())
}
}
impl Drop for Stream {
fn drop(&mut self) {
self.signal_ready();
let _ = self.push_command(Command::Terminate);
if let Some(handle) = self.thread.take() {
if handle.thread().id() != std::thread::current().id() {
let _ = handle.join();
}
}
}
}
impl StreamTrait for Stream {
fn play(&self) -> Result<(), Error> {
self.push_command(Command::PlayStream).map_err(|_| {
Error::with_message(
ErrorKind::StreamInvalidated,
"Stream command channel closed",
)
})?;
Ok(())
}
fn pause(&self) -> Result<(), Error> {
self.push_command(Command::PauseStream).map_err(|_| {
Error::with_message(
ErrorKind::StreamInvalidated,
"Stream command channel closed",
)
})?;
Ok(())
}
fn now(&self) -> StreamInstant {
let mut counter: i64 = 0;
unsafe {
Performance::QueryPerformanceCounter(&mut counter)
.expect("QueryPerformanceCounter failed");
}
let units_100ns = counter as u128 * 10_000_000 / self.qpc_frequency as u128;
let nanos = units_100ns * 100;
StreamInstant::new(
(nanos / 1_000_000_000) as u64,
(nanos % 1_000_000_000) as u32,
)
}
fn buffer_size(&self) -> Result<FrameCount, Error> {
Ok(self.period_frames)
}
}
impl Drop for StreamInner {
    /// Closes the stream's event handle; a failure to close is ignored.
    fn drop(&mut self) {
        let _ = unsafe { Foundation::CloseHandle(self.event) };
    }
}
/// Drains every command currently queued for the worker thread.
///
/// Play and pause requests start or stop the audio client only when that changes the
/// current state. Returns `Ok(false)` as soon as a terminate request is seen, and
/// `Ok(true)` once the queue is empty.
///
/// # Errors
///
/// Returns an error if the audio client fails to start or stop.
fn process_commands(run_context: &mut RunContext) -> Result<bool, Error> {
    while let Ok(command) = run_context.commands.try_recv() {
        let stream = &mut run_context.stream;
        match command {
            Command::Terminate => return Ok(false),
            Command::PlayStream => {
                if !stream.playing {
                    let started = unsafe { stream.audio_client.Start() };
                    started.context("Failed to start audio client")?;
                    stream.playing = true;
                }
            }
            Command::PauseStream => {
                if stream.playing {
                    let stopped = unsafe { stream.audio_client.Stop() };
                    stopped.context("Failed to stop audio client")?;
                    stream.playing = false;
                }
            }
        }
    }
    Ok(true)
}
fn wait_for_handle_signal(handles: &[Foundation::HANDLE]) -> Result<usize, Error> {
debug_assert!(handles.len() <= SystemServices::MAXIMUM_WAIT_OBJECTS as usize);
let result = unsafe {
Threading::WaitForMultipleObjectsEx(
handles,
false, Threading::INFINITE, false, )
};
if result == Foundation::WAIT_FAILED {
return Err(Error::with_message(
ErrorKind::StreamInvalidated,
"Failed to wait for audio event",
));
}
let handle_idx = (result.0 - WAIT_OBJECT_0.0) as usize;
Ok(handle_idx)
}
/// Returns how many frames of the stream's buffer are not currently occupied, computed
/// as the total buffer size minus the audio client's current padding.
///
/// # Errors
///
/// Returns an error if the current padding cannot be queried.
#[inline]
fn get_available_frames(stream: &StreamInner) -> Result<FrameCount, Error> {
    let queued_frames = unsafe { stream.audio_client.GetCurrentPadding() }
        .context("Failed to get current padding")?;
    Ok(stream.max_frames_in_buffer - queued_frames)
}
/// Worker-thread body for capture streams.
///
/// With the `realtime` feature enabled it first tries to raise the thread priority and
/// reports a failure through `error_callback` without aborting. It then loops: handle
/// commands, wait for a signal, and on an audio signal pass the captured packets to
/// `data_callback`. The loop ends on a terminate request or after the first processing
/// error, which is reported through `error_callback`.
fn run_input(
    mut run_ctxt: RunContext,
    data_callback: &mut dyn FnMut(&Data, &InputCallbackInfo),
    error_callback: &ErrorCallbackArc,
) {
    #[cfg(feature = "realtime")]
    if let Err(err) = boost_current_thread_priority(
        run_ctxt.stream.period_frames,
        run_ctxt.stream.config.sample_rate,
    ) {
        emit_error(error_callback, err);
    }

    loop {
        let audio_ready = match process_commands_and_await_signal(&mut run_ctxt, error_callback) {
            ControlFlow::Break(()) => return,
            ControlFlow::Continue(ready) => ready,
        };
        if !audio_ready {
            continue;
        }

        let capture_client = match &run_ctxt.stream.client_flow {
            AudioClientFlow::Capture { capture_client } => capture_client.clone(),
            AudioClientFlow::Render { .. } => unreachable!(),
        };
        if let Err(err) = process_input(&run_ctxt.stream, capture_client, data_callback) {
            emit_error(error_callback, err);
            return;
        }
    }
}
/// Worker-thread body for render streams.
///
/// With the `realtime` feature enabled it first tries to raise the thread priority and
/// reports a failure through `error_callback` without aborting. It then loops: handle
/// commands, wait for a signal, and on an audio signal let `data_callback` fill the
/// writable part of the buffer. The loop ends on a terminate request or after the first
/// processing error, which is reported through `error_callback`.
fn run_output(
    mut run_ctxt: RunContext,
    data_callback: &mut dyn FnMut(&mut Data, &OutputCallbackInfo),
    error_callback: &ErrorCallbackArc,
) {
    #[cfg(feature = "realtime")]
    if let Err(err) = boost_current_thread_priority(
        run_ctxt.stream.period_frames,
        run_ctxt.stream.config.sample_rate,
    ) {
        emit_error(error_callback, err);
    }

    loop {
        let audio_ready = match process_commands_and_await_signal(&mut run_ctxt, error_callback) {
            ControlFlow::Break(()) => return,
            ControlFlow::Continue(ready) => ready,
        };
        if !audio_ready {
            continue;
        }

        let render_client = match &run_ctxt.stream.client_flow {
            AudioClientFlow::Render { render_client } => render_client.clone(),
            AudioClientFlow::Capture { .. } => unreachable!(),
        };
        if let Err(err) = process_output(&run_ctxt.stream, render_client, data_callback) {
            emit_error(error_callback, err);
            return;
        }
    }
}
/// Raises the priority of the calling thread for audio work.
///
/// Promotion through `audio_thread_priority` is tried first. If it fails, the thread
/// priority is set to `THREAD_PRIORITY_TIME_CRITICAL` instead.
///
/// # Errors
///
/// Returns an error only if the fallback also fails.
#[cfg(feature = "realtime")]
fn boost_current_thread_priority(
    period_frames: FrameCount,
    sample_rate: SampleRate,
) -> Result<(), Error> {
    let promoted =
        audio_thread_priority::promote_current_thread_to_real_time(period_frames, sample_rate);
    if promoted.is_ok() {
        return Ok(());
    }

    unsafe {
        let current_thread = Threading::GetCurrentThread();
        Threading::SetThreadPriority(current_thread, Threading::THREAD_PRIORITY_TIME_CRITICAL)
            .context("Failed to promote audio thread to real-time priority")
    }
}
fn process_commands_and_await_signal(
run_context: &mut RunContext,
error_callback: &ErrorCallbackArc,
) -> ControlFlow<(), bool> {
match process_commands(run_context) {
Ok(true) => (),
Ok(false) => return ControlFlow::Break(()),
Err(err) => {
emit_error(error_callback, err);
return ControlFlow::Break(());
}
};
if let Some(ref flag) = run_context.pending_device_changed {
if flag.swap(false, Ordering::Relaxed) {
emit_error(
error_callback,
Error::with_message(ErrorKind::DeviceChanged, "Default audio device changed"),
);
}
}
let handle_idx = match wait_for_handle_signal(&run_context.handles) {
Ok(idx) => idx,
Err(err) => {
emit_error(error_callback, err);
return ControlFlow::Break(());
}
};
if handle_idx >= 2 {
emit_error(
error_callback,
Error::with_message(ErrorKind::DeviceChanged, "Default audio device changed"),
);
return ControlFlow::Continue(false);
}
ControlFlow::Continue(handle_idx != 0)
}
fn process_input(
stream: &StreamInner,
capture_client: Audio::IAudioCaptureClient,
data_callback: &mut dyn FnMut(&Data, &InputCallbackInfo),
) -> Result<(), Error> {
unsafe {
let mut buffer: *mut u8 = ptr::null_mut();
let mut flags = mem::MaybeUninit::uninit();
loop {
let mut frames_available = match capture_client.GetNextPacketSize() {
Ok(0) => return Ok(()),
Ok(f) => f,
Err(err) => return Err(Error::from(err)),
};
let mut qpc_position: u64 = 0;
let result = capture_client.GetBuffer(
&mut buffer,
&mut frames_available,
flags.as_mut_ptr(),
None,
Some(&mut qpc_position),
);
match result {
Err(e) if e.code() == Audio::AUDCLNT_S_BUFFER_EMPTY => continue,
Err(e) => return Err(Error::from(e)),
Ok(_) => (),
}
debug_assert!(!buffer.is_null());
let data = buffer as *mut ();
let len = frames_available as usize * stream.bytes_per_frame as usize
/ stream.sample_format.sample_size();
let data = Data::from_parts(data, len, stream.sample_format);
let timestamp = input_timestamp(stream, qpc_position)?;
let info = InputCallbackInfo { timestamp };
data_callback(&data, &info);
capture_client
.ReleaseBuffer(frames_available)
.context("Failed to release capture buffer")?;
}
}
}
/// Lets `data_callback` fill the currently writable part of the render buffer.
///
/// The writable region is pre-filled with the equilibrium value of the sample format
/// before the callback runs, then released to the render client. Does nothing when no
/// frames are writable.
///
/// # Errors
///
/// Returns an error if querying the writable frames, fetching the buffer, computing the
/// timestamp, or releasing the buffer fails.
fn process_output(
    stream: &StreamInner,
    render_client: Audio::IAudioRenderClient,
    data_callback: &mut dyn FnMut(&mut Data, &OutputCallbackInfo),
) -> Result<(), Error> {
    let writable_frames = get_available_frames(stream)?;
    if writable_frames == 0 {
        return Ok(());
    }

    unsafe {
        let raw = render_client
            .GetBuffer(writable_frames)
            .map_err(Error::from)?;
        debug_assert!(!raw.is_null());

        let byte_len = writable_frames as usize * stream.bytes_per_frame as usize;
        fill_equilibrium(
            std::slice::from_raw_parts_mut(raw, byte_len),
            stream.sample_format,
        );

        // Length is expressed in samples: total bytes divided by the sample size.
        let sample_len = byte_len / stream.sample_format.sample_size();
        let mut data = Data::from_parts(raw as *mut (), sample_len, stream.sample_format);

        let info = OutputCallbackInfo {
            timestamp: output_timestamp(stream, writable_frames, stream.config.sample_rate)?,
        };
        data_callback(&mut data, &info);

        render_client
            .ReleaseBuffer(writable_frames, 0)
            .map_err(Error::from)?;
    }
    Ok(())
}
/// Reads the audio clock and converts the performance-counter position it reports into
/// a `StreamInstant`.
///
/// The reported value is treated as a count of 100 ns units and scaled to nanoseconds.
///
/// # Errors
///
/// Returns an error if the clock position cannot be queried.
#[inline]
fn stream_instant(stream: &StreamInner) -> Result<StreamInstant, Error> {
    const NANOS_PER_SEC: u128 = 1_000_000_000;

    // `device_position` is a required out-parameter; only the QPC position is used.
    let mut device_position: u64 = 0;
    let mut qpc_position: u64 = 0;
    let queried = unsafe {
        stream
            .audio_clock
            .GetPosition(&mut device_position, Some(&mut qpc_position))
    };
    queried.context("Failed to get clock position")?;

    let total_nanos = qpc_position as u128 * 100;
    let secs = (total_nanos / NANOS_PER_SEC) as u64;
    let subsec_nanos = (total_nanos % NANOS_PER_SEC) as u32;
    Ok(StreamInstant::new(secs, subsec_nanos))
}
/// Builds the timestamp pair for a capture callback.
///
/// `capture` is derived from `buffer_qpc_position`, treated as a count of 100 ns units;
/// `callback` is the current reading of the stream's audio clock.
///
/// # Errors
///
/// Returns an error if the audio clock cannot be read.
#[inline]
fn input_timestamp(
    stream: &StreamInner,
    buffer_qpc_position: u64,
) -> Result<InputStreamTimestamp, Error> {
    const NANOS_PER_SEC: u128 = 1_000_000_000;

    let capture_nanos = buffer_qpc_position as u128 * 100;
    let capture = StreamInstant::new(
        (capture_nanos / NANOS_PER_SEC) as u64,
        (capture_nanos % NANOS_PER_SEC) as u32,
    );

    stream_instant(stream).map(|callback| InputStreamTimestamp { capture, callback })
}
/// Builds the timestamp pair for a render callback.
///
/// `callback` is the current reading of the stream's audio clock. `playback` adds to it
/// the duration of the frames already queued (buffer size minus `frames_available`) plus
/// the stream latency.
///
/// # Errors
///
/// Returns an error if the audio clock cannot be read.
#[inline]
fn output_timestamp(
    stream: &StreamInner,
    frames_available: FrameCount,
    sample_rate: SampleRate,
) -> Result<OutputStreamTimestamp, Error> {
    let callback = stream_instant(stream)?;

    let queued_frames = stream.max_frames_in_buffer - frames_available;
    let delay = frames_to_duration(queued_frames, sample_rate) + stream.stream_latency;
    let playback = callback + delay;

    Ok(OutputStreamTimestamp { callback, playback })
}