use crate::components::ComponentParser;
use crate::error::ParseError;
use crate::events::cleanup::CleanupEvent;
use crate::events::ComponentEvent;
use crate::utils::common_fields::CommonFieldsParser;
use crate::utils::queue_id::create_queue_id_pattern;
use lazy_static::lazy_static;
use regex::Regex;
/// Parser for log messages of the `cleanup` component.
///
/// The type is a stateless unit struct: every pattern it needs lives in
/// lazily-initialised statics, so values are free to create, copy and share.
#[derive(Debug, Clone, Copy)]
pub struct CleanupParser;
// Regular expressions for every message shape this parser recognises.
// Each pattern is compiled on first use; `.unwrap()` turns an invalid pattern
// into a panic at that point rather than a recoverable error.
//
// Patterns wrapped in `create_queue_id_pattern` contain a `{QUEUE_ID}`
// placeholder. The parser methods read the queue id from capture group 1 and
// the explicit groups from index 2 onwards, so the helper presumably expands
// the placeholder into exactly one capturing group — confirm against
// `utils::queue_id`.
lazy_static! {
// `<queue id>: message-id=<id>` or `<queue id>: message-id=id`; the bracketed
// and the bare form land in separate capture groups.
static ref MESSAGE_ID_REGEX: Regex = Regex::new(
&create_queue_id_pattern(r"^{QUEUE_ID}: message-id=(?:<([^>]+)>|([^,\s]+))$")
).unwrap();
// `<prefix>: create file <path>: <reason>`; the prefix and path may not contain `:`.
static ref QUEUE_FILE_WARNING_REGEX: Regex = Regex::new(
r"^([^:]+): create file ([^:]+): (.+)$"
).unwrap();
// `<queue id>: size=<digits>`.
static ref MESSAGE_SIZE_REGEX: Regex = Regex::new(
&create_queue_id_pattern(r"^{QUEUE_ID}: size=(\d+)$")
).unwrap();
// `<queue id>: header <name>: <value>`.
static ref HEADER_PROCESSING_REGEX: Regex = Regex::new(
&create_queue_id_pattern(r"^{QUEUE_ID}: header ([^:]+): (.+)$")
).unwrap();
// `<queue id>: from=<old> -> <new>` or `<queue id>: to=<old> -> <new>`.
static ref ADDRESS_REWRITE_REGEX: Regex = Regex::new(
&create_queue_id_pattern(r"^{QUEUE_ID}: (from|to)=<([^>]+)> -> <([^>]+)>$")
).unwrap();
// `<queue id>: rewrite: <text>`.
static ref MESSAGE_REWRITE_REGEX: Regex = Regex::new(
&create_queue_id_pattern(r"^{QUEUE_ID}: rewrite: (.+)$")
).unwrap();
// `<queue id>: filter <name>: <text>`.
static ref FILTER_ACTION_REGEX: Regex = Regex::new(
&create_queue_id_pattern(r"^{QUEUE_ID}: filter ([^:]+): (.+)$")
).unwrap();
// `<queue id>: milter <name>: <text>`.
static ref MILTER_INTERACTION_REGEX: Regex = Regex::new(
&create_queue_id_pattern(r"^{QUEUE_ID}: milter ([^:]+): (.+)$")
).unwrap();
// `<queue id>: reject: <text>`.
static ref MESSAGE_REJECT_REGEX: Regex = Regex::new(
&create_queue_id_pattern(r"^{QUEUE_ID}: reject: (.+)$")
).unwrap();
// Any line starting with `warning: `; the remainder is captured whole.
static ref CONFIG_WARNING_REGEX: Regex = Regex::new(
r"^warning: (.+)$"
).unwrap();
// `warning: <subject>: <text>`.
// NOTE(review): no parser method visible in this file references this pattern;
// resource-related warnings are currently detected by keyword in
// `parse_config_warning`. Confirm whether it is used elsewhere or is dead.
static ref RESOURCE_LIMIT_REGEX: Regex = Regex::new(
r"^warning: ([^:]+): (.+)$"
).unwrap();
// `<queue id>: hold: <text>`.
static ref MESSAGE_HOLD_REGEX: Regex = Regex::new(
&create_queue_id_pattern(r"^{QUEUE_ID}: hold: (.+)$")
).unwrap();
// `<queue id>: discard: <text>`.
static ref MESSAGE_DISCARD_REGEX: Regex = Regex::new(
&create_queue_id_pattern(r"^{QUEUE_ID}: discard: (.+)$")
).unwrap();
// `<queue id>: removed (<reason>)`, optionally followed by more text.
// The trailing group is `(.*)`, so it can match the empty string.
static ref MESSAGE_REMOVED_REGEX: Regex = Regex::new(
&create_queue_id_pattern(r"^{QUEUE_ID}: removed \(([^)]+)\)(?:\s*(.*))?$")
).unwrap();
// `statistics: processed=<n> rejected=<n>` with an optional ` errors=<n>`.
static ref STATISTICS_REGEX: Regex = Regex::new(
r"^statistics: processed=(\d+) rejected=(\d+)(?:\s+errors=(\d+))?$"
).unwrap();
// `snowflake: initialized with node_id=<n>, node_bits=<n>, seq_bits=<n>`.
static ref SNOWFLAKE_INIT_REGEX: Regex = Regex::new(
r"^snowflake: initialized with node_id=(\d+), node_bits=(\d+), seq_bits=(\d+)$"
).unwrap();
// Unanchored: finds `from <host>[<addr>]`, optionally followed by `:` and
// digits, anywhere in the text and captures everything after `from `.
static ref HOLD_CLIENT_REGEX: Regex = Regex::new(r"from ([^:;\s]+\[[^\]]+\]:?\d*)").unwrap();
}
impl CleanupParser {
pub fn new() -> Self {
CleanupParser
}
/// Recognises `<queue id>: message-id=...` lines.
///
/// Returns `CleanupEvent::MessageId` with the queue id (capture group 1) and
/// the message id without surrounding angle brackets, or `None` when the
/// line has a different shape.
fn parse_message_id(&self, message: &str) -> Option<CleanupEvent> {
    let caps = MESSAGE_ID_REGEX.captures(message)?;
    let queue_id = caps.get(1)?.as_str().to_owned();
    // Group 2 holds the `<...>` form and group 3 the bare form; prefer the
    // bracketed one and fall back to the bare one.
    let message_id = caps
        .get(2)
        .or_else(|| caps.get(3))?
        .as_str()
        .to_owned();
    Some(CleanupEvent::MessageId {
        queue_id,
        message_id,
    })
}
/// Recognises `<operation>: create file <path>: <reason>` lines.
///
/// Returns `CleanupEvent::QueueFileWarning` built from the three captured
/// parts, or `None` when the line does not match.
fn parse_queue_file_warning(&self, message: &str) -> Option<CleanupEvent> {
    let caps = QUEUE_FILE_WARNING_REGEX.captures(message)?;
    // Owned copy of one capture group, `None` if the group did not participate.
    let group = |idx: usize| caps.get(idx).map(|m| m.as_str().to_owned());
    Some(CleanupEvent::QueueFileWarning {
        operation: group(1)?,
        file_path: group(2)?,
        error_reason: group(3)?,
    })
}
/// Recognises `<queue id>: size=<digits>` lines.
///
/// Returns `CleanupEvent::MessageSize`, or `None` when the line does not
/// match or the digits do not fit into a `u64`.
fn parse_message_size(&self, message: &str) -> Option<CleanupEvent> {
    let caps = MESSAGE_SIZE_REGEX.captures(message)?;
    // Parse the number first: a value that overflows `u64` rejects the line.
    let size: u64 = caps.get(2)?.as_str().parse().ok()?;
    let queue_id = caps.get(1)?.as_str().to_owned();
    Some(CleanupEvent::MessageSize { queue_id, size })
}
/// Recognises `<queue id>: header <name>: <value>` lines.
///
/// Returns `CleanupEvent::HeaderProcessing` whose `action` is always the
/// fixed string `"process"`, or `None` when the line does not match.
fn parse_header_processing(&self, message: &str) -> Option<CleanupEvent> {
    let caps = HEADER_PROCESSING_REGEX.captures(message)?;
    let queue_id = String::from(caps.get(1)?.as_str());
    let header_name = String::from(caps.get(2)?.as_str());
    let header_value = String::from(caps.get(3)?.as_str());
    Some(CleanupEvent::HeaderProcessing {
        queue_id,
        header_name,
        header_value,
        action: String::from("process"),
    })
}
/// Recognises `<queue id>: from=<old> -> <new>` and the `to=` equivalent.
///
/// Returns `CleanupEvent::AddressRewrite` where `address_type` is the literal
/// `from` or `to` found in the line, or `None` when the line does not match.
fn parse_address_rewrite(&self, message: &str) -> Option<CleanupEvent> {
    let caps = ADDRESS_REWRITE_REGEX.captures(message)?;
    match (caps.get(1), caps.get(2), caps.get(3), caps.get(4)) {
        (Some(queue), Some(kind), Some(before), Some(after)) => {
            Some(CleanupEvent::AddressRewrite {
                queue_id: queue.as_str().to_owned(),
                address_type: kind.as_str().to_owned(),
                original_address: before.as_str().to_owned(),
                rewritten_address: after.as_str().to_owned(),
            })
        }
        // A group that did not participate means there is nothing to report.
        _ => None,
    }
}
/// Recognises `<queue id>: rewrite: <text>` lines.
///
/// Returns `CleanupEvent::MessageRewrite` with `rewrite_type` fixed to
/// `"content"`, an empty `original`, and the captured text as `rewritten`;
/// `None` when the line does not match.
fn parse_message_rewrite(&self, message: &str) -> Option<CleanupEvent> {
    let caps = MESSAGE_REWRITE_REGEX.captures(message)?;
    let queue_id = caps.get(1)?.as_str().to_owned();
    let rewritten = caps.get(2)?.as_str().to_owned();
    Some(CleanupEvent::MessageRewrite {
        queue_id,
        rewrite_type: String::from("content"),
        // The log line only carries the rewritten text.
        original: String::new(),
        rewritten,
    })
}
/// Recognises `<queue id>: filter <name>: <text>` lines.
///
/// Returns `CleanupEvent::FilterAction` with the text after the filter name
/// as `action` and no `details`, or `None` when the line does not match.
fn parse_filter_action(&self, message: &str) -> Option<CleanupEvent> {
    let caps = FILTER_ACTION_REGEX.captures(message)?;
    let text_of = |idx: usize| caps.get(idx).map(|m| m.as_str().to_owned());
    let queue_id = text_of(1)?;
    let filter_name = text_of(2)?;
    let action = text_of(3)?;
    Some(CleanupEvent::FilterAction {
        queue_id,
        filter_name,
        action,
        details: None,
    })
}
/// Recognises `<queue id>: milter <name>: <text>` lines.
///
/// Returns `CleanupEvent::MilterInteraction` with `command` fixed to
/// `"interaction"` and the trailing text as `response`, or `None` when the
/// line does not match.
fn parse_milter_interaction(&self, message: &str) -> Option<CleanupEvent> {
    let caps = MILTER_INTERACTION_REGEX.captures(message)?;
    let queue_id = caps.get(1)?.as_str().to_owned();
    let milter_name = caps.get(2)?.as_str().to_owned();
    let response = caps.get(3)?.as_str().to_owned();
    Some(CleanupEvent::MilterInteraction {
        queue_id,
        milter_name,
        command: String::from("interaction"),
        response: Some(response),
    })
}
/// Recognises `<queue id>: reject: <text>` lines.
///
/// Returns `CleanupEvent::MessageReject` with the captured text as `reason`
/// and `action` fixed to `"reject"`, or `None` when the line does not match.
fn parse_message_reject(&self, message: &str) -> Option<CleanupEvent> {
    let caps = MESSAGE_REJECT_REGEX.captures(message)?;
    let queue_id = caps.get(1)?.as_str().to_owned();
    let reason = caps.get(2)?.as_str().to_owned();
    Some(CleanupEvent::MessageReject {
        queue_id,
        reason,
        action: String::from("reject"),
    })
}
/// Recognises lines starting with `warning: `.
///
/// Warnings whose text mentions `disk`, `memory`, `queue` or `limit`
/// (case-sensitive substring match) become `CleanupEvent::ResourceLimit` with
/// `resource_type` set to `"unknown"` and no numeric values. Every other
/// warning becomes `CleanupEvent::ConfigurationWarning` with `warning_type`
/// `"cleanup_config"`. Returns `None` when the line is not a warning.
fn parse_config_warning(&self, message: &str) -> Option<CleanupEvent> {
    const RESOURCE_KEYWORDS: [&str; 4] = ["disk", "memory", "queue", "limit"];

    let caps = CONFIG_WARNING_REGEX.captures(message)?;
    let text = caps.get(1)?.as_str();

    let event = if RESOURCE_KEYWORDS.iter().any(|&keyword| text.contains(keyword)) {
        CleanupEvent::ResourceLimit {
            resource_type: String::from("unknown"),
            limit_details: text.to_owned(),
            current_value: None,
            limit_value: None,
        }
    } else {
        CleanupEvent::ConfigurationWarning {
            warning_type: String::from("cleanup_config"),
            message: text.to_owned(),
        }
    };
    Some(event)
}
/// Recognises `<queue id>: hold: <details>` lines.
///
/// The details text is handed to the shared field extractors for sender,
/// recipient, client, protocol and HELO. `description` is whatever follows
/// the last `": "` in the details, or the whole details text when there is
/// no such separator. Returns `None` when the line does not match.
fn parse_message_hold(&self, message: &str) -> Option<CleanupEvent> {
    let caps = MESSAGE_HOLD_REGEX.captures(message)?;
    let queue_id = caps.get(1)?.as_str().to_owned();
    let details = caps.get(2)?.as_str();

    let hold_reason = self.extract_hold_reason(details);
    let sender = CommonFieldsParser::extract_from_email(details).map(|found| found.address);
    let recipient = CommonFieldsParser::extract_to_email(details).map(|found| found.address);
    let (client_hostname, client_ip, client_port) = self.extract_client_info(details);

    Some(CleanupEvent::MessageHold {
        queue_id,
        hold_reason,
        sender,
        recipient,
        client_ip,
        client_hostname,
        client_port,
        protocol: CommonFieldsParser::extract_protocol(details),
        helo: CommonFieldsParser::extract_helo(details),
        description: match details.rfind(": ") {
            // Skip the two bytes of the separator itself.
            Some(separator) => details[separator + 2..].to_owned(),
            None => details.to_owned(),
        },
    })
}
/// Classifies the details text of a hold line.
///
/// Returns `"X-Decision-Result: Quarantine"` when that header is mentioned,
/// otherwise `"hold"` when the text contains the word `hold`, otherwise
/// `"unknown"`.
fn extract_hold_reason(&self, hold_details: &str) -> String {
    let reason = if hold_details.contains("header X-Decision-Result: Quarantine") {
        "X-Decision-Result: Quarantine"
    } else if hold_details.contains("hold") {
        "hold"
    } else {
        "unknown"
    };
    String::from(reason)
}
/// Pulls the client hostname, IP and port out of a details text.
///
/// Looks for the first `from <host>[<addr>]...` fragment and passes it to
/// `CommonFieldsParser::extract_client_info_simple`. Returns
/// `(hostname, ip, port)`; all three are `None` when no fragment is found or
/// the helper cannot interpret it.
fn extract_client_info(
    &self,
    hold_details: &str,
) -> (Option<String>, Option<String>, Option<u16>) {
    let fragment = HOLD_CLIENT_REGEX
        .captures(hold_details)
        .and_then(|caps| caps.get(1));

    match fragment {
        Some(found) => CommonFieldsParser::extract_client_info_simple(found.as_str())
            .map(|client| (Some(client.hostname), Some(client.ip), client.port))
            .unwrap_or((None, None, None)),
        None => (None, None, None),
    }
}
/// Recognises `<queue id>: discard: <details>` lines.
///
/// The details text is handed to the shared field extractors for sender,
/// recipient, client, protocol and HELO. `description` is whatever follows
/// the last `": "` in the details, or the whole details text when there is
/// no such separator. Returns `None` when the line does not match.
fn parse_message_discard(&self, message: &str) -> Option<CleanupEvent> {
    let caps = MESSAGE_DISCARD_REGEX.captures(message)?;
    let queue_id = caps.get(1)?.as_str().to_owned();
    let details = caps.get(2)?.as_str();

    let discard_reason = self.extract_discard_reason(details);
    let sender = CommonFieldsParser::extract_from_email(details).map(|found| found.address);
    let recipient = CommonFieldsParser::extract_to_email(details).map(|found| found.address);
    let (client_hostname, client_ip, client_port) = self.extract_client_info(details);

    Some(CleanupEvent::MessageDiscard {
        queue_id,
        discard_reason,
        sender,
        recipient,
        client_ip,
        client_hostname,
        client_port,
        protocol: CommonFieldsParser::extract_protocol(details),
        helo: CommonFieldsParser::extract_helo(details),
        description: match details.rfind(": ") {
            // Skip the two bytes of the separator itself.
            Some(separator) => details[separator + 2..].to_owned(),
            None => details.to_owned(),
        },
    })
}
/// Recognises `<queue id>: removed (<reason>)` lines, optionally followed by
/// free-form text.
///
/// Returns `CleanupEvent::MessageRemoved` with the text inside the
/// parentheses as `removal_reason`. `details` is `Some` only when non-empty
/// text follows the closing parenthesis. Returns `None` when the line does
/// not match.
fn parse_message_removed(&self, message: &str) -> Option<CleanupEvent> {
    let captures = MESSAGE_REMOVED_REGEX.captures(message)?;
    let queue_id = captures.get(1)?.as_str().to_string();
    let removal_reason = captures.get(2)?.as_str().to_string();
    // The trailing group is `(.*)`, which still participates in the match
    // (with an empty span) when nothing follows the reason. Treat that as
    // "no details" instead of reporting `Some("")`.
    let details = captures
        .get(3)
        .map(|m| m.as_str())
        .filter(|text| !text.is_empty())
        .map(|text| text.to_string());
    Some(CleanupEvent::MessageRemoved {
        queue_id,
        removal_reason,
        details,
    })
}
/// Classifies the details text of a discard line.
///
/// Returns `"X-Decision-Result: Discard"` when that header is mentioned,
/// otherwise `"discard"` when the text contains the word `discard`,
/// otherwise `"unknown"`.
fn extract_discard_reason(&self, discard_details: &str) -> String {
    let reason = if discard_details.contains("header X-Decision-Result: Discard") {
        "X-Decision-Result: Discard"
    } else if discard_details.contains("discard") {
        "discard"
    } else {
        "unknown"
    };
    String::from(reason)
}
/// Recognises `statistics: processed=<n> rejected=<n>` lines with an optional
/// ` errors=<n>` suffix.
///
/// Returns `CleanupEvent::Statistics`. A counter that does not fit into a
/// `u32` is reported as `None` rather than rejecting the line; `errors` is
/// also `None` when the suffix is absent. Returns `None` when the line does
/// not match.
fn parse_statistics(&self, message: &str) -> Option<CleanupEvent> {
    let caps = STATISTICS_REGEX.captures(message)?;
    // Overflowing values degrade to `None` instead of failing the parse.
    let as_count = |digits: &str| digits.parse::<u32>().ok();

    let processed = as_count(caps.get(1)?.as_str());
    let rejected = as_count(caps.get(2)?.as_str());
    let errors = caps.get(3).and_then(|m| as_count(m.as_str()));

    Some(CleanupEvent::Statistics {
        processed,
        rejected,
        errors,
    })
}
/// Recognises
/// `snowflake: initialized with node_id=<n>, node_bits=<n>, seq_bits=<n>`.
///
/// Returns `CleanupEvent::SnowflakeInit`, or `None` when the line does not
/// match or any of the three numbers does not fit into a `u32`.
fn parse_snowflake_init(&self, message: &str) -> Option<CleanupEvent> {
    let caps = SNOWFLAKE_INIT_REGEX.captures(message)?;
    let number = |idx: usize| -> Option<u32> { caps.get(idx)?.as_str().parse().ok() };

    Some(CleanupEvent::SnowflakeInit {
        node_id: number(1)?,
        node_bits: number(2)?,
        seq_bits: number(3)?,
    })
}
}
impl ComponentParser for CleanupParser {
    /// Parses one cleanup log message into a `ComponentEvent::Cleanup`.
    ///
    /// The specialised parsers are tried in a fixed order and the first one
    /// that recognises the message wins. A message that none of them
    /// recognises is wrapped in `CleanupEvent::Other` with `event_type`
    /// `"unknown"` and no queue id, so this method always returns `Ok`.
    fn parse(&self, message: &str) -> Result<ComponentEvent, ParseError> {
        // `or_else` is lazy: a later parser only runs when every earlier one
        // returned `None`, which keeps the original precedence.
        let recognised = self
            .parse_message_id(message)
            .or_else(|| self.parse_queue_file_warning(message))
            .or_else(|| self.parse_message_size(message))
            .or_else(|| self.parse_header_processing(message))
            .or_else(|| self.parse_address_rewrite(message))
            .or_else(|| self.parse_message_rewrite(message))
            .or_else(|| self.parse_filter_action(message))
            .or_else(|| self.parse_milter_interaction(message))
            .or_else(|| self.parse_message_reject(message))
            .or_else(|| self.parse_config_warning(message))
            .or_else(|| self.parse_message_hold(message))
            .or_else(|| self.parse_message_discard(message))
            .or_else(|| self.parse_message_removed(message))
            .or_else(|| self.parse_statistics(message))
            .or_else(|| self.parse_snowflake_init(message));

        let event = recognised.unwrap_or_else(|| CleanupEvent::Other {
            event_type: "unknown".to_string(),
            message: message.to_string(),
            queue_id: None,
        });
        Ok(ComponentEvent::Cleanup(event))
    }

    /// Name of the component this parser handles.
    fn component_name(&self) -> &'static str {
        "cleanup"
    }
}
impl Default for CleanupParser {
fn default() -> Self {
Self::new()
}
}