endpoint_libs/libs/
log_reader.rs

1use core::str::FromStr;
2use eyre::ContextCompat;
3use lazy_static::lazy_static;
4use regex::Regex;
5use rev_lines::RevLines;
6use tracing::warn;
7
8// Define a struct to hold the parts of the log entry
9
10#[derive(Debug, Clone)]
11pub struct LogEntry {
12    pub datetime: i64,
13    pub level: String,
14    pub thread: String,
15    pub path: String,
16    pub line_number: usize,
17    pub message: String,
18}
19
20lazy_static! {
21    static ref CONTROL_SEQUENCE_PATTERN: Regex = Regex::new("\x1b\\[[0-9;]*m").unwrap();
22}
23
24impl FromStr for LogEntry {
25    type Err = eyre::Error;
26    fn from_str(s: &str) -> Result<Self, Self::Err> {
27        let no_control_sequence = CONTROL_SEQUENCE_PATTERN.replace_all(s, "");
28        let mut split = no_control_sequence.split_whitespace();
29        let datetime = split
30            .next()
31            .and_then(|x| chrono::DateTime::parse_from_rfc3339(x).ok())
32            .context("no datetime")?;
33        let level: tracing::Level = split.next().and_then(|x| x.parse().ok()).context("no level")?;
34        let thread = split.next().context("no thread")?;
35        let path = split.next().map(|x| x[..x.len() - 1].to_string()).context("no path")?;
36
37        let line_number = split.next().and_then(|x| x[..x.len() - 1].parse().ok()).unwrap_or(0);
38        let message = split.collect::<Vec<&str>>().join(" ");
39        Ok(LogEntry {
40            datetime: datetime.timestamp_millis(),
41            level: level.to_string(),
42            thread: thread.to_string(),
43            path,
44            line_number,
45            message,
46        })
47    }
48}
49
50pub async fn get_log_entries(path: impl AsRef<std::path::Path>, limit: usize) -> eyre::Result<Vec<LogEntry>> {
51    // Specify the path to your log file
52    let file = std::fs::File::open(path.as_ref())?;
53    let lines = RevLines::new(file);
54    // get all entries first
55    let mut entries = tokio::task::spawn_blocking(move || {
56        let mut entries = vec![];
57        for line in lines {
58            if entries.len() >= limit {
59                break;
60            }
61            let line = match line {
62                Ok(line) => line,
63                Err(error) => {
64                    warn!("Error reading line: {:?}", error);
65                    entries.push(LogEntry {
66                        datetime: 0,
67                        level: "".to_string(),
68                        thread: "".to_string(),
69                        path: "".to_string(),
70                        line_number: 0,
71                        message: error.to_string(),
72                    });
73                    break;
74                }
75            };
76            let entry = LogEntry::from_str(&line);
77            match entry {
78                Ok(entry) => entries.push(entry),
79                Err(_) => entries.push(LogEntry {
80                    datetime: 0,
81                    level: "".to_string(),
82                    thread: "".to_string(),
83                    path: "".to_string(),
84                    line_number: 0,
85                    message: line,
86                }),
87            }
88        }
89        entries
90    })
91    .await?;
92    if entries.is_empty() {
93        entries.push(LogEntry {
94            datetime: 0,
95            level: "".to_string(),
96            thread: "".to_string(),
97            path: "".to_string(),
98            line_number: 0,
99            message: format!("No entries found in {}", path.as_ref().display()),
100        });
101    }
102    Ok(entries)
103}
104
105#[cfg(test)]
106mod tests {
107    use super::*;
108    #[test]
109    fn log_entry_from_line() {
110        let line = "2024-05-18T14:26:36.709390Z  WARN                 main trading_be: 290: terminated diff 0_table_limit_1 initially";
111        let entry = LogEntry::from_str(line).unwrap();
112        // assert_eq!(entry.datetime, "2024-02-09 18:10:46");
113        assert_eq!(entry.level, "WARN");
114        assert_eq!(entry.thread, "main");
115        assert_eq!(entry.path, "trading_be");
116        assert_eq!(entry.line_number, 290);
117        assert_eq!(entry.message, "terminated diff 0_table_limit_1 initially");
118    }
119    #[test]
120    fn log_entry_from_line_control_sequence() {
121        let line = "\u{1b}[2m2024-06-07T12:25:06.735143Z\u{1b}[0m \u{1b}[32m INFO\u{1b}[0m main \u{1b}[2mtrading_be::strategy::data_factory\u{1b}[0m\u{1b}[2m:\u{1b}[0m \u{1b}[2m110:\u{1b}[0m terminated diff 0_table_limit_1 initially";
122        let entry = LogEntry::from_str(line).unwrap();
123        // assert_eq!(entry.datetime, "2024-02-09 18:10:46");
124        assert_eq!(entry.level, "INFO");
125        assert_eq!(entry.thread, "main");
126        assert_eq!(entry.path, "trading_be::strategy::data_factory");
127        assert_eq!(entry.line_number, 110);
128        assert_eq!(entry.message, "terminated diff 0_table_limit_1 initially");
129    }
130
131    #[tokio::test]
132    async fn test_get_log_entries() {
133        use std::io::Write;
134        use tempfile::NamedTempFile;
135
136        let mut temp_file = NamedTempFile::new().unwrap();
137        writeln!(temp_file, "2025-03-19T12:34:56Z INFO thread-1 src/main.rs:42 Application started").unwrap();
138        writeln!(temp_file, "2025-03-19T12:35:01Z ERROR thread-2 src/lib.rs:128 Failed to connect").unwrap();
139
140        let entries = get_log_entries(temp_file.path(), 10).await.unwrap();
141        assert_eq!(entries.len(), 2);
142        assert_eq!(entries[0].level, "ERROR");
143        assert_eq!(entries[1].level, "INFO");
144    }
145}