pub mod async_watcher;
use anyhow::Result;
use notify::RecursiveMode;
use notify_debouncer_mini::new_debouncer;
use serde::{Deserialize, Serialize};
use std::collections::BTreeSet;
use std::mem::ManuallyDrop;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
use crate::diagnostics::SkipReason;
use crate::graph::filter::FileFilter;
/// A group of changed file paths delivered together by the watcher thread.
///
/// The watcher thread builds one batch per debounced notification and sends it
/// over the channel read by `FileSystemWatcher`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct WatcherBatch {
/// Paths reported as changed. When the batch is built with
/// `WatcherBatch::from_set` they are unique and in ascending order, because
/// they come straight out of a `BTreeSet`.
pub paths: Vec<PathBuf>,
}
impl WatcherBatch {
    /// Builds a batch from a set of paths.
    ///
    /// The resulting `paths` vector keeps the set's iteration order, i.e. the
    /// paths are unique and sorted ascending.
    pub fn from_set(paths: BTreeSet<PathBuf>) -> Self {
        let mut ordered = Vec::with_capacity(paths.len());
        ordered.extend(paths);
        Self { paths: ordered }
    }

    /// Creates a batch that contains no paths.
    pub fn empty() -> Self {
        Self::from_set(BTreeSet::new())
    }

    /// Returns `true` when the batch contains no paths.
    pub fn is_empty(&self) -> bool {
        self.paths.is_empty()
    }
}
/// Settings that control how the background watcher thread behaves.
#[derive(Debug, Clone)]
pub struct WatcherConfig {
/// Root directory that reported paths are validated against and that the
/// file filter is built for. `FileSystemWatcher::new` overwrites this with
/// the path it is asked to watch.
pub root_path: PathBuf,
/// Debounce window, in milliseconds, handed to the debouncer.
pub debounce_ms: u64,
/// When `true`, the watcher thread builds a `FileFilter` for the root and
/// drops events the filter says to skip.
/// NOTE(review): presumably the filter applies `.gitignore` rules — confirm
/// against `FileFilter`.
pub gitignore_aware: bool,
}
impl Default for WatcherConfig {
    /// Returns the default configuration: root `"."`, a 500 ms debounce
    /// window, and filtering enabled.
    fn default() -> Self {
        let root_path = PathBuf::from(".");
        let debounce_ms = 500;
        let gitignore_aware = true;
        Self {
            root_path,
            debounce_ms,
            gitignore_aware,
        }
    }
}
/// Handle to a background thread that watches a directory tree and delivers
/// changed paths as `WatcherBatch` values over a channel.
pub struct FileSystemWatcher {
/// Join handle of the watcher thread. It is wrapped in `ManuallyDrop` so that
/// `shutdown` can take it out to join the thread, while `Drop` releases it
/// without joining.
_watcher_thread: ManuallyDrop<thread::JoinHandle<()>>,
/// Receiving end of the channel the watcher thread sends batches on.
batch_receiver: Receiver<WatcherBatch>,
/// Batch that `try_recv_event` / `recv_event` are currently handing out one
/// path at a time; `None` when no batch is partially consumed.
legacy_pending_batch: Arc<Mutex<Option<WatcherBatch>>>,
/// Index of the next path in `legacy_pending_batch` to hand out.
legacy_pending_index: Arc<Mutex<usize>>,
}
impl FileSystemWatcher {
    /// Starts watching `path` recursively on a dedicated background thread.
    ///
    /// `config.root_path` is replaced by `path`, so the watched directory is
    /// always the root that reported paths are validated against. The thread
    /// keeps running until `shutdown` is set to `true`.
    ///
    /// # Errors
    ///
    /// This function currently always returns `Ok`. Failures while setting up
    /// the underlying watcher happen on the background thread and are only
    /// reported on stderr.
    pub fn new(path: PathBuf, config: WatcherConfig, shutdown: Arc<AtomicBool>) -> Result<Self> {
        let (batch_tx, batch_rx) = mpsc::channel();
        let config = WatcherConfig {
            root_path: path.clone(),
            ..config
        };
        let thread = thread::spawn(move || {
            if let Err(e) = run_watcher(path, batch_tx, config, shutdown) {
                eprintln!("Watcher error: {:?}", e);
            }
        });
        Ok(Self {
            _watcher_thread: ManuallyDrop::new(thread),
            batch_receiver: batch_rx,
            legacy_pending_batch: Arc::new(Mutex::new(None)),
            legacy_pending_index: Arc::new(Mutex::new(0)),
        })
    }

    /// Blocks until the next batch arrives.
    ///
    /// Returns `None` once the sending side of the channel has been dropped
    /// and no batches remain queued.
    pub fn recv_batch(&self) -> Option<WatcherBatch> {
        self.batch_receiver.recv().ok()
    }

    /// Returns the next queued batch without blocking.
    ///
    /// Returns `None` both when nothing is queued and when the channel is
    /// disconnected.
    pub fn try_recv_batch(&self) -> Option<WatcherBatch> {
        self.batch_receiver.try_recv().ok()
    }

    /// Waits up to `timeout` for the next batch.
    ///
    /// * `Ok(Some(batch))` — a batch arrived in time.
    /// * `Err(())` — the timeout elapsed with no batch.
    /// * `Ok(None)` — the channel is disconnected.
    #[allow(
        clippy::result_unit_err,
        reason = "simple timeout/empty signaling; no error details needed"
    )]
    pub fn recv_batch_timeout(&self, timeout: Duration) -> Result<Option<WatcherBatch>, ()> {
        match self.batch_receiver.recv_timeout(timeout) {
            Ok(batch) => Ok(Some(batch)),
            Err(mpsc::RecvTimeoutError::Timeout) => Err(()),
            Err(mpsc::RecvTimeoutError::Disconnected) => Ok(None),
        }
    }

    /// Legacy single-event API: returns the next changed path without blocking.
    ///
    /// Paths of a partially consumed batch are handed out first; otherwise the
    /// next queued batch (if any) is fetched. Every event is reported as
    /// `EventType::Modify`. Returns `Ok(None)` when nothing is available.
    ///
    /// # Errors
    ///
    /// Fails if one of the internal mutexes is poisoned.
    pub fn try_recv_event(&self) -> Result<Option<FileEvent>> {
        if let Some(event) = self.next_pending_event()? {
            return Ok(Some(event));
        }
        match self.batch_receiver.try_recv() {
            Ok(batch) => self.event_from_batch(batch),
            Err(_) => Ok(None),
        }
    }

    /// Legacy single-event API: returns the next changed path, blocking until a
    /// batch arrives when no partially consumed batch is pending.
    ///
    /// Every event is reported as `EventType::Modify`. Returns `Ok(None)` when
    /// the channel is disconnected or an empty batch is received.
    ///
    /// # Errors
    ///
    /// Fails if one of the internal mutexes is poisoned.
    pub fn recv_event(&self) -> Result<Option<FileEvent>> {
        if let Some(event) = self.next_pending_event()? {
            return Ok(Some(event));
        }
        match self.batch_receiver.recv() {
            Ok(batch) => self.event_from_batch(batch),
            Err(_) => Ok(None),
        }
    }

    /// Hands out the next path of the partially consumed batch, if there is one.
    ///
    /// Both locks are released before this function returns, so callers never
    /// hold them while waiting on the channel.
    fn next_pending_event(&self) -> Result<Option<FileEvent>> {
        let mut pending_batch = self
            .legacy_pending_batch
            .lock()
            .map_err(|e| anyhow::anyhow!("legacy_pending_batch mutex poisoned: {}", e))?;
        let mut pending_index = self
            .legacy_pending_index
            .lock()
            .map_err(|e| anyhow::anyhow!("legacy_pending_index mutex poisoned: {}", e))?;

        let Some(batch) = &*pending_batch else {
            return Ok(None);
        };
        let Some(path) = batch.paths.get(*pending_index).cloned() else {
            return Ok(None);
        };

        *pending_index += 1;
        // Clear the stored batch as soon as its last path has been handed out.
        if *pending_index >= batch.paths.len() {
            *pending_batch = None;
            *pending_index = 0;
        }
        Ok(Some(FileEvent {
            path,
            event_type: EventType::Modify,
        }))
    }

    /// Turns a freshly received batch into its first event and stores the
    /// batch for later calls when it holds more than one path.
    fn event_from_batch(&self, batch: WatcherBatch) -> Result<Option<FileEvent>> {
        let Some(path) = batch.paths.first().cloned() else {
            return Ok(None);
        };
        if batch.paths.len() > 1 {
            let mut pending_batch = self
                .legacy_pending_batch
                .lock()
                .map_err(|e| anyhow::anyhow!("legacy_pending_batch mutex poisoned: {}", e))?;
            let mut pending_index = self
                .legacy_pending_index
                .lock()
                .map_err(|e| anyhow::anyhow!("legacy_pending_index mutex poisoned: {}", e))?;
            *pending_batch = Some(batch);
            // Index 0 is returned right now; the next call continues at 1.
            *pending_index = 1;
        }
        Ok(Some(FileEvent {
            path,
            event_type: EventType::Modify,
        }))
    }

    /// Consumes the watcher and waits for the background thread to finish.
    ///
    /// The thread only exits once the shutdown flag passed to `new` has been
    /// set, so this call blocks until the caller sets that flag.
    pub fn shutdown(self) {
        // `FileSystemWatcher` implements `Drop`, and that destructor also takes
        // the join handle out of `_watcher_thread`. If `self` were dropped
        // normally after the `take` below, the handle would be taken (and
        // dropped) a second time, which is undefined behavior. Wrapping `self`
        // in `ManuallyDrop` suppresses the destructor; the remaining fields are
        // dropped by hand instead so nothing leaks.
        let mut this = ManuallyDrop::new(self);

        // SAFETY: `this` is owned by this function and its destructor never
        // runs, so the handle is taken exactly once and never touched again.
        let thread = unsafe { ManuallyDrop::take(&mut this._watcher_thread) };
        let _ = thread.join();

        // SAFETY: each remaining field is dropped exactly once, and `this` is
        // not used afterwards. Keep this list in sync with the struct's fields.
        unsafe {
            std::ptr::drop_in_place(&mut this.batch_receiver);
            std::ptr::drop_in_place(&mut this.legacy_pending_batch);
            std::ptr::drop_in_place(&mut this.legacy_pending_index);
        }
    }
}
impl Drop for FileSystemWatcher {
    /// Releases the watcher thread's join handle without joining it, so
    /// dropping the watcher never blocks on the background thread.
    fn drop(&mut self) {
        // SAFETY: the handle is dropped in place exactly once here and the
        // field is not used afterwards. This is sound only if no other code
        // path has already taken the handle out of `_watcher_thread` before
        // this destructor runs.
        unsafe { ManuallyDrop::drop(&mut self._watcher_thread) };
    }
}
/// Body of the watcher thread: installs a debounced recursive watch on `path`
/// and forwards every non-empty set of accepted paths to `tx` as a
/// `WatcherBatch`.
///
/// The function returns once `shutdown` is observed as `true`; the flag is
/// polled once per second.
///
/// # Errors
///
/// Fails if the debouncer cannot be created or the watch cannot be installed.
fn run_watcher(
    path: PathBuf,
    tx: Sender<WatcherBatch>,
    config: WatcherConfig,
    shutdown: Arc<AtomicBool>,
) -> Result<()> {
    let root_path = config.root_path.clone();

    // A filter that fails to build is not fatal: watching continues unfiltered.
    let filter = if !config.gitignore_aware {
        None
    } else {
        match FileFilter::new(&root_path, &[], &[]) {
            Ok(built) => Some(built),
            Err(e) => {
                eprintln!("Warning: Failed to create gitignore filter: {}", e);
                None
            }
        }
    };

    let on_events = move |result: notify_debouncer_mini::DebounceEventResult| {
        let events = match result {
            Ok(events) => events,
            Err(error) => {
                eprintln!("Watcher error: {:?}", error);
                return;
            }
        };
        let dirty_paths = extract_dirty_paths(&events, &root_path, filter.as_ref());
        if dirty_paths.is_empty() {
            return;
        }
        // A send error means the receiver is gone; there is nobody to notify.
        let _ = tx.send(WatcherBatch::from_set(dirty_paths));
    };

    let mut debouncer = new_debouncer(Duration::from_millis(config.debounce_ms), on_events)?;
    debouncer.watcher().watch(&path, RecursiveMode::Recursive)?;

    // Keep the debouncer alive until shutdown is requested.
    loop {
        if shutdown.load(Ordering::SeqCst) {
            break;
        }
        thread::sleep(Duration::from_secs(1));
    }
    Ok(())
}
/// Reduces a slice of debounced events to the set of paths that should be
/// reported as changed.
///
/// An event is dropped when its path is a directory, looks like a database
/// file, is rejected by `filter` (a `SkipReason::NotAFile` verdict is
/// tolerated), or fails validation against `root`. Paths that merely cannot be
/// canonicalized are still reported. Accepted paths are normalized, falling
/// back to the lossy string form when normalization fails.
fn extract_dirty_paths(
    events: &[notify_debouncer_mini::DebouncedEvent],
    root: &Path,
    filter: Option<&FileFilter>,
) -> BTreeSet<PathBuf> {
    let mut dirty_paths = BTreeSet::new();

    for event in events {
        let path = &event.path;

        if path.is_dir() || is_database_file(&path.to_string_lossy()) {
            continue;
        }

        if let Some(f) = filter {
            let passes = matches!(f.should_skip(path), None | Some(SkipReason::NotAFile));
            if !passes {
                continue;
            }
        }

        let accepted = match crate::validation::validate_path_within_root(path, root) {
            Ok(_) => true,
            Err(crate::validation::PathValidationError::CannotCanonicalize(_)) => true,
            Err(crate::validation::PathValidationError::OutsideRoot(p, _)) => {
                eprintln!("WARNING: Watcher rejected path outside project root: {}", p);
                false
            }
            Err(crate::validation::PathValidationError::SuspiciousTraversal(p)) => {
                eprintln!(
                    "WARNING: Watcher rejected suspicious traversal pattern: {}",
                    p
                );
                false
            }
            Err(crate::validation::PathValidationError::SymlinkEscape(from, to)) => {
                eprintln!(
                    "WARNING: Watcher rejected symlink escaping root: {} -> {}",
                    from, to
                );
                false
            }
        };

        if accepted {
            let normalized = crate::validation::normalize_path(path)
                .unwrap_or_else(|_| path.to_string_lossy().to_string());
            dirty_paths.insert(PathBuf::from(normalized));
        }
    }

    dirty_paths
}
/// Returns `true` when `path` names a database file or one of its sidecar
/// files, so the watcher can ignore writes to them.
///
/// Matching is case-insensitive and looks only at the end of the path. A path
/// matches when it ends in `.db`, `.sqlite` or `.sqlite3`, or in one of those
/// extensions followed by `-journal`, `-wal` or `-shm` (for example
/// `cache.sqlite-wal`).
fn is_database_file(path: &str) -> bool {
    const DATABASE_EXTENSIONS: [&str; 3] = [".db", ".sqlite", ".sqlite3"];
    const SIDECAR_SUFFIXES: [&str; 3] = ["-journal", "-wal", "-shm"];

    let path_lower = path.to_lowercase();
    DATABASE_EXTENSIONS.iter().any(|ext| {
        path_lower.ends_with(ext)
            || SIDECAR_SUFFIXES.iter().any(|sidecar| {
                path_lower
                    .strip_suffix(sidecar)
                    .is_some_and(|stem| stem.ends_with(ext))
            })
    })
}
/// A single-path change notification, as returned by
/// `FileSystemWatcher::try_recv_event` and `FileSystemWatcher::recv_event`.
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
pub struct FileEvent {
/// Path that changed.
pub path: PathBuf,
/// Kind of change. The event methods in this file always set this to
/// `EventType::Modify`.
pub event_type: EventType,
}
/// Kind of change carried by a `FileEvent`.
///
/// Its `Display` implementation renders the variants as `CREATE`, `MODIFY`
/// and `DELETE`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum EventType {
/// Creation of a path.
Create,
/// Modification of a path.
Modify,
/// Deletion of a path.
Delete,
}
impl std::fmt::Display for EventType {
    /// Writes the variant as an upper-case label (`CREATE`, `MODIFY` or
    /// `DELETE`). Width and padding flags are not applied.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            EventType::Create => "CREATE",
            EventType::Modify => "MODIFY",
            EventType::Delete => "DELETE",
        };
        f.write_str(label)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A batch created with `empty` reports itself as empty.
    #[test]
    fn test_batch_is_empty() {
        assert!(WatcherBatch::empty().is_empty());
    }

    /// `from_set` yields paths in ascending order regardless of insertion order.
    #[test]
    fn test_batch_from_set_sorts_deterministically() {
        let set: BTreeSet<PathBuf> = vec!["/zebra.rs", "/alpha.rs", "/beta.rs"]
            .into_iter()
            .map(PathBuf::from)
            .collect();
        let batch = WatcherBatch::from_set(set);
        let expected: Vec<PathBuf> = vec!["/alpha.rs", "/beta.rs", "/zebra.rs"]
            .into_iter()
            .map(PathBuf::from)
            .collect();
        assert_eq!(batch.paths, expected);
    }

    /// Database files are detected case-insensitively; source files are not.
    #[test]
    fn test_database_file_detection() {
        let database_names = [
            "test.db",
            "test.sqlite",
            "test.db-journal",
            "test.DB",
            "test.SQLITE",
        ];
        for name in database_names.iter() {
            assert!(is_database_file(name));
        }

        let source_names = ["test.rs", "test.py", "database.rs"];
        for name in source_names.iter() {
            assert!(!is_database_file(name));
        }
    }

    /// A batch survives a JSON round trip unchanged.
    #[test]
    fn test_batch_serialization() {
        let original = WatcherBatch {
            paths: vec![PathBuf::from("/alpha.rs"), PathBuf::from("/beta.rs")],
        };
        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: WatcherBatch = serde_json::from_str(&encoded).unwrap();
        assert_eq!(original.paths, decoded.paths);
    }

    /// Explicitly provided configuration values are stored as given.
    #[test]
    fn test_watcher_config_has_root() {
        let WatcherConfig {
            root_path,
            debounce_ms,
            gitignore_aware,
        } = WatcherConfig {
            root_path: PathBuf::from("/test/root"),
            debounce_ms: 100,
            gitignore_aware: true,
        };
        assert_eq!(root_path, PathBuf::from("/test/root"));
        assert_eq!(debounce_ms, 100);
        assert!(gitignore_aware);
    }

    /// The default configuration uses the documented values.
    #[test]
    fn test_watcher_config_default() {
        let defaults = WatcherConfig::default();
        assert_eq!(defaults.root_path, PathBuf::from("."));
        assert_eq!(defaults.debounce_ms, 500);
        assert!(defaults.gitignore_aware);
    }

    /// Path validation accepts a file inside the root and rejects a path that
    /// climbs out of it.
    #[test]
    fn test_extract_dirty_paths_filters_traversal() {
        use std::fs;
        use tempfile::TempDir;

        let temp_dir = TempDir::new().unwrap();
        let root = temp_dir.path();

        let inside = root.join("valid.rs");
        fs::write(&inside, b"fn valid() {}").unwrap();
        assert!(crate::validation::validate_path_within_root(&inside, root).is_ok());

        let escaping = root.join("../../../etc/passwd");
        assert!(crate::validation::validate_path_within_root(&escaping, root).is_err());
    }
}