postfix_log_parser/components/
qmgr.rs

1//! 队列管理器(qmgr)组件解析器
2//!
3//! 基于1,987万行真实生产数据分析开发
4//! qmgr组件出现8,554,899次,占比43.1%,是最高频的组件
5
6use crate::components::ComponentParser;
7use crate::error::ParseError;
8use crate::events::{ComponentEvent, QmgrEvent};
9use crate::utils::queue_id::{create_queue_id_pattern, extract_queue_id};
10use lazy_static::lazy_static;
11use regex::Regex;
12
13/// 队列管理器解析器
14///
15/// 针对qmgr组件的各种日志格式进行解析
16/// qmgr组件解析器
17/// 解析顺序基于真实数据中的出现频率优化
18pub struct QmgrParser;
19
20lazy_static! {
21    // 配置警告模式 - 最常见的qmgr日志
22    // 示例: "warning: qmgr_message_recipient_limit is smaller than qmgr_message_active_limit - adjusting qmgr_message_recipient_limit"
23    static ref CONFIG_WARNING_REGEX: Regex = Regex::new(
24        r"^warning:\s+(.+)"
25    ).unwrap();
26
27    // 邮件进入活动队列 - 核心业务事件
28    // 示例: "4C79F1C801AD: from=<piggy@relaxcloud.cn>, size=23878, nrcpt=1 (queue active)"
29    static ref MESSAGE_ACTIVE_REGEX: Regex = Regex::new(
30        &create_queue_id_pattern(r"^{QUEUE_ID}:\s+from=<([^>]*)>,\s+size=(\d+),\s+nrcpt=(\d+)\s+\(queue active\)")
31    ).unwrap();
32
33    // 邮件从队列移除
34    // 示例: "4C79F1C801AD: removed"
35    static ref MESSAGE_REMOVED_REGEX: Regex = Regex::new(
36        &create_queue_id_pattern(r"^{QUEUE_ID}:\s+removed(?:\s+\((.+)\))?")
37    ).unwrap();
38
39    // 延迟投递消息 - 包含详细状态信息
40    // 示例: "4C79F1C801AD: from=<sender@example.com>, to=<recipient@example.com>, relay=mx.example.com[1.2.3.4]:25, delay=120, delays=1/2/100/17, dsn=4.4.2, status=deferred (Connection timed out)"
41    static ref MESSAGE_DEFERRED_REGEX: Regex = Regex::new(
42        &create_queue_id_pattern(r"^{QUEUE_ID}:\s+from=<([^>]*)>(?:,\s+to=<([^>]*)>)?(?:,\s+relay=([^,]+))?(?:,\s+delay=([^,]+))?(?:,\s+delays=([^,]+))?(?:,\s+dsn=([^,]+))?(?:,\s+status=(.+))?")
43    ).unwrap();
44
45    // 投递成功消息
46    // 示例: "4C79F1C801AD: from=<sender@example.com>, to=<recipient@example.com>, relay=mx.example.com[1.2.3.4]:25, delay=5, delays=1/0/2/2, dsn=2.0.0, status=sent (250 OK)"
47    static ref MESSAGE_SENT_REGEX: Regex = Regex::new(
48        &create_queue_id_pattern(r"^{QUEUE_ID}:\s+from=<([^>]*)>,\s+to=<([^>]*)>,\s+relay=([^,]+),\s+delay=([^,]+)(?:,\s+delays=([^,]+))?(?:,\s+dsn=([^,]+))?,\s+status=sent\s+(.+)")
49    ).unwrap();
50
51    // 投递失败(退信)消息
52    // 示例: "4C79F1C801AD: from=<sender@example.com>, to=<recipient@example.com>, relay=none, delay=3600, dsn=5.4.4, status=bounced (Host not found)"
53    static ref MESSAGE_BOUNCED_REGEX: Regex = Regex::new(
54        &create_queue_id_pattern(r"^{QUEUE_ID}:\s+from=<([^>]*)>,\s+to=<([^>]*)>,.*status=bounced\s+\((.+)\)(?:,\s+dsn=([^,\)]+))?")
55    ).unwrap();
56
57    // 队列统计信息
58    // 示例: "statistics: active=10 deferred=20 hold=0 incoming=5 maildrop=2"
59    static ref QUEUE_STATS_REGEX: Regex = Regex::new(
60        r"^statistics:\s+active=(\d+)\s+deferred=(\d+)\s+hold=(\d+)\s+incoming=(\d+)\s+maildrop=(\d+)"
61    ).unwrap();
62
63    // 传输状态
64    // 示例: "transport smtp: enabling"
65    static ref TRANSPORT_STATUS_REGEX: Regex = Regex::new(
66        r"^transport\s+([^:]+):\s+(.+)"
67    ).unwrap();
68
69    // 资源限制警告
70    // 示例: "warning: process_limit (100) reached for transport smtp"
71    static ref RESOURCE_LIMIT_REGEX: Regex = Regex::new(
72        r"^warning:\s+([^(]+)\s+\((\d+)\)\s+(.+)"
73    ).unwrap();
74
75    // 队列刷新操作
76    // 示例: "flush queue"或"flushing queue: 10 messages"
77    static ref QUEUE_FLUSH_REGEX: Regex = Regex::new(
78        r"^flush(?:ing)?\s+queue(?:\s+([^:]+))?(?::\s+(\d+)\s+messages)?"
79    ).unwrap();
80}
81
82impl QmgrParser {
83    pub fn new() -> Self {
84        Self
85    }
86
87    /// 解析配置警告消息(最高频)
88    /// 注意:MasterParser已经剥离了"warning:"前缀,所以这里直接检查内容
89    fn parse_configuration_warning(&self, message: &str) -> Option<QmgrEvent> {
90        // 检查是否为qmgr相关的配置警告
91        if message.contains("qmgr_message_recipient_limit")
92            || message.contains("qmgr_message_active_limit")
93            || message.contains("process_limit")
94            || (message.contains("queue")
95                && (message.contains("limit") || message.contains("adjusting")))
96        {
97            // 分类警告类型
98            let warning_type = if message.contains("qmgr_message_recipient_limit") {
99                "recipient_limit_adjustment".to_string()
100            } else if message.contains("qmgr_message_active_limit") {
101                "active_limit_warning".to_string()
102            } else if message.contains("process_limit") {
103                "process_limit_warning".to_string()
104            } else if message.contains("queue") {
105                "queue_warning".to_string()
106            } else {
107                "general_warning".to_string()
108            };
109
110            return Some(QmgrEvent::ConfigurationWarning {
111                warning_type,
112                message: message.to_string(),
113            });
114        }
115        None
116    }
117
118    /// 解析邮件进入活动队列事件
119    fn parse_message_active(&self, message: &str) -> Option<QmgrEvent> {
120        if let Some(captures) = MESSAGE_ACTIVE_REGEX.captures(message) {
121            let queue_id = captures.get(1).unwrap().as_str().to_string();
122            let from = captures.get(2).unwrap().as_str().to_string();
123            let size = captures.get(3).unwrap().as_str().parse::<u64>().ok()?;
124            let nrcpt = captures.get(4).unwrap().as_str().parse::<u32>().ok()?;
125
126            return Some(QmgrEvent::MessageActive {
127                queue_id,
128                from,
129                size,
130                nrcpt,
131            });
132        }
133        None
134    }
135
136    /// 解析邮件移除事件
137    fn parse_message_removed(&self, message: &str) -> Option<QmgrEvent> {
138        if let Some(captures) = MESSAGE_REMOVED_REGEX.captures(message) {
139            let queue_id = captures.get(1).unwrap().as_str().to_string();
140            let reason = captures.get(2).map(|m| m.as_str().to_string());
141
142            return Some(QmgrEvent::MessageRemoved { queue_id, reason });
143        }
144        None
145    }
146
147    /// 解析邮件跳过事件
148    fn parse_message_skipped(&self, message: &str) -> Option<QmgrEvent> {
149        if message.contains("skipped") {
150            // 解析格式: "411381C805FD: skipped, still being delivered"
151            if let Some(queue_id_match) = message.split(':').next() {
152                let queue_id = queue_id_match.trim().to_string();
153
154                // 提取跳过原因
155                let reason = if message.contains("still being delivered") {
156                    "still being delivered".to_string()
157                } else {
158                    // 提取 "skipped" 后面的内容作为原因
159                    message
160                        .split("skipped,")
161                        .nth(1)
162                        .map(|s| s.trim().to_string())
163                        .unwrap_or_else(|| "unknown reason".to_string())
164                };
165
166                // 提取额外状态信息
167                let status_details = if message.contains(",") {
168                    Some(
169                        message
170                            .split(',')
171                            .skip(1)
172                            .collect::<Vec<&str>>()
173                            .join(",")
174                            .trim()
175                            .to_string(),
176                    )
177                } else {
178                    None
179                };
180
181                return Some(QmgrEvent::MessageSkipped {
182                    queue_id,
183                    reason,
184                    status_details,
185                });
186            }
187        }
188        None
189    }
190
191    /// 解析延迟投递事件
192    fn parse_message_deferred(&self, message: &str) -> Option<QmgrEvent> {
193        if message.contains("status=deferred") {
194            if let Some(captures) = MESSAGE_DEFERRED_REGEX.captures(message) {
195                let queue_id = captures.get(1).unwrap().as_str().to_string();
196                let from = captures.get(2).unwrap().as_str().to_string();
197                let to = captures.get(3).map(|m| m.as_str().to_string());
198                let relay = captures.get(4).map(|m| m.as_str().to_string());
199                let delay = captures
200                    .get(5)
201                    .map(|m| m.as_str().to_string())
202                    .unwrap_or_default();
203                let delays = captures.get(6).map(|m| m.as_str().to_string());
204                let dsn = captures.get(7).map(|m| m.as_str().to_string());
205                let status = captures
206                    .get(8)
207                    .map(|m| m.as_str().to_string())
208                    .unwrap_or_default();
209
210                return Some(QmgrEvent::MessageDeferred {
211                    queue_id,
212                    from,
213                    to,
214                    relay,
215                    delay,
216                    delays,
217                    dsn,
218                    status,
219                });
220            }
221        }
222        None
223    }
224
225    /// 解析投递成功事件
226    fn parse_message_sent(&self, message: &str) -> Option<QmgrEvent> {
227        if let Some(captures) = MESSAGE_SENT_REGEX.captures(message) {
228            let queue_id = captures.get(1).unwrap().as_str().to_string();
229            let from = captures.get(2).unwrap().as_str().to_string();
230            let to = captures.get(3).unwrap().as_str().to_string();
231            let relay = captures.get(4).unwrap().as_str().to_string();
232            let delay = captures.get(5).unwrap().as_str().to_string();
233            let delays = captures.get(6).map(|m| m.as_str().to_string());
234            let dsn = captures.get(7).map(|m| m.as_str().to_string());
235            let status = captures.get(8).unwrap().as_str().to_string();
236
237            return Some(QmgrEvent::MessageSent {
238                queue_id,
239                from,
240                to,
241                relay,
242                delay,
243                delays,
244                dsn,
245                status,
246            });
247        }
248        None
249    }
250
251    /// 解析其他类型的qmgr事件
252    fn parse_other_events(&self, message: &str) -> Option<QmgrEvent> {
253        // 队列统计
254        if let Some(captures) = QUEUE_STATS_REGEX.captures(message) {
255            let active = captures.get(1).unwrap().as_str().parse::<u32>().ok();
256            let deferred = captures.get(2).unwrap().as_str().parse::<u32>().ok();
257            let hold = captures.get(3).unwrap().as_str().parse::<u32>().ok();
258            let incoming = captures.get(4).unwrap().as_str().parse::<u32>().ok();
259            let maildrop = captures.get(5).unwrap().as_str().parse::<u32>().ok();
260
261            return Some(QmgrEvent::QueueStats {
262                active,
263                deferred,
264                hold,
265                incoming,
266                maildrop,
267            });
268        }
269
270        // 传输状态
271        if let Some(captures) = TRANSPORT_STATUS_REGEX.captures(message) {
272            let transport = captures.get(1).unwrap().as_str().to_string();
273            let status = captures.get(2).unwrap().as_str().to_string();
274
275            return Some(QmgrEvent::TransportStatus {
276                transport,
277                status,
278                details: None,
279            });
280        }
281
282        // 队列刷新
283        if let Some(captures) = QUEUE_FLUSH_REGEX.captures(message) {
284            let queue_name = captures.get(1).map(|m| m.as_str().to_string());
285            let message_count = captures.get(2).and_then(|m| m.as_str().parse::<u32>().ok());
286
287            return Some(QmgrEvent::QueueFlush {
288                queue_name,
289                message_count,
290            });
291        }
292
293        None
294    }
295}
296
297impl ComponentParser for QmgrParser {
298    fn parse(&self, message: &str) -> Result<ComponentEvent, ParseError> {
299        // 按照真实数据频率优化的解析顺序
300
301        // 1. 配置警告(最常见)
302        if let Some(event) = self.parse_configuration_warning(message) {
303            return Ok(ComponentEvent::Qmgr(event));
304        }
305
306        // 2. 邮件进入活动队列(核心业务)
307        if let Some(event) = self.parse_message_active(message) {
308            return Ok(ComponentEvent::Qmgr(event));
309        }
310
311        // 3. 邮件移除
312        if let Some(event) = self.parse_message_removed(message) {
313            return Ok(ComponentEvent::Qmgr(event));
314        }
315
316        // 4. 邮件跳过
317        if let Some(event) = self.parse_message_skipped(message) {
318            return Ok(ComponentEvent::Qmgr(event));
319        }
320
321        // 5. 延迟投递
322        if let Some(event) = self.parse_message_deferred(message) {
323            return Ok(ComponentEvent::Qmgr(event));
324        }
325
326        // 6. 投递成功
327        if let Some(event) = self.parse_message_sent(message) {
328            return Ok(ComponentEvent::Qmgr(event));
329        }
330
331        // 7. 其他事件类型
332        if let Some(event) = self.parse_other_events(message) {
333            return Ok(ComponentEvent::Qmgr(event));
334        }
335
336        // 8. 如果都不匹配,创建Other事件
337        let queue_id = extract_queue_id(message);
338
339        Ok(ComponentEvent::Qmgr(QmgrEvent::Other {
340            event_type: "unclassified".to_string(),
341            message: message.to_string(),
342            queue_id,
343        }))
344    }
345
346    fn component_name(&self) -> &'static str {
347        "qmgr"
348    }
349}
350
351impl Default for QmgrParser {
352    fn default() -> Self {
353        Self::new()
354    }
355}