1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
use futures::channel::oneshot::*;
use futures::FutureExt;
use notify::event::{AccessKind, AccessMode, EventKind};
use notify::{RecommendedWatcher, RecursiveMode, Watcher};
use std::error::Error;
use std::future::Future;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};

/// A way to load URLs from a data source.
///
/// A loader turns a URL string into the raw bytes of the resource (`load`) and
/// can notify the caller when that resource is modified externally (`wait`).
/// Both operations return futures whose concrete types are chosen by the
/// implementation through the associated types below.
///
/// Included implementations:
/// - `FsLoader`, loads data from disk relative to a base directory.
pub trait Loader: Send + Sync {
    /// A future returned when calling `load`.
    ///
    /// Resolves to the bytes of the resource, or to `Self::Error` on failure.
    type Load: Future<Output = Result<Vec<u8>, Self::Error>> + Send;
    /// A future returned when calling `wait`.
    ///
    /// Resolves to `Ok(())`, or to `Self::Error` on failure.
    type Wait: Future<Output = Result<(), Self::Error>> + Send;
    /// Error returned by the loader when the request failed.
    type Error: 'static + Error + Send + Sync;
    /// Asynchronously load a resource located at the given url
    fn load(&self, url: impl AsRef<str>) -> Self::Load;
    /// Wait for a resource to be modified externally.
    fn wait(&self, url: impl AsRef<str>) -> Self::Wait;
}

/// Load file from the filesystem
///
/// URLs are joined onto a base directory. The loader also owns a filesystem
/// watcher and a list of pending `wait` requests that the watcher callback
/// completes.
pub struct FsLoader {
    /// Directory that URLs are joined onto; canonicalized by `FsLoader::new`.
    base: PathBuf,
    /// Watcher registered on `base`. It is never read after construction in the
    /// visible code; presumably it is stored so the watch stays active for as
    /// long as the loader lives.
    _watcher: RecommendedWatcher,
    /// Pending `wait` requests: the canonical path being waited on, paired with
    /// the sender that completes the corresponding future. Shared with the
    /// watcher callback, hence the `Arc<Mutex<..>>`.
    watches: Arc<Mutex<Vec<(PathBuf, Sender<Result<(), std::io::Error>>)>>>,
}

impl FsLoader {
    /// Construct a new FsLoader with the given base path
    pub fn new(base: PathBuf) -> Result<Self, std::io::Error> {
        let watches = Arc::new(Mutex::new(Vec::<(PathBuf, Sender<Result<(), std::io::Error>>)>::new()));

        let base = base.canonicalize()?;

        let mut watcher = notify::immediate_watcher({
            let watches = watches.clone();
            move |event: Result<notify::event::Event, notify::Error>| match event {
                Ok(event) => match event.kind {
                    EventKind::Access(AccessKind::Close(AccessMode::Write)) => {
                        let mut guard = watches.lock().unwrap();
                        *guard = guard
                            .drain(..)
                            .filter_map(|(path, sender)| {
                                if event.paths.contains(&path) {
                                    sender.send(Ok(())).ok();
                                    None
                                } else {
                                    Some((path, sender))
                                }
                            })
                            .collect();
                    }
                    _ => (),
                },
                Err(_) => (),
            }
        })
        .expect("unable to create watcher");

        watcher
            .watch(&base, RecursiveMode::Recursive)
            .expect("failed to watch base path");

        Ok(Self {
            base,
            _watcher: watcher,
            watches,
        })
    }
}

impl Loader for FsLoader {
    type Error = std::io::Error;
    type Load = futures::future::Ready<Result<Vec<u8>, Self::Error>>;
    type Wait = futures::future::Map<
        Receiver<Result<(), Self::Error>>,
        fn(Result<Result<(), Self::Error>, Canceled>) -> Result<(), Self::Error>,
    >;

    /// Read the file at `url`, resolved against the base directory.
    ///
    /// The read happens synchronously inside this call; the returned future is
    /// already complete and carries either the file contents or the I/O error.
    fn load(&self, url: impl AsRef<str>) -> Self::Load {
        let contents = std::fs::read(self.base.join(url.as_ref()));
        futures::future::ready(contents)
    }

    /// Register interest in the file at `url` and return a future for its next
    /// reported modification.
    ///
    /// The path is resolved against the base directory and canonicalized. If
    /// that fails, the returned future resolves to the canonicalization error.
    /// If the sending side is dropped without sending, the future resolves to
    /// `Ok(())`.
    fn wait(&self, url: impl AsRef<str>) -> Self::Wait {
        let (notifier, receiver) = channel();
        let target = self.base.join(url.as_ref());

        match target.canonicalize() {
            Ok(resolved) => {
                // Hand the sender to the watcher callback via the shared list.
                self.watches.lock().unwrap().push((resolved, notifier));
            }
            Err(error) => {
                // Deliver the failure through the same channel the caller awaits.
                notifier.send(Err(error)).ok();
            }
        }

        receiver.map(|outcome| match outcome {
            Ok(result) => result,
            Err(Canceled) => Ok(()),
        })
    }
}