Skip to main content

dm_database_parser_sqllog/
parser.rs

1use memchr::memmem::Finder;
2use memchr::{memchr, memrchr};
3#[cfg(unix)]
4use memmap2::Advice;
5use memmap2::Mmap;
6use simdutf8::basic::from_utf8 as simd_from_utf8;
7use std::borrow::Cow;
8use std::fs::File;
9use std::path::Path;
10use std::sync::LazyLock;
11
12use crate::error::ParseError;
13use crate::sqllog::Sqllog;
14use encoding::all::GB18030;
15use encoding::{DecoderTrap, Encoding};
16
17/// Pre-built SIMD searcher for the `") "` meta-close pattern.
18/// Avoids rebuilding the Finder on every record parse.
19static FINDER_CLOSE_META: LazyLock<Finder<'static>> = LazyLock::new(|| Finder::new(b") "));
20
21/// Pre-built SIMD searcher for the `"\n20"` record-start pattern.
22/// Shared across threads via LazyLock; constructed once on first use.
23static FINDER_RECORD_START: LazyLock<Finder<'static>> = LazyLock::new(|| Finder::new(b"\n20"));
24
/// Strategy for decoding raw record bytes into strings.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub(crate) enum FileEncodingHint {
    /// Try UTF-8 per slice and fall back to GB18030 when validation fails.
    #[default]
    Auto,
    /// File was detected as valid UTF-8 at open time.
    Utf8,
    /// File is assumed to be GB18030-encoded.
    Gb18030,
}
32
33pub struct LogParser {
34    mmap: Mmap,
35    encoding: FileEncodingHint,
36}
37
/// List of record start offsets, built in one pass by `LogParser::index()`.
/// Each element is the absolute byte offset of one record inside the
/// memory-mapped buffer. Supports the two-phase parallel scan: build the
/// index first, then partition records evenly across worker threads.
pub struct RecordIndex {
    pub(crate) offsets: Vec<usize>,
}

impl RecordIndex {
    /// Total number of indexed records.
    pub fn len(&self) -> usize {
        self.offsets.len()
    }

    /// True when the file contained no complete record at all.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
56
57impl LogParser {
58    pub fn from_path<P: AsRef<Path>>(path: P) -> Result<Self, ParseError> {
59        let file = File::open(path).map_err(|e| ParseError::IoError(e.to_string()))?;
60        let mmap = unsafe { Mmap::map(&file).map_err(|e| ParseError::IoError(e.to_string()))? };
61
62        // HOT-04: 告知 OS 以顺序模式预读 mmap 页面,减少 page fault 开销
63        // Unix-only;Windows 上 advise() 方法不存在,cfg 门控跳过
64        // 失败(如内核不支持)静默忽略,不影响正确性
65        #[cfg(unix)]
66        let _ = mmap.advise(Advice::Sequential);
67
68        // Detect encoding by sampling the first 64 KB and the last 4 KB.
69        // Sampling both ends catches the rare case where GB18030 content only
70        // appears after the initial UTF-8 section (e.g. late-joined non-ASCII
71        // usernames), while keeping the cost well below a full-file scan.
72        //
73        // Known limitation: the middle of large files (> ~68 KB) is not sampled.
74        // GB18030 multi-byte sequences that appear only in the middle of a very
75        // large log file may cause the file to be misclassified as UTF-8, leading
76        // to garbled output for those records. In practice DM log files either use
77        // GB18030 throughout or are entirely ASCII-safe UTF-8, so this edge case
78        // is unlikely. A full middle-window sample could be added if misclassification
79        // is observed in production.
80        let head_size = mmap.len().min(64 * 1024);
81        let tail_start = mmap.len().saturating_sub(4 * 1024).max(head_size);
82        let head_ok = simd_from_utf8(&mmap[..head_size]).is_ok();
83        let tail_ok = tail_start >= mmap.len() || simd_from_utf8(&mmap[tail_start..]).is_ok();
84        let encoding = if head_ok && tail_ok {
85            FileEncodingHint::Utf8
86        } else {
87            FileEncodingHint::Gb18030
88        };
89
90        Ok(Self { mmap, encoding })
91    }
92
93    pub fn iter(&self) -> LogIterator<'_> {
94        LogIterator {
95            data: &self.mmap,
96            pos: 0,
97            encoding: self.encoding,
98        }
99    }
100
101    /// 两阶段扫描第一阶段:构建记录起始字节偏移索引。
102    /// 单线程扫描整个文件,返回的 `RecordIndex` 可直接用于并行处理阶段。
103    pub fn index(&self) -> RecordIndex {
104        let data: &[u8] = &self.mmap;
105        let mut offsets: Vec<usize> = Vec::new();
106
107        // 第 0 条记录:仅当文件首字节即是时间戳时才单独 push
108        // (find_next_record_start 会先跳过首行,所以首行就是时间戳的情况需要单独处理)
109        if data.len() >= 23 && is_timestamp_start(&data[0..23]) {
110            offsets.push(0);
111        }
112
113        let mut pos: usize = 0;
114        loop {
115            let next = find_next_record_start(data, pos);
116            if next >= data.len() {
117                break;
118            }
119            // 防止与首条记录重复 push(首字节即是时间戳的边界情况)
120            if offsets.last() != Some(&next) {
121                offsets.push(next);
122            }
123            // Pitfall 1: pos 必须前进至少 1,否则 find_next_record_start
124            // 在首行就是时间戳时会返回同一个 next,无限循环
125            pos = next.saturating_add(1);
126        }
127        RecordIndex { offsets }
128    }
129
130    /// Returns a Rayon parallel iterator over all log records.
131    ///
132    /// Large files (≥ PAR_THRESHOLD = 32 MB) are split into N byte-aligned chunks
133    /// at record boundaries — O(threads) overhead, not O(records). Small files use a
134    /// single partition so Rayon executes single-threaded without scheduling cost
135    /// (PAR-03 semantics). `index()` is intentionally not called here: a full
136    /// sequential pre-scan would double I/O on mmap'd, I/O-bound workloads.
137    pub fn par_iter(
138        &self,
139    ) -> impl rayon::iter::ParallelIterator<Item = Result<Sqllog<'_>, ParseError>> + '_ {
140        use rayon::prelude::*;
141
142        const PAR_THRESHOLD: usize = 32 * 1024 * 1024;
143
144        let data: &[u8] = &self.mmap;
145        let encoding = self.encoding;
146
147        let bounds: Vec<(usize, usize)> = if data.is_empty() {
148            Vec::new()
149        } else if data.len() < PAR_THRESHOLD {
150            vec![(0, data.len())]
151        } else {
152            let num_threads = rayon::current_num_threads().max(1);
153            let chunk_size = (data.len() / num_threads).max(1);
154            let mut starts: Vec<usize> = vec![0];
155            for i in 1..num_threads {
156                let boundary = find_next_record_start(data, i * chunk_size);
157                if boundary < data.len() {
158                    starts.push(boundary);
159                }
160            }
161            starts.push(data.len());
162            starts.dedup();
163            starts.windows(2).map(|w| (w[0], w[1])).collect()
164        };
165
166        bounds
167            .into_par_iter()
168            .flat_map_iter(move |(start, end)| LogIterator {
169                data: &data[start..end],
170                pos: 0,
171                encoding,
172            })
173    }
174}
175
176pub struct LogIterator<'a> {
177    data: &'a [u8],
178    pos: usize,
179    encoding: FileEncodingHint,
180}
181
182impl<'a> Iterator for LogIterator<'a> {
183    type Item = Result<Sqllog<'a>, ParseError>;
184
185    fn next(&mut self) -> Option<Self::Item> {
186        loop {
187            if self.pos >= self.data.len() {
188                return None;
189            }
190
191            let data = &self.data[self.pos..];
192
193            // 快速路径:先用 memchr 找第一个 '\n',若下一行即是时间戳则为单行记录
194            // 慢速路径(多行):用 FINDER_RECORD_START.find_iter 跳过嵌入换行
195            let (record_end, next_start, is_multiline) = match memchr(b'\n', data) {
196                None => (data.len(), data.len(), false),
197                Some(first_nl) => {
198                    let ts_start = first_nl + 1;
199                    if ts_start + 23 <= data.len()
200                        && is_timestamp_start(&data[ts_start..ts_start + 23])
201                    {
202                        // 单行记录:边界就是第一个 '\n'
203                        (first_nl, ts_start, false)
204                    } else {
205                        // 多行记录:用 memmem 跳过嵌入换行继续搜索
206                        // ALGO-01: find_iter 替代逐行 while-memchr 循环
207                        let mut found_boundary: Option<usize> = None;
208                        for candidate in FINDER_RECORD_START.find_iter(&data[ts_start..]) {
209                            let abs_ts = ts_start + candidate + 1;
210                            if abs_ts + 23 <= data.len()
211                                && is_timestamp_start(&data[abs_ts..abs_ts + 23])
212                            {
213                                found_boundary = Some(ts_start + candidate);
214                                break;
215                            }
216                        }
217                        match found_boundary {
218                            Some(idx) => (idx, idx + 1, true),
219                            None => (data.len(), data.len(), true),
220                        }
221                    }
222                }
223            };
224
225            let record_slice = &data[..record_end];
226            self.pos += next_start;
227
228            // Trim trailing CR if present
229            let record_slice = if record_slice.ends_with(b"\r") {
230                &record_slice[..record_slice.len() - 1]
231            } else {
232                record_slice
233            };
234
235            // Skip empty slices iteratively instead of recursing to avoid stack overflow
236            // when the file contains many consecutive blank lines.
237            if record_slice.is_empty() {
238                continue;
239            }
240
241            return Some(parse_record_with_hint(
242                record_slice,
243                is_multiline,
244                self.encoding,
245            ));
246        }
247    }
248}
249
250/// Find the position of the next record start at or after `from`.
251/// A record start is a line beginning with a timestamp pattern.
252fn find_next_record_start(data: &[u8], from: usize) -> usize {
253    let mut pos = from;
254    // Skip to start of next line
255    if let Some(nl) = memchr(b'\n', &data[pos..]) {
256        pos += nl + 1;
257    } else {
258        return data.len();
259    }
260    // 先检查 pos 本身是否是时间戳行(Finder 不会命中无前置 '\n' 的行首)
261    if pos + 23 <= data.len() && is_timestamp_start(&data[pos..pos + 23]) {
262        return pos;
263    }
264
265    // ALGO-01: memmem 单次扫描替代逐行 memchr loop
266    for candidate in FINDER_RECORD_START.find_iter(&data[pos..]) {
267        let ts_start = pos + candidate + 1;
268        if ts_start + 23 <= data.len() && is_timestamp_start(&data[ts_start..ts_start + 23]) {
269            return ts_start;
270        }
271    }
272    data.len()
273}
274
275pub fn parse_record<'a>(record_bytes: &'a [u8]) -> Result<Sqllog<'a>, ParseError> {
276    // Auto-detect multiline: inspect whether the bytes actually contain a newline
277    // rather than hardcoding true, which caused a redundant memchr scan for
278    // single-line records and was semantically misleading.
279    let is_multiline = memchr(b'\n', record_bytes).is_some();
280    parse_record_with_hint(record_bytes, is_multiline, FileEncodingHint::Auto)
281}
282
283fn parse_record_with_hint<'a>(
284    record_bytes: &'a [u8],
285    is_multiline: bool,
286    encoding_hint: FileEncodingHint,
287) -> Result<Sqllog<'a>, ParseError> {
288    // Find end of first line
289    let (first_line, _rest) = if is_multiline {
290        match memchr(b'\n', record_bytes) {
291            Some(idx) => {
292                let mut line = &record_bytes[..idx];
293                if line.ends_with(b"\r") {
294                    line = &line[..line.len() - 1];
295                }
296                (line, &record_bytes[idx + 1..])
297            }
298            None => {
299                let mut line = record_bytes;
300                if line.ends_with(b"\r") {
301                    line = &line[..line.len() - 1];
302                }
303                (line, &[] as &[u8])
304            }
305        }
306    } else {
307        let mut line = record_bytes;
308        if line.ends_with(b"\r") {
309            line = &line[..line.len() - 1];
310        }
311        (line, &[] as &[u8])
312    };
313
314    // 1. Timestamp
315    if first_line.len() < 23 {
316        return Err(make_invalid_format_error(first_line));
317    }
318    // We assume ASCII/UTF-8 for timestamp
319    // SAFETY: We validated the timestamp format in LogIterator::next using is_ts_millis_bytes,
320    // which ensures it contains only digits and separators.
321    let ts = unsafe { Cow::Borrowed(std::str::from_utf8_unchecked(&first_line[0..23])) };
322
323    // 2. Meta
324    // Format: TS (META) BODY
325    // Find first '(' after TS
326    let meta_start = match memchr(b'(', &first_line[23..]) {
327        Some(idx) => 23 + idx,
328        None => {
329            return Err(make_invalid_format_error(first_line));
330        }
331    };
332
333    // Find closing ')' for meta using pre-built SIMD Finder.
334    let meta_end = match FINDER_CLOSE_META.find(&first_line[meta_start..]) {
335        Some(idx) => Some(meta_start + idx),
336        None => memrchr(b')', &first_line[meta_start..]).map(|idx| meta_start + idx),
337    };
338
339    let meta_end = match meta_end {
340        Some(idx) => idx,
341        None => {
342            return Err(make_invalid_format_error(first_line));
343        }
344    };
345
346    let meta_bytes = &first_line[meta_start + 1..meta_end];
347    // Lazy parsing: store raw bytes as a Cow<'a, str>.
348    // For Utf8 / Auto-UTF8 encoding: meta_bytes is a sub-slice of the memory-mapped buffer
349    // (raw UTF-8 bytes) that lives for 'a — borrowing is sound.
350    // For Gb18030 / Auto-GB18030 encoding: GB18030.decode() produces a new owned String, so
351    // meta_raw becomes Cow::Owned; the 'a lifetime is NOT extended to that allocation.
352    let meta_raw = match encoding_hint {
353        FileEncodingHint::Utf8 => {
354            // File already validated as UTF-8 during `from_path`; skip per-slice re-validation.
355            // SAFETY: meta_bytes is a sub-slice of record_bytes which lives for 'a.
356            // No lifetime extension via from_raw_parts needed — meta_bytes already carries 'a.
357            unsafe { Cow::Borrowed(std::str::from_utf8_unchecked(meta_bytes)) }
358        }
359        FileEncodingHint::Gb18030 => match GB18030.decode(meta_bytes, DecoderTrap::Strict) {
360            Ok(s) => Cow::Owned(s),
361            Err(_) => Cow::Owned(String::from_utf8_lossy(meta_bytes).into_owned()),
362        },
363        FileEncodingHint::Auto => match simd_from_utf8(meta_bytes) {
364            Ok(_) => {
365                // SAFETY: meta_bytes is a sub-slice of record_bytes which lives for 'a;
366                // simd_from_utf8 confirmed it is valid UTF-8.
367                unsafe { Cow::Borrowed(std::str::from_utf8_unchecked(meta_bytes)) }
368            }
369            Err(_) => match GB18030.decode(meta_bytes, DecoderTrap::Strict) {
370                Ok(s) => Cow::Owned(s),
371                Err(_) => Cow::Owned(String::from_utf8_lossy(meta_bytes).into_owned()),
372            },
373        },
374    };
375
376    // 3. Body & 4. Indicators
377    let body_start_in_first_line = meta_end + 1;
378
379    // The ") " pattern guarantees one space; skip it directly.
380    let content_start = if body_start_in_first_line < first_line.len()
381        && first_line[body_start_in_first_line] == b' '
382    {
383        body_start_in_first_line + 1
384    } else {
385        body_start_in_first_line
386    };
387
388    // Extract optional leading tag like [SEL] or [ORA]
389    let mut tag: Option<Cow<'a, str>> = None;
390    let content_slice = if content_start < record_bytes.len() {
391        let mut s = &record_bytes[content_start..];
392        // If it starts with '[', try to find matching ']' and treat inner token as tag
393        if !s.is_empty()
394            && s[0] == b'['
395            && let Some(end_idx) = memchr(b']', s)
396            && end_idx >= 1
397        {
398            let inner = &s[1..end_idx];
399            // Accept token without spaces and reasonable length
400            if !inner.contains(&b' ') && inner.len() <= 32 {
401                tag = match encoding_hint {
402                    FileEncodingHint::Utf8 => {
403                        // File already validated as UTF-8; skip re-validation.
404                        // SAFETY: inner is a sub-slice of record_bytes which lives for 'a.
405                        // No from_raw_parts needed — inner already carries 'a lifetime.
406                        Some(unsafe { Cow::Borrowed(std::str::from_utf8_unchecked(inner)) })
407                    }
408                    _ => match simd_from_utf8(inner) {
409                        Ok(_) => Some(unsafe {
410                            // SAFETY: inner is a sub-slice of record_bytes which lives for 'a;
411                            // simd_from_utf8 confirmed it is valid UTF-8.
412                            Cow::Borrowed(std::str::from_utf8_unchecked(inner))
413                        }),
414                        Err(_) => match encoding_hint {
415                            FileEncodingHint::Gb18030 => {
416                                match GB18030.decode(inner, DecoderTrap::Strict) {
417                                    Ok(s) => Some(Cow::Owned(s)),
418                                    Err(_) => Some(Cow::Owned(
419                                        String::from_utf8_lossy(inner).into_owned(),
420                                    )),
421                                }
422                            }
423                            _ => Some(Cow::Owned(String::from_utf8_lossy(inner).into_owned())),
424                        },
425                    },
426                };
427                // Move past the closing ']' and any following ASCII whitespace
428                s = &s[end_idx + 1..];
429                let mut skip = 0usize;
430                while skip < s.len() && s[skip].is_ascii_whitespace() {
431                    skip += 1;
432                }
433                s = &s[skip..];
434            }
435        }
436        s
437    } else {
438        &[] as &[u8]
439    };
440
441    let content_raw = Cow::Borrowed(content_slice);
442
443    Ok(Sqllog {
444        ts,
445        meta_raw,
446        content_raw,
447        tag,
448        encoding: encoding_hint,
449    })
450}
451
// u64 mask constants validating the timestamp layout "20YY-MM-DD HH:MM:SS.mmm".
// Byte positions checked: 0('2'), 1('0'), 4('-'), 7('-'), 10(' '), 13(':'), 16(':'), 19('.')
const LO_MASK: u64 = 0xFF0000FF0000FFFF; // data[0..8]: positions 0,1,4,7
const LO_EXPECTED: u64 = 0x2D00002D00003032; // LE: '2'=0x32,'0'=0x30,'-'=0x2D,'-'=0x2D
const HI_MASK: u64 = 0x0000FF0000FF0000; // data[8..16]: positions 10,13 (offsets 2,5)
const HI_EXPECTED: u64 = 0x00003A0000200000; // LE: ' '=0x20,':'=0x3A

/// Positions within the 23-byte timestamp that must hold ASCII digits.
const DIGIT_POSITIONS: [usize; 15] = [2, 3, 5, 6, 8, 9, 11, 12, 14, 15, 17, 18, 20, 21, 22];

/// Checks whether bytes[0..23] matches the timestamp layout
/// "20YY-MM-DD HH:MM:SS.mmm". The caller must guarantee bytes.len() >= 23.
///
/// BUGFIX: the original only verified the fixed separator positions, so
/// arbitrary bytes (including invalid UTF-8) at the digit positions were
/// accepted. That both produced false record boundaries on malformed lines
/// and broke the "only digits and separators" assumption documented at call
/// sites. The u64 mask comparison is kept as a cheap prefilter; the digit
/// positions are verified only when the separators already match.
#[inline(always)]
fn is_timestamp_start(bytes: &[u8]) -> bool {
    debug_assert!(bytes.len() >= 23);
    let lo = u64::from_le_bytes(bytes[0..8].try_into().unwrap());
    let hi = u64::from_le_bytes(bytes[8..16].try_into().unwrap());
    // Positions 16(':') and 19('.') use two single-byte compares (clearer
    // than a third u64 load).
    if lo & LO_MASK != LO_EXPECTED
        || hi & HI_MASK != HI_EXPECTED
        || bytes[16] != b':'
        || bytes[19] != b'.'
    {
        return false;
    }
    DIGIT_POSITIONS.iter().all(|&i| bytes[i].is_ascii_digit())
}
472
473/// 将原始字节转换为 InvalidFormat 错误(错误路径,标注 cold 避免影响热路径代码布局)
474#[cold]
475fn make_invalid_format_error(raw_bytes: &[u8]) -> ParseError {
476    ParseError::InvalidFormat {
477        raw: String::from_utf8_lossy(raw_bytes).to_string(),
478    }
479}