1use std::fmt::Display;
2use std::io::SeekFrom;
3use std::pin::Pin;
4use std::time::Duration;
5
6use async_trait::async_trait;
7use moka::future::Cache;
8use tokio::fs::{self, File};
9use tokio::io::{AsyncBufReadExt, AsyncSeekExt, BufReader};
10use tokio::sync::OnceCell;
11
12static CACHE_TASK_SEEK: OnceCell<Cache<String, u64>> = OnceCell::const_new();
13
14#[async_trait]
15pub trait FileReadExt: Display {
16 async fn get_cache_handle() -> &'static Cache<String, u64> {
17 CACHE_TASK_SEEK
18 .get_or_init(|| async { Cache::new(u16::MAX as u64) })
19 .await
20 }
21
22 fn is_line_match_all_patterns(line: &str, patterns: &Vec<&str>) -> bool {
25 for &pattern in patterns {
26 if !line.contains(pattern) {
27 return false;
28 }
29 }
30 true
31 }
32
33 async fn blocking_read(
34 &self,
35 patterns: &Vec<&str>,
36 matched_lines: &mut Vec<String>,
37 ) -> anyhow::Result<()> {
38 let filepath = format!("{self}");
39 let create_time = fs::metadata(&filepath).await?.created()?;
40 let cache_key = format!("{filepath}.{create_time:?}");
41 let cache = Self::get_cache_handle().await;
42 let last_offset = cache.get(&cache_key).unwrap_or(0);
43
44 tracing::debug!("begin read {filepath} created at {create_time:?} at offset {last_offset}");
45 let mut buffer_reader = BufReader::new(File::open(filepath).await?);
46 let _ = buffer_reader.seek(SeekFrom::Start(last_offset)).await;
47 let pin_buffer_reader = Pin::new(&mut buffer_reader);
48
49 let mut lines = pin_buffer_reader.lines();
50 while let Ok(Some(line)) = lines.next_line().await {
51 if Self::is_line_match_all_patterns(&line, patterns) {
52 matched_lines.push(line);
53 }
54 }
55 let _ = buffer_reader.seek(SeekFrom::End(0)).await;
56 let seek = buffer_reader.stream_position().await?;
57 tracing::debug!("cache:{cache_key} with offset: {seek}");
58 cache.insert(cache_key, seek).await;
59 Ok(())
60 }
61
62 async fn blocking_read_with_time_limit(
63 &self,
64 patterns: &Vec<&str>,
65 time_limit: Duration,
66 ) -> anyhow::Result<Vec<String>> {
67 let mut matched_lines = vec![];
68 let _ = tokio::time::timeout(
69 time_limit,
70 Self::blocking_read(self, patterns, &mut matched_lines),
71 )
72 .await;
73 Ok(matched_lines)
74 }
75}
76
77impl FileReadExt for String {}
78
79impl FileReadExt for &str {}