postfix-log-parser 0.2.0

高性能模块化Postfix日志解析器,经3.2GB生产数据验证,SMTPD事件100%准确率
Documentation
// 此示例需要启用 parallel 特性
// 在 Cargo.toml 中添加: postfix-log-parser = { version = "0.2.0", features = ["parallel"] }

use std::time::Instant;

/// Runs the example: parses a synthetic batch of Postfix log lines serially,
/// and (when the `parallel` feature is enabled) in parallel, then prints
/// timings, success counts, and a rough memory estimate.
///
/// Always returns `Ok(())`; the `Result` return type only keeps the signature
/// uniform with other examples.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let banner = "=".repeat(50);
    println!("Postfix Log Parser - 并行处理示例");
    println!("{}", banner);

    // Build the synthetic workload used by every measurement below.
    let log_data = generate_sample_logs(10000);
    let line_count = log_data.len();
    println!("生成了 {} 条示例日志", line_count);

    // Baseline: single-threaded parsing.
    println!("\n🔄 串行处理测试:");
    let serial_timer = Instant::now();
    let serial_results = postfix_log_parser::parse_log_lines(&log_data);
    let serial_duration = serial_timer.elapsed();

    // A line counts as successfully parsed when it produced an event.
    let serial_success = serial_results
        .iter()
        .filter(|parsed| parsed.event.is_some())
        .count();
    println!("   耗时: {:?}", serial_duration);
    println!("   成功解析: {}/{}", serial_success, line_count);

    // Parallel run, only compiled in when the `parallel` feature is enabled.
    #[cfg(feature = "parallel")]
    {
        println!("\n⚡ 并行处理测试:");
        let parallel_timer = Instant::now();
        let parallel_results = postfix_log_parser::parse_log_lines_parallel(&log_data);
        let parallel_duration = parallel_timer.elapsed();

        let parallel_success = parallel_results
            .iter()
            .filter(|parsed| parsed.event.is_some())
            .count();
        println!("   耗时: {:?}", parallel_duration);
        println!("   成功解析: {}/{}", parallel_success, line_count);

        // Side-by-side comparison of the two runs.
        let serial_secs = serial_duration.as_secs_f64();
        let parallel_secs = parallel_duration.as_secs_f64();
        let speedup = serial_secs / parallel_secs;
        println!("\n📊 性能对比:");
        println!("   串行处理: {:?}", serial_duration);
        println!("   并行处理: {:?}", parallel_duration);
        println!("   加速比: {:.2}x", speedup);

        // Sanity check: both strategies should parse the same number of lines.
        if serial_success != parallel_success {
            println!(
                "   ❌ 结果不一致: 串行{} vs 并行{}",
                serial_success, parallel_success
            );
        } else {
            println!("   ✅ 结果一致性: 通过");
        }
    }

    // Without the feature, tell the user how to turn it on.
    #[cfg(not(feature = "parallel"))]
    {
        println!("\n💡 提示: 要启用并行处理,请在 Cargo.toml 中添加:");
        println!("   postfix-log-parser = {{ version = \"0.2.0\", features = [\"parallel\"] }}");
    }

    // Rough memory footprint of the input plus parsed results.
    let estimated_memory = estimate_memory_usage(&log_data);
    println!("\n💾 内存使用估算: {:.2} MB", estimated_memory);

    Ok(())
}

/// Generates `count` synthetic Postfix log lines for benchmarking.
///
/// Lines cycle through five message shapes in a fixed order (smtpd connect,
/// smtp delivery, qmgr queue-active, cleanup message-id, qmgr removed). All
/// variable fields (seconds, PID, queue id, addresses, size) are derived from
/// the line index, so the output is fully deterministic.
///
/// Returns an empty vector when `count` is zero.
fn generate_sample_logs(count: usize) -> Vec<String> {
    // Number of distinct message shapes handled by the `match` below.
    const TEMPLATE_COUNT: usize = 5;

    (0..count)
        .map(|i| {
            // Keep the seconds field within a valid 00-59 range.
            let sec = i % 60;

            // The queue id (`4bG4VR` + zero-padded index) is formatted inline
            // rather than through a nested `format!`, avoiding a temporary
            // String per line.
            match i % TEMPLATE_COUNT {
                0 => format!("Dec 30 12:34:{:02} mail01 postfix/smtpd[{}]: connect from client{}.example.com[192.168.1.{}]", sec, 12000 + i, i, i % 254),
                1 => format!("Dec 30 12:34:{:02} mail01 postfix/smtp[{}]: 4bG4VR{:06}: to=<user{}@example.com>, relay=mx.example.com[1.2.3.4]:25, delay=0.5, status=sent", sec, 13000 + i, i, i),
                2 => format!("Dec 30 12:34:{:02} mail01 postfix/qmgr[{}]: 4bG4VR{:06}: from=<sender{}@example.com>, size={}, nrcpt=1 (queue active)", sec, 14000 + i, i, i, 1000 + (i % 5000)),
                3 => format!("Dec 30 12:34:{:02} mail01 postfix/cleanup[{}]: 4bG4VR{:06}: message-id=<msg{}@example.com>", sec, 15000 + i, i, i),
                4 => format!("Dec 30 12:34:{:02} mail01 postfix/qmgr[{}]: 4bG4VR{:06}: removed", sec, 14000 + i, i),
                _ => unreachable!("i % TEMPLATE_COUNT is always below TEMPLATE_COUNT"),
            }
        })
        .collect()
}

/// Roughly estimates, in mebibytes, the memory needed to hold `logs` plus
/// their parsed results.
///
/// The estimate is the sum of:
/// * the UTF-8 byte length of every log line, and
/// * a flat 500 bytes per line as a ballpark for the parsed result structure.
///
/// Returns `0.0` for an empty slice.
fn estimate_memory_usage(logs: &[String]) -> f64 {
    // Ballpark size of one parsed result; a rough guess, not a measured value.
    const PARSE_RESULT_BYTES: usize = 500;
    const BYTES_PER_MIB: f64 = 1024.0 * 1024.0;

    // `String::len` already reports the size in bytes (UTF-8), so it must not
    // be scaled by `size_of::<char>()` (4 bytes), which would overstate the
    // text footprint fourfold.
    let text_bytes: usize = logs.iter().map(String::len).sum();

    let struct_overhead = logs.len() * PARSE_RESULT_BYTES;

    (text_bytes + struct_overhead) as f64 / BYTES_PER_MIB
}

/// Measures how manual thread-based chunking scales across input sizes and
/// thread counts.
///
/// For each data size, the generated logs are split into `thread_count`
/// contiguous chunks (the last chunk absorbs any remainder), each chunk is
/// parsed on its own OS thread with the serial parser, and the wall-clock
/// time plus total result count are printed.
///
/// # Panics
///
/// Panics if a worker thread panics, because the join result is unwrapped.
#[cfg(feature = "parallel")]
fn demonstrate_scaling() {
    use std::sync::Arc;
    use std::thread;

    println!("\n🔬 并行扩展性测试:");

    for size in [1000, 5000, 10000, 50000] {
        // Shared, read-only input so spawned threads can outlive this frame.
        let shared_logs = Arc::new(generate_sample_logs(size));
        println!("\n📏 数据规模: {} 条日志", size);

        // Try several thread counts on the same data.
        for thread_count in [1, 2, 4, 8] {
            let timer = Instant::now();

            let total_lines = shared_logs.len();
            let per_thread = total_lines / thread_count;

            // Spawn every worker up front so they all run concurrently.
            let workers: Vec<_> = (0..thread_count)
                .map(|worker| {
                    let worker_logs = Arc::clone(&shared_logs);
                    let lo = worker * per_thread;
                    // The final worker also takes the division remainder.
                    let hi = if worker + 1 == thread_count {
                        total_lines
                    } else {
                        lo + per_thread
                    };

                    thread::spawn(move || {
                        postfix_log_parser::parse_log_lines(&worker_logs[lo..hi])
                    })
                })
                .collect();

            // Wait for all workers and add up how many results they produced.
            let total_results: usize = workers
                .into_iter()
                .map(|worker| worker.join().unwrap().len())
                .sum();

            let elapsed = timer.elapsed();
            println!(
                "   {} 线程: {:?} ({} 结果)",
                thread_count, elapsed, total_results
            );
        }
    }
}