// dm_database_parser_sqllog/parser/record_parser.rs
use crate::parser::record::Record;
use crate::tools::is_record_start_line;
use std::{
    io::{self, BufRead, BufReader, Read},
    mem,
};
11
/// Line-oriented parser that reads from `R` and groups lines into records.
///
/// A record begins at a line accepted by `is_record_start_line`; the lines
/// that follow it, up to the next such line or the end of input, belong to
/// the same record.
pub struct RecordParser<R: Read> {
    /// Buffered wrapper around the underlying reader.
    reader: BufReader<R>,
    /// Scratch string that each line is read into before being handed out.
    buffer: String,
    /// Start line of the following record, read ahead while collecting the
    /// continuation lines of the current one.
    next_line: Option<String>,
    /// Set once the end of the input has been reached.
    finished: bool,
}
26
27impl<R: Read> RecordParser<R> {
28 pub fn new(reader: R) -> Self {
29 Self {
30 reader: BufReader::new(reader),
31 buffer: String::new(),
32 next_line: None,
33 finished: false,
34 }
35 }
36
37 fn read_line(&mut self) -> io::Result<Option<String>> {
39 self.buffer.clear();
40 let bytes_read = self.reader.read_line(&mut self.buffer)?;
41
42 if bytes_read == 0 {
43 Ok(None)
44 } else {
45 let mut len = self.buffer.len();
47 while len > 0 {
48 let last_byte = self.buffer.as_bytes()[len - 1];
49 if last_byte == b'\n' || last_byte == b'\r' {
50 len -= 1;
51 } else {
52 break;
53 }
54 }
55
56 if len != self.buffer.len() {
58 self.buffer.truncate(len);
59 }
60
61 Ok(Some(mem::take(&mut self.buffer)))
63 }
64 }
65
66 fn get_start_line(&mut self) -> io::Result<Option<String>> {
68 if let Some(line) = self.next_line.take() {
70 return Ok(Some(line));
71 }
72
73 loop {
75 match self.read_line()? {
76 Some(line) if is_record_start_line(&line) => return Ok(Some(line)),
77 Some(_) => continue, None => {
79 self.finished = true;
80 return Ok(None);
81 }
82 }
83 }
84 }
85
86 fn read_continuation_lines(&mut self, record: &mut Record) -> io::Result<()> {
88 loop {
89 match self.read_line()? {
90 Some(line) if is_record_start_line(&line) => {
91 self.next_line = Some(line);
93 break;
94 }
95 Some(line) => {
96 record.add_line(line);
98 }
99 None => {
100 self.finished = true;
102 break;
103 }
104 }
105 }
106 Ok(())
107 }
108}
109
110impl<R: Read> Iterator for RecordParser<R> {
111 type Item = io::Result<Record>;
112
113 fn next(&mut self) -> Option<Self::Item> {
114 if self.finished {
115 return None;
116 }
117
118 let start_line = match self.get_start_line() {
120 Ok(Some(line)) => line,
121 Ok(None) => return None,
122 Err(e) => return Some(Err(e)),
123 };
124
125 let mut record = Record::new(start_line);
126
127 match self.read_continuation_lines(&mut record) {
129 Ok(()) => Some(Ok(record)),
130 Err(e) => Some(Err(e)),
131 }
132 }
133}