dm_database_parser_sqllog/
parser.rs1use memchr::memmem::Finder;
2use memchr::{memchr, memrchr};
3use std::fs;
4use std::path::Path;
5use std::path::PathBuf;
6use std::str;
7use std::sync::LazyLock;
8
9use crate::error::ParseError;
10use crate::sqllog;
11use crate::sqllog::Sqllog;
12use encoding::all::GB18030;
13use encoding::{DecoderTrap, Encoding};
14
15static FINDER_CLOSE_META: LazyLock<Finder<'static>> = LazyLock::new(|| Finder::new(b") "));
17
18static FINDER_RECORD_START: LazyLock<Finder<'static>> = LazyLock::new(|| Finder::new(b"\n20"));
20
/// Character encoding used when decoding the text fields of a log file.
///
/// `Auto` (the default) tries UTF-8 first and falls back to GB18030 per field; `Utf8` and
/// `Gb18030` force one decoding.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum FileEncodingHint {
    /// Decide per field: UTF-8 when valid, otherwise GB18030.
    #[default]
    Auto,
    /// Treat the file as UTF-8.
    Utf8,
    /// Treat the file as GB18030.
    Gb18030,
}
32
33pub struct LogParser {
38 data: Vec<u8>,
39 encoding: FileEncodingHint,
40}
41
42pub struct LogParserBuilder {
44 path: PathBuf,
45 encoding_hint: Option<FileEncodingHint>,
46}
47
48impl LogParserBuilder {
49 pub fn new<P: AsRef<Path>>(path: P) -> Self {
51 Self {
52 path: path.as_ref().to_path_buf(),
53 encoding_hint: None,
54 }
55 }
56
57 pub fn encoding_hint(mut self, hint: FileEncodingHint) -> Self {
59 self.encoding_hint = Some(hint);
60 self
61 }
62
63 pub fn build(self) -> Result<LogParser, ParseError> {
65 let data = fs::read(&self.path)
66 .map_err(|e| ParseError::IoError(e.to_string()))?;
67
68 let encoding = match self.encoding_hint {
69 Some(hint) => hint,
70 None => {
71 let head_size = data.len().min(64 * 1024);
73 let head_ok = str::from_utf8(&data[..head_size]).is_ok();
74 let tail_start = data.len().saturating_sub(4 * 1024).max(head_size);
75 let tail_ok = tail_start >= data.len()
76 || str::from_utf8(&data[tail_start..]).is_ok();
77 if head_ok && tail_ok {
78 FileEncodingHint::Utf8
79 } else {
80 FileEncodingHint::Gb18030
81 }
82 }
83 };
84
85 Ok(LogParser { data, encoding })
86 }
87}
88
89impl LogParser {
90 pub fn iter(&self) -> LogIterator<'_> {
92 LogIterator {
93 data: &self.data,
94 pos: 0,
95 encoding: self.encoding,
96 line_number: 1,
97 }
98 }
99}
100
101pub struct LogIterator<'a> {
103 data: &'a [u8],
104 pos: usize,
105 encoding: FileEncodingHint,
106 line_number: u64,
107}
108
109impl<'a> LogIterator<'a> {
110 pub fn skip_errors(self) -> impl Iterator<Item = Sqllog> + 'a {
112 self.filter_map(Result::ok)
113 }
114
115 pub fn filter_by_exec_time(
117 self,
118 min_ms: u64,
119 ) -> impl Iterator<Item = Result<Sqllog, ParseError>> + 'a {
120 let threshold = min_ms as f32;
121 self.filter(move |item| match item {
122 Ok(sqllog) => sqllog.exectime >= threshold,
123 Err(_) => false,
124 })
125 }
126
127 pub fn filter_by_sql_contains(
129 self,
130 pattern: &str,
131 ) -> impl Iterator<Item = Result<Sqllog, ParseError>> + 'a {
132 let pattern = pattern.to_string();
133 self.filter(move |item| match item {
134 Ok(sqllog) => sqllog.sql.contains(&pattern),
135 Err(_) => false,
136 })
137 }
138}
139
140impl<'a> Iterator for LogIterator<'a> {
141 type Item = Result<Sqllog, ParseError>;
142
143 fn next(&mut self) -> Option<Self::Item> {
144 loop {
145 if self.pos >= self.data.len() {
146 return None;
147 }
148
149 let data = &self.data[self.pos..];
150 let current_line = self.line_number;
151
152 let (record_end, next_start) = match memchr(b'\n', data) {
153 None => (data.len(), data.len()),
154 Some(first_nl) => {
155 let ts_start = first_nl + 1;
156 if ts_start + 23 <= data.len()
157 && is_timestamp_start(&data[ts_start..ts_start + 23])
158 {
159 (first_nl, ts_start)
160 } else {
161 let mut found_boundary: Option<usize> = None;
163 for candidate in FINDER_RECORD_START.find_iter(&data[ts_start..]) {
164 let abs_ts = ts_start + candidate + 1;
165 if abs_ts + 23 <= data.len()
166 && is_timestamp_start(&data[abs_ts..abs_ts + 23])
167 {
168 found_boundary = Some(ts_start + candidate);
169 break;
170 }
171 }
172 match found_boundary {
173 Some(idx) => (idx, idx + 1),
174 None => (data.len(), data.len()),
175 }
176 }
177 }
178 };
179
180 let record_slice = &data[..record_end];
181 self.pos += next_start;
182
183 self.line_number += data[..next_start].iter().filter(|&&b| b == b'\n').count() as u64;
184
185 let record_slice = if record_slice.ends_with(b"\r") {
187 &record_slice[..record_slice.len() - 1]
188 } else {
189 record_slice
190 };
191
192 if record_slice.is_empty() {
193 continue;
194 }
195
196 return Some(parse_record_with_hint(
197 record_slice,
198 self.encoding,
199 current_line,
200 ));
201 }
202 }
203}
204
205pub fn parse_record(record_bytes: &[u8]) -> Result<Sqllog, ParseError> {
209 parse_record_with_hint(record_bytes, FileEncodingHint::Auto, 0)
210}
211
212fn parse_record_with_hint(
214 record_bytes: &[u8],
215 encoding_hint: FileEncodingHint,
216 line_number: u64,
217) -> Result<Sqllog, ParseError> {
218 let is_multiline = memchr(b'\n', record_bytes).is_some();
220
221 let first_line = if is_multiline {
223 match memchr(b'\n', record_bytes) {
224 Some(idx) => {
225 let mut line = &record_bytes[..idx];
226 if line.ends_with(b"\r") {
227 line = &line[..line.len() - 1];
228 }
229 line
230 }
231 None => {
232 let mut line = record_bytes;
233 if line.ends_with(b"\r") {
234 line = &line[..line.len() - 1];
235 }
236 line
237 }
238 }
239 } else {
240 let mut line = record_bytes;
241 if line.ends_with(b"\r") {
242 line = &line[..line.len() - 1];
243 }
244 line
245 };
246
247 if first_line.len() < 23 {
249 return Err(make_invalid_format_error(first_line, line_number));
250 }
251 let ts = match str::from_utf8(&first_line[0..23]) {
252 Ok(s) => s.to_string(),
253 Err(_) => return Err(make_invalid_format_error(first_line, line_number)),
254 };
255
256 let meta_start = match memchr(b'(', &first_line[23..]) {
258 Some(idx) => 23 + idx,
259 None => return Err(make_invalid_format_error(first_line, line_number)),
260 };
261
262 let meta_end = match FINDER_CLOSE_META.find(&first_line[meta_start..]) {
263 Some(idx) => Some(meta_start + idx),
264 None => memrchr(b')', &first_line[meta_start..]).map(|idx| meta_start + idx),
265 };
266
267 let meta_end = match meta_end {
268 Some(idx) => idx,
269 None => return Err(make_invalid_format_error(first_line, line_number)),
270 };
271
272 let meta_bytes = &first_line[meta_start + 1..meta_end];
273
274 let (ep, sess_id, thrd_id, username, trxid, statement, appname, client_ip) =
276 match encoding_hint {
277 FileEncodingHint::Utf8 => {
278 sqllog::parse_meta_from_bytes(meta_bytes)
279 }
280 FileEncodingHint::Auto => {
281 match str::from_utf8(meta_bytes) {
283 Ok(_) => sqllog::parse_meta_from_bytes(meta_bytes),
284 Err(_) => match GB18030.decode(meta_bytes, DecoderTrap::Strict) {
285 Ok(decoded) => sqllog::parse_meta_from_bytes(decoded.as_bytes()),
286 Err(_) => {
287 let lossy = String::from_utf8_lossy(meta_bytes).into_owned();
288 sqllog::parse_meta_from_bytes(lossy.as_bytes())
289 }
290 },
291 }
292 }
293 FileEncodingHint::Gb18030 => {
294 match GB18030.decode(meta_bytes, DecoderTrap::Strict) {
295 Ok(decoded) => sqllog::parse_meta_from_bytes(decoded.as_bytes()),
296 Err(_) => {
297 let lossy = String::from_utf8_lossy(meta_bytes).into_owned();
298 sqllog::parse_meta_from_bytes(lossy.as_bytes())
299 }
300 }
301 }
302 };
303
304 let body_start_in_first_line = meta_end + 1;
306
307 let content_start = if body_start_in_first_line < first_line.len()
308 && first_line[body_start_in_first_line] == b' '
309 {
310 body_start_in_first_line + 1
311 } else {
312 body_start_in_first_line
313 };
314
315 let mut tag: Option<String> = None;
317 let content_slice = if content_start < record_bytes.len() {
318 let mut s = &record_bytes[content_start..];
319 if !s.is_empty()
320 && s[0] == b'['
321 && let Some(end_idx) = memchr(b']', s)
322 && end_idx >= 1
323 {
324 let inner = &s[1..end_idx];
325 if !inner.contains(&b' ') && inner.len() <= 32 {
326 tag = match encoding_hint {
327 FileEncodingHint::Utf8 => {
328 str::from_utf8(inner).ok().map(|t| t.to_string())
329 }
330 FileEncodingHint::Auto => {
331 str::from_utf8(inner).ok().map(|t| t.to_string())
332 .or_else(|| {
333 GB18030.decode(inner, DecoderTrap::Strict)
334 .ok()
335 })
336 }
337 FileEncodingHint::Gb18030 => {
338 GB18030.decode(inner, DecoderTrap::Strict)
339 .ok()
340 .or_else(|| str::from_utf8(inner).ok().map(|s| s.to_string()))
341 }
342 };
343 s = &s[end_idx + 1..];
345 let mut skip = 0usize;
346 while skip < s.len() && s[skip].is_ascii_whitespace() {
347 skip += 1;
348 }
349 s = &s[skip..];
350 }
351 }
352 s
353 } else {
354 &[] as &[u8]
355 };
356
357 let split = sqllog::find_indicators_split(content_slice);
359 let body_bytes = &content_slice[..split];
360 let ind_bytes = &content_slice[split..];
361
362 let sql_raw = match encoding_hint {
364 FileEncodingHint::Utf8 => {
365 String::from_utf8_lossy(body_bytes).into_owned()
366 }
367 FileEncodingHint::Auto => {
368 match str::from_utf8(body_bytes) {
369 Ok(s) => s.to_string(),
370 Err(_) => match GB18030.decode(body_bytes, DecoderTrap::Strict) {
371 Ok(s) => s,
372 Err(_) => String::from_utf8_lossy(body_bytes).into_owned(),
373 },
374 }
375 }
376 FileEncodingHint::Gb18030 => {
377 match GB18030.decode(body_bytes, DecoderTrap::Strict) {
378 Ok(s) => s,
379 Err(_) => String::from_utf8_lossy(body_bytes).into_owned(),
380 }
381 }
382 };
383
384 let sql = if tag.as_deref() == Some("ORA") {
386 sql_raw.strip_prefix(": ").unwrap_or(&sql_raw).to_string()
387 } else {
388 sql_raw
389 };
390
391 let (exectime, rowcount, exec_id) = sqllog::parse_indicators_from_bytes(ind_bytes);
393
394 Ok(Sqllog {
395 ts,
396 tag,
397 ep,
398 sess_id,
399 thrd_id,
400 username,
401 trxid,
402 statement,
403 appname,
404 client_ip,
405 sql,
406 exectime,
407 rowcount,
408 exec_id,
409 })
410}
411
// SWAR masks for `is_timestamp_start`, built from byte strings so the checked positions are
// visible. Word 0 covers bytes 0..8 (`"2025-11-"`), word 1 covers bytes 8..16 (`"17 16:09"`).

/// Keeps bytes 0, 1, 4 and 7 of word 0.
const LO_MASK: u64 = u64::from_le_bytes([0xFF, 0xFF, 0, 0, 0xFF, 0, 0, 0xFF]);
/// `"20"` at bytes 0-1 and `'-'` at bytes 4 and 7.
const LO_EXPECTED: u64 = u64::from_le_bytes(*b"20\0\0-\0\0-");
/// Keeps bytes 10 and 13 of the timestamp (positions 2 and 5 of word 1).
const HI_MASK: u64 = u64::from_le_bytes([0, 0, 0xFF, 0, 0, 0xFF, 0, 0]);
/// `' '` at byte 10 and `':'` at byte 13.
const HI_EXPECTED: u64 = u64::from_le_bytes(*b"\0\0 \0\0:\0\0");

/// Cheaply checks whether `bytes` starts with a `20YY-MM-DD HH:MM:SS.mmm` timestamp.
///
/// Only the leading `"20"` and the separator positions are verified; the remaining digits are
/// not validated. Callers must pass at least 23 bytes.
#[inline(always)]
fn is_timestamp_start(bytes: &[u8]) -> bool {
    debug_assert!(bytes.len() >= 23);

    /// Little-endian load of the eight bytes starting at `at`.
    fn word_at(bytes: &[u8], at: usize) -> u64 {
        let mut buf = [0u8; 8];
        buf.copy_from_slice(&bytes[at..at + 8]);
        u64::from_le_bytes(buf)
    }

    if (word_at(bytes, 0) & LO_MASK) != LO_EXPECTED {
        return false;
    }
    if (word_at(bytes, 8) & HI_MASK) != HI_EXPECTED {
        return false;
    }
    bytes[16] == b':' && bytes[19] == b'.'
}
430
431#[cold]
432fn make_invalid_format_error(raw_bytes: &[u8], line_number: u64) -> ParseError {
433 ParseError::InvalidFormat {
434 raw: String::from_utf8_lossy(raw_bytes).to_string(),
435 line_number,
436 }
437}
438
#[cfg(test)]
mod tests {
    use super::*;

    /// A well-formed 23-byte timestamp shared by several tests.
    const GOOD_TS: &[u8] = b"2025-11-17 16:09:41.123";

    #[test]
    fn test_is_timestamp_start_valid() {
        assert!(is_timestamp_start(GOOD_TS));
    }

    #[test]
    fn test_is_timestamp_start_wrong_year_prefix() {
        assert!(!is_timestamp_start(b"1025-11-17 16:09:41.123"));
    }

    #[test]
    fn test_is_timestamp_start_wrong_month_separator() {
        assert!(!is_timestamp_start(b"2025X11-17 16:09:41.123"));
    }

    #[test]
    fn test_is_timestamp_start_wrong_second_separator() {
        assert!(!is_timestamp_start(b"2025-11-17 16:09X41.123"));
    }

    #[test]
    fn test_is_timestamp_start_wrong_millis_separator() {
        assert!(!is_timestamp_start(b"2025-11-17 16:09:41X123"));
    }

    #[test]
    fn test_is_timestamp_start_exactly_23_bytes() {
        assert_eq!(GOOD_TS.len(), 23);
        assert!(is_timestamp_start(GOOD_TS));
    }

    #[test]
    fn test_is_timestamp_start_trailing_garbage() {
        // Bytes after the timestamp must not affect the check.
        let mut padded = GOOD_TS.to_vec();
        padded.extend_from_slice(b"extra_garbage_here");
        assert!(is_timestamp_start(&padded));
    }

    #[cfg(not(miri))]
    #[test]
    fn test_builder_encoding_hint_utf8() {
        use std::io::Write;
        use tempfile::NamedTempFile;

        const RECORD: &[u8] = b"2025-11-17 16:09:41.123 (EP[0] sess:1 thrd:2 user:u trxid:3 stmt:4 appname:a) SELECT 1";

        let mut tmp = NamedTempFile::new().expect("tmp");
        tmp.write_all(RECORD).unwrap();
        tmp.as_file().sync_all().unwrap();

        let parser = LogParserBuilder::new(tmp.path())
            .encoding_hint(FileEncodingHint::Utf8)
            .build()
            .expect("build");
        let first = parser.iter().next().unwrap();
        let record = first.unwrap();

        assert_eq!(record.ts, "2025-11-17 16:09:41.123");
        assert!(record.sql.contains("SELECT 1"));
    }

    #[cfg(not(miri))]
    #[test]
    fn test_builder_file_not_found() {
        let result = LogParserBuilder::new("/nonexistent/path.log").build();
        assert!(
            matches!(result, Err(ParseError::IoError(_))),
            "Expected IoError on nonexistent file"
        );
    }
}