dm_database_parser_sqllog/parser/
record_parser.rs

1//! RecordParser - 从 Reader 流式读取并解析 Record
2//!
3//! 提供了一个迭代器,可以从任何实现了 `Read` trait 的源中逐条读取日志记录。
4
5use crate::error::ParseError;
6use crate::parser::record::Record;
7use crate::sqllog::Sqllog;
8use crate::tools::is_record_start_line;
9use rayon::prelude::*;
10use std::collections::VecDeque;
11use std::{
12    io::{self, BufRead, BufReader, Read},
13    mem,
14};
15
16/// 从 Reader 中按行读取并解析成 Record 的迭代器
17///
18/// `RecordParser` 实现了 `Iterator` trait,可以逐条读取日志记录。
19/// 它会自动识别记录的起始行和继续行,并将它们组合成完整的 `Record`。
20///
21/// # 类型参数
22///
23/// * `R` - 实现了 `Read` trait 的类型
24pub struct RecordParser<R: Read> {
25    reader: BufReader<R>,
26    buffer: String,
27    next_line: Option<String>,
28    finished: bool,
29}
30
31impl<R: Read> RecordParser<R> {
32    pub fn new(reader: R) -> Self {
33        Self {
34            reader: BufReader::new(reader),
35            buffer: String::new(),
36            next_line: None,
37            finished: false,
38        }
39    }
40
41    /// 读取下一行
42    fn read_line(&mut self) -> io::Result<Option<String>> {
43        self.buffer.clear();
44        let bytes_read = self.reader.read_line(&mut self.buffer)?;
45
46        if bytes_read == 0 {
47            Ok(None)
48        } else {
49            // 优化:原地移除换行符,避免创建新字符串
50            let mut len = self.buffer.len();
51            while len > 0 {
52                let last_byte = self.buffer.as_bytes()[len - 1];
53                if last_byte == b'\n' || last_byte == b'\r' {
54                    len -= 1;
55                } else {
56                    break;
57                }
58            }
59
60            // 只在需要时才创建新字符串(避免额外的 trim + to_string 开销)
61            if len != self.buffer.len() {
62                self.buffer.truncate(len);
63            }
64
65            // 使用 mem::take 避免额外的克隆,保持缓冲区容量
66            Ok(Some(mem::take(&mut self.buffer)))
67        }
68    }
69
70    /// 获取下一个记录的起始行
71    fn get_start_line(&mut self) -> io::Result<Option<String>> {
72        // 如果有缓存的下一行(上次读取时遇到的新起始行)
73        if let Some(line) = self.next_line.take() {
74            return Ok(Some(line));
75        }
76
77        // 读取并跳过非起始行,直到找到第一个有效起始行
78        loop {
79            match self.read_line()? {
80                Some(line) if is_record_start_line(&line) => return Ok(Some(line)),
81                Some(_) => continue, // 跳过非起始行
82                None => {
83                    self.finished = true;
84                    return Ok(None);
85                }
86            }
87        }
88    }
89
90    /// 读取当前记录的所有继续行
91    fn read_continuation_lines(&mut self, record: &mut Record) -> io::Result<()> {
92        loop {
93            match self.read_line()? {
94                Some(line) if is_record_start_line(&line) => {
95                    // 遇到下一个起始行,保存它并结束当前记录
96                    self.next_line = Some(line);
97                    break;
98                }
99                Some(line) => {
100                    // 继续行
101                    record.add_line(line);
102                }
103                None => {
104                    // 文件结束
105                    self.finished = true;
106                    break;
107                }
108            }
109        }
110        Ok(())
111    }
112}
113
114impl<R: Read> Iterator for RecordParser<R> {
115    type Item = io::Result<Record>;
116
117    fn next(&mut self) -> Option<Self::Item> {
118        if self.finished {
119            return None;
120        }
121
122        // 获取记录的起始行
123        let start_line = match self.get_start_line() {
124            Ok(Some(line)) => line,
125            Ok(None) => return None,
126            Err(e) => return Some(Err(e)),
127        };
128
129        let mut record = Record::new(start_line);
130
131        // 读取继续行
132        match self.read_continuation_lines(&mut record) {
133            Ok(()) => Some(Ok(record)),
134            Err(e) => Some(Err(e)),
135        }
136    }
137}
138
139/// Sqllog 迭代器,使用批量缓冲 + 并行处理优化性能
140///
141/// 该迭代器用于从 `RecordParser` 并行转换为 `Sqllog`,并在 crate 内部使用。
142pub(crate) struct SqllogIterator<R: Read> {
143    record_parser: RecordParser<R>,
144    buffer: VecDeque<Result<Sqllog, ParseError>>,
145    batch_size: usize,
146}
147
148impl<R: Read> SqllogIterator<R> {
149    /// 创建新的 SqllogIterator,使用默认批次大小(10000)
150    pub(crate) fn new(record_parser: RecordParser<R>) -> Self {
151        Self {
152            record_parser,
153            buffer: VecDeque::new(),
154            batch_size: 10000, // 每次并行处理 1万条
155        }
156    }
157
158    /// 填充缓冲区:批量读取记录并并行解析
159    fn fill_buffer(&mut self) {
160        let mut records: Vec<Record> = Vec::with_capacity(self.batch_size);
161
162        // 批量读取记录
163        for _ in 0..self.batch_size {
164            match self.record_parser.next() {
165                Some(Ok(record)) => records.push(record),
166                Some(Err(io_err)) => {
167                    self.buffer
168                        .push_back(Err(ParseError::IoError(io_err.to_string())));
169                }
170                None => break,
171            }
172        }
173
174        if records.is_empty() {
175            return;
176        }
177
178        // 并行解析
179        let results: Vec<Result<Sqllog, ParseError>> = records
180            .par_iter()
181            .map(|record| record.parse_to_sqllog())
182            .collect();
183
184        // 将结果放入缓冲区
185        for result in results {
186            self.buffer.push_back(result);
187        }
188    }
189}
190
191impl<R: Read> Iterator for SqllogIterator<R> {
192    type Item = Result<Sqllog, ParseError>;
193
194    fn next(&mut self) -> Option<Self::Item> {
195        // 如果缓冲区为空,尝试填充
196        if self.buffer.is_empty() {
197            self.fill_buffer();
198        }
199
200        // 从缓冲区取出结果
201        self.buffer.pop_front()
202    }
203}