Skip to main content

dm_database_parser_sqllog/parser/
iterator.rs

1use std::fs::File;
2use std::io::{BufRead, BufReader};
3
4use crate::error::ParseError;
5use crate::filter::adapter;
6use crate::filter::builder::Filter;
7use crate::parser::encoding::FileEncodingHint;
8use crate::record::Sqllog;
9
// ── Timestamp validation constants ──────────────────────────────────────────
//
// A record's first line starts with "20YY-MM-DD HH:MM:SS.mmm". The first 16
// bytes are read as two little-endian u64 words (`lo` = bytes 0..8, `hi` =
// bytes 8..16). Each mask keeps only the bytes at fixed-character positions,
// and the matching EXPECTED value holds those characters at the same positions.

/// Keeps line bytes 0, 1, 4 and 7 (the low word `lo`).
const LO_MASK: u64 = 0xFF0000FF0000FFFF;
/// `'2'` (0x32) at byte 0, `'0'` (0x30) at byte 1, `'-'` (0x2D) at bytes 4 and 7.
const LO_EXPECTED: u64 = 0x2D00002D00003032;
/// Keeps line bytes 10 and 13 (bytes 2 and 5 of the high word `hi`).
const HI_MASK: u64 = 0x0000FF0000FF0000;
/// `' '` (0x20) at line byte 10, `':'` (0x3A) at line byte 13.
const HI_EXPECTED: u64 = 0x00003A0000200000;

/// Length of a timestamp prefix such as `"2025-11-17 16:09:41.123"`.
const TIMESTAMP_LEN: usize = 23;

/// Returns `true` if `bytes` starts with a timestamp shaped like
/// `"20YY-MM-DD HH:MM:SS.mmm"`.
///
/// Only the fixed characters are checked: the leading `"20"`, the `'-'` at
/// offsets 4 and 7, the `' '` at 10, the `':'` at 13 and 16, and the `'.'` at
/// 19. Digit positions are *not* validated, and anything after the first
/// 23 bytes is ignored. Inputs shorter than 23 bytes return `false` instead of
/// panicking on out-of-bounds indexing.
#[inline(always)]
fn is_timestamp_start(bytes: &[u8]) -> bool {
    if bytes.len() < TIMESTAMP_LEN {
        return false;
    }
    // Both conversions are infallible: each range is exactly 8 bytes long and
    // the length check above keeps it in bounds.
    let lo = u64::from_le_bytes(bytes[0..8].try_into().unwrap());
    let hi = u64::from_le_bytes(bytes[8..16].try_into().unwrap());
    (lo & LO_MASK == LO_EXPECTED)
        && (hi & HI_MASK == HI_EXPECTED)
        && bytes[16] == b':'
        && bytes[19] == b'.'
}
28
/// Returns the byte length of the line terminator at the end of `s`.
///
/// `"\r\n"` counts as 2, a bare `"\n"` as 1, and anything else (including an
/// empty slice or a lone trailing `'\r'`) as 0.
#[inline(always)]
fn trailing_newline_len(s: &[u8]) -> usize {
    match s {
        [.., b'\r', b'\n'] => 2,
        [.., b'\n'] => 1,
        _ => 0,
    }
}
39
40/// SQL 日志记录的流式迭代器。
41///
42/// 内部使用 `BufReader` 逐行读取,内存峰值约等于单条最大记录的大小。
43pub struct LogIterator {
44    reader: BufReader<File>,
45    encoding: FileEncodingHint,
46    pending: Vec<u8>,
47    pending_line_number: u64,
48    next_line_number: u64,
49    line_buf: Vec<u8>,
50    done: bool,
51}
52
53impl LogIterator {
54    pub(super) fn new(file: File, encoding: FileEncodingHint) -> Self {
55        Self {
56            reader: BufReader::new(file),
57            encoding,
58            pending: Vec::new(),
59            pending_line_number: 1,
60            next_line_number: 1,
61            line_buf: Vec::new(),
62            done: false,
63        }
64    }
65
66    /// 返回一个跳过解析错误的迭代器。
67    pub fn skip_errors(self) -> impl Iterator<Item = Sqllog> {
68        self.filter_map(Result::ok)
69    }
70
71    /// 过滤出执行时间大于等于 `min_ms` 毫秒的记录。
72    pub fn filter_by_exec_time(
73        self,
74        min_ms: f32,
75    ) -> impl Iterator<Item = Result<Sqllog, ParseError>> {
76        adapter::filter_by_exec_time(self, min_ms)
77    }
78
79    /// 过滤出 SQL 语句体包含指定 `pattern` 的记录。
80    pub fn filter_by_sql_contains(
81        self,
82        pattern: &str,
83    ) -> impl Iterator<Item = Result<Sqllog, ParseError>> {
84        adapter::filter_by_sql_contains(self, pattern)
85    }
86
87    /// 应用 FilterBuilder 产出的组合过滤器,错误记录被丢弃。
88    pub fn apply_filter(
89        self,
90        filter: Filter,
91    ) -> impl Iterator<Item = Result<Sqllog, ParseError>> {
92        adapter::apply_filter(self, filter)
93    }
94
95    /// 应用 FilterBuilder 产出的组合过滤器,错误记录透传。
96    pub fn apply_filter_keep_errors(
97        self,
98        filter: Filter,
99    ) -> impl Iterator<Item = Result<Sqllog, ParseError>> {
100        adapter::apply_filter_keep_errors(self, filter)
101    }
102}
103
104impl Iterator for LogIterator {
105    type Item = Result<Sqllog, ParseError>;
106
107    fn next(&mut self) -> Option<Self::Item> {
108        loop {
109            if self.done {
110                return None;
111            }
112
113            self.line_buf.clear();
114            let bytes_read = match self.reader.read_until(b'\n', &mut self.line_buf) {
115                Ok(n) => n,
116                Err(e) => {
117                    self.done = true;
118                    return Some(Err(ParseError::IoError(e.to_string())));
119                }
120            };
121
122            if bytes_read == 0 {
123                self.done = true;
124                if self.pending.is_empty() {
125                    return None;
126                }
127                let trim = trailing_newline_len(&self.pending);
128                let end = self.pending.len() - trim;
129                if end == 0 {
130                    return None;
131                }
132                return Some(super::parse_record_with_hint(
133                    &self.pending[..end],
134                    self.encoding,
135                    self.pending_line_number,
136                ));
137            }
138
139            let current_line = self.next_line_number;
140            self.next_line_number += 1;
141
142            let is_new_record =
143                self.line_buf.len() >= 23 && is_timestamp_start(&self.line_buf);
144
145            if is_new_record && !self.pending.is_empty() {
146                let trim = trailing_newline_len(&self.pending);
147                let end = self.pending.len() - trim;
148
149                if end == 0 {
150                    // blank line between records — skip silently
151                    self.pending.clear();
152                    self.pending_line_number = current_line;
153                    self.pending.extend_from_slice(&self.line_buf);
154                    continue;
155                }
156
157                let result = super::parse_record_with_hint(
158                    &self.pending[..end],
159                    self.encoding,
160                    self.pending_line_number,
161                );
162                self.pending.clear();
163                self.pending_line_number = current_line;
164                self.pending.extend_from_slice(&self.line_buf);
165                return Some(result);
166            }
167
168            if is_new_record {
169                self.pending_line_number = current_line;
170                self.pending.extend_from_slice(&self.line_buf);
171            } else if !self.pending.is_empty() {
172                self.pending.extend_from_slice(&self.line_buf);
173            } else {
174                // non-record line before any record start: buffer as a (likely invalid) record
175                self.pending_line_number = current_line;
176                self.pending.extend_from_slice(&self.line_buf);
177            }
178        }
179    }
180}
181
// ── Tests ───────────────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_is_timestamp_start_valid() {
        let ts = b"2025-11-17 16:09:41.123";
        assert!(is_timestamp_start(ts));
    }

    #[test]
    fn test_is_timestamp_start_wrong_year_prefix() {
        let ts = b"1025-11-17 16:09:41.123";
        assert!(!is_timestamp_start(ts));
    }

    #[test]
    fn test_is_timestamp_start_wrong_century() {
        let ts = b"2125-11-17 16:09:41.123";
        assert!(!is_timestamp_start(ts));
    }

    #[test]
    fn test_is_timestamp_start_wrong_month_separator() {
        let ts = b"2025X11-17 16:09:41.123";
        assert!(!is_timestamp_start(ts));
    }

    #[test]
    fn test_is_timestamp_start_wrong_day_separator() {
        let ts = b"2025-11X17 16:09:41.123";
        assert!(!is_timestamp_start(ts));
    }

    #[test]
    fn test_is_timestamp_start_wrong_date_time_separator() {
        let ts = b"2025-11-17T16:09:41.123";
        assert!(!is_timestamp_start(ts));
    }

    #[test]
    fn test_is_timestamp_start_wrong_minute_separator() {
        let ts = b"2025-11-17 16X09:41.123";
        assert!(!is_timestamp_start(ts));
    }

    #[test]
    fn test_is_timestamp_start_wrong_second_separator() {
        let ts = b"2025-11-17 16:09X41.123";
        assert!(!is_timestamp_start(ts));
    }

    #[test]
    fn test_is_timestamp_start_wrong_millis_separator() {
        let ts = b"2025-11-17 16:09:41X123";
        assert!(!is_timestamp_start(ts));
    }

    #[test]
    fn test_is_timestamp_start_exactly_23_bytes() {
        let ts = b"2025-11-17 16:09:41.123";
        assert_eq!(ts.len(), 23);
        assert!(is_timestamp_start(ts));
    }

    #[test]
    fn test_is_timestamp_start_trailing_garbage() {
        let ts = b"2025-11-17 16:09:41.123extra_garbage_here";
        assert!(is_timestamp_start(ts));
    }

    #[test]
    fn test_trailing_newline_len_crlf() {
        assert_eq!(trailing_newline_len(b"select 1\r\n"), 2);
        assert_eq!(trailing_newline_len(b"\r\n"), 2);
    }

    #[test]
    fn test_trailing_newline_len_lf() {
        assert_eq!(trailing_newline_len(b"select 1\n"), 1);
        assert_eq!(trailing_newline_len(b"\n"), 1);
    }

    #[test]
    fn test_trailing_newline_len_none() {
        assert_eq!(trailing_newline_len(b"select 1"), 0);
        assert_eq!(trailing_newline_len(b"select 1\r"), 0);
        assert_eq!(trailing_newline_len(b""), 0);
    }
}