postfix_log_parser/components/
cleanup.rs1use crate::components::ComponentParser;
4use crate::error::ParseError;
5use crate::events::cleanup::CleanupEvent;
6use crate::events::ComponentEvent;
7use crate::utils::queue_id::create_queue_id_pattern;
8use lazy_static::lazy_static;
9use regex::Regex;
10
/// Stateless parser for log messages emitted by the Postfix `cleanup` component.
///
/// The type carries no data; every parsing method works only on the message
/// passed in. `Debug`, `Clone` and `Copy` are derived so the parser can be
/// embedded in containers that derive those traits themselves.
#[derive(Debug, Clone, Copy)]
pub struct CleanupParser;
15
16lazy_static! {
17 static ref MESSAGE_ID_REGEX: Regex = Regex::new(
20 &create_queue_id_pattern(r"^{QUEUE_ID}: message-id=<([^>]+)>$")
21 ).unwrap();
22
23 static ref QUEUE_FILE_WARNING_REGEX: Regex = Regex::new(
26 r"^([^:]+): create file ([^:]+): (.+)$"
27 ).unwrap();
28
29 static ref MESSAGE_SIZE_REGEX: Regex = Regex::new(
32 &create_queue_id_pattern(r"^{QUEUE_ID}: size=(\d+)$")
33 ).unwrap();
34
35 static ref HEADER_PROCESSING_REGEX: Regex = Regex::new(
38 &create_queue_id_pattern(r"^{QUEUE_ID}: header ([^:]+): (.+)$")
39 ).unwrap();
40
41 static ref ADDRESS_REWRITE_REGEX: Regex = Regex::new(
44 &create_queue_id_pattern(r"^{QUEUE_ID}: (from|to)=<([^>]+)> -> <([^>]+)>$")
45 ).unwrap();
46
47 static ref MESSAGE_REWRITE_REGEX: Regex = Regex::new(
50 &create_queue_id_pattern(r"^{QUEUE_ID}: rewrite: (.+)$")
51 ).unwrap();
52
53 static ref FILTER_ACTION_REGEX: Regex = Regex::new(
56 &create_queue_id_pattern(r"^{QUEUE_ID}: filter ([^:]+): (.+)$")
57 ).unwrap();
58
59 static ref MILTER_INTERACTION_REGEX: Regex = Regex::new(
62 &create_queue_id_pattern(r"^{QUEUE_ID}: milter ([^:]+): (.+)$")
63 ).unwrap();
64
65 static ref MESSAGE_REJECT_REGEX: Regex = Regex::new(
68 &create_queue_id_pattern(r"^{QUEUE_ID}: reject: (.+)$")
69 ).unwrap();
70
71 static ref CONFIG_WARNING_REGEX: Regex = Regex::new(
74 r"^warning: (.+)$"
75 ).unwrap();
76
77 static ref RESOURCE_LIMIT_REGEX: Regex = Regex::new(
80 r"^warning: ([^:]+): (.+)$"
81 ).unwrap();
82
83 static ref STATISTICS_REGEX: Regex = Regex::new(
86 r"^statistics: processed=(\d+) rejected=(\d+)(?:\s+errors=(\d+))?$"
87 ).unwrap();
88}
89
90impl CleanupParser {
91 pub fn new() -> Self {
92 CleanupParser
93 }
94
95 fn parse_message_id(&self, message: &str) -> Option<CleanupEvent> {
97 if let Some(captures) = MESSAGE_ID_REGEX.captures(message) {
98 return Some(CleanupEvent::MessageId {
99 queue_id: captures.get(1)?.as_str().to_string(),
100 message_id: captures.get(2)?.as_str().to_string(),
101 });
102 }
103 None
104 }
105
106 fn parse_queue_file_warning(&self, message: &str) -> Option<CleanupEvent> {
108 if let Some(captures) = QUEUE_FILE_WARNING_REGEX.captures(message) {
109 return Some(CleanupEvent::QueueFileWarning {
110 operation: captures.get(1)?.as_str().to_string(),
111 file_path: captures.get(2)?.as_str().to_string(),
112 error_reason: captures.get(3)?.as_str().to_string(),
113 });
114 }
115 None
116 }
117
118 fn parse_message_size(&self, message: &str) -> Option<CleanupEvent> {
120 if let Some(captures) = MESSAGE_SIZE_REGEX.captures(message) {
121 if let Ok(size) = captures.get(2)?.as_str().parse::<u64>() {
122 return Some(CleanupEvent::MessageSize {
123 queue_id: captures.get(1)?.as_str().to_string(),
124 size,
125 });
126 }
127 }
128 None
129 }
130
131 fn parse_header_processing(&self, message: &str) -> Option<CleanupEvent> {
133 if let Some(captures) = HEADER_PROCESSING_REGEX.captures(message) {
134 return Some(CleanupEvent::HeaderProcessing {
135 queue_id: captures.get(1)?.as_str().to_string(),
136 header_name: captures.get(2)?.as_str().to_string(),
137 header_value: captures.get(3)?.as_str().to_string(),
138 action: "process".to_string(), });
140 }
141 None
142 }
143
144 fn parse_address_rewrite(&self, message: &str) -> Option<CleanupEvent> {
146 if let Some(captures) = ADDRESS_REWRITE_REGEX.captures(message) {
147 return Some(CleanupEvent::AddressRewrite {
148 queue_id: captures.get(1)?.as_str().to_string(),
149 address_type: captures.get(2)?.as_str().to_string(),
150 original_address: captures.get(3)?.as_str().to_string(),
151 rewritten_address: captures.get(4)?.as_str().to_string(),
152 });
153 }
154 None
155 }
156
157 fn parse_message_rewrite(&self, message: &str) -> Option<CleanupEvent> {
159 if let Some(captures) = MESSAGE_REWRITE_REGEX.captures(message) {
160 return Some(CleanupEvent::MessageRewrite {
161 queue_id: captures.get(1)?.as_str().to_string(),
162 rewrite_type: "content".to_string(),
163 original: "".to_string(), rewritten: captures.get(2)?.as_str().to_string(),
165 });
166 }
167 None
168 }
169
170 fn parse_filter_action(&self, message: &str) -> Option<CleanupEvent> {
172 if let Some(captures) = FILTER_ACTION_REGEX.captures(message) {
173 return Some(CleanupEvent::FilterAction {
174 queue_id: captures.get(1)?.as_str().to_string(),
175 filter_name: captures.get(2)?.as_str().to_string(),
176 action: captures.get(3)?.as_str().to_string(),
177 details: None,
178 });
179 }
180 None
181 }
182
183 fn parse_milter_interaction(&self, message: &str) -> Option<CleanupEvent> {
185 if let Some(captures) = MILTER_INTERACTION_REGEX.captures(message) {
186 return Some(CleanupEvent::MilterInteraction {
187 queue_id: captures.get(1)?.as_str().to_string(),
188 milter_name: captures.get(2)?.as_str().to_string(),
189 command: "interaction".to_string(),
190 response: Some(captures.get(3)?.as_str().to_string()),
191 });
192 }
193 None
194 }
195
196 fn parse_message_reject(&self, message: &str) -> Option<CleanupEvent> {
198 if let Some(captures) = MESSAGE_REJECT_REGEX.captures(message) {
199 return Some(CleanupEvent::MessageReject {
200 queue_id: captures.get(1)?.as_str().to_string(),
201 reason: captures.get(2)?.as_str().to_string(),
202 action: "reject".to_string(),
203 });
204 }
205 None
206 }
207
208 fn parse_config_warning(&self, message: &str) -> Option<CleanupEvent> {
210 if let Some(captures) = CONFIG_WARNING_REGEX.captures(message) {
211 let warning_msg = captures.get(1)?.as_str();
212
213 if warning_msg.contains("disk")
215 || warning_msg.contains("memory")
216 || warning_msg.contains("queue")
217 || warning_msg.contains("limit")
218 {
219 return Some(CleanupEvent::ResourceLimit {
220 resource_type: "unknown".to_string(),
221 limit_details: warning_msg.to_string(),
222 current_value: None,
223 limit_value: None,
224 });
225 }
226
227 return Some(CleanupEvent::ConfigurationWarning {
228 warning_type: "cleanup_config".to_string(),
229 message: warning_msg.to_string(),
230 });
231 }
232 None
233 }
234
235 fn parse_statistics(&self, message: &str) -> Option<CleanupEvent> {
237 if let Some(captures) = STATISTICS_REGEX.captures(message) {
238 let processed = captures.get(1)?.as_str().parse::<u32>().ok();
239 let rejected = captures.get(2)?.as_str().parse::<u32>().ok();
240 let errors = captures.get(3).and_then(|m| m.as_str().parse::<u32>().ok());
241
242 return Some(CleanupEvent::Statistics {
243 processed,
244 rejected,
245 errors,
246 });
247 }
248 None
249 }
250}
251
252impl ComponentParser for CleanupParser {
253 fn parse(&self, message: &str) -> Result<ComponentEvent, ParseError> {
254 if let Some(event) = self.parse_message_id(message) {
257 return Ok(ComponentEvent::Cleanup(event));
258 }
259
260 if let Some(event) = self.parse_queue_file_warning(message) {
262 return Ok(ComponentEvent::Cleanup(event));
263 }
264
265 if let Some(event) = self.parse_message_size(message) {
267 return Ok(ComponentEvent::Cleanup(event));
268 }
269
270 if let Some(event) = self.parse_header_processing(message) {
272 return Ok(ComponentEvent::Cleanup(event));
273 }
274
275 if let Some(event) = self.parse_address_rewrite(message) {
277 return Ok(ComponentEvent::Cleanup(event));
278 }
279
280 if let Some(event) = self.parse_message_rewrite(message) {
282 return Ok(ComponentEvent::Cleanup(event));
283 }
284
285 if let Some(event) = self.parse_filter_action(message) {
287 return Ok(ComponentEvent::Cleanup(event));
288 }
289
290 if let Some(event) = self.parse_milter_interaction(message) {
292 return Ok(ComponentEvent::Cleanup(event));
293 }
294
295 if let Some(event) = self.parse_message_reject(message) {
297 return Ok(ComponentEvent::Cleanup(event));
298 }
299
300 if let Some(event) = self.parse_config_warning(message) {
302 return Ok(ComponentEvent::Cleanup(event));
303 }
304
305 if let Some(event) = self.parse_statistics(message) {
307 return Ok(ComponentEvent::Cleanup(event));
308 }
309
310 Ok(ComponentEvent::Cleanup(CleanupEvent::Other {
312 event_type: "unknown".to_string(),
313 message: message.to_string(),
314 queue_id: None, }))
316 }
317
318 fn component_name(&self) -> &'static str {
319 "cleanup"
320 }
321}
322
323impl Default for CleanupParser {
324 fn default() -> Self {
325 Self::new()
326 }
327}