Skip to main content

rlqt_lib/
parser.rs

1// Copyright (C) 2025-2026 Michael S. Klishin and Contributors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14use crate::entry_metadata::labels::LogEntryLabels;
15use crate::{Result, Severity};
16use chrono::{DateTime, FixedOffset, NaiveDate, NaiveDateTime, NaiveTime, TimeZone, Utc};
17use nom::{
18    Err as NomErr, IResult, Parser,
19    branch::alt,
20    bytes::complete::tag,
21    character::complete::{char, digit1, space1},
22    combinator::{map_res, recognize},
23    error::{Error as NomError, ErrorKind},
24    sequence::delimited,
25};
26use regex::Regex;
27use std::fs::File;
28use std::io::{BufRead, BufReader};
29use std::path::Path;
30use std::sync::LazyLock;
31
/// Matches ANSI escape sequences of the form `ESC [ <digits and semicolons> m`.
///
/// The pattern is compiled lazily on first use; the `expect` can only fire if
/// the hard-coded pattern itself is invalid.
static ANSI_ESCAPE_RE: LazyLock<Regex> =
    LazyLock::new(|| Regex::new(r"\x1b\[[0-9;]*m").expect("Invalid ANSI regex"));
34
35#[inline]
36fn strip_ansi_codes(input: &str) -> std::borrow::Cow<'_, str> {
37    ANSI_ESCAPE_RE.replace_all(input, "")
38}
39
/// Initial capacity of the entries vector used while parsing.
///
/// The value is a heuristic rather than a measured optimum: according to
/// benchmarks and profiling, it avoids many of the reallocations seen with
/// the much smaller initial capacity used originally.
const INITIAL_ENTRIES_CAPACITY: usize = 16384;
45
/// A single log entry produced by the parser, possibly spanning several lines.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ParsedLogEntry {
    /// Zero-based position of the entry in the parsed output. The line-level
    /// parsers set this to 0; `parse_log_file` assigns the final value.
    pub sequence_id: usize,
    /// Always `None` when produced by this parser.
    /// NOTE(review): presumably assigned by a later stage — confirm.
    pub explicit_id: Option<i64>,
    /// Entry timestamp, converted to UTC.
    pub timestamp: DateTime<Utc>,
    /// Severity taken from the `[level]` marker or the SASL report header.
    pub severity: Severity,
    /// Erlang process ID in textual form, e.g. `<0.208.0>`.
    /// SASL report headers use the placeholder `<0.0.0>`.
    pub process_id: String,
    /// Message text; continuation lines are appended, separated by `\n`.
    pub message: String,
    /// Lowercased copy of `message`, updated together with it.
    pub message_lowercased: String,
    /// Always `None` when produced by this parser.
    /// NOTE(review): presumably filled in by a later annotation step — confirm.
    pub subsystem_id: Option<i16>,
    /// Set to `LogEntryLabels::default()` by this parser.
    pub labels: LogEntryLabels,
    /// Always `None` when produced by this parser.
    /// NOTE(review): presumably filled in by a later annotation step — confirm.
    pub resolution_or_discussion_url_id: Option<i16>,
    /// Always `None` when produced by this parser.
    /// NOTE(review): presumably filled in by a later annotation step — confirm.
    pub doc_url_id: Option<i16>,
}
60
61impl ParsedLogEntry {
62    /// Check if this log entry spans multiple lines
63    #[inline]
64    pub fn is_multiline(&self) -> bool {
65        self.message.contains('\n')
66    }
67
68    #[inline]
69    fn is_continuation_of(&self, other: &ParsedLogEntry) -> bool {
70        self.timestamp == other.timestamp
71            && self.severity == other.severity
72            && self.process_id == other.process_id
73    }
74
75    #[inline]
76    fn append_continuation(&mut self, content: &str) {
77        self.message.reserve(1 + content.len());
78        self.message.push('\n');
79        self.message.push_str(content);
80
81        self.message_lowercased.reserve(1 + content.len());
82        self.message_lowercased.push('\n');
83        self.message_lowercased.push_str(&content.to_lowercase());
84    }
85}
86
/// The outcome of parsing a whole log input.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParseResult {
    /// Parsed entries in input order; continuation lines are already merged
    /// into the entry they belong to.
    pub entries: Vec<ParsedLogEntry>,
    /// Number of lines read from the input, including continuation lines.
    pub total_lines: usize,
}
92
93fn process_new_entry(
94    entries: &mut Vec<ParsedLogEntry>,
95    current_entry: &mut Option<ParsedLogEntry>,
96    new_entry: ParsedLogEntry,
97) {
98    let is_continuation = current_entry
99        .as_ref()
100        .is_some_and(|prev| new_entry.is_continuation_of(prev));
101
102    match (is_continuation, current_entry.as_mut()) {
103        (true, Some(prev_entry)) => {
104            prev_entry.append_continuation(&new_entry.message);
105        }
106        (false, _) => {
107            if let Some(prev_entry) = current_entry.take() {
108                entries.push(prev_entry);
109            }
110            *current_entry = Some(new_entry);
111        }
112        (true, None) => {}
113    }
114}
115
116fn process_continuation_line(
117    current_entry: &mut Option<ParsedLogEntry>,
118    line: &str,
119    line_number: usize,
120) {
121    if let Some(entry) = current_entry {
122        entry.append_continuation(line.trim_end());
123    } else {
124        log::warn!("Orphaned continuation line {}: {}", line_number + 1, line);
125    }
126}
127
128pub fn parse_log_file<R: BufRead>(reader: R) -> Result<ParseResult> {
129    let mut entries = Vec::with_capacity(INITIAL_ENTRIES_CAPACITY);
130    let mut current_entry: Option<ParsedLogEntry> = None;
131    let mut total_lines = 0;
132
133    for (line_number, line_result) in reader.lines().enumerate() {
134        total_lines = line_number + 1;
135        let line = line_result.map_err(|e| crate::Error::ReadLine {
136            line: total_lines,
137            source: e,
138        })?;
139
140        let stripped_line = strip_ansi_codes(&line);
141        match parse_log_entry(&stripped_line) {
142            Ok((_, entry)) => process_new_entry(&mut entries, &mut current_entry, entry),
143            Err(_) => process_continuation_line(&mut current_entry, &stripped_line, line_number),
144        }
145    }
146
147    if let Some(entry) = current_entry {
148        entries.push(entry);
149    }
150
151    for (i, entry) in entries.iter_mut().enumerate() {
152        entry.sequence_id = i;
153    }
154
155    Ok(ParseResult {
156        entries,
157        total_lines,
158    })
159}
160
161pub fn count_log_lines(path: &Path) -> Result<usize> {
162    let file = File::open(path)?;
163    let reader = BufReader::new(file);
164    Ok(reader.lines().count())
165}
166
167/// Parses a single log entry line.
168/// Format: "2025-10-27 11:23:27.566558-07:00 [notice] <0.208.0> Message"
169fn parse_log_entry(input: &str) -> IResult<&str, ParsedLogEntry> {
170    alt((parse_standard_log_entry, parse_sasl_report_header)).parse(input)
171}
172
173fn parse_standard_log_entry(input: &str) -> IResult<&str, ParsedLogEntry> {
174    let (input, timestamp) = parse_timestamp(input)?;
175    let (input, _) = space1.parse(input)?;
176    let (input, severity) = parse_severity(input)?;
177    let (input, _) = space1.parse(input)?;
178    let (input, process_id) = parse_process_id(input)?;
179    let (input, _) = char(' ').parse(input)?;
180    let trimmed_message = input.trim_end();
181    let message = trimmed_message.to_string();
182    let message_lowercased = trimmed_message.to_lowercase();
183
184    Ok((
185        "",
186        ParsedLogEntry {
187            sequence_id: 0,
188            explicit_id: None,
189            timestamp,
190            severity,
191            process_id,
192            message,
193            message_lowercased,
194            subsystem_id: None,
195            labels: LogEntryLabels::default(),
196            resolution_or_discussion_url_id: None,
197            doc_url_id: None,
198        },
199    ))
200}
201
202/// Parses an OTP SASL report header. These can still be logged in some cases.
203/// Format: "=INFO REPORT==== 4-Dec-2025::19:22:30.888840 ==="
204fn parse_sasl_report_header(input: &str) -> IResult<&str, ParsedLogEntry> {
205    let (input, _) = char('=').parse(input)?;
206    let (input, severity) = parse_sasl_severity(input)?;
207    let (input, _) = tag(" REPORT==== ").parse(input)?;
208    let (input, timestamp) = parse_sasl_timestamp(input)?;
209    let (_, _) = tag(" ===").parse(input)?;
210
211    Ok((
212        "",
213        ParsedLogEntry {
214            sequence_id: 0,
215            explicit_id: None,
216            timestamp,
217            severity,
218            process_id: "<0.0.0>".to_string(),
219            message: String::new(),
220            message_lowercased: String::new(),
221            subsystem_id: None,
222            labels: LogEntryLabels::default(),
223            resolution_or_discussion_url_id: None,
224            doc_url_id: None,
225        },
226    ))
227}
228
229fn parse_sasl_severity(input: &str) -> IResult<&str, Severity> {
230    let (input, severity_str) = alt((
231        tag("DEBUG"),
232        tag("INFO"),
233        tag("NOTICE"),
234        tag("WARNING"),
235        tag("ERROR"),
236        tag("CRITICAL"),
237    ))
238    .parse(input)?;
239
240    let severity = match severity_str {
241        "DEBUG" => Severity::Debug,
242        "INFO" => Severity::Info,
243        "NOTICE" => Severity::Notice,
244        "WARNING" => Severity::Warning,
245        "ERROR" => Severity::Error,
246        "CRITICAL" => Severity::Critical,
247        _ => return Err(NomErr::Error(NomError::new(input, ErrorKind::Tag))),
248    };
249
250    Ok((input, severity))
251}
252
/// Maps a three-letter English month abbreviation (`"Jan"` … `"Dec"`) to its
/// month number (1–12). The match is case-sensitive; anything else yields `None`.
fn month_name_to_number(name: &str) -> Option<u32> {
    const MONTH_ABBREVIATIONS: [&str; 12] = [
        "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec",
    ];

    (1u32..)
        .zip(MONTH_ABBREVIATIONS)
        .find_map(|(number, abbreviation)| (abbreviation == name).then_some(number))
}
270
271/// Parse a SASL timestamp in the format of "4-Dec-2025::19:22:30.888840"
272fn parse_sasl_timestamp(input: &str) -> IResult<&str, DateTime<Utc>> {
273    let (input, day) = map_res(digit1, |s: &str| s.parse::<u32>()).parse(input)?;
274    let (input, _) = char('-').parse(input)?;
275    let (input, month_str) = alt((
276        tag("Jan"),
277        tag("Feb"),
278        tag("Mar"),
279        tag("Apr"),
280        tag("May"),
281        tag("Jun"),
282        tag("Jul"),
283        tag("Aug"),
284        tag("Sep"),
285        tag("Oct"),
286        tag("Nov"),
287        tag("Dec"),
288    ))
289    .parse(input)?;
290    let (input, _) = char('-').parse(input)?;
291    let (input, year) = map_res(digit1, |s: &str| s.parse::<i32>()).parse(input)?;
292    let (input, _) = tag("::").parse(input)?;
293    let (input, hour) = map_res(digit1, |s: &str| s.parse::<u32>()).parse(input)?;
294    let (input, _) = char(':').parse(input)?;
295    let (input, minute) = map_res(digit1, |s: &str| s.parse::<u32>()).parse(input)?;
296    let (input, _) = char(':').parse(input)?;
297    let (input, second) = map_res(digit1, |s: &str| s.parse::<u32>()).parse(input)?;
298    let (input, _) = char('.').parse(input)?;
299    let (input, microseconds) = map_res(digit1, |s: &str| s.parse::<u32>()).parse(input)?;
300
301    let month = month_name_to_number(month_str)
302        .ok_or_else(|| NomErr::Error(NomError::new(input, ErrorKind::Tag)))?;
303
304    let datetime = build_datetime((year, month, day), (hour, minute, second, microseconds), 0)
305        .map_err(|_| NomErr::Error(NomError::new(input, ErrorKind::Verify)))?;
306
307    Ok((input, datetime))
308}
309
310#[inline]
311fn parse_date(input: &str) -> IResult<&str, (i32, u32, u32)> {
312    let (input, year) = map_res(digit1, |s: &str| s.parse::<i32>()).parse(input)?;
313    let (input, _) = char('-').parse(input)?;
314    let (input, month) = map_res(digit1, |s: &str| s.parse::<u32>()).parse(input)?;
315    let (input, _) = char('-').parse(input)?;
316    let (input, day) = map_res(digit1, |s: &str| s.parse::<u32>()).parse(input)?;
317    Ok((input, (year, month, day)))
318}
319
320#[inline]
321fn parse_time(input: &str) -> IResult<&str, (u32, u32, u32, u32)> {
322    let (input, hour) = map_res(digit1, |s: &str| s.parse::<u32>()).parse(input)?;
323    let (input, _) = char(':').parse(input)?;
324    let (input, minute) = map_res(digit1, |s: &str| s.parse::<u32>()).parse(input)?;
325    let (input, _) = char(':').parse(input)?;
326    let (input, second) = map_res(digit1, |s: &str| s.parse::<u32>()).parse(input)?;
327    let (input, _) = char('.').parse(input)?;
328    let (input, microseconds) = map_res(digit1, |s: &str| s.parse::<u32>()).parse(input)?;
329    Ok((input, (hour, minute, second, microseconds)))
330}
331
332#[inline]
333fn parse_timezone(input: &str) -> IResult<&str, i32> {
334    let (input, tz_sign) = alt((char('+'), char('-'))).parse(input)?;
335    let (input, tz_hour) = map_res(digit1, |s: &str| s.parse::<i32>()).parse(input)?;
336    let (input, _) = char(':').parse(input)?;
337    let (input, tz_minute) = map_res(digit1, |s: &str| s.parse::<i32>()).parse(input)?;
338    let tz_offset_seconds = (tz_hour * 3600 + tz_minute * 60) * if tz_sign == '-' { -1 } else { 1 };
339    Ok((input, tz_offset_seconds))
340}
341
342fn nom_verify_error() -> NomErr<NomError<&'static str>> {
343    NomErr::Error(NomError::new("", ErrorKind::Verify))
344}
345
346#[inline]
347fn build_datetime(
348    date: (i32, u32, u32),
349    time: (u32, u32, u32, u32),
350    tz_offset_seconds: i32,
351) -> Result<DateTime<Utc>, NomErr<NomError<&'static str>>> {
352    let (year, month, day) = date;
353    let (hour, minute, second, microseconds) = time;
354
355    let naive_date = NaiveDate::from_ymd_opt(year, month, day).ok_or_else(nom_verify_error)?;
356
357    let naive_time = NaiveTime::from_hms_micro_opt(hour, minute, second, microseconds)
358        .ok_or_else(nom_verify_error)?;
359
360    let naive_datetime = NaiveDateTime::new(naive_date, naive_time);
361
362    let offset = FixedOffset::east_opt(tz_offset_seconds).ok_or_else(nom_verify_error)?;
363
364    let dt = offset
365        .from_local_datetime(&naive_datetime)
366        .single()
367        .ok_or_else(nom_verify_error)?;
368
369    Ok(dt.to_utc())
370}
371
372#[inline]
373fn parse_timestamp(input: &str) -> IResult<&str, DateTime<Utc>> {
374    let (input, date) = parse_date(input)?;
375    let (input, _) = space1.parse(input)?;
376    let (input, time) = parse_time(input)?;
377    let (input, tz_offset) = parse_timezone(input)?;
378    let datetime = build_datetime(date, time, tz_offset)
379        .map_err(|_| NomErr::Error(NomError::new(input, ErrorKind::Verify)))?;
380    Ok((input, datetime))
381}
382
383/// Parses a severity level.
384/// Example: "[notice]", "[debug]", "[info]", "[warning]", "[error]", "[critical]"
385fn parse_severity(input: &str) -> IResult<&str, Severity> {
386    let (input, severity_str) = delimited(
387        char('['),
388        alt((
389            tag("debug"),
390            tag("info"),
391            tag("notice"),
392            tag("warning"),
393            tag("error"),
394            tag("critical"),
395        )),
396        char(']'),
397    )
398    .parse(input)?;
399
400    let severity = severity_str
401        .parse::<Severity>()
402        .map_err(|_| NomErr::Error(NomError::new(input, ErrorKind::Verify)))?;
403
404    Ok((input, severity))
405}
406
407/// Parses an Erlang process ID.
408/// Format: "<0.208.0>"
409fn parse_process_id(input: &str) -> IResult<&str, String> {
410    let (input, pid) = recognize(delimited(
411        char('<'),
412        (digit1, char('.'), digit1, char('.'), digit1),
413        char('>'),
414    ))
415    .parse(input)?;
416
417    Ok((input, pid.to_string()))
418}