pub mod audio_watcher;
pub mod camera_watcher;
pub mod ring_buffer;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info};
#[derive(Debug, Clone)]
pub struct WatchConfig {
pub audio_enabled: bool,
pub camera_enabled: bool,
pub audio_vad_threshold_db: f32,
pub audio_window_secs: f32,
pub camera_poll_interval_ms: u64,
pub camera_motion_threshold: f32,
}
impl Default for WatchConfig {
fn default() -> Self {
Self {
audio_enabled: false,
camera_enabled: false,
audio_vad_threshold_db: -40.0,
audio_window_secs: 5.0,
camera_poll_interval_ms: 2000,
camera_motion_threshold: 0.05,
}
}
}
#[derive(Debug, Clone)]
pub enum WatchEvent {
Speech {
text: String,
confidence: f32,
timestamp: String,
},
Gesture {
gesture: String,
confidence: f32,
hand: String,
timestamp: String,
},
Error {
source: String,
message: String,
},
}
pub struct WatchCoordinator {
pub(crate) cancel: CancellationToken,
pub(crate) audio_handle: Option<JoinHandle<()>>,
pub(crate) camera_handle: Option<JoinHandle<()>>,
}
const EVENT_CHANNEL_CAPACITY: usize = 100;
impl WatchCoordinator {
#[must_use]
pub fn start(config: WatchConfig) -> (Self, mpsc::Receiver<WatchEvent>) {
let cancel = CancellationToken::new();
let (event_tx, event_rx) = mpsc::channel(EVENT_CHANNEL_CAPACITY);
let audio_handle = spawn_audio_watcher(&config, &cancel, event_tx.clone());
let camera_handle = spawn_camera_watcher(&config, &cancel, event_tx);
info!(
audio = config.audio_enabled,
camera = config.camera_enabled,
"watch coordinator started"
);
(
Self {
cancel,
audio_handle,
camera_handle,
},
event_rx,
)
}
pub async fn stop(self) {
self.cancel.cancel();
debug!("watch coordinator: cancellation signalled");
if let Some(h) = self.audio_handle {
let _ = h.await;
}
if let Some(h) = self.camera_handle {
let _ = h.await;
}
info!("watch coordinator stopped");
}
#[must_use]
pub fn is_stopped(&self) -> bool {
self.cancel.is_cancelled()
}
#[must_use]
pub fn status(&self) -> WatchStatus {
WatchStatus {
audio_running: self.audio_handle.is_some() && !self.is_stopped(),
camera_running: self.camera_handle.is_some() && !self.is_stopped(),
}
}
}
#[derive(Debug, Clone)]
pub struct WatchStatus {
pub audio_running: bool,
pub camera_running: bool,
}
fn spawn_audio_watcher(
config: &WatchConfig,
cancel: &CancellationToken,
event_tx: mpsc::Sender<WatchEvent>,
) -> Option<JoinHandle<()>> {
if !config.audio_enabled {
return None;
}
let cfg = config.clone();
let tok = cancel.clone();
Some(tokio::spawn(async move {
audio_watcher::run_audio_watcher(cfg, event_tx, tok).await;
}))
}
fn spawn_camera_watcher(
config: &WatchConfig,
cancel: &CancellationToken,
event_tx: mpsc::Sender<WatchEvent>,
) -> Option<JoinHandle<()>> {
if !config.camera_enabled {
return None;
}
let cfg = config.clone();
let tok = cancel.clone();
Some(tokio::spawn(async move {
camera_watcher::run_camera_watcher(cfg, event_tx, tok).await;
}))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn watch_config_defaults_are_conservative() {
let cfg = WatchConfig::default();
assert!(!cfg.audio_enabled, "audio must be opt-in");
assert!(!cfg.camera_enabled, "camera must be opt-in");
}
#[test]
fn watch_config_defaults_have_sane_thresholds() {
let cfg = WatchConfig::default();
assert_eq!(cfg.audio_vad_threshold_db, -40.0);
assert_eq!(cfg.audio_window_secs, 5.0);
assert_eq!(cfg.camera_poll_interval_ms, 2000);
assert!((cfg.camera_motion_threshold - 0.05).abs() < 1e-6);
}
#[test]
fn watch_config_clone_is_independent() {
let mut cfg = WatchConfig::default();
let cfg2 = cfg.clone();
cfg.audio_enabled = true;
assert!(!cfg2.audio_enabled, "clone must be independent");
}
#[tokio::test]
async fn coordinator_starts_with_both_disabled_no_handles() {
let cfg = WatchConfig::default(); let (coord, _rx) = WatchCoordinator::start(cfg);
assert!(coord.audio_handle.is_none());
assert!(coord.camera_handle.is_none());
coord.stop().await;
}
#[tokio::test]
async fn coordinator_stop_marks_cancelled() {
let cfg = WatchConfig::default();
let (coord, _rx) = WatchCoordinator::start(cfg);
let cancel_clone = coord.cancel.clone();
assert!(!cancel_clone.is_cancelled());
coord.stop().await;
assert!(cancel_clone.is_cancelled());
}
#[tokio::test]
async fn coordinator_is_stopped_reflects_cancellation() {
let cfg = WatchConfig::default();
let (coord, _rx) = WatchCoordinator::start(cfg);
assert!(!coord.is_stopped());
let cancel = coord.cancel.clone();
cancel.cancel();
assert!(coord.is_stopped());
coord.stop().await;
}
#[tokio::test]
async fn coordinator_status_both_disabled() {
let cfg = WatchConfig::default();
let (coord, _rx) = WatchCoordinator::start(cfg);
let status = coord.status();
assert!(!status.audio_running);
assert!(!status.camera_running);
coord.stop().await;
}
#[test]
fn event_channel_capacity_is_100() {
assert_eq!(EVENT_CHANNEL_CAPACITY, 100);
}
#[tokio::test]
async fn event_channel_drops_on_full_without_blocking() {
let (tx, mut rx) = mpsc::channel::<WatchEvent>(100);
let mut sent = 0usize;
let mut dropped = 0usize;
for i in 0..105 {
let event = WatchEvent::Error {
source: "test".into(),
message: format!("event {i}"),
};
if tx.try_send(event).is_ok() {
sent += 1;
} else {
dropped += 1;
}
}
assert_eq!(sent, 100);
assert_eq!(dropped, 5);
let mut count = 0;
while rx.try_recv().is_ok() {
count += 1;
}
assert_eq!(count, 100);
}
#[test]
fn watch_event_speech_fields_are_accessible() {
let e = WatchEvent::Speech {
text: "hello".into(),
confidence: 0.9,
timestamp: "2026-03-20T00:00:00Z".into(),
};
if let WatchEvent::Speech {
text, confidence, ..
} = e
{
assert_eq!(text, "hello");
assert!((confidence - 0.9).abs() < 1e-6);
} else {
panic!("wrong variant");
}
}
#[test]
fn watch_event_gesture_fields_are_accessible() {
let e = WatchEvent::Gesture {
gesture: "thumbs_up".into(),
confidence: 0.95,
hand: "right".into(),
timestamp: "2026-03-20T00:00:00Z".into(),
};
if let WatchEvent::Gesture {
gesture,
confidence,
hand,
..
} = e
{
assert_eq!(gesture, "thumbs_up");
assert!(confidence > 0.9);
assert_eq!(hand, "right");
} else {
panic!("wrong variant");
}
}
#[test]
fn watch_event_error_fields_are_accessible() {
let e = WatchEvent::Error {
source: "audio_watcher".into(),
message: "mic unavailable".into(),
};
if let WatchEvent::Error { source, message } = e {
assert_eq!(source, "audio_watcher");
assert!(message.contains("mic"));
} else {
panic!("wrong variant");
}
}
}