notify_forked/poll.rs
1//! Generic Watcher implementation based on polling
2//!
3//! Checks the `watch`ed paths periodically to detect changes. This implementation only uses
4//! Rust stdlib APIs and should work on all of the platforms it supports.
5
6use self::walkdir::WalkDir;
7use super::debounce::{Debounce, EventTx};
8use super::{op, DebouncedEvent, Error, RawEvent, RecursiveMode, Result, Watcher};
9use filetime::FileTime;
10use std::collections::HashMap;
11use std::fs;
12use std::path::{Path, PathBuf};
13use std::sync::mpsc::Sender;
14use std::sync::{Arc, Mutex, RwLock};
15use std::thread;
16use std::time::{Duration, Instant};
17
18extern crate walkdir;
19
20struct PathData {
21 mtime: i64,
22 last_check: Instant,
23}
24
25struct WatchData {
26 is_recursive: bool,
27 paths: HashMap<PathBuf, PathData>,
28}
29
30/// Polling based `Watcher` implementation
31pub struct PollWatcher {
32 event_tx: EventTx,
33 watches: Arc<Mutex<HashMap<PathBuf, WatchData>>>,
34 open: Arc<RwLock<bool>>,
35}
36
37impl PollWatcher {
38 /// Create a PollWatcher which polls every `delay` milliseconds
39 pub fn with_delay_ms(tx: Sender<RawEvent>, delay: u32) -> Result<PollWatcher> {
40 let mut p = PollWatcher {
41 event_tx: EventTx::Raw { tx: tx.clone() },
42 watches: Arc::new(Mutex::new(HashMap::new())),
43 open: Arc::new(RwLock::new(true)),
44 };
45 let event_tx = EventTx::Raw { tx: tx };
46 p.run(Duration::from_millis(delay as u64), event_tx);
47 Ok(p)
48 }
49
50 fn run(&mut self, delay: Duration, mut event_tx: EventTx) {
51 let watches = self.watches.clone();
52 let open = self.open.clone();
53
54 thread::spawn(move || {
55 // In order of priority:
56 // TODO: handle chmod events
57 // TODO: handle renames
58 // TODO: DRY it up
59
60 loop {
61 if !(*open.read().unwrap()) {
62 break;
63 }
64
65 if let Ok(mut watches) = watches.lock() {
66 let current_time = Instant::now();
67
68 for (
69 watch,
70 &mut WatchData {
71 is_recursive,
72 ref mut paths,
73 },
74 ) in watches.iter_mut()
75 {
76 match fs::metadata(watch) {
77 Err(e) => {
78 event_tx.send(RawEvent {
79 path: Some(watch.clone()),
80 op: Err(Error::Io(e)),
81 cookie: None,
82 });
83 continue;
84 }
85 Ok(metadata) => {
86 if !metadata.is_dir() {
87 let mtime =
88 FileTime::from_last_modification_time(&metadata).seconds();
89 match paths.insert(
90 watch.clone(),
91 PathData {
92 mtime: mtime,
93 last_check: current_time,
94 },
95 ) {
96 None => {
97 unreachable!();
98 }
99 Some(PathData {
100 mtime: old_mtime, ..
101 }) => {
102 if mtime > old_mtime {
103 event_tx.send(RawEvent {
104 path: Some(watch.clone()),
105 op: Ok(op::Op::WRITE),
106 cookie: None,
107 });
108 }
109 }
110 }
111 } else {
112 let depth = if is_recursive { usize::max_value() } else { 1 };
113 for entry in WalkDir::new(watch)
114 .follow_links(true)
115 .max_depth(depth)
116 .into_iter()
117 .filter_map(|e| e.ok())
118 {
119 let path = entry.path();
120
121 match entry.metadata() {
122 Err(e) => {
123 event_tx.send(RawEvent {
124 path: Some(path.to_path_buf()),
125 op: Err(Error::Io(e.into())),
126 cookie: None,
127 });
128 }
129 Ok(m) => {
130 let mtime =
131 FileTime::from_last_modification_time(&m)
132 .seconds();
133 match paths.insert(
134 path.to_path_buf(),
135 PathData {
136 mtime: mtime,
137 last_check: current_time,
138 },
139 ) {
140 None => {
141 event_tx.send(RawEvent {
142 path: Some(path.to_path_buf()),
143 op: Ok(op::Op::CREATE),
144 cookie: None,
145 });
146 }
147 Some(PathData {
148 mtime: old_mtime, ..
149 }) => {
150 if mtime > old_mtime {
151 event_tx.send(RawEvent {
152 path: Some(path.to_path_buf()),
153 op: Ok(op::Op::WRITE),
154 cookie: None,
155 });
156 }
157 }
158 }
159 }
160 }
161 }
162 }
163 }
164 }
165 }
166
167 for (_, &mut WatchData { ref mut paths, .. }) in watches.iter_mut() {
168 let mut removed = Vec::new();
169 for (path, &PathData { last_check, .. }) in paths.iter() {
170 if last_check < current_time {
171 event_tx.send(RawEvent {
172 path: Some(path.clone()),
173 op: Ok(op::Op::REMOVE),
174 cookie: None,
175 });
176 removed.push(path.clone());
177 }
178 }
179 for path in removed {
180 (*paths).remove(&path);
181 }
182 }
183 }
184
185 thread::sleep(delay);
186 }
187 });
188 }
189}
190
191impl Watcher for PollWatcher {
192 fn new_raw(tx: Sender<RawEvent>) -> Result<PollWatcher> {
193 PollWatcher::with_delay_ms(tx, 30_000)
194 }
195
196 fn new(tx: Sender<DebouncedEvent>, delay: Duration) -> Result<PollWatcher> {
197 let mut p = PollWatcher {
198 event_tx: EventTx::DebouncedTx { tx: tx.clone() },
199 watches: Arc::new(Mutex::new(HashMap::new())),
200 open: Arc::new(RwLock::new(true)),
201 };
202 let event_tx = EventTx::Debounced {
203 tx: tx.clone(),
204 debounce: Debounce::new(delay, tx),
205 };
206 p.run(delay, event_tx);
207 Ok(p)
208 }
209
210 fn watch<P: AsRef<Path>>(&mut self, path: P, recursive_mode: RecursiveMode) -> Result<()> {
211 if let Ok(mut watches) = self.watches.lock() {
212 let current_time = Instant::now();
213
214 let watch = path.as_ref().to_owned();
215
216 match fs::metadata(path) {
217 Err(e) => {
218 self.event_tx.send(RawEvent {
219 path: Some(watch.clone()),
220 op: Err(Error::Io(e)),
221 cookie: None,
222 });
223 }
224 Ok(metadata) => {
225 if !metadata.is_dir() {
226 let mut paths = HashMap::new();
227 let mtime = FileTime::from_last_modification_time(&metadata).seconds();
228 paths.insert(
229 watch.clone(),
230 PathData {
231 mtime: mtime,
232 last_check: current_time,
233 },
234 );
235 watches.insert(
236 watch,
237 WatchData {
238 is_recursive: recursive_mode.is_recursive(),
239 paths: paths,
240 },
241 );
242 } else {
243 let mut paths = HashMap::new();
244 let depth = if recursive_mode.is_recursive() {
245 usize::max_value()
246 } else {
247 1
248 };
249 for entry in WalkDir::new(watch.clone())
250 .follow_links(true)
251 .max_depth(depth)
252 .into_iter()
253 .filter_map(|e| e.ok())
254 {
255 let path = entry.path();
256
257 match entry.metadata() {
258 Err(e) => {
259 self.event_tx.send(RawEvent {
260 path: Some(path.to_path_buf()),
261 op: Err(Error::Io(e.into())),
262 cookie: None,
263 });
264 }
265 Ok(m) => {
266 let mtime = FileTime::from_last_modification_time(&m).seconds();
267 paths.insert(
268 path.to_path_buf(),
269 PathData {
270 mtime: mtime,
271 last_check: current_time,
272 },
273 );
274 }
275 }
276 }
277 watches.insert(
278 watch,
279 WatchData {
280 is_recursive: recursive_mode.is_recursive(),
281 paths: paths,
282 },
283 );
284 }
285 }
286 }
287 }
288 Ok(())
289 }
290
291 fn unwatch<P: AsRef<Path>>(&mut self, path: P) -> Result<()> {
292 if (*self.watches)
293 .lock()
294 .unwrap()
295 .remove(path.as_ref())
296 .is_some()
297 {
298 Ok(())
299 } else {
300 Err(Error::WatchNotFound)
301 }
302 }
303}
304
305impl Drop for PollWatcher {
306 fn drop(&mut self) {
307 {
308 let mut open = (*self.open).write().unwrap();
309 (*open) = false;
310 }
311 }
312}
313
314// Because all public methods are `&mut self` it's also perfectly safe to share references.
315unsafe impl Sync for PollWatcher {}