use std::{
collections::HashMap,
hash::{DefaultHasher, Hash, Hasher},
path::{Path, PathBuf},
sync::Arc,
};
use notify::{Config, Event, RecommendedWatcher, RecursiveMode, Watcher};
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
use walkdir::WalkDir;
/// In-memory snapshot of a watched file: its content, identity, and a
/// content hash used for cheap change detection.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileBuffer {
    // Full text content read via `read_to_string` (UTF-8 only; non-UTF-8
    // files fail to buffer and are skipped/treated as deleted).
    pub content: String,
    // `file://` URI derived from the canonicalized path.
    pub uri: String,
    // `DefaultHasher` hash of `content`; compared to detect modifications.
    pub hash: u64,
    // Canonicalized path when canonicalization succeeds, raw path otherwise.
    pub path: PathBuf,
}
/// Processed file-system events delivered to consumers of the watcher.
#[derive(Debug, Clone)]
pub enum FileWatchEvent {
    /// A tracked file's content hash changed; carries the new buffer and
    /// the previous text so consumers can diff.
    Modified {
        file: FileBuffer,
        old_content: String,
    },
    /// A previously tracked file no longer exists on disk.
    Deleted { file: FileBuffer },
    /// A file matching the filter was seen for the first time.
    Created { file: FileBuffer },
    /// Escape hatch carrying an unprocessed `notify` event.
    /// NOTE(review): not constructed anywhere in this file — confirm an
    /// external producer exists before relying on it.
    Raw { event: Event },
}
/// Predicate deciding which paths the watcher tracks.
///
/// Must be `Send + Sync` because it is shared (via `Arc`) between the
/// notify callback thread and the async processing task.
pub trait FileFilter: Send + Sync {
    /// Returns `true` if `path` should be watched.
    fn should_watch(&self, path: &Path) -> bool;
}
/// Adapts a plain closure into a [`FileFilter`].
pub struct ClosureFilter<F>
where
    F: Fn(&Path) -> bool + Send + Sync,
{
    // The wrapped predicate.
    filter_fn: F,
}
impl<F> ClosureFilter<F>
where
F: Fn(&Path) -> bool + Send + Sync,
{
pub fn new(filter_fn: F) -> Self {
Self { filter_fn }
}
}
impl<F> FileFilter for ClosureFilter<F>
where
    F: Fn(&Path) -> bool + Send + Sync,
{
    /// Delegates the decision to the wrapped closure.
    fn should_watch(&self, path: &Path) -> bool {
        let decide = &self.filter_fn;
        decide(path)
    }
}
/// Watches a directory tree recursively and reports create/modify/delete
/// events for files selected by a [`FileFilter`].
pub struct FileWatcher {
    // Root directory watched recursively.
    watch_dir: PathBuf,
    // Files discovered by `initialize`, keyed by `file://` URI.
    watched_files: HashMap<String, FileBuffer>,
    // Shared path predicate; also cloned into the notify callback.
    filter: Arc<dyn FileFilter>,
    // Keeps the OS watcher alive; `None` until `start_watching` is called.
    watcher: Option<RecommendedWatcher>,
}
impl FileWatcher {
    /// Creates a watcher rooted at `watch_dir`; `filter` selects which files
    /// are tracked. No filesystem access happens here — call
    /// [`FileWatcher::initialize`] and then [`FileWatcher::start_watching`].
    pub fn new<F>(watch_dir: PathBuf, filter: F) -> Self
    where
        F: FileFilter + 'static,
    {
        Self {
            watch_dir,
            watched_files: HashMap::new(),
            filter: Arc::new(filter),
            watcher: None,
        }
    }

    /// Performs the initial recursive scan, recording every matching file.
    ///
    /// # Errors
    /// Propagates scan errors (currently none: unreadable directory entries
    /// and files are skipped silently).
    pub fn initialize(&mut self) -> Result<(), String> {
        self.watched_files = self.scan_directory()?;
        Ok(())
    }

    /// Starts the OS-level watcher and spawns the async event-processing
    /// task; returns the receiver for processed [`FileWatchEvent`]s.
    ///
    /// The underlying `RecommendedWatcher` is stored on `self`, so event
    /// delivery continues for as long as this `FileWatcher` is alive.
    ///
    /// # Errors
    /// Returns a message when the watcher cannot be created or the root
    /// directory cannot be watched.
    pub async fn start_watching(&mut self) -> Result<mpsc::Receiver<FileWatchEvent>, String> {
        let (processed_tx, processed_rx) = mpsc::channel(100);
        // Unbounded so the synchronous notify callback can never block.
        let (raw_tx, mut raw_rx) = mpsc::unbounded_channel();
        let watch_dir = self.watch_dir.clone();
        let filter = Arc::clone(&self.filter);
        let watcher: Result<RecommendedWatcher, notify::Error> = RecommendedWatcher::new(
            move |result: Result<Event, notify::Error>| {
                if let Ok(event) = result {
                    // Forward events whose path matches the filter and is
                    // either an existing regular file or is gone entirely.
                    // BUGFIX: the previous `is_file()`-only check rejected
                    // deletion events (a removed path is not a file), so a
                    // delete with no other activity in the tree was never
                    // reported downstream.
                    let should_process = event.paths.iter().any(|path| {
                        (path.is_file() || !path.exists()) && filter.should_watch(path)
                    });
                    if should_process {
                        let _ = raw_tx.send(event);
                    }
                }
            },
            Config::default(),
        );
        let mut watcher = watcher.map_err(|e| format!("Failed to create watcher: {}", e))?;
        watcher
            .watch(&watch_dir, RecursiveMode::Recursive)
            .map_err(|e| format!("Failed to watch directory: {}", e))?;
        // Dropping the RecommendedWatcher stops event delivery, so keep it.
        self.watcher = Some(watcher);
        let watch_dir_clone = self.watch_dir.clone();
        let filter_clone = Arc::clone(&self.filter);
        // NOTE(review): the processor receives a *snapshot* of the watched
        // files; `self.watched_files` is not updated afterwards, so
        // `get_watched_files` reflects only the initial scan. Confirm that
        // callers rely solely on the event stream once watching starts.
        let watched_files = self.watched_files.clone();
        tokio::spawn(async move {
            let mut internal_watcher = InternalEventProcessor {
                watch_dir: watch_dir_clone,
                watched_files,
                filter: filter_clone,
                processed_tx,
            };
            // Runs until the raw channel closes, i.e. the watcher is dropped.
            while let Some(raw_event) = raw_rx.recv().await {
                if let Err(e) = internal_watcher.process_event(raw_event).await {
                    eprintln!("Error processing file watch event: {}", e);
                }
            }
        });
        Ok(processed_rx)
    }

    /// Files recorded by the initial scan, keyed by `file://` URI.
    pub fn get_watched_files(&self) -> &HashMap<String, FileBuffer> {
        &self.watched_files
    }

    /// The root directory being watched.
    pub fn watch_dir(&self) -> &Path {
        &self.watch_dir
    }

    /// Walks the tree under `watch_dir` and buffers every regular file
    /// accepted by the filter; unreadable entries/files are skipped.
    fn scan_directory(&self) -> Result<HashMap<String, FileBuffer>, String> {
        let mut files = HashMap::new();
        for entry in WalkDir::new(&self.watch_dir)
            .into_iter()
            .filter_map(Result::ok)
            .filter(|entry| entry.path().is_file() && self.filter.should_watch(entry.path()))
        {
            let path = entry.path();
            if let Ok(buffer) = self.create_file_buffer(path) {
                files.insert(buffer.uri.clone(), buffer);
            }
        }
        Ok(files)
    }

    /// Reads `path` (UTF-8) into a [`FileBuffer`] with content hash and URI.
    ///
    /// # Errors
    /// Returns a message when the file cannot be read as text.
    fn create_file_buffer(&self, path: &Path) -> Result<FileBuffer, String> {
        let content = std::fs::read_to_string(path)
            .map_err(|e| format!("Failed to read file {}: {}", path.display(), e))?;
        let hash = self.hash_content(&content);
        let uri = self.path_to_uri(path);
        // Fall back to the raw path when canonicalization fails (e.g. the
        // file was removed between the read and this call).
        let canonical_path = path.canonicalize().unwrap_or_else(|_| path.to_path_buf());
        Ok(FileBuffer {
            content,
            uri,
            hash,
            path: canonical_path,
        })
    }

    /// Builds a `file://` URI from the canonicalized path, normalizing
    /// Windows backslashes to forward slashes.
    fn path_to_uri(&self, path: &Path) -> String {
        let canonical_path = path.canonicalize().unwrap_or_else(|_| path.to_path_buf());
        format!(
            "file://{}",
            canonical_path.to_string_lossy().replace('\\', "/")
        )
    }

    /// Hashes file content for cheap change detection (not cryptographic).
    fn hash_content(&self, content: &str) -> u64 {
        let mut hasher = DefaultHasher::new();
        content.hash(&mut hasher);
        hasher.finish()
    }
}
/// Task-local state for turning raw `notify` events into
/// [`FileWatchEvent`]s inside the spawned processing loop.
struct InternalEventProcessor {
    // Root directory; retained for context/debugging, not read here.
    #[allow(dead_code)]
    watch_dir: PathBuf,
    // Tracked files (keyed by URI), mutated as events are processed.
    watched_files: HashMap<String, FileBuffer>,
    // Path predicate shared with the watcher callback.
    filter: Arc<dyn FileFilter>,
    // Outbound channel for processed events.
    processed_tx: mpsc::Sender<FileWatchEvent>,
}
impl InternalEventProcessor {
    /// Processes one raw notify event: first sweeps tracked files for
    /// deletions, then classifies each event path as created/modified,
    /// finally forwards everything downstream.
    ///
    /// # Errors
    /// Returns an error when the downstream channel has been closed.
    async fn process_event(&mut self, event: Event) -> Result<(), String> {
        let mut events_to_send = Vec::new();
        self.process_deletions(&mut events_to_send);
        self.process_modifications(&event, &mut events_to_send)?;
        for event in events_to_send {
            if self.processed_tx.send(event).await.is_err() {
                return Err("Event channel closed".to_string());
            }
        }
        Ok(())
    }

    /// Emits `Deleted` for every tracked file that no longer exists on disk
    /// and forgets it.
    fn process_deletions(&mut self, events: &mut Vec<FileWatchEvent>) {
        // Single pass with `retain` instead of the previous collect-URIs-
        // then-remove two-pass approach; behavior is identical.
        self.watched_files.retain(|_, buffer| {
            if buffer.path.exists() {
                true
            } else {
                events.push(FileWatchEvent::Deleted {
                    file: buffer.clone(),
                });
                false
            }
        });
    }

    /// For each path in `event`, emits `Created` (first sighting),
    /// `Modified` (content hash changed), or `Deleted` (read failure on a
    /// tracked file); unchanged or filtered-out paths produce nothing.
    fn process_modifications(
        &mut self,
        event: &Event,
        events: &mut Vec<FileWatchEvent>,
    ) -> Result<(), String> {
        for path in &event.paths {
            // Only existing regular files accepted by the filter are
            // considered; vanished paths are handled by `process_deletions`.
            if !path.is_file() || !self.filter.should_watch(path) {
                continue;
            }
            let uri = self.path_to_uri(path);
            match self.create_file_buffer(path) {
                Ok(new_buffer) => {
                    if let Some(old_buffer) = self.watched_files.get(&uri) {
                        // Hash comparison suppresses events for
                        // timestamp-only touches with unchanged content.
                        if old_buffer.hash != new_buffer.hash {
                            events.push(FileWatchEvent::Modified {
                                file: new_buffer.clone(),
                                old_content: old_buffer.content.clone(),
                            });
                            self.watched_files.insert(uri, new_buffer);
                        }
                    } else {
                        events.push(FileWatchEvent::Created {
                            file: new_buffer.clone(),
                        });
                        self.watched_files.insert(uri, new_buffer);
                    }
                }
                Err(_) => {
                    // The file vanished or became unreadable between the
                    // event and the read; treat a tracked file as deleted.
                    if let Some(old_buffer) = self.watched_files.remove(&uri) {
                        events.push(FileWatchEvent::Deleted { file: old_buffer });
                    }
                }
            }
        }
        Ok(())
    }

    // NOTE(review): the three helpers below duplicate `FileWatcher`'s —
    // consider extracting shared free functions in a follow-up.

    /// Reads `path` (UTF-8) into a [`FileBuffer`] with content hash and URI.
    ///
    /// # Errors
    /// Returns a message when the file cannot be read as text.
    fn create_file_buffer(&self, path: &Path) -> Result<FileBuffer, String> {
        let content = std::fs::read_to_string(path)
            .map_err(|e| format!("Failed to read file {}: {}", path.display(), e))?;
        let hash = self.hash_content(&content);
        let uri = self.path_to_uri(path);
        // Fall back to the raw path when canonicalization fails.
        let canonical_path = path.canonicalize().unwrap_or_else(|_| path.to_path_buf());
        Ok(FileBuffer {
            content,
            uri,
            hash,
            path: canonical_path,
        })
    }

    /// Builds a `file://` URI from the canonicalized path, normalizing
    /// Windows backslashes to forward slashes.
    fn path_to_uri(&self, path: &Path) -> String {
        let canonical_path = path.canonicalize().unwrap_or_else(|_| path.to_path_buf());
        format!(
            "file://{}",
            canonical_path.to_string_lossy().replace('\\', "/")
        )
    }

    /// Hashes file content for cheap change detection (not cryptographic).
    fn hash_content(&self, content: &str) -> u64 {
        let mut hasher = DefaultHasher::new();
        content.hash(&mut hasher);
        hasher.finish()
    }
}
/// Convenience constructor: builds a [`FileWatcher`] from a plain closure.
///
/// # Errors
/// Kept as `Result` for interface stability; the current implementation
/// cannot fail.
pub fn create_file_watcher<F>(watch_dir: PathBuf, filter: F) -> Result<FileWatcher, String>
where
    F: Fn(&Path) -> bool + Send + Sync + 'static,
{
    Ok(FileWatcher::new(watch_dir, ClosureFilter::new(filter)))
}
/// Builds a watcher from a closure, scans the directory, and starts
/// watching in one call.
///
/// Returns the watcher (keep it alive — dropping it stops event delivery)
/// together with the processed-event receiver.
///
/// # Errors
/// Propagates construction, scan, or watch-setup failures.
pub async fn create_and_start_watcher<F>(
    watch_dir: PathBuf,
    filter: F,
) -> Result<(FileWatcher, mpsc::Receiver<FileWatchEvent>), String>
where
    F: Fn(&Path) -> bool + Send + Sync + 'static,
{
    let mut file_watcher = create_file_watcher(watch_dir, filter)?;
    file_watcher.initialize()?;
    let events = file_watcher.start_watching().await?;
    Ok((file_watcher, events))
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use std::path::Path;
    use tempfile::TempDir;
    use tokio::time::Duration;

    /// Builds a temp tree: two filter-matching files (`.txt`, `.rs`), one
    /// filtered-out `.log`, and a nested matching file under `subdir/`.
    fn create_test_directory() -> TempDir {
        let temp_dir = TempDir::new().expect("Failed to create temp directory");
        let temp_path = temp_dir.path();
        fs::write(temp_path.join("test1.txt"), "content1").expect("Failed to write test1.txt");
        fs::write(temp_path.join("test2.rs"), "fn main() {}").expect("Failed to write test2.rs");
        fs::write(temp_path.join("ignore.log"), "log content").expect("Failed to write ignore.log");
        let sub_dir = temp_path.join("subdir");
        fs::create_dir(&sub_dir).expect("Failed to create subdirectory");
        fs::write(sub_dir.join("nested.txt"), "nested content")
            .expect("Failed to write nested.txt");
        temp_dir
    }

    /// Filter used throughout: accept only `.txt` and `.rs` extensions.
    fn test_filter(path: &Path) -> bool {
        if let Some(ext) = path.extension() {
            matches!(ext.to_str(), Some("txt") | Some("rs"))
        } else {
            false
        }
    }

    /// `ClosureFilter` delegates correctly to the wrapped closure.
    #[test]
    fn test_file_filter_trait() {
        let filter = ClosureFilter::new(test_filter);
        assert!(filter.should_watch(Path::new("test.txt")));
        assert!(filter.should_watch(Path::new("test.rs")));
        assert!(!filter.should_watch(Path::new("test.log")));
        assert!(!filter.should_watch(Path::new("test")));
    }

    /// A freshly constructed watcher records the directory but tracks no
    /// files until `initialize` is called.
    #[test]
    fn test_file_watcher_creation() {
        let temp_dir = create_test_directory();
        let filter = ClosureFilter::new(test_filter);
        let watcher = FileWatcher::new(temp_dir.path().to_path_buf(), filter);
        assert_eq!(watcher.watch_dir(), temp_dir.path());
        assert_eq!(watcher.get_watched_files().len(), 0);
    }

    /// `initialize` finds exactly the three filter-matching files,
    /// including the nested one; the `.log` file is excluded.
    #[test]
    fn test_file_watcher_initialization() {
        let temp_dir = create_test_directory();
        let filter = ClosureFilter::new(test_filter);
        let mut watcher = FileWatcher::new(temp_dir.path().to_path_buf(), filter);
        watcher.initialize().expect("Failed to initialize watcher");
        let watched_files = watcher.get_watched_files();
        assert_eq!(watched_files.len(), 3);
        let file_names: Vec<_> = watched_files
            .values()
            .map(|f| f.path.file_name().unwrap().to_str().unwrap())
            .collect();
        assert!(file_names.contains(&"test1.txt"));
        assert!(file_names.contains(&"test2.rs"));
        assert!(file_names.contains(&"nested.txt"));
    }

    /// The one-shot helper scans and starts watching in a single call.
    #[tokio::test]
    async fn test_create_and_start_watcher() {
        let temp_dir = create_test_directory();
        let (watcher, _rx) = create_and_start_watcher(temp_dir.path().to_path_buf(), test_filter)
            .await
            .expect("Failed to create and start watcher");
        assert_eq!(watcher.get_watched_files().len(), 3);
        assert_eq!(watcher.watch_dir(), temp_dir.path());
    }

    /// Creating a matching file after watching starts yields a `Created`
    /// event with the file's content.
    #[tokio::test]
    async fn test_real_file_creation_detection() {
        let temp_dir = TempDir::new().expect("Failed to create temp directory");
        let (_watcher, mut event_rx) =
            create_and_start_watcher(temp_dir.path().to_path_buf(), test_filter)
                .await
                .expect("Failed to create and start watcher");
        // Give the OS watcher time to arm before touching the directory.
        tokio::time::sleep(Duration::from_millis(200)).await;
        let new_file = temp_dir.path().join("new_test.txt");
        fs::write(&new_file, "new file content").expect("Failed to create new file");
        // Compare against the canonical path, as buffers store it canonical.
        let new_file_canonical = new_file
            .canonicalize()
            .expect("Failed to canonicalize path");
        let mut creation_detected = false;
        // Poll the event stream until the expected event arrives or we
        // give up after two seconds.
        let timeout = tokio::time::Instant::now() + Duration::from_secs(2);
        while tokio::time::Instant::now() < timeout && !creation_detected {
            tokio::select! {
                Some(event) = event_rx.recv() => {
                    if let FileWatchEvent::Created { file } = event
                        && file.path == new_file_canonical {
                        assert_eq!(file.content, "new file content");
                        creation_detected = true;
                        break;
                    }
                }
                _ = tokio::time::sleep(Duration::from_millis(50)) => {
                }
            }
        }
        assert!(creation_detected, "File creation was not detected");
    }

    /// Rewriting a pre-existing file yields `Modified` carrying both the
    /// new content and the pre-scan old content.
    #[tokio::test]
    async fn test_real_file_modification_detection() {
        let temp_dir = TempDir::new().expect("Failed to create temp directory");
        let test_file = temp_dir.path().join("modify_test.txt");
        fs::write(&test_file, "initial content").expect("Failed to create initial file");
        let test_file_canonical = test_file
            .canonicalize()
            .expect("Failed to canonicalize path");
        let (_watcher, mut event_rx) =
            create_and_start_watcher(temp_dir.path().to_path_buf(), test_filter)
                .await
                .expect("Failed to create and start watcher");
        // Give the OS watcher time to arm before touching the directory.
        tokio::time::sleep(Duration::from_millis(200)).await;
        fs::write(&test_file, "modified content").expect("Failed to modify file");
        let mut modification_detected = false;
        let timeout = tokio::time::Instant::now() + Duration::from_secs(2);
        while tokio::time::Instant::now() < timeout && !modification_detected {
            tokio::select! {
                Some(event) = event_rx.recv() => {
                    if let FileWatchEvent::Modified { file, old_content } = event
                        && file.path == test_file_canonical {
                        assert_eq!(file.content, "modified content");
                        assert_eq!(old_content, "initial content");
                        modification_detected = true;
                        break;
                    }
                }
                _ = tokio::time::sleep(Duration::from_millis(50)) => {
                }
            }
        }
        assert!(modification_detected, "File modification was not detected");
    }

    /// Filter-matching creations are reported while `.log` creations are
    /// suppressed.
    /// NOTE(review): the loop exits once the `.txt` event arrives, so a
    /// late `.log` event would go unobserved — this test is best-effort.
    #[tokio::test]
    async fn test_file_filter_in_real_watching() {
        let temp_dir = TempDir::new().expect("Failed to create temp directory");
        let (_watcher, mut event_rx) = create_and_start_watcher(
            temp_dir.path().to_path_buf(),
            test_filter,
        )
        .await
        .expect("Failed to create and start watcher");
        // Give the OS watcher time to arm before touching the directory.
        tokio::time::sleep(Duration::from_millis(200)).await;
        let txt_file = temp_dir.path().join("watched.txt");
        let log_file = temp_dir.path().join("ignored.log");
        fs::write(&txt_file, "should be watched").expect("Failed to create txt file");
        fs::write(&log_file, "should be ignored").expect("Failed to create log file");
        let txt_file_canonical = txt_file
            .canonicalize()
            .expect("Failed to canonicalize txt file path");
        let log_file_canonical = log_file
            .canonicalize()
            .expect("Failed to canonicalize log file path");
        let mut txt_detected = false;
        let mut log_detected = false;
        let timeout = tokio::time::Instant::now() + Duration::from_secs(2);
        while tokio::time::Instant::now() < timeout && !txt_detected {
            tokio::select! {
                Some(event) = event_rx.recv() => {
                    if let FileWatchEvent::Created { file } = event {
                        if file.path == txt_file_canonical {
                            txt_detected = true;
                        } else if file.path == log_file_canonical {
                            log_detected = true;
                        }
                    }
                }
                _ = tokio::time::sleep(Duration::from_millis(50)) => {
                }
            }
        }
        assert!(txt_detected, "TXT file creation should be detected");
        assert!(!log_detected, "LOG file creation should be filtered out");
    }
}