1use crate::entry_metadata::labels::LogEntryLabels;
15use crate::{Result, Severity};
16use chrono::{DateTime, FixedOffset, NaiveDate, NaiveDateTime, NaiveTime, TimeZone, Utc};
17use nom::{
18 Err as NomErr, IResult, Parser,
19 branch::alt,
20 bytes::complete::tag,
21 character::complete::{char, digit1, space1},
22 combinator::{map_res, recognize},
23 error::{Error as NomError, ErrorKind},
24 sequence::delimited,
25};
26use regex::Regex;
27use std::fs::File;
28use std::io::{BufRead, BufReader};
29use std::path::Path;
30use std::sync::LazyLock;
31
32static ANSI_ESCAPE_RE: LazyLock<Regex> =
33 LazyLock::new(|| Regex::new(r"\x1b\[[0-9;]*m").expect("Invalid ANSI regex"));
34
35#[inline]
36fn strip_ansi_codes(input: &str) -> std::borrow::Cow<'_, str> {
37 ANSI_ESCAPE_RE.replace_all(input, "")
38}
39
/// Number of entries pre-allocated for the result vector in `parse_log_file`
/// (16 Ki entries).
const INITIAL_ENTRIES_CAPACITY: usize = 16 * 1024;
45
46#[derive(Debug, Clone, PartialEq, Eq, Hash)]
47pub struct ParsedLogEntry {
48 pub sequence_id: usize,
49 pub explicit_id: Option<i64>,
50 pub timestamp: DateTime<Utc>,
51 pub severity: Severity,
52 pub process_id: String,
53 pub message: String,
54 pub message_lowercased: String,
55 pub subsystem_id: Option<i16>,
56 pub labels: LogEntryLabels,
57 pub resolution_or_discussion_url_id: Option<i16>,
58 pub doc_url_id: Option<i16>,
59}
60
61impl ParsedLogEntry {
62 #[inline]
64 pub fn is_multiline(&self) -> bool {
65 self.message.contains('\n')
66 }
67
68 #[inline]
69 fn is_continuation_of(&self, other: &ParsedLogEntry) -> bool {
70 self.timestamp == other.timestamp
71 && self.severity == other.severity
72 && self.process_id == other.process_id
73 }
74
75 #[inline]
76 fn append_continuation(&mut self, content: &str) {
77 self.message.reserve(1 + content.len());
78 self.message.push('\n');
79 self.message.push_str(content);
80
81 self.message_lowercased.reserve(1 + content.len());
82 self.message_lowercased.push('\n');
83 self.message_lowercased.push_str(&content.to_lowercase());
84 }
85}
86
87#[derive(Debug, Clone, PartialEq, Eq)]
88pub struct ParseResult {
89 pub entries: Vec<ParsedLogEntry>,
90 pub total_lines: usize,
91}
92
93fn process_new_entry(
94 entries: &mut Vec<ParsedLogEntry>,
95 current_entry: &mut Option<ParsedLogEntry>,
96 new_entry: ParsedLogEntry,
97) {
98 let is_continuation = current_entry
99 .as_ref()
100 .is_some_and(|prev| new_entry.is_continuation_of(prev));
101
102 match (is_continuation, current_entry.as_mut()) {
103 (true, Some(prev_entry)) => {
104 prev_entry.append_continuation(&new_entry.message);
105 }
106 (false, _) => {
107 if let Some(prev_entry) = current_entry.take() {
108 entries.push(prev_entry);
109 }
110 *current_entry = Some(new_entry);
111 }
112 (true, None) => {}
113 }
114}
115
116fn process_continuation_line(
117 current_entry: &mut Option<ParsedLogEntry>,
118 line: &str,
119 line_number: usize,
120) {
121 if let Some(entry) = current_entry {
122 entry.append_continuation(line.trim_end());
123 } else {
124 log::warn!("Orphaned continuation line {}: {}", line_number + 1, line);
125 }
126}
127
128pub fn parse_log_file<R: BufRead>(reader: R) -> Result<ParseResult> {
129 let mut entries = Vec::with_capacity(INITIAL_ENTRIES_CAPACITY);
130 let mut current_entry: Option<ParsedLogEntry> = None;
131 let mut total_lines = 0;
132
133 for (line_number, line_result) in reader.lines().enumerate() {
134 total_lines = line_number + 1;
135 let line = line_result.map_err(|e| crate::Error::ReadLine {
136 line: total_lines,
137 source: e,
138 })?;
139
140 let stripped_line = strip_ansi_codes(&line);
141 match parse_log_entry(&stripped_line) {
142 Ok((_, entry)) => process_new_entry(&mut entries, &mut current_entry, entry),
143 Err(_) => process_continuation_line(&mut current_entry, &stripped_line, line_number),
144 }
145 }
146
147 if let Some(entry) = current_entry {
148 entries.push(entry);
149 }
150
151 for (i, entry) in entries.iter_mut().enumerate() {
152 entry.sequence_id = i;
153 }
154
155 Ok(ParseResult {
156 entries,
157 total_lines,
158 })
159}
160
161pub fn count_log_lines(path: &Path) -> Result<usize> {
162 let file = File::open(path)?;
163 let reader = BufReader::new(file);
164 Ok(reader.lines().count())
165}
166
167fn parse_log_entry(input: &str) -> IResult<&str, ParsedLogEntry> {
170 alt((parse_standard_log_entry, parse_sasl_report_header)).parse(input)
171}
172
173fn parse_standard_log_entry(input: &str) -> IResult<&str, ParsedLogEntry> {
174 let (input, timestamp) = parse_timestamp(input)?;
175 let (input, _) = space1.parse(input)?;
176 let (input, severity) = parse_severity(input)?;
177 let (input, _) = space1.parse(input)?;
178 let (input, process_id) = parse_process_id(input)?;
179 let (input, _) = char(' ').parse(input)?;
180 let trimmed_message = input.trim_end();
181 let message = trimmed_message.to_string();
182 let message_lowercased = trimmed_message.to_lowercase();
183
184 Ok((
185 "",
186 ParsedLogEntry {
187 sequence_id: 0,
188 explicit_id: None,
189 timestamp,
190 severity,
191 process_id,
192 message,
193 message_lowercased,
194 subsystem_id: None,
195 labels: LogEntryLabels::default(),
196 resolution_or_discussion_url_id: None,
197 doc_url_id: None,
198 },
199 ))
200}
201
202fn parse_sasl_report_header(input: &str) -> IResult<&str, ParsedLogEntry> {
205 let (input, _) = char('=').parse(input)?;
206 let (input, severity) = parse_sasl_severity(input)?;
207 let (input, _) = tag(" REPORT==== ").parse(input)?;
208 let (input, timestamp) = parse_sasl_timestamp(input)?;
209 let (_, _) = tag(" ===").parse(input)?;
210
211 Ok((
212 "",
213 ParsedLogEntry {
214 sequence_id: 0,
215 explicit_id: None,
216 timestamp,
217 severity,
218 process_id: "<0.0.0>".to_string(),
219 message: String::new(),
220 message_lowercased: String::new(),
221 subsystem_id: None,
222 labels: LogEntryLabels::default(),
223 resolution_or_discussion_url_id: None,
224 doc_url_id: None,
225 },
226 ))
227}
228
229fn parse_sasl_severity(input: &str) -> IResult<&str, Severity> {
230 let (input, severity_str) = alt((
231 tag("DEBUG"),
232 tag("INFO"),
233 tag("NOTICE"),
234 tag("WARNING"),
235 tag("ERROR"),
236 tag("CRITICAL"),
237 ))
238 .parse(input)?;
239
240 let severity = match severity_str {
241 "DEBUG" => Severity::Debug,
242 "INFO" => Severity::Info,
243 "NOTICE" => Severity::Notice,
244 "WARNING" => Severity::Warning,
245 "ERROR" => Severity::Error,
246 "CRITICAL" => Severity::Critical,
247 _ => return Err(NomErr::Error(NomError::new(input, ErrorKind::Tag))),
248 };
249
250 Ok((input, severity))
251}
252
/// Maps a three-letter English month abbreviation (`"Jan"` … `"Dec"`,
/// case-sensitive) to its month number `1..=12`.
///
/// Returns `None` for any other input.
fn month_name_to_number(name: &str) -> Option<u32> {
    const ABBREVIATIONS: [&str; 12] = [
        "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec",
    ];

    (1u32..)
        .zip(ABBREVIATIONS)
        .find(|&(_, abbreviation)| abbreviation == name)
        .map(|(number, _)| number)
}
270
271fn parse_sasl_timestamp(input: &str) -> IResult<&str, DateTime<Utc>> {
273 let (input, day) = map_res(digit1, |s: &str| s.parse::<u32>()).parse(input)?;
274 let (input, _) = char('-').parse(input)?;
275 let (input, month_str) = alt((
276 tag("Jan"),
277 tag("Feb"),
278 tag("Mar"),
279 tag("Apr"),
280 tag("May"),
281 tag("Jun"),
282 tag("Jul"),
283 tag("Aug"),
284 tag("Sep"),
285 tag("Oct"),
286 tag("Nov"),
287 tag("Dec"),
288 ))
289 .parse(input)?;
290 let (input, _) = char('-').parse(input)?;
291 let (input, year) = map_res(digit1, |s: &str| s.parse::<i32>()).parse(input)?;
292 let (input, _) = tag("::").parse(input)?;
293 let (input, hour) = map_res(digit1, |s: &str| s.parse::<u32>()).parse(input)?;
294 let (input, _) = char(':').parse(input)?;
295 let (input, minute) = map_res(digit1, |s: &str| s.parse::<u32>()).parse(input)?;
296 let (input, _) = char(':').parse(input)?;
297 let (input, second) = map_res(digit1, |s: &str| s.parse::<u32>()).parse(input)?;
298 let (input, _) = char('.').parse(input)?;
299 let (input, microseconds) = map_res(digit1, |s: &str| s.parse::<u32>()).parse(input)?;
300
301 let month = month_name_to_number(month_str)
302 .ok_or_else(|| NomErr::Error(NomError::new(input, ErrorKind::Tag)))?;
303
304 let datetime = build_datetime((year, month, day), (hour, minute, second, microseconds), 0)
305 .map_err(|_| NomErr::Error(NomError::new(input, ErrorKind::Verify)))?;
306
307 Ok((input, datetime))
308}
309
310#[inline]
311fn parse_date(input: &str) -> IResult<&str, (i32, u32, u32)> {
312 let (input, year) = map_res(digit1, |s: &str| s.parse::<i32>()).parse(input)?;
313 let (input, _) = char('-').parse(input)?;
314 let (input, month) = map_res(digit1, |s: &str| s.parse::<u32>()).parse(input)?;
315 let (input, _) = char('-').parse(input)?;
316 let (input, day) = map_res(digit1, |s: &str| s.parse::<u32>()).parse(input)?;
317 Ok((input, (year, month, day)))
318}
319
320#[inline]
321fn parse_time(input: &str) -> IResult<&str, (u32, u32, u32, u32)> {
322 let (input, hour) = map_res(digit1, |s: &str| s.parse::<u32>()).parse(input)?;
323 let (input, _) = char(':').parse(input)?;
324 let (input, minute) = map_res(digit1, |s: &str| s.parse::<u32>()).parse(input)?;
325 let (input, _) = char(':').parse(input)?;
326 let (input, second) = map_res(digit1, |s: &str| s.parse::<u32>()).parse(input)?;
327 let (input, _) = char('.').parse(input)?;
328 let (input, microseconds) = map_res(digit1, |s: &str| s.parse::<u32>()).parse(input)?;
329 Ok((input, (hour, minute, second, microseconds)))
330}
331
332#[inline]
333fn parse_timezone(input: &str) -> IResult<&str, i32> {
334 let (input, tz_sign) = alt((char('+'), char('-'))).parse(input)?;
335 let (input, tz_hour) = map_res(digit1, |s: &str| s.parse::<i32>()).parse(input)?;
336 let (input, _) = char(':').parse(input)?;
337 let (input, tz_minute) = map_res(digit1, |s: &str| s.parse::<i32>()).parse(input)?;
338 let tz_offset_seconds = (tz_hour * 3600 + tz_minute * 60) * if tz_sign == '-' { -1 } else { 1 };
339 Ok((input, tz_offset_seconds))
340}
341
342fn nom_verify_error() -> NomErr<NomError<&'static str>> {
343 NomErr::Error(NomError::new("", ErrorKind::Verify))
344}
345
346#[inline]
347fn build_datetime(
348 date: (i32, u32, u32),
349 time: (u32, u32, u32, u32),
350 tz_offset_seconds: i32,
351) -> Result<DateTime<Utc>, NomErr<NomError<&'static str>>> {
352 let (year, month, day) = date;
353 let (hour, minute, second, microseconds) = time;
354
355 let naive_date = NaiveDate::from_ymd_opt(year, month, day).ok_or_else(nom_verify_error)?;
356
357 let naive_time = NaiveTime::from_hms_micro_opt(hour, minute, second, microseconds)
358 .ok_or_else(nom_verify_error)?;
359
360 let naive_datetime = NaiveDateTime::new(naive_date, naive_time);
361
362 let offset = FixedOffset::east_opt(tz_offset_seconds).ok_or_else(nom_verify_error)?;
363
364 let dt = offset
365 .from_local_datetime(&naive_datetime)
366 .single()
367 .ok_or_else(nom_verify_error)?;
368
369 Ok(dt.to_utc())
370}
371
372#[inline]
373fn parse_timestamp(input: &str) -> IResult<&str, DateTime<Utc>> {
374 let (input, date) = parse_date(input)?;
375 let (input, _) = space1.parse(input)?;
376 let (input, time) = parse_time(input)?;
377 let (input, tz_offset) = parse_timezone(input)?;
378 let datetime = build_datetime(date, time, tz_offset)
379 .map_err(|_| NomErr::Error(NomError::new(input, ErrorKind::Verify)))?;
380 Ok((input, datetime))
381}
382
383fn parse_severity(input: &str) -> IResult<&str, Severity> {
386 let (input, severity_str) = delimited(
387 char('['),
388 alt((
389 tag("debug"),
390 tag("info"),
391 tag("notice"),
392 tag("warning"),
393 tag("error"),
394 tag("critical"),
395 )),
396 char(']'),
397 )
398 .parse(input)?;
399
400 let severity = severity_str
401 .parse::<Severity>()
402 .map_err(|_| NomErr::Error(NomError::new(input, ErrorKind::Verify)))?;
403
404 Ok((input, severity))
405}
406
407fn parse_process_id(input: &str) -> IResult<&str, String> {
410 let (input, pid) = recognize(delimited(
411 char('<'),
412 (digit1, char('.'), digit1, char('.'), digit1),
413 char('>'),
414 ))
415 .parse(input)?;
416
417 Ok((input, pid.to_string()))
418}