// zeph_core/file_watcher.rs
use std::path::PathBuf;
5use std::time::Duration;
6
7use notify_debouncer_mini::{DebouncedEventKind, new_debouncer};
8use tokio::sync::mpsc;
9
10#[derive(Debug, thiserror::Error)]
11pub enum FileWatcherError {
12 #[error("no watch paths configured")]
13 NoWatchPaths,
14
15 #[error("filesystem watcher error: {0}")]
16 Notify(#[from] notify::Error),
17}
18
/// Message delivered to consumers when a watched path changes.
#[derive(Debug, Clone)]
pub struct FileChangedEvent {
    /// Path carried by the debounced filesystem event.
    pub path: PathBuf,
}
24
25pub struct FileChangeWatcher {
33 handle: tokio::task::JoinHandle<()>,
34}
35
36impl std::fmt::Debug for FileChangeWatcher {
37 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38 f.debug_struct("FileChangeWatcher").finish_non_exhaustive()
39 }
40}
41
42impl Drop for FileChangeWatcher {
43 fn drop(&mut self) {
44 self.handle.abort();
45 }
46}
47
48impl FileChangeWatcher {
49 pub fn start(
58 watch_paths: &[PathBuf],
59 debounce_ms: u64,
60 tx: mpsc::Sender<FileChangedEvent>,
61 ) -> Result<Self, FileWatcherError> {
62 if watch_paths.is_empty() {
63 return Err(FileWatcherError::NoWatchPaths);
64 }
65
66 let (notify_tx, mut notify_rx) = mpsc::channel::<PathBuf>(64);
67
68 let mut debouncer = new_debouncer(
69 Duration::from_millis(debounce_ms),
70 move |events: Result<Vec<notify_debouncer_mini::DebouncedEvent>, notify::Error>| {
71 let events = match events {
72 Ok(e) => e,
73 Err(e) => {
74 tracing::warn!("file watcher error: {e}");
75 return;
76 }
77 };
78 for event in events {
79 if event.kind == DebouncedEventKind::Any {
80 let _ = notify_tx.blocking_send(event.path);
81 }
82 }
83 },
84 )?;
85
86 for path in watch_paths {
87 if let Err(e) = debouncer
88 .watcher()
89 .watch(path, notify::RecursiveMode::Recursive)
90 {
91 tracing::warn!(path = %path.display(), error = %e, "file watcher: failed to watch path");
92 }
93 }
94
95 let handle = tokio::spawn(async move {
96 let _debouncer = debouncer;
97 while let Some(path) = notify_rx.recv().await {
98 if tx.send(FileChangedEvent { path }).await.is_err() {
99 break;
100 }
101 }
102 });
103
104 Ok(Self { handle })
105 }
106}
107
#[cfg(test)]
mod tests {
    use super::*;

    /// Starting with no paths must be rejected with `NoWatchPaths`.
    #[tokio::test]
    async fn start_with_empty_paths_fails() {
        let (tx, _rx) = mpsc::channel(16);
        let failure = FileChangeWatcher::start(&[], 500, tx).err();
        assert!(failure.is_some());
        assert!(matches!(failure, Some(FileWatcherError::NoWatchPaths)));
    }

    /// Starting on an existing temporary directory succeeds.
    #[tokio::test]
    async fn start_with_valid_dir() {
        let tmp = tempfile::tempdir().unwrap();
        let (tx, _rx) = mpsc::channel(16);
        let roots = [tmp.path().to_path_buf()];
        let started = FileChangeWatcher::start(&roots, 500, tx);
        assert!(started.is_ok());
    }

    /// Rewriting a file inside a watched directory yields an event before the timeout.
    #[tokio::test]
    async fn detects_file_change() {
        let tmp = tempfile::tempdir().unwrap();
        let target = tmp.path().join("test.txt");
        std::fs::write(&target, "initial").unwrap();

        let (tx, mut rx) = mpsc::channel(16);
        let roots = [tmp.path().to_path_buf()];
        let _watcher = FileChangeWatcher::start(&roots, 500, tx).unwrap();

        // Presumably gives the backend time to begin observing before the write.
        tokio::time::sleep(Duration::from_millis(100)).await;
        std::fs::write(&target, "updated").unwrap();

        let received = tokio::time::timeout(Duration::from_secs(3), rx.recv()).await;
        assert!(received.is_ok(), "expected FileChangedEvent within timeout");
        assert!(received.unwrap().is_some());
    }
}