dm_database_parser_sqllog/
parser.rs

1#[derive(Debug, Clone, PartialEq, Eq)]
2pub struct ParsedRecord<'a> {
3    pub ts: &'a str,
4    pub meta_raw: &'a str,
5    pub ep: Option<&'a str>,
6    pub sess: Option<&'a str>,
7    pub thrd: Option<&'a str>,
8    pub user: Option<&'a str>,
9    pub trxid: Option<&'a str>,
10    pub stmt: Option<&'a str>,
11    pub appname: Option<&'a str>,
12    pub ip: Option<&'a str>,
13    pub body: &'a str,
14    pub execute_time_ms: Option<u64>,
15    pub row_count: Option<u64>,
16    pub execute_id: Option<u64>,
17}
18
19/// 迭代器,从输入日志文本中产生记录切片(&str),不进行额外分配。
20///
21/// 详细说明:
22/// RecordSplitter 旨在以最小分配(零分配或极少分配)的方式,从整个日志文本中
23/// 按“记录”(record)边界切分并逐条返回。这里的“记录”由如下格式决定:每条记录
24/// 都以固定长度的时间戳开始(23 字符,格式 `YYYY-MM-DD HH:MM:SS.mmm`),且时间戳位于
25/// 行首(紧贴换行或文件开头),时间戳之后通常跟随一个空格和一个以圆括号包围的元信息,
26/// 然后是记录主体(body),记录主体可能跨多行。
27///
28/// 设计目标与不变式:
29/// - 尽量避免对每条记录进行拷贝;返回的是对原始输入字符串的切片(&str)。
30/// - 通过只扫描字节数组并使用简单的索引运算来保持最高性能。
31/// - 保持下列内部不变式以便 `next()` 实现更简单且安全:
32///   * `0 <= scan_pos <= n`,scan_pos 始终单调不减,用于向前搜索下一条记录的起始位置;
33///   * `next_start` 保存下一个要返回记录的起始索引(相对于 `text`),当为 None 时表示迭代结束;
34///   * `first_start` 如果存在,表示第一个合法记录起始索引(用于提取前导错误段);
35///   * `finished` 标志一旦为 true,表示迭代终止,不再返回任何记录。
36///
37/// 字段说明:
38/// - `text`:原始输入字符串(借用),返回的记录切片都指向该字符串。
39/// - `bytes`:`text.as_bytes()` 的借用,便于按字节高效检测时间戳等。
40/// - `n`:`text.len()`,用于边界判断。
41/// - `scan_pos`:下一次搜索开始的位置(相对于 `text`),用于寻找下一个时间戳起始;
42/// - `next_start`:下一次要返回记录的起始位置(Some(idx));
43/// - `finished`:迭代是否已结束的标志;
44/// - `first_start`:第一个记录的起始位置(用于提取文件起始处的前导错误文本)。
45///
46/// 算法与复杂度:
47/// - 构造时(`new`)只进行一次向前扫描,寻找第一个满足“行首+时间戳”的位置;该扫描为 O(n)。
48/// - `next()` 在所有记录上总共扫描一次文本(每个字节最多被检查常数次),总体时间为 O(n)。
49/// - 该实现避免在每条记录上分配新的字符串或 Vec,因此适合高吞吐场景。
50///
51/// 使用示例:
52/// ```ignore
53/// let splitter = RecordSplitter::new(&log_text);
54/// for rec in splitter { process(rec); }
55/// ```
56pub struct RecordSplitter<'a> {
57    text: &'a str,
58    bytes: &'a [u8],
59    n: usize,
60    // 扫描位置:始终单调不减
61    scan_pos: usize,
62    // 下一个要返回的记录的起始索引
63    next_start: Option<usize>,
64    // 是否已返回最后一条记录
65    finished: bool,
66    // 缓存的前缀(前导错误)结束索引
67    first_start: Option<usize>,
68}
69
70impl<'a> RecordSplitter<'a> {
71    pub fn new(text: &'a str) -> Self {
72        let bytes = text.as_bytes();
73        let n = text.len();
74        let mut first_start = None;
75        if n >= 23 {
76            // 在构造时寻找第一个可能的记录起始位置(减少后续第一轮迭代的开销)
77            // 我们按字节线性扫描:对于每个位置 pos,判断该位置是否为行首并且随后 23 字节是合法时间戳。
78            // 一旦找到第一个符合条件的位置就停止(first_start 用于提取前导错误)。
79            let limit = n.saturating_sub(23);
80            let mut pos = 0usize;
81            while pos <= limit {
82                if (pos == 0 || bytes[pos - 1] == b'\n')
83                    && crate::tools::is_ts_millis_bytes(&bytes[pos..pos + 23])
84                {
85                    first_start = Some(pos);
86                    break;
87                }
88                pos += 1;
89            }
90        }
91        // scan_pos 设置为 first_start+1,保证下一次搜索从第一个起始之后继续(避免重复检测)
92        let scan_pos = first_start.unwrap_or(0).saturating_add(1);
93        RecordSplitter {
94            text,
95            bytes,
96            n,
97            scan_pos,
98            next_start: first_start,
99            finished: false,
100            first_start,
101        }
102    }
103
104    /// 返回完整的前导错误文本切片(第一条记录之前的所有内容)
105    ///
106    /// 说明:当日志文件开头存在非记录内容(例如垃圾行或日志碎片)时,`first_start` 会
107    /// 指向第一个合法记录的起始位置;`leading_errors_slice()` 返回从文件开始到该位置的全部文本,
108    /// 便于调用者单独处理这部分错误/告警信息。
109    pub fn leading_errors_slice(&self) -> Option<&'a str> {
110        self.first_start.map(|s| &self.text[..s])
111    }
112}
113
114impl<'a> Iterator for RecordSplitter<'a> {
115    type Item = &'a str;
116
117    fn next(&mut self) -> Option<Self::Item> {
118        if self.finished {
119            return None;
120        }
121        let start = match self.next_start {
122            Some(s) => s,
123            None => {
124                self.finished = true;
125                return None;
126            }
127        };
128
129        // 扫描下一个记录的起始位置
130        // 逻辑:从当前 scan_pos 向前搜索下一个满足“行首+时间戳”的位置。
131        // 如果找到,则当前记录的结束位置为该 timestamp 的起始位置(end = pos),并把 next_start 设置为该 pos,
132        // 以便下一次调用返回后续记录;如果搜索到末尾未找到,则把剩余文本作为最后一条记录返回。
133        if self.scan_pos > self.n {
134            // 没有足够空间容纳另一个时间戳,返回剩余内容
135            self.finished = true;
136            return Some(&self.text[start..self.n]);
137        }
138        let limit = self.n.saturating_sub(23);
139        let mut pos = self.scan_pos;
140        while pos <= limit {
141            if (pos == 0 || self.bytes[pos - 1] == b'\n')
142                && crate::tools::is_ts_millis_bytes(&self.bytes[pos..pos + 23])
143            {
144                // 找到下一个起始位置
145                let end = pos;
146                // 为下一次调用做准备
147                self.next_start = Some(pos);
148                self.scan_pos = pos + 1;
149                return Some(&self.text[start..end]);
150            }
151            pos += 1;
152        }
153
154        // 没有下一个起始位置 => 返回最后一条记录
155        self.finished = true;
156        Some(&self.text[start..self.n])
157    }
158}
159
160/// 使用时间戳检测将完整日志文本拆分为记录。
161/// 返回 (records, leading_errors)。每条记录都是从 `text` 借用的切片。
162pub fn split_by_ts_records_with_errors<'a>(text: &'a str) -> (Vec<&'a str>, Vec<&'a str>) {
163    let mut records: Vec<&'a str> = Vec::new();
164    let mut errors: Vec<&'a str> = Vec::new();
165
166    let splitter = RecordSplitter::new(text);
167    if let Some(prefix) = splitter.leading_errors_slice() {
168        for line in prefix.lines() {
169            errors.push(line);
170        }
171    }
172    for rec in splitter {
173        records.push(rec);
174    }
175    (records, errors)
176}
177
178/// 拆分到调用者提供的容器以避免每次调用分配。
179///
180/// 该函数会清空并填充 `records` 和 `errors`。如果调用者在重复调用中重用这些
181/// 向量(例如在循环中),则可以避免每次调用分配新的 `Vec`。
182pub fn split_into<'a>(text: &'a str, records: &mut Vec<&'a str>, errors: &mut Vec<&'a str>) {
183    records.clear();
184    errors.clear();
185
186    let splitter = RecordSplitter::new(text);
187    if let Some(prefix) = splitter.leading_errors_slice() {
188        for line in prefix.lines() {
189            errors.push(line);
190        }
191    }
192    for rec in splitter {
193        records.push(rec);
194    }
195}
196
197/// 对记录进行流式处理,并对每条记录调用回调而不分配 Vec。
198/// 这是处理日志文本时分配最少的方式。
199pub fn for_each_record<F>(text: &str, mut f: F)
200where
201    F: FnMut(&str),
202{
203    let splitter = RecordSplitter::new(text);
204    // 对流式 API 忽略前导错误;如果需要,调用者可以通过 RecordSplitter::leading_errors_slice 检查它们。
205    if let Some(_prefix) = splitter.leading_errors_slice() {
206        // 在迭代之前释放前缀借用
207    }
208    for rec in splitter {
209        f(rec);
210    }
211}
212
213/// 解析每条记录并用 ParsedRecord 调用回调;与流式 Splitter 一起使用时实现零分配。
214pub fn parse_records_with<F>(text: &str, mut f: F)
215where
216    F: for<'r> FnMut(ParsedRecord<'r>),
217{
218    for_each_record(text, |rec| {
219        let parsed = parse_record(rec);
220        f(parsed);
221    });
222}
223
224/// 解析到调用方提供的 Vec 中以避免每次调用分配新的 Vec。
225pub fn parse_into<'a>(text: &'a str, out: &mut Vec<ParsedRecord<'a>>) {
226    out.clear();
227    let splitter = RecordSplitter::new(text);
228    for rec in splitter {
229        out.push(parse_record(rec));
230    }
231}
232
233/// 顺序解析所有记录并返回 ParsedRecord 的 Vec。
234pub fn parse_all(text: &str) -> Vec<ParsedRecord<'_>> {
235    let splitter = RecordSplitter::new(text);
236    splitter.map(|r| parse_record(r)).collect()
237}
238
239fn parse_digits_forward(s: &str, mut i: usize) -> Option<(u64, usize)> {
240    let bytes = s.as_bytes();
241    let n = bytes.len();
242    // 跳过非数字
243    while i < n && !bytes[i].is_ascii_digit() {
244        i += 1;
245    }
246    if i >= n || !bytes[i].is_ascii_digit() {
247        return None;
248    }
249    let mut val: u64 = 0;
250    while i < n && bytes[i].is_ascii_digit() {
251        val = val
252            .saturating_mul(10)
253            .saturating_add((bytes[i] - b'0') as u64);
254        i += 1;
255    }
256    Some((val, i))
257}
258
259// 辅助:将记录分割成 (ts, meta_raw, body),均为 &str(借用)
260fn split_ts_meta_body<'a>(rec: &'a str) -> (&'a str, &'a str, &'a str) {
261    let ts: &'a str = if rec.len() >= 23 { &rec[..23] } else { "" };
262    let after_ts: &'a str = if rec.len() > 23 { &rec[23..] } else { "" };
263    let mut meta_raw: &'a str = "";
264    let mut body: &'a str = "";
265
266    if let Some(open_idx) = after_ts.find('(') {
267        if let Some(close_rel) = after_ts[open_idx..].find(')') {
268            meta_raw = &after_ts[open_idx + 1..open_idx + close_rel];
269            let body_start = 23 + open_idx + close_rel + 1;
270            if body_start < rec.len() {
271                body = rec[body_start..].trim_start();
272            }
273        } else {
274            // 没有闭合括号:将剩余部分视为 body
275            body = after_ts;
276        }
277    } else {
278        // 没有元数据括号:时间戳之后的全部内容都是 body
279        body = after_ts;
280    }
281
282    (ts, meta_raw, body)
283}
284
285// 辅助:解析 meta_raw 中的各个字段,返回一个小结构
286#[derive(Debug)]
287struct MetaParts<'a> {
288    ep: Option<&'a str>,
289    sess: Option<&'a str>,
290    thrd: Option<&'a str>,
291    user: Option<&'a str>,
292    trxid: Option<&'a str>,
293    stmt: Option<&'a str>,
294    appname: Option<&'a str>,
295    ip: Option<&'a str>,
296}
297
298fn parse_meta(meta_raw: &str) -> MetaParts<'_> {
299    let mut parts = MetaParts {
300        ep: None,
301        sess: None,
302        thrd: None,
303        user: None,
304        trxid: None,
305        stmt: None,
306        appname: None,
307        ip: None,
308    };
309
310    let mut iter = meta_raw.split_whitespace().peekable();
311    while let Some(tok) = iter.next() {
312        if tok.starts_with("EP[") {
313            parts.ep = Some(tok);
314        } else if let Some(val) = tok.strip_prefix("sess:") {
315            parts.sess = Some(val);
316        } else if let Some(val) = tok.strip_prefix("thrd:") {
317            parts.thrd = Some(val);
318        } else if let Some(val) = tok.strip_prefix("user:") {
319            parts.user = Some(val);
320        } else if let Some(val) = tok.strip_prefix("trxid:") {
321            parts.trxid = Some(val);
322        } else if let Some(val) = tok.strip_prefix("stmt:") {
323            parts.stmt = Some(val);
324        } else if tok == "appname:" {
325            if let Some(next) = iter.peek() {
326                if (*next).starts_with("ip:::") {
327                    let nexttok = iter.next().unwrap();
328                    let ippart = nexttok.trim_start_matches("ip:::");
329                    let ipclean = ippart.trim_start_matches("ffff:");
330                    parts.ip = Some(ipclean);
331                    parts.appname = Some("");
332                } else {
333                    let val = iter.next().unwrap();
334                    parts.appname = Some(val);
335                }
336            } else {
337                parts.appname = Some("");
338            }
339        } else if let Some(val) = tok.strip_prefix("appname:") {
340            if val.starts_with("ip:::") {
341                let ippart = val.trim_start_matches("ip:::");
342                let ipclean = ippart.trim_start_matches("ffff:");
343                parts.ip = Some(ipclean);
344                parts.appname = Some("");
345            } else {
346                parts.appname = Some(val);
347            }
348        }
349    }
350
351    parts
352}
353
354// 辅助:从 body 末尾反向提取数值指标(EXEC_ID, ROWCOUNT, EXECTIME)
355fn parse_body_metrics(body: &str) -> (Option<u64>, Option<u64>, Option<u64>) {
356    let mut execute_id: Option<u64> = None;
357    let mut row_count: Option<u64> = None;
358    let mut execute_time_ms: Option<u64> = None;
359
360    let body_str = body;
361    let mut search_end = body_str.len();
362
363    if let Some(pos) = body_str[..search_end].rfind("EXEC_ID:") {
364        let start = pos + "EXEC_ID:".len();
365        if let Some((v, _)) = parse_digits_forward(body_str, start) {
366            execute_id = Some(v);
367        }
368        search_end = pos;
369    }
370
371    if let Some(pos) = body_str[..search_end].rfind("ROWCOUNT:") {
372        let start = pos + "ROWCOUNT:".len();
373        if let Some((v, _)) = parse_digits_forward(body_str, start) {
374            row_count = Some(v);
375        }
376        search_end = pos;
377    }
378
379    if let Some(pos) = body_str[..search_end].rfind("EXECTIME:") {
380        let start = pos + "EXECTIME:".len();
381        if let Some((v, _)) = parse_digits_forward(body_str, start) {
382            execute_time_ms = Some(v);
383        }
384    }
385
386    (execute_time_ms, row_count, execute_id)
387}
388
389/// 解析单条记录(由 split_by_ts_records_with_errors 生成)。
390/// 返回一个从输入 `rec` 借用的 ParsedRecord。
391pub fn parse_record(rec: &'_ str) -> ParsedRecord<'_> {
392    // 1) 将记录分割为 ts / meta_raw / body
393    let (ts, meta_raw, body) = split_ts_meta_body(rec);
394
395    // 2) 解析 meta 字段
396    let meta = parse_meta(meta_raw);
397
398    // 3) 从 body 解析数值指标
399    let (execute_time_ms, row_count, execute_id) = parse_body_metrics(body);
400
401    ParsedRecord {
402        ts,
403        meta_raw,
404        ep: meta.ep,
405        sess: meta.sess,
406        thrd: meta.thrd,
407        user: meta.user,
408        trxid: meta.trxid,
409        stmt: meta.stmt,
410        appname: meta.appname,
411        ip: meta.ip,
412        body,
413        execute_time_ms,
414        row_count,
415        execute_id,
416    }
417}
418
419#[cfg(test)]
420mod tests {
421    use super::*;
422
423    #[test]
424    fn test_split_by_ts_records() {
425        let log_text = "2023-10-05 14:23:45.123 (EP[12345] sess:1 thrd:1 user:admin trxid:0 stmt:1 appname:MyApp)\nSELECT * FROM users
4262023-10-05 14:24:00.456 (EP[12346] sess:2 thrd:2 user:guest trxid:0 stmt:2 appname:MyApp)\nINSERT INTO orders VALUES (1, 'item');\n";
427        let (records, errors) = split_by_ts_records_with_errors(log_text);
428
429        assert_eq!(records.len(), 2);
430        assert_eq!(errors.len(), 0);
431    }
432
433    #[test]
434    fn test_split_with_leading_errors() {
435        let log_text = "garbage line 1\ngarbage line 2\n2023-10-05 14:23:45.123 (EP[12345] sess:1 thrd:1 user:admin trxid:0 stmt:1 appname:MyApp)\nSELECT 1\n";
436        let (records, errors) = split_by_ts_records_with_errors(log_text);
437
438        assert_eq!(records.len(), 1);
439        assert_eq!(errors.len(), 2);
440        assert!(records[0].contains("SELECT 1"));
441    }
442
443    #[test]
444    fn test_record_splitter_iterator() {
445        let log_text =
446            "garbage\n2023-10-05 14:23:45.123 (EP[1]) foo\n2023-10-05 14:23:46.456 (EP[2]) bar\n";
447        let it = RecordSplitter::new(log_text);
448        assert_eq!(it.leading_errors_slice().unwrap().trim(), "garbage");
449        let v: Vec<&str> = it.collect();
450        assert_eq!(v.len(), 2);
451    }
452
453    #[test]
454    fn test_parse_simple_log_sample() {
455        let log_text = "2025-08-12 10:57:09.562 (EP[0] sess:0x7fb24f392a30 thrd:757794 user:HBTCOMS_V3_PROD trxid:688489653 stmt:0x7fb236077b70 appname: ip:::ffff:10.3.100.68) EXECTIME: 0ms ROWCOUNT: 1 EXEC_ID: 289655185\n2025-08-12 10:57:09.562 (EP[0] sess:0x7fb24f392a30 thrd:757794 user:HBTCOMS_V3_PROD trxid:0 stmt:NULL appname:) TRX: START\n";
456
457        let (records, errors) = split_by_ts_records_with_errors(log_text);
458        assert_eq!(errors.len(), 0);
459        assert_eq!(records.len(), 2);
460
461        let r0 = parse_record(records[0]);
462        assert_eq!(r0.execute_time_ms, Some(0));
463        assert_eq!(r0.row_count, Some(1));
464        assert_eq!(r0.execute_id, Some(289655185));
465        assert_eq!(r0.ip, Some("10.3.100.68"));
466        assert_eq!(r0.appname, Some(""));
467
468        let r1 = parse_record(records[1]);
469        assert!(r1.body.contains("TRX: START"));
470    }
471}
472