use std::io;
use std::mem;
use std::sync::atomic::{Ordering, AtomicBool};
use libc;
use libc::inotify_init;
use libc::inotify_add_watch;
use libc::{IN_CLOSE_WRITE,IN_CREATE};
use std::ffi::CString;
use crate::config::Job;
use crate::config::VERBOSE;
use crate::exec::execute_job;
/// Fixed-size header of one record read from an inotify file descriptor.
///
/// Mirrors `struct inotify_event` from `<sys/inotify.h>`. The header is followed
/// in the read buffer by `len` bytes of (padded) file name, which this type does
/// not model.
///
/// `#[repr(C)]` is required because values of this type are never constructed
/// by Rust code: they are reinterpreted from bytes written by the kernel, so the
/// field order and offsets must match the C definition exactly.
// The lower-case name intentionally matches the C struct it mirrors.
#[allow(non_camel_case_types)]
#[repr(C)]
struct inotify_event {
    /// Watch descriptor the event belongs to.
    wd: i32,
    /// Event mask (unused here).
    _mask: u32,
    /// Cookie linking related events (unused here).
    _cookie: u32,
    /// Number of name bytes that follow this header in the buffer.
    len: u32,
}
/// Owns an inotify file descriptor together with the jobs whose watches are
/// registered on it.
pub struct Watcher {
    /// Descriptor obtained from `inotify_init`; closed when the watcher is dropped.
    inotify_fd: i32,
    /// Stop flag: set by `shutdown()`, polled by `run()` at the top of its loop.
    shutdown: AtomicBool,
    /// Jobs that are looked up by watch descriptor when an event arrives.
    jobs: Vec<Job>,
}
impl Watcher {
pub fn new(mut jobs: Vec<Job>) -> (io::Result<Watcher>, Option<Vec<Job>>) {
unsafe {
let inotify_fd = inotify_init();
if inotify_fd == -1 {
return (Err(io::Error::last_os_error()), Some(jobs));
}
for job in jobs.iter_mut() {
let path = CString::new(job.file_spec().path().as_bytes()).unwrap();
if job.file_spec().is_dir() {
let watch_descriptor = inotify_add_watch(inotify_fd, path.as_ptr(), IN_CLOSE_WRITE);
job.set_watch_descriptor(watch_descriptor);
} else {
let watch_descriptor = inotify_add_watch(inotify_fd, path.as_ptr(), IN_CREATE);
job.set_watch_descriptor(watch_descriptor);
}
}
let watcher = Watcher {
inotify_fd, shutdown: AtomicBool::new(false), jobs
};
return (Ok(watcher), None);
}
}
fn len(&self) -> usize {
self.jobs.len()
}
/// Finds the first job whose watch descriptor equals `wd`.
///
/// Returns `None` when no job carries that descriptor.
fn job_by_watch_descriptor(&self, wd: i32) -> Option<&Job> {
    self.jobs
        .iter()
        .find(|candidate| candidate.watch_descriptor == wd)
}
pub fn run(&self) {
if 0 == self.len() {
return;
}
while ! self.shutdown.load(Ordering::Relaxed) {
unsafe {
let mut buf = [0u8; 4096];
let mut remain: isize = libc::read(self.inotify_fd, buf.as_mut_ptr() as *mut libc::c_void, 4096);
if remain > 0 {
let mut slice: &[u8] = &buf;
while remain > 0 {
let (len, event) = Watcher::inotify_event_from_buf(slice);
remain = remain - (len as isize);
slice = &slice[len..];
if let Some(job) = self.job_by_watch_descriptor(event.wd) {
execute_job(job);
}
}
} else {
if VERBOSE.load(Ordering::Relaxed) {
println!("inotify error");
}
}
}
}
}
/// Raises the stop flag that `run()` checks at the top of each loop iteration.
///
/// This only stores the flag; it does not interrupt a `read` that `run()` is
/// currently blocked in.
pub fn shutdown(&self) {
    let stop_flag = &self.shutdown;
    stop_flag.store(true, Ordering::Relaxed);
}
/// Interprets the start of `buf` as one inotify event record.
///
/// Returns the total size in bytes of that record (the fixed header plus the
/// `len` bytes of name data that follow it) together with a reference to the
/// header. The returned size is not checked against `buf.len()`; callers must
/// do that before slicing past the record.
///
/// # Panics
///
/// Panics if `buf` is shorter than the fixed-size event header.
fn inotify_event_from_buf(buf: &[u8]) -> (usize, &inotify_event) {
    let event_size = mem::size_of::<inotify_event>();
    // Without this check the header read below could run past the end of `buf`.
    assert!(
        buf.len() >= event_size,
        "inotify buffer shorter than an event header"
    );
    // SAFETY: the assertion guarantees `event_size` readable bytes behind the pointer,
    // and every bit pattern is a valid `inotify_event` (plain integers only).
    // NOTE(review): the pointer must also be aligned for `inotify_event`; that is
    // not verified here and depends on how the caller allocated the buffer.
    let event = unsafe { &*(buf.as_ptr() as *const inotify_event) };
    (event_size.saturating_add(event.len as usize), event)
}
}
impl Drop for Watcher {
    /// Closes the inotify descriptor owned by this watcher.
    ///
    /// The result of `close` is ignored.
    fn drop(&mut self) {
        let fd = self.inotify_fd;
        // SAFETY: `close` only takes the descriptor by value; no memory is accessed.
        let _ = unsafe { libc::close(fd) };
    }
}
// Thread-based smoke test for `Watcher`: it touches the real file system and
// relies on one-second sleeps for ordering.
#[cfg(test)]
mod tests {
use super::*;
use std::fs;
use crate::config::FileSpec;
use std::time::Duration;
use std::sync::Arc;
use std::fs::File;
// Builds a watcher for a single job on "tests/test.flag", runs it on a background
// thread, modifies the file, then requests shutdown from a second thread.
// NOTE(review): nothing here asserts that the job was actually executed, and
// neither spawned thread is joined, so the test function can return before the
// second thread reaches its `panic!()`.
#[test]
fn test_watcher() {
let file_spec = FileSpec::new("tests/test.flag".to_string());
// presumably the last argument is the command the job runs; verify against `Job::new`
let job = Job::new(None, Some(file_spec), "teknopaul", String::from("touch /tmp/test.flag.done"));
// Only the success shape `(Ok(_), None)` is accepted; anything else fails the test.
if let (Ok(watcher), None) = Watcher::new(vec![job]) {
// The watcher is shared between the two threads below through an `Arc`.
let lock = Arc::new(watcher);
let w1 = lock.clone();
let w2 = lock.clone();
// Background thread: enters the event loop.
std::thread::spawn(move|| {
println!("THREAD");
w1.run();
});
// Give the background thread time to start before touching the file.
std::thread::sleep(Duration::new(1,0));
// Write and then (re)create the watched file; errors from both calls are discarded.
fs::write("tests/test.flag", "").ok();
File::create("tests/test.flag").ok();
std::thread::sleep(Duration::new(1,0));
// Second thread: waits another second, sets the shutdown flag, then panics.
std::thread::spawn(move|| {
std::thread::sleep(Duration::new(1,0));
w2.shutdown();
println!("HERE");
panic!();
});
} else {
// Watcher construction failed.
panic!()
}
}
}