use std::time::Instant;
/// Runs the serial-vs-parallel parsing demo.
///
/// Generates 10000 sample log lines, parses them serially while timing the
/// call, and — when the `parallel` feature is enabled — parses them again with
/// the parallel entry point, printing both timings, the speed-up ratio and
/// whether both runs parsed the same number of lines. Without the feature a
/// hint on how to enable it is printed instead. A rough memory estimate for
/// the input is printed last.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    println!("Postfix Log Parser - 并行处理示例");
    println!("{}", "=".repeat(50));

    let log_data = generate_sample_logs(10000);
    let line_total = log_data.len();
    println!("生成了 {} 条示例日志", line_total);

    // Serial baseline: time only the parse call itself.
    println!("\n🔄 串行处理测试:");
    let serial_timer = Instant::now();
    let serial_results = postfix_log_parser::parse_log_lines(&log_data);
    let serial_duration = serial_timer.elapsed();
    let serial_success = serial_results
        .iter()
        .filter(|parsed| parsed.event.is_some())
        .count();
    println!(" 耗时: {:?}", serial_duration);
    println!(" 成功解析: {}/{}", serial_success, line_total);

    #[cfg(feature = "parallel")]
    {
        println!("\n⚡ 并行处理测试:");
        let parallel_timer = Instant::now();
        let parallel_results = postfix_log_parser::parse_log_lines_parallel(&log_data);
        let parallel_duration = parallel_timer.elapsed();
        let parallel_success = parallel_results
            .iter()
            .filter(|parsed| parsed.event.is_some())
            .count();
        println!(" 耗时: {:?}", parallel_duration);
        println!(" 成功解析: {}/{}", parallel_success, line_total);

        // Ratio of wall-clock times; values above 1.0 mean the parallel run was faster.
        let speedup = serial_duration.as_secs_f64() / parallel_duration.as_secs_f64();
        println!("\n📊 性能对比:");
        println!(" 串行处理: {:?}", serial_duration);
        println!(" 并行处理: {:?}", parallel_duration);
        println!(" 加速比: {:.2}x", speedup);

        // Consistency is judged only by the number of successfully parsed lines.
        if serial_success != parallel_success {
            println!(
                " ❌ 结果不一致: 串行{} vs 并行{}",
                serial_success, parallel_success
            );
        } else {
            println!(" ✅ 结果一致性: 通过");
        }
    }

    #[cfg(not(feature = "parallel"))]
    {
        println!("\n💡 提示: 要启用并行处理,请在 Cargo.toml 中添加:");
        println!(" postfix-log-parser = {{ version = \"0.2.0\", features = [\"parallel\"] }}");
    }

    let estimated_memory = estimate_memory_usage(&log_data);
    println!("\n💾 内存使用估算: {:.2} MB", estimated_memory);

    Ok(())
}
/// Builds `count` synthetic Postfix log lines for the demo.
///
/// Line `i` (zero-based) takes one of five shapes selected by `i % 5`:
/// an `smtpd` connect, an `smtp` delivery with `status=sent`, a `qmgr`
/// "queue active" line, a `cleanup` message-id line, and a `qmgr` "removed"
/// line. The seconds field is `i % 60`, the process id is a per-shape base
/// plus `i`, and queue ids are `4bG4VR` followed by `i` zero-padded to six
/// digits.
///
/// Returns an empty vector when `count` is 0.
fn generate_sample_logs(count: usize) -> Vec<String> {
    // Number of distinct line shapes handled by the `match` below.
    const TEMPLATE_COUNT: usize = 5;

    (0..count)
        .map(|i| {
            let second = i % 60;
            match i % TEMPLATE_COUNT {
                0 => format!(
                    "Dec 30 12:34:{:02} mail01 postfix/smtpd[{}]: connect from client{}.example.com[192.168.1.{}]",
                    second,
                    12000 + i,
                    i,
                    i % 254
                ),
                // The queue id is formatted inline rather than through a nested
                // `format!`, which would allocate a temporary String per line.
                1 => format!(
                    "Dec 30 12:34:{:02} mail01 postfix/smtp[{}]: 4bG4VR{:06}: to=<user{}@example.com>, relay=mx.example.com[1.2.3.4]:25, delay=0.5, status=sent",
                    second,
                    13000 + i,
                    i,
                    i
                ),
                2 => format!(
                    "Dec 30 12:34:{:02} mail01 postfix/qmgr[{}]: 4bG4VR{:06}: from=<sender{}@example.com>, size={}, nrcpt=1 (queue active)",
                    second,
                    14000 + i,
                    i,
                    i,
                    1000 + (i % 5000)
                ),
                3 => format!(
                    "Dec 30 12:34:{:02} mail01 postfix/cleanup[{}]: 4bG4VR{:06}: message-id=<msg{}@example.com>",
                    second,
                    15000 + i,
                    i,
                    i
                ),
                4 => format!(
                    "Dec 30 12:34:{:02} mail01 postfix/qmgr[{}]: 4bG4VR{:06}: removed",
                    second,
                    14000 + i,
                    i
                ),
                // `i % TEMPLATE_COUNT` is always in 0..5.
                _ => unreachable!(),
            }
        })
        .collect()
}
/// Gives a rough estimate, in MiB, of the memory tied to `logs`.
///
/// The estimate is the total UTF-8 byte length of all lines plus a fixed
/// allowance of 500 bytes per line (presumably covering per-line parsed
/// structures — the figure is a heuristic, not a measurement).
///
/// `String::len` already reports bytes, so the text size is counted once;
/// scaling it by `size_of::<char>()` (4 bytes) would overstate the text
/// portion fourfold, because `String` stores UTF-8 rather than `char`s.
///
/// Returns `0.0` for an empty slice.
fn estimate_memory_usage(logs: &[String]) -> f64 {
    // Heuristic per-line overhead in bytes.
    const PER_LINE_OVERHEAD_BYTES: usize = 500;
    const BYTES_PER_MIB: f64 = 1024.0 * 1024.0;

    let text_bytes: usize = logs.iter().map(String::len).sum();
    let overhead_bytes = logs.len() * PER_LINE_OVERHEAD_BYTES;

    (text_bytes + overhead_bytes) as f64 / BYTES_PER_MIB
}
/// Times serial parsing split across hand-spawned threads.
///
/// For each data size (1000, 5000, 10000, 50000 lines) and each thread count
/// (1, 2, 4, 8), the generated lines are divided into contiguous chunks of
/// `len / thread_count` lines, the last chunk absorbing any remainder. Each
/// chunk is parsed on its own thread with the serial parser, and the elapsed
/// wall-clock time plus the total number of results is printed.
///
/// Panics if a worker thread panics.
#[cfg(feature = "parallel")]
fn demonstrate_scaling() {
    use std::sync::Arc;
    use std::thread;

    println!("\n🔬 并行扩展性测试:");

    for size in [1000, 5000, 10000, 50000] {
        // Shared, read-only input; each worker holds its own reference-counted handle.
        let shared_logs = Arc::new(generate_sample_logs(size));
        println!("\n📏 数据规模: {} 条日志", size);

        for thread_count in [1, 2, 4, 8] {
            let timer = Instant::now();
            let line_total = shared_logs.len();
            let per_thread = line_total / thread_count;

            // Spawn every worker before joining any of them so they run concurrently.
            let workers: Vec<_> = (0..thread_count)
                .map(|worker| {
                    let worker_logs = Arc::clone(&shared_logs);
                    let lo = worker * per_thread;
                    // The final worker also takes the lines left over by integer division.
                    let hi = if worker + 1 == thread_count {
                        line_total
                    } else {
                        lo + per_thread
                    };
                    thread::spawn(move || {
                        let chunk = &worker_logs[lo..hi];
                        postfix_log_parser::parse_log_lines(chunk)
                    })
                })
                .collect();

            let total_results: usize = workers
                .into_iter()
                .map(|worker| worker.join().unwrap().len())
                .sum();

            let elapsed = timer.elapsed();
            println!(
                " {} 线程: {:?} ({} 结果)",
                thread_count, elapsed, total_results
            );
        }
    }
}