notify_forked/
poll.rs

1//! Generic Watcher implementation based on polling
2//!
//! Checks the `watch`ed paths periodically to detect changes. This implementation relies only on
//! portable file-system APIs (the Rust standard library plus the `walkdir` and `filetime` crates)
//! and should work on all of the platforms those support.
5
6use self::walkdir::WalkDir;
7use super::debounce::{Debounce, EventTx};
8use super::{op, DebouncedEvent, Error, RawEvent, RecursiveMode, Result, Watcher};
9use filetime::FileTime;
10use std::collections::HashMap;
11use std::fs;
12use std::path::{Path, PathBuf};
13use std::sync::mpsc::Sender;
14use std::sync::{Arc, Mutex, RwLock};
15use std::thread;
16use std::time::{Duration, Instant};
17
18extern crate walkdir;
19
/// What the poller remembers about a single path between two scans.
struct PathData {
    /// Last modification time in whole seconds, as returned by
    /// `FileTime::from_last_modification_time(..).seconds()`.
    mtime: i64,
    /// Timestamp of the scan (or `watch` call) that most recently saw this path.
    /// An entry whose stamp is older than the current scan is reported as removed.
    last_check: Instant,
}
24
25struct WatchData {
26    is_recursive: bool,
27    paths: HashMap<PathBuf, PathData>,
28}
29
30/// Polling based `Watcher` implementation
31pub struct PollWatcher {
32    event_tx: EventTx,
33    watches: Arc<Mutex<HashMap<PathBuf, WatchData>>>,
34    open: Arc<RwLock<bool>>,
35}
36
37impl PollWatcher {
38    /// Create a PollWatcher which polls every `delay` milliseconds
39    pub fn with_delay_ms(tx: Sender<RawEvent>, delay: u32) -> Result<PollWatcher> {
40        let mut p = PollWatcher {
41            event_tx: EventTx::Raw { tx: tx.clone() },
42            watches: Arc::new(Mutex::new(HashMap::new())),
43            open: Arc::new(RwLock::new(true)),
44        };
45        let event_tx = EventTx::Raw { tx: tx };
46        p.run(Duration::from_millis(delay as u64), event_tx);
47        Ok(p)
48    }
49
50    fn run(&mut self, delay: Duration, mut event_tx: EventTx) {
51        let watches = self.watches.clone();
52        let open = self.open.clone();
53
54        thread::spawn(move || {
55            // In order of priority:
56            // TODO: handle chmod events
57            // TODO: handle renames
58            // TODO: DRY it up
59
60            loop {
61                if !(*open.read().unwrap()) {
62                    break;
63                }
64
65                if let Ok(mut watches) = watches.lock() {
66                    let current_time = Instant::now();
67
68                    for (
69                        watch,
70                        &mut WatchData {
71                            is_recursive,
72                            ref mut paths,
73                        },
74                    ) in watches.iter_mut()
75                    {
76                        match fs::metadata(watch) {
77                            Err(e) => {
78                                event_tx.send(RawEvent {
79                                    path: Some(watch.clone()),
80                                    op: Err(Error::Io(e)),
81                                    cookie: None,
82                                });
83                                continue;
84                            }
85                            Ok(metadata) => {
86                                if !metadata.is_dir() {
87                                    let mtime =
88                                        FileTime::from_last_modification_time(&metadata).seconds();
89                                    match paths.insert(
90                                        watch.clone(),
91                                        PathData {
92                                            mtime: mtime,
93                                            last_check: current_time,
94                                        },
95                                    ) {
96                                        None => {
97                                            unreachable!();
98                                        }
99                                        Some(PathData {
100                                            mtime: old_mtime, ..
101                                        }) => {
102                                            if mtime > old_mtime {
103                                                event_tx.send(RawEvent {
104                                                    path: Some(watch.clone()),
105                                                    op: Ok(op::Op::WRITE),
106                                                    cookie: None,
107                                                });
108                                            }
109                                        }
110                                    }
111                                } else {
112                                    let depth = if is_recursive { usize::max_value() } else { 1 };
113                                    for entry in WalkDir::new(watch)
114                                        .follow_links(true)
115                                        .max_depth(depth)
116                                        .into_iter()
117                                        .filter_map(|e| e.ok())
118                                    {
119                                        let path = entry.path();
120
121                                        match entry.metadata() {
122                                            Err(e) => {
123                                                event_tx.send(RawEvent {
124                                                    path: Some(path.to_path_buf()),
125                                                    op: Err(Error::Io(e.into())),
126                                                    cookie: None,
127                                                });
128                                            }
129                                            Ok(m) => {
130                                                let mtime =
131                                                    FileTime::from_last_modification_time(&m)
132                                                        .seconds();
133                                                match paths.insert(
134                                                    path.to_path_buf(),
135                                                    PathData {
136                                                        mtime: mtime,
137                                                        last_check: current_time,
138                                                    },
139                                                ) {
140                                                    None => {
141                                                        event_tx.send(RawEvent {
142                                                            path: Some(path.to_path_buf()),
143                                                            op: Ok(op::Op::CREATE),
144                                                            cookie: None,
145                                                        });
146                                                    }
147                                                    Some(PathData {
148                                                        mtime: old_mtime, ..
149                                                    }) => {
150                                                        if mtime > old_mtime {
151                                                            event_tx.send(RawEvent {
152                                                                path: Some(path.to_path_buf()),
153                                                                op: Ok(op::Op::WRITE),
154                                                                cookie: None,
155                                                            });
156                                                        }
157                                                    }
158                                                }
159                                            }
160                                        }
161                                    }
162                                }
163                            }
164                        }
165                    }
166
167                    for (_, &mut WatchData { ref mut paths, .. }) in watches.iter_mut() {
168                        let mut removed = Vec::new();
169                        for (path, &PathData { last_check, .. }) in paths.iter() {
170                            if last_check < current_time {
171                                event_tx.send(RawEvent {
172                                    path: Some(path.clone()),
173                                    op: Ok(op::Op::REMOVE),
174                                    cookie: None,
175                                });
176                                removed.push(path.clone());
177                            }
178                        }
179                        for path in removed {
180                            (*paths).remove(&path);
181                        }
182                    }
183                }
184
185                thread::sleep(delay);
186            }
187        });
188    }
189}
190
191impl Watcher for PollWatcher {
192    fn new_raw(tx: Sender<RawEvent>) -> Result<PollWatcher> {
193        PollWatcher::with_delay_ms(tx, 30_000)
194    }
195
196    fn new(tx: Sender<DebouncedEvent>, delay: Duration) -> Result<PollWatcher> {
197        let mut p = PollWatcher {
198            event_tx: EventTx::DebouncedTx { tx: tx.clone() },
199            watches: Arc::new(Mutex::new(HashMap::new())),
200            open: Arc::new(RwLock::new(true)),
201        };
202        let event_tx = EventTx::Debounced {
203            tx: tx.clone(),
204            debounce: Debounce::new(delay, tx),
205        };
206        p.run(delay, event_tx);
207        Ok(p)
208    }
209
210    fn watch<P: AsRef<Path>>(&mut self, path: P, recursive_mode: RecursiveMode) -> Result<()> {
211        if let Ok(mut watches) = self.watches.lock() {
212            let current_time = Instant::now();
213
214            let watch = path.as_ref().to_owned();
215
216            match fs::metadata(path) {
217                Err(e) => {
218                    self.event_tx.send(RawEvent {
219                        path: Some(watch.clone()),
220                        op: Err(Error::Io(e)),
221                        cookie: None,
222                    });
223                }
224                Ok(metadata) => {
225                    if !metadata.is_dir() {
226                        let mut paths = HashMap::new();
227                        let mtime = FileTime::from_last_modification_time(&metadata).seconds();
228                        paths.insert(
229                            watch.clone(),
230                            PathData {
231                                mtime: mtime,
232                                last_check: current_time,
233                            },
234                        );
235                        watches.insert(
236                            watch,
237                            WatchData {
238                                is_recursive: recursive_mode.is_recursive(),
239                                paths: paths,
240                            },
241                        );
242                    } else {
243                        let mut paths = HashMap::new();
244                        let depth = if recursive_mode.is_recursive() {
245                            usize::max_value()
246                        } else {
247                            1
248                        };
249                        for entry in WalkDir::new(watch.clone())
250                            .follow_links(true)
251                            .max_depth(depth)
252                            .into_iter()
253                            .filter_map(|e| e.ok())
254                        {
255                            let path = entry.path();
256
257                            match entry.metadata() {
258                                Err(e) => {
259                                    self.event_tx.send(RawEvent {
260                                        path: Some(path.to_path_buf()),
261                                        op: Err(Error::Io(e.into())),
262                                        cookie: None,
263                                    });
264                                }
265                                Ok(m) => {
266                                    let mtime = FileTime::from_last_modification_time(&m).seconds();
267                                    paths.insert(
268                                        path.to_path_buf(),
269                                        PathData {
270                                            mtime: mtime,
271                                            last_check: current_time,
272                                        },
273                                    );
274                                }
275                            }
276                        }
277                        watches.insert(
278                            watch,
279                            WatchData {
280                                is_recursive: recursive_mode.is_recursive(),
281                                paths: paths,
282                            },
283                        );
284                    }
285                }
286            }
287        }
288        Ok(())
289    }
290
291    fn unwatch<P: AsRef<Path>>(&mut self, path: P) -> Result<()> {
292        if (*self.watches)
293            .lock()
294            .unwrap()
295            .remove(path.as_ref())
296            .is_some()
297        {
298            Ok(())
299        } else {
300            Err(Error::WatchNotFound)
301        }
302    }
303}
304
305impl Drop for PollWatcher {
306    fn drop(&mut self) {
307        {
308            let mut open = (*self.open).write().unwrap();
309            (*open) = false;
310        }
311    }
312}
313
314// Because all public methods are `&mut self` it's also perfectly safe to share references.
315unsafe impl Sync for PollWatcher {}