1
2use std::io;
3use std::mem;
4
5use std::sync::atomic::{Ordering, AtomicBool};
6
7use libc;
8use libc::inotify_init;
9use libc::inotify_add_watch;
10use libc::{IN_CLOSE_WRITE, IN_CREATE};
11use std::ffi::CString;
12use crate::config::Job;
13use crate::config::VERBOSE;
14use crate::exec::execute_job;
15use log::*;
16
17#[allow(non_camel_case_types)]
19struct inotify_event {
20 wd: i32,
21 _mask: u32,
22 _cookie: u32,
23 len: u32,
24 }
26
27pub struct Watcher {
28 inotify_fd: i32,
29 shutdown: AtomicBool,
30 jobs: Vec<Job>
31}
32
33impl Watcher {
36
37 pub fn new(mut jobs: Vec<Job>) -> (io::Result<Watcher>, Option<Vec<Job>>) {
39 unsafe {
40 let inotify_fd = inotify_init();
41 if inotify_fd == -1 {
42 return (Err(io::Error::last_os_error()), Some(jobs));
43 }
44
45 for job in jobs.iter_mut() {
46 let path = CString::new(job.file_spec().path().as_bytes()).unwrap();
47 if job.file_spec().is_dir() {
48 let watch_descriptor = inotify_add_watch(inotify_fd, path.as_ptr(), IN_CREATE);
49 job.set_watch_descriptor(watch_descriptor);
50 } else {
51 let watch_descriptor = inotify_add_watch(inotify_fd, path.as_ptr(), IN_CLOSE_WRITE);
52 job.set_watch_descriptor(watch_descriptor);
53 }
54 }
55
56 let watcher = Watcher {
57 inotify_fd, shutdown: AtomicBool::new(false), jobs
58 };
59
60 return (Ok(watcher), None);
61 }
62 }
63
64 fn len(&self) -> usize {
65 self.jobs.len()
66 }
67
68 fn job_by_watch_descriptor(&self, wd: i32) -> Option<&Job> {
69 for job in &self.jobs {
70 if job.watch_descriptor == wd {
71 return Some(&job);
72 }
73 }
74 None
75 }
76
77 pub fn run(&self) {
78 if 0 == self.len() {
79 return;
80 }
81 while ! self.shutdown.load(Ordering::Relaxed) {
82 unsafe {
83 let mut buf = [0u8; 4096];
84 let mut remain: isize = libc::read(self.inotify_fd, buf.as_mut_ptr() as *mut libc::c_void, 4096);
85 if remain > 0 {
86 let mut slice: &[u8] = &buf;
87 while remain > 0 {
88 let (len, event) = Watcher::inotify_event_from_buf(slice);
89 remain = remain - (len as isize);
90 slice = &slice[len..];
91 if let Some(job) = self.job_by_watch_descriptor(event.wd) {
92 debug!("inotify executing job {}", job.command());
93 execute_job(job);
94 }
95 }
96 } else {
97 if VERBOSE.load(Ordering::Relaxed) {
98 println!("inotify error");
99 }
100 warn!("inotify error");
101 }
102 }
103 }
104 }
105
106 pub fn shutdown(&self) {
107 self.shutdown.store(true, Ordering::Relaxed);
108 }
109
110 fn inotify_event_from_buf(buf: &[u8]) -> (usize, &inotify_event) {
113 let event_size = mem::size_of::<inotify_event>();
114
115 let event = buf.as_ptr() as *const inotify_event;
116
117 let event = unsafe { &*event };
118
119 ((event.len as usize + event_size), &event)
120 }
121}
122
123impl Drop for Watcher {
124 fn drop(&mut self) {
125 unsafe { libc::close(self.inotify_fd); }
126 }
127}
128
129
130#[cfg(test)]
131mod tests {
132 use super::*;
133 use std::fs;
134 use crate::config::FileSpec;
135 use std::time::Duration;
136 use std::sync::Arc;
137 use std::fs::File;
138
139 #[test]
140 fn test_watcher() {
141 let file_spec = FileSpec::new("tests/test.flag".to_string());
142 let job = Job::new(None, Some(file_spec), "teknopaul", String::from("touch /tmp/test.flag.done"));
143
144 if let (Ok(watcher), None) = Watcher::new(vec![job]) {
145 let lock = Arc::new(watcher);
146 let w1 = lock.clone();
147 let w2 = lock.clone();
148 std::thread::spawn(move|| {
149 println!("THREAD");
150 w1.run();
151 });
152 std::thread::sleep(Duration::new(1,0));
153 fs::write("tests/test.flag", "").ok();
154 File::create("tests/test.flag").ok();
155 std::thread::sleep(Duration::new(1,0));
156 std::thread::spawn(move|| {
157 std::thread::sleep(Duration::new(1,0));
158 w2.shutdown();
159 println!("HERE");
160 panic!();
161 });
162 } else {
163 panic!()
164 }
165
166 }
167
168}