notify_win/
windows.rs

1#![allow(missing_docs)]
2//! Watcher implementation for Windows' directory management APIs
3//!
4//! For more information see the [ReadDirectoryChangesW reference][ref].
5//!
6//! [ref]: https://msdn.microsoft.com/en-us/library/windows/desktop/aa363950(v=vs.85).aspx
7
8use crate::{bounded, unbounded, BoundSender, Config, Receiver, Sender};
9use crate::{event::*, WatcherKind};
10use crate::{Error, EventHandler, RecursiveMode, Result, Watcher};
11use std::alloc;
12use std::collections::HashMap;
13use std::env;
14use std::ffi::OsString;
15use std::os::raw::c_void;
16use std::os::windows::ffi::{OsStrExt, OsStringExt};
17use std::path::{Path, PathBuf};
18use std::ptr;
19use std::slice;
20use std::sync::{Arc, Mutex};
21use std::thread;
22use windows_sys::Win32::Foundation::{
23    CloseHandle, ERROR_OPERATION_ABORTED, HANDLE, INVALID_HANDLE_VALUE, WAIT_OBJECT_0,
24};
25use windows_sys::Win32::Storage::FileSystem::{
26    CreateFileW, ReadDirectoryChangesW, FILE_ACTION_ADDED, FILE_ACTION_MODIFIED,
27    FILE_ACTION_REMOVED, FILE_ACTION_RENAMED_NEW_NAME, FILE_ACTION_RENAMED_OLD_NAME,
28    FILE_FLAG_BACKUP_SEMANTICS, FILE_FLAG_OVERLAPPED, FILE_LIST_DIRECTORY,
29    FILE_NOTIFY_CHANGE_ATTRIBUTES, FILE_NOTIFY_CHANGE_CREATION, FILE_NOTIFY_CHANGE_DIR_NAME,
30    FILE_NOTIFY_CHANGE_FILE_NAME, FILE_NOTIFY_CHANGE_LAST_WRITE, FILE_NOTIFY_CHANGE_SECURITY,
31    FILE_NOTIFY_CHANGE_SIZE, FILE_NOTIFY_INFORMATION, FILE_SHARE_DELETE, FILE_SHARE_READ,
32    FILE_SHARE_WRITE, OPEN_EXISTING,
33};
34use windows_sys::Win32::System::Threading::{
35    CreateSemaphoreW, ReleaseSemaphore, WaitForSingleObjectEx, INFINITE,
36};
37use windows_sys::Win32::System::IO::{CancelIo, OVERLAPPED};
38
/// Size in bytes of the buffer handed to every `ReadDirectoryChangesW` request (16 KiB).
const BUF_SIZE: u32 = 16 * 1024;
40
41#[derive(Clone)]
42struct ReadData {
43    dir: PathBuf,          // directory that is being watched
44    file: Option<PathBuf>, // if a file is being watched, this is its full path
45    complete_sem: HANDLE,
46    is_recursive: bool,
47}
48
49struct ReadDirectoryRequest {
50    event_handler: Arc<Mutex<dyn EventHandler>>,
51    buffer: [u8; BUF_SIZE as usize],
52    handle: HANDLE,
53    data: ReadData,
54}
55
56enum Action {
57    Watch(PathBuf, RecursiveMode),
58    Unwatch(PathBuf),
59    Stop,
60    Configure(Config, BoundSender<Result<bool>>),
61}
62
/// Notifications about the server thread itself, as opposed to file system events.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum MetaEvent {
    /// Sent after a single watch has been torn down.
    SingleWatchComplete,
    /// Sent when the server loop was woken up through its wake-up semaphore.
    WatcherAwakened,
}
68
69struct WatchState {
70    dir_handle: HANDLE,
71    complete_sem: HANDLE,
72}
73
74struct ReadDirectoryChangesServer {
75    rx: Receiver<Action>,
76    event_handler: Arc<Mutex<dyn EventHandler>>,
77    meta_tx: Sender<MetaEvent>,
78    cmd_tx: Sender<Result<PathBuf>>,
79    watches: HashMap<PathBuf, WatchState>,
80    wakeup_sem: HANDLE,
81}
82
83impl ReadDirectoryChangesServer {
84    fn start(
85        event_handler: Arc<Mutex<dyn EventHandler>>,
86        meta_tx: Sender<MetaEvent>,
87        cmd_tx: Sender<Result<PathBuf>>,
88        wakeup_sem: HANDLE,
89    ) -> Sender<Action> {
90        let (action_tx, action_rx) = unbounded();
91        // it is, in fact, ok to send the semaphore across threads
92        let sem_temp = wakeup_sem as u64;
93        let _ = thread::Builder::new()
94            .name("notify-win windows loop".to_string())
95            .spawn(move || {
96                let wakeup_sem = sem_temp as HANDLE;
97                let server = ReadDirectoryChangesServer {
98                    rx: action_rx,
99                    event_handler,
100                    meta_tx,
101                    cmd_tx,
102                    watches: HashMap::new(),
103                    wakeup_sem,
104                };
105                server.run();
106            });
107        action_tx
108    }
109
110    fn run(mut self) {
111        loop {
112            // process all available actions first
113            let mut stopped = false;
114
115            while let Ok(action) = self.rx.try_recv() {
116                match action {
117                    Action::Watch(path, recursive_mode) => {
118                        let res = self.add_watch(path, recursive_mode.is_recursive());
119                        let _ = self.cmd_tx.send(res);
120                    }
121                    Action::Unwatch(path) => self.remove_watch(path),
122                    Action::Stop => {
123                        stopped = true;
124                        for ws in self.watches.values() {
125                            stop_watch(ws, &self.meta_tx);
126                        }
127                        break;
128                    }
129                    Action::Configure(config, tx) => {
130                        self.configure_raw_mode(config, tx);
131                    }
132                }
133            }
134
135            if stopped {
136                break;
137            }
138
139            unsafe {
140                // wait with alertable flag so that the completion routine fires
141                let waitres = WaitForSingleObjectEx(self.wakeup_sem, 100, 1);
142                if waitres == WAIT_OBJECT_0 {
143                    let _ = self.meta_tx.send(MetaEvent::WatcherAwakened);
144                }
145            }
146        }
147
148        // we have to clean this up, since the watcher may be long gone
149        unsafe {
150            CloseHandle(self.wakeup_sem);
151        }
152    }
153
154    fn add_watch(&mut self, path: PathBuf, is_recursive: bool) -> Result<PathBuf> {
155        // path must exist and be either a file or directory
156        if !path.is_dir() && !path.is_file() {
157            return Err(
158                Error::generic("Input watch path is neither a file nor a directory.")
159                    .add_path(path),
160            );
161        }
162
163        let (watching_file, dir_target) = {
164            if path.is_dir() {
165                (false, path.clone())
166            } else {
167                // emulate file watching by watching the parent directory
168                (true, path.parent().unwrap().to_path_buf())
169            }
170        };
171
172        let encoded_path: Vec<u16> = dir_target
173            .as_os_str()
174            .encode_wide()
175            .chain(Some(0))
176            .collect();
177        let handle;
178        unsafe {
179            handle = CreateFileW(
180                encoded_path.as_ptr(),
181                FILE_LIST_DIRECTORY,
182                FILE_SHARE_READ | FILE_SHARE_DELETE | FILE_SHARE_WRITE,
183                ptr::null_mut(),
184                OPEN_EXISTING,
185                FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED,
186                ptr::null_mut(),
187            );
188
189            if handle == INVALID_HANDLE_VALUE {
190                return Err(if watching_file {
191                    Error::generic(
192                        "You attempted to watch a single file, but parent \
193                         directory could not be opened.",
194                    )
195                    .add_path(path)
196                } else {
197                    // TODO: Call GetLastError for better error info?
198                    Error::path_not_found().add_path(path)
199                });
200            }
201        }
202        let wf = if watching_file {
203            Some(path.clone())
204        } else {
205            None
206        };
207        // every watcher gets its own semaphore to signal completion
208        let semaphore = unsafe { CreateSemaphoreW(ptr::null_mut(), 0, 1, ptr::null_mut()) };
209        if semaphore.is_null() || semaphore == INVALID_HANDLE_VALUE {
210            unsafe {
211                CloseHandle(handle);
212            }
213            return Err(Error::generic("Failed to create semaphore for watch.").add_path(path));
214        }
215        let rd = ReadData {
216            dir: dir_target,
217            file: wf,
218            complete_sem: semaphore,
219            is_recursive,
220        };
221        let ws = WatchState {
222            dir_handle: handle,
223            complete_sem: semaphore,
224        };
225        self.watches.insert(path.clone(), ws);
226        start_read(&rd, self.event_handler.clone(), handle);
227        Ok(path)
228    }
229
230    fn remove_watch(&mut self, path: PathBuf) {
231        if let Some(ws) = self.watches.remove(&path) {
232            stop_watch(&ws, &self.meta_tx);
233        }
234    }
235
236    fn configure_raw_mode(&mut self, _config: Config, tx: BoundSender<Result<bool>>) {
237        tx.send(Ok(false))
238            .expect("configuration channel disconnect");
239    }
240}
241
242fn stop_watch(ws: &WatchState, meta_tx: &Sender<MetaEvent>) {
243    unsafe {
244        let cio = CancelIo(ws.dir_handle);
245        let ch = CloseHandle(ws.dir_handle);
246        // have to wait for it, otherwise we leak the memory allocated for there read request
247        if cio != 0 && ch != 0 {
248            while WaitForSingleObjectEx(ws.complete_sem, INFINITE, 1) != WAIT_OBJECT_0 {
249                // drain the apc queue
250            }
251        }
252        CloseHandle(ws.complete_sem);
253    }
254    let _ = meta_tx.send(MetaEvent::SingleWatchComplete);
255}
256
257fn start_read(rd: &ReadData, event_handler: Arc<Mutex<dyn EventHandler>>, handle: HANDLE) {
258    let request = Box::new(ReadDirectoryRequest {
259        event_handler,
260        handle,
261        buffer: [0u8; BUF_SIZE as usize],
262        data: rd.clone(),
263    });
264
265    let flags = FILE_NOTIFY_CHANGE_FILE_NAME
266        | FILE_NOTIFY_CHANGE_DIR_NAME
267        | FILE_NOTIFY_CHANGE_ATTRIBUTES
268        | FILE_NOTIFY_CHANGE_SIZE
269        | FILE_NOTIFY_CHANGE_LAST_WRITE
270        | FILE_NOTIFY_CHANGE_CREATION
271        | FILE_NOTIFY_CHANGE_SECURITY;
272
273    let monitor_subdir = if request.data.file.is_none() && request.data.is_recursive {
274        1
275    } else {
276        0
277    };
278
279    unsafe {
280        let overlapped = alloc::alloc_zeroed(alloc::Layout::new::<OVERLAPPED>()) as *mut OVERLAPPED;
281        // When using callback based async requests, we are allowed to use the hEvent member
282        // for our own purposes
283
284        let request = Box::leak(request);
285        (*overlapped).hEvent = request as *mut _ as _;
286
287        // This is using an asynchronous call with a completion routine for receiving notifications
288        // An I/O completion port would probably be more performant
289        let ret = ReadDirectoryChangesW(
290            handle,
291            request.buffer.as_mut_ptr() as *mut c_void,
292            BUF_SIZE,
293            monitor_subdir,
294            flags,
295            &mut 0u32 as *mut u32, // not used for async reqs
296            overlapped,
297            Some(handle_event),
298        );
299
300        if ret == 0 {
301            // error reading. retransmute request memory to allow drop.
302            // Because of the error, ownership of the `overlapped` alloc was not passed
303            // over to `ReadDirectoryChangesW`.
304            // So we can claim ownership back.
305            let _overlapped = Box::from_raw(overlapped);
306            let request = Box::from_raw(request);
307            ReleaseSemaphore(request.data.complete_sem, 1, ptr::null_mut());
308        }
309    }
310}
311
312unsafe extern "system" fn handle_event(
313    error_code: u32,
314    _bytes_written: u32,
315    overlapped: *mut OVERLAPPED,
316) {
317    let overlapped: Box<OVERLAPPED> = Box::from_raw(overlapped);
318    let request: Box<ReadDirectoryRequest> = Box::from_raw(overlapped.hEvent as *mut _);
319
320    if error_code == ERROR_OPERATION_ABORTED {
321        // received when dir is unwatched or watcher is shutdown; return and let overlapped/request
322        // get drop-cleaned
323        ReleaseSemaphore(request.data.complete_sem, 1, ptr::null_mut());
324        return;
325    }
326
327    // Get the next request queued up as soon as possible
328    start_read(&request.data, request.event_handler.clone(), request.handle);
329
330    // The FILE_NOTIFY_INFORMATION struct has a variable length due to the variable length
331    // string as its last member. Each struct contains an offset for getting the next entry in
332    // the buffer.
333    let mut cur_offset: *const u8 = request.buffer.as_ptr();
334    // In Wine, FILE_NOTIFY_INFORMATION structs are packed placed in the buffer;
335    // they are aligned to 16bit (WCHAR) boundary instead of 32bit required by FILE_NOTIFY_INFORMATION.
336    // Hence, we need to use `read_unaligned` here to avoid UB.
337    let mut cur_entry = ptr::read_unaligned(cur_offset as *const FILE_NOTIFY_INFORMATION);
338    loop {
339        // filename length is size in bytes, so / 2
340        let len = cur_entry.FileNameLength as usize / 2;
341        let encoded_path: &[u16] = slice::from_raw_parts(
342            cur_offset.add(std::mem::offset_of!(FILE_NOTIFY_INFORMATION, FileName)) as _,
343            len,
344        );
345        // prepend root to get a full path
346        let path = request
347            .data
348            .dir
349            .join(PathBuf::from(OsString::from_wide(encoded_path)));
350
351        // if we are watching a single file, ignore the event unless the path is exactly
352        // the watched file
353        let skip = match request.data.file {
354            None => false,
355            Some(ref watch_path) => *watch_path != path,
356        };
357
358        if !skip {
359            log::trace!(
360                "Event: path = `{}`, action = {:?}",
361                path.display(),
362                cur_entry.Action
363            );
364
365            let newe = Event::new(EventKind::Any).add_path(path);
366
367            fn emit_event(event_handler: &Mutex<dyn EventHandler>, res: Result<Event>) {
368                if let Ok(mut guard) = event_handler.lock() {
369                    let f: &mut dyn EventHandler = &mut *guard;
370                    f.handle_event(res);
371                }
372            }
373
374            let event_handler = |res| emit_event(&request.event_handler, res);
375            if cur_entry.Action == FILE_ACTION_RENAMED_OLD_NAME {
376                let mode = RenameMode::From;
377                let kind = ModifyKind::Name(mode);
378                let kind = EventKind::Modify(kind);
379                let ev = newe.set_kind(kind);
380                event_handler(Ok(ev))
381            } else {
382                match cur_entry.Action {
383                    FILE_ACTION_RENAMED_NEW_NAME => {
384                        let kind = EventKind::Modify(ModifyKind::Name(RenameMode::To));
385                        let ev = newe.set_kind(kind);
386                        event_handler(Ok(ev));
387                    }
388                    FILE_ACTION_ADDED => {
389                        let kind = EventKind::Create(CreateKind::Any);
390                        let ev = newe.set_kind(kind);
391                        event_handler(Ok(ev));
392                    }
393                    FILE_ACTION_REMOVED => {
394                        let kind = EventKind::Remove(RemoveKind::Any);
395                        let ev = newe.set_kind(kind);
396                        event_handler(Ok(ev));
397                    }
398                    FILE_ACTION_MODIFIED => {
399                        let kind = EventKind::Modify(ModifyKind::Any);
400                        let ev = newe.set_kind(kind);
401                        event_handler(Ok(ev));
402                    }
403                    _ => (),
404                };
405            }
406        }
407
408        if cur_entry.NextEntryOffset == 0 {
409            break;
410        }
411        cur_offset = cur_offset.offset(cur_entry.NextEntryOffset as isize);
412        cur_entry = ptr::read_unaligned(cur_offset as *const FILE_NOTIFY_INFORMATION);
413    }
414}
415
416/// Watcher implementation based on ReadDirectoryChanges
417#[derive(Debug)]
418pub struct ReadDirectoryChangesWatcher {
419    tx: Sender<Action>,
420    cmd_rx: Receiver<Result<PathBuf>>,
421    wakeup_sem: HANDLE,
422}
423
424impl ReadDirectoryChangesWatcher {
425    pub fn create(
426        event_handler: Arc<Mutex<dyn EventHandler>>,
427        meta_tx: Sender<MetaEvent>,
428    ) -> Result<ReadDirectoryChangesWatcher> {
429        let (cmd_tx, cmd_rx) = unbounded();
430
431        let wakeup_sem = unsafe { CreateSemaphoreW(ptr::null_mut(), 0, 1, ptr::null_mut()) };
432        if wakeup_sem.is_null() || wakeup_sem == INVALID_HANDLE_VALUE {
433            return Err(Error::generic("Failed to create wakeup semaphore."));
434        }
435
436        let action_tx =
437            ReadDirectoryChangesServer::start(event_handler, meta_tx, cmd_tx, wakeup_sem);
438
439        Ok(ReadDirectoryChangesWatcher {
440            tx: action_tx,
441            cmd_rx,
442            wakeup_sem,
443        })
444    }
445
446    fn wakeup_server(&mut self) {
447        // breaks the server out of its wait state.  right now this is really just an optimization,
448        // so that if you add a watch you don't block for 100ms in watch() while the
449        // server sleeps.
450        unsafe {
451            ReleaseSemaphore(self.wakeup_sem, 1, ptr::null_mut());
452        }
453    }
454
455    fn send_action_require_ack(&mut self, action: Action, pb: &PathBuf) -> Result<()> {
456        self.tx
457            .send(action)
458            .map_err(|_| Error::generic("Error sending to internal channel"))?;
459
460        // wake 'em up, we don't want to wait around for the ack
461        self.wakeup_server();
462
463        let ack_pb = self
464            .cmd_rx
465            .recv()
466            .map_err(|_| Error::generic("Error receiving from command channel"))?
467            .map_err(|e| Error::generic(&format!("Error in watcher: {:?}", e)))?;
468
469        if pb.as_path() != ack_pb.as_path() {
470            Err(Error::generic(&format!(
471                "Expected ack for {:?} but got \
472                 ack for {:?}",
473                pb, ack_pb
474            )))
475        } else {
476            Ok(())
477        }
478    }
479
480    fn watch_inner(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> {
481        let pb = if path.is_absolute() {
482            path.to_owned()
483        } else {
484            let p = env::current_dir().map_err(Error::io)?;
485            p.join(path)
486        };
487        // path must exist and be either a file or directory
488        if !pb.is_dir() && !pb.is_file() {
489            return Err(Error::generic(
490                "Input watch path is neither a file nor a directory.",
491            ));
492        }
493        self.send_action_require_ack(Action::Watch(pb.clone(), recursive_mode), &pb)
494    }
495
496    fn unwatch_inner(&mut self, path: &Path) -> Result<()> {
497        let pb = if path.is_absolute() {
498            path.to_owned()
499        } else {
500            let p = env::current_dir().map_err(Error::io)?;
501            p.join(path)
502        };
503        let res = self
504            .tx
505            .send(Action::Unwatch(pb))
506            .map_err(|_| Error::generic("Error sending to internal channel"));
507        self.wakeup_server();
508        res
509    }
510}
511
512impl Watcher for ReadDirectoryChangesWatcher {
513    fn new<F: EventHandler>(event_handler: F, _config: Config) -> Result<Self> {
514        // create dummy channel for meta event
515        // TODO: determine the original purpose of this - can we remove it?
516        let (meta_tx, _) = unbounded();
517        let event_handler = Arc::new(Mutex::new(event_handler));
518        Self::create(event_handler, meta_tx)
519    }
520
521    fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> {
522        self.watch_inner(path, recursive_mode)
523    }
524
525    fn unwatch(&mut self, path: &Path) -> Result<()> {
526        self.unwatch_inner(path)
527    }
528
529    fn configure(&mut self, config: Config) -> Result<bool> {
530        let (tx, rx) = bounded(1);
531        self.tx.send(Action::Configure(config, tx))?;
532        rx.recv()?
533    }
534
535    fn kind() -> crate::WatcherKind {
536        WatcherKind::ReadDirectoryChangesWatcher
537    }
538}
539
540impl Drop for ReadDirectoryChangesWatcher {
541    fn drop(&mut self) {
542        let _ = self.tx.send(Action::Stop);
543        // better wake it up
544        self.wakeup_server();
545    }
546}
547
548// `ReadDirectoryChangesWatcher` is not Send/Sync because of the semaphore Handle.
549// As said elsewhere it's perfectly safe to send it across threads.
550unsafe impl Send for ReadDirectoryChangesWatcher {}
551// Because all public methods are `&mut self` it's also perfectly safe to share references.
552unsafe impl Sync for ReadDirectoryChangesWatcher {}