active_call/media/vad/
mod.rs

1use crate::event::{EventSender, SessionEvent};
2use crate::media::processor::Processor;
3use crate::media::{AudioFrame, PcmBuf, Samples};
4use anyhow::Result;
5use serde::{Deserialize, Serialize};
6use serde_with::skip_serializing_none;
7use std::any::Any;
8use tokio_util::sync::CancellationToken;
9
10pub(crate) mod simd;
11pub mod tiny_silero;
12pub(crate) mod utils;
13pub use tiny_silero::TinySilero;
14
15#[cfg(test)]
16mod benchmark_all;
17#[cfg(test)]
18mod tests;
19
/// Configuration for voice activity detection.
///
/// Serialized with camelCase keys; missing fields fall back to their
/// `Default` values (`#[serde(default)]`) and `None` fields are omitted
/// from output (`#[skip_serializing_none]`).
#[skip_serializing_none]
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
#[serde(default)]
pub struct VADOption {
    /// Which VAD engine to run (defaults to [`VadType::Silero`]).
    pub r#type: VadType,
    /// Expected PCM sample rate in Hz.
    pub samplerate: u32,
    /// Padding before speech detection (in ms)
    pub speech_padding: u64,
    /// Padding after silence detection (in ms)
    pub silence_padding: u64,
    // NOTE(review): `ratio` is not read in this module — semantics are
    // engine-dependent; confirm against the engine implementation.
    pub ratio: f32,
    // Probability threshold for classifying a frame as speech —
    // presumably consumed by the engine; verify against `tiny_silero`.
    pub voice_threshold: f32,
    // Upper bound on buffered audio in seconds; not enforced in this
    // module (buffer cleanup here uses hard-coded windows) — verify usage.
    pub max_buffer_duration_secs: u64,
    /// Timeout duration for silence (in ms), None means disable this feature
    pub silence_timeout: Option<u64>,
    // Remote-provider settings; only meaningful for non-local engines.
    pub endpoint: Option<String>,
    pub secret_key: Option<String>,
    pub secret_id: Option<String>,
}
40
41impl Default for VADOption {
42    fn default() -> Self {
43        Self {
44            r#type: VadType::Silero,
45            samplerate: 16000,
46            speech_padding: 250,  // min_speech_duration_ms (match silero_vad default)
47            silence_padding: 100, // min_silence_duration_ms
48            ratio: 0.5,
49            voice_threshold: 0.5,
50            max_buffer_duration_secs: 50,
51            silence_timeout: None,
52            endpoint: None,
53            secret_key: None,
54            secret_id: None,
55        }
56    }
57}
58
/// Supported VAD engine kinds.
///
/// Serializes as a lowercase string ("silero"); deserialization is handled
/// by the manual `Deserialize` impl below so unknown provider names are
/// preserved instead of rejected.
#[derive(Clone, Debug, Serialize, Eq, Hash, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum VadType {
    /// The built-in Silero-based engine (`tiny_silero`).
    Silero,
    /// Any other provider, identified by its raw string name.
    Other(String),
}
65
66impl<'de> Deserialize<'de> for VadType {
67    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
68    where
69        D: serde::Deserializer<'de>,
70    {
71        let value = String::deserialize(deserializer)?;
72        match value.as_str() {
73            "silero" => Ok(VadType::Silero),
74            _ => Ok(VadType::Other(value)),
75        }
76    }
77}
78
79impl std::fmt::Display for VadType {
80    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
81        match self {
82            VadType::Silero => write!(f, "silero"),
83            VadType::Other(provider) => write!(f, "{}", provider),
84        }
85    }
86}
87
88impl TryFrom<&String> for VadType {
89    type Error = String;
90
91    fn try_from(value: &String) -> std::result::Result<Self, Self::Error> {
92        match value.as_str() {
93            "silero" => Ok(VadType::Silero),
94            other => Ok(VadType::Other(other.to_string())),
95        }
96    }
97}
/// A snapshot of one frame's PCM samples tagged with the VAD timestamp
/// it was observed at; queued in `VadProcessorInner::window_bufs`.
struct SpeechBuf {
    samples: PcmBuf,
    timestamp: u64,
}
102
/// Internal state machine that turns per-frame engine decisions into
/// `Speaking`/`Silence` session events.
struct VadProcessorInner {
    // The underlying detection engine.
    vad: Box<dyn VadEngine>,
    // Channel used to publish session events (best-effort; send errors ignored).
    event_sender: EventSender,
    option: VADOption,
    // Buffered PCM covering the current (potential) speech segment.
    window_bufs: Vec<SpeechBuf>,
    // True while inside a speech segment.
    triggered: bool,
    // True once the Speaking event for the current segment has been emitted.
    triggered_event_sent: bool,
    // Timestamp where the current speech segment started.
    current_speech_start: Option<u64>,
    // Timestamp of the first frame of the current silence run.
    temp_end: Option<u64>,
}
/// A media [`Processor`] that runs voice-activity detection over PCM
/// frames and emits `Speaking`/`Silence` session events.
pub struct VadProcessor {
    inner: VadProcessorInner,
}
116
/// A pluggable voice-activity detection engine.
pub trait VadEngine: Send + Sync + Any {
    /// Analyzes one audio frame and returns one or more
    /// `(is_speaking, timestamp)` decisions derived from it.
    fn process(&mut self, frame: &mut AudioFrame) -> Vec<(bool, u64)>;
}
120
impl VadProcessorInner {
    /// Runs the VAD engine over one PCM frame and advances the
    /// speech/silence state machine for every decision the engine returns.
    ///
    /// Non-PCM frames are ignored. While a (potential) speech segment is
    /// active, the frame's samples are buffered so a later `Silence` event
    /// can carry the captured audio.
    ///
    /// Always returns `Ok(())` today; the `Result` leaves room for
    /// fallible engines or event delivery.
    pub fn process_frame(&mut self, frame: &mut AudioFrame) -> Result<()> {
        let samples = match &frame.samples {
            Samples::PCM { samples } => samples,
            // Only raw PCM is analyzable; silently skip other payloads.
            _ => return Ok(()),
        };

        // Clone up front: `frame` is mutably borrowed by the engine below.
        let samples_cloned = samples.to_owned();
        let results = self.vad.process(frame);
        for (is_speaking, timestamp) in results {
            if is_speaking || self.triggered {
                // NOTE(review): if the engine yields several results for one
                // frame, the whole frame's samples are buffered once per
                // result — confirm this duplication is intended.
                let current_buf = SpeechBuf {
                    samples: samples_cloned.clone(),
                    timestamp,
                };
                self.window_bufs.push(current_buf);
            }
            self.process_vad_logic(is_speaking, timestamp, &frame.track_id)?;

            // Clean up old buffers periodically
            if self.window_bufs.len() > 1000 || !self.triggered {
                let cutoff = if self.triggered {
                    // Keep at most the trailing 5000 units of audio while
                    // inside a segment.
                    timestamp.saturating_sub(5000)
                } else {
                    // Outside a segment only the pre-roll needed to cover
                    // `silence_padding` is retained.
                    timestamp.saturating_sub(self.option.silence_padding)
                };
                self.window_bufs.retain(|buf| buf.timestamp > cutoff);
            }
        }

        Ok(())
    }

    /// Core state machine: tracks speech segment start/end and emits
    /// `Speaking`/`Silence` events.
    ///
    /// * `is_speaking` — the engine's decision at `timestamp`.
    /// * `timestamp` — decision time; compared against the ms-valued
    ///   padding/timeout options, so presumably ms — TODO confirm the
    ///   engine's unit.
    /// * `track_id` — copied into every emitted event.
    fn process_vad_logic(
        &mut self,
        is_speaking: bool,
        timestamp: u64,
        track_id: &str,
    ) -> Result<()> {
        if is_speaking && !self.triggered {
            // Speech onset: open a segment, but defer the Speaking event
            // until speech has been sustained (debounce in the next branch).
            self.triggered = true;
            self.current_speech_start = Some(timestamp);
            self.triggered_event_sent = false;
        } else if is_speaking && self.triggered {
            // Already triggered, check if we need to emit Speaking event for the first time
            if let Some(start_time) = self.current_speech_start {
                let duration = timestamp.saturating_sub(start_time);
                // L1 filter: only emit Speaking if duration is enough (e.g. 200ms)
                // Use speech_padding as min_speech_duration
                if duration >= self.option.speech_padding && !self.triggered_event_sent {
                    let event = SessionEvent::Speaking {
                        track_id: track_id.to_string(),
                        timestamp: crate::media::get_timestamp(),
                        start_time,
                        is_filler: None, // Will be enriched by MFCC or ASR
                        confidence: Some(1.0),
                    };
                    // Event delivery is best-effort; a closed channel is ignored.
                    self.event_sender.send(event).ok();
                    self.triggered_event_sent = true;
                }
            }
        } else if !is_speaking {
            // Remember where the current silence run started.
            if self.temp_end.is_none() {
                self.temp_end = Some(timestamp);
            }

            if let Some(temp_end) = self.temp_end {
                // Use saturating_sub to handle timestamp wrapping or out-of-order frames
                let silence_duration = timestamp.saturating_sub(temp_end);

                // Process regular silence detection for speech segments
                if self.triggered && silence_duration >= self.option.silence_padding {
                    if let Some(start_time) = self.current_speech_start {
                        // Use safe duration calculation
                        let duration = temp_end.saturating_sub(start_time);
                        // Segments shorter than speech_padding are dropped
                        // without emitting an event.
                        if duration >= self.option.speech_padding {
                            // Stitch together the buffered samples that fall
                            // inside [start_time, temp_end].
                            let samples_vec = self
                                .window_bufs
                                .iter()
                                .filter(|buf| {
                                    buf.timestamp >= start_time && buf.timestamp <= temp_end
                                })
                                .flat_map(|buf| buf.samples.iter())
                                .cloned()
                                .collect();
                            self.window_bufs.clear();

                            let event = SessionEvent::Silence {
                                track_id: track_id.to_string(),
                                timestamp: crate::media::get_timestamp(),
                                start_time,
                                duration,
                                samples: Some(samples_vec),
                            };
                            self.event_sender.send(event).ok();
                        }
                    }
                    // Reset segment state for the next utterance.
                    self.triggered = false;
                    self.triggered_event_sent = false;
                    self.current_speech_start = None;
                    self.temp_end = Some(timestamp); // Update temp_end for silence timeout tracking
                }

                // Process silence timeout if configured
                if let Some(timeout) = self.option.silence_timeout {
                    // Use same safe calculation for silence timeout
                    let timeout_duration = timestamp.saturating_sub(temp_end);

                    if timeout_duration >= timeout {
                        // Timeout events carry no samples — they only signal
                        // prolonged silence on the track.
                        let event = SessionEvent::Silence {
                            track_id: track_id.to_string(),
                            timestamp: crate::media::get_timestamp(),
                            start_time: temp_end,
                            duration: timeout_duration,
                            samples: None,
                        };
                        self.event_sender.send(event).ok();
                        // Restart the window so the event repeats for every
                        // further `timeout` span of continuous silence.
                        self.temp_end = Some(timestamp);
                    }
                }
            }
        }

        // Any speech cancels the pending silence run.
        if is_speaking && self.temp_end.is_some() {
            self.temp_end = None;
        }

        Ok(())
    }
}
251
252impl VadProcessor {
253    pub fn create(
254        _token: CancellationToken,
255        event_sender: EventSender,
256        option: VADOption,
257    ) -> Result<Box<dyn Processor>> {
258        let vad: Box<dyn VadEngine> = match option.r#type {
259            VadType::Silero => Box::new(tiny_silero::TinySilero::new(option.clone())?),
260            _ => Box::new(NopVad::new()?),
261        };
262        Ok(Box::new(VadProcessor::new(vad, event_sender, option)?))
263    }
264
265    pub fn create_nop(
266        _token: CancellationToken,
267        event_sender: EventSender,
268        option: VADOption,
269    ) -> Result<Box<dyn Processor>> {
270        let vad: Box<dyn VadEngine> = match option.r#type {
271            _ => Box::new(NopVad::new()?),
272        };
273        Ok(Box::new(VadProcessor::new(vad, event_sender, option)?))
274    }
275
276    pub fn new(
277        engine: Box<dyn VadEngine>,
278        event_sender: EventSender,
279        option: VADOption,
280    ) -> Result<Self> {
281        let inner = VadProcessorInner {
282            vad: engine,
283            event_sender,
284            option,
285            window_bufs: Vec::new(),
286            triggered_event_sent: false,
287            triggered: false,
288            current_speech_start: None,
289            temp_end: None,
290        };
291        Ok(Self { inner })
292    }
293}
294
impl Processor for VadProcessor {
    /// Forwards the frame to the internal VAD state machine.
    fn process_frame(&mut self, frame: &mut AudioFrame) -> Result<()> {
        self.inner.process_frame(frame)
    }
}
300
/// Fallback engine: classifies a frame as speech iff it contains any
/// non-zero PCM sample (no actual model inference).
struct NopVad {}
302
303impl NopVad {
304    pub fn new() -> Result<Self> {
305        Ok(Self {})
306    }
307}
308
309impl VadEngine for NopVad {
310    fn process(&mut self, frame: &mut AudioFrame) -> Vec<(bool, u64)> {
311        let samples = match &frame.samples {
312            Samples::PCM { samples } => samples,
313            _ => return vec![(false, frame.timestamp)],
314        };
315        // Check if there are any non-zero samples
316        let has_speech = samples.iter().any(|&x| x != 0);
317        vec![(has_speech, frame.timestamp)]
318    }
319}