use crate::components::ComponentParser;
use crate::error::ParseError;
use crate::events::smtp::SmtpEvent;
use crate::events::ComponentEvent;
use crate::utils::common_fields::CommonFieldsParser;
use crate::utils::queue_id::{create_queue_id_pattern, extract_queue_id};
use lazy_static::lazy_static;
use regex::Regex;
pub struct SmtpParser;
lazy_static! {
static ref DELIVERY_SUCCESS_REGEX: Regex = Regex::new(
&create_queue_id_pattern(r"^{QUEUE_ID}: to=<([^>]+)>, relay=([^,]+), delay=([\d.]+), delays=([\d./]+), dsn=([\d.]+), status=sent \((.+)\)$")
).unwrap();
static ref DELIVERY_BOUNCED_REGEX: Regex = Regex::new(
&create_queue_id_pattern(r"^{QUEUE_ID}: to=<([^>]+)>, relay=([^,]+), delay=([\d.]+), delays=([\d./]+), dsn=([\d.]+), status=bounced \((.+)\)$")
).unwrap();
static ref DELIVERY_DEFERRED_REGEX: Regex = Regex::new(
&create_queue_id_pattern(r"^{QUEUE_ID}: (?:to=<([^>]+)>, )?(?:relay=([^,]+), )?(?:delay=([\d.]+), )?(?:delays=([\d./]+), )?(?:dsn=([\d.]+), )?status=deferred \((.+)\)$")
).unwrap();
static ref CONNECTION_TIMEOUT_REGEX: Regex = Regex::new(
&create_queue_id_pattern(r"^{QUEUE_ID}: connect to ([^\[\]]+)\[([^\]]+)\]:(\d+): Connection timed out$")
).unwrap();
static ref CONNECTION_REFUSED_REGEX: Regex = Regex::new(
&create_queue_id_pattern(r"^{QUEUE_ID}: connect to ([^\[\]]+)\[([^\]]+)\]:(\d+): Connection refused$")
).unwrap();
static ref LOST_CONNECTION_REGEX: Regex = Regex::new(
&create_queue_id_pattern(r"^{QUEUE_ID}: lost connection with ([^\[\]]+)\[([^\]]+)\] while (.+)$")
).unwrap();
static ref PROTOCOL_BREAKING_LINE_REGEX: Regex = Regex::new(
&create_queue_id_pattern(r"^{QUEUE_ID}: breaking line > (\d+) bytes with <CR><LF>SPACE$")
).unwrap();
static ref PROTOCOL_PIPELINING_REGEX: Regex = Regex::new(
&create_queue_id_pattern(r"^{QUEUE_ID}: Using ESMTP PIPELINING, TCP send buffer size is (\d+), PIPELINING buffer size is (\d+)$")
).unwrap();
static ref PROTOCOL_SERVER_FEATURES_REGEX: Regex = Regex::new(
r"^server features: (0x[0-9a-fA-F]+) size (\d+)$"
).unwrap();
static ref DEBUG_VSTREAM_REGEX: Regex = Regex::new(
r"^vstream_buf_get_ready: fd (\d+) got (\d+)$"
).unwrap();
static ref DEBUG_REC_GET_REGEX: Regex = Regex::new(
r"^rec_get: type ([A-Z]) len (\d+) data (.+)$"
).unwrap();
static ref TLS_INIT_REGEX: Regex = Regex::new(
r"^initializing the client-side TLS engine$"
).unwrap();
static ref TLS_SETUP_REGEX: Regex = Regex::new(
r"^setting up TLS connection to ([^\[\]]+)\[([^\]]+)\]:(\d+)$"
).unwrap();
static ref TLS_CIPHER_REGEX: Regex = Regex::new(
r#"^([^\[\]]+)\[([^\]]+)\]:(\d+): TLS cipher list "(.+)"$"#
).unwrap();
static ref SSL_CONNECT_REGEX: Regex = Regex::new(
r"^SSL_connect:(.+)$"
).unwrap();
static ref TLS_CERT_VERIFY_REGEX: Regex = Regex::new(
r"^([^\[\]]+)\[([^\]]+)\]:(\d+): depth=(\d+) verify=(\d+) subject=(.+)$"
).unwrap();
static ref TLS_CERT_INFO_REGEX: Regex = Regex::new(
r"^([^\[\]]+)\[([^\]]+)\]:(\d+): subject_CN=([^,]+), issuer_CN=([^,]+), fingerprint=([^,]+), pkey_fingerprint=(.+)$"
).unwrap();
static ref TLS_CONNECTION_ESTABLISHED_REGEX: Regex = Regex::new(
r"^Untrusted TLS connection established to ([^\[\]]+)\[([^\]]+)\]:(\d+): (.+)$"
).unwrap();
}
impl SmtpParser {
pub fn new() -> Self {
SmtpParser
}
fn parse_delays(
&self,
delays_str: &str,
) -> (Option<f64>, Option<f64>, Option<f64>, Option<f64>) {
let parts: Vec<&str> = delays_str.split('/').collect();
if parts.len() != 4 {
return (None, None, None, None);
}
let before_queue = parts[0].parse::<f64>().ok();
let in_queue = parts[1].parse::<f64>().ok();
let connection = parts[2].parse::<f64>().ok();
let transmission = parts[3].parse::<f64>().ok();
(before_queue, in_queue, connection, transmission)
}
fn parse_relay(&self, relay_str: &str) -> (String, Option<String>, Option<u16>) {
if relay_str == "none" {
return ("none".to_string(), None, None);
}
let full_relay = format!("relay={}", relay_str);
if let Some(relay_info) = CommonFieldsParser::extract_relay_info(&full_relay) {
return (relay_info.hostname, relay_info.ip, relay_info.port);
}
(relay_str.to_string(), None, None)
}
fn parse_delivery_success(&self, message: &str) -> Option<SmtpEvent> {
if let Some(captures) = DELIVERY_SUCCESS_REGEX.captures(message) {
let queue_id = captures.get(1)?.as_str().to_string();
let to = captures.get(2)?.as_str().to_string();
let relay_str = captures.get(3)?.as_str();
let delay: f64 = captures.get(4)?.as_str().parse().ok()?;
let delays_str = captures.get(5)?.as_str();
let dsn = Some(captures.get(6)?.as_str().to_string());
let status = captures.get(7)?.as_str().to_string();
let (relay_hostname, relay_ip, relay_port) = self.parse_relay(relay_str);
let (delay_before_queue, delay_in_queue, delay_connection, delay_transmission) =
self.parse_delays(delays_str);
return Some(SmtpEvent::Sent {
queue_id,
to,
relay_hostname,
relay_ip,
relay_port,
delay,
delay_before_queue,
delay_in_queue,
delay_connection,
delay_transmission,
dsn,
status,
message_size: None,
});
}
None
}
fn parse_delivery_bounced(&self, message: &str) -> Option<SmtpEvent> {
if let Some(captures) = DELIVERY_BOUNCED_REGEX.captures(message) {
let queue_id = captures.get(1)?.as_str().to_string();
let to = captures.get(2)?.as_str().to_string();
let relay_str = captures.get(3)?.as_str();
let delay: f64 = captures.get(4)?.as_str().parse().ok()?;
let delays_str = captures.get(5)?.as_str();
let dsn = Some(captures.get(6)?.as_str().to_string());
let bounce_reason = captures.get(7)?.as_str().to_string();
let (relay_hostname, relay_ip, relay_port) = self.parse_relay(relay_str);
let (delay_before_queue, delay_in_queue, delay_connection, delay_transmission) =
self.parse_delays(delays_str);
return Some(SmtpEvent::Bounced {
queue_id,
to,
relay_hostname: if relay_hostname == "none" {
None
} else {
Some(relay_hostname)
},
relay_ip,
relay_port,
delay: Some(delay),
delay_before_queue,
delay_in_queue,
delay_connection,
delay_transmission,
dsn,
status: "bounced".to_string(),
bounce_reason,
});
}
None
}
fn parse_delivery_deferred(&self, message: &str) -> Option<SmtpEvent> {
if let Some(captures) = DELIVERY_DEFERRED_REGEX.captures(message) {
let queue_id = captures.get(1)?.as_str().to_string();
let to = captures.get(2).map(|m| m.as_str().to_string());
let relay_str = captures.get(3).map(|m| m.as_str()).unwrap_or("none");
let delay = captures.get(4).and_then(|m| m.as_str().parse::<f64>().ok());
let delays_str = captures.get(5).map(|m| m.as_str()).unwrap_or("0/0/0/0");
let dsn = captures.get(6).map(|m| m.as_str().to_string());
let defer_reason = captures.get(7)?.as_str().to_string();
let (relay_hostname, relay_ip, relay_port) = self.parse_relay(relay_str);
let (delay_before_queue, delay_in_queue, delay_connection, delay_transmission) =
self.parse_delays(delays_str);
return Some(SmtpEvent::Deferred {
queue_id,
to,
relay_hostname: if relay_hostname == "none" {
None
} else {
Some(relay_hostname)
},
relay_ip,
relay_port,
delay,
delay_before_queue,
delay_in_queue,
delay_connection,
delay_transmission,
dsn,
status: "deferred".to_string(),
defer_reason,
});
}
None
}
fn parse_connection_timeout(&self, message: &str) -> Option<SmtpEvent> {
if let Some(captures) = CONNECTION_TIMEOUT_REGEX.captures(message) {
let queue_id = captures.get(1)?.as_str().to_string();
let target_hostname = captures.get(2)?.as_str().to_string();
let target_ip = captures.get(3)?.as_str().to_string();
let target_port: u16 = captures.get(4)?.as_str().parse().ok()?;
return Some(SmtpEvent::ConnectionTimeout {
queue_id,
target_hostname,
target_ip,
target_port,
timeout_duration: None,
});
}
None
}
fn parse_connection_refused(&self, message: &str) -> Option<SmtpEvent> {
if let Some(captures) = CONNECTION_REFUSED_REGEX.captures(message) {
let queue_id = captures.get(1)?.as_str().to_string();
let target_hostname = captures.get(2)?.as_str().to_string();
let target_ip = captures.get(3)?.as_str().to_string();
let target_port: u16 = captures.get(4)?.as_str().parse().ok()?;
return Some(SmtpEvent::ConnectionRefused {
queue_id,
target_hostname,
target_ip,
target_port,
});
}
None
}
fn parse_connection_lost(&self, message: &str) -> Option<SmtpEvent> {
if let Some(captures) = LOST_CONNECTION_REGEX.captures(message) {
let queue_id = captures.get(1)?.as_str().to_string();
let target_hostname = captures.get(2)?.as_str().to_string();
let target_ip = captures.get(3)?.as_str().to_string();
let lost_stage = captures.get(4)?.as_str().to_string();
return Some(SmtpEvent::ConnectionLost {
queue_id,
target_hostname,
target_ip,
lost_stage,
});
}
None
}
fn parse_protocol_interaction(&self, message: &str) -> Option<SmtpEvent> {
if let Some(captures) = PROTOCOL_BREAKING_LINE_REGEX.captures(message) {
let queue_id = captures.get(1)?.as_str().to_string();
let line_length = captures.get(2)?.as_str();
return Some(SmtpEvent::ProtocolInteraction {
queue_id,
interaction_type: "breaking_line".to_string(),
details: format!("Breaking line > {} bytes with <CR><LF>SPACE", line_length),
});
}
if let Some(captures) = PROTOCOL_PIPELINING_REGEX.captures(message) {
let queue_id = captures.get(1)?.as_str().to_string();
let send_buffer = captures.get(2)?.as_str();
let pipeline_buffer = captures.get(3)?.as_str();
return Some(SmtpEvent::ProtocolInteraction {
queue_id,
interaction_type: "esmtp_pipelining".to_string(),
details: format!(
"TCP send buffer: {}, PIPELINING buffer: {}",
send_buffer, pipeline_buffer
),
});
}
if let Some(captures) = PROTOCOL_SERVER_FEATURES_REGEX.captures(message) {
let features = captures.get(1)?.as_str();
let max_size = captures.get(2)?.as_str();
return Some(SmtpEvent::ProtocolInteraction {
queue_id: "".to_string(), interaction_type: "server_features".to_string(),
details: format!("Features: {}, Max size: {}", features, max_size),
});
}
None
}
fn parse_debug_event(&self, message: &str) -> Option<SmtpEvent> {
if let Some(captures) = DEBUG_VSTREAM_REGEX.captures(message) {
let fd = captures.get(1)?.as_str();
let bytes = captures.get(2)?.as_str();
return Some(SmtpEvent::Other {
queue_id: None,
event_type: "debug_vstream".to_string(),
message: format!("vstream_buf_get_ready: fd {} got {} bytes", fd, bytes),
});
}
if let Some(captures) = DEBUG_REC_GET_REGEX.captures(message) {
let record_type = captures.get(1)?.as_str();
let length = captures.get(2)?.as_str();
let data = captures.get(3)?.as_str();
return Some(SmtpEvent::Other {
queue_id: None,
event_type: "debug_rec_get".to_string(),
message: format!("rec_get: type {} len {} data {}", record_type, length, data),
});
}
None
}
fn parse_tls_event(&self, message: &str) -> Option<SmtpEvent> {
if TLS_INIT_REGEX.is_match(message) {
return Some(SmtpEvent::Other {
queue_id: None,
event_type: "tls_init".to_string(),
message: "Initializing client-side TLS engine".to_string(),
});
}
if let Some(captures) = TLS_SETUP_REGEX.captures(message) {
let hostname = captures.get(1)?.as_str();
let ip = captures.get(2)?.as_str();
let port = captures.get(3)?.as_str();
return Some(SmtpEvent::Other {
queue_id: None,
event_type: "tls_setup".to_string(),
message: format!("Setting up TLS connection to {}[{}]:{}", hostname, ip, port),
});
}
if let Some(captures) = TLS_CIPHER_REGEX.captures(message) {
let hostname = captures.get(1)?.as_str();
let ip = captures.get(2)?.as_str();
let port = captures.get(3)?.as_str();
let cipher_list = captures.get(4)?.as_str();
return Some(SmtpEvent::Other {
queue_id: None,
event_type: "tls_cipher".to_string(),
message: format!(
"TLS cipher list for {}[{}]:{}: {}",
hostname, ip, port, cipher_list
),
});
}
if let Some(captures) = SSL_CONNECT_REGEX.captures(message) {
let state = captures.get(1)?.as_str();
return Some(SmtpEvent::Other {
queue_id: None,
event_type: "ssl_connect".to_string(),
message: format!("SSL_connect:{}", state),
});
}
if let Some(captures) = TLS_CERT_VERIFY_REGEX.captures(message) {
let hostname = captures.get(1)?.as_str();
let ip = captures.get(2)?.as_str();
let port = captures.get(3)?.as_str();
let depth = captures.get(4)?.as_str();
let verify = captures.get(5)?.as_str();
let subject = captures.get(6)?.as_str();
return Some(SmtpEvent::Other {
queue_id: None,
event_type: "tls_cert_verify".to_string(),
message: format!(
"Certificate verification for {}[{}]:{}: depth={}, verify={}, subject={}",
hostname, ip, port, depth, verify, subject
),
});
}
if let Some(captures) = TLS_CERT_INFO_REGEX.captures(message) {
let hostname = captures.get(1)?.as_str();
let ip = captures.get(2)?.as_str();
let port = captures.get(3)?.as_str();
let subject_cn = captures.get(4)?.as_str();
let issuer_cn = captures.get(5)?.as_str();
return Some(SmtpEvent::Other {
queue_id: None,
event_type: "tls_cert_info".to_string(),
message: format!(
"Certificate info for {}[{}]:{}: subject_CN={}, issuer_CN={}",
hostname, ip, port, subject_cn, issuer_cn
),
});
}
if let Some(captures) = TLS_CONNECTION_ESTABLISHED_REGEX.captures(message) {
let hostname = captures.get(1)?.as_str();
let ip = captures.get(2)?.as_str();
let port = captures.get(3)?.as_str();
let connection_details = captures.get(4)?.as_str();
return Some(SmtpEvent::Other {
queue_id: None,
event_type: "tls_connection_established".to_string(),
message: format!(
"TLS connection established to {}[{}]:{}: {}",
hostname, ip, port, connection_details
),
});
}
None
}
}
impl ComponentParser for SmtpParser {
fn parse(&self, message: &str) -> Result<ComponentEvent, ParseError> {
if let Some(event) = self.parse_delivery_success(message) {
return Ok(ComponentEvent::Smtp(event));
}
if let Some(event) = self.parse_delivery_bounced(message) {
return Ok(ComponentEvent::Smtp(event));
}
if let Some(event) = self.parse_delivery_deferred(message) {
return Ok(ComponentEvent::Smtp(event));
}
if let Some(event) = self.parse_connection_timeout(message) {
return Ok(ComponentEvent::Smtp(event));
}
if let Some(event) = self.parse_connection_refused(message) {
return Ok(ComponentEvent::Smtp(event));
}
if let Some(event) = self.parse_connection_lost(message) {
return Ok(ComponentEvent::Smtp(event));
}
if let Some(event) = self.parse_protocol_interaction(message) {
return Ok(ComponentEvent::Smtp(event));
}
if let Some(event) = self.parse_debug_event(message) {
return Ok(ComponentEvent::Smtp(event));
}
if let Some(event) = self.parse_tls_event(message) {
return Ok(ComponentEvent::Smtp(event));
}
let queue_id = extract_queue_id(message);
Ok(ComponentEvent::Smtp(SmtpEvent::Other {
queue_id,
event_type: "unclassified".to_string(),
message: message.to_string(),
}))
}
fn component_name(&self) -> &'static str {
"smtp"
}
}
impl Default for SmtpParser {
fn default() -> Self {
Self::new()
}
}