use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use cpal::{InputCallbackInfo, Sample, SampleFormat, SampleRate, StreamConfig};
use log::*;
use std::fmt::Debug;
use tokio::sync::watch;
use crate::audio::transformers::{NoiseGate, Transformer};
use crate::audio::SAMPLE_RATE;
use crate::error::{AudioError, AudioStream};
use crate::state::StatePhase;
pub fn callback<T: Sample>(
mut input_sender: futures_channel::mpsc::Sender<Vec<u8>>,
mut transformers: Vec<Box<dyn Transformer + Send + 'static>>,
mut opus_encoder: opus::Encoder,
buffer_size: usize,
input_volume_receiver: watch::Receiver<f32>,
phase_watcher: watch::Receiver<StatePhase>,
) -> impl FnMut(&[T], &InputCallbackInfo) + Send + 'static {
let mut buffer = Vec::with_capacity(buffer_size);
move |data: &[T], _info: &InputCallbackInfo| {
if !matches!(&*phase_watcher.borrow(), StatePhase::Connected(_)) {
return;
}
let input_volume = *input_volume_receiver.borrow();
let mut data = data.iter().map(|e| e.to_f32()).map(|e| e * input_volume);
while buffer.len() + data.len() > buffer_size {
buffer.extend(data.by_ref().take(buffer_size - buffer.len()));
let encoded = transformers
.iter_mut()
.try_fold((opus::Channels::Mono, &mut buffer[..]), |acc, e| {
e.transform(acc)
})
.map(|buf| opus_encoder.encode_vec_float(&*buf.1, buffer_size).unwrap());
if let Some(encoded) = encoded {
if let Err(e) = input_sender.try_send(encoded) {
warn!("Error sending audio: {}", e);
}
}
buffer.clear();
}
buffer.extend(data);
}
}
pub trait AudioInputDevice {
fn play(&self) -> Result<(), AudioError>;
fn pause(&self) -> Result<(), AudioError>;
fn set_volume(&self, volume: f32);
fn sample_receiver(&mut self) -> Option<futures_channel::mpsc::Receiver<Vec<u8>>>;
fn num_channels(&self) -> usize;
}
pub struct DefaultAudioInputDevice {
stream: cpal::Stream,
sample_receiver: Option<futures_channel::mpsc::Receiver<Vec<u8>>>,
volume_sender: watch::Sender<f32>,
channels: u16,
}
impl DefaultAudioInputDevice {
pub fn new(
input_volume: f32,
disable_noise_gate: bool,
phase_watcher: watch::Receiver<StatePhase>,
frame_size: u32, ) -> Result<Self, AudioError> {
let sample_rate = SampleRate(SAMPLE_RATE);
let host = cpal::default_host();
let input_device = host
.default_input_device()
.ok_or(AudioError::NoDevice(AudioStream::Input))?;
let input_supported_config = input_device
.supported_input_configs()
.map_err(|e| AudioError::NoConfigs(AudioStream::Input, e))?
.find_map(|c| {
if c.min_sample_rate() <= sample_rate && c.max_sample_rate() >= sample_rate {
Some(c)
} else {
None
}
})
.ok_or(AudioError::NoSupportedConfig(AudioStream::Input))?
.with_sample_rate(sample_rate);
let input_supported_sample_format = input_supported_config.sample_format();
let input_config: StreamConfig = input_supported_config.into();
let err_fn = |err| error!("An error occurred on the output audio stream: {}", err);
let (sample_sender, sample_receiver) = futures_channel::mpsc::channel(1_000_000);
let (volume_sender, input_volume_receiver) = watch::channel::<f32>(input_volume);
let opus_encoder = opus::Encoder::new(
sample_rate.0,
match input_config.channels {
1 => opus::Channels::Mono,
2 => opus::Channels::Stereo,
_ => unimplemented!(
"Only 1 or 2 channels supported, got {}",
input_config.channels
),
},
opus::Application::Voip,
)
.unwrap();
let buffer_size = (sample_rate.0 * frame_size / 400) as usize;
let transformers = if disable_noise_gate {
vec![]
} else {
vec![Box::new(NoiseGate::new(50)) as Box<dyn Transformer + Send + 'static>]
};
let input_stream = match input_supported_sample_format {
SampleFormat::F32 => input_device.build_input_stream(
&input_config,
callback::<f32>(
sample_sender,
transformers,
opus_encoder,
buffer_size,
input_volume_receiver,
phase_watcher,
),
err_fn,
),
SampleFormat::I16 => input_device.build_input_stream(
&input_config,
callback::<i16>(
sample_sender,
transformers,
opus_encoder,
buffer_size,
input_volume_receiver,
phase_watcher,
),
err_fn,
),
SampleFormat::U16 => input_device.build_input_stream(
&input_config,
callback::<u16>(
sample_sender,
transformers,
opus_encoder,
buffer_size,
input_volume_receiver,
phase_watcher,
),
err_fn,
),
}
.map_err(|e| AudioError::InvalidStream(AudioStream::Input, e))?;
let res = Self {
stream: input_stream,
sample_receiver: Some(sample_receiver),
volume_sender,
channels: input_config.channels,
};
Ok(res)
}
}
impl AudioInputDevice for DefaultAudioInputDevice {
fn play(&self) -> Result<(), AudioError> {
self.stream.play().map_err(AudioError::InputPlayError)
}
fn pause(&self) -> Result<(), AudioError> {
self.stream.pause().map_err(AudioError::InputPauseError)
}
fn set_volume(&self, volume: f32) {
self.volume_sender.send(volume).unwrap();
}
fn sample_receiver(&mut self) -> Option<futures_channel::mpsc::Receiver<Vec<u8>>> {
self.sample_receiver.take()
}
fn num_channels(&self) -> usize {
self.channels as usize
}
}
impl Debug for DefaultAudioInputDevice {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("DefaultAudioInputDevice")
.field("sample_receiver", &self.sample_receiver)
.field("channels", &self.channels)
.field("volume_sender", &self.volume_sender)
.field("stream", &"cpal::Stream")
.finish()
}
}