1use async_trait::async_trait;
5use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
6use serde::{Deserialize, Serialize};
7use serde_json::{json, Value};
8use std::collections::HashMap;
9use std::path::{Path, PathBuf};
10use std::sync::Arc;
11use std::time::{Duration, SystemTime};
12use tokio::sync::{mpsc, RwLock};
13use uuid::Uuid;
14
15use super::{Tool, ToolContext, ToolResult, ToolError};
16
17pub struct FileWatcherTool {
19 active_watchers: Arc<RwLock<HashMap<String, WatcherInstance>>>,
20}
21
22#[derive(Debug, Deserialize)]
23struct WatchParams {
24 path: String,
25 #[serde(default)]
26 recursive: bool,
27 #[serde(default)]
28 patterns: Option<Vec<String>>,
29 #[serde(default)]
30 ignore_patterns: Option<Vec<String>>,
31 #[serde(default)]
32 debounce_ms: Option<u64>,
33}
34
35#[derive(Debug, Serialize, Clone)]
36pub struct FileChangeEvent {
37 pub event_id: String,
38 pub timestamp: SystemTime,
39 pub event_type: String,
40 pub paths: Vec<PathBuf>,
41 pub details: HashMap<String, Value>,
42}
43
44struct WatcherInstance {
45 watcher_id: String,
46 _watcher: RecommendedWatcher,
47 event_sender: mpsc::UnboundedSender<FileChangeEvent>,
48 patterns: Option<Vec<String>>,
49 ignore_patterns: Option<Vec<String>>,
50}
51
52impl Default for FileWatcherTool {
53 fn default() -> Self {
54 Self::new()
55 }
56}
57
58impl FileWatcherTool {
59 pub fn new() -> Self {
60 Self {
61 active_watchers: Arc::new(RwLock::new(HashMap::new())),
62 }
63 }
64
65 pub async fn start_watching(
67 &self,
68 path: impl AsRef<Path>,
69 recursive: bool,
70 patterns: Option<Vec<String>>,
71 ignore_patterns: Option<Vec<String>>,
72 ) -> Result<(String, mpsc::UnboundedReceiver<FileChangeEvent>), ToolError> {
73 let path = path.as_ref();
74 let watcher_id = Uuid::new_v4().to_string();
75
76 let (event_tx, event_rx) = mpsc::unbounded_channel();
78
79 let event_tx_clone = event_tx.clone();
81 let mut watcher = notify::recommended_watcher(move |result: Result<Event, notify::Error>| {
82 match result {
83 Ok(event) => {
84 let file_event = FileChangeEvent {
85 event_id: Uuid::new_v4().to_string(),
86 timestamp: SystemTime::now(),
87 event_type: format!("{:?}", event.kind),
88 paths: event.paths,
89 details: HashMap::new(),
90 };
91
92 if let Err(e) = event_tx_clone.send(file_event) {
93 tracing::warn!("Failed to send file change event: {}", e);
94 }
95 }
96 Err(e) => {
97 tracing::error!("File watcher error: {}", e);
98 }
99 }
100 })
101 .map_err(|e| ToolError::ExecutionFailed(format!("Failed to create watcher: {}", e)))?;
102
103 let mode = if recursive {
105 RecursiveMode::Recursive
106 } else {
107 RecursiveMode::NonRecursive
108 };
109
110 watcher.watch(path, mode)
111 .map_err(|e| ToolError::ExecutionFailed(format!("Failed to start watching: {}", e)))?;
112
113 let instance = WatcherInstance {
115 watcher_id: watcher_id.clone(),
116 _watcher: watcher,
117 event_sender: event_tx,
118 patterns: patterns.clone(),
119 ignore_patterns: ignore_patterns.clone(),
120 };
121
122 {
123 let mut watchers = self.active_watchers.write().await;
124 watchers.insert(watcher_id.clone(), instance);
125 }
126
127 Ok((watcher_id, event_rx))
128 }
129
130 pub async fn stop_watching(&self, watcher_id: &str) -> Result<(), ToolError> {
132 let mut watchers = self.active_watchers.write().await;
133 if watchers.remove(watcher_id).is_some() {
134 Ok(())
135 } else {
136 Err(ToolError::ExecutionFailed(format!(
137 "Watcher {} not found",
138 watcher_id
139 )))
140 }
141 }
142
143 pub async fn list_watchers(&self) -> Vec<String> {
145 let watchers = self.active_watchers.read().await;
146 watchers.keys().cloned().collect()
147 }
148
149 fn matches_patterns(
151 path: &Path,
152 patterns: &Option<Vec<String>>,
153 ignore_patterns: &Option<Vec<String>>,
154 ) -> bool {
155 let path_str = path.to_string_lossy();
156
157 if let Some(ignore) = ignore_patterns {
159 for pattern in ignore {
160 if glob::Pattern::new(pattern)
161 .map(|p| p.matches(&path_str))
162 .unwrap_or(false)
163 {
164 return false;
165 }
166 }
167 }
168
169 if let Some(include) = patterns {
171 for pattern in include {
172 if glob::Pattern::new(pattern)
173 .map(|p| p.matches(&path_str))
174 .unwrap_or(false)
175 {
176 return true;
177 }
178 }
179 false } else {
181 true }
183 }
184}
185
186#[async_trait]
187impl Tool for FileWatcherTool {
188 fn id(&self) -> &str {
189 "file_watcher"
190 }
191
192 fn description(&self) -> &str {
193 "Monitor file system changes with pattern matching and filtering"
194 }
195
196 fn parameters_schema(&self) -> Value {
197 json!({
198 "type": "object",
199 "properties": {
200 "path": {
201 "type": "string",
202 "description": "Path to watch for changes"
203 },
204 "recursive": {
205 "type": "boolean",
206 "description": "Watch subdirectories recursively",
207 "default": false
208 },
209 "patterns": {
210 "type": "array",
211 "items": {
212 "type": "string"
213 },
214 "description": "Glob patterns to match files (e.g., ['*.rs', '*.js'])"
215 },
216 "ignorePatterns": {
217 "type": "array",
218 "items": {
219 "type": "string"
220 },
221 "description": "Glob patterns to ignore (e.g., ['*.tmp', 'node_modules/**'])"
222 },
223 "debounceMs": {
224 "type": "number",
225 "description": "Debounce delay in milliseconds to group rapid changes",
226 "minimum": 0,
227 "maximum": 10000
228 }
229 },
230 "required": ["path"]
231 })
232 }
233
234 async fn execute(
235 &self,
236 args: Value,
237 ctx: ToolContext,
238 ) -> Result<ToolResult, ToolError> {
239 let params: WatchParams = serde_json::from_value(args)
240 .map_err(|e| ToolError::InvalidParameters(e.to_string()))?;
241
242 let watch_path = if PathBuf::from(¶ms.path).is_absolute() {
244 PathBuf::from(¶ms.path)
245 } else {
246 ctx.working_directory.join(¶ms.path)
247 };
248
249 if !watch_path.exists() {
251 return Err(ToolError::ExecutionFailed(format!(
252 "Path does not exist: {}",
253 watch_path.display()
254 )));
255 }
256
257 let (watcher_id, mut event_rx) = self.start_watching(
259 &watch_path,
260 params.recursive,
261 params.patterns.clone(),
262 params.ignore_patterns.clone(),
263 ).await?;
264
265 let watch_duration = Duration::from_millis(params.debounce_ms.unwrap_or(1000));
268 let start_time = SystemTime::now();
269 let mut events = Vec::new();
270
271 while start_time.elapsed().unwrap_or_default() < watch_duration {
273 if *ctx.abort_signal.borrow() {
274 self.stop_watching(&watcher_id).await.ok();
275 return Err(ToolError::Aborted);
276 }
277
278 match tokio::time::timeout(Duration::from_millis(100), event_rx.recv()).await {
279 Ok(Some(event)) => {
280 let matching_paths: Vec<_> = event.paths.iter()
282 .filter(|path| Self::matches_patterns(
283 path,
284 ¶ms.patterns,
285 ¶ms.ignore_patterns
286 ))
287 .cloned()
288 .collect();
289
290 if !matching_paths.is_empty() {
291 let filtered_event = FileChangeEvent {
292 event_id: event.event_id,
293 timestamp: event.timestamp,
294 event_type: event.event_type,
295 paths: matching_paths,
296 details: event.details,
297 };
298 events.push(filtered_event);
299 }
300 }
301 Ok(None) => break, Err(_) => continue, }
304 }
305
306 self.stop_watching(&watcher_id).await.ok();
308
309 let relative_path = watch_path
311 .strip_prefix(&ctx.working_directory)
312 .unwrap_or(&watch_path)
313 .to_string_lossy()
314 .to_string();
315
316 let metadata = json!({
317 "watcher_id": watcher_id,
318 "path": watch_path.to_string_lossy(),
319 "relative_path": relative_path,
320 "recursive": params.recursive,
321 "patterns": params.patterns,
322 "ignore_patterns": params.ignore_patterns,
323 "watch_duration_ms": watch_duration.as_millis(),
324 "events_collected": events.len(),
325 "events": events.iter().map(|e| json!({
326 "event_id": e.event_id,
327 "timestamp": e.timestamp.duration_since(SystemTime::UNIX_EPOCH)
328 .unwrap_or_default().as_secs(),
329 "event_type": e.event_type,
330 "paths": e.paths.iter().map(|p| p.to_string_lossy()).collect::<Vec<_>>()
331 })).collect::<Vec<_>>()
332 });
333
334 let output = if events.is_empty() {
335 format!(
336 "No file changes detected in {} during {}ms watch period",
337 relative_path,
338 watch_duration.as_millis()
339 )
340 } else {
341 let mut output_lines = vec![
342 format!(
343 "Detected {} file change{} in {} during {}ms watch period:",
344 events.len(),
345 if events.len() == 1 { "" } else { "s" },
346 relative_path,
347 watch_duration.as_millis()
348 )
349 ];
350
351 for event in &events {
352 output_lines.push(format!(
353 " - {}: {}",
354 event.event_type,
355 event.paths.iter()
356 .map(|p| p.to_string_lossy())
357 .collect::<Vec<_>>()
358 .join(", ")
359 ));
360 }
361
362 output_lines.join("\n")
363 };
364
365 Ok(ToolResult {
366 title: format!("Watched {} for {}ms", relative_path, watch_duration.as_millis()),
367 metadata,
368 output,
369 })
370 }
371}
372
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;
    use tokio::fs;

    /// A freshly created tool has no registered watchers.
    #[tokio::test]
    async fn test_file_watcher_creation() {
        let watcher = FileWatcherTool::new();
        assert!(watcher.list_watchers().await.is_empty());
    }

    /// Include and exclude globs are both honoured, and no filters means "accept".
    #[tokio::test]
    async fn test_pattern_matching() {
        let patterns = Some(vec!["*.rs".to_string()]);
        let ignore_patterns = Some(vec!["*.tmp".to_string()]);

        assert!(FileWatcherTool::matches_patterns(
            Path::new("test.rs"),
            &patterns,
            &ignore_patterns
        ));
        assert!(!FileWatcherTool::matches_patterns(
            Path::new("test.tmp"),
            &patterns,
            &ignore_patterns
        ));
        // Not ignored, but not covered by any include pattern either.
        assert!(!FileWatcherTool::matches_patterns(
            Path::new("test.txt"),
            &patterns,
            &ignore_patterns
        ));
        // Without any filters every path is accepted.
        assert!(FileWatcherTool::matches_patterns(
            Path::new("test.txt"),
            &None,
            &None
        ));
    }

    /// Runs the tool against a temporary directory while a file is created inside it.
    #[tokio::test]
    async fn test_file_watcher_tool() {
        let temp_dir = TempDir::new().unwrap();
        let temp_path = temp_dir.path().to_path_buf();

        let tool = FileWatcherTool::new();
        let params = json!({
            "path": temp_path.to_string_lossy(),
            "recursive": false,
            "patterns": ["*.txt"],
            "debounceMs": 500
        });

        let ctx = ToolContext {
            session_id: "test".to_string(),
            message_id: "test".to_string(),
            abort_signal: tokio::sync::watch::channel(false).1,
            working_directory: std::env::current_dir().unwrap(),
        };

        // None of these values is used again, so they are moved into the task, not cloned.
        let watch_task = tokio::spawn(async move { tool.execute(params, ctx).await });

        // Give the watcher a moment to start before touching the directory.
        tokio::time::sleep(Duration::from_millis(100)).await;

        let test_file = temp_path.join("test.txt");
        fs::write(&test_file, "test content").await.unwrap();

        let result = watch_task.await.unwrap();

        assert!(result.is_ok());
    }
}