dm_database_parser_sqllog/parser/
record_parser.rs

1//! RecordParser - 从 Reader 流式读取并解析 Record
2//!
3//! 提供了一个迭代器,可以从任何实现了 `Read` trait 的源中逐条读取日志记录。
4
5use crate::parser::record::Record;
6use crate::tools::is_record_start_line;
7use std::io::{BufRead, BufReader, Read};
8
9/// 从 Reader 中按行读取并解析成 Record 的迭代器
10///
11/// `RecordParser` 实现了 `Iterator` trait,可以逐条读取日志记录。
12/// 它会自动识别记录的起始行和继续行,并将它们组合成完整的 `Record`。
13///
14/// # 类型参数
15///
16/// * `R` - 实现了 `Read` trait 的类型
17///
18/// # 示例
19///
20/// ```no_run
21/// use dm_database_parser_sqllog::RecordParser;
22/// use std::fs::File;
23///
24/// let file = File::open("sqllog.txt").unwrap();
25/// let parser = RecordParser::new(file);
26///
27/// for result in parser {
28///     if let Ok(record) = result {
29///         println!("记录: {}", record.start_line());
30///     }
31/// }
32/// ```
33pub struct RecordParser<R: Read> {
34    reader: BufReader<R>,
35    buffer: String,
36    next_line: Option<String>,
37    finished: bool,
38}
39
40impl<R: Read> RecordParser<R> {
41    /// 创建新的 RecordParser
42    ///
43    /// # 参数
44    ///
45    /// * `reader` - 任何实现了 `Read` trait 的类型(如 File、&[u8] 等)
46    ///
47    /// # 示例
48    ///
49    /// ```
50    /// use dm_database_parser_sqllog::RecordParser;
51    ///
52    /// let data = b"2025-08-12 10:57:09.548 (EP[0] sess:123 thrd:456 user:alice trxid:789 stmt:999 appname:app) SELECT 1";
53    /// let parser = RecordParser::new(&data[..]);
54    /// ```
55    pub fn new(reader: R) -> Self {
56        Self {
57            reader: BufReader::new(reader),
58            buffer: String::new(),
59            next_line: None,
60            finished: false,
61        }
62    }
63
64    /// 读取下一行
65    fn read_line(&mut self) -> std::io::Result<Option<String>> {
66        self.buffer.clear();
67        let bytes_read = self.reader.read_line(&mut self.buffer)?;
68
69        if bytes_read == 0 {
70            Ok(None)
71        } else {
72            // 移除行尾的换行符
73            let line = self.buffer.trim_end_matches(&['\r', '\n'][..]).to_string();
74            Ok(Some(line))
75        }
76    }
77
78    /// 获取下一个记录的起始行
79    fn get_start_line(&mut self) -> std::io::Result<Option<String>> {
80        // 如果有缓存的下一行(上次读取时遇到的新起始行)
81        if let Some(line) = self.next_line.take() {
82            return Ok(Some(line));
83        }
84
85        // 读取并跳过非起始行,直到找到第一个有效起始行
86        loop {
87            match self.read_line()? {
88                Some(line) if is_record_start_line(&line) => return Ok(Some(line)),
89                Some(_) => continue, // 跳过非起始行
90                None => {
91                    self.finished = true;
92                    return Ok(None);
93                }
94            }
95        }
96    }
97
98    /// 读取当前记录的所有继续行
99    fn read_continuation_lines(&mut self, record: &mut Record) -> std::io::Result<()> {
100        loop {
101            match self.read_line()? {
102                Some(line) if is_record_start_line(&line) => {
103                    // 遇到下一个起始行,保存它并结束当前记录
104                    self.next_line = Some(line);
105                    break;
106                }
107                Some(line) => {
108                    // 继续行
109                    record.add_line(line);
110                }
111                None => {
112                    // 文件结束
113                    self.finished = true;
114                    break;
115                }
116            }
117        }
118        Ok(())
119    }
120}
121
122impl<R: Read> Iterator for RecordParser<R> {
123    type Item = std::io::Result<Record>;
124
125    fn next(&mut self) -> Option<Self::Item> {
126        if self.finished {
127            return None;
128        }
129
130        // 获取记录的起始行
131        let start_line = match self.get_start_line() {
132            Ok(Some(line)) => line,
133            Ok(None) => return None,
134            Err(e) => return Some(Err(e)),
135        };
136
137        let mut record = Record::new(start_line);
138
139        // 读取继续行
140        match self.read_continuation_lines(&mut record) {
141            Ok(()) => Some(Ok(record)),
142            Err(e) => Some(Err(e)),
143        }
144    }
145}