1use std::{
2 error::Error,
3 future::Future,
4 io::SeekFrom,
5 path::{Path, PathBuf},
6 pin::Pin,
7 sync::Arc,
8 time::Duration,
9};
10
11use tokio::{
12 fs::File,
13 io::{AsyncReadExt, AsyncSeekExt, BufReader},
14 sync::mpsc::{
15 error::{SendError, TryRecvError},
16 Receiver, Sender,
17 },
18 time::sleep,
19};
20
21#[derive(Debug)]
22struct LogBufReader {
23 file: BufReader<File>,
24 sender: Arc<Sender<Vec<u8>>>,
25 path: PathBuf,
26 last_ctime: u64,
27}
28
29#[derive(Debug)]
30pub struct LogWatcher {
31 receiver: Receiver<Vec<u8>>,
32 sender: Arc<Sender<Vec<u8>>>,
33 path: PathBuf,
34 signal_tx: Sender<LogWatcherSignal>,
35 signal_rx: std::sync::Mutex<Option<Receiver<LogWatcherSignal>>>,
36}
37
38#[derive(Debug)]
39enum DetachedLogWatcher {
40 Initializing(LogBufReader),
41 Waiting(LogBufReader),
42 Reading(LogBufReader),
43 Missing(LogBufReader),
44 Reloading((PathBuf, Arc<Sender<Vec<u8>>>)),
45 Closed,
46}
47#[derive(Debug)]
48pub enum LogWatcherSignal {
49 Close,
50 Reload,
51 Swap(PathBuf),
52}
53
54type BoxedError = Box<dyn Error + 'static + Send + Sync>;
55type BoxedResult<T> = Result<T, BoxedError>;
56type SpawnFnResult = Pin<Box<dyn Future<Output = BoxedResult<()>> + Send + Sync>>;
57
58impl LogWatcher {
59 pub fn new(file_path: impl Into<PathBuf>) -> Self {
60 let (sender, receiver) = tokio::sync::mpsc::channel(256);
61 let (signal_tx, signal_rx) = tokio::sync::mpsc::channel(256);
62
63 Self {
64 receiver,
65 sender: Arc::new(sender),
66 path: file_path.into(),
67 signal_rx: Some(signal_rx).into(),
68 signal_tx,
69 }
70 }
71
72 pub async fn send_signal(
73 &self,
74 signal: LogWatcherSignal,
75 ) -> Result<(), SendError<LogWatcherSignal>> {
76 self.signal_tx.send(signal).await
77 }
78
79 pub async fn read_message(&mut self) -> Option<Vec<u8>> {
80 self.receiver.recv().await
81 }
82
83 pub fn try_read_message(&mut self) -> Result<Vec<u8>, TryRecvError> {
84 self.receiver.try_recv()
85 }
86
87 pub fn spawn(&self, skip_to_end: bool) -> SpawnFnResult {
88 let sender = self.sender.clone();
89 let path = self.path.clone();
90
91 let signal_rx = self.signal_rx.lock().unwrap().take();
92
93 if signal_rx.is_none() {
94 panic!("Log watcher spanwed twice {:?}", self);
95 };
96
97 let mut signal_rx = signal_rx.unwrap();
98
99 let future: SpawnFnResult = Box::pin(async move {
100 let file = File::open(&path).await?;
101 let mut detached = if skip_to_end {
102 DetachedLogWatcher::Initializing(LogBufReader {
103 file: BufReader::new(file),
104 sender,
105 path: path.clone(),
106 last_ctime: get_c_time(&path).await.unwrap(),
107 })
108 } else {
109 DetachedLogWatcher::Reading(LogBufReader {
110 file: BufReader::new(file),
111 sender,
112 path: path.clone(),
113 last_ctime: get_c_time(&path).await.unwrap(),
114 })
115 };
116
117 loop {
118 match signal_rx.try_recv() {
119 Ok(LogWatcherSignal::Close) => {
120 detached.close().await;
121 }
122 Ok(LogWatcherSignal::Reload) => {
123 detached.reload().await;
124 }
125 Ok(LogWatcherSignal::Swap(path)) => {
126 detached.swap(path).await;
127 }
128 Err(err) => {
129 if err == TryRecvError::Disconnected {
130 break;
131 }
132 }
133 }
134
135 match detached {
136 DetachedLogWatcher::Closed => {
137 break;
138 }
139 _ => {
140 detached = detached
141 .next()
142 .await
143 .expect("failed to move next on detached log watcher");
144 }
145 }
146 }
147
148 Ok(())
149 });
150 future
151 }
152}
153
154impl DetachedLogWatcher {
155 pub async fn next(self) -> Result<Self, std::io::Error> {
156 match self {
157 DetachedLogWatcher::Initializing(mut inner) => {
158 inner.skip_file().await?;
159 Ok(DetachedLogWatcher::Waiting(inner))
160 }
161 DetachedLogWatcher::Waiting(mut inner) => match inner.read_next().await {
162 Ok(size) if size > 4096 => Ok(DetachedLogWatcher::Reading(inner)),
163 Ok(size) => {
164 if size == 0 {
165 let curr_ctime = get_c_time(&inner.path).await?;
166
167 if curr_ctime > inner.last_ctime {
168 return Ok(DetachedLogWatcher::Missing(inner));
169 }
170
171 sleep(Duration::from_secs(1)).await;
172 Ok(DetachedLogWatcher::Waiting(inner))
173 } else {
174 sleep(Duration::from_secs(1)).await;
175 Ok(DetachedLogWatcher::Waiting(inner))
176 }
177 }
178 Err(err) => match err.kind() {
179 std::io::ErrorKind::NotFound => Ok(DetachedLogWatcher::Missing(inner)),
180 _ => Err(err),
181 },
182 },
183 DetachedLogWatcher::Reading(mut inner) => match inner.read_next().await {
184 Ok(size) if size < 4096 => Ok(DetachedLogWatcher::Waiting(inner)),
185 Ok(_) => Ok(DetachedLogWatcher::Reading(inner)),
186 Err(err) => match err.kind() {
187 std::io::ErrorKind::NotFound => Ok(DetachedLogWatcher::Missing(inner)),
188 _ => Err(err),
189 },
190 },
191 DetachedLogWatcher::Missing(inner) => {
192 inner.sender.try_send(inner.file.buffer().to_vec()).ok();
193 Ok(DetachedLogWatcher::Reloading((inner.path, inner.sender)))
194 }
195 DetachedLogWatcher::Reloading((path, sender)) => {
196 let file_exists = match tokio::fs::metadata(&path).await {
197 Ok(meta) => Ok(meta.is_file()),
198 Err(err) => match err.kind() {
199 std::io::ErrorKind::NotFound | std::io::ErrorKind::PermissionDenied => {
200 Ok(false)
201 }
202 _ => Err(err),
203 },
204 }?;
205
206 if file_exists {
207 let new_inner = LogBufReader {
208 file: BufReader::new(File::open(&path).await?),
209 path: path.clone(),
210 sender,
211 last_ctime: get_c_time(&path).await.unwrap(),
212 };
213
214 Ok(DetachedLogWatcher::Waiting(new_inner))
215 } else {
216 sleep(Duration::from_secs(1)).await;
217 Ok(DetachedLogWatcher::Reloading((path, sender)))
218 }
219 }
220 DetachedLogWatcher::Closed => Ok(DetachedLogWatcher::Closed),
221 }
222 }
223
224 pub async fn close(&mut self) {
225 match self {
226 DetachedLogWatcher::Initializing(inner)
227 | DetachedLogWatcher::Waiting(inner)
228 | DetachedLogWatcher::Reading(inner)
229 | DetachedLogWatcher::Missing(inner) => {
230 inner.read_next().await.ok();
231 *self = DetachedLogWatcher::Closed
232 }
233 DetachedLogWatcher::Reloading(_) => *self = DetachedLogWatcher::Closed,
234 DetachedLogWatcher::Closed => {}
235 }
236 }
237
238 pub async fn reload(&mut self) {
239 match self {
240 DetachedLogWatcher::Initializing(inner)
241 | DetachedLogWatcher::Waiting(inner)
242 | DetachedLogWatcher::Reading(inner)
243 | DetachedLogWatcher::Missing(inner) => {
244 let result = inner.read_next().await.unwrap_or(0);
245
246 if result == 0 {
247 inner.sender.try_send(inner.file.buffer().to_vec()).ok();
248 }
249 *self = DetachedLogWatcher::Reloading((inner.path.clone(), inner.sender.clone()));
250 }
251 DetachedLogWatcher::Reloading(_) | DetachedLogWatcher::Closed => {}
252 }
253 }
254
255 pub async fn swap(&mut self, path: PathBuf) {
256 match self {
257 DetachedLogWatcher::Initializing(inner)
258 | DetachedLogWatcher::Waiting(inner)
259 | DetachedLogWatcher::Reading(inner)
260 | DetachedLogWatcher::Missing(inner) => {
261 let result = inner.read_next().await.unwrap_or(0);
262
263 if result == 0 {
264 inner.sender.try_send(inner.file.buffer().to_vec()).ok();
265 }
266 *self = DetachedLogWatcher::Reloading((path, inner.sender.clone()));
267 }
268 DetachedLogWatcher::Reloading((_old_path, sender)) => {
269 *self = DetachedLogWatcher::Reloading((path, sender.clone()));
270 }
271 DetachedLogWatcher::Closed => {}
272 }
273 }
274}
275
276impl LogBufReader {
277 async fn read_next(&mut self) -> Result<usize, std::io::Error> {
278 let mut buffer: Vec<u8> = Vec::new();
279 let result: Result<usize, std::io::Error> = self.file.read_to_end(&mut buffer).await;
280
281 match result {
282 Ok(size) if size > 0 => match self.sender.try_send(buffer) {
283 Ok(_) => {
284 self.last_ctime = get_c_time(&self.path).await?;
285 Ok(size)
286 }
287 Err(_) => Err(std::io::Error::new(
288 std::io::ErrorKind::NotConnected,
289 "failed to send to channel",
290 )),
291 },
292 Ok(size) => Ok(size),
293 Err(err) => match err.kind() {
294 std::io::ErrorKind::UnexpectedEof => Ok(0),
295 _ => Err(err),
296 },
297 }
298 }
299
300 async fn skip_file(&mut self) -> Result<(), std::io::Error> {
301 self.file.seek(SeekFrom::End(0)).await?;
302 Ok(())
303 }
304}
305
306#[cfg(windows)]
307async fn get_c_time(path: &Path) -> Result<u64, std::io::Error> {
308 use std::os::windows::prelude::MetadataExt;
309
310 let meta = tokio::fs::metadata(path).await?;
311 Ok(meta.last_write_time())
312}
313
314#[cfg(unix)]
315async fn get_c_time(path: &Path) -> Result<u64, std::io::Error> {
316 use std::os::unix::prelude::MetadataExt;
317
318 let meta = tokio::fs::metadata(path).await?;
319 Ok(meta.ctime() as u64)
320}