use crate::error::CaptureError;
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use crossbeam_channel::{bounded, Receiver, Sender};
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;
#[derive(Clone)]
pub struct AudioChunk {
pub samples: Vec<f32>,
pub rms: f32,
}
static STREAM_AUDIO_LEVEL: AtomicU32 = AtomicU32::new(0);
pub fn stream_audio_level() -> u32 {
STREAM_AUDIO_LEVEL.load(Ordering::Relaxed)
}
pub struct AudioStream {
_stream: cpal::Stream,
stop: Arc<AtomicBool>,
pub receiver: Receiver<AudioChunk>,
pub sample_rate: u32,
pub device_name: String,
}
impl AudioStream {
pub fn start() -> Result<Self, CaptureError> {
let host = cpal::default_host();
let device = host
.default_input_device()
.ok_or(CaptureError::DeviceNotFound)?;
let device_name = device.name().unwrap_or_else(|_| "unknown".into());
let config = device
.default_input_config()
.map_err(|e| CaptureError::Io(std::io::Error::other(format!("input config: {}", e))))?;
let native_rate = config.sample_rate().0;
let channels = config.channels() as usize;
let ratio = native_rate as f64 / 16000.0;
let (tx, rx): (Sender<AudioChunk>, Receiver<AudioChunk>) = bounded(64);
let stop = Arc::new(AtomicBool::new(false));
let stop_clone = Arc::clone(&stop);
let chunk_size: usize = 1600;
let stream = match config.sample_format() {
cpal::SampleFormat::F32 => {
let mut resample_buf: Vec<f32> = Vec::new();
let mut resample_pos: f64 = 0.0;
let mut chunk_buf: Vec<f32> = Vec::with_capacity(chunk_size);
let tx = tx.clone();
device
.build_input_stream(
&config.into(),
move |data: &[f32], _: &cpal::InputCallbackInfo| {
if stop_clone.load(Ordering::Relaxed) {
return;
}
for frame in data.chunks(channels) {
let mono: f32 = frame.iter().sum::<f32>() / channels as f32;
resample_buf.push(mono);
}
while resample_pos < resample_buf.len() as f64 {
let idx = resample_pos as usize;
if idx < resample_buf.len() {
chunk_buf.push(resample_buf[idx]);
}
resample_pos += ratio;
if chunk_buf.len() >= chunk_size {
let samples: Vec<f32> = chunk_buf.drain(..chunk_size).collect();
let rms = compute_rms(&samples);
let level = (rms * 2000.0).min(100.0) as u32;
STREAM_AUDIO_LEVEL.store(level, Ordering::Relaxed);
let _ = tx.try_send(AudioChunk { samples, rms });
}
}
let consumed = (resample_pos as usize).min(resample_buf.len());
if consumed > 0 {
resample_buf.drain(..consumed);
resample_pos -= consumed as f64;
}
},
move |err| {
tracing::error!("streaming audio error: {}", err);
},
None,
)
.map_err(|e| {
CaptureError::Io(std::io::Error::other(format!("build stream: {}", e)))
})?
}
cpal::SampleFormat::I16 => {
let mut resample_buf: Vec<f32> = Vec::new();
let mut resample_pos: f64 = 0.0;
let mut chunk_buf: Vec<f32> = Vec::with_capacity(chunk_size);
let tx = tx.clone();
device
.build_input_stream(
&config.into(),
move |data: &[i16], _: &cpal::InputCallbackInfo| {
if stop_clone.load(Ordering::Relaxed) {
return;
}
for frame in data.chunks(channels) {
let mono: f32 =
frame.iter().map(|&s| s as f32 / 32768.0).sum::<f32>()
/ channels as f32;
resample_buf.push(mono);
}
while resample_pos < resample_buf.len() as f64 {
let idx = resample_pos as usize;
if idx < resample_buf.len() {
chunk_buf.push(resample_buf[idx]);
}
resample_pos += ratio;
if chunk_buf.len() >= chunk_size {
let samples: Vec<f32> = chunk_buf.drain(..chunk_size).collect();
let rms = compute_rms(&samples);
let level = (rms * 2000.0).min(100.0) as u32;
STREAM_AUDIO_LEVEL.store(level, Ordering::Relaxed);
let _ = tx.try_send(AudioChunk { samples, rms });
}
}
let consumed = (resample_pos as usize).min(resample_buf.len());
if consumed > 0 {
resample_buf.drain(..consumed);
resample_pos -= consumed as f64;
}
},
move |err| {
tracing::error!("streaming audio error: {}", err);
},
None,
)
.map_err(|e| {
CaptureError::Io(std::io::Error::other(format!("build stream: {}", e)))
})?
}
fmt => {
return Err(CaptureError::Io(std::io::Error::other(format!(
"unsupported format: {:?}",
fmt
))));
}
};
stream
.play()
.map_err(|e| CaptureError::Io(std::io::Error::other(format!("play: {}", e))))?;
tracing::info!(device = %device_name, "streaming audio capture started");
Ok(AudioStream {
_stream: stream,
stop,
receiver: rx,
sample_rate: 16000,
device_name,
})
}
pub fn stop(&self) {
self.stop.store(true, Ordering::Relaxed);
}
}
impl Drop for AudioStream {
fn drop(&mut self) {
self.stop();
}
}
fn compute_rms(samples: &[f32]) -> f32 {
if samples.is_empty() {
return 0.0;
}
let sum: f64 = samples.iter().map(|&s| (s as f64) * (s as f64)).sum();
(sum / samples.len() as f64).sqrt() as f32
}