lxcrond 0.2.1

cron and entr/inotify server for lxc containers
Documentation

use std::io;
use std::mem;

use std::sync::atomic::{Ordering, AtomicBool};

use libc;
use libc::inotify_init;
use libc::inotify_add_watch;
use libc::{IN_CLOSE_WRITE,IN_CREATE};
use std::ffi::CString;
use crate::config::Job;
use crate::config::VERBOSE;
use crate::exec::execute_job;

// this struct ought to be in libc, see `man 7 inotify`
#[allow(non_camel_case_types)]
struct inotify_event {
   wd: i32,
   _mask: u32,
   _cookie: u32,
   len: u32,
    // followed by a CString name
}

pub struct Watcher {
    inotify_fd: i32,
    shutdown: AtomicBool,
    jobs: Vec<Job>
}

/// use Linux kernels inotify to execute file_spec jobs
///
impl Watcher {

    /// returns an error or a Watcher, if there was an error it returns the Vec<Job>
    pub fn new(mut jobs: Vec<Job>) -> (io::Result<Watcher>, Option<Vec<Job>>) {
        unsafe {
            let inotify_fd = inotify_init();
            if inotify_fd == -1 {
                return (Err(io::Error::last_os_error()), Some(jobs));
            }

            for job in jobs.iter_mut() {
                let path = CString::new(job.file_spec().path().as_bytes()).unwrap();
                if job.file_spec().is_dir() {
                    let watch_descriptor = inotify_add_watch(inotify_fd, path.as_ptr(), IN_CLOSE_WRITE);
                    job.set_watch_descriptor(watch_descriptor);
                } else {
                    let watch_descriptor = inotify_add_watch(inotify_fd, path.as_ptr(), IN_CREATE);
                    job.set_watch_descriptor(watch_descriptor);
                }
            }

            let watcher = Watcher {
                inotify_fd, shutdown: AtomicBool::new(false), jobs
            };

            return (Ok(watcher), None);
        }
    }

    fn len(&self) -> usize {
        self.jobs.len()
    }

    fn job_by_watch_descriptor(&self, wd: i32) -> Option<&Job> {
        for job in &self.jobs {
            if job.watch_descriptor == wd {
                return Some(&job);
            }
        }
        None
    }

    pub fn run(&self) {
        if 0 == self.len() {
            return;
        }
        while ! self.shutdown.load(Ordering::Relaxed) {
            unsafe {
                let mut buf = [0u8; 4096];
                let mut remain: isize = libc::read(self.inotify_fd, buf.as_mut_ptr() as *mut libc::c_void, 4096);
                if remain > 0 {
                    let mut slice: &[u8] = &buf;
                    while remain > 0 {
                        let (len, event) = Watcher::inotify_event_from_buf(slice);
                        remain = remain - (len as isize);
                        slice = &slice[len..];
                        if let Some(job) = self.job_by_watch_descriptor(event.wd) {
                            execute_job(job);
                        }
                    }
                } else {
                    if VERBOSE.load(Ordering::Relaxed) {
                        println!("inotify error");
                    }
                }
            }
        }
    }

    pub fn shutdown(&self) {
        self.shutdown.store(true, Ordering::Relaxed);
    }

    // takes as buffer read from an io stream and casts it as a struct
    // this is easy in C but hard in rust but it is possible
    fn inotify_event_from_buf(buf: &[u8]) -> (usize, &inotify_event) {
        let event_size = mem::size_of::<inotify_event>();

        let event = buf.as_ptr() as *const inotify_event;

        let event = unsafe { &*event };

        ((event.len as usize + event_size), &event)
    }
}

impl Drop for Watcher {
    fn drop(&mut self) {
        unsafe { libc::close(self.inotify_fd); }
    }
}


#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use crate::config::FileSpec;
    use std::time::Duration;
    use std::sync::Arc;
    use std::fs::File;

    #[test]
    fn test_watcher() {
        let file_spec = FileSpec::new("tests/test.flag".to_string());
        let job = Job::new(None, Some(file_spec), "teknopaul", String::from("touch /tmp/test.flag.done"));

        if let (Ok(watcher), None) = Watcher::new(vec![job]) {
            let lock = Arc::new(watcher);
            let w1 = lock.clone();
            let w2 = lock.clone();
            std::thread::spawn(move|| {
                println!("THREAD");
                w1.run();
            });
            std::thread::sleep(Duration::new(1,0));
            fs::write("tests/test.flag", "").ok();
            File::create("tests/test.flag").ok();
            std::thread::sleep(Duration::new(1,0));
            std::thread::spawn(move|| {
                std::thread::sleep(Duration::new(1,0));
                w2.shutdown();
                println!("HERE");
                panic!();
            });
        } else {
            panic!()
        }

    }

}