use crate::config::CFG;
use crate::viewer::error::ViewerError;
use crate::viewer::sse_server::SseToken;
use notify::{watcher, DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher};
use std::panic::panic_any;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::TrySendError;
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, SyncSender};
use std::sync::{Arc, Mutex};
use std::thread::sleep;
use std::time::Duration;
use std::time::Instant;
/// Seconds the event loop waits for a file event before it pings all
/// subscribers and checks whether any of them is still connected.
const WATCHER_TIMEOUT: u64 = 10;
/// Milliseconds the event loop sleeps after a remove event before it watches
/// the removed path again.
/// NOTE(review): presumably this gives an editor time to write the
/// replacement file -- confirm.
const WAIT_EDITOR_WRITING_NEW_FILE: u64 = 200;
/// Uptime in milliseconds the watcher must exceed before an empty subscriber
/// list is allowed to terminate the event loop.
const WATCHER_MIN_UPTIME: u128 = 3000;
/// Watches a path on disk and notifies a shared list of subscriber channels
/// with `SseToken` messages when the watched path changes.
pub struct FileWatcher {
/// Receiving end of the channel the `notify` watcher sends its debounced
/// events to.
rx: Receiver<DebouncedEvent>,
/// The `notify` watcher. The event loop registers a removed path on it
/// again after a remove event.
watcher: RecommendedWatcher,
/// Shared list of subscriber channels. Disconnected senders are removed
/// from it each time a message is sent.
event_tx_list: Arc<Mutex<Vec<SyncSender<SseToken>>>>,
/// When `true`, the event loop terminates once the subscriber list is
/// empty; the flag is read on every timeout.
terminate_on_browser_disconnect: Arc<AtomicBool>,
/// Instant at which this watcher was created; used to enforce the minimum
/// uptime before the event loop may terminate.
start_time: Instant,
}
impl FileWatcher {
    /// Creates a watcher for `file` and immediately starts watching it.
    ///
    /// * `file`: the path handed to the `notify` watcher.
    /// * `event_tx_list`: shared list of subscriber channels. Every call to
    ///   [`FileWatcher::update`] sends to all entries and removes the
    ///   disconnected ones.
    /// * `terminate_on_browser_disconnect`: when this flag is `true` during a
    ///   timeout check, the event loop ends as soon as no subscriber is left.
    ///
    /// The debounce delay is read from `CFG.viewer.notify_period` and is
    /// interpreted as milliseconds.
    ///
    /// # Errors
    ///
    /// Returns a `ViewerError` when the watcher cannot be created or when
    /// `file` cannot be watched.
    pub fn new(
        file: PathBuf,
        event_tx_list: Arc<Mutex<Vec<SyncSender<SseToken>>>>,
        terminate_on_browser_disconnect: Arc<AtomicBool>,
    ) -> Result<Self, ViewerError> {
        let notify_period = CFG.viewer.notify_period;
        let (tx, rx) = channel();
        let mut watcher = watcher(tx, Duration::from_millis(notify_period))?;
        watcher.watch(&file, RecursiveMode::Recursive)?;
        log::debug!("File watcher started.");

        Ok(Self {
            rx,
            watcher,
            event_tx_list,
            start_time: Instant::now(),
            terminate_on_browser_disconnect,
        })
    }

    /// Runs the event loop on the calling thread until it terminates, then
    /// logs the reason at debug level.
    ///
    /// # Panics
    ///
    /// Panics when the sending half of the event channel is disconnected or
    /// when the mutex guarding the subscriber list is poisoned.
    pub fn run(&mut self) {
        if let Err(e) = self.run2() {
            log::debug!("File watcher terminated: {}", e);
        }
    }

    /// The event loop proper. It never returns `Ok`; it only leaves the loop
    /// with an error describing why watching stopped.
    ///
    /// # Errors
    ///
    /// * `ViewerError::AllSubscriberDiconnected`: a timeout occurred, no
    ///   subscriber is left, the minimum uptime has passed and
    ///   `terminate_on_browser_disconnect` is set.
    /// * `ViewerError::LostRenamedFile`: a rename event was received.
    /// * Any error reported by the `notify` watcher, converted into
    ///   `ViewerError`.
    fn run2(&mut self) -> Result<(), ViewerError> {
        loop {
            let evnt = match self.rx.recv_timeout(Duration::from_secs(WATCHER_TIMEOUT)) {
                Ok(ev) => ev,
                Err(RecvTimeoutError::Timeout) => {
                    // Sending a ping is what detects (and removes)
                    // disconnected subscribers.
                    self.update(SseToken::Ping)?;
                    // The guard is a temporary of this statement only, so the
                    // lock is released before the remaining checks run.
                    let no_subscriber_left = self.event_tx_list.lock().unwrap().is_empty();
                    if no_subscriber_left
                        && self.start_time.elapsed().as_millis() > WATCHER_MIN_UPTIME
                        && self.terminate_on_browser_disconnect.load(Ordering::SeqCst)
                    {
                        return Err(ViewerError::AllSubscriberDiconnected);
                    }
                    continue;
                }
                // `self.watcher` owns the sending half for as long as `self`
                // lives, so a disconnect is not expected here; the thread
                // panics with a unit payload if it happens anyway.
                Err(RecvTimeoutError::Disconnected) => panic_any(()),
            };
            log::trace!("File watcher event: {:?}", evnt);

            match evnt {
                DebouncedEvent::NoticeRemove(path) | DebouncedEvent::Remove(path) => {
                    // NOTE(review): presumably the file was removed because an
                    // editor replaces it with a new one; wait, then watch the
                    // path again -- confirm.
                    sleep(Duration::from_millis(WAIT_EDITOR_WRITING_NEW_FILE));
                    self.watcher.watch(&path, RecursiveMode::NonRecursive)?;
                    self.update(SseToken::Update)?;
                }
                // These events do not trigger a notification.
                DebouncedEvent::NoticeWrite(_) | DebouncedEvent::Rescan => {}
                DebouncedEvent::Write(_)
                | DebouncedEvent::Chmod(_)
                | DebouncedEvent::Create(_) => {
                    self.update(SseToken::Update)?;
                }
                DebouncedEvent::Rename(_path, _) => return Err(ViewerError::LostRenamedFile),
                DebouncedEvent::Error(err, _path) => return Err(err.into()),
            }
        }
    }

    /// Sends `msg` to every subscriber and removes the subscribers whose
    /// receiving end has disconnected.
    ///
    /// Sending never blocks: a subscriber whose channel is full stays in the
    /// list, but does not receive this particular message.
    ///
    /// # Errors
    ///
    /// Currently always returns `Ok(())`.
    ///
    /// # Panics
    ///
    /// Panics when the mutex guarding the subscriber list is poisoned.
    pub fn update(&self, msg: SseToken) -> Result<(), ViewerError> {
        // Keep the lock only while the list is modified, not while logging.
        let (dropped, still_open) = {
            let mut tx_list = self.event_tx_list.lock().unwrap();
            let tx_list_len_before_update = tx_list.len();
            // Prune in place; only a disconnected receiver removes a sender.
            tx_list.retain(|tx| {
                !matches!(
                    tx.try_send(msg.to_owned()),
                    Err(TrySendError::Disconnected(_))
                )
            });
            let tx_list_len = tx_list.len();
            (tx_list_len_before_update - tx_list_len, tx_list_len)
        };
        log::trace!(
            "File watcher `update({:?})`: {} dropped TCP connections, {} still open.",
            msg,
            dropped,
            still_open,
        );
        Ok(())
    }
}