1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
use futures::channel::oneshot::*;
use futures::FutureExt;
use notify::event::{AccessKind, AccessMode, EventKind};
use notify::{RecommendedWatcher, RecursiveMode, Watcher};
use std::error::Error;
use std::future::Future;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
/// Asynchronous source of raw bytes addressed by a string URL.
///
/// Implementors must be `Send + Sync`, and both futures they hand out must be
/// `Send`.
pub trait Loader: Send + Sync {
/// Future returned by [`Loader::load`]; resolves to the loaded bytes or an error.
type Load: Future<Output = Result<Vec<u8>, Self::Error>> + Send;
/// Future returned by [`Loader::wait`]; resolves to `()` or an error.
type Wait: Future<Output = Result<(), Self::Error>> + Send;
/// Error type shared by both futures.
type Error: 'static + Error + Send + Sync;
/// Returns a future that resolves to the bytes identified by `url`.
fn load(&self, url: impl AsRef<str>) -> Self::Load;
/// Returns a future associated with `url` that carries no value on success.
///
/// NOTE(review): presumably meant to resolve once the resource behind `url`
/// has changed (that is how `FsLoader` uses it) — confirm for other implementors.
fn wait(&self, url: impl AsRef<str>) -> Self::Wait;
}
/// [`Loader`] implementation that resolves URLs as paths relative to a base
/// directory on the local filesystem.
pub struct FsLoader {
// Canonicalized base directory; URLs are joined onto this path.
base: PathBuf,
// Never read in the visible code; presumably held so the watcher stays alive
// for as long as the loader does — TODO confirm against `notify` drop semantics.
_watcher: RecommendedWatcher,
// Pending waiters: each entry pairs a canonical file path with the one-shot
// sender to fire for it. Shared with the watcher callback.
watches: Arc<Mutex<Vec<(PathBuf, Sender<Result<(), std::io::Error>>)>>>,
}
impl FsLoader {
pub fn new(base: PathBuf) -> Result<Self, std::io::Error> {
let watches = Arc::new(Mutex::new(Vec::<(PathBuf, Sender<Result<(), std::io::Error>>)>::new()));
let base = base.canonicalize()?;
let mut watcher = notify::immediate_watcher({
let watches = watches.clone();
move |event: Result<notify::event::Event, notify::Error>| match event {
Ok(event) => match event.kind {
EventKind::Access(AccessKind::Close(AccessMode::Write)) => {
let mut guard = watches.lock().unwrap();
*guard = guard
.drain(..)
.filter_map(|(path, sender)| {
if event.paths.contains(&path) {
sender.send(Ok(())).ok();
None
} else {
Some((path, sender))
}
})
.collect();
}
_ => (),
},
Err(_) => (),
}
})
.expect("unable to create watcher");
watcher
.watch(&base, RecursiveMode::Recursive)
.expect("failed to watch base path");
Ok(Self {
base,
_watcher: watcher,
watches,
})
}
}
impl Loader for FsLoader {
type Load = futures::future::Ready<Result<Vec<u8>, Self::Error>>;
type Wait = futures::future::Map<
Receiver<Result<(), Self::Error>>,
fn(Result<Result<(), Self::Error>, Canceled>) -> Result<(), Self::Error>,
>;
type Error = std::io::Error;
fn load(&self, url: impl AsRef<str>) -> Self::Load {
let path = self.base.join(std::path::Path::new(url.as_ref()));
futures::future::ready(std::fs::read(path))
}
fn wait(&self, url: impl AsRef<str>) -> Self::Wait {
let (tx, rx) = channel();
match self.base.join(std::path::Path::new(url.as_ref())).canonicalize() {
Ok(path) => {
self.watches.lock().unwrap().push((path, tx));
}
Err(error) => {
tx.send(Err(error)).ok();
}
}
rx.map(|result| result.unwrap_or(Ok(())))
}
}