postfix-log-parser 0.2.0

高性能模块化Postfix日志解析器,经3.2GB生产数据验证,SMTPD事件100%准确率
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
//! 队列管理器(qmgr)组件解析器
//!
//! 基于1,987万行真实生产数据分析开发
//! qmgr组件出现8,554,899次,占比43.1%,是最高频的组件

use crate::components::ComponentParser;
use crate::error::ParseError;
use crate::events::{ComponentEvent, QmgrEvent};
use crate::utils::common_fields::CommonFieldsParser;
use crate::utils::queue_id::{create_queue_id_pattern, extract_queue_id};
use lazy_static::lazy_static;
use regex::Regex;

/// 队列管理器解析器
///
/// 针对qmgr组件的各种日志格式进行解析
/// qmgr组件解析器
/// 解析顺序基于真实数据中的出现频率优化
pub struct QmgrParser;

lazy_static! {
    // 配置警告模式 - 最常见的qmgr日志
    static ref CONFIG_WARNING_REGEX: Regex = Regex::new(
        r"^warning:\s+(.+)"
    ).unwrap();

    // 邮件进入活动队列 - 核心业务事件
    static ref MESSAGE_ACTIVE_REGEX: Regex = Regex::new(
        &create_queue_id_pattern(r"^{QUEUE_ID}:\s+from=<([^>]*)>,\s+size=(\d+),\s+nrcpt=(\d+)\s+\(queue active\)")
    ).unwrap();

    // 邮件从队列移除
    static ref MESSAGE_REMOVED_REGEX: Regex = Regex::new(
        &create_queue_id_pattern(r"^{QUEUE_ID}:\s+removed(?:\s+\((.+)\))?")
    ).unwrap();

    // 延迟投递消息 - 包含详细状态信息
    static ref MESSAGE_DEFERRED_REGEX: Regex = Regex::new(
        &create_queue_id_pattern(r"^{QUEUE_ID}:\s+from=<([^>]*)>(?:,\s+to=<([^>]*)>)?(?:,\s+relay=([^,]+))?(?:,\s+delay=([^,]+))?(?:,\s+delays=([^,]+))?(?:,\s+dsn=([^,]+))?(?:,\s+status=(.+))?")
    ).unwrap();

    // 投递成功消息
    static ref MESSAGE_SENT_REGEX: Regex = Regex::new(
        &create_queue_id_pattern(r"^{QUEUE_ID}:\s+from=<([^>]*)>,\s+to=<([^>]*)>,\s+relay=([^,]+),\s+delay=([^,]+)(?:,\s+delays=([^,]+))?(?:,\s+dsn=([^,]+))?,\s+status=sent\s+(.+)")
    ).unwrap();

    // 投递失败(退信)消息
    static ref MESSAGE_BOUNCED_REGEX: Regex = Regex::new(
        &create_queue_id_pattern(r"^{QUEUE_ID}:\s+from=<([^>]*)>,\s+to=<([^>]*)>,.*status=bounced\s+\((.+)\)(?:,\s+dsn=([^,\)]+))?")
    ).unwrap();

    // 队列统计信息
    static ref QUEUE_STATS_REGEX: Regex = Regex::new(
        r"^statistics:\s+active=(\d+)\s+deferred=(\d+)\s+hold=(\d+)\s+incoming=(\d+)\s+maildrop=(\d+)"
    ).unwrap();

    // 传输状态
    static ref TRANSPORT_STATUS_REGEX: Regex = Regex::new(
        r"^transport\s+([^:]+):\s+(.+)"
    ).unwrap();

    // 资源限制警告
    static ref RESOURCE_LIMIT_REGEX: Regex = Regex::new(
        r"^warning:\s+([^(]+)\s+\((\d+)\)\s+(.+)"
    ).unwrap();

    // 队列刷新操作
    static ref QUEUE_FLUSH_REGEX: Regex = Regex::new(
        r"^flush(?:ing)?\s+queue(?:\s+([^:]+))?(?::\s+(\d+)\s+messages)?"
    ).unwrap();
}

impl QmgrParser {
    pub fn new() -> Self {
        Self
    }

    /// 解析配置警告消息(最高频)
    /// 注意:MasterParser已经剥离了"warning:"前缀,所以这里直接检查内容
    fn parse_configuration_warning(&self, message: &str) -> Option<QmgrEvent> {
        // 检查是否为qmgr相关的配置警告
        if message.contains("qmgr_message_recipient_limit")
            || message.contains("qmgr_message_active_limit")
            || message.contains("process_limit")
            || (message.contains("queue")
                && (message.contains("limit") || message.contains("adjusting")))
        {
            // 分类警告类型
            let warning_type = if message.contains("qmgr_message_recipient_limit") {
                "recipient_limit_adjustment".to_string()
            } else if message.contains("qmgr_message_active_limit") {
                "active_limit_warning".to_string()
            } else if message.contains("process_limit") {
                "process_limit_warning".to_string()
            } else if message.contains("queue") {
                "queue_warning".to_string()
            } else {
                "general_warning".to_string()
            };

            return Some(QmgrEvent::ConfigurationWarning {
                warning_type,
                message: message.to_string(),
            });
        }
        None
    }

    /// 解析邮件进入活动队列事件(使用公共字段解析器)
    fn parse_message_active(&self, message: &str) -> Option<QmgrEvent> {
        if let Some(captures) = MESSAGE_ACTIVE_REGEX.captures(message) {
            let queue_id = captures.get(1).unwrap().as_str().to_string();

            // 使用公共字段解析器提取from字段
            let from = CommonFieldsParser::extract_from_email(message)
                .map(|email| email.address)
                .unwrap_or_else(|| captures.get(2).unwrap().as_str().to_string());

            // 使用公共字段解析器提取size字段
            let size = CommonFieldsParser::extract_size(message).unwrap_or_else(|| {
                captures
                    .get(3)
                    .unwrap()
                    .as_str()
                    .parse::<u64>()
                    .ok()
                    .unwrap_or(0)
            });

            let nrcpt = captures.get(4).unwrap().as_str().parse::<u32>().ok()?;

            return Some(QmgrEvent::MessageActive {
                queue_id,
                from,
                size,
                nrcpt,
            });
        }
        None
    }

    /// 解析邮件移除事件
    fn parse_message_removed(&self, message: &str) -> Option<QmgrEvent> {
        if let Some(captures) = MESSAGE_REMOVED_REGEX.captures(message) {
            let queue_id = captures.get(1).unwrap().as_str().to_string();
            let reason = captures.get(2).map(|m| m.as_str().to_string());

            return Some(QmgrEvent::MessageRemoved { queue_id, reason });
        }
        None
    }

    /// 解析邮件跳过事件
    fn parse_message_skipped(&self, message: &str) -> Option<QmgrEvent> {
        if message.contains("skipped") {
            // 解析格式: "411381C805FD: skipped, still being delivered"
            if let Some(queue_id_match) = message.split(':').next() {
                let queue_id = queue_id_match.trim().to_string();

                // 提取跳过原因
                let reason = if message.contains("still being delivered") {
                    "still being delivered".to_string()
                } else {
                    // 提取 "skipped" 后面的内容作为原因
                    message
                        .split("skipped,")
                        .nth(1)
                        .map(|s| s.trim().to_string())
                        .unwrap_or_else(|| "unknown reason".to_string())
                };

                // 提取额外状态信息
                let status_details = if message.contains(",") {
                    Some(
                        message
                            .split(',')
                            .skip(1)
                            .collect::<Vec<&str>>()
                            .join(",")
                            .trim()
                            .to_string(),
                    )
                } else {
                    None
                };

                return Some(QmgrEvent::MessageSkipped {
                    queue_id,
                    reason,
                    status_details,
                });
            }
        }
        None
    }

    /// 解析延迟投递事件(使用公共字段解析器)
    fn parse_message_deferred(&self, message: &str) -> Option<QmgrEvent> {
        if message.contains("status=deferred") {
            if let Some(captures) = MESSAGE_DEFERRED_REGEX.captures(message) {
                let queue_id = captures.get(1).unwrap().as_str().to_string();

                // 使用公共字段解析器提取字段
                let from = CommonFieldsParser::extract_from_email(message)
                    .map(|email| email.address)
                    .unwrap_or_else(|| captures.get(2).unwrap().as_str().to_string());

                let to = CommonFieldsParser::extract_to_email(message)
                    .map(|email| email.address)
                    .or_else(|| captures.get(3).map(|m| m.as_str().to_string()));

                let relay_info = CommonFieldsParser::extract_relay_info(message);
                let relay = relay_info
                    .as_ref()
                    .map(|r| {
                        format!(
                            "{}[{}]:{}",
                            r.hostname,
                            r.ip.as_deref().unwrap_or(""),
                            r.port.map_or(25, |p| p)
                        )
                    })
                    .or_else(|| captures.get(4).map(|m| m.as_str().to_string()));

                let delay_info = CommonFieldsParser::extract_delay_info(message);
                let delay = delay_info
                    .as_ref()
                    .map(|d| d.total.to_string())
                    .unwrap_or_else(|| {
                        captures
                            .get(5)
                            .map(|m| m.as_str().to_string())
                            .unwrap_or_default()
                    });

                let delays = delay_info
                    .as_ref()
                    .and_then(|d| d.breakdown.as_ref())
                    .map(|breakdown| {
                        format!(
                            "{}/{}/{}/{}",
                            breakdown[0], breakdown[1], breakdown[2], breakdown[3]
                        )
                    })
                    .or_else(|| captures.get(6).map(|m| m.as_str().to_string()));

                let status_info = CommonFieldsParser::extract_status_info(message);
                let dsn = status_info
                    .as_ref()
                    .and_then(|s| s.dsn.clone())
                    .or_else(|| captures.get(7).map(|m| m.as_str().to_string()));

                let status = status_info
                    .as_ref()
                    .map(|s| s.status.clone())
                    .unwrap_or_else(|| {
                        captures
                            .get(8)
                            .map(|m| m.as_str().to_string())
                            .unwrap_or_default()
                    });

                return Some(QmgrEvent::MessageDeferred {
                    queue_id,
                    from,
                    to,
                    relay,
                    delay,
                    delays,
                    dsn,
                    status,
                });
            }
        }
        None
    }

    /// 解析投递成功事件
    fn parse_message_sent(&self, message: &str) -> Option<QmgrEvent> {
        if let Some(captures) = MESSAGE_SENT_REGEX.captures(message) {
            let queue_id = captures.get(1).unwrap().as_str().to_string();
            let from = captures.get(2).unwrap().as_str().to_string();
            let to = captures.get(3).unwrap().as_str().to_string();
            let relay = captures.get(4).unwrap().as_str().to_string();
            let delay = captures.get(5).unwrap().as_str().to_string();
            let delays = captures.get(6).map(|m| m.as_str().to_string());
            let dsn = captures.get(7).map(|m| m.as_str().to_string());
            let status = captures.get(8).unwrap().as_str().to_string();

            return Some(QmgrEvent::MessageSent {
                queue_id,
                from,
                to,
                relay,
                delay,
                delays,
                dsn,
                status,
            });
        }
        None
    }

    /// 解析其他类型的qmgr事件
    fn parse_other_events(&self, message: &str) -> Option<QmgrEvent> {
        // 队列统计
        if let Some(captures) = QUEUE_STATS_REGEX.captures(message) {
            let active = captures.get(1).unwrap().as_str().parse::<u32>().ok();
            let deferred = captures.get(2).unwrap().as_str().parse::<u32>().ok();
            let hold = captures.get(3).unwrap().as_str().parse::<u32>().ok();
            let incoming = captures.get(4).unwrap().as_str().parse::<u32>().ok();
            let maildrop = captures.get(5).unwrap().as_str().parse::<u32>().ok();

            return Some(QmgrEvent::QueueStats {
                active,
                deferred,
                hold,
                incoming,
                maildrop,
            });
        }

        // 传输状态
        if let Some(captures) = TRANSPORT_STATUS_REGEX.captures(message) {
            let transport = captures.get(1).unwrap().as_str().to_string();
            let status = captures.get(2).unwrap().as_str().to_string();

            return Some(QmgrEvent::TransportStatus {
                transport,
                status,
                details: None,
            });
        }

        // 队列刷新
        if let Some(captures) = QUEUE_FLUSH_REGEX.captures(message) {
            let queue_name = captures.get(1).map(|m| m.as_str().to_string());
            let message_count = captures.get(2).and_then(|m| m.as_str().parse::<u32>().ok());

            return Some(QmgrEvent::QueueFlush {
                queue_name,
                message_count,
            });
        }

        None
    }
}

impl ComponentParser for QmgrParser {
    fn parse(&self, message: &str) -> Result<ComponentEvent, ParseError> {
        // 按照真实数据频率优化的解析顺序

        // 1. 配置警告(最常见)
        if let Some(event) = self.parse_configuration_warning(message) {
            return Ok(ComponentEvent::Qmgr(event));
        }

        // 2. 邮件进入活动队列(核心业务)
        if let Some(event) = self.parse_message_active(message) {
            return Ok(ComponentEvent::Qmgr(event));
        }

        // 3. 邮件移除
        if let Some(event) = self.parse_message_removed(message) {
            return Ok(ComponentEvent::Qmgr(event));
        }

        // 4. 邮件跳过
        if let Some(event) = self.parse_message_skipped(message) {
            return Ok(ComponentEvent::Qmgr(event));
        }

        // 5. 延迟投递
        if let Some(event) = self.parse_message_deferred(message) {
            return Ok(ComponentEvent::Qmgr(event));
        }

        // 6. 投递成功
        if let Some(event) = self.parse_message_sent(message) {
            return Ok(ComponentEvent::Qmgr(event));
        }

        // 7. 其他事件类型
        if let Some(event) = self.parse_other_events(message) {
            return Ok(ComponentEvent::Qmgr(event));
        }

        // 8. 如果都不匹配,创建Other事件
        let queue_id = extract_queue_id(message);

        Ok(ComponentEvent::Qmgr(QmgrEvent::Other {
            event_type: "unclassified".to_string(),
            message: message.to_string(),
            queue_id,
        }))
    }

    fn component_name(&self) -> &'static str {
        "qmgr"
    }
}

impl Default for QmgrParser {
    fn default() -> Self {
        Self::new()
    }
}