// postfix-log-parser 0.2.0
//
// 高性能模块化Postfix日志解析器,经3.2GB生产数据验证,SMTPD事件100%准确率
// Documentation
//! # Master 主控进程组件解析器
//!
//! Master 是 Postfix 的主控进程组件,负责:
//! - 管理和监控所有 Postfix 服务进程
//! - 处理进程生命周期和重启策略
//! - 监控服务限制和性能瓶颈
//! - 提供系统配置警告和优化建议
//!
//! ## 核心功能
//!
//! - **守护进程管理**: 启动、重载、停止 Postfix 服务
//! - **进程监控**: 跟踪子进程状态和异常
//! - **服务限制**: 监控并发连接和进程数限制
//! - **配置优化**: 提供性能调优建议
//!
//! ## 支持的事件类型
//!
//! - **守护进程生命周期**: 启动、重载配置事件
//! - **进程警告**: 进程崩溃、启动失败警告
//! - **服务限制**: 进程数量限制达到警告
//! - **配置警告**: 性能优化和配置建议
//!
//! ## 示例日志格式
//!
//! ```text
//! # 守护进程启动
//! daemon started -- version 3.6.4, configuration /etc/postfix
//!
//! # 进程警告
//! warning: process /usr/libexec/postfix/smtpd pid 12345 killed by signal 9
//! warning: service "smtpd" (unix:/private/smtpd) has reached its process limit "100": new clients may experience noticeable delays
//!
//! # 配置建议
//! warning: to avoid this condition, increase the process count in master.cf or reduce the service time per client
//! ```

use regex::Regex;
use crate::components::ComponentParser;
use crate::events::ComponentEvent;
use crate::events::base::{BaseEvent, PostfixLogLevel};
use crate::events::master::{
    MasterEvent, MasterLifecycleEvent, MasterProcessWarning, 
    MasterServiceLimit, MasterConfigWarning
};
use crate::error::ParseError;
use chrono::{DateTime, Utc};

/// Parser for log messages emitted by the Postfix `master` component.
///
/// Holds one pre-compiled regular expression per recognised message shape so
/// that parsing a message never has to compile a pattern.
pub struct MasterParser {
    // Daemon lifecycle
    /// Matches `daemon started -- version <version>, configuration <path>`.
    daemon_started_regex: Regex,
    /// Matches `reload -- version <version>, configuration <path>`.
    daemon_reload_regex: Regex,
    
    // Process warnings
    /// Matches `warning: process <path> pid <pid> killed by signal <signal>`.
    process_killed_regex: Regex,
    /// Matches `warning: <path>: bad command startup -- throttling`.
    bad_command_startup_regex: Regex,
    
    // Service limits
    /// Matches `warning: service "<name>" (<address>) has reached its process limit "<n>": ...`.
    service_limit_regex: Regex,
    
    // Configuration warnings
    /// Matches the fixed advice to raise the process count in `master.cf`.
    process_count_suggestion_regex: Regex,
    /// Matches `warning: see <url> for examples of stress-adapting configuration settings`.
    stress_config_reference_regex: Regex,
}

impl MasterParser {
    pub fn new() -> Self {
        MasterParser {
            // Daemon lifecycle
            daemon_started_regex: Regex::new(r"^daemon started -- version ([^,]+), configuration (.+)$").unwrap(),
            daemon_reload_regex: Regex::new(r"^reload -- version ([^,]+), configuration (.+)$").unwrap(),
            
            // Process warnings
            process_killed_regex: Regex::new(r"^warning: process (/[^/]+/[^/]+/[^/]+/([^/\s]+)) pid (\d+) killed by signal (\d+)$").unwrap(),
            bad_command_startup_regex: Regex::new(r"^warning: (/[^/]+/[^/]+/[^/]+/[^:]+): bad command startup -- throttling$").unwrap(),
            
            // Service limits
            service_limit_regex: Regex::new(r#"^warning: service "([^"]+)" \(([^)]+)\) has reached its process limit "(\d+)": new clients may experience noticeable delays$"#).unwrap(),
            
            // Configuration warnings
            process_count_suggestion_regex: Regex::new(r"^warning: to avoid this condition, increase the process count in master\.cf or reduce the service time per client$").unwrap(),
            stress_config_reference_regex: Regex::new(r"^warning: see (https?://[^\s]+) for examples of stress-adapting configuration settings$").unwrap(),
        }
    }

    fn create_base_event(&self, message: &str) -> BaseEvent {
        BaseEvent {
            timestamp: DateTime::parse_from_rfc3339("2024-04-27T16:20:48Z")
                .unwrap()
                .with_timezone(&Utc),
            hostname: "unknown".to_string(),
            component: "master".to_string(),
            process_id: 0,
            log_level: PostfixLogLevel::Info,
            raw_message: message.to_string(),
        }
    }

    fn parse_daemon_lifecycle(&self, message: &str) -> Option<MasterEvent> {
        let base = self.create_base_event(message);
        
        if let Some(caps) = self.daemon_started_regex.captures(message) {
            let version = caps.get(1).map(|m| m.as_str().to_string())
                .unwrap_or_else(|| "unknown".to_string());
            let configuration = caps.get(2).map(|m| m.as_str().to_string())
                .unwrap_or_else(|| "unknown".to_string());
            return Some(MasterEvent::DaemonLifecycle {
                base,
                lifecycle: MasterLifecycleEvent::Started { version, configuration },
            });
        }

        if let Some(caps) = self.daemon_reload_regex.captures(message) {
            let version = caps.get(1).map(|m| m.as_str().to_string())
                .unwrap_or_else(|| "unknown".to_string());
            let configuration = caps.get(2).map(|m| m.as_str().to_string())
                .unwrap_or_else(|| "unknown".to_string());
            return Some(MasterEvent::DaemonLifecycle {
                base,
                lifecycle: MasterLifecycleEvent::Reload { version, configuration },
            });
        }

        None
    }

    fn parse_process_warning(&self, message: &str) -> Option<MasterEvent> {
        let mut base = self.create_base_event(message);
        base.log_level = PostfixLogLevel::Warning;
        
        if let Some(caps) = self.process_killed_regex.captures(message) {
            let process_path = caps.get(1).map(|m| m.as_str().to_string())
                .unwrap_or_else(|| "unknown".to_string());
            let service = caps.get(2).map(|m| m.as_str().to_string())
                .unwrap_or_else(|| "unknown".to_string());
            let pid = caps.get(3).and_then(|m| m.as_str().parse::<u32>().ok())
                .unwrap_or(0);
            let signal = caps.get(4).and_then(|m| m.as_str().parse::<u8>().ok())
                .unwrap_or(0);
            return Some(MasterEvent::ProcessWarning {
                base,
                warning: MasterProcessWarning::ProcessKilled {
                    service,
                    process_path,
                    pid,
                    signal,
                },
            });
        }

        if let Some(caps) = self.bad_command_startup_regex.captures(message) {
            let full_path = caps.get(1).map(|m| m.as_str().to_string())
                .unwrap_or_else(|| "unknown".to_string());
            // Extract service name from path like "/usr/libexec/postfix/smtpd"
            let service = full_path.split('/').last().unwrap_or("unknown").to_string();
            return Some(MasterEvent::ProcessWarning {
                base,
                warning: MasterProcessWarning::BadCommandStartup { service },
            });
        }

        None
    }

    fn parse_service_limit(&self, message: &str) -> Option<MasterEvent> {
        let mut base = self.create_base_event(message);
        base.log_level = PostfixLogLevel::Warning;
        
        if let Some(caps) = self.service_limit_regex.captures(message) {
            let service = caps.get(1).map(|m| m.as_str().to_string())
                .unwrap_or_else(|| "unknown".to_string());
            let service_address = caps.get(2).map(|m| m.as_str().to_string())
                .unwrap_or_else(|| "unknown".to_string());
            let limit = caps.get(3).and_then(|m| m.as_str().parse::<u32>().ok())
                .unwrap_or(0);
            return Some(MasterEvent::ServiceLimit {
                base,
                limit: MasterServiceLimit::ProcessLimitReached {
                    service,
                    service_address,
                    limit,
                },
            });
        }

        None
    }

    fn parse_configuration_warning(&self, message: &str) -> Option<MasterEvent> {
        let mut base = self.create_base_event(message);
        base.log_level = PostfixLogLevel::Warning;
        
        if self.process_count_suggestion_regex.is_match(message) {
            return Some(MasterEvent::ConfigurationWarning {
                base,
                warning: MasterConfigWarning::ProcessCountSuggestion,
            });
        }

        if let Some(caps) = self.stress_config_reference_regex.captures(message) {
            let url = caps.get(1).map(|m| m.as_str().to_string())
                .unwrap_or_else(|| "unknown".to_string());
            return Some(MasterEvent::ConfigurationWarning {
                base,
                warning: MasterConfigWarning::StressConfigReference { url },
            });
        }

        None
    }
}

impl ComponentParser for MasterParser {
    /// Parses a single `master` log message into a `ComponentEvent::Master`.
    ///
    /// Lifecycle messages are tried first. The warning parsers are consulted
    /// only when the text contains `warning:`, in the order process warning,
    /// service limit, configuration warning.
    ///
    /// # Errors
    ///
    /// Returns `ParseError::ComponentParseError` when no parser recognises the
    /// message.
    fn parse(&self, message: &str) -> Result<ComponentEvent, ParseError> {
        let recognized = self.parse_daemon_lifecycle(message).or_else(|| {
            // Cheap substring test that skips three regex attempts for
            // non-warning messages.
            if !message.contains("warning:") {
                return None;
            }
            self.parse_process_warning(message)
                .or_else(|| self.parse_service_limit(message))
                .or_else(|| self.parse_configuration_warning(message))
        });

        match recognized {
            Some(event) => Ok(ComponentEvent::Master(event)),
            None => Err(ParseError::ComponentParseError {
                component: "master".to_string(),
                reason: format!("Unsupported message format: {}", message),
            }),
        }
    }

    /// Name of the Postfix component this parser handles.
    fn component_name(&self) -> &'static str {
        "master"
    }

    /// Returns `true` only for the component name `"master"`.
    fn can_parse(&self, component: &str) -> bool {
        matches!(component, "master")
    }
}

impl Default for MasterParser {
    fn default() -> Self {
        Self::new()
    }
}