use anyhow::{anyhow, Error};
use hound::{WavIntoSamples, WavReader, WavWriter};
use regex::Regex;
use serde::{Deserialize, Serialize};
use std::{
fs::{self, File},
io::{BufReader, BufWriter},
path::{Path, PathBuf},
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
thread,
time::Duration,
};
use structopt::StructOpt;
use webrtc_audio_processing::Processor;
use webrtc_audio_processing_config::Config;
mod common;
use common::{deinterleave, interleave};
/// Sample rate, in samples per second, shared by the audio processor, the
/// PortAudio duplex stream and every WAV sink created by this program.
const AUDIO_SAMPLE_RATE: u32 = 48_000;
/// Value passed to `portaudio::StreamParameters::new` for both the input and
/// the output direction. The stream callback de-interleaves the buffers
/// itself before handing them to the processor.
const AUDIO_INTERLEAVED: bool = true;
// Command-line arguments, parsed with structopt at the start of `main`.
// NOTE: plain `//` comments are used on purpose here; structopt turns `///`
// doc comments into CLI help text, which would change the program's output.
#[derive(Debug, StructOpt)]
struct Args {
// Path to the configuration file. `main` reads it as text and parses it with
// `json5` into `Options`.
#[structopt(short, long)]
pub config_file: PathBuf,
}
/// Settings for the capture (input) side of the duplex stream.
#[derive(Deserialize, Serialize, Default, Clone, Debug)]
struct CaptureOptions {
/// Regular expression matched against PortAudio device names to select the
/// input device.
device_name: String,
/// Number of input channels; also used as the channel count of both capture
/// WAV sinks.
num_channels: u16,
/// Optional WAV file whose samples replace the live input buffer. When it
/// runs out of samples the stream is completed.
source_path: Option<PathBuf>,
/// Optional WAV file that receives the capture samples before they are
/// passed to `process_capture_frame`.
preprocess_sink_path: Option<PathBuf>,
/// Optional WAV file that receives the capture samples after
/// `process_capture_frame` has run.
postprocess_sink_path: Option<PathBuf>,
}
/// Settings for the render (output) side of the duplex stream.
#[derive(Deserialize, Serialize, Default, Clone, Debug)]
struct RenderOptions {
/// Regular expression matched against PortAudio device names to select the
/// output device.
device_name: String,
/// Number of output channels.
num_channels: u16,
/// Optional WAV file whose samples are written to the output buffer. Without
/// it the output buffer is filled with zeros.
source_path: Option<PathBuf>,
/// When `true`, the output buffer is zeroed after `process_render_frame` has
/// run. Falls back to the `bool` default when absent from the config file.
#[serde(default)]
mute: bool,
}
/// Top-level layout of the configuration file that `main` parses with `json5`.
#[derive(Deserialize, Serialize, Default, Clone, Debug)]
struct Options {
/// Capture (input) device, channel count and optional WAV source/sinks.
capture: CaptureOptions,
/// Render (output) device, channel count, optional WAV source and mute flag.
render: RenderOptions,
/// Audio-processing configuration handed to `Processor::set_config`.
config: Config,
}
/// Returns the index of the first PortAudio device whose name matches
/// `device_name`.
///
/// Entries that PortAudio fails to describe are skipped rather than reported.
///
/// # Errors
///
/// Fails if the device list cannot be obtained, or if no device name matches
/// the pattern.
fn match_device(
    pa: &portaudio::PortAudio,
    device_name: Regex,
) -> Result<portaudio::DeviceIndex, Error> {
    pa.devices()?
        .flatten()
        .find(|(_, info)| device_name.is_match(info.name))
        .map(|(index, _)| index)
        .ok_or_else(|| anyhow!("Audio device matching \"{}\" not found.", device_name))
}
/// Builds the duplex (capture + render) stream settings described by `opt`.
///
/// Both devices are looked up by treating the configured device names as
/// regular expressions. Each direction uses the device's default low latency,
/// and the buffer size is the processor's frame length so that every callback
/// delivers exactly one processing frame.
///
/// # Errors
///
/// Fails if a device name is not a valid regular expression, if no device
/// matches, if device information cannot be queried, or if PortAudio rejects
/// the combined format at `AUDIO_SAMPLE_RATE`.
fn create_stream_settings(
    pa: &portaudio::PortAudio,
    processor: &Processor,
    opt: &Options,
) -> Result<portaudio::DuplexStreamSettings<f32, f32>, Error> {
    let sample_rate = f64::from(AUDIO_SAMPLE_RATE);

    // Capture side.
    let capture_pattern = Regex::new(&opt.capture.device_name)?;
    let capture_device = match_device(pa, capture_pattern)?;
    let capture_latency = pa.device_info(capture_device)?.default_low_input_latency;
    let capture_params = portaudio::StreamParameters::<f32>::new(
        capture_device,
        i32::from(opt.capture.num_channels),
        AUDIO_INTERLEAVED,
        capture_latency,
    );

    // Render side.
    let render_pattern = Regex::new(&opt.render.device_name)?;
    let render_device = match_device(pa, render_pattern)?;
    let render_latency = pa.device_info(render_device)?.default_low_output_latency;
    let render_params = portaudio::StreamParameters::<f32>::new(
        render_device,
        i32::from(opt.render.num_channels),
        AUDIO_INTERLEAVED,
        render_latency,
    );

    // Reject unsupported device/format combinations before building settings.
    pa.is_duplex_format_supported(capture_params, render_params, sample_rate)?;

    let frames_per_buffer = processor.num_samples_per_frame() as u32;
    Ok(portaudio::DuplexStreamSettings::new(
        capture_params,
        render_params,
        sample_rate,
        frames_per_buffer,
    ))
}
fn open_wav_writer(path: &Path, channels: u16) -> Result<WavWriter<BufWriter<File>>, Error> {
let sink = hound::WavWriter::<BufWriter<File>>::create(
path,
hound::WavSpec {
channels,
sample_rate: AUDIO_SAMPLE_RATE,
bits_per_sample: 32,
sample_format: hound::SampleFormat::Float,
},
)?;
Ok(sink)
}
/// Opens the WAV file at `path` and returns an owning iterator over its
/// samples, read as `f32`.
///
/// # Errors
///
/// Propagates any error reported by `hound` while opening the file.
fn open_wav_reader(path: &Path) -> Result<WavIntoSamples<BufReader<File>, f32>, Error> {
    Ok(WavReader::open(path)?.into_samples())
}
/// Fills `dest` with the next samples from `source`, padding with zeros if
/// the source runs out first.
///
/// Samples that fail to decode are skipped. At most `dest.len()` samples are
/// pulled from `source`; in particular an empty `dest` consumes nothing and
/// never panics.
///
/// Returns `true` when every slot of `dest` was filled from `source`, and
/// `false` when the source was exhausted and the remainder was zero-padded.
fn copy_stream(source: &mut WavIntoSamples<BufReader<File>, f32>, dest: &mut [f32]) -> bool {
    let mut filled = 0;
    // `zip` polls `dest` first, so no sample is taken from `source` once
    // `dest` is full (or when `dest` is empty to begin with).
    for (slot, sample) in dest.iter_mut().zip(source.flatten()) {
        *slot = sample;
        filled += 1;
    }
    // Silence whatever the source could not provide.
    for slot in &mut dest[filled..] {
        *slot = 0.0;
    }
    filled == dest.len()
}
fn main() -> Result<(), Error> {
let args = Args::from_args();
let opt: Options = json5::from_str(&fs::read_to_string(&args.config_file)?)?;
let pa = portaudio::PortAudio::new()?;
let processor = Arc::new(Processor::new(AUDIO_SAMPLE_RATE)?);
processor.set_config(opt.config);
let running = Arc::new(AtomicBool::new(true));
let mut capture_source =
if let Some(path) = &opt.capture.source_path { Some(open_wav_reader(path)?) } else { None };
let mut capture_preprocess_sink = if let Some(path) = &opt.capture.preprocess_sink_path {
Some(open_wav_writer(path, opt.capture.num_channels)?)
} else {
None
};
let mut capture_postprocess_sink = if let Some(path) = &opt.capture.postprocess_sink_path {
Some(open_wav_writer(path, opt.capture.num_channels)?)
} else {
None
};
let mut render_source =
if let Some(path) = &opt.render.source_path { Some(open_wav_reader(path)?) } else { None };
let audio_callback = {
let mut input_mut =
vec![0f32; processor.num_samples_per_frame() * opt.capture.num_channels as usize];
let mut input_deinterleaved =
vec![vec![0f32; processor.num_samples_per_frame()]; opt.capture.num_channels as usize];
let mut output_deinterleaved =
vec![vec![0f32; processor.num_samples_per_frame()]; opt.render.num_channels as usize];
let running = running.clone();
let mute = opt.render.mute;
let processor = Arc::clone(&processor);
move |portaudio::DuplexStreamCallbackArgs { in_buffer, out_buffer, frames, .. }| {
assert_eq!(frames, processor.num_samples_per_frame());
let mut should_continue = true;
if let Some(source) = &mut capture_source {
if !copy_stream(source, &mut input_mut) {
should_continue = false;
}
} else {
input_mut.copy_from_slice(in_buffer);
}
if let Some(sink) = &mut capture_preprocess_sink {
for sample in &input_mut {
sink.write_sample(*sample).unwrap();
}
}
deinterleave(&input_mut, &mut input_deinterleaved);
processor.process_capture_frame(&mut input_deinterleaved).unwrap();
interleave(&input_deinterleaved, &mut input_mut);
if let Some(sink) = &mut capture_postprocess_sink {
for sample in &input_mut {
sink.write_sample(*sample).unwrap();
}
}
if let Some(source) = &mut render_source {
if !copy_stream(source, out_buffer) {
should_continue = false;
}
} else {
out_buffer.iter_mut().for_each(|m| *m = 0.0)
}
deinterleave(out_buffer, &mut output_deinterleaved);
processor.process_render_frame(&mut output_deinterleaved).unwrap();
interleave(&output_deinterleaved, out_buffer);
if mute {
out_buffer.iter_mut().for_each(|m| *m = 0.0)
}
if should_continue {
portaudio::Continue
} else {
running.store(false, Ordering::SeqCst);
portaudio::Complete
}
}
};
let stream_settings = create_stream_settings(&pa, &processor, &opt)?;
let mut stream = pa.open_non_blocking_stream(stream_settings, audio_callback)?;
stream.start()?;
ctrlc::set_handler({
let running = running.clone();
move || {
running.store(false, Ordering::SeqCst);
}
})?;
while running.load(Ordering::SeqCst) {
thread::sleep(Duration::from_millis(10));
}
println!("{:#?}", processor.get_stats());
Ok(())
}