tokens/
file.rs

1use crate::{Callback, ChangeToken, Registration, SingleChangeToken};
2use notify::{Config, RecommendedWatcher, RecursiveMode, Watcher};
3use std::any::Any;
4use std::mem::ManuallyDrop;
5use std::path::Path;
6use std::sync::mpsc::channel;
7use std::sync::Arc;
8use std::thread::{self, JoinHandle};
9
10/// Represents a [`ChangeToken`](crate::ChangeToken) for a file.
11/// 
12/// # Remarks
13/// 
14/// Registered notifications always occur on another thread.
15pub struct FileChangeToken {
16    watcher: ManuallyDrop<RecommendedWatcher>,
17    handle: ManuallyDrop<JoinHandle<()>>,
18    inner: Arc<SingleChangeToken>,
19}
20
21impl FileChangeToken {
22    /// Initializes a new file change token.
23    ///
24    /// # Arguments
25    ///
26    /// * `path` - The [path](std::path::Path) of the file to watch for changes
27    pub fn new<T: AsRef<Path>>(path: T) -> Self {
28        let file = path.as_ref().to_path_buf();
29        let inner = Arc::new(SingleChangeToken::default());
30        let handler = inner.clone();
31        let (sender, receiver) = channel();
32        let mut watcher = RecommendedWatcher::new(sender, Config::default()).unwrap();
33
34        let handle = thread::spawn(move || {
35            if let Ok(Ok(event)) = receiver.recv() {
36                if event.kind.is_modify() {
37                    handler.notify()
38                }
39            }
40        });
41
42        watcher
43            .watch(file.as_ref(), RecursiveMode::NonRecursive)
44            .unwrap();
45
46        Self {
47            watcher: ManuallyDrop::new(watcher),
48            handle: ManuallyDrop::new(handle),
49            inner,
50        }
51    }
52}
53
54impl ChangeToken for FileChangeToken {
55    fn changed(&self) -> bool {
56        self.inner.changed()
57    }
58
59    fn register(&self, callback: Callback, state: Option<Arc<dyn Any>>) -> Registration {
60        self.inner.register(callback, state)
61    }
62}
63
64impl Drop for FileChangeToken {
65    fn drop(&mut self) {
66        // manual drop is necessary to control terminating
67        // the channel receiver. if we don't, then we will
68        // likely deadlock while waiting to join the
69        // receiver's background thread
70        let handle = unsafe {
71            let _ = ManuallyDrop::take(&mut self.watcher);
72            ManuallyDrop::take(&mut self.handle)
73        };
74        handle.join().ok();
75    }
76}
77
78#[cfg(test)]
79mod tests {
80
81    use super::*;
82    use std::env::temp_dir;
83    use std::fs::{remove_file, File};
84    use std::io::Write;
85    use std::sync::atomic::{AtomicBool, Ordering};
86    use std::sync::{Arc, Condvar, Mutex};
87    use std::time::Duration;
88
89    #[test]
90    fn changed_should_be_false_when_source_file_is_unchanged() {
91        // arrange
92        let path = temp_dir().join("test.1.txt");
93        let mut file = File::create(&path).unwrap();
94
95        file.write_all("test".as_bytes()).unwrap();
96
97        let token = FileChangeToken::new(&path);
98
99        // act
100        let changed = token.changed();
101
102        // assert
103        if path.exists() {
104            remove_file(&path).ok();
105        }
106
107        assert!(!changed);
108    }
109
110    #[test]
111    fn changed_should_be_true_when_source_file_changes() {
112        // arrange
113        let path = temp_dir().join("test.2.txt");
114        let mut file = File::create(&path).unwrap();
115
116        file.write_all("original".as_bytes()).unwrap();
117        drop(file);
118
119        let token = FileChangeToken::new(&path);
120        let mut file = File::create(&path).unwrap();
121
122        file.write_all("updated".as_bytes()).unwrap();
123        thread::sleep(Duration::from_millis(250));
124
125        // act
126        let changed = token.changed();
127
128        // assert
129        if path.exists() {
130            remove_file(&path).ok();
131        }
132
133        assert!(changed);
134    }
135
136    #[test]
137    fn callback_should_be_invoked_when_source_file_changes() {
138        // arrange
139        let path = temp_dir().join("test.3.txt");
140        let mut file = File::create(&path).unwrap();
141
142        file.write_all("original".as_bytes()).unwrap();
143        drop(file);
144
145        let state = Arc::new((Mutex::new(false), Condvar::new(), AtomicBool::default()));
146        let token = FileChangeToken::new(&path);
147        let _unused = token.register(
148            Box::new(|state| {
149                let data = state.unwrap();
150                let (fired, event, value) = data
151                    .downcast_ref::<(Mutex<bool>, Condvar, AtomicBool)>()
152                    .unwrap();
153                value.store(true, Ordering::SeqCst);
154                *fired.lock().unwrap() = true;
155                event.notify_one();
156            }),
157            Some(state.clone()),
158        );
159        let mut file = File::create(&path).unwrap();
160
161        // act
162        file.write_all("updated".as_bytes()).unwrap();
163        thread::sleep(Duration::from_millis(250));
164
165        let one_second = Duration::from_secs(1);
166        let (mutex, event, changed) = &*state;
167        let mut fired = mutex.lock().unwrap();
168
169        while !*fired {
170            fired = event.wait_timeout(fired, one_second).unwrap().0;
171        }
172
173        // assert
174        if path.exists() {
175            remove_file(&path).ok();
176        }
177
178        assert!(changed.load(Ordering::SeqCst));
179    }
180
181    #[test]
182    fn callback_should_not_be_invoked_after_token_is_dropped() {
183        // arrange
184        let path = temp_dir().join("test.4.txt");
185        let mut file = File::create(&path).unwrap();
186
187        file.write_all("original".as_bytes()).unwrap();
188        drop(file);
189
190        let changed = Arc::<AtomicBool>::default();
191        let token = FileChangeToken::new(&path);
192        let registration = token.register(
193            Box::new(|state| {
194                state
195                    .unwrap()
196                    .downcast_ref::<AtomicBool>()
197                    .unwrap()
198                    .store(true, Ordering::SeqCst)
199            }),
200            Some(changed.clone()),
201        );
202        let mut file = File::create(&path).unwrap();
203
204        // act
205        drop(registration);
206        drop(token);
207        file.write_all("updated".as_bytes()).unwrap();
208        thread::sleep(Duration::from_millis(250));
209
210        // assert
211        if path.exists() {
212            remove_file(&path).ok();
213        }
214
215        assert_eq!(changed.load(Ordering::SeqCst), false);
216    }
217}