use std::time::Duration;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use tracing::{debug, warn};
use super::{WatchConfig, WatchEvent};
pub async fn run_audio_watcher(
config: WatchConfig,
event_tx: mpsc::Sender<WatchEvent>,
cancel: CancellationToken,
) {
debug!("audio watcher starting");
loop {
tokio::select! {
_ = cancel.cancelled() => {
debug!("audio watcher cancelled");
break;
}
_ = tokio::time::sleep(Duration::from_secs_f32(config.audio_window_secs)) => {
process_audio_window(&config, &event_tx).await;
}
}
}
}
async fn process_audio_window(config: &WatchConfig, event_tx: &mpsc::Sender<WatchEvent>) {
let window_secs = config.audio_window_secs;
let vad_threshold = config.audio_vad_threshold_db;
let result =
tokio::task::spawn_blocking(move || capture_and_analyse(window_secs, vad_threshold)).await;
match result {
Ok(Some(event)) => {
if event_tx.try_send(event).is_err() {
debug!("audio event channel full — event dropped");
}
}
Ok(None) => {
debug!("audio window below VAD threshold or empty transcript");
}
Err(join_err) => {
warn!(error = %join_err, "audio capture task panicked");
let _ = event_tx.try_send(WatchEvent::Error {
source: "audio_watcher".into(),
message: join_err.to_string(),
});
}
}
}
/// Captures one audio window and turns it into an optional [`WatchEvent`].
///
/// With the `audio` feature enabled:
/// * `window_secs` is passed to `capture_microphone`;
/// * the RMS level of the captured samples is compared with
///   `vad_threshold_db`, and quieter windows yield `None`;
/// * louder windows are passed to `transcribe`; a transcript that is empty
///   after trimming yields `None`, otherwise a `WatchEvent::Speech` is
///   returned with a fixed confidence of `1.0`;
/// * a capture or transcription failure is logged and returned as a
///   `WatchEvent::Error`.
///
/// Without the `audio` feature the arguments are ignored and the result is
/// always `None`.
fn capture_and_analyse(window_secs: f32, vad_threshold_db: f32) -> Option<WatchEvent> {
    #[cfg(feature = "audio")]
    {
        use crate::audio::{capture_microphone, transcribe};

        // Both failure paths report through the same error-event shape.
        let failure = |message: String| WatchEvent::Error {
            source: "audio_watcher".into(),
            message,
        };

        let recording = match capture_microphone(window_secs) {
            Err(capture_err) => {
                warn!(error = %capture_err, "microphone capture failed");
                return Some(failure(capture_err.to_string()));
            }
            Ok(recording) => recording,
        };

        let level_db = compute_rms_db(&recording.samples);
        debug!(rms_db = level_db, threshold = vad_threshold_db, "audio VAD check");
        if level_db < vad_threshold_db {
            return None;
        }

        // The timestamp is taken before transcription starts.
        let timestamp = current_timestamp();

        let text = match transcribe(&recording) {
            Err(transcribe_err) => {
                warn!(error = %transcribe_err, "transcription failed");
                return Some(failure(transcribe_err.to_string()));
            }
            Ok(transcript) => transcript,
        };

        if text.trim().is_empty() {
            None
        } else {
            Some(WatchEvent::Speech {
                text,
                confidence: 1.0,
                timestamp,
            })
        }
    }
    #[cfg(not(feature = "audio"))]
    {
        // Mark the parameters as used so the feature-less build has no warnings.
        let _ = (window_secs, vad_threshold_db);
        None
    }
}
/// Computes the root-mean-square level of `samples` in decibels.
///
/// The result is `20 * log10(rms)`, so an RMS of `1.0` maps to `0 dB`.
///
/// * An empty slice returns `-96.0`.
/// * The RMS is clamped to a minimum of `1e-5` before the logarithm, so
///   all-zero input yields about `-100 dB` instead of negative infinity.
#[must_use]
pub fn compute_rms_db(samples: &[f32]) -> f32 {
    const EMPTY_INPUT_DB: f32 = -96.0;
    const MIN_RMS: f32 = 1e-5;

    let count = samples.len();
    if count == 0 {
        return EMPTY_INPUT_DB;
    }

    let sum_of_squares: f32 = samples.iter().map(|&x| x * x).sum();
    let rms = (sum_of_squares / count as f32).sqrt();

    20.0 * f32::log10(rms.max(MIN_RMS))
}
/// Public entry point to this module's private `current_timestamp` helper.
///
/// Returns the helper's result unchanged: the current system time formatted
/// as `YYYY-MM-DDTHH:MM:SSZ`.
pub fn current_timestamp_pub() -> String {
current_timestamp()
}
/// Formats the current system time as `YYYY-MM-DDTHH:MM:SSZ`.
///
/// The time is read as whole seconds since the Unix epoch. If the system
/// clock reports a time before the epoch, zero seconds is used instead, which
/// formats as `1970-01-01T00:00:00Z`.
fn current_timestamp() -> String {
    let since_epoch = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH);
    // A clock set before the epoch yields an error; treat that as zero seconds.
    let epoch_secs = since_epoch.map_or(0, |elapsed| elapsed.as_secs());

    let (y, mo, d, h, mi, s) = epoch_secs_to_parts(epoch_secs);
    format!("{y:04}-{mo:02}-{d:02}T{h:02}:{mi:02}:{s:02}Z")
}
/// Splits seconds since the Unix epoch into calendar fields.
///
/// Returns `(year, month, day, hour, minute, second)` with `month` and `day`
/// starting at 1. The date part uses the era-based "civil from days"
/// computation: days are grouped into 400-year eras of 146 097 days, and the
/// year is treated as starting in March so a leap day falls at the end of it.
pub(crate) fn epoch_secs_to_parts(secs: u64) -> (u32, u32, u32, u32, u32, u32) {
    const SECS_PER_MINUTE: u64 = 60;
    const SECS_PER_HOUR: u64 = 3_600;
    const SECS_PER_DAY: u64 = 86_400;
    const DAYS_PER_ERA: i64 = 146_097;

    // Time of day.
    let second = (secs % SECS_PER_MINUTE) as u32;
    let minute = ((secs / SECS_PER_MINUTE) % 60) as u32;
    let hour = ((secs / SECS_PER_HOUR) % 24) as u32;

    // Shift the day count so that day 0 is 0000-03-01 instead of 1970-01-01.
    let shifted_days = (secs / SECS_PER_DAY) as i64 + 719_468;
    let era = if shifted_days >= 0 {
        shifted_days / DAYS_PER_ERA
    } else {
        (shifted_days - (DAYS_PER_ERA - 1)) / DAYS_PER_ERA
    };
    let day_of_era = (shifted_days - era * DAYS_PER_ERA) as u64;
    let year_of_era =
        (day_of_era - day_of_era / 1_460 + day_of_era / 36_524 - day_of_era / 146_096) / 365;
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);

    // Month index counted from March (0 = March, ..., 11 = February).
    let march_based_month = (5 * day_of_year + 2) / 153;
    let day = day_of_year - (153 * march_based_month + 2) / 5 + 1;
    let month = if march_based_month < 10 {
        march_based_month + 3
    } else {
        march_based_month - 9
    };

    // January and February belong to the next calendar year.
    let march_based_year = year_of_era as i64 + era * 400;
    let year = if month <= 2 {
        march_based_year + 1
    } else {
        march_based_year
    };

    (year as u32, month as u32, day as u32, hour, minute, second)
}
// Unit tests for the RMS level helper, the timestamp helpers, and the
// `WatchConfig` defaults this watcher reads.
#[cfg(test)]
mod tests {
use super::*;
// All-zero input has an RMS of 0, which is clamped to 1e-5 (-100 dB).
#[test]
fn rms_db_all_zeros_returns_near_floor() {
let samples = vec![0.0f32; 16_000];
let db = compute_rms_db(&samples);
assert!(db < -90.0, "expected < -90 dB for silence, got {db}");
}
// NOTE(review): despite the name, the input is a constant 1.0 signal, not a
// sine wave. Its RMS is 1.0, hence ~0 dB.
#[test]
fn rms_db_full_scale_sine_near_zero_db() {
let samples = vec![1.0f32; 16_000];
let db = compute_rms_db(&samples);
assert!(
db > -1.0 && db <= 1.0,
"expected ~0 dB for full scale, got {db}"
);
}
// A constant 0.5 signal has an RMS of 0.5: 20 * log10(0.5) is about -6.02 dB.
#[test]
fn rms_db_half_amplitude_is_about_minus_six_db() {
let samples = vec![0.5f32; 16_000];
let db = compute_rms_db(&samples);
assert!(
(-7.0..=-5.0).contains(&db),
"expected ~-6 dB for 0.5 amplitude, got {db}"
);
}
// An empty slice takes the early-return path and yields exactly -96.0.
#[test]
fn rms_db_empty_slice_returns_floor() {
let db = compute_rms_db(&[]);
assert_eq!(db, -96.0);
}
// A constant 0.012 signal is about -38.4 dB, above the -40 dB bound checked here.
#[test]
fn rms_db_speech_level_exceeds_minus_forty_db() {
let samples = vec![0.012f32; 16_000];
let db = compute_rms_db(&samples);
assert!(db > -40.0, "expected above VAD threshold, got {db}");
}
// Checks the 20-character `YYYY-MM-DDTHH:MM:SSZ` layout by separator position;
// the digits themselves depend on the current time and are not checked.
#[test]
fn current_timestamp_has_iso8601_format() {
let ts = current_timestamp();
assert_eq!(ts.len(), 20, "timestamp wrong length: {ts}");
assert_eq!(&ts[4..5], "-");
assert_eq!(&ts[7..8], "-");
assert_eq!(&ts[10..11], "T");
assert_eq!(&ts[13..14], ":");
assert_eq!(&ts[16..17], ":");
assert_eq!(&ts[19..20], "Z");
}
// 1_773_964_800 seconds is 20_532 whole days after the epoch: 2026-03-20T00:00:00Z.
#[test]
fn epoch_secs_to_parts_known_date() {
let (y, mo, d, h, mi, s) = epoch_secs_to_parts(1_773_964_800); assert_eq!(y, 2026);
assert_eq!(mo, 3);
assert_eq!(d, 20);
assert_eq!(h, 0);
assert_eq!(mi, 0);
assert_eq!(s, 0);
}
// Zero seconds must map to the epoch itself: 1970-01-01T00:00:00Z.
#[test]
fn epoch_secs_to_parts_unix_epoch() {
let (y, mo, d, h, mi, s) = epoch_secs_to_parts(0);
assert_eq!(y, 1970);
assert_eq!(mo, 1);
assert_eq!(d, 1);
assert_eq!(h, 0);
assert_eq!(mi, 0);
assert_eq!(s, 0);
}
// Pins the default VAD threshold of `WatchConfig` (defined in the parent module).
#[test]
fn default_vad_threshold_is_negative_forty_db() {
let cfg = WatchConfig::default();
assert_eq!(cfg.audio_vad_threshold_db, -40.0);
}
// Pins the default audio window length of `WatchConfig`.
#[test]
fn default_audio_window_is_five_seconds() {
let cfg = WatchConfig::default();
assert_eq!(cfg.audio_window_secs, 5.0);
}
}