rlqt_lib/
parser.rs

1// Copyright (C) 2025-2026 Michael S. Klishin and Contributors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14use crate::entry_metadata::labels::LogEntryLabels;
15use crate::{Result, Severity};
16use chrono::{DateTime, FixedOffset, NaiveDate, NaiveDateTime, NaiveTime, TimeZone, Utc};
17use nom::{
18    Err as NomErr, IResult,
19    branch::alt,
20    bytes::complete::tag,
21    character::complete::{char, digit1, space1},
22    combinator::{map_res, recognize},
23    error::{Error as NomError, ErrorKind},
24    sequence::{delimited, tuple},
25};
26use std::fs::File;
27use std::io::{BufRead, BufReader};
28use std::path::Path;
29
/// Initial capacity for the entries vector during parsing.
/// This value is not very scientific but helps avoid a lot of reallocations
/// compared to the original version with a much smaller initial capacity,
/// according to benchmarks and profiling.
const INITIAL_ENTRIES_CAPACITY: usize = 16384;
35
/// A single log entry as produced by the parser.
///
/// An entry that spans several lines is stored as one value whose `message`
/// contains embedded `\n` characters.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ParsedLogEntry {
    /// Zero-based position of the entry in the parsed output; assigned by
    /// `parse_log_file` after all entries have been collected.
    pub sequence_id: usize,
    /// Always `None` when the entry comes out of the parser.
    /// NOTE(review): presumably assigned later by another layer; confirm.
    pub explicit_id: Option<i64>,
    /// Entry timestamp, converted to UTC from the offset found in the log line.
    pub timestamp: DateTime<Utc>,
    /// Severity parsed from the bracketed level, e.g. `[notice]`.
    pub severity: Severity,
    /// Erlang process ID exactly as logged, angle brackets included, e.g. `<0.208.0>`.
    pub process_id: String,
    /// Message text with trailing whitespace removed; continuation lines are
    /// appended after a `\n` separator.
    pub message: String,
    /// Lowercased copy of `message`, updated by the parser whenever `message` grows.
    pub message_lowercased: String,
    /// Always `None` when the entry comes out of the parser.
    /// NOTE(review): presumably filled in by a later annotation step; confirm.
    pub subsystem_id: Option<i16>,
    /// Set to `LogEntryLabels::default()` by the parser.
    pub labels: LogEntryLabels,
    /// Always `None` when the entry comes out of the parser.
    /// NOTE(review): presumably filled in by a later annotation step; confirm.
    pub resolution_or_discussion_url_id: Option<i16>,
    /// Always `None` when the entry comes out of the parser.
    /// NOTE(review): presumably filled in by a later annotation step; confirm.
    pub doc_url_id: Option<i16>,
}
50
51impl ParsedLogEntry {
52    /// Check if this log entry spans multiple lines
53    #[inline]
54    pub fn is_multiline(&self) -> bool {
55        self.message.contains('\n')
56    }
57
58    #[inline]
59    fn is_continuation_of(&self, other: &ParsedLogEntry) -> bool {
60        self.timestamp == other.timestamp
61            && self.severity == other.severity
62            && self.process_id == other.process_id
63    }
64
65    #[inline]
66    fn append_continuation(&mut self, content: &str) {
67        self.message.reserve(1 + content.len());
68        self.message.push('\n');
69        self.message.push_str(content);
70
71        self.message_lowercased.push('\n');
72        for c in content.chars() {
73            self.message_lowercased.extend(c.to_lowercase());
74        }
75    }
76}
77
/// Outcome of parsing a whole log input.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParseResult {
    /// Parsed entries in input order; continuation lines are already folded
    /// into the entry they belong to.
    pub entries: Vec<ParsedLogEntry>,
    /// Number of lines read from the input, continuation lines included.
    pub total_lines: usize,
}
83
84fn process_new_entry(
85    entries: &mut Vec<ParsedLogEntry>,
86    current_entry: &mut Option<ParsedLogEntry>,
87    new_entry: ParsedLogEntry,
88) {
89    let is_continuation = current_entry
90        .as_ref()
91        .is_some_and(|prev| new_entry.is_continuation_of(prev));
92
93    match (is_continuation, current_entry.as_mut()) {
94        (true, Some(prev_entry)) => {
95            prev_entry.append_continuation(&new_entry.message);
96        }
97        (false, _) => {
98            if let Some(prev_entry) = current_entry.take() {
99                entries.push(prev_entry);
100            }
101            *current_entry = Some(new_entry);
102        }
103        (true, None) => {}
104    }
105}
106
107fn process_continuation_line(
108    current_entry: &mut Option<ParsedLogEntry>,
109    line: &str,
110    line_number: usize,
111) {
112    if let Some(entry) = current_entry {
113        entry.append_continuation(line.trim_end());
114    } else {
115        log::warn!("Orphaned continuation line {}: {}", line_number + 1, line);
116    }
117}
118
119pub fn parse_log_file<R: BufRead>(reader: R) -> Result<ParseResult> {
120    let mut entries = Vec::with_capacity(INITIAL_ENTRIES_CAPACITY);
121    let mut current_entry: Option<ParsedLogEntry> = None;
122    let mut total_lines = 0;
123
124    for (line_number, line_result) in reader.lines().enumerate() {
125        total_lines = line_number + 1;
126        let line = line_result.map_err(|e| crate::Error::ReadLine {
127            line: total_lines,
128            source: e,
129        })?;
130
131        match parse_log_entry(&line) {
132            Ok((_, entry)) => process_new_entry(&mut entries, &mut current_entry, entry),
133            Err(_) => process_continuation_line(&mut current_entry, &line, line_number),
134        }
135    }
136
137    if let Some(entry) = current_entry {
138        entries.push(entry);
139    }
140
141    for (i, entry) in entries.iter_mut().enumerate() {
142        entry.sequence_id = i;
143    }
144
145    Ok(ParseResult {
146        entries,
147        total_lines,
148    })
149}
150
151pub fn count_log_lines(path: &Path) -> Result<usize> {
152    let file = File::open(path)?;
153    let reader = BufReader::new(file);
154    Ok(reader.lines().count())
155}
156
157/// Parse a single log entry line
158/// Format: "2025-10-27 11:23:27.566558-07:00 [notice] <0.208.0> Message"
159fn parse_log_entry(input: &str) -> IResult<&str, ParsedLogEntry> {
160    let (input, timestamp) = parse_timestamp(input)?;
161    let (input, _) = space1(input)?;
162    let (input, severity) = parse_severity(input)?;
163    let (input, _) = space1(input)?;
164    let (input, process_id) = parse_process_id(input)?;
165    let (input, _) = char(' ')(input)?;
166    let trimmed_message = input.trim_end();
167    let message = trimmed_message.to_string();
168    let message_lowercased = trimmed_message.to_lowercase();
169
170    Ok((
171        "",
172        ParsedLogEntry {
173            sequence_id: 0,
174            explicit_id: None,
175            timestamp,
176            severity,
177            process_id,
178            message,
179            message_lowercased,
180            subsystem_id: None,
181            labels: LogEntryLabels::default(),
182            resolution_or_discussion_url_id: None,
183            doc_url_id: None,
184        },
185    ))
186}
187
188fn parse_date(input: &str) -> IResult<&str, (i32, u32, u32)> {
189    let (input, year) = map_res(digit1, |s: &str| s.parse::<i32>())(input)?;
190    let (input, _) = char('-')(input)?;
191    let (input, month) = map_res(digit1, |s: &str| s.parse::<u32>())(input)?;
192    let (input, _) = char('-')(input)?;
193    let (input, day) = map_res(digit1, |s: &str| s.parse::<u32>())(input)?;
194    Ok((input, (year, month, day)))
195}
196
197fn parse_time(input: &str) -> IResult<&str, (u32, u32, u32, u32)> {
198    let (input, hour) = map_res(digit1, |s: &str| s.parse::<u32>())(input)?;
199    let (input, _) = char(':')(input)?;
200    let (input, minute) = map_res(digit1, |s: &str| s.parse::<u32>())(input)?;
201    let (input, _) = char(':')(input)?;
202    let (input, second) = map_res(digit1, |s: &str| s.parse::<u32>())(input)?;
203    let (input, _) = char('.')(input)?;
204    let (input, microseconds) = map_res(digit1, |s: &str| s.parse::<u32>())(input)?;
205    Ok((input, (hour, minute, second, microseconds)))
206}
207
208fn parse_timezone(input: &str) -> IResult<&str, i32> {
209    let (input, tz_sign) = alt((char('+'), char('-')))(input)?;
210    let (input, tz_hour) = map_res(digit1, |s: &str| s.parse::<i32>())(input)?;
211    let (input, _) = char(':')(input)?;
212    let (input, tz_minute) = map_res(digit1, |s: &str| s.parse::<i32>())(input)?;
213    let tz_offset_seconds = (tz_hour * 3600 + tz_minute * 60) * if tz_sign == '-' { -1 } else { 1 };
214    Ok((input, tz_offset_seconds))
215}
216
217fn nom_verify_error() -> NomErr<NomError<&'static str>> {
218    NomErr::Error(NomError::new("", ErrorKind::Verify))
219}
220
221fn build_datetime(
222    date: (i32, u32, u32),
223    time: (u32, u32, u32, u32),
224    tz_offset_seconds: i32,
225) -> Result<DateTime<Utc>, NomErr<NomError<&'static str>>> {
226    let (year, month, day) = date;
227    let (hour, minute, second, microseconds) = time;
228
229    let naive_date = NaiveDate::from_ymd_opt(year, month, day).ok_or_else(nom_verify_error)?;
230
231    let naive_time = NaiveTime::from_hms_micro_opt(hour, minute, second, microseconds)
232        .ok_or_else(nom_verify_error)?;
233
234    let naive_datetime = NaiveDateTime::new(naive_date, naive_time);
235
236    let offset = FixedOffset::east_opt(tz_offset_seconds).ok_or_else(nom_verify_error)?;
237
238    let dt = offset
239        .from_local_datetime(&naive_datetime)
240        .single()
241        .ok_or_else(nom_verify_error)?;
242
243    Ok(dt.to_utc())
244}
245
246fn parse_timestamp(input: &str) -> IResult<&str, DateTime<Utc>> {
247    let (input, date) = parse_date(input)?;
248    let (input, _) = space1(input)?;
249    let (input, time) = parse_time(input)?;
250    let (input, tz_offset) = parse_timezone(input)?;
251    let datetime = build_datetime(date, time, tz_offset)
252        .map_err(|_| NomErr::Error(NomError::new(input, ErrorKind::Verify)))?;
253    Ok((input, datetime))
254}
255
256/// Parses a severity level.
257/// Example: "[notice]", "[debug]", "[info]", "[warning]", "[error]", "[critical]"
258fn parse_severity(input: &str) -> IResult<&str, Severity> {
259    let (input, severity_str) = delimited(
260        char('['),
261        alt((
262            tag("debug"),
263            tag("info"),
264            tag("notice"),
265            tag("warning"),
266            tag("error"),
267            tag("critical"),
268        )),
269        char(']'),
270    )(input)?;
271
272    let severity = severity_str
273        .parse::<Severity>()
274        .map_err(|_| NomErr::Error(NomError::new(input, ErrorKind::Verify)))?;
275
276    Ok((input, severity))
277}
278
279/// Parses an Erlang process ID.
280/// Example: "<0.208.0>"
281fn parse_process_id(input: &str) -> IResult<&str, String> {
282    let (input, pid) = recognize(delimited(
283        char('<'),
284        tuple((digit1, char('.'), digit1, char('.'), digit1)),
285        char('>'),
286    ))(input)?;
287
288    Ok((input, pid.to_string()))
289}