dm_database_parser_sqllog/parser/
record_parser.rs

1//! RecordParser - 从 Reader 流式读取并解析 Record
2//!
3//! 提供了一个迭代器,可以从任何实现了 `Read` trait 的源中逐条读取日志记录。
4
5use crate::parser::record::Record;
6use crate::tools::is_record_start_line;
7use std::{
8    io::{self, BufRead, BufReader, Read},
9    mem,
10};
11
12/// 从 Reader 中按行读取并解析成 Record 的迭代器
13///
14/// `RecordParser` 实现了 `Iterator` trait,可以逐条读取日志记录。
15/// 它会自动识别记录的起始行和继续行,并将它们组合成完整的 `Record`。
16///
17/// # 类型参数
18///
19/// * `R` - 实现了 `Read` trait 的类型
20pub struct RecordParser<R: Read> {
21    reader: BufReader<R>,
22    buffer: String,
23    next_line: Option<String>,
24    finished: bool,
25}
26
27impl<R: Read> RecordParser<R> {
28    pub fn new(reader: R) -> Self {
29        Self {
30            reader: BufReader::new(reader),
31            buffer: String::new(),
32            next_line: None,
33            finished: false,
34        }
35    }
36
37    /// 读取下一行
38    fn read_line(&mut self) -> io::Result<Option<String>> {
39        self.buffer.clear();
40        let bytes_read = self.reader.read_line(&mut self.buffer)?;
41
42        if bytes_read == 0 {
43            Ok(None)
44        } else {
45            // 优化:原地移除换行符,避免创建新字符串
46            let mut len = self.buffer.len();
47            while len > 0 {
48                let last_byte = self.buffer.as_bytes()[len - 1];
49                if last_byte == b'\n' || last_byte == b'\r' {
50                    len -= 1;
51                } else {
52                    break;
53                }
54            }
55
56            // 只在需要时才创建新字符串(避免额外的 trim + to_string 开销)
57            if len != self.buffer.len() {
58                self.buffer.truncate(len);
59            }
60
61            // 使用 mem::take 避免额外的克隆,保持缓冲区容量
62            Ok(Some(mem::take(&mut self.buffer)))
63        }
64    }
65
66    /// 获取下一个记录的起始行
67    fn get_start_line(&mut self) -> io::Result<Option<String>> {
68        // 如果有缓存的下一行(上次读取时遇到的新起始行)
69        if let Some(line) = self.next_line.take() {
70            return Ok(Some(line));
71        }
72
73        // 读取并跳过非起始行,直到找到第一个有效起始行
74        loop {
75            match self.read_line()? {
76                Some(line) if is_record_start_line(&line) => return Ok(Some(line)),
77                Some(_) => continue, // 跳过非起始行
78                None => {
79                    self.finished = true;
80                    return Ok(None);
81                }
82            }
83        }
84    }
85
86    /// 读取当前记录的所有继续行
87    fn read_continuation_lines(&mut self, record: &mut Record) -> io::Result<()> {
88        loop {
89            match self.read_line()? {
90                Some(line) if is_record_start_line(&line) => {
91                    // 遇到下一个起始行,保存它并结束当前记录
92                    self.next_line = Some(line);
93                    break;
94                }
95                Some(line) => {
96                    // 继续行
97                    record.add_line(line);
98                }
99                None => {
100                    // 文件结束
101                    self.finished = true;
102                    break;
103                }
104            }
105        }
106        Ok(())
107    }
108}
109
110impl<R: Read> Iterator for RecordParser<R> {
111    type Item = io::Result<Record>;
112
113    fn next(&mut self) -> Option<Self::Item> {
114        if self.finished {
115            return None;
116        }
117
118        // 获取记录的起始行
119        let start_line = match self.get_start_line() {
120            Ok(Some(line)) => line,
121            Ok(None) => return None,
122            Err(e) => return Some(Err(e)),
123        };
124
125        let mut record = Record::new(start_line);
126
127        // 读取继续行
128        match self.read_continuation_lines(&mut record) {
129            Ok(()) => Some(Ok(record)),
130            Err(e) => Some(Err(e)),
131        }
132    }
133}