Skip to main content

dm_database_parser_sqllog/
parser.rs

1use memchr::memmem::Finder;
2use memchr::{memchr, memrchr};
3use std::fs;
4use std::path::Path;
5use std::path::PathBuf;
6use std::str;
7use std::sync::LazyLock;
8
9use crate::error::ParseError;
10use crate::sqllog;
11use crate::sqllog::Sqllog;
12use encoding::all::GB18030;
13use encoding::{DecoderTrap, Encoding};
14
15/// Pre-built SIMD searcher for the `") "` meta-close pattern.
16static FINDER_CLOSE_META: LazyLock<Finder<'static>> = LazyLock::new(|| Finder::new(b") "));
17
18/// Pre-built SIMD searcher for the `"\n20"` record-start pattern.
19static FINDER_RECORD_START: LazyLock<Finder<'static>> = LazyLock::new(|| Finder::new(b"\n20"));
20
21/// 文件编码提示,用于指示日志文件的字符编码。
22#[derive(Copy, Clone, Debug, PartialEq, Eq, Default)]
23pub enum FileEncodingHint {
24    /// 自动探测编码(默认行为)
25    #[default]
26    Auto,
27    /// 文件使用 UTF-8 编码
28    Utf8,
29    /// 文件使用 GB18030 编码
30    Gb18030,
31}
32
33/// SQL 日志文件解析器。
34///
35/// 通过 [`LogParserBuilder`] 构建实例。内部将整个文件读入内存,
36/// 自动检测文件编码(UTF-8 或 GB18030)。
37pub struct LogParser {
38    data: Vec<u8>,
39    encoding: FileEncodingHint,
40}
41
42/// 配置并构建 [`LogParser`] 的构建器模式 API。
43pub struct LogParserBuilder {
44    path: PathBuf,
45    encoding_hint: Option<FileEncodingHint>,
46}
47
48impl LogParserBuilder {
49    /// 创建一个新的 `LogParserBuilder`。
50    pub fn new<P: AsRef<Path>>(path: P) -> Self {
51        Self {
52            path: path.as_ref().to_path_buf(),
53            encoding_hint: None,
54        }
55    }
56
57    /// 设置文件编码提示。
58    pub fn encoding_hint(mut self, hint: FileEncodingHint) -> Self {
59        self.encoding_hint = Some(hint);
60        self
61    }
62
63    /// 构建并返回 [`LogParser`] 实例。
64    pub fn build(self) -> Result<LogParser, ParseError> {
65        let data = fs::read(&self.path)
66            .map_err(|e| ParseError::IoError(e.to_string()))?;
67
68        let encoding = match self.encoding_hint {
69            Some(hint) => hint,
70            None => {
71                // 自动编码探测:采样头部 64KB 和尾部 4KB
72                let head_size = data.len().min(64 * 1024);
73                let head_ok = str::from_utf8(&data[..head_size]).is_ok();
74                let tail_start = data.len().saturating_sub(4 * 1024).max(head_size);
75                let tail_ok = tail_start >= data.len()
76                    || str::from_utf8(&data[tail_start..]).is_ok();
77                if head_ok && tail_ok {
78                    FileEncodingHint::Utf8
79                } else {
80                    FileEncodingHint::Gb18030
81                }
82            }
83        };
84
85        Ok(LogParser { data, encoding })
86    }
87}
88
89impl LogParser {
90    /// 返回顺序迭代器。
91    pub fn iter(&self) -> LogIterator<'_> {
92        LogIterator {
93            data: &self.data,
94            pos: 0,
95            encoding: self.encoding,
96            line_number: 1,
97        }
98    }
99}
100
101/// SQL 日志记录的顺序迭代器。
102pub struct LogIterator<'a> {
103    data: &'a [u8],
104    pos: usize,
105    encoding: FileEncodingHint,
106    line_number: u64,
107}
108
109impl<'a> LogIterator<'a> {
110    /// 返回一个跳过解析错误的迭代器。
111    pub fn skip_errors(self) -> impl Iterator<Item = Sqllog> + 'a {
112        self.filter_map(Result::ok)
113    }
114
115    /// 过滤出执行时间大于等于 `min_ms` 毫秒的记录。
116    pub fn filter_by_exec_time(
117        self,
118        min_ms: u64,
119    ) -> impl Iterator<Item = Result<Sqllog, ParseError>> + 'a {
120        let threshold = min_ms as f32;
121        self.filter(move |item| match item {
122            Ok(sqllog) => sqllog.exectime >= threshold,
123            Err(_) => false,
124        })
125    }
126
127    /// 过滤出 SQL 语句体包含指定 `pattern` 的记录。
128    pub fn filter_by_sql_contains(
129        self,
130        pattern: &str,
131    ) -> impl Iterator<Item = Result<Sqllog, ParseError>> + 'a {
132        let pattern = pattern.to_string();
133        self.filter(move |item| match item {
134            Ok(sqllog) => sqllog.sql.contains(&pattern),
135            Err(_) => false,
136        })
137    }
138}
139
140impl<'a> Iterator for LogIterator<'a> {
141    type Item = Result<Sqllog, ParseError>;
142
143    fn next(&mut self) -> Option<Self::Item> {
144        loop {
145            if self.pos >= self.data.len() {
146                return None;
147            }
148
149            let data = &self.data[self.pos..];
150            let current_line = self.line_number;
151
152            let (record_end, next_start) = match memchr(b'\n', data) {
153                None => (data.len(), data.len()),
154                Some(first_nl) => {
155                    let ts_start = first_nl + 1;
156                    if ts_start + 23 <= data.len()
157                        && is_timestamp_start(&data[ts_start..ts_start + 23])
158                    {
159                        (first_nl, ts_start)
160                    } else {
161                        // 多行记录:用 memmem 跳过嵌入换行继续搜索
162                        let mut found_boundary: Option<usize> = None;
163                        for candidate in FINDER_RECORD_START.find_iter(&data[ts_start..]) {
164                            let abs_ts = ts_start + candidate + 1;
165                            if abs_ts + 23 <= data.len()
166                                && is_timestamp_start(&data[abs_ts..abs_ts + 23])
167                            {
168                                found_boundary = Some(ts_start + candidate);
169                                break;
170                            }
171                        }
172                        match found_boundary {
173                            Some(idx) => (idx, idx + 1),
174                            None => (data.len(), data.len()),
175                        }
176                    }
177                }
178            };
179
180            let record_slice = &data[..record_end];
181            self.pos += next_start;
182
183            self.line_number += data[..next_start].iter().filter(|&&b| b == b'\n').count() as u64;
184
185            // Trim trailing CR
186            let record_slice = if record_slice.ends_with(b"\r") {
187                &record_slice[..record_slice.len() - 1]
188            } else {
189                record_slice
190            };
191
192            if record_slice.is_empty() {
193                continue;
194            }
195
196            return Some(parse_record_with_hint(
197                record_slice,
198                self.encoding,
199                current_line,
200            ));
201        }
202    }
203}
204
205/// 从原始字节解析单条 SQL 日志记录。
206///
207/// 自动检测多行模式。适合已从文件中读出完整记录的调用方。
208pub fn parse_record(record_bytes: &[u8]) -> Result<Sqllog, ParseError> {
209    parse_record_with_hint(record_bytes, FileEncodingHint::Auto, 0)
210}
211
212/// 核心解析函数:从原始字节一次性解析全部字段到 Sqllog。
213fn parse_record_with_hint(
214    record_bytes: &[u8],
215    encoding_hint: FileEncodingHint,
216    line_number: u64,
217) -> Result<Sqllog, ParseError> {
218    // 检测是否多行
219    let is_multiline = memchr(b'\n', record_bytes).is_some();
220
221    // 找到第一行
222    let first_line = if is_multiline {
223        match memchr(b'\n', record_bytes) {
224            Some(idx) => {
225                let mut line = &record_bytes[..idx];
226                if line.ends_with(b"\r") {
227                    line = &line[..line.len() - 1];
228                }
229                line
230            }
231            None => {
232                let mut line = record_bytes;
233                if line.ends_with(b"\r") {
234                    line = &line[..line.len() - 1];
235                }
236                line
237            }
238        }
239    } else {
240        let mut line = record_bytes;
241        if line.ends_with(b"\r") {
242            line = &line[..line.len() - 1];
243        }
244        line
245    };
246
247    // ── 1. 时间戳 ──
248    if first_line.len() < 23 {
249        return Err(make_invalid_format_error(first_line, line_number));
250    }
251    let ts = match str::from_utf8(&first_line[0..23]) {
252        Ok(s) => s.to_string(),
253        Err(_) => return Err(make_invalid_format_error(first_line, line_number)),
254    };
255
256    // ── 2. 元数据 ──
257    let meta_start = match memchr(b'(', &first_line[23..]) {
258        Some(idx) => 23 + idx,
259        None => return Err(make_invalid_format_error(first_line, line_number)),
260    };
261
262    let meta_end = match FINDER_CLOSE_META.find(&first_line[meta_start..]) {
263        Some(idx) => Some(meta_start + idx),
264        None => memrchr(b')', &first_line[meta_start..]).map(|idx| meta_start + idx),
265    };
266
267    let meta_end = match meta_end {
268        Some(idx) => idx,
269        None => return Err(make_invalid_format_error(first_line, line_number)),
270    };
271
272    let meta_bytes = &first_line[meta_start + 1..meta_end];
273
274    // 解析元数据(考虑编码)
275    let (ep, sess_id, thrd_id, username, trxid, statement, appname, client_ip) =
276        match encoding_hint {
277            FileEncodingHint::Utf8 => {
278                sqllog::parse_meta_from_bytes(meta_bytes)
279            }
280            FileEncodingHint::Auto => {
281                // Auto: try UTF-8 first, then GB18030 fallback
282                match str::from_utf8(meta_bytes) {
283                    Ok(_) => sqllog::parse_meta_from_bytes(meta_bytes),
284                    Err(_) => match GB18030.decode(meta_bytes, DecoderTrap::Strict) {
285                        Ok(decoded) => sqllog::parse_meta_from_bytes(decoded.as_bytes()),
286                        Err(_) => {
287                            let lossy = String::from_utf8_lossy(meta_bytes).into_owned();
288                            sqllog::parse_meta_from_bytes(lossy.as_bytes())
289                        }
290                    },
291                }
292            }
293            FileEncodingHint::Gb18030 => {
294                match GB18030.decode(meta_bytes, DecoderTrap::Strict) {
295                    Ok(decoded) => sqllog::parse_meta_from_bytes(decoded.as_bytes()),
296                    Err(_) => {
297                        let lossy = String::from_utf8_lossy(meta_bytes).into_owned();
298                        sqllog::parse_meta_from_bytes(lossy.as_bytes())
299                    }
300                }
301            }
302        };
303
304    // ── 3. Body 和 Indicators ──
305    let body_start_in_first_line = meta_end + 1;
306
307    let content_start = if body_start_in_first_line < first_line.len()
308        && first_line[body_start_in_first_line] == b' '
309    {
310        body_start_in_first_line + 1
311    } else {
312        body_start_in_first_line
313    };
314
315    // 提取可选的标签 [SEL] / [ORA]
316    let mut tag: Option<String> = None;
317    let content_slice = if content_start < record_bytes.len() {
318        let mut s = &record_bytes[content_start..];
319        if !s.is_empty()
320            && s[0] == b'['
321            && let Some(end_idx) = memchr(b']', s)
322            && end_idx >= 1
323        {
324            let inner = &s[1..end_idx];
325            if !inner.contains(&b' ') && inner.len() <= 32 {
326                tag = match encoding_hint {
327                    FileEncodingHint::Utf8 => {
328                        str::from_utf8(inner).ok().map(|t| t.to_string())
329                    }
330                    FileEncodingHint::Auto => {
331                        str::from_utf8(inner).ok().map(|t| t.to_string())
332                            .or_else(|| {
333                                GB18030.decode(inner, DecoderTrap::Strict)
334                                    .ok()
335                            })
336                    }
337                    FileEncodingHint::Gb18030 => {
338                        GB18030.decode(inner, DecoderTrap::Strict)
339                            .ok()
340                            .or_else(|| str::from_utf8(inner).ok().map(|s| s.to_string()))
341                    }
342                };
343                // 跳过 ']' 及后续空白
344                s = &s[end_idx + 1..];
345                let mut skip = 0usize;
346                while skip < s.len() && s[skip].is_ascii_whitespace() {
347                    skip += 1;
348                }
349                s = &s[skip..];
350            }
351        }
352        s
353    } else {
354        &[] as &[u8]
355    };
356
357    // 分割 body 和 indicators
358    let split = sqllog::find_indicators_split(content_slice);
359    let body_bytes = &content_slice[..split];
360    let ind_bytes = &content_slice[split..];
361
362    // 解码 body
363    let sql_raw = match encoding_hint {
364        FileEncodingHint::Utf8 => {
365            String::from_utf8_lossy(body_bytes).into_owned()
366        }
367        FileEncodingHint::Auto => {
368            match str::from_utf8(body_bytes) {
369                Ok(s) => s.to_string(),
370                Err(_) => match GB18030.decode(body_bytes, DecoderTrap::Strict) {
371                    Ok(s) => s,
372                    Err(_) => String::from_utf8_lossy(body_bytes).into_owned(),
373                },
374            }
375        }
376        FileEncodingHint::Gb18030 => {
377            match GB18030.decode(body_bytes, DecoderTrap::Strict) {
378                Ok(s) => s,
379                Err(_) => String::from_utf8_lossy(body_bytes).into_owned(),
380            }
381        }
382    };
383
384    // 处理 ORA 前缀
385    let sql = if tag.as_deref() == Some("ORA") {
386        sql_raw.strip_prefix(": ").unwrap_or(&sql_raw).to_string()
387    } else {
388        sql_raw
389    };
390
391    // 解析性能指标
392    let (exectime, rowcount, exec_id) = sqllog::parse_indicators_from_bytes(ind_bytes);
393
394    Ok(Sqllog {
395        ts,
396        tag,
397        ep,
398        sess_id,
399        thrd_id,
400        username,
401        trxid,
402        statement,
403        appname,
404        client_ip,
405        sql,
406        exectime,
407        rowcount,
408        exec_id,
409    })
410}
411
412// ── 时间戳验证 ──────────────────────────────────────────────────────────────
413
414const LO_MASK: u64 = 0xFF0000FF0000FFFF;
415const LO_EXPECTED: u64 = 0x2D00002D00003032;
416const HI_MASK: u64 = 0x0000FF0000FF0000;
417const HI_EXPECTED: u64 = 0x00003A0000200000;
418
419/// 检查 bytes[0..23] 是否符合时间戳格式 "20YY-MM-DD HH:MM:SS.mmm"。
420#[inline(always)]
421fn is_timestamp_start(bytes: &[u8]) -> bool {
422    debug_assert!(bytes.len() >= 23);
423    let lo = u64::from_le_bytes(bytes[0..8].try_into().unwrap());
424    let hi = u64::from_le_bytes(bytes[8..16].try_into().unwrap());
425    (lo & LO_MASK == LO_EXPECTED)
426        && (hi & HI_MASK == HI_EXPECTED)
427        && bytes[16] == b':'
428        && bytes[19] == b'.'
429}
430
431#[cold]
432fn make_invalid_format_error(raw_bytes: &[u8], line_number: u64) -> ParseError {
433    ParseError::InvalidFormat {
434        raw: String::from_utf8_lossy(raw_bytes).to_string(),
435        line_number,
436    }
437}
438
439// ── 测试 ────────────────────────────────────────────────────────────────────
440
441#[cfg(test)]
442mod tests {
443    use super::*;
444
445    #[test]
446    fn test_is_timestamp_start_valid() {
447        let ts = b"2025-11-17 16:09:41.123";
448        assert!(is_timestamp_start(ts));
449    }
450
451    #[test]
452    fn test_is_timestamp_start_wrong_year_prefix() {
453        let ts = b"1025-11-17 16:09:41.123";
454        assert!(!is_timestamp_start(ts));
455    }
456
457    #[test]
458    fn test_is_timestamp_start_wrong_month_separator() {
459        let ts = b"2025X11-17 16:09:41.123";
460        assert!(!is_timestamp_start(ts));
461    }
462
463    #[test]
464    fn test_is_timestamp_start_wrong_second_separator() {
465        let ts = b"2025-11-17 16:09X41.123";
466        assert!(!is_timestamp_start(ts));
467    }
468
469    #[test]
470    fn test_is_timestamp_start_wrong_millis_separator() {
471        let ts = b"2025-11-17 16:09:41X123";
472        assert!(!is_timestamp_start(ts));
473    }
474
475    #[test]
476    fn test_is_timestamp_start_exactly_23_bytes() {
477        let ts = b"2025-11-17 16:09:41.123";
478        assert_eq!(ts.len(), 23);
479        assert!(is_timestamp_start(ts));
480    }
481
482    #[test]
483    fn test_is_timestamp_start_trailing_garbage() {
484        let ts = b"2025-11-17 16:09:41.123extra_garbage_here";
485        assert!(is_timestamp_start(ts));
486    }
487
488    #[cfg(not(miri))]
489    #[test]
490    fn test_builder_encoding_hint_utf8() {
491        use std::io::Write;
492        use tempfile::NamedTempFile;
493
494        let mut tmp = NamedTempFile::new().expect("tmp");
495        write!(
496            tmp,
497            "2025-11-17 16:09:41.123 (EP[0] sess:1 thrd:2 user:u trxid:3 stmt:4 appname:a) SELECT 1"
498        )
499        .unwrap();
500        tmp.as_file().sync_all().unwrap();
501
502        let parser = LogParserBuilder::new(tmp.path())
503            .encoding_hint(FileEncodingHint::Utf8)
504            .build()
505            .expect("build");
506        let record = parser.iter().next().unwrap().unwrap();
507        assert_eq!(record.ts, "2025-11-17 16:09:41.123");
508        assert!(record.sql.contains("SELECT 1"));
509    }
510
511    #[cfg(not(miri))]
512    #[test]
513    fn test_builder_file_not_found() {
514        let result = LogParserBuilder::new("/nonexistent/path.log").build();
515        assert!(result.is_err());
516        match result {
517            Err(ParseError::IoError(_)) => {}
518            _ => panic!("Expected IoError on nonexistent file"),
519        }
520    }
521}