async_log_watch/
lib.rs

1use async_std::{fs::File, io::BufReader, prelude::*, sync::Mutex, task};
2
3use notify::event::{DataChange, ModifyKind};
4use notify::{event::EventKind, RecommendedWatcher, RecursiveMode, Watcher};
5use regex::RegexSet;
6use shellexpand::tilde;
7use std::collections::HashMap;
8use std::path::{Path, PathBuf};
9use std::pin::Pin;
10use std::sync::{mpsc::channel, Arc};
11
12//==== Errors
13
/// Reason a watched log file could not be processed.
///
/// Each variant wraps the underlying [`std::io::Error`]; the `#[error]`
/// attribute supplies the `Display` text.
#[derive(Debug, thiserror::Error)]
pub enum ErrorKind {
    /// The log file could not be opened.
    #[error("failed to open file - {0}")]
    FileOpenError(std::io::Error),
    /// Seeking inside the log file failed.
    #[error("failed to seek file - {0}")]
    FileSeekError(std::io::Error),
}
21
/// Error delivered to a callback inside a `LogEvent` when a file could not
/// be read.
#[derive(Debug)]
pub struct LogError {
    /// What failed, together with the underlying I/O error.
    pub kind: ErrorKind,
    // pub path: String,
}
27
28impl LogError {
29    // Display the error message
30    pub fn display_error(&self) -> String {
31        match &self.kind {
32            ErrorKind::FileOpenError(err) => {
33                // format!("{:?} - {}", err, self.path)
34                format!("{:?}", err)
35            }
36            ErrorKind::FileSeekError(err) => {
37                // format!("{:?} - {}", err, self.path)
38                format!("{:?}", err)
39            }
40        }
41    }
42}
43
44impl std::fmt::Display for LogError {
45    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46        write!(f, "{}", self.display_error())
47    }
48}
49
/// Errors returned by the `LogWatcher` methods that return `Result`.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// An error reported by the `notify` crate.
    #[error("event error - {0}")]
    EventError(notify::Error),
    /// Receiving from a `std::sync::mpsc` channel failed.
    #[error("failed to receive data - {0}")]
    RecvError(std::sync::mpsc::RecvError),
}
57
58//==== Events
/// Value handed to a log callback: either a line read from a file or the
/// error that prevented reading it.
pub struct LogEvent {
    // The line that was read, when there is one.
    line: Option<String>,
    // The failure to report, when there is one.
    log_error: Option<LogError>,
    // Path of the file this event refers to.
    path: String,
    // log_watcher: Arc<Mutex<LogWatcher>>,
}
65
66impl LogEvent {
67    fn new(
68        path: String,
69        line: Option<String>,
70        error: Option<LogError>, /*, log_watcher:Arc<Mutex<LogWatcher>>*/
71    ) -> Self {
72        Self {
73            path,
74            line,
75            log_error: error,
76            // log_watcher
77        }
78    }
79
80    // pub async fn change_file_path(&self, new_path: &str) -> Result<(), Error>{
81    //     self.log_watcher.lock().await.change_file_path(&self.path, new_path).await
82    // }
83    //
84    // pub async fn stop_monitoring_file(&self) -> Result<(), Error>{
85    //     self.log_watcher.lock().await.stop_monitoring_file(&self.path).await
86    // }
87    pub fn file_path(&self) -> &str {
88        self.path.as_str()
89    }
90
91    pub fn get_line(&self) -> Option<&String> {
92        self.line.as_ref()
93    }
94
95    pub fn get_log_error(&self) -> Option<&LogError> {
96        self.log_error.as_ref()
97    }
98}
99
100//==== Callback
101
/// Shared, type-erased callback: takes a [`LogEvent`] and returns a pinned,
/// boxed future with no output. Both the closure and the future it returns
/// must be `Send + Sync`.
pub type LogCallback =
    Arc<dyn Fn(LogEvent) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> + Send + Sync>;
104
/// Watches registered log files and dispatches their lines to callbacks.
pub struct LogWatcher {
    // Path string -> (callback, optional regex filter applied to each line).
    log_callbacks: Arc<Mutex<HashMap<String, (LogCallback, Option<RegexSet>)>>>,
    // Underlying `notify` watcher; `None` until one has been created.
    watcher: Arc<Mutex<Option<RecommendedWatcher>>>,
}
109
110impl LogWatcher {
111    pub fn new() -> Self {
112        Self {
113            log_callbacks: Arc::new(Mutex::new(HashMap::new())),
114            watcher: Arc::new(Mutex::new(None)),
115        }
116    }
117
118    pub async fn change_file_path(&mut self, old_path: &str, new_path: &str) -> Result<(), Error> {
119        // change into absolute path
120        let old_path = self.make_absolute_path(&Path::new(old_path));
121        let old_path = old_path.into_os_string().into_string().unwrap();
122
123        let callback = self.log_callbacks.lock().await.remove(&old_path);
124        if let Some(callback) = callback {
125            self.log_callbacks
126                .lock()
127                .await
128                .insert(new_path.to_owned(), callback);
129            let mut watcher = self.watcher.lock().await;
130            if let Some(watcher) = &mut *watcher {
131                watcher
132                    .unwatch(Path::new(&old_path))
133                    .map_err(|e| Error::EventError(e))?;
134                watcher
135                    .watch(Path::new(new_path), RecursiveMode::NonRecursive)
136                    .map_err(|e| Error::EventError(e))?;
137            }
138        }
139        Ok(())
140    }
141
142    pub async fn stop_monitoring_file(&mut self, path: &str) -> Result<(), Error> {
143        // change into absolute path
144        let path = self.make_absolute_path(&Path::new(path));
145        let path = path.into_os_string().into_string().unwrap();
146
147        self.log_callbacks.lock().await.remove(&path);
148        let mut watcher = self.watcher.lock().await;
149        if let Some(watcher) = &mut *watcher {
150            watcher
151                .unwatch(Path::new(&path))
152                .map_err(|e| Error::EventError(e))?;
153        }
154        Ok(())
155    }
156
157    // helper function to convert a relative path into an absolute path
158    fn make_absolute_path(&self, path: &Path) -> PathBuf {
159        let expanded_path = tilde(&path.to_string_lossy()).into_owned();
160        let expanded_path = Path::new(&expanded_path);
161
162        if expanded_path.is_absolute() {
163            expanded_path.to_path_buf()
164        } else {
165            std::env::current_dir().unwrap().join(expanded_path)
166        }
167    }
168
169    // register a file path and its associated callback function.
170    pub async fn register<P: AsRef<Path>, F, Fut>(
171        &mut self,
172        path: P,
173        callback: F,
174        patterns: Option<Vec<&str>>,
175    ) where
176        F: Fn(LogEvent) -> Fut + Send + Sync + 'static,
177        Fut: std::future::Future<Output = ()> + Send + Sync + 'static,
178    {
179        let path = self.make_absolute_path(path.as_ref());
180        let path = path.into_os_string().into_string().unwrap();
181
182        let callback = Arc::new(
183            move |log_event: LogEvent| -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
184                Box::pin(callback(log_event))
185            },
186        );
187        let regex_set = if let Some(patterns) = patterns {
188            Some(RegexSet::new(patterns).unwrap())
189        } else {
190            None
191        };
192        self.log_callbacks
193            .lock()
194            .await
195            .insert(path, (callback, regex_set));
196    }
197
198    // Start monitoring
199    pub async fn monitoring(&self, poll_interval: std::time::Duration) -> Result<(), Error> {
200        let (tx, rx) = channel();
201
202        let config = notify::Config::default().with_poll_interval(poll_interval);
203
204        let watcher: RecommendedWatcher = Watcher::new(tx, config).unwrap();
205        *self.watcher.lock().await = Some(watcher);
206
207        for path in self.log_callbacks.lock().await.keys() {
208            self.watcher
209                .lock()
210                .await
211                .as_mut()
212                .unwrap()
213                .watch(Path::new(&path), RecursiveMode::NonRecursive)
214                .map_err(|e| Error::EventError(e))?;
215        }
216
217        let file_positions = Arc::new(Mutex::new(HashMap::<String, u64>::new()));
218        loop {
219            match rx.recv() {
220                Ok(event) => match event {
221                    Ok(event) => match event.kind {
222                        EventKind::Modify(ModifyKind::Data(DataChange::Any)) => {
223                            let paths = &event.paths;
224                            for path in paths {
225                                let path_str = path.clone().into_os_string().into_string().unwrap();
226
227                                // clone the contianers
228                                let log_callbacks = Arc::clone(&self.log_callbacks);
229                                let file_positions_clone = Arc::clone(&file_positions);
230
231                                task::spawn(async move {
232                                    // to avoid the deadlock
233                                    let log_callbacks = log_callbacks.lock().await;
234
235                                    if let Some((callback, regex_set)) =
236                                        log_callbacks.get(&path_str)
237                                    {
238                                        let callback = Arc::clone(callback);
239
240                                        let mut file_positions = file_positions_clone.lock().await;
241                                        let position = file_positions
242                                            .entry(path_str.clone())
243                                            .or_insert(std::u64::MAX);
244
245                                        // file open
246                                        match File::open(&path_str).await {
247                                            Ok(file) => {
248                                                let mut reader = BufReader::new(file);
249                                                let mut line = String::new();
250
251                                                // need to set initial position
252                                                if *position == std::u64::MAX {
253                                                    *position = find_last_line(&mut reader).await;
254                                                }
255
256                                                // seek from *position
257                                                match reader
258                                                    .seek(std::io::SeekFrom::Start(*position))
259                                                    .await
260                                                {
261                                                    Ok(_) => {
262                                                        // check if a full line has been read
263                                                        if reader
264                                                            .read_line(&mut line)
265                                                            .await
266                                                            .unwrap()
267                                                            > 0
268                                                            && line.ends_with('\n')
269                                                        {
270                                                            *position += line.len() as u64;
271
272                                                            // remove trailing newline character, if present
273                                                            // use the trim_end_matches
274                                                            let line = line
275                                                                .trim_end_matches(|c| {
276                                                                    c == '\n' || c == '\r'
277                                                                })
278                                                                .to_owned();
279
280                                                            let notify = if let Some(regex_set) =
281                                                                regex_set
282                                                            {
283                                                                if regex_set.is_match(&line) {
284                                                                    true
285                                                                } else {
286                                                                    false
287                                                                }
288                                                            } else {
289                                                                true
290                                                            };
291
292                                                            if notify {
293                                                                let event = LogEvent::new(
294                                                                    path_str,
295                                                                    Some(line),
296                                                                    None,
297                                                                );
298                                                                callback(event).await;
299                                                            }
300                                                        }
301                                                    }
302                                                    Err(e) => {
303                                                        let log_error = LogError {
304                                                            kind: ErrorKind::FileSeekError(e),
305                                                        };
306                                                        let event = LogEvent::new(
307                                                            path_str,
308                                                            None,
309                                                            Some(log_error),
310                                                        );
311                                                        callback(event).await;
312                                                    }
313                                                }
314                                            }
315                                            Err(e) => {
316                                                let log_error = LogError {
317                                                    kind: ErrorKind::FileOpenError(e),
318                                                };
319                                                let event =
320                                                    LogEvent::new(path_str, None, Some(log_error));
321                                                callback(event).await;
322                                            }
323                                        }
324                                    }
325                                });
326                            }
327                        }
328                        _ => {}
329                    },
330                    Err(e) => return Err(Error::EventError(e)),
331                },
332                Err(e) => return Err(Error::RecvError(e)),
333            }
334        }
335    }
336}
337
338// find the position of last line.
339async fn find_last_line(reader: &mut BufReader<File>) -> u64 {
340    let mut last_line_start = 0;
341    let mut last_line = String::new();
342    let mut current_position = 0;
343
344    while let Ok(len) = reader.read_line(&mut last_line).await {
345        if len == 0 || !last_line.ends_with('\n') {
346            break;
347        }
348        last_line_start = current_position;
349        current_position += len as u64;
350        last_line.clear();
351    }
352
353    last_line_start
354}
355#[cfg(test)]
356mod tests {
357
358    use super::find_last_line;
359    use async_std::{fs::remove_file, fs::File, io::BufReader, prelude::*};
360
361    #[async_std::test]
362    async fn test_find_last_line() {
363        //
364        let filepath = "test-log.txt";
365
366        let _ = remove_file(filepath);
367
368        let mut file = File::create(filepath).await.unwrap();
369
370        file.write_all(b"0\n").await.unwrap();
371        file.write_all(b"1\n").await.unwrap();
372        file.write_all(b"2\n").await.unwrap();
373        file.write_all(b"3\n").await.unwrap();
374        file.flush().await.unwrap();
375
376        let ofile = File::open(&filepath).await.unwrap();
377        let mut reader = BufReader::new(ofile);
378        let position = find_last_line(&mut reader).await;
379
380        // assert last line position
381        assert_eq!(position, 6);
382
383        let mut line = String::new();
384        reader
385            .seek(std::io::SeekFrom::Start(position))
386            .await
387            .unwrap();
388        reader.read_line(&mut line).await.unwrap();
389        // assert last line
390        assert_eq!(line, "3\n");
391
392        let _ = remove_file(filepath).await; // Remove the file if it exists
393    }
394
395    #[async_std::test]
396    async fn test_log_watcher() {
397        let mut log_watcher = super::LogWatcher::new();
398
399        let log_file_1 = "test-log1.txt";
400        let log_file_2 = "test-log2.txt";
401        let log_file_3 = "test-log3.txt";
402
403        // create log files
404        let mut file_1 = File::create(log_file_1).await.unwrap();
405        let mut file_2 = File::create(log_file_2).await.unwrap();
406        let mut file_3 = File::create(log_file_3).await.unwrap();
407
408        log_watcher.register(log_file_1, |_| async {}, None).await;
409        log_watcher.register(log_file_2, |_| async {}, None).await;
410
411        // write data to log files
412        file_1.write_all(b"line 1\n").await.unwrap();
413        file_1.sync_all().await.unwrap();
414        file_2.write_all(b"line 2\n").await.unwrap();
415        file_2.sync_all().await.unwrap();
416
417        // stop monitoring log_file_1
418        log_watcher.stop_monitoring_file(log_file_1).await.unwrap();
419        // change the path of log_file_2 to log_file_3
420        log_watcher
421            .change_file_path(log_file_2, log_file_3)
422            .await
423            .unwrap();
424
425        // write data to log files
426        file_1.write_all(b"line 3\n").await.unwrap();
427        file_1.sync_all().await.unwrap();
428        file_3.write_all(b"line 4\n").await.unwrap();
429        file_3.sync_all().await.unwrap();
430
431        assert!(!log_watcher
432            .log_callbacks
433            .lock()
434            .await
435            .contains_key(log_file_1));
436        assert!(!log_watcher
437            .log_callbacks
438            .lock()
439            .await
440            .contains_key(log_file_2));
441        assert!(log_watcher
442            .log_callbacks
443            .lock()
444            .await
445            .contains_key(log_file_3));
446
447        // remove the test log files
448        remove_file(log_file_1).await.unwrap();
449        remove_file(log_file_2).await.unwrap();
450        remove_file(log_file_3).await.unwrap();
451    }
452}