dm_database_parser_sqllog/parser/api.rs
1//! 便捷 API 函数
2//!
3//! 提供了一组方便使用的高层 API,用于快速解析 SQL 日志。
4
5use crate::error::ParseError;
6use crate::parser::record_parser::RecordParser;
7use crate::sqllog::Sqllog;
8use rayon::prelude::*;
9use std::fs::File;
10use std::io::{BufReader, Read};
11use std::path::Path;
12
13/// Sqllog 迭代器,使用批量缓冲 + 并行处理优化性能
14pub struct SqllogIterator<R: Read> {
15 record_parser: RecordParser<R>,
16 buffer: std::collections::VecDeque<Result<Sqllog, ParseError>>,
17 batch_size: usize,
18}
19
20impl<R: Read> SqllogIterator<R> {
21 /// 创建新的 SqllogIterator,使用默认批次大小(10000)
22 pub fn new(record_parser: RecordParser<R>) -> Self {
23 Self {
24 record_parser,
25 buffer: std::collections::VecDeque::new(),
26 batch_size: 10000, // 每次并行处理 1万条
27 }
28 }
29
30 /// 填充缓冲区:批量读取记录并并行解析
31 fn fill_buffer(&mut self) {
32 use crate::parser::record::Record;
33
34 let mut records: Vec<Record> = Vec::with_capacity(self.batch_size);
35
36 // 批量读取记录
37 for _ in 0..self.batch_size {
38 match self.record_parser.next() {
39 Some(Ok(record)) => records.push(record),
40 Some(Err(io_err)) => {
41 self.buffer
42 .push_back(Err(ParseError::IoError(io_err.to_string())));
43 }
44 None => break,
45 }
46 }
47
48 if records.is_empty() {
49 return;
50 }
51
52 // 并行解析
53 let results: Vec<Result<Sqllog, ParseError>> = records
54 .par_iter()
55 .map(|record| record.parse_to_sqllog())
56 .collect();
57
58 // 将结果放入缓冲区
59 for result in results {
60 self.buffer.push_back(result);
61 }
62 }
63}
64
65impl<R: Read> Iterator for SqllogIterator<R> {
66 type Item = Result<Sqllog, ParseError>;
67
68 fn next(&mut self) -> Option<Self::Item> {
69 // 如果缓冲区为空,尝试填充
70 if self.buffer.is_empty() {
71 self.fill_buffer();
72 }
73
74 // 从缓冲区取出结果
75 self.buffer.pop_front()
76 }
77}
78
79/// 从文件读取并返回 Sqllog 迭代器(流式处理)
80///
81/// 这是一个便捷函数,从文件读取日志并返回 `SqllogIterator` 迭代器。
82/// 使用迭代器可以避免一次性加载所有数据到内存,适合处理大文件。
83///
84/// # 参数
85///
86/// * `path` - 日志文件路径
87///
88/// # 返回
89///
90/// * `Ok(SqllogIterator<BufReader<File>>)` - Sqllog 迭代器
91/// * `Err(ParseError)` - 文件打开错误
92///
93/// # 示例
94///
95/// ```no_run
96/// use dm_database_parser_sqllog::iter_records_from_file;
97///
98/// let parser = iter_records_from_file("sqllog.txt")?;
99///
100/// let mut sqllog_count = 0;
101/// let mut error_count = 0;
102///
103/// for result in parser {
104/// match result {
105/// Ok(sqllog) => {
106/// sqllog_count += 1;
107/// println!("Sqllog {}: 用户={}, SQL={}",
108/// sqllog_count, sqllog.meta.username, sqllog.body);
109/// }
110/// Err(err) => {
111/// error_count += 1;
112/// eprintln!("错误 {}: {}", error_count, err);
113/// }
114/// }
115/// }
116///
117/// println!("成功: {} 条, 错误: {} 个", sqllog_count, error_count);
118/// # Ok::<(), Box<dyn std::error::Error>>(())
119/// ```
120pub fn iter_records_from_file<P>(path: P) -> Result<SqllogIterator<BufReader<File>>, ParseError>
121where
122 P: AsRef<Path>,
123{
124 let path_ref = path.as_ref();
125 let file = File::open(path_ref).map_err(|e| ParseError::FileNotFound {
126 path: format!("{}: {}", path_ref.display(), e),
127 })?;
128 let reader = BufReader::new(file);
129 let record_parser = RecordParser::new(reader);
130 Ok(SqllogIterator::new(record_parser))
131}
132
133/// 从文件读取并并行解析为 Sqllog(高性能版本)
134///
135/// 此函数使用并行处理,将日志文件解析为 Sqllog 列表。
136/// 适合处理大文件(GB 级别),先识别所有记录,然后并行解析。
137///
138/// # 性能
139///
140/// - 1GB 文件(300万条记录):约 2.5-2.7 秒
141/// - 与流式处理性能相当(流式也使用批量并行优化)
142/// - 内存使用:批量加载所有记录到内存
143///
144/// # 参数
145///
146/// * `path` - 日志文件路径
147///
148/// # 返回
149///
150/// * `Ok((Vec<Sqllog>, Vec<ParseError>))` - 成功解析的 Sqllog 和遇到的错误
151/// * `Err(ParseError)` - 文件打开错误
152///
153/// # 示例
154///
155/// ```no_run
156/// use dm_database_parser_sqllog::parse_records_from_file;
157///
158/// let (sqllogs, errors) = parse_records_from_file("large_log.txt")?;
159///
160/// println!("成功解析 {} 条 SQL 日志", sqllogs.len());
161/// println!("遇到 {} 个错误", errors.len());
162///
163/// for sqllog in sqllogs.iter().take(10) {
164/// println!("用户: {}, SQL: {}", sqllog.meta.username, sqllog.body);
165/// }
166/// # Ok::<(), Box<dyn std::error::Error>>(())
167/// ```
168pub fn parse_records_from_file<P>(path: P) -> Result<(Vec<Sqllog>, Vec<ParseError>), ParseError>
169where
170 P: AsRef<Path>,
171{
172 // 直接使用流式迭代器收集所有结果(内部已经使用批量并行优化)
173 let mut sqllogs = Vec::new();
174 let mut errors = Vec::new();
175
176 for result in iter_records_from_file(path)? {
177 match result {
178 Ok(sqllog) => sqllogs.push(sqllog),
179 Err(err) => errors.push(err),
180 }
181 }
182
183 Ok((sqllogs, errors))
184}