use {
crate::{GenericSample, OperationError},
cpal::{
BufferSize, Device, HostId, Stream, StreamConfig, SupportedStreamConfig, default_host,
traits::{DeviceTrait, HostTrait, StreamTrait},
},
rodio::{ChannelCount, SampleRate, buffer::SamplesBuffer, source::UniformSourceIterator},
std::{
fmt::{Debug, Error as FmtError, Formatter, Result as FmtResult},
io::{Error as IoError, ErrorKind},
mem::replace,
},
tokio::{
sync::mpsc::{Receiver, channel},
time::{Duration, sleep},
},
};
pub struct AudioCollector {
device: Device,
host_id: HostId,
receiver: Receiver<f32>,
stream_config: StreamConfig,
supported_stream_config: SupportedStreamConfig,
stream: Stream,
}
impl AudioCollector {
fn create_stream(
device: &Device,
stream_config: &StreamConfig,
) -> Result<(Receiver<f32>, Stream), OperationError> {
let buffer_size = match stream_config.buffer_size {
BufferSize::Default => 8192,
BufferSize::Fixed(size) => size as _,
};
let (tx, rx) = channel(buffer_size);
Ok((
rx,
device.build_input_stream(
stream_config.clone(),
move |buffer: &[f32], _| {
let iter = match tx.try_reserve_many(buffer.len()) {
Err(e) => {
eprintln!("AudioCollector can't send data: {}", e);
return;
}
Ok(p) => p,
};
let iter = iter.enumerate();
for (i, permit) in iter {
permit.send(buffer[i]);
}
},
|e| eprintln!("{}", e),
None,
)?,
))
}
fn update_stream(&mut self) -> Result<(), OperationError> {
let (receiver, stream) = Self::create_stream(&self.device, &self.stream_config)?;
drop(replace(&mut self.stream, stream));
drop(replace(&mut self.receiver, receiver));
Ok(())
}
pub fn new() -> Result<Self, OperationError> {
let host = default_host();
let host_id = host.id();
let device = host.default_input_device().ok_or(OperationError::NoDevice(
"No default audio input device.".to_owned(),
))?;
let supported_stream_config = device.default_input_config()?;
let stream_config = supported_stream_config.config();
let (receiver, stream) = Self::create_stream(&device, &stream_config)?;
Ok(Self {
device,
host_id,
receiver,
stream_config,
supported_stream_config,
stream,
})
}
pub fn get_name(&self) -> Result<String, OperationError> {
Ok(self.device.description()?.name().to_owned())
}
pub fn get_supported_stream_channels(&self) -> usize {
self.supported_stream_config.channels() as _
}
pub fn get_supported_stream_sample_rate(&self) -> usize {
self.supported_stream_config.sample_rate() as _
}
pub fn set_stream_channels(&mut self, channels: usize) -> Result<(), OperationError> {
self.stream_config.channels = channels as _;
self.update_stream()
}
pub fn set_stream_sample_rate(&mut self, sample_rate: usize) -> Result<(), OperationError> {
self.stream_config.sample_rate = sample_rate as _;
self.update_stream()
}
pub fn get_stream_channels(&self) -> usize {
self.stream_config.channels as _
}
pub fn get_stream_sample_rate(&self) -> usize {
self.stream_config.sample_rate as _
}
pub fn collect(&self) -> Result<(), OperationError> {
Ok(self.stream.play()?)
}
pub fn pause(&self) -> Result<(), OperationError> {
Ok(self.stream.pause()?)
}
pub async fn read<const SR: usize, S>(
&mut self,
channels: usize,
) -> Result<Vec<S>, OperationError>
where
S: GenericSample,
{
if self.receiver.is_empty() {
sleep(Duration::from_millis(
(self.receiver.max_capacity() * 1000
/ self.get_stream_channels()
/ self.get_stream_sample_rate())
.min(25) as _,
))
.await;
}
let capacity = self.receiver.max_capacity() - self.receiver.capacity();
let mut buffer = Vec::with_capacity(capacity);
let read = self.receiver.recv_many(&mut buffer, capacity).await;
if capacity > 0 && read == 0 {
return Err(OperationError::Io(IoError::new(
ErrorKind::UnexpectedEof,
"No more data.",
)));
}
let res = if self.get_stream_channels() != channels || self.get_stream_sample_rate() != SR {
let buffer = SamplesBuffer::new(
ChannelCount::new(self.get_stream_channels() as _)
.ok_or(IoError::other("Invalid channel count."))?,
SampleRate::new(self.get_stream_sample_rate() as _)
.ok_or(IoError::other("Invalid sample rate."))?,
&buffer[..read],
);
let resampled: Vec<f32> = UniformSourceIterator::new(
buffer,
ChannelCount::new(channels as _).ok_or(IoError::other("Invalid channel count."))?,
SampleRate::new(SR as _).ok_or(IoError::other("Invalid sample rate."))?,
)
.collect();
resampled.iter().map(|&v| S::from_f32(v)).collect()
} else {
buffer[..read].iter().map(|&v| S::from_f32(v)).collect()
};
Ok(res)
}
pub fn close(&mut self) {
self.receiver.close()
}
}
impl Debug for AudioCollector {
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
write!(
f,
"AudioCollector({}, {})",
self.host_id.name(),
self.get_name().map_err(|_| FmtError)?
)
}
}