dm_database_parser_sqllog/parser/record_parser.rs
1//! RecordParser - 从 Reader 流式读取并解析 Record
2//!
3//! 提供了一个迭代器,可以从任何实现了 `Read` trait 的源中逐条读取日志记录。
4
5use crate::parser::record::Record;
6use crate::tools::is_record_start_line;
7use std::io::{BufRead, BufReader, Read};
8
9/// 从 Reader 中按行读取并解析成 Record 的迭代器
10///
11/// `RecordParser` 实现了 `Iterator` trait,可以逐条读取日志记录。
12/// 它会自动识别记录的起始行和继续行,并将它们组合成完整的 `Record`。
13///
14/// # 类型参数
15///
16/// * `R` - 实现了 `Read` trait 的类型
17///
18/// # 示例
19///
20/// ```no_run
21/// use dm_database_parser_sqllog::RecordParser;
22/// use std::fs::File;
23///
24/// let file = File::open("sqllog.txt").unwrap();
25/// let parser = RecordParser::new(file);
26///
27/// for result in parser {
28/// if let Ok(record) = result {
29/// println!("记录: {}", record.start_line());
30/// }
31/// }
32/// ```
33pub struct RecordParser<R: Read> {
34 reader: BufReader<R>,
35 buffer: String,
36 next_line: Option<String>,
37 finished: bool,
38}
39
40impl<R: Read> RecordParser<R> {
41 /// 创建新的 RecordParser
42 ///
43 /// # 参数
44 ///
45 /// * `reader` - 任何实现了 `Read` trait 的类型(如 File、&[u8] 等)
46 ///
47 /// # 示例
48 ///
49 /// ```
50 /// use dm_database_parser_sqllog::RecordParser;
51 ///
52 /// let data = b"2025-08-12 10:57:09.548 (EP[0] sess:123 thrd:456 user:alice trxid:789 stmt:999 appname:app) SELECT 1";
53 /// let parser = RecordParser::new(&data[..]);
54 /// ```
55 pub fn new(reader: R) -> Self {
56 Self {
57 reader: BufReader::new(reader),
58 buffer: String::new(),
59 next_line: None,
60 finished: false,
61 }
62 }
63
64 /// 读取下一行
65 fn read_line(&mut self) -> std::io::Result<Option<String>> {
66 self.buffer.clear();
67 let bytes_read = self.reader.read_line(&mut self.buffer)?;
68
69 if bytes_read == 0 {
70 Ok(None)
71 } else {
72 // 移除行尾的换行符
73 let line = self.buffer.trim_end_matches(&['\r', '\n'][..]).to_string();
74 Ok(Some(line))
75 }
76 }
77
78 /// 获取下一个记录的起始行
79 fn get_start_line(&mut self) -> std::io::Result<Option<String>> {
80 // 如果有缓存的下一行(上次读取时遇到的新起始行)
81 if let Some(line) = self.next_line.take() {
82 return Ok(Some(line));
83 }
84
85 // 读取并跳过非起始行,直到找到第一个有效起始行
86 loop {
87 match self.read_line()? {
88 Some(line) if is_record_start_line(&line) => return Ok(Some(line)),
89 Some(_) => continue, // 跳过非起始行
90 None => {
91 self.finished = true;
92 return Ok(None);
93 }
94 }
95 }
96 }
97
98 /// 读取当前记录的所有继续行
99 fn read_continuation_lines(&mut self, record: &mut Record) -> std::io::Result<()> {
100 loop {
101 match self.read_line()? {
102 Some(line) if is_record_start_line(&line) => {
103 // 遇到下一个起始行,保存它并结束当前记录
104 self.next_line = Some(line);
105 break;
106 }
107 Some(line) => {
108 // 继续行
109 record.add_line(line);
110 }
111 None => {
112 // 文件结束
113 self.finished = true;
114 break;
115 }
116 }
117 }
118 Ok(())
119 }
120}
121
122impl<R: Read> Iterator for RecordParser<R> {
123 type Item = std::io::Result<Record>;
124
125 fn next(&mut self) -> Option<Self::Item> {
126 if self.finished {
127 return None;
128 }
129
130 // 获取记录的起始行
131 let start_line = match self.get_start_line() {
132 Ok(Some(line)) => line,
133 Ok(None) => return None,
134 Err(e) => return Some(Err(e)),
135 };
136
137 let mut record = Record::new(start_line);
138
139 // 读取继续行
140 match self.read_continuation_lines(&mut record) {
141 Ok(()) => Some(Ok(record)),
142 Err(e) => Some(Err(e)),
143 }
144 }
145}