postfix_log_parser/components/
qmgr.rs

1//! 队列管理器(qmgr)组件解析器
2//!
3//! 基于1,987万行真实生产数据分析开发
4//! qmgr组件出现8,554,899次,占比43.1%,是最高频的组件
5
6use crate::components::ComponentParser;
7use crate::error::ParseError;
8use crate::events::{ComponentEvent, QmgrEvent};
9use crate::utils::common_fields::CommonFieldsParser;
10use crate::utils::queue_id::{create_queue_id_pattern, extract_queue_id};
11use lazy_static::lazy_static;
12use regex::Regex;
13
14/// 队列管理器解析器
15///
16/// 针对qmgr组件的各种日志格式进行解析
17/// qmgr组件解析器
18/// 解析顺序基于真实数据中的出现频率优化
19pub struct QmgrParser;
20
21lazy_static! {
22    // 配置警告模式 - 最常见的qmgr日志
23    static ref CONFIG_WARNING_REGEX: Regex = Regex::new(
24        r"^warning:\s+(.+)"
25    ).unwrap();
26
27    // 邮件进入活动队列 - 核心业务事件
28    static ref MESSAGE_ACTIVE_REGEX: Regex = Regex::new(
29        &create_queue_id_pattern(r"^{QUEUE_ID}:\s+from=<([^>]*)>,\s+size=(\d+),\s+nrcpt=(\d+)\s+\(queue active\)")
30    ).unwrap();
31
32    // 邮件从队列移除
33    static ref MESSAGE_REMOVED_REGEX: Regex = Regex::new(
34        &create_queue_id_pattern(r"^{QUEUE_ID}:\s+removed(?:\s+\((.+)\))?")
35    ).unwrap();
36
37    // 延迟投递消息 - 包含详细状态信息
38    static ref MESSAGE_DEFERRED_REGEX: Regex = Regex::new(
39        &create_queue_id_pattern(r"^{QUEUE_ID}:\s+from=<([^>]*)>(?:,\s+to=<([^>]*)>)?(?:,\s+relay=([^,]+))?(?:,\s+delay=([^,]+))?(?:,\s+delays=([^,]+))?(?:,\s+dsn=([^,]+))?(?:,\s+status=(.+))?")
40    ).unwrap();
41
42    // 投递成功消息
43    static ref MESSAGE_SENT_REGEX: Regex = Regex::new(
44        &create_queue_id_pattern(r"^{QUEUE_ID}:\s+from=<([^>]*)>,\s+to=<([^>]*)>,\s+relay=([^,]+),\s+delay=([^,]+)(?:,\s+delays=([^,]+))?(?:,\s+dsn=([^,]+))?,\s+status=sent\s+(.+)")
45    ).unwrap();
46
47    // 投递失败(退信)消息
48    static ref MESSAGE_BOUNCED_REGEX: Regex = Regex::new(
49        &create_queue_id_pattern(r"^{QUEUE_ID}:\s+from=<([^>]*)>,\s+to=<([^>]*)>,.*status=bounced\s+\((.+)\)(?:,\s+dsn=([^,\)]+))?")
50    ).unwrap();
51
52    // 队列统计信息
53    static ref QUEUE_STATS_REGEX: Regex = Regex::new(
54        r"^statistics:\s+active=(\d+)\s+deferred=(\d+)\s+hold=(\d+)\s+incoming=(\d+)\s+maildrop=(\d+)"
55    ).unwrap();
56
57    // 传输状态
58    static ref TRANSPORT_STATUS_REGEX: Regex = Regex::new(
59        r"^transport\s+([^:]+):\s+(.+)"
60    ).unwrap();
61
62    // 资源限制警告
63    static ref RESOURCE_LIMIT_REGEX: Regex = Regex::new(
64        r"^warning:\s+([^(]+)\s+\((\d+)\)\s+(.+)"
65    ).unwrap();
66
67    // 队列刷新操作
68    static ref QUEUE_FLUSH_REGEX: Regex = Regex::new(
69        r"^flush(?:ing)?\s+queue(?:\s+([^:]+))?(?::\s+(\d+)\s+messages)?"
70    ).unwrap();
71}
72
73impl QmgrParser {
74    pub fn new() -> Self {
75        Self
76    }
77
78    /// 解析配置警告消息(最高频)
79    /// 注意:MasterParser已经剥离了"warning:"前缀,所以这里直接检查内容
80    fn parse_configuration_warning(&self, message: &str) -> Option<QmgrEvent> {
81        // 检查是否为qmgr相关的配置警告
82        if message.contains("qmgr_message_recipient_limit")
83            || message.contains("qmgr_message_active_limit")
84            || message.contains("process_limit")
85            || (message.contains("queue")
86                && (message.contains("limit") || message.contains("adjusting")))
87        {
88            // 分类警告类型
89            let warning_type = if message.contains("qmgr_message_recipient_limit") {
90                "recipient_limit_adjustment".to_string()
91            } else if message.contains("qmgr_message_active_limit") {
92                "active_limit_warning".to_string()
93            } else if message.contains("process_limit") {
94                "process_limit_warning".to_string()
95            } else if message.contains("queue") {
96                "queue_warning".to_string()
97            } else {
98                "general_warning".to_string()
99            };
100
101            return Some(QmgrEvent::ConfigurationWarning {
102                warning_type,
103                message: message.to_string(),
104            });
105        }
106        None
107    }
108
109    /// 解析邮件进入活动队列事件(使用公共字段解析器)
110    fn parse_message_active(&self, message: &str) -> Option<QmgrEvent> {
111        if let Some(captures) = MESSAGE_ACTIVE_REGEX.captures(message) {
112            let queue_id = captures.get(1).unwrap().as_str().to_string();
113
114            // 使用公共字段解析器提取from字段
115            let from = CommonFieldsParser::extract_from_email(message)
116                .map(|email| email.address)
117                .unwrap_or_else(|| captures.get(2).unwrap().as_str().to_string());
118
119            // 使用公共字段解析器提取size字段
120            let size = CommonFieldsParser::extract_size(message).unwrap_or_else(|| {
121                captures
122                    .get(3)
123                    .unwrap()
124                    .as_str()
125                    .parse::<u64>()
126                    .ok()
127                    .unwrap_or(0)
128            });
129
130            let nrcpt = captures.get(4).unwrap().as_str().parse::<u32>().ok()?;
131
132            return Some(QmgrEvent::MessageActive {
133                queue_id,
134                from,
135                size,
136                nrcpt,
137            });
138        }
139        None
140    }
141
142    /// 解析邮件移除事件
143    fn parse_message_removed(&self, message: &str) -> Option<QmgrEvent> {
144        if let Some(captures) = MESSAGE_REMOVED_REGEX.captures(message) {
145            let queue_id = captures.get(1).unwrap().as_str().to_string();
146            let reason = captures.get(2).map(|m| m.as_str().to_string());
147
148            return Some(QmgrEvent::MessageRemoved { queue_id, reason });
149        }
150        None
151    }
152
153    /// 解析邮件跳过事件
154    fn parse_message_skipped(&self, message: &str) -> Option<QmgrEvent> {
155        if message.contains("skipped") {
156            // 解析格式: "411381C805FD: skipped, still being delivered"
157            if let Some(queue_id_match) = message.split(':').next() {
158                let queue_id = queue_id_match.trim().to_string();
159
160                // 提取跳过原因
161                let reason = if message.contains("still being delivered") {
162                    "still being delivered".to_string()
163                } else {
164                    // 提取 "skipped" 后面的内容作为原因
165                    message
166                        .split("skipped,")
167                        .nth(1)
168                        .map(|s| s.trim().to_string())
169                        .unwrap_or_else(|| "unknown reason".to_string())
170                };
171
172                // 提取额外状态信息
173                let status_details = if message.contains(",") {
174                    Some(
175                        message
176                            .split(',')
177                            .skip(1)
178                            .collect::<Vec<&str>>()
179                            .join(",")
180                            .trim()
181                            .to_string(),
182                    )
183                } else {
184                    None
185                };
186
187                return Some(QmgrEvent::MessageSkipped {
188                    queue_id,
189                    reason,
190                    status_details,
191                });
192            }
193        }
194        None
195    }
196
197    /// 解析延迟投递事件(使用公共字段解析器)
198    fn parse_message_deferred(&self, message: &str) -> Option<QmgrEvent> {
199        if message.contains("status=deferred") {
200            if let Some(captures) = MESSAGE_DEFERRED_REGEX.captures(message) {
201                let queue_id = captures.get(1).unwrap().as_str().to_string();
202
203                // 使用公共字段解析器提取字段
204                let from = CommonFieldsParser::extract_from_email(message)
205                    .map(|email| email.address)
206                    .unwrap_or_else(|| captures.get(2).unwrap().as_str().to_string());
207
208                let to = CommonFieldsParser::extract_to_email(message)
209                    .map(|email| email.address)
210                    .or_else(|| captures.get(3).map(|m| m.as_str().to_string()));
211
212                let relay_info = CommonFieldsParser::extract_relay_info(message);
213                let relay = relay_info
214                    .as_ref()
215                    .map(|r| {
216                        format!(
217                            "{}[{}]:{}",
218                            r.hostname,
219                            r.ip.as_deref().unwrap_or(""),
220                            r.port.map_or(25, |p| p)
221                        )
222                    })
223                    .or_else(|| captures.get(4).map(|m| m.as_str().to_string()));
224
225                let delay_info = CommonFieldsParser::extract_delay_info(message);
226                let delay = delay_info
227                    .as_ref()
228                    .map(|d| d.total.to_string())
229                    .unwrap_or_else(|| {
230                        captures
231                            .get(5)
232                            .map(|m| m.as_str().to_string())
233                            .unwrap_or_default()
234                    });
235
236                let delays = delay_info
237                    .as_ref()
238                    .and_then(|d| d.breakdown.as_ref())
239                    .map(|breakdown| {
240                        format!(
241                            "{}/{}/{}/{}",
242                            breakdown[0], breakdown[1], breakdown[2], breakdown[3]
243                        )
244                    })
245                    .or_else(|| captures.get(6).map(|m| m.as_str().to_string()));
246
247                let status_info = CommonFieldsParser::extract_status_info(message);
248                let dsn = status_info
249                    .as_ref()
250                    .and_then(|s| s.dsn.clone())
251                    .or_else(|| captures.get(7).map(|m| m.as_str().to_string()));
252
253                let status = status_info
254                    .as_ref()
255                    .map(|s| s.status.clone())
256                    .unwrap_or_else(|| {
257                        captures
258                            .get(8)
259                            .map(|m| m.as_str().to_string())
260                            .unwrap_or_default()
261                    });
262
263                return Some(QmgrEvent::MessageDeferred {
264                    queue_id,
265                    from,
266                    to,
267                    relay,
268                    delay,
269                    delays,
270                    dsn,
271                    status,
272                });
273            }
274        }
275        None
276    }
277
278    /// 解析投递成功事件
279    fn parse_message_sent(&self, message: &str) -> Option<QmgrEvent> {
280        if let Some(captures) = MESSAGE_SENT_REGEX.captures(message) {
281            let queue_id = captures.get(1).unwrap().as_str().to_string();
282            let from = captures.get(2).unwrap().as_str().to_string();
283            let to = captures.get(3).unwrap().as_str().to_string();
284            let relay = captures.get(4).unwrap().as_str().to_string();
285            let delay = captures.get(5).unwrap().as_str().to_string();
286            let delays = captures.get(6).map(|m| m.as_str().to_string());
287            let dsn = captures.get(7).map(|m| m.as_str().to_string());
288            let status = captures.get(8).unwrap().as_str().to_string();
289
290            return Some(QmgrEvent::MessageSent {
291                queue_id,
292                from,
293                to,
294                relay,
295                delay,
296                delays,
297                dsn,
298                status,
299            });
300        }
301        None
302    }
303
304    /// 解析其他类型的qmgr事件
305    fn parse_other_events(&self, message: &str) -> Option<QmgrEvent> {
306        // 队列统计
307        if let Some(captures) = QUEUE_STATS_REGEX.captures(message) {
308            let active = captures.get(1).unwrap().as_str().parse::<u32>().ok();
309            let deferred = captures.get(2).unwrap().as_str().parse::<u32>().ok();
310            let hold = captures.get(3).unwrap().as_str().parse::<u32>().ok();
311            let incoming = captures.get(4).unwrap().as_str().parse::<u32>().ok();
312            let maildrop = captures.get(5).unwrap().as_str().parse::<u32>().ok();
313
314            return Some(QmgrEvent::QueueStats {
315                active,
316                deferred,
317                hold,
318                incoming,
319                maildrop,
320            });
321        }
322
323        // 传输状态
324        if let Some(captures) = TRANSPORT_STATUS_REGEX.captures(message) {
325            let transport = captures.get(1).unwrap().as_str().to_string();
326            let status = captures.get(2).unwrap().as_str().to_string();
327
328            return Some(QmgrEvent::TransportStatus {
329                transport,
330                status,
331                details: None,
332            });
333        }
334
335        // 队列刷新
336        if let Some(captures) = QUEUE_FLUSH_REGEX.captures(message) {
337            let queue_name = captures.get(1).map(|m| m.as_str().to_string());
338            let message_count = captures.get(2).and_then(|m| m.as_str().parse::<u32>().ok());
339
340            return Some(QmgrEvent::QueueFlush {
341                queue_name,
342                message_count,
343            });
344        }
345
346        None
347    }
348}
349
350impl ComponentParser for QmgrParser {
351    fn parse(&self, message: &str) -> Result<ComponentEvent, ParseError> {
352        // 按照真实数据频率优化的解析顺序
353
354        // 1. 配置警告(最常见)
355        if let Some(event) = self.parse_configuration_warning(message) {
356            return Ok(ComponentEvent::Qmgr(event));
357        }
358
359        // 2. 邮件进入活动队列(核心业务)
360        if let Some(event) = self.parse_message_active(message) {
361            return Ok(ComponentEvent::Qmgr(event));
362        }
363
364        // 3. 邮件移除
365        if let Some(event) = self.parse_message_removed(message) {
366            return Ok(ComponentEvent::Qmgr(event));
367        }
368
369        // 4. 邮件跳过
370        if let Some(event) = self.parse_message_skipped(message) {
371            return Ok(ComponentEvent::Qmgr(event));
372        }
373
374        // 5. 延迟投递
375        if let Some(event) = self.parse_message_deferred(message) {
376            return Ok(ComponentEvent::Qmgr(event));
377        }
378
379        // 6. 投递成功
380        if let Some(event) = self.parse_message_sent(message) {
381            return Ok(ComponentEvent::Qmgr(event));
382        }
383
384        // 7. 其他事件类型
385        if let Some(event) = self.parse_other_events(message) {
386            return Ok(ComponentEvent::Qmgr(event));
387        }
388
389        // 8. 如果都不匹配,创建Other事件
390        let queue_id = extract_queue_id(message);
391
392        Ok(ComponentEvent::Qmgr(QmgrEvent::Other {
393            event_type: "unclassified".to_string(),
394            message: message.to_string(),
395            queue_id,
396        }))
397    }
398
399    fn component_name(&self) -> &'static str {
400        "qmgr"
401    }
402}
403
404impl Default for QmgrParser {
405    fn default() -> Self {
406        Self::new()
407    }
408}