use audio_mel::audio_info::*;
use melodium_core::*;
use melodium_macro::mel_treatment;
#[mel_treatment(
input trigger Block<void>,
output signal Stream<f32>
output info Block<AudioInfo>
output failed Block<void>
output errors Stream<string>
)]
pub async fn record_mono(device: Option<string>, sample_rate: Option<u32>) {
#[cfg(feature = "real")]
if let Ok(_) = trigger.recv_one().await {
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use std::sync::Arc;
let (mono_sender, mono_receiver) = async_channel::bounded::<Vec<f32>>(256);
let (err_sender, err_receiver) = async_channel::bounded::<(bool, String)>(64);
let (info_sender, info_receiver) = async_channel::bounded::<AudioInfo>(1);
let capture_fut = async move {
let _ = async_std::task::spawn_blocking(move || {
let host = cpal::default_host();
let input_device = match &device {
None => match host.default_input_device() {
Some(d) => d,
None => {
let _ = async_std::task::block_on(
err_sender
.send((true, "no default input device available".to_string())),
);
return;
}
},
Some(name) => {
let found = host.input_devices().ok().and_then(|mut iter| {
iter.find(|d| {
d.description()
.map(|desc| desc.name() == name.as_str())
.unwrap_or(false)
})
});
match found {
Some(d) => d,
None => {
let _ = async_std::task::block_on(
err_sender
.send((true, format!("input device not found: {name}"))),
);
return;
}
}
}
};
let config = match input_device.default_input_config() {
Ok(c) => c,
Err(e) => {
let _ = async_std::task::block_on(
err_sender.send((true, format!("failed to get input config: {e}"))),
);
return;
}
};
let num_channels = config.channels() as usize;
let sample_format = config.sample_format();
let stream_config = if let Some(rate) = sample_rate {
let supported =
input_device
.supported_input_configs()
.ok()
.and_then(|mut iter| {
iter.find(|r| {
r.channels() as usize == num_channels
&& r.sample_format() == sample_format
&& r.min_sample_rate() <= rate
&& rate <= r.max_sample_rate()
})
});
match supported {
Some(range) => range.with_sample_rate(rate).config(),
None => {
let _ = async_std::task::block_on(err_sender.send((
true,
format!("requested sample rate {rate} not supported by device"),
)));
return;
}
}
} else {
config.config()
};
let stream_result = build_stream(
&input_device,
&stream_config,
sample_format,
num_channels,
mono_sender,
err_sender.clone(),
);
let stream = match stream_result {
Ok(s) => s,
Err(e) => {
let _ = async_std::task::block_on(
err_sender.send((true, format!("failed to build input stream: {e}"))),
);
return;
}
};
if let Err(e) = stream.play() {
let _ = async_std::task::block_on(
err_sender.send((true, format!("failed to start stream: {e}"))),
);
return;
}
let audio_info = AudioInfo {
codec: format!("{:?}", sample_format),
channels: num_channels as u32,
sample_rate: stream_config.sample_rate,
duration_seconds: None,
};
let _ = async_std::task::block_on(info_sender.send(audio_info));
let thread = std::thread::current();
loop {
std::thread::park_timeout(std::time::Duration::from_millis(100));
if err_sender.is_closed() {
break;
}
let _ = &stream;
let _ = &thread;
}
})
.await;
};
let forward_fut = async {
while let Ok(mono) = mono_receiver.recv().await {
let batch: VecDeque<f32> = mono.into_iter().collect();
if signal.send_many(batch.into()).await.is_err() {
break;
}
}
};
let info_fut = async {
if let Ok(audio_info) = info_receiver.recv().await {
let _ = info.send_one(Value::Data(Arc::new(audio_info))).await;
}
};
let error_fut = async {
while let Ok((fatal, msg)) = err_receiver.recv().await {
let _ = errors.send_one(msg.into()).await;
if fatal {
let _ = failed.send_one(().into()).await;
break;
}
}
};
futures::join!(capture_fut, forward_fut, info_fut, error_fut);
}
#[cfg(feature = "mock")]
if let Ok(_) = trigger.recv_one().await {
let _ = failed.send_one(().into()).await;
let _ = errors
.send_one(
"Live recording is not available because the 'record' package is in mock mode"
.to_string()
.into(),
)
.await;
}
}
#[cfg(feature = "real")]
fn build_stream(
device: &cpal::Device,
config: &cpal::StreamConfig,
sample_format: cpal::SampleFormat,
num_channels: usize,
mono_sender: async_channel::Sender<Vec<f32>>,
err_sender: async_channel::Sender<(bool, String)>,
) -> Result<cpal::Stream, String> {
use cpal::traits::DeviceTrait;
use cpal::SampleFormat;
macro_rules! build {
($t:ty) => {{
let sender = mono_sender.clone();
let err = err_sender.clone();
device
.build_input_stream(
config,
move |data: &[$t], _: &cpal::InputCallbackInfo| {
let mono = mix_to_mono::<$t>(data, num_channels);
if async_std::task::block_on(sender.send(mono)).is_err() {
}
},
move |e| {
let _ = async_std::task::block_on(
err.send((false, format!("stream error: {e}"))),
);
},
None,
)
.map_err(|e| e.to_string())
}};
}
match sample_format {
SampleFormat::F32 => build!(f32),
SampleFormat::F64 => build!(f64),
SampleFormat::I8 => build!(i8),
SampleFormat::I16 => build!(i16),
SampleFormat::I32 => build!(i32),
SampleFormat::I64 => build!(i64),
SampleFormat::U8 => build!(u8),
SampleFormat::U16 => build!(u16),
SampleFormat::U32 => build!(u32),
SampleFormat::U64 => build!(u64),
f => Err(format!("unsupported sample format: {f:?}")),
}
}
#[cfg(feature = "real")]
fn mix_to_mono<T>(data: &[T], num_channels: usize) -> Vec<f32>
where
T: cpal::SizedSample,
f32: cpal::FromSample<T>,
{
use cpal::FromSample;
if num_channels <= 1 {
return data.iter().map(|&s| f32::from_sample_(s)).collect();
}
let inv = 1.0_f32 / num_channels as f32;
data.chunks_exact(num_channels)
.map(|frame| frame.iter().map(|&s| f32::from_sample_(s)).sum::<f32>() * inv)
.collect()
}