postfix-log-parser 0.2.0

高性能模块化Postfix日志解析器,经3.2GB生产数据验证,SMTPD事件100%准确率
Documentation
use postfix_log_parser::parse_log_lines;
use std::collections::HashMap;
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::time::Instant;

/// Demo entry point: parses a Postfix log file with three different strategies.
///
/// The file path is taken from the first command-line argument. When none is
/// given, a demo file `sample.log` is generated and removed again afterwards,
/// even if processing fails.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    println!("Postfix Log Parser - 文件处理示例");
    println!("{}", "=".repeat(50));

    // Use the file named on the command line; otherwise generate a demo file.
    // `created_sample` records whether *we* created the file, so a user-supplied
    // file that merely happens to be called "sample.log" is never deleted.
    let (filename, created_sample) = match std::env::args().nth(1) {
        Some(path) => (path, false),
        None => {
            create_sample_log_file("sample.log")?;
            (String::from("sample.log"), true)
        }
    };

    println!("📄 处理日志文件: {}", filename);

    let outcome = run_all_modes(&filename);

    // Remove the generated demo file even when one of the modes failed, so a
    // failed run does not leave it behind.
    if created_sample {
        std::fs::remove_file(&filename).ok();
        println!("\n🧹 已清理示例文件");
    }

    outcome
}

/// Runs the batch, streaming and chunked (1000 lines per chunk) modes on
/// `filename` in that order, stopping at the first error.
fn run_all_modes(filename: &str) -> Result<(), Box<dyn std::error::Error>> {
    // Mode 1: load all lines at once (suited to small files).
    println!("\n🔄 方式1: 批量处理 (一次性加载)");
    batch_process_file(filename)?;

    // Mode 2: line-by-line streaming (suited to large files).
    println!("\n🌊 方式2: 流式处理 (逐行读取)");
    stream_process_file(filename)?;

    // Mode 3: fixed-size chunks (balances memory use and throughput).
    println!("\n📦 方式3: 分块处理");
    chunk_process_file(filename, 1000)?;

    Ok(())
}

/// Batch mode: load the whole file into memory and parse every line in one call.
///
/// Suited to small files. The reported duration covers reading and parsing,
/// but not the statistics printed by `analyze_results`.
///
/// # Errors
/// Returns any I/O error from reading `filename`, including invalid UTF-8 content.
fn batch_process_file(filename: &str) -> Result<(), Box<dyn std::error::Error>> {
    let timer = Instant::now();
    let text = std::fs::read_to_string(filename)?;

    let all_lines = text.lines().collect::<Vec<&str>>();
    println!("   📖 读取了 {} 行日志", all_lines.len());

    let parsed = parse_log_lines(all_lines);
    let elapsed = timer.elapsed();

    analyze_results(&parsed);
    println!("   ⏱️  处理耗时: {:?}", elapsed);
    Ok(())
}

/// Streaming mode: read and parse the file one line at a time.
///
/// Only one line is held in memory at a time (plus the per-component counters),
/// which makes this mode suitable for large files. A progress message is printed
/// every 1000 lines. At the end it prints a summary: the success rate, the
/// elapsed time, and per-component counts sorted by count (descending), with
/// ties broken by component name.
///
/// # Errors
/// Returns the first I/O error from opening or reading `filename`, including a
/// line that is not valid UTF-8.
fn stream_process_file(filename: &str) -> Result<(), Box<dyn std::error::Error>> {
    let start = Instant::now();

    let reader = BufReader::new(File::open(filename)?);

    let mut total_lines: usize = 0;
    let mut success_count: usize = 0;
    let mut component_stats: HashMap<String, u32> = HashMap::new();

    for line in reader.lines() {
        let line = line?;
        total_lines += 1;

        let result = postfix_log_parser::parse_log_line(&line);

        if let Some(event) = result.event {
            success_count += 1;
            *component_stats.entry(event.component).or_insert(0) += 1;
            // Per-event real-time handling could be added here.
        }

        // Report progress by line count, regardless of whether this line parsed.
        if total_lines % 1000 == 0 {
            println!("   📊 已处理 {} 行...", total_lines);
        }
    }

    let duration = start.elapsed();

    // Guard against 0/0 (NaN) for an empty file.
    let success_rate = if total_lines == 0 {
        0.0
    } else {
        success_count as f32 / total_lines as f32 * 100.0
    };

    println!("   📖 总共处理: {}", total_lines);
    println!("   ✅ 成功解析: {} 行 ({:.1}%)", success_count, success_rate);
    println!("   ⏱️  处理耗时: {:?}", duration);

    // Sort so the output is stable across runs (HashMap iteration order is random).
    let mut sorted_components: Vec<(String, u32)> = component_stats.into_iter().collect();
    sorted_components.sort_unstable_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));

    println!("   🏷️  组件统计:");
    for (component, count) in sorted_components {
        println!("      {}: {}", component, count);
    }

    Ok(())
}

/// Chunked mode: collect lines into batches of `chunk_size` and parse each batch at once.
///
/// This balances memory use (at most `chunk_size` lines are buffered) against
/// per-call parsing overhead. A trailing partial chunk is processed as well.
/// A `chunk_size` of 0 is treated as 1.
///
/// # Errors
/// Returns the first I/O error from opening or reading `filename`, including a
/// line that is not valid UTF-8.
fn chunk_process_file(filename: &str, chunk_size: usize) -> Result<(), Box<dyn std::error::Error>> {
    // With a size of 0 every pushed line already "fills" the chunk; make that explicit.
    let chunk_size = chunk_size.max(1);
    let start = Instant::now();

    let reader = BufReader::new(File::open(filename)?);

    let mut chunk: Vec<String> = Vec::with_capacity(chunk_size);
    let mut total_lines: usize = 0;
    let mut total_success: usize = 0;

    for line in reader.lines() {
        chunk.push(line?);

        if chunk.len() >= chunk_size {
            let success = process_chunk(&chunk);
            total_success += success;
            total_lines += chunk.len();

            println!("   📦 处理了一个块: {} 行,成功 {}", chunk.len(), success);
            // `clear` keeps the allocation, so the buffer is reused for the next chunk.
            chunk.clear();
        }
    }

    // Process the final, partially filled chunk.
    if !chunk.is_empty() {
        let success = process_chunk(&chunk);
        total_success += success;
        total_lines += chunk.len();
        println!("   📦 处理了最后块: {} 行,成功 {}", chunk.len(), success);
    }

    let duration = start.elapsed();

    // Guard against 0/0 (NaN) for an empty file.
    let success_rate = if total_lines == 0 {
        0.0
    } else {
        total_success as f32 / total_lines as f32 * 100.0
    };

    println!("   📖 总共处理: {}", total_lines);
    println!("   ✅ 成功解析: {} 行 ({:.1}%)", total_success, success_rate);
    println!("   ⏱️  处理耗时: {:?}", duration);

    Ok(())
}

/// Parses one batch of lines and returns how many of them produced an event.
fn process_chunk(chunk: &[String]) -> usize {
    let borrowed = chunk.iter().map(String::as_str);
    parse_log_lines(borrowed)
        .iter()
        .filter(|parsed| parsed.event.is_some())
        .count()
}

/// Prints summary statistics for a batch of parse results.
///
/// The summary covers the total, successful and failed line counts, the success
/// rate, and the average confidence over successfully parsed lines. It also
/// lists the five most frequent components and event types. Ties in the top-5
/// lists are broken by name, so the output is the same on every run. An empty
/// slice reports a 0.0% success rate instead of `NaN`.
fn analyze_results(results: &[postfix_log_parser::ParseResult]) {
    let total_count = results.len();

    // Single pass: every successfully parsed result contributes to all counters.
    let mut success_count: usize = 0;
    let mut confidence_sum = 0.0f32;
    let mut component_stats: HashMap<&str, u32> = HashMap::new();
    let mut event_stats: HashMap<String, u32> = HashMap::new();

    for result in results {
        if let Some(event) = &result.event {
            success_count += 1;
            confidence_sum += result.confidence;
            // Borrow the component name instead of cloning it into the map.
            *component_stats.entry(event.component.as_str()).or_insert(0) += 1;
            *event_stats.entry(event.event_type().to_string()).or_insert(0) += 1;
        }
    }

    // Guard against 0/0 (NaN) for empty input.
    let success_rate = if total_count == 0 {
        0.0
    } else {
        success_count as f32 / total_count as f32 * 100.0
    };

    println!("   📊 解析统计:");
    println!("      总行数: {}", total_count);
    println!("      成功: {} ({:.1}%)", success_count, success_rate);
    println!("      失败: {}", total_count - success_count);

    if success_count > 0 {
        println!("      平均置信度: {:.2}", confidence_sum / success_count as f32);
    }

    println!("   🏷️  热门组件 (TOP 5):");
    print_top_entries(&component_stats, 5);

    println!("   🎯 热门事件类型 (TOP 5):");
    print_top_entries(&event_stats, 5);
}

/// Prints up to `limit` entries of `stats`, highest count first.
/// Ties are broken by key so the output is stable across runs.
fn print_top_entries<K: std::fmt::Display + Ord>(stats: &HashMap<K, u32>, limit: usize) {
    let mut sorted: Vec<(&K, &u32)> = stats.iter().collect();
    sorted.sort_unstable_by(|a, b| b.1.cmp(a.1).then_with(|| a.0.cmp(b.0)));
    for (key, count) in sorted.into_iter().take(limit) {
        println!("      {}: {}", key, count);
    }
}

/// Writes a small demo Postfix log (20 lines) to `filename`, creating or
/// truncating the file.
///
/// # Errors
/// Returns any I/O error from creating, writing or flushing the file.
fn create_sample_log_file(filename: &str) -> Result<(), Box<dyn std::error::Error>> {
    use std::io::{BufWriter, Write};

    // `File` is unbuffered, so each `writeln!` would otherwise cost at least one
    // system call; buffer the writes instead.
    let mut file = BufWriter::new(File::create(filename)?);

    let sample_logs = [
        // First mail flow: accepted and delivered to one recipient.
        "Dec 30 12:34:56 mail01 postfix/smtpd[12345]: connect from client1.example.com[192.168.1.100]",
        "Dec 30 12:34:56 mail01 postfix/smtpd[12345]: 4bG4VR5z1Tgz6pqsd: client=client1.example.com[192.168.1.100]",
        "Dec 30 12:34:57 mail01 postfix/cleanup[12346]: 4bG4VR5z1Tgz6pqsd: message-id=<test1@example.com>",
        "Dec 30 12:34:58 mail01 postfix/qmgr[12347]: 4bG4VR5z1Tgz6pqsd: from=<sender1@example.com>, size=1234, nrcpt=1 (queue active)",
        "Dec 30 12:34:59 mail01 postfix/smtp[12348]: 4bG4VR5z1Tgz6pqsd: to=<recipient1@example.com>, relay=mx1.example.com[1.2.3.4]:25, delay=0.5, status=sent (250 2.0.0 OK)",
        "Dec 30 12:35:00 mail01 postfix/qmgr[12347]: 4bG4VR5z1Tgz6pqsd: removed",
        "Dec 30 12:35:01 mail01 postfix/smtpd[12345]: disconnect from client1.example.com[192.168.1.100] ehlo=1 mail=1 rcpt=1 data=1 quit=1 commands=5",

        // Second mail flow: two recipients, one of which bounces.
        "Dec 30 12:35:02 mail01 postfix/smtpd[12345]: connect from client2.example.com[192.168.1.101]",
        "Dec 30 12:35:02 mail01 postfix/smtpd[12345]: 4bG4VR5z2Tgz6pqsd: client=client2.example.com[192.168.1.101]",
        "Dec 30 12:35:03 mail01 postfix/cleanup[12346]: 4bG4VR5z2Tgz6pqsd: message-id=<test2@example.com>",
        "Dec 30 12:35:04 mail01 postfix/qmgr[12347]: 4bG4VR5z2Tgz6pqsd: from=<sender2@example.com>, size=2345, nrcpt=2 (queue active)",
        "Dec 30 12:35:05 mail01 postfix/smtp[12348]: 4bG4VR5z2Tgz6pqsd: to=<recipient2@example.com>, relay=mx2.example.com[1.2.3.5]:25, delay=0.8, status=sent (250 2.0.0 OK)",
        "Dec 30 12:35:06 mail01 postfix/smtp[12348]: 4bG4VR5z2Tgz6pqsd: to=<recipient3@example.com>, relay=mx3.example.com[1.2.3.6]:25, delay=1.2, status=bounced (550 5.1.1 User unknown)",
        "Dec 30 12:35:07 mail01 postfix/bounce[12349]: 4bG4VR5z2Tgz6pqsd: sender non-delivery notification: 4bG4VR5z3Tgz6pqsd",
        "Dec 30 12:35:08 mail01 postfix/qmgr[12347]: 4bG4VR5z2Tgz6pqsd: removed",
        "Dec 30 12:35:09 mail01 postfix/smtpd[12345]: disconnect from client2.example.com[192.168.1.101] ehlo=1 mail=1 rcpt=2 data=1 quit=1 commands=6",

        // Errors and other events.
        "Dec 30 12:35:10 mail01 postfix/master[1234]: reload -- version 3.6.4, configuration /etc/postfix",
        "Dec 30 12:35:11 mail01 postfix/anvil[12352]: statistics: max connection rate 5/60s for (smtp:192.168.1.100) at Dec 30 12:35:11",
        "Dec 30 12:35:12 mail01 postfix/error[12351]: 4bG4VR5z4Tgz6pqsd: to=<baduser@localhost>, relay=none, delay=30, status=bounced (mail for localhost loops back to myself)",
        "Dec 30 12:35:13 mail01 postfix/local[12350]: 4bG4VR5z5Tgz6pqsd: to=<localuser@localhost>, relay=local, delay=0.1, status=delivered (delivered to mailbox)",
    ];

    for log in sample_logs {
        writeln!(file, "{}", log)?;
    }
    // Flush explicitly: dropping a BufWriter also flushes, but silently discards any error.
    file.flush()?;

    println!("📝 已创建示例日志文件: {}", filename);
    Ok(())
}

/// Polls `filename` once per second and reports when it grows.
///
/// This is a simplified demo. It only compares file sizes and never reads the
/// new data. If the file shrinks (truncation, or rotation that recreates the
/// same path), the tracked size is reset, so content written after the shrink
/// is reported again. Production code should prefer filesystem notifications
/// such as inotify.
///
/// Loops forever and never returns; errors reading the file's metadata are
/// ignored and retried on the next poll.
#[allow(dead_code)] // Illustrative helper; not called from `main`.
fn monitor_log_file(filename: &str) -> Result<(), Box<dyn std::error::Error>> {
    println!("👁️  开始监控日志文件: {}", filename);
    println!("   (这是一个简化的示例,实际应用中建议使用 inotify 等文件系统事件)");

    let mut last_size: u64 = 0;

    loop {
        if let Ok(metadata) = std::fs::metadata(filename) {
            let current_size = metadata.len();

            if current_size < last_size {
                // The file shrank, so it was truncated or replaced and everything
                // now in it is new. Without this reset, growth would go unreported
                // until the file exceeded its pre-rotation size.
                println!("🔁 文件大小从 {} 缩小到 {},可能发生了日志轮转", last_size, current_size);
                last_size = 0;
            }

            if current_size > last_size {
                println!("🔄 检测到文件更新,大小从 {} 增加到 {}", last_size, current_size);

                // A full implementation would read only the bytes in
                // last_size..current_size here and feed them to the parser.

                last_size = current_size;
            }
        }

        std::thread::sleep(std::time::Duration::from_secs(1));
    }
}