Skip to main content

verso/library/
watch.rs

1use crossbeam_channel::{unbounded, Receiver};
2use notify::{Event, EventKind, RecursiveMode, Watcher};
3use std::path::{Path, PathBuf};
4use std::sync::mpsc;
5use std::time::{Duration, Instant};
6
/// A filesystem change observed inside the watched library directory.
///
/// `Clone`, `PartialEq` and `Eq` are derived so consumers can compare and
/// deduplicate events (and assert on them in tests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LibraryEvent {
    /// A path was created.
    Created(PathBuf),
    /// A path was removed.
    Removed(PathBuf),
    /// A path was renamed from `from` to `to`.
    Renamed { from: PathBuf, to: PathBuf },
    /// Any other change; carries no path information.
    Changed,
}
14
15/// Returns a receiver of library events and the watcher handle that must be kept alive.
16pub fn spawn_watcher(
17    dir: &Path,
18) -> anyhow::Result<(Receiver<LibraryEvent>, notify::RecommendedWatcher)> {
19    let (raw_tx, raw_rx) = mpsc::channel::<notify::Result<Event>>();
20    let mut watcher: notify::RecommendedWatcher = notify::recommended_watcher(move |res| {
21        let _ = raw_tx.send(res);
22    })?;
23    watcher.watch(dir, RecursiveMode::Recursive)?;
24
25    let (out_tx, out_rx) = unbounded::<LibraryEvent>();
26    std::thread::Builder::new()
27        .name("verso-fs-watch".into())
28        .spawn(move || {
29            // 500 ms coalescing.
30            let mut last_flush = Instant::now();
31            let mut pending: Vec<LibraryEvent> = Vec::new();
32            loop {
33                match raw_rx.recv_timeout(Duration::from_millis(100)) {
34                    Ok(Ok(ev)) => pending.extend(map_event(ev)),
35                    Ok(Err(_e)) => {}
36                    Err(_) => {}
37                }
38                if last_flush.elapsed() >= Duration::from_millis(500) && !pending.is_empty() {
39                    for ev in pending.drain(..) {
40                        if out_tx.send(ev).is_err() {
41                            return;
42                        }
43                    }
44                    last_flush = Instant::now();
45                }
46            }
47        })?;
48
49    Ok((out_rx, watcher))
50}
51
52fn map_event(ev: Event) -> Vec<LibraryEvent> {
53    use EventKind::*;
54    match ev.kind {
55        Create(_) => ev.paths.into_iter().map(LibraryEvent::Created).collect(),
56        Remove(_) => ev.paths.into_iter().map(LibraryEvent::Removed).collect(),
57        Modify(notify::event::ModifyKind::Name(_)) if ev.paths.len() == 2 => {
58            vec![LibraryEvent::Renamed {
59                from: ev.paths[0].clone(),
60                to: ev.paths[1].clone(),
61            }]
62        }
63        _ => vec![LibraryEvent::Changed],
64    }
65}