1use std::{
11 fs,
12 path::{Component, Path},
13 sync::atomic::AtomicUsize,
14};
15
16use crossbeam_channel::{Receiver, Sender, select, unbounded};
17use notify::{Config, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
18use paths::{AbsPath, AbsPathBuf, Utf8PathBuf};
19use rayon::iter::{IndexedParallelIterator as _, IntoParallelIterator as _, ParallelIterator};
20use rustc_hash::FxHashSet;
21use vfs::loader::{self, LoadingProgress};
22use walkdir::WalkDir;
23
24#[derive(Debug)]
25pub struct NotifyHandle {
26 sender: Sender<Message>,
28 _thread: stdx::thread::JoinHandle,
29}
30
31#[derive(Debug)]
32enum Message {
33 Config(loader::Config),
34 Invalidate(AbsPathBuf),
35}
36
37impl loader::Handle for NotifyHandle {
38 fn spawn(sender: loader::Sender) -> NotifyHandle {
39 let actor = NotifyActor::new(sender);
40 let (sender, receiver) = unbounded::<Message>();
41 let thread = stdx::thread::Builder::new(stdx::thread::ThreadIntent::Worker, "VfsLoader")
42 .spawn(move || actor.run(receiver))
43 .expect("failed to spawn thread");
44 NotifyHandle { sender, _thread: thread }
45 }
46
47 fn set_config(&mut self, config: loader::Config) {
48 self.sender.send(Message::Config(config)).unwrap();
49 }
50
51 fn invalidate(&mut self, path: AbsPathBuf) {
52 self.sender.send(Message::Invalidate(path)).unwrap();
53 }
54
55 fn load_sync(&mut self, path: &AbsPath) -> Option<Vec<u8>> {
56 read(path)
57 }
58}
59
60type NotifyEvent = notify::Result<notify::Event>;
61
62struct NotifyActor {
63 sender: loader::Sender,
64 watched_file_entries: FxHashSet<AbsPathBuf>,
65 watched_dir_entries: Vec<loader::Directories>,
66 watcher: Option<(RecommendedWatcher, Receiver<NotifyEvent>)>,
68}
69
70#[derive(Debug)]
71enum Event {
72 Message(Message),
73 NotifyEvent(NotifyEvent),
74}
75
76impl NotifyActor {
77 fn new(sender: loader::Sender) -> NotifyActor {
78 NotifyActor {
79 sender,
80 watched_dir_entries: Vec::new(),
81 watched_file_entries: FxHashSet::default(),
82 watcher: None,
83 }
84 }
85
86 fn next_event(&self, receiver: &Receiver<Message>) -> Option<Event> {
87 let Some((_, watcher_receiver)) = &self.watcher else {
88 return receiver.recv().ok().map(Event::Message);
89 };
90
91 select! {
92 recv(receiver) -> it => it.ok().map(Event::Message),
93 recv(watcher_receiver) -> it => Some(Event::NotifyEvent(it.unwrap())),
94 }
95 }
96
97 fn run(mut self, inbox: Receiver<Message>) {
98 while let Some(event) = self.next_event(&inbox) {
99 tracing::debug!(?event, "vfs-notify event");
100 match event {
101 Event::Message(msg) => match msg {
102 Message::Config(config) => {
103 self.watcher = None;
104 if !config.watch.is_empty() {
105 let (watcher_sender, watcher_receiver) = unbounded();
106 let watcher = log_notify_error(RecommendedWatcher::new(
107 move |event| {
108 _ = watcher_sender.send(event);
112 },
113 Config::default(),
114 ));
115 self.watcher = watcher.map(|it| (it, watcher_receiver));
116 }
117
118 let config_version = config.version;
119
120 let n_total = config.load.len();
121 self.watched_dir_entries.clear();
122 self.watched_file_entries.clear();
123
124 self.send(loader::Message::Progress {
125 n_total,
126 n_done: LoadingProgress::Started,
127 config_version,
128 dir: None,
129 });
130
131 let (entry_tx, entry_rx) = unbounded();
132 let (watch_tx, watch_rx) = unbounded();
133 let processed = AtomicUsize::new(0);
134
135 config.load.into_par_iter().enumerate().for_each(|(i, entry)| {
136 let do_watch = config.watch.contains(&i);
137 if do_watch {
138 _ = entry_tx.send(entry.clone());
139 }
140 let files = Self::load_entry(
141 |f| _ = watch_tx.send(f.to_owned()),
142 entry,
143 do_watch,
144 |file| {
145 self.send(loader::Message::Progress {
146 n_total,
147 n_done: LoadingProgress::Progress(
148 processed.load(std::sync::atomic::Ordering::Relaxed),
149 ),
150 dir: Some(file),
151 config_version,
152 });
153 },
154 );
155 self.send(loader::Message::Loaded { files });
156 self.send(loader::Message::Progress {
157 n_total,
158 n_done: LoadingProgress::Progress(
159 processed.fetch_add(1, std::sync::atomic::Ordering::AcqRel) + 1,
160 ),
161 config_version,
162 dir: None,
163 });
164 });
165
166 drop(watch_tx);
167 for path in watch_rx {
168 self.watch(&path);
169 }
170
171 drop(entry_tx);
172 for entry in entry_rx {
173 match entry {
174 loader::Entry::Files(files) => {
175 self.watched_file_entries.extend(files)
176 }
177 loader::Entry::Directories(dir) => {
178 self.watched_dir_entries.push(dir)
179 }
180 }
181 }
182
183 self.send(loader::Message::Progress {
184 n_total,
185 n_done: LoadingProgress::Finished,
186 config_version,
187 dir: None,
188 });
189 }
190 Message::Invalidate(path) => {
191 let contents = read(path.as_path());
192 let files = vec![(path, contents)];
193 self.send(loader::Message::Changed { files });
194 }
195 },
196 Event::NotifyEvent(event) => {
197 if let Some(event) = log_notify_error(event) {
198 if let EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_) =
199 event.kind
200 {
201 let files = event
202 .paths
203 .into_iter()
204 .filter_map(|path| {
205 Some(
206 AbsPathBuf::try_from(
207 Utf8PathBuf::from_path_buf(path).ok()?,
208 )
209 .expect("path is absolute"),
210 )
211 })
212 .filter_map(|path| -> Option<(AbsPathBuf, Option<Vec<u8>>)> {
213 let meta = fs::metadata(&path).ok()?;
214 if meta.file_type().is_dir()
215 && self
216 .watched_dir_entries
217 .iter()
218 .any(|dir| dir.contains_dir(&path))
219 {
220 self.watch(path.as_ref());
221 return None;
222 }
223
224 if !meta.file_type().is_file() {
225 return None;
226 }
227
228 if !(self.watched_file_entries.contains(&path)
229 || self
230 .watched_dir_entries
231 .iter()
232 .any(|dir| dir.contains_file(&path)))
233 {
234 return None;
235 }
236
237 let contents = read(&path);
238 Some((path, contents))
239 })
240 .collect();
241 self.send(loader::Message::Changed { files });
242 }
243 }
244 }
245 }
246 }
247 }
248
249 fn load_entry(
250 mut watch: impl FnMut(&Path),
251 entry: loader::Entry,
252 do_watch: bool,
253 send_message: impl Fn(AbsPathBuf),
254 ) -> Vec<(AbsPathBuf, Option<Vec<u8>>)> {
255 match entry {
256 loader::Entry::Files(files) => files
257 .into_iter()
258 .map(|file| {
259 if do_watch {
260 watch(file.as_ref());
261 }
262 let contents = read(file.as_path());
263 (file, contents)
264 })
265 .collect::<Vec<_>>(),
266 loader::Entry::Directories(dirs) => {
267 let mut res = Vec::new();
268
269 for root in &dirs.include {
270 send_message(root.clone());
271 let walkdir =
272 WalkDir::new(root).follow_links(true).into_iter().filter_entry(|entry| {
273 if !entry.file_type().is_dir() {
274 return true;
275 }
276 let path = entry.path();
277
278 if path_might_be_cyclic(path) {
279 return false;
280 }
281
282 dirs.exclude.iter().all(|it| it != path)
284 && (root == path || dirs.include.iter().all(|it| it != path))
285 });
286
287 let files = walkdir.filter_map(|it| it.ok()).filter_map(|entry| {
288 let depth = entry.depth();
289 let is_dir = entry.file_type().is_dir();
290 let is_file = entry.file_type().is_file();
291 let abs_path = AbsPathBuf::try_from(
292 Utf8PathBuf::from_path_buf(entry.into_path()).ok()?,
293 )
294 .ok()?;
295 if depth < 2 && is_dir {
296 send_message(abs_path.clone());
297 }
298 if is_dir && do_watch {
299 watch(abs_path.as_ref());
300 }
301 if !is_file {
302 return None;
303 }
304 let ext = abs_path.extension().unwrap_or_default();
305 if dirs.extensions.iter().all(|it| it.as_str() != ext) {
306 return None;
307 }
308 Some(abs_path)
309 });
310
311 res.extend(files.map(|file| {
312 let contents = read(file.as_path());
313 (file, contents)
314 }));
315 }
316 res
317 }
318 }
319 }
320
321 fn watch(&mut self, path: &Path) {
322 if let Some((watcher, _)) = &mut self.watcher {
323 log_notify_error(watcher.watch(path, RecursiveMode::NonRecursive));
324 }
325 }
326
327 #[track_caller]
328 fn send(&self, msg: loader::Message) {
329 self.sender.send(msg).unwrap();
330 }
331}
332
333fn read(path: &AbsPath) -> Option<Vec<u8>> {
334 std::fs::read(path).ok()
335}
336
337fn log_notify_error<T>(res: notify::Result<T>) -> Option<T> {
338 res.map_err(|err| tracing::warn!("notify error: {}", err)).ok()
339}
340
/// Heuristically detects a symlink that could lead a link-following directory
/// walk back into a directory it is already inside.
///
/// Returns `true` when `path` is a symlink whose target either consists solely
/// of `.` / `..` components, or is a prefix of `path` itself. Returns `false`
/// when `path` cannot be read as a symlink.
fn path_might_be_cyclic(path: &Path) -> bool {
    match std::fs::read_link(path) {
        Err(_) => false,
        Ok(target) => {
            // A target such as `..` or `../..` points at some ancestor.
            let only_dots = target
                .components()
                .all(|part| matches!(part, Component::CurDir | Component::ParentDir));

            only_dots || path.starts_with(&target)
        }
    }
}