ra_ap_vfs_notify/
lib.rs

1//! An implementation of `loader::Handle`, based on `walkdir` and `notify`.
2//!
//! The file watching bits here are untested and quite probably buggy. For this
//! reason, by default we don't watch files and rely on the editor's file watching
//! capabilities.
6//!
7//! Hopefully, one day a reliable file watching/walking crate appears on
8//! crates.io, and we can reduce this to trivial glue code.
9
10use std::{
11    fs,
12    path::{Component, Path},
13    sync::atomic::AtomicUsize,
14};
15
16use crossbeam_channel::{select, unbounded, Receiver, Sender};
17use notify::{Config, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
18use paths::{AbsPath, AbsPathBuf, Utf8PathBuf};
19use rayon::iter::{IndexedParallelIterator as _, IntoParallelIterator as _, ParallelIterator};
20use rustc_hash::FxHashSet;
21use vfs::loader::{self, LoadingProgress};
22use walkdir::WalkDir;
23
/// Handle to the background "VfsLoader" thread, which loads files and, when
/// configured to, watches them for changes.
///
/// Requests are forwarded to that thread over `sender`.
#[derive(Debug)]
pub struct NotifyHandle {
    // Relative order of fields below is significant.
    // Fields are dropped in declaration order, so `sender` goes first. It is the
    // only sender of the actor's inbox, so dropping it makes the actor's
    // `next_event` return `None` and its `run` loop exit. `_thread` is dropped
    // afterwards; presumably `stdx::thread::JoinHandle` joins the thread on drop,
    // which would otherwise block forever — TODO confirm.
    sender: Sender<Message>,
    _thread: stdx::thread::JoinHandle,
}
30
31#[derive(Debug)]
32enum Message {
33    Config(loader::Config),
34    Invalidate(AbsPathBuf),
35}
36
37impl loader::Handle for NotifyHandle {
38    fn spawn(sender: loader::Sender) -> NotifyHandle {
39        let actor = NotifyActor::new(sender);
40        let (sender, receiver) = unbounded::<Message>();
41        let thread = stdx::thread::Builder::new(stdx::thread::ThreadIntent::Worker)
42            .name("VfsLoader".to_owned())
43            .spawn(move || actor.run(receiver))
44            .expect("failed to spawn thread");
45        NotifyHandle { sender, _thread: thread }
46    }
47
48    fn set_config(&mut self, config: loader::Config) {
49        self.sender.send(Message::Config(config)).unwrap();
50    }
51
52    fn invalidate(&mut self, path: AbsPathBuf) {
53        self.sender.send(Message::Invalidate(path)).unwrap();
54    }
55
56    fn load_sync(&mut self, path: &AbsPath) -> Option<Vec<u8>> {
57        read(path)
58    }
59}
60
/// Item produced by the file watcher's event callback and carried over the
/// watcher's channel to the actor loop.
type NotifyEvent = notify::Result<notify::Event>;
62
/// State owned by the loader thread.
struct NotifyActor {
    /// Channel on which loaded/changed files and loading progress are reported.
    sender: loader::Sender,
    /// Files from watched `loader::Entry::Files` entries of the current config.
    watched_file_entries: FxHashSet<AbsPathBuf>,
    /// Directory sets from watched `loader::Entry::Directories` entries of the
    /// current config.
    watched_dir_entries: Vec<loader::Directories>,
    // Drop order is significant.
    // Tuple elements are dropped in order, so the watcher (whose callback owns the
    // sending half of the channel) goes before the receiver.
    /// The active watcher plus the receiving end of its event channel. `None`
    /// before the first config, when nothing is watched, or when creating the
    /// watcher failed.
    watcher: Option<(RecommendedWatcher, Receiver<NotifyEvent>)>,
}
70
71#[derive(Debug)]
72enum Event {
73    Message(Message),
74    NotifyEvent(NotifyEvent),
75}
76
77impl NotifyActor {
78    fn new(sender: loader::Sender) -> NotifyActor {
79        NotifyActor {
80            sender,
81            watched_dir_entries: Vec::new(),
82            watched_file_entries: FxHashSet::default(),
83            watcher: None,
84        }
85    }
86
87    fn next_event(&self, receiver: &Receiver<Message>) -> Option<Event> {
88        let Some((_, watcher_receiver)) = &self.watcher else {
89            return receiver.recv().ok().map(Event::Message);
90        };
91
92        select! {
93            recv(receiver) -> it => it.ok().map(Event::Message),
94            recv(watcher_receiver) -> it => Some(Event::NotifyEvent(it.unwrap())),
95        }
96    }
97
98    fn run(mut self, inbox: Receiver<Message>) {
99        while let Some(event) = self.next_event(&inbox) {
100            tracing::debug!(?event, "vfs-notify event");
101            match event {
102                Event::Message(msg) => match msg {
103                    Message::Config(config) => {
104                        self.watcher = None;
105                        if !config.watch.is_empty() {
106                            let (watcher_sender, watcher_receiver) = unbounded();
107                            let watcher = log_notify_error(RecommendedWatcher::new(
108                                move |event| {
109                                    // we don't care about the error. If sending fails that usually
110                                    // means we were dropped, so unwrapping will just add to the
111                                    // panic noise.
112                                    _ = watcher_sender.send(event);
113                                },
114                                Config::default(),
115                            ));
116                            self.watcher = watcher.map(|it| (it, watcher_receiver));
117                        }
118
119                        let config_version = config.version;
120
121                        let n_total = config.load.len();
122                        self.watched_dir_entries.clear();
123                        self.watched_file_entries.clear();
124
125                        self.send(loader::Message::Progress {
126                            n_total,
127                            n_done: LoadingProgress::Started,
128                            config_version,
129                            dir: None,
130                        });
131
132                        let (entry_tx, entry_rx) = unbounded();
133                        let (watch_tx, watch_rx) = unbounded();
134                        let processed = AtomicUsize::new(0);
135
136                        config.load.into_par_iter().enumerate().for_each(|(i, entry)| {
137                            let do_watch = config.watch.contains(&i);
138                            if do_watch {
139                                _ = entry_tx.send(entry.clone());
140                            }
141                            let files = Self::load_entry(
142                                |f| _ = watch_tx.send(f.to_owned()),
143                                entry,
144                                do_watch,
145                                |file| {
146                                    self.send(loader::Message::Progress {
147                                        n_total,
148                                        n_done: LoadingProgress::Progress(
149                                            processed.load(std::sync::atomic::Ordering::Relaxed),
150                                        ),
151                                        dir: Some(file),
152                                        config_version,
153                                    });
154                                },
155                            );
156                            self.send(loader::Message::Loaded { files });
157                            self.send(loader::Message::Progress {
158                                n_total,
159                                n_done: LoadingProgress::Progress(
160                                    processed.fetch_add(1, std::sync::atomic::Ordering::AcqRel) + 1,
161                                ),
162                                config_version,
163                                dir: None,
164                            });
165                        });
166
167                        drop(watch_tx);
168                        for path in watch_rx {
169                            self.watch(&path);
170                        }
171
172                        drop(entry_tx);
173                        for entry in entry_rx {
174                            match entry {
175                                loader::Entry::Files(files) => {
176                                    self.watched_file_entries.extend(files)
177                                }
178                                loader::Entry::Directories(dir) => {
179                                    self.watched_dir_entries.push(dir)
180                                }
181                            }
182                        }
183
184                        self.send(loader::Message::Progress {
185                            n_total,
186                            n_done: LoadingProgress::Finished,
187                            config_version,
188                            dir: None,
189                        });
190                    }
191                    Message::Invalidate(path) => {
192                        let contents = read(path.as_path());
193                        let files = vec![(path, contents)];
194                        self.send(loader::Message::Changed { files });
195                    }
196                },
197                Event::NotifyEvent(event) => {
198                    if let Some(event) = log_notify_error(event) {
199                        if let EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_) =
200                            event.kind
201                        {
202                            let files = event
203                                .paths
204                                .into_iter()
205                                .filter_map(|path| {
206                                    Some(
207                                        AbsPathBuf::try_from(
208                                            Utf8PathBuf::from_path_buf(path).ok()?,
209                                        )
210                                        .expect("path is absolute"),
211                                    )
212                                })
213                                .filter_map(|path| -> Option<(AbsPathBuf, Option<Vec<u8>>)> {
214                                    let meta = fs::metadata(&path).ok()?;
215                                    if meta.file_type().is_dir()
216                                        && self
217                                            .watched_dir_entries
218                                            .iter()
219                                            .any(|dir| dir.contains_dir(&path))
220                                    {
221                                        self.watch(path.as_ref());
222                                        return None;
223                                    }
224
225                                    if !meta.file_type().is_file() {
226                                        return None;
227                                    }
228
229                                    if !(self.watched_file_entries.contains(&path)
230                                        || self
231                                            .watched_dir_entries
232                                            .iter()
233                                            .any(|dir| dir.contains_file(&path)))
234                                    {
235                                        return None;
236                                    }
237
238                                    let contents = read(&path);
239                                    Some((path, contents))
240                                })
241                                .collect();
242                            self.send(loader::Message::Changed { files });
243                        }
244                    }
245                }
246            }
247        }
248    }
249
250    fn load_entry(
251        mut watch: impl FnMut(&Path),
252        entry: loader::Entry,
253        do_watch: bool,
254        send_message: impl Fn(AbsPathBuf),
255    ) -> Vec<(AbsPathBuf, Option<Vec<u8>>)> {
256        match entry {
257            loader::Entry::Files(files) => files
258                .into_iter()
259                .map(|file| {
260                    if do_watch {
261                        watch(file.as_ref());
262                    }
263                    let contents = read(file.as_path());
264                    (file, contents)
265                })
266                .collect::<Vec<_>>(),
267            loader::Entry::Directories(dirs) => {
268                let mut res = Vec::new();
269
270                for root in &dirs.include {
271                    send_message(root.clone());
272                    let walkdir =
273                        WalkDir::new(root).follow_links(true).into_iter().filter_entry(|entry| {
274                            if !entry.file_type().is_dir() {
275                                return true;
276                            }
277                            let path = entry.path();
278
279                            if path_might_be_cyclic(path) {
280                                return false;
281                            }
282
283                            // We want to filter out subdirectories that are roots themselves, because they will be visited separately.
284                            dirs.exclude.iter().all(|it| it != path)
285                                && (root == path || dirs.include.iter().all(|it| it != path))
286                        });
287
288                    let files = walkdir.filter_map(|it| it.ok()).filter_map(|entry| {
289                        let depth = entry.depth();
290                        let is_dir = entry.file_type().is_dir();
291                        let is_file = entry.file_type().is_file();
292                        let abs_path = AbsPathBuf::try_from(
293                            Utf8PathBuf::from_path_buf(entry.into_path()).ok()?,
294                        )
295                        .ok()?;
296                        if depth < 2 && is_dir {
297                            send_message(abs_path.clone());
298                        }
299                        if is_dir && do_watch {
300                            watch(abs_path.as_ref());
301                        }
302                        if !is_file {
303                            return None;
304                        }
305                        let ext = abs_path.extension().unwrap_or_default();
306                        if dirs.extensions.iter().all(|it| it.as_str() != ext) {
307                            return None;
308                        }
309                        Some(abs_path)
310                    });
311
312                    res.extend(files.map(|file| {
313                        let contents = read(file.as_path());
314                        (file, contents)
315                    }));
316                }
317                res
318            }
319        }
320    }
321
322    fn watch(&mut self, path: &Path) {
323        if let Some((watcher, _)) = &mut self.watcher {
324            log_notify_error(watcher.watch(path, RecursiveMode::NonRecursive));
325        }
326    }
327
328    #[track_caller]
329    fn send(&self, msg: loader::Message) {
330        self.sender.send(msg).unwrap();
331    }
332}
333
334fn read(path: &AbsPath) -> Option<Vec<u8>> {
335    std::fs::read(path).ok()
336}
337
338fn log_notify_error<T>(res: notify::Result<T>) -> Option<T> {
339    res.map_err(|err| tracing::warn!("notify error: {}", err)).ok()
340}
341
/// Is `path` a symlink to a parent directory?
///
/// Descending into such a link while walking would revisit its ancestors
/// forever. This is only a heuristic: a cycle built from two or more symlinks is
/// not detected. It does catch the common cases:
/// * relative targets made up solely of `.`/`..` components (`..`, `../..`, `.`);
/// * targets that are a prefix of `path` itself (e.g. an absolute ancestor).
fn path_might_be_cyclic(path: &Path) -> bool {
    // Anything that is not a readable symlink cannot be a parent link.
    let destination = match std::fs::read_link(path) {
        Ok(target) => target,
        Err(_) => return false,
    };

    let only_upward = !destination
        .components()
        .any(|part| !matches!(part, Component::CurDir | Component::ParentDir));
    if only_upward {
        return true;
    }

    path.starts_with(&destination)
}