dm_database_parser_sqllog/
parser.rs1use memchr::memmem::Finder;
2use memchr::{memchr, memrchr};
3#[cfg(unix)]
4use memmap2::Advice;
5use memmap2::Mmap;
6use simdutf8::basic::from_utf8 as simd_from_utf8;
7use std::borrow::Cow;
8use std::fs::File;
9use std::path::Path;
10use std::sync::LazyLock;
11
12use crate::error::ParseError;
13use crate::sqllog::Sqllog;
14use encoding::all::GB18030;
15use encoding::{DecoderTrap, Encoding};
16
17static FINDER_CLOSE_META: LazyLock<Finder<'static>> = LazyLock::new(|| Finder::new(b") "));
20
21static FINDER_RECORD_START: LazyLock<Finder<'static>> = LazyLock::new(|| Finder::new(b"\n20"));
24
/// Encoding assumption applied when a record's textual fields are decoded.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub(crate) enum FileEncodingHint {
    /// No assumption about the source: each field is checked for UTF-8
    /// validity before any other decoding is attempted.
    #[default]
    Auto,
    /// The source was classified as UTF-8.
    Utf8,
    /// The source was classified as GB18030.
    Gb18030,
}
32
33pub struct LogParser {
34 mmap: Mmap,
35 encoding: FileEncodingHint,
36}
37
/// Byte offsets of record starts, as collected by [`LogParser::index`].
pub struct RecordIndex {
    /// Offset of each record's first byte within the mapped file.
    pub(crate) offsets: Vec<usize>,
}

impl RecordIndex {
    /// Number of record offsets held by the index.
    pub fn len(&self) -> usize {
        let Self { offsets } = self;
        offsets.len()
    }

    /// Whether the index holds no offsets at all.
    pub fn is_empty(&self) -> bool {
        let Self { offsets } = self;
        offsets.is_empty()
    }
}
56
57impl LogParser {
58 pub fn from_path<P: AsRef<Path>>(path: P) -> Result<Self, ParseError> {
59 let file = File::open(path).map_err(|e| ParseError::IoError(e.to_string()))?;
60 let mmap = unsafe { Mmap::map(&file).map_err(|e| ParseError::IoError(e.to_string()))? };
61
62 #[cfg(unix)]
66 let _ = mmap.advise(Advice::Sequential);
67
68 let head_size = mmap.len().min(64 * 1024);
81 let tail_start = mmap.len().saturating_sub(4 * 1024).max(head_size);
82 let head_ok = simd_from_utf8(&mmap[..head_size]).is_ok();
83 let tail_ok = tail_start >= mmap.len() || simd_from_utf8(&mmap[tail_start..]).is_ok();
84 let encoding = if head_ok && tail_ok {
85 FileEncodingHint::Utf8
86 } else {
87 FileEncodingHint::Gb18030
88 };
89
90 Ok(Self { mmap, encoding })
91 }
92
93 pub fn iter(&self) -> LogIterator<'_> {
94 LogIterator {
95 data: &self.mmap,
96 pos: 0,
97 encoding: self.encoding,
98 }
99 }
100
101 pub fn index(&self) -> RecordIndex {
104 let data: &[u8] = &self.mmap;
105 let mut offsets: Vec<usize> = Vec::new();
106
107 if data.len() >= 23 && is_timestamp_start(&data[0..23]) {
110 offsets.push(0);
111 }
112
113 let mut pos: usize = 0;
114 loop {
115 let next = find_next_record_start(data, pos);
116 if next >= data.len() {
117 break;
118 }
119 if offsets.last() != Some(&next) {
121 offsets.push(next);
122 }
123 pos = next.saturating_add(1);
126 }
127 RecordIndex { offsets }
128 }
129
130 pub fn par_iter(
138 &self,
139 ) -> impl rayon::iter::ParallelIterator<Item = Result<Sqllog<'_>, ParseError>> + '_ {
140 use rayon::prelude::*;
141
142 const PAR_THRESHOLD: usize = 32 * 1024 * 1024;
143
144 let data: &[u8] = &self.mmap;
145 let encoding = self.encoding;
146
147 let bounds: Vec<(usize, usize)> = if data.is_empty() {
148 Vec::new()
149 } else if data.len() < PAR_THRESHOLD {
150 vec![(0, data.len())]
151 } else {
152 let num_threads = rayon::current_num_threads().max(1);
153 let chunk_size = (data.len() / num_threads).max(1);
154 let mut starts: Vec<usize> = vec![0];
155 for i in 1..num_threads {
156 let boundary = find_next_record_start(data, i * chunk_size);
157 if boundary < data.len() {
158 starts.push(boundary);
159 }
160 }
161 starts.push(data.len());
162 starts.dedup();
163 starts.windows(2).map(|w| (w[0], w[1])).collect()
164 };
165
166 bounds
167 .into_par_iter()
168 .flat_map_iter(move |(start, end)| LogIterator {
169 data: &data[start..end],
170 pos: 0,
171 encoding,
172 })
173 }
174}
175
176pub struct LogIterator<'a> {
177 data: &'a [u8],
178 pos: usize,
179 encoding: FileEncodingHint,
180}
181
182impl<'a> Iterator for LogIterator<'a> {
183 type Item = Result<Sqllog<'a>, ParseError>;
184
185 fn next(&mut self) -> Option<Self::Item> {
186 loop {
187 if self.pos >= self.data.len() {
188 return None;
189 }
190
191 let data = &self.data[self.pos..];
192
193 let (record_end, next_start, is_multiline) = match memchr(b'\n', data) {
196 None => (data.len(), data.len(), false),
197 Some(first_nl) => {
198 let ts_start = first_nl + 1;
199 if ts_start + 23 <= data.len()
200 && is_timestamp_start(&data[ts_start..ts_start + 23])
201 {
202 (first_nl, ts_start, false)
204 } else {
205 let mut found_boundary: Option<usize> = None;
208 for candidate in FINDER_RECORD_START.find_iter(&data[ts_start..]) {
209 let abs_ts = ts_start + candidate + 1;
210 if abs_ts + 23 <= data.len()
211 && is_timestamp_start(&data[abs_ts..abs_ts + 23])
212 {
213 found_boundary = Some(ts_start + candidate);
214 break;
215 }
216 }
217 match found_boundary {
218 Some(idx) => (idx, idx + 1, true),
219 None => (data.len(), data.len(), true),
220 }
221 }
222 }
223 };
224
225 let record_slice = &data[..record_end];
226 self.pos += next_start;
227
228 let record_slice = if record_slice.ends_with(b"\r") {
230 &record_slice[..record_slice.len() - 1]
231 } else {
232 record_slice
233 };
234
235 if record_slice.is_empty() {
238 continue;
239 }
240
241 return Some(parse_record_with_hint(
242 record_slice,
243 is_multiline,
244 self.encoding,
245 ));
246 }
247 }
248}
249
250fn find_next_record_start(data: &[u8], from: usize) -> usize {
253 let mut pos = from;
254 if let Some(nl) = memchr(b'\n', &data[pos..]) {
256 pos += nl + 1;
257 } else {
258 return data.len();
259 }
260 if pos + 23 <= data.len() && is_timestamp_start(&data[pos..pos + 23]) {
262 return pos;
263 }
264
265 for candidate in FINDER_RECORD_START.find_iter(&data[pos..]) {
267 let ts_start = pos + candidate + 1;
268 if ts_start + 23 <= data.len() && is_timestamp_start(&data[ts_start..ts_start + 23]) {
269 return ts_start;
270 }
271 }
272 data.len()
273}
274
275pub fn parse_record<'a>(record_bytes: &'a [u8]) -> Result<Sqllog<'a>, ParseError> {
276 let is_multiline = memchr(b'\n', record_bytes).is_some();
280 parse_record_with_hint(record_bytes, is_multiline, FileEncodingHint::Auto)
281}
282
283fn parse_record_with_hint<'a>(
284 record_bytes: &'a [u8],
285 is_multiline: bool,
286 encoding_hint: FileEncodingHint,
287) -> Result<Sqllog<'a>, ParseError> {
288 let (first_line, _rest) = if is_multiline {
290 match memchr(b'\n', record_bytes) {
291 Some(idx) => {
292 let mut line = &record_bytes[..idx];
293 if line.ends_with(b"\r") {
294 line = &line[..line.len() - 1];
295 }
296 (line, &record_bytes[idx + 1..])
297 }
298 None => {
299 let mut line = record_bytes;
300 if line.ends_with(b"\r") {
301 line = &line[..line.len() - 1];
302 }
303 (line, &[] as &[u8])
304 }
305 }
306 } else {
307 let mut line = record_bytes;
308 if line.ends_with(b"\r") {
309 line = &line[..line.len() - 1];
310 }
311 (line, &[] as &[u8])
312 };
313
314 if first_line.len() < 23 {
316 return Err(make_invalid_format_error(first_line));
317 }
318 let ts = unsafe { Cow::Borrowed(std::str::from_utf8_unchecked(&first_line[0..23])) };
322
323 let meta_start = match memchr(b'(', &first_line[23..]) {
327 Some(idx) => 23 + idx,
328 None => {
329 return Err(make_invalid_format_error(first_line));
330 }
331 };
332
333 let meta_end = match FINDER_CLOSE_META.find(&first_line[meta_start..]) {
335 Some(idx) => Some(meta_start + idx),
336 None => memrchr(b')', &first_line[meta_start..]).map(|idx| meta_start + idx),
337 };
338
339 let meta_end = match meta_end {
340 Some(idx) => idx,
341 None => {
342 return Err(make_invalid_format_error(first_line));
343 }
344 };
345
346 let meta_bytes = &first_line[meta_start + 1..meta_end];
347 let meta_raw = match encoding_hint {
353 FileEncodingHint::Utf8 => {
354 unsafe { Cow::Borrowed(std::str::from_utf8_unchecked(meta_bytes)) }
358 }
359 FileEncodingHint::Gb18030 => match GB18030.decode(meta_bytes, DecoderTrap::Strict) {
360 Ok(s) => Cow::Owned(s),
361 Err(_) => Cow::Owned(String::from_utf8_lossy(meta_bytes).into_owned()),
362 },
363 FileEncodingHint::Auto => match simd_from_utf8(meta_bytes) {
364 Ok(_) => {
365 unsafe { Cow::Borrowed(std::str::from_utf8_unchecked(meta_bytes)) }
368 }
369 Err(_) => match GB18030.decode(meta_bytes, DecoderTrap::Strict) {
370 Ok(s) => Cow::Owned(s),
371 Err(_) => Cow::Owned(String::from_utf8_lossy(meta_bytes).into_owned()),
372 },
373 },
374 };
375
376 let body_start_in_first_line = meta_end + 1;
378
379 let content_start = if body_start_in_first_line < first_line.len()
381 && first_line[body_start_in_first_line] == b' '
382 {
383 body_start_in_first_line + 1
384 } else {
385 body_start_in_first_line
386 };
387
388 let mut tag: Option<Cow<'a, str>> = None;
390 let content_slice = if content_start < record_bytes.len() {
391 let mut s = &record_bytes[content_start..];
392 if !s.is_empty()
394 && s[0] == b'['
395 && let Some(end_idx) = memchr(b']', s)
396 && end_idx >= 1
397 {
398 let inner = &s[1..end_idx];
399 if !inner.contains(&b' ') && inner.len() <= 32 {
401 tag = match encoding_hint {
402 FileEncodingHint::Utf8 => {
403 Some(unsafe { Cow::Borrowed(std::str::from_utf8_unchecked(inner)) })
407 }
408 _ => match simd_from_utf8(inner) {
409 Ok(_) => Some(unsafe {
410 Cow::Borrowed(std::str::from_utf8_unchecked(inner))
413 }),
414 Err(_) => match encoding_hint {
415 FileEncodingHint::Gb18030 => {
416 match GB18030.decode(inner, DecoderTrap::Strict) {
417 Ok(s) => Some(Cow::Owned(s)),
418 Err(_) => Some(Cow::Owned(
419 String::from_utf8_lossy(inner).into_owned(),
420 )),
421 }
422 }
423 _ => Some(Cow::Owned(String::from_utf8_lossy(inner).into_owned())),
424 },
425 },
426 };
427 s = &s[end_idx + 1..];
429 let mut skip = 0usize;
430 while skip < s.len() && s[skip].is_ascii_whitespace() {
431 skip += 1;
432 }
433 s = &s[skip..];
434 }
435 }
436 s
437 } else {
438 &[] as &[u8]
439 };
440
441 let content_raw = Cow::Borrowed(content_slice);
442
443 Ok(Sqllog {
444 ts,
445 meta_raw,
446 content_raw,
447 tag,
448 encoding: encoding_hint,
449 })
450}
451
/// Selects bytes 0, 1, 4 and 7 of the first little-endian 8-byte word.
const LO_MASK: u64 = 0xFF0000FF0000FFFF;
/// Expected values of those bytes: `'2'`, `'0'`, `'-'`, `'-'`.
const LO_EXPECTED: u64 = 0x2D00002D00003032;
/// Selects bytes 2 and 5 of the second word, i.e. bytes 10 and 13 overall.
const HI_MASK: u64 = 0x0000FF0000FF0000;
/// Expected values of those bytes: `' '` and `':'`.
const HI_EXPECTED: u64 = 0x00003A0000200000;

/// Reports whether `bytes` begins with the punctuation skeleton of a
/// `20YY-MM-DD hh:mm:ss.mmm` timestamp.
///
/// Only the fixed characters are compared: the leading `"20"`, `-` at offsets
/// 4 and 7, a space at 10, `:` at 13 and 16, and `.` at 19. The digit
/// positions are not inspected. The first sixteen bytes are tested with two
/// masked 64-bit comparisons.
///
/// Inputs shorter than the 23 bytes of a full timestamp yield `false`.
#[inline(always)]
fn is_timestamp_start(bytes: &[u8]) -> bool {
    if bytes.len() < 23 {
        return false;
    }
    let Some((lo_bytes, rest)) = bytes.split_first_chunk::<8>() else {
        return false;
    };
    let Some((hi_bytes, rest)) = rest.split_first_chunk::<8>() else {
        return false;
    };

    let lo = u64::from_le_bytes(*lo_bytes);
    let hi = u64::from_le_bytes(*hi_bytes);

    // `rest` starts at overall offset 16; the length check above guarantees
    // it holds at least 7 bytes, so both indexes are in bounds.
    (lo & LO_MASK == LO_EXPECTED)
        && (hi & HI_MASK == HI_EXPECTED)
        && rest[0] == b':'
        && rest[3] == b'.'
}
472
473#[cold]
475fn make_invalid_format_error(raw_bytes: &[u8]) -> ParseError {
476 ParseError::InvalidFormat {
477 raw: String::from_utf8_lossy(raw_bytes).to_string(),
478 }
479}