async_inotify/
handler.rs

1use std::ffi::{OsStr, OsString};
2use std::future::Future;
3use std::path::Path;
4
5use inotify::EventMask;
6use regex::Regex;
7use tokio::sync::{broadcast, mpsc};
8
9use crate::app::Anotify;
10use crate::watcher::{Event, Watcher};
11
12/// Start a inotify server to monitor files change.
13/// When catch `shutdown` result, or some error were caused, quit.
14///
15/// You'd better define a `output` to recv all events, if not, the result would be print to stdout
16/// like:
17///
18/// ```no_run
19/// println!("{:?}: {:?}", event.mask(), event.path());
20/// ```
21pub async fn run(
22    anotify: Anotify,
23    output: Option<broadcast::Sender<Event>>,
24    shutdown: impl Future,
25) -> crate::Result<()> {
26    tokio::select! {
27        Err(e) = handler(anotify, output) => Err(e),
28        _ = shutdown => Ok(()),
29    }
30}
31
32async fn handler(anotify: Anotify, output: Option<broadcast::Sender<Event>>) -> crate::Result<()> {
33    let Anotify {
34        mask,
35        recursive,
36        regex,
37        targets,
38    } = anotify;
39
40    // init re mode
41    let mut re = None;
42    if let Some(regex) = regex {
43        re = Some(Regex::new(&regex).unwrap());
44    }
45
46    let mut watcher = Watcher::init();
47    let (handler_tx, mut handler_rx) = mpsc::channel(10);
48
49    dispatch(targets, &handler_tx);
50
51    loop {
52        tokio::select! {
53            // here handle the event
54            Some(event) = watcher.next() => {
55                // if recursive mode && found new dir
56                if recursive && event.is_dir() && event.is_new() {
57                    handler_tx.send(event.path().as_os_str().to_os_string()).await.unwrap();
58                }
59
60                // fd was remvoed
61                if !(*event.mask() & EventMask::IGNORED).is_empty() {
62                    let _ = watcher.remove(event.wd().clone());
63                    continue
64                }
65
66                // fliter
67                if re.is_some()
68                    && !re
69                        .as_ref()
70                        .unwrap()
71                        .is_match(&event.path().to_str().unwrap())
72                {
73                    continue
74                }
75
76                // send event to output or print to stdout
77                if output.is_some() {
78                    let _ = output.as_ref().unwrap().send(event)?;
79                } else {
80                    println!("{:?}: {:?}", event.watchmask(), event.path());
81                }
82            },
83            // add new watch task
84            Some(target) = handler_rx.recv() => {
85                let _ = watcher.add(&target, &mask)?;
86                if recursive {
87                    let targets = sub_dir(&target)?;
88
89                    dispatch(targets, &handler_tx);
90                }
91            },
92        }
93    }
94}
95
96fn dispatch(targets: Vec<OsString>, tx: &mpsc::Sender<OsString>) {
97    let tx = tx.clone();
98    tokio::spawn(async move {
99        for target in targets {
100            tx.send(target).await.unwrap();
101        }
102    });
103}
104
105fn sub_dir<P>(path: P) -> crate::Result<Vec<OsString>>
106where
107    P: AsRef<Path> + std::convert::AsRef<OsStr>,
108{
109    let mut res = vec![];
110    let path = Path::new(&path);
111
112    for entry in path.read_dir().expect("failed to read_dir") {
113        if let Ok(entry) = entry {
114            if let Ok(file_type) = entry.file_type() {
115                if file_type.is_dir() {
116                    res.push(path.join(entry.path()).into_os_string());
117                }
118            }
119        }
120    }
121
122    Ok(res)
123}