dm_database_parser_sqllog/
parser.rs1use memchr::memmem::Finder;
2use memchr::{memchr, memrchr};
3use memmap2::Mmap;
4use simdutf8::basic::from_utf8 as simd_from_utf8;
5use std::borrow::Cow;
6use std::fs::File;
7use std::path::Path;
8use std::sync::LazyLock;
9
10use crate::error::ParseError;
11use crate::sqllog::Sqllog;
12use encoding::all::GB18030;
13use encoding::{DecoderTrap, Encoding};
14
15static FINDER_CLOSE_META: LazyLock<Finder<'static>> = LazyLock::new(|| Finder::new(b") "));
18
/// Which text encoding the record parser should assume for a log file.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub(crate) enum FileEncodingHint {
    /// Encoding not known in advance (the default); the record parser tries
    /// UTF-8 first when decoding the metadata and falls back to GB18030.
    #[default]
    Auto,
    /// The file is treated as UTF-8.
    Utf8,
    /// The file is treated as GB18030.
    Gb18030,
}
26
27pub struct LogParser {
28 mmap: Mmap,
29 encoding: FileEncodingHint,
30}
31
32impl LogParser {
33 pub fn from_path<P: AsRef<Path>>(path: P) -> Result<Self, ParseError> {
34 let file = File::open(path).map_err(|e| ParseError::IoError(e.to_string()))?;
35 let mmap = unsafe { Mmap::map(&file).map_err(|e| ParseError::IoError(e.to_string()))? };
36
37 let sample = &mmap[..mmap.len().min(65536)];
39 let encoding = if simd_from_utf8(sample).is_ok() {
40 FileEncodingHint::Utf8
41 } else {
42 FileEncodingHint::Gb18030
43 };
44
45 Ok(Self { mmap, encoding })
46 }
47
48 pub fn iter(&self) -> LogIterator<'_> {
49 LogIterator {
50 data: &self.mmap,
51 pos: 0,
52 encoding: self.encoding,
53 }
54 }
55
56 pub fn par_iter(
61 &self,
62 ) -> impl rayon::iter::ParallelIterator<Item = Result<Sqllog<'_>, ParseError>> + '_ {
63 use rayon::prelude::*;
64
65 let data: &[u8] = &self.mmap;
66 let encoding = self.encoding;
67 let num_threads = rayon::current_num_threads().max(1);
68
69 let mut starts: Vec<usize> = vec![0];
71 if !data.is_empty() {
72 let chunk_size = (data.len() / num_threads).max(1);
73 for i in 1..num_threads {
74 let boundary = find_next_record_start(data, i * chunk_size);
75 if boundary < data.len() {
76 starts.push(boundary);
77 }
78 }
79 }
80 starts.push(data.len());
81 starts.dedup();
82
83 let bounds: Vec<(usize, usize)> = starts.windows(2).map(|w| (w[0], w[1])).collect();
85
86 bounds
87 .into_par_iter()
88 .flat_map_iter(move |(start, end)| LogIterator {
89 data: &data[start..end],
90 pos: 0,
91 encoding,
92 })
93 }
94}
95
96pub struct LogIterator<'a> {
97 data: &'a [u8],
98 pos: usize,
99 encoding: FileEncodingHint,
100}
101
102impl<'a> Iterator for LogIterator<'a> {
103 type Item = Result<Sqllog<'a>, ParseError>;
104
105 fn next(&mut self) -> Option<Self::Item> {
106 if self.pos >= self.data.len() {
107 return None;
108 }
109
110 let data = &self.data[self.pos..];
111 let mut scan_pos = 0;
112 let mut found_next = None;
113 let mut is_multiline = false;
114
115 while let Some(idx) = memchr(b'\n', &data[scan_pos..]) {
116 let newline_idx = scan_pos + idx;
117 let next_line_start = newline_idx + 1;
118
119 if next_line_start >= data.len() {
120 break;
121 }
122
123 let check_len = std::cmp::min(23, data.len() - next_line_start);
125 if check_len == 23 {
126 let next_bytes = &data[next_line_start..next_line_start + 23];
127 if next_bytes[0] == b'2'
129 && next_bytes[1] == b'0'
130 && next_bytes[4] == b'-'
131 && next_bytes[7] == b'-'
132 && next_bytes[10] == b' '
133 && next_bytes[13] == b':'
134 && next_bytes[16] == b':'
135 && next_bytes[19] == b'.'
136 {
137 found_next = Some(newline_idx);
138 break;
139 }
140 }
141
142 is_multiline = true;
143 scan_pos = next_line_start;
144 }
145
146 let (record_end, next_start) = if let Some(idx) = found_next {
147 (idx, idx + 1)
148 } else {
149 (data.len(), data.len())
150 };
151
152 let record_slice = &data[..record_end];
153 self.pos += next_start;
154
155 let record_slice = if record_slice.ends_with(b"\r") {
157 &record_slice[..record_slice.len() - 1]
158 } else {
159 record_slice
160 };
161
162 if record_slice.is_empty() {
163 return self.next();
164 }
165
166 Some(parse_record_with_hint(
167 record_slice,
168 is_multiline,
169 self.encoding,
170 ))
171 }
172}
173
174fn find_next_record_start(data: &[u8], from: usize) -> usize {
177 let mut pos = from;
178 if let Some(nl) = memchr(b'\n', &data[pos..]) {
180 pos += nl + 1;
181 } else {
182 return data.len();
183 }
184 loop {
186 if pos + 23 > data.len() {
187 return data.len();
188 }
189 let peek = &data[pos..pos + 23];
190 if peek[0] == b'2'
191 && peek[1] == b'0'
192 && peek[4] == b'-'
193 && peek[7] == b'-'
194 && peek[10] == b' '
195 && peek[13] == b':'
196 && peek[16] == b':'
197 && peek[19] == b'.'
198 {
199 return pos;
200 }
201 match memchr(b'\n', &data[pos..]) {
203 Some(nl) => pos += nl + 1,
204 None => return data.len(),
205 }
206 }
207}
208
209pub fn parse_record<'a>(record_bytes: &'a [u8]) -> Result<Sqllog<'a>, ParseError> {
210 parse_record_with_hint(record_bytes, true, FileEncodingHint::Auto)
211}
212
213fn parse_record_with_hint<'a>(
214 record_bytes: &'a [u8],
215 is_multiline: bool,
216 encoding_hint: FileEncodingHint,
217) -> Result<Sqllog<'a>, ParseError> {
218 let (first_line, _rest) = if is_multiline {
220 match memchr(b'\n', record_bytes) {
221 Some(idx) => {
222 let mut line = &record_bytes[..idx];
223 if line.ends_with(b"\r") {
224 line = &line[..line.len() - 1];
225 }
226 (line, &record_bytes[idx + 1..])
227 }
228 None => {
229 let mut line = record_bytes;
230 if line.ends_with(b"\r") {
231 line = &line[..line.len() - 1];
232 }
233 (line, &[] as &[u8])
234 }
235 }
236 } else {
237 let mut line = record_bytes;
238 if line.ends_with(b"\r") {
239 line = &line[..line.len() - 1];
240 }
241 (line, &[] as &[u8])
242 };
243
244 if first_line.len() < 23 {
246 return Err(ParseError::InvalidFormat {
247 raw: String::from_utf8_lossy(first_line).to_string(),
248 });
249 }
250 let ts = unsafe { Cow::Borrowed(std::str::from_utf8_unchecked(&first_line[0..23])) };
254
255 let meta_start = match memchr(b'(', &first_line[23..]) {
259 Some(idx) => 23 + idx,
260 None => {
261 return Err(ParseError::InvalidFormat {
262 raw: String::from_utf8_lossy(first_line).to_string(),
263 });
264 }
265 };
266
267 let meta_end = match FINDER_CLOSE_META.find(&first_line[meta_start..]) {
269 Some(idx) => Some(meta_start + idx),
270 None => memrchr(b')', &first_line[meta_start..]).map(|idx| meta_start + idx),
271 };
272
273 let meta_end = match meta_end {
274 Some(idx) => idx,
275 None => {
276 return Err(ParseError::InvalidFormat {
277 raw: String::from_utf8_lossy(first_line).to_string(),
278 });
279 }
280 };
281
282 let meta_bytes = &first_line[meta_start + 1..meta_end];
283 let meta_raw = match encoding_hint {
287 FileEncodingHint::Utf8 => match simd_from_utf8(meta_bytes) {
288 Ok(s) => {
289 unsafe {
291 Cow::Borrowed(std::str::from_utf8_unchecked(std::slice::from_raw_parts(
292 s.as_ptr(),
293 s.len(),
294 )))
295 }
296 }
297 Err(_) => Cow::Owned(String::from_utf8_lossy(meta_bytes).into_owned()),
298 },
299 FileEncodingHint::Gb18030 => match GB18030.decode(meta_bytes, DecoderTrap::Strict) {
300 Ok(s) => Cow::Owned(s),
301 Err(_) => Cow::Owned(String::from_utf8_lossy(meta_bytes).into_owned()),
302 },
303 FileEncodingHint::Auto => match simd_from_utf8(meta_bytes) {
304 Ok(s) => {
305 unsafe {
307 Cow::Borrowed(std::str::from_utf8_unchecked(std::slice::from_raw_parts(
308 s.as_ptr(),
309 s.len(),
310 )))
311 }
312 }
313 Err(_) => match GB18030.decode(meta_bytes, DecoderTrap::Strict) {
314 Ok(s) => Cow::Owned(s),
315 Err(_) => Cow::Owned(String::from_utf8_lossy(meta_bytes).into_owned()),
316 },
317 },
318 };
319
320 let body_start_in_first_line = meta_end + 1;
322
323 let content_start = if body_start_in_first_line < first_line.len()
325 && first_line[body_start_in_first_line] == b' '
326 {
327 body_start_in_first_line + 1
328 } else {
329 body_start_in_first_line
330 };
331
332 let mut tag: Option<Cow<'a, str>> = None;
334 let content_slice = if content_start < record_bytes.len() {
335 let mut s = &record_bytes[content_start..];
336 if !s.is_empty()
338 && s[0] == b'['
339 && let Some(end_idx) = memchr(b']', s)
340 && end_idx >= 1
341 {
342 let inner = &s[1..end_idx];
343 if !inner.contains(&b' ') && inner.len() <= 32 {
345 tag = match simd_from_utf8(inner) {
346 Ok(st) => Some(unsafe {
347 Cow::Borrowed(std::str::from_utf8_unchecked(std::slice::from_raw_parts(
349 st.as_ptr(),
350 st.len(),
351 )))
352 }),
353 Err(_) => match encoding_hint {
354 FileEncodingHint::Gb18030 => {
355 match GB18030.decode(inner, DecoderTrap::Strict) {
356 Ok(s) => Some(Cow::Owned(s)),
357 Err(_) => {
358 Some(Cow::Owned(String::from_utf8_lossy(inner).into_owned()))
359 }
360 }
361 }
362 _ => Some(Cow::Owned(String::from_utf8_lossy(inner).into_owned())),
363 },
364 };
365 s = &s[end_idx + 1..];
367 let mut skip = 0usize;
368 while skip < s.len() && s[skip].is_ascii_whitespace() {
369 skip += 1;
370 }
371 s = &s[skip..];
372 }
373 }
374 s
375 } else {
376 &[] as &[u8]
377 };
378
379 let content_raw = Cow::Borrowed(content_slice);
380
381 Ok(Sqllog {
382 ts,
383 meta_raw,
384 content_raw,
385 tag,
386 encoding: encoding_hint,
387 })
388}