use rusmes_proto::{Mail, MailId};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, RwLock};
/// Priority level assigned to a queued mail.
///
/// Each variant carries an explicit discriminant (`Bulk = 0` up to `High = 3`). The derived
/// `PartialOrd`/`Ord` compare enum variants by discriminant, so
/// `High > Normal > Low > Bulk` even though the variants are declared in descending order.
#[derive(
Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord, Default,
)]
pub enum Priority {
/// Most urgent level (numeric value 3).
High = 3,
/// Middle level (numeric value 2); this is the value produced by `Priority::default()`.
#[default]
Normal = 2,
/// Below-normal level (numeric value 1).
Low = 1,
/// Least urgent level (numeric value 0).
Bulk = 0,
}
impl Priority {
    /// Returns every priority level, ordered from most urgent to least urgent.
    ///
    /// The slice is always `[High, Normal, Low, Bulk]`.
    pub fn all() -> &'static [Priority] {
        static DESCENDING: [Priority; 4] = [
            Priority::High,
            Priority::Normal,
            Priority::Low,
            Priority::Bulk,
        ];
        &DESCENDING
    }

    /// Returns the numeric value of this level (`Bulk` is 0, `High` is 3).
    pub fn as_u8(self) -> u8 {
        // The enum declares explicit discriminants, so the cast yields exactly those values.
        self as u8
    }

    /// Converts a numeric value back into a level.
    ///
    /// Returns `None` when `value` is not the numeric value of any level (anything above 3).
    pub fn from_u8(value: u8) -> Option<Self> {
        Self::all()
            .iter()
            .copied()
            .find(|level| level.as_u8() == value)
    }
}
impl std::fmt::Display for Priority {
    /// Writes the lowercase name of the level: `high`, `normal`, `low` or `bulk`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match *self {
            Priority::High => "high",
            Priority::Normal => "normal",
            Priority::Low => "low",
            Priority::Bulk => "bulk",
        };
        // Written verbatim: width/alignment flags of the formatter are not applied.
        f.write_str(label)
    }
}
impl std::str::FromStr for Priority {
    type Err = String;

    /// Parses a level from its name, ignoring letter case.
    ///
    /// Accepted names are `high`, `normal`, `low` and `bulk`. The input is lowercased but not
    /// trimmed, so surrounding whitespace makes the parse fail.
    ///
    /// # Errors
    ///
    /// Returns `"Invalid priority: <input>"` (with the input as given) for any other text.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        const NAMES: [(&str, Priority); 4] = [
            ("high", Priority::High),
            ("normal", Priority::Normal),
            ("low", Priority::Low),
            ("bulk", Priority::Bulk),
        ];

        let wanted = s.to_lowercase();
        NAMES
            .iter()
            .find(|entry| entry.0 == wanted)
            .map(|entry| entry.1)
            .ok_or_else(|| format!("Invalid priority: {}", s))
    }
}
/// Rules used by `PriorityConfig::calculate_priority` to pick a mail's priority.
///
/// Lookups in the three rule maps are exact string matches against the keys as stored.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PriorityConfig {
/// Priority used when no mail attribute, sender rule, recipient rule or domain rule applies.
pub default_priority: Priority,
/// Sender rules, keyed by the string form of the sender address (`sender.as_string()`).
pub sender_priorities: HashMap<String, Priority>,
/// Recipient rules, keyed by the string form of a recipient address.
pub recipient_priorities: HashMap<String, Priority>,
/// Domain rules, keyed by a recipient's domain string.
pub domain_priorities: HashMap<String, Priority>,
/// NOTE(review): not read by any code visible in this file; presumably consulted by the
/// retry path elsewhere — confirm before relying on it.
pub inherit_priority_on_retry: bool,
/// Attempt count at which the boost starts to apply (`current_attempts >= value`);
/// `None` disables boosting.
pub boost_after_attempts: Option<u32>,
/// Amount added to the numeric priority when the boost applies; the result is capped at
/// `Priority::High`.
pub boost_amount: u8,
}
impl Default for PriorityConfig {
fn default() -> Self {
Self {
default_priority: Priority::Normal,
sender_priorities: HashMap::new(),
recipient_priorities: HashMap::new(),
domain_priorities: HashMap::new(),
inherit_priority_on_retry: true,
boost_after_attempts: Some(3),
boost_amount: 1,
}
}
}
impl PriorityConfig {
    /// Creates a configuration with the default settings.
    pub fn new() -> Self {
        Default::default()
    }

    /// Registers (or replaces) the priority for mail sent by `sender`.
    pub fn add_sender_priority(&mut self, sender: impl Into<String>, priority: Priority) {
        let key: String = sender.into();
        self.sender_priorities.insert(key, priority);
    }

    /// Registers (or replaces) the priority for mail addressed to `recipient`.
    pub fn add_recipient_priority(&mut self, recipient: impl Into<String>, priority: Priority) {
        let key: String = recipient.into();
        self.recipient_priorities.insert(key, priority);
    }

    /// Registers (or replaces) the priority for mail addressed to any recipient at `domain`.
    pub fn add_domain_priority(&mut self, domain: impl Into<String>, priority: Priority) {
        let key: String = domain.into();
        self.domain_priorities.insert(key, priority);
    }

    /// Determines the priority of `mail` after `current_attempts` delivery attempts.
    ///
    /// The base priority comes from the first source that yields one:
    ///
    /// 1. the mail's `"priority"` attribute, when it is a string that parses as a level;
    /// 2. the sender rule matching the sender address;
    /// 3. the highest level among all recipient and domain rules matching any recipient;
    /// 4. `default_priority`.
    ///
    /// The retry boost is then applied to whichever base priority was chosen.
    pub fn calculate_priority(&self, mail: &Mail, current_attempts: u32) -> Priority {
        let base = Self::priority_from_attribute(mail)
            .or_else(|| self.priority_from_sender(mail))
            .or_else(|| self.priority_from_recipients(mail))
            .unwrap_or(self.default_priority);
        self.apply_boost(base, current_attempts)
    }

    /// Reads an explicit level from the mail's `"priority"` attribute, if it holds a valid one.
    fn priority_from_attribute(mail: &Mail) -> Option<Priority> {
        if let Some(attr) = mail.get_attribute("priority") {
            if let Some(text) = attr.as_str() {
                return text.parse::<Priority>().ok();
            }
        }
        None
    }

    /// Looks up the sender rule for the mail's sender, if the mail has a sender.
    fn priority_from_sender(&self, mail: &Mail) -> Option<Priority> {
        if let Some(sender) = mail.sender() {
            let address = sender.as_string();
            return self.sender_priorities.get(&address).copied();
        }
        None
    }

    /// Returns the highest level among the recipient and domain rules matching any recipient.
    fn priority_from_recipients(&self, mail: &Mail) -> Option<Priority> {
        let mut highest: Option<Priority> = None;
        for recipient in mail.recipients() {
            let address = recipient.as_string();
            let by_address = self.recipient_priorities.get(&address).copied();
            let domain = recipient.domain().as_str();
            let by_domain = self.domain_priorities.get(domain).copied();
            // `None` orders below every `Some(_)`, so `max` keeps the best match seen so far.
            highest = highest.max(by_address).max(by_domain);
        }
        highest
    }

    /// Raises `priority` by `boost_amount` once `current_attempts` has reached
    /// `boost_after_attempts`; the result never exceeds `Priority::High`.
    fn apply_boost(&self, priority: Priority, current_attempts: u32) -> Priority {
        match self.boost_after_attempts {
            Some(threshold) if current_attempts >= threshold => {
                let raised = priority.as_u8().saturating_add(self.boost_amount).min(3);
                Priority::from_u8(raised).unwrap_or(Priority::High)
            }
            _ => priority,
        }
    }
}
/// Counters kept by `PriorityQueue` for a single priority level.
#[derive(Debug, Clone, Default)]
pub struct PriorityStats {
/// Items currently queued at this level: incremented by `enqueue`, decremented by
/// `dequeue`/`remove`, reset to zero by `clear`.
pub total: usize,
/// Value last supplied through `update_ready_delayed_stats`; reset to zero by `clear`.
pub ready: usize,
/// Value last supplied through `update_ready_delayed_stats`; reset to zero by `clear`.
pub delayed: usize,
/// Number of `enqueue` calls recorded for this level; not reset by `clear`.
pub enqueued_total: u64,
/// Number of `mark_delivered` calls recorded for this level; not reset by `clear`.
pub delivered_total: u64,
}
/// A queue of `(MailId, T)` entries split into one FIFO per `Priority` level.
///
/// `dequeue` and `peek` visit the levels in the order given by `Priority::all()`
/// (most urgent first) and take the oldest entry of the first non-empty level.
pub struct PriorityQueue<T> {
// One FIFO per priority level; `new` creates an entry for every level in `Priority::all()`.
queues: HashMap<Priority, VecDeque<(MailId, T)>>,
// Current configuration; behind a lock so it can be replaced through `&self`.
config: Arc<RwLock<PriorityConfig>>,
// Per-level counters; behind locks so `&self` methods can update them.
stats: HashMap<Priority, Arc<RwLock<PriorityStats>>>,
}
impl<T> PriorityQueue<T> {
pub fn new(config: PriorityConfig) -> Self {
let mut queues = HashMap::new();
let mut stats = HashMap::new();
for &priority in Priority::all() {
queues.insert(priority, VecDeque::new());
stats.insert(priority, Arc::new(RwLock::new(PriorityStats::default())));
}
Self {
queues,
config: Arc::new(RwLock::new(config)),
stats,
}
}
pub fn with_default_config() -> Self {
Self::new(PriorityConfig::default())
}
pub fn enqueue(&mut self, mail_id: MailId, item: T, priority: Priority) {
if let Some(queue) = self.queues.get_mut(&priority) {
queue.push_back((mail_id, item));
}
if let Some(stats) = self.stats.get(&priority) {
if let Ok(mut stats) = stats.write() {
stats.total += 1;
stats.enqueued_total += 1;
}
}
}
pub fn dequeue(&mut self) -> Option<(MailId, T, Priority)> {
for &priority in Priority::all() {
if let Some(queue) = self.queues.get_mut(&priority) {
if let Some((mail_id, item)) = queue.pop_front() {
if let Some(stats) = self.stats.get(&priority) {
if let Ok(mut stats) = stats.write() {
stats.total = stats.total.saturating_sub(1);
}
}
return Some((mail_id, item, priority));
}
}
}
None
}
pub fn peek(&self) -> Option<(&MailId, &T, Priority)> {
for &priority in Priority::all() {
if let Some(queue) = self.queues.get(&priority) {
if let Some((mail_id, item)) = queue.front() {
return Some((mail_id, item, priority));
}
}
}
None
}
pub fn len_for_priority(&self, priority: Priority) -> usize {
self.queues.get(&priority).map_or(0, |q| q.len())
}
pub fn len(&self) -> usize {
self.queues.values().map(|q| q.len()).sum()
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn remove(&mut self, mail_id: &MailId) -> Option<(T, Priority)> {
for &priority in Priority::all() {
if let Some(queue) = self.queues.get_mut(&priority) {
if let Some(pos) = queue.iter().position(|(id, _)| id == mail_id) {
if let Some((_, item)) = queue.remove(pos) {
if let Some(stats) = self.stats.get(&priority) {
if let Ok(mut stats) = stats.write() {
stats.total = stats.total.saturating_sub(1);
}
}
return Some((item, priority));
}
}
}
}
None
}
pub fn update_config(&self, config: PriorityConfig) {
if let Ok(mut guard) = self.config.write() {
*guard = config;
}
}
pub fn get_config(&self) -> PriorityConfig {
self.config.read().map(|g| g.clone()).unwrap_or_default()
}
pub fn stats_for_priority(&self, priority: Priority) -> PriorityStats {
self.stats
.get(&priority)
.and_then(|s| s.read().ok().map(|g| g.clone()))
.unwrap_or_default()
}
pub fn mark_delivered(&self, priority: Priority) {
if let Some(stats) = self.stats.get(&priority) {
if let Ok(mut stats) = stats.write() {
stats.delivered_total += 1;
}
}
}
pub fn update_ready_delayed_stats(&self, priority: Priority, ready: usize, delayed: usize) {
if let Some(stats) = self.stats.get(&priority) {
if let Ok(mut stats) = stats.write() {
stats.ready = ready;
stats.delayed = delayed;
}
}
}
pub fn clear(&mut self) {
for queue in self.queues.values_mut() {
queue.clear();
}
for stats in self.stats.values() {
if let Ok(mut stats) = stats.write() {
stats.total = 0;
stats.ready = 0;
stats.delayed = 0;
}
}
}
pub fn items_for_priority(&self, priority: Priority) -> Vec<&(MailId, T)> {
self.queues
.get(&priority)
.map(|q| q.iter().collect())
.unwrap_or_default()
}
}
impl<T> Default for PriorityQueue<T> {
fn default() -> Self {
Self::with_default_config()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use rusmes_proto::{HeaderMap, MessageBody, MimeMessage};

    /// Builds a mail from `sender` to the single `recipient` and asserts that `config`
    /// assigns it `expected` after `attempts` delivery attempts.
    fn assert_calculated(
        config: &PriorityConfig,
        sender: &str,
        recipient: &str,
        attempts: u32,
        expected: Priority,
    ) {
        let message = MimeMessage::new(HeaderMap::new(), MessageBody::Small(Bytes::from("test")));
        let mail = Mail::new(
            Some(sender.parse().unwrap()),
            vec![recipient.parse().unwrap()],
            message,
            None,
            None,
        );
        assert_eq!(config.calculate_priority(&mail, attempts), expected);
    }

    /// Levels compare by urgency: each one is strictly greater than the next lower one.
    #[test]
    fn test_priority_ordering() {
        let descending = [
            Priority::High,
            Priority::Normal,
            Priority::Low,
            Priority::Bulk,
        ];
        for pair in descending.windows(2) {
            assert!(pair[0] > pair[1]);
        }
    }

    /// Every level name parses; unknown names are rejected.
    #[test]
    fn test_priority_from_str() {
        let cases = [
            ("high", Priority::High),
            ("normal", Priority::Normal),
            ("low", Priority::Low),
            ("bulk", Priority::Bulk),
        ];
        for &(text, expected) in &cases {
            assert_eq!(text.parse::<Priority>().unwrap(), expected);
        }
        assert!("invalid".parse::<Priority>().is_err());
    }

    /// Entries come out most urgent first, regardless of insertion order.
    #[test]
    fn test_priority_queue_enqueue_dequeue() {
        let mut queue = PriorityQueue::<String>::with_default_config();
        let inserted = [
            ("low priority", Priority::Low),
            ("high priority", Priority::High),
            ("normal priority", Priority::Normal),
        ];
        for &(label, priority) in &inserted {
            queue.enqueue(MailId::new(), label.to_string(), priority);
        }

        let expected = [
            ("high priority", Priority::High),
            ("normal priority", Priority::Normal),
            ("low priority", Priority::Low),
        ];
        for &(label, priority) in &expected {
            let (_, item, dequeued_at) = queue.dequeue().unwrap();
            assert_eq!(item, label);
            assert_eq!(dequeued_at, priority);
        }
        assert!(queue.is_empty());
    }

    /// Removing by id returns the payload and its level and leaves the queue empty.
    #[test]
    fn test_priority_queue_remove() {
        let mut queue = PriorityQueue::<String>::with_default_config();
        let id = MailId::new();
        queue.enqueue(id, "test".to_string(), Priority::Normal);
        assert_eq!(queue.len(), 1);

        let (payload, level) = queue.remove(&id).unwrap();
        assert_eq!(payload, "test");
        assert_eq!(level, Priority::Normal);
        assert!(queue.is_empty());
    }

    /// A sender rule decides the priority of mail from that sender.
    #[test]
    fn test_priority_config_sender_rule() {
        let mut config = PriorityConfig::new();
        config.add_sender_priority("vip@example.com", Priority::High);
        assert_calculated(
            &config,
            "vip@example.com",
            "user@example.com",
            0,
            Priority::High,
        );
    }

    /// A domain rule decides the priority of mail to a recipient at that domain.
    #[test]
    fn test_priority_config_domain_rule() {
        let mut config = PriorityConfig::new();
        config.add_domain_priority("important.com", Priority::High);
        assert_calculated(
            &config,
            "sender@example.com",
            "user@important.com",
            0,
            Priority::High,
        );
    }

    /// The boost applies once the attempt count reaches the threshold, not before.
    #[test]
    fn test_priority_boost_on_retry() {
        let mut config = PriorityConfig::new();
        config.boost_after_attempts = Some(3);
        config.boost_amount = 1;

        let (sender, recipient) = ("sender@example.com", "user@example.com");
        assert_calculated(&config, sender, recipient, 2, Priority::Normal);
        assert_calculated(&config, sender, recipient, 3, Priority::High);
    }

    /// Enqueueing and marking a delivery are reflected in the level's counters.
    #[test]
    fn test_priority_queue_stats() {
        let mut queue = PriorityQueue::<String>::with_default_config();
        queue.enqueue(MailId::new(), "test".to_string(), Priority::High);

        let after_enqueue = queue.stats_for_priority(Priority::High);
        assert_eq!(after_enqueue.total, 1);
        assert_eq!(after_enqueue.enqueued_total, 1);

        queue.mark_delivered(Priority::High);
        let after_delivery = queue.stats_for_priority(Priority::High);
        assert_eq!(after_delivery.delivered_total, 1);
    }
}