Skip to main content

dm_database_sqllog2db/pipeline/
normalizer.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
/// Parameter-substitution buffer: keyed by (`sess_id`, `stmt`), with the parsed
/// parameter list as the value.
///
/// The key uses `sess_id` rather than `trxid`: in DM logs a PARAMS record carries
/// the `trxid` that was current at bind time, but the matching DML execution record
/// has `trxid` 0 under auto-commit, so a `trxid`-based key would not match.
/// `sess_id` is always the same on the PARAMS record and the execution record,
/// which makes it the more stable correlation key.
///
/// The value is an `Arc<Vec<ParamValue>>`: the hot-path `buffer.get(&key)?.clone()`
/// only bumps a reference count (an O(1) atomic operation) instead of deep-copying
/// the whole `Vec` (optimization H-3).
pub type ParamBuffer = HashMap<(String, String), Arc<Vec<ParamValue>>>;
13
/// A single parameter value parsed from a `PARAMS(...)` log record.
///
/// Each variant already holds (or implies) the exact SQL text that is spliced
/// into the statement in place of a placeholder.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ParamValue {
    /// Single-quoted string, stored *including* the surrounding quotes and any
    /// `''` escapes, e.g. `'3USJ29'` or `'O''Brien'`.
    Quoted(String),
    /// Unquoted, non-empty value text stored verbatim (trimmed), typically a
    /// numeric literal such as `2370075`.
    Bare(String),
    /// An entry whose value part is empty (e.g. `(1, BLOB, )`); rendered as `NULL`.
    Null,
}
24
25impl ParamValue {
26    fn as_sql(&self) -> &str {
27        match self {
28            Self::Quoted(s) | Self::Bare(s) => s.as_str(),
29            Self::Null => "NULL",
30        }
31    }
32}
33
34/// Parse a `PARAMS(SEQNO, TYPE, DATA)={...}` record body into an ordered list of values.
35///
36/// Returns `None` if the body does not match the expected format.
37#[must_use]
38pub fn parse_params(body: &str) -> Option<Vec<ParamValue>> {
39    // memmem 使用 Two-Way + SIMD 算法,比 str::find 快
40    let brace = memchr::memmem::find(body.as_bytes(), b"={")?;
41    let inner = body[brace + 2..].strip_suffix('}')?;
42
43    let mut params = Vec::new();
44    // trim_start:只需去除前导空格,尾部空格在下一次迭代自然消耗
45    let mut rest = inner.trim_start();
46
47    while !rest.is_empty() {
48        let (value, tail) = parse_one_entry(rest)?;
49        params.push(value);
50        rest = tail.trim_start();
51        if let Some(t) = rest.strip_prefix(',') {
52            rest = t.trim_start();
53        }
54    }
55
56    Some(params)
57}
58
59/// Parse one `(seqno, type, value)` entry from the front of `s`.
60/// Returns `(parsed_value, remaining_input)`.
61fn parse_one_entry(s: &str) -> Option<(ParamValue, &str)> {
62    let s = s.strip_prefix('(')?;
63
64    // Skip SEQNO (integer up to first comma) — memchr for SIMD acceleration
65    let comma1 = memchr::memchr(b',', s.as_bytes())?;
66    let s = s[comma1 + 1..].trim_start();
67
68    // Skip TYPE (up to next comma)
69    let comma2 = memchr::memchr(b',', s.as_bytes())?;
70    let s = s[comma2 + 1..].trim_start();
71
72    // Parse VALUE then the closing ')'
73    if s.starts_with('\'') {
74        // Quoted string — use memchr to skip to the next single-quote, same pattern as
75        // count_placeholders / apply_params, avoiding the byte-by-byte inner loop.
76        let bytes = s.as_bytes();
77        let mut i = 1;
78        loop {
79            let rel = memchr::memchr(b'\'', &bytes[i..])?;
80            i += rel + 1;
81            // '' is an escaped quote inside the string — consume both and keep scanning
82            if i < bytes.len() && bytes[i] == b'\'' {
83                i += 1;
84            } else {
85                break;
86            }
87        }
88        // s[..i] is the quoted string including both surrounding quotes
89        let quoted = &s[..i];
90        let tail = s[i..].trim_start().strip_prefix(')')?;
91        Some((ParamValue::Quoted(String::from(quoted)), tail))
92    } else {
93        // Bare number or empty — memchr for closing ')'
94        let end = memchr::memchr(b')', s.as_bytes())?;
95        let raw = s[..end].trim();
96        let tail = &s[end + 1..];
97        let value = if raw.is_empty() {
98            ParamValue::Null
99        } else {
100            ParamValue::Bare(String::from(raw))
101        };
102        Some((value, tail))
103    }
104}
105
106/// Detect which placeholder style the SQL uses and count the number of slots,
107/// skipping over single-quoted string literals.
108///
109/// Returns `(count, is_colon_style)`:
110/// - `is_colon_style = false` → `?` style; count = number of `?` outside literals
111/// - `is_colon_style = true`  → `:N` Oracle style; count = highest ordinal seen
112///
113/// If the SQL contains no recognisable placeholders, returns `(0, false)`.
114#[inline]
115#[must_use]
116pub fn count_placeholders(sql: &str) -> (usize, bool) {
117    let bytes = sql.as_bytes();
118    let len = bytes.len();
119    let mut i = 0;
120    let mut question_count = 0usize;
121    let mut max_colon_ordinal = 0usize;
122
123    while i < len {
124        // 用 memchr3 跳过无关字节,直接定位到下一个特殊字符
125        let Some(rel) = memchr::memchr3(b'\'', b'?', b':', &bytes[i..]) else {
126            break; // 无更多特殊字节
127        };
128        i += rel;
129
130        match bytes[i] {
131            b'\'' => {
132                // Skip string literal verbatim — use memchr to jump to next quote
133                i += 1;
134                loop {
135                    let Some(r) = memchr::memchr(b'\'', &bytes[i..]) else {
136                        i = len;
137                        break;
138                    };
139                    i += r + 1;
140                    if i < len && bytes[i] == b'\'' {
141                        i += 1; // '' escape, keep scanning
142                    } else {
143                        break;
144                    }
145                }
146            }
147            b'?' => {
148                question_count += 1;
149                i += 1;
150            }
151            b':' => {
152                // `:N` where N is one or more decimal digits
153                let start = i + 1;
154                let mut j = start;
155                while j < bytes.len() && bytes[j].is_ascii_digit() {
156                    j += 1;
157                }
158                if j > start {
159                    // `:N` 内的字节均为 ASCII 数字(已 while 保证),直接累加避免 from_utf8 + parse 开销
160                    // 使用 saturating 算术防止超长序号(>20 位)在 debug 构建下 panic(WR-03)
161                    let n: usize = bytes[start..j].iter().fold(0usize, |acc, &b| {
162                        acc.saturating_mul(10).saturating_add((b - b'0') as usize)
163                    });
164                    max_colon_ordinal = max_colon_ordinal.max(n);
165                    i = j;
166                } else {
167                    i += 1;
168                }
169            }
170            _ => unreachable!(),
171        }
172    }
173
174    if max_colon_ordinal > 0 {
175        (max_colon_ordinal, true)
176    } else {
177        (question_count, false)
178    }
179}
180
181/// Replace parameter placeholders in `sql` with values from `params`, writing
182/// the result into `out` (which is cleared first).
183///
184/// Internal hot-path used by both `apply_params` and [`compute_normalized`].
185/// Avoids a `String` allocation when the caller already owns a reusable `Vec<u8>`.
186///
187/// # Safety invariant
188/// `out` will contain valid UTF-8 on return: all bytes are either taken verbatim
189/// from `sql` (already valid UTF-8) or are ASCII literals from params.
190/// ASCII bytes (0x00–0x7F) can never appear in the interior of a multi-byte
191/// UTF-8 sequence (continuation bytes are 0x80–0xBF), so no sequence is broken.
192#[inline]
193fn apply_params_into(sql: &str, params: &[ParamValue], colon_style: bool, out: &mut Vec<u8>) {
194    out.clear();
195    if params.is_empty() {
196        out.extend_from_slice(sql.as_bytes());
197        return;
198    }
199
200    let extra: usize = params
201        .iter()
202        .map(|p| p.as_sql().len().saturating_sub(1))
203        .sum();
204    out.reserve(sql.len() + extra);
205    let bytes = sql.as_bytes();
206    let len = bytes.len();
207    let mut i = 0;
208    let mut seq_idx = 0usize; // used for `?` style
209
210    while i < len {
211        // 用 memchr2 跳过无关字节:问号模式找 ' 和 ?,冒号模式找 ' 和 :
212        let special = if colon_style {
213            memchr::memchr2(b'\'', b':', &bytes[i..])
214        } else {
215            memchr::memchr2(b'\'', b'?', &bytes[i..])
216        };
217        let Some(rel) = special else {
218            out.extend_from_slice(&bytes[i..]);
219            break;
220        };
221        // 批量复制特殊字节之前的普通内容
222        if rel > 0 {
223            out.extend_from_slice(&bytes[i..i + rel]);
224        }
225        i += rel;
226
227        match bytes[i] {
228            b'\'' => {
229                // Copy string literal verbatim — use memchr to bulk-copy chunks between quotes
230                out.push(b'\'');
231                i += 1;
232                loop {
233                    let Some(r) = memchr::memchr(b'\'', &bytes[i..]) else {
234                        out.extend_from_slice(&bytes[i..]);
235                        i = len;
236                        break;
237                    };
238                    out.extend_from_slice(&bytes[i..=(i + r)]); // copy up to and including the '
239                    i += r + 1;
240                    if i < len && bytes[i] == b'\'' {
241                        out.push(b'\''); // '' escape: emit second '
242                        i += 1;
243                    } else {
244                        break;
245                    }
246                }
247            }
248            b'?' if !colon_style => {
249                if let Some(p) = params.get(seq_idx) {
250                    out.extend_from_slice(p.as_sql().as_bytes());
251                } else {
252                    out.push(b'?');
253                }
254                seq_idx += 1;
255                i += 1;
256            }
257            b':' if colon_style => {
258                let start = i + 1;
259                let mut j = start;
260                while j < len && bytes[j].is_ascii_digit() {
261                    j += 1;
262                }
263                if j > start {
264                    // `:N` 内的字节均为 ASCII 数字,直接累加避免 from_utf8 + parse 开销
265                    // 使用 saturating 算术防止超长序号(>20 位)在 debug 构建下 panic(WR-03)
266                    let n: usize = bytes[start..j].iter().fold(0usize, |acc, &b| {
267                        acc.saturating_mul(10).saturating_add((b - b'0') as usize)
268                    });
269                    // :N is 1-indexed
270                    if let Some(p) = n.checked_sub(1).and_then(|idx| params.get(idx)) {
271                        out.extend_from_slice(p.as_sql().as_bytes());
272                    } else {
273                        out.extend_from_slice(&bytes[i..j]);
274                    }
275                    i = j;
276                } else {
277                    out.push(b':');
278                    i += 1;
279                }
280            }
281            b => {
282                out.push(b);
283                i += 1;
284            }
285        }
286    }
287}
288
/// Returns `sql` with its parameter placeholders replaced by the values in `params`.
///
/// Test-only convenience wrapper around `apply_params_into` that allocates a
/// fresh buffer and hands the result back as a `String`.
///
/// Two placeholder styles are supported, selected by `colon_style`:
/// - `false` — `?`, filled in order: first `?` → `params[0]`, second → `params[1]`, …
/// - `true`  — `:N`, filled by ordinal: `:1` → `params[0]`, `:2` → `params[1]`, …
///
/// String parameters arrive already single-quoted (e.g. `'hello'`); numeric
/// parameters are written bare and null parameters as `NULL`. Placeholders that
/// sit inside single-quoted SQL string literals are never replaced.
///
/// **Callers are expected to check that `params.len()` matches
/// `count_placeholders(sql).0` beforehand**; the result for mismatched counts is
/// not part of this function's contract.
///
/// # Panics
///
/// Not expected in practice: the buffer only ever receives bytes of `sql` and of
/// the parameters' SQL text, so it is valid UTF-8. The `expect` is an internal
/// consistency assertion.
#[cfg(test)]
fn apply_params(sql: &str, params: &[ParamValue], colon_style: bool) -> String {
    let mut rendered = Vec::new();
    apply_params_into(sql, params, colon_style, &mut rendered);
    let substituted = String::from_utf8(rendered);
    substituted.expect("apply_params produced invalid UTF-8")
}
312
313/// Helper used in `cli/run.rs` to update the params buffer and compute the
314/// `normalized_sql` value for a single log record.
315///
316/// Accepts pre-parsed `meta` and `pm_sql` to avoid re-parsing inside this
317/// function. For PARAMS records `pm_sql` equals the record body (the two are
318/// identical when there are no performance indicators). For DML records it is
319/// the SQL statement extracted from `PerformanceMetrics::sql`.
320///
321/// - If the record is a `PARAMS(...)` record, its values are stored in `buffer`
322///   (keyed by `(sess_id, stmt)`) and `None` is returned.
323/// - If the record is an `[INS]`/`[DEL]`/`[UPD]`/`[SEL]` execution record that
324///   has a matching entry in `buffer`, the SQL with substituted parameters is
325///   returned as `Some(String)`.
326/// - For all other records, `None` is returned.
327///
328/// `placeholder_override`:
329/// - `None`        → auto-detect from the SQL (`:N` takes priority over `?`)
330/// - `Some(true)`  → force colon-style (`:N`)
331/// - `Some(false)` → force question-style (`?`)
332///
333/// `scratch` is a caller-owned reusable buffer. On a successful substitution the
334/// result is written there and a `&str` pointing into it is returned, eliminating
335/// a per-record heap allocation. The caller must not modify `scratch` while the
336/// returned reference is live.
337///
338/// # Returns
339///
340/// - `Some(&str)` — the SQL with all placeholders replaced by their bound values,
341///   written into `scratch`. The reference borrows `scratch`; the caller must not
342///   modify `scratch` while it is live.
343/// - `None` — if any of the following hold:
344///   - the record has no `tag` (e.g. a `PARAMS` record — its values are stored in `buffer`)
345///   - the tag is not `INS`, `DEL`, `UPD`, or `SEL`
346///   - the SQL contains no recognisable placeholders
347///   - no matching params entry exists in `buffer` for this (`sess_id`, `stmt`) key
348///   - the number of bound params does not equal the number of placeholders in the SQL
349///
350/// # Panics
351///
352/// Will not panic in practice: all bytes written to `scratch` are either taken verbatim
353/// from the UTF-8 input SQL or from UTF-8 `ParamValue` strings. The `expect` below
354/// is an internal consistency assertion that should never fire.
355pub fn compute_normalized<'a>(
356    record: &dm_database_parser_sqllog::Sqllog,
357    pm_sql: &str,
358    buffer: &mut ParamBuffer,
359    placeholder_override: Option<bool>,
360    scratch: &'a mut Vec<u8>,
361) -> Option<&'a str> {
362    if record.tag.is_none() {
363        // 无 tag → 可能是 PARAMS 记录。
364        if pm_sql.starts_with("PARAMS(") {
365            if let Some(params) = parse_params(pm_sql) {
366                buffer.insert(
367                    (record.sess_id.clone(), record.statement.clone()),
368                    Arc::new(params),
369                );
370            }
371        }
372        return None;
373    }
374
375    // 有 tag → DML/SEL 执行记录
376    let tag = record.tag.as_deref()?;
377    if !matches!(tag, "INS" | "DEL" | "UPD" | "SEL") {
378        return None;
379    }
380
381    let (placeholder_count, detected_colon) = count_placeholders(pm_sql);
382    if placeholder_count == 0 {
383        return None;
384    }
385
386    let key = (record.sess_id.clone(), record.statement.clone());
387
388    let params = buffer.get(&key)?.clone();
389
390    let colon_style = placeholder_override.unwrap_or(detected_colon);
391
392    if params.len() != placeholder_count {
393        log::warn!(
394            "replace_parameters: param count mismatch (params={}, placeholders={}) for sql: {}",
395            params.len(),
396            placeholder_count,
397            pm_sql
398                .char_indices()
399                .nth(80)
400                .map_or(pm_sql, |(i, _)| &pm_sql[..i])
401        );
402        return None;
403    }
404
405    apply_params_into(pm_sql, &params, colon_style, scratch);
406
407    // All bytes in `scratch` come from two UTF-8 sources:
408    //   1. verbatim slices of `pm_sql` (already valid UTF-8)
409    //   2. ParamValue::Quoted/Bare strings (Rust String — always valid UTF-8)
410    // ASCII literals used as delimiters ('?', ':', '\'') are single-byte and
411    // cannot appear in the interior of a multi-byte UTF-8 sequence, so no
412    // sequence is broken. The debug_assert guards this invariant cheaply in
413    // debug builds; the expect is a final consistency guard.
414    debug_assert!(
415        std::str::from_utf8(scratch).is_ok(),
416        "apply_params_into produced invalid UTF-8 — safety invariant violated"
417    );
418    Some(std::str::from_utf8(scratch).expect("apply_params_into produced invalid UTF-8"))
419}
420
// Unit tests for PARAMS parsing (`parse_params`), placeholder detection
// (`count_placeholders`) and substitution (`apply_params`).
#[cfg(test)]
mod tests {
    use super::*;

    // Shorthand constructor for an unquoted parameter value.
    fn bare(s: &str) -> ParamValue {
        ParamValue::Bare(String::from(s))
    }
    // Shorthand constructor for a quoted parameter value; `s` must already
    // include the surrounding single quotes.
    fn quoted(s: &str) -> ParamValue {
        ParamValue::Quoted(String::from(s))
    }

    // ── parse_params ──────────────────────────────────────────────────────────

    #[test]
    fn test_parse_single_varchar() {
        let params = parse_params("PARAMS(SEQNO, TYPE, DATA)={(0, VARCHAR, 'SM')}").unwrap();
        assert_eq!(params.len(), 1);
        assert_eq!(params[0].as_sql(), "'SM'");
    }

    #[test]
    fn test_parse_mixed_types() {
        let params = parse_params(
            "PARAMS(SEQNO, TYPE, DATA)={(0, DEC, 3), (1, VARCHAR, 'send ok'), (2, DEC, 0), (3, INTEGER, 42)}",
        )
        .unwrap();
        assert_eq!(params.len(), 4);
        assert_eq!(params[0].as_sql(), "3");
        assert_eq!(params[1].as_sql(), "'send ok'");
        assert_eq!(params[2].as_sql(), "0");
        assert_eq!(params[3].as_sql(), "42");
    }

    #[test]
    fn test_parse_blob_empty() {
        // An entry with an empty value part is rendered as NULL
        let params = parse_params("PARAMS(SEQNO, TYPE, DATA)={(0, DEC, 1), (1, BLOB, )}").unwrap();
        assert_eq!(params.len(), 2);
        assert_eq!(params[0].as_sql(), "1");
        assert_eq!(params[1].as_sql(), "NULL");
    }

    #[test]
    fn test_parse_quoted_with_escaped_quote() {
        // The '' escape is kept as-is inside the quoted value
        let params = parse_params("PARAMS(SEQNO, TYPE, DATA)={(0, VARCHAR, 'O''Brien')}").unwrap();
        assert_eq!(params[0].as_sql(), "'O''Brien'");
    }

    #[test]
    fn test_parse_invalid_returns_none() {
        // No `={` marker in the body
        assert!(parse_params("not a params record").is_none());
    }

    // ── apply_params ──────────────────────────────────────────────────────────

    #[test]
    fn test_apply_single_string_param() {
        let params = vec![quoted("'3USJ29'")];
        let result = apply_params("WHERE code = ?", &params, false);
        assert_eq!(result, "WHERE code = '3USJ29'");
    }

    #[test]
    fn test_apply_numeric_param() {
        let params = vec![bare("42")];
        let result = apply_params("WHERE id = ?", &params, false);
        assert_eq!(result, "WHERE id = 42");
    }

    #[test]
    fn test_apply_null_param() {
        let params = vec![ParamValue::Null];
        let result = apply_params("WHERE tag = ?", &params, false);
        assert_eq!(result, "WHERE tag = NULL");
    }

    #[test]
    fn test_apply_multiple_params() {
        let params = vec![bare("2370075"), quoted("'SJ-1'"), ParamValue::Null];
        let result = apply_params("VALUES (?, ?, ?)", &params, false);
        assert_eq!(result, "VALUES (2370075, 'SJ-1', NULL)");
    }

    #[test]
    fn test_apply_no_placeholders() {
        // Params without any placeholder to fill leave the SQL untouched
        let params = vec![bare("1")];
        let result = apply_params("SELECT 1", &params, false);
        assert_eq!(result, "SELECT 1");
    }

    #[test]
    fn test_apply_skip_literal_contents() {
        // The '?' inside the string literal should NOT be replaced
        let params = vec![quoted("'real'")];
        let result = apply_params("WHERE a = '?' AND b = ?", &params, false);
        assert_eq!(result, "WHERE a = '?' AND b = 'real'");
    }

    #[test]
    fn test_apply_insert_with_function() {
        // current_timestamp is not a placeholder; only the bare ? are replaced
        let params = vec![bare("1"), quoted("'hello'"), bare("99")];
        let result = apply_params(
            "INSERT INTO t VALUES (?,current_timestamp,?,?)",
            &params,
            false,
        );
        assert_eq!(
            result,
            "INSERT INTO t VALUES (1,current_timestamp,'hello',99)"
        );
    }

    #[test]
    fn test_apply_chinese_in_param() {
        // Multi-byte UTF-8 in a parameter value must survive substitution intact
        let params = vec![quoted("'张三'")];
        let result = apply_params("WHERE name = ?", &params, false);
        assert_eq!(result, "WHERE name = '张三'");
    }

    // ── colon-style placeholders ───────────────────────────────────────────────

    #[test]
    fn test_apply_colon_style_basic() {
        let params = vec![bare("10"), quoted("'abc'")];
        let result = apply_params("WHERE id = :1 AND code = :2", &params, true);
        assert_eq!(result, "WHERE id = 10 AND code = 'abc'");
    }

    #[test]
    fn test_apply_colon_style_out_of_order() {
        // `:N` selects by ordinal, independent of where it appears in the SQL
        let params = vec![bare("1"), bare("2"), bare("3")];
        let result = apply_params("SELECT :3, :1, :2", &params, true);
        assert_eq!(result, "SELECT 3, 1, 2");
    }

    // ── count_placeholders ────────────────────────────────────────────────────

    #[test]
    fn test_count_placeholders_question() {
        let (count, colon_style) = count_placeholders("WHERE a = ? AND b = ?");
        assert_eq!(count, 2);
        assert!(!colon_style);
    }

    #[test]
    fn test_count_placeholders_colon() {
        let (count, colon_style) = count_placeholders("WHERE a = :1 AND b = :2 AND c = :3");
        assert_eq!(count, 3);
        assert!(colon_style);
    }

    #[test]
    fn test_count_placeholders_skips_literals() {
        let (count, colon_style) = count_placeholders("WHERE a = '?' AND b = ?");
        assert_eq!(count, 1);
        assert!(!colon_style);
    }

    #[test]
    fn test_count_placeholders_none() {
        let (count, colon_style) = count_placeholders("SELECT 1");
        assert_eq!(count, 0);
        assert!(!colon_style);
    }

    #[test]
    fn test_count_placeholders_unclosed_string() {
        // Unclosed string literal — covers the "no closing quote found" branch
        // of the literal-skipping loop
        let (count, _) = count_placeholders("SELECT 'unclosed");
        assert_eq!(count, 0);
    }

    #[test]
    fn test_count_placeholders_escaped_quote() {
        // SQL with '' (escaped quote inside string) — covers the '' escape branch
        let (count, _) = count_placeholders("WHERE name = 'O''Brien' AND id = ?");
        assert_eq!(count, 1);
    }

    #[test]
    fn test_count_placeholders_colon_not_followed_by_digit() {
        // ':' not followed by digits — covers the branch that just steps over
        // the colon without counting a placeholder
        let (count, colon_style) = count_placeholders("SELECT a::text");
        assert_eq!(count, 0);
        assert!(!colon_style);
    }

    // ── apply_params edge cases ───────────────────────────────────────────────

    #[test]
    fn test_apply_params_empty_params_returns_sql_unchanged() {
        // Empty params list — covers the early return in apply_params_into that
        // copies the SQL unchanged
        let result = apply_params("SELECT * FROM t", &[], false);
        assert_eq!(result, "SELECT * FROM t");
    }

    #[test]
    fn test_apply_params_with_string_literal_verbatim_copy() {
        // String literal in SQL is copied verbatim, ? inside is NOT replaced
        let params = vec![bare("42")];
        let result = apply_params("WHERE code = '?' AND id = ?", &params, false);
        assert_eq!(result, "WHERE code = '?' AND id = 42");
    }

    #[test]
    fn test_apply_params_escaped_quote_in_literal() {
        // '' escape inside a string literal — covers the branch in
        // apply_params_into that emits the second quote and keeps scanning
        let params = vec![bare("1")];
        let result = apply_params("WHERE name = 'O''Brien' AND id = ?", &params, false);
        assert_eq!(result, "WHERE name = 'O''Brien' AND id = 1");
    }

    #[test]
    fn test_apply_params_unclosed_string_literal() {
        // Unclosed string literal in SQL — covers the "no closing quote found"
        // branch in apply_params_into, which copies the rest of the input
        let params = vec![bare("1")];
        let result = apply_params("SELECT 'unclosed", &params, false);
        // Unclosed string: no ? found outside literal, result == original sql
        assert_eq!(result, "SELECT 'unclosed");
    }
}
637}