postfix_log_parser/
master_parser.rs

1use chrono::{DateTime, Datelike, NaiveDateTime, Utc};
2use log::{debug, warn};
3use regex::Regex;
4use std::collections::HashMap;
5
6use crate::components::{master::MasterParser as MasterComponentParser, ComponentParser};
7
8use crate::error::{ParseError, ParseResult};
9use crate::events::base::ComponentEvent;
10use crate::events::base::{PostfixLogEvent, UnknownEvent};
11
12/// 主解析器
13///
14/// 对应Postfix的master进程,负责识别组件并分发到相应的解析器
15pub struct MasterParser {
16    /// 组件解析器注册表
17    ///
18    /// 存储所有已注册的组件解析器,以组件名为键进行索引
19    /// 每个Postfix组件都有对应的专用解析器来处理其特定的日志格式
20    component_parsers: HashMap<String, Box<dyn ComponentParser>>,
21
22    /// 用于解析基础日志格式的正则表达式
23    ///
24    /// 匹配标准Postfix日志格式:时间戳 主机名 组件名\[进程ID\]: 消息
25    /// 例如:"Dec 30 12:34:56 mail01 postfix/smtpd\[12345\]: connect from client\[192.168.1.100\]"
26    base_log_regex: Regex,
27}
28
29impl MasterParser {
30    /// 创建新的主解析器实例
31    pub fn new() -> Self {
32        let mut parser = Self {
33            component_parsers: HashMap::new(),
34            base_log_regex: Regex::new(
35                r"^((?:\d{4}\s+)?\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2}(?:\.\d+)?)\s+(\S+)\s+postfix/([^\[\]]+)\[(\d+)\]:\s+(.+)$",
36            )
37            .expect("基础日志正则表达式编译失败"),
38        };
39
40        // 注册默认组件解析器
41        parser.register_default_parsers();
42        parser
43    }
44
45    /// 注册默认的组件解析器
46    fn register_default_parsers(&mut self) {
47        use crate::components::*;
48
49        self.register_parser("smtpd", Box::new(smtpd::SmtpdParser::new()));
50        self.register_parser("qmgr", Box::new(qmgr::QmgrParser::new()));
51        self.register_parser("smtp", Box::new(smtp::SmtpParser::new()));
52        self.register_parser("cleanup", Box::new(cleanup::CleanupParser::new()));
53        self.register_parser("error", Box::new(error::ErrorParser::new()));
54        self.register_parser("relay", Box::new(relay::RelayParser::new()));
55        self.register_parser("relay/smtp", Box::new(relay::RelayParser::new()));
56        self.register_parser("discard", Box::new(discard::DiscardParser::new()));
57        self.register_parser("bounce", Box::new(bounce::BounceParser::new()));
58        self.register_parser(
59            "postfix-script",
60            Box::new(postfix_script::PostfixScriptParser::new()),
61        );
62        self.register_parser("master", Box::new(MasterComponentParser::new()));
63        self.register_parser("local", Box::new(LocalParser::new()));
64        self.register_parser("postmap", Box::new(PostmapParser::new()));
65        self.register_parser("postsuper", Box::new(PostsuperParser::new().unwrap()));
66        self.register_parser("anvil", Box::new(anvil::AnvilParser::new()));
67        self.register_parser("pickup", Box::new(pickup::PickupParser::new()));
68        self.register_parser(
69            "trivial-rewrite",
70            Box::new(trivial_rewrite::TrivialRewriteParser::new()),
71        );
72        self.register_parser("postlogd", Box::new(postlogd::PostlogdParser::new()));
73        self.register_parser("proxymap", Box::new(proxymap::ProxymapParser::new()));
74        self.register_parser("sendmail", Box::new(sendmail::SendmailParser::new()));
75        self.register_parser("virtual", Box::new(virtual_parser::VirtualParser::new()));
76    }
77
78    /// 注册组件解析器
79    ///
80    /// 允许用户注册自定义组件解析器
81    pub fn register_parser(&mut self, component: &str, parser: Box<dyn ComponentParser>) {
82        debug!("注册组件解析器: {}", component);
83        self.component_parsers.insert(component.to_string(), parser);
84    }
85
86    /// 解析单行日志
87    ///
88    /// 这是主要的解析入口点
89    pub fn parse(&self, log_line: &str) -> ParseResult {
90        debug!("开始解析日志行: {}", log_line);
91
92        // 1. 解析基础格式
93        let base_info = match self.parse_base_format(log_line) {
94            Ok(info) => info,
95            Err(e) => return ParseResult::failure(e),
96        };
97
98        // 2. 查找对应的组件解析器
99        let parser = match self.component_parsers.get(&base_info.component) {
100            Some(parser) => parser,
101            None => {
102                warn!("未找到组件解析器: {}", base_info.component);
103                return self.create_unknown_event(log_line, base_info);
104            }
105        };
106
107        // 3. 使用组件解析器解析具体事件
108        match parser.parse(&base_info.message) {
109            Ok(event) => {
110                let postfix_event = PostfixLogEvent::new(
111                    log_line.to_string(),
112                    base_info.timestamp,
113                    base_info.hostname,
114                    base_info.component,
115                    base_info.process_id,
116                    base_info.log_level,
117                    event,
118                    None, // 队列ID将在具体解析器中提取
119                );
120
121                ParseResult::success(postfix_event, 1.0)
122            }
123            Err(e) => {
124                warn!(
125                    "组件解析失败: {} - {}, 创建Unknown事件",
126                    base_info.component, e
127                );
128                // 当组件解析失败时,创建Unknown事件但保留日志等级信息
129                self.create_unknown_event(log_line, base_info)
130            }
131        }
132    }
133
134    /// 解析日志的基础格式
135    fn parse_base_format(&self, log_line: &str) -> Result<BaseLogInfo, ParseError> {
136        let captures =
137            self.base_log_regex
138                .captures(log_line)
139                .ok_or_else(|| ParseError::InvalidLogFormat {
140                    reason: "不匹配标准Postfix日志格式".to_string(),
141                })?;
142
143        let timestamp_str = captures.get(1).unwrap().as_str();
144        let hostname = captures.get(2).unwrap().as_str().to_string();
145        let component = captures.get(3).unwrap().as_str().to_string();
146        let process_id = captures
147            .get(4)
148            .unwrap()
149            .as_str()
150            .parse::<u32>()
151            .map_err(|_| ParseError::InvalidLogFormat {
152                reason: "无效的进程ID".to_string(),
153            })?;
154        let raw_message = captures.get(5).unwrap().as_str().to_string();
155
156        // 解析时间戳(简单实用的实现,满足当前需求)
157        let timestamp = self.parse_timestamp(timestamp_str)?;
158
159        // 提取日志等级和消息内容
160        let (log_level, message) = self.extract_log_level_and_message(&raw_message);
161
162        Ok(BaseLogInfo {
163            timestamp,
164            hostname,
165            component,
166            process_id,
167            log_level,
168            message,
169            raw_message,
170        })
171    }
172
173    /// 解析时间戳
174    fn parse_timestamp(&self, timestamp_str: &str) -> Result<DateTime<Utc>, ParseError> {
175        // 新格式:2025 Jun 16 11:06:14.897961
176        // 旧格式兼容:Jun 16 11:06:14
177
178        // 首先尝试解析新格式(包含年份和可选毫秒)
179        if let Ok(naive_dt) = NaiveDateTime::parse_from_str(timestamp_str, "%Y %b %d %H:%M:%S%.f") {
180            return Ok(DateTime::from_naive_utc_and_offset(naive_dt, Utc));
181        }
182
183        // 尝试解析新格式但没有毫秒
184        if let Ok(naive_dt) = NaiveDateTime::parse_from_str(timestamp_str, "%Y %b %d %H:%M:%S") {
185            return Ok(DateTime::from_naive_utc_and_offset(naive_dt, Utc));
186        }
187
188        // 兼容旧格式(没有年份),使用当前年份
189        let current_year = chrono::Utc::now().year();
190        let datetime_str = format!("{} {}", current_year, timestamp_str);
191
192        // 尝试解析旧格式(带毫秒)
193        if let Ok(naive_dt) = NaiveDateTime::parse_from_str(&datetime_str, "%Y %b %d %H:%M:%S%.f") {
194            return Ok(DateTime::from_naive_utc_and_offset(naive_dt, Utc));
195        }
196
197        // 尝试解析旧格式(不带毫秒)
198        let naive_dt =
199            NaiveDateTime::parse_from_str(&datetime_str, "%Y %b %d %H:%M:%S").map_err(|_e| {
200                ParseError::InvalidTimestamp {
201                    timestamp: timestamp_str.to_string(),
202                }
203            })?;
204
205        Ok(DateTime::from_naive_utc_and_offset(naive_dt, Utc))
206    }
207
208    /// 提取日志等级和消息内容
209    fn extract_log_level_and_message(
210        &self,
211        raw_message: &str,
212    ) -> (crate::events::base::PostfixLogLevel, String) {
213        // 检查消息是否以日志等级前缀开始
214        if let Some(first_word_end) = raw_message.find(' ') {
215            let first_word = &raw_message[..first_word_end + 1]; // 包含空格或冒号
216            if let Some(level) =
217                crate::events::base::PostfixLogLevel::from_prefix(first_word.trim())
218            {
219                let message = raw_message[first_word_end + 1..].to_string();
220                return (level, message);
221            }
222        }
223
224        // 如果没有明确的等级前缀,检查是否以 "warning:" 等开头
225        if raw_message.starts_with("warning:") {
226            let message = raw_message
227                .strip_prefix("warning:")
228                .unwrap_or(raw_message)
229                .trim()
230                .to_string();
231            return (crate::events::base::PostfixLogLevel::Warning, message);
232        }
233
234        // 默认为Info级别
235        (
236            crate::events::base::PostfixLogLevel::Info,
237            raw_message.to_string(),
238        )
239    }
240
241    /// 创建未知事件
242    fn create_unknown_event(&self, log_line: &str, base_info: BaseLogInfo) -> ParseResult {
243        let unknown_event = UnknownEvent {
244            component: base_info.component.clone(),
245            message: base_info.message.clone(),
246        };
247
248        let component_name = base_info.component.clone();
249        let postfix_event = PostfixLogEvent::new(
250            log_line.to_string(),
251            base_info.timestamp,
252            base_info.hostname,
253            base_info.component,
254            base_info.process_id,
255            base_info.log_level,
256            ComponentEvent::Unknown(unknown_event),
257            None,
258        );
259
260        ParseResult::partial(
261            postfix_event,
262            0.3,
263            vec![format!("未识别的组件: {}", component_name)],
264        )
265    }
266
267    /// 获取已注册的组件列表
268    pub fn registered_components(&self) -> Vec<&String> {
269        self.component_parsers.keys().collect()
270    }
271
272    /// 公共方法:解析基础日志格式(用于测试)
273    pub fn parse_base_info(&self, log_line: &str) -> Result<BaseLogInfo, ParseError> {
274        self.parse_base_format(log_line)
275    }
276}
277
278impl Default for MasterParser {
279    fn default() -> Self {
280        Self::new()
281    }
282}
283
284/// 基础日志信息
285///
286/// 从原始日志行中解析出的基础字段,是所有组件事件的共同基础
287#[derive(Debug, Clone, PartialEq)]
288pub struct BaseLogInfo {
289    /// 日志时间戳(UTC时间)
290    /// 从日志行开头的时间信息解析而来,自动转换为UTC时间
291    pub timestamp: DateTime<Utc>,
292
293    /// 主机名(产生日志的服务器名称)
294    /// 从日志行中的第二个字段提取,用于区分不同的邮件服务器
295    pub hostname: String,
296
297    /// Postfix组件名称(产生此日志的具体组件)
298    /// 如:smtpd, qmgr, smtp, cleanup等,对应Postfix架构中的不同模块
299    pub component: String,
300
301    /// 进程ID(产生日志的进程标识符)
302    /// 从方括号内提取,用于区分同一组件的不同进程实例
303    pub process_id: u32,
304
305    /// 日志等级(消息的重要性和严重程度)
306    /// 从消息内容中识别或默认为Info级别
307    pub log_level: crate::events::base::PostfixLogLevel,
308
309    /// 处理后的消息内容(去除日志等级前缀后的净消息)
310    /// 这是组件解析器实际处理的内容
311    pub message: String,
312
313    /// 原始消息内容(从冒号后的完整消息)
314    /// 保留用于调试和追溯
315    pub raw_message: String,
316}