Skip to main content

rusmes_core/queue/
priority.rs

1//! Priority queue system for mail delivery
2//!
3//! This module provides a multi-level priority queue system for mail delivery,
4//! allowing mails to be processed based on their priority level.
5//!
6//! # Priority Levels
7//!
8//! - **High**: Urgent mail (priority 3)
9//! - **Normal**: Default for most mail (priority 2)
10//! - **Low**: Non-urgent mail (priority 1)
11//! - **Bulk**: Newsletters, marketing (priority 0)
12//!
13//! # Features
14//!
15//! - Priority-based scheduling (highest priority first)
16//! - Per-priority statistics tracking
17//! - Priority inheritance for retries
18//! - Configurable priority assignment rules:
19//!   - Per sender email
20//!   - Per recipient email
21//!   - Per domain
22//! - Automatic priority boosting after N failed attempts
23//!
24//! # Example: Basic Priority Queue
25//!
26//! ```
27//! use rusmes_core::queue::priority::{Priority, PriorityQueue};
28//! use rusmes_proto::MailId;
29//!
30//! let mut queue = PriorityQueue::<String>::with_default_config();
31//!
32//! // Enqueue items with different priorities
33//! queue.enqueue(MailId::new(), "bulk mail".to_string(), Priority::Bulk);
34//! queue.enqueue(MailId::new(), "urgent mail".to_string(), Priority::High);
35//! queue.enqueue(MailId::new(), "normal mail".to_string(), Priority::Normal);
36//!
37//! // Dequeue returns highest priority first (High, Normal, Bulk)
38//! let (mail_id, item, priority) = queue.dequeue().unwrap();
39//! assert_eq!(item, "urgent mail");
40//! assert_eq!(priority, Priority::High);
41//! ```
42//!
43//! # Example: Priority Configuration
44//!
45//! ```
46//! use rusmes_core::queue::priority::{Priority, PriorityConfig};
47//! use rusmes_proto::{Mail, MimeMessage, MessageBody, HeaderMap};
48//! use bytes::Bytes;
49//!
50//! let mut config = PriorityConfig::new();
51//!
52//! // VIP sender always gets high priority
53//! config.add_sender_priority("vip@example.com", Priority::High);
54//!
55//! // Important domain gets high priority
56//! config.add_domain_priority("important.com", Priority::High);
57//!
58//! // Bulk domain gets low priority
59//! config.add_domain_priority("marketing.com", Priority::Bulk);
60//!
61//! // Enable priority boost after 3 failed attempts
62//! config.boost_after_attempts = Some(3);
63//! config.boost_amount = 1;
64//!
65//! // Calculate priority for a mail
66//! let message = MimeMessage::new(HeaderMap::new(), MessageBody::Small(Bytes::from("test")));
67//! let mail = Mail::new(
68//!     Some("vip@example.com".parse().unwrap()),
69//!     vec!["user@example.com".parse().unwrap()],
70//!     message,
71//!     None,
72//!     None,
73//! );
74//!
75//! let priority = config.calculate_priority(&mail, 0);
76//! assert_eq!(priority, Priority::High);
77//! ```
78//!
79//! # Example: Integration with MailQueue
80//!
81//! ```no_run
82//! use rusmes_core::{MailQueue, PriorityConfig, Priority};
83//! use rusmes_proto::{Mail, MimeMessage, MessageBody, HeaderMap};
84//! use bytes::Bytes;
85//!
86//! #[tokio::main]
87//! async fn main() {
88//!     // Create priority configuration
89//!     let mut priority_config = PriorityConfig::new();
90//!     priority_config.add_domain_priority("urgent.com", Priority::High);
91//!
92//!     // Create queue with priority support
93//!     let queue = MailQueue::new_with_priority_config(priority_config);
94//!
95//!     // Enqueue mail (priority calculated automatically)
96//!     let message = MimeMessage::new(HeaderMap::new(), MessageBody::Small(Bytes::from("test")));
97//!     let mail = Mail::new(
98//!         Some("sender@example.com".parse().unwrap()),
99//!         vec!["user@urgent.com".parse().unwrap()],
100//!         message,
101//!         None,
102//!         None,
103//!     );
104//!     queue.enqueue(mail).await.unwrap();
105//!
106//!     // Get ready mails (sorted by priority)
107//!     let ready_mails = queue.get_ready_for_retry(10);
108//!
109//!     // Get statistics by priority
110//!     let stats = queue.stats_by_priority();
111//!     for (priority, stat) in stats {
112//!         println!("{}: {} total, {} ready", priority, stat.total, stat.ready);
113//!     }
114//! }
115//! ```
116
117use rusmes_proto::{Mail, MailId};
118use serde::{Deserialize, Serialize};
119use std::collections::{HashMap, VecDeque};
120use std::sync::{Arc, RwLock};
121
122/// Mail priority levels
123#[derive(
124    Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord, Default,
125)]
126pub enum Priority {
127    /// Highest priority - urgent mail
128    High = 3,
129    /// Normal priority - default for most mail
130    #[default]
131    Normal = 2,
132    /// Low priority - non-urgent mail
133    Low = 1,
134    /// Bulk mail - newsletters, marketing
135    Bulk = 0,
136}
137
138impl Priority {
139    /// Get all priority levels in order (highest first)
140    pub fn all() -> &'static [Priority] {
141        &[
142            Priority::High,
143            Priority::Normal,
144            Priority::Low,
145            Priority::Bulk,
146        ]
147    }
148
149    /// Get priority as numeric value
150    pub fn as_u8(self) -> u8 {
151        self as u8
152    }
153
154    /// Create priority from numeric value
155    pub fn from_u8(value: u8) -> Option<Self> {
156        match value {
157            3 => Some(Priority::High),
158            2 => Some(Priority::Normal),
159            1 => Some(Priority::Low),
160            0 => Some(Priority::Bulk),
161            _ => None,
162        }
163    }
164}
165
166impl std::fmt::Display for Priority {
167    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
168        match self {
169            Priority::High => write!(f, "high"),
170            Priority::Normal => write!(f, "normal"),
171            Priority::Low => write!(f, "low"),
172            Priority::Bulk => write!(f, "bulk"),
173        }
174    }
175}
176
177impl std::str::FromStr for Priority {
178    type Err = String;
179
180    fn from_str(s: &str) -> Result<Self, Self::Err> {
181        match s.to_lowercase().as_str() {
182            "high" => Ok(Priority::High),
183            "normal" => Ok(Priority::Normal),
184            "low" => Ok(Priority::Low),
185            "bulk" => Ok(Priority::Bulk),
186            _ => Err(format!("Invalid priority: {}", s)),
187        }
188    }
189}
190
191/// Configuration for priority assignment rules
192#[derive(Debug, Clone, Serialize, Deserialize)]
193pub struct PriorityConfig {
194    /// Default priority if no rules match
195    pub default_priority: Priority,
196
197    /// Sender-based priority rules (email -> priority)
198    pub sender_priorities: HashMap<String, Priority>,
199
200    /// Recipient-based priority rules (email -> priority)
201    pub recipient_priorities: HashMap<String, Priority>,
202
203    /// Domain-based priority rules (domain -> priority)
204    pub domain_priorities: HashMap<String, Priority>,
205
206    /// Enable priority inheritance for retries
207    pub inherit_priority_on_retry: bool,
208
209    /// Boost priority after N failed attempts
210    pub boost_after_attempts: Option<u32>,
211
212    /// Priority boost amount (e.g., Low -> Normal)
213    pub boost_amount: u8,
214}
215
216impl Default for PriorityConfig {
217    fn default() -> Self {
218        Self {
219            default_priority: Priority::Normal,
220            sender_priorities: HashMap::new(),
221            recipient_priorities: HashMap::new(),
222            domain_priorities: HashMap::new(),
223            inherit_priority_on_retry: true,
224            boost_after_attempts: Some(3),
225            boost_amount: 1,
226        }
227    }
228}
229
230impl PriorityConfig {
231    /// Create a new priority configuration with defaults
232    pub fn new() -> Self {
233        Self::default()
234    }
235
236    /// Add sender priority rule
237    pub fn add_sender_priority(&mut self, sender: impl Into<String>, priority: Priority) {
238        self.sender_priorities.insert(sender.into(), priority);
239    }
240
241    /// Add recipient priority rule
242    pub fn add_recipient_priority(&mut self, recipient: impl Into<String>, priority: Priority) {
243        self.recipient_priorities.insert(recipient.into(), priority);
244    }
245
246    /// Add domain priority rule
247    pub fn add_domain_priority(&mut self, domain: impl Into<String>, priority: Priority) {
248        self.domain_priorities.insert(domain.into(), priority);
249    }
250
251    /// Calculate priority for a mail based on configured rules
252    pub fn calculate_priority(&self, mail: &Mail, current_attempts: u32) -> Priority {
253        // Check if priority is already set as an attribute
254        if let Some(attr) = mail.get_attribute("priority") {
255            if let Some(priority_str) = attr.as_str() {
256                if let Ok(priority) = priority_str.parse::<Priority>() {
257                    return self.apply_boost(priority, current_attempts);
258                }
259            }
260        }
261
262        // Check sender-based rules
263        if let Some(sender) = mail.sender() {
264            let sender_email = sender.as_string();
265            if let Some(&priority) = self.sender_priorities.get(&sender_email) {
266                return self.apply_boost(priority, current_attempts);
267            }
268        }
269
270        // Check recipient-based rules (use highest priority if multiple recipients)
271        let mut max_priority = None;
272        for recipient in mail.recipients() {
273            let recipient_email = recipient.as_string();
274            if let Some(&priority) = self.recipient_priorities.get(&recipient_email) {
275                max_priority = Some(max_priority.map_or(priority, |p: Priority| p.max(priority)));
276            }
277
278            // Check domain-based rules
279            let domain = recipient.domain().as_str();
280            if let Some(&priority) = self.domain_priorities.get(domain) {
281                max_priority = Some(max_priority.map_or(priority, |p: Priority| p.max(priority)));
282            }
283        }
284
285        if let Some(priority) = max_priority {
286            return self.apply_boost(priority, current_attempts);
287        }
288
289        // Use default priority
290        self.apply_boost(self.default_priority, current_attempts)
291    }
292
293    /// Apply priority boost based on retry attempts
294    fn apply_boost(&self, priority: Priority, current_attempts: u32) -> Priority {
295        if let Some(boost_after) = self.boost_after_attempts {
296            if current_attempts >= boost_after {
297                let current_value = priority.as_u8();
298                let boosted_value = current_value.saturating_add(self.boost_amount);
299                return Priority::from_u8(boosted_value.min(3)).unwrap_or(Priority::High);
300            }
301        }
302        priority
303    }
304}
305
306/// Statistics for a single priority level
307#[derive(Debug, Clone, Default)]
308pub struct PriorityStats {
309    /// Total mails in this priority queue
310    pub total: usize,
311    /// Mails ready for delivery
312    pub ready: usize,
313    /// Mails waiting for retry
314    pub delayed: usize,
315    /// Total mails enqueued (lifetime)
316    pub enqueued_total: u64,
317    /// Total mails delivered (lifetime)
318    pub delivered_total: u64,
319}
320
321/// Multi-level priority queue
322pub struct PriorityQueue<T> {
323    /// Separate queue for each priority level
324    queues: HashMap<Priority, VecDeque<(MailId, T)>>,
325    /// Priority configuration
326    config: Arc<RwLock<PriorityConfig>>,
327    /// Per-priority statistics
328    stats: HashMap<Priority, Arc<RwLock<PriorityStats>>>,
329}
330
331impl<T> PriorityQueue<T> {
332    /// Create a new priority queue
333    pub fn new(config: PriorityConfig) -> Self {
334        let mut queues = HashMap::new();
335        let mut stats = HashMap::new();
336
337        for &priority in Priority::all() {
338            queues.insert(priority, VecDeque::new());
339            stats.insert(priority, Arc::new(RwLock::new(PriorityStats::default())));
340        }
341
342        Self {
343            queues,
344            config: Arc::new(RwLock::new(config)),
345            stats,
346        }
347    }
348
349    /// Create with default configuration
350    pub fn with_default_config() -> Self {
351        Self::new(PriorityConfig::default())
352    }
353
354    /// Enqueue an item with a specific priority
355    pub fn enqueue(&mut self, mail_id: MailId, item: T, priority: Priority) {
356        if let Some(queue) = self.queues.get_mut(&priority) {
357            queue.push_back((mail_id, item));
358        }
359
360        // Update statistics
361        if let Some(stats) = self.stats.get(&priority) {
362            if let Ok(mut stats) = stats.write() {
363                stats.total += 1;
364                stats.enqueued_total += 1;
365            }
366        }
367    }
368
369    /// Dequeue the next item based on priority (highest priority first)
370    pub fn dequeue(&mut self) -> Option<(MailId, T, Priority)> {
371        // Try queues in priority order (high to low)
372        for &priority in Priority::all() {
373            if let Some(queue) = self.queues.get_mut(&priority) {
374                if let Some((mail_id, item)) = queue.pop_front() {
375                    // Update statistics
376                    if let Some(stats) = self.stats.get(&priority) {
377                        if let Ok(mut stats) = stats.write() {
378                            stats.total = stats.total.saturating_sub(1);
379                        }
380                    }
381                    return Some((mail_id, item, priority));
382                }
383            }
384        }
385        None
386    }
387
388    /// Peek at the next item without removing it
389    pub fn peek(&self) -> Option<(&MailId, &T, Priority)> {
390        for &priority in Priority::all() {
391            if let Some(queue) = self.queues.get(&priority) {
392                if let Some((mail_id, item)) = queue.front() {
393                    return Some((mail_id, item, priority));
394                }
395            }
396        }
397        None
398    }
399
400    /// Get the number of items in a specific priority queue
401    pub fn len_for_priority(&self, priority: Priority) -> usize {
402        self.queues.get(&priority).map_or(0, |q| q.len())
403    }
404
405    /// Get total number of items across all priorities
406    pub fn len(&self) -> usize {
407        self.queues.values().map(|q| q.len()).sum()
408    }
409
410    /// Check if the queue is empty
411    pub fn is_empty(&self) -> bool {
412        self.len() == 0
413    }
414
415    /// Remove a specific item by mail ID
416    pub fn remove(&mut self, mail_id: &MailId) -> Option<(T, Priority)> {
417        for &priority in Priority::all() {
418            if let Some(queue) = self.queues.get_mut(&priority) {
419                if let Some(pos) = queue.iter().position(|(id, _)| id == mail_id) {
420                    if let Some((_, item)) = queue.remove(pos) {
421                        // Update statistics
422                        if let Some(stats) = self.stats.get(&priority) {
423                            if let Ok(mut stats) = stats.write() {
424                                stats.total = stats.total.saturating_sub(1);
425                            }
426                        }
427                        return Some((item, priority));
428                    }
429                }
430            }
431        }
432        None
433    }
434
435    /// Update priority configuration
436    pub fn update_config(&self, config: PriorityConfig) {
437        if let Ok(mut guard) = self.config.write() {
438            *guard = config;
439        }
440    }
441
442    /// Get current configuration
443    pub fn get_config(&self) -> PriorityConfig {
444        self.config.read().map(|g| g.clone()).unwrap_or_default()
445    }
446
447    /// Get statistics for a specific priority
448    pub fn stats_for_priority(&self, priority: Priority) -> PriorityStats {
449        self.stats
450            .get(&priority)
451            .and_then(|s| s.read().ok().map(|g| g.clone()))
452            .unwrap_or_default()
453    }
454
455    /// Mark item as delivered (for statistics)
456    pub fn mark_delivered(&self, priority: Priority) {
457        if let Some(stats) = self.stats.get(&priority) {
458            if let Ok(mut stats) = stats.write() {
459                stats.delivered_total += 1;
460            }
461        }
462    }
463
464    /// Update ready/delayed counts for a priority
465    pub fn update_ready_delayed_stats(&self, priority: Priority, ready: usize, delayed: usize) {
466        if let Some(stats) = self.stats.get(&priority) {
467            if let Ok(mut stats) = stats.write() {
468                stats.ready = ready;
469                stats.delayed = delayed;
470            }
471        }
472    }
473
474    /// Clear all queues
475    pub fn clear(&mut self) {
476        for queue in self.queues.values_mut() {
477            queue.clear();
478        }
479        for stats in self.stats.values() {
480            if let Ok(mut stats) = stats.write() {
481                stats.total = 0;
482                stats.ready = 0;
483                stats.delayed = 0;
484            }
485        }
486    }
487
488    /// Get all items for a specific priority
489    pub fn items_for_priority(&self, priority: Priority) -> Vec<&(MailId, T)> {
490        self.queues
491            .get(&priority)
492            .map(|q| q.iter().collect())
493            .unwrap_or_default()
494    }
495}
496
497impl<T> Default for PriorityQueue<T> {
498    fn default() -> Self {
499        Self::with_default_config()
500    }
501}
502
503#[cfg(test)]
504mod tests {
505    use super::*;
506    use bytes::Bytes;
507    use rusmes_proto::{HeaderMap, MessageBody, MimeMessage};
508
509    #[test]
510    fn test_priority_ordering() {
511        assert!(Priority::High > Priority::Normal);
512        assert!(Priority::Normal > Priority::Low);
513        assert!(Priority::Low > Priority::Bulk);
514    }
515
516    #[test]
517    fn test_priority_from_str() {
518        assert_eq!("high".parse::<Priority>().unwrap(), Priority::High);
519        assert_eq!("normal".parse::<Priority>().unwrap(), Priority::Normal);
520        assert_eq!("low".parse::<Priority>().unwrap(), Priority::Low);
521        assert_eq!("bulk".parse::<Priority>().unwrap(), Priority::Bulk);
522        assert!("invalid".parse::<Priority>().is_err());
523    }
524
525    #[test]
526    fn test_priority_queue_enqueue_dequeue() {
527        let mut queue = PriorityQueue::<String>::with_default_config();
528
529        let mail_id1 = MailId::new();
530        let mail_id2 = MailId::new();
531        let mail_id3 = MailId::new();
532
533        queue.enqueue(mail_id1, "low priority".to_string(), Priority::Low);
534        queue.enqueue(mail_id2, "high priority".to_string(), Priority::High);
535        queue.enqueue(mail_id3, "normal priority".to_string(), Priority::Normal);
536
537        // Should dequeue in priority order: High, Normal, Low
538        let (_, item1, priority1) = queue.dequeue().unwrap();
539        assert_eq!(item1, "high priority");
540        assert_eq!(priority1, Priority::High);
541
542        let (_, item2, priority2) = queue.dequeue().unwrap();
543        assert_eq!(item2, "normal priority");
544        assert_eq!(priority2, Priority::Normal);
545
546        let (_, item3, priority3) = queue.dequeue().unwrap();
547        assert_eq!(item3, "low priority");
548        assert_eq!(priority3, Priority::Low);
549
550        assert!(queue.is_empty());
551    }
552
553    #[test]
554    fn test_priority_queue_remove() {
555        let mut queue = PriorityQueue::<String>::with_default_config();
556
557        let mail_id = MailId::new();
558        queue.enqueue(mail_id, "test".to_string(), Priority::Normal);
559
560        assert_eq!(queue.len(), 1);
561        let (item, priority) = queue.remove(&mail_id).unwrap();
562        assert_eq!(item, "test");
563        assert_eq!(priority, Priority::Normal);
564        assert!(queue.is_empty());
565    }
566
567    #[test]
568    fn test_priority_config_sender_rule() {
569        let mut config = PriorityConfig::new();
570        config.add_sender_priority("vip@example.com", Priority::High);
571
572        let message = MimeMessage::new(HeaderMap::new(), MessageBody::Small(Bytes::from("test")));
573
574        let mail = Mail::new(
575            Some("vip@example.com".parse().unwrap()),
576            vec!["user@example.com".parse().unwrap()],
577            message,
578            None,
579            None,
580        );
581
582        let priority = config.calculate_priority(&mail, 0);
583        assert_eq!(priority, Priority::High);
584    }
585
586    #[test]
587    fn test_priority_config_domain_rule() {
588        let mut config = PriorityConfig::new();
589        config.add_domain_priority("important.com", Priority::High);
590
591        let message = MimeMessage::new(HeaderMap::new(), MessageBody::Small(Bytes::from("test")));
592
593        let mail = Mail::new(
594            Some("sender@example.com".parse().unwrap()),
595            vec!["user@important.com".parse().unwrap()],
596            message,
597            None,
598            None,
599        );
600
601        let priority = config.calculate_priority(&mail, 0);
602        assert_eq!(priority, Priority::High);
603    }
604
605    #[test]
606    fn test_priority_boost_on_retry() {
607        let mut config = PriorityConfig::new();
608        config.boost_after_attempts = Some(3);
609        config.boost_amount = 1;
610
611        let message = MimeMessage::new(HeaderMap::new(), MessageBody::Small(Bytes::from("test")));
612
613        let mail = Mail::new(
614            Some("sender@example.com".parse().unwrap()),
615            vec!["user@example.com".parse().unwrap()],
616            message,
617            None,
618            None,
619        );
620
621        // Before boost threshold
622        let priority = config.calculate_priority(&mail, 2);
623        assert_eq!(priority, Priority::Normal);
624
625        // After boost threshold
626        let priority = config.calculate_priority(&mail, 3);
627        assert_eq!(priority, Priority::High);
628    }
629
630    #[test]
631    fn test_priority_queue_stats() {
632        let mut queue = PriorityQueue::<String>::with_default_config();
633
634        let mail_id = MailId::new();
635        queue.enqueue(mail_id, "test".to_string(), Priority::High);
636
637        let stats = queue.stats_for_priority(Priority::High);
638        assert_eq!(stats.total, 1);
639        assert_eq!(stats.enqueued_total, 1);
640
641        queue.mark_delivered(Priority::High);
642        let stats = queue.stats_for_priority(Priority::High);
643        assert_eq!(stats.delivered_total, 1);
644    }
645}