Skip to main content

lxcrond/
watcher.rs

1
2use std::io;
3use std::mem;
4
5use std::sync::atomic::{Ordering, AtomicBool};
6
7use libc;
8use libc::inotify_init;
9use libc::inotify_add_watch;
10use libc::{IN_CLOSE_WRITE, IN_CREATE};
11use std::ffi::CString;
12use crate::config::Job;
13use crate::config::VERBOSE;
14use crate::exec::execute_job;
15use log::*;
16
// Mirrors the fixed-size header of `struct inotify_event` (see `man 7 inotify`);
// this struct ought to be in libc.
//
// `#[repr(C)]` is required: the struct is overlaid on bytes written by the
// kernel, and without it Rust is free to reorder the fields.
#[allow(non_camel_case_types)] // keeps the C name so it reads like the man page
#[repr(C)]
struct inotify_event {
    /// Watch descriptor the event belongs to.
    wd: i32,
    /// Event mask; not read by this module.
    _mask: u32,
    /// Cookie used to pair related events; not read by this module.
    _cookie: u32,
    /// Number of bytes of name data that follow this header.
    len: u32,
    // followed by `len` bytes holding the name
}
26
27pub struct Watcher {
28    inotify_fd: i32,
29    shutdown: AtomicBool,
30    jobs: Vec<Job>
31}
32
33/// use Linux kernels inotify to execute file_spec jobs
34///
35impl Watcher {
36
37    /// returns an error or a Watcher, if there was an error it returns the Vec<Job>
38    pub fn new(mut jobs: Vec<Job>) -> (io::Result<Watcher>, Option<Vec<Job>>) {
39        unsafe {
40            let inotify_fd = inotify_init();
41            if inotify_fd == -1 {
42                return (Err(io::Error::last_os_error()), Some(jobs));
43            }
44
45            for job in jobs.iter_mut() {
46                let path = CString::new(job.file_spec().path().as_bytes()).unwrap();
47                if job.file_spec().is_dir() {
48                    let watch_descriptor = inotify_add_watch(inotify_fd, path.as_ptr(), IN_CREATE);
49                    job.set_watch_descriptor(watch_descriptor);
50                } else {
51                    let watch_descriptor = inotify_add_watch(inotify_fd, path.as_ptr(), IN_CLOSE_WRITE);
52                    job.set_watch_descriptor(watch_descriptor);
53                }
54            }
55
56            let watcher = Watcher {
57                inotify_fd, shutdown: AtomicBool::new(false), jobs
58            };
59
60            return (Ok(watcher), None);
61        }
62    }
63
64    fn len(&self) -> usize {
65        self.jobs.len()
66    }
67
68    fn job_by_watch_descriptor(&self, wd: i32) -> Option<&Job> {
69        for job in &self.jobs {
70            if job.watch_descriptor == wd {
71                return Some(&job);
72            }
73        }
74        None
75    }
76
77    pub fn run(&self) {
78        if 0 == self.len() {
79            return;
80        }
81        while ! self.shutdown.load(Ordering::Relaxed) {
82            unsafe {
83                let mut buf = [0u8; 4096];
84                let mut remain: isize = libc::read(self.inotify_fd, buf.as_mut_ptr() as *mut libc::c_void, 4096);
85                if remain > 0 {
86                    let mut slice: &[u8] = &buf;
87                    while remain > 0 {
88                        let (len, event) = Watcher::inotify_event_from_buf(slice);
89                        remain = remain - (len as isize);
90                        slice = &slice[len..];
91                        if let Some(job) = self.job_by_watch_descriptor(event.wd) {
92                            debug!("inotify executing job {}", job.command());
93                            execute_job(job);
94                        }
95                    }
96                } else {
97                    if VERBOSE.load(Ordering::Relaxed) {
98                        println!("inotify error");
99                    }
100                    warn!("inotify error");
101                }
102            }
103        }
104    }
105
106    pub fn shutdown(&self) {
107        self.shutdown.store(true, Ordering::Relaxed);
108    }
109
110    // takes as buffer read from an io stream and casts it as a struct
111    // this is easy in C but hard in rust but it is possible
112    fn inotify_event_from_buf(buf: &[u8]) -> (usize, &inotify_event) {
113        let event_size = mem::size_of::<inotify_event>();
114
115        let event = buf.as_ptr() as *const inotify_event;
116
117        let event = unsafe { &*event };
118
119        ((event.len as usize + event_size), &event)
120    }
121}
122
123impl Drop for Watcher {
124    fn drop(&mut self) {
125        unsafe { libc::close(self.inotify_fd); }
126    }
127}
128
129
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::FileSpec;
    use std::fs;
    use std::fs::File;
    use std::sync::Arc;
    use std::time::Duration;

    /// Smoke test: starts a watcher on `tests/test.flag`, touches the file and
    /// then requests shutdown from a second thread. Neither spawned thread is
    /// joined, so the test only fails if the watcher cannot be created.
    #[test]
    fn test_watcher() {
        let one_second = Duration::new(1, 0);
        let spec = FileSpec::new(String::from("tests/test.flag"));
        let job = Job::new(
            None,
            Some(spec),
            "teknopaul",
            "touch /tmp/test.flag.done".to_string(),
        );

        let shared = match Watcher::new(vec![job]) {
            (Ok(watcher), None) => Arc::new(watcher),
            _ => panic!(),
        };
        let runner = Arc::clone(&shared);
        let stopper = Arc::clone(&shared);

        // Event loop runs detached in the background.
        std::thread::spawn(move || {
            println!("THREAD");
            runner.run();
        });

        // Give the loop time to start, then trigger events on the watched file.
        std::thread::sleep(one_second);
        fs::write("tests/test.flag", "").ok();
        File::create("tests/test.flag").ok();
        std::thread::sleep(one_second);

        // Detached thread that requests shutdown; its panic does not reach the test.
        std::thread::spawn(move || {
            std::thread::sleep(one_second);
            stopper.shutdown();
            println!("HERE");
            panic!();
        });
    }
}