postfix_log_parser/components/
cleanup.rs

1//! 清理(cleanup)组件解析器
2
3use crate::components::ComponentParser;
4use crate::error::ParseError;
5use crate::events::cleanup::CleanupEvent;
6use crate::events::ComponentEvent;
7use crate::utils::common_fields::CommonFieldsParser;
8use crate::utils::queue_id::create_queue_id_pattern;
9use lazy_static::lazy_static;
10use regex::Regex;
11
/// Parser for log lines emitted by the Postfix `cleanup` component.
///
/// According to the original author's analysis of 896,788 real production
/// log records, the cleanup component accounts for 4.5% of the logs.
/// The order in which the individual patterns are tried is tuned to the
/// event frequency observed in that data.
pub struct CleanupParser;
16
lazy_static! {
    // Patterns containing `{QUEUE_ID}` are passed through
    // `create_queue_id_pattern`; the parsing code below reads the queue ID
    // from capture group 1 of those patterns.

    // 1. Message-ID event - the most frequent event, tried first
    //    (supports both the angle-bracketed and the bare form).
    static ref MESSAGE_ID_REGEX: Regex = Regex::new(
        &create_queue_id_pattern(r"^{QUEUE_ID}: message-id=(?:<([^>]+)>|([^,\s]+))$")
    ).unwrap();

    // 2. Queue file warning - second most frequent, related to system problems.
    static ref QUEUE_FILE_WARNING_REGEX: Regex = Regex::new(
        r"^([^:]+): create file ([^:]+): (.+)$"
    ).unwrap();

    // 3. Message size information.
    static ref MESSAGE_SIZE_REGEX: Regex = Regex::new(
        &create_queue_id_pattern(r"^{QUEUE_ID}: size=(\d+)$")
    ).unwrap();

    // 4. Header processing.
    static ref HEADER_PROCESSING_REGEX: Regex = Regex::new(
        &create_queue_id_pattern(r"^{QUEUE_ID}: header ([^:]+): (.+)$")
    ).unwrap();

    // 5. Address rewriting.
    static ref ADDRESS_REWRITE_REGEX: Regex = Regex::new(
        &create_queue_id_pattern(r"^{QUEUE_ID}: (from|to)=<([^>]+)> -> <([^>]+)>$")
    ).unwrap();

    // 6. Message content rewriting.
    static ref MESSAGE_REWRITE_REGEX: Regex = Regex::new(
        &create_queue_id_pattern(r"^{QUEUE_ID}: rewrite: (.+)$")
    ).unwrap();

    // 7. Filter action.
    static ref FILTER_ACTION_REGEX: Regex = Regex::new(
        &create_queue_id_pattern(r"^{QUEUE_ID}: filter ([^:]+): (.+)$")
    ).unwrap();

    // 8. Milter interaction.
    static ref MILTER_INTERACTION_REGEX: Regex = Regex::new(
        &create_queue_id_pattern(r"^{QUEUE_ID}: milter ([^:]+): (.+)$")
    ).unwrap();

    // 9. Message rejection.
    static ref MESSAGE_REJECT_REGEX: Regex = Regex::new(
        &create_queue_id_pattern(r"^{QUEUE_ID}: reject: (.+)$")
    ).unwrap();

    // 10. Configuration warning (any line starting with "warning: ").
    static ref CONFIG_WARNING_REGEX: Regex = Regex::new(
        r"^warning: (.+)$"
    ).unwrap();

    // 11. Resource limit.
    // NOTE(review): nothing in the visible part of this file references this
    // pattern; resource-limit warnings are detected by keyword inside
    // `parse_config_warning`, which matches with CONFIG_WARNING_REGEX.
    static ref RESOURCE_LIMIT_REGEX: Regex = Regex::new(
        r"^warning: ([^:]+): (.+)$"
    ).unwrap();

    // 12. Message hold / quarantine event.
    static ref MESSAGE_HOLD_REGEX: Regex = Regex::new(
        &create_queue_id_pattern(r"^{QUEUE_ID}: hold: (.+)$")
    ).unwrap();

    // 13. Message discard event.
    static ref MESSAGE_DISCARD_REGEX: Regex = Regex::new(
        &create_queue_id_pattern(r"^{QUEUE_ID}: discard: (.+)$")
    ).unwrap();

    // 14. Message removed event; the text after the parenthesised reason is optional.
    static ref MESSAGE_REMOVED_REGEX: Regex = Regex::new(
        &create_queue_id_pattern(r"^{QUEUE_ID}: removed \(([^)]+)\)(?:\s*(.*))?$")
    ).unwrap();

    // 15. Statistics; the `errors=` field is optional.
    static ref STATISTICS_REGEX: Regex = Regex::new(
        r"^statistics: processed=(\d+) rejected=(\d+)(?:\s+errors=(\d+))?$"
    ).unwrap();

    // 16. Snowflake ID generator initialisation.
    static ref SNOWFLAKE_INIT_REGEX: Regex = Regex::new(
        r"^snowflake: initialized with node_id=(\d+), node_bits=(\d+), seq_bits=(\d+)$"
    ).unwrap();

    // Client pattern originally dedicated to MessageHold events (only the
    // cleanup-specific pattern is kept here). It is applied through
    // `extract_client_info`, which the discard parser calls as well.
    static ref HOLD_CLIENT_REGEX: Regex = Regex::new(r"from ([^:;\s]+\[[^\]]+\]:?\d*)").unwrap();
}
101
102impl CleanupParser {
103    pub fn new() -> Self {
104        CleanupParser
105    }
106
107    /// 解析Message-ID事件 - 最高频事件 (支持带括号和不带括号格式)
108    fn parse_message_id(&self, message: &str) -> Option<CleanupEvent> {
109        if let Some(captures) = MESSAGE_ID_REGEX.captures(message) {
110            let queue_id = captures.get(1)?.as_str().to_string();
111
112            // 检查带尖括号的格式 (第2个捕获组)
113            let message_id = if let Some(bracketed) = captures.get(2) {
114                bracketed.as_str().to_string()
115            }
116            // 检查不带尖括号的格式 (第3个捕获组)
117            else if let Some(unbracketed) = captures.get(3) {
118                unbracketed.as_str().to_string()
119            } else {
120                return None;
121            };
122
123            return Some(CleanupEvent::MessageId {
124                queue_id,
125                message_id,
126            });
127        }
128        None
129    }
130
131    /// 解析队列文件警告
132    fn parse_queue_file_warning(&self, message: &str) -> Option<CleanupEvent> {
133        if let Some(captures) = QUEUE_FILE_WARNING_REGEX.captures(message) {
134            return Some(CleanupEvent::QueueFileWarning {
135                operation: captures.get(1)?.as_str().to_string(),
136                file_path: captures.get(2)?.as_str().to_string(),
137                error_reason: captures.get(3)?.as_str().to_string(),
138            });
139        }
140        None
141    }
142
143    /// 解析邮件大小信息
144    fn parse_message_size(&self, message: &str) -> Option<CleanupEvent> {
145        if let Some(captures) = MESSAGE_SIZE_REGEX.captures(message) {
146            if let Ok(size) = captures.get(2)?.as_str().parse::<u64>() {
147                return Some(CleanupEvent::MessageSize {
148                    queue_id: captures.get(1)?.as_str().to_string(),
149                    size,
150                });
151            }
152        }
153        None
154    }
155
156    /// 解析邮件头处理
157    fn parse_header_processing(&self, message: &str) -> Option<CleanupEvent> {
158        if let Some(captures) = HEADER_PROCESSING_REGEX.captures(message) {
159            return Some(CleanupEvent::HeaderProcessing {
160                queue_id: captures.get(1)?.as_str().to_string(),
161                header_name: captures.get(2)?.as_str().to_string(),
162                header_value: captures.get(3)?.as_str().to_string(),
163                action: "process".to_string(), // 默认动作
164            });
165        }
166        None
167    }
168
169    /// 解析地址重写
170    fn parse_address_rewrite(&self, message: &str) -> Option<CleanupEvent> {
171        if let Some(captures) = ADDRESS_REWRITE_REGEX.captures(message) {
172            return Some(CleanupEvent::AddressRewrite {
173                queue_id: captures.get(1)?.as_str().to_string(),
174                address_type: captures.get(2)?.as_str().to_string(),
175                original_address: captures.get(3)?.as_str().to_string(),
176                rewritten_address: captures.get(4)?.as_str().to_string(),
177            });
178        }
179        None
180    }
181
182    /// 解析邮件内容重写
183    fn parse_message_rewrite(&self, message: &str) -> Option<CleanupEvent> {
184        if let Some(captures) = MESSAGE_REWRITE_REGEX.captures(message) {
185            return Some(CleanupEvent::MessageRewrite {
186                queue_id: captures.get(1)?.as_str().to_string(),
187                rewrite_type: "content".to_string(),
188                original: "".to_string(), // 需要更详细的解析
189                rewritten: captures.get(2)?.as_str().to_string(),
190            });
191        }
192        None
193    }
194
195    /// 解析过滤器动作
196    fn parse_filter_action(&self, message: &str) -> Option<CleanupEvent> {
197        if let Some(captures) = FILTER_ACTION_REGEX.captures(message) {
198            return Some(CleanupEvent::FilterAction {
199                queue_id: captures.get(1)?.as_str().to_string(),
200                filter_name: captures.get(2)?.as_str().to_string(),
201                action: captures.get(3)?.as_str().to_string(),
202                details: None,
203            });
204        }
205        None
206    }
207
208    /// 解析Milter交互
209    fn parse_milter_interaction(&self, message: &str) -> Option<CleanupEvent> {
210        if let Some(captures) = MILTER_INTERACTION_REGEX.captures(message) {
211            return Some(CleanupEvent::MilterInteraction {
212                queue_id: captures.get(1)?.as_str().to_string(),
213                milter_name: captures.get(2)?.as_str().to_string(),
214                command: "interaction".to_string(),
215                response: Some(captures.get(3)?.as_str().to_string()),
216            });
217        }
218        None
219    }
220
221    /// 解析邮件拒绝
222    fn parse_message_reject(&self, message: &str) -> Option<CleanupEvent> {
223        if let Some(captures) = MESSAGE_REJECT_REGEX.captures(message) {
224            return Some(CleanupEvent::MessageReject {
225                queue_id: captures.get(1)?.as_str().to_string(),
226                reason: captures.get(2)?.as_str().to_string(),
227                action: "reject".to_string(),
228            });
229        }
230        None
231    }
232
233    /// 解析配置警告
234    fn parse_config_warning(&self, message: &str) -> Option<CleanupEvent> {
235        if let Some(captures) = CONFIG_WARNING_REGEX.captures(message) {
236            let warning_msg = captures.get(1)?.as_str();
237
238            // 先检查是否是资源限制类警告
239            if warning_msg.contains("disk")
240                || warning_msg.contains("memory")
241                || warning_msg.contains("queue")
242                || warning_msg.contains("limit")
243            {
244                return Some(CleanupEvent::ResourceLimit {
245                    resource_type: "unknown".to_string(),
246                    limit_details: warning_msg.to_string(),
247                    current_value: None,
248                    limit_value: None,
249                });
250            }
251
252            return Some(CleanupEvent::ConfigurationWarning {
253                warning_type: "cleanup_config".to_string(),
254                message: warning_msg.to_string(),
255            });
256        }
257        None
258    }
259
260    /// 解析邮件隔离/保留事件
261    fn parse_message_hold(&self, message: &str) -> Option<CleanupEvent> {
262        let captures = MESSAGE_HOLD_REGEX.captures(message)?;
263        let queue_id = captures.get(1)?.as_str().to_string();
264        let hold_details = captures.get(2)?.as_str();
265
266        // 解析隔离原因
267        let hold_reason = self.extract_hold_reason(hold_details);
268
269        // 使用公共字段解析器提取各字段信息
270        let sender =
271            CommonFieldsParser::extract_from_email(hold_details).map(|email| email.address);
272
273        let recipient =
274            CommonFieldsParser::extract_to_email(hold_details).map(|email| email.address);
275
276        let (client_hostname, client_ip, client_port) = self.extract_client_info(hold_details);
277
278        let protocol = CommonFieldsParser::extract_protocol(hold_details);
279
280        let helo = CommonFieldsParser::extract_helo(hold_details);
281
282        // 提取描述信息(通常在冒号后面)
283        let description = if let Some(desc_start) = hold_details.rfind(": ") {
284            hold_details[(desc_start + 2)..].to_string()
285        } else {
286            hold_details.to_string()
287        };
288
289        Some(CleanupEvent::MessageHold {
290            queue_id,
291            hold_reason,
292            sender,
293            recipient,
294            client_ip,
295            client_hostname,
296            client_port,
297            protocol,
298            helo,
299            description,
300        })
301    }
302
303    /// 提取隔离原因的辅助函数
304    fn extract_hold_reason(&self, hold_details: &str) -> String {
305        if hold_details.contains("header X-Decision-Result: Quarantine") {
306            "X-Decision-Result: Quarantine".to_string()
307        } else if hold_details.contains("hold") {
308            "hold".to_string()
309        } else {
310            "unknown".to_string()
311        }
312    }
313
314    /// 提取并解析客户端信息的辅助函数
315    fn extract_client_info(
316        &self,
317        hold_details: &str,
318    ) -> (Option<String>, Option<String>, Option<u16>) {
319        let client_info_str = match HOLD_CLIENT_REGEX
320            .captures(hold_details)
321            .and_then(|c| c.get(1))
322            .map(|m| m.as_str())
323        {
324            Some(s) => s,
325            None => return (None, None, None),
326        };
327
328        // 使用公共字段解析器解析客户端信息
329        CommonFieldsParser::extract_client_info_simple(client_info_str)
330            .map(|client| (Some(client.hostname), Some(client.ip), client.port))
331            .unwrap_or((None, None, None))
332    }
333
334    /// 解析邮件丢弃事件
335    fn parse_message_discard(&self, message: &str) -> Option<CleanupEvent> {
336        let captures = MESSAGE_DISCARD_REGEX.captures(message)?;
337        let queue_id = captures.get(1)?.as_str().to_string();
338        let discard_details = captures.get(2)?.as_str();
339
340        // 解析丢弃原因
341        let discard_reason = self.extract_discard_reason(discard_details);
342
343        // 使用公共字段解析器提取各字段信息
344        let sender =
345            CommonFieldsParser::extract_from_email(discard_details).map(|email| email.address);
346
347        let recipient =
348            CommonFieldsParser::extract_to_email(discard_details).map(|email| email.address);
349
350        let (client_hostname, client_ip, client_port) = self.extract_client_info(discard_details);
351
352        let protocol = CommonFieldsParser::extract_protocol(discard_details);
353
354        let helo = CommonFieldsParser::extract_helo(discard_details);
355
356        // 提取描述信息(通常在冒号后面)
357        let description = if let Some(desc_start) = discard_details.rfind(": ") {
358            discard_details[(desc_start + 2)..].to_string()
359        } else {
360            discard_details.to_string()
361        };
362
363        Some(CleanupEvent::MessageDiscard {
364            queue_id,
365            discard_reason,
366            sender,
367            recipient,
368            client_ip,
369            client_hostname,
370            client_port,
371            protocol,
372            helo,
373            description,
374        })
375    }
376
377    /// 解析邮件移除事件
378    fn parse_message_removed(&self, message: &str) -> Option<CleanupEvent> {
379        let captures = MESSAGE_REMOVED_REGEX.captures(message)?;
380        let queue_id = captures.get(1)?.as_str().to_string();
381        let removal_reason = captures.get(2)?.as_str().to_string();
382        let details = captures.get(3).map(|m| m.as_str().to_string());
383
384        Some(CleanupEvent::MessageRemoved {
385            queue_id,
386            removal_reason,
387            details,
388        })
389    }
390
391    /// 提取丢弃原因的辅助函数
392    fn extract_discard_reason(&self, discard_details: &str) -> String {
393        if discard_details.contains("header X-Decision-Result: Discard") {
394            "X-Decision-Result: Discard".to_string()
395        } else if discard_details.contains("discard") {
396            "discard".to_string()
397        } else {
398            "unknown".to_string()
399        }
400    }
401
402    /// 解析统计信息
403    fn parse_statistics(&self, message: &str) -> Option<CleanupEvent> {
404        if let Some(captures) = STATISTICS_REGEX.captures(message) {
405            let processed = captures.get(1)?.as_str().parse::<u32>().ok();
406            let rejected = captures.get(2)?.as_str().parse::<u32>().ok();
407            let errors = captures.get(3).and_then(|m| m.as_str().parse::<u32>().ok());
408
409            return Some(CleanupEvent::Statistics {
410                processed,
411                rejected,
412                errors,
413            });
414        }
415        None
416    }
417
418    /// 解析Snowflake初始化事件
419    fn parse_snowflake_init(&self, message: &str) -> Option<CleanupEvent> {
420        if let Some(captures) = SNOWFLAKE_INIT_REGEX.captures(message) {
421            let node_id = captures.get(1)?.as_str().parse::<u32>().ok()?;
422            let node_bits = captures.get(2)?.as_str().parse::<u32>().ok()?;
423            let seq_bits = captures.get(3)?.as_str().parse::<u32>().ok()?;
424
425            return Some(CleanupEvent::SnowflakeInit {
426                node_id,
427                node_bits,
428                seq_bits,
429            });
430        }
431        None
432    }
433}
434
435impl ComponentParser for CleanupParser {
436    fn parse(&self, message: &str) -> Result<ComponentEvent, ParseError> {
437        // 按照真实数据频率优化的解析顺序
438        // 1. Message-ID事件 - 最高频,约占90%的cleanup日志
439        if let Some(event) = self.parse_message_id(message) {
440            return Ok(ComponentEvent::Cleanup(event));
441        }
442
443        // 2. 队列文件警告 - 系统问题,重要性高
444        if let Some(event) = self.parse_queue_file_warning(message) {
445            return Ok(ComponentEvent::Cleanup(event));
446        }
447
448        // 3. 邮件大小信息
449        if let Some(event) = self.parse_message_size(message) {
450            return Ok(ComponentEvent::Cleanup(event));
451        }
452
453        // 4. 邮件头处理
454        if let Some(event) = self.parse_header_processing(message) {
455            return Ok(ComponentEvent::Cleanup(event));
456        }
457
458        // 5. 地址重写
459        if let Some(event) = self.parse_address_rewrite(message) {
460            return Ok(ComponentEvent::Cleanup(event));
461        }
462
463        // 6. 邮件内容重写
464        if let Some(event) = self.parse_message_rewrite(message) {
465            return Ok(ComponentEvent::Cleanup(event));
466        }
467
468        // 7. 过滤器动作
469        if let Some(event) = self.parse_filter_action(message) {
470            return Ok(ComponentEvent::Cleanup(event));
471        }
472
473        // 8. Milter交互
474        if let Some(event) = self.parse_milter_interaction(message) {
475            return Ok(ComponentEvent::Cleanup(event));
476        }
477
478        // 9. 邮件拒绝
479        if let Some(event) = self.parse_message_reject(message) {
480            return Ok(ComponentEvent::Cleanup(event));
481        }
482
483        // 10. 配置警告和资源限制
484        if let Some(event) = self.parse_config_warning(message) {
485            return Ok(ComponentEvent::Cleanup(event));
486        }
487
488        // 11. 邮件隔离/保留事件
489        if let Some(event) = self.parse_message_hold(message) {
490            return Ok(ComponentEvent::Cleanup(event));
491        }
492
493        // 12. 邮件丢弃事件
494        if let Some(event) = self.parse_message_discard(message) {
495            return Ok(ComponentEvent::Cleanup(event));
496        }
497
498        // 13. 邮件移除事件
499        if let Some(event) = self.parse_message_removed(message) {
500            return Ok(ComponentEvent::Cleanup(event));
501        }
502
503        // 14. 统计信息
504        if let Some(event) = self.parse_statistics(message) {
505            return Ok(ComponentEvent::Cleanup(event));
506        }
507
508        // 15. Snowflake初始化事件
509        if let Some(event) = self.parse_snowflake_init(message) {
510            return Ok(ComponentEvent::Cleanup(event));
511        }
512
513        // 16. 未识别的事件归类为Other
514        Ok(ComponentEvent::Cleanup(CleanupEvent::Other {
515            event_type: "unknown".to_string(),
516            message: message.to_string(),
517            queue_id: None, // 尝试从消息中提取队列ID
518        }))
519    }
520
521    fn component_name(&self) -> &'static str {
522        "cleanup"
523    }
524}
525
526impl Default for CleanupParser {
527    fn default() -> Self {
528        Self::new()
529    }
530}