dm_database_parser_sqllog/parser/record_parser.rs
1//! RecordParser - 从 Reader 流式读取并解析 Record
2//!
3//! 提供了一个迭代器,可以从任何实现了 `Read` trait 的源中逐条读取日志记录。
4
5use crate::parser::record::Record;
6use crate::tools::is_record_start_line;
7use std::io::{BufRead, BufReader, Read};
8
9/// 从 Reader 中按行读取并解析成 Record 的迭代器
10///
11/// `RecordParser` 实现了 `Iterator` trait,可以逐条读取日志记录。
12/// 它会自动识别记录的起始行和继续行,并将它们组合成完整的 `Record`。
13///
14/// # 类型参数
15///
16/// * `R` - 实现了 `Read` trait 的类型
17///
18/// # 示例
19///
20/// ```no_run
21/// use dm_database_parser_sqllog::RecordParser;
22/// use std::fs::File;
23///
24/// let file = File::open("sqllog.txt").unwrap();
25/// let parser = RecordParser::new(file);
26///
27/// for result in parser {
28/// if let Ok(record) = result {
29/// println!("记录: {}", record.start_line());
30/// }
31/// }
32/// ```
33pub struct RecordParser<R: Read> {
34 reader: BufReader<R>,
35 buffer: String,
36 next_line: Option<String>,
37 finished: bool,
38}
39
40impl<R: Read> RecordParser<R> {
41 /// 创建新的 RecordParser
42 ///
43 /// # 参数
44 ///
45 /// * `reader` - 任何实现了 `Read` trait 的类型(如 File、&[u8] 等)
46 ///
47 /// # 示例
48 ///
49 /// ```
50 /// use dm_database_parser_sqllog::RecordParser;
51 ///
52 /// let data = b"2025-08-12 10:57:09.548 (EP[0] sess:123 thrd:456 user:alice trxid:789 stmt:999 appname:app) SELECT 1";
53 /// let parser = RecordParser::new(&data[..]);
54 /// ```
55 pub fn new(reader: R) -> Self {
56 Self {
57 reader: BufReader::new(reader),
58 buffer: String::new(),
59 next_line: None,
60 finished: false,
61 }
62 }
63
64 /// 读取下一行
65 fn read_line(&mut self) -> std::io::Result<Option<String>> {
66 self.buffer.clear();
67 let bytes_read = self.reader.read_line(&mut self.buffer)?;
68
69 if bytes_read == 0 {
70 Ok(None)
71 } else {
72 // 优化:原地移除换行符,避免创建新字符串
73 let mut len = self.buffer.len();
74 while len > 0 {
75 let last_byte = self.buffer.as_bytes()[len - 1];
76 if last_byte == b'\n' || last_byte == b'\r' {
77 len -= 1;
78 } else {
79 break;
80 }
81 }
82
83 // 只在需要时才创建新字符串(避免额外的 trim + to_string 开销)
84 if len != self.buffer.len() {
85 self.buffer.truncate(len);
86 }
87
88 // 使用 mem::take 避免额外的克隆,保持缓冲区容量
89 Ok(Some(std::mem::take(&mut self.buffer)))
90 }
91 }
92
93 /// 获取下一个记录的起始行
94 fn get_start_line(&mut self) -> std::io::Result<Option<String>> {
95 // 如果有缓存的下一行(上次读取时遇到的新起始行)
96 if let Some(line) = self.next_line.take() {
97 return Ok(Some(line));
98 }
99
100 // 读取并跳过非起始行,直到找到第一个有效起始行
101 loop {
102 match self.read_line()? {
103 Some(line) if is_record_start_line(&line) => return Ok(Some(line)),
104 Some(_) => continue, // 跳过非起始行
105 None => {
106 self.finished = true;
107 return Ok(None);
108 }
109 }
110 }
111 }
112
113 /// 读取当前记录的所有继续行
114 fn read_continuation_lines(&mut self, record: &mut Record) -> std::io::Result<()> {
115 loop {
116 match self.read_line()? {
117 Some(line) if is_record_start_line(&line) => {
118 // 遇到下一个起始行,保存它并结束当前记录
119 self.next_line = Some(line);
120 break;
121 }
122 Some(line) => {
123 // 继续行
124 record.add_line(line);
125 }
126 None => {
127 // 文件结束
128 self.finished = true;
129 break;
130 }
131 }
132 }
133 Ok(())
134 }
135}
136
137impl<R: Read> Iterator for RecordParser<R> {
138 type Item = std::io::Result<Record>;
139
140 fn next(&mut self) -> Option<Self::Item> {
141 if self.finished {
142 return None;
143 }
144
145 // 获取记录的起始行
146 let start_line = match self.get_start_line() {
147 Ok(Some(line)) => line,
148 Ok(None) => return None,
149 Err(e) => return Some(Err(e)),
150 };
151
152 let mut record = Record::new(start_line);
153
154 // 读取继续行
155 match self.read_continuation_lines(&mut record) {
156 Ok(()) => Some(Ok(record)),
157 Err(e) => Some(Err(e)),
158 }
159 }
160}