1use crate::components::ComponentParser;
51use crate::error::ParseError;
52use crate::events::smtp::SmtpEvent;
53use crate::events::ComponentEvent;
54use crate::utils::common_fields::CommonFieldsParser;
55use crate::utils::queue_id::{create_queue_id_pattern, extract_queue_id};
56use lazy_static::lazy_static;
57use regex::Regex;
58
59pub struct SmtpParser;
63
64lazy_static! {
65 static ref DELIVERY_SUCCESS_REGEX: Regex = Regex::new(
67 &create_queue_id_pattern(r"^{QUEUE_ID}: to=<([^>]+)>, relay=([^,]+), delay=([\d.]+), delays=([\d./]+), dsn=([\d.]+), status=sent \((.+)\)$")
68 ).unwrap();
69
70 static ref DELIVERY_BOUNCED_REGEX: Regex = Regex::new(
72 &create_queue_id_pattern(r"^{QUEUE_ID}: to=<([^>]+)>, relay=([^,]+), delay=([\d.]+), delays=([\d./]+), dsn=([\d.]+), status=bounced \((.+)\)$")
73 ).unwrap();
74
75 static ref DELIVERY_DEFERRED_REGEX: Regex = Regex::new(
77 &create_queue_id_pattern(r"^{QUEUE_ID}: (?:to=<([^>]+)>, )?(?:relay=([^,]+), )?(?:delay=([\d.]+), )?(?:delays=([\d./]+), )?(?:dsn=([\d.]+), )?status=deferred \((.+)\)$")
78 ).unwrap();
79
80 static ref CONNECTION_TIMEOUT_REGEX: Regex = Regex::new(
82 &create_queue_id_pattern(r"^{QUEUE_ID}: connect to ([^\[\]]+)\[([^\]]+)\]:(\d+): Connection timed out$")
83 ).unwrap();
84
85 static ref CONNECTION_REFUSED_REGEX: Regex = Regex::new(
87 &create_queue_id_pattern(r"^{QUEUE_ID}: connect to ([^\[\]]+)\[([^\]]+)\]:(\d+): Connection refused$")
88 ).unwrap();
89
90 static ref LOST_CONNECTION_REGEX: Regex = Regex::new(
92 &create_queue_id_pattern(r"^{QUEUE_ID}: lost connection with ([^\[\]]+)\[([^\]]+)\] while (.+)$")
93 ).unwrap();
94
95 static ref PROTOCOL_BREAKING_LINE_REGEX: Regex = Regex::new(
97 &create_queue_id_pattern(r"^{QUEUE_ID}: breaking line > (\d+) bytes with <CR><LF>SPACE$")
98 ).unwrap();
99
100 static ref PROTOCOL_PIPELINING_REGEX: Regex = Regex::new(
101 &create_queue_id_pattern(r"^{QUEUE_ID}: Using ESMTP PIPELINING, TCP send buffer size is (\d+), PIPELINING buffer size is (\d+)$")
102 ).unwrap();
103
104 static ref PROTOCOL_SERVER_FEATURES_REGEX: Regex = Regex::new(
105 r"^server features: (0x[0-9a-fA-F]+) size (\d+)$"
106 ).unwrap();
107
108 static ref DEBUG_VSTREAM_REGEX: Regex = Regex::new(
112 r"^vstream_buf_get_ready: fd (\d+) got (\d+)$"
113 ).unwrap();
114
115 static ref DEBUG_REC_GET_REGEX: Regex = Regex::new(
116 r"^rec_get: type ([A-Z]) len (\d+) data (.+)$"
117 ).unwrap();
118
119 static ref TLS_INIT_REGEX: Regex = Regex::new(
121 r"^initializing the client-side TLS engine$"
122 ).unwrap();
123
124 static ref TLS_SETUP_REGEX: Regex = Regex::new(
125 r"^setting up TLS connection to ([^\[\]]+)\[([^\]]+)\]:(\d+)$"
126 ).unwrap();
127
128 static ref TLS_CIPHER_REGEX: Regex = Regex::new(
129 r#"^([^\[\]]+)\[([^\]]+)\]:(\d+): TLS cipher list "(.+)"$"#
130 ).unwrap();
131
132 static ref SSL_CONNECT_REGEX: Regex = Regex::new(
133 r"^SSL_connect:(.+)$"
134 ).unwrap();
135
136 static ref TLS_CERT_VERIFY_REGEX: Regex = Regex::new(
138 r"^([^\[\]]+)\[([^\]]+)\]:(\d+): depth=(\d+) verify=(\d+) subject=(.+)$"
139 ).unwrap();
140
141 static ref TLS_CERT_INFO_REGEX: Regex = Regex::new(
142 r"^([^\[\]]+)\[([^\]]+)\]:(\d+): subject_CN=([^,]+), issuer_CN=([^,]+), fingerprint=([^,]+), pkey_fingerprint=(.+)$"
143 ).unwrap();
144
145 static ref TLS_CONNECTION_ESTABLISHED_REGEX: Regex = Regex::new(
146 r"^Untrusted TLS connection established to ([^\[\]]+)\[([^\]]+)\]:(\d+): (.+)$"
147 ).unwrap();
148}
149
150impl SmtpParser {
151 pub fn new() -> Self {
152 SmtpParser
153 }
154
155 fn parse_delays(
157 &self,
158 delays_str: &str,
159 ) -> (Option<f64>, Option<f64>, Option<f64>, Option<f64>) {
160 let parts: Vec<&str> = delays_str.split('/').collect();
161 if parts.len() != 4 {
162 return (None, None, None, None);
163 }
164
165 let before_queue = parts[0].parse::<f64>().ok();
166 let in_queue = parts[1].parse::<f64>().ok();
167 let connection = parts[2].parse::<f64>().ok();
168 let transmission = parts[3].parse::<f64>().ok();
169
170 (before_queue, in_queue, connection, transmission)
171 }
172
173 fn parse_relay(&self, relay_str: &str) -> (String, Option<String>, Option<u16>) {
175 if relay_str == "none" {
176 return ("none".to_string(), None, None);
177 }
178
179 let full_relay = format!("relay={}", relay_str);
181 if let Some(relay_info) = CommonFieldsParser::extract_relay_info(&full_relay) {
182 return (relay_info.hostname, relay_info.ip, relay_info.port);
183 }
184
185 (relay_str.to_string(), None, None)
187 }
188
189 fn parse_delivery_success(&self, message: &str) -> Option<SmtpEvent> {
191 if let Some(captures) = DELIVERY_SUCCESS_REGEX.captures(message) {
192 let queue_id = captures.get(1)?.as_str().to_string();
193 let to = captures.get(2)?.as_str().to_string();
194 let relay_str = captures.get(3)?.as_str();
195 let delay: f64 = captures.get(4)?.as_str().parse().ok()?;
196 let delays_str = captures.get(5)?.as_str();
197 let dsn = Some(captures.get(6)?.as_str().to_string());
198 let status = captures.get(7)?.as_str().to_string();
199
200 let (relay_hostname, relay_ip, relay_port) = self.parse_relay(relay_str);
201 let (delay_before_queue, delay_in_queue, delay_connection, delay_transmission) =
202 self.parse_delays(delays_str);
203
204 return Some(SmtpEvent::Sent {
205 queue_id,
206 to,
207 relay_hostname,
208 relay_ip,
209 relay_port,
210 delay,
211 delay_before_queue,
212 delay_in_queue,
213 delay_connection,
214 delay_transmission,
215 dsn,
216 status,
217 message_size: None,
218 });
219 }
220 None
221 }
222
223 fn parse_delivery_bounced(&self, message: &str) -> Option<SmtpEvent> {
225 if let Some(captures) = DELIVERY_BOUNCED_REGEX.captures(message) {
226 let queue_id = captures.get(1)?.as_str().to_string();
227 let to = captures.get(2)?.as_str().to_string();
228 let relay_str = captures.get(3)?.as_str();
229 let delay: f64 = captures.get(4)?.as_str().parse().ok()?;
230 let delays_str = captures.get(5)?.as_str();
231 let dsn = Some(captures.get(6)?.as_str().to_string());
232 let bounce_reason = captures.get(7)?.as_str().to_string();
233
234 let (relay_hostname, relay_ip, relay_port) = self.parse_relay(relay_str);
235 let (delay_before_queue, delay_in_queue, delay_connection, delay_transmission) =
236 self.parse_delays(delays_str);
237
238 return Some(SmtpEvent::Bounced {
239 queue_id,
240 to,
241 relay_hostname: if relay_hostname == "none" {
242 None
243 } else {
244 Some(relay_hostname)
245 },
246 relay_ip,
247 relay_port,
248 delay: Some(delay),
249 delay_before_queue,
250 delay_in_queue,
251 delay_connection,
252 delay_transmission,
253 dsn,
254 status: "bounced".to_string(),
255 bounce_reason,
256 });
257 }
258 None
259 }
260
261 fn parse_delivery_deferred(&self, message: &str) -> Option<SmtpEvent> {
263 if let Some(captures) = DELIVERY_DEFERRED_REGEX.captures(message) {
264 let queue_id = captures.get(1)?.as_str().to_string();
265 let to = captures.get(2).map(|m| m.as_str().to_string());
266 let relay_str = captures.get(3).map(|m| m.as_str()).unwrap_or("none");
267 let delay = captures.get(4).and_then(|m| m.as_str().parse::<f64>().ok());
268 let delays_str = captures.get(5).map(|m| m.as_str()).unwrap_or("0/0/0/0");
269 let dsn = captures.get(6).map(|m| m.as_str().to_string());
270 let defer_reason = captures.get(7)?.as_str().to_string();
271
272 let (relay_hostname, relay_ip, relay_port) = self.parse_relay(relay_str);
273 let (delay_before_queue, delay_in_queue, delay_connection, delay_transmission) =
274 self.parse_delays(delays_str);
275
276 return Some(SmtpEvent::Deferred {
277 queue_id,
278 to,
279 relay_hostname: if relay_hostname == "none" {
280 None
281 } else {
282 Some(relay_hostname)
283 },
284 relay_ip,
285 relay_port,
286 delay,
287 delay_before_queue,
288 delay_in_queue,
289 delay_connection,
290 delay_transmission,
291 dsn,
292 status: "deferred".to_string(),
293 defer_reason,
294 });
295 }
296 None
297 }
298
299 fn parse_connection_timeout(&self, message: &str) -> Option<SmtpEvent> {
301 if let Some(captures) = CONNECTION_TIMEOUT_REGEX.captures(message) {
302 let queue_id = captures.get(1)?.as_str().to_string();
303 let target_hostname = captures.get(2)?.as_str().to_string();
304 let target_ip = captures.get(3)?.as_str().to_string();
305 let target_port: u16 = captures.get(4)?.as_str().parse().ok()?;
306
307 return Some(SmtpEvent::ConnectionTimeout {
308 queue_id,
309 target_hostname,
310 target_ip,
311 target_port,
312 timeout_duration: None,
313 });
314 }
315 None
316 }
317
318 fn parse_connection_refused(&self, message: &str) -> Option<SmtpEvent> {
320 if let Some(captures) = CONNECTION_REFUSED_REGEX.captures(message) {
321 let queue_id = captures.get(1)?.as_str().to_string();
322 let target_hostname = captures.get(2)?.as_str().to_string();
323 let target_ip = captures.get(3)?.as_str().to_string();
324 let target_port: u16 = captures.get(4)?.as_str().parse().ok()?;
325
326 return Some(SmtpEvent::ConnectionRefused {
327 queue_id,
328 target_hostname,
329 target_ip,
330 target_port,
331 });
332 }
333 None
334 }
335
336 fn parse_connection_lost(&self, message: &str) -> Option<SmtpEvent> {
338 if let Some(captures) = LOST_CONNECTION_REGEX.captures(message) {
339 let queue_id = captures.get(1)?.as_str().to_string();
340 let target_hostname = captures.get(2)?.as_str().to_string();
341 let target_ip = captures.get(3)?.as_str().to_string();
342 let lost_stage = captures.get(4)?.as_str().to_string();
343
344 return Some(SmtpEvent::ConnectionLost {
345 queue_id,
346 target_hostname,
347 target_ip,
348 lost_stage,
349 });
350 }
351 None
352 }
353
354 fn parse_protocol_interaction(&self, message: &str) -> Option<SmtpEvent> {
356 if let Some(captures) = PROTOCOL_BREAKING_LINE_REGEX.captures(message) {
358 let queue_id = captures.get(1)?.as_str().to_string();
359 let line_length = captures.get(2)?.as_str();
360
361 return Some(SmtpEvent::ProtocolInteraction {
362 queue_id,
363 interaction_type: "breaking_line".to_string(),
364 details: format!("Breaking line > {} bytes with <CR><LF>SPACE", line_length),
365 });
366 }
367
368 if let Some(captures) = PROTOCOL_PIPELINING_REGEX.captures(message) {
370 let queue_id = captures.get(1)?.as_str().to_string();
371 let send_buffer = captures.get(2)?.as_str();
372 let pipeline_buffer = captures.get(3)?.as_str();
373
374 return Some(SmtpEvent::ProtocolInteraction {
375 queue_id,
376 interaction_type: "esmtp_pipelining".to_string(),
377 details: format!(
378 "TCP send buffer: {}, PIPELINING buffer: {}",
379 send_buffer, pipeline_buffer
380 ),
381 });
382 }
383
384 if let Some(captures) = PROTOCOL_SERVER_FEATURES_REGEX.captures(message) {
386 let features = captures.get(1)?.as_str();
387 let max_size = captures.get(2)?.as_str();
388
389 return Some(SmtpEvent::ProtocolInteraction {
390 queue_id: "".to_string(), interaction_type: "server_features".to_string(),
392 details: format!("Features: {}, Max size: {}", features, max_size),
393 });
394 }
395
396 None
397 }
398
399 fn parse_debug_event(&self, message: &str) -> Option<SmtpEvent> {
401 if let Some(captures) = DEBUG_VSTREAM_REGEX.captures(message) {
403 let fd = captures.get(1)?.as_str();
404 let bytes = captures.get(2)?.as_str();
405
406 return Some(SmtpEvent::Other {
407 queue_id: None,
408 event_type: "debug_vstream".to_string(),
409 message: format!("vstream_buf_get_ready: fd {} got {} bytes", fd, bytes),
410 });
411 }
412
413 if let Some(captures) = DEBUG_REC_GET_REGEX.captures(message) {
415 let record_type = captures.get(1)?.as_str();
416 let length = captures.get(2)?.as_str();
417 let data = captures.get(3)?.as_str();
418
419 return Some(SmtpEvent::Other {
420 queue_id: None,
421 event_type: "debug_rec_get".to_string(),
422 message: format!("rec_get: type {} len {} data {}", record_type, length, data),
423 });
424 }
425
426 None
427 }
428
429 fn parse_tls_event(&self, message: &str) -> Option<SmtpEvent> {
431 if TLS_INIT_REGEX.is_match(message) {
433 return Some(SmtpEvent::Other {
434 queue_id: None,
435 event_type: "tls_init".to_string(),
436 message: "Initializing client-side TLS engine".to_string(),
437 });
438 }
439
440 if let Some(captures) = TLS_SETUP_REGEX.captures(message) {
442 let hostname = captures.get(1)?.as_str();
443 let ip = captures.get(2)?.as_str();
444 let port = captures.get(3)?.as_str();
445
446 return Some(SmtpEvent::Other {
447 queue_id: None,
448 event_type: "tls_setup".to_string(),
449 message: format!("Setting up TLS connection to {}[{}]:{}", hostname, ip, port),
450 });
451 }
452
453 if let Some(captures) = TLS_CIPHER_REGEX.captures(message) {
455 let hostname = captures.get(1)?.as_str();
456 let ip = captures.get(2)?.as_str();
457 let port = captures.get(3)?.as_str();
458 let cipher_list = captures.get(4)?.as_str();
459
460 return Some(SmtpEvent::Other {
461 queue_id: None,
462 event_type: "tls_cipher".to_string(),
463 message: format!(
464 "TLS cipher list for {}[{}]:{}: {}",
465 hostname, ip, port, cipher_list
466 ),
467 });
468 }
469
470 if let Some(captures) = SSL_CONNECT_REGEX.captures(message) {
472 let state = captures.get(1)?.as_str();
473
474 return Some(SmtpEvent::Other {
475 queue_id: None,
476 event_type: "ssl_connect".to_string(),
477 message: format!("SSL_connect:{}", state),
478 });
479 }
480
481 if let Some(captures) = TLS_CERT_VERIFY_REGEX.captures(message) {
483 let hostname = captures.get(1)?.as_str();
484 let ip = captures.get(2)?.as_str();
485 let port = captures.get(3)?.as_str();
486 let depth = captures.get(4)?.as_str();
487 let verify = captures.get(5)?.as_str();
488 let subject = captures.get(6)?.as_str();
489
490 return Some(SmtpEvent::Other {
491 queue_id: None,
492 event_type: "tls_cert_verify".to_string(),
493 message: format!(
494 "Certificate verification for {}[{}]:{}: depth={}, verify={}, subject={}",
495 hostname, ip, port, depth, verify, subject
496 ),
497 });
498 }
499
500 if let Some(captures) = TLS_CERT_INFO_REGEX.captures(message) {
502 let hostname = captures.get(1)?.as_str();
503 let ip = captures.get(2)?.as_str();
504 let port = captures.get(3)?.as_str();
505 let subject_cn = captures.get(4)?.as_str();
506 let issuer_cn = captures.get(5)?.as_str();
507
508 return Some(SmtpEvent::Other {
509 queue_id: None,
510 event_type: "tls_cert_info".to_string(),
511 message: format!(
512 "Certificate info for {}[{}]:{}: subject_CN={}, issuer_CN={}",
513 hostname, ip, port, subject_cn, issuer_cn
514 ),
515 });
516 }
517
518 if let Some(captures) = TLS_CONNECTION_ESTABLISHED_REGEX.captures(message) {
520 let hostname = captures.get(1)?.as_str();
521 let ip = captures.get(2)?.as_str();
522 let port = captures.get(3)?.as_str();
523 let connection_details = captures.get(4)?.as_str();
524
525 return Some(SmtpEvent::Other {
526 queue_id: None,
527 event_type: "tls_connection_established".to_string(),
528 message: format!(
529 "TLS connection established to {}[{}]:{}: {}",
530 hostname, ip, port, connection_details
531 ),
532 });
533 }
534
535 None
536 }
537}
538
539impl ComponentParser for SmtpParser {
540 fn parse(&self, message: &str) -> Result<ComponentEvent, ParseError> {
541 if let Some(event) = self.parse_delivery_success(message) {
545 return Ok(ComponentEvent::Smtp(event));
546 }
547
548 if let Some(event) = self.parse_delivery_bounced(message) {
550 return Ok(ComponentEvent::Smtp(event));
551 }
552
553 if let Some(event) = self.parse_delivery_deferred(message) {
555 return Ok(ComponentEvent::Smtp(event));
556 }
557
558 if let Some(event) = self.parse_connection_timeout(message) {
560 return Ok(ComponentEvent::Smtp(event));
561 }
562
563 if let Some(event) = self.parse_connection_refused(message) {
565 return Ok(ComponentEvent::Smtp(event));
566 }
567
568 if let Some(event) = self.parse_connection_lost(message) {
570 return Ok(ComponentEvent::Smtp(event));
571 }
572
573 if let Some(event) = self.parse_protocol_interaction(message) {
575 return Ok(ComponentEvent::Smtp(event));
576 }
577
578 if let Some(event) = self.parse_debug_event(message) {
580 return Ok(ComponentEvent::Smtp(event));
581 }
582
583 if let Some(event) = self.parse_tls_event(message) {
585 return Ok(ComponentEvent::Smtp(event));
586 }
587
588 let queue_id = extract_queue_id(message);
590
591 Ok(ComponentEvent::Smtp(SmtpEvent::Other {
592 queue_id,
593 event_type: "unclassified".to_string(),
594 message: message.to_string(),
595 }))
596 }
597
598 fn component_name(&self) -> &'static str {
599 "smtp"
600 }
601}
602
603impl Default for SmtpParser {
604 fn default() -> Self {
605 Self::new()
606 }
607}