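//! async_log_watcher/lib.rs
//!
//! Asynchronous log-file watcher: reads a file with Tokio and forwards the
//! bytes it reads as `Vec<u8>` messages over an mpsc channel. Control signals
//! can close the watcher, reload the file, or swap to a different path.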

use std::{
    error::Error,
    future::Future,
    io::SeekFrom,
    path::{Path, PathBuf},
    pin::Pin,
    sync::Arc,
    time::Duration,
};

use tokio::{
    fs::File,
    io::{AsyncReadExt, AsyncSeekExt, BufReader},
    sync::mpsc::{
        error::{SendError, TryRecvError},
        Receiver, Sender,
    },
    time::sleep,
};

21#[derive(Debug)]
22struct LogBufReader {
23    file: BufReader<File>,
24    sender: Arc<Sender<Vec<u8>>>,
25    path: PathBuf,
26    last_ctime: u64,
27}
28
29#[derive(Debug)]
30pub struct LogWatcher {
31    receiver: Receiver<Vec<u8>>,
32    sender: Arc<Sender<Vec<u8>>>,
33    path: PathBuf,
34    signal_tx: Sender<LogWatcherSignal>,
35    signal_rx: std::sync::Mutex<Option<Receiver<LogWatcherSignal>>>,
36}
37
38#[derive(Debug)]
39enum DetachedLogWatcher {
40    Initializing(LogBufReader),
41    Waiting(LogBufReader),
42    Reading(LogBufReader),
43    Missing(LogBufReader),
44    Reloading((PathBuf, Arc<Sender<Vec<u8>>>)),
45    Closed,
46}
/// Control messages accepted by a running watcher (see
/// `LogWatcher::send_signal`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LogWatcherSignal {
    /// Perform a final read and stop the watcher.
    Close,
    /// Reopen the file at the current path.
    Reload,
    /// Stop watching the current file and start watching the given path.
    Swap(PathBuf),
}

/// Type-erased, thread-safe error (the `'static` bound is implied by `Box`).
type BoxedError = Box<dyn Error + Send + Sync>;
/// `Result` whose error type is [`BoxedError`].
type BoxedResult<T> = Result<T, BoxedError>;
/// Boxed, pinned future returned by `LogWatcher::spawn`.
type SpawnFnResult = Pin<Box<dyn Future<Output = BoxedResult<()>> + Send + Sync>>;

58impl LogWatcher {
59    pub fn new(file_path: impl Into<PathBuf>) -> Self {
60        let (sender, receiver) = tokio::sync::mpsc::channel(256);
61        let (signal_tx, signal_rx) = tokio::sync::mpsc::channel(256);
62
63        Self {
64            receiver,
65            sender: Arc::new(sender),
66            path: file_path.into(),
67            signal_rx: Some(signal_rx).into(),
68            signal_tx,
69        }
70    }
71
72    pub async fn send_signal(
73        &self,
74        signal: LogWatcherSignal,
75    ) -> Result<(), SendError<LogWatcherSignal>> {
76        self.signal_tx.send(signal).await
77    }
78
79    pub async fn read_message(&mut self) -> Option<Vec<u8>> {
80        self.receiver.recv().await
81    }
82
83    pub fn try_read_message(&mut self) -> Result<Vec<u8>, TryRecvError> {
84        self.receiver.try_recv()
85    }
86
87    pub fn spawn(&self, skip_to_end: bool) -> SpawnFnResult {
88        let sender = self.sender.clone();
89        let path = self.path.clone();
90
91        let signal_rx = self.signal_rx.lock().unwrap().take();
92
93        if signal_rx.is_none() {
94            panic!("Log watcher spanwed twice {:?}", self);
95        };
96
97        let mut signal_rx = signal_rx.unwrap();
98
99        let future: SpawnFnResult = Box::pin(async move {
100            let file = File::open(&path).await?;
101            let mut detached = if skip_to_end {
102                DetachedLogWatcher::Initializing(LogBufReader {
103                    file: BufReader::new(file),
104                    sender,
105                    path: path.clone(),
106                    last_ctime: get_c_time(&path).await.unwrap(),
107                })
108            } else {
109                DetachedLogWatcher::Reading(LogBufReader {
110                    file: BufReader::new(file),
111                    sender,
112                    path: path.clone(),
113                    last_ctime: get_c_time(&path).await.unwrap(),
114                })
115            };
116
117            loop {
118                match signal_rx.try_recv() {
119                    Ok(LogWatcherSignal::Close) => {
120                        detached.close().await;
121                    }
122                    Ok(LogWatcherSignal::Reload) => {
123                        detached.reload().await;
124                    }
125                    Ok(LogWatcherSignal::Swap(path)) => {
126                        detached.swap(path).await;
127                    }
128                    Err(err) => {
129                        if err == TryRecvError::Disconnected {
130                            break;
131                        }
132                    }
133                }
134
135                match detached {
136                    DetachedLogWatcher::Closed => {
137                        break;
138                    }
139                    _ => {
140                        detached = detached
141                            .next()
142                            .await
143                            .expect("failed to move next on detached log watcher");
144                    }
145                }
146            }
147
148            Ok(())
149        });
150        future
151    }
152}
153
154impl DetachedLogWatcher {
155    pub async fn next(self) -> Result<Self, std::io::Error> {
156        match self {
157            DetachedLogWatcher::Initializing(mut inner) => {
158                inner.skip_file().await?;
159                Ok(DetachedLogWatcher::Waiting(inner))
160            }
161            DetachedLogWatcher::Waiting(mut inner) => match inner.read_next().await {
162                Ok(size) if size > 4096 => Ok(DetachedLogWatcher::Reading(inner)),
163                Ok(size) => {
164                    if size == 0 {
165                        let curr_ctime = get_c_time(&inner.path).await?;
166
167                        if curr_ctime > inner.last_ctime {
168                            return Ok(DetachedLogWatcher::Missing(inner));
169                        }
170
171                        sleep(Duration::from_secs(1)).await;
172                        Ok(DetachedLogWatcher::Waiting(inner))
173                    } else {
174                        sleep(Duration::from_secs(1)).await;
175                        Ok(DetachedLogWatcher::Waiting(inner))
176                    }
177                }
178                Err(err) => match err.kind() {
179                    std::io::ErrorKind::NotFound => Ok(DetachedLogWatcher::Missing(inner)),
180                    _ => Err(err),
181                },
182            },
183            DetachedLogWatcher::Reading(mut inner) => match inner.read_next().await {
184                Ok(size) if size < 4096 => Ok(DetachedLogWatcher::Waiting(inner)),
185                Ok(_) => Ok(DetachedLogWatcher::Reading(inner)),
186                Err(err) => match err.kind() {
187                    std::io::ErrorKind::NotFound => Ok(DetachedLogWatcher::Missing(inner)),
188                    _ => Err(err),
189                },
190            },
191            DetachedLogWatcher::Missing(inner) => {
192                inner.sender.try_send(inner.file.buffer().to_vec()).ok();
193                Ok(DetachedLogWatcher::Reloading((inner.path, inner.sender)))
194            }
195            DetachedLogWatcher::Reloading((path, sender)) => {
196                let file_exists = match tokio::fs::metadata(&path).await {
197                    Ok(meta) => Ok(meta.is_file()),
198                    Err(err) => match err.kind() {
199                        std::io::ErrorKind::NotFound | std::io::ErrorKind::PermissionDenied => {
200                            Ok(false)
201                        }
202                        _ => Err(err),
203                    },
204                }?;
205
206                if file_exists {
207                    let new_inner = LogBufReader {
208                        file: BufReader::new(File::open(&path).await?),
209                        path: path.clone(),
210                        sender,
211                        last_ctime: get_c_time(&path).await.unwrap(),
212                    };
213
214                    Ok(DetachedLogWatcher::Waiting(new_inner))
215                } else {
216                    sleep(Duration::from_secs(1)).await;
217                    Ok(DetachedLogWatcher::Reloading((path, sender)))
218                }
219            }
220            DetachedLogWatcher::Closed => Ok(DetachedLogWatcher::Closed),
221        }
222    }
223
224    pub async fn close(&mut self) {
225        match self {
226            DetachedLogWatcher::Initializing(inner)
227            | DetachedLogWatcher::Waiting(inner)
228            | DetachedLogWatcher::Reading(inner)
229            | DetachedLogWatcher::Missing(inner) => {
230                inner.read_next().await.ok();
231                *self = DetachedLogWatcher::Closed
232            }
233            DetachedLogWatcher::Reloading(_) => *self = DetachedLogWatcher::Closed,
234            DetachedLogWatcher::Closed => {}
235        }
236    }
237
238    pub async fn reload(&mut self) {
239        match self {
240            DetachedLogWatcher::Initializing(inner)
241            | DetachedLogWatcher::Waiting(inner)
242            | DetachedLogWatcher::Reading(inner)
243            | DetachedLogWatcher::Missing(inner) => {
244                let result = inner.read_next().await.unwrap_or(0);
245
246                if result == 0 {
247                    inner.sender.try_send(inner.file.buffer().to_vec()).ok();
248                }
249                *self = DetachedLogWatcher::Reloading((inner.path.clone(), inner.sender.clone()));
250            }
251            DetachedLogWatcher::Reloading(_) | DetachedLogWatcher::Closed => {}
252        }
253    }
254
255    pub async fn swap(&mut self, path: PathBuf) {
256        match self {
257            DetachedLogWatcher::Initializing(inner)
258            | DetachedLogWatcher::Waiting(inner)
259            | DetachedLogWatcher::Reading(inner)
260            | DetachedLogWatcher::Missing(inner) => {
261                let result = inner.read_next().await.unwrap_or(0);
262
263                if result == 0 {
264                    inner.sender.try_send(inner.file.buffer().to_vec()).ok();
265                }
266                *self = DetachedLogWatcher::Reloading((path, inner.sender.clone()));
267            }
268            DetachedLogWatcher::Reloading((_old_path, sender)) => {
269                *self = DetachedLogWatcher::Reloading((path, sender.clone()));
270            }
271            DetachedLogWatcher::Closed => {}
272        }
273    }
274}
275
276impl LogBufReader {
277    async fn read_next(&mut self) -> Result<usize, std::io::Error> {
278        let mut buffer: Vec<u8> = Vec::new();
279        let result: Result<usize, std::io::Error> = self.file.read_to_end(&mut buffer).await;
280
281        match result {
282            Ok(size) if size > 0 => match self.sender.try_send(buffer) {
283                Ok(_) => {
284                    self.last_ctime = get_c_time(&self.path).await?;
285                    Ok(size)
286                }
287                Err(_) => Err(std::io::Error::new(
288                    std::io::ErrorKind::NotConnected,
289                    "failed to send to channel",
290                )),
291            },
292            Ok(size) => Ok(size),
293            Err(err) => match err.kind() {
294                std::io::ErrorKind::UnexpectedEof => Ok(0),
295                _ => Err(err),
296            },
297        }
298    }
299
300    async fn skip_file(&mut self) -> Result<(), std::io::Error> {
301        self.file.seek(SeekFrom::End(0)).await?;
302        Ok(())
303    }
304}
305
/// Returns the timestamp used to detect that the file at `path` changed.
///
/// On Windows this is the value of `MetadataExt::last_write_time`.
///
/// # Errors
///
/// Returns the error from querying the file's metadata (for example
/// `NotFound` when the file does not exist).
#[cfg(windows)]
async fn get_c_time(path: &Path) -> Result<u64, std::io::Error> {
    use std::os::windows::prelude::MetadataExt;

    tokio::fs::metadata(path)
        .await
        .map(|meta| meta.last_write_time())
}

314#[cfg(unix)]
315async fn get_c_time(path: &Path) -> Result<u64, std::io::Error> {
316    use std::os::unix::prelude::MetadataExt;
317
318    let meta = tokio::fs::metadata(path).await?;
319    Ok(meta.ctime() as u64)
320}