use crate::prelude::terminal;
use anyhow::Result;
use forky_core::prelude::*;
use futures::SinkExt;
use futures::StreamExt;
use notify::*;
use std::path::Path;
use std::sync::MutexGuard;
use std::time::Duration;
use std::time::Instant;
#[derive(Debug, Clone)]
pub struct FsWatcher {
pub path: String,
pub interval: Duration,
pub run_on_start: bool,
pub quiet: bool,
pub once_per_tick: bool,
pub watches: Vec<glob::Pattern>,
pub mutex: Option<ArcMut<()>>,
pub ignores: Vec<glob::Pattern>,
}
impl Default for FsWatcher {
fn default() -> Self {
Self {
mutex: None,
run_on_start: true,
quiet: false,
once_per_tick: true,
path: String::from("./"),
interval: Duration::from_secs(1),
watches: Vec::new(),
ignores: Vec::new(),
}
}
}
impl FsWatcher {
pub fn new() -> Self { Self::default() }
pub fn with_path(mut self, path: String) -> Self {
self.path = path;
self
}
pub fn with_quiet(mut self, quiet: bool) -> Self {
self.quiet = quiet;
self
}
pub fn with_run_on_start(mut self, run_on_start: bool) -> Self {
self.run_on_start = run_on_start;
self
}
pub fn with_mutex(mut self, mutex: ArcMut<()>) -> Self {
self.mutex = Some(mutex);
self
}
pub fn with_watches(mut self, watch: Vec<&str>) -> Self {
self.watches = watch
.iter()
.map(|w| glob::Pattern::new(w).unwrap())
.collect();
self
}
pub fn with_ignores(mut self, ignore: Vec<&str>) -> Self {
self.ignores = ignore
.iter()
.map(|w| glob::Pattern::new(w).unwrap())
.collect();
self
}
pub fn with_watch(mut self, watch: &str) -> Self {
self.watches.push(glob::Pattern::new(watch).unwrap());
self
}
pub fn with_ignore(mut self, watch: &str) -> Self {
self.ignores.push(glob::Pattern::new(watch).unwrap());
self
}
pub fn passes(&self, path: &Path) -> bool {
let path = path.to_forward_slash_str();
self.passes_watch(&path) && self.passes_ignore(&path)
}
pub fn passes_watch(&self, path: &str) -> bool {
self.watches.iter().any(|watch| watch.matches(path))
|| self.watches.is_empty()
}
pub fn passes_ignore(&self, path: &str) -> bool {
false == self.ignores.iter().any(|watch| watch.matches(path))
}
pub fn watch_log(&self) -> Result<()> {
self.watch(|e| {
println!("changed: {:?}", e);
Ok(())
})
}
fn prep_terminal(&self) {
if !self.quiet {
terminal::clear();
terminal::print_forky();
println!(
"watching...\nwatching: {:?}\nignoring: {:?}\n",
self.watches.iter().map(|w| w.as_str()).collect::<Vec<_>>(),
self.ignores.iter().map(|w| w.as_str()).collect::<Vec<_>>()
);
}
}
fn lock(&self) -> Option<MutexGuard<()>> {
self.mutex.as_ref().map(|m| m.lock().unwrap())
}
fn handle_rx(
&self,
res: Result<Event, Error>,
(start, last_run): &mut (Instant, Instant),
on_change: impl Fn(&str) -> Result<()>,
) -> Result<bool> {
let mut changed = false;
let now = Instant::now();
let last_elapsed = now.duration_since(*last_run);
if last_elapsed < self.interval {
return Ok(changed);
}
match res {
Ok(e) => {
let _mutex = self.lock();
let last_run2 = last_run;
let start_elapsed = now.duration_since(*start).as_secs_f32();
e.paths
.iter()
.filter(|path| self.passes(&path))
.take(if self.once_per_tick { 1 } else { usize::MAX })
.map(|path| {
self.prep_terminal();
if !self.quiet {
println!(
"{:.2} - file changed: {}\n",
start_elapsed,
path.to_str().unwrap_or("")
)
}
on_change(path.to_str().ok()?)?;
*last_run2 = Instant::now();
changed = true;
Ok(())
})
.collect::<Result<()>>()?;
}
Err(e) => eprintln!("watch error: {:?}", e),
}
Ok(changed)
}
pub fn block(&self) -> Result<()> {
let (_watcher, rx) = self.watcher()?;
let mut timers = timers();
for res in rx {
if self.handle_rx(res, &mut timers, |_| Ok(()))? {
return Ok(());
}
}
Ok(())
}
pub async fn block_async(&self) -> Result<()> {
let (_watcher, mut rx) = self.watcher_async()?;
let mut timers = timers();
while let Some(res) = rx.next().await {
if self.handle_rx(res, &mut timers, |_| Ok(()))? {
return Ok(());
}
}
Ok(())
}
pub fn watch(&self, on_change: impl Fn(&str) -> Result<()>) -> Result<()> {
self.try_run_on_start(&on_change)?;
let (_watcher, rx) = self.watcher()?;
let mut timers = timers();
for res in rx {
self.handle_rx(res, &mut timers, &on_change)?;
}
Ok(())
}
pub async fn watch_async(
&self,
on_change: impl Fn(&str) -> Result<()>,
) -> Result<()> {
self.try_run_on_start(&on_change)?;
let (_watcher, mut rx) = self.watcher_async()?;
let mut timers = timers();
while let Some(res) = rx.next().await {
self.handle_rx(res, &mut timers, &on_change)?;
}
Ok(())
}
fn try_run_on_start(
&self,
on_change: impl Fn(&str) -> Result<()>,
) -> Result<()> {
if self.run_on_start {
let _mutex = self.lock();
self.prep_terminal();
on_change("")?;
}
Ok(())
}
fn watcher(
&self,
) -> Result<(
RecommendedWatcher,
std::sync::mpsc::Receiver<notify::Result<Event>>,
)> {
let path = Path::new(&self.path);
let (tx, rx) = std::sync::mpsc::channel();
let mut watcher = RecommendedWatcher::new(tx, Config::default())?;
watcher.watch(path, RecursiveMode::Recursive)?;
Ok((watcher, rx))
}
fn watcher_async(
&self,
) -> Result<(
RecommendedWatcher,
futures::channel::mpsc::Receiver<notify::Result<Event>>,
)> {
let (mut tx, rx) = futures::channel::mpsc::channel(1);
let mut watcher = RecommendedWatcher::new(
move |res| {
futures::executor::block_on(async {
if let Err(err) = tx.send(res).await {
eprintln!("{:?}", err);
}
})
},
Config::default(),
)?;
let path = Path::new(&self.path);
watcher.watch(path, RecursiveMode::Recursive)?;
Ok((watcher, rx))
}
}
fn timers() -> (Instant, Instant) {
let start = Instant::now();
let last_run = start;
(start, last_run)
}