// magellan/watcher/mod.rs
1//! Filesystem watcher with debounced batch events.
2//!
3//! Provides deterministic event coalescing: all events within a debounce window
4//! are collected, de-duplicated, sorted lexicographically, and emitted as a single
5//! batch. This ensures the same final DB state regardless of event arrival order.
6//!
7//! # Threading Design
8//!
9//! This watcher uses thread-safe synchronization for concurrent access.
10//! The legacy pending state fields use `Arc<Mutex<T>>` to allow safe access
11//! from multiple threads during concurrent operations and shutdown.
12//!
//! **Thread safety:** `Arc<Mutex<T>>` provides runtime mutual exclusion
//! and is safe to share across threads. Lock poisoning is surfaced by the
//! legacy accessors as an error (`anyhow`) rather than a panic.
16//!
17//! # Global Lock Ordering
18//!
19//! This module participates in the global lock ordering hierarchy:
20//!
21//! 1. **watcher state locks** (legacy_pending_batch, legacy_pending_index)—acquired first
22//! 2. **indexer shared state locks** (dirty_paths)—acquired second
23//! 3. **wakeup channel send** (highest priority)—acquired last
24//!
25//! **Rule:** Never send to wakeup channel while holding other locks.
26//!
27//! See `src/indexer.rs::PipelineSharedState` for full lock ordering documentation.
28//!
29//! See MANUAL.md for architecture details.
30
31use anyhow::Result;
32use notify::RecursiveMode;
33use notify_debouncer_mini::new_debouncer;
34use serde::{Deserialize, Serialize};
35use std::collections::BTreeSet;
36use std::mem::ManuallyDrop;
37use std::path::{Path, PathBuf};
38use std::sync::atomic::{AtomicBool, Ordering};
39use std::sync::mpsc::{self, Receiver, Sender};
40use std::sync::{Arc, Mutex};
41use std::thread;
42use std::time::Duration;
43
44use crate::diagnostics::SkipReason;
45use crate::graph::filter::FileFilter;
46
47/// Deterministic batch of dirty file paths.
48///
49/// Contains ONLY paths (no timestamps, no event types) to ensure deterministic
50/// behavior. Paths are sorted lexicographically before emission.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct WatcherBatch {
    /// Dirty file paths to reconcile, in lexicographic order.
    /// Intentionally carries no timestamps or event kinds: the indexer
    /// re-stats each path, so only the path set matters for determinism.
    pub paths: Vec<PathBuf>,
}
56
57impl WatcherBatch {
58 /// Create a new batch from a set of paths, sorting them deterministically.
59 fn from_set(paths: BTreeSet<PathBuf>) -> Self {
60 Self {
61 paths: paths.into_iter().collect(),
62 }
63 }
64
65 /// Empty batch for when no dirty paths exist after filtering.
66 pub fn empty() -> Self {
67 Self { paths: Vec::new() }
68 }
69
70 /// Whether this batch contains any paths.
71 pub fn is_empty(&self) -> bool {
72 self.paths.is_empty()
73 }
74}
75
76/// Filesystem watcher configuration
#[derive(Debug, Clone)]
pub struct WatcherConfig {
    /// Root directory used to validate event paths (traversal protection)
    /// and as the base for gitignore lookup.
    pub root_path: PathBuf,
    /// Debounce delay in milliseconds; events within this window are
    /// coalesced into one batch.
    pub debounce_ms: u64,
    /// Enable .gitignore filtering (default: true)
    pub gitignore_aware: bool,
}
86
87impl Default for WatcherConfig {
88 fn default() -> Self {
89 Self {
90 root_path: PathBuf::from("."),
91 debounce_ms: 500,
92 gitignore_aware: true,
93 }
94 }
95}
96
/// Filesystem watcher that emits debounced batches of dirty paths.
///
/// Uses notify-debouncer-mini for event coalescing. All paths within the
/// debounce window are collected, de-duplicated, sorted, and emitted as a
/// single WatcherBatch.
pub struct FileSystemWatcher {
    /// Watcher thread handle (wrapped in ManuallyDrop for custom Drop/shutdown logic)
    _watcher_thread: ManuallyDrop<thread::JoinHandle<()>>,
    /// Channel on which `run_watcher` delivers coalesced batches.
    batch_receiver: Receiver<WatcherBatch>,
    /// Legacy compatibility: pending batch to emit one path at a time
    /// Thread-safe: wrapped in Arc<Mutex<T>> for concurrent access
    legacy_pending_batch: Arc<Mutex<Option<WatcherBatch>>>,
    /// Legacy compatibility: current index into pending batch
    /// Thread-safe: wrapped in Arc<Mutex<T>> for concurrent access
    legacy_pending_index: Arc<Mutex<usize>>,
}
116
117impl FileSystemWatcher {
118 /// Create a new watcher for the given directory.
119 ///
120 /// # Arguments
121 /// * `path` - Directory to watch recursively (also used as root_path for validation)
122 /// * `config` - Watcher configuration
123 /// * `shutdown` - AtomicBool for graceful shutdown
124 ///
125 /// # Returns
126 /// A watcher that can be polled for batch events
127 pub fn new(path: PathBuf, config: WatcherConfig, shutdown: Arc<AtomicBool>) -> Result<Self> {
128 let (batch_tx, batch_rx) = mpsc::channel();
129
130 // Ensure root_path is set to the watched directory for validation
131 let config = WatcherConfig {
132 root_path: path.clone(),
133 ..config
134 };
135
136 let thread = thread::spawn(move || {
137 if let Err(e) = run_watcher(path, batch_tx, config, shutdown) {
138 eprintln!("Watcher error: {:?}", e);
139 }
140 });
141
142 Ok(Self {
143 _watcher_thread: ManuallyDrop::new(thread),
144 batch_receiver: batch_rx,
145 legacy_pending_batch: Arc::new(Mutex::new(None)),
146 legacy_pending_index: Arc::new(Mutex::new(0)),
147 })
148 }
149
150
151
152 /// Receive the next batch, blocking until available.
153 ///
154 /// # Returns
155 /// `None` if the watcher thread has terminated
156 pub fn recv_batch(&self) -> Option<WatcherBatch> {
157 self.batch_receiver.recv().ok()
158 }
159
160 /// Try to receive a batch without blocking.
161 ///
162 /// # Returns
163 /// - `Some(batch)` if a batch is available
164 /// - `None` if no batch is available or watcher terminated
165 pub fn try_recv_batch(&self) -> Option<WatcherBatch> {
166 self.batch_receiver.try_recv().ok()
167 }
168
169 /// Receive the next batch with a timeout.
170 ///
171 /// # Returns
172 /// - `Ok(Some(batch))` if a batch is available
173 /// - `Ok(None)` if the watcher thread has terminated
174 /// - `Err` if timeout elapsed
175 pub fn recv_batch_timeout(&self, timeout: Duration) -> Result<Option<WatcherBatch>, ()> {
176 match self.batch_receiver.recv_timeout(timeout) {
177 Ok(batch) => Ok(Some(batch)),
178 Err(std::sync::mpsc::RecvTimeoutError::Timeout) => Err(()),
179 Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => Ok(None),
180 }
181 }
182
183 // ========================================================================
184 // LEGACY: Old single-event API for backward compatibility during migration
185 // ========================================================================
186
187 /// Legacy: Try to receive a single event without blocking (DEPRECATED).
188 ///
189 /// This method converts batch events to single events for backward
190 /// compatibility. Paths from each batch are returned one at a time
191 /// in sorted order.
192 ///
193 /// # Deprecated
194 /// Use `try_recv_batch()` instead for deterministic batch processing.
195 ///
196 /// # Errors
197 /// Returns an error if a mutex is poisoned (thread panicked while holding the lock).
198 pub fn try_recv_event(&self) -> Result<Option<FileEvent>> {
199 // First, check if we have a pending batch to continue from
200 {
201 let mut pending_batch = self.legacy_pending_batch.lock()
202 .map_err(|e| anyhow::anyhow!("legacy_pending_batch mutex poisoned: {}", e))?;
203 let mut pending_index = self.legacy_pending_index.lock()
204 .map_err(|e| anyhow::anyhow!("legacy_pending_index mutex poisoned: {}", e))?;
205
206 if let Some(ref batch) = *pending_batch {
207 if *pending_index < batch.paths.len() {
208 let path = batch.paths[*pending_index].clone();
209 *pending_index += 1;
210
211 // Check if we've exhausted this batch
212 if *pending_index >= batch.paths.len() {
213 *pending_batch = None;
214 *pending_index = 0;
215 }
216
217 return Ok(Some(FileEvent {
218 path,
219 event_type: EventType::Modify,
220 }));
221 }
222 }
223 }
224
225 // No pending batch or batch exhausted, try to get a new batch
226 if let Ok(batch) = self.batch_receiver.try_recv() {
227 if batch.paths.is_empty() {
228 return Ok(None);
229 }
230
231 // If there are multiple paths, store the batch for next call
232 if batch.paths.len() > 1 {
233 let path = batch.paths[0].clone();
234 let mut pending_batch = self.legacy_pending_batch.lock()
235 .map_err(|e| anyhow::anyhow!("legacy_pending_batch mutex poisoned: {}", e))?;
236 let mut pending_index = self.legacy_pending_index.lock()
237 .map_err(|e| anyhow::anyhow!("legacy_pending_index mutex poisoned: {}", e))?;
238 *pending_batch = Some(batch);
239 *pending_index = 1; // Next call will return index 1
240 drop(pending_batch);
241 drop(pending_index);
242 return Ok(Some(FileEvent {
243 path,
244 event_type: EventType::Modify,
245 }));
246 }
247
248 // Single path, return it directly
249 Ok(Some(FileEvent {
250 path: batch.paths[0].clone(),
251 event_type: EventType::Modify,
252 }))
253 } else {
254 Ok(None)
255 }
256 }
257
258 /// Legacy: Receive the next event, blocking until available (DEPRECATED).
259 ///
260 /// This method converts batch events to single events for backward
261 /// compatibility. Paths from each batch are returned one at a time
262 /// in sorted order.
263 ///
264 /// # Deprecated
265 /// Use `recv_batch()` instead for deterministic batch processing.
266 ///
267 /// # Errors
268 /// Returns an error if a mutex is poisoned (thread panicked while holding the lock).
269 pub fn recv_event(&self) -> Result<Option<FileEvent>> {
270 // First, check if we have a pending batch to continue from
271 {
272 let mut pending_batch = self.legacy_pending_batch.lock()
273 .map_err(|e| anyhow::anyhow!("legacy_pending_batch mutex poisoned: {}", e))?;
274 let mut pending_index = self.legacy_pending_index.lock()
275 .map_err(|e| anyhow::anyhow!("legacy_pending_index mutex poisoned: {}", e))?;
276
277 if let Some(ref batch) = *pending_batch {
278 if *pending_index < batch.paths.len() {
279 let path = batch.paths[*pending_index].clone();
280 *pending_index += 1;
281
282 // Check if we've exhausted this batch
283 if *pending_index >= batch.paths.len() {
284 *pending_batch = None;
285 *pending_index = 0;
286 }
287
288 return Ok(Some(FileEvent {
289 path,
290 event_type: EventType::Modify,
291 }));
292 }
293 }
294 }
295
296 // No pending batch or batch exhausted, block for a new batch
297 if let Ok(batch) = self.batch_receiver.recv() {
298 if batch.paths.is_empty() {
299 return Ok(None);
300 }
301
302 // If there are multiple paths, store the batch for next call
303 if batch.paths.len() > 1 {
304 let path = batch.paths[0].clone();
305 let mut pending_batch = self.legacy_pending_batch.lock()
306 .map_err(|e| anyhow::anyhow!("legacy_pending_batch mutex poisoned: {}", e))?;
307 let mut pending_index = self.legacy_pending_index.lock()
308 .map_err(|e| anyhow::anyhow!("legacy_pending_index mutex poisoned: {}", e))?;
309 *pending_batch = Some(batch);
310 *pending_index = 1; // Next call will return index 1
311 drop(pending_batch);
312 drop(pending_index);
313 return Ok(Some(FileEvent {
314 path,
315 event_type: EventType::Modify,
316 }));
317 }
318
319 // Single path, return it directly
320 Ok(Some(FileEvent {
321 path: batch.paths[0].clone(),
322 event_type: EventType::Modify,
323 }))
324 } else {
325 Ok(None)
326 }
327 }
328
329 /// Explicitly shut down the watcher and join all background threads.
330 ///
331 /// This method consumes the watcher, ensuring that:
332 /// 1. The pub/sub receiver is shut down cleanly (if present)
333 /// 2. The watcher thread is joined (waits for clean termination)
334 ///
335 /// # Note
336 ///
337 /// This method should be called during graceful shutdown to ensure
338 /// all threads have terminated before the program exits.
339 pub fn shutdown(mut self) {
340 // Take ownership of self (consume it)
341 // SAFETY: We're consuming self, so we can safely extract the JoinHandle
342 let thread = unsafe { ManuallyDrop::take(&mut self._watcher_thread) };
343 // Join the thread - this waits for the watcher to exit cleanly
344 let _ = thread.join();
345 // Note: pubsub_receiver is dropped here, triggering its Drop impl
346 }
347}
348
349impl Drop for FileSystemWatcher {
350 fn drop(&mut self) {
351 // SAFETY: Drop is running, we can safely extract the JoinHandle
352 // and drop it without running its destructor (thread should be shutting down)
353 let _thread = unsafe { ManuallyDrop::take(&mut self._watcher_thread) };
354 drop(_thread);
355 // Note: The watcher thread will exit when shutdown flag is set
356 }
357}
358
359/// Run the debounced watcher in a dedicated thread.
360///
361/// Uses notify-debouncer-mini for event coalescing. Batches are emitted
362/// after the debounce delay expires with all paths that changed during
363/// the window.
364fn run_watcher(
365 path: PathBuf,
366 tx: Sender<WatcherBatch>,
367 config: WatcherConfig,
368 shutdown: Arc<AtomicBool>,
369) -> Result<()> {
370 // Convert debounce_ms to Duration
371 let debounce_duration = Duration::from_millis(config.debounce_ms);
372
373 // Get the root path for validation
374 let root_path = config.root_path.clone();
375
376 // Create gitignore filter if enabled (created ONCE before debouncer)
377 // This avoids re-parsing .gitignore on every event
378 let filter = if config.gitignore_aware {
379 match FileFilter::new(&root_path, &[], &[]) {
380 Ok(f) => Some(f),
381 Err(e) => {
382 eprintln!("Warning: Failed to create gitignore filter: {}", e);
383 None
384 }
385 }
386 } else {
387 None
388 };
389
390 // Create debouncer with notify 8.x API
391 // The debouncer calls our closure on each batch of events
392 let mut debouncer = new_debouncer(
393 debounce_duration,
394 move |result: notify_debouncer_mini::DebounceEventResult| {
395 match result {
396 Ok(events) => {
397 // Collect all dirty paths from this batch
398 // Pass filter reference (moved into closure)
399 let dirty_paths = extract_dirty_paths(&events, &root_path, filter.as_ref());
400
401 if !dirty_paths.is_empty() {
402 let batch = WatcherBatch::from_set(dirty_paths);
403 let _ = tx.send(batch);
404 }
405 }
406 Err(error) => {
407 eprintln!("Watcher error: {:?}", error);
408 }
409 }
410 },
411 )?;
412
413 // Watch the directory recursively via the inner watcher
414 debouncer.watcher().watch(&path, RecursiveMode::Recursive)?;
415
416 // Keep the thread alive until shutdown is signaled
417 // The debouncer runs in the background and sends batches via callback
418 while !shutdown.load(Ordering::SeqCst) {
419 thread::sleep(Duration::from_secs(1));
420 }
421
422 Ok(())
423}
424
425/// Extract dirty paths from a batch of debouncer events.
426///
427/// Filtering rules:
428/// - Exclude directories (only process files)
429/// - Exclude database-related files (.db, .sqlite, etc.)
430/// - Apply gitignore filter if provided (skip ignored files)
431/// - Validate paths are within project root (security: prevent path traversal)
432/// - De-duplicate via BTreeSet
433///
434/// Returns: BTreeSet of dirty paths (sorted deterministically)
435fn extract_dirty_paths(
436 events: &[notify_debouncer_mini::DebouncedEvent],
437 root: &Path,
438 filter: Option<&FileFilter>,
439) -> BTreeSet<PathBuf> {
440 let mut dirty_paths = BTreeSet::new();
441
442 for event in events {
443 let path = &event.path;
444
445 // Skip directories
446 if path.is_dir() {
447 continue;
448 }
449
450 // Skip database-related files to avoid feedback loop
451 let path_str = path.to_string_lossy();
452 if is_database_file(&path_str) {
453 continue;
454 }
455
456 // Apply gitignore filter if enabled
457 // This checks .gitignore patterns and internal ignores (target/, node_modules/, etc.)
458 if let Some(f) = filter {
459 match f.should_skip(path) {
460 None => {} // Path is not skipped, continue processing
461 Some(SkipReason::NotAFile) => {
462 // File doesn't exist on disk - this could be a delete event
463 // or a temporary file. Still report it so the indexer can
464 // reconcile deletions.
465 }
466 Some(_) => {
467 // Path is ignored by gitignore or other reasons, skip it
468 continue;
469 }
470 }
471 }
472
473 // Validate path is within project root (security: prevent path traversal)
474 match crate::validation::validate_path_within_root(path, root) {
475 Ok(_) => {
476 // Path is safe, normalize before inserting
477 let normalized = crate::validation::normalize_path(path)
478 .unwrap_or_else(|_| path.to_string_lossy().to_string());
479 dirty_paths.insert(PathBuf::from(normalized));
480 }
481 Err(crate::validation::PathValidationError::OutsideRoot(p, _)) => {
482 // Log the rejection but don't crash
483 eprintln!("WARNING: Watcher rejected path outside project root: {}", p);
484 }
485 Err(crate::validation::PathValidationError::SuspiciousTraversal(p)) => {
486 // Log suspicious path patterns
487 eprintln!(
488 "WARNING: Watcher rejected suspicious traversal pattern: {}",
489 p
490 );
491 }
492 Err(crate::validation::PathValidationError::SymlinkEscape(from, to)) => {
493 eprintln!(
494 "WARNING: Watcher rejected symlink escaping root: {} -> {}",
495 from, to
496 );
497 }
498 Err(crate::validation::PathValidationError::CannotCanonicalize(_)) => {
499 // Path doesn't exist or can't be accessed
500 // This happens for deleted files - still report them so the indexer
501 // can reconcile the deletion
502 let normalized = crate::validation::normalize_path(path)
503 .unwrap_or_else(|_| path.to_string_lossy().to_string());
504 dirty_paths.insert(PathBuf::from(normalized));
505 }
506 }
507 }
508
509 dirty_paths
510}
511
/// Check if a path is a database file that should be excluded from watching.
///
/// Database files are excluded because the indexer writes to them, which
/// would create a feedback loop (write event -> indexer writes again -> ...).
/// Matching is case-insensitive on the file suffix.
fn is_database_file(path: &str) -> bool {
    const DB_SUFFIXES: [&str; 6] = [
        ".db",
        ".db-journal",
        ".db-wal",
        ".db-shm",
        ".sqlite",
        ".sqlite3",
    ];
    let lowered = path.to_lowercase();
    DB_SUFFIXES.iter().any(|suffix| lowered.ends_with(suffix))
}
525
526// ============================================================================
527// LEGACY: Old single-event types for backward compatibility during migration
528// ============================================================================
529
530/// Legacy: File event emitted by the watcher (DEPRECATED).
531///
532/// This type is kept for backward compatibility during the migration to
533/// batch-based processing. New code should use `WatcherBatch` instead.
534#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
535pub struct FileEvent {
536 /// Path of the affected file
537 pub path: PathBuf,
538 /// Type of event (DEPRECATED - not used in batch processing)
539 pub event_type: EventType,
540}
541
/// Type of file event (DEPRECATED - not used in batch processing).
///
/// Legacy conversion always reports `Modify` (see `try_recv_event` /
/// `recv_event`); the batch path carries no event kinds at all.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum EventType {
    /// File was created
    Create,
    /// File was modified
    Modify,
    /// File was deleted
    Delete,
}
552
553impl std::fmt::Display for EventType {
554 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
555 match self {
556 EventType::Create => write!(f, "CREATE"),
557 EventType::Modify => write!(f, "MODIFY"),
558 EventType::Delete => write!(f, "DELETE"),
559 }
560 }
561}
562
#[cfg(test)]
mod tests {
    use super::*;

    // An empty batch must report itself empty.
    #[test]
    fn test_batch_is_empty() {
        let batch = WatcherBatch::empty();
        assert!(batch.is_empty());
    }

    // from_set must preserve BTreeSet's lexicographic ordering so batches
    // are deterministic regardless of event arrival order.
    #[test]
    fn test_batch_from_set_sorts_deterministically() {
        let mut set = BTreeSet::new();
        set.insert(PathBuf::from("/zebra.rs"));
        set.insert(PathBuf::from("/alpha.rs"));
        set.insert(PathBuf::from("/beta.rs"));

        let batch = WatcherBatch::from_set(set);

        // BTreeSet iterates in sorted order
        assert_eq!(batch.paths[0], PathBuf::from("/alpha.rs"));
        assert_eq!(batch.paths[1], PathBuf::from("/beta.rs"));
        assert_eq!(batch.paths[2], PathBuf::from("/zebra.rs"));
    }

    // Database suffix matching is case-insensitive and suffix-anchored.
    #[test]
    fn test_database_file_detection() {
        assert!(is_database_file("test.db"));
        assert!(is_database_file("test.sqlite"));
        assert!(is_database_file("test.db-journal"));
        assert!(is_database_file("test.DB")); // Case insensitive
        assert!(is_database_file("test.SQLITE"));

        assert!(!is_database_file("test.rs"));
        assert!(!is_database_file("test.py"));
        assert!(!is_database_file("database.rs")); // Extension matters
    }

    // WatcherBatch must round-trip through serde (JSON) unchanged.
    #[test]
    fn test_batch_serialization() {
        let batch = WatcherBatch {
            paths: vec![PathBuf::from("/alpha.rs"), PathBuf::from("/beta.rs")],
        };

        let json = serde_json::to_string(&batch).unwrap();
        let deserialized: WatcherBatch = serde_json::from_str(&json).unwrap();

        assert_eq!(batch.paths, deserialized.paths);
    }

    // Explicit config fields are stored as given.
    #[test]
    fn test_watcher_config_has_root() {
        let config = WatcherConfig {
            root_path: PathBuf::from("/test/root"),
            debounce_ms: 100,
            gitignore_aware: true,
        };

        assert_eq!(config.root_path, PathBuf::from("/test/root"));
        assert_eq!(config.debounce_ms, 100);
        assert!(config.gitignore_aware);
    }

    // Default config: current dir, 500ms debounce, gitignore enabled.
    #[test]
    fn test_watcher_config_default() {
        let config = WatcherConfig::default();

        assert_eq!(config.root_path, PathBuf::from("."));
        assert_eq!(config.debounce_ms, 500);
        assert!(config.gitignore_aware);
    }

    // Exercises the path-validation rules used by extract_dirty_paths:
    // paths inside the root pass, traversal outside the root is rejected.
    #[test]
    fn test_extract_dirty_paths_filters_traversal() {
        use std::fs;
        use tempfile::TempDir;

        let temp_dir = TempDir::new().unwrap();
        let root = temp_dir.path();

        // Create a valid file
        let valid_file = root.join("valid.rs");
        fs::write(&valid_file, b"fn valid() {}").unwrap();

        // Test the validation logic directly
        // since DebouncedEvent cannot be easily constructed in tests
        let result = crate::validation::validate_path_within_root(&valid_file, root);
        assert!(result.is_ok());

        // Test that traversal is rejected
        let outside = root.join("../../../etc/passwd");
        let result_outside = crate::validation::validate_path_within_root(&outside, root);
        assert!(result_outside.is_err());
    }
}
657}