dm_database_parser_sqllog/parser/
record_parser.rs

1//! RecordParser - 从 Reader 流式读取并解析 Record
2//!
3//! 提供了一个迭代器,可以从任何实现了 `Read` trait 的源中逐条读取日志记录。
4
5use crate::parser::record::Record;
6use crate::tools::is_record_start_line;
7use std::io::{BufRead, BufReader, Read};
8
9/// 从 Reader 中按行读取并解析成 Record 的迭代器
10///
11/// `RecordParser` 实现了 `Iterator` trait,可以逐条读取日志记录。
12/// 它会自动识别记录的起始行和继续行,并将它们组合成完整的 `Record`。
13///
14/// # 类型参数
15///
16/// * `R` - 实现了 `Read` trait 的类型
17///
18/// # 示例
19///
20/// ```no_run
21/// use dm_database_parser_sqllog::RecordParser;
22/// use std::fs::File;
23///
24/// let file = File::open("sqllog.txt").unwrap();
25/// let parser = RecordParser::new(file);
26///
27/// for result in parser {
28///     if let Ok(record) = result {
29///         println!("记录: {}", record.start_line());
30///     }
31/// }
32/// ```
33pub struct RecordParser<R: Read> {
34    reader: BufReader<R>,
35    buffer: String,
36    next_line: Option<String>,
37    finished: bool,
38}
39
40impl<R: Read> RecordParser<R> {
41    /// 创建新的 RecordParser
42    ///
43    /// # 参数
44    ///
45    /// * `reader` - 任何实现了 `Read` trait 的类型(如 File、&[u8] 等)
46    ///
47    /// # 示例
48    ///
49    /// ```
50    /// use dm_database_parser_sqllog::RecordParser;
51    ///
52    /// let data = b"2025-08-12 10:57:09.548 (EP[0] sess:123 thrd:456 user:alice trxid:789 stmt:999 appname:app) SELECT 1";
53    /// let parser = RecordParser::new(&data[..]);
54    /// ```
55    pub fn new(reader: R) -> Self {
56        Self {
57            reader: BufReader::new(reader),
58            buffer: String::new(),
59            next_line: None,
60            finished: false,
61        }
62    }
63
64    /// 读取下一行
65    fn read_line(&mut self) -> std::io::Result<Option<String>> {
66        self.buffer.clear();
67        let bytes_read = self.reader.read_line(&mut self.buffer)?;
68
69        if bytes_read == 0 {
70            Ok(None)
71        } else {
72            // 优化:原地移除换行符,避免创建新字符串
73            let mut len = self.buffer.len();
74            while len > 0 {
75                let last_byte = self.buffer.as_bytes()[len - 1];
76                if last_byte == b'\n' || last_byte == b'\r' {
77                    len -= 1;
78                } else {
79                    break;
80                }
81            }
82
83            // 只在需要时才创建新字符串(避免额外的 trim + to_string 开销)
84            if len != self.buffer.len() {
85                self.buffer.truncate(len);
86            }
87
88            // 使用 mem::take 避免额外的克隆,保持缓冲区容量
89            Ok(Some(std::mem::take(&mut self.buffer)))
90        }
91    }
92
93    /// 获取下一个记录的起始行
94    fn get_start_line(&mut self) -> std::io::Result<Option<String>> {
95        // 如果有缓存的下一行(上次读取时遇到的新起始行)
96        if let Some(line) = self.next_line.take() {
97            return Ok(Some(line));
98        }
99
100        // 读取并跳过非起始行,直到找到第一个有效起始行
101        loop {
102            match self.read_line()? {
103                Some(line) if is_record_start_line(&line) => return Ok(Some(line)),
104                Some(_) => continue, // 跳过非起始行
105                None => {
106                    self.finished = true;
107                    return Ok(None);
108                }
109            }
110        }
111    }
112
113    /// 读取当前记录的所有继续行
114    fn read_continuation_lines(&mut self, record: &mut Record) -> std::io::Result<()> {
115        loop {
116            match self.read_line()? {
117                Some(line) if is_record_start_line(&line) => {
118                    // 遇到下一个起始行,保存它并结束当前记录
119                    self.next_line = Some(line);
120                    break;
121                }
122                Some(line) => {
123                    // 继续行
124                    record.add_line(line);
125                }
126                None => {
127                    // 文件结束
128                    self.finished = true;
129                    break;
130                }
131            }
132        }
133        Ok(())
134    }
135}
136
137impl<R: Read> Iterator for RecordParser<R> {
138    type Item = std::io::Result<Record>;
139
140    fn next(&mut self) -> Option<Self::Item> {
141        if self.finished {
142            return None;
143        }
144
145        // 获取记录的起始行
146        let start_line = match self.get_start_line() {
147            Ok(Some(line)) => line,
148            Ok(None) => return None,
149            Err(e) => return Some(Err(e)),
150        };
151
152        let mut record = Record::new(start_line);
153
154        // 读取继续行
155        match self.read_continuation_lines(&mut record) {
156            Ok(()) => Some(Ok(record)),
157            Err(e) => Some(Err(e)),
158        }
159    }
160}