notify/
windows.rs

1#![allow(missing_docs)]
2//! Watcher implementation for Windows' directory management APIs
3//!
4//! For more information see the [ReadDirectoryChangesW reference][ref].
5//!
6//! [ref]: https://msdn.microsoft.com/en-us/library/windows/desktop/aa363950(v=vs.85).aspx
7
8use crate::{BoundSender, Config, Receiver, Sender, bounded, unbounded};
9use crate::{Error, EventHandler, RecursiveMode, Result, Watcher};
10use crate::{WatcherKind, event::*};
11use std::alloc;
12use std::cell::RefCell;
13use std::collections::HashMap;
14#[cfg(test)]
15use std::collections::HashSet;
16use std::env;
17use std::ffi::OsString;
18use std::os::raw::c_void;
19use std::os::windows::ffi::{OsStrExt, OsStringExt};
20use std::path::{Path, PathBuf};
21use std::ptr;
22use std::rc::Rc;
23use std::slice;
24use std::sync::{Arc, Mutex};
25use std::thread;
26use windows_sys::Win32::Foundation::{
27    CloseHandle, ERROR_ACCESS_DENIED, ERROR_OPERATION_ABORTED, ERROR_SUCCESS, HANDLE,
28    INVALID_HANDLE_VALUE, WAIT_OBJECT_0,
29};
30use windows_sys::Win32::Storage::FileSystem::{
31    CreateFileW, FILE_ACTION_ADDED, FILE_ACTION_MODIFIED, FILE_ACTION_REMOVED,
32    FILE_ACTION_RENAMED_NEW_NAME, FILE_ACTION_RENAMED_OLD_NAME, FILE_FLAG_BACKUP_SEMANTICS,
33    FILE_FLAG_OVERLAPPED, FILE_LIST_DIRECTORY, FILE_NOTIFY_CHANGE_ATTRIBUTES,
34    FILE_NOTIFY_CHANGE_CREATION, FILE_NOTIFY_CHANGE_DIR_NAME, FILE_NOTIFY_CHANGE_FILE_NAME,
35    FILE_NOTIFY_CHANGE_LAST_WRITE, FILE_NOTIFY_CHANGE_SECURITY, FILE_NOTIFY_CHANGE_SIZE,
36    FILE_NOTIFY_INFORMATION, FILE_SHARE_DELETE, FILE_SHARE_READ, FILE_SHARE_WRITE, OPEN_EXISTING,
37    ReadDirectoryChangesW,
38};
39use windows_sys::Win32::System::IO::{CancelIo, OVERLAPPED};
40use windows_sys::Win32::System::Threading::{
41    CreateSemaphoreW, INFINITE, ReleaseSemaphore, WaitForSingleObjectEx,
42};
43
44const BUF_SIZE: u32 = 16384;
45
46#[derive(Clone)]
47struct ReadData {
48    dir: PathBuf, // directory that is being watched
49    watches: Rc<RefCell<HashMap<PathBuf, RecursiveMode>>>,
50    complete_sem: HANDLE,
51    is_recursive: bool,
52}
53
54struct ReadDirectoryRequest {
55    event_handler: Arc<Mutex<dyn EventHandler>>,
56    buffer: [u8; BUF_SIZE as usize],
57    handle: HANDLE,
58    data: ReadData,
59    action_tx: Sender<Action>,
60}
61
62impl ReadDirectoryRequest {
63    fn unwatch(&self) {
64        let _ = self.action_tx.send(Action::Unwatch(self.data.dir.clone()));
65    }
66}
67
68enum Action {
69    Watch(PathBuf, RecursiveMode),
70    Unwatch(PathBuf),
71    Stop,
72    Configure(Config, BoundSender<Result<bool>>),
73    #[cfg(test)]
74    GetWatchHandles(BoundSender<HashSet<PathBuf>>),
75}
76
77struct WatchState {
78    dir_handle: HANDLE,
79    complete_sem: HANDLE,
80}
81
82struct ReadDirectoryChangesServer {
83    tx: Sender<Action>,
84    rx: Receiver<Action>,
85    event_handler: Arc<Mutex<dyn EventHandler>>,
86    cmd_tx: Sender<Result<PathBuf>>,
87    watches: Rc<RefCell<HashMap<PathBuf, RecursiveMode>>>,
88    watch_handles: HashMap<PathBuf, (WatchState, /* is_recursive */ bool)>,
89    wakeup_sem: HANDLE,
90}
91
92impl ReadDirectoryChangesServer {
93    fn start(
94        event_handler: Arc<Mutex<dyn EventHandler>>,
95        cmd_tx: Sender<Result<PathBuf>>,
96        wakeup_sem: HANDLE,
97    ) -> Sender<Action> {
98        let (action_tx, action_rx) = unbounded();
99        // it is, in fact, ok to send the semaphore across threads
100        let sem_temp = wakeup_sem as u64;
101        let _ = thread::Builder::new()
102            .name("notify-rs windows loop".to_string())
103            .spawn({
104                let tx = action_tx.clone();
105                move || {
106                    let wakeup_sem = sem_temp as HANDLE;
107                    let server = ReadDirectoryChangesServer {
108                        tx,
109                        rx: action_rx,
110                        event_handler,
111                        cmd_tx,
112                        watches: Rc::new(RefCell::new(HashMap::new())),
113                        watch_handles: HashMap::new(),
114                        wakeup_sem,
115                    };
116                    server.run();
117                }
118            });
119        action_tx
120    }
121
122    fn run(mut self) {
123        loop {
124            // process all available actions first
125            let mut stopped = false;
126
127            while let Ok(action) = self.rx.try_recv() {
128                match action {
129                    Action::Watch(path, recursive_mode) => {
130                        let res = self.add_watch(path, recursive_mode);
131                        let _ = self.cmd_tx.send(res);
132                    }
133                    Action::Unwatch(path) => self.remove_watch(path),
134                    Action::Stop => {
135                        stopped = true;
136                        for (ws, _) in self.watch_handles.values() {
137                            stop_watch(ws);
138                        }
139                        break;
140                    }
141                    Action::Configure(config, tx) => {
142                        self.configure_raw_mode(config, tx);
143                    }
144                    #[cfg(test)]
145                    Action::GetWatchHandles(tx) => {
146                        let handles = self.watch_handles.keys().cloned().collect();
147                        tx.send(handles).unwrap();
148                    }
149                }
150            }
151
152            if stopped {
153                break;
154            }
155
156            unsafe {
157                // wait with alertable flag so that the completion routine fires
158                WaitForSingleObjectEx(self.wakeup_sem, 100, 1);
159            }
160        }
161
162        // we have to clean this up, since the watcher may be long gone
163        unsafe {
164            CloseHandle(self.wakeup_sem);
165        }
166    }
167
168    fn add_watch(&mut self, path: PathBuf, recursive_mode: RecursiveMode) -> Result<PathBuf> {
169        if let Some(existing) = self.watches.borrow().get(&path) {
170            let need_upgrade_to_recursive = match *existing {
171                RecursiveMode::Recursive => false,
172                RecursiveMode::NonRecursive => recursive_mode == RecursiveMode::Recursive,
173            };
174            if !need_upgrade_to_recursive {
175                return Ok(path);
176            }
177        }
178
179        let metadata = path.metadata();
180        // path must exist and be either a file or directory
181        if metadata
182            .as_ref()
183            .map(|m| !m.is_dir() && !m.is_file())
184            .unwrap_or(true)
185        {
186            return Err(
187                Error::generic("Input watch path is neither a file nor a directory.")
188                    .add_path(path),
189            );
190        }
191
192        let (watching_file, dir_target) = {
193            if metadata.map(|m| m.is_dir()).unwrap_or(false) {
194                (false, path.clone())
195            } else {
196                // emulate file watching by watching the parent directory
197                (true, path.parent().unwrap().to_path_buf())
198            }
199        };
200
201        self.add_watch_raw(dir_target, recursive_mode.is_recursive(), watching_file)?;
202        self.watches
203            .borrow_mut()
204            .insert(path.clone(), recursive_mode);
205
206        Ok(path)
207    }
208
209    fn add_watch_raw(
210        &mut self,
211        path: PathBuf,
212        is_recursive: bool,
213        watching_file: bool,
214    ) -> Result<()> {
215        if let Some((ws, was_recursive)) = self.watch_handles.get(&path) {
216            let need_upgrade_to_recursive = !*was_recursive && is_recursive;
217            if !need_upgrade_to_recursive {
218                return Ok(());
219            }
220            stop_watch(ws);
221        }
222
223        let encoded_path: Vec<u16> = path.as_os_str().encode_wide().chain(Some(0)).collect();
224        let handle;
225        unsafe {
226            handle = CreateFileW(
227                encoded_path.as_ptr(),
228                FILE_LIST_DIRECTORY,
229                FILE_SHARE_READ | FILE_SHARE_DELETE | FILE_SHARE_WRITE,
230                ptr::null_mut(),
231                OPEN_EXISTING,
232                FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED,
233                ptr::null_mut(),
234            );
235
236            if handle == INVALID_HANDLE_VALUE {
237                return Err(if watching_file {
238                    Error::generic(
239                        "You attempted to watch a single file, but parent \
240                         directory could not be opened.",
241                    )
242                    .add_path(path)
243                } else {
244                    // TODO: Call GetLastError for better error info?
245                    Error::path_not_found().add_path(path)
246                });
247            }
248        }
249        // every watcher gets its own semaphore to signal completion
250        let semaphore = unsafe { CreateSemaphoreW(ptr::null_mut(), 0, 1, ptr::null_mut()) };
251        if semaphore.is_null() || semaphore == INVALID_HANDLE_VALUE {
252            unsafe {
253                CloseHandle(handle);
254            }
255            return Err(Error::generic("Failed to create semaphore for watch.").add_path(path));
256        }
257        let rd = ReadData {
258            dir: path.clone(),
259            watches: Rc::clone(&self.watches),
260            complete_sem: semaphore,
261            is_recursive,
262        };
263        let ws = WatchState {
264            dir_handle: handle,
265            complete_sem: semaphore,
266        };
267        self.watch_handles.insert(path, (ws, is_recursive));
268        start_read(&rd, self.event_handler.clone(), handle, self.tx.clone());
269        Ok(())
270    }
271
272    fn remove_watch(&mut self, path: PathBuf) {
273        if self.watches.borrow_mut().remove(&path).is_some() {
274            if let Some((ws, _)) = self.watch_handles.remove(&path) {
275                stop_watch(&ws);
276            } else if let Some(parent_path) = path.parent()
277                && self.watches.borrow().get(parent_path).is_none()
278                && self
279                    .watches
280                    .borrow()
281                    .keys()
282                    .filter(|p| p.starts_with(parent_path))
283                    .count()
284                    == 0
285                && let Some((ws, _)) = self.watch_handles.remove(parent_path)
286            {
287                // if the parent path is not watched, the watch handle is used for the files under it
288                // if no files under it are watched anymore, we can stop the watch on the parent path
289                stop_watch(&ws);
290            }
291        }
292    }
293
294    fn configure_raw_mode(&mut self, _config: Config, tx: BoundSender<Result<bool>>) {
295        tx.send(Ok(false))
296            .expect("configuration channel disconnect");
297    }
298}
299
300fn stop_watch(ws: &WatchState) {
301    unsafe {
302        let cio = CancelIo(ws.dir_handle);
303        let ch = CloseHandle(ws.dir_handle);
304        // have to wait for it, otherwise we leak the memory allocated for there read request
305        if cio != 0 && ch != 0 {
306            while WaitForSingleObjectEx(ws.complete_sem, INFINITE, 1) != WAIT_OBJECT_0 {
307                // drain the apc queue, fix for https://github.com/notify-rs/notify/issues/287#issuecomment-801465550
308            }
309        }
310        CloseHandle(ws.complete_sem);
311    }
312}
313
314fn start_read(
315    rd: &ReadData,
316    event_handler: Arc<Mutex<dyn EventHandler>>,
317    handle: HANDLE,
318    action_tx: Sender<Action>,
319) {
320    let request = Box::new(ReadDirectoryRequest {
321        event_handler,
322        handle,
323        buffer: [0u8; BUF_SIZE as usize],
324        data: rd.clone(),
325        action_tx,
326    });
327
328    let flags = FILE_NOTIFY_CHANGE_FILE_NAME
329        | FILE_NOTIFY_CHANGE_DIR_NAME
330        | FILE_NOTIFY_CHANGE_ATTRIBUTES
331        | FILE_NOTIFY_CHANGE_SIZE
332        | FILE_NOTIFY_CHANGE_LAST_WRITE
333        | FILE_NOTIFY_CHANGE_CREATION
334        | FILE_NOTIFY_CHANGE_SECURITY;
335
336    let monitor_subdir = if request.data.is_recursive { 1 } else { 0 };
337
338    unsafe {
339        let overlapped = alloc::alloc_zeroed(alloc::Layout::new::<OVERLAPPED>()) as *mut OVERLAPPED;
340        // When using callback based async requests, we are allowed to use the hEvent member
341        // for our own purposes
342
343        let request = Box::leak(request);
344        (*overlapped).hEvent = request as *mut _ as _;
345
346        // This is using an asynchronous call with a completion routine for receiving notifications
347        // An I/O completion port would probably be more performant
348        let ret = ReadDirectoryChangesW(
349            handle,
350            request.buffer.as_mut_ptr() as *mut c_void,
351            BUF_SIZE,
352            monitor_subdir,
353            flags,
354            &mut 0u32 as *mut u32, // not used for async reqs
355            overlapped,
356            Some(handle_event),
357        );
358
359        if ret == 0 {
360            // error reading. retransmute request memory to allow drop.
361            // Because of the error, ownership of the `overlapped` alloc was not passed
362            // over to `ReadDirectoryChangesW`.
363            // So we can claim ownership back.
364            let _overlapped = Box::from_raw(overlapped);
365            let request = Box::from_raw(request);
366            ReleaseSemaphore(request.data.complete_sem, 1, ptr::null_mut());
367        }
368    }
369}
370
371unsafe extern "system" fn handle_event(
372    error_code: u32,
373    _bytes_written: u32,
374    overlapped: *mut OVERLAPPED,
375) {
376    let overlapped: Box<OVERLAPPED> = unsafe { Box::from_raw(overlapped) };
377    let request: Box<ReadDirectoryRequest> = unsafe { Box::from_raw(overlapped.hEvent as *mut _) };
378
379    let release_semaphore =
380        || unsafe { ReleaseSemaphore(request.data.complete_sem, 1, ptr::null_mut()) };
381
382    match error_code {
383        ERROR_OPERATION_ABORTED => {
384            // received when dir is unwatched or watcher is shutdown; return and let overlapped/request get drop-cleaned
385            release_semaphore();
386            return;
387        }
388        ERROR_ACCESS_DENIED => {
389            // This could happen when the watched directory is deleted or trashed, first check if it's the case.
390            // If so, unwatch the directory and return, otherwise, continue to handle the event.
391            if !request.data.dir.exists() {
392                request.unwatch();
393                release_semaphore();
394                return;
395            }
396        }
397        ERROR_SUCCESS => {
398            // Success, continue to handle the event
399        }
400        _ => {
401            // Some unidentified error occurred, log and unwatch the directory, then return.
402            log::error!(
403                "unknown error in ReadDirectoryChangesW for directory {}: {}",
404                request.data.dir.display(),
405                error_code
406            );
407            request.unwatch();
408            release_semaphore();
409            return;
410        }
411    }
412
413    // Get the next request queued up as soon as possible
414    start_read(
415        &request.data,
416        request.event_handler.clone(),
417        request.handle,
418        request.action_tx,
419    );
420
421    // The FILE_NOTIFY_INFORMATION struct has a variable length due to the variable length
422    // string as its last member. Each struct contains an offset for getting the next entry in
423    // the buffer.
424    let mut cur_offset: *const u8 = request.buffer.as_ptr();
425    // In Wine, FILE_NOTIFY_INFORMATION structs are packed placed in the buffer;
426    // they are aligned to 16bit (WCHAR) boundary instead of 32bit required by FILE_NOTIFY_INFORMATION.
427    // Hence, we need to use `read_unaligned` here to avoid UB.
428    let mut cur_entry =
429        unsafe { ptr::read_unaligned(cur_offset as *const FILE_NOTIFY_INFORMATION) };
430    loop {
431        // filename length is size in bytes, so / 2
432        let len = cur_entry.FileNameLength as usize / 2;
433        let encoded_path: &[u16] = unsafe {
434            slice::from_raw_parts(
435                cur_offset.add(std::mem::offset_of!(FILE_NOTIFY_INFORMATION, FileName)) as _,
436                len,
437            )
438        };
439        // prepend root to get a full path
440        let path = request
441            .data
442            .dir
443            .join(PathBuf::from(OsString::from_wide(encoded_path)));
444
445        // if we are watching a single file, ignore the event unless the path is exactly
446        // the watched file
447        let skip = !(request
448            .data
449            .watches
450            .borrow()
451            .contains_key(&request.data.dir)
452            || request.data.watches.borrow().contains_key(&path));
453
454        if !skip {
455            log::trace!(
456                "Event: path = `{}`, action = {:?}",
457                path.display(),
458                cur_entry.Action
459            );
460
461            let newe = Event::new(EventKind::Any).add_path(path);
462
463            fn emit_event(event_handler: &Mutex<dyn EventHandler>, res: Result<Event>) {
464                if let Ok(mut guard) = event_handler.lock() {
465                    let f: &mut dyn EventHandler = &mut *guard;
466                    f.handle_event(res);
467                }
468            }
469
470            let event_handler = |res| emit_event(&request.event_handler, res);
471
472            if cur_entry.Action == FILE_ACTION_RENAMED_OLD_NAME {
473                let mode = RenameMode::From;
474                let kind = ModifyKind::Name(mode);
475                let kind = EventKind::Modify(kind);
476                let ev = newe.set_kind(kind);
477                event_handler(Ok(ev))
478            } else {
479                match cur_entry.Action {
480                    FILE_ACTION_RENAMED_NEW_NAME => {
481                        let kind = EventKind::Modify(ModifyKind::Name(RenameMode::To));
482                        let ev = newe.set_kind(kind);
483                        event_handler(Ok(ev));
484                    }
485                    FILE_ACTION_ADDED => {
486                        let kind = EventKind::Create(CreateKind::Any);
487                        let ev = newe.set_kind(kind);
488                        event_handler(Ok(ev));
489                    }
490                    FILE_ACTION_REMOVED => {
491                        let kind = EventKind::Remove(RemoveKind::Any);
492                        let ev = newe.set_kind(kind);
493                        event_handler(Ok(ev));
494                    }
495                    FILE_ACTION_MODIFIED => {
496                        let kind = EventKind::Modify(ModifyKind::Any);
497                        let ev = newe.set_kind(kind);
498                        event_handler(Ok(ev));
499                    }
500                    _ => (),
501                };
502            }
503        }
504
505        if cur_entry.NextEntryOffset == 0 {
506            break;
507        }
508        cur_offset = unsafe { cur_offset.add(cur_entry.NextEntryOffset as usize) };
509        cur_entry = unsafe { ptr::read_unaligned(cur_offset as *const FILE_NOTIFY_INFORMATION) };
510    }
511}
512
513/// Watcher implementation based on ReadDirectoryChanges
514#[derive(Debug)]
515pub struct ReadDirectoryChangesWatcher {
516    tx: Sender<Action>,
517    cmd_rx: Receiver<Result<PathBuf>>,
518    wakeup_sem: HANDLE,
519}
520
521impl ReadDirectoryChangesWatcher {
522    pub fn create(
523        event_handler: Arc<Mutex<dyn EventHandler>>,
524    ) -> Result<ReadDirectoryChangesWatcher> {
525        let (cmd_tx, cmd_rx) = unbounded();
526
527        let wakeup_sem = unsafe { CreateSemaphoreW(ptr::null_mut(), 0, 1, ptr::null_mut()) };
528        if wakeup_sem.is_null() || wakeup_sem == INVALID_HANDLE_VALUE {
529            return Err(Error::generic("Failed to create wakeup semaphore."));
530        }
531
532        let action_tx = ReadDirectoryChangesServer::start(event_handler, cmd_tx, wakeup_sem);
533
534        Ok(ReadDirectoryChangesWatcher {
535            tx: action_tx,
536            cmd_rx,
537            wakeup_sem,
538        })
539    }
540
541    fn wakeup_server(&mut self) {
542        // breaks the server out of its wait state.  right now this is really just an optimization,
543        // so that if you add a watch you don't block for 100ms in watch() while the
544        // server sleeps.
545        unsafe {
546            ReleaseSemaphore(self.wakeup_sem, 1, ptr::null_mut());
547        }
548    }
549
550    fn send_action_require_ack(&mut self, action: Action, pb: &PathBuf) -> Result<()> {
551        self.tx
552            .send(action)
553            .map_err(|_| Error::generic("Error sending to internal channel"))?;
554
555        // wake 'em up, we don't want to wait around for the ack
556        self.wakeup_server();
557
558        let ack_pb = self
559            .cmd_rx
560            .recv()
561            .map_err(|_| Error::generic("Error receiving from command channel"))?
562            .map_err(|e| Error::generic(&format!("Error in watcher: {:?}", e)))?;
563
564        if pb.as_path() != ack_pb.as_path() {
565            Err(Error::generic(&format!(
566                "Expected ack for {:?} but got \
567                 ack for {:?}",
568                pb, ack_pb
569            )))
570        } else {
571            Ok(())
572        }
573    }
574
575    fn watch_inner(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> {
576        let pb = if path.is_absolute() {
577            path.to_owned()
578        } else {
579            let p = env::current_dir().map_err(Error::io)?;
580            p.join(path)
581        };
582        self.send_action_require_ack(Action::Watch(pb.clone(), recursive_mode), &pb)
583    }
584
585    fn unwatch_inner(&mut self, path: &Path) -> Result<()> {
586        let pb = if path.is_absolute() {
587            path.to_owned()
588        } else {
589            let p = env::current_dir().map_err(Error::io)?;
590            p.join(path)
591        };
592        let res = self
593            .tx
594            .send(Action::Unwatch(pb))
595            .map_err(|_| Error::generic("Error sending to internal channel"));
596        self.wakeup_server();
597        res
598    }
599}
600
601impl Watcher for ReadDirectoryChangesWatcher {
602    fn new<F: EventHandler>(event_handler: F, _config: Config) -> Result<Self> {
603        let event_handler = Arc::new(Mutex::new(event_handler));
604        Self::create(event_handler)
605    }
606
607    fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> {
608        self.watch_inner(path, recursive_mode)
609    }
610
611    fn unwatch(&mut self, path: &Path) -> Result<()> {
612        self.unwatch_inner(path)
613    }
614
615    fn configure(&mut self, config: Config) -> Result<bool> {
616        let (tx, rx) = bounded(1);
617        self.tx.send(Action::Configure(config, tx))?;
618        rx.recv()?
619    }
620
621    fn kind() -> crate::WatcherKind {
622        WatcherKind::ReadDirectoryChangesWatcher
623    }
624
625    #[cfg(test)]
626    fn get_watch_handles(&self) -> HashSet<PathBuf> {
627        let (tx, rx) = bounded(1);
628        self.tx.send(Action::GetWatchHandles(tx)).unwrap();
629        rx.recv().unwrap()
630    }
631}
632
633impl Drop for ReadDirectoryChangesWatcher {
634    fn drop(&mut self) {
635        let _ = self.tx.send(Action::Stop);
636        // better wake it up
637        self.wakeup_server();
638    }
639}
640
641// `ReadDirectoryChangesWatcher` is not Send/Sync because of the semaphore Handle.
642// As said elsewhere it's perfectly safe to send it across threads.
643unsafe impl Send for ReadDirectoryChangesWatcher {}
644// Because all public methods are `&mut self` it's also perfectly safe to share references.
645unsafe impl Sync for ReadDirectoryChangesWatcher {}
646
647#[cfg(test)]
648pub mod tests {
649    use crate::{ReadDirectoryChangesWatcher, RecursiveMode, Watcher, test::*};
650
651    use std::{collections::HashSet, time::Duration};
652
653    fn watcher() -> (TestWatcher<ReadDirectoryChangesWatcher>, Receiver) {
654        channel()
655    }
656
657    #[test]
658    fn trash_dir() -> std::result::Result<(), Box<dyn std::error::Error>> {
659        let dir = testdir();
660        let child_dir = dir.path().join("child");
661        std::fs::create_dir(&child_dir)?;
662
663        let mut watcher = crate::recommended_watcher(|_| {
664            // Do something with the event
665        })?;
666        watcher.watch(&child_dir, RecursiveMode::NonRecursive)?;
667        assert_eq!(
668            watcher.get_watch_handles(),
669            HashSet::from([child_dir.clone()])
670        );
671
672        trash::delete(&child_dir)?;
673
674        watcher.watch(dir.path(), RecursiveMode::NonRecursive)?;
675        assert_eq!(
676            watcher.get_watch_handles(),
677            HashSet::from([dir.to_path_buf()])
678        );
679
680        Ok(())
681    }
682
683    #[test]
684    fn watcher_is_send_and_sync() {
685        fn check<T: Send + Sync>() {}
686        check::<ReadDirectoryChangesWatcher>();
687    }
688
689    #[test]
690    fn create_file() {
691        let tmpdir = testdir();
692        let (mut watcher, mut rx) = watcher();
693        watcher.watch_recursively(&tmpdir);
694
695        let path = tmpdir.path().join("entry");
696        std::fs::File::create_new(&path).expect("create");
697
698        rx.wait_ordered_exact([expected(&path).create_any()])
699            .ensure_no_tail();
700        assert_eq!(
701            watcher.get_watch_handles(),
702            HashSet::from([tmpdir.to_path_buf()])
703        );
704    }
705
706    #[test]
707    fn write_file() {
708        let tmpdir = testdir();
709        let (mut watcher, mut rx) = watcher();
710
711        let path = tmpdir.path().join("entry");
712        std::fs::File::create_new(&path).expect("create");
713
714        watcher.watch_recursively(&tmpdir);
715        std::fs::write(&path, b"123").expect("write");
716
717        rx.wait_ordered_exact([expected(&path).modify_any().multiple()])
718            .ensure_no_tail();
719        assert_eq!(
720            watcher.get_watch_handles(),
721            HashSet::from([tmpdir.to_path_buf()])
722        );
723    }
724
725    #[test]
726    fn chmod_file() {
727        let tmpdir = testdir();
728        let (mut watcher, mut rx) = watcher();
729
730        let path = tmpdir.path().join("entry");
731        let file = std::fs::File::create_new(&path).expect("create");
732        let mut permissions = file.metadata().expect("metadata").permissions();
733        permissions.set_readonly(true);
734
735        watcher.watch_recursively(&tmpdir);
736        file.set_permissions(permissions).expect("set_permissions");
737
738        rx.wait_ordered_exact([expected(&path).modify_any()])
739            .ensure_no_tail();
740        assert_eq!(
741            watcher.get_watch_handles(),
742            HashSet::from([tmpdir.to_path_buf()])
743        );
744    }
745
746    #[test]
747    fn rename_file() {
748        let tmpdir = testdir();
749        let (mut watcher, mut rx) = watcher();
750
751        let path = tmpdir.path().join("entry");
752        std::fs::File::create_new(&path).expect("create");
753
754        watcher.watch_recursively(&tmpdir);
755        let new_path = tmpdir.path().join("renamed");
756
757        std::fs::rename(&path, &new_path).expect("rename");
758
759        rx.wait_ordered_exact([
760            expected(&path).rename_from(),
761            expected(&new_path).rename_to(),
762        ])
763        .ensure_no_tail();
764        assert_eq!(
765            watcher.get_watch_handles(),
766            HashSet::from([tmpdir.to_path_buf()])
767        );
768    }
769
770    #[test]
771    fn delete_file() {
772        let tmpdir = testdir();
773        let (mut watcher, mut rx) = watcher();
774        let file = tmpdir.path().join("file");
775        std::fs::write(&file, "").expect("write");
776
777        watcher.watch_nonrecursively(&tmpdir);
778
779        std::fs::remove_file(&file).expect("remove");
780
781        rx.wait_ordered_exact([expected(&file).remove_any()])
782            .ensure_no_tail();
783        assert_eq!(
784            watcher.get_watch_handles(),
785            HashSet::from([tmpdir.to_path_buf()])
786        );
787    }
788
789    #[test]
790    fn delete_self_file() {
791        let tmpdir = testdir();
792        let (mut watcher, mut rx) = watcher();
793        let file = tmpdir.path().join("file");
794        std::fs::write(&file, "").expect("write");
795
796        watcher.watch_nonrecursively(&file);
797
798        std::fs::remove_file(&file).expect("remove");
799
800        rx.wait_ordered_exact([expected(&file).remove_any()]);
801        assert_eq!(
802            watcher.get_watch_handles(),
803            HashSet::from([tmpdir.to_path_buf()])
804        );
805    }
806
807    #[test]
808    fn create_write_overwrite() {
809        let tmpdir = testdir();
810        let (mut watcher, mut rx) = watcher();
811        let overwritten_file = tmpdir.path().join("overwritten_file");
812        let overwriting_file = tmpdir.path().join("overwriting_file");
813        std::fs::write(&overwritten_file, "123").expect("write1");
814
815        watcher.watch_nonrecursively(&tmpdir);
816
817        std::fs::File::create(&overwriting_file).expect("create");
818        std::fs::write(&overwriting_file, "321").expect("write2");
819        std::fs::rename(&overwriting_file, &overwritten_file).expect("rename");
820
821        rx.wait_ordered_exact([
822            expected(&overwriting_file).create_any(),
823            expected(&overwriting_file).modify_any().multiple(),
824            expected(&overwritten_file).remove_any(),
825            expected(&overwriting_file).rename_from(),
826            expected(&overwritten_file).rename_to(),
827        ])
828        .ensure_no_tail();
829        assert_eq!(
830            watcher.get_watch_handles(),
831            HashSet::from([tmpdir.to_path_buf()])
832        );
833    }
834
835    #[test]
836    fn create_dir() {
837        let tmpdir = testdir();
838        let (mut watcher, mut rx) = watcher();
839        watcher.watch_recursively(&tmpdir);
840
841        let path = tmpdir.path().join("entry");
842        std::fs::create_dir(&path).expect("create");
843
844        rx.wait_ordered_exact([expected(&path).create_any()])
845            .ensure_no_tail();
846        assert_eq!(
847            watcher.get_watch_handles(),
848            HashSet::from([tmpdir.to_path_buf()])
849        );
850    }
851
852    #[test]
853    fn chmod_dir() {
854        let tmpdir = testdir();
855        let (mut watcher, mut rx) = watcher();
856
857        let path = tmpdir.path().join("entry");
858        std::fs::create_dir(&path).expect("create_dir");
859        let mut permissions = std::fs::metadata(&path).expect("metadata").permissions();
860        permissions.set_readonly(true);
861
862        watcher.watch_recursively(&tmpdir);
863        std::fs::set_permissions(&path, permissions).expect("set_permissions");
864
865        rx.wait_ordered_exact([expected(&path).modify_any()])
866            .ensure_no_tail();
867        assert_eq!(
868            watcher.get_watch_handles(),
869            HashSet::from([tmpdir.to_path_buf()])
870        );
871    }
872
873    #[test]
874    fn rename_dir() {
875        let tmpdir = testdir();
876        let (mut watcher, mut rx) = watcher();
877
878        let path = tmpdir.path().join("entry");
879        let new_path = tmpdir.path().join("new_path");
880        std::fs::create_dir(&path).expect("create_dir");
881
882        watcher.watch_recursively(&tmpdir);
883
884        std::fs::rename(&path, &new_path).expect("rename");
885
886        rx.wait_ordered_exact([
887            expected(&path).rename_from(),
888            expected(&new_path).rename_to(),
889        ])
890        .ensure_no_tail();
891        assert_eq!(
892            watcher.get_watch_handles(),
893            HashSet::from([tmpdir.to_path_buf()])
894        );
895    }
896
897    #[test]
898    fn delete_dir() {
899        let tmpdir = testdir();
900        let (mut watcher, mut rx) = watcher();
901
902        let path = tmpdir.path().join("entry");
903        std::fs::create_dir(&path).expect("create_dir");
904
905        watcher.watch_recursively(&tmpdir);
906        std::fs::remove_dir(&path).expect("remove");
907
908        rx.wait_ordered_exact([expected(&path).remove_any()])
909            .ensure_no_tail();
910        assert_eq!(
911            watcher.get_watch_handles(),
912            HashSet::from([tmpdir.to_path_buf()])
913        );
914    }
915
916    #[test]
917    fn rename_dir_twice() {
918        let tmpdir = testdir();
919        let (mut watcher, mut rx) = watcher();
920
921        let path = tmpdir.path().join("entry");
922        let new_path = tmpdir.path().join("new_path");
923        let new_path2 = tmpdir.path().join("new_path2");
924        std::fs::create_dir(&path).expect("create_dir");
925
926        watcher.watch_recursively(&tmpdir);
927        std::fs::rename(&path, &new_path).expect("rename");
928        std::fs::rename(&new_path, &new_path2).expect("rename2");
929
930        rx.wait_ordered_exact([
931            expected(&path).rename_from(),
932            expected(&new_path).rename_to(),
933            expected(&new_path).rename_from(),
934            expected(&new_path2).rename_to(),
935        ])
936        .ensure_no_tail();
937        assert_eq!(
938            watcher.get_watch_handles(),
939            HashSet::from([tmpdir.to_path_buf()])
940        );
941    }
942
943    #[test]
944    fn move_out_of_watched_dir() {
945        let tmpdir = testdir();
946        let subdir = tmpdir.path().join("subdir");
947        let (mut watcher, mut rx) = watcher();
948
949        let path = subdir.join("entry");
950        std::fs::create_dir_all(&subdir).expect("create_dir_all");
951        std::fs::File::create_new(&path).expect("create");
952
953        watcher.watch_recursively(&subdir);
954        let new_path = tmpdir.path().join("entry");
955
956        std::fs::rename(&path, &new_path).expect("rename");
957
958        let event = rx.recv();
959        assert_eq!(event, expected(path).remove_any());
960        rx.ensure_empty();
961        assert_eq!(watcher.get_watch_handles(), HashSet::from([subdir]));
962    }
963
964    #[test]
965    fn create_write_write_rename_write_remove() {
966        let tmpdir = testdir();
967        let (mut watcher, mut rx) = watcher();
968
969        let file1 = tmpdir.path().join("entry");
970        let file2 = tmpdir.path().join("entry2");
971        std::fs::File::create_new(&file2).expect("create file2");
972        let new_path = tmpdir.path().join("renamed");
973
974        watcher.watch_recursively(&tmpdir);
975        std::fs::write(&file1, "123").expect("write 1");
976        std::fs::write(&file2, "321").expect("write 2");
977        std::fs::rename(&file1, &new_path).expect("rename");
978        std::fs::write(&new_path, b"1").expect("write 3");
979        std::fs::remove_file(&new_path).expect("remove");
980
981        rx.wait_ordered_exact([
982            expected(&file1).create_any(),
983            expected(&file1).modify_any().multiple(),
984            expected(&file2).modify_any().multiple(),
985            expected(&file1).rename_from(),
986            expected(&new_path).rename_to(),
987            expected(&new_path).modify_any().multiple(),
988            expected(&new_path).remove_any(),
989        ]);
990        assert_eq!(
991            watcher.get_watch_handles(),
992            HashSet::from([tmpdir.to_path_buf()])
993        );
994    }
995
996    #[test]
997    fn rename_twice() {
998        let tmpdir = testdir();
999        let (mut watcher, mut rx) = watcher();
1000
1001        let path = tmpdir.path().join("entry");
1002        std::fs::File::create_new(&path).expect("create");
1003
1004        watcher.watch_recursively(&tmpdir);
1005        let new_path1 = tmpdir.path().join("renamed1");
1006        let new_path2 = tmpdir.path().join("renamed2");
1007
1008        std::fs::rename(&path, &new_path1).expect("rename1");
1009        std::fs::rename(&new_path1, &new_path2).expect("rename2");
1010
1011        rx.wait_ordered_exact([
1012            expected(&path).rename_from(),
1013            expected(&new_path1).rename_to(),
1014            expected(&new_path1).rename_from(),
1015            expected(&new_path2).rename_to(),
1016        ])
1017        .ensure_no_tail();
1018        assert_eq!(
1019            watcher.get_watch_handles(),
1020            HashSet::from([tmpdir.to_path_buf()])
1021        );
1022    }
1023
1024    #[test]
1025    fn set_file_mtime() {
1026        let tmpdir = testdir();
1027        let (mut watcher, mut rx) = watcher();
1028
1029        let path = tmpdir.path().join("entry");
1030        let file = std::fs::File::create_new(&path).expect("create");
1031
1032        watcher.watch_recursively(&tmpdir);
1033
1034        file.set_modified(
1035            std::time::SystemTime::now()
1036                .checked_sub(Duration::from_secs(60 * 60))
1037                .expect("time"),
1038        )
1039        .expect("set_time");
1040
1041        assert_eq!(rx.recv(), expected(&path).modify_any());
1042        rx.ensure_empty();
1043    }
1044
1045    #[test]
1046    fn write_file_non_recursive_watch() {
1047        let tmpdir = testdir();
1048        let (mut watcher, mut rx) = watcher();
1049
1050        let path = tmpdir.path().join("entry");
1051        std::fs::File::create_new(&path).expect("create");
1052
1053        watcher.watch_nonrecursively(&path);
1054
1055        std::fs::write(&path, b"123").expect("write");
1056
1057        rx.wait_ordered_exact([expected(&path).modify_any().multiple()])
1058            .ensure_no_tail();
1059        assert_eq!(
1060            watcher.get_watch_handles(),
1061            HashSet::from([tmpdir.to_path_buf()])
1062        );
1063    }
1064
1065    #[test]
1066    fn write_to_a_hardlink_pointed_to_the_file_in_the_watched_dir_doesnt_trigger_an_event() {
1067        let tmpdir = testdir();
1068        let (mut watcher, mut rx) = watcher();
1069
1070        let subdir = tmpdir.path().join("subdir");
1071        let file = subdir.join("file");
1072        let hardlink = tmpdir.path().join("hardlink");
1073
1074        std::fs::create_dir(&subdir).expect("create");
1075        std::fs::write(&file, "").expect("file");
1076        std::fs::hard_link(&file, &hardlink).expect("hardlink");
1077
1078        watcher.watch_nonrecursively(&subdir);
1079
1080        std::fs::write(&hardlink, "123123").expect("write to the hard link");
1081
1082        let events = rx.iter().collect::<Vec<_>>();
1083        assert!(events.is_empty(), "unexpected events: {events:#?}");
1084        assert_eq!(
1085            watcher.get_watch_handles(),
1086            HashSet::from([subdir.to_path_buf()])
1087        );
1088    }
1089
1090    #[test]
1091    fn recursive_creation() {
1092        let tmpdir = testdir();
1093        let nested1 = tmpdir.path().join("1");
1094        let nested2 = tmpdir.path().join("1/2");
1095        let nested3 = tmpdir.path().join("1/2/3");
1096        let nested4 = tmpdir.path().join("1/2/3/4");
1097        let nested5 = tmpdir.path().join("1/2/3/4/5");
1098        let nested6 = tmpdir.path().join("1/2/3/4/5/6");
1099        let nested7 = tmpdir.path().join("1/2/3/4/5/6/7");
1100        let nested8 = tmpdir.path().join("1/2/3/4/5/6/7/8");
1101        let nested9 = tmpdir.path().join("1/2/3/4/5/6/7/8/9");
1102
1103        let (mut watcher, mut rx) = watcher();
1104
1105        watcher.watch_recursively(&tmpdir);
1106
1107        std::fs::create_dir_all(&nested9).expect("create_dir_all");
1108        rx.wait_ordered_exact([
1109            expected(&nested1).create_any(),
1110            expected(&nested2).create_any(),
1111            expected(&nested3).create_any(),
1112            expected(&nested4).create_any(),
1113            expected(&nested5).create_any(),
1114            expected(&nested6).create_any(),
1115            expected(&nested7).create_any(),
1116            expected(&nested8).create_any(),
1117            expected(&nested9).create_any(),
1118        ])
1119        .ensure_no_tail();
1120        assert_eq!(
1121            watcher.get_watch_handles(),
1122            HashSet::from([tmpdir.to_path_buf()])
1123        );
1124    }
1125
1126    #[test]
1127    fn upgrade_to_recursive() {
1128        let tmpdir = testdir();
1129        let (mut watcher, mut rx) = watcher();
1130
1131        let path = tmpdir.path().join("upgrade");
1132        let deep = tmpdir.path().join("upgrade/deep");
1133        let file = tmpdir.path().join("upgrade/deep/file");
1134        std::fs::create_dir_all(&deep).expect("create_dir");
1135
1136        watcher.watch_nonrecursively(&path);
1137        std::fs::File::create_new(&file).expect("create");
1138        std::fs::remove_file(&file).expect("delete");
1139
1140        watcher.watch_recursively(&path);
1141        std::fs::File::create_new(&file).expect("create");
1142
1143        rx.wait_ordered_exact([expected(&deep).modify_any(), expected(&file).create_any()])
1144            .ensure_no_tail();
1145        assert_eq!(watcher.get_watch_handles(), HashSet::from([path]));
1146    }
1147}