Skip to main content

tokens/
file.rs

1use crate::{Callback, ChangeToken, Registration, SingleChangeToken, State};
2use notify::{Config, RecommendedWatcher, RecursiveMode::NonRecursive, Watcher};
3use std::path::Path;
4use std::sync::mpsc::channel;
5use std::sync::Arc;
6use std::thread::{spawn, JoinHandle};
7
8/// Represents a [`ChangeToken`](ChangeToken) for a file.
9///
10/// # Remarks
11///
12/// Registered notifications always occur on another thread.
13pub struct FileChangeToken {
14    watcher: Option<RecommendedWatcher>,
15    handle: Option<JoinHandle<()>>,
16    inner: Arc<SingleChangeToken>,
17}
18
19impl FileChangeToken {
20    /// Initializes a new file change token.
21    ///
22    /// # Arguments
23    ///
24    /// * `path` - The [path](std::path::Path) of the file to watch for changes
25    pub fn new<T: AsRef<Path>>(path: T) -> Self {
26        let file = path.as_ref().to_path_buf();
27        let path = file.clone();
28        let inner = Arc::new(SingleChangeToken::default());
29        let handler = inner.clone();
30        let (sender, receiver) = channel();
31        let mut watcher = RecommendedWatcher::new(sender, Config::default()).unwrap_or_else(|e| panic!("{}", e));
32        let handle = spawn(move || {
33            while let Ok(Ok(event)) = receiver.recv() {
34                let changed = event.kind.is_modify() || event.kind.is_create() || event.kind.is_remove();
35
36                if changed || event.need_rescan() {
37                    let mut paths = event.paths.iter();
38                    let other = path.as_os_str();
39
40                    if paths.any(|p| p.as_os_str().eq_ignore_ascii_case(other)) {
41                        handler.notify();
42                        break;
43                    }
44                }
45            }
46        });
47
48        if let Some(folder) = file.parent() {
49            if folder.exists() {
50                watcher.watch(folder, NonRecursive).unwrap();
51            }
52        }
53
54        Self {
55            watcher: Some(watcher),
56            handle: Some(handle),
57            inner,
58        }
59    }
60}
61
62impl ChangeToken for FileChangeToken {
63    #[inline]
64    fn changed(&self) -> bool {
65        self.inner.changed()
66    }
67
68    #[inline]
69    fn register(&self, callback: Callback, state: State) -> Registration {
70        self.inner.register(callback, state)
71    }
72}
73
74impl Drop for FileChangeToken {
75    fn drop(&mut self) {
76        let _ = self.watcher.take();
77
78        if let Some(handle) = self.handle.take() {
79            handle.join().ok();
80        }
81    }
82}
83
84#[cfg(test)]
85mod tests {
86    use super::*;
87    use std::sync::{
88        atomic::{AtomicBool, Ordering::Relaxed},
89        Arc, Condvar, Mutex,
90    };
91    use std::time::{Duration, Instant};
92    use std::{fs::File, io::Write, thread::sleep};
93    use tempfile::{tempdir, NamedTempFile, TempPath};
94        use crate::assert_send_and_sync;
95
96    #[test]
97    fn file_change_token_should_send_and_sync() {
98        // arrange
99        let dir = tempdir().expect("temp dir");
100        let file = NamedTempFile::new_in(&dir).expect("new file");
101        let token = FileChangeToken::new(file.path());
102
103        // act
104
105        // assert
106        assert_send_and_sync(token);
107    }
108
109    #[test]
110    fn changed_should_be_false_when_source_file_is_unchanged() {
111        // arrange
112        let dir = tempdir().expect("temp dir");
113        let mut file = NamedTempFile::new_in(&dir).expect("new file");
114
115        file.write_all("test".as_bytes()).unwrap();
116
117        let token = FileChangeToken::new(file.path());
118
119        // act
120        let changed = token.changed();
121
122        // assert
123        assert!(!changed);
124    }
125
126    #[test]
127    fn changed_should_be_true_when_source_file_changes() {
128        // arrange
129        let dir = tempdir().expect("temp dir");
130        let mut file = NamedTempFile::new_in(&dir).expect("new file");
131
132        file.write_all("original".as_bytes()).unwrap();
133
134        let path = file.into_temp_path();
135        let token = FileChangeToken::new(&path);
136        let mut file = NamedTempFile::from_parts(File::create(&path).expect("valid path"), path);
137
138        file.write_all("updated".as_bytes()).unwrap();
139        sleep(Duration::from_millis(250));
140
141        // act
142        let changed = token.changed();
143
144        // assert
145        assert!(changed);
146    }
147
148    #[test]
149    fn callback_should_be_invoked_when_source_file_changes() {
150        // arrange
151        let dir = tempdir().expect("temp dir");
152        let mut file = NamedTempFile::new_in(&dir).expect("new file");
153
154        file.write_all("original".as_bytes()).unwrap();
155
156        let path = file.into_temp_path();
157        let state = Arc::new((Mutex::new(false), Condvar::new(), AtomicBool::default()));
158        let token = FileChangeToken::new(&path);
159        let _unused = token.register(
160            Box::new(|state| {
161                let data = state.unwrap();
162                let (fired, event, value) = data.downcast_ref::<(Mutex<bool>, Condvar, AtomicBool)>().unwrap();
163                value.store(true, Relaxed);
164                *fired.lock().unwrap() = true;
165                event.notify_one();
166            }),
167            Some(state.clone()),
168        );
169        let mut file = NamedTempFile::from_parts(File::create(&path).expect("valid path"), path);
170
171        // act
172        file.write_all("updated".as_bytes()).unwrap();
173
174        let time = Instant::now();
175        let quarter_second = Duration::from_millis(250);
176        let three_seconds = Duration::from_secs(3);
177        let (mutex, event, changed) = &*state;
178        let mut fired = mutex.lock().unwrap();
179
180        while !*fired && time.elapsed() < three_seconds {
181            fired = event.wait_timeout(fired, quarter_second).unwrap().0;
182        }
183
184        // assert
185        assert!(changed.load(Relaxed));
186    }
187
188    #[test]
189    fn callback_should_not_be_invoked_after_token_is_dropped() {
190        // arrange
191        let dir = tempdir().expect("temp dir");
192        let mut file = NamedTempFile::new_in(&dir).expect("new file");
193
194        file.write_all("original".as_bytes()).unwrap();
195
196        let path = file.into_temp_path();
197        let changed = Arc::<AtomicBool>::default();
198        let token = FileChangeToken::new(&path);
199        let registration = token.register(
200            Box::new(|state| {
201                state
202                    .unwrap()
203                    .downcast_ref::<AtomicBool>()
204                    .unwrap()
205                    .store(true, Relaxed)
206            }),
207            Some(changed.clone()),
208        );
209
210        // act
211        drop(registration);
212        drop(token);
213        
214        let mut file = NamedTempFile::from_parts(File::create(&path).expect("valid path"), path);
215        
216        file.write_all("updated".as_bytes()).unwrap();
217        sleep(Duration::from_millis(250));
218
219        // assert
220        assert_eq!(changed.load(Relaxed), false);
221    }
222
223    #[test]
224    fn callback_should_be_invoked_when_source_file_is_created() {
225        // arrange
226        let dir = tempdir().expect("temp dir");
227        let path = dir.path().join("new_file.txt");
228        let state = Arc::new((Mutex::new(false), Condvar::new(), AtomicBool::default()));
229        let token = FileChangeToken::new(&path);
230        let _unused = token.register(
231            Box::new(|state| {
232                let data = state.unwrap();
233                let (fired, event, value) = data.downcast_ref::<(Mutex<bool>, Condvar, AtomicBool)>().unwrap();
234                value.store(true, Relaxed);
235                *fired.lock().unwrap() = true;
236                event.notify_one();
237            }),
238            Some(state.clone()),
239        );
240        let mut file = NamedTempFile::from_parts(
241            File::create(&path).expect("valid path"),
242            TempPath::try_from_path(path).unwrap(),
243        );
244
245        // act
246        file.write_all("updated".as_bytes()).unwrap();
247
248        let time = Instant::now();
249        let quarter_second = Duration::from_millis(250);
250        let three_seconds = Duration::from_secs(3);
251        let (mutex, event, changed) = &*state;
252        let mut fired = mutex.lock().unwrap();
253
254        while !*fired && time.elapsed() < three_seconds {
255            fired = event.wait_timeout(fired, quarter_second).unwrap().0;
256        }
257
258        // assert
259        assert!(changed.load(Relaxed));
260    }
261
262    #[test]
263    fn callback_should_be_invoked_when_source_file_is_removed() {
264        // arrange
265        let dir = tempdir().expect("temp dir");
266        let mut file = NamedTempFile::new_in(&dir).expect("new file");
267
268        file.write_all("existing".as_bytes()).unwrap();
269
270        let state = Arc::new((Mutex::new(false), Condvar::new(), AtomicBool::default()));
271        let token = FileChangeToken::new(file.path());
272        let _unused = token.register(
273            Box::new(|state| {
274                let data = state.unwrap();
275                let (fired, event, value) = data.downcast_ref::<(Mutex<bool>, Condvar, AtomicBool)>().unwrap();
276                value.store(true, Relaxed);
277                *fired.lock().unwrap() = true;
278                event.notify_one();
279            }),
280            Some(state.clone()),
281        );
282
283        // act
284        drop(file);
285
286        let time = Instant::now();
287        let quarter_second = Duration::from_millis(250);
288        let three_seconds = Duration::from_secs(3);
289        let (mutex, event, changed) = &*state;
290        let mut fired = mutex.lock().unwrap();
291
292        while !*fired && time.elapsed() < three_seconds {
293            fired = event.wait_timeout(fired, quarter_second).unwrap().0;
294        }
295
296        // assert
297        assert!(changed.load(Relaxed));
298    }
299}