dm_database_parser_sqllog/
parser.rs1#[derive(Debug, Clone, PartialEq, Eq)]
/// One log record broken into its parts, borrowing from the original text.
///
/// Every `&str` field is a sub-slice of the record that was parsed, so no
/// allocation happens while parsing.
pub struct ParsedRecord<'a> {
    /// The first 23 bytes of the record (the timestamp), or `""` if the
    /// record is too short to contain one.
    pub ts: &'a str,
    /// Raw text between the metadata parentheses, without the parentheses.
    pub meta_raw: &'a str,
    /// The whole `EP[...]` token from the metadata, if present.
    pub ep: Option<&'a str>,
    /// Value following `sess:` in the metadata.
    pub sess: Option<&'a str>,
    /// Value following `thrd:` in the metadata.
    pub thrd: Option<&'a str>,
    /// Value following `user:` in the metadata.
    pub user: Option<&'a str>,
    /// Value following `trxid:` in the metadata.
    pub trxid: Option<&'a str>,
    /// Value following `stmt:` in the metadata.
    pub stmt: Option<&'a str>,
    /// Application name; `Some("")` when the `appname:` key is present but empty.
    pub appname: Option<&'a str>,
    /// Client address taken from the metadata, with an IPv4-mapped `ffff:`
    /// prefix stripped by the parser.
    pub ip: Option<&'a str>,
    /// Text after the metadata (start-trimmed), including any trailing newline.
    pub body: &'a str,
    /// Integer following the last `EXECTIME:` label in the body (milliseconds
    /// per the field name; any fractional part is not captured).
    pub execute_time_ms: Option<u64>,
    /// Integer following the `ROWCOUNT:` label in the body.
    pub row_count: Option<u64>,
    /// Integer following the last `EXEC_ID:` label in the body.
    pub execute_id: Option<u64>,
}

/// Lazy iterator that cuts a log text into records.
///
/// A record runs from one timestamped line start up to (not including) the
/// next one, or to the end of the text.
pub struct RecordSplitter<'a> {
    /// The complete input.
    text: &'a str,
    /// `text` viewed as bytes, for cheap per-byte scanning.
    bytes: &'a [u8],
    /// Cached `text.len()`.
    n: usize,
    /// Byte offset where the search for the following record start resumes.
    scan_pos: usize,
    /// Start offset of the record that the next call to `next` yields.
    next_start: Option<usize>,
    /// Set once the last record has been yielded (or there was none).
    finished: bool,
    /// Offset of the first record start in `text`, if any exists.
    first_start: Option<usize>,
}

70impl<'a> RecordSplitter<'a> {
71 pub fn new(text: &'a str) -> Self {
72 let bytes = text.as_bytes();
73 let n = text.len();
74 let mut first_start = None;
75 if n >= 23 {
76 let limit = n.saturating_sub(23);
80 let mut pos = 0usize;
81 while pos <= limit {
82 if (pos == 0 || bytes[pos - 1] == b'\n')
83 && crate::tools::is_ts_millis_bytes(&bytes[pos..pos + 23])
84 {
85 first_start = Some(pos);
86 break;
87 }
88 pos += 1;
89 }
90 }
91 let scan_pos = first_start.unwrap_or(0).saturating_add(1);
93 RecordSplitter {
94 text,
95 bytes,
96 n,
97 scan_pos,
98 next_start: first_start,
99 finished: false,
100 first_start,
101 }
102 }
103
104 pub fn leading_errors_slice(&self) -> Option<&'a str> {
110 self.first_start.map(|s| &self.text[..s])
111 }
112}
113
114impl<'a> Iterator for RecordSplitter<'a> {
115 type Item = &'a str;
116
117 fn next(&mut self) -> Option<Self::Item> {
118 if self.finished {
119 return None;
120 }
121 let start = match self.next_start {
122 Some(s) => s,
123 None => {
124 self.finished = true;
125 return None;
126 }
127 };
128
129 if self.scan_pos > self.n {
134 self.finished = true;
136 return Some(&self.text[start..self.n]);
137 }
138 let limit = self.n.saturating_sub(23);
139 let mut pos = self.scan_pos;
140 while pos <= limit {
141 if (pos == 0 || self.bytes[pos - 1] == b'\n')
142 && crate::tools::is_ts_millis_bytes(&self.bytes[pos..pos + 23])
143 {
144 let end = pos;
146 self.next_start = Some(pos);
148 self.scan_pos = pos + 1;
149 return Some(&self.text[start..end]);
150 }
151 pos += 1;
152 }
153
154 self.finished = true;
156 Some(&self.text[start..self.n])
157 }
158}
159
160pub fn split_by_ts_records_with_errors<'a>(text: &'a str) -> (Vec<&'a str>, Vec<&'a str>) {
163 let mut records: Vec<&'a str> = Vec::new();
164 let mut errors: Vec<&'a str> = Vec::new();
165
166 let splitter = RecordSplitter::new(text);
167 if let Some(prefix) = splitter.leading_errors_slice() {
168 for line in prefix.lines() {
169 errors.push(line);
170 }
171 }
172 for rec in splitter {
173 records.push(rec);
174 }
175 (records, errors)
176}
177
178pub fn split_into<'a>(text: &'a str, records: &mut Vec<&'a str>, errors: &mut Vec<&'a str>) {
183 records.clear();
184 errors.clear();
185
186 let splitter = RecordSplitter::new(text);
187 if let Some(prefix) = splitter.leading_errors_slice() {
188 for line in prefix.lines() {
189 errors.push(line);
190 }
191 }
192 for rec in splitter {
193 records.push(rec);
194 }
195}
196
197pub fn for_each_record<F>(text: &str, mut f: F)
200where
201 F: FnMut(&str),
202{
203 let splitter = RecordSplitter::new(text);
204 if let Some(_prefix) = splitter.leading_errors_slice() {
206 }
208 for rec in splitter {
209 f(rec);
210 }
211}
212
213pub fn parse_records_with<F>(text: &str, mut f: F)
215where
216 F: for<'r> FnMut(ParsedRecord<'r>),
217{
218 for_each_record(text, |rec| {
219 let parsed = parse_record(rec);
220 f(parsed);
221 });
222}
223
224pub fn parse_into<'a>(text: &'a str, out: &mut Vec<ParsedRecord<'a>>) {
226 out.clear();
227 let splitter = RecordSplitter::new(text);
228 for rec in splitter {
229 out.push(parse_record(rec));
230 }
231}
232
233pub fn parse_all(text: &str) -> Vec<ParsedRecord<'_>> {
235 let splitter = RecordSplitter::new(text);
236 splitter.map(|r| parse_record(r)).collect()
237}
238
/// Finds the first run of ASCII digits at or after byte offset `i` and
/// parses it.
///
/// Any non-digit bytes before the run are skipped. Returns
/// `(value, end)`, where `end` is the byte offset just past the last digit.
/// Returns `None` if no digit exists at or after `i`, including when `i` is
/// past the end. Values too large for `u64` saturate to `u64::MAX`.
fn parse_digits_forward(s: &str, i: usize) -> Option<(u64, usize)> {
    let tail = s.as_bytes().get(i..)?;
    let first_digit = tail.iter().position(u8::is_ascii_digit)?;
    let run = &tail[first_digit..];
    let run_len = run.iter().take_while(|b| b.is_ascii_digit()).count();
    let value = run[..run_len].iter().fold(0u64, |acc, &digit| {
        acc.saturating_mul(10).saturating_add(u64::from(digit - b'0'))
    });
    Some((value, i + first_digit + run_len))
}

/// Splits a record into `(timestamp, metadata, body)`.
///
/// * `timestamp` is the first 23 bytes of the record.
/// * `metadata` is the text inside a `( ... )` group. It is recognised only
///   when `(` is the first non-whitespace character after the timestamp, so
///   parentheses inside the SQL of a metadata-less record are not mistaken
///   for metadata.
/// * `body` is the rest of the record with leading whitespace trimmed.
///
/// If the record is shorter than 23 bytes, or byte 23 is not a UTF-8 char
/// boundary, this returns `("", "", "")` instead of panicking. If the `(`
/// has no matching `)`, the metadata is `""` and the body is the whole
/// remainder.
fn split_ts_meta_body<'a>(rec: &'a str) -> (&'a str, &'a str, &'a str) {
    const TS_LEN: usize = 23;

    // `str::get` returns None both for short input and for a non-boundary
    // split point, so malformed records cannot panic here.
    let (ts, after_ts) = match (rec.get(..TS_LEN), rec.get(TS_LEN..)) {
        (Some(ts), Some(after_ts)) => (ts, after_ts),
        _ => return ("", "", ""),
    };

    let rest = after_ts.trim_start();
    let inner = match rest.strip_prefix('(') {
        Some(inner) => inner,
        None => return (ts, "", rest),
    };

    match inner.find(')') {
        Some(close) => (ts, &inner[..close], inner[close + 1..].trim_start()),
        None => (ts, "", rest),
    }
}

/// Individual fields extracted from a record's metadata group.
///
/// All values borrow from the metadata text. A missing key is `None`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
struct MetaParts<'a> {
    /// Full `EP[...]` token.
    ep: Option<&'a str>,
    /// Value after `sess:`.
    sess: Option<&'a str>,
    /// Value after `thrd:`.
    thrd: Option<&'a str>,
    /// Value after `user:`.
    user: Option<&'a str>,
    /// Value after `trxid:`.
    trxid: Option<&'a str>,
    /// Value after `stmt:`.
    stmt: Option<&'a str>,
    /// Application name; `Some("")` when `appname:` is present but empty.
    appname: Option<&'a str>,
    /// Client address after `ip:`, with an IPv4-mapped `::ffff:` prefix removed.
    ip: Option<&'a str>,
}

/// Normalises the address part of an `ip:` token.
///
/// An IPv4-mapped IPv6 address (`::ffff:10.3.100.68`) becomes its dotted IPv4
/// form. Any other address (e.g. `::1`, `10.0.0.1`) is returned unchanged.
fn normalize_meta_ip(addr: &str) -> &str {
    match addr.strip_prefix("::ffff:") {
        Some(v4) if v4.contains('.') => v4,
        _ => addr,
    }
}

/// Parses whitespace-separated `key:value` tokens from a metadata group.
///
/// Unknown tokens are ignored. A later occurrence of a key overwrites an
/// earlier one. `appname:` may carry its value in the same token or in the
/// next one. An `ip:` token is recognised wherever it appears, so the address
/// is captured after an empty appname and after a non-empty one alike.
fn parse_meta(meta_raw: &str) -> MetaParts<'_> {
    let mut parts = MetaParts::default();

    let mut tokens = meta_raw.split_whitespace().peekable();
    while let Some(tok) = tokens.next() {
        if tok.starts_with("EP[") {
            parts.ep = Some(tok);
        } else if let Some(val) = tok.strip_prefix("sess:") {
            parts.sess = Some(val);
        } else if let Some(val) = tok.strip_prefix("thrd:") {
            parts.thrd = Some(val);
        } else if let Some(val) = tok.strip_prefix("user:") {
            parts.user = Some(val);
        } else if let Some(val) = tok.strip_prefix("trxid:") {
            parts.trxid = Some(val);
        } else if let Some(val) = tok.strip_prefix("stmt:") {
            parts.stmt = Some(val);
        } else if let Some(val) = tok.strip_prefix("appname:") {
            if val.is_empty() {
                // `appname: X` puts the value in the next token. An empty
                // appname is instead followed directly by the `ip:` token,
                // which is left for the `ip:` branch below.
                let value_follows =
                    matches!(tokens.peek(), Some(next) if !next.starts_with("ip:"));
                parts.appname = if value_follows { tokens.next() } else { Some("") };
            } else if let Some(addr) = val.strip_prefix("ip:") {
                // Fused form `appname:ip:...` means an empty appname.
                parts.appname = Some("");
                parts.ip = Some(normalize_meta_ip(addr));
            } else {
                parts.appname = Some(val);
            }
        } else if let Some(addr) = tok.strip_prefix("ip:") {
            parts.ip = Some(normalize_meta_ip(addr));
        }
    }

    parts
}

354fn parse_body_metrics(body: &str) -> (Option<u64>, Option<u64>, Option<u64>) {
356 let mut execute_id: Option<u64> = None;
357 let mut row_count: Option<u64> = None;
358 let mut execute_time_ms: Option<u64> = None;
359
360 let body_str = body;
361 let mut search_end = body_str.len();
362
363 if let Some(pos) = body_str[..search_end].rfind("EXEC_ID:") {
364 let start = pos + "EXEC_ID:".len();
365 if let Some((v, _)) = parse_digits_forward(body_str, start) {
366 execute_id = Some(v);
367 }
368 search_end = pos;
369 }
370
371 if let Some(pos) = body_str[..search_end].rfind("ROWCOUNT:") {
372 let start = pos + "ROWCOUNT:".len();
373 if let Some((v, _)) = parse_digits_forward(body_str, start) {
374 row_count = Some(v);
375 }
376 search_end = pos;
377 }
378
379 if let Some(pos) = body_str[..search_end].rfind("EXECTIME:") {
380 let start = pos + "EXECTIME:".len();
381 if let Some((v, _)) = parse_digits_forward(body_str, start) {
382 execute_time_ms = Some(v);
383 }
384 }
385
386 (execute_time_ms, row_count, execute_id)
387}
388
389pub fn parse_record(rec: &'_ str) -> ParsedRecord<'_> {
392 let (ts, meta_raw, body) = split_ts_meta_body(rec);
394
395 let meta = parse_meta(meta_raw);
397
398 let (execute_time_ms, row_count, execute_id) = parse_body_metrics(body);
400
401 ParsedRecord {
402 ts,
403 meta_raw,
404 ep: meta.ep,
405 sess: meta.sess,
406 thrd: meta.thrd,
407 user: meta.user,
408 trxid: meta.trxid,
409 stmt: meta.stmt,
410 appname: meta.appname,
411 ip: meta.ip,
412 body,
413 execute_time_ms,
414 row_count,
415 execute_id,
416 }
417}
418
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_split_by_ts_records() {
        // Each record's lines are spelled out explicitly instead of relying on
        // a raw newline embedded in one long literal.
        let log_text = concat!(
            "2023-10-05 14:23:45.123 (EP[12345] sess:1 thrd:1 user:admin trxid:0 stmt:1 appname:MyApp)\n",
            "SELECT * FROM users\n",
            "2023-10-05 14:24:00.456 (EP[12346] sess:2 thrd:2 user:guest trxid:0 stmt:2 appname:MyApp)\n",
            "INSERT INTO orders VALUES (1, 'item');\n",
        );
        let (records, errors) = split_by_ts_records_with_errors(log_text);

        assert_eq!(records.len(), 2);
        assert_eq!(errors.len(), 0);
    }

    #[test]
    fn test_split_with_leading_errors() {
        let log_text = concat!(
            "garbage line 1\n",
            "garbage line 2\n",
            "2023-10-05 14:23:45.123 (EP[12345] sess:1 thrd:1 user:admin trxid:0 stmt:1 appname:MyApp)\n",
            "SELECT 1\n",
        );
        let (records, errors) = split_by_ts_records_with_errors(log_text);

        assert_eq!(records.len(), 1);
        assert_eq!(errors.len(), 2);
        assert!(records[0].contains("SELECT 1"));
    }

    #[test]
    fn test_record_splitter_iterator() {
        let log_text =
            "garbage\n2023-10-05 14:23:45.123 (EP[1]) foo\n2023-10-05 14:23:46.456 (EP[2]) bar\n";
        let it = RecordSplitter::new(log_text);
        assert_eq!(it.leading_errors_slice().unwrap().trim(), "garbage");
        let v: Vec<&str> = it.collect();
        assert_eq!(v.len(), 2);
    }

    #[test]
    fn test_parse_simple_log_sample() {
        let log_text = concat!(
            "2025-08-12 10:57:09.562 (EP[0] sess:0x7fb24f392a30 thrd:757794 user:HBTCOMS_V3_PROD trxid:688489653 stmt:0x7fb236077b70 appname: ip:::ffff:10.3.100.68) EXECTIME: 0ms ROWCOUNT: 1 EXEC_ID: 289655185\n",
            "2025-08-12 10:57:09.562 (EP[0] sess:0x7fb24f392a30 thrd:757794 user:HBTCOMS_V3_PROD trxid:0 stmt:NULL appname:) TRX: START\n",
        );

        let (records, errors) = split_by_ts_records_with_errors(log_text);
        assert_eq!(errors.len(), 0);
        assert_eq!(records.len(), 2);

        let r0 = parse_record(records[0]);
        assert_eq!(r0.execute_time_ms, Some(0));
        assert_eq!(r0.row_count, Some(1));
        assert_eq!(r0.execute_id, Some(289655185));
        assert_eq!(r0.ip, Some("10.3.100.68"));
        assert_eq!(r0.appname, Some(""));

        let r1 = parse_record(records[1]);
        assert!(r1.body.contains("TRX: START"));
    }

    #[test]
    fn test_parse_digits_forward_reads_first_digit_run() {
        assert_eq!(parse_digits_forward("EXEC_ID: 289655185.", 8), Some((289655185, 18)));
        assert_eq!(parse_digits_forward("no digits", 0), None);
        assert_eq!(parse_digits_forward("12", 5), None);
    }

    #[test]
    fn test_parse_body_metrics_reads_all_labels() {
        assert_eq!(
            parse_body_metrics("SELECT 1 EXECTIME: 5(ms) ROWCOUNT: 2(rows) EXEC_ID: 7."),
            (Some(5), Some(2), Some(7))
        );
        assert_eq!(parse_body_metrics("TRX: START"), (None, None, None));
    }

    #[test]
    fn test_split_ts_meta_body_basic() {
        let (ts, meta, body) =
            split_ts_meta_body("2023-10-05 14:23:45.123 (EP[1] sess:2) SELECT 1");
        assert_eq!(ts, "2023-10-05 14:23:45.123");
        assert_eq!(meta, "EP[1] sess:2");
        assert_eq!(body, "SELECT 1");
    }

    #[test]
    fn test_parse_record_named_appname() {
        let r = parse_record(
            "2023-10-05 14:23:45.123 (EP[7] sess:0x1 user:admin appname:MyApp) SELECT 1",
        );
        assert_eq!(r.ep, Some("EP[7]"));
        assert_eq!(r.sess, Some("0x1"));
        assert_eq!(r.user, Some("admin"));
        assert_eq!(r.appname, Some("MyApp"));
        assert_eq!(r.body, "SELECT 1");
    }
}