dm_database_parser_sqllog/parser/
api.rs

1//! 便捷 API 函数
2//!
3//! 提供了一组方便使用的高层 API,用于快速解析 SQL 日志。
4
5use crate::error::ParseError;
6use crate::parser::record_parser::RecordParser;
7use crate::sqllog::Sqllog;
8use rayon::prelude::*;
9use std::fs::File;
10use std::io::{BufReader, Read};
11use std::path::Path;
12
13/// Sqllog 迭代器,使用批量缓冲 + 并行处理优化性能
14pub struct SqllogIterator<R: Read> {
15    record_parser: RecordParser<R>,
16    buffer: std::collections::VecDeque<Result<Sqllog, ParseError>>,
17    batch_size: usize,
18}
19
20impl<R: Read> SqllogIterator<R> {
21    /// 创建新的 SqllogIterator,使用默认批次大小(10000)
22    pub fn new(record_parser: RecordParser<R>) -> Self {
23        Self {
24            record_parser,
25            buffer: std::collections::VecDeque::new(),
26            batch_size: 10000, // 每次并行处理 1万条
27        }
28    }
29
30    /// 填充缓冲区:批量读取记录并并行解析
31    fn fill_buffer(&mut self) {
32        use crate::parser::record::Record;
33
34        let mut records: Vec<Record> = Vec::with_capacity(self.batch_size);
35
36        // 批量读取记录
37        for _ in 0..self.batch_size {
38            match self.record_parser.next() {
39                Some(Ok(record)) => records.push(record),
40                Some(Err(io_err)) => {
41                    self.buffer
42                        .push_back(Err(ParseError::IoError(io_err.to_string())));
43                }
44                None => break,
45            }
46        }
47
48        if records.is_empty() {
49            return;
50        }
51
52        // 并行解析
53        let results: Vec<Result<Sqllog, ParseError>> = records
54            .par_iter()
55            .map(|record| record.parse_to_sqllog())
56            .collect();
57
58        // 将结果放入缓冲区
59        for result in results {
60            self.buffer.push_back(result);
61        }
62    }
63}
64
65impl<R: Read> Iterator for SqllogIterator<R> {
66    type Item = Result<Sqllog, ParseError>;
67
68    fn next(&mut self) -> Option<Self::Item> {
69        // 如果缓冲区为空,尝试填充
70        if self.buffer.is_empty() {
71            self.fill_buffer();
72        }
73
74        // 从缓冲区取出结果
75        self.buffer.pop_front()
76    }
77}
78
79/// 从文件读取并返回 Sqllog 迭代器(流式处理)
80///
81/// 这是一个便捷函数,从文件读取日志并返回 `SqllogIterator` 迭代器。
82/// 使用迭代器可以避免一次性加载所有数据到内存,适合处理大文件。
83///
84/// # 参数
85///
86/// * `path` - 日志文件路径
87///
88/// # 返回
89///
90/// * `Ok(SqllogIterator<BufReader<File>>)` - Sqllog 迭代器
91/// * `Err(ParseError)` - 文件打开错误
92///
93/// # 示例
94///
95/// ```no_run
96/// use dm_database_parser_sqllog::iter_records_from_file;
97///
98/// let parser = iter_records_from_file("sqllog.txt")?;
99///
100/// let mut sqllog_count = 0;
101/// let mut error_count = 0;
102///
103/// for result in parser {
104///     match result {
105///         Ok(sqllog) => {
106///             sqllog_count += 1;
107///             println!("Sqllog {}: 用户={}, SQL={}",
108///                 sqllog_count, sqllog.meta.username, sqllog.body);
109///         }
110///         Err(err) => {
111///             error_count += 1;
112///             eprintln!("错误 {}: {}", error_count, err);
113///         }
114///     }
115/// }
116///
117/// println!("成功: {} 条, 错误: {} 个", sqllog_count, error_count);
118/// # Ok::<(), Box<dyn std::error::Error>>(())
119/// ```
120pub fn iter_records_from_file<P>(path: P) -> Result<SqllogIterator<BufReader<File>>, ParseError>
121where
122    P: AsRef<Path>,
123{
124    let path_ref = path.as_ref();
125    let file = File::open(path_ref).map_err(|e| ParseError::FileNotFound {
126        path: format!("{}: {}", path_ref.display(), e),
127    })?;
128    let reader = BufReader::new(file);
129    let record_parser = RecordParser::new(reader);
130    Ok(SqllogIterator::new(record_parser))
131}
132
133/// 从文件读取并并行解析为 Sqllog(高性能版本)
134///
135/// 此函数使用并行处理,将日志文件解析为 Sqllog 列表。
136/// 适合处理大文件(GB 级别),先识别所有记录,然后并行解析。
137///
138/// # 性能
139///
140/// - 1GB 文件(300万条记录):约 2.5-2.7 秒
141/// - 与流式处理性能相当(流式也使用批量并行优化)
142/// - 内存使用:批量加载所有记录到内存
143///
144/// # 参数
145///
146/// * `path` - 日志文件路径
147///
148/// # 返回
149///
150/// * `Ok((Vec<Sqllog>, Vec<ParseError>))` - 成功解析的 Sqllog 和遇到的错误
151/// * `Err(ParseError)` - 文件打开错误
152///
153/// # 示例
154///
155/// ```no_run
156/// use dm_database_parser_sqllog::parse_records_from_file;
157///
158/// let (sqllogs, errors) = parse_records_from_file("large_log.txt")?;
159///
160/// println!("成功解析 {} 条 SQL 日志", sqllogs.len());
161/// println!("遇到 {} 个错误", errors.len());
162///
163/// for sqllog in sqllogs.iter().take(10) {
164///     println!("用户: {}, SQL: {}", sqllog.meta.username, sqllog.body);
165/// }
166/// # Ok::<(), Box<dyn std::error::Error>>(())
167/// ```
168pub fn parse_records_from_file<P>(path: P) -> Result<(Vec<Sqllog>, Vec<ParseError>), ParseError>
169where
170    P: AsRef<Path>,
171{
172    // 直接使用流式迭代器收集所有结果(内部已经使用批量并行优化)
173    let mut sqllogs = Vec::new();
174    let mut errors = Vec::new();
175
176    for result in iter_records_from_file(path)? {
177        match result {
178            Ok(sqllog) => sqllogs.push(sqllog),
179            Err(err) => errors.push(err),
180        }
181    }
182
183    Ok((sqllogs, errors))
184}