Skip to main content

mabi_core/config/
file_watcher.rs

1//! File system watching implementation using notify crate.
2//!
3//! This module integrates the `notify` crate with the configuration watcher
4//! infrastructure to provide real-time file change detection.
5//!
6//! # Architecture
7//!
8//! ```text
9//! ┌─────────────────┐     ┌──────────────────┐     ┌─────────────────┐
10//! │  File System    │────▶│  FileWatcher     │────▶│  ConfigWatcher  │
11//! │  (notify crate) │     │  (this module)   │     │  (watcher.rs)   │
12//! └─────────────────┘     └──────────────────┘     └─────────────────┘
13//!                                                           │
14//!                                                           ▼
15//!                                                  ┌─────────────────┐
16//!                                                  │  Subscribers    │
17//!                                                  │  (engine, etc.) │
18//!                                                  └─────────────────┘
19//! ```
20//!
21//! # Example
22//!
23//! ```rust,ignore
24//! use mabi_core::config::file_watcher::FileWatcherService;
25//! use mabi_core::config::{ConfigSource, ConfigWatcher};
26//!
27//! // Create watcher service
28//! let watcher = Arc::new(ConfigWatcher::new());
29//! let service = FileWatcherService::new(watcher.clone());
30//!
31//! // Register paths to watch
32//! service.watch(ConfigSource::Main(PathBuf::from("config.yaml")))?;
33//!
34//! // Start watching (synchronous call; must run inside a Tokio runtime)
35//! service.start()?;
36//! ```
37
38use std::collections::HashMap;
39use std::path::{Path, PathBuf};
40use std::sync::Arc;
41use std::time::{Duration, Instant};
42
43use notify::{
44    Config as NotifyConfig, Event as NotifyEvent, EventKind, RecommendedWatcher, RecursiveMode,
45    Watcher,
46};
47use parking_lot::{Mutex, RwLock};
48use tokio::sync::{mpsc, oneshot};
49
50use super::watcher::{ConfigEvent, ConfigSource, ConfigWatcher, SharedConfigWatcher};
51use crate::error::Error;
52use crate::Result;
53
/// Debounce window, in milliseconds, used when no explicit value is configured.
pub const DEFAULT_DEBOUNCE_MS: u64 = 100;

/// Tunable settings for the file watcher service.
#[derive(Debug, Clone)]
pub struct FileWatcherConfig {
    /// Length of the debounce window in milliseconds.
    pub debounce_ms: u64,
    /// When `true`, directories are watched together with their subtrees.
    pub recursive: bool,
    /// Allowed file extensions (without the dot); an empty list allows every path.
    pub extensions: Vec<String>,
}

impl Default for FileWatcherConfig {
    /// Non-recursive watching of `yaml`, `yml`, `json` and `toml` files with the
    /// default debounce window.
    fn default() -> Self {
        let extensions = ["yaml", "yml", "json", "toml"]
            .iter()
            .map(|ext| ext.to_string())
            .collect();

        Self {
            debounce_ms: DEFAULT_DEBOUNCE_MS,
            recursive: false,
            extensions,
        }
    }
}

impl FileWatcherConfig {
    /// Default settings with the extension filter disabled.
    pub fn all_files() -> Self {
        Self::default().with_extensions(Vec::new())
    }

    /// Returns the config with the debounce window replaced by `ms` milliseconds.
    pub fn with_debounce(self, ms: u64) -> Self {
        Self {
            debounce_ms: ms,
            ..self
        }
    }

    /// Returns the config with recursive watching switched on or off.
    pub fn with_recursive(self, recursive: bool) -> Self {
        Self { recursive, ..self }
    }

    /// Returns the config with the extension allow-list replaced by `exts`.
    pub fn with_extensions(self, exts: Vec<String>) -> Self {
        Self {
            extensions: exts,
            ..self
        }
    }

    /// Reports whether `path` passes the extension filter.
    ///
    /// An empty allow-list accepts everything. Otherwise the path's extension
    /// must match one of the entries, compared ASCII case-insensitively; a path
    /// without an extension (or with a non-UTF-8 one) is rejected.
    pub fn should_watch(&self, path: &Path) -> bool {
        if self.extensions.is_empty() {
            return true;
        }

        match path.extension().and_then(|ext| ext.to_str()) {
            Some(ext) => self
                .extensions
                .iter()
                .any(|allowed| allowed.eq_ignore_ascii_case(ext)),
            None => false,
        }
    }
}
122
123/// Internal state for tracked paths.
124#[derive(Clone)]
125struct WatchedPath {
126    source: ConfigSource,
127    #[allow(dead_code)]
128    last_event: Option<Instant>,
129}
130
131/// File watcher service state.
132enum ServiceState {
133    Stopped,
134    Running {
135        watcher: RecommendedWatcher,
136        shutdown_tx: oneshot::Sender<()>,
137    },
138}
139
140/// Service for watching configuration files on the filesystem.
141///
142/// This service bridges the `notify` crate with the [`ConfigWatcher`] infrastructure.
143pub struct FileWatcherService {
144    /// Configuration watcher to emit events to.
145    config_watcher: SharedConfigWatcher,
146    /// Service configuration.
147    config: FileWatcherConfig,
148    /// Watched paths mapping path -> source.
149    watched_paths: RwLock<HashMap<PathBuf, WatchedPath>>,
150    /// Service state.
151    state: Mutex<Option<ServiceState>>,
152}
153
154impl FileWatcherService {
155    /// Create a new file watcher service.
156    pub fn new(config_watcher: SharedConfigWatcher) -> Self {
157        Self::with_config(config_watcher, FileWatcherConfig::default())
158    }
159
160    /// Create with custom configuration.
161    pub fn with_config(config_watcher: SharedConfigWatcher, config: FileWatcherConfig) -> Self {
162        Self {
163            config_watcher,
164            config,
165            watched_paths: RwLock::new(HashMap::new()),
166            state: Mutex::new(Some(ServiceState::Stopped)),
167        }
168    }
169
170    /// Get the configuration watcher.
171    pub fn config_watcher(&self) -> &SharedConfigWatcher {
172        &self.config_watcher
173    }
174
175    /// Get service configuration.
176    pub fn config(&self) -> &FileWatcherConfig {
177        &self.config
178    }
179
180    /// Check if the service is running.
181    pub fn is_running(&self) -> bool {
182        matches!(
183            self.state.lock().as_ref(),
184            Some(ServiceState::Running { .. })
185        )
186    }
187
188    /// Add a path to watch.
189    pub fn watch(&self, source: ConfigSource) -> Result<()> {
190        let path = source.path().clone();
191
192        // Validate path exists
193        if !path.exists() {
194            tracing::warn!(path = %path.display(), "Watching non-existent path");
195        }
196
197        // Check extension filter
198        if !self.config.should_watch(&path) {
199            return Err(Error::Config(format!(
200                "Path extension not in allowed list: {}",
201                path.display()
202            )));
203        }
204
205        // Register with config watcher
206        self.config_watcher.register(source.clone());
207
208        // Add to tracked paths
209        let mut paths = self.watched_paths.write();
210        paths.insert(
211            path.clone(),
212            WatchedPath {
213                source,
214                last_event: None,
215            },
216        );
217
218        // If running, add to the watcher
219        if let Some(ServiceState::Running { watcher, .. }) = self.state.lock().as_mut() {
220            let mode = if self.config.recursive {
221                RecursiveMode::Recursive
222            } else {
223                RecursiveMode::NonRecursive
224            };
225
226            watcher
227                .watch(&path, mode)
228                .map_err(|e| Error::Internal(format!("Failed to watch path: {}", e)))?;
229        }
230
231        tracing::debug!(path = %path.display(), "Added path to watch list");
232        Ok(())
233    }
234
235    /// Remove a path from watching.
236    pub fn unwatch(&self, path: &Path) -> Result<()> {
237        // Remove from config watcher
238        self.config_watcher.unregister(&path.to_path_buf());
239
240        // Remove from tracked paths
241        self.watched_paths.write().remove(path);
242
243        // If running, remove from the watcher
244        if let Some(ServiceState::Running { watcher, .. }) = self.state.lock().as_mut() {
245            let _ = watcher.unwatch(path);
246        }
247
248        tracing::debug!(path = %path.display(), "Removed path from watch list");
249        Ok(())
250    }
251
252    /// Get all watched paths.
253    pub fn watched_paths(&self) -> Vec<PathBuf> {
254        self.watched_paths.read().keys().cloned().collect()
255    }
256
257    /// Start the file watcher service.
258    pub fn start(&self) -> Result<()> {
259        let mut state = self.state.lock();
260
261        // Check if already running
262        if matches!(state.as_ref(), Some(ServiceState::Running { .. })) {
263            return Err(Error::Engine("File watcher already running".to_string()));
264        }
265
266        // Create channel for events
267        let (event_tx, mut event_rx) = mpsc::channel::<NotifyEvent>(256);
268
269        // Create the notify watcher
270        let watcher_tx = event_tx.clone();
271        let notify_config = NotifyConfig::default()
272            .with_poll_interval(Duration::from_millis(self.config.debounce_ms));
273
274        let mut watcher = RecommendedWatcher::new(
275            move |res: std::result::Result<NotifyEvent, notify::Error>| {
276                if let Ok(event) = res {
277                    let _ = watcher_tx.blocking_send(event);
278                }
279            },
280            notify_config,
281        )
282        .map_err(|e| Error::Internal(format!("Failed to create file watcher: {}", e)))?;
283
284        // Add all registered paths to the watcher
285        let mode = if self.config.recursive {
286            RecursiveMode::Recursive
287        } else {
288            RecursiveMode::NonRecursive
289        };
290
291        for path in self.watched_paths.read().keys() {
292            if let Err(e) = watcher.watch(path, mode) {
293                tracing::warn!(path = %path.display(), error = %e, "Failed to watch path");
294            }
295        }
296
297        // Create shutdown channel
298        let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
299
300        // Start the config watcher
301        self.config_watcher.start();
302
303        // Spawn event processing task
304        let config_watcher = self.config_watcher.clone();
305        let watched_paths = Arc::new(self.watched_paths.read().clone());
306        let debounce_ms = self.config.debounce_ms;
307
308        tokio::spawn(async move {
309            let mut last_events: HashMap<PathBuf, Instant> = HashMap::new();
310
311            loop {
312                tokio::select! {
313                    _ = &mut shutdown_rx => {
314                        tracing::debug!("File watcher shutdown received");
315                        break;
316                    }
317                    Some(event) = event_rx.recv() => {
318                        Self::process_notify_event(
319                            &config_watcher,
320                            &watched_paths,
321                            event,
322                            &mut last_events,
323                            debounce_ms,
324                        );
325                    }
326                }
327            }
328        });
329
330        *state = Some(ServiceState::Running {
331            watcher,
332            shutdown_tx,
333        });
334
335        tracing::info!("File watcher service started");
336        Ok(())
337    }
338
339    /// Stop the file watcher service.
340    pub fn stop(&self) -> Result<()> {
341        let mut state = self.state.lock();
342
343        match state.take() {
344            Some(ServiceState::Running { shutdown_tx, .. }) => {
345                // Send shutdown signal
346                let _ = shutdown_tx.send(());
347
348                // Stop config watcher
349                self.config_watcher.stop();
350
351                *state = Some(ServiceState::Stopped);
352                tracing::info!("File watcher service stopped");
353                Ok(())
354            }
355            Some(ServiceState::Stopped) => {
356                *state = Some(ServiceState::Stopped);
357                Err(Error::Engine("File watcher not running".to_string()))
358            }
359            None => Err(Error::Engine("File watcher in invalid state".to_string())),
360        }
361    }
362
363    /// Process a notify event and emit to config watcher.
364    fn process_notify_event(
365        config_watcher: &ConfigWatcher,
366        watched_paths: &HashMap<PathBuf, WatchedPath>,
367        event: NotifyEvent,
368        last_events: &mut HashMap<PathBuf, Instant>,
369        debounce_ms: u64,
370    ) {
371        // Filter out irrelevant events
372        let relevant = matches!(
373            event.kind,
374            EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_)
375        );
376
377        if !relevant {
378            return;
379        }
380
381        let now = Instant::now();
382        let debounce = Duration::from_millis(debounce_ms);
383
384        for path in event.paths {
385            // Debounce check
386            if let Some(last) = last_events.get(&path) {
387                if now.duration_since(*last) < debounce {
388                    continue;
389                }
390            }
391            last_events.insert(path.clone(), now);
392
393            // Find the matching source
394            let source = if let Some(watched) = watched_paths.get(&path) {
395                watched.source.clone()
396            } else {
397                // Check if it's a file within a watched directory
398                watched_paths
399                    .iter()
400                    .find(|(watched_path, _)| {
401                        watched_path.is_dir() && path.starts_with(watched_path)
402                    })
403                    .map(|_| ConfigSource::Custom {
404                        name: "file".to_string(),
405                        path: path.clone(),
406                    })
407                    .unwrap_or_else(|| ConfigSource::Custom {
408                        name: "unknown".to_string(),
409                        path: path.clone(),
410                    })
411            };
412
413            // Emit the appropriate event
414            let config_event = match event.kind {
415                EventKind::Create(_) => ConfigEvent::Created {
416                    source,
417                    timestamp: Instant::now(),
418                },
419                EventKind::Modify(_) => ConfigEvent::Modified {
420                    source,
421                    timestamp: Instant::now(),
422                },
423                EventKind::Remove(_) => ConfigEvent::Deleted {
424                    source,
425                    timestamp: Instant::now(),
426                },
427                _ => continue,
428            };
429
430            config_watcher.emit(config_event);
431        }
432    }
433}
434
435impl Drop for FileWatcherService {
436    fn drop(&mut self) {
437        let _ = self.stop();
438    }
439}
440
441/// Builder for FileWatcherService.
442pub struct FileWatcherServiceBuilder {
443    config_watcher: Option<SharedConfigWatcher>,
444    config: FileWatcherConfig,
445    initial_paths: Vec<ConfigSource>,
446}
447
448impl FileWatcherServiceBuilder {
449    /// Create a new builder.
450    pub fn new() -> Self {
451        Self {
452            config_watcher: None,
453            config: FileWatcherConfig::default(),
454            initial_paths: Vec::new(),
455        }
456    }
457
458    /// Set the config watcher.
459    pub fn config_watcher(mut self, watcher: SharedConfigWatcher) -> Self {
460        self.config_watcher = Some(watcher);
461        self
462    }
463
464    /// Set debounce duration.
465    pub fn debounce_ms(mut self, ms: u64) -> Self {
466        self.config.debounce_ms = ms;
467        self
468    }
469
470    /// Set recursive mode.
471    pub fn recursive(mut self, recursive: bool) -> Self {
472        self.config.recursive = recursive;
473        self
474    }
475
476    /// Set file extensions.
477    pub fn extensions(mut self, exts: Vec<String>) -> Self {
478        self.config.extensions = exts;
479        self
480    }
481
482    /// Add a path to watch on startup.
483    pub fn watch(mut self, source: ConfigSource) -> Self {
484        self.initial_paths.push(source);
485        self
486    }
487
488    /// Build the file watcher service.
489    pub fn build(self) -> Result<FileWatcherService> {
490        let config_watcher = self
491            .config_watcher
492            .unwrap_or_else(|| Arc::new(ConfigWatcher::new()));
493
494        let service = FileWatcherService::with_config(config_watcher, self.config);
495
496        for source in self.initial_paths {
497            service.watch(source)?;
498        }
499
500        Ok(service)
501    }
502}
503
504impl Default for FileWatcherServiceBuilder {
505    fn default() -> Self {
506        Self::new()
507    }
508}
509
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use tempfile::tempdir;

    /// The default configuration uses the default debounce, is non-recursive
    /// and allows YAML files.
    #[test]
    fn test_file_watcher_config_default() {
        let config = FileWatcherConfig::default();
        assert_eq!(config.debounce_ms, DEFAULT_DEBOUNCE_MS);
        assert!(!config.recursive);
        assert!(config.extensions.contains(&"yaml".to_string()));
    }

    /// The extension filter accepts the default config formats and rejects
    /// others, unless the filter is disabled.
    #[test]
    fn test_file_watcher_config_should_watch() {
        let config = FileWatcherConfig::default();

        assert!(config.should_watch(Path::new("config.yaml")));
        assert!(config.should_watch(Path::new("config.yml")));
        assert!(config.should_watch(Path::new("config.json")));
        assert!(config.should_watch(Path::new("config.toml")));
        assert!(!config.should_watch(Path::new("config.txt")));

        let all_config = FileWatcherConfig::all_files();
        assert!(all_config.should_watch(Path::new("config.txt")));
    }

    /// A freshly created service is stopped and tracks no paths.
    #[test]
    fn test_file_watcher_service_creation() {
        let config_watcher = Arc::new(ConfigWatcher::new());
        let service = FileWatcherService::new(config_watcher);

        assert!(!service.is_running());
        assert!(service.watched_paths().is_empty());
    }

    /// Watching a source records its path, even when the file does not exist.
    #[test]
    fn test_file_watcher_service_watch() {
        let config_watcher = Arc::new(ConfigWatcher::new());
        let service = FileWatcherService::new(config_watcher);

        let source = ConfigSource::Main(PathBuf::from("test.yaml"));
        service.watch(source).unwrap();

        assert!(service
            .watched_paths()
            .contains(&PathBuf::from("test.yaml")));
    }

    /// Unwatching removes a previously tracked path.
    #[test]
    fn test_file_watcher_service_unwatch() {
        let config_watcher = Arc::new(ConfigWatcher::new());
        let service = FileWatcherService::new(config_watcher);

        let path = PathBuf::from("test.yaml");
        let source = ConfigSource::Main(path.clone());
        service.watch(source).unwrap();
        service.unwatch(&path).unwrap();

        assert!(!service.watched_paths().contains(&path));
    }

    /// The builder applies its settings and registers queued paths.
    #[test]
    fn test_file_watcher_service_builder() {
        let service = FileWatcherServiceBuilder::new()
            .debounce_ms(200)
            .recursive(true)
            .watch(ConfigSource::Main(PathBuf::from("config.yaml")))
            .build()
            .unwrap();

        assert_eq!(service.config().debounce_ms, 200);
        assert!(service.config().recursive);
        assert!(service.watched_paths().contains(&PathBuf::from("config.yaml")));
    }

    /// Start and stop toggle the running flag.
    #[tokio::test]
    async fn test_file_watcher_start_stop() {
        let config_watcher = Arc::new(ConfigWatcher::new());
        let service = FileWatcherService::new(config_watcher);

        service.start().unwrap();
        assert!(service.is_running());

        service.stop().unwrap();
        assert!(!service.is_running());
    }

    /// Starting an already running service is an error.
    #[tokio::test]
    async fn test_file_watcher_double_start() {
        let config_watcher = Arc::new(ConfigWatcher::new());
        let service = FileWatcherService::new(config_watcher);

        service.start().unwrap();
        let result = service.start();

        assert!(result.is_err());
        service.stop().unwrap();
    }

    /// Smoke test: watching, starting, modifying a file and stopping all succeed.
    #[tokio::test]
    async fn test_file_watcher_events() {
        let dir = tempdir().unwrap();
        let config_path = dir.path().join("config.yaml");
        fs::write(&config_path, "initial: content\n").unwrap();

        let config_watcher = Arc::new(ConfigWatcher::new());
        let mut rx = config_watcher.subscribe();

        let service = FileWatcherService::new(config_watcher.clone());
        service
            .watch(ConfigSource::Main(config_path.clone()))
            .unwrap();
        service.start().unwrap();

        // Give the watcher time to initialize
        tokio::time::sleep(Duration::from_millis(50)).await;

        // Modify the file
        fs::write(&config_path, "modified: content\n").unwrap();

        // Wait for event with timeout
        let event = tokio::time::timeout(Duration::from_secs(2), rx.recv()).await;

        service.stop().unwrap();

        // Event delivery depends on the platform backend and on debounce
        // timing, so a missing event is reported but does not fail the test.
        // The previous `is_ok() || is_err()` assertion could never fail and
        // has been removed rather than kept as a pretend check.
        if event.is_err() {
            eprintln!("no config event observed within 2s (backend/timing dependent)");
        }
    }
}