1use crate::entry_metadata::labels::LogEntryLabels;
15use crate::{Result, Severity};
16use chrono::{DateTime, FixedOffset, NaiveDate, NaiveDateTime, NaiveTime, TimeZone, Utc};
17use nom::{
18 Err as NomErr, IResult,
19 branch::alt,
20 bytes::complete::tag,
21 character::complete::{char, digit1, space1},
22 combinator::{map_res, recognize},
23 error::{Error as NomError, ErrorKind},
24 sequence::{delimited, tuple},
25};
26use std::fs::File;
27use std::io::{BufRead, BufReader};
28use std::path::Path;
29
/// Number of entry slots `parse_log_file` preallocates for its result vector,
/// so that typical inputs do not trigger repeated reallocation while parsing.
const INITIAL_ENTRIES_CAPACITY: usize = 16384;
35
/// A single logical log entry produced by the parser.
///
/// One entry may cover several physical lines of the input: continuation
/// lines are folded into `message`, separated by `'\n'`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ParsedLogEntry {
    /// Zero-based position of the entry in the parsed result; assigned by
    /// `parse_log_file` once all entries are collected (0 until then).
    pub sequence_id: usize,
    /// Always `None` when produced by this parser.
    // NOTE(review): presumably filled in later by a storage layer — confirm.
    pub explicit_id: Option<i64>,
    /// Entry timestamp, converted from the logged offset to UTC.
    pub timestamp: DateTime<Utc>,
    /// Severity taken from the bracketed level tag (e.g. `[info]`).
    pub severity: Severity,
    /// Process identifier exactly as logged, including the angle brackets
    /// (e.g. `<0.123.0>`).
    pub process_id: String,
    /// Message text with trailing whitespace removed; continuation lines are
    /// appended after a `'\n'`.
    pub message: String,
    /// Lowercased copy of `message`, maintained alongside it.
    pub message_lowercased: String,
    /// Always `None` when produced by this parser.
    // NOTE(review): looks like an annotation assigned by a later stage — confirm.
    pub subsystem_id: Option<i16>,
    /// Labels attached to the entry; the parser stores the default value.
    pub labels: LogEntryLabels,
    /// Always `None` when produced by this parser.
    pub resolution_or_discussion_url_id: Option<i16>,
    /// Always `None` when produced by this parser.
    pub doc_url_id: Option<i16>,
}
50
51impl ParsedLogEntry {
52 #[inline]
54 pub fn is_multiline(&self) -> bool {
55 self.message.contains('\n')
56 }
57
58 #[inline]
59 fn is_continuation_of(&self, other: &ParsedLogEntry) -> bool {
60 self.timestamp == other.timestamp
61 && self.severity == other.severity
62 && self.process_id == other.process_id
63 }
64
65 #[inline]
66 fn append_continuation(&mut self, content: &str) {
67 self.message.reserve(1 + content.len());
68 self.message.push('\n');
69 self.message.push_str(content);
70
71 self.message_lowercased.push('\n');
72 for c in content.chars() {
73 self.message_lowercased.extend(c.to_lowercase());
74 }
75 }
76}
77
/// Outcome of parsing a whole log input.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParseResult {
    /// Parsed entries in input order, with continuation lines already merged.
    pub entries: Vec<ParsedLogEntry>,
    /// Number of physical lines read from the input (0 for empty input).
    pub total_lines: usize,
}
83
84fn process_new_entry(
85 entries: &mut Vec<ParsedLogEntry>,
86 current_entry: &mut Option<ParsedLogEntry>,
87 new_entry: ParsedLogEntry,
88) {
89 let is_continuation = current_entry
90 .as_ref()
91 .is_some_and(|prev| new_entry.is_continuation_of(prev));
92
93 match (is_continuation, current_entry.as_mut()) {
94 (true, Some(prev_entry)) => {
95 prev_entry.append_continuation(&new_entry.message);
96 }
97 (false, _) => {
98 if let Some(prev_entry) = current_entry.take() {
99 entries.push(prev_entry);
100 }
101 *current_entry = Some(new_entry);
102 }
103 (true, None) => {}
104 }
105}
106
107fn process_continuation_line(
108 current_entry: &mut Option<ParsedLogEntry>,
109 line: &str,
110 line_number: usize,
111) {
112 if let Some(entry) = current_entry {
113 entry.append_continuation(line.trim_end());
114 } else {
115 log::warn!("Orphaned continuation line {}: {}", line_number + 1, line);
116 }
117}
118
119pub fn parse_log_file<R: BufRead>(reader: R) -> Result<ParseResult> {
120 let mut entries = Vec::with_capacity(INITIAL_ENTRIES_CAPACITY);
121 let mut current_entry: Option<ParsedLogEntry> = None;
122 let mut total_lines = 0;
123
124 for (line_number, line_result) in reader.lines().enumerate() {
125 total_lines = line_number + 1;
126 let line = line_result.map_err(|e| crate::Error::ReadLine {
127 line: total_lines,
128 source: e,
129 })?;
130
131 match parse_log_entry(&line) {
132 Ok((_, entry)) => process_new_entry(&mut entries, &mut current_entry, entry),
133 Err(_) => process_continuation_line(&mut current_entry, &line, line_number),
134 }
135 }
136
137 if let Some(entry) = current_entry {
138 entries.push(entry);
139 }
140
141 for (i, entry) in entries.iter_mut().enumerate() {
142 entry.sequence_id = i;
143 }
144
145 Ok(ParseResult {
146 entries,
147 total_lines,
148 })
149}
150
151pub fn count_log_lines(path: &Path) -> Result<usize> {
152 let file = File::open(path)?;
153 let reader = BufReader::new(file);
154 Ok(reader.lines().count())
155}
156
157fn parse_log_entry(input: &str) -> IResult<&str, ParsedLogEntry> {
160 let (input, timestamp) = parse_timestamp(input)?;
161 let (input, _) = space1(input)?;
162 let (input, severity) = parse_severity(input)?;
163 let (input, _) = space1(input)?;
164 let (input, process_id) = parse_process_id(input)?;
165 let (input, _) = char(' ')(input)?;
166 let trimmed_message = input.trim_end();
167 let message = trimmed_message.to_string();
168 let message_lowercased = trimmed_message.to_lowercase();
169
170 Ok((
171 "",
172 ParsedLogEntry {
173 sequence_id: 0,
174 explicit_id: None,
175 timestamp,
176 severity,
177 process_id,
178 message,
179 message_lowercased,
180 subsystem_id: None,
181 labels: LogEntryLabels::default(),
182 resolution_or_discussion_url_id: None,
183 doc_url_id: None,
184 },
185 ))
186}
187
188fn parse_date(input: &str) -> IResult<&str, (i32, u32, u32)> {
189 let (input, year) = map_res(digit1, |s: &str| s.parse::<i32>())(input)?;
190 let (input, _) = char('-')(input)?;
191 let (input, month) = map_res(digit1, |s: &str| s.parse::<u32>())(input)?;
192 let (input, _) = char('-')(input)?;
193 let (input, day) = map_res(digit1, |s: &str| s.parse::<u32>())(input)?;
194 Ok((input, (year, month, day)))
195}
196
197fn parse_time(input: &str) -> IResult<&str, (u32, u32, u32, u32)> {
198 let (input, hour) = map_res(digit1, |s: &str| s.parse::<u32>())(input)?;
199 let (input, _) = char(':')(input)?;
200 let (input, minute) = map_res(digit1, |s: &str| s.parse::<u32>())(input)?;
201 let (input, _) = char(':')(input)?;
202 let (input, second) = map_res(digit1, |s: &str| s.parse::<u32>())(input)?;
203 let (input, _) = char('.')(input)?;
204 let (input, microseconds) = map_res(digit1, |s: &str| s.parse::<u32>())(input)?;
205 Ok((input, (hour, minute, second, microseconds)))
206}
207
208fn parse_timezone(input: &str) -> IResult<&str, i32> {
209 let (input, tz_sign) = alt((char('+'), char('-')))(input)?;
210 let (input, tz_hour) = map_res(digit1, |s: &str| s.parse::<i32>())(input)?;
211 let (input, _) = char(':')(input)?;
212 let (input, tz_minute) = map_res(digit1, |s: &str| s.parse::<i32>())(input)?;
213 let tz_offset_seconds = (tz_hour * 3600 + tz_minute * 60) * if tz_sign == '-' { -1 } else { 1 };
214 Ok((input, tz_offset_seconds))
215}
216
217fn nom_verify_error() -> NomErr<NomError<&'static str>> {
218 NomErr::Error(NomError::new("", ErrorKind::Verify))
219}
220
221fn build_datetime(
222 date: (i32, u32, u32),
223 time: (u32, u32, u32, u32),
224 tz_offset_seconds: i32,
225) -> Result<DateTime<Utc>, NomErr<NomError<&'static str>>> {
226 let (year, month, day) = date;
227 let (hour, minute, second, microseconds) = time;
228
229 let naive_date = NaiveDate::from_ymd_opt(year, month, day).ok_or_else(nom_verify_error)?;
230
231 let naive_time = NaiveTime::from_hms_micro_opt(hour, minute, second, microseconds)
232 .ok_or_else(nom_verify_error)?;
233
234 let naive_datetime = NaiveDateTime::new(naive_date, naive_time);
235
236 let offset = FixedOffset::east_opt(tz_offset_seconds).ok_or_else(nom_verify_error)?;
237
238 let dt = offset
239 .from_local_datetime(&naive_datetime)
240 .single()
241 .ok_or_else(nom_verify_error)?;
242
243 Ok(dt.to_utc())
244}
245
246fn parse_timestamp(input: &str) -> IResult<&str, DateTime<Utc>> {
247 let (input, date) = parse_date(input)?;
248 let (input, _) = space1(input)?;
249 let (input, time) = parse_time(input)?;
250 let (input, tz_offset) = parse_timezone(input)?;
251 let datetime = build_datetime(date, time, tz_offset)
252 .map_err(|_| NomErr::Error(NomError::new(input, ErrorKind::Verify)))?;
253 Ok((input, datetime))
254}
255
256fn parse_severity(input: &str) -> IResult<&str, Severity> {
259 let (input, severity_str) = delimited(
260 char('['),
261 alt((
262 tag("debug"),
263 tag("info"),
264 tag("notice"),
265 tag("warning"),
266 tag("error"),
267 tag("critical"),
268 )),
269 char(']'),
270 )(input)?;
271
272 let severity = severity_str
273 .parse::<Severity>()
274 .map_err(|_| NomErr::Error(NomError::new(input, ErrorKind::Verify)))?;
275
276 Ok((input, severity))
277}
278
279fn parse_process_id(input: &str) -> IResult<&str, String> {
282 let (input, pid) = recognize(delimited(
283 char('<'),
284 tuple((digit1, char('.'), digit1, char('.'), digit1)),
285 char('>'),
286 ))(input)?;
287
288 Ok((input, pid.to_string()))
289}