1use std::collections::HashMap;
6use std::path::{Path, PathBuf};
7use std::sync::Arc;
8use std::time::Duration;
9
10use notify::{
11 Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher,
12 event::{CreateKind, ModifyKind, RemoveKind, RenameMode},
13};
14use tokio::sync::{RwLock, mpsc};
15use tracing::{debug, error, info, warn};
16
/// What happened to a watched file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WatchEventKind {
    /// A file appeared.
    Created,
    /// A file's contents changed.
    Modified,
    /// A file was deleted.
    Removed,
    /// A file was moved from `from` to `to`.
    Renamed { from: PathBuf, to: PathBuf },
}

/// A single file-system change, stamped with the moment it was constructed.
#[derive(Debug, Clone)]
pub struct WatchEvent {
    /// The kind of change that occurred.
    pub kind: WatchEventKind,
    /// The path the change applies to.
    pub path: PathBuf,
    /// Monotonic time at which this value was built by [`WatchEvent::new`].
    pub timestamp: std::time::Instant,
}

impl WatchEvent {
    /// Builds an event for `path`, recording the current instant as its timestamp.
    pub fn new(kind: WatchEventKind, path: PathBuf) -> Self {
        let timestamp = std::time::Instant::now();
        Self {
            kind,
            path,
            timestamp,
        }
    }

    /// Returns `true` when the path's extension is exactly `so`, `dylib` or `dll`.
    ///
    /// The comparison is case-sensitive; paths without an extension, or whose
    /// extension is not valid UTF-8, yield `false`.
    pub fn is_plugin_file(&self) -> bool {
        const PLUGIN_EXTENSIONS: [&str; 3] = ["so", "dylib", "dll"];

        self.path
            .extension()
            .and_then(|raw| raw.to_str())
            .map_or(false, |ext| PLUGIN_EXTENSIONS.contains(&ext))
    }
}
57
/// Settings that control which paths are watched and how events are filtered.
#[derive(Debug, Clone)]
pub struct WatchConfig {
    /// Minimum spacing between two forwarded events for the same path.
    pub debounce_duration: Duration,
    /// File extensions (without the leading dot) that are eligible for
    /// watching. An empty list disables extension filtering.
    pub extensions: Vec<String>,
    /// Whether directories are watched recursively.
    pub recursive: bool,
    /// File-name patterns to ignore. `*` matches any run of characters
    /// (including none); every other character matches itself.
    pub ignore_patterns: Vec<String>,
    /// Intended event-rate cap.
    /// NOTE(review): no code visible in this file reads this field — confirm
    /// whether it is enforced elsewhere.
    pub max_events_per_sec: u32,
}

impl Default for WatchConfig {
    /// 500 ms debounce, the `so`/`dylib`/`dll` extensions, non-recursive, and
    /// ignore patterns for common editor/temporary files.
    fn default() -> Self {
        Self {
            debounce_duration: Duration::from_millis(500),
            extensions: vec!["so".to_string(), "dylib".to_string(), "dll".to_string()],
            recursive: false,
            ignore_patterns: vec!["*.tmp".to_string(), "*.swp".to_string(), "*~".to_string()],
            max_events_per_sec: 100,
        }
    }
}

impl WatchConfig {
    /// Creates a configuration with the default settings.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the debounce window.
    pub fn with_debounce(mut self, duration: Duration) -> Self {
        self.debounce_duration = duration;
        self
    }

    /// Adds an extension to the allow-list.
    ///
    /// A single leading dot is stripped (`".so"` is stored as `"so"`), because
    /// `Path::extension` never includes the dot and a dotted entry could
    /// therefore never match.
    pub fn with_extension(mut self, ext: &str) -> Self {
        let normalized = ext.strip_prefix('.').unwrap_or(ext);
        self.extensions.push(normalized.to_string());
        self
    }

    /// Enables or disables recursive watching.
    pub fn with_recursive(mut self, recursive: bool) -> Self {
        self.recursive = recursive;
        self
    }

    /// Adds a file-name pattern to ignore (see [`WatchConfig::ignore_patterns`]).
    pub fn with_ignore(mut self, pattern: &str) -> Self {
        self.ignore_patterns.push(pattern.to_string());
        self
    }

    /// Decides whether events for `path` should be processed.
    ///
    /// Returns `false` when the extension allow-list is non-empty and does not
    /// contain the path's extension, or when the file name matches any ignore
    /// pattern; otherwise returns `true`. A missing or non-UTF-8 extension or
    /// file name is treated as the empty string.
    pub fn should_watch(&self, path: &Path) -> bool {
        let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("");
        if !self.extensions.is_empty() && !self.extensions.iter().any(|e| e == ext) {
            return false;
        }

        let file_name = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
        !self
            .ignore_patterns
            .iter()
            .any(|pattern| Self::glob_matches(pattern, file_name))
    }

    /// Matches `name` against `pattern`, where `*` stands for any (possibly
    /// empty) run of characters and may appear any number of times.
    fn glob_matches(pattern: &str, name: &str) -> bool {
        // No wildcard at all: the pattern must equal the name.
        let (prefix, rest) = match pattern.split_once('*') {
            Some(parts) => parts,
            None => return pattern == name,
        };
        // Everything after the last `*` is a literal suffix; whatever lies
        // between the first and last `*` is a list of literals to find in order.
        let (middle, suffix) = match rest.rsplit_once('*') {
            Some(parts) => parts,
            None => ("", rest),
        };

        // Strip prefix then suffix so the two can never overlap in `name`.
        let mut remaining = match name
            .strip_prefix(prefix)
            .and_then(|tail| tail.strip_suffix(suffix))
        {
            Some(inner) => inner,
            None => return false,
        };

        // Leftmost matching is sufficient because `*` is the only wildcard.
        for segment in middle.split('*').filter(|s| !s.is_empty()) {
            match remaining.find(segment) {
                Some(at) => remaining = &remaining[at + segment.len()..],
                None => return false,
            }
        }
        true
    }
}
140
141pub struct PluginWatcher {
143 watch_paths: Arc<RwLock<Vec<PathBuf>>>,
145 config: WatchConfig,
147 event_tx: mpsc::Sender<WatchEvent>,
149 event_rx: Option<mpsc::Receiver<WatchEvent>>,
151 watcher: Option<RecommendedWatcher>,
153 last_events: Arc<RwLock<HashMap<PathBuf, std::time::Instant>>>,
155 shutdown_tx: Option<mpsc::Sender<()>>,
157}
158
159impl PluginWatcher {
160 pub fn new(config: WatchConfig) -> Self {
162 let (event_tx, event_rx) = mpsc::channel(1024);
163
164 Self {
165 watch_paths: Arc::new(RwLock::new(Vec::new())),
166 config,
167 event_tx,
168 event_rx: Some(event_rx),
169 watcher: None,
170 last_events: Arc::new(RwLock::new(HashMap::new())),
171 shutdown_tx: None,
172 }
173 }
174
175 pub fn take_event_receiver(&mut self) -> Option<mpsc::Receiver<WatchEvent>> {
177 self.event_rx.take()
178 }
179
180 pub async fn watch<P: AsRef<Path>>(&mut self, path: P) -> Result<(), notify::Error> {
182 let path = path.as_ref().to_path_buf();
183
184 if !path.exists() {
185 warn!("Watch path does not exist: {:?}", path);
186 return Ok(());
187 }
188
189 info!("Adding watch path: {:?}", path);
190
191 {
193 let mut paths = self.watch_paths.write().await;
194 if !paths.contains(&path) {
195 paths.push(path.clone());
196 }
197 }
198
199 if let Some(ref mut watcher) = self.watcher {
201 let mode = if self.config.recursive {
202 RecursiveMode::Recursive
203 } else {
204 RecursiveMode::NonRecursive
205 };
206 watcher.watch(&path, mode)?;
207 }
208
209 Ok(())
210 }
211
212 pub async fn unwatch<P: AsRef<Path>>(&mut self, path: P) -> Result<(), notify::Error> {
214 let path = path.as_ref().to_path_buf();
215
216 info!("Removing watch path: {:?}", path);
217
218 {
220 let mut paths = self.watch_paths.write().await;
221 paths.retain(|p| p != &path);
222 }
223
224 if let Some(ref mut watcher) = self.watcher {
226 watcher.unwatch(&path)?;
227 }
228
229 Ok(())
230 }
231
232 pub async fn start(&mut self) -> Result<(), notify::Error> {
234 info!("Starting plugin watcher");
235
236 let event_tx = self.event_tx.clone();
237 let config = self.config.clone();
238 let last_events = self.last_events.clone();
239 let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1);
240
241 self.shutdown_tx = Some(shutdown_tx);
242
243 let (tx, mut rx) = mpsc::channel(1024);
245
246 let watcher_config = Config::default().with_poll_interval(Duration::from_millis(100));
247
248 let mut watcher = RecommendedWatcher::new(
249 move |result: Result<Event, notify::Error>| {
250 if let Ok(event) = result {
251 let _ = tx.blocking_send(event);
252 }
253 },
254 watcher_config,
255 )?;
256
257 let mode = if self.config.recursive {
259 RecursiveMode::Recursive
260 } else {
261 RecursiveMode::NonRecursive
262 };
263
264 let paths = self.watch_paths.read().await;
265 for path in paths.iter() {
266 watcher.watch(path, mode)?;
267 }
268
269 self.watcher = Some(watcher);
270
271 tokio::spawn(async move {
273 let mut rename_from: Option<PathBuf> = None;
274
275 loop {
276 tokio::select! {
277 Some(event) = rx.recv() => {
278 for path in event.paths {
280 if !config.should_watch(&path) {
282 continue;
283 }
284
285 let should_process = {
287 let mut last = last_events.write().await;
288 let now = std::time::Instant::now();
289
290 if let Some(last_time) = last.get(&path) {
291 if now.duration_since(*last_time) < config.debounce_duration {
292 false
293 } else {
294 last.insert(path.clone(), now);
295 true
296 }
297 } else {
298 last.insert(path.clone(), now);
299 true
300 }
301 };
302
303 if !should_process {
304 debug!("Debounced event for {:?}", path);
305 continue;
306 }
307
308 let watch_event = match event.kind {
310 EventKind::Create(CreateKind::File) => {
311 Some(WatchEvent::new(WatchEventKind::Created, path.clone()))
312 }
313 EventKind::Modify(ModifyKind::Data(_)) |
314 EventKind::Modify(ModifyKind::Any) => {
315 Some(WatchEvent::new(WatchEventKind::Modified, path.clone()))
316 }
317 EventKind::Remove(RemoveKind::File) => {
318 Some(WatchEvent::new(WatchEventKind::Removed, path.clone()))
319 }
320 EventKind::Modify(ModifyKind::Name(RenameMode::From)) => {
321 rename_from = Some(path.clone());
322 None
323 }
324 EventKind::Modify(ModifyKind::Name(RenameMode::To)) => {
325 if let Some(from) = rename_from.take() {
326 Some(WatchEvent::new(
327 WatchEventKind::Renamed {
328 from: from.clone(),
329 to: path.clone(),
330 },
331 path.clone(),
332 ))
333 } else {
334 Some(WatchEvent::new(WatchEventKind::Created, path.clone()))
335 }
336 }
337 _ => None,
338 };
339
340 if let Some(evt) = watch_event {
341 debug!("Watch event: {:?}", evt);
342 if event_tx.send(evt).await.is_err() {
343 error!("Failed to send watch event");
344 return;
345 }
346 }
347 }
348 }
349 _ = shutdown_rx.recv() => {
350 info!("Plugin watcher shutting down");
351 return;
352 }
353 }
354 }
355 });
356
357 Ok(())
358 }
359
360 pub async fn stop(&mut self) {
362 info!("Stopping plugin watcher");
363
364 if let Some(tx) = self.shutdown_tx.take() {
366 let _ = tx.send(()).await;
367 }
368
369 self.watcher = None;
371 }
372
373 pub async fn watched_paths(&self) -> Vec<PathBuf> {
375 self.watch_paths.read().await.clone()
376 }
377
378 pub async fn is_watching<P: AsRef<Path>>(&self, path: P) -> bool {
380 let paths = self.watch_paths.read().await;
381 paths.contains(&path.as_ref().to_path_buf())
382 }
383
384 pub fn config(&self) -> &WatchConfig {
386 &self.config
387 }
388
389 pub async fn scan_existing(&self) -> Vec<PathBuf> {
391 let mut plugins = Vec::new();
392 let paths = self.watch_paths.read().await;
393
394 for watch_path in paths.iter() {
395 if let Ok(entries) = std::fs::read_dir(watch_path) {
396 for entry in entries.flatten() {
397 let path = entry.path();
398 if path.is_file() && self.config.should_watch(&path) {
399 plugins.push(path);
400 }
401 }
402 }
403 }
404
405 plugins
406 }
407}
408
409impl Drop for PluginWatcher {
410 fn drop(&mut self) {
411 }
413}
414
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration lists three extensions and is non-recursive.
    #[test]
    fn test_watch_config_default() {
        let WatchConfig {
            extensions,
            recursive,
            ..
        } = WatchConfig::default();

        assert_eq!(extensions.len(), 3);
        assert!(!recursive);
    }

    /// Plugin extensions are accepted; other extensions and ignored names are not.
    #[test]
    fn test_should_watch() {
        let config = WatchConfig::default();

        let accepted = [
            "/path/to/plugin.so",
            "/path/to/plugin.dylib",
            "/path/to/plugin.dll",
        ];
        for candidate in accepted {
            assert!(config.should_watch(Path::new(candidate)));
        }

        let rejected = [
            // Extensions outside the allow-list.
            "/path/to/file.txt",
            "/path/to/file.rs",
            // Temporary / swap files.
            "/path/to/plugin.so.tmp",
            "/path/to/plugin.swp",
        ];
        for candidate in rejected {
            assert!(!config.should_watch(Path::new(candidate)));
        }
    }

    /// A `.so` path is recognised as a plugin file and keeps its event kind.
    #[test]
    fn test_watch_event() {
        let path = PathBuf::from("/path/to/plugin.so");
        let event = WatchEvent::new(WatchEventKind::Modified, path);

        assert!(event.is_plugin_file());
        assert!(matches!(event.kind, WatchEventKind::Modified));
    }

    /// The event receiver can be taken exactly once.
    #[tokio::test]
    async fn test_plugin_watcher_new() {
        let mut watcher = PluginWatcher::new(WatchConfig::default());

        let first = watcher.take_event_receiver();
        assert!(first.is_some());

        let second = watcher.take_event_receiver();
        assert!(second.is_none());
    }
}