endpoint_libs/libs/
log_reader.rs1use core::str::FromStr;
2use eyre::ContextCompat;
3use lazy_static::lazy_static;
4use regex::Regex;
5use rev_lines::RevLines;
6use tracing::warn;
7
8#[derive(Debug, Clone)]
11pub struct LogEntry {
12 pub datetime: i64,
13 pub level: String,
14 pub thread: String,
15 pub path: String,
16 pub line_number: usize,
17 pub message: String,
18}
19
20lazy_static! {
21 static ref CONTROL_SEQUENCE_PATTERN: Regex = Regex::new("\x1b\\[[0-9;]*m").unwrap();
22}
23
24impl FromStr for LogEntry {
25 type Err = eyre::Error;
26 fn from_str(s: &str) -> Result<Self, Self::Err> {
27 let no_control_sequence = CONTROL_SEQUENCE_PATTERN.replace_all(s, "");
28 let mut split = no_control_sequence.split_whitespace();
29 let datetime = split
30 .next()
31 .and_then(|x| chrono::DateTime::parse_from_rfc3339(x).ok())
32 .context("no datetime")?;
33 let level: tracing::Level = split.next().and_then(|x| x.parse().ok()).context("no level")?;
34 let thread = split.next().context("no thread")?;
35 let path = split.next().map(|x| x[..x.len() - 1].to_string()).context("no path")?;
36
37 let line_number = split.next().and_then(|x| x[..x.len() - 1].parse().ok()).unwrap_or(0);
38 let message = split.collect::<Vec<&str>>().join(" ");
39 Ok(LogEntry {
40 datetime: datetime.timestamp_millis(),
41 level: level.to_string(),
42 thread: thread.to_string(),
43 path,
44 line_number,
45 message,
46 })
47 }
48}
49
50pub async fn get_log_entries(path: impl AsRef<std::path::Path>, limit: usize) -> eyre::Result<Vec<LogEntry>> {
51 let file = std::fs::File::open(path.as_ref())?;
53 let lines = RevLines::new(file);
54 let mut entries = tokio::task::spawn_blocking(move || {
56 let mut entries = vec![];
57 for line in lines {
58 if entries.len() >= limit {
59 break;
60 }
61 let line = match line {
62 Ok(line) => line,
63 Err(error) => {
64 warn!("Error reading line: {:?}", error);
65 entries.push(LogEntry {
66 datetime: 0,
67 level: "".to_string(),
68 thread: "".to_string(),
69 path: "".to_string(),
70 line_number: 0,
71 message: error.to_string(),
72 });
73 break;
74 }
75 };
76 let entry = LogEntry::from_str(&line);
77 match entry {
78 Ok(entry) => entries.push(entry),
79 Err(_) => entries.push(LogEntry {
80 datetime: 0,
81 level: "".to_string(),
82 thread: "".to_string(),
83 path: "".to_string(),
84 line_number: 0,
85 message: line,
86 }),
87 }
88 }
89 entries
90 })
91 .await?;
92 if entries.is_empty() {
93 entries.push(LogEntry {
94 datetime: 0,
95 level: "".to_string(),
96 thread: "".to_string(),
97 path: "".to_string(),
98 line_number: 0,
99 message: format!("No entries found in {}", path.as_ref().display()),
100 });
101 }
102 Ok(entries)
103}
104
105#[cfg(test)]
106mod tests {
107 use super::*;
108 #[test]
109 fn log_entry_from_line() {
110 let line = "2024-05-18T14:26:36.709390Z WARN main trading_be: 290: terminated diff 0_table_limit_1 initially";
111 let entry = LogEntry::from_str(line).unwrap();
112 assert_eq!(entry.level, "WARN");
114 assert_eq!(entry.thread, "main");
115 assert_eq!(entry.path, "trading_be");
116 assert_eq!(entry.line_number, 290);
117 assert_eq!(entry.message, "terminated diff 0_table_limit_1 initially");
118 }
119 #[test]
120 fn log_entry_from_line_control_sequence() {
121 let line = "\u{1b}[2m2024-06-07T12:25:06.735143Z\u{1b}[0m \u{1b}[32m INFO\u{1b}[0m main \u{1b}[2mtrading_be::strategy::data_factory\u{1b}[0m\u{1b}[2m:\u{1b}[0m \u{1b}[2m110:\u{1b}[0m terminated diff 0_table_limit_1 initially";
122 let entry = LogEntry::from_str(line).unwrap();
123 assert_eq!(entry.level, "INFO");
125 assert_eq!(entry.thread, "main");
126 assert_eq!(entry.path, "trading_be::strategy::data_factory");
127 assert_eq!(entry.line_number, 110);
128 assert_eq!(entry.message, "terminated diff 0_table_limit_1 initially");
129 }
130
131 #[tokio::test]
132 async fn test_get_log_entries() {
133 use std::io::Write;
134 use tempfile::NamedTempFile;
135
136 let mut temp_file = NamedTempFile::new().unwrap();
137 writeln!(temp_file, "2025-03-19T12:34:56Z INFO thread-1 src/main.rs:42 Application started").unwrap();
138 writeln!(temp_file, "2025-03-19T12:35:01Z ERROR thread-2 src/lib.rs:128 Failed to connect").unwrap();
139
140 let entries = get_log_entries(temp_file.path(), 10).await.unwrap();
141 assert_eq!(entries.len(), 2);
142 assert_eq!(entries[0].level, "ERROR");
143 assert_eq!(entries[1].level, "INFO");
144 }
145}