1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
use {
crate::{
command::Sequence,
errors::ProgramError,
},
notify::{
RecommendedWatcher,
RecursiveMode,
Watcher as NotifyWatcher,
event::{
AccessKind,
AccessMode,
DataChange,
EventKind,
ModifyKind,
},
},
std::{
path::PathBuf,
thread,
},
termimad::crossbeam::channel,
};
/// Time window (500 ms) used by the watcher thread to debounce notify events:
/// it's the timeout given to the wait for the next event.
const DEBOUNCE_MAX_DELAY: std::time::Duration = std::time::Duration::from_millis(500);
/// Watch for notify events on a set of paths, and send a `:refresh` sequence
/// when a change is detected.
///
/// Notify events are debounced by a background thread, using
/// `DEBOUNCE_MAX_DELAY` as waiting delay:
/// - an event arriving while no other event is pending is turned
/// into a refresh immediately
/// - the events following it are aggregated, and result in a single
/// delayed refresh, so that the last change of a burst isn't missed
/// - a refresh is skipped when the sequence channel still holds
/// sequences waiting to be consumed
pub struct Watcher {
/// Sending side of the channel read by the debouncing thread. A clone of it
/// is given to the notify watcher callback when the notify watcher is built.
notify_sender: channel::Sender<()>,
/// The underlying notify watcher, lazily built on the first call to `watch`,
/// and dropped (set to `None`) when watching a path fails.
notify_watcher: Option<RecommendedWatcher>,
/// The paths recorded as being watched: they're unwatched on the next call
/// to `watch` or `stop_watching`.
watched: Vec<PathBuf>,
}
impl Watcher {
    /// Build a watcher which doesn't watch any path yet.
    ///
    /// A background thread is spawned. It receives the notifications coming
    /// from the notify watcher and debounces them before sending `:refresh`
    /// sequences on `tx_seqs`:
    /// - an isolated event sends a refresh immediately, and opens a period
    ///   of `DEBOUNCE_MAX_DELAY`
    /// - events received during an open period are aggregated: they result
    ///   in one refresh sent when the period ends (which opens a new period)
    /// - so there's at most one refresh per period, a long sequence of events
    ///   produces one refresh per period, and the last event of a sequence
    ///   is always followed by a refresh (after at most `DEBOUNCE_MAX_DELAY`)
    ///
    /// The thread stops when all the senders of the notification channel
    /// (the one owned by this `Watcher` and the one given to the notify
    /// watcher callback) are dropped.
    pub fn new(tx_seqs: channel::Sender<Sequence>) -> Self {
        use std::time::Instant;

        let (notify_sender, notify_receiver) = channel::unbounded();
        thread::spawn(move || {
            // End of the current debounce period, `None` when no refresh
            // was sent recently (idle state)
            let mut period_end: Option<Instant> = None;
            // Number of events received since the last refresh was sent
            let mut pending_events: usize = 0;
            loop {
                // When a period is open we wait until its end, not for a full
                // delay after each event: this way a continuous flow of events
                // can't postpone the refresh forever
                let timeout = period_end.map_or(DEBOUNCE_MAX_DELAY, |end| {
                    end.saturating_duration_since(Instant::now())
                });
                match notify_receiver.recv_timeout(timeout) {
                    Ok(()) => match period_end {
                        None => {
                            debug!("sending single event");
                            Self::send_refresh(&tx_seqs);
                            period_end = Some(Instant::now() + DEBOUNCE_MAX_DELAY);
                            continue;
                        }
                        Some(end) => {
                            pending_events += 1;
                            // An event may be returned even when the period is
                            // over (if it was already queued), so the end of
                            // the period is checked here too
                            if Instant::now() < end {
                                continue;
                            }
                        }
                    },
                    Err(channel::RecvTimeoutError::Timeout) => {}
                    Err(channel::RecvTimeoutError::Disconnected) => {
                        info!("notify sender disconnected, stopping notify watcher thread");
                        break;
                    }
                }
                // Here, either the period is over or we were idle
                if pending_events > 0 {
                    debug!("sending aggregation of {} pending events", pending_events);
                    Self::send_refresh(&tx_seqs);
                    pending_events = 0;
                    period_end = Some(Instant::now() + DEBOUNCE_MAX_DELAY);
                } else {
                    // Back to idle: the next event will be sent immediately
                    period_end = None;
                }
            }
        });
        Self {
            notify_sender,
            notify_watcher: None,
            watched: Default::default(),
        }
    }

    /// Send a `:refresh` sequence on the given channel, unless the channel
    /// already contains sequences waiting to be consumed.
    ///
    /// A failure to send is only logged.
    fn send_refresh(tx_seqs: &channel::Sender<Sequence>) {
        if !tx_seqs.is_empty() {
            // let's avoid accumulating refreshes when the tree is long to update
            debug!("skipping refresh, channel full");
            return;
        }
        let sequence = Sequence::new_single(":refresh");
        if let Err(e) = tx_seqs.send(sequence) {
            warn!("error when sending sequence from watcher: {}", e);
        }
    }

    /// Unwatch all the given paths, logging (and otherwise ignoring) the failures.
    fn unwatch_all(
        notify_watcher: &mut RecommendedWatcher,
        paths: impl Iterator<Item = PathBuf>,
    ) {
        for path in paths {
            debug!("stop watching previous path {:?}", path);
            if let Err(e) = notify_watcher.unwatch(&path) {
                warn!("error when unwatching path {:?}: {}", path, e);
            }
        }
    }

    /// Stop watching the previous paths, watch the new ones.
    ///
    /// The notify watcher is created on first call. Paths which don't exist
    /// are skipped with a warning. Only the paths which are really watched
    /// are recorded, so that they can be unwatched later.
    ///
    /// # Errors
    ///
    /// An error is returned when the notify watcher can't be created, or when
    /// it fails to watch one of the paths. In the latter case, the notify
    /// watcher is dropped, which means nothing is watched anymore.
    pub fn watch(
        &mut self,
        paths: Vec<PathBuf>,
    ) -> Result<(), ProgramError> {
        debug!("start watching new paths");
        let notify_watcher = match self.notify_watcher.as_mut() {
            Some(nw) => {
                Self::unwatch_all(nw, self.watched.drain(..));
                nw
            }
            None => self
                .notify_watcher
                .insert(Self::make_notify_watcher(self.notify_sender.clone())?),
        };
        let mut err = None;
        for path in paths {
            if !path.exists() {
                // a missing path must not prevent the other ones from being watched
                warn!("watch path doesn't exist: {:?}", path);
                continue;
            }
            debug!("add watch {:?}", &path);
            if let Err(e) = notify_watcher.watch(&path, RecursiveMode::NonRecursive) {
                warn!("error when watching path {:?}: {}", path, e);
                err = Some(e);
                break;
            }
            // recorded as soon as it's watched, so that it's never left
            // registered in the notify watcher without a way to unwatch it
            self.watched.push(path);
        }
        if let Some(err) = err {
            // the RecommendedWatcher sometimes ends in an inconsistent state when failing
            // to watch a path, so we drop it (and all its watches with it)
            self.notify_watcher = None;
            self.watched.clear();
            Err(err.into())
        } else {
            Ok(())
        }
    }

    /// Build the notify watcher, whose callback filters the events and sends
    /// a `()` on `sender` for each event deserving a refresh.
    ///
    /// Metadata changes, `DataChange::Any` data modifications, and accesses
    /// other than "close after write" are ignored.
    ///
    /// # Errors
    ///
    /// An error is returned when the notify watcher can't be created
    /// or configured.
    fn make_notify_watcher(sender: channel::Sender<()>) -> Result<RecommendedWatcher, ProgramError> {
        let mut notify_watcher =
            notify::recommended_watcher(move |res: notify::Result<notify::Event>| match res {
                Ok(we) => {
                    // Warning: don't log we, or a Modify::Any event, as this could cause infinite
                    // loop while the logger writes to the file being watched (if the log file is
                    // inside the watched directory)
                    match we.kind {
                        EventKind::Modify(ModifyKind::Metadata(_)) => {
                            debug!("ignoring metadata change");
                            return; // useless event
                        }
                        EventKind::Modify(ModifyKind::Data(DataChange::Any)) => {
                            // might be data append, we prefer to ignore it
                            // as some cases (eg log files) are very noisy
                            return;
                        }
                        EventKind::Access(AccessKind::Close(AccessMode::Write)) => {
                            debug!("close write event: {we:?}");
                        }
                        EventKind::Access(_) => {
                            // we don't want to watch for reads
                            return;
                        }
                        _ => {
                            debug!("notify event: {we:?}");
                        }
                    }
                    if let Err(e) = sender.send(()) {
                        info!("error when notifying on notify event: {}", e);
                    }
                }
                Err(e) => warn!("watch error: {:?}", e),
            })?;
        notify_watcher.configure(
            notify::Config::default()
                .with_compare_contents(false)
                .with_follow_symlinks(false),
        )?;
        Ok(notify_watcher)
    }

    /// Stop watching all the currently watched paths.
    ///
    /// Failures to unwatch are logged and don't make this function fail:
    /// it currently always returns `Ok(())`.
    pub fn stop_watching(&mut self) -> Result<(), ProgramError> {
        match self.notify_watcher.as_mut() {
            Some(nw) => Self::unwatch_all(nw, self.watched.drain(..)),
            // without notify watcher, there's nothing to unwatch
            None => self.watched.clear(),
        }
        Ok(())
    }
}