Skip to main content

dm_database_parser_sqllog/parser/
iterator.rs

1use memchr::memchr;
2use memchr::memmem::Finder;
3use std::sync::LazyLock;
4
5use crate::error::ParseError;
6use crate::filter::adapter;
7use crate::filter::builder::Filter;
8use crate::parser::encoding::FileEncodingHint;
9use crate::record::Sqllog;
10
11/// Pre-built SIMD searcher for the `"\n20"` record-start pattern.
12static FINDER_RECORD_START: LazyLock<Finder<'static>> = LazyLock::new(|| Finder::new(b"\n20"));
13
// ── Timestamp validation constants ────────────────────────────────────────────
//
// The first 16 bytes of a candidate are read as two little-endian `u64` words.
// Each `*_MASK` keeps only the byte lanes that hold fixed punctuation, and the
// matching `*_EXPECTED` holds the byte value required in those lanes.

/// First word (bytes 0..8): lanes 0, 1, 4, 7 must be `'2'`, `'0'`, `'-'`, `'-'`.
const LO_MASK: u64 = 0xFF0000FF0000FFFF;
const LO_EXPECTED: u64 = 0x2D00002D00003032;
/// Second word (bytes 8..16): lanes 2 and 5 (bytes 10 and 13) must be `' '` and `':'`.
const HI_MASK: u64 = 0x0000FF0000FF0000;
const HI_EXPECTED: u64 = 0x00003A0000200000;

/// Reports whether `bytes` starts with the fixed skeleton of a
/// `"20YY-MM-DD HH:MM:SS.mmm"` timestamp.
///
/// Only the literal positions are inspected: the `20` prefix, both `-`, the space,
/// both `:` and the `.`. The remaining positions are not checked to be digits, and
/// anything after byte 19 is ignored.
///
/// `bytes` must be at least 23 bytes long; this is checked with `debug_assert!`.
#[inline(always)]
fn is_timestamp_start(bytes: &[u8]) -> bool {
    debug_assert!(bytes.len() >= 23);

    // Load eight bytes starting at `offset` as one little-endian word.
    let word_at = |offset: usize| -> u64 {
        let mut lanes = [0u8; 8];
        lanes.copy_from_slice(&bytes[offset..offset + 8]);
        u64::from_le_bytes(lanes)
    };
    let date_word = word_at(0);
    let time_word = word_at(8);

    if date_word & LO_MASK != LO_EXPECTED {
        return false;
    }
    if time_word & HI_MASK != HI_EXPECTED {
        return false;
    }
    // Bytes 16 and 19 lie outside the two words and are compared individually.
    bytes[16] == b':' && bytes[19] == b'.'
}
32
/// Sequential iterator over the SQL log records contained in a byte buffer.
///
/// Its `Iterator` implementation yields one `Result<Sqllog, ParseError>` per record.
pub struct LogIterator<'a> {
    /// The complete input buffer; each record is sliced out of it.
    pub(super) data: &'a [u8],
    /// Byte offset into `data` at which the next record begins.
    pub(super) pos: usize,
    /// Encoding hint handed to the record parser for every record.
    pub(super) encoding: FileEncodingHint,
    /// Running line counter, advanced by the number of `\n` bytes consumed. Its value
    /// when a record begins is passed to the record parser along with that record.
    pub(super) line_number: u64,
}
40
impl<'a> LogIterator<'a> {
    /// Returns an iterator that skips parse errors and yields only the records
    /// that parsed successfully.
    pub fn skip_errors(self) -> impl Iterator<Item = Sqllog> + 'a {
        self.filter_map(Result::ok)
    }

    /// Keeps the records whose execution time is greater than or equal to `min_ms`
    /// milliseconds.
    ///
    /// Parse errors are **silently dropped** during iteration. To keep them, use
    /// [`Self::apply_filter_keep_errors`].
    pub fn filter_by_exec_time(
        self,
        min_ms: f32,
    ) -> impl Iterator<Item = Result<Sqllog, ParseError>> + 'a {
        adapter::filter_by_exec_time(self, min_ms)
    }

    /// Keeps the records whose SQL statement body contains the given `pattern`.
    ///
    /// Parse errors are **silently dropped** during iteration. To keep them, use
    /// [`Self::apply_filter_keep_errors`].
    pub fn filter_by_sql_contains(
        self,
        pattern: &str,
    ) -> impl Iterator<Item = Result<Sqllog, ParseError>> + 'a {
        adapter::filter_by_sql_contains(self, pattern)
    }

    /// Applies a composite filter produced by `FilterBuilder`; error records are
    /// dropped (consistent with `filter_by_exec_time`).
    pub fn apply_filter(
        self,
        filter: Filter,
    ) -> impl Iterator<Item = Result<Sqllog, ParseError>> + 'a {
        adapter::apply_filter(self, filter)
    }

    /// Applies a composite filter produced by `FilterBuilder`; error records are
    /// passed through unchanged.
    pub fn apply_filter_keep_errors(
        self,
        filter: Filter,
    ) -> impl Iterator<Item = Result<Sqllog, ParseError>> + 'a {
        adapter::apply_filter_keep_errors(self, filter)
    }
}
83
84impl<'a> Iterator for LogIterator<'a> {
85    type Item = Result<Sqllog, ParseError>;
86
87    fn next(&mut self) -> Option<Self::Item> {
88        loop {
89            if self.pos >= self.data.len() {
90                return None;
91            }
92
93            let data = &self.data[self.pos..];
94            let current_line = self.line_number;
95
96            let (record_end, next_start) = match memchr(b'\n', data) {
97                None => (data.len(), data.len()),
98                Some(first_nl) => {
99                    let ts_start = first_nl + 1;
100                    if ts_start + 23 <= data.len()
101                        && is_timestamp_start(&data[ts_start..ts_start + 23])
102                    {
103                        (first_nl, ts_start)
104                    } else {
105                        // 多行记录:用 memmem 跳过嵌入换行继续搜索
106                        let mut found_boundary: Option<usize> = None;
107                        for candidate in FINDER_RECORD_START.find_iter(&data[ts_start..]) {
108                            let abs_ts = ts_start + candidate + 1;
109                            if abs_ts + 23 <= data.len()
110                                && is_timestamp_start(&data[abs_ts..abs_ts + 23])
111                            {
112                                found_boundary = Some(ts_start + candidate);
113                                break;
114                            }
115                        }
116                        match found_boundary {
117                            Some(idx) => (idx, idx + 1),
118                            None => (data.len(), data.len()),
119                        }
120                    }
121                }
122            };
123
124            let record_slice = &data[..record_end];
125            self.pos += next_start;
126
127            self.line_number += data[..next_start].iter().filter(|&&b| b == b'\n').count() as u64;
128
129            // Trim trailing CR
130            let record_slice = if record_slice.ends_with(b"\r") {
131                &record_slice[..record_slice.len() - 1]
132            } else {
133                record_slice
134            };
135
136            if record_slice.is_empty() {
137                continue;
138            }
139
140            return Some(super::parse_record_with_hint(
141                record_slice,
142                self.encoding,
143                current_line,
144            ));
145        }
146    }
147}
148
// ── Tests ───────────────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    use super::*;

    /// A well-formed timestamp is accepted.
    #[test]
    fn test_is_timestamp_start_valid() {
        let candidate: &[u8] = b"2025-11-17 16:09:41.123";
        assert!(
            is_timestamp_start(candidate),
            "well-formed timestamp must be accepted"
        );
    }

    /// The year must begin with the literal `20`.
    #[test]
    fn test_is_timestamp_start_wrong_year_prefix() {
        let candidate: &[u8] = b"1025-11-17 16:09:41.123";
        assert!(
            !is_timestamp_start(candidate),
            "a year not starting with \"20\" must be rejected"
        );
    }

    /// Byte 4 must be `-`.
    #[test]
    fn test_is_timestamp_start_wrong_month_separator() {
        let candidate: &[u8] = b"2025X11-17 16:09:41.123";
        assert!(
            !is_timestamp_start(candidate),
            "a wrong year/month separator must be rejected"
        );
    }

    /// Byte 16 must be `:`.
    #[test]
    fn test_is_timestamp_start_wrong_second_separator() {
        let candidate: &[u8] = b"2025-11-17 16:09X41.123";
        assert!(
            !is_timestamp_start(candidate),
            "a wrong minute/second separator must be rejected"
        );
    }

    /// Byte 19 must be `.`.
    #[test]
    fn test_is_timestamp_start_wrong_millis_separator() {
        let candidate: &[u8] = b"2025-11-17 16:09:41X123";
        assert!(
            !is_timestamp_start(candidate),
            "a wrong second/millisecond separator must be rejected"
        );
    }

    /// The minimum accepted input length is exactly 23 bytes.
    #[test]
    fn test_is_timestamp_start_exactly_23_bytes() {
        let candidate: &[u8] = b"2025-11-17 16:09:41.123";
        assert_eq!(candidate.len(), 23);
        assert!(is_timestamp_start(candidate));
    }

    /// Bytes after the timestamp do not affect the result.
    #[test]
    fn test_is_timestamp_start_trailing_garbage() {
        let candidate: &[u8] = b"2025-11-17 16:09:41.123extra_garbage_here";
        assert!(
            is_timestamp_start(candidate),
            "bytes after the timestamp must be ignored"
        );
    }
}