use crate::error::CaptureError;
use crossbeam_channel::{bounded, Receiver, Sender};
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;
use std::time::Instant;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SourceRole {
Voice,
Call,
Default,
}
#[derive(Clone)]
pub struct AudioChunk {
pub samples: Vec<f32>,
pub rms: f32,
pub timestamp: Instant,
pub source: SourceRole,
}
static STREAM_AUDIO_LEVEL: AtomicU32 = AtomicU32::new(0);
pub fn stream_audio_level() -> u32 {
STREAM_AUDIO_LEVEL.load(Ordering::Relaxed)
}
pub struct AudioStream {
_stream: cpal::Stream,
stop: Arc<AtomicBool>,
err_flag: Arc<AtomicBool>,
pub receiver: Receiver<AudioChunk>,
pub sample_rate: u32,
pub device_name: String,
}
impl AudioStream {
pub fn start(device_override: Option<&str>) -> Result<Self, CaptureError> {
let host = cpal::default_host();
let device = crate::capture::select_input_device(&host, device_override)?;
let (tx, rx): (Sender<AudioChunk>, Receiver<AudioChunk>) = bounded(64);
let stop = Arc::new(AtomicBool::new(false));
let err_flag = Arc::new(AtomicBool::new(false));
let chunk_size: usize = 1600;
let mut chunk_buf: Vec<f32> = Vec::with_capacity(chunk_size);
let (stream, device_name, _config) = crate::resample::build_resampled_input_stream(
&device,
&stop,
&err_flag,
move |resampled: &[f32]| {
for &sample in resampled {
chunk_buf.push(sample);
if chunk_buf.len() >= chunk_size {
let samples: Vec<f32> = chunk_buf.drain(..chunk_size).collect();
let rms = compute_rms(&samples);
let level = (rms * 2000.0).min(100.0) as u32;
STREAM_AUDIO_LEVEL.store(level, Ordering::Relaxed);
let _ = tx.try_send(AudioChunk {
samples,
rms,
timestamp: Instant::now(),
source: SourceRole::Default,
});
}
}
},
)?;
tracing::info!(device = %device_name, "streaming audio capture started");
Ok(AudioStream {
_stream: stream,
stop,
err_flag,
receiver: rx,
sample_rate: 16000,
device_name,
})
}
pub fn has_error(&self) -> bool {
self.err_flag.load(Ordering::Relaxed)
}
pub fn stop(&self) {
self.stop.store(true, Ordering::Relaxed);
}
}
impl Drop for AudioStream {
fn drop(&mut self) {
self.stop();
}
}
fn compute_rms(samples: &[f32]) -> f32 {
if samples.is_empty() {
return 0.0;
}
let sum: f64 = samples.iter().map(|&s| (s as f64) * (s as f64)).sum();
(sum / samples.len() as f64).sqrt() as f32
}
pub struct MultiAudioStream {
voice: AudioStream,
call: AudioStream,
_merge_thread: std::thread::JoinHandle<()>,
stop: Arc<AtomicBool>,
pub receiver: Receiver<AudioChunk>,
}
impl MultiAudioStream {
pub fn start(voice_device: Option<&str>, call_device: &str) -> Result<Self, CaptureError> {
let voice = AudioStream::start(voice_device)?;
let call = AudioStream::start(Some(call_device))?;
let (tx, rx): (Sender<AudioChunk>, Receiver<AudioChunk>) = bounded(128);
let stop = Arc::new(AtomicBool::new(false));
let voice_rx = voice.receiver.clone();
let call_rx = call.receiver.clone();
let stop_clone = Arc::clone(&stop);
let tx_clone = tx.clone();
let merge_thread = std::thread::spawn(move || {
let timeout = std::time::Duration::from_millis(50);
while !stop_clone.load(Ordering::Relaxed) {
while let Ok(mut chunk) = voice_rx.try_recv() {
chunk.source = SourceRole::Voice;
let _ = tx.try_send(chunk);
}
while let Ok(mut chunk) = call_rx.try_recv() {
chunk.source = SourceRole::Call;
let _ = tx_clone.try_send(chunk);
}
std::thread::sleep(timeout);
}
});
tracing::info!(
voice = %voice.device_name,
call = %call.device_name,
"multi-source audio capture started"
);
Ok(MultiAudioStream {
voice,
call,
_merge_thread: merge_thread,
stop,
receiver: rx,
})
}
pub fn has_error(&self) -> bool {
self.voice.has_error() || self.call.has_error()
}
pub fn voice_device_name(&self) -> &str {
&self.voice.device_name
}
pub fn call_device_name(&self) -> &str {
&self.call.device_name
}
}
impl Drop for MultiAudioStream {
fn drop(&mut self) {
self.stop.store(true, Ordering::Relaxed);
self.voice.stop();
self.call.stop();
}
}