postfix_log_parser/components/
cleanup.rs1use crate::components::ComponentParser;
4use crate::error::ParseError;
5use crate::events::cleanup::CleanupEvent;
6use crate::events::ComponentEvent;
7use crate::utils::common_fields::CommonFieldsParser;
8use crate::utils::queue_id::create_queue_id_pattern;
9use lazy_static::lazy_static;
10use regex::Regex;
11
/// Parser for log messages produced by the Postfix `cleanup` component.
///
/// The type is a stateless unit struct: every parsing method works purely on
/// the message text it is given, so values are free to copy and share.
#[derive(Debug, Clone, Copy)]
pub struct CleanupParser;
16
17lazy_static! {
18 static ref MESSAGE_ID_REGEX: Regex = Regex::new(
20 &create_queue_id_pattern(r"^{QUEUE_ID}: message-id=(?:<([^>]+)>|([^,\s]+))$")
21 ).unwrap();
22
23 static ref QUEUE_FILE_WARNING_REGEX: Regex = Regex::new(
25 r"^([^:]+): create file ([^:]+): (.+)$"
26 ).unwrap();
27
28 static ref MESSAGE_SIZE_REGEX: Regex = Regex::new(
30 &create_queue_id_pattern(r"^{QUEUE_ID}: size=(\d+)$")
31 ).unwrap();
32
33 static ref HEADER_PROCESSING_REGEX: Regex = Regex::new(
35 &create_queue_id_pattern(r"^{QUEUE_ID}: header ([^:]+): (.+)$")
36 ).unwrap();
37
38 static ref ADDRESS_REWRITE_REGEX: Regex = Regex::new(
40 &create_queue_id_pattern(r"^{QUEUE_ID}: (from|to)=<([^>]+)> -> <([^>]+)>$")
41 ).unwrap();
42
43 static ref MESSAGE_REWRITE_REGEX: Regex = Regex::new(
45 &create_queue_id_pattern(r"^{QUEUE_ID}: rewrite: (.+)$")
46 ).unwrap();
47
48 static ref FILTER_ACTION_REGEX: Regex = Regex::new(
50 &create_queue_id_pattern(r"^{QUEUE_ID}: filter ([^:]+): (.+)$")
51 ).unwrap();
52
53 static ref MILTER_INTERACTION_REGEX: Regex = Regex::new(
55 &create_queue_id_pattern(r"^{QUEUE_ID}: milter ([^:]+): (.+)$")
56 ).unwrap();
57
58 static ref MESSAGE_REJECT_REGEX: Regex = Regex::new(
60 &create_queue_id_pattern(r"^{QUEUE_ID}: reject: (.+)$")
61 ).unwrap();
62
63 static ref CONFIG_WARNING_REGEX: Regex = Regex::new(
65 r"^warning: (.+)$"
66 ).unwrap();
67
68 static ref RESOURCE_LIMIT_REGEX: Regex = Regex::new(
70 r"^warning: ([^:]+): (.+)$"
71 ).unwrap();
72
73 static ref MESSAGE_HOLD_REGEX: Regex = Regex::new(
75 &create_queue_id_pattern(r"^{QUEUE_ID}: hold: (.+)$")
76 ).unwrap();
77
78 static ref MESSAGE_DISCARD_REGEX: Regex = Regex::new(
80 &create_queue_id_pattern(r"^{QUEUE_ID}: discard: (.+)$")
81 ).unwrap();
82
83 static ref MESSAGE_REMOVED_REGEX: Regex = Regex::new(
85 &create_queue_id_pattern(r"^{QUEUE_ID}: removed \(([^)]+)\)(?:\s*(.*))?$")
86 ).unwrap();
87
88 static ref STATISTICS_REGEX: Regex = Regex::new(
90 r"^statistics: processed=(\d+) rejected=(\d+)(?:\s+errors=(\d+))?$"
91 ).unwrap();
92
93 static ref SNOWFLAKE_INIT_REGEX: Regex = Regex::new(
95 r"^snowflake: initialized with node_id=(\d+), node_bits=(\d+), seq_bits=(\d+)$"
96 ).unwrap();
97
98 static ref HOLD_CLIENT_REGEX: Regex = Regex::new(r"from ([^:;\s]+\[[^\]]+\]:?\d*)").unwrap();
100}
101
102impl CleanupParser {
103 pub fn new() -> Self {
104 CleanupParser
105 }
106
107 fn parse_message_id(&self, message: &str) -> Option<CleanupEvent> {
109 if let Some(captures) = MESSAGE_ID_REGEX.captures(message) {
110 let queue_id = captures.get(1)?.as_str().to_string();
111
112 let message_id = if let Some(bracketed) = captures.get(2) {
114 bracketed.as_str().to_string()
115 }
116 else if let Some(unbracketed) = captures.get(3) {
118 unbracketed.as_str().to_string()
119 } else {
120 return None;
121 };
122
123 return Some(CleanupEvent::MessageId {
124 queue_id,
125 message_id,
126 });
127 }
128 None
129 }
130
131 fn parse_queue_file_warning(&self, message: &str) -> Option<CleanupEvent> {
133 if let Some(captures) = QUEUE_FILE_WARNING_REGEX.captures(message) {
134 return Some(CleanupEvent::QueueFileWarning {
135 operation: captures.get(1)?.as_str().to_string(),
136 file_path: captures.get(2)?.as_str().to_string(),
137 error_reason: captures.get(3)?.as_str().to_string(),
138 });
139 }
140 None
141 }
142
143 fn parse_message_size(&self, message: &str) -> Option<CleanupEvent> {
145 if let Some(captures) = MESSAGE_SIZE_REGEX.captures(message) {
146 if let Ok(size) = captures.get(2)?.as_str().parse::<u64>() {
147 return Some(CleanupEvent::MessageSize {
148 queue_id: captures.get(1)?.as_str().to_string(),
149 size,
150 });
151 }
152 }
153 None
154 }
155
156 fn parse_header_processing(&self, message: &str) -> Option<CleanupEvent> {
158 if let Some(captures) = HEADER_PROCESSING_REGEX.captures(message) {
159 return Some(CleanupEvent::HeaderProcessing {
160 queue_id: captures.get(1)?.as_str().to_string(),
161 header_name: captures.get(2)?.as_str().to_string(),
162 header_value: captures.get(3)?.as_str().to_string(),
163 action: "process".to_string(), });
165 }
166 None
167 }
168
169 fn parse_address_rewrite(&self, message: &str) -> Option<CleanupEvent> {
171 if let Some(captures) = ADDRESS_REWRITE_REGEX.captures(message) {
172 return Some(CleanupEvent::AddressRewrite {
173 queue_id: captures.get(1)?.as_str().to_string(),
174 address_type: captures.get(2)?.as_str().to_string(),
175 original_address: captures.get(3)?.as_str().to_string(),
176 rewritten_address: captures.get(4)?.as_str().to_string(),
177 });
178 }
179 None
180 }
181
182 fn parse_message_rewrite(&self, message: &str) -> Option<CleanupEvent> {
184 if let Some(captures) = MESSAGE_REWRITE_REGEX.captures(message) {
185 return Some(CleanupEvent::MessageRewrite {
186 queue_id: captures.get(1)?.as_str().to_string(),
187 rewrite_type: "content".to_string(),
188 original: "".to_string(), rewritten: captures.get(2)?.as_str().to_string(),
190 });
191 }
192 None
193 }
194
195 fn parse_filter_action(&self, message: &str) -> Option<CleanupEvent> {
197 if let Some(captures) = FILTER_ACTION_REGEX.captures(message) {
198 return Some(CleanupEvent::FilterAction {
199 queue_id: captures.get(1)?.as_str().to_string(),
200 filter_name: captures.get(2)?.as_str().to_string(),
201 action: captures.get(3)?.as_str().to_string(),
202 details: None,
203 });
204 }
205 None
206 }
207
208 fn parse_milter_interaction(&self, message: &str) -> Option<CleanupEvent> {
210 if let Some(captures) = MILTER_INTERACTION_REGEX.captures(message) {
211 return Some(CleanupEvent::MilterInteraction {
212 queue_id: captures.get(1)?.as_str().to_string(),
213 milter_name: captures.get(2)?.as_str().to_string(),
214 command: "interaction".to_string(),
215 response: Some(captures.get(3)?.as_str().to_string()),
216 });
217 }
218 None
219 }
220
221 fn parse_message_reject(&self, message: &str) -> Option<CleanupEvent> {
223 if let Some(captures) = MESSAGE_REJECT_REGEX.captures(message) {
224 return Some(CleanupEvent::MessageReject {
225 queue_id: captures.get(1)?.as_str().to_string(),
226 reason: captures.get(2)?.as_str().to_string(),
227 action: "reject".to_string(),
228 });
229 }
230 None
231 }
232
233 fn parse_config_warning(&self, message: &str) -> Option<CleanupEvent> {
235 if let Some(captures) = CONFIG_WARNING_REGEX.captures(message) {
236 let warning_msg = captures.get(1)?.as_str();
237
238 if warning_msg.contains("disk")
240 || warning_msg.contains("memory")
241 || warning_msg.contains("queue")
242 || warning_msg.contains("limit")
243 {
244 return Some(CleanupEvent::ResourceLimit {
245 resource_type: "unknown".to_string(),
246 limit_details: warning_msg.to_string(),
247 current_value: None,
248 limit_value: None,
249 });
250 }
251
252 return Some(CleanupEvent::ConfigurationWarning {
253 warning_type: "cleanup_config".to_string(),
254 message: warning_msg.to_string(),
255 });
256 }
257 None
258 }
259
260 fn parse_message_hold(&self, message: &str) -> Option<CleanupEvent> {
262 let captures = MESSAGE_HOLD_REGEX.captures(message)?;
263 let queue_id = captures.get(1)?.as_str().to_string();
264 let hold_details = captures.get(2)?.as_str();
265
266 let hold_reason = self.extract_hold_reason(hold_details);
268
269 let sender =
271 CommonFieldsParser::extract_from_email(hold_details).map(|email| email.address);
272
273 let recipient =
274 CommonFieldsParser::extract_to_email(hold_details).map(|email| email.address);
275
276 let (client_hostname, client_ip, client_port) = self.extract_client_info(hold_details);
277
278 let protocol = CommonFieldsParser::extract_protocol(hold_details);
279
280 let helo = CommonFieldsParser::extract_helo(hold_details);
281
282 let description = if let Some(desc_start) = hold_details.rfind(": ") {
284 hold_details[(desc_start + 2)..].to_string()
285 } else {
286 hold_details.to_string()
287 };
288
289 Some(CleanupEvent::MessageHold {
290 queue_id,
291 hold_reason,
292 sender,
293 recipient,
294 client_ip,
295 client_hostname,
296 client_port,
297 protocol,
298 helo,
299 description,
300 })
301 }
302
303 fn extract_hold_reason(&self, hold_details: &str) -> String {
305 if hold_details.contains("header X-Decision-Result: Quarantine") {
306 "X-Decision-Result: Quarantine".to_string()
307 } else if hold_details.contains("hold") {
308 "hold".to_string()
309 } else {
310 "unknown".to_string()
311 }
312 }
313
314 fn extract_client_info(
316 &self,
317 hold_details: &str,
318 ) -> (Option<String>, Option<String>, Option<u16>) {
319 let client_info_str = match HOLD_CLIENT_REGEX
320 .captures(hold_details)
321 .and_then(|c| c.get(1))
322 .map(|m| m.as_str())
323 {
324 Some(s) => s,
325 None => return (None, None, None),
326 };
327
328 CommonFieldsParser::extract_client_info_simple(client_info_str)
330 .map(|client| (Some(client.hostname), Some(client.ip), client.port))
331 .unwrap_or((None, None, None))
332 }
333
334 fn parse_message_discard(&self, message: &str) -> Option<CleanupEvent> {
336 let captures = MESSAGE_DISCARD_REGEX.captures(message)?;
337 let queue_id = captures.get(1)?.as_str().to_string();
338 let discard_details = captures.get(2)?.as_str();
339
340 let discard_reason = self.extract_discard_reason(discard_details);
342
343 let sender =
345 CommonFieldsParser::extract_from_email(discard_details).map(|email| email.address);
346
347 let recipient =
348 CommonFieldsParser::extract_to_email(discard_details).map(|email| email.address);
349
350 let (client_hostname, client_ip, client_port) = self.extract_client_info(discard_details);
351
352 let protocol = CommonFieldsParser::extract_protocol(discard_details);
353
354 let helo = CommonFieldsParser::extract_helo(discard_details);
355
356 let description = if let Some(desc_start) = discard_details.rfind(": ") {
358 discard_details[(desc_start + 2)..].to_string()
359 } else {
360 discard_details.to_string()
361 };
362
363 Some(CleanupEvent::MessageDiscard {
364 queue_id,
365 discard_reason,
366 sender,
367 recipient,
368 client_ip,
369 client_hostname,
370 client_port,
371 protocol,
372 helo,
373 description,
374 })
375 }
376
377 fn parse_message_removed(&self, message: &str) -> Option<CleanupEvent> {
379 let captures = MESSAGE_REMOVED_REGEX.captures(message)?;
380 let queue_id = captures.get(1)?.as_str().to_string();
381 let removal_reason = captures.get(2)?.as_str().to_string();
382 let details = captures.get(3).map(|m| m.as_str().to_string());
383
384 Some(CleanupEvent::MessageRemoved {
385 queue_id,
386 removal_reason,
387 details,
388 })
389 }
390
391 fn extract_discard_reason(&self, discard_details: &str) -> String {
393 if discard_details.contains("header X-Decision-Result: Discard") {
394 "X-Decision-Result: Discard".to_string()
395 } else if discard_details.contains("discard") {
396 "discard".to_string()
397 } else {
398 "unknown".to_string()
399 }
400 }
401
402 fn parse_statistics(&self, message: &str) -> Option<CleanupEvent> {
404 if let Some(captures) = STATISTICS_REGEX.captures(message) {
405 let processed = captures.get(1)?.as_str().parse::<u32>().ok();
406 let rejected = captures.get(2)?.as_str().parse::<u32>().ok();
407 let errors = captures.get(3).and_then(|m| m.as_str().parse::<u32>().ok());
408
409 return Some(CleanupEvent::Statistics {
410 processed,
411 rejected,
412 errors,
413 });
414 }
415 None
416 }
417
418 fn parse_snowflake_init(&self, message: &str) -> Option<CleanupEvent> {
420 if let Some(captures) = SNOWFLAKE_INIT_REGEX.captures(message) {
421 let node_id = captures.get(1)?.as_str().parse::<u32>().ok()?;
422 let node_bits = captures.get(2)?.as_str().parse::<u32>().ok()?;
423 let seq_bits = captures.get(3)?.as_str().parse::<u32>().ok()?;
424
425 return Some(CleanupEvent::SnowflakeInit {
426 node_id,
427 node_bits,
428 seq_bits,
429 });
430 }
431 None
432 }
433}
434
435impl ComponentParser for CleanupParser {
436 fn parse(&self, message: &str) -> Result<ComponentEvent, ParseError> {
437 if let Some(event) = self.parse_message_id(message) {
440 return Ok(ComponentEvent::Cleanup(event));
441 }
442
443 if let Some(event) = self.parse_queue_file_warning(message) {
445 return Ok(ComponentEvent::Cleanup(event));
446 }
447
448 if let Some(event) = self.parse_message_size(message) {
450 return Ok(ComponentEvent::Cleanup(event));
451 }
452
453 if let Some(event) = self.parse_header_processing(message) {
455 return Ok(ComponentEvent::Cleanup(event));
456 }
457
458 if let Some(event) = self.parse_address_rewrite(message) {
460 return Ok(ComponentEvent::Cleanup(event));
461 }
462
463 if let Some(event) = self.parse_message_rewrite(message) {
465 return Ok(ComponentEvent::Cleanup(event));
466 }
467
468 if let Some(event) = self.parse_filter_action(message) {
470 return Ok(ComponentEvent::Cleanup(event));
471 }
472
473 if let Some(event) = self.parse_milter_interaction(message) {
475 return Ok(ComponentEvent::Cleanup(event));
476 }
477
478 if let Some(event) = self.parse_message_reject(message) {
480 return Ok(ComponentEvent::Cleanup(event));
481 }
482
483 if let Some(event) = self.parse_config_warning(message) {
485 return Ok(ComponentEvent::Cleanup(event));
486 }
487
488 if let Some(event) = self.parse_message_hold(message) {
490 return Ok(ComponentEvent::Cleanup(event));
491 }
492
493 if let Some(event) = self.parse_message_discard(message) {
495 return Ok(ComponentEvent::Cleanup(event));
496 }
497
498 if let Some(event) = self.parse_message_removed(message) {
500 return Ok(ComponentEvent::Cleanup(event));
501 }
502
503 if let Some(event) = self.parse_statistics(message) {
505 return Ok(ComponentEvent::Cleanup(event));
506 }
507
508 if let Some(event) = self.parse_snowflake_init(message) {
510 return Ok(ComponentEvent::Cleanup(event));
511 }
512
513 Ok(ComponentEvent::Cleanup(CleanupEvent::Other {
515 event_type: "unknown".to_string(),
516 message: message.to_string(),
517 queue_id: None, }))
519 }
520
521 fn component_name(&self) -> &'static str {
522 "cleanup"
523 }
524}
525
526impl Default for CleanupParser {
527 fn default() -> Self {
528 Self::new()
529 }
530}