// postfix-log-parser 0.2.0
//
// High-performance, modular Postfix log parser, validated against 3.2 GB of
// production data with 100% accuracy on SMTPD events.
// Documentation
use chrono::{DateTime, Datelike, NaiveDateTime, Utc};
use log::{debug, warn};
use regex::Regex;
use std::collections::HashMap;

use crate::components::{master::MasterParser as MasterComponentParser, ComponentParser};

use crate::error::{ParseError, ParseResult};
use crate::events::base::ComponentEvent;
use crate::events::base::{PostfixLogEvent, UnknownEvent};

/// Top-level parser.
///
/// Named after Postfix's `master` process: it identifies which component
/// produced a log line and dispatches the message to that component's parser.
pub struct MasterParser {
    /// Registry of component parsers, keyed by component name.
    ///
    /// Lookups use the component name captured from the log line
    /// (the text between `postfix/` and the `[pid]` suffix).
    component_parsers: HashMap<String, Box<dyn ComponentParser>>,

    /// Regular expression for the common log-line prefix.
    ///
    /// Matches the layout: timestamp, hostname, `postfix/<component>\[<pid>\]:`, message.
    /// Example: "Dec 30 12:34:56 mail01 postfix/smtpd\[12345\]: connect from client\[192.168.1.100\]"
    base_log_regex: Regex,
}

impl MasterParser {
    /// Builds a parser with the base log-line regex compiled and all
    /// built-in component parsers already registered.
    ///
    /// # Panics
    ///
    /// Panics if the hard-coded base regex fails to compile, or if
    /// registering the default parsers panics.
    pub fn new() -> Self {
        // Capture groups: 1 = timestamp (optional leading year, optional
        // fractional seconds), 2 = hostname, 3 = component, 4 = pid, 5 = message.
        let base_log_regex = Regex::new(
            r"^((?:\d{4}\s+)?\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2}(?:\.\d+)?)\s+(\S+)\s+postfix/([^\[\]]+)\[(\d+)\]:\s+(.+)$",
        )
        .expect("基础日志正则表达式编译失败");

        let mut master = Self {
            component_parsers: HashMap::new(),
            base_log_regex,
        };

        // Fill the registry before handing the parser to the caller.
        master.register_default_parsers();
        master
    }

    /// Registers the built-in parser for every Postfix component this crate
    /// knows about.
    ///
    /// Both `relay` and `relay/smtp` are mapped to a `RelayParser`, because
    /// the component name is used verbatim as the lookup key.
    ///
    /// # Panics
    ///
    /// Panics if `PostsuperParser::new()` fails. It is the only fallible
    /// constructor used here, and a failure means the built-in parser itself
    /// is broken rather than that some input was bad.
    fn register_default_parsers(&mut self) {
        use crate::components::*;

        self.register_parser("smtpd", Box::new(smtpd::SmtpdParser::new()));
        self.register_parser("qmgr", Box::new(qmgr::QmgrParser::new()));
        self.register_parser("smtp", Box::new(smtp::SmtpParser::new()));
        self.register_parser("cleanup", Box::new(cleanup::CleanupParser::new()));
        self.register_parser("error", Box::new(error::ErrorParser::new()));
        self.register_parser("relay", Box::new(relay::RelayParser::new()));
        self.register_parser("relay/smtp", Box::new(relay::RelayParser::new()));
        self.register_parser("discard", Box::new(discard::DiscardParser::new()));
        self.register_parser("bounce", Box::new(bounce::BounceParser::new()));
        self.register_parser(
            "postfix-script",
            Box::new(postfix_script::PostfixScriptParser::new()),
        );
        self.register_parser("master", Box::new(MasterComponentParser::new()));
        self.register_parser("local", Box::new(LocalParser::new()));
        self.register_parser("postmap", Box::new(PostmapParser::new()));
        // State the invariant instead of a bare `unwrap()` so that a failure
        // here produces an actionable panic message.
        self.register_parser(
            "postsuper",
            Box::new(PostsuperParser::new().expect("PostsuperParser初始化失败")),
        );
        self.register_parser("anvil", Box::new(anvil::AnvilParser::new()));
        self.register_parser("pickup", Box::new(pickup::PickupParser::new()));
        self.register_parser(
            "trivial-rewrite",
            Box::new(trivial_rewrite::TrivialRewriteParser::new()),
        );
        self.register_parser("postlogd", Box::new(postlogd::PostlogdParser::new()));
        self.register_parser("proxymap", Box::new(proxymap::ProxymapParser::new()));
        self.register_parser("sendmail", Box::new(sendmail::SendmailParser::new()));
        self.register_parser("virtual", Box::new(virtual_parser::VirtualParser::new()));
    }

    /// Registers `parser` as the handler for log lines from `component`.
    ///
    /// This is the extension point for user-supplied parsers. Registering a
    /// name that is already present replaces the previous parser.
    pub fn register_parser(&mut self, component: &str, parser: Box<dyn ComponentParser>) {
        debug!("注册组件解析器: {}", component);
        let key = component.to_owned();
        self.component_parsers.insert(key, parser);
    }

    /// 解析单行日志
    ///
    /// 这是主要的解析入口点
    pub fn parse(&self, log_line: &str) -> ParseResult {
        debug!("开始解析日志行: {}", log_line);

        // 1. 解析基础格式
        let base_info = match self.parse_base_format(log_line) {
            Ok(info) => info,
            Err(e) => return ParseResult::failure(e),
        };

        // 2. 查找对应的组件解析器
        let parser = match self.component_parsers.get(&base_info.component) {
            Some(parser) => parser,
            None => {
                warn!("未找到组件解析器: {}", base_info.component);
                return self.create_unknown_event(log_line, base_info);
            }
        };

        // 3. 使用组件解析器解析具体事件
        match parser.parse(&base_info.message) {
            Ok(event) => {
                let postfix_event = PostfixLogEvent::new(
                    log_line.to_string(),
                    base_info.timestamp,
                    base_info.hostname,
                    base_info.component,
                    base_info.process_id,
                    base_info.log_level,
                    event,
                    None, // 队列ID将在具体解析器中提取
                );

                ParseResult::success(postfix_event, 1.0)
            }
            Err(e) => {
                warn!(
                    "组件解析失败: {} - {}, 创建Unknown事件",
                    base_info.component, e
                );
                // 当组件解析失败时,创建Unknown事件但保留日志等级信息
                self.create_unknown_event(log_line, base_info)
            }
        }
    }

    /// Splits a raw log line into its header fields and message.
    ///
    /// # Errors
    ///
    /// - `ParseError::InvalidLogFormat` if the line does not match the base
    ///   regex or the process id does not fit in a `u32`.
    /// - Whatever `parse_timestamp` returns if the timestamp cannot be parsed.
    fn parse_base_format(&self, log_line: &str) -> Result<BaseLogInfo, ParseError> {
        let caps = match self.base_log_regex.captures(log_line) {
            Some(caps) => caps,
            None => {
                return Err(ParseError::InvalidLogFormat {
                    reason: "不匹配标准Postfix日志格式".to_string(),
                })
            }
        };

        // All five groups are mandatory in the pattern, so indexing them on a
        // successful match cannot panic.
        let hostname = caps[2].to_string();
        let component = caps[3].to_string();
        let process_id: u32 = match caps[4].parse() {
            Ok(pid) => pid,
            Err(_) => {
                return Err(ParseError::InvalidLogFormat {
                    reason: "无效的进程ID".to_string(),
                })
            }
        };
        let raw_message = caps[5].to_string();

        let timestamp = self.parse_timestamp(&caps[1])?;

        // Separate an optional level prefix from the message body.
        let (log_level, message) = self.extract_log_level_and_message(&raw_message);

        Ok(BaseLogInfo {
            timestamp,
            hostname,
            component,
            process_id,
            log_level,
            message,
            raw_message,
        })
    }

    /// 解析时间戳
    fn parse_timestamp(&self, timestamp_str: &str) -> Result<DateTime<Utc>, ParseError> {
        // 新格式:2025 Jun 16 11:06:14.897961
        // 旧格式兼容:Jun 16 11:06:14

        // 首先尝试解析新格式(包含年份和可选毫秒)
        if let Ok(naive_dt) = NaiveDateTime::parse_from_str(timestamp_str, "%Y %b %d %H:%M:%S%.f") {
            return Ok(DateTime::from_naive_utc_and_offset(naive_dt, Utc));
        }

        // 尝试解析新格式但没有毫秒
        if let Ok(naive_dt) = NaiveDateTime::parse_from_str(timestamp_str, "%Y %b %d %H:%M:%S") {
            return Ok(DateTime::from_naive_utc_and_offset(naive_dt, Utc));
        }

        // 兼容旧格式(没有年份),使用当前年份
        let current_year = chrono::Utc::now().year();
        let datetime_str = format!("{} {}", current_year, timestamp_str);

        // 尝试解析旧格式(带毫秒)
        if let Ok(naive_dt) = NaiveDateTime::parse_from_str(&datetime_str, "%Y %b %d %H:%M:%S%.f") {
            return Ok(DateTime::from_naive_utc_and_offset(naive_dt, Utc));
        }

        // 尝试解析旧格式(不带毫秒)
        let naive_dt =
            NaiveDateTime::parse_from_str(&datetime_str, "%Y %b %d %H:%M:%S").map_err(|_e| {
                ParseError::InvalidTimestamp {
                    timestamp: timestamp_str.to_string(),
                }
            })?;

        Ok(DateTime::from_naive_utc_and_offset(naive_dt, Utc))
    }

    /// Separates an optional log-level prefix from the message text.
    ///
    /// Returns the detected level together with the remaining message:
    /// 1. If the first space-delimited word is accepted by
    ///    `PostfixLogLevel::from_prefix`, that level is used and the message
    ///    is everything after the first space (not trimmed).
    /// 2. Otherwise, if the message starts with `warning:`, the level is
    ///    `Warning` and the message is the trimmed remainder.
    /// 3. Otherwise the level defaults to `Info` and the message is returned
    ///    unchanged.
    fn extract_log_level_and_message(
        &self,
        raw_message: &str,
    ) -> (crate::events::base::PostfixLogLevel, String) {
        use crate::events::base::PostfixLogLevel;

        // Case 1: a recognised level word followed by a space.
        if let Some((head, rest)) = raw_message.split_once(' ') {
            if let Some(level) = PostfixLogLevel::from_prefix(head.trim()) {
                return (level, rest.to_string());
            }
        }

        // Case 2: `warning:` with no recognised first word. Case 3: Info.
        match raw_message.strip_prefix("warning:") {
            Some(rest) => (PostfixLogLevel::Warning, rest.trim().to_string()),
            None => (PostfixLogLevel::Info, raw_message.to_string()),
        }
    }

    /// 创建未知事件
    fn create_unknown_event(&self, log_line: &str, base_info: BaseLogInfo) -> ParseResult {
        let unknown_event = UnknownEvent {
            component: base_info.component.clone(),
            message: base_info.message.clone(),
        };

        let component_name = base_info.component.clone();
        let postfix_event = PostfixLogEvent::new(
            log_line.to_string(),
            base_info.timestamp,
            base_info.hostname,
            base_info.component,
            base_info.process_id,
            base_info.log_level,
            ComponentEvent::Unknown(unknown_event),
            None,
        );

        ParseResult::partial(
            postfix_event,
            0.3,
            vec![format!("未识别的组件: {}", component_name)],
        )
    }

    /// Returns the names of all components that currently have a parser
    /// registered, in the registry map's iteration order.
    pub fn registered_components(&self) -> Vec<&String> {
        let mut names = Vec::with_capacity(self.component_parsers.len());
        names.extend(self.component_parsers.keys());
        names
    }

    /// Public wrapper around `parse_base_format`, exposed for tests.
    ///
    /// Returns the header fields of `log_line` without running any component
    /// parser; errors are passed through unchanged.
    pub fn parse_base_info(&self, log_line: &str) -> Result<BaseLogInfo, ParseError> {
        let base_info = self.parse_base_format(log_line)?;
        Ok(base_info)
    }
}

impl Default for MasterParser {
    fn default() -> Self {
        Self::new()
    }
}

/// Header fields shared by every log line.
///
/// Extracted from the raw line before any component-specific parsing and
/// used as the common base of all component events.
#[derive(Debug, Clone, PartialEq)]
pub struct BaseLogInfo {
    /// Timestamp taken from the start of the log line.
    /// The parsed wall-clock value is labelled as UTC as-is; no timezone
    /// conversion is applied.
    pub timestamp: DateTime<Utc>,

    /// Host name of the server that produced the line
    /// (the field that follows the timestamp).
    pub hostname: String,

    /// Name of the Postfix component that produced the line,
    /// e.g. smtpd, qmgr, smtp, cleanup.
    pub component: String,

    /// Process id taken from the square brackets after the component name.
    pub process_id: u32,

    /// Log level detected from the message prefix; defaults to Info when no
    /// prefix is recognised.
    pub log_level: crate::events::base::PostfixLogLevel,

    /// Message with any log-level prefix removed.
    /// This is the text handed to the component parser.
    pub message: String,

    /// Full message exactly as it appeared after the `component[pid]:` header.
    /// Kept for debugging and traceability.
    pub raw_message: String,
}