Skip to main content

dm_database_parser_sqllog/
sqllog.rs

1use atoi::atoi;
2use encoding::DecoderTrap;
3use encoding::Encoding;
4use encoding::all::GB18030;
5use memchr::memchr;
6use memchr::memmem::Finder;
7use memchr::memrchr;
8use simdutf8::basic::from_utf8 as simd_from_utf8;
9use std::borrow::Cow;
10use std::sync::LazyLock;
11
12use crate::parser::FileEncodingHint;
13
/// Pre-built SIMD finder for the `EXECTIME:` indicator keyword (trailing colon included).
///
/// Built lazily on first use and then shared — avoids per-call initialization.
static FINDER_EXECTIME: LazyLock<Finder<'static>> = LazyLock::new(|| Finder::new(b"EXECTIME:"));
/// Pre-built SIMD finder for the `ROWCOUNT:` indicator keyword (trailing colon included).
static FINDER_ROWCOUNT: LazyLock<Finder<'static>> = LazyLock::new(|| Finder::new(b"ROWCOUNT:"));
/// Pre-built SIMD finder for the `EXEC_ID:` indicator keyword (trailing colon included).
static FINDER_EXEC_ID: LazyLock<Finder<'static>> = LazyLock::new(|| Finder::new(b"EXEC_ID:"));
18
/// Maximum byte length of an indicators section.
///
/// Typical indicators ("EXECTIME: x(ms) ROWCOUNT: y(rows) EXEC_ID: z.") are ≤ 80 bytes.
/// 256 is a conservative upper bound that covers unusual padding or long EXEC_ID values.
///
/// `Sqllog::find_indicators_split` only searches the last `INDICATORS_WINDOW` bytes of a
/// record's content for indicator keywords.
const INDICATORS_WINDOW: usize = 256;
23
/// A single SQL log record.
///
/// Represents one complete SQL log record: timestamp, metadata, the SQL statement body
/// and optional performance indicators.
#[derive(Debug, Clone, PartialEq, Default)]
pub struct Sqllog<'a> {
    /// Timestamp, formatted as "YYYY-MM-DD HH:MM:SS.mmm".
    pub ts: Cow<'a, str>,

    /// Raw metadata text (parsed lazily).
    pub meta_raw: Cow<'a, str>,

    /// Raw content (body plus indicators); splitting and parsing are deferred.
    pub content_raw: Cow<'a, [u8]>,

    /// Bracketed tag extracted from the record (e.g. `[SEL]`, `[ORA]`); `None` if absent.
    pub tag: Option<Cow<'a, str>>,

    /// File-level encoding hint (detected by the parser), used to decode the content correctly.
    pub(crate) encoding: FileEncodingHint,
}
44
45impl<'a> Sqllog<'a> {
46    // ── 公开 API ─────────────────────────────────────────────────────────────
47
48    /// 获取 SQL 语句体(延迟分割)
49    pub fn body(&self) -> Cow<'a, str> {
50        let split = self.find_indicators_split();
51        let is_borrowed = matches!(&self.content_raw, Cow::Borrowed(_));
52        // SAFETY: body_bytes 是 content_raw 的子切片,与 content_raw 共享 'a 生命周期
53        unsafe { decode_content_bytes(&self.content_raw[..split], is_borrowed, self.encoding) }
54    }
55
    /// Returns the length in bytes of the SQL statement body.
    ///
    /// Performs no UTF-8 validation and no allocation: the value is simply the offset at
    /// which the indicators section starts (or the full content length when there is none).
    #[inline]
    pub fn body_len(&self) -> usize {
        self.find_indicators_split()
    }
61
62    /// 获取 SQL 语句体的原始字节切片(不分配)
63    #[inline]
64    pub fn body_bytes(&self) -> &[u8] {
65        &self.content_raw[..self.find_indicators_split()]
66    }
67
68    /// 获取原始性能指标字符串(延迟分割)
69    pub fn indicators_raw(&self) -> Option<Cow<'a, str>> {
70        let split = self.find_indicators_split();
71        let ind_bytes = &self.content_raw[split..];
72        if ind_bytes.is_empty() {
73            return None;
74        }
75        let is_borrowed = matches!(&self.content_raw, Cow::Borrowed(_));
76        // SAFETY: ind_bytes 是 content_raw 的子切片,与 content_raw 共享 'a 生命周期
77        Some(unsafe { decode_content_bytes(ind_bytes, is_borrowed, self.encoding) })
78    }
79
80    /// 解析性能指标(sql 字段为空字符串)
81    pub fn parse_indicators(&self) -> Option<PerformanceMetrics<'static>> {
82        let ind_bytes = &self.content_raw[self.find_indicators_split()..];
83        if ind_bytes.is_empty() {
84            return None;
85        }
86        parse_indicators_from_bytes(ind_bytes)
87    }
88
89    /// 解析性能指标和 SQL 语句
90    ///
91    /// 返回包含 EXECTIME、ROWCOUNT、EXEC_ID 和 SQL 语句的 [`PerformanceMetrics`]。
92    ///
93    /// 当 tag 为 `"ORA"` 时,SQL 语句开头可能带有 `": "`,本方法会自动去除。
94    ///
95    /// # 实现说明
96    /// 仅调用一次 `find_indicators_split()`,body 解码与 indicators 解析均在同一
97    /// 次遍历中完成,`Cow::Borrowed` 路径全程零分配。
98    #[inline(always)]
99    pub fn parse_performance_metrics(&self) -> PerformanceMetrics<'a> {
100        let split = self.find_indicators_split();
101        let is_borrowed = matches!(&self.content_raw, Cow::Borrowed(_));
102
103        // SAFETY: 子切片与 content_raw 共享 'a 生命周期
104        let sql_raw =
105            unsafe { decode_content_bytes(&self.content_raw[..split], is_borrowed, self.encoding) };
106
107        let sql = if self.tag.as_deref() == Some("ORA") {
108            strip_ora_prefix(sql_raw)
109        } else {
110            sql_raw
111        };
112
113        let mut pm = parse_indicators_from_bytes(&self.content_raw[split..]).unwrap_or_default();
114        pm.sql = sql;
115        pm
116    }
117
118    /// 解析元数据
119    pub fn parse_meta(&self) -> MetaParts<'a> {
120        let meta_bytes = self.meta_raw.as_bytes();
121        let mut meta = MetaParts::default();
122        let len = meta_bytes.len();
123        let is_borrowed = matches!(&self.meta_raw, Cow::Borrowed(_));
124
125        let to_cow = |bytes: &[u8]| -> Cow<'a, str> {
126            if is_borrowed {
127                // For Utf8 / Auto encoding: meta_raw is Cow::Borrowed — bytes is a sub-slice
128                // of the memory-mapped buffer that lives for 'a.  The file was validated as
129                // UTF-8 during `from_path`, so the unchecked conversion is sound.
130                unsafe {
131                    Cow::Borrowed(std::str::from_utf8_unchecked(std::slice::from_raw_parts(
132                        bytes.as_ptr(),
133                        bytes.len(),
134                    )))
135                }
136            } else {
137                // For Gb18030 / Auto-fallback encoding: meta_raw is Cow::Owned (already decoded
138                // to a valid UTF-8 String).  We must NOT transmute the lifetime to 'a because
139                // the Owned String lives only as long as `self`, not 'a.  Return an owned copy.
140                Cow::Owned(
141                    std::str::from_utf8(bytes)
142                        .expect("meta_raw is always valid UTF-8")
143                        .to_string(),
144                )
145            }
146        };
147
148        let mut idx = 0;
149        while idx < len {
150            // Skip whitespace
151            while idx < len && meta_bytes[idx] == b' ' {
152                idx += 1;
153            }
154            if idx >= len {
155                break;
156            }
157
158            // Find token end
159            let start = idx;
160            while idx < len && meta_bytes[idx] != b' ' {
161                idx += 1;
162            }
163            let part = &meta_bytes[start..idx];
164
165            // Parse EP[n]
166            if part.len() > 4
167                && part[0] == b'E'
168                && part[1] == b'P'
169                && part[2] == b'['
170                && part[part.len() - 1] == b']'
171            {
172                if let Some(ep) = atoi::<u8>(&part[3..part.len() - 1]) {
173                    meta.ep = ep;
174                }
175                continue;
176            }
177
178            // Find ':'
179            if let Some(sep) = memchr(b':', part) {
180                let key = &part[..sep];
181                let val = &part[sep + 1..];
182
183                match key {
184                    b"sess" => meta.sess_id = to_cow(val),
185                    b"thrd" => meta.thrd_id = to_cow(val),
186                    b"user" => meta.username = to_cow(val),
187                    b"trxid" => meta.trxid = to_cow(val),
188                    b"stmt" => meta.statement = to_cow(val),
189                    b"ip" => meta.client_ip = to_cow(val),
190                    b"appname" => {
191                        if !val.is_empty() {
192                            meta.appname = to_cow(val);
193                        } else {
194                            // Peek next token; treat it as appname only if it is not an ip field
195                            let mut peek = idx;
196                            while peek < len && meta_bytes[peek] == b' ' {
197                                peek += 1;
198                            }
199                            if peek < len {
200                                let peek_start = peek;
201                                while peek < len && meta_bytes[peek] != b' ' {
202                                    peek += 1;
203                                }
204                                let next = &meta_bytes[peek_start..peek];
205                                if !(next.starts_with(b"ip:") || next.starts_with(b"ip::")) {
206                                    meta.appname = to_cow(next);
207                                    idx = peek;
208                                }
209                            }
210                        }
211                    }
212                    _ => {}
213                }
214            }
215        }
216        meta
217    }
218
219    // ── Private helpers ───────────────────────────────────────────────────────
220
221    fn find_indicators_split(&self) -> usize {
222        let data = &self.content_raw;
223        let len = data.len();
224
225        // HOT-01: O(1) 早退 — DM 格式中有指标的记录以 '.' 结尾(EXEC_ID: N.)
226        // 或以 ')' 结尾(仅 EXECTIME/ROWCOUNT,格式为 N(ms)/N(rows))。
227        // 跳过末尾 \n/\r,取最后一个有效字节;既非 '.' 也非 ')' 则无指标,直接返回。
228        let last_meaningful = data
229            .iter()
230            .rev()
231            .find(|&&b| b != b'\n' && b != b'\r')
232            .copied();
233        if last_meaningful != Some(b'.') && last_meaningful != Some(b')') {
234            return len;
235        }
236
237        let start = len.saturating_sub(INDICATORS_WINDOW);
238        let window = &data[start..];
239
240        // HOT-02: 单次反向扫描 ':' 字节,检查关键字前缀,记录最左命中位置。
241        // 替代 3 次独立 FinderRev::rfind 调用,减少 SIMD 启动开销。
242        let earliest = scan_earliest_indicator(window);
243
244        let split = start + earliest;
245        // CORR-03 验证守卫:假阳性(如 SQL 以指标关键字结尾)时 fallback 到全文。
246        if split < len && parse_indicators_from_bytes(&data[split..]).is_none() {
247            return len;
248        }
249        split
250    }
251}
252
253// ── Module-level helpers ──────────────────────────────────────────────────────
254
255/// 在 window 内单次反向扫描 ':' 字节,匹配已知指标关键字前缀。
256///
257/// 对每个关键字只取最右命中(即从右向左扫描的第一次命中),等价于原 FinderRev::rfind 语义。
258/// 返回三个关键字最右命中中,起始位置最小(最左)的那个。
259/// 若无任何命中则返回 `window.len()`(表示无分割点)。
260///
261/// 关键字长度:EXECTIME = 8,ROWCOUNT = 8,EXEC_ID = 7。
262fn scan_earliest_indicator(window: &[u8]) -> usize {
263    // 分别记录三个关键字的最右命中起始位置(None 表示未命中)
264    let mut exectime_pos: Option<usize> = None;
265    let mut rowcount_pos: Option<usize> = None;
266    let mut exec_id_pos: Option<usize> = None;
267
268    let mut search_end = window.len();
269    while search_end > 0 {
270        // 所有关键字均已找到最右命中,无需继续向左
271        if exectime_pos.is_some() && rowcount_pos.is_some() && exec_id_pos.is_some() {
272            break;
273        }
274        match memrchr(b':', &window[..search_end]) {
275            None => break,
276            Some(colon) => {
277                let prefix = &window[..colon];
278                if exectime_pos.is_none() && prefix.ends_with(b"EXECTIME") {
279                    exectime_pos = Some(colon - 8);
280                } else if rowcount_pos.is_none() && prefix.ends_with(b"ROWCOUNT") {
281                    rowcount_pos = Some(colon - 8);
282                } else if exec_id_pos.is_none() && prefix.ends_with(b"EXEC_ID") {
283                    exec_id_pos = Some(colon - 7);
284                }
285                search_end = colon;
286            }
287        }
288    }
289
290    // 取三者中最左(最小索引)的命中,无命中则返回 window.len()
291    [exectime_pos, rowcount_pos, exec_id_pos]
292        .into_iter()
293        .flatten()
294        .min()
295        .unwrap_or(window.len())
296}
297
298/// Decode a sub-slice of `content_raw` bytes into a `Cow<'a, str>`.
299///
300/// # Safety
301/// `bytes` must be a sub-slice of a `'a`-lived allocation (i.e., the original
302/// `Cow::Borrowed(&'a [u8])`). The caller guarantees this by passing `is_borrowed = true`
303/// only when the source `Cow` is `Borrowed`.
304#[inline]
305unsafe fn decode_content_bytes<'a>(
306    bytes: &[u8],
307    is_borrowed: bool,
308    encoding: FileEncodingHint,
309) -> Cow<'a, str> {
310    match encoding {
311        FileEncodingHint::Utf8 => {
312            // File was already validated as UTF-8 during `from_path`; skip per-slice re-validation.
313            if is_borrowed {
314                unsafe {
315                    Cow::Borrowed(std::str::from_utf8_unchecked(std::slice::from_raw_parts(
316                        bytes.as_ptr(),
317                        bytes.len(),
318                    )))
319                }
320            } else {
321                unsafe { Cow::Owned(std::str::from_utf8_unchecked(bytes).to_string()) }
322            }
323        }
324        FileEncodingHint::Auto => match simd_from_utf8(bytes) {
325            Ok(_) => {
326                if is_borrowed {
327                    unsafe {
328                        Cow::Borrowed(std::str::from_utf8_unchecked(std::slice::from_raw_parts(
329                            bytes.as_ptr(),
330                            bytes.len(),
331                        )))
332                    }
333                } else {
334                    unsafe { Cow::Owned(std::str::from_utf8_unchecked(bytes).to_string()) }
335                }
336            }
337            Err(_) => Cow::Owned(String::from_utf8_lossy(bytes).into_owned()),
338        },
339        FileEncodingHint::Gb18030 => match GB18030.decode(bytes, DecoderTrap::Strict) {
340            Ok(s) => Cow::Owned(s),
341            Err(_) => Cow::Owned(String::from_utf8_lossy(bytes).into_owned()),
342        },
343    }
344}
345
346/// Parse `EXECTIME`, `ROWCOUNT`, `EXEC_ID` from a raw indicators byte slice.
347/// The `sql` field of the returned struct is left as the default empty string.
348/// Returns `None` if none of the three fields are present.
349fn parse_indicators_from_bytes(ind: &[u8]) -> Option<PerformanceMetrics<'static>> {
350    if ind.is_empty() {
351        return None;
352    }
353
354    let mut out = PerformanceMetrics::default();
355    let mut found = false;
356
357    if let Some(idx) = FINDER_EXECTIME.find(ind) {
358        let ss = idx + 9;
359        if let Some(pi) = memchr(b'(', &ind[ss..]) {
360            let val = ind[ss..ss + pi].trim_ascii();
361            if let Ok(t) = fast_float::parse::<f32, _>(val) {
362                out.exectime = t;
363                found = true;
364            }
365        }
366    }
367
368    if let Some(idx) = FINDER_ROWCOUNT.find(ind) {
369        let ss = idx + 9;
370        if let Some(pi) = memchr(b'(', &ind[ss..])
371            && let Some(c) = atoi::<u32>(ind[ss..ss + pi].trim_ascii())
372        {
373            out.rowcount = c;
374            found = true;
375        }
376    }
377
378    if let Some(idx) = FINDER_EXEC_ID.find(ind) {
379        let ss = idx + 8;
380        let end = memchr(b'.', &ind[ss..])
381            .map(|i| ss + i)
382            .unwrap_or(ind.len());
383        if let Some(id) = atoi::<i64>(ind[ss..end].trim_ascii()) {
384            out.exec_id = id;
385            found = true;
386        }
387    }
388
389    found.then_some(out)
390}
391
/// Remove a leading `": "` from a `Cow<str>`, keeping the `Cow` variant unchanged.
///
/// Neither path allocates: a borrowed value is re-sliced and an owned value is shortened
/// in place. Input without the prefix is returned as is.
#[inline]
fn strip_ora_prefix(s: Cow<'_, str>) -> Cow<'_, str> {
    const PREFIX: &str = ": ";

    if !s.starts_with(PREFIX) {
        return s;
    }
    match s {
        Cow::Borrowed(text) => Cow::Borrowed(&text[PREFIX.len()..]),
        Cow::Owned(mut text) => {
            text.replace_range(..PREFIX.len(), "");
            Cow::Owned(text)
        }
    }
}
405
406// ── Public types ──────────────────────────────────────────────────────────────
407
/// Metadata portion of a log record.
///
/// Holds all metadata fields of a log record, such as the session ID and the user name.
/// The type derives `Default`, so a field that was never set holds its default value.
#[derive(Debug, Clone, PartialEq, Default)]
pub struct MetaParts<'a> {
    /// EP (Execution Point) number, range 0-255.
    pub ep: u8,

    /// Session ID.
    pub sess_id: Cow<'a, str>,

    /// Thread ID.
    pub thrd_id: Cow<'a, str>,

    /// User name.
    pub username: Cow<'a, str>,

    /// Transaction ID.
    pub trxid: Cow<'a, str>,

    /// Statement ID.
    pub statement: Cow<'a, str>,

    /// Application name.
    pub appname: Cow<'a, str>,

    /// Client IP address (optional).
    pub client_ip: Cow<'a, str>,
}
437
/// Performance indicators and SQL statement of a SQL record.
///
/// Holds the performance indicators of a SQL execution — execution time, affected row
/// count and execution ID — together with the full SQL statement.
#[derive(Debug, Clone, PartialEq, Default)]
pub struct PerformanceMetrics<'a> {
    /// Execution time (milliseconds).
    pub exectime: f32,

    /// Number of affected rows.
    pub rowcount: u32,

    /// Execution ID.
    pub exec_id: i64,

    /// Full SQL statement.
    pub sql: Cow<'a, str>,
}