1use crossbeam_channel::{unbounded, Receiver};
2use notify::{Event, EventKind, RecursiveMode, Watcher};
3use std::path::{Path, PathBuf};
4use std::sync::mpsc;
5use std::time::{Duration, Instant};
6
7#[derive(Debug)]
8pub enum LibraryEvent {
9 Created(PathBuf),
10 Removed(PathBuf),
11 Renamed { from: PathBuf, to: PathBuf },
12 Changed,
13}
14
15pub fn spawn_watcher(
17 dir: &Path,
18) -> anyhow::Result<(Receiver<LibraryEvent>, notify::RecommendedWatcher)> {
19 let (raw_tx, raw_rx) = mpsc::channel::<notify::Result<Event>>();
20 let mut watcher: notify::RecommendedWatcher = notify::recommended_watcher(move |res| {
21 let _ = raw_tx.send(res);
22 })?;
23 watcher.watch(dir, RecursiveMode::Recursive)?;
24
25 let (out_tx, out_rx) = unbounded::<LibraryEvent>();
26 std::thread::Builder::new()
27 .name("verso-fs-watch".into())
28 .spawn(move || {
29 let mut last_flush = Instant::now();
31 let mut pending: Vec<LibraryEvent> = Vec::new();
32 loop {
33 match raw_rx.recv_timeout(Duration::from_millis(100)) {
34 Ok(Ok(ev)) => pending.extend(map_event(ev)),
35 Ok(Err(_e)) => {}
36 Err(_) => {}
37 }
38 if last_flush.elapsed() >= Duration::from_millis(500) && !pending.is_empty() {
39 for ev in pending.drain(..) {
40 if out_tx.send(ev).is_err() {
41 return;
42 }
43 }
44 last_flush = Instant::now();
45 }
46 }
47 })?;
48
49 Ok((out_rx, watcher))
50}
51
52fn map_event(ev: Event) -> Vec<LibraryEvent> {
53 use EventKind::*;
54 match ev.kind {
55 Create(_) => ev.paths.into_iter().map(LibraryEvent::Created).collect(),
56 Remove(_) => ev.paths.into_iter().map(LibraryEvent::Removed).collect(),
57 Modify(notify::event::ModifyKind::Name(_)) if ev.paths.len() == 2 => {
58 vec![LibraryEvent::Renamed {
59 from: ev.paths[0].clone(),
60 to: ev.paths[1].clone(),
61 }]
62 }
63 _ => vec![LibraryEvent::Changed],
64 }
65}