use crate::config::CFG;
use crate::viewer::error::ViewerError;
use crate::viewer::sse_server::SseToken;
use notify::{RecommendedWatcher, RecursiveMode};
use notify_debouncer_mini::{new_debouncer, DebouncedEvent, Debouncer};
use std::panic::panic_any;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::TrySendError;
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, SyncSender};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::time::Instant;
/// Number of seconds the watcher waits for a file event before it gives up
/// waiting and sends an `SseToken::Ping` to all subscribers instead.
const WATCHER_TIMEOUT: u64 = 10;
/// Minimum uptime of the watcher in milliseconds. Before this time has
/// elapsed, the watcher does not terminate for lack of subscribers.
const WATCHER_MIN_UPTIME: u128 = 3_000;
/// Watches a path for modifications and notifies a list of subscribers
/// with `SseToken` messages.
pub struct FileWatcher {
/// Receiving end of the channel whose sender is handed to the debouncer.
/// Every item is either a batch of debounced events or a list of errors.
rx: Receiver<Result<Vec<DebouncedEvent>, Vec<notify::Error>>>,
/// Never read after construction; it is stored so that it lives as long as
/// the `FileWatcher` does.
/// NOTE(review): presumably dropping the debouncer stops the file watching;
/// confirm against the `notify_debouncer_mini` documentation.
#[allow(dead_code)]
debouncer: Debouncer<RecommendedWatcher>,
/// Shared list of subscriber channels. Each sender receives one `SseToken`
/// per notification; senders found to be disconnected are removed.
event_tx_list: Arc<Mutex<Vec<SyncSender<SseToken>>>>,
/// When `true`, the watcher loop ends once no subscriber is left and the
/// minimum uptime has passed.
terminate_on_browser_disconnect: Arc<AtomicBool>,
/// Instant at which this watcher was created; used to compute the uptime.
start_time: Instant,
}
impl FileWatcher {
    /// Creates a watcher for `file` and starts watching right away.
    ///
    /// File events are debounced with a period of `CFG.viewer.notify_period`
    /// milliseconds. `event_tx_list` is the shared list of subscribers that
    /// will be notified. `terminate_on_browser_disconnect` controls whether
    /// the watcher loop ends when no subscriber is left.
    ///
    /// # Errors
    ///
    /// Returns a `ViewerError` when the debouncer cannot be created or when
    /// `file` cannot be registered with the underlying watcher.
    pub fn new(
        file: PathBuf,
        event_tx_list: Arc<Mutex<Vec<SyncSender<SseToken>>>>,
        terminate_on_browser_disconnect: Arc<AtomicBool>,
    ) -> Result<Self, ViewerError> {
        let debounce_period = Duration::from_millis(CFG.viewer.notify_period);
        let (tx, rx) = channel();
        let mut debouncer = new_debouncer(debounce_period, None, tx)?;
        debouncer.watcher().watch(&file, RecursiveMode::Recursive)?;
        log::debug!("File watcher started.");

        Ok(Self {
            rx,
            debouncer,
            event_tx_list,
            terminate_on_browser_disconnect,
            start_time: Instant::now(),
        })
    }

    /// Runs the watcher loop until it terminates and logs the reason.
    ///
    /// This call blocks. The loop only ends with an error, which is logged
    /// at debug level and not propagated.
    ///
    /// # Panics
    ///
    /// Panics under the same conditions as the internal loop: when the event
    /// channel is disconnected or the subscriber list mutex is poisoned.
    pub fn run(&mut self) {
        if let Err(e) = self.run2() {
            log::debug!("File watcher terminated: {}", e);
        }
    }

    /// The watcher loop proper.
    ///
    /// Waits up to `WATCHER_TIMEOUT` seconds for a debounced file event:
    ///
    /// * On an event, `SseToken::Update` is sent to all subscribers.
    /// * On a timeout, `SseToken::Ping` is sent instead. This also removes
    ///   subscribers that have disconnected in the meantime.
    ///
    /// # Errors
    ///
    /// * `ViewerError::AllSubscriberDiconnected` when, after a timeout, no
    ///   subscriber is left, the watcher has been up for more than
    ///   `WATCHER_MIN_UPTIME` milliseconds and
    ///   `terminate_on_browser_disconnect` is set.
    /// * The first error reported by the file watcher, converted into a
    ///   `ViewerError`.
    ///
    /// # Panics
    ///
    /// Panics when the event channel is disconnected or when the subscriber
    /// list mutex is poisoned.
    fn run2(&mut self) -> Result<(), ViewerError> {
        let timeout = Duration::from_secs(WATCHER_TIMEOUT);
        loop {
            let evnt = match self.rx.recv_timeout(timeout) {
                Ok(ev) => ev,
                Err(RecvTimeoutError::Timeout) => {
                    // The ping doubles as a liveness probe: `update()` drops
                    // every subscriber whose receiving end is gone.
                    self.update(SseToken::Ping)?;
                    // The guard is a temporary, so the lock is released at
                    // the end of this statement.
                    let no_subscribers = self.event_tx_list.lock().unwrap().is_empty();
                    if no_subscribers
                        && self.start_time.elapsed().as_millis() > WATCHER_MIN_UPTIME
                        && self.terminate_on_browser_disconnect.load(Ordering::SeqCst)
                    {
                        return Err(ViewerError::AllSubscriberDiconnected);
                    }
                    continue;
                }
                Err(RecvTimeoutError::Disconnected) => {
                    panic_any("RecvTimeoutError::Disconnected")
                }
            };
            log::trace!("File watcher event: {:?}", evnt);
            match evnt {
                // The details of the events do not matter here: any event
                // triggers the same update notification.
                Ok(_) => self.update(SseToken::Update)?,
                Err(errors) => {
                    // Report the first error. Indexing with a fixed position
                    // would panic on a list that is shorter than expected,
                    // e.g. the common case of exactly one error. An empty
                    // list carries nothing to report, so keep watching.
                    if let Some(first_error) = errors.into_iter().next() {
                        return Err(first_error.into());
                    }
                }
            }
        }
    }

    /// Sends `msg` to every subscriber and forgets the disconnected ones.
    ///
    /// The message is sent without blocking. A subscriber whose channel is
    /// currently full does not get this message but stays subscribed; a
    /// subscriber whose receiving end is gone is removed from the list.
    ///
    /// # Errors
    ///
    /// The current implementation always returns `Ok(())`.
    ///
    /// # Panics
    ///
    /// Panics when the subscriber list mutex is poisoned.
    pub fn update(&self, msg: SseToken) -> Result<(), ViewerError> {
        let mut tx_list = self.event_tx_list.lock().unwrap();
        let tx_list_len_before_update = tx_list.len();
        // `retain` filters in place and visits the senders in order, so no
        // new vector has to be allocated for the survivors.
        tx_list.retain(|tx| {
            !matches!(
                tx.try_send(msg.to_owned()),
                Err(TrySendError::Disconnected(_))
            )
        });
        let tx_list_len = tx_list.len();
        log::trace!(
            "File watcher `update({:?})`: {} dropped TCP connections, {} still open.",
            msg,
            tx_list_len_before_update - tx_list_len,
            tx_list_len,
        );
        Ok(())
    }
}