/// One log record broken into borrowed fields.
///
/// Every string field is a slice of the record text that was parsed; nothing
/// is copied. Fields that could not be found are left as `""` or `None`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParsedRecord<'a> {
    /// Leading 23 bytes of the record (the timestamp), or `""` when no such
    /// prefix could be taken.
    pub ts: &'a str,
    /// Text between the first `(` after the timestamp and the next `)`;
    /// `""` when that pair is not present.
    pub meta_raw: &'a str,
    /// The whole metadata token that starts with `EP[`, brackets included.
    pub ep: &'a str,
    /// Value following `sess:` in the metadata.
    pub sess: &'a str,
    /// Value following `thrd:` in the metadata.
    pub thrd: &'a str,
    /// Value following `user:` in the metadata.
    pub user: &'a str,
    /// Value following `trxid:` in the metadata.
    pub trxid: &'a str,
    /// Value following `stmt:` in the metadata.
    pub stmt: &'a str,
    /// Value of `appname:`; `""` when it is empty or an `ip:::` token sits in its place.
    pub appname: &'a str,
    /// Address taken from an `ip:::` token with any leading `ffff:` removed;
    /// `None` when no such token was picked up.
    pub ip: Option<&'a str>,
    /// Text after the metadata section (leading whitespace trimmed), or
    /// everything after the timestamp when no parenthesised metadata was found.
    pub body: &'a str,
    /// Number read from the digits following the `EXECTIME:` marker in the body.
    pub execute_time_ms: Option<u64>,
    /// Number read from the digits following the `ROWCOUNT:` marker in the body.
    pub row_count: Option<u64>,
    /// Number read from the digits following the `EXEC_ID:` marker in the body.
    pub execute_id: Option<u64>,
}
49
/// Iterator that cuts a log text into records, where each record starts at a
/// line beginning with a timestamp. Yielded records are slices of the input.
pub struct RecordSplitter<'a> {
    /// Full input text; every yielded record borrows from it.
    text: &'a str,
    /// Byte view of `text`, used for scanning.
    bytes: &'a [u8],
    /// Length of `text` in bytes.
    n: usize,
    /// Byte offset at which the search for the next record start resumes.
    scan_pos: usize,
    /// Start offset of the record that the next call to `next` will yield.
    next_start: Option<usize>,
    /// Set once the final record has been yielded or no record exists.
    finished: bool,
    /// Offset of the first validated record start; `None` when the text has none.
    first_start: Option<usize>,
}
92
93impl<'a> RecordSplitter<'a> {
110 pub fn new(text: &'a str) -> Self {
111 let bytes = text.as_bytes();
112 let n = text.len();
113 let first_start = Self::find_first_record_start(bytes, n);
114
115 let scan_pos = first_start.unwrap_or(0).saturating_add(1);
116 RecordSplitter {
117 text,
118 bytes,
119 n,
120 scan_pos,
121 next_start: first_start,
122 finished: false,
123 first_start,
124 }
125 }
126
127 fn find_first_record_start(bytes: &[u8], n: usize) -> Option<usize> {
129 const TS_LEN: usize = 23;
130 const FIELDS: [&[u8]; 7] = [
131 b"EP[",
132 b"sess:",
133 b"thrd:",
134 b"user:",
135 b"trxid:",
136 b"stmt:",
137 b"appname:",
138 ];
139
140 if n < TS_LEN {
141 return None;
142 }
143
144 let limit = n.saturating_sub(TS_LEN);
145 (0..=limit).find(|&pos| {
146 Self::is_line_start_with_timestamp(bytes, n, pos, TS_LEN)
147 && Self::validate_meta_fields(bytes, n, pos + TS_LEN, &FIELDS)
148 })
149 }
150
151 fn is_line_start_with_timestamp(bytes: &[u8], n: usize, pos: usize, ts_len: usize) -> bool {
153 if pos + ts_len > n {
154 return false;
155 }
156 let is_line_start = pos == 0 || bytes[pos - 1] == b'\n';
157 let has_valid_timestamp = crate::tools::is_ts_millis_bytes(&bytes[pos..pos + ts_len]);
158 is_line_start && has_valid_timestamp
159 }
160
161 fn validate_meta_fields(bytes: &[u8], n: usize, mut pos: usize, fields: &[&[u8]]) -> bool {
163 if pos >= n || bytes[pos] != b' ' {
165 return false;
166 }
167 pos += 1; if pos < n && bytes[pos] == b'(' {
171 pos += 1;
172 }
173
174 for &pat in fields {
176 let pat_len = pat.len();
178 if pos + pat_len > n || &bytes[pos..pos + pat_len] != pat {
179 return false;
180 }
181 pos += pat_len;
182
183 if pat == b"EP[" {
185 while pos < n && bytes[pos] != b']' {
187 pos += 1;
188 }
189 if pos >= n || bytes[pos] != b']' {
190 return false;
191 }
192 pos += 1; } else {
194 if pos >= n {
196 return false;
197 }
198 while pos < n && bytes[pos] != b' ' {
199 pos += 1;
200 }
201 }
202 while pos < n && bytes[pos] == b' ' {
204 pos += 1;
205 }
206 }
207
208 true
210 }
211
212 pub fn leading_errors_slice(&self) -> Option<&'a str> {
218 self.first_start.map(|s| &self.text[..s])
219 }
220}
221
222impl<'a> Iterator for RecordSplitter<'a> {
223 type Item = &'a str;
224
225 fn next(&mut self) -> Option<Self::Item> {
226 if self.finished {
227 return None;
228 }
229 let start = match self.next_start {
230 Some(s) => s,
231 None => {
232 self.finished = true;
233 return None;
234 }
235 };
236
237 if self.scan_pos > self.n {
242 self.finished = true;
244 return Some(&self.text[start..self.n]);
245 }
246 let limit = self.n.saturating_sub(23);
247 let mut pos = self.scan_pos;
248 while pos <= limit {
249 if (pos == 0 || self.bytes[pos - 1] == b'\n')
250 && crate::tools::is_ts_millis_bytes(&self.bytes[pos..pos + 23])
251 {
252 let end = pos;
254 self.next_start = Some(pos);
256 self.scan_pos = pos + 1;
257 return Some(&self.text[start..end]);
258 }
259 pos += 1;
260 }
261
262 self.finished = true;
264 Some(&self.text[start..self.n])
265 }
266}
267
268pub fn split_by_ts_records_with_errors<'a>(text: &'a str) -> (Vec<&'a str>, Vec<&'a str>) {
298 let mut records: Vec<&'a str> = Vec::new();
299 let mut errors: Vec<&'a str> = Vec::new();
300
301 let splitter = RecordSplitter::new(text);
302 if let Some(prefix) = splitter.leading_errors_slice() {
303 for line in prefix.lines() {
304 errors.push(line);
305 }
306 }
307 for rec in splitter {
308 records.push(rec);
309 }
310 (records, errors)
311}
312
313pub fn split_into<'a>(text: &'a str, records: &mut Vec<&'a str>, errors: &mut Vec<&'a str>) {
337 records.clear();
338 errors.clear();
339
340 let splitter = RecordSplitter::new(text);
341 if let Some(prefix) = splitter.leading_errors_slice() {
342 for line in prefix.lines() {
343 errors.push(line);
344 }
345 }
346 for rec in splitter {
347 records.push(rec);
348 }
349}
350
351pub fn for_each_record<F>(text: &str, mut f: F)
372where
373 F: FnMut(&str),
374{
375 let splitter = RecordSplitter::new(text);
376 if let Some(_prefix) = splitter.leading_errors_slice() {
378 }
380 for rec in splitter {
381 f(rec);
382 }
383}
384
385pub fn parse_records_with<F>(text: &str, mut f: F)
406where
407 F: for<'r> FnMut(ParsedRecord<'r>),
408{
409 for_each_record(text, |rec| {
410 let parsed = parse_record(rec);
411 f(parsed);
412 });
413}
414
415pub fn parse_into<'a>(text: &'a str, out: &mut Vec<ParsedRecord<'a>>) {
417 out.clear();
418 let splitter = RecordSplitter::new(text);
419 for rec in splitter {
420 out.push(parse_record(rec));
421 }
422}
423
424pub fn parse_all(text: &str) -> Vec<ParsedRecord<'_>> {
449 let splitter = RecordSplitter::new(text);
450 splitter.map(|r| parse_record(r)).collect()
451}
452
/// Finds the first run of ASCII digits at or after byte offset `i` in `s`.
///
/// Any non-digit bytes before the run are skipped. Returns the run's value
/// together with the offset just past its last digit, or `None` when there is
/// no digit at or after `i`. Values too large for `u64` saturate at `u64::MAX`.
fn parse_digits_forward(s: &str, i: usize) -> Option<(u64, usize)> {
    // An offset past the end simply means there is nothing left to parse.
    let tail = s.as_bytes().get(i..)?;
    let skipped = tail.iter().position(|b| b.is_ascii_digit())?;

    let from_first_digit = &tail[skipped..];
    let run_len = from_first_digit
        .iter()
        .take_while(|b| b.is_ascii_digit())
        .count();

    let value = from_first_digit[..run_len].iter().fold(0u64, |acc, &b| {
        acc.saturating_mul(10).saturating_add(u64::from(b - b'0'))
    });

    Some((value, i + skipped + run_len))
}
472
/// Splits a record into `(timestamp, metadata, body)`.
///
/// * `timestamp` is the leading 23 bytes of `rec`.
/// * `metadata` is the text between the first `(` after the timestamp and the
///   next `)`, without the parentheses.
/// * `body` is what follows the closing `)`, with leading whitespace trimmed.
///
/// When there is no `(`, or no `)` after it, `metadata` is `""` and `body` is
/// everything after the timestamp, untrimmed. When `rec` is shorter than 23
/// bytes, or byte 23 does not fall on a UTF-8 character boundary (so the
/// record cannot start with an ASCII timestamp), all three parts are `""`
/// instead of panicking.
fn split_ts_meta_body<'a>(rec: &'a str) -> (&'a str, &'a str, &'a str) {
    const TS_LEN: usize = 23;

    // `str::get` yields `None` both for out-of-range and for mid-character
    // offsets, which plain slicing would turn into a panic.
    let (ts, after_ts): (&'a str, &'a str) = match (rec.get(..TS_LEN), rec.get(TS_LEN..)) {
        (Some(ts), Some(rest)) => (ts, rest),
        _ => ("", ""),
    };

    let closed_meta = after_ts.find('(').and_then(|open_idx| {
        after_ts[open_idx..]
            .find(')')
            .map(|close_rel| (open_idx, open_idx + close_rel))
    });

    let (meta_raw, body): (&'a str, &'a str) = match closed_meta {
        Some((open_idx, close_idx)) => (
            &after_ts[open_idx + 1..close_idx],
            after_ts[close_idx + 1..].trim_start(),
        ),
        // No complete `( … )` pair: keep everything after the timestamp as body.
        None => ("", after_ts),
    };

    (ts, meta_raw, body)
}
498
/// Fields pulled out of the metadata section of a record. Every field borrows
/// from the metadata text; missing fields stay `""` / `None`.
#[derive(Debug)]
struct MetaParts<'a> {
    /// Whole token starting with `EP[`, brackets included.
    ep: &'a str,
    /// Value following `sess:`.
    sess: &'a str,
    /// Value following `thrd:`.
    thrd: &'a str,
    /// Value following `user:`.
    user: &'a str,
    /// Value following `trxid:`.
    trxid: &'a str,
    /// Value following `stmt:`.
    stmt: &'a str,
    /// Value of `appname:`, possibly empty.
    appname: &'a str,
    /// Address from an `ip:::` token with any leading `ffff:` removed.
    ip: Option<&'a str>,
}

/// Parses the whitespace-separated metadata tokens of a record.
///
/// Recognised tokens are `EP[...]`, `sess:`, `thrd:`, `user:`, `trxid:`,
/// `stmt:`, `appname:` and `ip:::`. The application name may be attached to
/// its marker (`appname:MyApp`) or follow it as the next token
/// (`appname: MyApp`). An `ip:::` token is recognised wherever it appears:
/// directly after an empty `appname:`, glued to it, or after a non-empty
/// application name. Unknown tokens are ignored.
fn parse_meta(meta_raw: &str) -> MetaParts<'_> {
    /// Returns the address carried by an `ip:::` token, with the marker and
    /// any leading `ffff:` stripped; `None` for any other token.
    fn ip_from_token(tok: &str) -> Option<&str> {
        if tok.starts_with("ip:::") {
            Some(
                tok.trim_start_matches("ip:::")
                    .trim_start_matches("ffff:"),
            )
        } else {
            None
        }
    }

    let mut parts = MetaParts {
        ep: "",
        sess: "",
        thrd: "",
        user: "",
        trxid: "",
        stmt: "",
        appname: "",
        ip: None,
    };

    let mut iter = meta_raw.split_whitespace().peekable();
    while let Some(tok) = iter.next() {
        if tok.starts_with("EP[") {
            parts.ep = tok;
        } else if let Some(val) = tok.strip_prefix("sess:") {
            parts.sess = val;
        } else if let Some(val) = tok.strip_prefix("thrd:") {
            parts.thrd = val;
        } else if let Some(val) = tok.strip_prefix("user:") {
            parts.user = val;
        } else if let Some(val) = tok.strip_prefix("trxid:") {
            parts.trxid = val;
        } else if let Some(val) = tok.strip_prefix("stmt:") {
            parts.stmt = val;
        } else if let Some(val) = tok.strip_prefix("appname:") {
            if val.is_empty() {
                // Bare `appname:` — the name, if any, is the next token. An
                // `ip:::` token is not a name; it is left in the iterator so
                // the IP branch below handles it on the next pass.
                match iter.peek().copied() {
                    Some(next) if !next.starts_with("ip:::") => {
                        iter.next();
                        parts.appname = next;
                    }
                    _ => parts.appname = "",
                }
            } else if let Some(ip) = ip_from_token(val) {
                // `appname:ip:::…` — empty name with the IP glued on.
                parts.ip = Some(ip);
                parts.appname = "";
            } else {
                parts.appname = val;
            }
        } else if let Some(ip) = ip_from_token(tok) {
            // Standalone IP token, e.g. after `appname:MyApp`. Without this
            // branch the address was dropped whenever the name was non-empty.
            parts.ip = Some(ip);
        }
    }

    parts
}
567
568fn parse_body_metrics(body: &str) -> (Option<u64>, Option<u64>, Option<u64>) {
570 let mut execute_id: Option<u64> = None;
571 let mut row_count: Option<u64> = None;
572 let mut execute_time_ms: Option<u64> = None;
573
574 let body_str = body;
575 let mut search_end = body_str.len();
576
577 if let Some(pos) = body_str[..search_end].rfind("EXEC_ID:") {
578 let start = pos + "EXEC_ID:".len();
579 if let Some((v, _)) = parse_digits_forward(body_str, start) {
580 execute_id = Some(v);
581 }
582 search_end = pos;
583 }
584
585 if let Some(pos) = body_str[..search_end].rfind("ROWCOUNT:") {
586 let start = pos + "ROWCOUNT:".len();
587 if let Some((v, _)) = parse_digits_forward(body_str, start) {
588 row_count = Some(v);
589 }
590 search_end = pos;
591 }
592
593 if let Some(pos) = body_str[..search_end].rfind("EXECTIME:") {
594 let start = pos + "EXECTIME:".len();
595 if let Some((v, _)) = parse_digits_forward(body_str, start) {
596 execute_time_ms = Some(v);
597 }
598 }
599
600 (execute_time_ms, row_count, execute_id)
601}
602
603pub fn parse_record(rec: &'_ str) -> ParsedRecord<'_> {
626 let (ts, meta_raw, body) = split_ts_meta_body(rec);
628
629 let meta = parse_meta(meta_raw);
631
632 let (execute_time_ms, row_count, execute_id) = parse_body_metrics(body);
634
635 ParsedRecord {
636 ts,
637 meta_raw,
638 ep: meta.ep,
639 sess: meta.sess,
640 thrd: meta.thrd,
641 user: meta.user,
642 trxid: meta.trxid,
643 stmt: meta.stmt,
644 appname: meta.appname,
645 ip: meta.ip,
646 body,
647 execute_time_ms,
648 row_count,
649 execute_id,
650 }
651}
652
// Unit tests for record splitting and parsing.
//
// Note on the `*_should_be_error` tests: when no line passes the first-record
// validation, `RecordSplitter` yields nothing and `leading_errors_slice`
// returns `None`, so BOTH result vectors are empty. The assertions below pin
// that behaviour.
#[cfg(test)]
mod tests {
    use super::*;

    // Two well-formed records, no leading garbage.
    #[test]
    fn test_split_by_ts_records() {
        // The literal spans two source lines; the second record must start at
        // column 0 so that it begins a line inside the string.
        let log_text = "2023-10-05 14:23:45.123 (EP[12345] sess:1 thrd:1 user:admin trxid:0 stmt:1 appname:MyApp)\nSELECT * FROM users
2023-10-05 14:24:00.456 (EP[12346] sess:2 thrd:2 user:guest trxid:0 stmt:2 appname:MyApp)\nINSERT INTO orders VALUES (1, 'item');\n";
        let (records, errors) = split_by_ts_records_with_errors(log_text);

        assert_eq!(records.len(), 2);
        assert_eq!(errors.len(), 0);
    }

    // Lines before the first valid record are reported one by one as errors.
    #[test]
    fn test_split_with_leading_errors() {
        let log_text = "garbage line 1\ngarbage line 2\n2023-10-05 14:23:45.123 (EP[12345] sess:1 thrd:1 user:admin trxid:0 stmt:1 appname:MyApp)\nSELECT 1\n";
        let (records, errors) = split_by_ts_records_with_errors(log_text);

        assert_eq!(records.len(), 1);
        assert_eq!(errors.len(), 2);
        assert!(records[0].contains("SELECT 1"));
    }

    // The iterator API exposes the leading prefix and then yields each record.
    #[test]
    fn test_record_splitter_iterator() {
        let log_text = "garbage\n2023-10-05 14:23:45.123 (EP[1] sess:1 thrd:1 user:admin trxid:0 stmt:1 appname:MyApp) foo\n2023-10-05 14:23:46.456 (EP[2] sess:2 thrd:2 user:guest trxid:0 stmt:2 appname:MyApp) bar\n";
        let it = RecordSplitter::new(log_text);
        assert_eq!(it.leading_errors_slice().unwrap().trim(), "garbage");
        let v: Vec<&str> = it.collect();
        assert_eq!(v.len(), 2);
    }

    // End-to-end: split, then parse metrics and the `appname: ip:::ffff:…` form.
    #[test]
    fn test_parse_simple_log_sample() {
        let log_text = "2025-08-12 10:57:09.562 (EP[0] sess:0x7fb24f392a30 thrd:757794 user:HBTCOMS_V3_PROD trxid:688489653 stmt:0x7fb236077b70 appname: ip:::ffff:10.3.100.68) EXECTIME: 0ms ROWCOUNT: 1 EXEC_ID: 289655185\n2025-08-12 10:57:09.562 (EP[0] sess:0x7fb24f392a30 thrd:757794 user:HBTCOMS_V3_PROD trxid:0 stmt:NULL appname:) TRX: START\n";

        let (records, errors) = split_by_ts_records_with_errors(log_text);
        assert_eq!(errors.len(), 0);
        assert_eq!(records.len(), 2);

        let r0 = parse_record(records[0]);
        assert_eq!(r0.execute_time_ms, Some(0));
        assert_eq!(r0.row_count, Some(1));
        assert_eq!(r0.execute_id, Some(289655185));
        assert_eq!(r0.ip, Some("10.3.100.68"));
        assert_eq!(r0.appname, "");

        let r1 = parse_record(records[1]);
        assert!(r1.body.contains("TRX: START"));
    }

    // `sess:` missing: the only timestamped line fails validation, so nothing
    // is returned at all (see the module note above).
    #[test]
    fn test_missing_sess_field_should_be_error() {
        let log_text = "garbage1\n2023-10-05 14:23:45.123 (EP[12345] thrd:1 user:admin trxid:0 stmt:1 appname:MyApp)\nSELECT 1\n";
        let (records, errors) = split_by_ts_records_with_errors(log_text);
        assert_eq!(records.len(), 0);
        assert_eq!(errors.len(), 0);
    }

    // `thrd:` missing: no valid record start, both vectors empty.
    #[test]
    fn test_missing_thrd_field_should_be_error() {
        let log_text = "garbage2\n2023-10-05 14:23:45.123 (EP[12345] sess:1 user:admin trxid:0 stmt:1 appname:MyApp)\nSELECT 1\n";
        let (records, errors) = split_by_ts_records_with_errors(log_text);
        assert_eq!(records.len(), 0);
        assert_eq!(errors.len(), 0);
    }

    // `user:` missing: no valid record start, both vectors empty.
    #[test]
    fn test_missing_user_field_should_be_error() {
        let log_text = "garbage3\n2023-10-05 14:23:45.123 (EP[12345] sess:1 thrd:1 trxid:0 stmt:1 appname:MyApp)\nSELECT 1\n";
        let (records, errors) = split_by_ts_records_with_errors(log_text);
        assert_eq!(records.len(), 0);
        assert_eq!(errors.len(), 0);
    }

    // `trxid:` missing: no valid record start, both vectors empty.
    #[test]
    fn test_missing_trxid_field_should_be_error() {
        let log_text = "garbage4\n2023-10-05 14:23:45.123 (EP[12345] sess:1 thrd:1 user:admin stmt:1 appname:MyApp)\nSELECT 1\n";
        let (records, errors) = split_by_ts_records_with_errors(log_text);
        assert_eq!(records.len(), 0);
        assert_eq!(errors.len(), 0);
    }

    // `stmt:` missing: no valid record start, both vectors empty.
    #[test]
    fn test_missing_stmt_field_should_be_error() {
        let log_text = "garbage5\n2023-10-05 14:23:45.123 (EP[12345] sess:1 thrd:1 user:admin trxid:0 appname:MyApp)\nSELECT 1\n";
        let (records, errors) = split_by_ts_records_with_errors(log_text);
        assert_eq!(records.len(), 0);
        assert_eq!(errors.len(), 0);
    }

    // `appname:` missing: no valid record start, both vectors empty.
    #[test]
    fn test_missing_appname_field_should_be_error() {
        let log_text = "garbage6\n2023-10-05 14:23:45.123 (EP[12345] sess:1 thrd:1 user:admin trxid:0 stmt:1)\nSELECT 1\n";
        let (records, errors) = split_by_ts_records_with_errors(log_text);
        assert_eq!(records.len(), 0);
        assert_eq!(errors.len(), 0);
    }

    // Fields must appear in the fixed order; `thrd:` before `sess:` is rejected.
    #[test]
    fn test_wrong_field_order_should_be_error() {
        let log_text = "garbage\n2023-10-05 14:23:45.123 (EP[12345] thrd:1 sess:1 user:admin trxid:0 stmt:1 appname:MyApp)\nSELECT 1\n";
        let (records, errors) = split_by_ts_records_with_errors(log_text);
        assert_eq!(records.len(), 0);
        assert_eq!(errors.len(), 0);
    }

    // A timestamp without the millisecond part is expected not to start a record.
    #[test]
    fn test_invalid_timestamp_format_should_be_error() {
        let log_text = "garbage\n2023-10-05 14:23:45 (EP[12345] sess:1 thrd:1 user:admin trxid:0 stmt:1 appname:MyApp)\nSELECT 1\n";
        let (records, errors) = split_by_ts_records_with_errors(log_text);
        assert_eq!(records.len(), 0);
        assert_eq!(errors.len(), 0);
    }

    // A timestamp with a single fractional digit is expected not to start a record.
    #[test]
    fn test_invalid_timestamp_length_should_be_error() {
        let log_text = "garbage\n2023-10-05 14:23:45.1 (EP[12345] sess:1 thrd:1 user:admin trxid:0 stmt:1 appname:MyApp)\nSELECT 1\n";
        let (records, errors) = split_by_ts_records_with_errors(log_text);
        assert_eq!(records.len(), 0);
        assert_eq!(errors.len(), 0);
    }

    // The byte right after the timestamp must be a space.
    #[test]
    fn test_missing_space_after_timestamp_should_be_error() {
        let log_text = "garbage\n2023-10-05 14:23:45.123(EP[12345] sess:1 thrd:1 user:admin trxid:0 stmt:1 appname:MyApp)\nSELECT 1\n";
        let (records, errors) = split_by_ts_records_with_errors(log_text);
        assert_eq!(records.len(), 0);
        assert_eq!(errors.len(), 0);
    }

    // `EP[` without a closing `]` anywhere in the text is rejected.
    #[test]
    fn test_missing_ep_bracket_should_be_error() {
        let log_text = "garbage\n2023-10-05 14:23:45.123 (EP[12345 sess:1 thrd:1 user:admin trxid:0 stmt:1 appname:MyApp)\nSELECT 1\n";
        let (records, errors) = split_by_ts_records_with_errors(log_text);
        assert_eq!(records.len(), 0);
        assert_eq!(errors.len(), 0);
    }

    // Three plain lines before the first valid record become three error lines.
    #[test]
    fn test_no_timestamp_line_should_be_error() {
        let log_text = "garbage line 1\ngarbage line 2\njust a normal line\n2023-10-05 14:23:45.123 (EP[12345] sess:1 thrd:1 user:admin trxid:0 stmt:1 appname:MyApp)\nSELECT 1\n";
        let (records, errors) = split_by_ts_records_with_errors(log_text);
        assert_eq!(records.len(), 1);
        assert_eq!(errors.len(), 3);
    }

    // Only the FIRST record's metadata is validated. Later boundaries are
    // detected by a line-leading timestamp alone, so the truncated third
    // record still counts and "invalid line" stays inside the first record.
    #[test]
    fn test_mixed_valid_and_invalid_records() {
        let log_text = "2023-10-05 14:23:45.123 (EP[12345] sess:1 thrd:1 user:admin trxid:0 stmt:1 appname:MyApp)\nSELECT 1\ninvalid line\n2023-10-05 14:24:00.456 (EP[12346] sess:2 thrd:2 user:guest trxid:0 stmt:2 appname:MyApp)\nINSERT 1\n2023-10-05 14:24:01.789 (EP[12347] sess:3)\ninvalid record\n";
        let (records, errors) = split_by_ts_records_with_errors(log_text);
        assert_eq!(records.len(), 3);
        assert_eq!(errors.len(), 0);
    }
}