Skip to main content

mofa_plugins/hot_reload/
watcher.rs

1//! File system watcher for plugin changes
2//!
3//! Monitors plugin directories for file changes and triggers reload events
4
5use std::collections::HashMap;
6use std::path::{Path, PathBuf};
7use std::sync::Arc;
8use std::time::Duration;
9
10use notify::{
11    Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher,
12    event::{CreateKind, ModifyKind, RemoveKind, RenameMode},
13};
14use tokio::sync::{RwLock, mpsc};
15use tracing::{debug, error, info, warn};
16
/// Kinds of change reported to consumers of the plugin watcher.
///
/// Each variant describes what happened to the path carried by the
/// enclosing `WatchEvent`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WatchEventKind {
    /// Plugin file created
    ///
    /// Also reported for the target of a rename when no rename source was
    /// seen beforehand.
    Created,
    /// Plugin file modified
    Modified,
    /// Plugin file removed
    Removed,
    /// Plugin file renamed; `from` is the previous location and `to` the new one
    Renamed { from: PathBuf, to: PathBuf },
}
29
30/// Watch event
31#[derive(Debug, Clone)]
32pub struct WatchEvent {
33    /// Event kind
34    pub kind: WatchEventKind,
35    /// Affected path
36    pub path: PathBuf,
37    /// Timestamp
38    pub timestamp: std::time::Instant,
39}
40
41impl WatchEvent {
42    /// Create a new watch event
43    pub fn new(kind: WatchEventKind, path: PathBuf) -> Self {
44        Self {
45            kind,
46            path,
47            timestamp: std::time::Instant::now(),
48        }
49    }
50
51    /// Check if this is a plugin file
52    pub fn is_plugin_file(&self) -> bool {
53        let ext = self.path.extension().and_then(|e| e.to_str());
54        matches!(ext, Some("so") | Some("dylib") | Some("dll"))
55    }
56}
57
/// Watch configuration
///
/// Controls which files produce events and how aggressively rapid changes
/// are collapsed.
#[derive(Debug, Clone)]
pub struct WatchConfig {
    /// Debounce duration for rapid file changes
    pub debounce_duration: Duration,
    /// File extensions to watch (without the dot); an empty list accepts every extension
    pub extensions: Vec<String>,
    /// Whether to watch subdirectories
    pub recursive: bool,
    /// Ignore patterns matched against the file name; `*` matches any run of characters
    pub ignore_patterns: Vec<String>,
    /// Maximum events per second (rate limiting)
    // NOTE(review): no code in this file reads this value — confirm it is enforced elsewhere.
    pub max_events_per_sec: u32,
}

impl Default for WatchConfig {
    /// Defaults: 500 ms debounce, the three native library extensions,
    /// non-recursive watching, and editor/temp files ignored.
    fn default() -> Self {
        Self {
            debounce_duration: Duration::from_millis(500),
            extensions: vec!["so".to_string(), "dylib".to_string(), "dll".to_string()],
            recursive: false,
            ignore_patterns: vec!["*.tmp".to_string(), "*.swp".to_string(), "*~".to_string()],
            max_events_per_sec: 100,
        }
    }
}

impl WatchConfig {
    /// Create a new watch config (identical to [`WatchConfig::default`]).
    pub fn new() -> Self {
        Self::default()
    }

    /// Set debounce duration
    pub fn with_debounce(mut self, duration: Duration) -> Self {
        self.debounce_duration = duration;
        self
    }

    /// Add file extension to watch
    ///
    /// The extension is normally given without the dot (`"so"`); a leading
    /// dot (`".so"`) is tolerated when matching.
    pub fn with_extension(mut self, ext: &str) -> Self {
        self.extensions.push(ext.to_string());
        self
    }

    /// Set recursive mode
    pub fn with_recursive(mut self, recursive: bool) -> Self {
        self.recursive = recursive;
        self
    }

    /// Add ignore pattern
    pub fn with_ignore(mut self, pattern: &str) -> Self {
        self.ignore_patterns.push(pattern.to_string());
        self
    }

    /// Check if a path should be watched
    ///
    /// A path is accepted when both conditions hold:
    ///
    /// 1. `extensions` is empty, or the path's extension equals one of the
    ///    configured extensions (ASCII case-insensitive, leading dot on the
    ///    configured value ignored). A missing or non-UTF-8 extension is
    ///    treated as the empty string.
    /// 2. The file name matches none of `ignore_patterns`.
    pub fn should_watch(&self, path: &Path) -> bool {
        if !self.extensions.is_empty() {
            let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("");
            let wanted = self.extensions.iter().any(|candidate| {
                let candidate = candidate.as_str();
                candidate
                    .strip_prefix('.')
                    .unwrap_or(candidate)
                    .eq_ignore_ascii_case(ext)
            });
            if !wanted {
                return false;
            }
        }

        let file_name = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
        !self
            .ignore_patterns
            .iter()
            .any(|pattern| Self::wildcard_match(pattern, file_name))
    }

    /// Match `name` against `pattern`, where `*` stands for any (possibly empty) run of characters.
    ///
    /// A pattern without `*` must equal the name exactly. Matching is
    /// case-sensitive.
    fn wildcard_match(pattern: &str, name: &str) -> bool {
        let mut segments = pattern.split('*');

        // `split` always yields at least one segment, even for an empty pattern.
        let head = segments.next().unwrap_or("");
        let tail = match segments.next_back() {
            Some(tail) => tail,
            // No `*` present: the pattern is a literal file name.
            None => return pattern == name,
        };

        // The literal prefix and suffix must both fit without overlapping.
        if name.len() < head.len() + tail.len()
            || !name.starts_with(head)
            || !name.ends_with(tail)
        {
            return false;
        }

        // Remaining literal segments must occur in order between prefix and suffix.
        // The slice bounds are char boundaries because they delimit matched substrings.
        let mut remaining = &name[head.len()..name.len() - tail.len()];
        for segment in segments {
            match remaining.find(segment) {
                Some(at) => remaining = &remaining[at + segment.len()..],
                None => return false,
            }
        }

        true
    }
}
140
/// Plugin file watcher
///
/// Tracks a set of paths, forwards them to a `notify` backend once started,
/// and publishes filtered, debounced `WatchEvent`s on an mpsc channel.
pub struct PluginWatcher {
    /// Watched directories (paths registered through `watch`)
    watch_paths: Arc<RwLock<Vec<PathBuf>>>,
    /// Configuration
    config: WatchConfig,
    /// Event sender; cloned into the processing task when the watcher starts
    event_tx: mpsc::Sender<WatchEvent>,
    /// Event receiver (taken by consumer); `None` once it has been handed out
    event_rx: Option<mpsc::Receiver<WatchEvent>>,
    /// Internal watcher handle; `None` while the watcher is not running
    watcher: Option<RecommendedWatcher>,
    /// Last event times for debouncing, keyed by affected path
    last_events: Arc<RwLock<HashMap<PathBuf, std::time::Instant>>>,
    /// Shutdown signal sender for the processing task; `None` until started
    shutdown_tx: Option<mpsc::Sender<()>>,
}
158
159impl PluginWatcher {
160    /// Create a new plugin watcher
161    pub fn new(config: WatchConfig) -> Self {
162        let (event_tx, event_rx) = mpsc::channel(1024);
163
164        Self {
165            watch_paths: Arc::new(RwLock::new(Vec::new())),
166            config,
167            event_tx,
168            event_rx: Some(event_rx),
169            watcher: None,
170            last_events: Arc::new(RwLock::new(HashMap::new())),
171            shutdown_tx: None,
172        }
173    }
174
175    /// Take the event receiver (can only be called once)
176    pub fn take_event_receiver(&mut self) -> Option<mpsc::Receiver<WatchEvent>> {
177        self.event_rx.take()
178    }
179
180    /// Add a directory to watch
181    pub async fn watch<P: AsRef<Path>>(&mut self, path: P) -> Result<(), notify::Error> {
182        let path = path.as_ref().to_path_buf();
183
184        if !path.exists() {
185            warn!("Watch path does not exist: {:?}", path);
186            return Ok(());
187        }
188
189        info!("Adding watch path: {:?}", path);
190
191        // Add to tracked paths
192        {
193            let mut paths = self.watch_paths.write().await;
194            if !paths.contains(&path) {
195                paths.push(path.clone());
196            }
197        }
198
199        // Add to watcher if running
200        if let Some(ref mut watcher) = self.watcher {
201            let mode = if self.config.recursive {
202                RecursiveMode::Recursive
203            } else {
204                RecursiveMode::NonRecursive
205            };
206            watcher.watch(&path, mode)?;
207        }
208
209        Ok(())
210    }
211
212    /// Remove a directory from watching
213    pub async fn unwatch<P: AsRef<Path>>(&mut self, path: P) -> Result<(), notify::Error> {
214        let path = path.as_ref().to_path_buf();
215
216        info!("Removing watch path: {:?}", path);
217
218        // Remove from tracked paths
219        {
220            let mut paths = self.watch_paths.write().await;
221            paths.retain(|p| p != &path);
222        }
223
224        // Remove from watcher if running
225        if let Some(ref mut watcher) = self.watcher {
226            watcher.unwatch(&path)?;
227        }
228
229        Ok(())
230    }
231
232    /// Start watching for changes
233    pub async fn start(&mut self) -> Result<(), notify::Error> {
234        info!("Starting plugin watcher");
235
236        let event_tx = self.event_tx.clone();
237        let config = self.config.clone();
238        let last_events = self.last_events.clone();
239        let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1);
240
241        self.shutdown_tx = Some(shutdown_tx);
242
243        // Create the file system watcher
244        let (tx, mut rx) = mpsc::channel(1024);
245
246        let watcher_config = Config::default().with_poll_interval(Duration::from_millis(100));
247
248        let mut watcher = RecommendedWatcher::new(
249            move |result: Result<Event, notify::Error>| {
250                if let Ok(event) = result {
251                    let _ = tx.blocking_send(event);
252                }
253            },
254            watcher_config,
255        )?;
256
257        // Add all tracked paths
258        let mode = if self.config.recursive {
259            RecursiveMode::Recursive
260        } else {
261            RecursiveMode::NonRecursive
262        };
263
264        let paths = self.watch_paths.read().await;
265        for path in paths.iter() {
266            watcher.watch(path, mode)?;
267        }
268
269        self.watcher = Some(watcher);
270
271        // Spawn event processing task
272        tokio::spawn(async move {
273            let mut rename_from: Option<PathBuf> = None;
274
275            loop {
276                tokio::select! {
277                    Some(event) = rx.recv() => {
278                        // Process file system event
279                        for path in event.paths {
280                            // Skip if should not watch
281                            if !config.should_watch(&path) {
282                                continue;
283                            }
284
285                            // Debounce check
286                            let should_process = {
287                                let mut last = last_events.write().await;
288                                let now = std::time::Instant::now();
289
290                                if let Some(last_time) = last.get(&path) {
291                                    if now.duration_since(*last_time) < config.debounce_duration {
292                                        false
293                                    } else {
294                                        last.insert(path.clone(), now);
295                                        true
296                                    }
297                                } else {
298                                    last.insert(path.clone(), now);
299                                    true
300                                }
301                            };
302
303                            if !should_process {
304                                debug!("Debounced event for {:?}", path);
305                                continue;
306                            }
307
308                            // Convert to watch event
309                            let watch_event = match event.kind {
310                                EventKind::Create(CreateKind::File) => {
311                                    Some(WatchEvent::new(WatchEventKind::Created, path.clone()))
312                                }
313                                EventKind::Modify(ModifyKind::Data(_)) |
314                                EventKind::Modify(ModifyKind::Any) => {
315                                    Some(WatchEvent::new(WatchEventKind::Modified, path.clone()))
316                                }
317                                EventKind::Remove(RemoveKind::File) => {
318                                    Some(WatchEvent::new(WatchEventKind::Removed, path.clone()))
319                                }
320                                EventKind::Modify(ModifyKind::Name(RenameMode::From)) => {
321                                    rename_from = Some(path.clone());
322                                    None
323                                }
324                                EventKind::Modify(ModifyKind::Name(RenameMode::To)) => {
325                                    if let Some(from) = rename_from.take() {
326                                        Some(WatchEvent::new(
327                                            WatchEventKind::Renamed {
328                                                from: from.clone(),
329                                                to: path.clone(),
330                                            },
331                                            path.clone(),
332                                        ))
333                                    } else {
334                                        Some(WatchEvent::new(WatchEventKind::Created, path.clone()))
335                                    }
336                                }
337                                _ => None,
338                            };
339
340                            if let Some(evt) = watch_event {
341                                debug!("Watch event: {:?}", evt);
342                                if event_tx.send(evt).await.is_err() {
343                                    error!("Failed to send watch event");
344                                    return;
345                                }
346                            }
347                        }
348                    }
349                    _ = shutdown_rx.recv() => {
350                        info!("Plugin watcher shutting down");
351                        return;
352                    }
353                }
354            }
355        });
356
357        Ok(())
358    }
359
360    /// Stop watching
361    pub async fn stop(&mut self) {
362        info!("Stopping plugin watcher");
363
364        // Send shutdown signal
365        if let Some(tx) = self.shutdown_tx.take() {
366            let _ = tx.send(()).await;
367        }
368
369        // Drop the watcher
370        self.watcher = None;
371    }
372
373    /// Get watched paths
374    pub async fn watched_paths(&self) -> Vec<PathBuf> {
375        self.watch_paths.read().await.clone()
376    }
377
378    /// Check if a path is being watched
379    pub async fn is_watching<P: AsRef<Path>>(&self, path: P) -> bool {
380        let paths = self.watch_paths.read().await;
381        paths.contains(&path.as_ref().to_path_buf())
382    }
383
    /// Get configuration
    ///
    /// Returns a shared reference to the configuration this watcher was
    /// constructed with.
    pub fn config(&self) -> &WatchConfig {
        &self.config
    }
388
389    /// Scan for existing plugin files
390    pub async fn scan_existing(&self) -> Vec<PathBuf> {
391        let mut plugins = Vec::new();
392        let paths = self.watch_paths.read().await;
393
394        for watch_path in paths.iter() {
395            if let Ok(entries) = std::fs::read_dir(watch_path) {
396                for entry in entries.flatten() {
397                    let path = entry.path();
398                    if path.is_file() && self.config.should_watch(&path) {
399                        plugins.push(path);
400                    }
401                }
402            }
403        }
404
405        plugins
406    }
407}
408
impl Drop for PluginWatcher {
    /// Intentionally empty: no explicit cleanup is performed here.
    ///
    /// The struct's fields (including the backend handle and the shutdown
    /// sender) are dropped by Rust after this method returns.
    // NOTE(review): presumably the spawned task then ends because its
    // shutdown channel closes — confirm if callers rely on drop without `stop`.
    fn drop(&mut self) {
        // Watcher will be dropped automatically
    }
}
414
// Unit tests for the configuration, event helpers and watcher construction.
// None of them touch the file system or start the backend.
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration watches three extensions and is non-recursive.
    #[test]
    fn test_watch_config_default() {
        let config = WatchConfig::default();
        assert_eq!(config.extensions.len(), 3);
        assert!(!config.recursive);
    }

    /// `should_watch` accepts plugin extensions and rejects everything else.
    #[test]
    fn test_should_watch() {
        let config = WatchConfig::default();

        // Should watch plugin files
        assert!(config.should_watch(Path::new("/path/to/plugin.so")));
        assert!(config.should_watch(Path::new("/path/to/plugin.dylib")));
        assert!(config.should_watch(Path::new("/path/to/plugin.dll")));

        // Should not watch non-plugin files
        assert!(!config.should_watch(Path::new("/path/to/file.txt")));
        assert!(!config.should_watch(Path::new("/path/to/file.rs")));

        // Should not watch ignored patterns
        // (both paths are already rejected by the extension check)
        assert!(!config.should_watch(Path::new("/path/to/plugin.so.tmp")));
        assert!(!config.should_watch(Path::new("/path/to/plugin.swp")));
    }

    /// A constructed event keeps its kind and recognises a `.so` path as a plugin.
    #[test]
    fn test_watch_event() {
        let event = WatchEvent::new(
            WatchEventKind::Modified,
            PathBuf::from("/path/to/plugin.so"),
        );

        assert!(event.is_plugin_file());
        assert!(matches!(event.kind, WatchEventKind::Modified));
    }

    /// The event receiver can be taken exactly once.
    #[tokio::test]
    async fn test_plugin_watcher_new() {
        let config = WatchConfig::default();
        let mut watcher = PluginWatcher::new(config);

        assert!(watcher.take_event_receiver().is_some());
        assert!(watcher.take_event_receiver().is_none()); // Can only take once
    }
}