1use crate::queue::priority::Priority;
4use rusmes_proto::{Mail, MailId};
5use rusmes_storage::StorageBackend;
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8use std::path::PathBuf;
9use std::sync::{Arc, RwLock};
10use std::time::{Duration, SystemTime};
11
/// Serializable metadata of a queued mail.
///
/// Carries only the mail's identifier, not the mail itself; see
/// `QueueEntry::to_data` / `QueueEntry::from_data` for the conversions.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueEntryData {
    /// Identifier of the mail this metadata belongs to.
    pub mail_id: MailId,
    /// Number of delivery attempts recorded so far.
    pub attempts: u32,
    /// Attempt count at which the entry counts as bounced.
    pub max_attempts: u32,
    /// Earliest time of the next delivery attempt; (de)serialized through
    /// the `systemtime_serde` module.
    #[serde(with = "systemtime_serde")]
    pub next_retry: SystemTime,
    /// Message of the most recent delivery failure, if any.
    pub last_error: Option<String>,
    /// Delivery priority; `Priority::default()` is used when the field is
    /// missing from the serialized input.
    #[serde(default)]
    pub priority: Priority,
}
24
/// A mail waiting for delivery together with its retry bookkeeping.
#[derive(Debug, Clone)]
pub struct QueueEntry {
    /// The mail to deliver.
    pub mail: Mail,
    /// Number of delivery attempts recorded so far.
    pub attempts: u32,
    /// Attempt count at which the entry counts as bounced.
    pub max_attempts: u32,
    /// Earliest time at which the next delivery attempt may happen.
    pub next_retry: SystemTime,
    /// Message of the most recent delivery failure, if any.
    pub last_error: Option<String>,
    /// Delivery priority of this entry.
    pub priority: Priority,
}
35
36mod systemtime_serde {
38 use serde::{Deserialize, Deserializer, Serialize, Serializer};
39 use std::time::{Duration, SystemTime, UNIX_EPOCH};
40
41 pub fn serialize<S>(time: &SystemTime, serializer: S) -> Result<S::Ok, S::Error>
42 where
43 S: Serializer,
44 {
45 let duration = time
46 .duration_since(UNIX_EPOCH)
47 .map_err(serde::ser::Error::custom)?;
48 duration.as_secs().serialize(serializer)
49 }
50
51 pub fn deserialize<'de, D>(deserializer: D) -> Result<SystemTime, D::Error>
52 where
53 D: Deserializer<'de>,
54 {
55 let secs = u64::deserialize(deserializer)?;
56 Ok(UNIX_EPOCH + Duration::from_secs(secs))
57 }
58}
59
60impl QueueEntry {
61 pub fn new(mail: Mail) -> Self {
63 Self::new_with_priority(mail, Priority::default())
64 }
65
66 pub fn new_with_priority(mail: Mail, priority: Priority) -> Self {
68 Self {
69 mail,
70 attempts: 0,
71 max_attempts: 5,
72 next_retry: SystemTime::now(),
73 last_error: None,
74 priority,
75 }
76 }
77
78 pub fn priority(&self) -> Priority {
80 self.priority
81 }
82
83 pub fn set_priority(&mut self, priority: Priority) {
85 self.priority = priority;
86 }
87
88 pub fn calculate_next_retry(&mut self) {
90 let backoff_secs = 2u64.pow(self.attempts.min(10)) * 60; self.next_retry = SystemTime::now() + Duration::from_secs(backoff_secs);
92 self.attempts += 1;
93 }
94
95 pub fn should_retry(&self) -> bool {
97 self.attempts < self.max_attempts && SystemTime::now() >= self.next_retry
98 }
99
100 pub fn is_bounced(&self) -> bool {
102 self.attempts >= self.max_attempts
103 }
104
105 pub fn mail_id(&self) -> &MailId {
107 self.mail.id()
108 }
109
110 pub fn to_data(&self) -> QueueEntryData {
112 QueueEntryData {
113 mail_id: *self.mail.id(),
114 attempts: self.attempts,
115 max_attempts: self.max_attempts,
116 next_retry: self.next_retry,
117 last_error: self.last_error.clone(),
118 priority: self.priority,
119 }
120 }
121
122 pub fn from_data(data: QueueEntryData, mail: Mail) -> Self {
124 Self {
125 mail,
126 attempts: data.attempts,
127 max_attempts: data.max_attempts,
128 next_retry: data.next_retry,
129 last_error: data.last_error,
130 priority: data.priority,
131 }
132 }
133}
134
/// Persistence backend for the delivery queue and its dead letter queue (DLQ).
///
/// Implementations must be `Send + Sync`.
#[async_trait::async_trait]
pub trait QueueStore: Send + Sync {
    /// Persists `entry` in the active queue.
    async fn save_entry(&self, entry: &QueueEntry) -> anyhow::Result<()>;

    /// Loads the active-queue entry for `mail_id`; `Ok(None)` when absent.
    async fn load_entry(&self, mail_id: &MailId) -> anyhow::Result<Option<QueueEntry>>;

    /// Removes the active-queue entry for `mail_id`.
    async fn remove_entry(&self, mail_id: &MailId) -> anyhow::Result<()>;

    /// Loads every entry of the active queue.
    async fn load_all_entries(&self) -> anyhow::Result<Vec<QueueEntry>>;

    /// Persists `entry` in the dead letter queue.
    async fn save_to_dlq(&self, entry: &QueueEntry) -> anyhow::Result<()>;

    /// Lists every entry of the dead letter queue.
    async fn list_dlq(&self) -> anyhow::Result<Vec<QueueEntry>>;

    /// Removes the dead-letter entry for `mail_id`.
    async fn remove_from_dlq(&self, mail_id: &MailId) -> anyhow::Result<()>;
}
159
/// In-memory delivery queue keyed by mail id, optionally mirrored to a
/// `QueueStore`.
pub struct MailQueue {
    /// Live queue entries, keyed by mail id.
    entries: Arc<RwLock<HashMap<MailId, QueueEntry>>>,
    /// Persistence backend; `None` keeps the queue purely in memory.
    store: Option<Arc<dyn QueueStore>>,
    /// Configuration used to compute entry priorities.
    priority_config: Arc<RwLock<crate::queue::priority::PriorityConfig>>,
}
166
167impl MailQueue {
168 pub fn new() -> Self {
170 Self {
171 entries: Arc::new(RwLock::new(HashMap::new())),
172 store: None,
173 priority_config: Arc::new(RwLock::new(
174 crate::queue::priority::PriorityConfig::default(),
175 )),
176 }
177 }
178
179 pub fn new_with_store(store: Arc<dyn QueueStore>) -> Self {
181 Self {
182 entries: Arc::new(RwLock::new(HashMap::new())),
183 store: Some(store),
184 priority_config: Arc::new(RwLock::new(
185 crate::queue::priority::PriorityConfig::default(),
186 )),
187 }
188 }
189
190 pub fn new_with_priority_config(
192 priority_config: crate::queue::priority::PriorityConfig,
193 ) -> Self {
194 Self {
195 entries: Arc::new(RwLock::new(HashMap::new())),
196 store: None,
197 priority_config: Arc::new(RwLock::new(priority_config)),
198 }
199 }
200
201 pub fn new_with_store_and_priority(
203 store: Arc<dyn QueueStore>,
204 priority_config: crate::queue::priority::PriorityConfig,
205 ) -> Self {
206 Self {
207 entries: Arc::new(RwLock::new(HashMap::new())),
208 store: Some(store),
209 priority_config: Arc::new(RwLock::new(priority_config)),
210 }
211 }
212
213 pub fn update_priority_config(&self, config: crate::queue::priority::PriorityConfig) {
215 if let Ok(mut guard) = self.priority_config.write() {
216 *guard = config;
217 }
218 }
219
220 pub fn get_priority_config(&self) -> crate::queue::priority::PriorityConfig {
222 self.priority_config
223 .read()
224 .map(|g| g.clone())
225 .unwrap_or_default()
226 }
227
228 pub async fn load_from_storage(&self) -> anyhow::Result<()> {
230 if let Some(store) = &self.store {
231 let entries = store.load_all_entries().await?;
232 let mut queue_entries = self
233 .entries
234 .write()
235 .map_err(|e| anyhow::anyhow!("RwLock poisoned: {}", e))?;
236 for entry in entries {
237 let mail_id = *entry.mail.id();
238 tracing::info!("Loaded mail {} from storage", mail_id);
239 queue_entries.insert(mail_id, entry);
240 }
241 }
242 Ok(())
243 }
244
245 pub async fn enqueue(&self, mail: Mail) -> anyhow::Result<()> {
247 let mail_id = *mail.id();
248
249 let priority = {
251 let config = self
252 .priority_config
253 .read()
254 .map_err(|e| anyhow::anyhow!("RwLock poisoned: {}", e))?;
255 config.calculate_priority(&mail, 0)
256 };
257
258 let entry = QueueEntry::new_with_priority(mail, priority);
259
260 if let Some(store) = &self.store {
262 store.save_entry(&entry).await?;
263 }
264
265 self.entries
267 .write()
268 .map_err(|e| anyhow::anyhow!("RwLock poisoned: {}", e))?
269 .insert(mail_id, entry);
270 tracing::info!(
271 "Enqueued mail {} for delivery with priority {}",
272 mail_id,
273 priority
274 );
275 Ok(())
276 }
277
278 pub async fn enqueue_with_priority(
280 &self,
281 mail: Mail,
282 priority: Priority,
283 ) -> anyhow::Result<()> {
284 let mail_id = *mail.id();
285 let entry = QueueEntry::new_with_priority(mail, priority);
286
287 if let Some(store) = &self.store {
289 store.save_entry(&entry).await?;
290 }
291
292 self.entries
294 .write()
295 .map_err(|e| anyhow::anyhow!("RwLock poisoned: {}", e))?
296 .insert(mail_id, entry);
297 tracing::info!(
298 "Enqueued mail {} for delivery with priority {}",
299 mail_id,
300 priority
301 );
302 Ok(())
303 }
304
305 pub fn get_ready_for_retry(&self, limit: usize) -> Vec<QueueEntry> {
307 let entries = match self.entries.read() {
308 Ok(guard) => guard,
309 Err(poisoned) => poisoned.into_inner(),
310 };
311 let mut ready: Vec<QueueEntry> = entries
312 .values()
313 .filter(|e| e.should_retry())
314 .cloned()
315 .collect();
316
317 ready.sort_by(|a, b| match b.priority.cmp(&a.priority) {
319 std::cmp::Ordering::Equal => a.next_retry.cmp(&b.next_retry),
320 other => other,
321 });
322
323 ready.into_iter().take(limit).collect()
324 }
325
326 pub fn get_ready_for_retry_by_priority(
328 &self,
329 priority: Priority,
330 limit: usize,
331 ) -> Vec<QueueEntry> {
332 let entries = match self.entries.read() {
333 Ok(guard) => guard,
334 Err(poisoned) => poisoned.into_inner(),
335 };
336 entries
337 .values()
338 .filter(|e| e.should_retry() && e.priority == priority)
339 .take(limit)
340 .cloned()
341 .collect()
342 }
343
344 pub async fn mark_failed(&self, mail_id: &MailId, error: String) -> anyhow::Result<()> {
346 let (should_move_to_dlq, entry_to_save) = {
347 let mut entries = self
348 .entries
349 .write()
350 .map_err(|e| anyhow::anyhow!("RwLock poisoned: {}", e))?;
351 if let Some(entry) = entries.get_mut(mail_id) {
352 entry.last_error = Some(error.clone());
353 entry.calculate_next_retry();
354
355 let new_priority = {
357 let config = self
358 .priority_config
359 .read()
360 .map_err(|e| anyhow::anyhow!("RwLock poisoned: {}", e))?;
361 if config.inherit_priority_on_retry {
362 config.calculate_priority(&entry.mail, entry.attempts)
363 } else {
364 entry.priority
365 }
366 };
367
368 if new_priority != entry.priority {
369 tracing::info!(
370 "Mail {} priority boosted from {} to {} after {} attempts",
371 mail_id,
372 entry.priority,
373 new_priority,
374 entry.attempts
375 );
376 entry.priority = new_priority;
377 }
378
379 if entry.is_bounced() {
380 tracing::warn!(
381 "Mail {} exceeded max delivery attempts ({}), moving to DLQ",
382 mail_id,
383 entry.max_attempts
384 );
385 (true, None)
386 } else {
387 tracing::info!(
388 "Mail {} delivery failed (attempt {}/{}), priority {}, retry at {:?}",
389 mail_id,
390 entry.attempts,
391 entry.max_attempts,
392 entry.priority,
393 entry.next_retry
394 );
395 (false, Some(entry.clone()))
396 }
397 } else {
398 (false, None)
399 }
400 }; if let Some(entry) = entry_to_save {
404 if let Some(store) = &self.store {
405 store.save_entry(&entry).await?;
406 }
407 }
408
409 if should_move_to_dlq {
411 self.move_to_dlq(mail_id).await?;
412 }
413
414 Ok(())
415 }
416
417 async fn move_to_dlq(&self, mail_id: &MailId) -> anyhow::Result<()> {
419 let entry = self
420 .entries
421 .write()
422 .map_err(|e| anyhow::anyhow!("RwLock poisoned: {}", e))?
423 .remove(mail_id);
424
425 if let Some(entry) = entry {
426 if let Some(store) = &self.store {
427 store.save_to_dlq(&entry).await?;
429 store.remove_entry(mail_id).await?;
431 tracing::info!("Moved mail {} to dead letter queue", mail_id);
432 } else {
433 tracing::warn!("Mail {} bounced but no DLQ storage available", mail_id);
435 }
436 }
437 Ok(())
438 }
439
440 pub async fn mark_delivered(&self, mail_id: &MailId) -> anyhow::Result<()> {
442 if let Some(store) = &self.store {
444 store.remove_entry(mail_id).await?;
445 }
446
447 self.entries
449 .write()
450 .map_err(|e| anyhow::anyhow!("RwLock poisoned: {}", e))?
451 .remove(mail_id);
452 tracing::info!(
453 "Mail {} successfully delivered and removed from queue",
454 mail_id
455 );
456 Ok(())
457 }
458
459 pub fn get_bounced(&self) -> Vec<QueueEntry> {
461 let entries = match self.entries.read() {
462 Ok(guard) => guard,
463 Err(poisoned) => poisoned.into_inner(),
464 };
465 entries
466 .values()
467 .filter(|e| e.is_bounced())
468 .cloned()
469 .collect()
470 }
471
472 pub async fn list_dlq(&self) -> anyhow::Result<Vec<QueueEntry>> {
474 if let Some(store) = &self.store {
475 store.list_dlq().await
476 } else {
477 Ok(Vec::new())
478 }
479 }
480
481 pub async fn remove_from_dlq(&self, mail_id: &MailId) -> anyhow::Result<()> {
483 if let Some(store) = &self.store {
484 store.remove_from_dlq(mail_id).await?;
485 }
486 Ok(())
487 }
488
489 pub async fn retry_from_dlq(&self, mail_id: &MailId) -> anyhow::Result<()> {
491 if let Some(store) = &self.store {
492 let dlq_entries = store.list_dlq().await?;
494 if let Some(mut entry) = dlq_entries.into_iter().find(|e| e.mail.id() == mail_id) {
495 entry.attempts = 0;
497 entry.next_retry = SystemTime::now();
498 entry.last_error = None;
499
500 store.save_entry(&entry).await?;
502 self.entries
503 .write()
504 .map_err(|e| anyhow::anyhow!("RwLock poisoned: {}", e))?
505 .insert(*mail_id, entry);
506
507 store.remove_from_dlq(mail_id).await?;
509
510 tracing::info!("Retrying mail {} from dead letter queue", mail_id);
511 }
512 }
513 Ok(())
514 }
515
516 pub async fn remove(&self, mail_id: &MailId) -> anyhow::Result<Option<QueueEntry>> {
518 if let Some(store) = &self.store {
519 store.remove_entry(mail_id).await?;
520 }
521 Ok(self
522 .entries
523 .write()
524 .map_err(|e| anyhow::anyhow!("RwLock poisoned: {}", e))?
525 .remove(mail_id))
526 }
527
528 pub fn stats(&self) -> QueueStats {
530 let entries = match self.entries.read() {
531 Ok(guard) => guard,
532 Err(poisoned) => poisoned.into_inner(),
533 };
534 let total = entries.len();
535 let ready = entries.values().filter(|e| e.should_retry()).count();
536 let bounced = entries.values().filter(|e| e.is_bounced()).count();
537
538 QueueStats {
539 total,
540 ready,
541 bounced,
542 delayed: total - ready - bounced,
543 }
544 }
545
546 pub fn stats_for_priority(&self, priority: Priority) -> QueueStats {
548 let entries = match self.entries.read() {
549 Ok(guard) => guard,
550 Err(poisoned) => poisoned.into_inner(),
551 };
552 let priority_entries: Vec<_> = entries
553 .values()
554 .filter(|e| e.priority == priority)
555 .collect();
556
557 let total = priority_entries.len();
558 let ready = priority_entries.iter().filter(|e| e.should_retry()).count();
559 let bounced = priority_entries.iter().filter(|e| e.is_bounced()).count();
560
561 QueueStats {
562 total,
563 ready,
564 bounced,
565 delayed: total - ready - bounced,
566 }
567 }
568
569 pub fn stats_by_priority(&self) -> HashMap<Priority, QueueStats> {
571 let mut stats_map = HashMap::new();
572
573 for &priority in Priority::all() {
574 stats_map.insert(priority, self.stats_for_priority(priority));
575 }
576
577 stats_map
578 }
579
580 pub fn list_all(&self) -> Vec<QueueEntry> {
582 match self.entries.read() {
583 Ok(guard) => guard.values().cloned().collect(),
584 Err(poisoned) => poisoned.into_inner().values().cloned().collect(),
585 }
586 }
587
588 pub fn list_by_priority(&self, priority: Priority) -> Vec<QueueEntry> {
590 match self.entries.read() {
591 Ok(guard) => guard
592 .values()
593 .filter(|e| e.priority == priority)
594 .cloned()
595 .collect(),
596 Err(poisoned) => poisoned
597 .into_inner()
598 .values()
599 .filter(|e| e.priority == priority)
600 .cloned()
601 .collect(),
602 }
603 }
604}
605
606impl Default for MailQueue {
607 fn default() -> Self {
608 Self::new()
609 }
610}
611
/// Counters describing a set of queue entries at one point in time.
#[derive(Debug, Clone)]
pub struct QueueStats {
    /// Number of entries considered.
    pub total: usize,
    /// Entries that are due for a delivery attempt.
    pub ready: usize,
    /// Entries that have reached their attempt limit.
    pub bounced: usize,
    /// Entries that are neither ready nor bounced.
    pub delayed: usize,
}
620
/// `QueueStore` that keeps entry metadata as JSON files on disk and the
/// mail itself in a `StorageBackend`.
pub struct FilesystemQueueStore {
    /// Directory holding one `<mail_id>.json` file per active entry.
    queue_dir: PathBuf,
    /// Directory holding one `<mail_id>.json` file per dead-letter entry.
    dlq_dir: PathBuf,
    /// Backend whose message store is used for the mail itself.
    storage: Arc<dyn StorageBackend>,
}
627
628impl FilesystemQueueStore {
629 pub fn new(base_path: impl Into<PathBuf>, storage: Arc<dyn StorageBackend>) -> Self {
631 let base_path: PathBuf = base_path.into();
632 let queue_dir = base_path.join("queue");
633 let dlq_dir = base_path.join("dlq");
634
635 Self {
636 queue_dir,
637 dlq_dir,
638 storage,
639 }
640 }
641
642 async fn ensure_dirs(&self) -> anyhow::Result<()> {
644 tokio::fs::create_dir_all(&self.queue_dir).await?;
645 tokio::fs::create_dir_all(&self.dlq_dir).await?;
646 Ok(())
647 }
648
649 fn entry_metadata_path(&self, mail_id: &MailId) -> PathBuf {
651 self.queue_dir.join(format!("{}.json", mail_id))
652 }
653
654 fn dlq_metadata_path(&self, mail_id: &MailId) -> PathBuf {
656 self.dlq_dir.join(format!("{}.json", mail_id))
657 }
658}
659
660#[async_trait::async_trait]
661impl QueueStore for FilesystemQueueStore {
662 async fn save_entry(&self, entry: &QueueEntry) -> anyhow::Result<()> {
663 self.ensure_dirs().await?;
664
665 let data = entry.to_data();
667 let json = serde_json::to_string_pretty(&data)?;
668 let metadata_path = self.entry_metadata_path(entry.mail_id());
669 tokio::fs::write(&metadata_path, json).await?;
670
671 let message_store = self.storage.message_store();
673 let mailbox_id = rusmes_storage::MailboxId::new(); message_store
675 .append_message(&mailbox_id, entry.mail.clone())
676 .await?;
677
678 Ok(())
679 }
680
681 async fn load_entry(&self, mail_id: &MailId) -> anyhow::Result<Option<QueueEntry>> {
682 let metadata_path = self.entry_metadata_path(mail_id);
683
684 if !tokio::fs::try_exists(&metadata_path).await? {
686 return Ok(None);
687 }
688
689 let json = tokio::fs::read_to_string(&metadata_path).await?;
691 let data: QueueEntryData = serde_json::from_str(&json)?;
692
693 let message_store = self.storage.message_store();
695 let mail_msg_id = rusmes_proto::MessageId::new(); if let Some(mail) = message_store.get_message(&mail_msg_id).await? {
697 Ok(Some(QueueEntry::from_data(data, mail)))
698 } else {
699 Ok(None)
700 }
701 }
702
703 async fn remove_entry(&self, mail_id: &MailId) -> anyhow::Result<()> {
704 let metadata_path = self.entry_metadata_path(mail_id);
705
706 if tokio::fs::try_exists(&metadata_path).await? {
708 tokio::fs::remove_file(&metadata_path).await?;
709 }
710
711 Ok(())
714 }
715
716 async fn load_all_entries(&self) -> anyhow::Result<Vec<QueueEntry>> {
717 self.ensure_dirs().await?;
718
719 let mut entries = Vec::new();
720 let mut read_dir = tokio::fs::read_dir(&self.queue_dir).await?;
721
722 while let Some(entry) = read_dir.next_entry().await? {
723 let path = entry.path();
724 if path.extension().and_then(|s| s.to_str()) == Some("json") {
725 let json = tokio::fs::read_to_string(&path).await?;
727 if let Ok(data) = serde_json::from_str::<QueueEntryData>(&json) {
728 let message_store = self.storage.message_store();
730 let mail_msg_id = rusmes_proto::MessageId::new(); if let Ok(Some(mail)) = message_store.get_message(&mail_msg_id).await {
732 entries.push(QueueEntry::from_data(data, mail));
733 }
734 }
735 }
736 }
737
738 Ok(entries)
739 }
740
741 async fn save_to_dlq(&self, entry: &QueueEntry) -> anyhow::Result<()> {
742 self.ensure_dirs().await?;
743
744 let data = entry.to_data();
746 let json = serde_json::to_string_pretty(&data)?;
747 let metadata_path = self.dlq_metadata_path(entry.mail_id());
748 tokio::fs::write(&metadata_path, json).await?;
749
750 Ok(())
753 }
754
755 async fn list_dlq(&self) -> anyhow::Result<Vec<QueueEntry>> {
756 self.ensure_dirs().await?;
757
758 let mut entries = Vec::new();
759 let mut read_dir = tokio::fs::read_dir(&self.dlq_dir).await?;
760
761 while let Some(entry) = read_dir.next_entry().await? {
762 let path = entry.path();
763 if path.extension().and_then(|s| s.to_str()) == Some("json") {
764 let json = tokio::fs::read_to_string(&path).await?;
766 if let Ok(data) = serde_json::from_str::<QueueEntryData>(&json) {
767 let message_store = self.storage.message_store();
769 let mail_msg_id = rusmes_proto::MessageId::new(); if let Ok(Some(mail)) = message_store.get_message(&mail_msg_id).await {
771 entries.push(QueueEntry::from_data(data, mail));
772 }
773 }
774 }
775 }
776
777 Ok(entries)
778 }
779
780 async fn remove_from_dlq(&self, mail_id: &MailId) -> anyhow::Result<()> {
781 let metadata_path = self.dlq_metadata_path(mail_id);
782
783 if tokio::fs::try_exists(&metadata_path).await? {
785 tokio::fs::remove_file(&metadata_path).await?;
786 }
787
788 Ok(())
789 }
790}
791
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use rusmes_proto::{HeaderMap, MessageBody, MimeMessage};

    /// Builds a MIME message with empty headers and the given small body.
    fn small_message(body: &'static str) -> MimeMessage {
        MimeMessage::new(HeaderMap::new(), MessageBody::Small(Bytes::from(body)))
    }

    /// Enqueuing makes a mail immediately ready; delivering it empties the queue.
    #[tokio::test]
    async fn test_queue_enqueue_dequeue() {
        let queue = MailQueue::new();
        let mail = Mail::new(
            Some("sender@example.com".parse().unwrap()),
            vec!["recipient@example.com".parse().unwrap()],
            small_message("Test"),
            None,
            None,
        );
        let mail_id = *mail.id();

        queue.enqueue(mail).await.unwrap();
        let after_enqueue = queue.stats();
        assert_eq!(after_enqueue.total, 1);
        assert_eq!(after_enqueue.ready, 1);

        queue.mark_delivered(&mail_id).await.unwrap();
        let after_delivery = queue.stats();
        assert_eq!(after_delivery.total, 0);
    }

    /// Each scheduled retry counts one attempt; reaching the limit bounces.
    #[test]
    fn test_retry_backoff() {
        let mail = Mail::new(None, vec![], small_message(""), None, None);
        let mut entry = QueueEntry::new(mail);

        for expected_attempts in 1..=2 {
            entry.calculate_next_retry();
            assert_eq!(entry.attempts, expected_attempts);
        }

        entry.attempts = 5;
        assert!(entry.is_bounced());
    }
}
842}