1use rusmes_proto::{Mail, MailId};
118use serde::{Deserialize, Serialize};
119use std::collections::{HashMap, VecDeque};
120use std::sync::{Arc, RwLock};
121
122#[derive(
124 Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord, Default,
125)]
126pub enum Priority {
127 High = 3,
129 #[default]
131 Normal = 2,
132 Low = 1,
134 Bulk = 0,
136}
137
138impl Priority {
139 pub fn all() -> &'static [Priority] {
141 &[
142 Priority::High,
143 Priority::Normal,
144 Priority::Low,
145 Priority::Bulk,
146 ]
147 }
148
149 pub fn as_u8(self) -> u8 {
151 self as u8
152 }
153
154 pub fn from_u8(value: u8) -> Option<Self> {
156 match value {
157 3 => Some(Priority::High),
158 2 => Some(Priority::Normal),
159 1 => Some(Priority::Low),
160 0 => Some(Priority::Bulk),
161 _ => None,
162 }
163 }
164}
165
166impl std::fmt::Display for Priority {
167 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
168 match self {
169 Priority::High => write!(f, "high"),
170 Priority::Normal => write!(f, "normal"),
171 Priority::Low => write!(f, "low"),
172 Priority::Bulk => write!(f, "bulk"),
173 }
174 }
175}
176
177impl std::str::FromStr for Priority {
178 type Err = String;
179
180 fn from_str(s: &str) -> Result<Self, Self::Err> {
181 match s.to_lowercase().as_str() {
182 "high" => Ok(Priority::High),
183 "normal" => Ok(Priority::Normal),
184 "low" => Ok(Priority::Low),
185 "bulk" => Ok(Priority::Bulk),
186 _ => Err(format!("Invalid priority: {}", s)),
187 }
188 }
189}
190
191#[derive(Debug, Clone, Serialize, Deserialize)]
193pub struct PriorityConfig {
194 pub default_priority: Priority,
196
197 pub sender_priorities: HashMap<String, Priority>,
199
200 pub recipient_priorities: HashMap<String, Priority>,
202
203 pub domain_priorities: HashMap<String, Priority>,
205
206 pub inherit_priority_on_retry: bool,
208
209 pub boost_after_attempts: Option<u32>,
211
212 pub boost_amount: u8,
214}
215
216impl Default for PriorityConfig {
217 fn default() -> Self {
218 Self {
219 default_priority: Priority::Normal,
220 sender_priorities: HashMap::new(),
221 recipient_priorities: HashMap::new(),
222 domain_priorities: HashMap::new(),
223 inherit_priority_on_retry: true,
224 boost_after_attempts: Some(3),
225 boost_amount: 1,
226 }
227 }
228}
229
230impl PriorityConfig {
231 pub fn new() -> Self {
233 Self::default()
234 }
235
236 pub fn add_sender_priority(&mut self, sender: impl Into<String>, priority: Priority) {
238 self.sender_priorities.insert(sender.into(), priority);
239 }
240
241 pub fn add_recipient_priority(&mut self, recipient: impl Into<String>, priority: Priority) {
243 self.recipient_priorities.insert(recipient.into(), priority);
244 }
245
246 pub fn add_domain_priority(&mut self, domain: impl Into<String>, priority: Priority) {
248 self.domain_priorities.insert(domain.into(), priority);
249 }
250
251 pub fn calculate_priority(&self, mail: &Mail, current_attempts: u32) -> Priority {
253 if let Some(attr) = mail.get_attribute("priority") {
255 if let Some(priority_str) = attr.as_str() {
256 if let Ok(priority) = priority_str.parse::<Priority>() {
257 return self.apply_boost(priority, current_attempts);
258 }
259 }
260 }
261
262 if let Some(sender) = mail.sender() {
264 let sender_email = sender.as_string();
265 if let Some(&priority) = self.sender_priorities.get(&sender_email) {
266 return self.apply_boost(priority, current_attempts);
267 }
268 }
269
270 let mut max_priority = None;
272 for recipient in mail.recipients() {
273 let recipient_email = recipient.as_string();
274 if let Some(&priority) = self.recipient_priorities.get(&recipient_email) {
275 max_priority = Some(max_priority.map_or(priority, |p: Priority| p.max(priority)));
276 }
277
278 let domain = recipient.domain().as_str();
280 if let Some(&priority) = self.domain_priorities.get(domain) {
281 max_priority = Some(max_priority.map_or(priority, |p: Priority| p.max(priority)));
282 }
283 }
284
285 if let Some(priority) = max_priority {
286 return self.apply_boost(priority, current_attempts);
287 }
288
289 self.apply_boost(self.default_priority, current_attempts)
291 }
292
293 fn apply_boost(&self, priority: Priority, current_attempts: u32) -> Priority {
295 if let Some(boost_after) = self.boost_after_attempts {
296 if current_attempts >= boost_after {
297 let current_value = priority.as_u8();
298 let boosted_value = current_value.saturating_add(self.boost_amount);
299 return Priority::from_u8(boosted_value.min(3)).unwrap_or(Priority::High);
300 }
301 }
302 priority
303 }
304}
305
306#[derive(Debug, Clone, Default)]
308pub struct PriorityStats {
309 pub total: usize,
311 pub ready: usize,
313 pub delayed: usize,
315 pub enqueued_total: u64,
317 pub delivered_total: u64,
319}
320
321pub struct PriorityQueue<T> {
323 queues: HashMap<Priority, VecDeque<(MailId, T)>>,
325 config: Arc<RwLock<PriorityConfig>>,
327 stats: HashMap<Priority, Arc<RwLock<PriorityStats>>>,
329}
330
331impl<T> PriorityQueue<T> {
332 pub fn new(config: PriorityConfig) -> Self {
334 let mut queues = HashMap::new();
335 let mut stats = HashMap::new();
336
337 for &priority in Priority::all() {
338 queues.insert(priority, VecDeque::new());
339 stats.insert(priority, Arc::new(RwLock::new(PriorityStats::default())));
340 }
341
342 Self {
343 queues,
344 config: Arc::new(RwLock::new(config)),
345 stats,
346 }
347 }
348
349 pub fn with_default_config() -> Self {
351 Self::new(PriorityConfig::default())
352 }
353
354 pub fn enqueue(&mut self, mail_id: MailId, item: T, priority: Priority) {
356 if let Some(queue) = self.queues.get_mut(&priority) {
357 queue.push_back((mail_id, item));
358 }
359
360 if let Some(stats) = self.stats.get(&priority) {
362 if let Ok(mut stats) = stats.write() {
363 stats.total += 1;
364 stats.enqueued_total += 1;
365 }
366 }
367 }
368
369 pub fn dequeue(&mut self) -> Option<(MailId, T, Priority)> {
371 for &priority in Priority::all() {
373 if let Some(queue) = self.queues.get_mut(&priority) {
374 if let Some((mail_id, item)) = queue.pop_front() {
375 if let Some(stats) = self.stats.get(&priority) {
377 if let Ok(mut stats) = stats.write() {
378 stats.total = stats.total.saturating_sub(1);
379 }
380 }
381 return Some((mail_id, item, priority));
382 }
383 }
384 }
385 None
386 }
387
388 pub fn peek(&self) -> Option<(&MailId, &T, Priority)> {
390 for &priority in Priority::all() {
391 if let Some(queue) = self.queues.get(&priority) {
392 if let Some((mail_id, item)) = queue.front() {
393 return Some((mail_id, item, priority));
394 }
395 }
396 }
397 None
398 }
399
400 pub fn len_for_priority(&self, priority: Priority) -> usize {
402 self.queues.get(&priority).map_or(0, |q| q.len())
403 }
404
405 pub fn len(&self) -> usize {
407 self.queues.values().map(|q| q.len()).sum()
408 }
409
410 pub fn is_empty(&self) -> bool {
412 self.len() == 0
413 }
414
415 pub fn remove(&mut self, mail_id: &MailId) -> Option<(T, Priority)> {
417 for &priority in Priority::all() {
418 if let Some(queue) = self.queues.get_mut(&priority) {
419 if let Some(pos) = queue.iter().position(|(id, _)| id == mail_id) {
420 if let Some((_, item)) = queue.remove(pos) {
421 if let Some(stats) = self.stats.get(&priority) {
423 if let Ok(mut stats) = stats.write() {
424 stats.total = stats.total.saturating_sub(1);
425 }
426 }
427 return Some((item, priority));
428 }
429 }
430 }
431 }
432 None
433 }
434
435 pub fn update_config(&self, config: PriorityConfig) {
437 if let Ok(mut guard) = self.config.write() {
438 *guard = config;
439 }
440 }
441
442 pub fn get_config(&self) -> PriorityConfig {
444 self.config.read().map(|g| g.clone()).unwrap_or_default()
445 }
446
447 pub fn stats_for_priority(&self, priority: Priority) -> PriorityStats {
449 self.stats
450 .get(&priority)
451 .and_then(|s| s.read().ok().map(|g| g.clone()))
452 .unwrap_or_default()
453 }
454
455 pub fn mark_delivered(&self, priority: Priority) {
457 if let Some(stats) = self.stats.get(&priority) {
458 if let Ok(mut stats) = stats.write() {
459 stats.delivered_total += 1;
460 }
461 }
462 }
463
464 pub fn update_ready_delayed_stats(&self, priority: Priority, ready: usize, delayed: usize) {
466 if let Some(stats) = self.stats.get(&priority) {
467 if let Ok(mut stats) = stats.write() {
468 stats.ready = ready;
469 stats.delayed = delayed;
470 }
471 }
472 }
473
474 pub fn clear(&mut self) {
476 for queue in self.queues.values_mut() {
477 queue.clear();
478 }
479 for stats in self.stats.values() {
480 if let Ok(mut stats) = stats.write() {
481 stats.total = 0;
482 stats.ready = 0;
483 stats.delayed = 0;
484 }
485 }
486 }
487
488 pub fn items_for_priority(&self, priority: Priority) -> Vec<&(MailId, T)> {
490 self.queues
491 .get(&priority)
492 .map(|q| q.iter().collect())
493 .unwrap_or_default()
494 }
495}
496
497impl<T> Default for PriorityQueue<T> {
498 fn default() -> Self {
499 Self::with_default_config()
500 }
501}
502
503#[cfg(test)]
504mod tests {
505 use super::*;
506 use bytes::Bytes;
507 use rusmes_proto::{HeaderMap, MessageBody, MimeMessage};
508
509 #[test]
510 fn test_priority_ordering() {
511 assert!(Priority::High > Priority::Normal);
512 assert!(Priority::Normal > Priority::Low);
513 assert!(Priority::Low > Priority::Bulk);
514 }
515
516 #[test]
517 fn test_priority_from_str() {
518 assert_eq!("high".parse::<Priority>().unwrap(), Priority::High);
519 assert_eq!("normal".parse::<Priority>().unwrap(), Priority::Normal);
520 assert_eq!("low".parse::<Priority>().unwrap(), Priority::Low);
521 assert_eq!("bulk".parse::<Priority>().unwrap(), Priority::Bulk);
522 assert!("invalid".parse::<Priority>().is_err());
523 }
524
525 #[test]
526 fn test_priority_queue_enqueue_dequeue() {
527 let mut queue = PriorityQueue::<String>::with_default_config();
528
529 let mail_id1 = MailId::new();
530 let mail_id2 = MailId::new();
531 let mail_id3 = MailId::new();
532
533 queue.enqueue(mail_id1, "low priority".to_string(), Priority::Low);
534 queue.enqueue(mail_id2, "high priority".to_string(), Priority::High);
535 queue.enqueue(mail_id3, "normal priority".to_string(), Priority::Normal);
536
537 let (_, item1, priority1) = queue.dequeue().unwrap();
539 assert_eq!(item1, "high priority");
540 assert_eq!(priority1, Priority::High);
541
542 let (_, item2, priority2) = queue.dequeue().unwrap();
543 assert_eq!(item2, "normal priority");
544 assert_eq!(priority2, Priority::Normal);
545
546 let (_, item3, priority3) = queue.dequeue().unwrap();
547 assert_eq!(item3, "low priority");
548 assert_eq!(priority3, Priority::Low);
549
550 assert!(queue.is_empty());
551 }
552
553 #[test]
554 fn test_priority_queue_remove() {
555 let mut queue = PriorityQueue::<String>::with_default_config();
556
557 let mail_id = MailId::new();
558 queue.enqueue(mail_id, "test".to_string(), Priority::Normal);
559
560 assert_eq!(queue.len(), 1);
561 let (item, priority) = queue.remove(&mail_id).unwrap();
562 assert_eq!(item, "test");
563 assert_eq!(priority, Priority::Normal);
564 assert!(queue.is_empty());
565 }
566
567 #[test]
568 fn test_priority_config_sender_rule() {
569 let mut config = PriorityConfig::new();
570 config.add_sender_priority("vip@example.com", Priority::High);
571
572 let message = MimeMessage::new(HeaderMap::new(), MessageBody::Small(Bytes::from("test")));
573
574 let mail = Mail::new(
575 Some("vip@example.com".parse().unwrap()),
576 vec!["user@example.com".parse().unwrap()],
577 message,
578 None,
579 None,
580 );
581
582 let priority = config.calculate_priority(&mail, 0);
583 assert_eq!(priority, Priority::High);
584 }
585
586 #[test]
587 fn test_priority_config_domain_rule() {
588 let mut config = PriorityConfig::new();
589 config.add_domain_priority("important.com", Priority::High);
590
591 let message = MimeMessage::new(HeaderMap::new(), MessageBody::Small(Bytes::from("test")));
592
593 let mail = Mail::new(
594 Some("sender@example.com".parse().unwrap()),
595 vec!["user@important.com".parse().unwrap()],
596 message,
597 None,
598 None,
599 );
600
601 let priority = config.calculate_priority(&mail, 0);
602 assert_eq!(priority, Priority::High);
603 }
604
605 #[test]
606 fn test_priority_boost_on_retry() {
607 let mut config = PriorityConfig::new();
608 config.boost_after_attempts = Some(3);
609 config.boost_amount = 1;
610
611 let message = MimeMessage::new(HeaderMap::new(), MessageBody::Small(Bytes::from("test")));
612
613 let mail = Mail::new(
614 Some("sender@example.com".parse().unwrap()),
615 vec!["user@example.com".parse().unwrap()],
616 message,
617 None,
618 None,
619 );
620
621 let priority = config.calculate_priority(&mail, 2);
623 assert_eq!(priority, Priority::Normal);
624
625 let priority = config.calculate_priority(&mail, 3);
627 assert_eq!(priority, Priority::High);
628 }
629
630 #[test]
631 fn test_priority_queue_stats() {
632 let mut queue = PriorityQueue::<String>::with_default_config();
633
634 let mail_id = MailId::new();
635 queue.enqueue(mail_id, "test".to_string(), Priority::High);
636
637 let stats = queue.stats_for_priority(Priority::High);
638 assert_eq!(stats.total, 1);
639 assert_eq!(stats.enqueued_total, 1);
640
641 queue.mark_delivered(Priority::High);
642 let stats = queue.stats_for_priority(Priority::High);
643 assert_eq!(stats.delivered_total, 1);
644 }
645}