blocking_reader/
file.rs

1use std::fmt::Display;
2use std::io::SeekFrom;
3use std::pin::Pin;
4use std::time::Duration;
5
6use async_trait::async_trait;
7use moka::future::Cache;
8use tokio::fs::{self, File};
9use tokio::io::{AsyncBufReadExt, AsyncSeekExt, BufReader};
10use tokio::sync::OnceCell;
11
12static CACHE_TASK_SEEK: OnceCell<Cache<String, u64>> = OnceCell::const_new();
13
14#[async_trait]
15pub trait FileReadExt: Display {
16    async fn get_cache_handle() -> &'static Cache<String, u64> {
17        CACHE_TASK_SEEK
18            .get_or_init(|| async { Cache::new(u16::MAX as u64) })
19            .await
20    }
21
22    /// a line must include all patterns, and will get true
23    /// if patterns is empty, then will get true always
24    fn is_line_match_all_patterns(line: &str, patterns: &Vec<&str>) -> bool {
25        for &pattern in patterns {
26            if !line.contains(pattern) {
27                return false;
28            }
29        }
30        true
31    }
32
33    async fn blocking_read(
34        &self,
35        patterns: &Vec<&str>,
36        matched_lines: &mut Vec<String>,
37    ) -> anyhow::Result<()> {
38        let filepath = format!("{self}");
39        let create_time = fs::metadata(&filepath).await?.created()?;
40        let cache_key = format!("{filepath}.{create_time:?}");
41        let cache = Self::get_cache_handle().await;
42        let last_offset = cache.get(&cache_key).unwrap_or(0);
43
44        tracing::debug!("begin read {filepath} created at {create_time:?} at offset {last_offset}");
45        let mut buffer_reader = BufReader::new(File::open(filepath).await?);
46        let _ = buffer_reader.seek(SeekFrom::Start(last_offset)).await;
47        let pin_buffer_reader = Pin::new(&mut buffer_reader);
48
49        let mut lines = pin_buffer_reader.lines();
50        while let Ok(Some(line)) = lines.next_line().await {
51            if Self::is_line_match_all_patterns(&line, patterns) {
52                matched_lines.push(line);
53            }
54        }
55        let _ = buffer_reader.seek(SeekFrom::End(0)).await;
56        let seek = buffer_reader.stream_position().await?;
57        tracing::debug!("cache:{cache_key} with offset: {seek}");
58        cache.insert(cache_key, seek).await;
59        Ok(())
60    }
61
62    async fn blocking_read_with_time_limit(
63        &self,
64        patterns: &Vec<&str>,
65        time_limit: Duration,
66    ) -> anyhow::Result<Vec<String>> {
67        let mut matched_lines = vec![];
68        let _ = tokio::time::timeout(
69            time_limit,
70            Self::blocking_read(self, patterns, &mut matched_lines),
71        )
72        .await;
73        Ok(matched_lines)
74    }
75}
76
77impl FileReadExt for String {}
78
79impl FileReadExt for &str {}