postfix_log_parser/components/
qmgr.rs1use crate::components::ComponentParser;
7use crate::error::ParseError;
8use crate::events::{ComponentEvent, QmgrEvent};
9use crate::utils::queue_id::{create_queue_id_pattern, extract_queue_id};
10use lazy_static::lazy_static;
11use regex::Regex;
12
/// Parser for log messages emitted by the Postfix `qmgr` component.
///
/// The type is a stateless unit struct: all matching is done against the
/// module-level regular expressions, so values are free to copy and compare.
#[derive(Debug, Clone, Copy)]
pub struct QmgrParser;
19
20lazy_static! {
21 static ref CONFIG_WARNING_REGEX: Regex = Regex::new(
24 r"^warning:\s+(.+)"
25 ).unwrap();
26
27 static ref MESSAGE_ACTIVE_REGEX: Regex = Regex::new(
30 &create_queue_id_pattern(r"^{QUEUE_ID}:\s+from=<([^>]*)>,\s+size=(\d+),\s+nrcpt=(\d+)\s+\(queue active\)")
31 ).unwrap();
32
33 static ref MESSAGE_REMOVED_REGEX: Regex = Regex::new(
36 &create_queue_id_pattern(r"^{QUEUE_ID}:\s+removed(?:\s+\((.+)\))?")
37 ).unwrap();
38
39 static ref MESSAGE_DEFERRED_REGEX: Regex = Regex::new(
42 &create_queue_id_pattern(r"^{QUEUE_ID}:\s+from=<([^>]*)>(?:,\s+to=<([^>]*)>)?(?:,\s+relay=([^,]+))?(?:,\s+delay=([^,]+))?(?:,\s+delays=([^,]+))?(?:,\s+dsn=([^,]+))?(?:,\s+status=(.+))?")
43 ).unwrap();
44
45 static ref MESSAGE_SENT_REGEX: Regex = Regex::new(
48 &create_queue_id_pattern(r"^{QUEUE_ID}:\s+from=<([^>]*)>,\s+to=<([^>]*)>,\s+relay=([^,]+),\s+delay=([^,]+)(?:,\s+delays=([^,]+))?(?:,\s+dsn=([^,]+))?,\s+status=sent\s+(.+)")
49 ).unwrap();
50
51 static ref MESSAGE_BOUNCED_REGEX: Regex = Regex::new(
54 &create_queue_id_pattern(r"^{QUEUE_ID}:\s+from=<([^>]*)>,\s+to=<([^>]*)>,.*status=bounced\s+\((.+)\)(?:,\s+dsn=([^,\)]+))?")
55 ).unwrap();
56
57 static ref QUEUE_STATS_REGEX: Regex = Regex::new(
60 r"^statistics:\s+active=(\d+)\s+deferred=(\d+)\s+hold=(\d+)\s+incoming=(\d+)\s+maildrop=(\d+)"
61 ).unwrap();
62
63 static ref TRANSPORT_STATUS_REGEX: Regex = Regex::new(
66 r"^transport\s+([^:]+):\s+(.+)"
67 ).unwrap();
68
69 static ref RESOURCE_LIMIT_REGEX: Regex = Regex::new(
72 r"^warning:\s+([^(]+)\s+\((\d+)\)\s+(.+)"
73 ).unwrap();
74
75 static ref QUEUE_FLUSH_REGEX: Regex = Regex::new(
78 r"^flush(?:ing)?\s+queue(?:\s+([^:]+))?(?::\s+(\d+)\s+messages)?"
79 ).unwrap();
80}
81
82impl QmgrParser {
83 pub fn new() -> Self {
84 Self
85 }
86
87 fn parse_configuration_warning(&self, message: &str) -> Option<QmgrEvent> {
90 if message.contains("qmgr_message_recipient_limit")
92 || message.contains("qmgr_message_active_limit")
93 || message.contains("process_limit")
94 || (message.contains("queue")
95 && (message.contains("limit") || message.contains("adjusting")))
96 {
97 let warning_type = if message.contains("qmgr_message_recipient_limit") {
99 "recipient_limit_adjustment".to_string()
100 } else if message.contains("qmgr_message_active_limit") {
101 "active_limit_warning".to_string()
102 } else if message.contains("process_limit") {
103 "process_limit_warning".to_string()
104 } else if message.contains("queue") {
105 "queue_warning".to_string()
106 } else {
107 "general_warning".to_string()
108 };
109
110 return Some(QmgrEvent::ConfigurationWarning {
111 warning_type,
112 message: message.to_string(),
113 });
114 }
115 None
116 }
117
118 fn parse_message_active(&self, message: &str) -> Option<QmgrEvent> {
120 if let Some(captures) = MESSAGE_ACTIVE_REGEX.captures(message) {
121 let queue_id = captures.get(1).unwrap().as_str().to_string();
122 let from = captures.get(2).unwrap().as_str().to_string();
123 let size = captures.get(3).unwrap().as_str().parse::<u64>().ok()?;
124 let nrcpt = captures.get(4).unwrap().as_str().parse::<u32>().ok()?;
125
126 return Some(QmgrEvent::MessageActive {
127 queue_id,
128 from,
129 size,
130 nrcpt,
131 });
132 }
133 None
134 }
135
136 fn parse_message_removed(&self, message: &str) -> Option<QmgrEvent> {
138 if let Some(captures) = MESSAGE_REMOVED_REGEX.captures(message) {
139 let queue_id = captures.get(1).unwrap().as_str().to_string();
140 let reason = captures.get(2).map(|m| m.as_str().to_string());
141
142 return Some(QmgrEvent::MessageRemoved { queue_id, reason });
143 }
144 None
145 }
146
147 fn parse_message_skipped(&self, message: &str) -> Option<QmgrEvent> {
149 if message.contains("skipped") {
150 if let Some(queue_id_match) = message.split(':').next() {
152 let queue_id = queue_id_match.trim().to_string();
153
154 let reason = if message.contains("still being delivered") {
156 "still being delivered".to_string()
157 } else {
158 message
160 .split("skipped,")
161 .nth(1)
162 .map(|s| s.trim().to_string())
163 .unwrap_or_else(|| "unknown reason".to_string())
164 };
165
166 let status_details = if message.contains(",") {
168 Some(
169 message
170 .split(',')
171 .skip(1)
172 .collect::<Vec<&str>>()
173 .join(",")
174 .trim()
175 .to_string(),
176 )
177 } else {
178 None
179 };
180
181 return Some(QmgrEvent::MessageSkipped {
182 queue_id,
183 reason,
184 status_details,
185 });
186 }
187 }
188 None
189 }
190
191 fn parse_message_deferred(&self, message: &str) -> Option<QmgrEvent> {
193 if message.contains("status=deferred") {
194 if let Some(captures) = MESSAGE_DEFERRED_REGEX.captures(message) {
195 let queue_id = captures.get(1).unwrap().as_str().to_string();
196 let from = captures.get(2).unwrap().as_str().to_string();
197 let to = captures.get(3).map(|m| m.as_str().to_string());
198 let relay = captures.get(4).map(|m| m.as_str().to_string());
199 let delay = captures
200 .get(5)
201 .map(|m| m.as_str().to_string())
202 .unwrap_or_default();
203 let delays = captures.get(6).map(|m| m.as_str().to_string());
204 let dsn = captures.get(7).map(|m| m.as_str().to_string());
205 let status = captures
206 .get(8)
207 .map(|m| m.as_str().to_string())
208 .unwrap_or_default();
209
210 return Some(QmgrEvent::MessageDeferred {
211 queue_id,
212 from,
213 to,
214 relay,
215 delay,
216 delays,
217 dsn,
218 status,
219 });
220 }
221 }
222 None
223 }
224
225 fn parse_message_sent(&self, message: &str) -> Option<QmgrEvent> {
227 if let Some(captures) = MESSAGE_SENT_REGEX.captures(message) {
228 let queue_id = captures.get(1).unwrap().as_str().to_string();
229 let from = captures.get(2).unwrap().as_str().to_string();
230 let to = captures.get(3).unwrap().as_str().to_string();
231 let relay = captures.get(4).unwrap().as_str().to_string();
232 let delay = captures.get(5).unwrap().as_str().to_string();
233 let delays = captures.get(6).map(|m| m.as_str().to_string());
234 let dsn = captures.get(7).map(|m| m.as_str().to_string());
235 let status = captures.get(8).unwrap().as_str().to_string();
236
237 return Some(QmgrEvent::MessageSent {
238 queue_id,
239 from,
240 to,
241 relay,
242 delay,
243 delays,
244 dsn,
245 status,
246 });
247 }
248 None
249 }
250
251 fn parse_other_events(&self, message: &str) -> Option<QmgrEvent> {
253 if let Some(captures) = QUEUE_STATS_REGEX.captures(message) {
255 let active = captures.get(1).unwrap().as_str().parse::<u32>().ok();
256 let deferred = captures.get(2).unwrap().as_str().parse::<u32>().ok();
257 let hold = captures.get(3).unwrap().as_str().parse::<u32>().ok();
258 let incoming = captures.get(4).unwrap().as_str().parse::<u32>().ok();
259 let maildrop = captures.get(5).unwrap().as_str().parse::<u32>().ok();
260
261 return Some(QmgrEvent::QueueStats {
262 active,
263 deferred,
264 hold,
265 incoming,
266 maildrop,
267 });
268 }
269
270 if let Some(captures) = TRANSPORT_STATUS_REGEX.captures(message) {
272 let transport = captures.get(1).unwrap().as_str().to_string();
273 let status = captures.get(2).unwrap().as_str().to_string();
274
275 return Some(QmgrEvent::TransportStatus {
276 transport,
277 status,
278 details: None,
279 });
280 }
281
282 if let Some(captures) = QUEUE_FLUSH_REGEX.captures(message) {
284 let queue_name = captures.get(1).map(|m| m.as_str().to_string());
285 let message_count = captures.get(2).and_then(|m| m.as_str().parse::<u32>().ok());
286
287 return Some(QmgrEvent::QueueFlush {
288 queue_name,
289 message_count,
290 });
291 }
292
293 None
294 }
295}
296
297impl ComponentParser for QmgrParser {
298 fn parse(&self, message: &str) -> Result<ComponentEvent, ParseError> {
299 if let Some(event) = self.parse_configuration_warning(message) {
303 return Ok(ComponentEvent::Qmgr(event));
304 }
305
306 if let Some(event) = self.parse_message_active(message) {
308 return Ok(ComponentEvent::Qmgr(event));
309 }
310
311 if let Some(event) = self.parse_message_removed(message) {
313 return Ok(ComponentEvent::Qmgr(event));
314 }
315
316 if let Some(event) = self.parse_message_skipped(message) {
318 return Ok(ComponentEvent::Qmgr(event));
319 }
320
321 if let Some(event) = self.parse_message_deferred(message) {
323 return Ok(ComponentEvent::Qmgr(event));
324 }
325
326 if let Some(event) = self.parse_message_sent(message) {
328 return Ok(ComponentEvent::Qmgr(event));
329 }
330
331 if let Some(event) = self.parse_other_events(message) {
333 return Ok(ComponentEvent::Qmgr(event));
334 }
335
336 let queue_id = extract_queue_id(message);
338
339 Ok(ComponentEvent::Qmgr(QmgrEvent::Other {
340 event_type: "unclassified".to_string(),
341 message: message.to_string(),
342 queue_id,
343 }))
344 }
345
346 fn component_name(&self) -> &'static str {
347 "qmgr"
348 }
349}
350
351impl Default for QmgrParser {
352 fn default() -> Self {
353 Self::new()
354 }
355}