1use std::{
11 fs,
12 path::{Component, Path},
13 sync::atomic::AtomicUsize,
14};
15
16use crossbeam_channel::{select, unbounded, Receiver, Sender};
17use notify::{Config, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
18use paths::{AbsPath, AbsPathBuf, Utf8PathBuf};
19use rayon::iter::{IndexedParallelIterator as _, IntoParallelIterator as _, ParallelIterator};
20use rustc_hash::FxHashSet;
21use vfs::loader::{self, LoadingProgress};
22use walkdir::WalkDir;
23
/// Handle to the background `VfsLoader` thread; this is the type that
/// implements `loader::Handle` in this file.
#[derive(Debug)]
pub struct NotifyHandle {
    // Field order matters: Rust drops struct fields in declaration order, so
    // `sender` is dropped (disconnecting the request channel, which makes the
    // actor's receive loop end) before `_thread`.
    // NOTE(review): presumably `stdx::thread::JoinHandle` joins on drop — confirm.
    /// Sending half of the request channel consumed by `NotifyActor::run`.
    sender: Sender<Message>,
    /// Handle of the thread spawned in `spawn`; kept only so it lives as long
    /// as the `NotifyHandle`.
    _thread: stdx::thread::JoinHandle,
}
30
/// Requests sent from a `NotifyHandle` to the actor thread.
#[derive(Debug)]
enum Message {
    /// Replace the loader configuration: the actor re-creates its watcher,
    /// reloads every configured entry and reports progress.
    Config(loader::Config),
    /// Re-read a single path from disk and report it as changed.
    Invalidate(AbsPathBuf),
}
36
37impl loader::Handle for NotifyHandle {
38 fn spawn(sender: loader::Sender) -> NotifyHandle {
39 let actor = NotifyActor::new(sender);
40 let (sender, receiver) = unbounded::<Message>();
41 let thread = stdx::thread::Builder::new(stdx::thread::ThreadIntent::Worker)
42 .name("VfsLoader".to_owned())
43 .spawn(move || actor.run(receiver))
44 .expect("failed to spawn thread");
45 NotifyHandle { sender, _thread: thread }
46 }
47
48 fn set_config(&mut self, config: loader::Config) {
49 self.sender.send(Message::Config(config)).unwrap();
50 }
51
52 fn invalidate(&mut self, path: AbsPathBuf) {
53 self.sender.send(Message::Invalidate(path)).unwrap();
54 }
55
56 fn load_sync(&mut self, path: &AbsPath) -> Option<Vec<u8>> {
57 read(path)
58 }
59}
60
/// What the `notify` watcher callback delivers: an event or a watcher error.
type NotifyEvent = notify::Result<notify::Event>;
62
/// State owned by the loader thread.
struct NotifyActor {
    /// Outgoing channel for `loader::Message`s (progress, loaded and changed files).
    sender: loader::Sender,
    /// Individual files taken from the watched `loader::Entry::Files` entries
    /// of the current configuration.
    watched_file_entries: FxHashSet<AbsPathBuf>,
    /// Directory sets taken from the watched `loader::Entry::Directories`
    /// entries of the current configuration.
    watched_dir_entries: Vec<loader::Directories>,
    /// The file-system watcher together with the receiving end of the channel
    /// its callback writes to. `None` until a configuration with a non-empty
    /// `watch` list is applied, or if creating the watcher failed.
    watcher: Option<(RecommendedWatcher, Receiver<NotifyEvent>)>,
}
70
/// Anything the actor loop can wake up for.
#[derive(Debug)]
enum Event {
    /// A request coming from the owning `NotifyHandle`.
    Message(Message),
    /// A result produced by the file-system watcher.
    NotifyEvent(NotifyEvent),
}
76
77impl NotifyActor {
78 fn new(sender: loader::Sender) -> NotifyActor {
79 NotifyActor {
80 sender,
81 watched_dir_entries: Vec::new(),
82 watched_file_entries: FxHashSet::default(),
83 watcher: None,
84 }
85 }
86
87 fn next_event(&self, receiver: &Receiver<Message>) -> Option<Event> {
88 let Some((_, watcher_receiver)) = &self.watcher else {
89 return receiver.recv().ok().map(Event::Message);
90 };
91
92 select! {
93 recv(receiver) -> it => it.ok().map(Event::Message),
94 recv(watcher_receiver) -> it => Some(Event::NotifyEvent(it.unwrap())),
95 }
96 }
97
98 fn run(mut self, inbox: Receiver<Message>) {
99 while let Some(event) = self.next_event(&inbox) {
100 tracing::debug!(?event, "vfs-notify event");
101 match event {
102 Event::Message(msg) => match msg {
103 Message::Config(config) => {
104 self.watcher = None;
105 if !config.watch.is_empty() {
106 let (watcher_sender, watcher_receiver) = unbounded();
107 let watcher = log_notify_error(RecommendedWatcher::new(
108 move |event| {
109 _ = watcher_sender.send(event);
113 },
114 Config::default(),
115 ));
116 self.watcher = watcher.map(|it| (it, watcher_receiver));
117 }
118
119 let config_version = config.version;
120
121 let n_total = config.load.len();
122 self.watched_dir_entries.clear();
123 self.watched_file_entries.clear();
124
125 self.send(loader::Message::Progress {
126 n_total,
127 n_done: LoadingProgress::Started,
128 config_version,
129 dir: None,
130 });
131
132 let (entry_tx, entry_rx) = unbounded();
133 let (watch_tx, watch_rx) = unbounded();
134 let processed = AtomicUsize::new(0);
135
136 config.load.into_par_iter().enumerate().for_each(|(i, entry)| {
137 let do_watch = config.watch.contains(&i);
138 if do_watch {
139 _ = entry_tx.send(entry.clone());
140 }
141 let files = Self::load_entry(
142 |f| _ = watch_tx.send(f.to_owned()),
143 entry,
144 do_watch,
145 |file| {
146 self.send(loader::Message::Progress {
147 n_total,
148 n_done: LoadingProgress::Progress(
149 processed.load(std::sync::atomic::Ordering::Relaxed),
150 ),
151 dir: Some(file),
152 config_version,
153 });
154 },
155 );
156 self.send(loader::Message::Loaded { files });
157 self.send(loader::Message::Progress {
158 n_total,
159 n_done: LoadingProgress::Progress(
160 processed.fetch_add(1, std::sync::atomic::Ordering::AcqRel) + 1,
161 ),
162 config_version,
163 dir: None,
164 });
165 });
166
167 drop(watch_tx);
168 for path in watch_rx {
169 self.watch(&path);
170 }
171
172 drop(entry_tx);
173 for entry in entry_rx {
174 match entry {
175 loader::Entry::Files(files) => {
176 self.watched_file_entries.extend(files)
177 }
178 loader::Entry::Directories(dir) => {
179 self.watched_dir_entries.push(dir)
180 }
181 }
182 }
183
184 self.send(loader::Message::Progress {
185 n_total,
186 n_done: LoadingProgress::Finished,
187 config_version,
188 dir: None,
189 });
190 }
191 Message::Invalidate(path) => {
192 let contents = read(path.as_path());
193 let files = vec![(path, contents)];
194 self.send(loader::Message::Changed { files });
195 }
196 },
197 Event::NotifyEvent(event) => {
198 if let Some(event) = log_notify_error(event) {
199 if let EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_) =
200 event.kind
201 {
202 let files = event
203 .paths
204 .into_iter()
205 .filter_map(|path| {
206 Some(
207 AbsPathBuf::try_from(
208 Utf8PathBuf::from_path_buf(path).ok()?,
209 )
210 .expect("path is absolute"),
211 )
212 })
213 .filter_map(|path| -> Option<(AbsPathBuf, Option<Vec<u8>>)> {
214 let meta = fs::metadata(&path).ok()?;
215 if meta.file_type().is_dir()
216 && self
217 .watched_dir_entries
218 .iter()
219 .any(|dir| dir.contains_dir(&path))
220 {
221 self.watch(path.as_ref());
222 return None;
223 }
224
225 if !meta.file_type().is_file() {
226 return None;
227 }
228
229 if !(self.watched_file_entries.contains(&path)
230 || self
231 .watched_dir_entries
232 .iter()
233 .any(|dir| dir.contains_file(&path)))
234 {
235 return None;
236 }
237
238 let contents = read(&path);
239 Some((path, contents))
240 })
241 .collect();
242 self.send(loader::Message::Changed { files });
243 }
244 }
245 }
246 }
247 }
248 }
249
250 fn load_entry(
251 mut watch: impl FnMut(&Path),
252 entry: loader::Entry,
253 do_watch: bool,
254 send_message: impl Fn(AbsPathBuf),
255 ) -> Vec<(AbsPathBuf, Option<Vec<u8>>)> {
256 match entry {
257 loader::Entry::Files(files) => files
258 .into_iter()
259 .map(|file| {
260 if do_watch {
261 watch(file.as_ref());
262 }
263 let contents = read(file.as_path());
264 (file, contents)
265 })
266 .collect::<Vec<_>>(),
267 loader::Entry::Directories(dirs) => {
268 let mut res = Vec::new();
269
270 for root in &dirs.include {
271 send_message(root.clone());
272 let walkdir =
273 WalkDir::new(root).follow_links(true).into_iter().filter_entry(|entry| {
274 if !entry.file_type().is_dir() {
275 return true;
276 }
277 let path = entry.path();
278
279 if path_might_be_cyclic(path) {
280 return false;
281 }
282
283 dirs.exclude.iter().all(|it| it != path)
285 && (root == path || dirs.include.iter().all(|it| it != path))
286 });
287
288 let files = walkdir.filter_map(|it| it.ok()).filter_map(|entry| {
289 let depth = entry.depth();
290 let is_dir = entry.file_type().is_dir();
291 let is_file = entry.file_type().is_file();
292 let abs_path = AbsPathBuf::try_from(
293 Utf8PathBuf::from_path_buf(entry.into_path()).ok()?,
294 )
295 .ok()?;
296 if depth < 2 && is_dir {
297 send_message(abs_path.clone());
298 }
299 if is_dir && do_watch {
300 watch(abs_path.as_ref());
301 }
302 if !is_file {
303 return None;
304 }
305 let ext = abs_path.extension().unwrap_or_default();
306 if dirs.extensions.iter().all(|it| it.as_str() != ext) {
307 return None;
308 }
309 Some(abs_path)
310 });
311
312 res.extend(files.map(|file| {
313 let contents = read(file.as_path());
314 (file, contents)
315 }));
316 }
317 res
318 }
319 }
320 }
321
322 fn watch(&mut self, path: &Path) {
323 if let Some((watcher, _)) = &mut self.watcher {
324 log_notify_error(watcher.watch(path, RecursiveMode::NonRecursive));
325 }
326 }
327
328 #[track_caller]
329 fn send(&self, msg: loader::Message) {
330 self.sender.send(msg).unwrap();
331 }
332}
333
334fn read(path: &AbsPath) -> Option<Vec<u8>> {
335 std::fs::read(path).ok()
336}
337
338fn log_notify_error<T>(res: notify::Result<T>) -> Option<T> {
339 res.map_err(|err| tracing::warn!("notify error: {}", err)).ok()
340}
341
/// Heuristically decides whether `path` is a symlink that points back at one
/// of its own ancestors, which would make a link-following directory walk
/// loop.
///
/// Returns `true` when the link target consists solely of `.` / `..`
/// components, or when `path` starts with the link target. Returns `false`
/// when `path` is not a symlink or the link cannot be read. Cycles built from
/// several cooperating symlinks are not detected.
fn path_might_be_cyclic(path: &Path) -> bool {
    match std::fs::read_link(path) {
        // Not a symlink (or unreadable): nothing to worry about.
        Err(_) => false,
        Ok(target) => {
            let only_dot_components = target
                .components()
                .all(|part| matches!(part, Component::CurDir | Component::ParentDir));
            only_dot_components || path.starts_with(target)
        }
    }
}