1use crate::{Callback, ChangeToken, Registration, SingleChangeToken};
2use notify::{Config, RecommendedWatcher, RecursiveMode, Watcher};
3use std::any::Any;
4use std::mem::ManuallyDrop;
5use std::path::Path;
6use std::sync::mpsc::channel;
7use std::sync::Arc;
8use std::thread::{self, JoinHandle};
9
10pub struct FileChangeToken {
16 watcher: ManuallyDrop<RecommendedWatcher>,
17 handle: ManuallyDrop<JoinHandle<()>>,
18 inner: Arc<SingleChangeToken>,
19}
20
21impl FileChangeToken {
22 pub fn new<T: AsRef<Path>>(path: T) -> Self {
28 let file = path.as_ref().to_path_buf();
29 let inner = Arc::new(SingleChangeToken::default());
30 let handler = inner.clone();
31 let (sender, receiver) = channel();
32 let mut watcher = RecommendedWatcher::new(sender, Config::default()).unwrap();
33
34 let handle = thread::spawn(move || {
35 if let Ok(Ok(event)) = receiver.recv() {
36 if event.kind.is_modify() {
37 handler.notify()
38 }
39 }
40 });
41
42 watcher
43 .watch(file.as_ref(), RecursiveMode::NonRecursive)
44 .unwrap();
45
46 Self {
47 watcher: ManuallyDrop::new(watcher),
48 handle: ManuallyDrop::new(handle),
49 inner,
50 }
51 }
52}
53
54impl ChangeToken for FileChangeToken {
55 fn changed(&self) -> bool {
56 self.inner.changed()
57 }
58
59 fn register(&self, callback: Callback, state: Option<Arc<dyn Any>>) -> Registration {
60 self.inner.register(callback, state)
61 }
62}
63
64impl Drop for FileChangeToken {
65 fn drop(&mut self) {
66 let handle = unsafe {
71 let _ = ManuallyDrop::take(&mut self.watcher);
72 ManuallyDrop::take(&mut self.handle)
73 };
74 handle.join().ok();
75 }
76}
77
78#[cfg(test)]
79mod tests {
80
81 use super::*;
82 use std::env::temp_dir;
83 use std::fs::{remove_file, File};
84 use std::io::Write;
85 use std::sync::atomic::{AtomicBool, Ordering};
86 use std::sync::{Arc, Condvar, Mutex};
87 use std::time::Duration;
88
89 #[test]
90 fn changed_should_be_false_when_source_file_is_unchanged() {
91 let path = temp_dir().join("test.1.txt");
93 let mut file = File::create(&path).unwrap();
94
95 file.write_all("test".as_bytes()).unwrap();
96
97 let token = FileChangeToken::new(&path);
98
99 let changed = token.changed();
101
102 if path.exists() {
104 remove_file(&path).ok();
105 }
106
107 assert!(!changed);
108 }
109
110 #[test]
111 fn changed_should_be_true_when_source_file_changes() {
112 let path = temp_dir().join("test.2.txt");
114 let mut file = File::create(&path).unwrap();
115
116 file.write_all("original".as_bytes()).unwrap();
117 drop(file);
118
119 let token = FileChangeToken::new(&path);
120 let mut file = File::create(&path).unwrap();
121
122 file.write_all("updated".as_bytes()).unwrap();
123 thread::sleep(Duration::from_millis(250));
124
125 let changed = token.changed();
127
128 if path.exists() {
130 remove_file(&path).ok();
131 }
132
133 assert!(changed);
134 }
135
136 #[test]
137 fn callback_should_be_invoked_when_source_file_changes() {
138 let path = temp_dir().join("test.3.txt");
140 let mut file = File::create(&path).unwrap();
141
142 file.write_all("original".as_bytes()).unwrap();
143 drop(file);
144
145 let state = Arc::new((Mutex::new(false), Condvar::new(), AtomicBool::default()));
146 let token = FileChangeToken::new(&path);
147 let _unused = token.register(
148 Box::new(|state| {
149 let data = state.unwrap();
150 let (fired, event, value) = data
151 .downcast_ref::<(Mutex<bool>, Condvar, AtomicBool)>()
152 .unwrap();
153 value.store(true, Ordering::SeqCst);
154 *fired.lock().unwrap() = true;
155 event.notify_one();
156 }),
157 Some(state.clone()),
158 );
159 let mut file = File::create(&path).unwrap();
160
161 file.write_all("updated".as_bytes()).unwrap();
163 thread::sleep(Duration::from_millis(250));
164
165 let one_second = Duration::from_secs(1);
166 let (mutex, event, changed) = &*state;
167 let mut fired = mutex.lock().unwrap();
168
169 while !*fired {
170 fired = event.wait_timeout(fired, one_second).unwrap().0;
171 }
172
173 if path.exists() {
175 remove_file(&path).ok();
176 }
177
178 assert!(changed.load(Ordering::SeqCst));
179 }
180
181 #[test]
182 fn callback_should_not_be_invoked_after_token_is_dropped() {
183 let path = temp_dir().join("test.4.txt");
185 let mut file = File::create(&path).unwrap();
186
187 file.write_all("original".as_bytes()).unwrap();
188 drop(file);
189
190 let changed = Arc::<AtomicBool>::default();
191 let token = FileChangeToken::new(&path);
192 let registration = token.register(
193 Box::new(|state| {
194 state
195 .unwrap()
196 .downcast_ref::<AtomicBool>()
197 .unwrap()
198 .store(true, Ordering::SeqCst)
199 }),
200 Some(changed.clone()),
201 );
202 let mut file = File::create(&path).unwrap();
203
204 drop(registration);
206 drop(token);
207 file.write_all("updated".as_bytes()).unwrap();
208 thread::sleep(Duration::from_millis(250));
209
210 if path.exists() {
212 remove_file(&path).ok();
213 }
214
215 assert_eq!(changed.load(Ordering::SeqCst), false);
216 }
217}