mcp_methods/server/
watch.rs1#![allow(dead_code)]
15
16use std::path::{Path, PathBuf};
17use std::sync::Arc;
18use std::time::Duration;
19
20use anyhow::{Context, Result};
21use notify_debouncer_mini::notify::RecursiveMode;
22use notify_debouncer_mini::{new_debouncer, DebounceEventResult, Debouncer};
23
/// Shared callback invoked with the batch of paths reported by one debounced
/// file-system notification.
///
/// The `Send + Sync` bounds let the handler be moved into, and called from,
/// a thread other than the one that created it.
pub type ChangeHandler = Arc<dyn Fn(&[PathBuf]) + Send + Sync>;
30
/// Debounce window applied by [`watch`] when the caller passes `None` for
/// its `debounce` argument.
pub const DEFAULT_DEBOUNCE: Duration = Duration::from_millis(500);
34
35pub struct WatchHandle {
37 _debouncer: Debouncer<notify_debouncer_mini::notify::RecommendedWatcher>,
38}
39
40pub fn watch(
46 dir: &Path,
47 on_change: Option<ChangeHandler>,
48 debounce: Option<Duration>,
49) -> Result<WatchHandle> {
50 if !dir.is_dir() {
51 anyhow::bail!("--watch path is not a directory: {}", dir.display());
52 }
53 let debounce = debounce.unwrap_or(DEFAULT_DEBOUNCE);
54 let dir_for_log = dir.to_path_buf();
55 let on_change = on_change.unwrap_or_else(|| {
56 Arc::new(|_| {
57 })
59 });
60
61 let mut debouncer = new_debouncer(debounce, move |result: DebounceEventResult| match result {
62 Ok(events) => {
63 let paths: Vec<PathBuf> = events.into_iter().map(|e| e.path).collect();
64 tracing::info!(
65 root = %dir_for_log.display(),
66 changed = paths.len(),
67 "watch: file change debounced"
68 );
69 on_change(&paths);
70 }
71 Err(e) => {
72 tracing::warn!(error = %e, "watch: error from notify");
73 }
74 })
75 .context("failed to construct file-system debouncer")?;
76
77 debouncer
78 .watcher()
79 .watch(dir, RecursiveMode::Recursive)
80 .with_context(|| format!("failed to watch {}", dir.display()))?;
81
82 tracing::info!(root = %dir.display(), debounce_ms = debounce.as_millis() as u64, "watch: active");
83 Ok(WatchHandle {
84 _debouncer: debouncer,
85 })
86}
87
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// A path that does not exist is not a directory, so `watch` must fail.
    #[test]
    fn watch_rejects_non_directory() {
        let result = watch(Path::new("/this/does/not/exist"), None, None);
        assert!(result.is_err());
    }

    /// Starting a watch without a handler and dropping the handle right away
    /// must neither fail nor panic.
    #[test]
    fn watch_starts_and_drops_clean() {
        let dir = tempfile::tempdir().unwrap();
        let _handle = watch(dir.path(), None, Some(Duration::from_millis(100))).unwrap();
    }

    /// Writing a file inside the watched directory must trigger the handler.
    #[test]
    fn callback_fires_on_file_change() {
        use std::thread::sleep;

        let dir = tempfile::tempdir().unwrap();
        let counter = Arc::new(AtomicUsize::new(0));
        let counter_for_cb = counter.clone();
        let cb: ChangeHandler = Arc::new(move |_paths: &[PathBuf]| {
            counter_for_cb.fetch_add(1, Ordering::SeqCst);
        });
        let _handle = watch(dir.path(), Some(cb), Some(Duration::from_millis(100))).unwrap();

        // Brief pause before generating the event so the freshly registered
        // watch has a moment to settle.
        sleep(Duration::from_millis(50));
        std::fs::write(dir.path().join("a.txt"), "hi").unwrap();

        // Poll up to a generous deadline instead of sleeping a fixed amount:
        // a single short sleep is flaky on loaded machines, while polling
        // returns as soon as the callback has fired.
        let deadline = std::time::Instant::now() + Duration::from_secs(5);
        while counter.load(Ordering::SeqCst) == 0 && std::time::Instant::now() < deadline {
            sleep(Duration::from_millis(25));
        }

        assert!(
            counter.load(Ordering::SeqCst) >= 1,
            "expected callback to fire at least once after file write"
        );
    }
}