use std::sync::{Arc, Mutex};
use std::time::{SystemTime, UNIX_EPOCH};
use async_trait::async_trait;
use log;
use crate::error::Result;
use crate::frames::{
Frame, FrameDirection, FrameHandler, FrameInner, FrameProcessor, SystemFrame,
};
use super::analyzer::VadAnalyzer;
use super::params::VadParams;
use super::state::{StateMachine, VadState};
use super::{VadBackend, create_vad};
/// Mutable state shared behind the processor's mutex: the VAD state machine,
/// a handle to the backend model, and the timing thresholds copied out of
/// `VadParams` at construction time.
struct VadProcessorState {
    // Windowing + speaking/quiet transition logic for incoming audio.
    machine: StateMachine,
    // Backend VAD model. Kept as an Arc so the handler can clone the handle
    // out of the lock and run async inference without holding the mutex.
    model: Arc<dyn VadAnalyzer>,
    // Threshold (seconds) echoed into the "user started speaking" frame —
    // presumably the speech-onset debounce from VadParams; confirm against
    // the StateMachine/params definitions.
    start_secs: f32,
    // Threshold (seconds) echoed into the "user stopped speaking" frame.
    stop_secs: f32,
}
/// Frame processor that runs voice-activity detection over downstream raw
/// audio frames and emits user started/stopped-speaking frames on state
/// transitions.
pub struct VadProcessor {
    // All mutable state lives behind a single Arc<Mutex<_>> so the async
    // handler (which only has `&self`) can take short lock scopes that never
    // span an `.await`.
    state: Arc<Mutex<VadProcessorState>>,
}
impl VadProcessor {
pub fn new(
sample_rate: u32,
params: VadParams,
backend: VadBackend,
) -> Result<Self> {
let model = create_vad(backend, sample_rate)
.map_err(|e| crate::error::PipecatError::pipeline(e))?;
let start_secs = params.start_secs;
let stop_secs = params.stop_secs;
let machine = StateMachine::new(sample_rate, params);
Ok(Self {
state: Arc::new(Mutex::new(VadProcessorState {
machine,
model,
start_secs,
stop_secs,
})),
})
}
pub fn with_defaults(sample_rate: u32, params: VadParams) -> Result<Self> {
Self::new(sample_rate, params, VadBackend::default())
}
pub fn into_processor(self) -> FrameProcessor {
FrameProcessor::new("VadProcessor", Box::new(self), false)
}
}
#[async_trait]
impl FrameHandler for VadProcessor {
    /// Processes one frame. Downstream raw-audio frames are forwarded
    /// immediately and then fed to the VAD; a transition into `Speaking` or
    /// `Quiet` emits the corresponding started/stopped-speaking frame
    /// downstream. Every other frame (or upstream audio) is passed through
    /// unchanged.
    ///
    /// NOTE(review): the std `Mutex` guard must never be held across an
    /// `.await`, so each lock below is confined to a block that ends before
    /// the next await point. Preserve that scoping when modifying this code.
    async fn on_process_frame(
        &self,
        processor: &FrameProcessor,
        frame: Frame,
        direction: FrameDirection,
    ) -> Result<()> {
        if let FrameInner::System(SystemFrame::InputAudioRaw(ref audio_data)) = frame.inner {
            if direction == FrameDirection::Downstream {
                // Forward the audio first so downstream consumers are not
                // delayed by VAD inference. The clone is needed because
                // `audio_data` still borrows `frame.inner` below.
                processor.push_frame(frame.clone(), direction).await?;
                // Lock scope 1: buffer the samples; a full analysis window
                // may or may not be ready yet.
                let window_opt = {
                    let mut guard = self.state.lock().unwrap();
                    guard.machine.next_window(&audio_data.audio)
                };
                if let Some(window) = window_opt {
                    // Lock scope 2: clone the Arc'd model handle out so the
                    // (potentially slow) async inference runs unlocked.
                    let model = {
                        self.state.lock().unwrap().model.clone()
                    };
                    let confidence = model.voice_confidence(window.clone()).await;
                    // Lock scope 3: advance the state machine and snapshot
                    // everything the frame emission needs.
                    let (prev_state, new_state, start_secs, stop_secs) = {
                        let mut guard = self.state.lock().unwrap();
                        let prev = guard.machine.state;
                        let next = guard.machine.advance(confidence, &window);
                        (prev, next, guard.start_secs, guard.stop_secs)
                    };
                    // Wall-clock timestamp (seconds since Unix epoch) stamped
                    // onto the emitted VAD frames.
                    let ts = SystemTime::now()
                        .duration_since(UNIX_EPOCH)
                        .unwrap_or_default()
                        .as_secs_f64();
                    // Emit only on edge transitions into Speaking / Quiet;
                    // any other state change (e.g. intermediate hysteresis
                    // states, if the enum has them) produces nothing.
                    match (prev_state, new_state) {
                        (s, VadState::Speaking) if s != VadState::Speaking => {
                            log::debug!("VAD: user started speaking (confidence={:.3})", confidence);
                            let vad_frame = Frame::vad_user_started_speaking(start_secs, ts);
                            processor
                                .push_frame(vad_frame, FrameDirection::Downstream)
                                .await?;
                        }
                        (s, VadState::Quiet) if s != VadState::Quiet => {
                            log::debug!("VAD: user stopped speaking (confidence={:.3})", confidence);
                            let vad_frame = Frame::vad_user_stopped_speaking(stop_secs, ts);
                            processor
                                .push_frame(vad_frame, FrameDirection::Downstream)
                                .await?;
                        }
                        _ => {}
                    }
                }
                // Audio was already forwarded above; skip the generic
                // passthrough at the bottom.
                return Ok(());
            }
        }
        // Non-audio or upstream frames pass straight through.
        processor.push_frame(frame, direction).await
    }
}