use std::path::PathBuf;
use std::sync::Arc;
use anyhow::{anyhow, Context, Result};
use async_std::task::{spawn_blocking, JoinHandle};
use futures::channel::mpsc::{channel, Receiver, Sender};
use futures::prelude::*;
use indicatif::ProgressBar;
use notify::{watcher, DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher};
use crate::build::BuildSystem;
use crate::config::RtcWatch;
pub struct WatchSystem {
progress: ProgressBar,
build: BuildSystem,
ignores: Vec<PathBuf>,
watch_rx: Receiver<DebouncedEvent>,
build_rx: Receiver<PathBuf>,
_watcher: (JoinHandle<()>, RecommendedWatcher),
}
impl WatchSystem {
pub async fn new(cfg: Arc<RtcWatch>, progress: ProgressBar) -> Result<Self> {
let (watch_tx, watch_rx) = channel(1);
let (build_tx, build_rx) = channel(1);
let mut ignores = cfg.ignore.iter().try_fold(vec![], |mut acc, path| -> Result<Vec<PathBuf>> {
let abs_path = path.canonicalize().map_err(|err| anyhow!("invalid path provided: {}", err))?;
acc.push(abs_path);
Ok(acc)
})?;
ignores.append(&mut vec![cfg.build.dist.clone()]);
let _watcher = build_watcher(watch_tx)?;
let build = BuildSystem::new(cfg.build.clone(), progress.clone(), Some(build_tx)).await?;
Ok(Self {
progress,
build,
ignores,
watch_rx,
build_rx,
_watcher,
})
}
pub async fn build(&mut self) {
if let Err(err) = self.build.build().await {
self.progress.println(format!("{}", err));
}
}
pub async fn run(mut self) {
loop {
futures::select! {
ign_res = self.build_rx.next() => if let Some(ign) = ign_res {
self.update_ignore_list(ign);
},
ev_res = self.watch_rx.next() => if let Some(ev) = ev_res {
self.handle_watch_event(ev).await;
},
}
}
}
async fn handle_watch_event(&mut self, event: DebouncedEvent) {
let ev_path = match event {
DebouncedEvent::Create(path) | DebouncedEvent::Write(path) | DebouncedEvent::Remove(path) | DebouncedEvent::Rename(_, path) => path,
_ => return,
};
for path in ev_path.ancestors() {
if self.ignores.iter().map(|p| p.as_path()).any(|p| p == path) {
return; }
}
if let Err(err) = self.build.build().await {
self.progress.println(format!("{}", err));
}
}
fn update_ignore_list(&mut self, path: PathBuf) {
if !self.ignores.contains(&path) {
self.ignores.push(path);
}
}
}
fn build_watcher(mut watch_tx: Sender<DebouncedEvent>) -> Result<(JoinHandle<()>, RecommendedWatcher)> {
let (tx, rx) = std::sync::mpsc::channel();
let mut watcher = watcher(tx, std::time::Duration::from_secs(1)).context("failed to build file system watcher")?;
watcher
.watch(".", RecursiveMode::Recursive)
.context("failed to watch CWD for file system changes")?;
let handle = spawn_blocking(move || loop {
if let Ok(event) = rx.recv() {
let _ = watch_tx.try_send(event);
}
});
Ok((handle, watcher))
}