1use async_std::{fs::File, io::BufReader, prelude::*, sync::Mutex, task};
2
3use notify::event::{DataChange, ModifyKind};
4use notify::{event::EventKind, RecommendedWatcher, RecursiveMode, Watcher};
5use regex::RegexSet;
6use shellexpand::tilde;
7use std::collections::HashMap;
8use std::path::{Path, PathBuf};
9use std::pin::Pin;
10use std::sync::{mpsc::channel, Arc};
11
/// Failure categories attached to a [`LogError`] and delivered to a log
/// callback when a watched file cannot be processed.
#[derive(Debug, thiserror::Error)]
pub enum ErrorKind {
    /// Opening the watched file failed; carries the underlying I/O error.
    #[error("failed to open file - {0}")]
    FileOpenError(std::io::Error),
    /// Seeking to the remembered read position failed; carries the
    /// underlying I/O error.
    #[error("failed to seek file - {0}")]
    FileSeekError(std::io::Error),
}
21
/// Error value handed to a log callback inside a `LogEvent`.
#[derive(Debug)]
pub struct LogError {
    /// What went wrong, together with the underlying I/O error.
    pub kind: ErrorKind,
}
27
28impl LogError {
29 pub fn display_error(&self) -> String {
31 match &self.kind {
32 ErrorKind::FileOpenError(err) => {
33 format!("{:?}", err)
35 }
36 ErrorKind::FileSeekError(err) => {
37 format!("{:?}", err)
39 }
40 }
41 }
42}
43
44impl std::fmt::Display for LogError {
45 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46 write!(f, "{}", self.display_error())
47 }
48}
49
/// Errors returned by the `LogWatcher` methods themselves (as opposed to
/// per-file errors, which are delivered to callbacks as `LogError`).
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// The `notify` watcher reported an error (while watching, unwatching
    /// or delivering an event).
    #[error("event error - {0}")]
    EventError(notify::Error),
    /// Receiving from the watcher's event channel failed.
    #[error("failed to receive data - {0}")]
    RecvError(std::sync::mpsc::RecvError),
}
57
58pub struct LogEvent {
60 line: Option<String>,
61 log_error: Option<LogError>,
62 path: String,
63 }
65
66impl LogEvent {
67 fn new(
68 path: String,
69 line: Option<String>,
70 error: Option<LogError>, ) -> Self {
72 Self {
73 path,
74 line,
75 log_error: error,
76 }
78 }
79
80 pub fn file_path(&self) -> &str {
88 self.path.as_str()
89 }
90
91 pub fn get_line(&self) -> Option<&String> {
92 self.line.as_ref()
93 }
94
95 pub fn get_log_error(&self) -> Option<&LogError> {
96 self.log_error.as_ref()
97 }
98}
99
/// Shared, type-erased asynchronous callback: takes a [`LogEvent`] and
/// returns a boxed future that resolves to `()`.
pub type LogCallback =
    Arc<dyn Fn(LogEvent) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> + Send + Sync>;
104
/// Watches registered log files and forwards newly appended lines to
/// per-file callbacks.
pub struct LogWatcher {
    /// Registered handlers keyed by file path: the callback plus an optional
    /// set of regex patterns used to filter lines.
    log_callbacks: Arc<Mutex<HashMap<String, (LogCallback, Option<RegexSet>)>>>,
    /// The active file-system watcher; `None` until `monitoring` creates it.
    watcher: Arc<Mutex<Option<RecommendedWatcher>>>,
}
109
110impl LogWatcher {
111 pub fn new() -> Self {
112 Self {
113 log_callbacks: Arc::new(Mutex::new(HashMap::new())),
114 watcher: Arc::new(Mutex::new(None)),
115 }
116 }
117
118 pub async fn change_file_path(&mut self, old_path: &str, new_path: &str) -> Result<(), Error> {
119 let old_path = self.make_absolute_path(&Path::new(old_path));
121 let old_path = old_path.into_os_string().into_string().unwrap();
122
123 let callback = self.log_callbacks.lock().await.remove(&old_path);
124 if let Some(callback) = callback {
125 self.log_callbacks
126 .lock()
127 .await
128 .insert(new_path.to_owned(), callback);
129 let mut watcher = self.watcher.lock().await;
130 if let Some(watcher) = &mut *watcher {
131 watcher
132 .unwatch(Path::new(&old_path))
133 .map_err(|e| Error::EventError(e))?;
134 watcher
135 .watch(Path::new(new_path), RecursiveMode::NonRecursive)
136 .map_err(|e| Error::EventError(e))?;
137 }
138 }
139 Ok(())
140 }
141
142 pub async fn stop_monitoring_file(&mut self, path: &str) -> Result<(), Error> {
143 let path = self.make_absolute_path(&Path::new(path));
145 let path = path.into_os_string().into_string().unwrap();
146
147 self.log_callbacks.lock().await.remove(&path);
148 let mut watcher = self.watcher.lock().await;
149 if let Some(watcher) = &mut *watcher {
150 watcher
151 .unwatch(Path::new(&path))
152 .map_err(|e| Error::EventError(e))?;
153 }
154 Ok(())
155 }
156
157 fn make_absolute_path(&self, path: &Path) -> PathBuf {
159 let expanded_path = tilde(&path.to_string_lossy()).into_owned();
160 let expanded_path = Path::new(&expanded_path);
161
162 if expanded_path.is_absolute() {
163 expanded_path.to_path_buf()
164 } else {
165 std::env::current_dir().unwrap().join(expanded_path)
166 }
167 }
168
169 pub async fn register<P: AsRef<Path>, F, Fut>(
171 &mut self,
172 path: P,
173 callback: F,
174 patterns: Option<Vec<&str>>,
175 ) where
176 F: Fn(LogEvent) -> Fut + Send + Sync + 'static,
177 Fut: std::future::Future<Output = ()> + Send + Sync + 'static,
178 {
179 let path = self.make_absolute_path(path.as_ref());
180 let path = path.into_os_string().into_string().unwrap();
181
182 let callback = Arc::new(
183 move |log_event: LogEvent| -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
184 Box::pin(callback(log_event))
185 },
186 );
187 let regex_set = if let Some(patterns) = patterns {
188 Some(RegexSet::new(patterns).unwrap())
189 } else {
190 None
191 };
192 self.log_callbacks
193 .lock()
194 .await
195 .insert(path, (callback, regex_set));
196 }
197
198 pub async fn monitoring(&self, poll_interval: std::time::Duration) -> Result<(), Error> {
200 let (tx, rx) = channel();
201
202 let config = notify::Config::default().with_poll_interval(poll_interval);
203
204 let watcher: RecommendedWatcher = Watcher::new(tx, config).unwrap();
205 *self.watcher.lock().await = Some(watcher);
206
207 for path in self.log_callbacks.lock().await.keys() {
208 self.watcher
209 .lock()
210 .await
211 .as_mut()
212 .unwrap()
213 .watch(Path::new(&path), RecursiveMode::NonRecursive)
214 .map_err(|e| Error::EventError(e))?;
215 }
216
217 let file_positions = Arc::new(Mutex::new(HashMap::<String, u64>::new()));
218 loop {
219 match rx.recv() {
220 Ok(event) => match event {
221 Ok(event) => match event.kind {
222 EventKind::Modify(ModifyKind::Data(DataChange::Any)) => {
223 let paths = &event.paths;
224 for path in paths {
225 let path_str = path.clone().into_os_string().into_string().unwrap();
226
227 let log_callbacks = Arc::clone(&self.log_callbacks);
229 let file_positions_clone = Arc::clone(&file_positions);
230
231 task::spawn(async move {
232 let log_callbacks = log_callbacks.lock().await;
234
235 if let Some((callback, regex_set)) =
236 log_callbacks.get(&path_str)
237 {
238 let callback = Arc::clone(callback);
239
240 let mut file_positions = file_positions_clone.lock().await;
241 let position = file_positions
242 .entry(path_str.clone())
243 .or_insert(std::u64::MAX);
244
245 match File::open(&path_str).await {
247 Ok(file) => {
248 let mut reader = BufReader::new(file);
249 let mut line = String::new();
250
251 if *position == std::u64::MAX {
253 *position = find_last_line(&mut reader).await;
254 }
255
256 match reader
258 .seek(std::io::SeekFrom::Start(*position))
259 .await
260 {
261 Ok(_) => {
262 if reader
264 .read_line(&mut line)
265 .await
266 .unwrap()
267 > 0
268 && line.ends_with('\n')
269 {
270 *position += line.len() as u64;
271
272 let line = line
275 .trim_end_matches(|c| {
276 c == '\n' || c == '\r'
277 })
278 .to_owned();
279
280 let notify = if let Some(regex_set) =
281 regex_set
282 {
283 if regex_set.is_match(&line) {
284 true
285 } else {
286 false
287 }
288 } else {
289 true
290 };
291
292 if notify {
293 let event = LogEvent::new(
294 path_str,
295 Some(line),
296 None,
297 );
298 callback(event).await;
299 }
300 }
301 }
302 Err(e) => {
303 let log_error = LogError {
304 kind: ErrorKind::FileSeekError(e),
305 };
306 let event = LogEvent::new(
307 path_str,
308 None,
309 Some(log_error),
310 );
311 callback(event).await;
312 }
313 }
314 }
315 Err(e) => {
316 let log_error = LogError {
317 kind: ErrorKind::FileOpenError(e),
318 };
319 let event =
320 LogEvent::new(path_str, None, Some(log_error));
321 callback(event).await;
322 }
323 }
324 }
325 });
326 }
327 }
328 _ => {}
329 },
330 Err(e) => return Err(Error::EventError(e)),
331 },
332 Err(e) => return Err(Error::RecvError(e)),
333 }
334 }
335 }
336}
337
338async fn find_last_line(reader: &mut BufReader<File>) -> u64 {
340 let mut last_line_start = 0;
341 let mut last_line = String::new();
342 let mut current_position = 0;
343
344 while let Ok(len) = reader.read_line(&mut last_line).await {
345 if len == 0 || !last_line.ends_with('\n') {
346 break;
347 }
348 last_line_start = current_position;
349 current_position += len as u64;
350 last_line.clear();
351 }
352
353 last_line_start
354}
#[cfg(test)]
mod tests {

    use super::find_last_line;
    use async_std::{fs::remove_file, fs::File, io::BufReader, prelude::*};

    /// `find_last_line` must return the offset of the final complete line.
    #[async_std::test]
    async fn test_find_last_line() {
        let filepath = "test-log.txt";

        // Best-effort removal of a leftover file from an earlier run. The
        // future has to be awaited, otherwise nothing is removed.
        let _ = remove_file(filepath).await;

        let mut file = File::create(filepath).await.unwrap();

        file.write_all(b"0\n").await.unwrap();
        file.write_all(b"1\n").await.unwrap();
        file.write_all(b"2\n").await.unwrap();
        file.write_all(b"3\n").await.unwrap();
        file.flush().await.unwrap();

        let ofile = File::open(filepath).await.unwrap();
        let mut reader = BufReader::new(ofile);
        let position = find_last_line(&mut reader).await;

        // Four two-byte lines: the last one starts at byte 6.
        assert_eq!(position, 6);

        let mut line = String::new();
        reader
            .seek(std::io::SeekFrom::Start(position))
            .await
            .unwrap();
        reader.read_line(&mut line).await.unwrap();
        assert_eq!(line, "3\n");

        let _ = remove_file(filepath).await;
    }

    /// Registering, stopping and re-pointing handlers updates the registry.
    #[async_std::test]
    async fn test_log_watcher() {
        let mut log_watcher = super::LogWatcher::new();

        let log_file_1 = "test-log1.txt";
        let log_file_2 = "test-log2.txt";
        let log_file_3 = "test-log3.txt";

        let mut file_1 = File::create(log_file_1).await.unwrap();
        let mut file_2 = File::create(log_file_2).await.unwrap();
        let mut file_3 = File::create(log_file_3).await.unwrap();

        log_watcher.register(log_file_1, |_| async {}, None).await;
        log_watcher.register(log_file_2, |_| async {}, None).await;

        file_1.write_all(b"line 1\n").await.unwrap();
        file_1.sync_all().await.unwrap();
        file_2.write_all(b"line 2\n").await.unwrap();
        file_2.sync_all().await.unwrap();

        log_watcher.stop_monitoring_file(log_file_1).await.unwrap();
        log_watcher
            .change_file_path(log_file_2, log_file_3)
            .await
            .unwrap();

        file_1.write_all(b"line 3\n").await.unwrap();
        file_1.sync_all().await.unwrap();
        file_3.write_all(b"line 4\n").await.unwrap();
        file_3.sync_all().await.unwrap();

        assert!(!log_watcher
            .log_callbacks
            .lock()
            .await
            .contains_key(log_file_1));
        assert!(!log_watcher
            .log_callbacks
            .lock()
            .await
            .contains_key(log_file_2));
        assert!(log_watcher
            .log_callbacks
            .lock()
            .await
            .contains_key(log_file_3));

        remove_file(log_file_1).await.unwrap();
        remove_file(log_file_2).await.unwrap();
        remove_file(log_file_3).await.unwrap();
    }
}