Skip to main content

dm_database_parser_sqllog/
parser.rs

1use memchr::memmem::Finder;
2use memchr::{memchr, memrchr};
3use memmap2::Mmap;
4use simdutf8::basic::from_utf8 as simd_from_utf8;
5use std::borrow::Cow;
6use std::fs::File;
7use std::path::Path;
8use std::sync::LazyLock;
9
10use crate::error::ParseError;
11use crate::sqllog::Sqllog;
12use encoding::all::GB18030;
13use encoding::{DecoderTrap, Encoding};
14
/// Pre-built substring searcher for the two-byte `") "` sequence that closes
/// the parenthesised meta section of a record's first line.
///
/// Constructed lazily on first use and shared afterwards, so the `Finder` is
/// not rebuilt on every record parse.
static FINDER_CLOSE_META: LazyLock<Finder<'static>> = LazyLock::new(|| Finder::new(b") "));
18
/// File-level hint telling the record parser how to decode non-ASCII text.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Default)]
pub(crate) enum FileEncodingHint {
    /// No file-level knowledge: the parser tries UTF-8 first and falls back
    /// to GB18030 when the bytes are not valid UTF-8.
    #[default]
    Auto,
    /// Text is expected to be UTF-8.
    Utf8,
    /// Text is expected to be GB18030.
    Gb18030,
}
26
/// Parser over a memory-mapped log file.
///
/// Holds the mapping together with the encoding guessed when the file was
/// opened; iterators created from it borrow the mapped bytes.
pub struct LogParser {
    /// Read-only memory mapping of the whole log file.
    mmap: Mmap,
    /// Encoding hint handed to every iterator created from this parser.
    encoding: FileEncodingHint,
}
31
32impl LogParser {
33    pub fn from_path<P: AsRef<Path>>(path: P) -> Result<Self, ParseError> {
34        let file = File::open(path).map_err(|e| ParseError::IoError(e.to_string()))?;
35        let mmap = unsafe { Mmap::map(&file).map_err(|e| ParseError::IoError(e.to_string()))? };
36
37        // Sample the first 64 KB to determine encoding.
38        let sample = &mmap[..mmap.len().min(65536)];
39        let encoding = if simd_from_utf8(sample).is_ok() {
40            FileEncodingHint::Utf8
41        } else {
42            FileEncodingHint::Gb18030
43        };
44
45        Ok(Self { mmap, encoding })
46    }
47
48    pub fn iter(&self) -> LogIterator<'_> {
49        LogIterator {
50            data: &self.mmap,
51            pos: 0,
52            encoding: self.encoding,
53        }
54    }
55
56    /// Returns a Rayon parallel iterator over all log records.
57    ///
58    /// Splits the file into CPU-count chunks at record boundaries and
59    /// processes each chunk on a separate thread.
60    pub fn par_iter(
61        &self,
62    ) -> impl rayon::iter::ParallelIterator<Item = Result<Sqllog<'_>, ParseError>> + '_ {
63        use rayon::prelude::*;
64
65        let data: &[u8] = &self.mmap;
66        let encoding = self.encoding;
67        let num_threads = rayon::current_num_threads().max(1);
68
69        // Find chunk start positions at record boundaries
70        let mut starts: Vec<usize> = vec![0];
71        if !data.is_empty() {
72            let chunk_size = (data.len() / num_threads).max(1);
73            for i in 1..num_threads {
74                let boundary = find_next_record_start(data, i * chunk_size);
75                if boundary < data.len() {
76                    starts.push(boundary);
77                }
78            }
79        }
80        starts.push(data.len());
81        starts.dedup();
82
83        // Pair up (start, end) boundaries, collect to Vec so we can par_iter
84        let bounds: Vec<(usize, usize)> = starts.windows(2).map(|w| (w[0], w[1])).collect();
85
86        bounds
87            .into_par_iter()
88            .flat_map_iter(move |(start, end)| LogIterator {
89                data: &data[start..end],
90                pos: 0,
91                encoding,
92            })
93    }
94}
95
/// Sequential iterator that splits a byte slice into records and parses each
/// one.
pub struct LogIterator<'a> {
    /// Bytes this iterator walks over (the whole file or one chunk of it).
    data: &'a [u8],
    /// Offset into `data` of the first byte that has not been consumed yet.
    pos: usize,
    /// Encoding hint forwarded to the record parser.
    encoding: FileEncodingHint,
}
101
102impl<'a> Iterator for LogIterator<'a> {
103    type Item = Result<Sqllog<'a>, ParseError>;
104
105    fn next(&mut self) -> Option<Self::Item> {
106        if self.pos >= self.data.len() {
107            return None;
108        }
109
110        let data = &self.data[self.pos..];
111        let mut scan_pos = 0;
112        let mut found_next = None;
113        let mut is_multiline = false;
114
115        while let Some(idx) = memchr(b'\n', &data[scan_pos..]) {
116            let newline_idx = scan_pos + idx;
117            let next_line_start = newline_idx + 1;
118
119            if next_line_start >= data.len() {
120                break;
121            }
122
123            // Check if next line starts with timestamp
124            let check_len = std::cmp::min(23, data.len() - next_line_start);
125            if check_len == 23 {
126                let next_bytes = &data[next_line_start..next_line_start + 23];
127                // Fast check: 20xx and separators
128                if next_bytes[0] == b'2'
129                    && next_bytes[1] == b'0'
130                    && next_bytes[4] == b'-'
131                    && next_bytes[7] == b'-'
132                    && next_bytes[10] == b' '
133                    && next_bytes[13] == b':'
134                    && next_bytes[16] == b':'
135                    && next_bytes[19] == b'.'
136                {
137                    found_next = Some(newline_idx);
138                    break;
139                }
140            }
141
142            is_multiline = true;
143            scan_pos = next_line_start;
144        }
145
146        let (record_end, next_start) = if let Some(idx) = found_next {
147            (idx, idx + 1)
148        } else {
149            (data.len(), data.len())
150        };
151
152        let record_slice = &data[..record_end];
153        self.pos += next_start;
154
155        // Trim trailing CR if present
156        let record_slice = if record_slice.ends_with(b"\r") {
157            &record_slice[..record_slice.len() - 1]
158        } else {
159            record_slice
160        };
161
162        if record_slice.is_empty() {
163            return self.next();
164        }
165
166        Some(parse_record_with_hint(
167            record_slice,
168            is_multiline,
169            self.encoding,
170        ))
171    }
172}
173
174/// Find the position of the next record start at or after `from`.
175/// A record start is a line beginning with a timestamp pattern.
176fn find_next_record_start(data: &[u8], from: usize) -> usize {
177    let mut pos = from;
178    // Skip to start of next line
179    if let Some(nl) = memchr(b'\n', &data[pos..]) {
180        pos += nl + 1;
181    } else {
182        return data.len();
183    }
184    // Scan forward for a line starting with timestamp
185    loop {
186        if pos + 23 > data.len() {
187            return data.len();
188        }
189        let peek = &data[pos..pos + 23];
190        if peek[0] == b'2'
191            && peek[1] == b'0'
192            && peek[4] == b'-'
193            && peek[7] == b'-'
194            && peek[10] == b' '
195            && peek[13] == b':'
196            && peek[16] == b':'
197            && peek[19] == b'.'
198        {
199            return pos;
200        }
201        // Skip to next line
202        match memchr(b'\n', &data[pos..]) {
203            Some(nl) => pos += nl + 1,
204            None => return data.len(),
205        }
206    }
207}
208
209pub fn parse_record<'a>(record_bytes: &'a [u8]) -> Result<Sqllog<'a>, ParseError> {
210    parse_record_with_hint(record_bytes, true, FileEncodingHint::Auto)
211}
212
213fn parse_record_with_hint<'a>(
214    record_bytes: &'a [u8],
215    is_multiline: bool,
216    encoding_hint: FileEncodingHint,
217) -> Result<Sqllog<'a>, ParseError> {
218    // Find end of first line
219    let (first_line, _rest) = if is_multiline {
220        match memchr(b'\n', record_bytes) {
221            Some(idx) => {
222                let mut line = &record_bytes[..idx];
223                if line.ends_with(b"\r") {
224                    line = &line[..line.len() - 1];
225                }
226                (line, &record_bytes[idx + 1..])
227            }
228            None => {
229                let mut line = record_bytes;
230                if line.ends_with(b"\r") {
231                    line = &line[..line.len() - 1];
232                }
233                (line, &[] as &[u8])
234            }
235        }
236    } else {
237        let mut line = record_bytes;
238        if line.ends_with(b"\r") {
239            line = &line[..line.len() - 1];
240        }
241        (line, &[] as &[u8])
242    };
243
244    // 1. Timestamp
245    if first_line.len() < 23 {
246        return Err(ParseError::InvalidFormat {
247            raw: String::from_utf8_lossy(first_line).to_string(),
248        });
249    }
250    // We assume ASCII/UTF-8 for timestamp
251    // SAFETY: We validated the timestamp format in LogIterator::next using is_ts_millis_bytes,
252    // which ensures it contains only digits and separators.
253    let ts = unsafe { Cow::Borrowed(std::str::from_utf8_unchecked(&first_line[0..23])) };
254
255    // 2. Meta
256    // Format: TS (META) BODY
257    // Find first '(' after TS
258    let meta_start = match memchr(b'(', &first_line[23..]) {
259        Some(idx) => 23 + idx,
260        None => {
261            return Err(ParseError::InvalidFormat {
262                raw: String::from_utf8_lossy(first_line).to_string(),
263            });
264        }
265    };
266
267    // Find closing ')' for meta using pre-built SIMD Finder.
268    let meta_end = match FINDER_CLOSE_META.find(&first_line[meta_start..]) {
269        Some(idx) => Some(meta_start + idx),
270        None => memrchr(b')', &first_line[meta_start..]).map(|idx| meta_start + idx),
271    };
272
273    let meta_end = match meta_end {
274        Some(idx) => idx,
275        None => {
276            return Err(ParseError::InvalidFormat {
277                raw: String::from_utf8_lossy(first_line).to_string(),
278            });
279        }
280    };
281
282    let meta_bytes = &first_line[meta_start + 1..meta_end];
283    // Lazy parsing: store raw bytes
284    // SAFETY: meta_bytes is a sub-slice of first_line, which is 'a.
285    // Use the provided encoding hint (file-level autodetection) to decide how to decode meta bytes.
286    let meta_raw = match encoding_hint {
287        FileEncodingHint::Utf8 => match simd_from_utf8(meta_bytes) {
288            Ok(s) => {
289                // SAFETY: meta_bytes is a sub-slice of first_line which lives for 'a
290                unsafe {
291                    Cow::Borrowed(std::str::from_utf8_unchecked(std::slice::from_raw_parts(
292                        s.as_ptr(),
293                        s.len(),
294                    )))
295                }
296            }
297            Err(_) => Cow::Owned(String::from_utf8_lossy(meta_bytes).into_owned()),
298        },
299        FileEncodingHint::Gb18030 => match GB18030.decode(meta_bytes, DecoderTrap::Strict) {
300            Ok(s) => Cow::Owned(s),
301            Err(_) => Cow::Owned(String::from_utf8_lossy(meta_bytes).into_owned()),
302        },
303        FileEncodingHint::Auto => match simd_from_utf8(meta_bytes) {
304            Ok(s) => {
305                // SAFETY: meta_bytes is a sub-slice of first_line which lives for 'a
306                unsafe {
307                    Cow::Borrowed(std::str::from_utf8_unchecked(std::slice::from_raw_parts(
308                        s.as_ptr(),
309                        s.len(),
310                    )))
311                }
312            }
313            Err(_) => match GB18030.decode(meta_bytes, DecoderTrap::Strict) {
314                Ok(s) => Cow::Owned(s),
315                Err(_) => Cow::Owned(String::from_utf8_lossy(meta_bytes).into_owned()),
316            },
317        },
318    };
319
320    // 3. Body & 4. Indicators
321    let body_start_in_first_line = meta_end + 1;
322
323    // The ") " pattern guarantees one space; skip it directly.
324    let content_start = if body_start_in_first_line < first_line.len()
325        && first_line[body_start_in_first_line] == b' '
326    {
327        body_start_in_first_line + 1
328    } else {
329        body_start_in_first_line
330    };
331
332    // Extract optional leading tag like [SEL] or [ORA]
333    let mut tag: Option<Cow<'a, str>> = None;
334    let content_slice = if content_start < record_bytes.len() {
335        let mut s = &record_bytes[content_start..];
336        // If it starts with '[', try to find matching ']' and treat inner token as tag
337        if !s.is_empty()
338            && s[0] == b'['
339            && let Some(end_idx) = memchr(b']', s)
340            && end_idx >= 1
341        {
342            let inner = &s[1..end_idx];
343            // Accept token without spaces and reasonable length
344            if !inner.contains(&b' ') && inner.len() <= 32 {
345                tag = match simd_from_utf8(inner) {
346                    Ok(st) => Some(unsafe {
347                        // SAFETY: inner is a sub-slice of record_bytes which lives for 'a
348                        Cow::Borrowed(std::str::from_utf8_unchecked(std::slice::from_raw_parts(
349                            st.as_ptr(),
350                            st.len(),
351                        )))
352                    }),
353                    Err(_) => match encoding_hint {
354                        FileEncodingHint::Gb18030 => {
355                            match GB18030.decode(inner, DecoderTrap::Strict) {
356                                Ok(s) => Some(Cow::Owned(s)),
357                                Err(_) => {
358                                    Some(Cow::Owned(String::from_utf8_lossy(inner).into_owned()))
359                                }
360                            }
361                        }
362                        _ => Some(Cow::Owned(String::from_utf8_lossy(inner).into_owned())),
363                    },
364                };
365                // Move past the closing ']' and any following ASCII whitespace
366                s = &s[end_idx + 1..];
367                let mut skip = 0usize;
368                while skip < s.len() && s[skip].is_ascii_whitespace() {
369                    skip += 1;
370                }
371                s = &s[skip..];
372            }
373        }
374        s
375    } else {
376        &[] as &[u8]
377    };
378
379    let content_raw = Cow::Borrowed(content_slice);
380
381    Ok(Sqllog {
382        ts,
383        meta_raw,
384        content_raw,
385        tag,
386        encoding: encoding_hint,
387    })
388}