pub fn parse_log_lines_parallel<I>(log_lines: I) -> Vec<ParseResult>
Expand description
并行批量解析多行日志 (高性能版本)
🚀 利用多核CPU并行处理大量日志,相比串行处理可获得7.28x性能提升, 特别适用于大数据量、高性能需求的场景。
§性能优势
- 7.28x速度提升: 从7,990提升到60,809 lines/sec
- 多核利用: 自动利用所有可用CPU核心
- 内存优化: 智能任务分割,避免内存溢出
- 容错处理: 单条解析失败不影响整体处理
§使用要求
需要在 Cargo.toml
中启用 parallel
feature:
[dependencies]
postfix-log-parser = { version = "0.2.0", features = ["parallel"] }
§参数说明
log_lines
- 实现了IntoIterator
的日志行集合,元素需支持Send
用于并发
§返回值
返回 Vec<ParseResult>
- 解析结果向量,顺序与输入一致
§性能建议
- 最佳场景: 1000+条日志,4+CPU核心
- 内存要求: 建议8GB+内存处理大文件
- 线程控制: 可通过
RAYON_NUM_THREADS
环境变量调整线程数
§示例用法
§基础并行处理
#[cfg(feature = "parallel")]
use postfix_log_parser::parse_log_lines_parallel;
#[cfg(feature = "parallel")]
fn process_large_logs() -> Result<(), Box<dyn std::error::Error>> {
let content = std::fs::read_to_string("/var/log/mail.log")?;
let lines: Vec<String> = content.lines().map(String::from).collect();
let results = parse_log_lines_parallel(lines);
println!("并行处理了 {} 条日志", results.len());
Ok(())
}
§性能测试示例
#[cfg(feature = "parallel")]
use postfix_log_parser::{parse_log_lines, parse_log_lines_parallel};
#[cfg(feature = "parallel")]
fn performance_comparison() -> Result<(), Box<dyn std::error::Error>> {
let lines: Vec<String> = std::fs::read_to_string("large_mail.log")?
.lines().map(String::from).collect();
// 串行处理基准
let start = std::time::Instant::now();
let _serial_results = parse_log_lines(&lines);
let serial_time = start.elapsed();
// 并行处理测试
let start = std::time::Instant::now();
let _parallel_results = parse_log_lines_parallel(lines);
let parallel_time = start.elapsed();
let speedup = serial_time.as_secs_f64() / parallel_time.as_secs_f64();
println!("并行处理速度提升: {:.2}x", speedup);
Ok(())
}
§大文件分块处理
#[cfg(feature = "parallel")]
use postfix_log_parser::parse_log_lines_parallel;
use std::io::{BufRead, BufReader};
use std::fs::File;
#[cfg(feature = "parallel")]
fn process_huge_file_in_chunks() -> Result<(), Box<dyn std::error::Error>> {
const CHUNK_SIZE: usize = 10000; // 每批处理10K行
let file = File::open("/var/log/huge_mail.log")?;
let reader = BufReader::new(file);
let mut chunk: Vec<String> = Vec::with_capacity(CHUNK_SIZE);
for line in reader.lines() {
chunk.push(line?);
if chunk.len() >= CHUNK_SIZE {
let results = parse_log_lines_parallel(chunk.drain(..).collect::<Vec<String>>());
println!("处理了一批 {} 条日志", results.len());
}
}
// 处理剩余数据
if !chunk.is_empty() {
let results = parse_log_lines_parallel(chunk);
println!("处理了最后 {} 条日志", results.len());
}
Ok(())
}
§⚠️ 注意事项
- 小数据量(<1000条)建议使用串行
parse_log_lines
- 输入数据必须实现
Send
trait以支持并发 - 并行处理会增加内存使用量(约4x)
- 不保证输出顺序严格对应输入顺序(实际测试中通常一致)