Skip to main content

magellan/watcher/
mod.rs

1//! Filesystem watcher with debounced batch events.
2//!
3//! Provides deterministic event coalescing: all events within a debounce window
4//! are collected, de-duplicated, sorted lexicographically, and emitted as a single
5//! batch. This ensures the same final DB state regardless of event arrival order.
6//!
7//! # Threading Design
8//!
9//! This watcher uses thread-safe synchronization for concurrent access.
10//! The legacy pending state fields use `Arc<Mutex<T>>` to allow safe access
11//! from multiple threads during concurrent operations and shutdown.
12//!
//! **Thread safety:** `Arc<Mutex<T>>` provides runtime mutual exclusion
//! and is safe to share across threads. A poisoned mutex does not panic:
//! the legacy event methods report it to the caller as an `Err`.
16//!
17//! # Global Lock Ordering
18//!
19//! This module participates in the global lock ordering hierarchy:
20//!
21//! 1. **watcher state locks** (legacy_pending_batch, legacy_pending_index)—acquired first
22//! 2. **indexer shared state locks** (dirty_paths)—acquired second
23//! 3. **wakeup channel send** (highest priority)—acquired last
24//!
25//! **Rule:** Never send to wakeup channel while holding other locks.
26//!
27//! See `src/indexer.rs::PipelineSharedState` for full lock ordering documentation.
28//!
29//! See MANUAL.md for architecture details.
30
31use anyhow::Result;
32use notify::RecursiveMode;
33use notify_debouncer_mini::new_debouncer;
34use serde::{Deserialize, Serialize};
35use std::collections::BTreeSet;
36use std::mem::ManuallyDrop;
37use std::path::{Path, PathBuf};
38use std::sync::atomic::{AtomicBool, Ordering};
39use std::sync::mpsc::{self, Receiver, Sender};
40use std::sync::{Arc, Mutex};
41use std::thread;
42use std::time::Duration;
43
44use crate::diagnostics::SkipReason;
45use crate::graph::filter::FileFilter;
46
47/// Deterministic batch of dirty file paths.
48///
49/// Contains ONLY paths (no timestamps, no event types) to ensure deterministic
50/// behavior. Paths are sorted lexicographically before emission.
51#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
52pub struct WatcherBatch {
53    /// Dirty file paths to reconcile, in lexicographic order
54    pub paths: Vec<PathBuf>,
55}
56
57impl WatcherBatch {
58    /// Create a new batch from a set of paths, sorting them deterministically.
59    fn from_set(paths: BTreeSet<PathBuf>) -> Self {
60        Self {
61            paths: paths.into_iter().collect(),
62        }
63    }
64
65    /// Empty batch for when no dirty paths exist after filtering.
66    pub fn empty() -> Self {
67        Self { paths: Vec::new() }
68    }
69
70    /// Whether this batch contains any paths.
71    pub fn is_empty(&self) -> bool {
72        self.paths.is_empty()
73    }
74}
75
/// Settings that control how the filesystem watcher behaves.
#[derive(Debug, Clone)]
pub struct WatcherConfig {
    /// Root directory that reported paths are validated against.
    ///
    /// `FileSystemWatcher::new` overwrites this with the watched directory.
    pub root_path: PathBuf,
    /// Length of the debounce window, in milliseconds.
    pub debounce_ms: u64,
    /// Whether `.gitignore` filtering is applied to events (default: true).
    pub gitignore_aware: bool,
}
86
87impl Default for WatcherConfig {
88    fn default() -> Self {
89        Self {
90            root_path: PathBuf::from("."),
91            debounce_ms: 500,
92            gitignore_aware: true,
93        }
94    }
95}
96
97/// Filesystem watcher that emits debounced batches of dirty paths.
98///
99/// Uses notify-debouncer-mini for event coalescing. All paths within the
100/// debounce window are collected, de-duplicated, sorted, and emitted as a
101/// single WatcherBatch.
102///
103
104pub struct FileSystemWatcher {
105    /// Watcher thread handle (wrapped in ManuallyDrop for custom Drop/shutdown logic)
106    _watcher_thread: ManuallyDrop<thread::JoinHandle<()>>,
107    batch_receiver: Receiver<WatcherBatch>,
108    /// Legacy compatibility: pending batch to emit one path at a time
109    /// Thread-safe: wrapped in Arc<Mutex<T>> for concurrent access
110    legacy_pending_batch: Arc<Mutex<Option<WatcherBatch>>>,
111    /// Legacy compatibility: current index into pending batch
112    /// Thread-safe: wrapped in Arc<Mutex<T>> for concurrent access
113    legacy_pending_index: Arc<Mutex<usize>>,
114
115}
116
117impl FileSystemWatcher {
118    /// Create a new watcher for the given directory.
119    ///
120    /// # Arguments
121    /// * `path` - Directory to watch recursively (also used as root_path for validation)
122    /// * `config` - Watcher configuration
123    /// * `shutdown` - AtomicBool for graceful shutdown
124    ///
125    /// # Returns
126    /// A watcher that can be polled for batch events
127    pub fn new(path: PathBuf, config: WatcherConfig, shutdown: Arc<AtomicBool>) -> Result<Self> {
128        let (batch_tx, batch_rx) = mpsc::channel();
129
130        // Ensure root_path is set to the watched directory for validation
131        let config = WatcherConfig {
132            root_path: path.clone(),
133            ..config
134        };
135
136        let thread = thread::spawn(move || {
137            if let Err(e) = run_watcher(path, batch_tx, config, shutdown) {
138                eprintln!("Watcher error: {:?}", e);
139            }
140        });
141
142        Ok(Self {
143            _watcher_thread: ManuallyDrop::new(thread),
144            batch_receiver: batch_rx,
145            legacy_pending_batch: Arc::new(Mutex::new(None)),
146            legacy_pending_index: Arc::new(Mutex::new(0)),
147        })
148    }
149
150
151
152    /// Receive the next batch, blocking until available.
153    ///
154    /// # Returns
155    /// `None` if the watcher thread has terminated
156    pub fn recv_batch(&self) -> Option<WatcherBatch> {
157        self.batch_receiver.recv().ok()
158    }
159
160    /// Try to receive a batch without blocking.
161    ///
162    /// # Returns
163    /// - `Some(batch)` if a batch is available
164    /// - `None` if no batch is available or watcher terminated
165    pub fn try_recv_batch(&self) -> Option<WatcherBatch> {
166        self.batch_receiver.try_recv().ok()
167    }
168
169    /// Receive the next batch with a timeout.
170    ///
171    /// # Returns
172    /// - `Ok(Some(batch))` if a batch is available
173    /// - `Ok(None)` if the watcher thread has terminated
174    /// - `Err` if timeout elapsed
175    pub fn recv_batch_timeout(&self, timeout: Duration) -> Result<Option<WatcherBatch>, ()> {
176        match self.batch_receiver.recv_timeout(timeout) {
177            Ok(batch) => Ok(Some(batch)),
178            Err(std::sync::mpsc::RecvTimeoutError::Timeout) => Err(()),
179            Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => Ok(None),
180        }
181    }
182
183    // ========================================================================
184    // LEGACY: Old single-event API for backward compatibility during migration
185    // ========================================================================
186
187    /// Legacy: Try to receive a single event without blocking (DEPRECATED).
188    ///
189    /// This method converts batch events to single events for backward
190    /// compatibility. Paths from each batch are returned one at a time
191    /// in sorted order.
192    ///
193    /// # Deprecated
194    /// Use `try_recv_batch()` instead for deterministic batch processing.
195    ///
196    /// # Errors
197    /// Returns an error if a mutex is poisoned (thread panicked while holding the lock).
198    pub fn try_recv_event(&self) -> Result<Option<FileEvent>> {
199        // First, check if we have a pending batch to continue from
200        {
201            let mut pending_batch = self.legacy_pending_batch.lock()
202                .map_err(|e| anyhow::anyhow!("legacy_pending_batch mutex poisoned: {}", e))?;
203            let mut pending_index = self.legacy_pending_index.lock()
204                .map_err(|e| anyhow::anyhow!("legacy_pending_index mutex poisoned: {}", e))?;
205
206            if let Some(ref batch) = *pending_batch {
207                if *pending_index < batch.paths.len() {
208                    let path = batch.paths[*pending_index].clone();
209                    *pending_index += 1;
210
211                    // Check if we've exhausted this batch
212                    if *pending_index >= batch.paths.len() {
213                        *pending_batch = None;
214                        *pending_index = 0;
215                    }
216
217                    return Ok(Some(FileEvent {
218                        path,
219                        event_type: EventType::Modify,
220                    }));
221                }
222            }
223        }
224
225        // No pending batch or batch exhausted, try to get a new batch
226        if let Ok(batch) = self.batch_receiver.try_recv() {
227            if batch.paths.is_empty() {
228                return Ok(None);
229            }
230
231            // If there are multiple paths, store the batch for next call
232            if batch.paths.len() > 1 {
233                let path = batch.paths[0].clone();
234                let mut pending_batch = self.legacy_pending_batch.lock()
235                    .map_err(|e| anyhow::anyhow!("legacy_pending_batch mutex poisoned: {}", e))?;
236                let mut pending_index = self.legacy_pending_index.lock()
237                    .map_err(|e| anyhow::anyhow!("legacy_pending_index mutex poisoned: {}", e))?;
238                *pending_batch = Some(batch);
239                *pending_index = 1; // Next call will return index 1
240                drop(pending_batch);
241                drop(pending_index);
242                return Ok(Some(FileEvent {
243                    path,
244                    event_type: EventType::Modify,
245                }));
246            }
247
248            // Single path, return it directly
249            Ok(Some(FileEvent {
250                path: batch.paths[0].clone(),
251                event_type: EventType::Modify,
252            }))
253        } else {
254            Ok(None)
255        }
256    }
257
258    /// Legacy: Receive the next event, blocking until available (DEPRECATED).
259    ///
260    /// This method converts batch events to single events for backward
261    /// compatibility. Paths from each batch are returned one at a time
262    /// in sorted order.
263    ///
264    /// # Deprecated
265    /// Use `recv_batch()` instead for deterministic batch processing.
266    ///
267    /// # Errors
268    /// Returns an error if a mutex is poisoned (thread panicked while holding the lock).
269    pub fn recv_event(&self) -> Result<Option<FileEvent>> {
270        // First, check if we have a pending batch to continue from
271        {
272            let mut pending_batch = self.legacy_pending_batch.lock()
273                .map_err(|e| anyhow::anyhow!("legacy_pending_batch mutex poisoned: {}", e))?;
274            let mut pending_index = self.legacy_pending_index.lock()
275                .map_err(|e| anyhow::anyhow!("legacy_pending_index mutex poisoned: {}", e))?;
276
277            if let Some(ref batch) = *pending_batch {
278                if *pending_index < batch.paths.len() {
279                    let path = batch.paths[*pending_index].clone();
280                    *pending_index += 1;
281
282                    // Check if we've exhausted this batch
283                    if *pending_index >= batch.paths.len() {
284                        *pending_batch = None;
285                        *pending_index = 0;
286                    }
287
288                    return Ok(Some(FileEvent {
289                        path,
290                        event_type: EventType::Modify,
291                    }));
292                }
293            }
294        }
295
296        // No pending batch or batch exhausted, block for a new batch
297        if let Ok(batch) = self.batch_receiver.recv() {
298            if batch.paths.is_empty() {
299                return Ok(None);
300            }
301
302            // If there are multiple paths, store the batch for next call
303            if batch.paths.len() > 1 {
304                let path = batch.paths[0].clone();
305                let mut pending_batch = self.legacy_pending_batch.lock()
306                    .map_err(|e| anyhow::anyhow!("legacy_pending_batch mutex poisoned: {}", e))?;
307                let mut pending_index = self.legacy_pending_index.lock()
308                    .map_err(|e| anyhow::anyhow!("legacy_pending_index mutex poisoned: {}", e))?;
309                *pending_batch = Some(batch);
310                *pending_index = 1; // Next call will return index 1
311                drop(pending_batch);
312                drop(pending_index);
313                return Ok(Some(FileEvent {
314                    path,
315                    event_type: EventType::Modify,
316                }));
317            }
318
319            // Single path, return it directly
320            Ok(Some(FileEvent {
321                path: batch.paths[0].clone(),
322                event_type: EventType::Modify,
323            }))
324        } else {
325            Ok(None)
326        }
327    }
328
329    /// Explicitly shut down the watcher and join all background threads.
330    ///
331    /// This method consumes the watcher, ensuring that:
332    /// 1. The pub/sub receiver is shut down cleanly (if present)
333    /// 2. The watcher thread is joined (waits for clean termination)
334    ///
335    /// # Note
336    ///
337    /// This method should be called during graceful shutdown to ensure
338    /// all threads have terminated before the program exits.
339    pub fn shutdown(mut self) {
340        // Take ownership of self (consume it)
341        // SAFETY: We're consuming self, so we can safely extract the JoinHandle
342        let thread = unsafe { ManuallyDrop::take(&mut self._watcher_thread) };
343        // Join the thread - this waits for the watcher to exit cleanly
344        let _ = thread.join();
345        // Note: pubsub_receiver is dropped here, triggering its Drop impl
346    }
347}
348
349impl Drop for FileSystemWatcher {
350    fn drop(&mut self) {
351        // SAFETY: Drop is running, we can safely extract the JoinHandle
352        // and drop it without running its destructor (thread should be shutting down)
353        let _thread = unsafe { ManuallyDrop::take(&mut self._watcher_thread) };
354        drop(_thread);
355        // Note: The watcher thread will exit when shutdown flag is set
356    }
357}
358
359/// Run the debounced watcher in a dedicated thread.
360///
361/// Uses notify-debouncer-mini for event coalescing. Batches are emitted
362/// after the debounce delay expires with all paths that changed during
363/// the window.
364fn run_watcher(
365    path: PathBuf,
366    tx: Sender<WatcherBatch>,
367    config: WatcherConfig,
368    shutdown: Arc<AtomicBool>,
369) -> Result<()> {
370    // Convert debounce_ms to Duration
371    let debounce_duration = Duration::from_millis(config.debounce_ms);
372
373    // Get the root path for validation
374    let root_path = config.root_path.clone();
375
376    // Create gitignore filter if enabled (created ONCE before debouncer)
377    // This avoids re-parsing .gitignore on every event
378    let filter = if config.gitignore_aware {
379        match FileFilter::new(&root_path, &[], &[]) {
380            Ok(f) => Some(f),
381            Err(e) => {
382                eprintln!("Warning: Failed to create gitignore filter: {}", e);
383                None
384            }
385        }
386    } else {
387        None
388    };
389
390    // Create debouncer with notify 8.x API
391    // The debouncer calls our closure on each batch of events
392    let mut debouncer = new_debouncer(
393        debounce_duration,
394        move |result: notify_debouncer_mini::DebounceEventResult| {
395            match result {
396                Ok(events) => {
397                    // Collect all dirty paths from this batch
398                    // Pass filter reference (moved into closure)
399                    let dirty_paths = extract_dirty_paths(&events, &root_path, filter.as_ref());
400
401                    if !dirty_paths.is_empty() {
402                        let batch = WatcherBatch::from_set(dirty_paths);
403                        let _ = tx.send(batch);
404                    }
405                }
406                Err(error) => {
407                    eprintln!("Watcher error: {:?}", error);
408                }
409            }
410        },
411    )?;
412
413    // Watch the directory recursively via the inner watcher
414    debouncer.watcher().watch(&path, RecursiveMode::Recursive)?;
415
416    // Keep the thread alive until shutdown is signaled
417    // The debouncer runs in the background and sends batches via callback
418    while !shutdown.load(Ordering::SeqCst) {
419        thread::sleep(Duration::from_secs(1));
420    }
421
422    Ok(())
423}
424
425/// Extract dirty paths from a batch of debouncer events.
426///
427/// Filtering rules:
428/// - Exclude directories (only process files)
429/// - Exclude database-related files (.db, .sqlite, etc.)
430/// - Apply gitignore filter if provided (skip ignored files)
431/// - Validate paths are within project root (security: prevent path traversal)
432/// - De-duplicate via BTreeSet
433///
434/// Returns: BTreeSet of dirty paths (sorted deterministically)
435fn extract_dirty_paths(
436    events: &[notify_debouncer_mini::DebouncedEvent],
437    root: &Path,
438    filter: Option<&FileFilter>,
439) -> BTreeSet<PathBuf> {
440    let mut dirty_paths = BTreeSet::new();
441
442    for event in events {
443        let path = &event.path;
444
445        // Skip directories
446        if path.is_dir() {
447            continue;
448        }
449
450        // Skip database-related files to avoid feedback loop
451        let path_str = path.to_string_lossy();
452        if is_database_file(&path_str) {
453            continue;
454        }
455
456        // Apply gitignore filter if enabled
457        // This checks .gitignore patterns and internal ignores (target/, node_modules/, etc.)
458        if let Some(f) = filter {
459            match f.should_skip(path) {
460                None => {} // Path is not skipped, continue processing
461                Some(SkipReason::NotAFile) => {
462                    // File doesn't exist on disk - this could be a delete event
463                    // or a temporary file. Still report it so the indexer can
464                    // reconcile deletions.
465                }
466                Some(_) => {
467                    // Path is ignored by gitignore or other reasons, skip it
468                    continue;
469                }
470            }
471        }
472
473        // Validate path is within project root (security: prevent path traversal)
474        match crate::validation::validate_path_within_root(path, root) {
475            Ok(_) => {
476                // Path is safe, normalize before inserting
477                let normalized = crate::validation::normalize_path(path)
478                    .unwrap_or_else(|_| path.to_string_lossy().to_string());
479                dirty_paths.insert(PathBuf::from(normalized));
480            }
481            Err(crate::validation::PathValidationError::OutsideRoot(p, _)) => {
482                // Log the rejection but don't crash
483                eprintln!("WARNING: Watcher rejected path outside project root: {}", p);
484            }
485            Err(crate::validation::PathValidationError::SuspiciousTraversal(p)) => {
486                // Log suspicious path patterns
487                eprintln!(
488                    "WARNING: Watcher rejected suspicious traversal pattern: {}",
489                    p
490                );
491            }
492            Err(crate::validation::PathValidationError::SymlinkEscape(from, to)) => {
493                eprintln!(
494                    "WARNING: Watcher rejected symlink escaping root: {} -> {}",
495                    from, to
496                );
497            }
498            Err(crate::validation::PathValidationError::CannotCanonicalize(_)) => {
499                // Path doesn't exist or can't be accessed
500                // This happens for deleted files - still report them so the indexer
501                // can reconcile the deletion
502                let normalized = crate::validation::normalize_path(path)
503                    .unwrap_or_else(|_| path.to_string_lossy().to_string());
504                dirty_paths.insert(PathBuf::from(normalized));
505            }
506        }
507    }
508
509    dirty_paths
510}
511
/// Check if a path is a database file that should be excluded from watching.
///
/// Database files are excluded because the indexer writes to them, which
/// would create a feedback loop (write event -> indexer writes again -> ...).
///
/// A path matches when, compared case-insensitively, it ends with one of the
/// database extensions (`.db`, `.sqlite`, `.sqlite3`), optionally followed by
/// one of SQLite's sidecar suffixes (`-journal`, `-wal`, `-shm`). SQLite names
/// its sidecar files after the database file, so `cache.sqlite-wal` must be
/// excluded just like `cache.db-wal`.
fn is_database_file(path: &str) -> bool {
    const DB_EXTENSIONS: [&str; 3] = [".db", ".sqlite", ".sqlite3"];
    // The empty suffix stands for the database file itself.
    const SIDECAR_SUFFIXES: [&str; 4] = ["", "-journal", "-wal", "-shm"];

    let path_lower = path.to_lowercase();

    SIDECAR_SUFFIXES.iter().any(|sidecar| {
        path_lower
            .strip_suffix(*sidecar)
            .map_or(false, |stem| DB_EXTENSIONS.iter().any(|ext| stem.ends_with(*ext)))
    })
}
525
526// ============================================================================
527// LEGACY: Old single-event types for backward compatibility during migration
528// ============================================================================
529
530/// Legacy: File event emitted by the watcher (DEPRECATED).
531///
532/// This type is kept for backward compatibility during the migration to
533/// batch-based processing. New code should use `WatcherBatch` instead.
534#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
535pub struct FileEvent {
536    /// Path of the affected file
537    pub path: PathBuf,
538    /// Type of event (DEPRECATED - not used in batch processing)
539    pub event_type: EventType,
540}
541
542/// Type of file event (DEPRECATED - not used in batch processing).
543#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
544pub enum EventType {
545    /// File was created
546    Create,
547    /// File was modified
548    Modify,
549    /// File was deleted
550    Delete,
551}
552
553impl std::fmt::Display for EventType {
554    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
555        match self {
556            EventType::Create => write!(f, "CREATE"),
557            EventType::Modify => write!(f, "MODIFY"),
558            EventType::Delete => write!(f, "DELETE"),
559        }
560    }
561}
562
#[cfg(test)]
mod tests {
    use super::*;

    /// `WatcherBatch::empty()` reports itself as empty.
    #[test]
    fn test_batch_is_empty() {
        assert!(WatcherBatch::empty().is_empty());
    }

    /// Paths come out of `from_set` in sorted order regardless of the order
    /// they were inserted in.
    #[test]
    fn test_batch_from_set_sorts_deterministically() {
        let mut set = BTreeSet::new();
        for raw in ["/zebra.rs", "/alpha.rs", "/beta.rs"].iter() {
            set.insert(PathBuf::from(*raw));
        }

        let batch = WatcherBatch::from_set(set);

        // BTreeSet iterates in sorted order
        let expected = ["/alpha.rs", "/beta.rs", "/zebra.rs"];
        for (idx, raw) in expected.iter().enumerate() {
            assert_eq!(batch.paths[idx], PathBuf::from(*raw));
        }
    }

    /// Database files are recognised by suffix, case-insensitively.
    #[test]
    fn test_database_file_detection() {
        let database_files = [
            "test.db",
            "test.sqlite",
            "test.db-journal",
            "test.DB", // Case insensitive
            "test.SQLITE",
        ];
        for name in database_files.iter() {
            assert!(is_database_file(name));
        }

        let other_files = [
            "test.rs",
            "test.py",
            "database.rs", // Extension matters
        ];
        for name in other_files.iter() {
            assert!(!is_database_file(name));
        }
    }

    /// A batch survives a JSON round trip unchanged.
    #[test]
    fn test_batch_serialization() {
        let batch = WatcherBatch {
            paths: vec![PathBuf::from("/alpha.rs"), PathBuf::from("/beta.rs")],
        };

        let json = serde_json::to_string(&batch).unwrap();
        let round_tripped: WatcherBatch = serde_json::from_str(&json).unwrap();

        assert_eq!(batch.paths, round_tripped.paths);
    }

    /// Explicitly constructed configs keep the values they were given.
    #[test]
    fn test_watcher_config_has_root() {
        let config = WatcherConfig {
            root_path: PathBuf::from("/test/root"),
            debounce_ms: 100,
            gitignore_aware: true,
        };

        assert_eq!(config.root_path, PathBuf::from("/test/root"));
        assert_eq!(config.debounce_ms, 100);
        assert!(config.gitignore_aware);
    }

    /// The default config watches `.`, debounces for 500 ms, and honours
    /// `.gitignore`.
    #[test]
    fn test_watcher_config_default() {
        let config = WatcherConfig::default();

        assert_eq!(config.root_path, PathBuf::from("."));
        assert_eq!(config.debounce_ms, 500);
        assert!(config.gitignore_aware);
    }

    /// Exercises the path validation that `extract_dirty_paths` relies on.
    ///
    /// The validation helper is called directly because `DebouncedEvent`
    /// cannot be easily constructed in tests.
    #[test]
    fn test_extract_dirty_paths_filters_traversal() {
        use std::fs;
        use tempfile::TempDir;

        let temp_dir = TempDir::new().unwrap();
        let root = temp_dir.path();

        // A real file inside the root is accepted.
        let valid_file = root.join("valid.rs");
        fs::write(&valid_file, b"fn valid() {}").unwrap();
        assert!(crate::validation::validate_path_within_root(&valid_file, root).is_ok());

        // A path that climbs out of the root is rejected.
        let outside = root.join("../../../etc/passwd");
        assert!(crate::validation::validate_path_within_root(&outside, root).is_err());
    }
}