dm_database_parser_sqllog/
parser.rs

1use memchr::{memchr, memrchr};
2use memmap2::Mmap;
3use std::borrow::Cow;
4use std::fs::File;
5use std::path::Path;
6
7use crate::error::ParseError;
8use crate::sqllog::Sqllog;
9
10pub struct LogParser {
11    mmap: Mmap,
12}
13
14impl LogParser {
15    pub fn from_path<P: AsRef<Path>>(path: P) -> Result<Self, ParseError> {
16        let file = File::open(path).map_err(|e| ParseError::IoError(e.to_string()))?;
17        let mmap = unsafe { Mmap::map(&file).map_err(|e| ParseError::IoError(e.to_string()))? };
18        Ok(Self { mmap })
19    }
20
21    pub fn iter(&self) -> LogIterator<'_> {
22        LogIterator {
23            data: &self.mmap,
24            pos: 0,
25        }
26    }
27}
28
/// Iterator that splits a byte buffer into log records and parses each one.
///
/// Created by [`LogParser::iter`].
pub struct LogIterator<'a> {
    /// The complete input buffer being scanned.
    data: &'a [u8],
    /// Byte offset into `data` where the next record starts.
    pos: usize,
}
33
34impl<'a> Iterator for LogIterator<'a> {
35    type Item = Result<Sqllog<'a>, ParseError>;
36
37    fn next(&mut self) -> Option<Self::Item> {
38        if self.pos >= self.data.len() {
39            return None;
40        }
41
42        let data = &self.data[self.pos..];
43        let mut scan_pos = 0;
44        let mut found_next = None;
45        let mut is_multiline = false;
46
47        while let Some(idx) = memchr(b'\n', &data[scan_pos..]) {
48            let newline_idx = scan_pos + idx;
49            let next_line_start = newline_idx + 1;
50
51            if next_line_start >= data.len() {
52                break;
53            }
54
55            // Check if next line starts with timestamp
56            let check_len = std::cmp::min(23, data.len() - next_line_start);
57            if check_len == 23 {
58                let next_bytes = &data[next_line_start..next_line_start + 23];
59                // Fast check: 20xx and separators
60                if next_bytes[0] == b'2'
61                    && next_bytes[1] == b'0'
62                    && next_bytes[4] == b'-'
63                    && next_bytes[7] == b'-'
64                    && next_bytes[10] == b' '
65                    && next_bytes[13] == b':'
66                    && next_bytes[16] == b':'
67                    && next_bytes[19] == b'.'
68                {
69                    found_next = Some(newline_idx);
70                    break;
71                }
72            }
73
74            is_multiline = true;
75            scan_pos = next_line_start;
76        }
77
78        let (record_end, next_start) = if let Some(idx) = found_next {
79            (idx, idx + 1)
80        } else {
81            (data.len(), data.len())
82        };
83
84        let record_slice = &data[..record_end];
85        self.pos += next_start;
86
87        // Trim trailing CR if present
88        let record_slice = if record_slice.ends_with(b"\r") {
89            &record_slice[..record_slice.len() - 1]
90        } else {
91            record_slice
92        };
93
94        if record_slice.is_empty() {
95            return self.next();
96        }
97
98        Some(parse_record_with_hint(record_slice, is_multiline))
99    }
100}
101
102pub fn parse_record<'a>(record_bytes: &'a [u8]) -> Result<Sqllog<'a>, ParseError> {
103    parse_record_with_hint(record_bytes, true)
104}
105
106fn parse_record_with_hint<'a>(
107    record_bytes: &'a [u8],
108    is_multiline: bool,
109) -> Result<Sqllog<'a>, ParseError> {
110    // Find end of first line
111    let (first_line, _rest) = if is_multiline {
112        match memchr(b'\n', record_bytes) {
113            Some(idx) => {
114                let mut line = &record_bytes[..idx];
115                if line.ends_with(b"\r") {
116                    line = &line[..line.len() - 1];
117                }
118                (line, &record_bytes[idx + 1..])
119            }
120            None => {
121                let mut line = record_bytes;
122                if line.ends_with(b"\r") {
123                    line = &line[..line.len() - 1];
124                }
125                (line, &[] as &[u8])
126            }
127        }
128    } else {
129        let mut line = record_bytes;
130        if line.ends_with(b"\r") {
131            line = &line[..line.len() - 1];
132        }
133        (line, &[] as &[u8])
134    };
135
136    // 1. Timestamp
137    if first_line.len() < 23 {
138        return Err(ParseError::InvalidFormat {
139            raw: String::from_utf8_lossy(first_line).to_string(),
140        });
141    }
142    // We assume ASCII/UTF-8 for timestamp
143    // SAFETY: We validated the timestamp format in LogIterator::next using is_ts_millis_bytes,
144    // which ensures it contains only digits and separators.
145    let ts = unsafe { Cow::Borrowed(std::str::from_utf8_unchecked(&first_line[0..23])) };
146
147    // 2. Meta
148    // Format: TS (META) BODY
149    // Find first '(' after TS
150    let meta_start = match memchr(b'(', &first_line[23..]) {
151        Some(idx) => 23 + idx,
152        None => {
153            return Err(ParseError::InvalidFormat {
154                raw: String::from_utf8_lossy(first_line).to_string(),
155            });
156        }
157    };
158
159    // Find closing ')' for meta.
160    // We search for ") " starting from meta_start.
161    // Optimization: use memchr loop instead of windows(2)
162    let mut search_pos = meta_start;
163    let meta_end = loop {
164        match memchr(b')', &first_line[search_pos..]) {
165            Some(idx) => {
166                let abs_idx = search_pos + idx;
167                // Check if followed by space
168                if abs_idx + 1 < first_line.len() && first_line[abs_idx + 1] == b' ' {
169                    break Some(abs_idx);
170                }
171                // If not, continue searching after this ')'
172                search_pos = abs_idx + 1;
173            }
174            None => {
175                // Fallback: find last ')' if ") " not found (robustness)
176                break memrchr(b')', &first_line[meta_start..]).map(|idx| meta_start + idx);
177            }
178        }
179    };
180
181    let meta_end = match meta_end {
182        Some(idx) => idx,
183        None => {
184            return Err(ParseError::InvalidFormat {
185                raw: String::from_utf8_lossy(first_line).to_string(),
186            });
187        }
188    };
189
190    let meta_bytes = &first_line[meta_start + 1..meta_end];
191    // Lazy parsing: store raw bytes
192    // SAFETY: meta_bytes is a sub-slice of first_line, which is 'a.
193    // We assume it's valid UTF-8 (or at least we store it as such for now, validation happens on access if needed,
194    // but actually we just store bytes wrapped in Cow::Borrowed).
195    // Wait, Cow<'a, str> requires valid UTF-8 if Borrowed.
196    // We should use unsafe from_utf8_unchecked because we validated the structure?
197    // No, we haven't validated meta content yet.
198    // But we need to store it in Sqllog.meta_raw which is Cow<'a, str>.
199    // If we use Cow<'a, [u8]>, it would be better. But I used Cow<'a, str> in Sqllog definition.
200    // Let's assume it's UTF-8. It's mostly ASCII.
201    let meta_raw = unsafe { Cow::Borrowed(std::str::from_utf8_unchecked(meta_bytes)) };
202
203    // 3. Body & 4. Indicators
204    let body_start_in_first_line = meta_end + 1;
205
206    let first_line_body = if body_start_in_first_line < first_line.len() {
207        &first_line[body_start_in_first_line..]
208    } else {
209        &[]
210    };
211
212    let start_idx = first_line_body
213        .iter()
214        .position(|b| !b.is_ascii_whitespace())
215        .unwrap_or(first_line_body.len());
216
217    let content_start = body_start_in_first_line + start_idx;
218
219    let content_raw = if content_start < record_bytes.len() {
220        Cow::Borrowed(&record_bytes[content_start..])
221    } else {
222        Cow::Borrowed(&[] as &[u8])
223    };
224
225    Ok(Sqllog {
226        ts,
227        meta_raw,
228        content_raw,
229    })
230}