use crate::{
build::{BuildResult, BuildSystem},
config::{rt::RtcWatch, types::WsProtocol},
ws,
};
use anyhow::{Context, Result};
use futures_util::stream::StreamExt;
use notify::{
event::{MetadataKind, ModifyKind},
EventKind, PollWatcher, RecommendedWatcher, RecursiveMode, Watcher,
};
use notify_debouncer_full::{
new_debouncer_opt, DebounceEventResult, DebouncedEvent, Debouncer, FileIdMap,
};
use std::path::Path;
use std::{fmt::Write, path::PathBuf, sync::Arc, time::Duration};
use tokio::{
sync::{broadcast, mpsc, watch, Mutex},
time::Instant,
};
use tokio_stream::wrappers::BroadcastStream;
/// A file system debouncer backed by one of two `notify` watcher implementations.
///
/// `build_watcher` picks the variant: `Polling` when a poll interval is configured,
/// `Default` otherwise.
pub enum FsDebouncer {
/// Debouncer built on top of `notify`'s `RecommendedWatcher`.
Default(Debouncer<RecommendedWatcher, FileIdMap>),
/// Debouncer built on top of `notify`'s `PollWatcher`.
Polling(Debouncer<PollWatcher, FileIdMap>),
}
impl FsDebouncer {
pub fn watch(
&mut self,
path: impl AsRef<Path>,
recursive_mode: RecursiveMode,
) -> notify::Result<()> {
match self {
Self::Default(deb) => deb.watch(path, recursive_mode),
Self::Polling(deb) => deb.watch(path, recursive_mode),
}
}
}
/// Path segments that disqualify a changed path from triggering a build; checked against
/// every component of the changed path in `WatchSystem::is_event_relevant`.
const BLACKLIST: [&str; 2] = [".git", ".DS_Store"];
/// Timeout handed to the debouncer created in `new_debouncer`.
const DEBOUNCE_DURATION: Duration = Duration::from_millis(25);
/// Cooldown used by `WatchSystem` when `enable_cooldown` is set in the watch configuration.
const WATCHER_COOLDOWN: Duration = Duration::from_secs(1);
/// Watches the file system and triggers builds when relevant changes are seen.
///
/// Build bookkeeping is timestamp based: a build counts as active while
/// `last_build_started` is later than `last_build_finished`, and a new build is considered
/// only when `last_change` is later than `last_build_started`.
pub struct WatchSystem {
/// The build system, shared with the background tasks that run builds.
build: Arc<Mutex<BuildSystem>>,
/// Paths whose contents never trigger a build; extended at runtime via `ignore_rx`.
ignored_paths: Vec<PathBuf>,
/// Receives debounced file system events from the watcher callback.
watch_rx: mpsc::Receiver<DebouncedEvent>,
/// Receives additional paths to ignore; the sending half is handed to `BuildSystem::new`.
ignore_rx: mpsc::Receiver<PathBuf>,
/// Cloned into every spawned build task so it can report its result.
build_tx: mpsc::Sender<BuildResult>,
/// Receives the results of spawned builds.
build_rx: mpsc::Receiver<BuildResult>,
/// The debouncer created by `build_watcher`. It is never read in this file; presumably it
/// is stored so the watcher stays alive as long as this value does (TODO confirm).
_debouncer: FsDebouncer,
/// Stream over the shutdown broadcast; `run` stops as soon as it yields anything.
shutdown: BroadcastStream<()>,
/// Optional channel on which the outcome of each spawned build is published.
ws_state: Option<watch::Sender<ws::State>>,
/// When the most recent background build was spawned.
last_build_started: Instant,
/// When the most recent background build reported completion.
last_build_finished: Instant,
/// When the most recent relevant file system change was seen.
last_change: Instant,
/// `Some(WATCHER_COOLDOWN)` when the cooldown is enabled in the configuration, else `None`.
watcher_cooldown: Option<Duration>,
/// Whether to clear the terminal right before a build is spawned.
clear_screen: bool,
/// When set, failed builds are not published on `ws_state`.
no_error_reporting: bool,
}
impl WatchSystem {
/// Create a new watch system from the given configuration.
///
/// This sets up the file system watcher for `cfg.paths` (in polling mode when `cfg.poll`
/// is set), creates the underlying [`BuildSystem`] and subscribes to `shutdown`.
///
/// # Errors
///
/// Returns an error when the watcher cannot be created, when one of the paths cannot be
/// watched, or when `BuildSystem::new` fails.
pub async fn new(
    cfg: Arc<RtcWatch>,
    shutdown: broadcast::Sender<()>,
    ws_state: Option<watch::Sender<ws::State>>,
    ws_protocol: Option<WsProtocol>,
) -> Result<Self> {
    // All three channels are created with a capacity of a single message.
    let (event_tx, watch_rx) = mpsc::channel(1);
    let (ignore_tx, ignore_rx) = mpsc::channel(1);
    let (build_tx, build_rx) = mpsc::channel(1);

    // The watcher is set up first, so watcher problems surface before the build system
    // is created.
    let debouncer = build_watcher(event_tx, cfg.paths.clone(), cfg.poll)?;

    let watcher_cooldown = cfg.enable_cooldown.then_some(WATCHER_COOLDOWN);
    tracing::debug!(
        "Build cooldown: {:?}",
        watcher_cooldown.map(humantime::Duration::from)
    );

    // The build system receives the sending half of the ignore channel.
    let build_system =
        BuildSystem::new(cfg.build.clone(), Some(ignore_tx), ws_protocol).await?;
    let shutdown_stream = BroadcastStream::new(shutdown.subscribe());

    Ok(Self {
        build: Arc::new(Mutex::new(build_system)),
        ignored_paths: cfg.ignored_paths.clone(),
        watch_rx,
        ignore_rx,
        build_rx,
        build_tx,
        _debouncer: debouncer,
        shutdown: shutdown_stream,
        ws_state,
        last_build_started: Instant::now(),
        last_build_finished: Instant::now(),
        last_change: Instant::now(),
        watcher_cooldown,
        clear_screen: cfg.clear_screen,
        no_error_reporting: cfg.no_error_reporting,
    })
}
/// Run a build on the wrapped build system and wait for it to finish.
///
/// The build system lock is acquired first; the build's own result is returned unchanged.
#[tracing::instrument(level = "trace", skip(self))]
pub async fn build(&mut self) -> Result<()> {
    let mut build_system = self.build.lock().await;
    build_system.build().await
}
/// Drive the watch system until shutdown.
///
/// Each iteration reacts to whichever of these becomes ready: a shutdown signal, a
/// finished build, a debounced file system event, or a new path to ignore.
#[tracing::instrument(level = "trace", skip(self))]
pub async fn run(mut self) {
    loop {
        tokio::select! {
            // Anything coming out of the shutdown stream, including its end, stops the loop.
            _ = self.shutdown.next() => break,
            Some(outcome) = self.build_rx.recv() => self.build_complete(outcome).await,
            Some(event) = self.watch_rx.recv() => self.handle_watch_event(event).await,
            Some(path) = self.ignore_rx.recv() => self.update_ignore_list(path),
        }
    }

    tracing::debug!("watcher system has shut down");
}
/// React to a background build reporting its result.
///
/// Records the finish time, publishes the outcome on `ws_state` (when present), and then
/// checks whether another build has to be started.
#[tracing::instrument(level = "trace", skip(self))]
async fn build_complete(&mut self, build_result: Result<(), anyhow::Error>) {
    tracing::debug!("Build reported completion");
    self.last_build_finished = Instant::now();

    if let Some(tx) = self.ws_state.as_ref() {
        // Failures are only published when error reporting has not been turned off.
        let next_state = match build_result {
            Ok(()) => Some(ws::State::Ok),
            Err(_) if self.no_error_reporting => None,
            Err(err) => Some(ws::State::Failed {
                reason: build_error_reason(err),
            }),
        };
        if let Some(state) = next_state {
            let _ = tx.send_replace(state);
        }
    }

    // Changes may have been recorded while the build was running.
    self.check_spawn_build().await;
}
/// Whether a spawned build has not reported completion yet, i.e. the last recorded finish
/// is earlier than the last recorded start.
fn is_build_active(&self) -> bool {
    self.last_build_finished < self.last_build_started
}
async fn spawn_build(&mut self) {
self.last_build_started = Instant::now();
let build = self.build.clone();
let build_tx = self.build_tx.clone();
tokio::spawn(async move {
let result = build.lock().await.build().await;
build_tx.send(result).await
});
}
async fn check_spawn_build(&mut self) {
if self.last_change <= self.last_build_started {
tracing::trace!("No changes since the last build was started");
return;
}
tracing::debug!("Changes since the last build was started, checking cooldown");
if let Some(cooldown) = self.watcher_cooldown {
let time_since_last_build = self
.last_change
.saturating_duration_since(self.last_build_finished);
if time_since_last_build < cooldown {
tracing::debug!(
"Cooldown is still active: {} remaining",
humantime::Duration::from(cooldown - time_since_last_build)
);
return;
}
}
if self.clear_screen {
tracing::trace!("Clear screen is enabled, clearing the screen");
let term = console::Term::stdout();
if let Err(err) = term.clear_screen() {
tracing::error!("Unable to clear the screen due to error: #{err}");
} else {
tracing::trace!("Clear screen is enabled, cleared the screen");
}
}
self.spawn_build().await;
}
/// Handle a single debounced file system event.
///
/// Irrelevant events are dropped. A relevant event updates `last_change`; a build is then
/// considered right away unless one is still running, in which case the check is left to
/// `build_complete`.
#[tracing::instrument(level = "trace", skip(self, event))]
async fn handle_watch_event(&mut self, event: DebouncedEvent) {
    tracing::trace!(
        "change detected in {:?} of type {:?}",
        event.paths,
        event.kind
    );

    if self.is_event_relevant(&event).await {
        self.last_change = Instant::now();

        if self.is_build_active() {
            tracing::debug!("Build is active, postponing start");
        } else {
            self.check_spawn_build().await;
        }
    } else {
        tracing::trace!("Event not relevant, skipping");
    }
}
/// Decide whether `event` should trigger a build.
///
/// Only creations, removals and a subset of modifications (name, data, write time,
/// unspecified) are considered. Of those, the event is relevant as soon as one of its
/// paths can be canonicalized, is not located under an ignored path, and has no
/// blacklisted component.
async fn is_event_relevant(&self, event: &DebouncedEvent) -> bool {
    let kind_is_interesting = matches!(
        event.event.kind,
        EventKind::Modify(
            ModifyKind::Name(_)
                | ModifyKind::Data(_)
                | ModifyKind::Metadata(MetadataKind::WriteTime)
                | ModifyKind::Any
        ) | EventKind::Create(_)
            | EventKind::Remove(_)
    );
    if !kind_is_interesting {
        return false;
    }

    for raw_path in &event.paths {
        // Paths that cannot be canonicalized are skipped.
        // NOTE(review): a path that no longer exists presumably fails here, which would
        // mean removal events for deleted files never pass — confirm this is intended.
        let canonical = match tokio::fs::canonicalize(raw_path).await {
            Ok(canonical) => canonical,
            Err(_) => continue,
        };

        // The path itself or any of its ancestors may be on the ignore list.
        let is_ignored = canonical.ancestors().any(|ancestor| {
            self.ignored_paths
                .iter()
                .any(|ignored_path| ignored_path == ancestor)
        });
        if is_ignored {
            continue;
        }

        // Components that are not valid UTF-8 cannot match a blacklist entry.
        let is_blacklisted = canonical
            .components()
            .filter_map(|component| component.as_os_str().to_str())
            .any(|name| BLACKLIST.contains(&name));
        if is_blacklisted {
            continue;
        }

        tracing::debug!("accepted change in {:?} of type {:?}", canonical, event.kind);
        return true;
    }

    false
}
/// Add `arg_path` to the ignore list unless it is already present.
///
/// The canonical form of the path is stored when it can be resolved; otherwise the path
/// is stored as given.
fn update_ignore_list(&mut self, arg_path: PathBuf) {
    let candidate = arg_path.canonicalize().unwrap_or(arg_path);

    let already_listed = self.ignored_paths.iter().any(|known| *known == candidate);
    if !already_listed {
        self.ignored_paths.push(candidate);
    }
}
}
fn new_debouncer<T: Watcher>(
watch_tx: mpsc::Sender<DebouncedEvent>,
config: Option<notify::Config>,
) -> Result<Debouncer<T, FileIdMap>> {
new_debouncer_opt::<_, T, FileIdMap>(
DEBOUNCE_DURATION,
None,
move |result: DebounceEventResult| match result {
Ok(events) => events.into_iter().for_each(|event| {
let _ = watch_tx.blocking_send(event);
}),
Err(errors) => errors
.into_iter()
.for_each(|err| tracing::warn!(error=?err, "error from filesystem watcher")),
},
FileIdMap::new(),
config.unwrap_or_default(),
)
.context("failed to build file system watcher")
}
/// Create the file system debouncer and start watching all of `paths` recursively.
///
/// With `poll` set, a polling watcher using that interval is created (and this is logged);
/// otherwise the platform's recommended watcher is used. All events are forwarded to
/// `watch_tx`.
///
/// # Errors
///
/// Returns an error when the debouncer cannot be created or when one of the paths cannot
/// be watched; the latter error names the offending path.
fn build_watcher(
    watch_tx: mpsc::Sender<DebouncedEvent>,
    paths: Vec<PathBuf>,
    poll: Option<Duration>,
) -> Result<FsDebouncer> {
    let mut debouncer = match poll {
        None => FsDebouncer::Default(new_debouncer::<RecommendedWatcher>(watch_tx, None)?),
        Some(duration) => {
            tracing::info!(
                "Running in polling mode: {}",
                humantime::Duration::from(duration)
            );
            FsDebouncer::Polling(new_debouncer::<PollWatcher>(
                watch_tx,
                Some(notify::Config::default().with_poll_interval(duration)),
            )?)
        }
    };

    for path in paths {
        debouncer
            .watch(&path, RecursiveMode::Recursive)
            // Built lazily, so the message is only formatted when watching actually fails.
            .with_context(|| format!("failed to watch {:?} for file system changes", path))?;
    }

    Ok(debouncer)
}
fn build_error_reason(error: anyhow::Error) -> String {
let mut result = error.to_string();
result.push_str("\n\n");
let mut i = 0usize;
let mut next = error.source();
while let Some(current) = next {
if i == 0 {
writeln!(&mut result, "Caused by:").unwrap();
}
writeln!(&mut result, "\t{i}: {current}").unwrap();
i += 1;
next = current.source();
}
result
}