extern crate kernel32;
use winapi::{OVERLAPPED, LPOVERLAPPED, HANDLE, INVALID_HANDLE_VALUE, INFINITE, TRUE, WAIT_OBJECT_0,
ERROR_OPERATION_ABORTED, FILE_NOTIFY_INFORMATION, fileapi, winbase, winnt};
use std::collections::HashMap;
use std::mem;
use std::path::{Path, PathBuf};
use std::ptr;
use std::slice;
use std::sync::mpsc::{channel, Sender, Receiver};
use std::ffi::OsString;
use std::os::windows::ffi::{OsStrExt, OsStringExt};
use std::thread;
use std::os::raw::c_void;
use super::{Event, Error, op, Op, Watcher};
/// Size, in bytes, of the buffer handed to `ReadDirectoryChangesW` for each read request.
const BUF_SIZE: u32 = 16384;
/// Description of one watch; a clone of it travels with every read request issued for the watch.
#[derive(Clone)]
struct ReadData {
// `dir`: directory that is actually read; reported paths are built by joining names onto it.
// `file`: set when a single file is watched; events for any other path are then skipped.
// `complete_sem`: released when a read is aborted or a new read cannot be started.
dir: PathBuf, file: Option<PathBuf>, complete_sem: HANDLE,
}
/// State belonging to one outstanding `ReadDirectoryChangesW` call.
///
/// `start_read` boxes it and stores the raw pointer in `OVERLAPPED.hEvent`; `handle_event`
/// turns that pointer back into a `Box` when the read completes.
struct ReadDirectoryRequest {
/// Channel on which decoded events are sent.
tx: Sender<Event>,
/// Buffer passed to `ReadDirectoryChangesW`; parsed as `FILE_NOTIFY_INFORMATION` records.
buffer: [u8; BUF_SIZE as usize],
/// Directory handle the read was issued on; reused to issue the follow-up read.
handle: HANDLE,
/// Watch description (directory, optional single file, completion semaphore).
data: ReadData
}
/// Commands sent from the watcher to the server thread.
enum Action {
/// Start watching the path; the outcome is reported back on the command channel.
Watch(PathBuf),
/// Stop watching the path; no acknowledgement is sent.
Unwatch(PathBuf),
/// Stop every watch and end the server loop.
Stop
}
/// Notifications about the server's own activity, sent on the meta channel.
pub enum MetaEvent {
/// Sent by `stop_watch` after a single watch has been torn down.
SingleWatchComplete,
/// Sent when the server's wait on the wakeup semaphore returned because it was signaled.
WatcherAwakened
}
/// Handles the server keeps for each active watch so that it can be stopped later.
struct WatchState {
/// Directory handle the reads are issued on; cancelled and closed by `stop_watch`.
dir_handle: HANDLE,
/// Semaphore `stop_watch` waits on before closing it.
complete_sem: HANDLE,
}
/// State owned by the server thread that issues the directory reads.
struct ReadDirectoryChangesServer {
/// Incoming `Action`s, drained with `try_recv` on every pass of the server loop.
rx: Receiver<Action>,
/// Event channel; cloned into every read request.
tx: Sender<Event>,
/// Channel for `MetaEvent` notifications.
meta_tx: Sender<MetaEvent>,
/// Channel on which the result of each `Action::Watch` is sent back.
cmd_tx: Sender<Result<PathBuf,Error>>,
/// Active watches, keyed by the path that was passed to `add_watch`.
watches: HashMap<PathBuf, WatchState>,
/// Semaphore the loop waits on (alertably, 500 ms timeout) between passes; closed when the loop ends.
wakeup_sem: HANDLE
}
impl ReadDirectoryChangesServer {
fn start(event_tx: Sender<Event>, meta_tx: Sender<MetaEvent>, cmd_tx:Sender<Result<PathBuf,Error>>, wakeup_sem: HANDLE) -> Sender<Action> {
let (action_tx, action_rx) = channel();
let sem_temp = wakeup_sem as u64;
thread::spawn(move || {
let wakeup_sem = sem_temp as HANDLE;
let server = ReadDirectoryChangesServer {
tx: event_tx,
rx: action_rx,
meta_tx: meta_tx,
cmd_tx: cmd_tx,
watches: HashMap::new(),
wakeup_sem: wakeup_sem
};
server.run();
});
action_tx
}
fn run(mut self) {
loop {
let mut stopped = false;
while let Ok(action) = self.rx.try_recv() {
match action {
Action::Watch(path) => {
let res = self.add_watch(path);
let _ = self.cmd_tx.send(res);
},
Action::Unwatch(path) => self.remove_watch(path),
Action::Stop => {
stopped = true;
for (_, ws) in &self.watches {
stop_watch(ws, &self.meta_tx);
}
break;
}
}
};
if stopped {
break;
}
unsafe {
let waitres = kernel32::WaitForSingleObjectEx(self.wakeup_sem, 500, TRUE);
if waitres == WAIT_OBJECT_0 {
let _ = self.meta_tx.send(MetaEvent::WatcherAwakened);
}
}
}
unsafe {
kernel32::CloseHandle(self.wakeup_sem);
}
}
fn add_watch(&mut self, path: PathBuf) -> Result<PathBuf,Error> {
if !path.is_dir() && !path.is_file() {
return Err(Error::Generic("Input watch path is neither a file nor a directory.".to_owned()));
}
let (watching_file,dir_target) = {
if path.is_dir() {
(false,path.clone())
} else {
(true,path.parent().unwrap().to_path_buf())
}
};
let encoded_path: Vec<u16> = dir_target.as_os_str().encode_wide().chain(Some(0)).collect();
let handle;
unsafe {
handle = kernel32::CreateFileW(
encoded_path.as_ptr(),
winnt::FILE_LIST_DIRECTORY,
winnt::FILE_SHARE_READ | winnt::FILE_SHARE_DELETE | winnt::FILE_SHARE_WRITE,
ptr::null_mut(),
fileapi::OPEN_EXISTING,
winbase::FILE_FLAG_BACKUP_SEMANTICS | winbase::FILE_FLAG_OVERLAPPED,
ptr::null_mut());
if handle == INVALID_HANDLE_VALUE {
let err = if watching_file {
Err(Error::Generic("You attempted to watch a single file, but parent directory could not be opened.".to_owned()))
} else {
Err(Error::PathNotFound)
};
return err;
}
}
let wf = if watching_file {
Some(path.clone())
} else {
None
};
let semaphore = unsafe {
kernel32::CreateSemaphoreW(ptr::null_mut(), 0, 1, ptr::null_mut())
};
if semaphore == ptr::null_mut() || semaphore == INVALID_HANDLE_VALUE {
unsafe { kernel32::CloseHandle(handle); }
return Err(Error::Generic("Failed to create semaphore for watch.".to_owned()));
}
let rd = ReadData {
dir: dir_target,
file: wf,
complete_sem: semaphore
};
let ws = WatchState {
dir_handle: handle,
complete_sem: semaphore
};
self.watches.insert(path.clone(), ws);
start_read(&rd, &self.tx, handle);
Ok(path.to_path_buf())
}
fn remove_watch(&mut self, path: PathBuf) {
if let Some(ws) = self.watches.remove(&path) {
stop_watch(&ws, &self.meta_tx);
}
}
}
/// Tears down one watch: cancels its pending read, closes the directory handle, waits for
/// the completion semaphore, closes it, and reports `MetaEvent::SingleWatchComplete`.
///
/// The wait is only performed when both the cancel and the close succeeded. It must be
/// alertable, because the completion routine that releases the semaphore is queued to the
/// calling thread.
fn stop_watch(ws: &WatchState, meta_tx: &Sender<MetaEvent>) {
    // Win32 wait result meaning "the wait ended because a queued completion routine ran".
    const WAIT_IO_COMPLETION: u32 = 0x0000_00C0;
    unsafe {
        let cio = kernel32::CancelIo(ws.dir_handle);
        let ch = kernel32::CloseHandle(ws.dir_handle);
        if cio != 0 && ch != 0 {
            // An alertable wait also returns when an unrelated completion routine has run,
            // which says nothing about the semaphore. Keep waiting until the wait ends for
            // another reason, so the semaphore is not closed while this watch's completion
            // routine may still be pending.
            while kernel32::WaitForSingleObjectEx(ws.complete_sem, INFINITE, TRUE) == WAIT_IO_COMPLETION {}
        }
        kernel32::CloseHandle(ws.complete_sem);
    }
    // The meta receiver may have been dropped; that is not an error.
    let _ = meta_tx.send(MetaEvent::SingleWatchComplete);
}
/// Issues one overlapped `ReadDirectoryChangesW` call on `handle` with `handle_event` as its
/// completion routine.
///
/// A fresh `ReadDirectoryRequest` and `OVERLAPPED` are heap-allocated for the call. When the
/// call is accepted, both are left allocated for `handle_event` to reclaim. When it is
/// rejected, both are freed here and the watch's completion semaphore is released.
/// Subdirectories are monitored only when the watch is for a directory, not a single file.
fn start_read(rd: &ReadData, tx: &Sender<Event>, handle: HANDLE) {
    let mut request = Box::new(ReadDirectoryRequest {
        tx: tx.clone(),
        handle: handle,
        buffer: [0u8; BUF_SIZE as usize],
        data: rd.clone(),
    });

    let flags = winnt::FILE_NOTIFY_CHANGE_FILE_NAME
        | winnt::FILE_NOTIFY_CHANGE_DIR_NAME
        | winnt::FILE_NOTIFY_CHANGE_ATTRIBUTES
        | winnt::FILE_NOTIFY_CHANGE_SIZE
        | winnt::FILE_NOTIFY_CHANGE_LAST_WRITE
        | winnt::FILE_NOTIFY_CHANGE_CREATION
        | winnt::FILE_NOTIFY_CHANGE_SECURITY;

    let monitor_subdir = match request.data.file {
        None => 1,
        Some(_) => 0,
    };

    unsafe {
        let mut overlapped: Box<OVERLAPPED> = Box::new(mem::zeroed());
        let req_buf = request.buffer.as_mut_ptr() as *mut c_void;
        // The request pointer rides along in `hEvent` so the completion routine can find it.
        let request_p = Box::into_raw(request) as *mut c_void;
        overlapped.hEvent = request_p;
        let overlapped_p = Box::into_raw(overlapped);
        let mut bytes_returned = 0u32;

        let ret = kernel32::ReadDirectoryChangesW(
            handle,
            req_buf,
            BUF_SIZE,
            monitor_subdir,
            flags,
            &mut bytes_returned as *mut u32,
            overlapped_p,
            Some(handle_event));

        if ret == 0 {
            // The read was not queued: take both allocations back so they are freed, and
            // signal the completion semaphore.
            let request: Box<ReadDirectoryRequest> = Box::from_raw(request_p as *mut ReadDirectoryRequest);
            kernel32::ReleaseSemaphore(request.data.complete_sem, 1, ptr::null_mut());
            drop(Box::from_raw(overlapped_p));
        }
        // Otherwise ownership of both allocations has passed to `handle_event`.
    }
}
unsafe extern "system" fn handle_event(error_code: u32, _bytes_written: u32, overlapped: LPOVERLAPPED) {
let overlapped: Box<OVERLAPPED> = Box::from_raw(overlapped);
let request: Box<ReadDirectoryRequest> = Box::from_raw(overlapped.hEvent as *mut _);
if error_code == ERROR_OPERATION_ABORTED {
kernel32::ReleaseSemaphore(request.data.complete_sem, 1, ptr::null_mut());
return;
}
start_read(&request.data, &request.tx, request.handle);
let mut cur_offset: *const u8 = request.buffer.as_ptr();
let mut cur_entry: *const FILE_NOTIFY_INFORMATION = mem::transmute(cur_offset);
loop {
let len = (*cur_entry).FileNameLength as usize / 2;
let encoded_path: &[u16] = slice::from_raw_parts((*cur_entry).FileName.as_ptr(), len);
let path = request.data.dir.join(PathBuf::from(OsString::from_wide(encoded_path)));
let skip = match request.data.file {
None => false,
Some(ref watch_path) => *watch_path != path
};
if !skip {
let op = match (*cur_entry).Action {
winnt::FILE_ACTION_ADDED => op::CREATE,
winnt::FILE_ACTION_REMOVED => op::REMOVE,
winnt::FILE_ACTION_MODIFIED => op::WRITE,
winnt::FILE_ACTION_RENAMED_OLD_NAME | winnt::FILE_ACTION_RENAMED_NEW_NAME => op::RENAME,
_ => Op::empty()
};
let evt = Event {
path: Some(path),
op: Ok(op)
};
let _ = request.tx.send(evt);
}
if (*cur_entry).NextEntryOffset == 0 {
break;
}
cur_offset = cur_offset.offset((*cur_entry).NextEntryOffset as isize);
cur_entry = mem::transmute(cur_offset);
}
}
/// Watcher backed by `ReadDirectoryChangesW`; forwards requests to a server thread.
pub struct ReadDirectoryChangesWatcher {
/// Channel for sending `Action`s to the server thread.
tx: Sender<Action>,
/// Channel on which the server reports the result of each watch request.
cmd_rx: Receiver<Result<PathBuf,Error>>,
/// Semaphore released after sending an action, to wake the server from its wait.
wakeup_sem: HANDLE
}
impl ReadDirectoryChangesWatcher {
pub fn create(event_tx: Sender<Event>, meta_tx: Sender<MetaEvent>) -> Result<ReadDirectoryChangesWatcher, Error> {
let (cmd_tx, cmd_rx) = channel();
let wakeup_sem = unsafe {
kernel32::CreateSemaphoreW(ptr::null_mut(), 0, 1, ptr::null_mut())
};
if wakeup_sem == ptr::null_mut() || wakeup_sem == INVALID_HANDLE_VALUE {
return Err(Error::Generic("Failed to create wakeup semaphore.".to_owned()));
}
let action_tx = ReadDirectoryChangesServer::start(event_tx,meta_tx,cmd_tx,wakeup_sem);
Ok(ReadDirectoryChangesWatcher {
tx: action_tx,
cmd_rx: cmd_rx,
wakeup_sem: wakeup_sem
})
}
fn wakeup_server(&mut self) {
unsafe { kernel32::ReleaseSemaphore(self.wakeup_sem, 1, ptr::null_mut()); }
}
fn send_action_require_ack(&mut self, action:Action, pb:&PathBuf) -> Result<(), Error> {
match self.tx.send(action) {
Err(_) => Err(Error::Generic("Error sending to internal channel".to_owned())),
Ok(_) => {
self.wakeup_server();
match self.cmd_rx.recv() {
Err(_) => Err(Error::Generic("Error receiving from command channel".to_owned())),
Ok(ack_res) => {
match ack_res {
Err(e) => Err(Error::Generic(format!("Error in watcher: {:?}", e))),
Ok(ack_pb) => {
if pb.as_path() != ack_pb.as_path() {
Err(Error::Generic(format!("Expected ack for {:?} but got ack for {:?}", pb, ack_pb)))
} else {
Ok(())
}
}
}
}
}
}
}
}
}
impl Watcher for ReadDirectoryChangesWatcher {
fn new(event_tx: Sender<Event>) -> Result<ReadDirectoryChangesWatcher, Error> {
let (meta_tx, _) = channel();
ReadDirectoryChangesWatcher::create(event_tx, meta_tx)
}
fn watch<P: AsRef<Path>>(&mut self, path: P) -> Result<(), Error> {
let pb = path.as_ref().to_path_buf();
if !pb.is_dir() && !pb.is_file() {
return Err(Error::Generic("Input watch path is neither a file nor a directory.".to_owned()));
}
self.send_action_require_ack(Action::Watch(path.as_ref().to_path_buf()), &pb)
}
fn unwatch<P: AsRef<Path>>(&mut self, path: P) -> Result<(), Error> {
let res = self.tx.send(Action::Unwatch(path.as_ref().to_path_buf()))
.map_err(|_| Error::Generic("Error sending to internal channel".to_owned()));
self.wakeup_server();
res
}
}
impl Drop for ReadDirectoryChangesWatcher {
    /// Asks the server thread to stop and signals the wakeup semaphore so it sees the request.
    fn drop(&mut self) {
        // A failed send means the server's receiver is already gone; nothing more to do.
        self.tx.send(Action::Stop).ok();
        self.wakeup_server();
    }
}