1use crate::{LoadedListener, LogEvent};
20use encoding_rs::Encoding;
21use log::trace;
22use notify::{event::ModifyKind, recommended_watcher, EventKind, RecursiveMode, Watcher};
23use std::{
24 borrow::Cow,
25 collections::HashMap,
26 fs::File,
27 io::{BufRead, BufReader, Seek, SeekFrom},
28 path::{Path, PathBuf},
29 sync::{
30 mpsc::{channel, RecvTimeoutError, Sender},
31 Arc, RwLock,
32 },
33 thread,
34 time::Duration,
35};
36use tokio::sync::mpsc::{error::SendError, unbounded_channel, UnboundedSender};
37use tokio_stream::{wrappers::UnboundedReceiverStream, Stream};
38
39pub struct LogObserver {
54 loaded_listeners: Arc<RwLock<Vec<LoadedListener>>>,
55 listeners: Arc<RwLock<Vec<UnboundedSender<LogEvent>>>>,
56 named_listeners: Arc<RwLock<HashMap<String, Vec<UnboundedSender<LogEvent>>>>>,
57}
58
59impl LogObserver {
60 pub fn new<P: AsRef<Path>>(path: P) -> LogObserver {
61 let path = path.as_ref().to_path_buf();
62 let listeners = Arc::new(RwLock::new(Vec::new()));
63 let named_listeners = Arc::new(RwLock::new(HashMap::new()));
64 let loaded_listeners = Arc::new(RwLock::new(Vec::new()));
65
66 let backend = LogObserverBackend {
67 path,
68 loaded_listeners: loaded_listeners.clone(),
69 listeners: listeners.clone(),
70 named_listeners: named_listeners.clone(),
71 };
72 let (initialized_sender, initialized_receiver) = channel();
73 thread::spawn(|| backend.observe_log(initialized_sender));
74 let _ = initialized_receiver.recv();
77
78 LogObserver {
79 loaded_listeners,
80 listeners,
81 named_listeners,
82 }
83 }
84
85 pub(crate) fn add_loaded_listener(&self, listener: LoadedListener) {
86 self.loaded_listeners.write().unwrap().push(listener);
87 }
88
89 pub fn add_listener(&self) -> impl Stream<Item = LogEvent> {
94 let (sender, receiver) = unbounded_channel();
95 self.listeners.write().unwrap().push(sender);
96 UnboundedReceiverStream::new(receiver)
97 }
98
99 pub fn add_named_listener(&self, name: impl Into<String>) -> impl Stream<Item = LogEvent> {
109 let (sender, receiver) = unbounded_channel();
110 self.named_listeners
111 .write()
112 .unwrap()
113 .entry(name.into())
114 .or_default()
115 .push(sender);
116 UnboundedReceiverStream::new(receiver)
117 }
118}
119
120#[cfg(target_os = "windows")]
121static ENCODING: &'static Encoding = encoding_rs::WINDOWS_1252;
122#[cfg(not(target_os = "windows"))]
123static ENCODING: &'static Encoding = encoding_rs::UTF_8;
124
125struct LogObserverBackend {
126 path: PathBuf,
127 loaded_listeners: Arc<RwLock<Vec<LoadedListener>>>,
128 listeners: Arc<RwLock<Vec<UnboundedSender<LogEvent>>>>,
129 named_listeners: Arc<RwLock<HashMap<String, Vec<UnboundedSender<LogEvent>>>>>,
130}
131impl LogObserverBackend {
132 fn observe_log(self, initialized_sender: Sender<()>) {
133 let (event_sender, event_reciever) = channel();
134 let mut watcher = recommended_watcher(event_sender).unwrap(); let watch_path = self.path.parent().unwrap_or(&self.path);
136 watcher.watch(watch_path, RecursiveMode::Recursive).unwrap(); let mut file = File::open(&self.path).unwrap(); file.seek(SeekFrom::End(0)).unwrap(); let _ = initialized_sender.send(());
142
143 let mut reader = LogFileReader::new(file);
144 self.continue_to_read_file(&mut reader);
145
146 while Arc::strong_count(&self.listeners) > 1 {
148 match event_reciever.recv_timeout(Duration::from_millis(50)) {
150 Ok(Ok(event)) if event.paths.contains(&self.path) => match event.kind {
151 EventKind::Create(_) => self.update_reader(&mut reader),
152 EventKind::Modify(ModifyKind::Data(_)) => {
153 self.continue_to_read_file(&mut reader)
154 }
155 _ => {}
156 },
157 Err(RecvTimeoutError::Timeout) => self.continue_to_read_file(&mut reader),
158 Err(RecvTimeoutError::Disconnected) => panic!("File watcher thread crashed!"),
159 _ => {}
160 }
161 }
162 trace!("Shutting down LogObserverBackend");
163 }
164
165 fn update_reader(&self, reader: &mut LogFileReader) {
166 self.continue_to_read_file(reader);
167 if let Ok(file) = File::open(&self.path) {
168 trace!("Detected file change");
169 reader.set_file(file);
170 }
171 }
172
173 fn continue_to_read_file(&self, reader: &mut LogFileReader) {
174 while let Some(line) = reader.read_next_line() {
175 self.process_line(&line);
176 }
177 }
178
179 fn process_line(&self, line: &str) {
180 if let Some(event) = line.parse::<LogEvent>().ok() {
181 self.send_event_to_loaded_listeners(&event);
182 self.send_event_to_listeners(&event);
183 self.send_event_to_named_listeners(event);
184 }
185 }
186
187 fn send_event_to_loaded_listeners(&self, event: &LogEvent) {
188 let loaded_listeners = self.loaded_listeners.read().unwrap();
189 for loaded_listener in loaded_listeners.iter() {
190 loaded_listener.on_event(event.clone())
191 }
192 }
193
194 fn send_event_to_listeners(&self, event: &LogEvent) {
195 let indexes_to_delete = {
196 let listeners = self.listeners.read().unwrap();
197 send_event_to_listeners(event, listeners.iter())
198 };
199 if !indexes_to_delete.is_empty() {
200 let mut listeners = self.listeners.write().unwrap();
201 delete_indexes(&mut listeners, indexes_to_delete);
202 }
203 }
204
205 fn send_event_to_named_listeners(&self, event: LogEvent) {
206 let indexes_to_delete = {
207 let named_listeners = self.named_listeners.read().unwrap();
208 if let Some(named_listeners) = named_listeners.get(&event.executor) {
209 send_event_to_listeners(&event, named_listeners)
210 } else {
211 Vec::new()
212 }
213 };
214 if !indexes_to_delete.is_empty() {
215 let mut named_listeners = self.named_listeners.write().unwrap();
216 if let Some(listeners) = named_listeners.get_mut(&event.executor) {
217 if indexes_to_delete.len() == listeners.len() {
218 named_listeners.remove(&event.executor);
219 } else {
220 delete_indexes(listeners, indexes_to_delete);
221 }
222 }
223 }
224 }
225}
226
227fn send_event_to_listeners<'l>(
228 event: &LogEvent,
229 listeners: impl IntoIterator<Item = &'l UnboundedSender<LogEvent>>,
230) -> Vec<usize> {
231 let mut indexes_to_delete = Vec::new();
232 for (index, listener) in listeners.into_iter().enumerate() {
233 if let Err(SendError(_event)) = listener.send(event.clone()) {
234 indexes_to_delete.push(index);
235 }
236 }
237 indexes_to_delete
238}
239
240fn delete_indexes<E>(listeners: &mut Vec<E>, indexes_to_delete: Vec<usize>) {
241 for index in indexes_to_delete.into_iter().rev() {
243 listeners.remove(index);
244 }
245}
246
247struct LogFileReader {
248 reader: BufReader<File>,
249 line: Vec<u8>,
250}
251impl LogFileReader {
252 fn new(file: File) -> Self {
253 Self {
254 reader: BufReader::new(file),
255 line: Vec::new(),
256 }
257 }
258
259 fn read_next_line(&mut self) -> Option<Cow<'_, str>> {
260 const LINE_TERMINATOR: u8 = b'\n';
261
262 if self.line.ends_with(&[LINE_TERMINATOR]) {
263 self.line.clear();
265 }
266
267 self.reader
268 .read_until(LINE_TERMINATOR, &mut self.line)
269 .unwrap(); if self.line.ends_with(&[LINE_TERMINATOR]) {
272 let (line, _) = ENCODING.decode_without_bom_handling(&self.line);
273 Some(line)
274 } else {
275 None
276 }
277 }
278
279 fn set_file(&mut self, file: File) {
280 self.reader = BufReader::new(file);
281 }
282}