use crate::{LoadedListener, LogEvent};
use encoding_rs::Encoding;
use log::trace;
use notify::{event::ModifyKind, recommended_watcher, EventKind, RecursiveMode, Watcher};
use std::{
borrow::Cow,
collections::HashMap,
fs::File,
io::{BufRead, BufReader, Seek, SeekFrom},
path::{Path, PathBuf},
sync::{
mpsc::{channel, RecvTimeoutError, Sender},
Arc, RwLock,
},
thread,
time::Duration,
};
use tokio::sync::mpsc::{error::SendError, unbounded_channel, UnboundedSender};
use tokio_stream::{wrappers::UnboundedReceiverStream, Stream};
pub struct LogObserver {
loaded_listeners: Arc<RwLock<Vec<LoadedListener>>>,
listeners: Arc<RwLock<Vec<UnboundedSender<LogEvent>>>>,
named_listeners: Arc<RwLock<HashMap<String, Vec<UnboundedSender<LogEvent>>>>>,
}
impl LogObserver {
pub fn new<P: AsRef<Path>>(path: P) -> LogObserver {
let path = path.as_ref().to_path_buf();
let listeners = Arc::new(RwLock::new(Vec::new()));
let named_listeners = Arc::new(RwLock::new(HashMap::new()));
let loaded_listeners = Arc::new(RwLock::new(Vec::new()));
let backend = LogObserverBackend {
path,
loaded_listeners: loaded_listeners.clone(),
listeners: listeners.clone(),
named_listeners: named_listeners.clone(),
};
let (initialized_sender, initialized_receiver) = channel();
thread::spawn(|| backend.observe_log(initialized_sender));
let _ = initialized_receiver.recv();
LogObserver {
loaded_listeners,
listeners,
named_listeners,
}
}
pub(crate) fn add_loaded_listener(&self, listener: LoadedListener) {
self.loaded_listeners.write().unwrap().push(listener);
}
pub fn add_listener(&self) -> impl Stream<Item = LogEvent> {
let (sender, receiver) = unbounded_channel();
self.listeners.write().unwrap().push(sender);
UnboundedReceiverStream::new(receiver)
}
pub fn add_named_listener(&self, name: impl Into<String>) -> impl Stream<Item = LogEvent> {
let (sender, receiver) = unbounded_channel();
self.named_listeners
.write()
.unwrap()
.entry(name.into())
.or_default()
.push(sender);
UnboundedReceiverStream::new(receiver)
}
}
#[cfg(target_os = "windows")]
static ENCODING: &'static Encoding = encoding_rs::WINDOWS_1252;
#[cfg(not(target_os = "windows"))]
static ENCODING: &'static Encoding = encoding_rs::UTF_8;
struct LogObserverBackend {
path: PathBuf,
loaded_listeners: Arc<RwLock<Vec<LoadedListener>>>,
listeners: Arc<RwLock<Vec<UnboundedSender<LogEvent>>>>,
named_listeners: Arc<RwLock<HashMap<String, Vec<UnboundedSender<LogEvent>>>>>,
}
impl LogObserverBackend {
fn observe_log(self, initialized_sender: Sender<()>) {
let (event_sender, event_reciever) = channel();
let mut watcher = recommended_watcher(event_sender).unwrap(); let watch_path = self.path.parent().unwrap_or(&self.path);
watcher.watch(watch_path, RecursiveMode::Recursive).unwrap();
let mut file = File::open(&self.path).unwrap(); file.seek(SeekFrom::End(0)).unwrap();
let _ = initialized_sender.send(());
let mut reader = LogFileReader::new(file);
self.continue_to_read_file(&mut reader);
while Arc::strong_count(&self.listeners) > 1 {
match event_reciever.recv_timeout(Duration::from_millis(50)) {
Ok(Ok(event)) if event.paths.contains(&self.path) => match event.kind {
EventKind::Create(_) => self.update_reader(&mut reader),
EventKind::Modify(ModifyKind::Data(_)) => {
self.continue_to_read_file(&mut reader)
}
_ => {}
},
Err(RecvTimeoutError::Timeout) => self.continue_to_read_file(&mut reader),
Err(RecvTimeoutError::Disconnected) => panic!("File watcher thread crashed!"),
_ => {}
}
}
trace!("Shutting down LogObserverBackend");
}
fn update_reader(&self, reader: &mut LogFileReader) {
self.continue_to_read_file(reader);
if let Ok(file) = File::open(&self.path) {
trace!("Detected file change");
reader.set_file(file);
}
}
fn continue_to_read_file(&self, reader: &mut LogFileReader) {
while let Some(line) = reader.read_next_line() {
self.process_line(&line);
}
}
fn process_line(&self, line: &str) {
if let Some(event) = line.parse::<LogEvent>().ok() {
self.send_event_to_loaded_listeners(&event);
self.send_event_to_listeners(&event);
self.send_event_to_named_listeners(event);
}
}
fn send_event_to_loaded_listeners(&self, event: &LogEvent) {
let loaded_listeners = self.loaded_listeners.read().unwrap();
for loaded_listener in loaded_listeners.iter() {
loaded_listener.on_event(event.clone())
}
}
fn send_event_to_listeners(&self, event: &LogEvent) {
let indexes_to_delete = {
let listeners = self.listeners.read().unwrap();
send_event_to_listeners(event, listeners.iter())
};
if !indexes_to_delete.is_empty() {
let mut listeners = self.listeners.write().unwrap();
delete_indexes(&mut listeners, indexes_to_delete);
}
}
fn send_event_to_named_listeners(&self, event: LogEvent) {
let indexes_to_delete = {
let named_listeners = self.named_listeners.read().unwrap();
if let Some(named_listeners) = named_listeners.get(&event.executor) {
send_event_to_listeners(&event, named_listeners)
} else {
Vec::new()
}
};
if !indexes_to_delete.is_empty() {
let mut named_listeners = self.named_listeners.write().unwrap();
if let Some(listeners) = named_listeners.get_mut(&event.executor) {
if indexes_to_delete.len() == listeners.len() {
named_listeners.remove(&event.executor);
} else {
delete_indexes(listeners, indexes_to_delete);
}
}
}
}
}
fn send_event_to_listeners<'l>(
event: &LogEvent,
listeners: impl IntoIterator<Item = &'l UnboundedSender<LogEvent>>,
) -> Vec<usize> {
let mut indexes_to_delete = Vec::new();
for (index, listener) in listeners.into_iter().enumerate() {
if let Err(SendError(_event)) = listener.send(event.clone()) {
indexes_to_delete.push(index);
}
}
indexes_to_delete
}
fn delete_indexes<E>(listeners: &mut Vec<E>, indexes_to_delete: Vec<usize>) {
for index in indexes_to_delete.into_iter().rev() {
listeners.remove(index);
}
}
struct LogFileReader {
reader: BufReader<File>,
line: Vec<u8>,
}
impl LogFileReader {
fn new(file: File) -> Self {
Self {
reader: BufReader::new(file),
line: Vec::new(),
}
}
fn read_next_line(&mut self) -> Option<Cow<'_, str>> {
const LINE_TERMINATOR: u8 = b'\n';
if self.line.ends_with(&[LINE_TERMINATOR]) {
self.line.clear();
}
self.reader
.read_until(LINE_TERMINATOR, &mut self.line)
.unwrap();
if self.line.ends_with(&[LINE_TERMINATOR]) {
let (line, _) = ENCODING.decode_without_bom_handling(&self.line);
Some(line)
} else {
None
}
}
fn set_file(&mut self, file: File) {
self.reader = BufReader::new(file);
}
}