use std::path::PathBuf;
use std::sync::mpsc::{self, Sender};
use std::thread::{self, JoinHandle};
use std::time::Duration;
use notify_debouncer_mini::notify::RecursiveMode;
use notify_debouncer_mini::{new_debouncer, DebouncedEventKind};
use crate::error::Result;
use crate::events::{FileChangeKind, VaultEvent};
use crate::Taskdn;
/// Settings for the vault file watcher.
///
/// Build one with [`WatchConfig::new`] or [`WatchConfig::default`] and adjust
/// it with [`WatchConfig::with_debounce`].
#[derive(Debug, Clone, Copy)]
pub struct WatchConfig {
/// Debounce window handed to the debouncer when a watch is started
/// (500 ms in the default configuration).
pub debounce: Duration,
}
impl Default for WatchConfig {
    /// Returns the default configuration: a debounce window of 500 milliseconds.
    fn default() -> Self {
        let debounce = Duration::from_millis(500);
        Self { debounce }
    }
}
impl WatchConfig {
    /// Creates a configuration populated with the default settings.
    ///
    /// Equivalent to calling `WatchConfig::default()`.
    #[must_use]
    pub fn new() -> Self {
        <Self as Default>::default()
    }

    /// Returns this configuration with its debounce window replaced by `debounce`.
    #[must_use]
    pub fn with_debounce(self, debounce: Duration) -> Self {
        let mut updated = self;
        updated.debounce = debounce;
        updated
    }
}
/// Handle to a running background file watcher.
///
/// Calling [`FileWatcher::stop`] or dropping the handle sends a stop signal to
/// the worker thread and then joins it.
pub struct FileWatcher {
/// Join handle of the worker thread; taken (left as `None`) when the thread is joined.
thread_handle: Option<JoinHandle<()>>,
/// Sending half of the channel used to tell the worker thread to stop.
stop_sender: Sender<()>,
/// Paths returned by [`FileWatcher::watched_paths`].
watched_paths: Vec<PathBuf>,
}
impl FileWatcher {
    /// Stops the watcher, consuming the handle.
    ///
    /// Signals the worker thread to stop and blocks until it has been joined.
    pub fn stop(mut self) {
        self.stop_internal();
    }

    /// Returns the paths recorded for this watcher when it was created.
    #[must_use]
    pub fn watched_paths(&self) -> &[PathBuf] {
        self.watched_paths.as_slice()
    }

    /// Sends the stop signal and joins the worker thread if it has not been joined yet.
    ///
    /// Safe to call more than once: the join handle is taken on the first call,
    /// so later calls only attempt the (ignored) send.
    fn stop_internal(&mut self) {
        // A send error only means the receiving side is already gone; nothing to do.
        let _ = self.stop_sender.send(());

        let Some(worker) = self.thread_handle.take() else {
            return;
        };
        // A join error means the worker panicked; it is deliberately not propagated.
        let _ = worker.join();
    }
}
impl Drop for FileWatcher {
    /// Stops the worker thread when the handle goes out of scope, so a watcher
    /// that is never explicitly stopped still shuts its thread down.
    fn drop(&mut self) {
        Self::stop_internal(self);
    }
}
impl Taskdn {
pub fn watch<F>(&self, callback: F) -> Result<FileWatcher>
where
F: Fn(VaultEvent) + Send + 'static,
{
self.watch_with_config(WatchConfig::default(), callback)
}
pub fn watch_with_config<F>(&self, config: WatchConfig, callback: F) -> Result<FileWatcher>
where
F: Fn(VaultEvent) + Send + 'static,
{
let watched_paths = self.watched_paths();
let (stop_tx, stop_rx) = mpsc::channel();
let tasks_dir = self.config.tasks_dir.clone();
let projects_dir = self.config.projects_dir.clone();
let areas_dir = self.config.areas_dir.clone();
let thread_handle = thread::spawn(move || {
let thread_config = crate::TaskdnConfig::new(
tasks_dir.clone(),
projects_dir.clone(),
areas_dir.clone(),
);
let Ok(taskdn) = Taskdn::new(thread_config) else {
return;
};
let (event_tx, event_rx) = mpsc::channel();
let Ok(mut debouncer) = new_debouncer(config.debounce, move |res| {
let _ = event_tx.send(res);
}) else {
return;
};
for path in &[tasks_dir, projects_dir, areas_dir] {
if debouncer
.watcher()
.watch(path, RecursiveMode::Recursive)
.is_err()
{
return;
}
}
loop {
if stop_rx.try_recv().is_ok() {
break;
}
match event_rx.recv_timeout(Duration::from_millis(100)) {
Ok(Ok(events)) => {
for event in events {
let kind = match event.kind {
DebouncedEventKind::Any | DebouncedEventKind::AnyContinuous => {
if event.path.exists() {
FileChangeKind::Modified
} else {
FileChangeKind::Deleted
}
}
_ => {
if event.path.exists() {
FileChangeKind::Modified
} else {
FileChangeKind::Deleted
}
}
};
if let Ok(Some(vault_event)) =
taskdn.process_file_change(&event.path, kind)
{
callback(vault_event);
}
}
}
Ok(Err(_errors)) => {
}
Err(mpsc::RecvTimeoutError::Timeout) => {
}
Err(mpsc::RecvTimeoutError::Disconnected) => {
break;
}
}
}
});
Ok(FileWatcher {
thread_handle: Some(thread_handle),
stop_sender: stop_tx,
watched_paths,
})
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::TaskdnConfig;
    use std::fs;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;
    use tempfile::TempDir;

    /// Creates a temporary vault with `tasks`, `projects` and `areas`
    /// directories and a `Taskdn` pointing at them.
    ///
    /// The `TempDir` is returned so the caller keeps the directories alive.
    fn setup_test_vault() -> (TempDir, Taskdn) {
        let temp_dir = TempDir::new().unwrap();
        let [tasks_dir, projects_dir, areas_dir] = ["tasks", "projects", "areas"].map(|name| {
            let dir = temp_dir.path().join(name);
            fs::create_dir_all(&dir).unwrap();
            dir
        });
        let taskdn = Taskdn::new(TaskdnConfig::new(tasks_dir, projects_dir, areas_dir)).unwrap();
        (temp_dir, taskdn)
    }

    /// The default configuration debounces for 500 ms.
    #[test]
    fn watch_config_default() {
        let expected = Duration::from_millis(500);
        assert_eq!(WatchConfig::default().debounce, expected);
    }

    /// `with_debounce` overrides the default debounce window.
    #[test]
    fn watch_config_builder() {
        let requested = Duration::from_millis(200);
        let config = WatchConfig::new().with_debounce(requested);
        assert_eq!(config.debounce, Duration::from_millis(200));
    }

    /// A watcher can be started with defaults, reports three paths, and stops cleanly.
    #[test]
    fn file_watcher_can_be_created_and_stopped() {
        let (_vault_dir, taskdn) = setup_test_vault();
        let watcher = taskdn.watch(|_event| {}).unwrap();
        let path_count = watcher.watched_paths().len();
        assert_eq!(path_count, 3);
        watcher.stop();
    }

    /// A watcher can be started with a custom debounce and stopped cleanly.
    #[test]
    fn file_watcher_with_custom_config() {
        let (_vault_dir, taskdn) = setup_test_vault();
        let fast = WatchConfig::new().with_debounce(Duration::from_millis(100));
        let watcher = taskdn.watch_with_config(fast, |_event| {}).unwrap();
        watcher.stop();
    }

    /// Writing a new task file produces at least one callback invocation.
    ///
    /// Depends on real filesystem notifications and fixed sleeps.
    #[test]
    #[ignore]
    fn file_watcher_detects_new_task() {
        let (temp, taskdn) = setup_test_vault();

        // Shared counter incremented by the callback on the worker thread.
        let seen = Arc::new(AtomicUsize::new(0));
        let counter = Arc::clone(&seen);

        let config = WatchConfig::new().with_debounce(Duration::from_millis(50));
        let on_event = move |_event| {
            counter.fetch_add(1, Ordering::SeqCst);
        };
        let watcher = taskdn.watch_with_config(config, on_event).unwrap();

        // Give the worker time to register its watches before touching the vault.
        thread::sleep(Duration::from_millis(100));

        let task_path = temp.path().join("tasks/new-task.md");
        let content = r#"---
title: New Task
status: inbox
created-at: 2025-01-01
updated-at: 2025-01-01
---
Body
"#;
        fs::write(&task_path, content).unwrap();

        // Wait past the debounce window so the event can be delivered.
        thread::sleep(Duration::from_millis(200));
        watcher.stop();

        let observed = seen.load(Ordering::SeqCst);
        assert!(
            observed >= 1,
            "Expected at least 1 event, got {}",
            observed
        );
    }
}