tokens/
file.rs

1use crate::{Callback, ChangeToken, Registration, SingleChangeToken};
2use notify::{Config, RecommendedWatcher, RecursiveMode::NonRecursive, Watcher};
3use std::path::Path;
4use std::sync::mpsc::channel;
5use std::sync::Arc;
6use std::thread::{self, JoinHandle};
7use std::{any::Any, mem::ManuallyDrop};
8
/// Represents a [`ChangeToken`](crate::ChangeToken) for a file.
///
/// # Remarks
///
/// Registered notifications always occur on another thread.
pub struct FileChangeToken {
    // file system watcher; wrapped in `ManuallyDrop` so that `Drop` can
    // release it explicitly before joining the background thread
    watcher: ManuallyDrop<RecommendedWatcher>,
    // handle of the background thread that receives the watcher's events;
    // wrapped in `ManuallyDrop` so that `Drop` can take ownership and join it
    handle: ManuallyDrop<JoinHandle<()>>,
    // underlying token that `changed` and `register` delegate to; it is
    // shared with the background thread, which notifies it
    inner: Arc<SingleChangeToken>,
}
19
20impl FileChangeToken {
21    /// Initializes a new file change token.
22    ///
23    /// # Arguments
24    ///
25    /// * `path` - The [path](std::path::Path) of the file to watch for changes
26    pub fn new<T: AsRef<Path>>(path: T) -> Self {
27        let file = path.as_ref().to_path_buf();
28        let path = file.clone();
29        let inner = Arc::new(SingleChangeToken::default());
30        let handler = inner.clone();
31        let (sender, receiver) = channel();
32        let mut watcher = RecommendedWatcher::new(sender, Config::default()).unwrap();
33        let handle = thread::spawn(move || {
34            if let Ok(Ok(event)) = receiver.recv() {
35                let changed =
36                    event.kind.is_modify() || event.kind.is_create() || event.kind.is_remove();
37
38                if changed || event.need_rescan() {
39                    let mut paths = event.paths.iter();
40                    let other = path.as_os_str();
41
42                    if paths.any(|p| p.as_os_str().eq_ignore_ascii_case(other)) {
43                        handler.notify();
44                    }
45                }
46            }
47        });
48
49        if let Some(folder) = file.parent() {
50            if folder.exists() {
51                watcher.watch(folder, NonRecursive).unwrap();
52            }
53        }
54
55        Self {
56            watcher: ManuallyDrop::new(watcher),
57            handle: ManuallyDrop::new(handle),
58            inner,
59        }
60    }
61}
62
63impl ChangeToken for FileChangeToken {
64    fn changed(&self) -> bool {
65        self.inner.changed()
66    }
67
68    fn register(&self, callback: Callback, state: Option<Arc<dyn Any>>) -> Registration {
69        self.inner.register(callback, state)
70    }
71}
72
73impl Drop for FileChangeToken {
74    fn drop(&mut self) {
75        // manual drop is necessary to control terminating
76        // the channel receiver. if we don't, then we will
77        // likely deadlock while waiting to join the
78        // receiver's background thread
79        let handle = unsafe {
80            let _ = ManuallyDrop::take(&mut self.watcher);
81            ManuallyDrop::take(&mut self.handle)
82        };
83        handle.join().ok();
84    }
85}
86
87#[cfg(test)]
88mod tests {
89
90    use super::*;
91    use std::sync::{
92        atomic::{AtomicBool, Ordering::Relaxed},
93        Arc, Condvar, Mutex,
94    };
95    use std::time::{Duration, Instant};
96    use std::{fs::File, io::Write};
97    use tempfile::{NamedTempFile, TempPath};
98
99    #[test]
100    fn changed_should_be_false_when_source_file_is_unchanged() {
101        // arrange
102        let mut file = NamedTempFile::new().expect("new file");
103
104        file.write_all("test".as_bytes()).unwrap();
105
106        let token = FileChangeToken::new(file.path());
107
108        // act
109        let changed = token.changed();
110
111        // assert
112        assert!(!changed);
113    }
114
115    #[test]
116    fn changed_should_be_true_when_source_file_changes() {
117        // arrange
118        let mut file = NamedTempFile::new().expect("new file");
119
120        file.write_all("original".as_bytes()).unwrap();
121
122        let path = file.into_temp_path();
123        let token = FileChangeToken::new(&path);
124        let mut file = NamedTempFile::from_parts(File::create(&path).expect("valid path"), path);
125
126        file.write_all("updated".as_bytes()).unwrap();
127        thread::sleep(Duration::from_millis(250));
128
129        // act
130        let changed = token.changed();
131
132        // assert
133        assert!(changed);
134    }
135
136    #[test]
137    fn callback_should_be_invoked_when_source_file_changes() {
138        // arrange
139        let mut file = NamedTempFile::new().expect("new file");
140
141        file.write_all("original".as_bytes()).unwrap();
142
143        let path = file.into_temp_path();
144        let state = Arc::new((Mutex::new(false), Condvar::new(), AtomicBool::default()));
145        let token = FileChangeToken::new(&path);
146        let _unused = token.register(
147            Box::new(|state| {
148                let data = state.unwrap();
149                let (fired, event, value) = data
150                    .downcast_ref::<(Mutex<bool>, Condvar, AtomicBool)>()
151                    .unwrap();
152                value.store(true, Relaxed);
153                *fired.lock().unwrap() = true;
154                event.notify_one();
155            }),
156            Some(state.clone()),
157        );
158        let mut file = NamedTempFile::from_parts(File::create(&path).expect("valid path"), path);
159
160        // act
161        file.write_all("updated".as_bytes()).unwrap();
162
163        let time = Instant::now();
164        let quarter_second = Duration::from_millis(250);
165        let three_seconds = Duration::from_secs(3);
166        let (mutex, event, changed) = &*state;
167        let mut fired = mutex.lock().unwrap();
168
169        while !*fired && time.elapsed() < three_seconds {
170            fired = event.wait_timeout(fired, quarter_second).unwrap().0;
171        }
172
173        // assert
174        assert!(changed.load(Relaxed));
175    }
176
177    #[test]
178    fn callback_should_not_be_invoked_after_token_is_dropped() {
179        // arrange
180        let mut file = NamedTempFile::new().expect("new file");
181
182        file.write_all("original".as_bytes()).unwrap();
183
184        let path = file.into_temp_path();
185        let changed = Arc::<AtomicBool>::default();
186        let token = FileChangeToken::new(&path);
187        let registration = token.register(
188            Box::new(|state| {
189                state
190                    .unwrap()
191                    .downcast_ref::<AtomicBool>()
192                    .unwrap()
193                    .store(true, Relaxed)
194            }),
195            Some(changed.clone()),
196        );
197        let mut file = NamedTempFile::from_parts(File::create(&path).expect("valid path"), path);
198
199        // act
200        drop(registration);
201        drop(token);
202        file.write_all("updated".as_bytes()).unwrap();
203        thread::sleep(Duration::from_millis(250));
204
205        // assert
206        assert_eq!(changed.load(Relaxed), false);
207    }
208
209    #[test]
210    fn callback_should_be_invoked_when_source_file_is_created() {
211        // arrange
212        let path = std::env::temp_dir().join("new_file.txt");
213        let state = Arc::new((Mutex::new(false), Condvar::new(), AtomicBool::default()));
214        let token = FileChangeToken::new(&path);
215        let _unused = token.register(
216            Box::new(|state| {
217                let data = state.unwrap();
218                let (fired, event, value) = data
219                    .downcast_ref::<(Mutex<bool>, Condvar, AtomicBool)>()
220                    .unwrap();
221                value.store(true, Relaxed);
222                *fired.lock().unwrap() = true;
223                event.notify_one();
224            }),
225            Some(state.clone()),
226        );
227        let mut file = NamedTempFile::from_parts(
228            File::create(&path).expect("valid path"),
229            TempPath::from_path(path),
230        );
231
232        // act
233        file.write_all("updated".as_bytes()).unwrap();
234
235        let time = Instant::now();
236        let quarter_second = Duration::from_millis(250);
237        let three_seconds = Duration::from_secs(3);
238        let (mutex, event, changed) = &*state;
239        let mut fired = mutex.lock().unwrap();
240
241        while !*fired && time.elapsed() < three_seconds {
242            fired = event.wait_timeout(fired, quarter_second).unwrap().0;
243        }
244
245        // assert
246        assert!(changed.load(Relaxed));
247    }
248
249    #[test]
250    fn callback_should_be_invoked_when_source_file_is_removed() {
251        // arrange
252        let mut file = NamedTempFile::new().expect("new file");
253
254        file.write_all("existing".as_bytes()).unwrap();
255
256        let state = Arc::new((Mutex::new(false), Condvar::new(), AtomicBool::default()));
257        let token = FileChangeToken::new(file.path());
258        let _unused = token.register(
259            Box::new(|state| {
260                let data = state.unwrap();
261                let (fired, event, value) = data
262                    .downcast_ref::<(Mutex<bool>, Condvar, AtomicBool)>()
263                    .unwrap();
264                value.store(true, Relaxed);
265                *fired.lock().unwrap() = true;
266                event.notify_one();
267            }),
268            Some(state.clone()),
269        );
270
271        // act
272        drop(file);
273
274        let time = Instant::now();
275        let quarter_second = Duration::from_millis(250);
276        let three_seconds = Duration::from_secs(3);
277        let (mutex, event, changed) = &*state;
278        let mut fired = mutex.lock().unwrap();
279
280        while !*fired && time.elapsed() < three_seconds {
281            fired = event.wait_timeout(fired, quarter_second).unwrap().0;
282        }
283
284        // assert
285        assert!(changed.load(Relaxed));
286    }
287}