use super::windows_err_to_cpal_err;
use crate::traits::StreamTrait;
use crate::{
BackendSpecificError, Data, InputCallbackInfo, OutputCallbackInfo, PauseStreamError,
PlayStreamError, SampleFormat, StreamError,
};
use std::mem;
use std::ptr;
use std::sync::mpsc::{channel, Receiver, SendError, Sender};
use std::thread::{self, JoinHandle};
use windows::Win32::Foundation;
use windows::Win32::Foundation::HANDLE;
use windows::Win32::Foundation::WAIT_OBJECT_0;
use windows::Win32::Media::Audio;
use windows::Win32::System::SystemServices;
use windows::Win32::System::Threading;
/// Handle to a running stream: owns the worker thread and the channel used to control it.
pub struct Stream {
/// Join handle of the worker thread. Kept in an `Option` so that `Drop` can take it
/// out and join it.
thread: Option<JoinHandle<()>>,
/// Sending half of the channel that carries `Command`s to the worker thread.
commands: Sender<Command>,
/// Event that is set after every command is queued so that the worker's handle wait
/// returns and the command gets processed. It is the first handle in the worker's
/// wait set.
pending_scheduled_event: Foundation::HANDLE,
}
/// State that is moved into the worker thread when a stream is created.
struct RunContext {
/// The stream driven by the worker.
stream: StreamInner,
/// Handles the worker waits on. Index 0 is the command-notification event, index 1 is
/// the stream's own `event`.
handles: Vec<Foundation::HANDLE>,
/// Receiving half of the command channel.
commands: Receiver<Command>,
}
// NOTE(review): presumably sound because the context is built on the creating thread,
// moved into the worker once and only used there afterwards -- confirm that the COM
// interfaces held by `StreamInner` tolerate being moved across threads like this.
unsafe impl Send for RunContext {}
/// Requests sent from a `Stream` handle to its worker thread.
pub enum Command {
/// Start the audio client if the stream is not already playing.
PlayStream,
/// Stop the audio client if the stream is currently playing.
PauseStream,
/// Make the worker loop exit.
Terminate,
}
/// The direction-specific WASAPI service client owned by a stream.
pub enum AudioClientFlow {
/// Output stream: buffers are obtained from and released to the render client.
Render {
render_client: Audio::IAudioRenderClient,
},
/// Input stream: packets are read from the capture client.
Capture {
capture_client: Audio::IAudioCaptureClient,
},
}
/// Everything the worker thread needs to drive one WASAPI stream.
///
/// Dropping the value closes `event`.
pub struct StreamInner {
/// Audio client used to start and stop the stream and to query the current padding.
pub audio_client: Audio::IAudioClient,
/// Clock queried for the performance-counter position used in callback timestamps.
pub audio_clock: Audio::IAudioClock,
/// Render or capture client, depending on the stream direction.
pub client_flow: AudioClientFlow,
/// Second handle in the worker's wait set; closed when this struct is dropped.
/// NOTE(review): presumably the event the audio engine signals when a buffer is
/// ready -- confirm where the audio client is initialised.
pub event: Foundation::HANDLE,
/// Whether the audio client is currently started; updated after `Start`/`Stop` succeed.
pub playing: bool,
/// Buffer capacity in frames; the current padding is subtracted from it to obtain the
/// number of frames that can be written.
pub max_frames_in_buffer: u32,
/// Size of one frame in bytes; used to convert frame counts into sample counts.
pub bytes_per_frame: u16,
/// Stream configuration; its sample rate is used for output timestamps.
pub config: crate::StreamConfig,
/// Sample format of the buffers handed to the data callbacks.
pub sample_format: SampleFormat,
}
impl Stream {
pub(crate) fn new_input<D, E>(
stream_inner: StreamInner,
mut data_callback: D,
mut error_callback: E,
) -> Stream
where
D: FnMut(&Data, &InputCallbackInfo) + Send + 'static,
E: FnMut(StreamError) + Send + 'static,
{
let pending_scheduled_event = unsafe {
Threading::CreateEventA(None, false, false, windows::core::PCSTR(ptr::null()))
}
.expect("cpal: could not create input stream event");
let (tx, rx) = channel();
let run_context = RunContext {
handles: vec![pending_scheduled_event, stream_inner.event],
stream: stream_inner,
commands: rx,
};
let thread = thread::Builder::new()
.name("cpal_wasapi_in".to_owned())
.spawn(move || run_input(run_context, &mut data_callback, &mut error_callback))
.unwrap();
Stream {
thread: Some(thread),
commands: tx,
pending_scheduled_event,
}
}
pub(crate) fn new_output<D, E>(
stream_inner: StreamInner,
mut data_callback: D,
mut error_callback: E,
) -> Stream
where
D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static,
E: FnMut(StreamError) + Send + 'static,
{
let pending_scheduled_event = unsafe {
Threading::CreateEventA(None, false, false, windows::core::PCSTR(ptr::null()))
}
.expect("cpal: could not create output stream event");
let (tx, rx) = channel();
let run_context = RunContext {
handles: vec![pending_scheduled_event, stream_inner.event],
stream: stream_inner,
commands: rx,
};
let thread = thread::Builder::new()
.name("cpal_wasapi_out".to_owned())
.spawn(move || run_output(run_context, &mut data_callback, &mut error_callback))
.unwrap();
Stream {
thread: Some(thread),
commands: tx,
pending_scheduled_event,
}
}
#[inline]
fn push_command(&self, command: Command) -> Result<(), SendError<Command>> {
self.commands.send(command)?;
unsafe {
Threading::SetEvent(self.pending_scheduled_event).unwrap();
}
Ok(())
}
}
impl Drop for Stream {
    /// Asks the worker thread to terminate, waits for it, and releases the
    /// command-notification event.
    #[inline]
    fn drop(&mut self) {
        // The send only succeeds while the worker still owns its receiver, i.e. while it
        // is still running; in that case wait for it to observe `Terminate` and exit.
        if self.push_command(Command::Terminate).is_ok() {
            self.thread.take().unwrap().join().unwrap();
        }
        // If the send failed, the worker has already dropped its `RunContext` (for
        // example after reporting a stream error) and will never wait on the event
        // again. The event must be closed on that path too, otherwise the handle leaks
        // for every stream whose worker exited on its own.
        unsafe {
            let _ = Foundation::CloseHandle(self.pending_scheduled_event);
        }
    }
}
impl StreamTrait for Stream {
fn play(&self) -> Result<(), PlayStreamError> {
self.push_command(Command::PlayStream)
.map_err(|_| crate::error::PlayStreamError::DeviceNotAvailable)?;
Ok(())
}
fn pause(&self) -> Result<(), PauseStreamError> {
self.push_command(Command::PauseStream)
.map_err(|_| crate::error::PauseStreamError::DeviceNotAvailable)?;
Ok(())
}
}
impl Drop for StreamInner {
/// Closes the stream's `event` handle.
#[inline]
fn drop(&mut self) {
unsafe {
// Best effort: a failure to close the handle is deliberately ignored.
let _ = Foundation::CloseHandle(self.event);
}
}
}
/// Applies every command currently queued for the worker.
///
/// Returns `Ok(true)` when the worker should keep running and `Ok(false)` as soon as a
/// `Terminate` command is seen. If starting or stopping the audio client fails, the
/// error is returned immediately and any commands still queued are left in the channel.
fn process_commands(run_context: &mut RunContext) -> Result<bool, StreamError> {
    let stream = &mut run_context.stream;
    // Drain without blocking; the loop ends once the queue is empty or disconnected.
    while let Ok(command) = run_context.commands.try_recv() {
        match command {
            Command::Terminate => return Ok(false),
            Command::PlayStream if !stream.playing => {
                let started = unsafe { stream.audio_client.Start() };
                started.map_err(windows_err_to_cpal_err::<StreamError>)?;
                stream.playing = true;
            }
            Command::PauseStream if stream.playing => {
                let stopped = unsafe { stream.audio_client.Stop() };
                stopped.map_err(windows_err_to_cpal_err::<StreamError>)?;
                stream.playing = false;
            }
            // Already in the requested state: nothing to do.
            Command::PlayStream | Command::PauseStream => {}
        }
    }
    Ok(true)
}
/// Blocks until one of `handles` is signalled and returns its index within the slice.
///
/// The wait has no timeout and is not alertable.
///
/// # Errors
///
/// Returns a `BackendSpecificError` carrying the last OS error when the wait itself
/// fails.
fn wait_for_handle_signal(handles: &[Foundation::HANDLE]) -> Result<usize, BackendSpecificError> {
    debug_assert!(handles.len() <= SystemServices::MAXIMUM_WAIT_OBJECTS as usize);
    let result = unsafe {
        // Wait for any single handle (`bWaitAll` = false).
        Threading::WaitForMultipleObjectsEx(handles, false, Threading::INFINITE, false)
    };
    if result == Foundation::WAIT_FAILED {
        let err = unsafe { Foundation::GetLastError() };
        // The function name is now wrapped in a balanced pair of backticks.
        let description = format!("`WaitForMultipleObjectsEx` failed: {:?}", err);
        return Err(BackendSpecificError { description });
    }
    // On success the return value is `WAIT_OBJECT_0` plus the index of the signalled handle.
    let handle_idx = (result.0 - WAIT_OBJECT_0.0) as usize;
    Ok(handle_idx)
}
/// Returns how many frames can currently be written to the stream's buffer: the buffer
/// capacity minus the padding the audio client reports.
///
/// # Errors
///
/// Returns the converted backend error if the padding query fails.
fn get_available_frames(stream: &StreamInner) -> Result<u32, StreamError> {
    let padding = unsafe { stream.audio_client.GetCurrentPadding() }
        .map_err(windows_err_to_cpal_err::<StreamError>)?;
    // Saturate instead of subtracting directly: should the reported padding ever exceed
    // the recorded capacity, a plain `-` would panic in debug builds and wrap to a huge
    // frame count in release builds. Zero simply means "nothing to write right now".
    Ok(stream.max_frames_in_buffer.saturating_sub(padding))
}
/// Worker-thread body of an input stream.
///
/// Raises the thread priority, then alternates between handling commands / waiting for
/// a wake-up and draining captured packets, until a step asks to stop.
fn run_input(
    mut run_ctxt: RunContext,
    data_callback: &mut dyn FnMut(&Data, &InputCallbackInfo),
    error_callback: &mut dyn FnMut(StreamError),
) {
    boost_current_thread_priority();

    loop {
        // `None` means the stream's own event fired and there is data to process.
        if let Some(flow) = process_commands_and_await_signal(&mut run_ctxt, error_callback) {
            match flow {
                ControlFlow::Break => break,
                ControlFlow::Continue => continue,
            }
        }

        let capture_client = match &run_ctxt.stream.client_flow {
            AudioClientFlow::Capture { capture_client } => capture_client.clone(),
            // An input stream is always expected to hold a capture client.
            AudioClientFlow::Render { .. } => unreachable!(),
        };

        let outcome = process_input(
            &run_ctxt.stream,
            capture_client,
            data_callback,
            error_callback,
        );
        if let ControlFlow::Break = outcome {
            break;
        }
    }
}
/// Worker-thread body of an output stream.
///
/// Raises the thread priority, then alternates between handling commands / waiting for
/// a wake-up and filling the render buffer, until a step asks to stop.
fn run_output(
    mut run_ctxt: RunContext,
    data_callback: &mut dyn FnMut(&mut Data, &OutputCallbackInfo),
    error_callback: &mut dyn FnMut(StreamError),
) {
    boost_current_thread_priority();

    loop {
        // `None` means the stream's own event fired and a buffer can be filled.
        if let Some(flow) = process_commands_and_await_signal(&mut run_ctxt, error_callback) {
            match flow {
                ControlFlow::Break => break,
                ControlFlow::Continue => continue,
            }
        }

        let render_client = match &run_ctxt.stream.client_flow {
            AudioClientFlow::Render { render_client } => render_client.clone(),
            // An output stream is always expected to hold a render client.
            AudioClientFlow::Capture { .. } => unreachable!(),
        };

        let outcome = process_output(
            &run_ctxt.stream,
            render_client,
            data_callback,
            error_callback,
        );
        if let ControlFlow::Break = outcome {
            break;
        }
    }
}
fn boost_current_thread_priority() {
unsafe {
let thread_id = Threading::GetCurrentThreadId();
let _ = Threading::SetThreadPriority(
HANDLE(thread_id as isize),
Threading::THREAD_PRIORITY_TIME_CRITICAL,
);
}
}
/// Tells the worker loop what to do after a processing step.
enum ControlFlow {
/// Leave the worker loop, ending the worker thread's work.
Break,
/// Go back to the top of the worker loop.
Continue,
}
/// Handles queued commands, then blocks until one of the worker's handles is signalled.
///
/// Returns:
/// * `Some(ControlFlow::Break)` when termination was requested, or after an error has
///   been passed to `error_callback`;
/// * `Some(ControlFlow::Continue)` when the command event (handle index 0) woke the
///   worker, so the loop should restart and process the new commands;
/// * `None` when any other handle was signalled and audio data should be processed.
fn process_commands_and_await_signal(
    run_context: &mut RunContext,
    error_callback: &mut dyn FnMut(StreamError),
) -> Option<ControlFlow> {
    let keep_running = match process_commands(run_context) {
        Ok(keep_running) => keep_running,
        Err(err) => {
            error_callback(err);
            false
        }
    };
    if !keep_running {
        return Some(ControlFlow::Break);
    }

    match wait_for_handle_signal(&run_context.handles) {
        Ok(0) => Some(ControlFlow::Continue),
        Ok(_) => None,
        Err(err) => {
            error_callback(err.into());
            Some(ControlFlow::Break)
        }
    }
}
/// Drains every capture packet that is currently pending and hands each one to
/// `data_callback`.
///
/// Returns `ControlFlow::Continue` once the capture client reports a next-packet size of
/// zero. Any failure is passed to `error_callback` and results in `ControlFlow::Break`.
fn process_input(
stream: &StreamInner,
capture_client: Audio::IAudioCaptureClient,
data_callback: &mut dyn FnMut(&Data, &InputCallbackInfo),
error_callback: &mut dyn FnMut(StreamError),
) -> ControlFlow {
unsafe {
// Out-parameters for `GetBuffer`, reused across loop iterations.
let mut buffer: *mut u8 = ptr::null_mut();
// Receives the buffer flags; the value is never read in this function.
let mut flags = mem::MaybeUninit::uninit();
loop {
// A size of zero means there is nothing left to read for now.
let mut frames_available = match capture_client.GetNextPacketSize() {
Ok(0) => return ControlFlow::Continue,
Ok(f) => f,
Err(err) => {
error_callback(windows_err_to_cpal_err(err));
return ControlFlow::Break;
}
};
let mut qpc_position: u64 = 0;
let result = capture_client.GetBuffer(
&mut buffer,
&mut frames_available,
flags.as_mut_ptr(),
None,
Some(&mut qpc_position),
);
match result {
// NOTE(review): `AUDCLNT_S_BUFFER_EMPTY` is a success status; if the binding only
// produces `Err` for failure codes this arm can never match -- confirm.
Err(e) if e.code() == Audio::AUDCLNT_S_BUFFER_EMPTY => continue,
Err(e) => {
error_callback(windows_err_to_cpal_err(e));
return ControlFlow::Break;
}
Ok(_) => (),
}
debug_assert!(!buffer.is_null());
let data = buffer as *mut ();
// Number of samples in the packet: total bytes divided by the size of one sample.
let len = frames_available as usize * stream.bytes_per_frame as usize
/ stream.sample_format.sample_size();
let data = Data::from_parts(data, len, stream.sample_format);
// NOTE: if the timestamp cannot be obtained the function returns here without
// calling `ReleaseBuffer` for the packet acquired above.
let timestamp = match input_timestamp(stream, qpc_position) {
Ok(ts) => ts,
Err(err) => {
error_callback(err);
return ControlFlow::Break;
}
};
let info = InputCallbackInfo { timestamp };
data_callback(&data, &info);
// Hand the packet back to the capture client before fetching the next one.
let result = capture_client
.ReleaseBuffer(frames_available)
.map_err(windows_err_to_cpal_err);
if let Err(err) = result {
error_callback(err);
return ControlFlow::Break;
}
}
}
}
/// Fills the currently writable part of the render buffer by calling `data_callback`
/// once.
///
/// Returns `ControlFlow::Continue` when no frames are writable or after the buffer has
/// been released successfully. Any failure is passed to `error_callback` and results in
/// `ControlFlow::Break`.
fn process_output(
    stream: &StreamInner,
    render_client: Audio::IAudioRenderClient,
    data_callback: &mut dyn FnMut(&mut Data, &OutputCallbackInfo),
    error_callback: &mut dyn FnMut(StreamError),
) -> ControlFlow {
    let frame_count = match get_available_frames(stream) {
        Err(err) => {
            error_callback(err);
            return ControlFlow::Break;
        }
        // Nothing to write yet; wait for the next wake-up.
        Ok(0) => return ControlFlow::Continue,
        Ok(count) => count,
    };

    unsafe {
        let raw_buffer = match render_client.GetBuffer(frame_count) {
            Err(e) => {
                error_callback(windows_err_to_cpal_err(e));
                return ControlFlow::Break;
            }
            Ok(pointer) => pointer,
        };
        debug_assert!(!raw_buffer.is_null());

        // Number of samples in the buffer: total bytes divided by the size of one sample.
        let sample_count = frame_count as usize * stream.bytes_per_frame as usize
            / stream.sample_format.sample_size();
        let mut data = Data::from_parts(raw_buffer as *mut (), sample_count, stream.sample_format);

        let info = match output_timestamp(stream, frame_count, stream.config.sample_rate) {
            Ok(timestamp) => OutputCallbackInfo { timestamp },
            Err(err) => {
                error_callback(err);
                return ControlFlow::Break;
            }
        };
        data_callback(&mut data, &info);

        // Hand the filled frames back to the render client.
        let released = render_client.ReleaseBuffer(frame_count, 0);
        if let Err(err) = released {
            error_callback(windows_err_to_cpal_err(err));
            return ControlFlow::Break;
        }
    }

    ControlFlow::Continue
}
/// Converts a frame count at the given sample rate into a `Duration`.
///
/// The conversion goes through `f64`: the whole seconds and the fractional part
/// (expressed in nanoseconds) are both truncated towards zero.
fn frames_to_duration(frames: u32, rate: crate::SampleRate) -> std::time::Duration {
    let total_seconds = f64::from(frames) / rate.0 as f64;
    let whole_seconds = total_seconds as u64;
    let fractional = total_seconds - whole_seconds as f64;
    let subsec_nanos = (fractional * 1_000_000_000.0) as u32;
    std::time::Duration::new(whole_seconds, subsec_nanos)
}
/// Reads the audio clock and returns the reported performance-counter position as a
/// `StreamInstant`.
///
/// # Errors
///
/// Returns the converted backend error if the clock query fails.
///
/// # Panics
///
/// Panics if the position cannot be represented as a `StreamInstant`.
fn stream_instant(stream: &StreamInner) -> Result<crate::StreamInstant, StreamError> {
    let mut device_position: u64 = 0;
    let mut qpc_ticks: u64 = 0;
    let queried = unsafe {
        stream
            .audio_clock
            .GetPosition(&mut device_position, Some(&mut qpc_ticks))
    };
    queried.map_err(windows_err_to_cpal_err::<StreamError>)?;

    // The counter value is multiplied by 100 to obtain nanoseconds.
    let nanos = i128::from(qpc_ticks) * 100;
    Ok(crate::StreamInstant::from_nanos_i128(nanos)
        .expect("performance counter out of range of `StreamInstant` representation"))
}
/// Builds the timestamp pair for an input callback.
///
/// `capture` is derived from the packet's performance-counter position and `callback`
/// from the current audio clock reading.
///
/// # Errors
///
/// Returns an error if the audio clock cannot be read.
///
/// # Panics
///
/// Panics if the packet position cannot be represented as a `StreamInstant`.
fn input_timestamp(
    stream: &StreamInner,
    buffer_qpc_position: u64,
) -> Result<crate::InputStreamTimestamp, StreamError> {
    // The counter value is multiplied by 100 to obtain nanoseconds.
    let capture_nanos = i128::from(buffer_qpc_position) * 100;
    let capture = match crate::StreamInstant::from_nanos_i128(capture_nanos) {
        Some(instant) => instant,
        None => panic!("performance counter out of range of `StreamInstant` representation"),
    };
    stream_instant(stream).map(|callback| crate::InputStreamTimestamp { capture, callback })
}
/// Builds the timestamp pair for an output callback.
///
/// `callback` is the current audio clock reading; `playback` is that instant plus the
/// duration of `frames_available` frames at `sample_rate`.
///
/// # Errors
///
/// Returns an error if the audio clock cannot be read.
///
/// # Panics
///
/// Panics if the playback instant cannot be represented as a `StreamInstant`.
fn output_timestamp(
    stream: &StreamInner,
    frames_available: u32,
    sample_rate: crate::SampleRate,
) -> Result<crate::OutputStreamTimestamp, StreamError> {
    let callback = stream_instant(stream)?;
    let playback = callback
        .add(frames_to_duration(frames_available, sample_rate))
        .expect("`playback` occurs beyond representation supported by `StreamInstant`");
    Ok(crate::OutputStreamTimestamp { callback, playback })
}