zeph_core/
file_watcher.rs1use std::path::PathBuf;
5use std::time::Duration;
6
7use notify_debouncer_mini::{DebouncedEventKind, new_debouncer};
8use tokio::sync::mpsc;
9
10#[non_exhaustive]
11#[derive(Debug, thiserror::Error)]
12pub enum FileWatcherError {
13 #[error("no watch paths configured")]
14 NoWatchPaths,
15
16 #[error("filesystem watcher error: {0}")]
17 Notify(#[from] notify::Error),
18}
19
/// Notification that a watched filesystem entry changed.
#[derive(Clone, Debug)]
pub struct FileChangedEvent {
    /// Path carried by the debounced filesystem event.
    pub path: PathBuf,
}
25
26pub struct FileChangeWatcher {
34 handle: tokio::task::JoinHandle<()>,
35}
36
37impl std::fmt::Debug for FileChangeWatcher {
38 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39 f.debug_struct("FileChangeWatcher").finish_non_exhaustive()
40 }
41}
42
43impl Drop for FileChangeWatcher {
44 fn drop(&mut self) {
45 self.handle.abort();
46 }
47}
48
49impl FileChangeWatcher {
50 pub fn start(
59 watch_paths: &[PathBuf],
60 debounce_ms: u64,
61 tx: mpsc::Sender<FileChangedEvent>,
62 ) -> Result<Self, FileWatcherError> {
63 if watch_paths.is_empty() {
64 return Err(FileWatcherError::NoWatchPaths);
65 }
66
67 let (notify_tx, mut notify_rx) = mpsc::channel::<PathBuf>(64);
68
69 let mut debouncer = new_debouncer(
70 Duration::from_millis(debounce_ms),
71 move |events: Result<Vec<notify_debouncer_mini::DebouncedEvent>, notify::Error>| {
72 let events = match events {
73 Ok(e) => e,
74 Err(e) => {
75 tracing::warn!("file watcher error: {e}");
76 return;
77 }
78 };
79 for event in events {
80 if event.kind == DebouncedEventKind::Any {
81 let _ = notify_tx.blocking_send(event.path);
82 }
83 }
84 },
85 )?;
86
87 for path in watch_paths {
88 if let Err(e) = debouncer
89 .watcher()
90 .watch(path, notify::RecursiveMode::Recursive)
91 {
92 tracing::warn!(path = %path.display(), error = %e, "file watcher: failed to watch path");
93 }
94 }
95
96 let handle = tokio::spawn(async move {
97 let _debouncer = debouncer;
98 while let Some(path) = notify_rx.recv().await {
99 if tx.send(FileChangedEvent { path }).await.is_err() {
100 break;
101 }
102 }
103 });
104
105 Ok(Self { handle })
106 }
107}
108
#[cfg(test)]
mod tests {
    use super::*;

    /// An empty path list must be rejected with `NoWatchPaths`.
    #[tokio::test]
    async fn start_with_empty_paths_fails() {
        let (sender, _receiver) = mpsc::channel(16);
        let outcome = FileChangeWatcher::start(&[], 500, sender);
        assert!(outcome.is_err());
        let error = outcome.unwrap_err();
        assert!(matches!(error, FileWatcherError::NoWatchPaths));
    }

    /// Starting on an existing directory succeeds.
    #[tokio::test]
    async fn start_with_valid_dir() {
        let tmp = tempfile::tempdir().unwrap();
        let (sender, _receiver) = mpsc::channel(16);
        let roots = [tmp.path().to_path_buf()];
        let outcome = FileChangeWatcher::start(&roots, 500, sender);
        assert!(outcome.is_ok());
    }

    /// Rewriting a file inside a watched directory produces an event.
    #[tokio::test]
    async fn detects_file_change() {
        let tmp = tempfile::tempdir().unwrap();
        let target = tmp.path().join("test.txt");
        std::fs::write(&target, "initial").unwrap();

        let (sender, mut receiver) = mpsc::channel(16);
        let roots = [tmp.path().to_path_buf()];
        let _watcher = FileChangeWatcher::start(&roots, 500, sender).unwrap();

        // Short pause before modifying the file.
        tokio::time::sleep(Duration::from_millis(100)).await;
        std::fs::write(&target, "updated").unwrap();

        let outcome = tokio::time::timeout(Duration::from_secs(3), receiver.recv()).await;
        assert!(outcome.is_ok(), "expected FileChangedEvent within timeout");
        let event = outcome.unwrap();
        assert!(event.is_some());
    }
}