dm_database_parser_sqllog/parser/
record_parser.rs1use crate::error::ParseError;
6use crate::parser::record::Record;
7use crate::sqllog::Sqllog;
8use crate::tools::is_record_start_line;
9use rayon::prelude::*;
10use std::collections::VecDeque;
11use std::{
12 io::{self, BufRead, BufReader, Read},
13 mem,
14};
15
/// Splits a text stream into multi-line [`Record`]s.
///
/// A record begins at a line accepted by `is_record_start_line` and extends over every
/// following line up to (but not including) the next such line, or to end of input.
pub struct RecordParser<R>
where
    R: Read,
{
    /// Buffered source of input lines.
    reader: BufReader<R>,
    /// Scratch string that each line is read into before being handed out.
    buffer: String,
    /// A record-start line that was already read while finishing the previous record.
    next_line: Option<String>,
    /// Set once end of input has been reached; no further records are produced afterwards.
    finished: bool,
}
30
31impl<R: Read> RecordParser<R> {
32 pub fn new(reader: R) -> Self {
33 Self {
34 reader: BufReader::new(reader),
35 buffer: String::new(),
36 next_line: None,
37 finished: false,
38 }
39 }
40
41 fn read_line(&mut self) -> io::Result<Option<String>> {
43 self.buffer.clear();
44 let bytes_read = self.reader.read_line(&mut self.buffer)?;
45
46 if bytes_read == 0 {
47 Ok(None)
48 } else {
49 let mut len = self.buffer.len();
51 while len > 0 {
52 let last_byte = self.buffer.as_bytes()[len - 1];
53 if last_byte == b'\n' || last_byte == b'\r' {
54 len -= 1;
55 } else {
56 break;
57 }
58 }
59
60 if len != self.buffer.len() {
62 self.buffer.truncate(len);
63 }
64
65 Ok(Some(mem::take(&mut self.buffer)))
67 }
68 }
69
70 fn get_start_line(&mut self) -> io::Result<Option<String>> {
72 if let Some(line) = self.next_line.take() {
74 return Ok(Some(line));
75 }
76
77 loop {
79 match self.read_line()? {
80 Some(line) if is_record_start_line(&line) => return Ok(Some(line)),
81 Some(_) => continue, None => {
83 self.finished = true;
84 return Ok(None);
85 }
86 }
87 }
88 }
89
90 fn read_continuation_lines(&mut self, record: &mut Record) -> io::Result<()> {
92 loop {
93 match self.read_line()? {
94 Some(line) if is_record_start_line(&line) => {
95 self.next_line = Some(line);
97 break;
98 }
99 Some(line) => {
100 record.add_line(line);
102 }
103 None => {
104 self.finished = true;
106 break;
107 }
108 }
109 }
110 Ok(())
111 }
112}
113
114impl<R: Read> Iterator for RecordParser<R> {
115 type Item = io::Result<Record>;
116
117 fn next(&mut self) -> Option<Self::Item> {
118 if self.finished {
119 return None;
120 }
121
122 let start_line = match self.get_start_line() {
124 Ok(Some(line)) => line,
125 Ok(None) => return None,
126 Err(e) => return Some(Err(e)),
127 };
128
129 let mut record = Record::new(start_line);
130
131 match self.read_continuation_lines(&mut record) {
133 Ok(()) => Some(Ok(record)),
134 Err(e) => Some(Err(e)),
135 }
136 }
137}
138
139pub(crate) struct SqllogIterator<R: Read> {
143 record_parser: RecordParser<R>,
144 buffer: VecDeque<Result<Sqllog, ParseError>>,
145 batch_size: usize,
146}
147
148impl<R: Read> SqllogIterator<R> {
149 pub(crate) fn new(record_parser: RecordParser<R>) -> Self {
151 Self {
152 record_parser,
153 buffer: VecDeque::new(),
154 batch_size: 10000, }
156 }
157
158 fn fill_buffer(&mut self) {
160 let mut records: Vec<Record> = Vec::with_capacity(self.batch_size);
161
162 for _ in 0..self.batch_size {
164 match self.record_parser.next() {
165 Some(Ok(record)) => records.push(record),
166 Some(Err(io_err)) => {
167 self.buffer
168 .push_back(Err(ParseError::IoError(io_err.to_string())));
169 }
170 None => break,
171 }
172 }
173
174 if records.is_empty() {
175 return;
176 }
177
178 let results: Vec<Result<Sqllog, ParseError>> = records
180 .par_iter()
181 .map(|record| record.parse_to_sqllog())
182 .collect();
183
184 for result in results {
186 self.buffer.push_back(result);
187 }
188 }
189}
190
191impl<R: Read> Iterator for SqllogIterator<R> {
192 type Item = Result<Sqllog, ParseError>;
193
194 fn next(&mut self) -> Option<Self::Item> {
195 if self.buffer.is_empty() {
197 self.fill_buffer();
198 }
199
200 self.buffer.pop_front()
202 }
203}