1use crate::{Callback, ChangeToken, Registration, SingleChangeToken, State};
2use notify::{Config, RecommendedWatcher, RecursiveMode::NonRecursive, Watcher};
3use std::path::Path;
4use std::sync::mpsc::channel;
5use std::sync::Arc;
6use std::thread::{spawn, JoinHandle};
7
8pub struct FileChangeToken {
14 watcher: Option<RecommendedWatcher>,
15 handle: Option<JoinHandle<()>>,
16 inner: Arc<SingleChangeToken>,
17}
18
19impl FileChangeToken {
20 pub fn new<T: AsRef<Path>>(path: T) -> Self {
26 let file = path.as_ref().to_path_buf();
27 let path = file.clone();
28 let inner = Arc::new(SingleChangeToken::default());
29 let handler = inner.clone();
30 let (sender, receiver) = channel();
31 let mut watcher = RecommendedWatcher::new(sender, Config::default()).unwrap_or_else(|e| panic!("{}", e));
32 let handle = spawn(move || {
33 while let Ok(Ok(event)) = receiver.recv() {
34 let changed = event.kind.is_modify() || event.kind.is_create() || event.kind.is_remove();
35
36 if changed || event.need_rescan() {
37 let mut paths = event.paths.iter();
38 let other = path.as_os_str();
39
40 if paths.any(|p| p.as_os_str().eq_ignore_ascii_case(other)) {
41 handler.notify();
42 break;
43 }
44 }
45 }
46 });
47
48 if let Some(folder) = file.parent() {
49 if folder.exists() {
50 watcher.watch(folder, NonRecursive).unwrap();
51 }
52 }
53
54 Self {
55 watcher: Some(watcher),
56 handle: Some(handle),
57 inner,
58 }
59 }
60}
61
62impl ChangeToken for FileChangeToken {
63 #[inline]
64 fn changed(&self) -> bool {
65 self.inner.changed()
66 }
67
68 #[inline]
69 fn register(&self, callback: Callback, state: State) -> Registration {
70 self.inner.register(callback, state)
71 }
72}
73
74impl Drop for FileChangeToken {
75 fn drop(&mut self) {
76 let _ = self.watcher.take();
77
78 if let Some(handle) = self.handle.take() {
79 handle.join().ok();
80 }
81 }
82}
83
84#[cfg(test)]
85mod tests {
86 use super::*;
87 use std::sync::{
88 atomic::{AtomicBool, Ordering::Relaxed},
89 Arc, Condvar, Mutex,
90 };
91 use std::time::{Duration, Instant};
92 use std::{fs::File, io::Write, thread::sleep};
93 use tempfile::{tempdir, NamedTempFile, TempPath};
94 use crate::assert_send_and_sync;
95
96 #[test]
97 fn file_change_token_should_send_and_sync() {
98 let dir = tempdir().expect("temp dir");
100 let file = NamedTempFile::new_in(&dir).expect("new file");
101 let token = FileChangeToken::new(file.path());
102
103 assert_send_and_sync(token);
107 }
108
109 #[test]
110 fn changed_should_be_false_when_source_file_is_unchanged() {
111 let dir = tempdir().expect("temp dir");
113 let mut file = NamedTempFile::new_in(&dir).expect("new file");
114
115 file.write_all("test".as_bytes()).unwrap();
116
117 let token = FileChangeToken::new(file.path());
118
119 let changed = token.changed();
121
122 assert!(!changed);
124 }
125
126 #[test]
127 fn changed_should_be_true_when_source_file_changes() {
128 let dir = tempdir().expect("temp dir");
130 let mut file = NamedTempFile::new_in(&dir).expect("new file");
131
132 file.write_all("original".as_bytes()).unwrap();
133
134 let path = file.into_temp_path();
135 let token = FileChangeToken::new(&path);
136 let mut file = NamedTempFile::from_parts(File::create(&path).expect("valid path"), path);
137
138 file.write_all("updated".as_bytes()).unwrap();
139 sleep(Duration::from_millis(250));
140
141 let changed = token.changed();
143
144 assert!(changed);
146 }
147
148 #[test]
149 fn callback_should_be_invoked_when_source_file_changes() {
150 let dir = tempdir().expect("temp dir");
152 let mut file = NamedTempFile::new_in(&dir).expect("new file");
153
154 file.write_all("original".as_bytes()).unwrap();
155
156 let path = file.into_temp_path();
157 let state = Arc::new((Mutex::new(false), Condvar::new(), AtomicBool::default()));
158 let token = FileChangeToken::new(&path);
159 let _unused = token.register(
160 Box::new(|state| {
161 let data = state.unwrap();
162 let (fired, event, value) = data.downcast_ref::<(Mutex<bool>, Condvar, AtomicBool)>().unwrap();
163 value.store(true, Relaxed);
164 *fired.lock().unwrap() = true;
165 event.notify_one();
166 }),
167 Some(state.clone()),
168 );
169 let mut file = NamedTempFile::from_parts(File::create(&path).expect("valid path"), path);
170
171 file.write_all("updated".as_bytes()).unwrap();
173
174 let time = Instant::now();
175 let quarter_second = Duration::from_millis(250);
176 let three_seconds = Duration::from_secs(3);
177 let (mutex, event, changed) = &*state;
178 let mut fired = mutex.lock().unwrap();
179
180 while !*fired && time.elapsed() < three_seconds {
181 fired = event.wait_timeout(fired, quarter_second).unwrap().0;
182 }
183
184 assert!(changed.load(Relaxed));
186 }
187
188 #[test]
189 fn callback_should_not_be_invoked_after_token_is_dropped() {
190 let dir = tempdir().expect("temp dir");
192 let mut file = NamedTempFile::new_in(&dir).expect("new file");
193
194 file.write_all("original".as_bytes()).unwrap();
195
196 let path = file.into_temp_path();
197 let changed = Arc::<AtomicBool>::default();
198 let token = FileChangeToken::new(&path);
199 let registration = token.register(
200 Box::new(|state| {
201 state
202 .unwrap()
203 .downcast_ref::<AtomicBool>()
204 .unwrap()
205 .store(true, Relaxed)
206 }),
207 Some(changed.clone()),
208 );
209
210 drop(registration);
212 drop(token);
213
214 let mut file = NamedTempFile::from_parts(File::create(&path).expect("valid path"), path);
215
216 file.write_all("updated".as_bytes()).unwrap();
217 sleep(Duration::from_millis(250));
218
219 assert_eq!(changed.load(Relaxed), false);
221 }
222
223 #[test]
224 fn callback_should_be_invoked_when_source_file_is_created() {
225 let dir = tempdir().expect("temp dir");
227 let path = dir.path().join("new_file.txt");
228 let state = Arc::new((Mutex::new(false), Condvar::new(), AtomicBool::default()));
229 let token = FileChangeToken::new(&path);
230 let _unused = token.register(
231 Box::new(|state| {
232 let data = state.unwrap();
233 let (fired, event, value) = data.downcast_ref::<(Mutex<bool>, Condvar, AtomicBool)>().unwrap();
234 value.store(true, Relaxed);
235 *fired.lock().unwrap() = true;
236 event.notify_one();
237 }),
238 Some(state.clone()),
239 );
240 let mut file = NamedTempFile::from_parts(
241 File::create(&path).expect("valid path"),
242 TempPath::try_from_path(path).unwrap(),
243 );
244
245 file.write_all("updated".as_bytes()).unwrap();
247
248 let time = Instant::now();
249 let quarter_second = Duration::from_millis(250);
250 let three_seconds = Duration::from_secs(3);
251 let (mutex, event, changed) = &*state;
252 let mut fired = mutex.lock().unwrap();
253
254 while !*fired && time.elapsed() < three_seconds {
255 fired = event.wait_timeout(fired, quarter_second).unwrap().0;
256 }
257
258 assert!(changed.load(Relaxed));
260 }
261
262 #[test]
263 fn callback_should_be_invoked_when_source_file_is_removed() {
264 let dir = tempdir().expect("temp dir");
266 let mut file = NamedTempFile::new_in(&dir).expect("new file");
267
268 file.write_all("existing".as_bytes()).unwrap();
269
270 let state = Arc::new((Mutex::new(false), Condvar::new(), AtomicBool::default()));
271 let token = FileChangeToken::new(file.path());
272 let _unused = token.register(
273 Box::new(|state| {
274 let data = state.unwrap();
275 let (fired, event, value) = data.downcast_ref::<(Mutex<bool>, Condvar, AtomicBool)>().unwrap();
276 value.store(true, Relaxed);
277 *fired.lock().unwrap() = true;
278 event.notify_one();
279 }),
280 Some(state.clone()),
281 );
282
283 drop(file);
285
286 let time = Instant::now();
287 let quarter_second = Duration::from_millis(250);
288 let three_seconds = Duration::from_secs(3);
289 let (mutex, event, changed) = &*state;
290 let mut fired = mutex.lock().unwrap();
291
292 while !*fired && time.elapsed() < three_seconds {
293 fired = event.wait_timeout(fired, quarter_second).unwrap().0;
294 }
295
296 assert!(changed.load(Relaxed));
298 }
299}