1use crate::components::ComponentParser;
4use crate::error::ParseError;
5use crate::events::smtp::SmtpEvent;
6use crate::events::ComponentEvent;
7use crate::utils::queue_id::{create_queue_id_pattern, extract_queue_id};
8use lazy_static::lazy_static;
9use regex::Regex;
10
/// Stateless parser for SMTP client log messages.
///
/// The type carries no data; all matching is done against the precompiled
/// regular expressions declared in this file. It derives `Debug`, `Clone` and
/// `Copy` so that it can be logged, embedded in `#[derive(Debug)]` types and
/// passed around by value.
#[derive(Debug, Clone, Copy)]
pub struct SmtpParser;
15
16lazy_static! {
17 static ref DELIVERY_SUCCESS_REGEX: Regex = Regex::new(
20 &create_queue_id_pattern(r"^{QUEUE_ID}: to=<([^>]+)>, relay=([^,]+), delay=([\d.]+), delays=([\d./]+), dsn=([\d.]+), status=sent \((.+)\)$")
21 ).unwrap();
22
23 static ref DELIVERY_BOUNCED_REGEX: Regex = Regex::new(
26 &create_queue_id_pattern(r"^{QUEUE_ID}: to=<([^>]+)>, relay=([^,]+), delay=([\d.]+), delays=([\d./]+), dsn=([\d.]+), status=bounced \((.+)\)$")
27 ).unwrap();
28
29 static ref DELIVERY_DEFERRED_REGEX: Regex = Regex::new(
32 &create_queue_id_pattern(r"^{QUEUE_ID}: (?:to=<([^>]+)>, )?(?:relay=([^,]+), )?(?:delay=([\d.]+), )?(?:delays=([\d./]+), )?(?:dsn=([\d.]+), )?status=deferred \((.+)\)$")
33 ).unwrap();
34
35 static ref CONNECTION_TIMEOUT_REGEX: Regex = Regex::new(
38 &create_queue_id_pattern(r"^{QUEUE_ID}: connect to ([^\[\]]+)\[([^\]]+)\]:(\d+): Connection timed out$")
39 ).unwrap();
40
41 static ref CONNECTION_REFUSED_REGEX: Regex = Regex::new(
44 &create_queue_id_pattern(r"^{QUEUE_ID}: connect to ([^\[\]]+)\[([^\]]+)\]:(\d+): Connection refused$")
45 ).unwrap();
46
47 static ref LOST_CONNECTION_REGEX: Regex = Regex::new(
50 &create_queue_id_pattern(r"^{QUEUE_ID}: lost connection with ([^\[\]]+)\[([^\]]+)\] while (.+)$")
51 ).unwrap();
52
53 static ref PROTOCOL_BREAKING_LINE_REGEX: Regex = Regex::new(
56 &create_queue_id_pattern(r"^{QUEUE_ID}: breaking line > (\d+) bytes with <CR><LF>SPACE$")
57 ).unwrap();
58
59 static ref PROTOCOL_PIPELINING_REGEX: Regex = Regex::new(
61 &create_queue_id_pattern(r"^{QUEUE_ID}: Using ESMTP PIPELINING, TCP send buffer size is (\d+), PIPELINING buffer size is (\d+)$")
62 ).unwrap();
63
64 static ref PROTOCOL_SERVER_FEATURES_REGEX: Regex = Regex::new(
66 r"^server features: (0x[0-9a-fA-F]+) size (\d+)$"
67 ).unwrap();
68
69 static ref RELAY_PARSER_REGEX: Regex = Regex::new(
71 r"^([^\[\]]+)(?:\[([^\]]+)\])?(?::(\d+))?$"
72 ).unwrap();
73
74 static ref DEBUG_VSTREAM_REGEX: Regex = Regex::new(
77 r"^vstream_buf_get_ready: fd (\d+) got (\d+)$"
78 ).unwrap();
79
80 static ref DEBUG_REC_GET_REGEX: Regex = Regex::new(
82 r"^rec_get: type ([A-Z]) len (\d+) data (.+)$"
83 ).unwrap();
84
85 static ref TLS_INIT_REGEX: Regex = Regex::new(
88 r"^initializing the client-side TLS engine$"
89 ).unwrap();
90
91 static ref TLS_SETUP_REGEX: Regex = Regex::new(
93 r"^setting up TLS connection to ([^\[\]]+)\[([^\]]+)\]:(\d+)$"
94 ).unwrap();
95
96 static ref TLS_CIPHER_REGEX: Regex = Regex::new(
98 r#"^([^\[\]]+)\[([^\]]+)\]:(\d+): TLS cipher list "(.+)"$"#
99 ).unwrap();
100
101 static ref SSL_CONNECT_REGEX: Regex = Regex::new(
103 r"^SSL_connect:(.+)$"
104 ).unwrap();
105
106 static ref TLS_CERT_VERIFY_REGEX: Regex = Regex::new(
109 r"^([^\[\]]+)\[([^\]]+)\]:(\d+): depth=(\d+) verify=(\d+) subject=(.+)$"
110 ).unwrap();
111
112 static ref TLS_CERT_INFO_REGEX: Regex = Regex::new(
114 r"^([^\[\]]+)\[([^\]]+)\]:(\d+): subject_CN=([^,]+), issuer_CN=([^,]+), fingerprint=([^,]+), pkey_fingerprint=(.+)$"
115 ).unwrap();
116
117 static ref TLS_CONNECTION_ESTABLISHED_REGEX: Regex = Regex::new(
119 r"^Untrusted TLS connection established to ([^\[\]]+)\[([^\]]+)\]:(\d+): (.+)$"
120 ).unwrap();
121}
122
123impl SmtpParser {
124 pub fn new() -> Self {
125 SmtpParser
126 }
127
128 fn parse_delays(
130 &self,
131 delays_str: &str,
132 ) -> (Option<f64>, Option<f64>, Option<f64>, Option<f64>) {
133 let parts: Vec<&str> = delays_str.split('/').collect();
134 if parts.len() != 4 {
135 return (None, None, None, None);
136 }
137
138 let before_queue = parts[0].parse::<f64>().ok();
139 let in_queue = parts[1].parse::<f64>().ok();
140 let connection = parts[2].parse::<f64>().ok();
141 let transmission = parts[3].parse::<f64>().ok();
142
143 (before_queue, in_queue, connection, transmission)
144 }
145
146 fn parse_relay(&self, relay_str: &str) -> (String, Option<String>, Option<u16>) {
148 if relay_str == "none" {
149 return ("none".to_string(), None, None);
150 }
151
152 if let Some(captures) = RELAY_PARSER_REGEX.captures(relay_str) {
153 let hostname = captures
154 .get(1)
155 .map(|m| m.as_str().to_string())
156 .unwrap_or_default();
157 let ip = captures.get(2).map(|m| m.as_str().to_string());
158 let port = captures.get(3).and_then(|m| m.as_str().parse::<u16>().ok());
159 return (hostname, ip, port);
160 }
161
162 (relay_str.to_string(), None, None)
164 }
165
166 fn parse_delivery_success(&self, message: &str) -> Option<SmtpEvent> {
168 if let Some(captures) = DELIVERY_SUCCESS_REGEX.captures(message) {
169 let queue_id = captures.get(1)?.as_str().to_string();
170 let to = captures.get(2)?.as_str().to_string();
171 let relay_str = captures.get(3)?.as_str();
172 let delay: f64 = captures.get(4)?.as_str().parse().ok()?;
173 let delays_str = captures.get(5)?.as_str();
174 let dsn = Some(captures.get(6)?.as_str().to_string());
175 let status = captures.get(7)?.as_str().to_string();
176
177 let (relay_hostname, relay_ip, relay_port) = self.parse_relay(relay_str);
178 let (delay_before_queue, delay_in_queue, delay_connection, delay_transmission) =
179 self.parse_delays(delays_str);
180
181 return Some(SmtpEvent::Sent {
182 queue_id,
183 to,
184 relay_hostname,
185 relay_ip,
186 relay_port,
187 delay,
188 delay_before_queue,
189 delay_in_queue,
190 delay_connection,
191 delay_transmission,
192 dsn,
193 status,
194 message_size: None,
195 });
196 }
197 None
198 }
199
200 fn parse_delivery_bounced(&self, message: &str) -> Option<SmtpEvent> {
202 if let Some(captures) = DELIVERY_BOUNCED_REGEX.captures(message) {
203 let queue_id = captures.get(1)?.as_str().to_string();
204 let to = captures.get(2)?.as_str().to_string();
205 let relay_str = captures.get(3)?.as_str();
206 let delay: f64 = captures.get(4)?.as_str().parse().ok()?;
207 let delays_str = captures.get(5)?.as_str();
208 let dsn = Some(captures.get(6)?.as_str().to_string());
209 let bounce_reason = captures.get(7)?.as_str().to_string();
210
211 let (relay_hostname, relay_ip, relay_port) = self.parse_relay(relay_str);
212 let (delay_before_queue, delay_in_queue, delay_connection, delay_transmission) =
213 self.parse_delays(delays_str);
214
215 return Some(SmtpEvent::Bounced {
216 queue_id,
217 to,
218 relay_hostname: if relay_hostname == "none" {
219 None
220 } else {
221 Some(relay_hostname)
222 },
223 relay_ip,
224 relay_port,
225 delay: Some(delay),
226 delay_before_queue,
227 delay_in_queue,
228 delay_connection,
229 delay_transmission,
230 dsn,
231 status: "bounced".to_string(),
232 bounce_reason,
233 });
234 }
235 None
236 }
237
238 fn parse_delivery_deferred(&self, message: &str) -> Option<SmtpEvent> {
240 if let Some(captures) = DELIVERY_DEFERRED_REGEX.captures(message) {
241 let queue_id = captures.get(1)?.as_str().to_string();
242 let to = captures.get(2).map(|m| m.as_str().to_string());
243 let relay_str = captures.get(3).map(|m| m.as_str()).unwrap_or("none");
244 let delay = captures.get(4).and_then(|m| m.as_str().parse::<f64>().ok());
245 let delays_str = captures.get(5).map(|m| m.as_str()).unwrap_or("0/0/0/0");
246 let dsn = captures.get(6).map(|m| m.as_str().to_string());
247 let defer_reason = captures.get(7)?.as_str().to_string();
248
249 let (relay_hostname, relay_ip, relay_port) = self.parse_relay(relay_str);
250 let (delay_before_queue, delay_in_queue, delay_connection, delay_transmission) =
251 self.parse_delays(delays_str);
252
253 return Some(SmtpEvent::Deferred {
254 queue_id,
255 to,
256 relay_hostname: if relay_hostname == "none" {
257 None
258 } else {
259 Some(relay_hostname)
260 },
261 relay_ip,
262 relay_port,
263 delay,
264 delay_before_queue,
265 delay_in_queue,
266 delay_connection,
267 delay_transmission,
268 dsn,
269 status: "deferred".to_string(),
270 defer_reason,
271 });
272 }
273 None
274 }
275
276 fn parse_connection_timeout(&self, message: &str) -> Option<SmtpEvent> {
278 if let Some(captures) = CONNECTION_TIMEOUT_REGEX.captures(message) {
279 let queue_id = captures.get(1)?.as_str().to_string();
280 let target_hostname = captures.get(2)?.as_str().to_string();
281 let target_ip = captures.get(3)?.as_str().to_string();
282 let target_port: u16 = captures.get(4)?.as_str().parse().ok()?;
283
284 return Some(SmtpEvent::ConnectionTimeout {
285 queue_id,
286 target_hostname,
287 target_ip,
288 target_port,
289 timeout_duration: None,
290 });
291 }
292 None
293 }
294
295 fn parse_connection_refused(&self, message: &str) -> Option<SmtpEvent> {
297 if let Some(captures) = CONNECTION_REFUSED_REGEX.captures(message) {
298 let queue_id = captures.get(1)?.as_str().to_string();
299 let target_hostname = captures.get(2)?.as_str().to_string();
300 let target_ip = captures.get(3)?.as_str().to_string();
301 let target_port: u16 = captures.get(4)?.as_str().parse().ok()?;
302
303 return Some(SmtpEvent::ConnectionRefused {
304 queue_id,
305 target_hostname,
306 target_ip,
307 target_port,
308 });
309 }
310 None
311 }
312
313 fn parse_connection_lost(&self, message: &str) -> Option<SmtpEvent> {
315 if let Some(captures) = LOST_CONNECTION_REGEX.captures(message) {
316 let queue_id = captures.get(1)?.as_str().to_string();
317 let target_hostname = captures.get(2)?.as_str().to_string();
318 let target_ip = captures.get(3)?.as_str().to_string();
319 let lost_stage = captures.get(4)?.as_str().to_string();
320
321 return Some(SmtpEvent::ConnectionLost {
322 queue_id,
323 target_hostname,
324 target_ip,
325 lost_stage,
326 });
327 }
328 None
329 }
330
331 fn parse_protocol_interaction(&self, message: &str) -> Option<SmtpEvent> {
333 if let Some(captures) = PROTOCOL_BREAKING_LINE_REGEX.captures(message) {
335 let queue_id = captures.get(1)?.as_str().to_string();
336 let line_length = captures.get(2)?.as_str();
337
338 return Some(SmtpEvent::ProtocolInteraction {
339 queue_id,
340 interaction_type: "breaking_line".to_string(),
341 details: format!("Breaking line > {} bytes with <CR><LF>SPACE", line_length),
342 });
343 }
344
345 if let Some(captures) = PROTOCOL_PIPELINING_REGEX.captures(message) {
347 let queue_id = captures.get(1)?.as_str().to_string();
348 let send_buffer = captures.get(2)?.as_str();
349 let pipeline_buffer = captures.get(3)?.as_str();
350
351 return Some(SmtpEvent::ProtocolInteraction {
352 queue_id,
353 interaction_type: "esmtp_pipelining".to_string(),
354 details: format!(
355 "TCP send buffer: {}, PIPELINING buffer: {}",
356 send_buffer, pipeline_buffer
357 ),
358 });
359 }
360
361 if let Some(captures) = PROTOCOL_SERVER_FEATURES_REGEX.captures(message) {
363 let features = captures.get(1)?.as_str();
364 let max_size = captures.get(2)?.as_str();
365
366 return Some(SmtpEvent::ProtocolInteraction {
367 queue_id: "".to_string(), interaction_type: "server_features".to_string(),
369 details: format!("Features: {}, Max size: {}", features, max_size),
370 });
371 }
372
373 None
374 }
375
376 fn parse_debug_event(&self, message: &str) -> Option<SmtpEvent> {
378 if let Some(captures) = DEBUG_VSTREAM_REGEX.captures(message) {
380 let fd = captures.get(1)?.as_str();
381 let bytes = captures.get(2)?.as_str();
382
383 return Some(SmtpEvent::Other {
384 queue_id: None,
385 event_type: "debug_vstream".to_string(),
386 message: format!("vstream_buf_get_ready: fd {} got {} bytes", fd, bytes),
387 });
388 }
389
390 if let Some(captures) = DEBUG_REC_GET_REGEX.captures(message) {
392 let record_type = captures.get(1)?.as_str();
393 let length = captures.get(2)?.as_str();
394 let data = captures.get(3)?.as_str();
395
396 return Some(SmtpEvent::Other {
397 queue_id: None,
398 event_type: "debug_rec_get".to_string(),
399 message: format!("rec_get: type {} len {} data {}", record_type, length, data),
400 });
401 }
402
403 None
404 }
405
406 fn parse_tls_event(&self, message: &str) -> Option<SmtpEvent> {
408 if TLS_INIT_REGEX.is_match(message) {
410 return Some(SmtpEvent::Other {
411 queue_id: None,
412 event_type: "tls_init".to_string(),
413 message: "Initializing client-side TLS engine".to_string(),
414 });
415 }
416
417 if let Some(captures) = TLS_SETUP_REGEX.captures(message) {
419 let hostname = captures.get(1)?.as_str();
420 let ip = captures.get(2)?.as_str();
421 let port = captures.get(3)?.as_str();
422
423 return Some(SmtpEvent::Other {
424 queue_id: None,
425 event_type: "tls_setup".to_string(),
426 message: format!("Setting up TLS connection to {}[{}]:{}", hostname, ip, port),
427 });
428 }
429
430 if let Some(captures) = TLS_CIPHER_REGEX.captures(message) {
432 let hostname = captures.get(1)?.as_str();
433 let ip = captures.get(2)?.as_str();
434 let port = captures.get(3)?.as_str();
435 let cipher_list = captures.get(4)?.as_str();
436
437 return Some(SmtpEvent::Other {
438 queue_id: None,
439 event_type: "tls_cipher".to_string(),
440 message: format!(
441 "TLS cipher list for {}[{}]:{}: {}",
442 hostname, ip, port, cipher_list
443 ),
444 });
445 }
446
447 if let Some(captures) = SSL_CONNECT_REGEX.captures(message) {
449 let state = captures.get(1)?.as_str();
450
451 return Some(SmtpEvent::Other {
452 queue_id: None,
453 event_type: "ssl_connect".to_string(),
454 message: format!("SSL_connect:{}", state),
455 });
456 }
457
458 if let Some(captures) = TLS_CERT_VERIFY_REGEX.captures(message) {
460 let hostname = captures.get(1)?.as_str();
461 let ip = captures.get(2)?.as_str();
462 let port = captures.get(3)?.as_str();
463 let depth = captures.get(4)?.as_str();
464 let verify = captures.get(5)?.as_str();
465 let subject = captures.get(6)?.as_str();
466
467 return Some(SmtpEvent::Other {
468 queue_id: None,
469 event_type: "tls_cert_verify".to_string(),
470 message: format!(
471 "Certificate verification for {}[{}]:{}: depth={}, verify={}, subject={}",
472 hostname, ip, port, depth, verify, subject
473 ),
474 });
475 }
476
477 if let Some(captures) = TLS_CERT_INFO_REGEX.captures(message) {
479 let hostname = captures.get(1)?.as_str();
480 let ip = captures.get(2)?.as_str();
481 let port = captures.get(3)?.as_str();
482 let subject_cn = captures.get(4)?.as_str();
483 let issuer_cn = captures.get(5)?.as_str();
484
485 return Some(SmtpEvent::Other {
486 queue_id: None,
487 event_type: "tls_cert_info".to_string(),
488 message: format!(
489 "Certificate info for {}[{}]:{}: subject_CN={}, issuer_CN={}",
490 hostname, ip, port, subject_cn, issuer_cn
491 ),
492 });
493 }
494
495 if let Some(captures) = TLS_CONNECTION_ESTABLISHED_REGEX.captures(message) {
497 let hostname = captures.get(1)?.as_str();
498 let ip = captures.get(2)?.as_str();
499 let port = captures.get(3)?.as_str();
500 let connection_details = captures.get(4)?.as_str();
501
502 return Some(SmtpEvent::Other {
503 queue_id: None,
504 event_type: "tls_connection_established".to_string(),
505 message: format!(
506 "TLS connection established to {}[{}]:{}: {}",
507 hostname, ip, port, connection_details
508 ),
509 });
510 }
511
512 None
513 }
514}
515
516impl ComponentParser for SmtpParser {
517 fn parse(&self, message: &str) -> Result<ComponentEvent, ParseError> {
518 if let Some(event) = self.parse_delivery_success(message) {
522 return Ok(ComponentEvent::Smtp(event));
523 }
524
525 if let Some(event) = self.parse_delivery_bounced(message) {
527 return Ok(ComponentEvent::Smtp(event));
528 }
529
530 if let Some(event) = self.parse_delivery_deferred(message) {
532 return Ok(ComponentEvent::Smtp(event));
533 }
534
535 if let Some(event) = self.parse_connection_timeout(message) {
537 return Ok(ComponentEvent::Smtp(event));
538 }
539
540 if let Some(event) = self.parse_connection_refused(message) {
542 return Ok(ComponentEvent::Smtp(event));
543 }
544
545 if let Some(event) = self.parse_connection_lost(message) {
547 return Ok(ComponentEvent::Smtp(event));
548 }
549
550 if let Some(event) = self.parse_protocol_interaction(message) {
552 return Ok(ComponentEvent::Smtp(event));
553 }
554
555 if let Some(event) = self.parse_debug_event(message) {
557 return Ok(ComponentEvent::Smtp(event));
558 }
559
560 if let Some(event) = self.parse_tls_event(message) {
562 return Ok(ComponentEvent::Smtp(event));
563 }
564
565 let queue_id = extract_queue_id(message);
567
568 Ok(ComponentEvent::Smtp(SmtpEvent::Other {
569 queue_id,
570 event_type: "unclassified".to_string(),
571 message: message.to_string(),
572 }))
573 }
574
575 fn component_name(&self) -> &'static str {
576 "smtp"
577 }
578}
579
580impl Default for SmtpParser {
581 fn default() -> Self {
582 Self::new()
583 }
584}