dm_database_parser_sqllog/parser/
iterator.rs1use std::fs::File;
2use std::io::{BufRead, BufReader};
3
4use crate::error::ParseError;
5use crate::filter::adapter;
6use crate::filter::builder::Filter;
7use crate::parser::encoding::FileEncodingHint;
8use crate::record::Sqllog;
9
// Byte-level fingerprint of a record-opening timestamp such as
// `2025-11-17 16:09:41.123`. Each 8-byte window is loaded little-endian, so
// the lowest-order byte of a mask corresponds to the first byte of the window.
//
// Window bytes 0..8: indices 0, 1, 4, 7 must be '2', '0', '-', '-'.
const LO_MASK: u64 = 0xFF0000FF0000FFFF;
const LO_EXPECTED: u64 = 0x2D00002D00003032;
// Window bytes 8..16: indices 10 and 13 must be ' ' and ':'.
const HI_MASK: u64 = 0x0000FF0000FF0000;
const HI_EXPECTED: u64 = 0x00003A0000200000;

/// Returns `true` when `bytes` starts with something shaped like a
/// `20YY-MM-DD HH:MM:SS.mmm` timestamp, i.e. the first line of a new record.
///
/// Only the `20` century prefix and the fixed separator positions
/// (indices 4, 7, 10, 13, 16, 19) are inspected; digit positions are not
/// validated. Bytes after the 23-byte timestamp are ignored.
///
/// Slices shorter than the 23-byte timestamp return `false` in every build
/// profile. Previously this was only a `debug_assert!`, so release builds
/// could panic or accept a truncated timestamp.
#[inline(always)]
fn is_timestamp_start(bytes: &[u8]) -> bool {
    const TIMESTAMP_LEN: usize = 23;
    if bytes.len() < TIMESTAMP_LEN {
        return false;
    }
    // Both sub-slices are exactly 8 bytes long, so the conversions cannot fail.
    let lo = u64::from_le_bytes(bytes[0..8].try_into().unwrap());
    let hi = u64::from_le_bytes(bytes[8..16].try_into().unwrap());
    (lo & LO_MASK == LO_EXPECTED)
        && (hi & HI_MASK == HI_EXPECTED)
        && bytes[16] == b':'
        && bytes[19] == b'.'
}
28
/// Number of bytes taken by the line terminator at the end of `s`:
/// 2 for `\r\n`, 1 for a bare `\n`, and 0 when `s` does not end with `\n`.
#[inline(always)]
fn trailing_newline_len(s: &[u8]) -> usize {
    match s {
        [.., b'\r', b'\n'] => 2,
        [.., b'\n'] => 1,
        _ => 0,
    }
}
39
40pub struct LogIterator {
44 reader: BufReader<File>,
45 encoding: FileEncodingHint,
46 pending: Vec<u8>,
47 pending_line_number: u64,
48 next_line_number: u64,
49 line_buf: Vec<u8>,
50 done: bool,
51}
52
53impl LogIterator {
54 pub(super) fn new(file: File, encoding: FileEncodingHint) -> Self {
55 Self {
56 reader: BufReader::new(file),
57 encoding,
58 pending: Vec::new(),
59 pending_line_number: 1,
60 next_line_number: 1,
61 line_buf: Vec::new(),
62 done: false,
63 }
64 }
65
66 pub fn skip_errors(self) -> impl Iterator<Item = Sqllog> {
68 self.filter_map(Result::ok)
69 }
70
71 pub fn filter_by_exec_time(
73 self,
74 min_ms: f32,
75 ) -> impl Iterator<Item = Result<Sqllog, ParseError>> {
76 adapter::filter_by_exec_time(self, min_ms)
77 }
78
79 pub fn filter_by_sql_contains(
81 self,
82 pattern: &str,
83 ) -> impl Iterator<Item = Result<Sqllog, ParseError>> {
84 adapter::filter_by_sql_contains(self, pattern)
85 }
86
87 pub fn apply_filter(
89 self,
90 filter: Filter,
91 ) -> impl Iterator<Item = Result<Sqllog, ParseError>> {
92 adapter::apply_filter(self, filter)
93 }
94
95 pub fn apply_filter_keep_errors(
97 self,
98 filter: Filter,
99 ) -> impl Iterator<Item = Result<Sqllog, ParseError>> {
100 adapter::apply_filter_keep_errors(self, filter)
101 }
102}
103
104impl Iterator for LogIterator {
105 type Item = Result<Sqllog, ParseError>;
106
107 fn next(&mut self) -> Option<Self::Item> {
108 loop {
109 if self.done {
110 return None;
111 }
112
113 self.line_buf.clear();
114 let bytes_read = match self.reader.read_until(b'\n', &mut self.line_buf) {
115 Ok(n) => n,
116 Err(e) => {
117 self.done = true;
118 return Some(Err(ParseError::IoError(e.to_string())));
119 }
120 };
121
122 if bytes_read == 0 {
123 self.done = true;
124 if self.pending.is_empty() {
125 return None;
126 }
127 let trim = trailing_newline_len(&self.pending);
128 let end = self.pending.len() - trim;
129 if end == 0 {
130 return None;
131 }
132 return Some(super::parse_record_with_hint(
133 &self.pending[..end],
134 self.encoding,
135 self.pending_line_number,
136 ));
137 }
138
139 let current_line = self.next_line_number;
140 self.next_line_number += 1;
141
142 let is_new_record =
143 self.line_buf.len() >= 23 && is_timestamp_start(&self.line_buf);
144
145 if is_new_record && !self.pending.is_empty() {
146 let trim = trailing_newline_len(&self.pending);
147 let end = self.pending.len() - trim;
148
149 if end == 0 {
150 self.pending.clear();
152 self.pending_line_number = current_line;
153 self.pending.extend_from_slice(&self.line_buf);
154 continue;
155 }
156
157 let result = super::parse_record_with_hint(
158 &self.pending[..end],
159 self.encoding,
160 self.pending_line_number,
161 );
162 self.pending.clear();
163 self.pending_line_number = current_line;
164 self.pending.extend_from_slice(&self.line_buf);
165 return Some(result);
166 }
167
168 if is_new_record {
169 self.pending_line_number = current_line;
170 self.pending.extend_from_slice(&self.line_buf);
171 } else if !self.pending.is_empty() {
172 self.pending.extend_from_slice(&self.line_buf);
173 } else {
174 self.pending_line_number = current_line;
176 self.pending.extend_from_slice(&self.line_buf);
177 }
178 }
179 }
180}
181
#[cfg(test)]
mod tests {
    use super::*;

    // ---- is_timestamp_start ----------------------------------------------

    #[test]
    fn test_is_timestamp_start_valid() {
        let ts = b"2025-11-17 16:09:41.123";
        assert!(is_timestamp_start(ts));
    }

    #[test]
    fn test_is_timestamp_start_wrong_year_prefix() {
        let ts = b"1025-11-17 16:09:41.123";
        assert!(!is_timestamp_start(ts));
    }

    #[test]
    fn test_is_timestamp_start_wrong_century_digit() {
        let ts = b"2125-11-17 16:09:41.123";
        assert!(!is_timestamp_start(ts));
    }

    #[test]
    fn test_is_timestamp_start_wrong_month_separator() {
        let ts = b"2025X11-17 16:09:41.123";
        assert!(!is_timestamp_start(ts));
    }

    #[test]
    fn test_is_timestamp_start_wrong_day_separator() {
        let ts = b"2025-11X17 16:09:41.123";
        assert!(!is_timestamp_start(ts));
    }

    #[test]
    fn test_is_timestamp_start_wrong_date_time_separator() {
        let ts = b"2025-11-17T16:09:41.123";
        assert!(!is_timestamp_start(ts));
    }

    #[test]
    fn test_is_timestamp_start_wrong_minute_separator() {
        let ts = b"2025-11-17 16X09:41.123";
        assert!(!is_timestamp_start(ts));
    }

    #[test]
    fn test_is_timestamp_start_wrong_second_separator() {
        let ts = b"2025-11-17 16:09X41.123";
        assert!(!is_timestamp_start(ts));
    }

    #[test]
    fn test_is_timestamp_start_wrong_millis_separator() {
        let ts = b"2025-11-17 16:09:41X123";
        assert!(!is_timestamp_start(ts));
    }

    #[test]
    fn test_is_timestamp_start_exactly_23_bytes() {
        let ts = b"2025-11-17 16:09:41.123";
        assert_eq!(ts.len(), 23);
        assert!(is_timestamp_start(ts));
    }

    #[test]
    fn test_is_timestamp_start_trailing_garbage() {
        let ts = b"2025-11-17 16:09:41.123extra_garbage_here";
        assert!(is_timestamp_start(ts));
    }

    // ---- trailing_newline_len --------------------------------------------

    #[test]
    fn test_trailing_newline_len_crlf() {
        assert_eq!(trailing_newline_len(b"select 1;\r\n"), 2);
        assert_eq!(trailing_newline_len(b"\r\n"), 2);
    }

    #[test]
    fn test_trailing_newline_len_lf() {
        assert_eq!(trailing_newline_len(b"select 1;\n"), 1);
        assert_eq!(trailing_newline_len(b"\n"), 1);
        // Only the final terminator is counted.
        assert_eq!(trailing_newline_len(b"\n\n"), 1);
    }

    #[test]
    fn test_trailing_newline_len_none() {
        assert_eq!(trailing_newline_len(b""), 0);
        assert_eq!(trailing_newline_len(b"select 1;"), 0);
        // A lone or trailing carriage return is not a line terminator.
        assert_eq!(trailing_newline_len(b"select 1;\r"), 0);
        assert_eq!(trailing_newline_len(b"\n\r"), 0);
    }
}