1#![allow(missing_docs)]
2use crate::{bounded, unbounded, BoundSender, Config, Receiver, Sender};
9use crate::{event::*, WatcherKind};
10use crate::{Error, EventHandler, RecursiveMode, Result, Watcher};
11use std::alloc;
12use std::collections::HashMap;
13use std::env;
14use std::ffi::OsString;
15use std::os::raw::c_void;
16use std::os::windows::ffi::{OsStrExt, OsStringExt};
17use std::path::{Path, PathBuf};
18use std::ptr;
19use std::slice;
20use std::sync::{Arc, Mutex};
21use std::thread;
22use windows_sys::Win32::Foundation::{
23 CloseHandle, ERROR_OPERATION_ABORTED, HANDLE, INVALID_HANDLE_VALUE, WAIT_OBJECT_0,
24};
25use windows_sys::Win32::Storage::FileSystem::{
26 CreateFileW, ReadDirectoryChangesW, FILE_ACTION_ADDED, FILE_ACTION_MODIFIED,
27 FILE_ACTION_REMOVED, FILE_ACTION_RENAMED_NEW_NAME, FILE_ACTION_RENAMED_OLD_NAME,
28 FILE_FLAG_BACKUP_SEMANTICS, FILE_FLAG_OVERLAPPED, FILE_LIST_DIRECTORY,
29 FILE_NOTIFY_CHANGE_ATTRIBUTES, FILE_NOTIFY_CHANGE_CREATION, FILE_NOTIFY_CHANGE_DIR_NAME,
30 FILE_NOTIFY_CHANGE_FILE_NAME, FILE_NOTIFY_CHANGE_LAST_WRITE, FILE_NOTIFY_CHANGE_SECURITY,
31 FILE_NOTIFY_CHANGE_SIZE, FILE_NOTIFY_INFORMATION, FILE_SHARE_DELETE, FILE_SHARE_READ,
32 FILE_SHARE_WRITE, OPEN_EXISTING,
33};
34use windows_sys::Win32::System::Threading::{
35 CreateSemaphoreW, ReleaseSemaphore, WaitForSingleObjectEx, INFINITE,
36};
37use windows_sys::Win32::System::IO::{CancelIo, OVERLAPPED};
38
/// Size in bytes of the notification buffer owned by each read request and passed to
/// `ReadDirectoryChangesW`.
const BUF_SIZE: u32 = 16 * 1024;
40
41#[derive(Clone)]
42struct ReadData {
43 dir: PathBuf, file: Option<PathBuf>, complete_sem: HANDLE,
46 is_recursive: bool,
47}
48
49struct ReadDirectoryRequest {
50 event_handler: Arc<Mutex<dyn EventHandler>>,
51 buffer: [u8; BUF_SIZE as usize],
52 handle: HANDLE,
53 data: ReadData,
54}
55
56enum Action {
57 Watch(PathBuf, RecursiveMode),
58 Unwatch(PathBuf),
59 Stop,
60 Configure(Config, BoundSender<Result<bool>>),
61}
62
/// Diagnostic notifications emitted by the server thread on the meta channel.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum MetaEvent {
    /// A single watch has been torn down.
    SingleWatchComplete,
    /// The server loop's wait on the wakeup semaphore was satisfied.
    WatcherAwakened,
}
68
69struct WatchState {
70 dir_handle: HANDLE,
71 complete_sem: HANDLE,
72}
73
74struct ReadDirectoryChangesServer {
75 rx: Receiver<Action>,
76 event_handler: Arc<Mutex<dyn EventHandler>>,
77 meta_tx: Sender<MetaEvent>,
78 cmd_tx: Sender<Result<PathBuf>>,
79 watches: HashMap<PathBuf, WatchState>,
80 wakeup_sem: HANDLE,
81}
82
83impl ReadDirectoryChangesServer {
84 fn start(
85 event_handler: Arc<Mutex<dyn EventHandler>>,
86 meta_tx: Sender<MetaEvent>,
87 cmd_tx: Sender<Result<PathBuf>>,
88 wakeup_sem: HANDLE,
89 ) -> Sender<Action> {
90 let (action_tx, action_rx) = unbounded();
91 let sem_temp = wakeup_sem as u64;
93 let _ = thread::Builder::new()
94 .name("notify-win windows loop".to_string())
95 .spawn(move || {
96 let wakeup_sem = sem_temp as HANDLE;
97 let server = ReadDirectoryChangesServer {
98 rx: action_rx,
99 event_handler,
100 meta_tx,
101 cmd_tx,
102 watches: HashMap::new(),
103 wakeup_sem,
104 };
105 server.run();
106 });
107 action_tx
108 }
109
110 fn run(mut self) {
111 loop {
112 let mut stopped = false;
114
115 while let Ok(action) = self.rx.try_recv() {
116 match action {
117 Action::Watch(path, recursive_mode) => {
118 let res = self.add_watch(path, recursive_mode.is_recursive());
119 let _ = self.cmd_tx.send(res);
120 }
121 Action::Unwatch(path) => self.remove_watch(path),
122 Action::Stop => {
123 stopped = true;
124 for ws in self.watches.values() {
125 stop_watch(ws, &self.meta_tx);
126 }
127 break;
128 }
129 Action::Configure(config, tx) => {
130 self.configure_raw_mode(config, tx);
131 }
132 }
133 }
134
135 if stopped {
136 break;
137 }
138
139 unsafe {
140 let waitres = WaitForSingleObjectEx(self.wakeup_sem, 100, 1);
142 if waitres == WAIT_OBJECT_0 {
143 let _ = self.meta_tx.send(MetaEvent::WatcherAwakened);
144 }
145 }
146 }
147
148 unsafe {
150 CloseHandle(self.wakeup_sem);
151 }
152 }
153
154 fn add_watch(&mut self, path: PathBuf, is_recursive: bool) -> Result<PathBuf> {
155 if !path.is_dir() && !path.is_file() {
157 return Err(
158 Error::generic("Input watch path is neither a file nor a directory.")
159 .add_path(path),
160 );
161 }
162
163 let (watching_file, dir_target) = {
164 if path.is_dir() {
165 (false, path.clone())
166 } else {
167 (true, path.parent().unwrap().to_path_buf())
169 }
170 };
171
172 let encoded_path: Vec<u16> = dir_target
173 .as_os_str()
174 .encode_wide()
175 .chain(Some(0))
176 .collect();
177 let handle;
178 unsafe {
179 handle = CreateFileW(
180 encoded_path.as_ptr(),
181 FILE_LIST_DIRECTORY,
182 FILE_SHARE_READ | FILE_SHARE_DELETE | FILE_SHARE_WRITE,
183 ptr::null_mut(),
184 OPEN_EXISTING,
185 FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED,
186 ptr::null_mut(),
187 );
188
189 if handle == INVALID_HANDLE_VALUE {
190 return Err(if watching_file {
191 Error::generic(
192 "You attempted to watch a single file, but parent \
193 directory could not be opened.",
194 )
195 .add_path(path)
196 } else {
197 Error::path_not_found().add_path(path)
199 });
200 }
201 }
202 let wf = if watching_file {
203 Some(path.clone())
204 } else {
205 None
206 };
207 let semaphore = unsafe { CreateSemaphoreW(ptr::null_mut(), 0, 1, ptr::null_mut()) };
209 if semaphore.is_null() || semaphore == INVALID_HANDLE_VALUE {
210 unsafe {
211 CloseHandle(handle);
212 }
213 return Err(Error::generic("Failed to create semaphore for watch.").add_path(path));
214 }
215 let rd = ReadData {
216 dir: dir_target,
217 file: wf,
218 complete_sem: semaphore,
219 is_recursive,
220 };
221 let ws = WatchState {
222 dir_handle: handle,
223 complete_sem: semaphore,
224 };
225 self.watches.insert(path.clone(), ws);
226 start_read(&rd, self.event_handler.clone(), handle);
227 Ok(path)
228 }
229
230 fn remove_watch(&mut self, path: PathBuf) {
231 if let Some(ws) = self.watches.remove(&path) {
232 stop_watch(&ws, &self.meta_tx);
233 }
234 }
235
236 fn configure_raw_mode(&mut self, _config: Config, tx: BoundSender<Result<bool>>) {
237 tx.send(Ok(false))
238 .expect("configuration channel disconnect");
239 }
240}
241
242fn stop_watch(ws: &WatchState, meta_tx: &Sender<MetaEvent>) {
243 unsafe {
244 let cio = CancelIo(ws.dir_handle);
245 let ch = CloseHandle(ws.dir_handle);
246 if cio != 0 && ch != 0 {
248 while WaitForSingleObjectEx(ws.complete_sem, INFINITE, 1) != WAIT_OBJECT_0 {
249 }
251 }
252 CloseHandle(ws.complete_sem);
253 }
254 let _ = meta_tx.send(MetaEvent::SingleWatchComplete);
255}
256
257fn start_read(rd: &ReadData, event_handler: Arc<Mutex<dyn EventHandler>>, handle: HANDLE) {
258 let request = Box::new(ReadDirectoryRequest {
259 event_handler,
260 handle,
261 buffer: [0u8; BUF_SIZE as usize],
262 data: rd.clone(),
263 });
264
265 let flags = FILE_NOTIFY_CHANGE_FILE_NAME
266 | FILE_NOTIFY_CHANGE_DIR_NAME
267 | FILE_NOTIFY_CHANGE_ATTRIBUTES
268 | FILE_NOTIFY_CHANGE_SIZE
269 | FILE_NOTIFY_CHANGE_LAST_WRITE
270 | FILE_NOTIFY_CHANGE_CREATION
271 | FILE_NOTIFY_CHANGE_SECURITY;
272
273 let monitor_subdir = if request.data.file.is_none() && request.data.is_recursive {
274 1
275 } else {
276 0
277 };
278
279 unsafe {
280 let overlapped = alloc::alloc_zeroed(alloc::Layout::new::<OVERLAPPED>()) as *mut OVERLAPPED;
281 let request = Box::leak(request);
285 (*overlapped).hEvent = request as *mut _ as _;
286
287 let ret = ReadDirectoryChangesW(
290 handle,
291 request.buffer.as_mut_ptr() as *mut c_void,
292 BUF_SIZE,
293 monitor_subdir,
294 flags,
295 &mut 0u32 as *mut u32, overlapped,
297 Some(handle_event),
298 );
299
300 if ret == 0 {
301 let _overlapped = Box::from_raw(overlapped);
306 let request = Box::from_raw(request);
307 ReleaseSemaphore(request.data.complete_sem, 1, ptr::null_mut());
308 }
309 }
310}
311
312unsafe extern "system" fn handle_event(
313 error_code: u32,
314 _bytes_written: u32,
315 overlapped: *mut OVERLAPPED,
316) {
317 let overlapped: Box<OVERLAPPED> = Box::from_raw(overlapped);
318 let request: Box<ReadDirectoryRequest> = Box::from_raw(overlapped.hEvent as *mut _);
319
320 if error_code == ERROR_OPERATION_ABORTED {
321 ReleaseSemaphore(request.data.complete_sem, 1, ptr::null_mut());
324 return;
325 }
326
327 start_read(&request.data, request.event_handler.clone(), request.handle);
329
330 let mut cur_offset: *const u8 = request.buffer.as_ptr();
334 let mut cur_entry = ptr::read_unaligned(cur_offset as *const FILE_NOTIFY_INFORMATION);
338 loop {
339 let len = cur_entry.FileNameLength as usize / 2;
341 let encoded_path: &[u16] = slice::from_raw_parts(
342 cur_offset.add(std::mem::offset_of!(FILE_NOTIFY_INFORMATION, FileName)) as _,
343 len,
344 );
345 let path = request
347 .data
348 .dir
349 .join(PathBuf::from(OsString::from_wide(encoded_path)));
350
351 let skip = match request.data.file {
354 None => false,
355 Some(ref watch_path) => *watch_path != path,
356 };
357
358 if !skip {
359 log::trace!(
360 "Event: path = `{}`, action = {:?}",
361 path.display(),
362 cur_entry.Action
363 );
364
365 let newe = Event::new(EventKind::Any).add_path(path);
366
367 fn emit_event(event_handler: &Mutex<dyn EventHandler>, res: Result<Event>) {
368 if let Ok(mut guard) = event_handler.lock() {
369 let f: &mut dyn EventHandler = &mut *guard;
370 f.handle_event(res);
371 }
372 }
373
374 let event_handler = |res| emit_event(&request.event_handler, res);
375 if cur_entry.Action == FILE_ACTION_RENAMED_OLD_NAME {
376 let mode = RenameMode::From;
377 let kind = ModifyKind::Name(mode);
378 let kind = EventKind::Modify(kind);
379 let ev = newe.set_kind(kind);
380 event_handler(Ok(ev))
381 } else {
382 match cur_entry.Action {
383 FILE_ACTION_RENAMED_NEW_NAME => {
384 let kind = EventKind::Modify(ModifyKind::Name(RenameMode::To));
385 let ev = newe.set_kind(kind);
386 event_handler(Ok(ev));
387 }
388 FILE_ACTION_ADDED => {
389 let kind = EventKind::Create(CreateKind::Any);
390 let ev = newe.set_kind(kind);
391 event_handler(Ok(ev));
392 }
393 FILE_ACTION_REMOVED => {
394 let kind = EventKind::Remove(RemoveKind::Any);
395 let ev = newe.set_kind(kind);
396 event_handler(Ok(ev));
397 }
398 FILE_ACTION_MODIFIED => {
399 let kind = EventKind::Modify(ModifyKind::Any);
400 let ev = newe.set_kind(kind);
401 event_handler(Ok(ev));
402 }
403 _ => (),
404 };
405 }
406 }
407
408 if cur_entry.NextEntryOffset == 0 {
409 break;
410 }
411 cur_offset = cur_offset.offset(cur_entry.NextEntryOffset as isize);
412 cur_entry = ptr::read_unaligned(cur_offset as *const FILE_NOTIFY_INFORMATION);
413 }
414}
415
416#[derive(Debug)]
418pub struct ReadDirectoryChangesWatcher {
419 tx: Sender<Action>,
420 cmd_rx: Receiver<Result<PathBuf>>,
421 wakeup_sem: HANDLE,
422}
423
424impl ReadDirectoryChangesWatcher {
425 pub fn create(
426 event_handler: Arc<Mutex<dyn EventHandler>>,
427 meta_tx: Sender<MetaEvent>,
428 ) -> Result<ReadDirectoryChangesWatcher> {
429 let (cmd_tx, cmd_rx) = unbounded();
430
431 let wakeup_sem = unsafe { CreateSemaphoreW(ptr::null_mut(), 0, 1, ptr::null_mut()) };
432 if wakeup_sem.is_null() || wakeup_sem == INVALID_HANDLE_VALUE {
433 return Err(Error::generic("Failed to create wakeup semaphore."));
434 }
435
436 let action_tx =
437 ReadDirectoryChangesServer::start(event_handler, meta_tx, cmd_tx, wakeup_sem);
438
439 Ok(ReadDirectoryChangesWatcher {
440 tx: action_tx,
441 cmd_rx,
442 wakeup_sem,
443 })
444 }
445
446 fn wakeup_server(&mut self) {
447 unsafe {
451 ReleaseSemaphore(self.wakeup_sem, 1, ptr::null_mut());
452 }
453 }
454
455 fn send_action_require_ack(&mut self, action: Action, pb: &PathBuf) -> Result<()> {
456 self.tx
457 .send(action)
458 .map_err(|_| Error::generic("Error sending to internal channel"))?;
459
460 self.wakeup_server();
462
463 let ack_pb = self
464 .cmd_rx
465 .recv()
466 .map_err(|_| Error::generic("Error receiving from command channel"))?
467 .map_err(|e| Error::generic(&format!("Error in watcher: {:?}", e)))?;
468
469 if pb.as_path() != ack_pb.as_path() {
470 Err(Error::generic(&format!(
471 "Expected ack for {:?} but got \
472 ack for {:?}",
473 pb, ack_pb
474 )))
475 } else {
476 Ok(())
477 }
478 }
479
480 fn watch_inner(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> {
481 let pb = if path.is_absolute() {
482 path.to_owned()
483 } else {
484 let p = env::current_dir().map_err(Error::io)?;
485 p.join(path)
486 };
487 if !pb.is_dir() && !pb.is_file() {
489 return Err(Error::generic(
490 "Input watch path is neither a file nor a directory.",
491 ));
492 }
493 self.send_action_require_ack(Action::Watch(pb.clone(), recursive_mode), &pb)
494 }
495
496 fn unwatch_inner(&mut self, path: &Path) -> Result<()> {
497 let pb = if path.is_absolute() {
498 path.to_owned()
499 } else {
500 let p = env::current_dir().map_err(Error::io)?;
501 p.join(path)
502 };
503 let res = self
504 .tx
505 .send(Action::Unwatch(pb))
506 .map_err(|_| Error::generic("Error sending to internal channel"));
507 self.wakeup_server();
508 res
509 }
510}
511
512impl Watcher for ReadDirectoryChangesWatcher {
513 fn new<F: EventHandler>(event_handler: F, _config: Config) -> Result<Self> {
514 let (meta_tx, _) = unbounded();
517 let event_handler = Arc::new(Mutex::new(event_handler));
518 Self::create(event_handler, meta_tx)
519 }
520
521 fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> {
522 self.watch_inner(path, recursive_mode)
523 }
524
525 fn unwatch(&mut self, path: &Path) -> Result<()> {
526 self.unwatch_inner(path)
527 }
528
529 fn configure(&mut self, config: Config) -> Result<bool> {
530 let (tx, rx) = bounded(1);
531 self.tx.send(Action::Configure(config, tx))?;
532 rx.recv()?
533 }
534
535 fn kind() -> crate::WatcherKind {
536 WatcherKind::ReadDirectoryChangesWatcher
537 }
538}
539
540impl Drop for ReadDirectoryChangesWatcher {
541 fn drop(&mut self) {
542 let _ = self.tx.send(Action::Stop);
543 self.wakeup_server();
545 }
546}
547
548unsafe impl Send for ReadDirectoryChangesWatcher {}
551unsafe impl Sync for ReadDirectoryChangesWatcher {}