1use crate::{unbounded, Config, Error, EventHandler, Receiver, RecursiveMode, Sender, Watcher};
7use std::{
8 collections::HashMap,
9 path::{Path, PathBuf},
10 sync::{
11 atomic::{AtomicBool, Ordering},
12 Arc, Mutex,
13 },
14 thread,
15 time::Duration,
16};
17
18pub type ScanEvent = crate::Result<PathBuf>;
20
21pub trait ScanEventHandler: Send + 'static {
26 fn handle_event(&mut self, event: ScanEvent);
28}
29
30impl<F> ScanEventHandler for F
31where
32 F: FnMut(ScanEvent) + Send + 'static,
33{
34 fn handle_event(&mut self, event: ScanEvent) {
35 (self)(event);
36 }
37}
38
/// Forwards scan events into a crossbeam channel (only with the `crossbeam-channel` feature).
#[cfg(feature = "crossbeam-channel")]
impl ScanEventHandler for crossbeam_channel::Sender<ScanEvent> {
    /// Sends the event on the channel; a failed send is deliberately ignored.
    fn handle_event(&mut self, event: ScanEvent) {
        drop(self.send(event));
    }
}
45
46impl ScanEventHandler for std::sync::mpsc::Sender<ScanEvent> {
47 fn handle_event(&mut self, event: ScanEvent) {
48 let _ = self.send(event);
49 }
50}
51
52impl ScanEventHandler for () {
53 fn handle_event(&mut self, _event: ScanEvent) {}
54}
55
56use data::{DataBuilder, WatchData};
57mod data {
58 use crate::{
59 event::{CreateKind, DataChange, Event, EventKind, MetadataKind, ModifyKind, RemoveKind},
60 EventHandler,
61 };
62 use filetime::FileTime;
63 use std::{
64 cell::RefCell,
65 collections::{hash_map::RandomState, HashMap},
66 fmt::{self, Debug},
67 fs::{self, File, Metadata},
68 hash::{BuildHasher, Hasher},
69 io::{self, Read},
70 path::{Path, PathBuf},
71 time::Instant,
72 };
73 use walkdir::WalkDir;
74
75 use super::ScanEventHandler;
76
77 pub(super) struct DataBuilder {
79 emitter: EventEmitter,
80 scan_emitter: Option<Box<RefCell<dyn ScanEventHandler>>>,
81
82 build_hasher: Option<RandomState>,
85
86 now: Instant,
88 }
89
90 impl DataBuilder {
91 pub(super) fn new<F, G>(
92 event_handler: F,
93 compare_content: bool,
94 scan_emitter: Option<G>,
95 ) -> Self
96 where
97 F: EventHandler,
98 G: ScanEventHandler,
99 {
100 let scan_emitter = match scan_emitter {
101 None => None,
102 Some(v) => {
103 let intermediate: Box<RefCell<dyn ScanEventHandler>> =
105 Box::new(RefCell::new(v));
106 Some(intermediate)
107 }
108 };
109 Self {
110 emitter: EventEmitter::new(event_handler),
111 scan_emitter,
112 build_hasher: compare_content.then(RandomState::default),
113 now: Instant::now(),
114 }
115 }
116
117 pub(super) fn update_timestamp(&mut self) {
119 self.now = Instant::now();
120 }
121
122 pub(super) fn build_watch_data(
127 &self,
128 root: PathBuf,
129 is_recursive: bool,
130 ) -> Option<WatchData> {
131 WatchData::new(self, root, is_recursive)
132 }
133
134 fn build_path_data(&self, meta_path: &MetaPath) -> PathData {
136 PathData::new(self, meta_path)
137 }
138 }
139
140 impl Debug for DataBuilder {
141 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
142 f.debug_struct("DataBuilder")
143 .field("build_hasher", &self.build_hasher)
144 .field("now", &self.now)
145 .finish()
146 }
147 }
148
149 #[derive(Debug)]
150 pub(super) struct WatchData {
151 root: PathBuf,
153 is_recursive: bool,
154
155 all_path_data: HashMap<PathBuf, PathData>,
157 }
158
159 impl WatchData {
160 fn new(data_builder: &DataBuilder, root: PathBuf, is_recursive: bool) -> Option<Self> {
166 if let Err(e) = fs::metadata(&root) {
185 data_builder.emitter.emit_io_err(e, &root);
186 return None;
187 }
188
189 let all_path_data =
190 Self::scan_all_path_data(data_builder, root.clone(), is_recursive, true).collect();
191
192 Some(Self {
193 root,
194 is_recursive,
195 all_path_data,
196 })
197 }
198
199 pub(super) fn rescan(&mut self, data_builder: &mut DataBuilder) {
205 for (path, new_path_data) in
207 Self::scan_all_path_data(data_builder, self.root.clone(), self.is_recursive, false)
208 {
209 let old_path_data = self
210 .all_path_data
211 .insert(path.clone(), new_path_data.clone());
212
213 let event =
215 PathData::compare_to_event(path, old_path_data.as_ref(), Some(&new_path_data));
216 if let Some(event) = event {
217 data_builder.emitter.emit_ok(event);
218 }
219 }
220
221 let mut disappeared_paths = Vec::new();
223 for (path, path_data) in self.all_path_data.iter() {
224 if path_data.last_check < data_builder.now {
225 disappeared_paths.push(path.clone());
226 }
227 }
228
229 for path in disappeared_paths {
231 let old_path_data = self.all_path_data.remove(&path);
232
233 let event = PathData::compare_to_event(path, old_path_data.as_ref(), None);
235 if let Some(event) = event {
236 data_builder.emitter.emit_ok(event);
237 }
238 }
239 }
240
241 fn scan_all_path_data(
247 data_builder: &'_ DataBuilder,
248 root: PathBuf,
249 is_recursive: bool,
250 is_initial: bool,
252 ) -> impl Iterator<Item = (PathBuf, PathData)> + '_ {
253 log::trace!("rescanning {root:?}");
254 WalkDir::new(root)
259 .follow_links(true)
260 .max_depth(Self::dir_scan_depth(is_recursive))
261 .into_iter()
262 .filter_map(|entry_res| match entry_res {
275 Ok(entry) => Some(entry),
276 Err(err) => {
277 log::warn!("walkdir error scanning {err:?}");
278 let crate_err =
279 crate::Error::new(crate::ErrorKind::Generic(err.to_string()));
280 data_builder.emitter.emit(Err(crate_err));
281 None
282 }
283 })
284 .filter_map(move |entry| match entry.metadata() {
285 Ok(metadata) => {
286 let path = entry.into_path();
287 if is_initial {
288 if let Some(ref emitter) = data_builder.scan_emitter {
290 emitter.borrow_mut().handle_event(Ok(path.clone()));
291 }
292 }
293 let meta_path = MetaPath::from_parts_unchecked(path, metadata);
294 let data_path = data_builder.build_path_data(&meta_path);
295
296 Some((meta_path.into_path(), data_path))
297 }
298 Err(e) => {
299 let path = entry.into_path();
301 data_builder.emitter.emit_io_err(e, path);
302
303 None
304 }
305 })
306 }
307
308 fn dir_scan_depth(is_recursive: bool) -> usize {
309 if is_recursive {
310 usize::MAX
311 } else {
312 1
313 }
314 }
315 }
316
/// What is remembered about a single path between two scans.
#[derive(Debug, Clone)]
struct PathData {
    /// Last-modification time, in whole seconds.
    mtime: i64,

    /// Content hash; `None` when hashing is disabled, the path is not a regular
    /// file, or the file could not be read.
    hash: Option<u64>,

    /// Builder timestamp of the scan that produced this record.
    last_check: Instant,
}
332
333 impl PathData {
334 fn new(data_builder: &DataBuilder, meta_path: &MetaPath) -> PathData {
336 let metadata = meta_path.metadata();
337
338 PathData {
339 mtime: FileTime::from_last_modification_time(metadata).seconds(),
340 hash: data_builder
341 .build_hasher
342 .as_ref()
343 .filter(|_| metadata.is_file())
344 .and_then(|build_hasher| {
345 Self::get_content_hash(build_hasher, meta_path.path()).ok()
346 }),
347
348 last_check: data_builder.now,
349 }
350 }
351
352 fn get_content_hash(build_hasher: &RandomState, path: &Path) -> io::Result<u64> {
354 let mut hasher = build_hasher.build_hasher();
355 let mut file = File::open(path)?;
356 let mut buf = [0; 512];
357
358 loop {
359 let n = match file.read(&mut buf) {
360 Ok(0) => break,
361 Ok(len) => len,
362 Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
363 Err(e) => return Err(e),
364 };
365
366 hasher.write(&buf[..n]);
367 }
368
369 Ok(hasher.finish())
370 }
371
372 fn compare_to_event<P>(
374 path: P,
375 old: Option<&PathData>,
376 new: Option<&PathData>,
377 ) -> Option<Event>
378 where
379 P: Into<PathBuf>,
380 {
381 match (old, new) {
382 (Some(old), Some(new)) => {
383 if new.mtime > old.mtime {
384 Some(EventKind::Modify(ModifyKind::Metadata(
385 MetadataKind::WriteTime,
386 )))
387 } else if new.hash != old.hash {
388 Some(EventKind::Modify(ModifyKind::Data(DataChange::Any)))
389 } else {
390 None
391 }
392 }
393 (None, Some(_new)) => Some(EventKind::Create(CreateKind::Any)),
394 (Some(_old), None) => Some(EventKind::Remove(RemoveKind::Any)),
395 (None, None) => None,
396 }
397 .map(|event_kind| Event::new(event_kind).add_path(path.into()))
398 }
399 }
400
401 #[derive(Debug)]
407 pub(super) struct MetaPath {
408 path: PathBuf,
409 metadata: Metadata,
410 }
411
412 impl MetaPath {
413 fn from_parts_unchecked(path: PathBuf, metadata: Metadata) -> Self {
419 Self { path, metadata }
420 }
421
422 fn path(&self) -> &Path {
423 &self.path
424 }
425
426 fn metadata(&self) -> &Metadata {
427 &self.metadata
428 }
429
430 fn into_path(self) -> PathBuf {
431 self.path
432 }
433 }
434
435 struct EventEmitter(
437 Box<RefCell<dyn EventHandler>>,
440 );
441
442 impl EventEmitter {
443 fn new<F: EventHandler>(event_handler: F) -> Self {
444 Self(Box::new(RefCell::new(event_handler)))
445 }
446
447 fn emit(&self, event: crate::Result<Event>) {
449 self.0.borrow_mut().handle_event(event);
450 }
451
452 fn emit_ok(&self, event: Event) {
454 self.emit(Ok(event))
455 }
456
457 fn emit_io_err<E, P>(&self, err: E, path: P)
459 where
460 E: Into<io::Error>,
461 P: Into<PathBuf>,
462 {
463 self.emit(Err(crate::Error::io(err.into()).add_path(path.into())))
464 }
465 }
466}
467
468#[derive(Debug)]
475pub struct PollWatcher {
476 watches: Arc<Mutex<HashMap<PathBuf, WatchData>>>,
477 data_builder: Arc<Mutex<DataBuilder>>,
478 want_to_stop: Arc<AtomicBool>,
479 message_channel: Sender<()>,
482 delay: Option<Duration>,
483}
484
485impl PollWatcher {
486 pub fn new<F: EventHandler>(event_handler: F, config: Config) -> crate::Result<PollWatcher> {
488 Self::with_opt::<_, ()>(event_handler, config, None)
489 }
490
491 pub fn poll(&self) -> crate::Result<()> {
493 self.message_channel
494 .send(())
495 .map_err(|_| Error::generic("failed to send poll message"))?;
496 Ok(())
497 }
498
499 pub fn with_initial_scan<F: EventHandler, G: ScanEventHandler>(
503 event_handler: F,
504 config: Config,
505 scan_callback: G,
506 ) -> crate::Result<PollWatcher> {
507 Self::with_opt(event_handler, config, Some(scan_callback))
508 }
509
510 fn with_opt<F: EventHandler, G: ScanEventHandler>(
512 event_handler: F,
513 config: Config,
514 scan_callback: Option<G>,
515 ) -> crate::Result<PollWatcher> {
516 let data_builder =
517 DataBuilder::new(event_handler, config.compare_contents(), scan_callback);
518
519 let (tx, rx) = unbounded();
520
521 let poll_watcher = PollWatcher {
522 watches: Default::default(),
523 data_builder: Arc::new(Mutex::new(data_builder)),
524 want_to_stop: Arc::new(AtomicBool::new(false)),
525 delay: config.poll_interval(),
526 message_channel: tx,
527 };
528
529 poll_watcher.run(rx);
530
531 Ok(poll_watcher)
532 }
533
534 fn run(&self, rx: Receiver<()>) {
535 let watches = Arc::clone(&self.watches);
536 let data_builder = Arc::clone(&self.data_builder);
537 let want_to_stop = Arc::clone(&self.want_to_stop);
538 let delay = self.delay;
539
540 let _ = thread::Builder::new()
541 .name("notify-rs poll loop".to_string())
542 .spawn(move || {
543 loop {
544 if want_to_stop.load(Ordering::SeqCst) {
545 break;
546 }
547
548 if let (Ok(mut watches), Ok(mut data_builder)) =
553 (watches.lock(), data_builder.lock())
554 {
555 data_builder.update_timestamp();
556
557 let vals = watches.values_mut();
558 for watch_data in vals {
559 watch_data.rescan(&mut data_builder);
560 }
561 }
562 if let Some(delay) = delay {
564 let _ = rx.recv_timeout(delay);
565 } else {
566 let _ = rx.recv();
567 }
568 }
569 });
570 }
571
572 fn watch_inner(&mut self, path: &Path, recursive_mode: RecursiveMode) {
577 if let (Ok(mut watches), Ok(mut data_builder)) =
581 (self.watches.lock(), self.data_builder.lock())
582 {
583 data_builder.update_timestamp();
584
585 let watch_data =
586 data_builder.build_watch_data(path.to_path_buf(), recursive_mode.is_recursive());
587
588 if let Some(watch_data) = watch_data {
590 watches.insert(path.to_path_buf(), watch_data);
591 }
592 }
593 }
594
595 fn unwatch_inner(&mut self, path: &Path) -> crate::Result<()> {
599 self.watches
601 .lock()
602 .unwrap()
603 .remove(path)
604 .map(|_| ())
605 .ok_or_else(crate::Error::watch_not_found)
606 }
607}
608
609impl Watcher for PollWatcher {
610 fn new<F: EventHandler>(event_handler: F, config: Config) -> crate::Result<Self> {
612 Self::new(event_handler, config)
613 }
614
615 fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> crate::Result<()> {
616 self.watch_inner(path, recursive_mode);
617
618 Ok(())
619 }
620
621 fn unwatch(&mut self, path: &Path) -> crate::Result<()> {
622 self.unwatch_inner(path)
623 }
624
625 fn kind() -> crate::WatcherKind {
626 crate::WatcherKind::PollWatcher
627 }
628}
629
630impl Drop for PollWatcher {
631 fn drop(&mut self) {
632 self.want_to_stop.store(true, Ordering::Relaxed);
633 }
634}
635
/// Compile-time check that `PollWatcher` can be moved to and shared between threads.
#[test]
fn poll_watcher_is_send_and_sync() {
    fn assert_send_sync<T: Send + Sync>() {}
    assert_send_sync::<PollWatcher>();
}