Skip to main content

rusmes_core/queue/
core.rs

1//! Mail queue management with retry logic
2
3use crate::queue::priority::Priority;
4use rusmes_proto::{Mail, MailId};
5use rusmes_storage::StorageBackend;
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8use std::path::PathBuf;
9use std::sync::{Arc, RwLock};
10use std::time::{Duration, SystemTime};
11
12/// Serializable queue entry metadata
13#[derive(Debug, Clone, Serialize, Deserialize)]
14pub struct QueueEntryData {
15    pub mail_id: MailId,
16    pub attempts: u32,
17    pub max_attempts: u32,
18    #[serde(with = "systemtime_serde")]
19    pub next_retry: SystemTime,
20    pub last_error: Option<String>,
21    #[serde(default)]
22    pub priority: Priority,
23}
24
25/// Queue entry with retry information
26#[derive(Debug, Clone)]
27pub struct QueueEntry {
28    pub mail: Mail,
29    pub attempts: u32,
30    pub max_attempts: u32,
31    pub next_retry: SystemTime,
32    pub last_error: Option<String>,
33    pub priority: Priority,
34}
35
36/// Serialization helper for SystemTime
37mod systemtime_serde {
38    use serde::{Deserialize, Deserializer, Serialize, Serializer};
39    use std::time::{Duration, SystemTime, UNIX_EPOCH};
40
41    pub fn serialize<S>(time: &SystemTime, serializer: S) -> Result<S::Ok, S::Error>
42    where
43        S: Serializer,
44    {
45        let duration = time
46            .duration_since(UNIX_EPOCH)
47            .map_err(serde::ser::Error::custom)?;
48        duration.as_secs().serialize(serializer)
49    }
50
51    pub fn deserialize<'de, D>(deserializer: D) -> Result<SystemTime, D::Error>
52    where
53        D: Deserializer<'de>,
54    {
55        let secs = u64::deserialize(deserializer)?;
56        Ok(UNIX_EPOCH + Duration::from_secs(secs))
57    }
58}
59
60impl QueueEntry {
61    /// Create a new queue entry with default priority
62    pub fn new(mail: Mail) -> Self {
63        Self::new_with_priority(mail, Priority::default())
64    }
65
66    /// Create a new queue entry with specified priority
67    pub fn new_with_priority(mail: Mail, priority: Priority) -> Self {
68        Self {
69            mail,
70            attempts: 0,
71            max_attempts: 5,
72            next_retry: SystemTime::now(),
73            last_error: None,
74            priority,
75        }
76    }
77
78    /// Get the priority of this entry
79    pub fn priority(&self) -> Priority {
80        self.priority
81    }
82
83    /// Set the priority of this entry
84    pub fn set_priority(&mut self, priority: Priority) {
85        self.priority = priority;
86    }
87
88    /// Calculate next retry time with exponential backoff
89    pub fn calculate_next_retry(&mut self) {
90        let backoff_secs = 2u64.pow(self.attempts.min(10)) * 60; // 1min, 2min, 4min, 8min...
91        self.next_retry = SystemTime::now() + Duration::from_secs(backoff_secs);
92        self.attempts += 1;
93    }
94
95    /// Check if entry should be retried
96    pub fn should_retry(&self) -> bool {
97        self.attempts < self.max_attempts && SystemTime::now() >= self.next_retry
98    }
99
100    /// Check if entry has exceeded max attempts
101    pub fn is_bounced(&self) -> bool {
102        self.attempts >= self.max_attempts
103    }
104
105    /// Get mail ID
106    pub fn mail_id(&self) -> &MailId {
107        self.mail.id()
108    }
109
110    /// Convert to serializable data
111    pub fn to_data(&self) -> QueueEntryData {
112        QueueEntryData {
113            mail_id: *self.mail.id(),
114            attempts: self.attempts,
115            max_attempts: self.max_attempts,
116            next_retry: self.next_retry,
117            last_error: self.last_error.clone(),
118            priority: self.priority,
119        }
120    }
121
122    /// Create from data and mail
123    pub fn from_data(data: QueueEntryData, mail: Mail) -> Self {
124        Self {
125            mail,
126            attempts: data.attempts,
127            max_attempts: data.max_attempts,
128            next_retry: data.next_retry,
129            last_error: data.last_error,
130            priority: data.priority,
131        }
132    }
133}
134
135/// Queue storage operations
136#[async_trait::async_trait]
137pub trait QueueStore: Send + Sync {
138    /// Save a queue entry to persistent storage
139    async fn save_entry(&self, entry: &QueueEntry) -> anyhow::Result<()>;
140
141    /// Load a queue entry from persistent storage
142    async fn load_entry(&self, mail_id: &MailId) -> anyhow::Result<Option<QueueEntry>>;
143
144    /// Remove a queue entry from persistent storage
145    async fn remove_entry(&self, mail_id: &MailId) -> anyhow::Result<()>;
146
147    /// Load all pending queue entries on startup
148    async fn load_all_entries(&self) -> anyhow::Result<Vec<QueueEntry>>;
149
150    /// Save to dead letter queue
151    async fn save_to_dlq(&self, entry: &QueueEntry) -> anyhow::Result<()>;
152
153    /// List all dead letter queue entries
154    async fn list_dlq(&self) -> anyhow::Result<Vec<QueueEntry>>;
155
156    /// Remove from dead letter queue
157    async fn remove_from_dlq(&self, mail_id: &MailId) -> anyhow::Result<()>;
158}
159
160/// Mail queue with retry logic and priority support
161pub struct MailQueue {
162    entries: Arc<RwLock<HashMap<MailId, QueueEntry>>>,
163    store: Option<Arc<dyn QueueStore>>,
164    priority_config: Arc<RwLock<crate::queue::priority::PriorityConfig>>,
165}
166
167impl MailQueue {
168    /// Create a new mail queue without persistent storage
169    pub fn new() -> Self {
170        Self {
171            entries: Arc::new(RwLock::new(HashMap::new())),
172            store: None,
173            priority_config: Arc::new(RwLock::new(
174                crate::queue::priority::PriorityConfig::default(),
175            )),
176        }
177    }
178
179    /// Create a new mail queue with persistent storage
180    pub fn new_with_store(store: Arc<dyn QueueStore>) -> Self {
181        Self {
182            entries: Arc::new(RwLock::new(HashMap::new())),
183            store: Some(store),
184            priority_config: Arc::new(RwLock::new(
185                crate::queue::priority::PriorityConfig::default(),
186            )),
187        }
188    }
189
190    /// Create a new mail queue with priority configuration
191    pub fn new_with_priority_config(
192        priority_config: crate::queue::priority::PriorityConfig,
193    ) -> Self {
194        Self {
195            entries: Arc::new(RwLock::new(HashMap::new())),
196            store: None,
197            priority_config: Arc::new(RwLock::new(priority_config)),
198        }
199    }
200
201    /// Create a new mail queue with storage and priority configuration
202    pub fn new_with_store_and_priority(
203        store: Arc<dyn QueueStore>,
204        priority_config: crate::queue::priority::PriorityConfig,
205    ) -> Self {
206        Self {
207            entries: Arc::new(RwLock::new(HashMap::new())),
208            store: Some(store),
209            priority_config: Arc::new(RwLock::new(priority_config)),
210        }
211    }
212
213    /// Update priority configuration
214    pub fn update_priority_config(&self, config: crate::queue::priority::PriorityConfig) {
215        if let Ok(mut guard) = self.priority_config.write() {
216            *guard = config;
217        }
218    }
219
220    /// Get current priority configuration
221    pub fn get_priority_config(&self) -> crate::queue::priority::PriorityConfig {
222        self.priority_config
223            .read()
224            .map(|g| g.clone())
225            .unwrap_or_default()
226    }
227
228    /// Load all pending entries from storage on startup
229    pub async fn load_from_storage(&self) -> anyhow::Result<()> {
230        if let Some(store) = &self.store {
231            let entries = store.load_all_entries().await?;
232            let mut queue_entries = self
233                .entries
234                .write()
235                .map_err(|e| anyhow::anyhow!("RwLock poisoned: {}", e))?;
236            for entry in entries {
237                let mail_id = *entry.mail.id();
238                tracing::info!("Loaded mail {} from storage", mail_id);
239                queue_entries.insert(mail_id, entry);
240            }
241        }
242        Ok(())
243    }
244
245    /// Enqueue a mail for delivery (atomic operation with persistence)
246    pub async fn enqueue(&self, mail: Mail) -> anyhow::Result<()> {
247        let mail_id = *mail.id();
248
249        // Calculate priority based on configuration
250        let priority = {
251            let config = self
252                .priority_config
253                .read()
254                .map_err(|e| anyhow::anyhow!("RwLock poisoned: {}", e))?;
255            config.calculate_priority(&mail, 0)
256        };
257
258        let entry = QueueEntry::new_with_priority(mail, priority);
259
260        // Save to storage first (if available)
261        if let Some(store) = &self.store {
262            store.save_entry(&entry).await?;
263        }
264
265        // Then add to in-memory queue
266        self.entries
267            .write()
268            .map_err(|e| anyhow::anyhow!("RwLock poisoned: {}", e))?
269            .insert(mail_id, entry);
270        tracing::info!(
271            "Enqueued mail {} for delivery with priority {}",
272            mail_id,
273            priority
274        );
275        Ok(())
276    }
277
278    /// Enqueue a mail with explicit priority
279    pub async fn enqueue_with_priority(
280        &self,
281        mail: Mail,
282        priority: Priority,
283    ) -> anyhow::Result<()> {
284        let mail_id = *mail.id();
285        let entry = QueueEntry::new_with_priority(mail, priority);
286
287        // Save to storage first (if available)
288        if let Some(store) = &self.store {
289            store.save_entry(&entry).await?;
290        }
291
292        // Then add to in-memory queue
293        self.entries
294            .write()
295            .map_err(|e| anyhow::anyhow!("RwLock poisoned: {}", e))?
296            .insert(mail_id, entry);
297        tracing::info!(
298            "Enqueued mail {} for delivery with priority {}",
299            mail_id,
300            priority
301        );
302        Ok(())
303    }
304
305    /// Get next batch of mails ready for retry, ordered by priority
306    pub fn get_ready_for_retry(&self, limit: usize) -> Vec<QueueEntry> {
307        let entries = match self.entries.read() {
308            Ok(guard) => guard,
309            Err(poisoned) => poisoned.into_inner(),
310        };
311        let mut ready: Vec<QueueEntry> = entries
312            .values()
313            .filter(|e| e.should_retry())
314            .cloned()
315            .collect();
316
317        // Sort by priority (highest first), then by next_retry time
318        ready.sort_by(|a, b| match b.priority.cmp(&a.priority) {
319            std::cmp::Ordering::Equal => a.next_retry.cmp(&b.next_retry),
320            other => other,
321        });
322
323        ready.into_iter().take(limit).collect()
324    }
325
326    /// Get mails ready for retry for a specific priority
327    pub fn get_ready_for_retry_by_priority(
328        &self,
329        priority: Priority,
330        limit: usize,
331    ) -> Vec<QueueEntry> {
332        let entries = match self.entries.read() {
333            Ok(guard) => guard,
334            Err(poisoned) => poisoned.into_inner(),
335        };
336        entries
337            .values()
338            .filter(|e| e.should_retry() && e.priority == priority)
339            .take(limit)
340            .cloned()
341            .collect()
342    }
343
344    /// Mark delivery attempt as failed (atomic operation with persistence)
345    pub async fn mark_failed(&self, mail_id: &MailId, error: String) -> anyhow::Result<()> {
346        let (should_move_to_dlq, entry_to_save) = {
347            let mut entries = self
348                .entries
349                .write()
350                .map_err(|e| anyhow::anyhow!("RwLock poisoned: {}", e))?;
351            if let Some(entry) = entries.get_mut(mail_id) {
352                entry.last_error = Some(error.clone());
353                entry.calculate_next_retry();
354
355                // Update priority based on retry attempts (priority inheritance/boost)
356                let new_priority = {
357                    let config = self
358                        .priority_config
359                        .read()
360                        .map_err(|e| anyhow::anyhow!("RwLock poisoned: {}", e))?;
361                    if config.inherit_priority_on_retry {
362                        config.calculate_priority(&entry.mail, entry.attempts)
363                    } else {
364                        entry.priority
365                    }
366                };
367
368                if new_priority != entry.priority {
369                    tracing::info!(
370                        "Mail {} priority boosted from {} to {} after {} attempts",
371                        mail_id,
372                        entry.priority,
373                        new_priority,
374                        entry.attempts
375                    );
376                    entry.priority = new_priority;
377                }
378
379                if entry.is_bounced() {
380                    tracing::warn!(
381                        "Mail {} exceeded max delivery attempts ({}), moving to DLQ",
382                        mail_id,
383                        entry.max_attempts
384                    );
385                    (true, None)
386                } else {
387                    tracing::info!(
388                        "Mail {} delivery failed (attempt {}/{}), priority {}, retry at {:?}",
389                        mail_id,
390                        entry.attempts,
391                        entry.max_attempts,
392                        entry.priority,
393                        entry.next_retry
394                    );
395                    (false, Some(entry.clone()))
396                }
397            } else {
398                (false, None)
399            }
400        }; // Lock dropped here
401
402        // Update in storage (outside lock)
403        if let Some(entry) = entry_to_save {
404            if let Some(store) = &self.store {
405                store.save_entry(&entry).await?;
406            }
407        }
408
409        // Move to DLQ if bounced
410        if should_move_to_dlq {
411            self.move_to_dlq(mail_id).await?;
412        }
413
414        Ok(())
415    }
416
417    /// Move entry to dead letter queue
418    async fn move_to_dlq(&self, mail_id: &MailId) -> anyhow::Result<()> {
419        let entry = self
420            .entries
421            .write()
422            .map_err(|e| anyhow::anyhow!("RwLock poisoned: {}", e))?
423            .remove(mail_id);
424
425        if let Some(entry) = entry {
426            if let Some(store) = &self.store {
427                // Save to DLQ
428                store.save_to_dlq(&entry).await?;
429                // Remove from main queue storage
430                store.remove_entry(mail_id).await?;
431                tracing::info!("Moved mail {} to dead letter queue", mail_id);
432            } else {
433                // Without storage, just keep in memory as bounced
434                tracing::warn!("Mail {} bounced but no DLQ storage available", mail_id);
435            }
436        }
437        Ok(())
438    }
439
440    /// Mark delivery as successful and remove from queue (atomic operation)
441    pub async fn mark_delivered(&self, mail_id: &MailId) -> anyhow::Result<()> {
442        // Remove from storage first
443        if let Some(store) = &self.store {
444            store.remove_entry(mail_id).await?;
445        }
446
447        // Then remove from in-memory queue
448        self.entries
449            .write()
450            .map_err(|e| anyhow::anyhow!("RwLock poisoned: {}", e))?
451            .remove(mail_id);
452        tracing::info!(
453            "Mail {} successfully delivered and removed from queue",
454            mail_id
455        );
456        Ok(())
457    }
458
459    /// Get all bounced messages (exceeded retry limit) - legacy in-memory only
460    pub fn get_bounced(&self) -> Vec<QueueEntry> {
461        let entries = match self.entries.read() {
462            Ok(guard) => guard,
463            Err(poisoned) => poisoned.into_inner(),
464        };
465        entries
466            .values()
467            .filter(|e| e.is_bounced())
468            .cloned()
469            .collect()
470    }
471
472    /// List all entries in dead letter queue
473    pub async fn list_dlq(&self) -> anyhow::Result<Vec<QueueEntry>> {
474        if let Some(store) = &self.store {
475            store.list_dlq().await
476        } else {
477            Ok(Vec::new())
478        }
479    }
480
481    /// Remove a mail from dead letter queue
482    pub async fn remove_from_dlq(&self, mail_id: &MailId) -> anyhow::Result<()> {
483        if let Some(store) = &self.store {
484            store.remove_from_dlq(mail_id).await?;
485        }
486        Ok(())
487    }
488
489    /// Retry a message from dead letter queue
490    pub async fn retry_from_dlq(&self, mail_id: &MailId) -> anyhow::Result<()> {
491        if let Some(store) = &self.store {
492            // Load from DLQ
493            let dlq_entries = store.list_dlq().await?;
494            if let Some(mut entry) = dlq_entries.into_iter().find(|e| e.mail.id() == mail_id) {
495                // Reset retry count
496                entry.attempts = 0;
497                entry.next_retry = SystemTime::now();
498                entry.last_error = None;
499
500                // Save to main queue
501                store.save_entry(&entry).await?;
502                self.entries
503                    .write()
504                    .map_err(|e| anyhow::anyhow!("RwLock poisoned: {}", e))?
505                    .insert(*mail_id, entry);
506
507                // Remove from DLQ
508                store.remove_from_dlq(mail_id).await?;
509
510                tracing::info!("Retrying mail {} from dead letter queue", mail_id);
511            }
512        }
513        Ok(())
514    }
515
516    /// Remove a mail from queue (atomic operation)
517    pub async fn remove(&self, mail_id: &MailId) -> anyhow::Result<Option<QueueEntry>> {
518        if let Some(store) = &self.store {
519            store.remove_entry(mail_id).await?;
520        }
521        Ok(self
522            .entries
523            .write()
524            .map_err(|e| anyhow::anyhow!("RwLock poisoned: {}", e))?
525            .remove(mail_id))
526    }
527
528    /// Get queue statistics
529    pub fn stats(&self) -> QueueStats {
530        let entries = match self.entries.read() {
531            Ok(guard) => guard,
532            Err(poisoned) => poisoned.into_inner(),
533        };
534        let total = entries.len();
535        let ready = entries.values().filter(|e| e.should_retry()).count();
536        let bounced = entries.values().filter(|e| e.is_bounced()).count();
537
538        QueueStats {
539            total,
540            ready,
541            bounced,
542            delayed: total - ready - bounced,
543        }
544    }
545
546    /// Get statistics for a specific priority level
547    pub fn stats_for_priority(&self, priority: Priority) -> QueueStats {
548        let entries = match self.entries.read() {
549            Ok(guard) => guard,
550            Err(poisoned) => poisoned.into_inner(),
551        };
552        let priority_entries: Vec<_> = entries
553            .values()
554            .filter(|e| e.priority == priority)
555            .collect();
556
557        let total = priority_entries.len();
558        let ready = priority_entries.iter().filter(|e| e.should_retry()).count();
559        let bounced = priority_entries.iter().filter(|e| e.is_bounced()).count();
560
561        QueueStats {
562            total,
563            ready,
564            bounced,
565            delayed: total - ready - bounced,
566        }
567    }
568
569    /// Get statistics grouped by priority
570    pub fn stats_by_priority(&self) -> HashMap<Priority, QueueStats> {
571        let mut stats_map = HashMap::new();
572
573        for &priority in Priority::all() {
574            stats_map.insert(priority, self.stats_for_priority(priority));
575        }
576
577        stats_map
578    }
579
580    /// Get all queue entries (for inspection)
581    pub fn list_all(&self) -> Vec<QueueEntry> {
582        match self.entries.read() {
583            Ok(guard) => guard.values().cloned().collect(),
584            Err(poisoned) => poisoned.into_inner().values().cloned().collect(),
585        }
586    }
587
588    /// Get queue entries for a specific priority
589    pub fn list_by_priority(&self, priority: Priority) -> Vec<QueueEntry> {
590        match self.entries.read() {
591            Ok(guard) => guard
592                .values()
593                .filter(|e| e.priority == priority)
594                .cloned()
595                .collect(),
596            Err(poisoned) => poisoned
597                .into_inner()
598                .values()
599                .filter(|e| e.priority == priority)
600                .cloned()
601                .collect(),
602        }
603    }
604}
605
606impl Default for MailQueue {
607    fn default() -> Self {
608        Self::new()
609    }
610}
611
612/// Queue statistics
613#[derive(Debug, Clone)]
614pub struct QueueStats {
615    pub total: usize,
616    pub ready: usize,
617    pub bounced: usize,
618    pub delayed: usize,
619}
620
621/// Filesystem-based queue store implementation
622pub struct FilesystemQueueStore {
623    queue_dir: PathBuf,
624    dlq_dir: PathBuf,
625    storage: Arc<dyn StorageBackend>,
626}
627
628impl FilesystemQueueStore {
629    /// Create a new filesystem queue store
630    pub fn new(base_path: impl Into<PathBuf>, storage: Arc<dyn StorageBackend>) -> Self {
631        let base_path: PathBuf = base_path.into();
632        let queue_dir = base_path.join("queue");
633        let dlq_dir = base_path.join("dlq");
634
635        Self {
636            queue_dir,
637            dlq_dir,
638            storage,
639        }
640    }
641
642    /// Ensure directories exist
643    async fn ensure_dirs(&self) -> anyhow::Result<()> {
644        tokio::fs::create_dir_all(&self.queue_dir).await?;
645        tokio::fs::create_dir_all(&self.dlq_dir).await?;
646        Ok(())
647    }
648
649    /// Get path for queue entry metadata
650    fn entry_metadata_path(&self, mail_id: &MailId) -> PathBuf {
651        self.queue_dir.join(format!("{}.json", mail_id))
652    }
653
654    /// Get path for DLQ entry metadata
655    fn dlq_metadata_path(&self, mail_id: &MailId) -> PathBuf {
656        self.dlq_dir.join(format!("{}.json", mail_id))
657    }
658}
659
660#[async_trait::async_trait]
661impl QueueStore for FilesystemQueueStore {
662    async fn save_entry(&self, entry: &QueueEntry) -> anyhow::Result<()> {
663        self.ensure_dirs().await?;
664
665        // Serialize and save metadata
666        let data = entry.to_data();
667        let json = serde_json::to_string_pretty(&data)?;
668        let metadata_path = self.entry_metadata_path(entry.mail_id());
669        tokio::fs::write(&metadata_path, json).await?;
670
671        // Save mail to storage (using message store)
672        let message_store = self.storage.message_store();
673        let mailbox_id = rusmes_storage::MailboxId::new(); // Queue mailbox
674        message_store
675            .append_message(&mailbox_id, entry.mail.clone())
676            .await?;
677
678        Ok(())
679    }
680
681    async fn load_entry(&self, mail_id: &MailId) -> anyhow::Result<Option<QueueEntry>> {
682        let metadata_path = self.entry_metadata_path(mail_id);
683
684        // Check if file exists
685        if !tokio::fs::try_exists(&metadata_path).await? {
686            return Ok(None);
687        }
688
689        // Load metadata
690        let json = tokio::fs::read_to_string(&metadata_path).await?;
691        let data: QueueEntryData = serde_json::from_str(&json)?;
692
693        // Load mail from storage
694        let message_store = self.storage.message_store();
695        let mail_msg_id = rusmes_proto::MessageId::new(); // Would need to store this
696        if let Some(mail) = message_store.get_message(&mail_msg_id).await? {
697            Ok(Some(QueueEntry::from_data(data, mail)))
698        } else {
699            Ok(None)
700        }
701    }
702
703    async fn remove_entry(&self, mail_id: &MailId) -> anyhow::Result<()> {
704        let metadata_path = self.entry_metadata_path(mail_id);
705
706        // Remove metadata file
707        if tokio::fs::try_exists(&metadata_path).await? {
708            tokio::fs::remove_file(&metadata_path).await?;
709        }
710
711        // Note: We keep the mail in storage as it might be referenced elsewhere
712
713        Ok(())
714    }
715
716    async fn load_all_entries(&self) -> anyhow::Result<Vec<QueueEntry>> {
717        self.ensure_dirs().await?;
718
719        let mut entries = Vec::new();
720        let mut read_dir = tokio::fs::read_dir(&self.queue_dir).await?;
721
722        while let Some(entry) = read_dir.next_entry().await? {
723            let path = entry.path();
724            if path.extension().and_then(|s| s.to_str()) == Some("json") {
725                // Load metadata
726                let json = tokio::fs::read_to_string(&path).await?;
727                if let Ok(data) = serde_json::from_str::<QueueEntryData>(&json) {
728                    // Load mail from storage
729                    let message_store = self.storage.message_store();
730                    let mail_msg_id = rusmes_proto::MessageId::new(); // Would need proper mapping
731                    if let Ok(Some(mail)) = message_store.get_message(&mail_msg_id).await {
732                        entries.push(QueueEntry::from_data(data, mail));
733                    }
734                }
735            }
736        }
737
738        Ok(entries)
739    }
740
741    async fn save_to_dlq(&self, entry: &QueueEntry) -> anyhow::Result<()> {
742        self.ensure_dirs().await?;
743
744        // Serialize and save metadata to DLQ
745        let data = entry.to_data();
746        let json = serde_json::to_string_pretty(&data)?;
747        let metadata_path = self.dlq_metadata_path(entry.mail_id());
748        tokio::fs::write(&metadata_path, json).await?;
749
750        // Mail is already in storage
751
752        Ok(())
753    }
754
755    async fn list_dlq(&self) -> anyhow::Result<Vec<QueueEntry>> {
756        self.ensure_dirs().await?;
757
758        let mut entries = Vec::new();
759        let mut read_dir = tokio::fs::read_dir(&self.dlq_dir).await?;
760
761        while let Some(entry) = read_dir.next_entry().await? {
762            let path = entry.path();
763            if path.extension().and_then(|s| s.to_str()) == Some("json") {
764                // Load metadata
765                let json = tokio::fs::read_to_string(&path).await?;
766                if let Ok(data) = serde_json::from_str::<QueueEntryData>(&json) {
767                    // Load mail from storage
768                    let message_store = self.storage.message_store();
769                    let mail_msg_id = rusmes_proto::MessageId::new(); // Would need proper mapping
770                    if let Ok(Some(mail)) = message_store.get_message(&mail_msg_id).await {
771                        entries.push(QueueEntry::from_data(data, mail));
772                    }
773                }
774            }
775        }
776
777        Ok(entries)
778    }
779
780    async fn remove_from_dlq(&self, mail_id: &MailId) -> anyhow::Result<()> {
781        let metadata_path = self.dlq_metadata_path(mail_id);
782
783        // Remove metadata file
784        if tokio::fs::try_exists(&metadata_path).await? {
785            tokio::fs::remove_file(&metadata_path).await?;
786        }
787
788        Ok(())
789    }
790}
791
792#[cfg(test)]
793mod tests {
794    use super::*;
795    use bytes::Bytes;
796    use rusmes_proto::{HeaderMap, MessageBody, MimeMessage};
797
798    #[tokio::test]
799    async fn test_queue_enqueue_dequeue() {
800        let queue = MailQueue::new();
801        let message = MimeMessage::new(HeaderMap::new(), MessageBody::Small(Bytes::from("Test")));
802        let mail = Mail::new(
803            Some("sender@example.com".parse().unwrap()),
804            vec!["recipient@example.com".parse().unwrap()],
805            message,
806            None,
807            None,
808        );
809
810        let mail_id = *mail.id();
811        queue.enqueue(mail).await.unwrap();
812
813        let stats = queue.stats();
814        assert_eq!(stats.total, 1);
815        assert_eq!(stats.ready, 1);
816
817        queue.mark_delivered(&mail_id).await.unwrap();
818        let stats = queue.stats();
819        assert_eq!(stats.total, 0);
820    }
821
822    #[test]
823    fn test_retry_backoff() {
824        let mut entry = QueueEntry::new(Mail::new(
825            None,
826            vec![],
827            MimeMessage::new(HeaderMap::new(), MessageBody::Small(Bytes::from(""))),
828            None,
829            None,
830        ));
831
832        entry.calculate_next_retry();
833        assert_eq!(entry.attempts, 1);
834
835        entry.calculate_next_retry();
836        assert_eq!(entry.attempts, 2);
837
838        // After max attempts, should be bounced
839        entry.attempts = 5;
840        assert!(entry.is_bounced());
841    }
842}