dm_database_parser_sqllog/parser/
api.rs

1//! 便捷 API 函数
2//!
3//! 提供了一组方便使用的高层 API,用于快速解析 SQL 日志。
4
5use crate::error::ParseError;
6use crate::parser::record_parser::RecordParser;
7use crate::sqllog::Sqllog;
8use rayon::prelude::*;
9use std::collections;
10use std::fs::File;
11use std::io::{BufReader, Read};
12use std::path::Path;
13
14/// Sqllog 迭代器,使用批量缓冲 + 并行处理优化性能
15pub struct SqllogIterator<R: Read> {
16    record_parser: RecordParser<R>,
17    buffer: collections::VecDeque<Result<Sqllog, ParseError>>,
18    batch_size: usize,
19}
20
21impl<R: Read> SqllogIterator<R> {
22    /// 创建新的 SqllogIterator,使用默认批次大小(10000)
23    pub fn new(record_parser: RecordParser<R>) -> Self {
24        Self {
25            record_parser,
26            buffer: collections::VecDeque::new(),
27            batch_size: 10000, // 每次并行处理 1万条
28        }
29    }
30
31    /// 填充缓冲区:批量读取记录并并行解析
32    fn fill_buffer(&mut self) {
33        use crate::parser::record::Record;
34
35        let mut records: Vec<Record> = Vec::with_capacity(self.batch_size);
36
37        // 批量读取记录
38        for _ in 0..self.batch_size {
39            match self.record_parser.next() {
40                Some(Ok(record)) => records.push(record),
41                Some(Err(io_err)) => {
42                    self.buffer
43                        .push_back(Err(ParseError::IoError(io_err.to_string())));
44                }
45                None => break,
46            }
47        }
48
49        if records.is_empty() {
50            return;
51        }
52
53        // 并行解析
54        let results: Vec<Result<Sqllog, ParseError>> = records
55            .par_iter()
56            .map(|record| record.parse_to_sqllog())
57            .collect();
58
59        // 将结果放入缓冲区
60        for result in results {
61            self.buffer.push_back(result);
62        }
63    }
64}
65
66impl<R: Read> Iterator for SqllogIterator<R> {
67    type Item = Result<Sqllog, ParseError>;
68
69    fn next(&mut self) -> Option<Self::Item> {
70        // 如果缓冲区为空,尝试填充
71        if self.buffer.is_empty() {
72            self.fill_buffer();
73        }
74
75        // 从缓冲区取出结果
76        self.buffer.pop_front()
77    }
78}
79
80/// 从文件读取并返回 Sqllog 迭代器(流式处理)
81///
82/// 这是一个便捷函数,从文件读取日志并返回 `SqllogIterator` 迭代器。
83/// 使用迭代器可以避免一次性加载所有数据到内存,适合处理大文件。
84///
85/// # 参数
86///
87/// * `path` - 日志文件路径
88///
89/// # 返回
90///
91/// * `Ok(SqllogIterator<BufReader<File>>)` - Sqllog 迭代器
92/// * `Err(ParseError)` - 文件打开错误
93///
94/// # 示例
95///
96/// ```no_run
97/// use dm_database_parser_sqllog::iter_records_from_file;
98///
99/// let parser = iter_records_from_file("sqllog.txt")?;
100///
101/// let mut sqllog_count = 0;
102/// let mut error_count = 0;
103///
104/// for result in parser {
105///     match result {
106///         Ok(sqllog) => {
107///             sqllog_count += 1;
108///             println!("Sqllog {}: 用户={}, SQL={}",
109///                 sqllog_count, sqllog.meta.username, sqllog.body);
110///         }
111///         Err(err) => {
112///             error_count += 1;
113///             eprintln!("错误 {}: {}", error_count, err);
114///         }
115///     }
116/// }
117///
118/// println!("成功: {} 条, 错误: {} 个", sqllog_count, error_count);
119/// # Ok::<(), Box<dyn std::error::Error>>(())
120/// ```
121pub fn iter_records_from_file<P>(path: P) -> Result<SqllogIterator<BufReader<File>>, ParseError>
122where
123    P: AsRef<Path>,
124{
125    let path_ref = path.as_ref();
126    let file = File::open(path_ref).map_err(|e| ParseError::FileNotFound {
127        path: format!("{}: {}", path_ref.display(), e),
128    })?;
129    let reader = BufReader::new(file);
130    let record_parser = RecordParser::new(reader);
131    Ok(SqllogIterator::new(record_parser))
132}
133
134/// 从文件读取并并行解析为 Sqllog(高性能版本)
135///
136/// 此函数使用并行处理,将日志文件解析为 Sqllog 列表。
137/// 适合处理大文件(GB 级别),先识别所有记录,然后并行解析。
138///
139/// # 性能
140///
141/// - 1GB 文件(300万条记录):约 2.5-2.7 秒
142/// - 与流式处理性能相当(流式也使用批量并行优化)
143/// - 内存使用:批量加载所有记录到内存
144///
145/// # 参数
146///
147/// * `path` - 日志文件路径
148///
149/// # 返回
150///
151/// * `Ok((Vec<Sqllog>, Vec<ParseError>))` - 成功解析的 Sqllog 和遇到的错误
152/// * `Err(ParseError)` - 文件打开错误
153///
154/// # 示例
155///
156/// ```no_run
157/// use dm_database_parser_sqllog::parse_records_from_file;
158///
159/// let (sqllogs, errors) = parse_records_from_file("large_log.txt")?;
160///
161/// println!("成功解析 {} 条 SQL 日志", sqllogs.len());
162/// println!("遇到 {} 个错误", errors.len());
163///
164/// for sqllog in sqllogs.iter().take(10) {
165///     println!("用户: {}, SQL: {}", sqllog.meta.username, sqllog.body);
166/// }
167/// # Ok::<(), Box<dyn std::error::Error>>(())
168/// ```
169pub fn parse_records_from_file<P>(path: P) -> Result<(Vec<Sqllog>, Vec<ParseError>), ParseError>
170where
171    P: AsRef<Path>,
172{
173    // 直接使用流式迭代器收集所有结果(内部已经使用批量并行优化)
174    let mut sqllogs = Vec::new();
175    let mut errors = Vec::new();
176
177    for result in iter_records_from_file(path)? {
178        match result {
179            Ok(sqllog) => sqllogs.push(sqllog),
180            Err(err) => errors.push(err),
181        }
182    }
183
184    Ok((sqllogs, errors))
185}