postfix_log_parser/components/
qmgr.rs1use crate::components::ComponentParser;
7use crate::error::ParseError;
8use crate::events::{ComponentEvent, QmgrEvent};
9use crate::utils::common_fields::CommonFieldsParser;
10use crate::utils::queue_id::{create_queue_id_pattern, extract_queue_id};
11use lazy_static::lazy_static;
12use regex::Regex;
13
/// Parser for Postfix `qmgr` (queue manager) log messages.
///
/// The type carries no state; all patterns it uses are module-level statics.
pub struct QmgrParser;
20
// Regular expressions used by `QmgrParser`, compiled once on first use.
//
// Patterns containing the `{QUEUE_ID}` placeholder are passed through
// `create_queue_id_pattern` before compilation. The parsing methods read the
// queue id from capture group 1, so the placeholder presumably expands to a
// single capturing group -- confirm against `utils::queue_id`.
lazy_static! {
    // A message starting with `warning:`; captures the remaining text.
    // NOTE(review): not referenced by the parsing methods visible in this file.
    static ref CONFIG_WARNING_REGEX: Regex = Regex::new(
        r"^warning:\s+(.+)"
    ).unwrap();

    // `<id>: from=<addr>, size=<n>, nrcpt=<n> (queue active)`.
    // Groups after the queue id: sender, size, recipient count.
    static ref MESSAGE_ACTIVE_REGEX: Regex = Regex::new(
        &create_queue_id_pattern(r"^{QUEUE_ID}:\s+from=<([^>]*)>,\s+size=(\d+),\s+nrcpt=(\d+)\s+\(queue active\)")
    ).unwrap();

    // `<id>: removed`, optionally followed by a parenthesised reason.
    static ref MESSAGE_REMOVED_REGEX: Regex = Regex::new(
        &create_queue_id_pattern(r"^{QUEUE_ID}:\s+removed(?:\s+\((.+)\))?")
    ).unwrap();

    // `<id>: from=<addr>` followed by optional `to=`, `relay=`, `delay=`,
    // `delays=`, `dsn=` and `status=` fields, in that order. Only `from=` is
    // mandatory, so callers must check for the deferred status themselves.
    static ref MESSAGE_DEFERRED_REGEX: Regex = Regex::new(
        &create_queue_id_pattern(r"^{QUEUE_ID}:\s+from=<([^>]*)>(?:,\s+to=<([^>]*)>)?(?:,\s+relay=([^,]+))?(?:,\s+delay=([^,]+))?(?:,\s+delays=([^,]+))?(?:,\s+dsn=([^,]+))?(?:,\s+status=(.+))?")
    ).unwrap();

    // `<id>: from=<addr>, to=<addr>, relay=..., delay=...` with optional
    // `delays=` and `dsn=`, ending in `status=sent <text>`.
    static ref MESSAGE_SENT_REGEX: Regex = Regex::new(
        &create_queue_id_pattern(r"^{QUEUE_ID}:\s+from=<([^>]*)>,\s+to=<([^>]*)>,\s+relay=([^,]+),\s+delay=([^,]+)(?:,\s+delays=([^,]+))?(?:,\s+dsn=([^,]+))?,\s+status=sent\s+(.+)")
    ).unwrap();

    // `<id>: from=<addr>, to=<addr>, ... status=bounced (<reason>)` with an
    // optional trailing `dsn=`.
    // NOTE(review): not referenced by the parsing methods visible in this file.
    static ref MESSAGE_BOUNCED_REGEX: Regex = Regex::new(
        &create_queue_id_pattern(r"^{QUEUE_ID}:\s+from=<([^>]*)>,\s+to=<([^>]*)>,.*status=bounced\s+\((.+)\)(?:,\s+dsn=([^,\)]+))?")
    ).unwrap();

    // `statistics: active=<n> deferred=<n> hold=<n> incoming=<n> maildrop=<n>`.
    static ref QUEUE_STATS_REGEX: Regex = Regex::new(
        r"^statistics:\s+active=(\d+)\s+deferred=(\d+)\s+hold=(\d+)\s+incoming=(\d+)\s+maildrop=(\d+)"
    ).unwrap();

    // `transport <name>: <status text>`.
    static ref TRANSPORT_STATUS_REGEX: Regex = Regex::new(
        r"^transport\s+([^:]+):\s+(.+)"
    ).unwrap();

    // `warning: <text> (<number>) <text>`.
    // NOTE(review): not referenced by the parsing methods visible in this file.
    static ref RESOURCE_LIMIT_REGEX: Regex = Regex::new(
        r"^warning:\s+([^(]+)\s+\((\d+)\)\s+(.+)"
    ).unwrap();

    // `flush queue` / `flushing queue`, optionally followed by a queue name
    // and/or `: <n> messages`.
    static ref QUEUE_FLUSH_REGEX: Regex = Regex::new(
        r"^flush(?:ing)?\s+queue(?:\s+([^:]+))?(?::\s+(\d+)\s+messages)?"
    ).unwrap();
}
72
73impl QmgrParser {
    /// Creates a new parser. The parser holds no state, so this is free.
    pub fn new() -> Self {
        Self
    }
77
78 fn parse_configuration_warning(&self, message: &str) -> Option<QmgrEvent> {
81 if message.contains("qmgr_message_recipient_limit")
83 || message.contains("qmgr_message_active_limit")
84 || message.contains("process_limit")
85 || (message.contains("queue")
86 && (message.contains("limit") || message.contains("adjusting")))
87 {
88 let warning_type = if message.contains("qmgr_message_recipient_limit") {
90 "recipient_limit_adjustment".to_string()
91 } else if message.contains("qmgr_message_active_limit") {
92 "active_limit_warning".to_string()
93 } else if message.contains("process_limit") {
94 "process_limit_warning".to_string()
95 } else if message.contains("queue") {
96 "queue_warning".to_string()
97 } else {
98 "general_warning".to_string()
99 };
100
101 return Some(QmgrEvent::ConfigurationWarning {
102 warning_type,
103 message: message.to_string(),
104 });
105 }
106 None
107 }
108
109 fn parse_message_active(&self, message: &str) -> Option<QmgrEvent> {
111 if let Some(captures) = MESSAGE_ACTIVE_REGEX.captures(message) {
112 let queue_id = captures.get(1).unwrap().as_str().to_string();
113
114 let from = CommonFieldsParser::extract_from_email(message)
116 .map(|email| email.address)
117 .unwrap_or_else(|| captures.get(2).unwrap().as_str().to_string());
118
119 let size = CommonFieldsParser::extract_size(message).unwrap_or_else(|| {
121 captures
122 .get(3)
123 .unwrap()
124 .as_str()
125 .parse::<u64>()
126 .ok()
127 .unwrap_or(0)
128 });
129
130 let nrcpt = captures.get(4).unwrap().as_str().parse::<u32>().ok()?;
131
132 return Some(QmgrEvent::MessageActive {
133 queue_id,
134 from,
135 size,
136 nrcpt,
137 });
138 }
139 None
140 }
141
142 fn parse_message_removed(&self, message: &str) -> Option<QmgrEvent> {
144 if let Some(captures) = MESSAGE_REMOVED_REGEX.captures(message) {
145 let queue_id = captures.get(1).unwrap().as_str().to_string();
146 let reason = captures.get(2).map(|m| m.as_str().to_string());
147
148 return Some(QmgrEvent::MessageRemoved { queue_id, reason });
149 }
150 None
151 }
152
153 fn parse_message_skipped(&self, message: &str) -> Option<QmgrEvent> {
155 if message.contains("skipped") {
156 if let Some(queue_id_match) = message.split(':').next() {
158 let queue_id = queue_id_match.trim().to_string();
159
160 let reason = if message.contains("still being delivered") {
162 "still being delivered".to_string()
163 } else {
164 message
166 .split("skipped,")
167 .nth(1)
168 .map(|s| s.trim().to_string())
169 .unwrap_or_else(|| "unknown reason".to_string())
170 };
171
172 let status_details = if message.contains(",") {
174 Some(
175 message
176 .split(',')
177 .skip(1)
178 .collect::<Vec<&str>>()
179 .join(",")
180 .trim()
181 .to_string(),
182 )
183 } else {
184 None
185 };
186
187 return Some(QmgrEvent::MessageSkipped {
188 queue_id,
189 reason,
190 status_details,
191 });
192 }
193 }
194 None
195 }
196
197 fn parse_message_deferred(&self, message: &str) -> Option<QmgrEvent> {
199 if message.contains("status=deferred") {
200 if let Some(captures) = MESSAGE_DEFERRED_REGEX.captures(message) {
201 let queue_id = captures.get(1).unwrap().as_str().to_string();
202
203 let from = CommonFieldsParser::extract_from_email(message)
205 .map(|email| email.address)
206 .unwrap_or_else(|| captures.get(2).unwrap().as_str().to_string());
207
208 let to = CommonFieldsParser::extract_to_email(message)
209 .map(|email| email.address)
210 .or_else(|| captures.get(3).map(|m| m.as_str().to_string()));
211
212 let relay_info = CommonFieldsParser::extract_relay_info(message);
213 let relay = relay_info
214 .as_ref()
215 .map(|r| {
216 format!(
217 "{}[{}]:{}",
218 r.hostname,
219 r.ip.as_deref().unwrap_or(""),
220 r.port.map_or(25, |p| p)
221 )
222 })
223 .or_else(|| captures.get(4).map(|m| m.as_str().to_string()));
224
225 let delay_info = CommonFieldsParser::extract_delay_info(message);
226 let delay = delay_info
227 .as_ref()
228 .map(|d| d.total.to_string())
229 .unwrap_or_else(|| {
230 captures
231 .get(5)
232 .map(|m| m.as_str().to_string())
233 .unwrap_or_default()
234 });
235
236 let delays = delay_info
237 .as_ref()
238 .and_then(|d| d.breakdown.as_ref())
239 .map(|breakdown| {
240 format!(
241 "{}/{}/{}/{}",
242 breakdown[0], breakdown[1], breakdown[2], breakdown[3]
243 )
244 })
245 .or_else(|| captures.get(6).map(|m| m.as_str().to_string()));
246
247 let status_info = CommonFieldsParser::extract_status_info(message);
248 let dsn = status_info
249 .as_ref()
250 .and_then(|s| s.dsn.clone())
251 .or_else(|| captures.get(7).map(|m| m.as_str().to_string()));
252
253 let status = status_info
254 .as_ref()
255 .map(|s| s.status.clone())
256 .unwrap_or_else(|| {
257 captures
258 .get(8)
259 .map(|m| m.as_str().to_string())
260 .unwrap_or_default()
261 });
262
263 return Some(QmgrEvent::MessageDeferred {
264 queue_id,
265 from,
266 to,
267 relay,
268 delay,
269 delays,
270 dsn,
271 status,
272 });
273 }
274 }
275 None
276 }
277
278 fn parse_message_sent(&self, message: &str) -> Option<QmgrEvent> {
280 if let Some(captures) = MESSAGE_SENT_REGEX.captures(message) {
281 let queue_id = captures.get(1).unwrap().as_str().to_string();
282 let from = captures.get(2).unwrap().as_str().to_string();
283 let to = captures.get(3).unwrap().as_str().to_string();
284 let relay = captures.get(4).unwrap().as_str().to_string();
285 let delay = captures.get(5).unwrap().as_str().to_string();
286 let delays = captures.get(6).map(|m| m.as_str().to_string());
287 let dsn = captures.get(7).map(|m| m.as_str().to_string());
288 let status = captures.get(8).unwrap().as_str().to_string();
289
290 return Some(QmgrEvent::MessageSent {
291 queue_id,
292 from,
293 to,
294 relay,
295 delay,
296 delays,
297 dsn,
298 status,
299 });
300 }
301 None
302 }
303
304 fn parse_other_events(&self, message: &str) -> Option<QmgrEvent> {
306 if let Some(captures) = QUEUE_STATS_REGEX.captures(message) {
308 let active = captures.get(1).unwrap().as_str().parse::<u32>().ok();
309 let deferred = captures.get(2).unwrap().as_str().parse::<u32>().ok();
310 let hold = captures.get(3).unwrap().as_str().parse::<u32>().ok();
311 let incoming = captures.get(4).unwrap().as_str().parse::<u32>().ok();
312 let maildrop = captures.get(5).unwrap().as_str().parse::<u32>().ok();
313
314 return Some(QmgrEvent::QueueStats {
315 active,
316 deferred,
317 hold,
318 incoming,
319 maildrop,
320 });
321 }
322
323 if let Some(captures) = TRANSPORT_STATUS_REGEX.captures(message) {
325 let transport = captures.get(1).unwrap().as_str().to_string();
326 let status = captures.get(2).unwrap().as_str().to_string();
327
328 return Some(QmgrEvent::TransportStatus {
329 transport,
330 status,
331 details: None,
332 });
333 }
334
335 if let Some(captures) = QUEUE_FLUSH_REGEX.captures(message) {
337 let queue_name = captures.get(1).map(|m| m.as_str().to_string());
338 let message_count = captures.get(2).and_then(|m| m.as_str().parse::<u32>().ok());
339
340 return Some(QmgrEvent::QueueFlush {
341 queue_name,
342 message_count,
343 });
344 }
345
346 None
347 }
348}
349
350impl ComponentParser for QmgrParser {
351 fn parse(&self, message: &str) -> Result<ComponentEvent, ParseError> {
352 if let Some(event) = self.parse_configuration_warning(message) {
356 return Ok(ComponentEvent::Qmgr(event));
357 }
358
359 if let Some(event) = self.parse_message_active(message) {
361 return Ok(ComponentEvent::Qmgr(event));
362 }
363
364 if let Some(event) = self.parse_message_removed(message) {
366 return Ok(ComponentEvent::Qmgr(event));
367 }
368
369 if let Some(event) = self.parse_message_skipped(message) {
371 return Ok(ComponentEvent::Qmgr(event));
372 }
373
374 if let Some(event) = self.parse_message_deferred(message) {
376 return Ok(ComponentEvent::Qmgr(event));
377 }
378
379 if let Some(event) = self.parse_message_sent(message) {
381 return Ok(ComponentEvent::Qmgr(event));
382 }
383
384 if let Some(event) = self.parse_other_events(message) {
386 return Ok(ComponentEvent::Qmgr(event));
387 }
388
389 let queue_id = extract_queue_id(message);
391
392 Ok(ComponentEvent::Qmgr(QmgrEvent::Other {
393 event_type: "unclassified".to_string(),
394 message: message.to_string(),
395 queue_id,
396 }))
397 }
398
399 fn component_name(&self) -> &'static str {
400 "qmgr"
401 }
402}
403
/// `Default` delegates to [`QmgrParser::new`].
impl Default for QmgrParser {
    fn default() -> Self {
        Self::new()
    }
}