use crate::components::ComponentParser;
use crate::error::ParseError;
use crate::events::{ComponentEvent, QmgrEvent};
use crate::utils::common_fields::CommonFieldsParser;
use crate::utils::queue_id::{create_queue_id_pattern, extract_queue_id};
use lazy_static::lazy_static;
use regex::Regex;
/// Parser for log messages of the `qmgr` component (presumably the Postfix
/// queue manager — confirm against the crate documentation).
///
/// The type is a stateless unit struct; parsing is exposed through its
/// `ComponentParser` implementation, which reports `"qmgr"` as component name.
pub struct QmgrParser;
// Regular expressions used by `QmgrParser`, compiled lazily via `lazy_static!`.
//
// Every pattern is built with `Regex::new(..).unwrap()`, so an invalid pattern
// panics instead of returning an error.
//
// Patterns containing the `{QUEUE_ID}` placeholder are passed through
// `create_queue_id_pattern`, which presumably substitutes the queue-ID
// sub-pattern (defined in `utils::queue_id`, not visible here). The parsing
// code reads capture group 1 as the queue ID and the explicit groups below as
// groups 2.., i.e. it expects the placeholder to contribute exactly one group.
lazy_static! {
// `warning: <text>`; captures the text after the prefix.
// Not referenced by the parsing code visible in this section.
static ref CONFIG_WARNING_REGEX: Regex = Regex::new(
r"^warning:\s+(.+)"
).unwrap();
// `<id>: from=<sender>, size=<digits>, nrcpt=<digits> (queue active)`;
// captures sender, size and recipient count.
static ref MESSAGE_ACTIVE_REGEX: Regex = Regex::new(
&create_queue_id_pattern(r"^{QUEUE_ID}:\s+from=<([^>]*)>,\s+size=(\d+),\s+nrcpt=(\d+)\s+\(queue active\)")
).unwrap();
// `<id>: removed`, optionally followed by a parenthesised reason (captured).
static ref MESSAGE_REMOVED_REGEX: Regex = Regex::new(
&create_queue_id_pattern(r"^{QUEUE_ID}:\s+removed(?:\s+\((.+)\))?")
).unwrap();
// `<id>: from=<sender>` followed by the optional fields `to`, `relay`,
// `delay`, `delays`, `dsn` and `status`, in exactly that order.
// The pattern itself does not require the status to be "deferred"; the caller
// checks for the `status=deferred` substring before applying it.
static ref MESSAGE_DEFERRED_REGEX: Regex = Regex::new(
&create_queue_id_pattern(r"^{QUEUE_ID}:\s+from=<([^>]*)>(?:,\s+to=<([^>]*)>)?(?:,\s+relay=([^,]+))?(?:,\s+delay=([^,]+))?(?:,\s+delays=([^,]+))?(?:,\s+dsn=([^,]+))?(?:,\s+status=(.+))?")
).unwrap();
// `<id>: from=<..>, to=<..>, relay=.., delay=..` with optional `delays` and
// `dsn`, ending in `status=sent <text>`; the trailing text is the last group.
static ref MESSAGE_SENT_REGEX: Regex = Regex::new(
&create_queue_id_pattern(r"^{QUEUE_ID}:\s+from=<([^>]*)>,\s+to=<([^>]*)>,\s+relay=([^,]+),\s+delay=([^,]+)(?:,\s+delays=([^,]+))?(?:,\s+dsn=([^,]+))?,\s+status=sent\s+(.+)")
).unwrap();
// `<id>: from=<..>, to=<..>, ... status=bounced (<reason>)` with an optional
// `dsn=` field *after* the status.
// Not referenced by the parsing code visible in this section.
static ref MESSAGE_BOUNCED_REGEX: Regex = Regex::new(
&create_queue_id_pattern(r"^{QUEUE_ID}:\s+from=<([^>]*)>,\s+to=<([^>]*)>,.*status=bounced\s+\((.+)\)(?:,\s+dsn=([^,\)]+))?")
).unwrap();
// `statistics: active=N deferred=N hold=N incoming=N maildrop=N`;
// captures the five counters in that order.
static ref QUEUE_STATS_REGEX: Regex = Regex::new(
r"^statistics:\s+active=(\d+)\s+deferred=(\d+)\s+hold=(\d+)\s+incoming=(\d+)\s+maildrop=(\d+)"
).unwrap();
// `transport <name>: <status text>`; captures name and status text.
static ref TRANSPORT_STATUS_REGEX: Regex = Regex::new(
r"^transport\s+([^:]+):\s+(.+)"
).unwrap();
// `warning: <name> (<digits>) <rest>`; captures name, number and the rest.
// Not referenced by the parsing code visible in this section.
static ref RESOURCE_LIMIT_REGEX: Regex = Regex::new(
r"^warning:\s+([^(]+)\s+\((\d+)\)\s+(.+)"
).unwrap();
// `flush queue` / `flushing queue`, optionally followed by a queue name and
// by `: <digits> messages`; captures the name and the message count.
static ref QUEUE_FLUSH_REGEX: Regex = Regex::new(
r"^flush(?:ing)?\s+queue(?:\s+([^:]+))?(?::\s+(\d+)\s+messages)?"
).unwrap();
}
impl QmgrParser {
pub fn new() -> Self {
Self
}
/// Classifies configuration / limit warnings by substring matching.
///
/// The first matching rule wins:
///
/// | substring(s) in `message`                         | `warning_type`               |
/// |---------------------------------------------------|------------------------------|
/// | `qmgr_message_recipient_limit`                    | `recipient_limit_adjustment` |
/// | `qmgr_message_active_limit`                       | `active_limit_warning`       |
/// | `process_limit`                                   | `process_limit_warning`      |
/// | `queue` together with `limit` or `adjusting`      | `queue_warning`              |
///
/// Returns `None` when no rule matches. The full, unmodified message is
/// stored in the event.
///
/// NOTE: the match is purely substring based and is not anchored to a
/// `warning:` prefix, so any text containing e.g. both `queue` and `limit`
/// is classified as a warning.
fn parse_configuration_warning(&self, message: &str) -> Option<QmgrEvent> {
    // A single decision chain replaces the former "gate + re-check" pair; the
    // old trailing `general_warning` fallback could never be reached because
    // the gate already guaranteed that one of these four rules applies.
    let warning_type = if message.contains("qmgr_message_recipient_limit") {
        "recipient_limit_adjustment"
    } else if message.contains("qmgr_message_active_limit") {
        "active_limit_warning"
    } else if message.contains("process_limit") {
        "process_limit_warning"
    } else if message.contains("queue")
        && (message.contains("limit") || message.contains("adjusting"))
    {
        "queue_warning"
    } else {
        return None;
    };

    Some(QmgrEvent::ConfigurationWarning {
        warning_type: warning_type.to_string(),
        message: message.to_string(),
    })
}
/// Parses a `<id>: from=<..>, size=N, nrcpt=N (queue active)` record.
///
/// Sender and size are taken from `CommonFieldsParser` when it yields a
/// value and otherwise from the regex captures (an unparsable captured size
/// falls back to `0`). Returns `None` when the line does not match or when
/// the recipient count does not fit into a `u32`.
fn parse_message_active(&self, message: &str) -> Option<QmgrEvent> {
    let caps = MESSAGE_ACTIVE_REGEX.captures(message)?;
    // All four groups are mandatory in the pattern, so they are present
    // whenever the regex matched.
    let group = |idx: usize| caps.get(idx).unwrap().as_str();

    let queue_id = group(1).to_string();
    let from = match CommonFieldsParser::extract_from_email(message) {
        Some(email) => email.address,
        None => group(2).to_string(),
    };
    let size = match CommonFieldsParser::extract_size(message) {
        Some(bytes) => bytes,
        None => group(3).parse::<u64>().unwrap_or(0),
    };
    let nrcpt = group(4).parse::<u32>().ok()?;

    Some(QmgrEvent::MessageActive {
        queue_id,
        from,
        size,
        nrcpt,
    })
}
/// Parses a `<id>: removed` record, keeping the optional parenthesised
/// reason when one is present. Returns `None` if the line does not match.
fn parse_message_removed(&self, message: &str) -> Option<QmgrEvent> {
    MESSAGE_REMOVED_REGEX.captures(message).map(|caps| {
        // Group 1 (queue ID) is mandatory; group 2 (reason) is optional.
        let queue_id = caps.get(1).unwrap().as_str().to_string();
        let reason = caps.get(2).map(|found| found.as_str().to_string());
        QmgrEvent::MessageRemoved { queue_id, reason }
    })
}
/// Parses a "skipped" notice of the form `<id>: skipped, <details>`.
///
/// * `queue_id` – trimmed text before the first `:`.
/// * `reason` – `"still being delivered"` if that phrase occurs anywhere in
///   the message; otherwise the trimmed text following `skipped,` (up to a
///   further `skipped,` if there is one), or `"unknown reason"` when the
///   message has no `skipped,` marker.
/// * `status_details` – trimmed text after the first `,`, if any.
///
/// Returns `None` when the message does not contain `skipped`, or when it
/// has no non-empty prefix terminated by `:`. Without that check a message
/// lacking a colon would have its entire text reported as the queue ID.
///
/// NOTE: the prefix is not validated against the queue-ID syntax.
fn parse_message_skipped(&self, message: &str) -> Option<QmgrEvent> {
    if !message.contains("skipped") {
        return None;
    }

    // Require a real `<prefix>:` separator instead of treating the whole
    // message as the identifier.
    let (prefix, _) = message.split_once(':')?;
    let queue_id = prefix.trim();
    if queue_id.is_empty() {
        return None;
    }

    let reason = if message.contains("still being delivered") {
        "still being delivered".to_string()
    } else {
        message
            .split("skipped,")
            .nth(1)
            .map_or_else(|| "unknown reason".to_string(), |rest| rest.trim().to_string())
    };

    // Everything after the first comma, without building an intermediate Vec.
    let status_details = message
        .split_once(',')
        .map(|(_, rest)| rest.trim().to_string());

    Some(QmgrEvent::MessageSkipped {
        queue_id: queue_id.to_string(),
        reason,
        status_details,
    })
}
/// Parses a delivery record whose text contains `status=deferred`.
///
/// Each field is taken from the matching `CommonFieldsParser` extractor when
/// that yields a value and from the regex capture otherwise:
///
/// * `relay` is rendered as `host[ip]:port`, using an empty string for a
///   missing IP and `25` for a missing port.
/// * `delays` is rendered as four `/`-separated values.
/// * `delay` and `status` fall back to an empty string when neither source
///   provides them.
///
/// Returns `None` when the substring check or the regex fails.
///
/// NOTE(review): the delay breakdown is indexed at positions 0..=3; this
/// assumes it always holds at least four entries — confirm against
/// `CommonFieldsParser::extract_delay_info`.
fn parse_message_deferred(&self, message: &str) -> Option<QmgrEvent> {
    // The regex alone accepts any status, so gate on the literal first.
    if !message.contains("status=deferred") {
        return None;
    }
    let caps = MESSAGE_DEFERRED_REGEX.captures(message)?;
    // Owned text of an optional capture group.
    let group = |idx: usize| caps.get(idx).map(|found| found.as_str().to_string());

    let queue_id = caps.get(1).unwrap().as_str().to_string();

    let from = match CommonFieldsParser::extract_from_email(message) {
        Some(email) => email.address,
        None => caps.get(2).unwrap().as_str().to_string(),
    };

    let to = match CommonFieldsParser::extract_to_email(message) {
        Some(email) => Some(email.address),
        None => group(3),
    };

    let relay = match CommonFieldsParser::extract_relay_info(message) {
        Some(info) => Some(format!(
            "{}[{}]:{}",
            info.hostname,
            info.ip.as_deref().unwrap_or(""),
            info.port.unwrap_or(25)
        )),
        None => group(4),
    };

    let delay_info = CommonFieldsParser::extract_delay_info(message);
    let delay = match &delay_info {
        Some(info) => info.total.to_string(),
        None => group(5).unwrap_or_default(),
    };
    let delays = match delay_info.as_ref().and_then(|info| info.breakdown.as_ref()) {
        Some(parts) => Some(format!(
            "{}/{}/{}/{}",
            parts[0], parts[1], parts[2], parts[3]
        )),
        None => group(6),
    };

    let status_info = CommonFieldsParser::extract_status_info(message);
    let dsn = match status_info.as_ref().and_then(|info| info.dsn.clone()) {
        Some(code) => Some(code),
        None => group(7),
    };
    let status = match &status_info {
        Some(info) => info.status.clone(),
        None => group(8).unwrap_or_default(),
    };

    Some(QmgrEvent::MessageDeferred {
        queue_id,
        from,
        to,
        relay,
        delay,
        delays,
        dsn,
        status,
    })
}
/// Parses a `status=sent` delivery record using only the regex captures.
///
/// `delays` and `dsn` are optional; `status` holds the text that follows
/// `status=sent`. Returns `None` if the line does not match.
fn parse_message_sent(&self, message: &str) -> Option<QmgrEvent> {
    let caps = MESSAGE_SENT_REGEX.captures(message)?;
    // Groups that the pattern makes mandatory are present on every match.
    let required = |idx: usize| caps.get(idx).unwrap().as_str().to_string();
    let optional = |idx: usize| caps.get(idx).map(|found| found.as_str().to_string());

    Some(QmgrEvent::MessageSent {
        queue_id: required(1),
        from: required(2),
        to: required(3),
        relay: required(4),
        delay: required(5),
        delays: optional(6),
        dsn: optional(7),
        status: required(8),
    })
}
/// Recognises the remaining structured messages, tried in this order:
/// queue statistics, transport status, queue flush.
///
/// Statistics counters that do not fit into a `u32` become `None`.
/// Transport events always carry `details: None`. Returns `None` when none
/// of the three patterns matches.
fn parse_other_events(&self, message: &str) -> Option<QmgrEvent> {
    let queue_stats = || {
        QUEUE_STATS_REGEX.captures(message).map(|caps| {
            let counter = |idx: usize| caps.get(idx).unwrap().as_str().parse::<u32>().ok();
            QmgrEvent::QueueStats {
                active: counter(1),
                deferred: counter(2),
                hold: counter(3),
                incoming: counter(4),
                maildrop: counter(5),
            }
        })
    };

    let transport_status = || {
        TRANSPORT_STATUS_REGEX.captures(message).map(|caps| {
            let text = |idx: usize| caps.get(idx).unwrap().as_str().to_string();
            QmgrEvent::TransportStatus {
                transport: text(1),
                status: text(2),
                details: None,
            }
        })
    };

    let queue_flush = || {
        QUEUE_FLUSH_REGEX
            .captures(message)
            .map(|caps| QmgrEvent::QueueFlush {
                queue_name: caps.get(1).map(|found| found.as_str().to_string()),
                message_count: caps
                    .get(2)
                    .and_then(|found| found.as_str().parse::<u32>().ok()),
            })
    };

    queue_stats().or_else(transport_status).or_else(queue_flush)
}
}
impl ComponentParser for QmgrParser {
    /// Classifies a single `qmgr` log message.
    ///
    /// Parsers are tried in a fixed order and the first one that yields an
    /// event wins:
    ///
    /// 1. the anchored, regex-based record parsers (active, removed,
    ///    deferred, sent);
    /// 2. the substring heuristics (configuration warning, skipped);
    /// 3. the remaining structured events (statistics, transport, flush).
    ///
    /// The regex-based parsers run first on purpose: the heuristics only
    /// look for words such as `queue`, `limit` or `skipped` anywhere in the
    /// text, so a regular record like
    /// `<id>: from=<nolimit@example.com>, size=1, nrcpt=1 (queue active)`
    /// would otherwise be reported as a configuration warning.
    ///
    /// Messages that no parser recognises are returned as
    /// `QmgrEvent::Other` with `event_type` `"unclassified"` and whatever
    /// `extract_queue_id` finds in the message.
    ///
    /// # Errors
    ///
    /// This implementation never returns `Err`; the `Result` is required by
    /// the trait signature.
    fn parse(&self, message: &str) -> Result<ComponentEvent, ParseError> {
        let event = self
            .parse_message_active(message)
            .or_else(|| self.parse_message_removed(message))
            .or_else(|| self.parse_message_deferred(message))
            .or_else(|| self.parse_message_sent(message))
            .or_else(|| self.parse_configuration_warning(message))
            .or_else(|| self.parse_message_skipped(message))
            .or_else(|| self.parse_other_events(message))
            .unwrap_or_else(|| QmgrEvent::Other {
                event_type: "unclassified".to_string(),
                message: message.to_string(),
                queue_id: extract_queue_id(message),
            });

        Ok(ComponentEvent::Qmgr(event))
    }

    /// Name of the component handled by this parser.
    fn component_name(&self) -> &'static str {
        "qmgr"
    }
}
impl Default for QmgrParser {
fn default() -> Self {
Self::new()
}
}