use crate::Device;
use cpal::traits::EventLoopTrait;
use failure::Fail;
use sample::Sample;
use std;
use std::any::{Any, TypeId};
use std::marker::PhantomData;
use std::sync::atomic::{self, AtomicBool};
use std::sync::{mpsc, Arc, Mutex};
pub mod output;
pub mod input;
pub mod duplex {}
pub const DEFAULT_SAMPLE_RATE: u32 = 44_100;
pub type UpdateFn<M> = dyn FnOnce(&mut M) + Send + 'static;
pub(crate) type ProcessFn = dyn FnMut(cpal::StreamDataResult) + 'static + Send;
pub(crate) type ProcessFnMsg = (cpal::StreamId, Box<ProcessFn>);
pub(crate) struct LoopContext {
process_fn_rx: mpsc::Receiver<ProcessFnMsg>,
process_fns: Vec<(cpal::StreamId, Box<ProcessFn>)>,
}
pub struct Stream<M> {
update_tx: mpsc::Sender<Box<dyn FnMut(&mut M) + 'static + Send>>,
process_fn_tx: mpsc::Sender<ProcessFnMsg>,
shared: Arc<Shared<M>>,
cpal_format: cpal::Format,
}
struct Shared<M> {
model: Arc<Mutex<Option<M>>>,
stream_id: cpal::StreamId,
event_loop: Arc<cpal::EventLoop>,
is_paused: AtomicBool,
}
pub struct Builder<M, S = f32> {
pub(crate) host: Arc<cpal::Host>,
pub(crate) event_loop: Arc<cpal::EventLoop>,
pub(crate) process_fn_tx: mpsc::Sender<ProcessFnMsg>,
pub model: M,
pub sample_rate: Option<u32>,
pub channels: Option<usize>,
pub frames_per_buffer: Option<usize>,
pub device: Option<Device>,
pub(crate) sample_format: PhantomData<S>,
}
#[derive(Debug, Fail)]
pub enum BuildError {
#[fail(display = "failed to get default device")]
DefaultDevice,
#[fail(display = "failed to enumerate available formats: {}", err)]
SupportedFormats { err: cpal::SupportedFormatsError },
#[fail(display = "failed to build stream: {}", err)]
BuildStream { err: cpal::BuildStreamError },
}
impl LoopContext {
pub fn new(process_fn_rx: mpsc::Receiver<ProcessFnMsg>) -> Self {
let process_fns = vec![];
LoopContext {
process_fn_rx,
process_fns,
}
}
pub fn process(&mut self, stream_id: cpal::StreamId, data: cpal::StreamDataResult) {
for (stream_id, proc_fn) in self.process_fn_rx.try_iter() {
self.process_fns.retain(|&(ref id, _)| *id != stream_id);
self.process_fns.push((stream_id, proc_fn));
}
if let Some(proc_fn) = self.process_fn_mut(&stream_id) {
proc_fn(data);
} else {
if let Ok(cpal::StreamData::Output { mut buffer }) = data {
fn silence<S: Sample>(slice: &mut [S]) {
for sample in slice {
*sample = S::equilibrium();
}
}
match buffer {
cpal::UnknownTypeOutputBuffer::U16(ref mut buffer) => silence(buffer),
cpal::UnknownTypeOutputBuffer::I16(ref mut buffer) => silence(buffer),
cpal::UnknownTypeOutputBuffer::F32(ref mut buffer) => silence(buffer),
}
}
}
}
fn process_fn_mut(&mut self, stream_id: &cpal::StreamId) -> Option<&mut Box<ProcessFn>> {
self.process_fns
.iter_mut()
.find(|&&mut (ref id, _)| id == stream_id)
.map(|&mut (_, ref mut f)| f)
}
}
impl<M> Stream<M> {
pub fn play(&self) -> Result<(), cpal::PlayStreamError> {
self.shared.play()
}
pub fn pause(&self) -> Result<(), cpal::PauseStreamError> {
self.shared.pause()
}
pub fn is_playing(&self) -> bool {
self.shared.is_playing()
}
pub fn is_paused(&self) -> bool {
self.shared.is_paused()
}
pub fn send<F>(
&self,
update: F,
) -> Result<(), mpsc::SendError<Box<dyn FnMut(&mut M) + Send + 'static>>>
where
F: FnOnce(&mut M) + Send + 'static,
{
if self.shared.is_paused.load(atomic::Ordering::Relaxed) {
if let Ok(mut guard) = self.shared.model.lock() {
let mut model = guard.take().unwrap();
update(&mut model);
*guard = Some(model);
}
} else {
let mut update_opt = Some(update);
let update_fn = move |audio: &mut M| {
if let Some(update) = update_opt.take() {
update(audio);
}
};
self.update_tx.send(Box::new(update_fn))?;
}
Ok(())
}
pub fn cpal_format(&self) -> &cpal::Format {
&self.cpal_format
}
pub fn id(&self) -> &cpal::StreamId {
&self.shared.stream_id
}
}
impl<M> Shared<M> {
fn play(&self) -> Result<(), cpal::PlayStreamError> {
self.event_loop.play_stream(self.stream_id.clone())?;
self.is_paused.store(false, atomic::Ordering::Relaxed);
Ok(())
}
fn pause(&self) -> Result<(), cpal::PauseStreamError> {
self.is_paused.store(true, atomic::Ordering::Relaxed);
self.event_loop.pause_stream(self.stream_id.clone())?;
Ok(())
}
fn is_playing(&self) -> bool {
!self.is_paused.load(atomic::Ordering::Relaxed)
}
fn is_paused(&self) -> bool {
self.is_paused.load(atomic::Ordering::Relaxed)
}
}
impl<M> Clone for Stream<M> {
fn clone(&self) -> Self {
let update_tx = self.update_tx.clone();
let process_fn_tx = self.process_fn_tx.clone();
let shared = self.shared.clone();
let cpal_format = self.cpal_format.clone();
Stream {
update_tx,
process_fn_tx,
shared,
cpal_format,
}
}
}
impl<M> Drop for Shared<M> {
fn drop(&mut self) {
if self.is_playing() {
self.pause().ok();
}
self.event_loop.destroy_stream(self.stream_id.clone());
}
}
impl From<cpal::BuildStreamError> for BuildError {
fn from(err: cpal::BuildStreamError) -> Self {
BuildError::BuildStream { err }
}
}
impl From<cpal::SupportedFormatsError> for BuildError {
fn from(err: cpal::SupportedFormatsError) -> Self {
BuildError::SupportedFormats { err }
}
}
fn cpal_sample_format<S: Any>() -> Option<cpal::SampleFormat> {
let type_id = TypeId::of::<S>();
if type_id == TypeId::of::<f32>() {
Some(cpal::SampleFormat::F32)
} else if type_id == TypeId::of::<i16>() {
Some(cpal::SampleFormat::I16)
} else if type_id == TypeId::of::<u16>() {
Some(cpal::SampleFormat::U16)
} else {
None
}
}
fn matching_supported_formats(
mut supported_format: cpal::SupportedFormat,
sample_format: Option<cpal::SampleFormat>,
channels: Option<usize>,
sample_rate: Option<cpal::SampleRate>,
) -> Option<cpal::Format> {
if let Some(sample_format) = sample_format {
if supported_format.data_type != sample_format {
return None;
}
}
if let Some(channels) = channels {
let supported_channels = supported_format.channels as usize;
if supported_channels < channels {
return None;
} else if supported_channels > channels {
supported_format.channels = channels as u16;
}
}
if let Some(sample_rate) = sample_rate {
if supported_format.min_sample_rate > sample_rate
|| supported_format.max_sample_rate < sample_rate
{
return None;
}
let mut format = supported_format.with_max_sample_rate();
format.sample_rate = sample_rate;
return Some(format);
}
Some(supported_format.with_max_sample_rate())
}
fn find_best_matching_format<F>(
device: &cpal::Device,
mut sample_format: Option<cpal::SampleFormat>,
channels: Option<usize>,
mut sample_rate: Option<cpal::SampleRate>,
default_format: Option<cpal::Format>,
supported_formats: F,
) -> Result<Option<cpal::Format>, cpal::SupportedFormatsError>
where
F: Fn(&cpal::Device) -> Result<Vec<cpal::SupportedFormat>, cpal::SupportedFormatsError>,
{
loop {
{
if let Some(ref format) = default_format {
if sample_format == Some(format.data_type)
&& channels
.map(|ch| ch <= format.channels as usize)
.unwrap_or(true)
&& sample_rate == Some(format.sample_rate)
{
return Ok(Some(format.clone()));
}
}
let stream_formats = supported_formats(device)?.into_iter().filter_map(|fmt| {
matching_supported_formats(fmt, sample_format, channels, sample_rate)
});
if let Some(format) = stream_formats.max_by_key(|fmt| fmt.channels) {
return Ok(Some(format));
}
}
let default_sample_format = default_format.as_ref().map(|f| f.data_type);
if sample_format.is_some() && sample_format != default_sample_format {
sample_format = default_sample_format;
} else if sample_rate == Some(cpal::SampleRate(DEFAULT_SAMPLE_RATE)) {
let cpal_default_sample_rate = default_format.as_ref().map(|fmt| fmt.sample_rate);
let nannou_default_sample_rate = Some(cpal::SampleRate(DEFAULT_SAMPLE_RATE));
sample_rate = if cpal_default_sample_rate != nannou_default_sample_rate {
cpal_default_sample_rate
} else {
None
};
} else {
return Ok(None);
}
}
}