code_mesh_core/tool/
file_watcher.rs

1//! File watching system for live updates
2//! Provides real-time notifications when files or directories change
3
4use async_trait::async_trait;
5use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
6use serde::{Deserialize, Serialize};
7use serde_json::{json, Value};
8use std::collections::HashMap;
9use std::path::{Path, PathBuf};
10use std::sync::Arc;
11use std::time::{Duration, SystemTime};
12use tokio::sync::{mpsc, RwLock};
13use uuid::Uuid;
14
15use super::{Tool, ToolContext, ToolResult, ToolError};
16
17/// File watcher tool for monitoring file system changes
18pub struct FileWatcherTool {
19    active_watchers: Arc<RwLock<HashMap<String, WatcherInstance>>>,
20}
21
22#[derive(Debug, Deserialize)]
23struct WatchParams {
24    path: String,
25    #[serde(default)]
26    recursive: bool,
27    #[serde(default)]
28    patterns: Option<Vec<String>>,
29    #[serde(default)]
30    ignore_patterns: Option<Vec<String>>,
31    #[serde(default)]
32    debounce_ms: Option<u64>,
33}
34
35#[derive(Debug, Serialize, Clone)]
36pub struct FileChangeEvent {
37    pub event_id: String,
38    pub timestamp: SystemTime,
39    pub event_type: String,
40    pub paths: Vec<PathBuf>,
41    pub details: HashMap<String, Value>,
42}
43
44struct WatcherInstance {
45    watcher_id: String,
46    _watcher: RecommendedWatcher,
47    event_sender: mpsc::UnboundedSender<FileChangeEvent>,
48    patterns: Option<Vec<String>>,
49    ignore_patterns: Option<Vec<String>>,
50}
51
52impl Default for FileWatcherTool {
53    fn default() -> Self {
54        Self::new()
55    }
56}
57
58impl FileWatcherTool {
59    pub fn new() -> Self {
60        Self {
61            active_watchers: Arc::new(RwLock::new(HashMap::new())),
62        }
63    }
64    
65    /// Start watching a path for changes
66    pub async fn start_watching(
67        &self,
68        path: impl AsRef<Path>,
69        recursive: bool,
70        patterns: Option<Vec<String>>,
71        ignore_patterns: Option<Vec<String>>,
72    ) -> Result<(String, mpsc::UnboundedReceiver<FileChangeEvent>), ToolError> {
73        let path = path.as_ref();
74        let watcher_id = Uuid::new_v4().to_string();
75        
76        // Create event channel
77        let (event_tx, event_rx) = mpsc::unbounded_channel();
78        
79        // Create the watcher
80        let event_tx_clone = event_tx.clone();
81        let mut watcher = notify::recommended_watcher(move |result: Result<Event, notify::Error>| {
82            match result {
83                Ok(event) => {
84                    let file_event = FileChangeEvent {
85                        event_id: Uuid::new_v4().to_string(),
86                        timestamp: SystemTime::now(),
87                        event_type: format!("{:?}", event.kind),
88                        paths: event.paths,
89                        details: HashMap::new(),
90                    };
91                    
92                    if let Err(e) = event_tx_clone.send(file_event) {
93                        tracing::warn!("Failed to send file change event: {}", e);
94                    }
95                }
96                Err(e) => {
97                    tracing::error!("File watcher error: {}", e);
98                }
99            }
100        })
101        .map_err(|e| ToolError::ExecutionFailed(format!("Failed to create watcher: {}", e)))?;
102        
103        // Start watching
104        let mode = if recursive {
105            RecursiveMode::Recursive
106        } else {
107            RecursiveMode::NonRecursive
108        };
109        
110        watcher.watch(path, mode)
111            .map_err(|e| ToolError::ExecutionFailed(format!("Failed to start watching: {}", e)))?;
112        
113        // Store the watcher instance
114        let instance = WatcherInstance {
115            watcher_id: watcher_id.clone(),
116            _watcher: watcher,
117            event_sender: event_tx,
118            patterns: patterns.clone(),
119            ignore_patterns: ignore_patterns.clone(),
120        };
121        
122        {
123            let mut watchers = self.active_watchers.write().await;
124            watchers.insert(watcher_id.clone(), instance);
125        }
126        
127        Ok((watcher_id, event_rx))
128    }
129    
130    /// Stop watching a path
131    pub async fn stop_watching(&self, watcher_id: &str) -> Result<(), ToolError> {
132        let mut watchers = self.active_watchers.write().await;
133        if watchers.remove(watcher_id).is_some() {
134            Ok(())
135        } else {
136            Err(ToolError::ExecutionFailed(format!(
137                "Watcher {} not found",
138                watcher_id
139            )))
140        }
141    }
142    
143    /// Get list of active watchers
144    pub async fn list_watchers(&self) -> Vec<String> {
145        let watchers = self.active_watchers.read().await;
146        watchers.keys().cloned().collect()
147    }
148    
149    /// Check if a file change matches the given patterns
150    fn matches_patterns(
151        path: &Path,
152        patterns: &Option<Vec<String>>,
153        ignore_patterns: &Option<Vec<String>>,
154    ) -> bool {
155        let path_str = path.to_string_lossy();
156        
157        // Check ignore patterns first
158        if let Some(ignore) = ignore_patterns {
159            for pattern in ignore {
160                if glob::Pattern::new(pattern)
161                    .map(|p| p.matches(&path_str))
162                    .unwrap_or(false)
163                {
164                    return false;
165                }
166            }
167        }
168        
169        // Check include patterns
170        if let Some(include) = patterns {
171            for pattern in include {
172                if glob::Pattern::new(pattern)
173                    .map(|p| p.matches(&path_str))
174                    .unwrap_or(false)
175                {
176                    return true;
177                }
178            }
179            false // If patterns specified but none match
180        } else {
181            true // No patterns specified, match all
182        }
183    }
184}
185
186#[async_trait]
187impl Tool for FileWatcherTool {
188    fn id(&self) -> &str {
189        "file_watcher"
190    }
191    
192    fn description(&self) -> &str {
193        "Monitor file system changes with pattern matching and filtering"
194    }
195    
196    fn parameters_schema(&self) -> Value {
197        json!({
198            "type": "object",
199            "properties": {
200                "path": {
201                    "type": "string",
202                    "description": "Path to watch for changes"
203                },
204                "recursive": {
205                    "type": "boolean",
206                    "description": "Watch subdirectories recursively",
207                    "default": false
208                },
209                "patterns": {
210                    "type": "array",
211                    "items": {
212                        "type": "string"
213                    },
214                    "description": "Glob patterns to match files (e.g., ['*.rs', '*.js'])"
215                },
216                "ignorePatterns": {
217                    "type": "array",
218                    "items": {
219                        "type": "string"
220                    },
221                    "description": "Glob patterns to ignore (e.g., ['*.tmp', 'node_modules/**'])"
222                },
223                "debounceMs": {
224                    "type": "number",
225                    "description": "Debounce delay in milliseconds to group rapid changes",
226                    "minimum": 0,
227                    "maximum": 10000
228                }
229            },
230            "required": ["path"]
231        })
232    }
233    
234    async fn execute(
235        &self,
236        args: Value,
237        ctx: ToolContext,
238    ) -> Result<ToolResult, ToolError> {
239        let params: WatchParams = serde_json::from_value(args)
240            .map_err(|e| ToolError::InvalidParameters(e.to_string()))?;
241        
242        // Resolve path
243        let watch_path = if PathBuf::from(&params.path).is_absolute() {
244            PathBuf::from(&params.path)
245        } else {
246            ctx.working_directory.join(&params.path)
247        };
248        
249        // Validate path exists
250        if !watch_path.exists() {
251            return Err(ToolError::ExecutionFailed(format!(
252                "Path does not exist: {}",
253                watch_path.display()
254            )));
255        }
256        
257        // Start watching
258        let (watcher_id, mut event_rx) = self.start_watching(
259            &watch_path,
260            params.recursive,
261            params.patterns.clone(),
262            params.ignore_patterns.clone(),
263        ).await?;
264        
265        // For this demonstration, we'll watch for a short time and return events
266        // In a real implementation, this would be managed differently
267        let watch_duration = Duration::from_millis(params.debounce_ms.unwrap_or(1000));
268        let start_time = SystemTime::now();
269        let mut events = Vec::new();
270        
271        // Collect events for the specified duration
272        while start_time.elapsed().unwrap_or_default() < watch_duration {
273            if *ctx.abort_signal.borrow() {
274                self.stop_watching(&watcher_id).await.ok();
275                return Err(ToolError::Aborted);
276            }
277            
278            match tokio::time::timeout(Duration::from_millis(100), event_rx.recv()).await {
279                Ok(Some(event)) => {
280                    // Filter event based on patterns
281                    let matching_paths: Vec<_> = event.paths.iter()
282                        .filter(|path| Self::matches_patterns(
283                            path,
284                            &params.patterns,
285                            &params.ignore_patterns
286                        ))
287                        .cloned()
288                        .collect();
289                    
290                    if !matching_paths.is_empty() {
291                        let filtered_event = FileChangeEvent {
292                            event_id: event.event_id,
293                            timestamp: event.timestamp,
294                            event_type: event.event_type,
295                            paths: matching_paths,
296                            details: event.details,
297                        };
298                        events.push(filtered_event);
299                    }
300                }
301                Ok(None) => break, // Channel closed
302                Err(_) => continue, // Timeout, continue watching
303            }
304        }
305        
306        // Stop watching
307        self.stop_watching(&watcher_id).await.ok();
308        
309        // Calculate relative paths for display
310        let relative_path = watch_path
311            .strip_prefix(&ctx.working_directory)
312            .unwrap_or(&watch_path)
313            .to_string_lossy()
314            .to_string();
315        
316        let metadata = json!({
317            "watcher_id": watcher_id,
318            "path": watch_path.to_string_lossy(),
319            "relative_path": relative_path,
320            "recursive": params.recursive,
321            "patterns": params.patterns,
322            "ignore_patterns": params.ignore_patterns,
323            "watch_duration_ms": watch_duration.as_millis(),
324            "events_collected": events.len(),
325            "events": events.iter().map(|e| json!({
326                "event_id": e.event_id,
327                "timestamp": e.timestamp.duration_since(SystemTime::UNIX_EPOCH)
328                    .unwrap_or_default().as_secs(),
329                "event_type": e.event_type,
330                "paths": e.paths.iter().map(|p| p.to_string_lossy()).collect::<Vec<_>>()
331            })).collect::<Vec<_>>()
332        });
333        
334        let output = if events.is_empty() {
335            format!(
336                "No file changes detected in {} during {}ms watch period",
337                relative_path,
338                watch_duration.as_millis()
339            )
340        } else {
341            let mut output_lines = vec![
342                format!(
343                    "Detected {} file change{} in {} during {}ms watch period:",
344                    events.len(),
345                    if events.len() == 1 { "" } else { "s" },
346                    relative_path,
347                    watch_duration.as_millis()
348                )
349            ];
350            
351            for event in &events {
352                output_lines.push(format!(
353                    "  - {}: {}",
354                    event.event_type,
355                    event.paths.iter()
356                        .map(|p| p.to_string_lossy())
357                        .collect::<Vec<_>>()
358                        .join(", ")
359                ));
360            }
361            
362            output_lines.join("\n")
363        };
364        
365        Ok(ToolResult {
366            title: format!("Watched {} for {}ms", relative_path, watch_duration.as_millis()),
367            metadata,
368            output,
369        })
370    }
371}
372
373#[cfg(test)]
374mod tests {
375    use super::*;
376    use tempfile::TempDir;
377    use tokio::fs;
378    
379    #[tokio::test]
380    async fn test_file_watcher_creation() {
381        let watcher = FileWatcherTool::new();
382        assert!(watcher.list_watchers().await.is_empty());
383    }
384    
385    #[tokio::test]
386    async fn test_pattern_matching() {
387        let path = Path::new("test.rs");
388        let patterns = Some(vec!["*.rs".to_string()]);
389        let ignore_patterns = Some(vec!["*.tmp".to_string()]);
390        
391        assert!(FileWatcherTool::matches_patterns(&path, &patterns, &ignore_patterns));
392        
393        let ignored_path = Path::new("test.tmp");
394        assert!(!FileWatcherTool::matches_patterns(&ignored_path, &patterns, &ignore_patterns));
395    }
396    
397    #[tokio::test]
398    async fn test_file_watcher_tool() {
399        let temp_dir = TempDir::new().unwrap();
400        let temp_path = temp_dir.path().to_path_buf();
401        
402        let tool = FileWatcherTool::new();
403        let params = json!({
404            "path": temp_path.to_string_lossy(),
405            "recursive": false,
406            "patterns": ["*.txt"],
407            "debounceMs": 500
408        });
409        
410        let ctx = ToolContext {
411            session_id: "test".to_string(),
412            message_id: "test".to_string(),
413            abort_signal: tokio::sync::watch::channel(false).1,
414            working_directory: std::env::current_dir().unwrap(),
415        };
416        
417        // Start watching in background
418        let tool_clone = tool.clone();
419        let params_clone = params.clone();
420        let ctx_clone = ctx.clone();
421        
422        let watch_task = tokio::spawn(async move {
423            tool_clone.execute(params_clone, ctx_clone).await
424        });
425        
426        // Give watcher time to start
427        tokio::time::sleep(Duration::from_millis(100)).await;
428        
429        // Create a file to trigger an event
430        let test_file = temp_path.join("test.txt");
431        fs::write(&test_file, "test content").await.unwrap();
432        
433        // Wait for watcher to complete
434        let result = watch_task.await.unwrap();
435        
436        // Note: Due to timing, the test might not catch the file creation
437        // In a real scenario, events would be handled asynchronously
438        assert!(result.is_ok());
439    }
440}