1use crate::{Callback, ChangeToken, Registration, SingleChangeToken};
2use notify::{Config, RecommendedWatcher, RecursiveMode::NonRecursive, Watcher};
3use std::path::Path;
4use std::sync::mpsc::channel;
5use std::sync::Arc;
6use std::thread::{self, JoinHandle};
7use std::{any::Any, mem::ManuallyDrop};
8
/// A change token that reports changes to a single file on disk.
///
/// The token is backed by a file-system watcher on the file's parent
/// directory and a background thread that forwards relevant watcher events
/// to an inner [`SingleChangeToken`].
pub struct FileChangeToken {
    /// File-system watcher created in `new`. Wrapped in `ManuallyDrop` so
    /// that `Drop` can release it explicitly before joining the thread.
    watcher: ManuallyDrop<RecommendedWatcher>,
    /// Handle of the background thread spawned in `new` that receives
    /// watcher events; joined in `Drop`.
    handle: ManuallyDrop<JoinHandle<()>>,
    /// Token that `changed` and `register` delegate to and that the
    /// background thread notifies when the file changes.
    inner: Arc<SingleChangeToken>,
}
19
20impl FileChangeToken {
21 pub fn new<T: AsRef<Path>>(path: T) -> Self {
27 let file = path.as_ref().to_path_buf();
28 let path = file.clone();
29 let inner = Arc::new(SingleChangeToken::default());
30 let handler = inner.clone();
31 let (sender, receiver) = channel();
32 let mut watcher = RecommendedWatcher::new(sender, Config::default()).unwrap();
33 let handle = thread::spawn(move || {
34 if let Ok(Ok(event)) = receiver.recv() {
35 let changed =
36 event.kind.is_modify() || event.kind.is_create() || event.kind.is_remove();
37
38 if changed || event.need_rescan() {
39 let mut paths = event.paths.iter();
40 let other = path.as_os_str();
41
42 if paths.any(|p| p.as_os_str().eq_ignore_ascii_case(other)) {
43 handler.notify();
44 }
45 }
46 }
47 });
48
49 if let Some(folder) = file.parent() {
50 if folder.exists() {
51 watcher.watch(folder, NonRecursive).unwrap();
52 }
53 }
54
55 Self {
56 watcher: ManuallyDrop::new(watcher),
57 handle: ManuallyDrop::new(handle),
58 inner,
59 }
60 }
61}
62
impl ChangeToken for FileChangeToken {
    /// Returns the result of `changed` on the inner [`SingleChangeToken`],
    /// which the background thread notifies when the watched file changes.
    fn changed(&self) -> bool {
        self.inner.changed()
    }

    /// Forwards `callback` and `state` unchanged to the inner
    /// [`SingleChangeToken`] and returns the [`Registration`] it produces.
    ///
    /// NOTE(review): the tests in this file drop the registration to stop
    /// callbacks and receive `state` back as the callback argument; the
    /// exact contract is defined by `SingleChangeToken` — confirm there.
    fn register(&self, callback: Callback, state: Option<Arc<dyn Any>>) -> Registration {
        self.inner.register(callback, state)
    }
}
72
73impl Drop for FileChangeToken {
74 fn drop(&mut self) {
75 let handle = unsafe {
80 let _ = ManuallyDrop::take(&mut self.watcher);
81 ManuallyDrop::take(&mut self.handle)
82 };
83 handle.join().ok();
84 }
85}
86
87#[cfg(test)]
88mod tests {
89
90 use super::*;
91 use std::sync::{
92 atomic::{AtomicBool, Ordering::Relaxed},
93 Arc, Condvar, Mutex,
94 };
95 use std::time::{Duration, Instant};
96 use std::{fs::File, io::Write};
97 use tempfile::{NamedTempFile, TempPath};
98
99 #[test]
100 fn changed_should_be_false_when_source_file_is_unchanged() {
101 let mut file = NamedTempFile::new().expect("new file");
103
104 file.write_all("test".as_bytes()).unwrap();
105
106 let token = FileChangeToken::new(file.path());
107
108 let changed = token.changed();
110
111 assert!(!changed);
113 }
114
115 #[test]
116 fn changed_should_be_true_when_source_file_changes() {
117 let mut file = NamedTempFile::new().expect("new file");
119
120 file.write_all("original".as_bytes()).unwrap();
121
122 let path = file.into_temp_path();
123 let token = FileChangeToken::new(&path);
124 let mut file = NamedTempFile::from_parts(File::create(&path).expect("valid path"), path);
125
126 file.write_all("updated".as_bytes()).unwrap();
127 thread::sleep(Duration::from_millis(250));
128
129 let changed = token.changed();
131
132 assert!(changed);
134 }
135
136 #[test]
137 fn callback_should_be_invoked_when_source_file_changes() {
138 let mut file = NamedTempFile::new().expect("new file");
140
141 file.write_all("original".as_bytes()).unwrap();
142
143 let path = file.into_temp_path();
144 let state = Arc::new((Mutex::new(false), Condvar::new(), AtomicBool::default()));
145 let token = FileChangeToken::new(&path);
146 let _unused = token.register(
147 Box::new(|state| {
148 let data = state.unwrap();
149 let (fired, event, value) = data
150 .downcast_ref::<(Mutex<bool>, Condvar, AtomicBool)>()
151 .unwrap();
152 value.store(true, Relaxed);
153 *fired.lock().unwrap() = true;
154 event.notify_one();
155 }),
156 Some(state.clone()),
157 );
158 let mut file = NamedTempFile::from_parts(File::create(&path).expect("valid path"), path);
159
160 file.write_all("updated".as_bytes()).unwrap();
162
163 let time = Instant::now();
164 let quarter_second = Duration::from_millis(250);
165 let three_seconds = Duration::from_secs(3);
166 let (mutex, event, changed) = &*state;
167 let mut fired = mutex.lock().unwrap();
168
169 while !*fired && time.elapsed() < three_seconds {
170 fired = event.wait_timeout(fired, quarter_second).unwrap().0;
171 }
172
173 assert!(changed.load(Relaxed));
175 }
176
177 #[test]
178 fn callback_should_not_be_invoked_after_token_is_dropped() {
179 let mut file = NamedTempFile::new().expect("new file");
181
182 file.write_all("original".as_bytes()).unwrap();
183
184 let path = file.into_temp_path();
185 let changed = Arc::<AtomicBool>::default();
186 let token = FileChangeToken::new(&path);
187 let registration = token.register(
188 Box::new(|state| {
189 state
190 .unwrap()
191 .downcast_ref::<AtomicBool>()
192 .unwrap()
193 .store(true, Relaxed)
194 }),
195 Some(changed.clone()),
196 );
197 let mut file = NamedTempFile::from_parts(File::create(&path).expect("valid path"), path);
198
199 drop(registration);
201 drop(token);
202 file.write_all("updated".as_bytes()).unwrap();
203 thread::sleep(Duration::from_millis(250));
204
205 assert_eq!(changed.load(Relaxed), false);
207 }
208
209 #[test]
210 fn callback_should_be_invoked_when_source_file_is_created() {
211 let path = std::env::temp_dir().join("new_file.txt");
213 let state = Arc::new((Mutex::new(false), Condvar::new(), AtomicBool::default()));
214 let token = FileChangeToken::new(&path);
215 let _unused = token.register(
216 Box::new(|state| {
217 let data = state.unwrap();
218 let (fired, event, value) = data
219 .downcast_ref::<(Mutex<bool>, Condvar, AtomicBool)>()
220 .unwrap();
221 value.store(true, Relaxed);
222 *fired.lock().unwrap() = true;
223 event.notify_one();
224 }),
225 Some(state.clone()),
226 );
227 let mut file = NamedTempFile::from_parts(
228 File::create(&path).expect("valid path"),
229 TempPath::from_path(path),
230 );
231
232 file.write_all("updated".as_bytes()).unwrap();
234
235 let time = Instant::now();
236 let quarter_second = Duration::from_millis(250);
237 let three_seconds = Duration::from_secs(3);
238 let (mutex, event, changed) = &*state;
239 let mut fired = mutex.lock().unwrap();
240
241 while !*fired && time.elapsed() < three_seconds {
242 fired = event.wait_timeout(fired, quarter_second).unwrap().0;
243 }
244
245 assert!(changed.load(Relaxed));
247 }
248
249 #[test]
250 fn callback_should_be_invoked_when_source_file_is_removed() {
251 let mut file = NamedTempFile::new().expect("new file");
253
254 file.write_all("existing".as_bytes()).unwrap();
255
256 let state = Arc::new((Mutex::new(false), Condvar::new(), AtomicBool::default()));
257 let token = FileChangeToken::new(file.path());
258 let _unused = token.register(
259 Box::new(|state| {
260 let data = state.unwrap();
261 let (fired, event, value) = data
262 .downcast_ref::<(Mutex<bool>, Condvar, AtomicBool)>()
263 .unwrap();
264 value.store(true, Relaxed);
265 *fired.lock().unwrap() = true;
266 event.notify_one();
267 }),
268 Some(state.clone()),
269 );
270
271 drop(file);
273
274 let time = Instant::now();
275 let quarter_second = Duration::from_millis(250);
276 let three_seconds = Duration::from_secs(3);
277 let (mutex, event, changed) = &*state;
278 let mut fired = mutex.lock().unwrap();
279
280 while !*fired && time.elapsed() < three_seconds {
281 fired = event.wait_timeout(fired, quarter_second).unwrap().0;
282 }
283
284 assert!(changed.load(Relaxed));
286 }
287}