dm-database-parser-sqllog 1.0.0

一个高性能的达梦数据库 sqllog 日志解析库,提供零分配或低分配的记录切分与解析功能
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
use atoi::atoi;
use encoding::DecoderTrap;
use encoding::Encoding;
use encoding::all::GB18030;
use memchr::memchr;
use memchr::memmem::Finder;
use memchr::memrchr;
use simdutf8::basic::from_utf8 as simd_from_utf8;
use std::borrow::Cow;
use std::sync::LazyLock;

use crate::parser::FileEncodingHint;

/// Pre-built substring finders for the performance-indicator keywords.
///
/// Each finder is constructed once, on first use, through `LazyLock`, so the
/// parsing functions do not pay the finder set-up cost on every call.
///
/// Finder for the literal `EXECTIME:`.
static FINDER_EXECTIME: LazyLock<Finder<'static>> = LazyLock::new(|| Finder::new(b"EXECTIME:"));
/// Finder for the literal `ROWCOUNT:`.
static FINDER_ROWCOUNT: LazyLock<Finder<'static>> = LazyLock::new(|| Finder::new(b"ROWCOUNT:"));
/// Finder for the literal `EXEC_ID:`.
static FINDER_EXEC_ID: LazyLock<Finder<'static>> = LazyLock::new(|| Finder::new(b"EXEC_ID:"));

/// Maximum byte length of an indicators section.
///
/// Only the last `INDICATORS_WINDOW` bytes of a record's content are scanned for
/// indicator keywords. Per the original author's note, typical indicators
/// ("EXECTIME: x(ms) ROWCOUNT: y(rows) EXEC_ID: z.") are ≤ 80 bytes, so 256 is a
/// conservative upper bound that covers unusual padding or long EXEC_ID values.
const INDICATORS_WINDOW: usize = 256;

/// A single SQL log record.
///
/// Holds the timestamp, the raw metadata text, the raw content (SQL body plus
/// optional performance indicators) and an optional bracketed tag. The metadata
/// and content are stored unparsed; the accessor methods split and decode them
/// on demand.
#[derive(Debug, Clone, PartialEq, Default)]
pub struct Sqllog<'a> {
    /// Timestamp, formatted as "YYYY-MM-DD HH:MM:SS.mmm".
    pub ts: Cow<'a, str>,

    /// Raw metadata text (parsed lazily by `parse_meta`).
    pub meta_raw: Cow<'a, str>,

    /// Raw content bytes (SQL body followed by the indicators, if any); split and
    /// decoded lazily.
    pub content_raw: Cow<'a, [u8]>,

    /// Bracketed tag extracted from the record (e.g. `[SEL]`, `[ORA]`), or `None`
    /// when the record has no tag.
    pub tag: Option<Cow<'a, str>>,

    /// File-level encoding hint (detected by the parser), used to decode
    /// `content_raw` correctly.
    pub(crate) encoding: FileEncodingHint,
}

impl<'a> Sqllog<'a> {
    // ── 公开 API ─────────────────────────────────────────────────────────────

    /// Returns the SQL statement body, i.e. the content preceding the indicators section.
    ///
    /// The split point is computed on demand and the bytes are decoded according to
    /// the record's encoding hint.
    pub fn body(&self) -> Cow<'a, str> {
        let body_end = self.find_indicators_split();
        let zero_copy = matches!(self.content_raw, Cow::Borrowed(_));
        let raw = &self.content_raw[..body_end];
        // SAFETY: `raw` is a sub-slice of `content_raw`, and `zero_copy` is true only
        // when `content_raw` is `Cow::Borrowed`, i.e. when those bytes live for `'a`.
        unsafe { decode_content_bytes(raw, zero_copy, self.encoding) }
    }

    /// Returns the length in bytes of the SQL statement body.
    ///
    /// Performs no UTF-8 validation and no allocation.
    #[inline]
    pub fn body_len(&self) -> usize {
        self.body_bytes().len()
    }

    /// Returns the raw, undecoded bytes of the SQL statement body without allocating.
    #[inline]
    pub fn body_bytes(&self) -> &[u8] {
        let body_end = self.find_indicators_split();
        &self.content_raw[..body_end]
    }

    /// Returns the raw performance-indicator text that follows the SQL body.
    ///
    /// Yields `None` when the record has no indicators section (nothing follows the
    /// split point).
    pub fn indicators_raw(&self) -> Option<Cow<'a, str>> {
        let tail = &self.content_raw[self.find_indicators_split()..];
        let zero_copy = matches!(self.content_raw, Cow::Borrowed(_));
        // SAFETY: `tail` is a sub-slice of `content_raw`, and `zero_copy` is true only
        // when `content_raw` is `Cow::Borrowed`, i.e. when those bytes live for `'a`.
        (!tail.is_empty()).then(|| unsafe { decode_content_bytes(tail, zero_copy, self.encoding) })
    }

    /// Parses the performance indicators of this record.
    ///
    /// The `sql` field of the result is left as the empty string. Returns `None`
    /// when the record has no indicators section.
    pub fn parse_indicators(&self) -> Option<PerformanceMetrics<'static>> {
        let split = self.find_indicators_split();
        match &self.content_raw[split..] {
            [] => None,
            tail => parse_indicators_from_bytes(tail),
        }
    }

    /// 解析性能指标和 SQL 语句
    ///
    /// 返回包含 EXECTIME、ROWCOUNT、EXEC_ID 和 SQL 语句的 [`PerformanceMetrics`]。
    ///
    /// 当 tag 为 `"ORA"` 时,SQL 语句开头可能带有 `": "`,本方法会自动去除。
    ///
    /// # 实现说明
    /// 仅调用一次 `find_indicators_split()`,body 解码与 indicators 解析均在同一
    /// 次遍历中完成,`Cow::Borrowed` 路径全程零分配。
    #[inline(always)]
    pub fn parse_performance_metrics(&self) -> PerformanceMetrics<'a> {
        let split = self.find_indicators_split();
        let is_borrowed = matches!(&self.content_raw, Cow::Borrowed(_));

        // SAFETY: 子切片与 content_raw 共享 'a 生命周期
        let sql_raw =
            unsafe { decode_content_bytes(&self.content_raw[..split], is_borrowed, self.encoding) };

        let sql = if self.tag.as_deref() == Some("ORA") {
            strip_ora_prefix(sql_raw)
        } else {
            sql_raw
        };

        let mut pm = parse_indicators_from_bytes(&self.content_raw[split..]).unwrap_or_default();
        pm.sql = sql;
        pm
    }

    /// Parses the space-separated tokens of `meta_raw` into a [`MetaParts`].
    ///
    /// Recognised tokens:
    /// - `EP[n]`: `n` is stored in `ep` when it parses as a `u8`.
    /// - `key:value` with key `sess`, `thrd`, `user`, `trxid`, `stmt`, `ip` or
    ///   `appname`; the value is everything after the first `:` in the token.
    ///
    /// Tokens without a `:` and tokens with an unknown key are ignored, leaving the
    /// corresponding field at its default. An empty `appname:` adopts the following
    /// token as its value unless that token starts with `ip:`.
    ///
    /// Values borrow from the original buffer when `meta_raw` is `Cow::Borrowed`, and
    /// are copied into owned strings otherwise.
    pub fn parse_meta(&self) -> MetaParts<'a> {
        let meta_str: &str = &self.meta_raw;
        let meta_bytes = meta_str.as_bytes();
        let len = meta_bytes.len();
        let mut meta = MetaParts::default();

        // When `meta_raw` is borrowed, keep the original `&'a str` so values can be
        // handed out as zero-copy sub-slices living for `'a`. An owned `meta_raw` only
        // lives as long as `self`, so its values must be copied instead.
        let borrowed: Option<&'a str> = match &self.meta_raw {
            Cow::Borrowed(text) => Some(*text),
            Cow::Owned(_) => None,
        };

        // Builds a field value from the byte range `start..end` of the metadata text.
        // Every range used below starts and ends next to an ASCII space or ':' (or at
        // the ends of the text), so it always falls on UTF-8 character boundaries.
        let to_cow = |start: usize, end: usize| -> Cow<'a, str> {
            match borrowed {
                Some(text) => Cow::Borrowed(&text[start..end]),
                None => Cow::Owned(meta_str[start..end].to_owned()),
            }
        };

        let mut idx = 0;
        while idx < len {
            // Skip the spaces separating tokens.
            while idx < len && meta_bytes[idx] == b' ' {
                idx += 1;
            }
            if idx >= len {
                break;
            }

            // The token runs up to the next space (or the end of the text).
            let start = idx;
            while idx < len && meta_bytes[idx] != b' ' {
                idx += 1;
            }
            let part = &meta_bytes[start..idx];

            // `EP[n]` token.
            if part.len() > 4 && part.starts_with(b"EP[") && part.ends_with(b"]") {
                if let Some(ep) = atoi::<u8>(&part[3..part.len() - 1]) {
                    meta.ep = ep;
                }
                continue;
            }

            // `key:value` token; anything without a ':' is ignored.
            let Some(sep) = memchr(b':', part) else {
                continue;
            };
            let key = &part[..sep];
            let val_start = start + sep + 1;

            match key {
                b"sess" => meta.sess_id = to_cow(val_start, idx),
                b"thrd" => meta.thrd_id = to_cow(val_start, idx),
                b"user" => meta.username = to_cow(val_start, idx),
                b"trxid" => meta.trxid = to_cow(val_start, idx),
                b"stmt" => meta.statement = to_cow(val_start, idx),
                b"ip" => meta.client_ip = to_cow(val_start, idx),
                b"appname" => {
                    if val_start < idx {
                        meta.appname = to_cow(val_start, idx);
                    } else {
                        // Empty value: peek at the next token and treat it as the
                        // appname only if it is not an ip field.
                        let mut peek = idx;
                        while peek < len && meta_bytes[peek] == b' ' {
                            peek += 1;
                        }
                        if peek < len {
                            let peek_start = peek;
                            while peek < len && meta_bytes[peek] != b' ' {
                                peek += 1;
                            }
                            let next = &meta_bytes[peek_start..peek];
                            if !next.starts_with(b"ip:") {
                                meta.appname = to_cow(peek_start, peek);
                                // Consume the token so it is not parsed again.
                                idx = peek;
                            }
                        }
                    }
                }
                _ => {}
            }
        }
        meta
    }

    // ── Private helpers ───────────────────────────────────────────────────────

    /// Locates the byte offset in `content_raw` at which the indicators section starts.
    ///
    /// Returns `content_raw.len()` when the record carries no indicators.
    fn find_indicators_split(&self) -> usize {
        let content: &[u8] = &self.content_raw;
        let total = content.len();

        // Cheap rejection: per the original author's note on the DM log format, a
        // record with indicators ends with '.' (after `EXEC_ID: N`) or ')' (after
        // `N(ms)` / `N(rows)`). Ignore trailing '\n' / '\r' and look at the last
        // remaining byte; anything else means there are no indicators.
        let last_byte = content
            .iter()
            .rev()
            .copied()
            .find(|b| *b != b'\n' && *b != b'\r');
        if !matches!(last_byte, Some(b'.' | b')')) {
            return total;
        }

        // Only the trailing window is scanned for indicator keywords.
        let window_start = total.saturating_sub(INDICATORS_WINDOW);
        let candidate = window_start + scan_earliest_indicator(&content[window_start..]);

        // Guard against false positives (e.g. SQL text that merely ends with an
        // indicator keyword): if nothing parses at the candidate split, treat the
        // whole content as the body.
        if candidate < total && parse_indicators_from_bytes(&content[candidate..]).is_none() {
            total
        } else {
            candidate
        }
    }
}

// ── Module-level helpers ──────────────────────────────────────────────────────

/// Scans `window` right-to-left over its `:` bytes, looking for the indicator
/// keywords `EXECTIME`, `ROWCOUNT` and `EXEC_ID` immediately before a colon.
///
/// For each keyword only the right-most occurrence is recorded (the first one met
/// while scanning backwards). The result is the smallest start offset among those
/// recorded occurrences, or `window.len()` when no keyword was found (meaning
/// "no split point").
fn scan_earliest_indicator(window: &[u8]) -> usize {
    const KEYWORDS: [&[u8]; 3] = [b"EXECTIME", b"ROWCOUNT", b"EXEC_ID"];

    // Start offset of the right-most hit per keyword, in `KEYWORDS` order.
    let mut hits: [Option<usize>; 3] = [None; 3];
    let mut cursor = window.len();

    // Stop as soon as every keyword has its right-most hit.
    while hits.iter().any(Option::is_none) {
        let Some(colon) = memrchr(b':', &window[..cursor]) else {
            break;
        };
        let before_colon = &window[..colon];
        for (slot, keyword) in hits.iter_mut().zip(KEYWORDS.iter()) {
            if slot.is_none() && before_colon.ends_with(keyword) {
                *slot = Some(colon - keyword.len());
                break;
            }
        }
        // Continue with the part of the window left of this colon.
        cursor = colon;
    }

    hits.iter().flatten().min().copied().unwrap_or(window.len())
}

/// Decodes a sub-slice of `content_raw` into a `Cow<'a, str>` according to the
/// file-level encoding hint.
///
/// - `Utf8`: the bytes are taken as UTF-8 without re-validation.
/// - `Auto`: the bytes are validated; valid UTF-8 is used as-is, anything else is
///   converted lossily.
/// - `Gb18030`: the bytes are decoded strictly as GB18030, falling back to a lossy
///   UTF-8 conversion when decoding fails.
///
/// # Safety
/// `bytes` must be a sub-slice of a `'a`-lived allocation (i.e., the original
/// `Cow::Borrowed(&'a [u8])`) whenever `is_borrowed` is `true`. The caller guarantees
/// this by passing `is_borrowed = true` only when the source `Cow` is `Borrowed`.
/// For `FileEncodingHint::Utf8`, `bytes` must additionally be valid UTF-8.
#[inline]
unsafe fn decode_content_bytes<'a>(
    bytes: &[u8],
    is_borrowed: bool,
    encoding: FileEncodingHint,
) -> Cow<'a, str> {
    // Wraps bytes already known to be valid UTF-8: zero-copy when the caller vouches
    // that they live for `'s`, otherwise as an owned copy.
    //
    // Safety: `raw` must be valid UTF-8 and, when `zero_copy` is true, must live for `'s`.
    unsafe fn utf8_unchecked<'s>(raw: &[u8], zero_copy: bool) -> Cow<'s, str> {
        if zero_copy {
            // SAFETY: the caller guarantees `raw` points into a `'s`-lived buffer and
            // holds valid UTF-8.
            unsafe {
                let extended: &'s [u8] = std::slice::from_raw_parts(raw.as_ptr(), raw.len());
                Cow::Borrowed(std::str::from_utf8_unchecked(extended))
            }
        } else {
            // SAFETY: the caller guarantees `raw` holds valid UTF-8.
            Cow::Owned(unsafe { std::str::from_utf8_unchecked(raw) }.to_owned())
        }
    }

    match encoding {
        // File was already validated as UTF-8 during `from_path`; skip per-slice
        // re-validation.
        FileEncodingHint::Utf8 => unsafe { utf8_unchecked(bytes, is_borrowed) },
        FileEncodingHint::Auto => {
            if simd_from_utf8(bytes).is_ok() {
                // SAFETY: validated just above; the lifetime requirement is forwarded
                // from this function's own contract.
                unsafe { utf8_unchecked(bytes, is_borrowed) }
            } else {
                Cow::Owned(String::from_utf8_lossy(bytes).into_owned())
            }
        }
        FileEncodingHint::Gb18030 => {
            let decoded = match GB18030.decode(bytes, DecoderTrap::Strict) {
                Ok(text) => text,
                Err(_) => String::from_utf8_lossy(bytes).into_owned(),
            };
            Cow::Owned(decoded)
        }
    }
}

/// Parse `EXECTIME`, `ROWCOUNT`, `EXEC_ID` from a raw indicators byte slice.
/// The `sql` field of the returned struct is left as the default empty string.
/// Returns `None` if none of the three fields are present.
fn parse_indicators_from_bytes(ind: &[u8]) -> Option<PerformanceMetrics<'static>> {
    if ind.is_empty() {
        return None;
    }

    let mut out = PerformanceMetrics::default();
    let mut found = false;

    if let Some(idx) = FINDER_EXECTIME.find(ind) {
        let ss = idx + 9;
        if let Some(pi) = memchr(b'(', &ind[ss..]) {
            let val = ind[ss..ss + pi].trim_ascii();
            if let Ok(t) = fast_float::parse::<f32, _>(val) {
                out.exectime = t;
                found = true;
            }
        }
    }

    if let Some(idx) = FINDER_ROWCOUNT.find(ind) {
        let ss = idx + 9;
        if let Some(pi) = memchr(b'(', &ind[ss..])
            && let Some(c) = atoi::<u32>(ind[ss..ss + pi].trim_ascii())
        {
            out.rowcount = c;
            found = true;
        }
    }

    if let Some(idx) = FINDER_EXEC_ID.find(ind) {
        let ss = idx + 8;
        let end = memchr(b'.', &ind[ss..])
            .map(|i| ss + i)
            .unwrap_or(ind.len());
        if let Some(id) = atoi::<i64>(ind[ss..end].trim_ascii()) {
            out.exec_id = id;
            found = true;
        }
    }

    found.then_some(out)
}

/// Removes one leading `": "` from `s`, if present.
///
/// The `Cow` variant is preserved and no new allocation is made: the borrowed case
/// re-slices, the owned case removes the prefix in place.
#[inline]
fn strip_ora_prefix(s: Cow<'_, str>) -> Cow<'_, str> {
    const PREFIX: &str = ": ";

    if !s.starts_with(PREFIX) {
        return s;
    }
    match s {
        Cow::Borrowed(text) => Cow::Borrowed(&text[PREFIX.len()..]),
        Cow::Owned(mut text) => {
            text.drain(..PREFIX.len());
            Cow::Owned(text)
        }
    }
}

// ── Public types ──────────────────────────────────────────────────────────────

/// Parsed metadata of a log record.
///
/// Holds every metadata field of a record, such as the session ID and user name.
/// Fields that do not appear in the record keep their default (empty / zero) value.
#[derive(Debug, Clone, PartialEq, Default)]
pub struct MetaParts<'a> {
    /// EP (Execution Point) number, range 0-255.
    pub ep: u8,

    /// Session ID.
    pub sess_id: Cow<'a, str>,

    /// Thread ID.
    pub thrd_id: Cow<'a, str>,

    /// User name.
    pub username: Cow<'a, str>,

    /// Transaction ID.
    pub trxid: Cow<'a, str>,

    /// Statement ID.
    pub statement: Cow<'a, str>,

    /// Application name.
    pub appname: Cow<'a, str>,

    /// Client IP address (optional; empty when absent).
    pub client_ip: Cow<'a, str>,
}

/// Performance indicators and SQL text of a SQL record.
///
/// Holds the execution metrics of a statement (execution time, affected row count,
/// execution ID) together with the full SQL statement.
#[derive(Debug, Clone, PartialEq, Default)]
pub struct PerformanceMetrics<'a> {
    /// Execution time, in milliseconds.
    pub exectime: f32,

    /// Number of affected rows.
    pub rowcount: u32,

    /// Execution ID.
    pub exec_id: i64,

    /// The full SQL statement.
    pub sql: Cow<'a, str>,
}