ra_ap_vfs_notify/
lib.rs

1//! An implementation of `loader::Handle`, based on `walkdir` and `notify`.
2//!
//! The file watching bits here are untested and quite probably buggy. For this
//! reason, by default we don't watch files and rely on the editor's file
//! watching capabilities instead.
6//!
7//! Hopefully, one day a reliable file watching/walking crate appears on
8//! crates.io, and we can reduce this to trivial glue code.
9
10use std::{
11    fs,
12    path::{Component, Path},
13    sync::atomic::AtomicUsize,
14};
15
16use crossbeam_channel::{Receiver, Sender, select, unbounded};
17use notify::{Config, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
18use paths::{AbsPath, AbsPathBuf, Utf8PathBuf};
19use rayon::iter::{IndexedParallelIterator as _, IntoParallelIterator as _, ParallelIterator};
20use rustc_hash::FxHashSet;
21use vfs::loader::{self, LoadingProgress};
22use walkdir::WalkDir;
23
24#[derive(Debug)]
25pub struct NotifyHandle {
26    // Relative order of fields below is significant.
27    sender: Sender<Message>,
28    _thread: stdx::thread::JoinHandle,
29}
30
31#[derive(Debug)]
32enum Message {
33    Config(loader::Config),
34    Invalidate(AbsPathBuf),
35}
36
37impl loader::Handle for NotifyHandle {
38    fn spawn(sender: loader::Sender) -> NotifyHandle {
39        let actor = NotifyActor::new(sender);
40        let (sender, receiver) = unbounded::<Message>();
41        let thread = stdx::thread::Builder::new(stdx::thread::ThreadIntent::Worker, "VfsLoader")
42            .spawn(move || actor.run(receiver))
43            .expect("failed to spawn thread");
44        NotifyHandle { sender, _thread: thread }
45    }
46
47    fn set_config(&mut self, config: loader::Config) {
48        self.sender.send(Message::Config(config)).unwrap();
49    }
50
51    fn invalidate(&mut self, path: AbsPathBuf) {
52        self.sender.send(Message::Invalidate(path)).unwrap();
53    }
54
55    fn load_sync(&mut self, path: &AbsPath) -> Option<Vec<u8>> {
56        read(path)
57    }
58}
59
60type NotifyEvent = notify::Result<notify::Event>;
61
62struct NotifyActor {
63    sender: loader::Sender,
64    watched_file_entries: FxHashSet<AbsPathBuf>,
65    watched_dir_entries: Vec<loader::Directories>,
66    // Drop order is significant.
67    watcher: Option<(RecommendedWatcher, Receiver<NotifyEvent>)>,
68}
69
70#[derive(Debug)]
71enum Event {
72    Message(Message),
73    NotifyEvent(NotifyEvent),
74}
75
76impl NotifyActor {
77    fn new(sender: loader::Sender) -> NotifyActor {
78        NotifyActor {
79            sender,
80            watched_dir_entries: Vec::new(),
81            watched_file_entries: FxHashSet::default(),
82            watcher: None,
83        }
84    }
85
86    fn next_event(&self, receiver: &Receiver<Message>) -> Option<Event> {
87        let Some((_, watcher_receiver)) = &self.watcher else {
88            return receiver.recv().ok().map(Event::Message);
89        };
90
91        select! {
92            recv(receiver) -> it => it.ok().map(Event::Message),
93            recv(watcher_receiver) -> it => Some(Event::NotifyEvent(it.unwrap())),
94        }
95    }
96
97    fn run(mut self, inbox: Receiver<Message>) {
98        while let Some(event) = self.next_event(&inbox) {
99            tracing::debug!(?event, "vfs-notify event");
100            match event {
101                Event::Message(msg) => match msg {
102                    Message::Config(config) => {
103                        self.watcher = None;
104                        if !config.watch.is_empty() {
105                            let (watcher_sender, watcher_receiver) = unbounded();
106                            let watcher = log_notify_error(RecommendedWatcher::new(
107                                move |event| {
108                                    // we don't care about the error. If sending fails that usually
109                                    // means we were dropped, so unwrapping will just add to the
110                                    // panic noise.
111                                    _ = watcher_sender.send(event);
112                                },
113                                Config::default(),
114                            ));
115                            self.watcher = watcher.map(|it| (it, watcher_receiver));
116                        }
117
118                        let config_version = config.version;
119
120                        let n_total = config.load.len();
121                        self.watched_dir_entries.clear();
122                        self.watched_file_entries.clear();
123
124                        self.send(loader::Message::Progress {
125                            n_total,
126                            n_done: LoadingProgress::Started,
127                            config_version,
128                            dir: None,
129                        });
130
131                        let (entry_tx, entry_rx) = unbounded();
132                        let (watch_tx, watch_rx) = unbounded();
133                        let processed = AtomicUsize::new(0);
134
135                        config.load.into_par_iter().enumerate().for_each(|(i, entry)| {
136                            let do_watch = config.watch.contains(&i);
137                            if do_watch {
138                                _ = entry_tx.send(entry.clone());
139                            }
140                            let files = Self::load_entry(
141                                |f| _ = watch_tx.send(f.to_owned()),
142                                entry,
143                                do_watch,
144                                |file| {
145                                    self.send(loader::Message::Progress {
146                                        n_total,
147                                        n_done: LoadingProgress::Progress(
148                                            processed.load(std::sync::atomic::Ordering::Relaxed),
149                                        ),
150                                        dir: Some(file),
151                                        config_version,
152                                    });
153                                },
154                            );
155                            self.send(loader::Message::Loaded { files });
156                            self.send(loader::Message::Progress {
157                                n_total,
158                                n_done: LoadingProgress::Progress(
159                                    processed.fetch_add(1, std::sync::atomic::Ordering::AcqRel) + 1,
160                                ),
161                                config_version,
162                                dir: None,
163                            });
164                        });
165
166                        drop(watch_tx);
167                        for path in watch_rx {
168                            self.watch(&path);
169                        }
170
171                        drop(entry_tx);
172                        for entry in entry_rx {
173                            match entry {
174                                loader::Entry::Files(files) => {
175                                    self.watched_file_entries.extend(files)
176                                }
177                                loader::Entry::Directories(dir) => {
178                                    self.watched_dir_entries.push(dir)
179                                }
180                            }
181                        }
182
183                        self.send(loader::Message::Progress {
184                            n_total,
185                            n_done: LoadingProgress::Finished,
186                            config_version,
187                            dir: None,
188                        });
189                    }
190                    Message::Invalidate(path) => {
191                        let contents = read(path.as_path());
192                        let files = vec![(path, contents)];
193                        self.send(loader::Message::Changed { files });
194                    }
195                },
196                Event::NotifyEvent(event) => {
197                    if let Some(event) = log_notify_error(event) {
198                        if let EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_) =
199                            event.kind
200                        {
201                            let files = event
202                                .paths
203                                .into_iter()
204                                .filter_map(|path| {
205                                    Some(
206                                        AbsPathBuf::try_from(
207                                            Utf8PathBuf::from_path_buf(path).ok()?,
208                                        )
209                                        .expect("path is absolute"),
210                                    )
211                                })
212                                .filter_map(|path| -> Option<(AbsPathBuf, Option<Vec<u8>>)> {
213                                    let meta = fs::metadata(&path).ok()?;
214                                    if meta.file_type().is_dir()
215                                        && self
216                                            .watched_dir_entries
217                                            .iter()
218                                            .any(|dir| dir.contains_dir(&path))
219                                    {
220                                        self.watch(path.as_ref());
221                                        return None;
222                                    }
223
224                                    if !meta.file_type().is_file() {
225                                        return None;
226                                    }
227
228                                    if !(self.watched_file_entries.contains(&path)
229                                        || self
230                                            .watched_dir_entries
231                                            .iter()
232                                            .any(|dir| dir.contains_file(&path)))
233                                    {
234                                        return None;
235                                    }
236
237                                    let contents = read(&path);
238                                    Some((path, contents))
239                                })
240                                .collect();
241                            self.send(loader::Message::Changed { files });
242                        }
243                    }
244                }
245            }
246        }
247    }
248
249    fn load_entry(
250        mut watch: impl FnMut(&Path),
251        entry: loader::Entry,
252        do_watch: bool,
253        send_message: impl Fn(AbsPathBuf),
254    ) -> Vec<(AbsPathBuf, Option<Vec<u8>>)> {
255        match entry {
256            loader::Entry::Files(files) => files
257                .into_iter()
258                .map(|file| {
259                    if do_watch {
260                        watch(file.as_ref());
261                    }
262                    let contents = read(file.as_path());
263                    (file, contents)
264                })
265                .collect::<Vec<_>>(),
266            loader::Entry::Directories(dirs) => {
267                let mut res = Vec::new();
268
269                for root in &dirs.include {
270                    send_message(root.clone());
271                    let walkdir =
272                        WalkDir::new(root).follow_links(true).into_iter().filter_entry(|entry| {
273                            if !entry.file_type().is_dir() {
274                                return true;
275                            }
276                            let path = entry.path();
277
278                            if path_might_be_cyclic(path) {
279                                return false;
280                            }
281
282                            // We want to filter out subdirectories that are roots themselves, because they will be visited separately.
283                            dirs.exclude.iter().all(|it| it != path)
284                                && (root == path || dirs.include.iter().all(|it| it != path))
285                        });
286
287                    let files = walkdir.filter_map(|it| it.ok()).filter_map(|entry| {
288                        let depth = entry.depth();
289                        let is_dir = entry.file_type().is_dir();
290                        let is_file = entry.file_type().is_file();
291                        let abs_path = AbsPathBuf::try_from(
292                            Utf8PathBuf::from_path_buf(entry.into_path()).ok()?,
293                        )
294                        .ok()?;
295                        if depth < 2 && is_dir {
296                            send_message(abs_path.clone());
297                        }
298                        if is_dir && do_watch {
299                            watch(abs_path.as_ref());
300                        }
301                        if !is_file {
302                            return None;
303                        }
304                        let ext = abs_path.extension().unwrap_or_default();
305                        if dirs.extensions.iter().all(|it| it.as_str() != ext) {
306                            return None;
307                        }
308                        Some(abs_path)
309                    });
310
311                    res.extend(files.map(|file| {
312                        let contents = read(file.as_path());
313                        (file, contents)
314                    }));
315                }
316                res
317            }
318        }
319    }
320
321    fn watch(&mut self, path: &Path) {
322        if let Some((watcher, _)) = &mut self.watcher {
323            log_notify_error(watcher.watch(path, RecursiveMode::NonRecursive));
324        }
325    }
326
327    #[track_caller]
328    fn send(&self, msg: loader::Message) {
329        self.sender.send(msg).unwrap();
330    }
331}
332
333fn read(path: &AbsPath) -> Option<Vec<u8>> {
334    std::fs::read(path).ok()
335}
336
337fn log_notify_error<T>(res: notify::Result<T>) -> Option<T> {
338    res.map_err(|err| tracing::warn!("notify error: {}", err)).ok()
339}
340
/// Does `path` look like a symlink pointing at one of its own ancestors?
///
/// Walking into such a link would never terminate. This is only a heuristic:
/// a cycle built out of two or more symlinks is not detected, but the common
/// single-link cases are.
///
/// Returns `false` when `path` is not a symlink or its target cannot be read.
fn path_might_be_cyclic(path: &Path) -> bool {
    match std::fs::read_link(path) {
        Err(_) => false,
        Ok(target) => {
            // A target made up solely of `.` / `..` components (e.g. "../..")
            // always resolves to an ancestor of the link.
            let only_dot_components = target
                .components()
                .all(|part| matches!(part, Component::CurDir | Component::ParentDir));

            // Otherwise the link is cyclic when the target is a prefix of the
            // link's own path.
            only_dot_components || path.starts_with(target)
        }
    }
}
357}