use glob::Pattern;
use notify::event::{AccessKind, AccessMode, CreateKind, ModifyKind, RemoveKind, RenameMode};
use notify::EventKind;
use notify::{RecursiveMode, Watcher};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use tokio::fs::File;
use tokio::io::{AsyncReadExt, BufReader};
use tokio::runtime::Runtime;
use tokio::sync::mpsc;
use tokio::task::{self};
use walkdir::WalkDir;
use super::{DocumentEvent, WatcherHandle};
use crate::backend::WatcherCommand;
use crate::{hash_str, WatcherError};
pub fn run_config_file_watcher<P: AsRef<Path>>(
watch_path: P,
file_pattern: impl Into<String>,
) -> Result<(WatcherHandle, tokio::sync::mpsc::Receiver<DocumentEvent>), WatcherError> {
let (event_sender, event_receiver) = mpsc::channel(100);
let (command_sender, mut command_receiver) = mpsc::channel(1);
let watch_path = watch_path.as_ref().to_path_buf();
let file_pattern = file_pattern.into();
let handle = tokio::spawn(async move {
match command_receiver.recv().await {
Some(WatcherCommand::Stop) | None => {
log::info!("Watcher received stop command before starting or channel closed");
return Ok(());
}
_ => {}
}
let mut file_hashes =
initial_file_search(&watch_path, &file_pattern, &event_sender).await?;
let (wh, mut rx) = AsyncWatcherHandler::new();
let mut watcher = notify::recommended_watcher(wh)?;
watcher.watch(&watch_path, RecursiveMode::Recursive)?;
let gp = Pattern::new(&file_pattern)?;
loop {
tokio::select! {
Some(res) = rx.recv() => {
handle_fs_event(res, &mut file_hashes, &event_sender, &watch_path, &gp).await?;
}
Some(command) = command_receiver.recv() => {
if let WatcherCommand::Stop = command {
log::info!("Watcher received stop command");
break;
}
}
}
}
log::debug!("Exiting ConfigFileWatcher loop");
Ok(())
});
Ok((
WatcherHandle {
command_sender,
handle: Some(handle),
},
event_receiver,
))
}
async fn find_matching_files<P: AsRef<Path>>(
watch_path: P,
file_pattern: &str,
) -> Result<Vec<PathBuf>, WatcherError> {
let watch_path = watch_path.as_ref().to_path_buf();
let gp = Pattern::new(file_pattern)?;
task::spawn_blocking(move || {
let mut matching_files = Vec::new();
for entry in WalkDir::new(&watch_path).into_iter().filter_map(|e| e.ok()) {
let path = entry.path();
if path.is_file() {
if let Ok(Some(file_name)) = path.strip_prefix(&watch_path).map(|f| f.to_str()) {
if gp.matches(file_name) {
matching_files.push(path.to_path_buf());
}
}
}
}
Ok(matching_files)
})
.await
.unwrap_or(Ok(vec![]))
}
async fn initial_file_search<P: AsRef<Path>>(
watch_path: P,
file_pattern: &str,
sender: &mpsc::Sender<DocumentEvent>,
) -> Result<HashMap<PathBuf, u64>, WatcherError> {
let files = find_matching_files(watch_path, file_pattern).await?;
let mut file_hashes = HashMap::new();
for file in files {
let content = read_file(&file).await?;
file_hashes.insert(file.clone(), hash_str(&content));
sender
.send(DocumentEvent::NewDocument(
file.to_string_lossy().into_owned(),
content,
))
.await
.map_err(|_| WatcherError::Notify(notify::Error::generic("Failed to send event")))?;
}
Ok(file_hashes)
}
async fn read_file(path: &Path) -> Result<String, WatcherError> {
let file = File::open(path)
.await
.map_err(|e| WatcherError::FileReadError(path.to_path_buf(), e))?;
let mut reader = BufReader::new(file);
let mut content = String::new();
reader
.read_to_string(&mut content)
.await
.map_err(|e| WatcherError::FileReadError(path.to_path_buf(), e))?;
Ok(content)
}
async fn handle_fs_event(
event: notify::Event,
file_hashes: &mut HashMap<PathBuf, u64>,
event_sender: &tokio::sync::mpsc::Sender<DocumentEvent>,
watch_path: &PathBuf,
gp: &Pattern,
) -> Result<(), WatcherError> {
if match_path(watch_path, gp, &event) {
match event.kind {
EventKind::Create(CreateKind::File)
| EventKind::Modify(ModifyKind::Data(_))
| EventKind::Access(AccessKind::Close(AccessMode::Write)) => {
if let Some(path) = event.paths.first() {
let content = read_file(path).await?;
let new_hash = hash_str(&content);
if let Some(existing_hash) = file_hashes.get(path) {
if existing_hash != &new_hash {
file_hashes.insert(path.to_path_buf(), new_hash);
event_sender
.send(DocumentEvent::ContentChanged(
path.to_string_lossy().into_owned(),
content,
))
.await
.unwrap();
}
} else {
file_hashes.insert(path.to_path_buf(), new_hash);
event_sender
.send(DocumentEvent::NewDocument(
path.to_string_lossy().into_owned(),
content,
))
.await
.unwrap();
}
}
}
EventKind::Remove(RemoveKind::File) => {
if let Some(path) = event.paths.first() {
if file_hashes.remove(path).is_some() {
event_sender
.send(DocumentEvent::DocumentRemoved(
path.to_string_lossy().into_owned(),
))
.await
.unwrap();
}
}
}
EventKind::Modify(ModifyKind::Name(mode)) => {
match mode {
RenameMode::To => {
if let Some(path) = event.paths.first() {
let content = read_file(path).await?;
let new_hash = hash_str(&content);
if let Some(existing_hash) = file_hashes.get(path) {
if existing_hash != &new_hash {
file_hashes.insert(path.to_path_buf(), new_hash);
event_sender
.send(DocumentEvent::ContentChanged(
path.to_string_lossy().into_owned(),
content,
))
.await
.unwrap();
}
} else {
file_hashes.insert(path.to_path_buf(), new_hash);
event_sender
.send(DocumentEvent::NewDocument(
path.to_string_lossy().into_owned(),
content,
))
.await
.unwrap();
}
}
}
RenameMode::From => {
if let Some(path) = event.paths.first() {
if file_hashes.remove(path).is_some() {
event_sender
.send(DocumentEvent::DocumentRemoved(
path.to_string_lossy().into_owned(),
))
.await
.unwrap();
}
}
}
RenameMode::Both => {
if let [from, to, ..] = &event.paths[..] {
if file_hashes.remove(from).is_some() {
event_sender
.send(DocumentEvent::DocumentRemoved(
from.to_string_lossy().into_owned(),
))
.await
.unwrap();
let content = read_file(from).await?;
let new_hash = hash_str(&content);
file_hashes.insert(to.to_path_buf(), new_hash);
event_sender
.send(DocumentEvent::NewDocument(
to.to_string_lossy().into_owned(),
content,
))
.await
.unwrap();
}
}
}
_ => {
}
}
}
_ => {
}
}
} else {
}
Ok(())
}
pub struct AsyncWatcherHandler {
tx: mpsc::Sender<notify::Event>,
runtime: Runtime,
}
impl AsyncWatcherHandler {
pub fn new() -> (Self, mpsc::Receiver<notify::Event>) {
let (tx, rx) = mpsc::channel(100);
let runtime = tokio::runtime::Runtime::new().unwrap();
(Self { tx, runtime }, rx)
}
}
impl notify::EventHandler for AsyncWatcherHandler {
fn handle_event(&mut self, event: notify::Result<notify::Event>) {
match event {
Ok(event) => self.runtime.block_on(async {
if !self.tx.is_closed() {
match self.tx.send(event).await {
Ok(_) => {}
Err(err) => {
log::error!("Error sending file watcher event: {}", err);
}
}
} else {
log::warn!(
"Debounce Channel closed before all events could be sent: {:?}",
event
);
}
}),
Err(err) => {
log::error!("Error watching files: {}", err);
}
}
}
}
fn match_path<P: AsRef<Path>>(watch_path: P, gp: &Pattern, event: ¬ify::Event) -> bool {
event.paths.iter().any(|path| {
if let Ok(removed_base) = path.strip_prefix(&watch_path) {
gp.matches(removed_base.to_str().unwrap_or_default())
} else {
false
}
})
}