minect/log/
observer.rs

1// Minect is a library that allows a program to connect to a running Minecraft instance without
2// requiring any Minecraft mods.
3//
4// © Copyright (C) 2021-2023 Adrodoc <adrodoc55@googlemail.com> & skess42 <skagaros@gmail.com>
5//
6// This file is part of Minect.
7//
8// Minect is free software: you can redistribute it and/or modify it under the terms of the GNU
9// General Public License as published by the Free Software Foundation, either version 3 of the
10// License, or (at your option) any later version.
11//
12// Minect is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even
13// the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
14// Public License for more details.
15//
16// You should have received a copy of the GNU General Public License along with Minect.
17// If not, see <http://www.gnu.org/licenses/>.
18
19use crate::{LoadedListener, LogEvent};
20use encoding_rs::Encoding;
21use log::trace;
22use notify::{event::ModifyKind, recommended_watcher, EventKind, RecursiveMode, Watcher};
23use std::{
24    borrow::Cow,
25    collections::HashMap,
26    fs::File,
27    io::{BufRead, BufReader, Seek, SeekFrom},
28    path::{Path, PathBuf},
29    sync::{
30        mpsc::{channel, RecvTimeoutError, Sender},
31        Arc, RwLock,
32    },
33    thread,
34    time::Duration,
35};
36use tokio::sync::mpsc::{error::SendError, unbounded_channel, UnboundedSender};
37use tokio_stream::{wrappers::UnboundedReceiverStream, Stream};
38
39/// A [LogObserver] reads Minecraft's log file and sends [LogEvent]s to registered listeners. It is
40/// used internally by a [MinecraftConnection](crate::MinecraftConnection), but can be used
41/// explicitely as well, when executing commands is not neccessary.
42///
43/// Internally [LogEvent]s are send to listeners via unbound channels. This means the streams
44/// returned by [add_listener](LogObserver::add_listener) and
45/// [add_named_listener](LogObserver::add_named_listener) should be polled regularly to avoid memory
46/// leaks.
47///
48/// [LogObserver] automatically detects and handles log file rotation and uses an appropriate
49/// character encoding on different platforms.
50///
51/// Each [LogObserver] has an associated background thread that does the actual reading. This thread
52/// is shut down after the [LogObserver] is dropped.
53pub struct LogObserver {
54    loaded_listeners: Arc<RwLock<Vec<LoadedListener>>>,
55    listeners: Arc<RwLock<Vec<UnboundedSender<LogEvent>>>>,
56    named_listeners: Arc<RwLock<HashMap<String, Vec<UnboundedSender<LogEvent>>>>>,
57}
58
59impl LogObserver {
60    pub fn new<P: AsRef<Path>>(path: P) -> LogObserver {
61        let path = path.as_ref().to_path_buf();
62        let listeners = Arc::new(RwLock::new(Vec::new()));
63        let named_listeners = Arc::new(RwLock::new(HashMap::new()));
64        let loaded_listeners = Arc::new(RwLock::new(Vec::new()));
65
66        let backend = LogObserverBackend {
67            path,
68            loaded_listeners: loaded_listeners.clone(),
69            listeners: listeners.clone(),
70            named_listeners: named_listeners.clone(),
71        };
72        let (initialized_sender, initialized_receiver) = channel();
73        thread::spawn(|| backend.observe_log(initialized_sender));
74        // Wait for the background thread to seek the end of the log file. This is important to
75        // ensure that no events of commands executed after starting the log observer are lost.
76        let _ = initialized_receiver.recv();
77
78        LogObserver {
79            loaded_listeners,
80            listeners,
81            named_listeners,
82        }
83    }
84
85    pub(crate) fn add_loaded_listener(&self, listener: LoadedListener) {
86        self.loaded_listeners.write().unwrap().push(listener);
87    }
88
89    /// Returns a [Stream] of all [LogEvent]s. To remove the listener simply drop the stream.
90    ///
91    /// Internally the stream is backed by an unbound channel. This means it should be polled
92    /// regularly to avoid memory leaks.
93    pub fn add_listener(&self) -> impl Stream<Item = LogEvent> {
94        let (sender, receiver) = unbounded_channel();
95        self.listeners.write().unwrap().push(sender);
96        UnboundedReceiverStream::new(receiver)
97    }
98
99    /// Returns a [Stream] of [LogEvent]s with [executor](LogEvent::executor) equal to the given
100    /// `name`. To remove the listener simply drop the stream.
101    ///
102    /// This can be more memory efficient than [add_listener](Self::add_listener), because only a
103    /// small subset of [LogEvent]s has to be buffered if not that many commands are executed with
104    /// the given `name`.
105    ///
106    /// Internally the stream is backed by an unbound channel. This means it should be polled
107    /// regularly to avoid memory leaks.
108    pub fn add_named_listener(&self, name: impl Into<String>) -> impl Stream<Item = LogEvent> {
109        let (sender, receiver) = unbounded_channel();
110        self.named_listeners
111            .write()
112            .unwrap()
113            .entry(name.into())
114            .or_default()
115            .push(sender);
116        UnboundedReceiverStream::new(receiver)
117    }
118}
119
120#[cfg(target_os = "windows")]
121static ENCODING: &'static Encoding = encoding_rs::WINDOWS_1252;
122#[cfg(not(target_os = "windows"))]
123static ENCODING: &'static Encoding = encoding_rs::UTF_8;
124
125struct LogObserverBackend {
126    path: PathBuf,
127    loaded_listeners: Arc<RwLock<Vec<LoadedListener>>>,
128    listeners: Arc<RwLock<Vec<UnboundedSender<LogEvent>>>>,
129    named_listeners: Arc<RwLock<HashMap<String, Vec<UnboundedSender<LogEvent>>>>>,
130}
131impl LogObserverBackend {
132    fn observe_log(self, initialized_sender: Sender<()>) {
133        let (event_sender, event_reciever) = channel();
134        let mut watcher = recommended_watcher(event_sender).unwrap(); // may panic
135        let watch_path = self.path.parent().unwrap_or(&self.path);
136        watcher.watch(watch_path, RecursiveMode::Recursive).unwrap(); // may panic
137
138        let mut file = File::open(&self.path).unwrap(); // may panic
139        file.seek(SeekFrom::End(0)).unwrap(); // may panic
140
141        let _ = initialized_sender.send(());
142
143        let mut reader = LogFileReader::new(file);
144        self.continue_to_read_file(&mut reader);
145
146        // Watch log file as long as the LogFileObserver is not dropped
147        while Arc::strong_count(&self.listeners) > 1 {
148            // On Windows we don't get any modify events, so we check for changes at least once per game tick
149            match event_reciever.recv_timeout(Duration::from_millis(50)) {
150                Ok(Ok(event)) if event.paths.contains(&self.path) => match event.kind {
151                    EventKind::Create(_) => self.update_reader(&mut reader),
152                    EventKind::Modify(ModifyKind::Data(_)) => {
153                        self.continue_to_read_file(&mut reader)
154                    }
155                    _ => {}
156                },
157                Err(RecvTimeoutError::Timeout) => self.continue_to_read_file(&mut reader),
158                Err(RecvTimeoutError::Disconnected) => panic!("File watcher thread crashed!"),
159                _ => {}
160            }
161        }
162        trace!("Shutting down LogObserverBackend");
163    }
164
165    fn update_reader(&self, reader: &mut LogFileReader) {
166        self.continue_to_read_file(reader);
167        if let Ok(file) = File::open(&self.path) {
168            trace!("Detected file change");
169            reader.set_file(file);
170        }
171    }
172
173    fn continue_to_read_file(&self, reader: &mut LogFileReader) {
174        while let Some(line) = reader.read_next_line() {
175            self.process_line(&line);
176        }
177    }
178
179    fn process_line(&self, line: &str) {
180        if let Some(event) = line.parse::<LogEvent>().ok() {
181            self.send_event_to_loaded_listeners(&event);
182            self.send_event_to_listeners(&event);
183            self.send_event_to_named_listeners(event);
184        }
185    }
186
187    fn send_event_to_loaded_listeners(&self, event: &LogEvent) {
188        let loaded_listeners = self.loaded_listeners.read().unwrap();
189        for loaded_listener in loaded_listeners.iter() {
190            loaded_listener.on_event(event.clone())
191        }
192    }
193
194    fn send_event_to_listeners(&self, event: &LogEvent) {
195        let indexes_to_delete = {
196            let listeners = self.listeners.read().unwrap();
197            send_event_to_listeners(event, listeners.iter())
198        };
199        if !indexes_to_delete.is_empty() {
200            let mut listeners = self.listeners.write().unwrap();
201            delete_indexes(&mut listeners, indexes_to_delete);
202        }
203    }
204
205    fn send_event_to_named_listeners(&self, event: LogEvent) {
206        let indexes_to_delete = {
207            let named_listeners = self.named_listeners.read().unwrap();
208            if let Some(named_listeners) = named_listeners.get(&event.executor) {
209                send_event_to_listeners(&event, named_listeners)
210            } else {
211                Vec::new()
212            }
213        };
214        if !indexes_to_delete.is_empty() {
215            let mut named_listeners = self.named_listeners.write().unwrap();
216            if let Some(listeners) = named_listeners.get_mut(&event.executor) {
217                if indexes_to_delete.len() == listeners.len() {
218                    named_listeners.remove(&event.executor);
219                } else {
220                    delete_indexes(listeners, indexes_to_delete);
221                }
222            }
223        }
224    }
225}
226
227fn send_event_to_listeners<'l>(
228    event: &LogEvent,
229    listeners: impl IntoIterator<Item = &'l UnboundedSender<LogEvent>>,
230) -> Vec<usize> {
231    let mut indexes_to_delete = Vec::new();
232    for (index, listener) in listeners.into_iter().enumerate() {
233        if let Err(SendError(_event)) = listener.send(event.clone()) {
234            indexes_to_delete.push(index);
235        }
236    }
237    indexes_to_delete
238}
239
/// Removes the elements at the positions `indexes_to_delete` from `listeners`, preserving the
/// relative order of the remaining elements.
///
/// The positions refer to `listeners` as it is when this function is called. They may be given in
/// any order and may contain duplicates; positions beyond the end of `listeners` are ignored.
/// The removal is done in a single pass over `listeners`.
fn delete_indexes<E>(listeners: &mut Vec<E>, mut indexes_to_delete: Vec<usize>) {
    if indexes_to_delete.is_empty() {
        return;
    }
    // Ascending order lets us walk the indexes in lockstep with the elements.
    indexes_to_delete.sort_unstable();
    let mut pending = indexes_to_delete.into_iter().peekable();
    let mut position = 0;
    // `retain` visits every element exactly once in its original order.
    listeners.retain(|_| {
        let mut delete = false;
        // A loop instead of an `if`, to skip over duplicate indexes.
        while pending.peek() == Some(&position) {
            pending.next();
            delete = true;
        }
        position += 1;
        !delete
    });
}
246
247struct LogFileReader {
248    reader: BufReader<File>,
249    line: Vec<u8>,
250}
251impl LogFileReader {
252    fn new(file: File) -> Self {
253        Self {
254            reader: BufReader::new(file),
255            line: Vec::new(),
256        }
257    }
258
259    fn read_next_line(&mut self) -> Option<Cow<'_, str>> {
260        const LINE_TERMINATOR: u8 = b'\n';
261
262        if self.line.ends_with(&[LINE_TERMINATOR]) {
263            // Clear line, because we returned this line last time.
264            self.line.clear();
265        }
266
267        self.reader
268            .read_until(LINE_TERMINATOR, &mut self.line)
269            .unwrap(); // may panic
270
271        if self.line.ends_with(&[LINE_TERMINATOR]) {
272            let (line, _) = ENCODING.decode_without_bom_handling(&self.line);
273            Some(line)
274        } else {
275            None
276        }
277    }
278
279    fn set_file(&mut self, file: File) {
280        self.reader = BufReader::new(file);
281    }
282}