use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::mpsc;
use std::time::{Duration, Instant};
/// Debounce window, in milliseconds, that [`PluginWatcher::start`] applies
/// when the caller does not choose one explicitly.
const DEFAULT_DEBOUNCE_MS: u64 = 200;
/// Notification that a watched `.wasm` file was created or modified.
///
/// Values of this type are what [`PluginWatcher`] hands out from its
/// receiving methods after the per-path debounce window has elapsed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PluginReloadEvent {
/// Path of the `.wasm` file, exactly as reported by the file-system watcher
/// (it is not canonicalized here).
pub wasm_path: PathBuf,
}
/// Handle to a running plugin-directory watcher.
///
/// Debounced [`PluginReloadEvent`]s are pulled from this handle; nothing is
/// pushed to the caller.
pub struct PluginWatcher {
// Never read; stored so the underlying watcher lives exactly as long as this
// handle. NOTE(review): dropping a `notify` watcher presumably stops event
// delivery -- confirm against the `notify` documentation.
_watcher: RecommendedWatcher,
// Receiving end of the channel fed by the "plugin-watcher-debounce" thread.
receiver: mpsc::Receiver<PluginReloadEvent>,
}
/// Manual `Debug` implementation: neither field of `PluginWatcher` is
/// printed; a constant `active: true` marker is shown instead.
impl std::fmt::Debug for PluginWatcher {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("PluginWatcher");
        repr.field("active", &true);
        repr.finish()
    }
}
impl PluginWatcher {
    /// Starts watching `dir` using the default debounce window
    /// (`DEFAULT_DEBOUNCE_MS` milliseconds).
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as
    /// [`PluginWatcher::start_with_debounce`].
    pub fn start(dir: &Path) -> anyhow::Result<Self> {
        Self::start_with_debounce(dir, Duration::from_millis(DEFAULT_DEBOUNCE_MS))
    }

    /// Starts watching `dir` (recursively) for `.wasm` files being created or
    /// modified, coalescing bursts of events per path.
    ///
    /// Every create/modify event for a path whose extension is exactly `wasm`
    /// (case-sensitive) restarts that path's timer. A [`PluginReloadEvent`]
    /// is emitted once `debounce` has passed without a further event for the
    /// same path. The coalescing runs on a background thread named
    /// `plugin-watcher-debounce`.
    ///
    /// # Errors
    ///
    /// Returns an error if `dir` does not exist, if the file-system watcher
    /// cannot be created or attached to `dir`, or if the debounce thread
    /// cannot be spawned.
    pub fn start_with_debounce(dir: &Path, debounce: Duration) -> anyhow::Result<Self> {
        if !dir.exists() {
            anyhow::bail!("plugin watch directory does not exist: {}", dir.display());
        }

        // Raw channel: watcher callback -> debounce thread.
        let (raw_tx, raw_rx) = mpsc::channel::<PathBuf>();
        // Debounced channel: debounce thread -> consumer of this handle.
        let (debounced_tx, debounced_rx) = mpsc::channel::<PluginReloadEvent>();

        let mut watcher = notify::recommended_watcher(move |res: Result<Event, _>| {
            let event = match res {
                Ok(event) => event,
                Err(e) => {
                    tracing::warn!("plugin watcher error: {e}");
                    return;
                }
            };
            if !matches!(event.kind, EventKind::Create(_) | EventKind::Modify(_)) {
                return;
            }
            for path in &event.paths {
                if Self::is_wasm_path(path) {
                    // A send error only means the debounce thread is gone;
                    // there is nobody left to notify, so it is ignored.
                    let _ = raw_tx.send(path.clone());
                }
            }
        })?;
        watcher.watch(dir, RecursiveMode::Recursive)?;
        tracing::info!("plugin watcher started for {}", dir.display());

        std::thread::Builder::new()
            .name("plugin-watcher-debounce".into())
            .spawn(move || Self::run_debounce_loop(raw_rx, debounced_tx, debounce))?;

        Ok(Self {
            _watcher: watcher,
            receiver: debounced_rx,
        })
    }

    /// Returns `true` when `path` has the exact (case-sensitive) extension
    /// `wasm`.
    fn is_wasm_path(path: &Path) -> bool {
        path.extension().and_then(|ext| ext.to_str()) == Some("wasm")
    }

    /// Body of the debounce thread: coalesces raw path notifications from
    /// `raw_rx` and forwards one [`PluginReloadEvent`] per path to
    /// `debounced_tx` once that path has been quiet for `debounce`.
    ///
    /// Returns when the raw sender is dropped (after flushing everything
    /// still pending) or when the debounced receiver is found to be gone.
    fn run_debounce_loop(
        raw_rx: mpsc::Receiver<PathBuf>,
        debounced_tx: mpsc::Sender<PluginReloadEvent>,
        debounce: Duration,
    ) {
        // Path -> instant at which its reload event becomes due.
        let mut pending: HashMap<PathBuf, Instant> = HashMap::new();
        loop {
            let received = match pending.values().min().copied() {
                // Sleep no longer than the earliest deadline; a deadline
                // already in the past yields a zero timeout.
                Some(deadline) => {
                    raw_rx.recv_timeout(deadline.saturating_duration_since(Instant::now()))
                }
                // Nothing is pending, so there is nothing to time out on:
                // block until the next notification or disconnect instead of
                // waking up periodically.
                None => raw_rx
                    .recv()
                    .map_err(|_| mpsc::RecvTimeoutError::Disconnected),
            };

            match received {
                Ok(path) => {
                    // (Re)start the quiet period for this path.
                    pending.insert(path, Instant::now() + debounce);
                }
                Err(mpsc::RecvTimeoutError::Timeout) => {}
                Err(mpsc::RecvTimeoutError::Disconnected) => {
                    // The watcher side is gone: flush what is left without
                    // waiting out the remaining debounce time.
                    for (path, _) in pending.drain() {
                        let _ = debounced_tx.send(PluginReloadEvent { wasm_path: path });
                    }
                    return;
                }
            }

            let now = Instant::now();
            let expired: Vec<PathBuf> = pending
                .iter()
                .filter(|(_, deadline)| **deadline <= now)
                .map(|(path, _)| path.clone())
                .collect();
            for path in expired {
                pending.remove(&path);
                let event = PluginReloadEvent { wasm_path: path };
                if debounced_tx.send(event).is_err() {
                    // The consumer dropped its receiver; stop working.
                    return;
                }
            }
        }
    }

    /// Returns the next debounced event without blocking.
    ///
    /// `None` means either that no event is ready yet or that the debounce
    /// thread has exited; the two cases are not distinguished.
    pub fn try_recv(&self) -> Option<PluginReloadEvent> {
        self.receiver.try_recv().ok()
    }

    /// Waits up to `timeout` for the next debounced event.
    ///
    /// `None` means either that the timeout elapsed or that the debounce
    /// thread has exited; the two cases are not distinguished.
    pub fn recv_timeout(&self, timeout: Duration) -> Option<PluginReloadEvent> {
        self.receiver.recv_timeout(timeout).ok()
    }

    /// Removes and returns every debounced event that is ready right now,
    /// without blocking. The result is empty when nothing is ready.
    pub fn drain(&self) -> Vec<PluginReloadEvent> {
        self.receiver.try_iter().collect()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;

    /// Minimal WebAssembly header (magic number plus version 1).
    const WASM_HEADER: &[u8] = b"\0asm\x01\0\0\0";

    /// Creates a fresh temporary directory containing an empty `plugins`
    /// sub-directory. The returned guard must stay alive for the duration of
    /// the test, otherwise the directory is removed.
    fn plugins_dir() -> (tempfile::TempDir, PathBuf) {
        let tmp = tempfile::tempdir().expect("temp dir");
        let watch_dir = tmp.path().join("plugins");
        fs::create_dir_all(&watch_dir).expect("dir should be created");
        (tmp, watch_dir)
    }

    /// Starting a watcher on a path that does not exist must fail.
    #[test]
    fn watcher_rejects_missing_directory() {
        let tmp = tempfile::tempdir().expect("temp dir");
        let missing = tmp.path().join("nonexistent");

        let err = PluginWatcher::start(&missing).expect_err("missing dir should fail");

        let message = err.to_string();
        assert!(message.contains("does not exist"));
    }

    /// Creating a `.wasm` file produces an event carrying that file's path.
    #[test]
    fn watcher_detects_wasm_file_creation() {
        let (_tmp, watch_dir) = plugins_dir();
        let watcher = PluginWatcher::start(&watch_dir).expect("watcher should start");

        let wasm_path = watch_dir.join("test.wasm");
        fs::write(&wasm_path, WASM_HEADER).expect("write wasm");

        let event = watcher
            .recv_timeout(Duration::from_secs(5))
            .expect("watcher should detect .wasm creation");

        // Compare canonical forms so symlinked temp directories do not cause
        // spurious mismatches.
        let expected = wasm_path.canonicalize().unwrap_or(wasm_path);
        let reported = event.wasm_path;
        let actual = reported.canonicalize().unwrap_or(reported);
        assert_eq!(actual, expected);
    }

    /// Files without the `.wasm` extension never produce events.
    #[test]
    fn watcher_ignores_non_wasm_files() {
        let (_tmp, watch_dir) = plugins_dir();
        let watcher = PluginWatcher::start(&watch_dir).expect("watcher should start");

        fs::write(watch_dir.join("readme.txt"), b"hello").expect("write txt");
        std::thread::sleep(Duration::from_millis(500));

        let events = watcher.drain();
        assert!(
            events.is_empty(),
            "watcher should not report non-wasm files"
        );
    }

    /// Overwriting a `.wasm` file that existed before the watcher started
    /// produces an event.
    #[test]
    fn watcher_detects_wasm_modification() {
        let (_tmp, watch_dir) = plugins_dir();
        let wasm_path = watch_dir.join("test.wasm");
        fs::write(&wasm_path, WASM_HEADER).expect("initial write");

        let watcher = PluginWatcher::start(&watch_dir).expect("watcher should start");
        std::thread::sleep(Duration::from_millis(100));
        fs::write(&wasm_path, b"\0asm\x01\0\0\0\x01").expect("modify wasm");

        let event = watcher.recv_timeout(Duration::from_secs(5));
        assert!(event.is_some(), "watcher should detect .wasm modification");
    }

    /// Several writes inside one debounce window collapse into one event.
    #[test]
    fn watcher_debounces_rapid_writes() {
        let (_tmp, watch_dir) = plugins_dir();
        let debounce = Duration::from_millis(300);
        let watcher = PluginWatcher::start_with_debounce(&watch_dir, debounce)
            .expect("watcher should start");

        let wasm_path = watch_dir.join("rapid.wasm");
        for i in 0..5 {
            let content = format!("\0asm\x01\0\0\0{}", i);
            fs::write(&wasm_path, content.as_bytes()).expect("write wasm");
            std::thread::sleep(Duration::from_millis(20));
        }
        std::thread::sleep(Duration::from_millis(600));

        let events = watcher.drain();
        assert_eq!(
            events.len(),
            1,
            "rapid writes should be debounced into a single event, got {}",
            events.len()
        );
    }
}