Skip to main content

rusmes_core/queue/
core.rs

1//! Mail queue management with retry logic
2
3use crate::queue::priority::Priority;
4use dashmap::DashMap;
5use rusmes_proto::{Mail, MailId};
6use rusmes_storage::StorageBackend;
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::path::PathBuf;
10use std::sync::atomic::{AtomicU64, Ordering};
11use std::sync::{Arc, RwLock};
12use std::time::{Duration, SystemTime};
13
/// Serializable queue entry metadata.
///
/// Holds the same retry bookkeeping as [`QueueEntry`] but refers to the mail
/// by identifier only; see [`QueueEntry::to_data`] and [`QueueEntry::from_data`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueEntryData {
    /// Identifier of the mail this metadata belongs to.
    pub mail_id: MailId,
    /// Number of delivery attempts recorded so far.
    pub attempts: u32,
    /// Attempt count at which the entry is considered bounced.
    pub max_attempts: u32,
    /// Earliest time of the next delivery attempt; (de)serialized through
    /// `systemtime_serde` as whole seconds since the Unix epoch.
    #[serde(with = "systemtime_serde")]
    pub next_retry: SystemTime,
    /// Error text of the most recent failed attempt, if any.
    pub last_error: Option<String>,
    /// Delivery priority; falls back to `Priority::default()` when the field
    /// is absent from the serialized form.
    #[serde(default)]
    pub priority: Priority,
}
26
/// Queue entry with retry information.
///
/// Pairs a mail with the bookkeeping needed to decide when (and whether) it
/// should be attempted again.
#[derive(Debug, Clone)]
pub struct QueueEntry {
    /// The mail awaiting delivery.
    pub mail: Mail,
    /// Number of delivery attempts recorded so far.
    pub attempts: u32,
    /// Attempt count at which the entry is considered bounced.
    pub max_attempts: u32,
    /// Earliest time at which the next delivery attempt may happen.
    pub next_retry: SystemTime,
    /// Error text of the most recent failed attempt, if any.
    pub last_error: Option<String>,
    /// Delivery priority of this entry.
    pub priority: Priority,
}
37
38/// Serialization helper for SystemTime
39mod systemtime_serde {
40    use serde::{Deserialize, Deserializer, Serialize, Serializer};
41    use std::time::{Duration, SystemTime, UNIX_EPOCH};
42
43    pub fn serialize<S>(time: &SystemTime, serializer: S) -> Result<S::Ok, S::Error>
44    where
45        S: Serializer,
46    {
47        let duration = time
48            .duration_since(UNIX_EPOCH)
49            .map_err(serde::ser::Error::custom)?;
50        duration.as_secs().serialize(serializer)
51    }
52
53    pub fn deserialize<'de, D>(deserializer: D) -> Result<SystemTime, D::Error>
54    where
55        D: Deserializer<'de>,
56    {
57        let secs = u64::deserialize(deserializer)?;
58        Ok(UNIX_EPOCH + Duration::from_secs(secs))
59    }
60}
61
62impl QueueEntry {
63    /// Create a new queue entry with default priority
64    pub fn new(mail: Mail) -> Self {
65        Self::new_with_priority(mail, Priority::default())
66    }
67
68    /// Create a new queue entry with specified priority
69    pub fn new_with_priority(mail: Mail, priority: Priority) -> Self {
70        Self {
71            mail,
72            attempts: 0,
73            max_attempts: 5,
74            next_retry: SystemTime::now(),
75            last_error: None,
76            priority,
77        }
78    }
79
80    /// Get the priority of this entry
81    pub fn priority(&self) -> Priority {
82        self.priority
83    }
84
85    /// Set the priority of this entry
86    pub fn set_priority(&mut self, priority: Priority) {
87        self.priority = priority;
88    }
89
90    /// Calculate next retry time with exponential backoff
91    pub fn calculate_next_retry(&mut self) {
92        let backoff_secs = 2u64.pow(self.attempts.min(10)) * 60; // 1min, 2min, 4min, 8min...
93        self.next_retry = SystemTime::now() + Duration::from_secs(backoff_secs);
94        self.attempts += 1;
95    }
96
97    /// Check if entry should be retried
98    pub fn should_retry(&self) -> bool {
99        self.attempts < self.max_attempts && SystemTime::now() >= self.next_retry
100    }
101
102    /// Check if entry has exceeded max attempts
103    pub fn is_bounced(&self) -> bool {
104        self.attempts >= self.max_attempts
105    }
106
107    /// Get mail ID
108    pub fn mail_id(&self) -> &MailId {
109        self.mail.id()
110    }
111
112    /// Convert to serializable data
113    pub fn to_data(&self) -> QueueEntryData {
114        QueueEntryData {
115            mail_id: *self.mail.id(),
116            attempts: self.attempts,
117            max_attempts: self.max_attempts,
118            next_retry: self.next_retry,
119            last_error: self.last_error.clone(),
120            priority: self.priority,
121        }
122    }
123
124    /// Create from data and mail
125    pub fn from_data(data: QueueEntryData, mail: Mail) -> Self {
126        Self {
127            mail,
128            attempts: data.attempts,
129            max_attempts: data.max_attempts,
130            next_retry: data.next_retry,
131            last_error: data.last_error,
132            priority: data.priority,
133        }
134    }
135}
136
/// Queue storage operations.
///
/// Abstraction over the persistent side of the mail queue: a main area for
/// pending entries and a separate dead letter queue (DLQ). Implementations
/// must be `Send + Sync` because the store is shared as `Arc<dyn QueueStore>`.
/// Every method reports failure through `anyhow::Result`.
#[async_trait::async_trait]
pub trait QueueStore: Send + Sync {
    /// Save a queue entry to persistent storage.
    async fn save_entry(&self, entry: &QueueEntry) -> anyhow::Result<()>;

    /// Load a queue entry from persistent storage.
    ///
    /// Returns `Ok(None)` when no entry is available for `mail_id`.
    async fn load_entry(&self, mail_id: &MailId) -> anyhow::Result<Option<QueueEntry>>;

    /// Remove a queue entry from persistent storage.
    async fn remove_entry(&self, mail_id: &MailId) -> anyhow::Result<()>;

    /// Load all pending queue entries (used by the queue on startup).
    async fn load_all_entries(&self) -> anyhow::Result<Vec<QueueEntry>>;

    /// Save an entry to the dead letter queue.
    async fn save_to_dlq(&self, entry: &QueueEntry) -> anyhow::Result<()>;

    /// List all dead letter queue entries.
    async fn list_dlq(&self) -> anyhow::Result<Vec<QueueEntry>>;

    /// Remove an entry from the dead letter queue.
    async fn remove_from_dlq(&self, mail_id: &MailId) -> anyhow::Result<()>;
}
161
/// Mail queue with retry logic and priority support.
pub struct MailQueue {
    /// In-memory queue entries keyed by mail identifier.
    entries: Arc<RwLock<HashMap<MailId, QueueEntry>>>,
    /// Optional persistent backing store; `None` means in-memory only.
    store: Option<Arc<dyn QueueStore>>,
    /// Configuration used to calculate entry priorities.
    priority_config: Arc<RwLock<crate::queue::priority::PriorityConfig>>,
    /// Per-recipient-domain message counter (domain → count)
    domain_stats: Arc<DashMap<String, AtomicU64>>,
}
170
171impl MailQueue {
172    /// Create a new mail queue without persistent storage
173    pub fn new() -> Self {
174        Self {
175            entries: Arc::new(RwLock::new(HashMap::new())),
176            store: None,
177            priority_config: Arc::new(RwLock::new(
178                crate::queue::priority::PriorityConfig::default(),
179            )),
180            domain_stats: Arc::new(DashMap::new()),
181        }
182    }
183
184    /// Create a new mail queue with persistent storage
185    pub fn new_with_store(store: Arc<dyn QueueStore>) -> Self {
186        Self {
187            entries: Arc::new(RwLock::new(HashMap::new())),
188            store: Some(store),
189            priority_config: Arc::new(RwLock::new(
190                crate::queue::priority::PriorityConfig::default(),
191            )),
192            domain_stats: Arc::new(DashMap::new()),
193        }
194    }
195
196    /// Create a new mail queue with priority configuration
197    pub fn new_with_priority_config(
198        priority_config: crate::queue::priority::PriorityConfig,
199    ) -> Self {
200        Self {
201            entries: Arc::new(RwLock::new(HashMap::new())),
202            store: None,
203            priority_config: Arc::new(RwLock::new(priority_config)),
204            domain_stats: Arc::new(DashMap::new()),
205        }
206    }
207
208    /// Create a new mail queue with storage and priority configuration
209    pub fn new_with_store_and_priority(
210        store: Arc<dyn QueueStore>,
211        priority_config: crate::queue::priority::PriorityConfig,
212    ) -> Self {
213        Self {
214            entries: Arc::new(RwLock::new(HashMap::new())),
215            store: Some(store),
216            priority_config: Arc::new(RwLock::new(priority_config)),
217            domain_stats: Arc::new(DashMap::new()),
218        }
219    }
220
221    /// Update priority configuration
222    pub fn update_priority_config(&self, config: crate::queue::priority::PriorityConfig) {
223        if let Ok(mut guard) = self.priority_config.write() {
224            *guard = config;
225        }
226    }
227
228    /// Get current priority configuration
229    pub fn get_priority_config(&self) -> crate::queue::priority::PriorityConfig {
230        self.priority_config
231            .read()
232            .map(|g| g.clone())
233            .unwrap_or_default()
234    }
235
236    /// Get a handle to the domain stats map (for metrics consumers such as rusmes-metrics)
237    pub fn domain_stats_map(&self) -> Arc<DashMap<String, AtomicU64>> {
238        Arc::clone(&self.domain_stats)
239    }
240
241    /// Return a snapshot of per-recipient-domain message counts.
242    ///
243    /// This is the primary API consumed by `rusmes-metrics` (Cluster 7) for Prometheus exposition.
244    pub fn queue_stats_per_domain(&self) -> HashMap<String, u64> {
245        self.domain_stats
246            .iter()
247            .map(|entry| (entry.key().clone(), entry.value().load(Ordering::Relaxed)))
248            .collect()
249    }
250
251    /// Increment domain counters for all recipient domains in the given mail.
252    fn record_domain_stats(&self, mail: &Mail) {
253        for recipient in mail.recipients() {
254            let domain = recipient.domain().as_str().to_owned();
255            if let Some(counter) = self.domain_stats.get(&domain) {
256                counter.fetch_add(1, Ordering::Relaxed);
257            } else {
258                // Use entry API to avoid races
259                self.domain_stats
260                    .entry(domain)
261                    .or_insert_with(|| AtomicU64::new(0))
262                    .fetch_add(1, Ordering::Relaxed);
263            }
264        }
265    }
266
267    /// Load all pending entries from storage on startup
268    pub async fn load_from_storage(&self) -> anyhow::Result<()> {
269        if let Some(store) = &self.store {
270            let entries = store.load_all_entries().await?;
271            let mut queue_entries = self
272                .entries
273                .write()
274                .map_err(|e| anyhow::anyhow!("RwLock poisoned: {}", e))?;
275            for entry in entries {
276                let mail_id = *entry.mail.id();
277                tracing::info!("Loaded mail {} from storage", mail_id);
278                queue_entries.insert(mail_id, entry);
279            }
280        }
281        Ok(())
282    }
283
284    /// Enqueue a mail for delivery (atomic operation with persistence)
285    pub async fn enqueue(&self, mail: Mail) -> anyhow::Result<()> {
286        let mail_id = *mail.id();
287
288        // Calculate priority based on configuration
289        let priority = {
290            let config = self
291                .priority_config
292                .read()
293                .map_err(|e| anyhow::anyhow!("RwLock poisoned: {}", e))?;
294            config.calculate_priority(&mail, 0)
295        };
296
297        // Record domain statistics before moving mail into entry
298        self.record_domain_stats(&mail);
299
300        let entry = QueueEntry::new_with_priority(mail, priority);
301
302        // Save to storage first (if available)
303        if let Some(store) = &self.store {
304            store.save_entry(&entry).await?;
305        }
306
307        // Then add to in-memory queue
308        self.entries
309            .write()
310            .map_err(|e| anyhow::anyhow!("RwLock poisoned: {}", e))?
311            .insert(mail_id, entry);
312        tracing::info!(
313            "Enqueued mail {} for delivery with priority {}",
314            mail_id,
315            priority
316        );
317        Ok(())
318    }
319
320    /// Enqueue a mail with explicit priority
321    pub async fn enqueue_with_priority(
322        &self,
323        mail: Mail,
324        priority: Priority,
325    ) -> anyhow::Result<()> {
326        let mail_id = *mail.id();
327
328        // Record domain statistics
329        self.record_domain_stats(&mail);
330
331        let entry = QueueEntry::new_with_priority(mail, priority);
332
333        // Save to storage first (if available)
334        if let Some(store) = &self.store {
335            store.save_entry(&entry).await?;
336        }
337
338        // Then add to in-memory queue
339        self.entries
340            .write()
341            .map_err(|e| anyhow::anyhow!("RwLock poisoned: {}", e))?
342            .insert(mail_id, entry);
343        tracing::info!(
344            "Enqueued mail {} for delivery with priority {}",
345            mail_id,
346            priority
347        );
348        Ok(())
349    }
350
351    /// Get next batch of mails ready for retry, ordered by priority
352    pub fn get_ready_for_retry(&self, limit: usize) -> Vec<QueueEntry> {
353        let entries = match self.entries.read() {
354            Ok(guard) => guard,
355            Err(poisoned) => poisoned.into_inner(),
356        };
357        let mut ready: Vec<QueueEntry> = entries
358            .values()
359            .filter(|e| e.should_retry())
360            .cloned()
361            .collect();
362
363        // Sort by priority (highest first), then by next_retry time
364        ready.sort_by(|a, b| match b.priority.cmp(&a.priority) {
365            std::cmp::Ordering::Equal => a.next_retry.cmp(&b.next_retry),
366            other => other,
367        });
368
369        ready.into_iter().take(limit).collect()
370    }
371
372    /// Get mails ready for retry for a specific priority
373    pub fn get_ready_for_retry_by_priority(
374        &self,
375        priority: Priority,
376        limit: usize,
377    ) -> Vec<QueueEntry> {
378        let entries = match self.entries.read() {
379            Ok(guard) => guard,
380            Err(poisoned) => poisoned.into_inner(),
381        };
382        entries
383            .values()
384            .filter(|e| e.should_retry() && e.priority == priority)
385            .take(limit)
386            .cloned()
387            .collect()
388    }
389
390    /// Mark delivery attempt as failed (atomic operation with persistence)
391    pub async fn mark_failed(&self, mail_id: &MailId, error: String) -> anyhow::Result<()> {
392        let (should_move_to_dlq, entry_to_save) = {
393            let mut entries = self
394                .entries
395                .write()
396                .map_err(|e| anyhow::anyhow!("RwLock poisoned: {}", e))?;
397            if let Some(entry) = entries.get_mut(mail_id) {
398                entry.last_error = Some(error.clone());
399                entry.calculate_next_retry();
400
401                // Update priority based on retry attempts (priority inheritance/boost)
402                let new_priority = {
403                    let config = self
404                        .priority_config
405                        .read()
406                        .map_err(|e| anyhow::anyhow!("RwLock poisoned: {}", e))?;
407                    if config.inherit_priority_on_retry {
408                        config.calculate_priority(&entry.mail, entry.attempts)
409                    } else {
410                        entry.priority
411                    }
412                };
413
414                if new_priority != entry.priority {
415                    tracing::info!(
416                        "Mail {} priority boosted from {} to {} after {} attempts",
417                        mail_id,
418                        entry.priority,
419                        new_priority,
420                        entry.attempts
421                    );
422                    entry.priority = new_priority;
423                }
424
425                if entry.is_bounced() {
426                    tracing::warn!(
427                        "Mail {} exceeded max delivery attempts ({}), moving to DLQ",
428                        mail_id,
429                        entry.max_attempts
430                    );
431                    (true, None)
432                } else {
433                    tracing::info!(
434                        "Mail {} delivery failed (attempt {}/{}), priority {}, retry at {:?}",
435                        mail_id,
436                        entry.attempts,
437                        entry.max_attempts,
438                        entry.priority,
439                        entry.next_retry
440                    );
441                    (false, Some(entry.clone()))
442                }
443            } else {
444                (false, None)
445            }
446        }; // Lock dropped here
447
448        // Update in storage (outside lock)
449        if let Some(entry) = entry_to_save {
450            if let Some(store) = &self.store {
451                store.save_entry(&entry).await?;
452            }
453        }
454
455        // Move to DLQ if bounced
456        if should_move_to_dlq {
457            self.move_to_dlq(mail_id).await?;
458        }
459
460        Ok(())
461    }
462
463    /// Move entry to dead letter queue
464    async fn move_to_dlq(&self, mail_id: &MailId) -> anyhow::Result<()> {
465        let entry = self
466            .entries
467            .write()
468            .map_err(|e| anyhow::anyhow!("RwLock poisoned: {}", e))?
469            .remove(mail_id);
470
471        if let Some(entry) = entry {
472            if let Some(store) = &self.store {
473                // Save to DLQ
474                store.save_to_dlq(&entry).await?;
475                // Remove from main queue storage
476                store.remove_entry(mail_id).await?;
477                tracing::info!("Moved mail {} to dead letter queue", mail_id);
478            } else {
479                // Without storage, just keep in memory as bounced
480                tracing::warn!("Mail {} bounced but no DLQ storage available", mail_id);
481            }
482        }
483        Ok(())
484    }
485
486    /// Mark delivery as successful and remove from queue (atomic operation)
487    pub async fn mark_delivered(&self, mail_id: &MailId) -> anyhow::Result<()> {
488        // Remove from storage first
489        if let Some(store) = &self.store {
490            store.remove_entry(mail_id).await?;
491        }
492
493        // Then remove from in-memory queue
494        self.entries
495            .write()
496            .map_err(|e| anyhow::anyhow!("RwLock poisoned: {}", e))?
497            .remove(mail_id);
498        tracing::info!(
499            "Mail {} successfully delivered and removed from queue",
500            mail_id
501        );
502        Ok(())
503    }
504
505    /// Get all bounced messages (exceeded retry limit) - legacy in-memory only
506    pub fn get_bounced(&self) -> Vec<QueueEntry> {
507        let entries = match self.entries.read() {
508            Ok(guard) => guard,
509            Err(poisoned) => poisoned.into_inner(),
510        };
511        entries
512            .values()
513            .filter(|e| e.is_bounced())
514            .cloned()
515            .collect()
516    }
517
518    /// List all entries in dead letter queue
519    pub async fn list_dlq(&self) -> anyhow::Result<Vec<QueueEntry>> {
520        if let Some(store) = &self.store {
521            store.list_dlq().await
522        } else {
523            Ok(Vec::new())
524        }
525    }
526
527    /// Remove a mail from dead letter queue
528    pub async fn remove_from_dlq(&self, mail_id: &MailId) -> anyhow::Result<()> {
529        if let Some(store) = &self.store {
530            store.remove_from_dlq(mail_id).await?;
531        }
532        Ok(())
533    }
534
535    /// Retry a message from dead letter queue
536    pub async fn retry_from_dlq(&self, mail_id: &MailId) -> anyhow::Result<()> {
537        if let Some(store) = &self.store {
538            // Load from DLQ
539            let dlq_entries = store.list_dlq().await?;
540            if let Some(mut entry) = dlq_entries.into_iter().find(|e| e.mail.id() == mail_id) {
541                // Reset retry count
542                entry.attempts = 0;
543                entry.next_retry = SystemTime::now();
544                entry.last_error = None;
545
546                // Save to main queue
547                store.save_entry(&entry).await?;
548                self.entries
549                    .write()
550                    .map_err(|e| anyhow::anyhow!("RwLock poisoned: {}", e))?
551                    .insert(*mail_id, entry);
552
553                // Remove from DLQ
554                store.remove_from_dlq(mail_id).await?;
555
556                tracing::info!("Retrying mail {} from dead letter queue", mail_id);
557            }
558        }
559        Ok(())
560    }
561
562    /// Remove a mail from queue (atomic operation)
563    pub async fn remove(&self, mail_id: &MailId) -> anyhow::Result<Option<QueueEntry>> {
564        if let Some(store) = &self.store {
565            store.remove_entry(mail_id).await?;
566        }
567        Ok(self
568            .entries
569            .write()
570            .map_err(|e| anyhow::anyhow!("RwLock poisoned: {}", e))?
571            .remove(mail_id))
572    }
573
574    /// Get queue statistics
575    pub fn stats(&self) -> QueueStats {
576        let entries = match self.entries.read() {
577            Ok(guard) => guard,
578            Err(poisoned) => poisoned.into_inner(),
579        };
580        let total = entries.len();
581        let ready = entries.values().filter(|e| e.should_retry()).count();
582        let bounced = entries.values().filter(|e| e.is_bounced()).count();
583
584        QueueStats {
585            total,
586            ready,
587            bounced,
588            delayed: total - ready - bounced,
589        }
590    }
591
592    /// Get statistics for a specific priority level
593    pub fn stats_for_priority(&self, priority: Priority) -> QueueStats {
594        let entries = match self.entries.read() {
595            Ok(guard) => guard,
596            Err(poisoned) => poisoned.into_inner(),
597        };
598        let priority_entries: Vec<_> = entries
599            .values()
600            .filter(|e| e.priority == priority)
601            .collect();
602
603        let total = priority_entries.len();
604        let ready = priority_entries.iter().filter(|e| e.should_retry()).count();
605        let bounced = priority_entries.iter().filter(|e| e.is_bounced()).count();
606
607        QueueStats {
608            total,
609            ready,
610            bounced,
611            delayed: total - ready - bounced,
612        }
613    }
614
615    /// Get statistics grouped by priority
616    pub fn stats_by_priority(&self) -> HashMap<Priority, QueueStats> {
617        let mut stats_map = HashMap::new();
618
619        for &priority in Priority::all() {
620            stats_map.insert(priority, self.stats_for_priority(priority));
621        }
622
623        stats_map
624    }
625
626    /// Get all queue entries (for inspection)
627    pub fn list_all(&self) -> Vec<QueueEntry> {
628        match self.entries.read() {
629            Ok(guard) => guard.values().cloned().collect(),
630            Err(poisoned) => poisoned.into_inner().values().cloned().collect(),
631        }
632    }
633
634    /// Get queue entries for a specific priority
635    pub fn list_by_priority(&self, priority: Priority) -> Vec<QueueEntry> {
636        match self.entries.read() {
637            Ok(guard) => guard
638                .values()
639                .filter(|e| e.priority == priority)
640                .cloned()
641                .collect(),
642            Err(poisoned) => poisoned
643                .into_inner()
644                .values()
645                .filter(|e| e.priority == priority)
646                .cloned()
647                .collect(),
648        }
649    }
650}
651
652impl Default for MailQueue {
653    fn default() -> Self {
654        Self::new()
655    }
656}
657
/// Queue statistics.
///
/// Derives `PartialEq`/`Eq` so snapshots can be compared directly (e.g. in
/// tests) and `Default` so an all-zero snapshot is available.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct QueueStats {
    /// Number of entries considered.
    pub total: usize,
    /// Entries that are due for a delivery attempt.
    pub ready: usize,
    /// Entries that have reached their attempt limit.
    pub bounced: usize,
    /// Entries that are neither ready nor bounced.
    pub delayed: usize,
}
666
/// Filesystem-based queue store implementation.
///
/// Entry metadata is kept as one JSON file per mail in two directories (main
/// queue and dead letter queue); the mail itself goes through `storage`.
pub struct FilesystemQueueStore {
    /// Directory holding metadata files for pending entries.
    queue_dir: PathBuf,
    /// Directory holding metadata files for dead-lettered entries.
    dlq_dir: PathBuf,
    /// Backend whose message store holds the mail contents.
    storage: Arc<dyn StorageBackend>,
}
673
674impl FilesystemQueueStore {
675    /// Create a new filesystem queue store
676    pub fn new(base_path: impl Into<PathBuf>, storage: Arc<dyn StorageBackend>) -> Self {
677        let base_path: PathBuf = base_path.into();
678        let queue_dir = base_path.join("queue");
679        let dlq_dir = base_path.join("dlq");
680
681        Self {
682            queue_dir,
683            dlq_dir,
684            storage,
685        }
686    }
687
688    /// Ensure directories exist
689    async fn ensure_dirs(&self) -> anyhow::Result<()> {
690        tokio::fs::create_dir_all(&self.queue_dir).await?;
691        tokio::fs::create_dir_all(&self.dlq_dir).await?;
692        Ok(())
693    }
694
695    /// Get path for queue entry metadata
696    fn entry_metadata_path(&self, mail_id: &MailId) -> PathBuf {
697        self.queue_dir.join(format!("{}.json", mail_id))
698    }
699
700    /// Get path for DLQ entry metadata
701    fn dlq_metadata_path(&self, mail_id: &MailId) -> PathBuf {
702        self.dlq_dir.join(format!("{}.json", mail_id))
703    }
704}
705
706#[async_trait::async_trait]
707impl QueueStore for FilesystemQueueStore {
708    async fn save_entry(&self, entry: &QueueEntry) -> anyhow::Result<()> {
709        self.ensure_dirs().await?;
710
711        // Serialize and save metadata
712        let data = entry.to_data();
713        let json = serde_json::to_string_pretty(&data)?;
714        let metadata_path = self.entry_metadata_path(entry.mail_id());
715        tokio::fs::write(&metadata_path, json).await?;
716
717        // Save mail to storage (using message store)
718        let message_store = self.storage.message_store();
719        let mailbox_id = rusmes_storage::MailboxId::new(); // Queue mailbox
720        message_store
721            .append_message(&mailbox_id, entry.mail.clone())
722            .await?;
723
724        Ok(())
725    }
726
727    async fn load_entry(&self, mail_id: &MailId) -> anyhow::Result<Option<QueueEntry>> {
728        let metadata_path = self.entry_metadata_path(mail_id);
729
730        // Check if file exists
731        if !tokio::fs::try_exists(&metadata_path).await? {
732            return Ok(None);
733        }
734
735        // Load metadata
736        let json = tokio::fs::read_to_string(&metadata_path).await?;
737        let data: QueueEntryData = serde_json::from_str(&json)?;
738
739        // Load mail from storage
740        let message_store = self.storage.message_store();
741        let mail_msg_id = rusmes_proto::MessageId::new(); // Would need to store this
742        if let Some(mail) = message_store.get_message(&mail_msg_id).await? {
743            Ok(Some(QueueEntry::from_data(data, mail)))
744        } else {
745            Ok(None)
746        }
747    }
748
749    async fn remove_entry(&self, mail_id: &MailId) -> anyhow::Result<()> {
750        let metadata_path = self.entry_metadata_path(mail_id);
751
752        // Remove metadata file
753        if tokio::fs::try_exists(&metadata_path).await? {
754            tokio::fs::remove_file(&metadata_path).await?;
755        }
756
757        // Note: We keep the mail in storage as it might be referenced elsewhere
758
759        Ok(())
760    }
761
762    async fn load_all_entries(&self) -> anyhow::Result<Vec<QueueEntry>> {
763        self.ensure_dirs().await?;
764
765        let mut entries = Vec::new();
766        let mut read_dir = tokio::fs::read_dir(&self.queue_dir).await?;
767
768        while let Some(entry) = read_dir.next_entry().await? {
769            let path = entry.path();
770            if path.extension().and_then(|s| s.to_str()) == Some("json") {
771                // Load metadata
772                let json = tokio::fs::read_to_string(&path).await?;
773                if let Ok(data) = serde_json::from_str::<QueueEntryData>(&json) {
774                    // Load mail from storage
775                    let message_store = self.storage.message_store();
776                    let mail_msg_id = rusmes_proto::MessageId::new(); // Would need proper mapping
777                    if let Ok(Some(mail)) = message_store.get_message(&mail_msg_id).await {
778                        entries.push(QueueEntry::from_data(data, mail));
779                    }
780                }
781            }
782        }
783
784        Ok(entries)
785    }
786
787    async fn save_to_dlq(&self, entry: &QueueEntry) -> anyhow::Result<()> {
788        self.ensure_dirs().await?;
789
790        // Serialize and save metadata to DLQ
791        let data = entry.to_data();
792        let json = serde_json::to_string_pretty(&data)?;
793        let metadata_path = self.dlq_metadata_path(entry.mail_id());
794        tokio::fs::write(&metadata_path, json).await?;
795
796        // Mail is already in storage
797
798        Ok(())
799    }
800
801    async fn list_dlq(&self) -> anyhow::Result<Vec<QueueEntry>> {
802        self.ensure_dirs().await?;
803
804        let mut entries = Vec::new();
805        let mut read_dir = tokio::fs::read_dir(&self.dlq_dir).await?;
806
807        while let Some(entry) = read_dir.next_entry().await? {
808            let path = entry.path();
809            if path.extension().and_then(|s| s.to_str()) == Some("json") {
810                // Load metadata
811                let json = tokio::fs::read_to_string(&path).await?;
812                if let Ok(data) = serde_json::from_str::<QueueEntryData>(&json) {
813                    // Load mail from storage
814                    let message_store = self.storage.message_store();
815                    let mail_msg_id = rusmes_proto::MessageId::new(); // Would need proper mapping
816                    if let Ok(Some(mail)) = message_store.get_message(&mail_msg_id).await {
817                        entries.push(QueueEntry::from_data(data, mail));
818                    }
819                }
820            }
821        }
822
823        Ok(entries)
824    }
825
826    async fn remove_from_dlq(&self, mail_id: &MailId) -> anyhow::Result<()> {
827        let metadata_path = self.dlq_metadata_path(mail_id);
828
829        // Remove metadata file
830        if tokio::fs::try_exists(&metadata_path).await? {
831            tokio::fs::remove_file(&metadata_path).await?;
832        }
833
834        Ok(())
835    }
836}
837
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use rusmes_proto::{HeaderMap, MessageBody, MimeMessage};

    /// Build a minimal mail with the given envelope; addresses that fail to
    /// parse are dropped.
    fn make_mail(sender: Option<&str>, recipients: Vec<&str>) -> Mail {
        let message = MimeMessage::new(HeaderMap::new(), MessageBody::Small(Bytes::from("Test")));
        Mail::new(
            sender.and_then(|s| s.parse().ok()),
            recipients.iter().filter_map(|r| r.parse().ok()).collect(),
            message,
            None,
            None,
        )
    }

    #[tokio::test]
    async fn test_queue_enqueue_dequeue() {
        let queue = MailQueue::new();
        let mail = make_mail(Some("sender@example.com"), vec!["recipient@example.com"]);

        let mail_id = *mail.id();
        queue.enqueue(mail).await.unwrap();

        let stats = queue.stats();
        assert_eq!(stats.total, 1);
        assert_eq!(stats.ready, 1);

        queue.mark_delivered(&mail_id).await.unwrap();
        let stats = queue.stats();
        assert_eq!(stats.total, 0);
    }

    #[test]
    fn test_retry_backoff() {
        let mut entry = QueueEntry::new(make_mail(None, vec![]));

        // First failure: one-minute backoff, so the entry is not due yet.
        let before_first = SystemTime::now();
        entry.calculate_next_retry();
        assert_eq!(entry.attempts, 1);
        assert!(
            entry.next_retry >= before_first + Duration::from_secs(60),
            "first retry should be at least 1 minute out"
        );
        assert!(!entry.should_retry());

        // Second failure: the delay doubles to two minutes.
        let before_second = SystemTime::now();
        entry.calculate_next_retry();
        assert_eq!(entry.attempts, 2);
        assert!(
            entry.next_retry >= before_second + Duration::from_secs(120),
            "second retry should be at least 2 minutes out"
        );

        // After max attempts, should be bounced
        entry.attempts = 5;
        assert!(entry.is_bounced());
        assert!(!entry.should_retry());
    }

    // Plain `#[test]`: nothing here awaits, so no async runtime is needed.
    #[test]
    fn queue_priority_ordering() {
        // Enqueue High, Low, Low, Normal → dequeue order: High, Normal, Low, Low
        use crate::queue::priority::{Priority, PriorityQueue};
        use rusmes_proto::MailId;

        let mut queue = PriorityQueue::<&str>::with_default_config();

        queue.enqueue(MailId::new(), "High msg", Priority::High);
        queue.enqueue(MailId::new(), "Low msg 1", Priority::Low);
        queue.enqueue(MailId::new(), "Low msg 2", Priority::Low);
        queue.enqueue(MailId::new(), "Normal msg", Priority::Normal);

        let (_, item1, p1) = queue.dequeue().unwrap();
        assert_eq!(p1, Priority::High, "First dequeued should be High");
        assert_eq!(item1, "High msg");

        let (_, item2, p2) = queue.dequeue().unwrap();
        assert_eq!(p2, Priority::Normal, "Second dequeued should be Normal");
        assert_eq!(item2, "Normal msg");

        let (_, _, p3) = queue.dequeue().unwrap();
        assert_eq!(p3, Priority::Low, "Third dequeued should be Low");

        let (_, _, p4) = queue.dequeue().unwrap();
        assert_eq!(p4, Priority::Low, "Fourth dequeued should be Low");

        assert!(queue.is_empty());
    }

    #[tokio::test]
    async fn queue_stats_per_domain_counts() {
        let queue = MailQueue::new();

        // Enqueue 5 messages to example.com
        for _ in 0..5 {
            let mail = make_mail(Some("sender@x.com"), vec!["a@example.com"]);
            queue.enqueue(mail).await.unwrap();
        }

        // Enqueue 3 messages to example.org
        for _ in 0..3 {
            let mail = make_mail(Some("sender@x.com"), vec!["b@example.org"]);
            queue.enqueue(mail).await.unwrap();
        }

        let stats = queue.queue_stats_per_domain();
        assert_eq!(
            stats.get("example.com").copied().unwrap_or(0),
            5,
            "example.com should have 5 messages"
        );
        assert_eq!(
            stats.get("example.org").copied().unwrap_or(0),
            3,
            "example.org should have 3 messages"
        );
    }
}